Skip to main content

theater_server/
server.rs

1use anyhow::Result;
2use bytes::Bytes;
3use futures::sink::SinkExt;
4use futures::stream::StreamExt;
5use serde::{Deserialize, Serialize};
6use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8use theater::messages::{
9    ActorMessage, ActorRequest, ActorResult, ActorSend, ActorStatus, ChannelEvent,
10    ChannelParticipant,
11};
12use theater::pack_bridge::Value;
13use theater::{ChainEvent, ManifestConfig};
14use tokio::net::{TcpListener, TcpStream};
15use tokio::sync::{mpsc, Mutex};
16use tokio_util::codec::Framed;
17use tracing::{debug, error, info};
18use uuid::Uuid;
19
20use theater::config::actor_manifest::{
21    RuntimeHostConfig, StoreHandlerConfig, SupervisorHostConfig, TcpHandlerConfig,
22};
23use theater::handler::HandlerRegistry;
24use theater::id::TheaterId;
25use theater::messages::{default_init_state, ChannelId, TheaterCommand};
26use theater::theater_runtime::TheaterRuntime;
27use theater::utils::{resolve_reference, resolve_reference_cached, ResourceCache};
28use theater::TheaterRuntimeError;
29
30// Import Theater-specific handlers only
31// DEPRECATED: WASI handlers (environment, filesystem, http, io, etc.) moved to crates/deprecated/
32use theater_handler_message_server::MessageServerHandler;
33use theater_handler_runtime::RuntimeHandler;
34use theater_handler_store::StoreHandler;
35use theater_handler_supervisor::SupervisorHandler;
36use theater_handler_tcp::TcpHandler;
37
38use crate::fragmenting_codec::FragmentingCodec;
39
40#[derive(Debug, Clone, Serialize, Deserialize)]
41pub enum ManagementCommand {
42    StartActor {
43        manifest: String,
44        initial_state: Option<Vec<u8>>,
45        parent: bool,
46        subscribe: bool,
47    },
48    StopActor {
49        id: TheaterId,
50    },
51    TerminateActor {
52        id: TheaterId,
53    },
54    ListActors,
55    SubscribeToActor {
56        id: TheaterId,
57    },
58    UnsubscribeFromActor {
59        id: TheaterId,
60        subscription_id: Uuid,
61    },
62    SendActorMessage {
63        id: TheaterId,
64        data: Vec<u8>,
65    },
66    RequestActorMessage {
67        id: TheaterId,
68        data: Vec<u8>,
69    },
70    GetActorManifest {
71        id: TheaterId,
72    },
73    GetActorStatus {
74        id: TheaterId,
75    },
76    RestartActor {
77        id: TheaterId,
78    },
79    GetActorState {
80        id: TheaterId,
81    },
82    GetActorMetrics {
83        id: TheaterId,
84    },
85    UpdateActorPackage {
86        id: TheaterId,
87        package: String,
88    },
89    // Channel management commands
90    OpenChannel {
91        actor_id: ChannelParticipant,
92        initial_message: Vec<u8>,
93    },
94    SendOnChannel {
95        channel_id: String,
96        message: Vec<u8>,
97    },
98    CloseChannel {
99        channel_id: String,
100    },
101
102    // Store commands
103    NewStore {},
104}
105
106#[derive(Debug, Clone, Serialize, Deserialize)]
107#[allow(clippy::large_enum_variant)]
108pub enum ManagementResponse {
109    ActorStarted {
110        id: TheaterId,
111    },
112    ActorStopped {
113        id: TheaterId,
114    },
115    ActorList {
116        actors: Vec<(TheaterId, String)>,
117    },
118    Subscribed {
119        id: TheaterId,
120        subscription_id: Uuid,
121    },
122    Unsubscribed {
123        id: TheaterId,
124    },
125    ActorEvent {
126        event: ChainEvent,
127    },
128    ActorResult(ActorResult),
129    Error {
130        error: ManagementError,
131    },
132    RequestedMessage {
133        id: TheaterId,
134        message: Vec<u8>,
135    },
136    SentMessage {
137        id: TheaterId,
138    },
139    ActorStatus {
140        id: TheaterId,
141        status: ActorStatus,
142    },
143    Restarted {
144        id: TheaterId,
145    },
146    ActorManifest {
147        id: TheaterId,
148        manifest: ManifestConfig,
149    },
150    ActorState {
151        id: TheaterId,
152        state: Value,
153    },
154    ActorMetrics {
155        id: TheaterId,
156        metrics: serde_json::Value,
157    },
158    ActorPackageUpdated {
159        id: TheaterId,
160    },
161    // Channel management responses
162    ChannelOpened {
163        channel_id: String,
164        actor_id: ChannelParticipant,
165    },
166    MessageSent {
167        channel_id: String,
168    },
169    ChannelMessage {
170        channel_id: String,
171        sender_id: ChannelParticipant,
172        message: Vec<u8>,
173    },
174    ChannelClosed {
175        channel_id: String,
176    },
177
178    // Store responses
179    StoreCreated {
180        store_id: String,
181    },
182}
183
184#[derive(Debug, Clone, Serialize, Deserialize)]
185pub enum ManagementError {
186    // Actor-related errors
187    ActorNotFound,
188    ActorAlreadyExists,
189    ActorNotRunning,
190    ActorError(String),
191
192    // Channel-related errors
193    ChannelNotFound,
194    ChannelClosed,
195    ChannelRejected,
196
197    // Store-related errors
198    StoreError(String),
199
200    // Communication errors
201    CommunicationError(String),
202
203    // Request handling errors
204    InvalidRequest(String),
205    Timeout,
206
207    // System errors
208    RuntimeError(String),
209    InternalError(String),
210
211    // Serialization/deserialization errors
212    SerializationError(String),
213
214    // Actor initialization errors
215    ActorInitializationError(String),
216}
217
218// Allow converting from TheaterRuntimeError to ManagementError
219impl From<TheaterRuntimeError> for ManagementError {
220    fn from(err: TheaterRuntimeError) -> Self {
221        match err {
222            TheaterRuntimeError::ActorNotFound(_) => ManagementError::ActorNotFound,
223            TheaterRuntimeError::ActorAlreadyExists(_) => ManagementError::ActorAlreadyExists,
224            TheaterRuntimeError::ActorNotRunning(_) => ManagementError::ActorNotRunning,
225            TheaterRuntimeError::ActorOperationFailed(msg) => {
226                ManagementError::RuntimeError(format!("Actor operation failed: {}", msg))
227            }
228            TheaterRuntimeError::ActorError(e) => ManagementError::ActorError(e.to_string()),
229            TheaterRuntimeError::ChannelError(msg) => ManagementError::CommunicationError(msg),
230            TheaterRuntimeError::ChannelNotFound(_) => ManagementError::ChannelNotFound,
231            TheaterRuntimeError::ChannelRejected => ManagementError::ChannelRejected,
232            TheaterRuntimeError::SerializationError(msg) => {
233                ManagementError::SerializationError(msg)
234            }
235            TheaterRuntimeError::InternalError(msg) => ManagementError::InternalError(msg),
236            TheaterRuntimeError::ActorInitializationError(msg) => {
237                ManagementError::ActorInitializationError(msg)
238            }
239        }
240    }
241}
242
243// Implement Display for ManagementError to provide better error messages
244impl std::fmt::Display for ManagementError {
245    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
246        match self {
247            ManagementError::ActorNotFound => write!(f, "Actor not found"),
248            ManagementError::ActorAlreadyExists => write!(f, "Actor already exists"),
249            ManagementError::ActorNotRunning => write!(f, "Actor is not running"),
250            ManagementError::ActorError(msg) => write!(f, "Actor error: {}", msg),
251            ManagementError::ChannelNotFound => write!(f, "Channel not found"),
252            ManagementError::ChannelClosed => write!(f, "Channel is closed"),
253            ManagementError::ChannelRejected => write!(f, "Channel was rejected"),
254            ManagementError::StoreError(msg) => write!(f, "Store error: {}", msg),
255            ManagementError::CommunicationError(msg) => write!(f, "Communication error: {}", msg),
256            ManagementError::InvalidRequest(msg) => write!(f, "Invalid request: {}", msg),
257            ManagementError::Timeout => write!(f, "Operation timed out"),
258            ManagementError::RuntimeError(msg) => write!(f, "Runtime error: {}", msg),
259            ManagementError::InternalError(msg) => write!(f, "Internal error: {}", msg),
260            ManagementError::SerializationError(msg) => write!(f, "Serialization error: {}", msg),
261            ManagementError::ActorInitializationError(msg) => {
262                write!(f, "Actor initialization error: {}", msg)
263            }
264        }
265    }
266}
267
268// Implement Error trait for ManagementError
269impl std::error::Error for ManagementError {}
270
271#[derive(Debug)]
272#[allow(dead_code)]
273struct Subscription {
274    id: Uuid,
275    client_tx: mpsc::Sender<ManagementResponse>,
276}
277
278impl Eq for Subscription {}
279impl PartialEq for Subscription {
280    fn eq(&self, other: &Self) -> bool {
281        self.id == other.id
282    }
283}
284impl std::hash::Hash for Subscription {
285    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
286        self.id.hash(state);
287    }
288}
289
290// ChannelEvent is now imported from theater::ChannelEvent
291
292// Structure to track active channel subscriptions
293#[derive(Debug)]
294#[allow(dead_code)]
295struct ChannelSubscription {
296    channel_id: String,
297    initiator_id: ChannelParticipant,
298    target_id: ChannelParticipant,
299    client_tx: mpsc::Sender<ManagementResponse>,
300}
301
302/// Creates a HandlerRegistry with Theater-specific handlers.
303///
304/// NOTE: WASI handlers (environment, filesystem, http, io, sockets, timing, random, process)
305/// have been deprecated and moved to crates/deprecated/. They will be redesigned for
306/// Composite runtime support later.
307///
308/// Returns both the HandlerRegistry and the MessageRouter, allowing the server
309/// to use the MessageRouter for external client messaging.
310fn create_root_handler_registry(
311    theater_tx: mpsc::Sender<TheaterCommand>,
312    resource_cache: Arc<ResourceCache>,
313) -> (
314    HandlerRegistry,
315    theater_handler_message_server::MessageRouter,
316) {
317    let mut registry = HandlerRegistry::new();
318
319    info!("Initializing Theater server with Theater-specific handlers...");
320
321    // Runtime handler - provides actor runtime information and control
322    let runtime_config = RuntimeHostConfig {};
323    registry.register(RuntimeHandler::new(
324        runtime_config,
325        theater_tx.clone(),
326        None,
327    ));
328
329    // Store handler - provides key-value storage for actors
330    let store_config = StoreHandlerConfig::default();
331    registry.register(StoreHandler::new(store_config, None));
332
333    // Supervisor handler - allows actors to spawn and manage child actors.
334    // The resource cache is shared across every supervisor-capable actor;
335    // children whose manifests opt in via `static_package = true` skip
336    // the wasm-bytes fetch on repeat spawns.
337    let supervisor_config = SupervisorHostConfig {};
338    registry.register(
339        SupervisorHandler::new(supervisor_config, None).with_resource_cache(resource_cache),
340    );
341
342    // Message server handler - provides inter-actor messaging
343    let message_router = theater_handler_message_server::MessageRouter::new();
344    registry.register(MessageServerHandler::new(None, message_router.clone()));
345
346    // TCP handler - provides raw TCP networking for actors
347    let tcp_config = TcpHandlerConfig {
348        listen: None,
349        max_connections: None,
350        ..Default::default()
351    };
352    registry.register(TcpHandler::new(tcp_config));
353
354    info!("✓ 5 Theater-specific handlers registered");
355    info!("NOTE: WASI handlers are deprecated - see crates/deprecated/");
356
357    (registry, message_router)
358}
359
360pub struct TheaterServer {
361    runtime: TheaterRuntime,
362    theater_tx: mpsc::Sender<TheaterCommand>,
363    management_socket: TcpListener,
364    subscriptions: Arc<Mutex<HashMap<TheaterId, HashSet<Subscription>>>>,
365    // Field to track channel subscriptions
366    channel_subscriptions: Arc<Mutex<HashMap<String, ChannelSubscription>>>,
367    // Channel for runtime to send channel events back to server
368    #[allow(dead_code)]
369    channel_events_tx: mpsc::Sender<ChannelEvent>,
370    // MessageRouter for external client messaging
371    message_router: theater_handler_message_server::MessageRouter,
372}
373
374impl TheaterServer {
375    // Process channel events and forward them to subscribed clients
376    async fn process_channel_events(
377        mut channel_events_rx: mpsc::Receiver<ChannelEvent>,
378        channel_subscriptions: Arc<Mutex<HashMap<String, ChannelSubscription>>>,
379    ) {
380        while let Some(event) = channel_events_rx.recv().await {
381            match event {
382                ChannelEvent::Message {
383                    channel_id,
384                    sender_id,
385                    message,
386                } => {
387                    tracing::debug!("Received channel message for {}", channel_id);
388                    // Forward to subscribed clients
389                    let subs = channel_subscriptions.lock().await;
390                    if let Some(sub) = subs.get(&channel_id.0) {
391                        let response = ManagementResponse::ChannelMessage {
392                            channel_id: channel_id.0.clone(),
393                            sender_id,
394                            message,
395                        };
396
397                        tracing::debug!("Forwarding channel message to client: {:?}", response);
398
399                        if let Err(e) = sub.client_tx.send(response).await {
400                            tracing::warn!("Failed to forward channel message: {}", e);
401                        } else {
402                            tracing::debug!("Forwarded channel message to client");
403                        }
404                    }
405                }
406                ChannelEvent::Close { channel_id } => {
407                    tracing::debug!("Received channel close event for {}", channel_id);
408                    // Forward to subscribed clients
409                    let mut subs = channel_subscriptions.lock().await;
410                    if let Some(sub) = subs.remove(&channel_id.0) {
411                        let response = ManagementResponse::ChannelClosed {
412                            channel_id: channel_id.0.clone(),
413                        };
414
415                        if let Err(e) = sub.client_tx.send(response).await {
416                            tracing::warn!("Failed to forward channel close event: {}", e);
417                        } else {
418                            tracing::debug!("Forwarded channel close event to client");
419                        }
420                    }
421                }
422            }
423        }
424    }
425
426    pub async fn new(address: std::net::SocketAddr) -> Result<Self> {
427        let (theater_tx, theater_rx) = mpsc::channel(32);
428
429        // Create channel for runtime to send channel events back to server
430        let (channel_events_tx, channel_events_rx) = mpsc::channel(32);
431
432        // Shared URL→bytes cache; one per theater process. Threaded
433        // through every entry point that fetches an actor's wasm: the
434        // supervisor host fn (via the handler), `ResumeActor` (via
435        // TheaterRuntime), and `ManagementCommand::StartActor` below
436        // (via `self.runtime.resource_cache()`). Opt-in per child
437        // manifest with `static_package = true` — fetch once per
438        // process, hit forever after.
439        let resource_cache = Arc::new(ResourceCache::new());
440
441        // Create handler registry with all migrated handlers (root permissions)
442        // Also get the MessageRouter for external client messaging
443        let (handler_registry, message_router) =
444            create_root_handler_registry(theater_tx.clone(), resource_cache.clone());
445
446        // Create the runtime with the handler registry
447        let runtime = TheaterRuntime::new(
448            theater_tx.clone(),
449            theater_rx,
450            Some(channel_events_tx.clone()),
451            handler_registry,
452            resource_cache,
453        )
454        .await?;
455        let management_socket = TcpListener::bind(address).await?;
456
457        let channel_subscriptions = Arc::new(Mutex::new(HashMap::new()));
458
459        // Start task to process channel events
460        let channel_subs_clone = channel_subscriptions.clone();
461        tokio::spawn(async move {
462            Self::process_channel_events(channel_events_rx, channel_subs_clone).await;
463        });
464
465        Ok(Self {
466            runtime,
467            theater_tx,
468            management_socket,
469            subscriptions: Arc::new(Mutex::new(HashMap::new())),
470            channel_subscriptions,
471            channel_events_tx,
472            message_router,
473        })
474    }
475
476    pub async fn run(mut self) -> Result<()> {
477        info!(
478            "Theater server starting on {:?}",
479            self.management_socket.local_addr()?
480        );
481
482        // Snapshot the cache handle before the runtime moves into its task.
483        let resource_cache = self.runtime.resource_cache().clone();
484
485        // Start the theater runtime in its own task
486        let runtime_handle = tokio::spawn(async move {
487            match self.runtime.run().await {
488                Ok(_) => Ok(()),
489                Err(e) => {
490                    error!("Theater runtime failed: {}", e);
491                    Err(e)
492                }
493            }
494        });
495
496        // Accept and handle management connections
497        while let Ok((socket, addr)) = self.management_socket.accept().await {
498            info!("New management connection from {}", addr);
499            let runtime_tx = self.theater_tx.clone();
500            let subscriptions = self.subscriptions.clone();
501            let channel_subscriptions = self.channel_subscriptions.clone();
502            let message_router = self.message_router.clone();
503            let resource_cache = resource_cache.clone();
504
505            tokio::spawn(async move {
506                if let Err(e) = Self::handle_management_connection(
507                    socket,
508                    runtime_tx,
509                    subscriptions,
510                    channel_subscriptions,
511                    message_router,
512                    resource_cache,
513                )
514                .await
515                {
516                    error!("Error handling management connection: {}", e);
517                }
518            });
519        }
520
521        runtime_handle.await??;
522        Ok(())
523    }
524
525    async fn handle_management_connection(
526        socket: TcpStream,
527        runtime_tx: mpsc::Sender<TheaterCommand>,
528        subscriptions: Arc<Mutex<HashMap<TheaterId, HashSet<Subscription>>>>,
529        channel_subscriptions: Arc<Mutex<HashMap<String, ChannelSubscription>>>,
530        message_router: theater_handler_message_server::MessageRouter,
531        resource_cache: Arc<ResourceCache>,
532    ) -> Result<()> {
533        // Create a channel for sending responses to this client
534        let (client_tx, mut client_rx) = mpsc::channel::<ManagementResponse>(32);
535
536        let codec = FragmentingCodec::new();
537        let framed = Framed::new(socket, codec);
538
539        // Split the framed connection into read and write parts
540        let (mut framed_sink, mut framed_stream) = framed.split();
541
542        // Clone the client_tx for use in the command loop
543        let cmd_client_tx = client_tx.clone();
544
545        // Start a task to forward responses to the client
546        let _response_task = tokio::spawn(async move {
547            while let Some(response) = client_rx.recv().await {
548                match serde_json::to_vec(&response) {
549                    Ok(data) => {
550                        debug!("Serialized response: {} bytes", data.len());
551                        if data.len() > 10 * 1024 * 1024 {
552                            debug!("Large response detected: {} MB", data.len() / 1024 / 1024);
553                        }
554                        if let Err(e) = framed_sink.send(Bytes::from(data)).await {
555                            debug!("Error sending response to client: {}", e);
556                            break;
557                        }
558                    }
559                    Err(e) => {
560                        error!("Error serializing response: {}", e);
561                    }
562                }
563            }
564            debug!("Response forwarder for client closed");
565        });
566
567        // Store active subscriptions for this connection to clean up on disconnect
568        let mut connection_subscriptions: Vec<(TheaterId, Uuid)> = Vec::new();
569
570        // Store active channel subscriptions for cleanup
571        let mut connection_channel_subscriptions: Vec<String> = Vec::new();
572
573        // Loop until connection closes or an error occurs
574        'connection: while let Some(msg) = framed_stream.next().await {
575            debug!("Received management message");
576            let msg = match msg {
577                Ok(m) => m,
578                Err(e) => {
579                    error!("Error receiving message: {}", e);
580                    break 'connection;
581                }
582            };
583
584            let cmd = match serde_json::from_slice::<ManagementCommand>(&msg) {
585                Ok(c) => c,
586                Err(e) => {
587                    error!(
588                        "Error parsing command: {} {}",
589                        e,
590                        String::from_utf8_lossy(&msg)
591                    );
592                    continue;
593                }
594            };
595            debug!("Parsed command: {:?}", cmd);
596
597            // Store the command for reference (used for subscription tracking)
598            let _cmd_clone = cmd.clone();
599
600            let response = match cmd {
601                ManagementCommand::StartActor {
602                    manifest,
603                    initial_state: _initial_state,
604                    parent,
605                    subscribe,
606                } => {
607                    info!("Starting actor from manifest: {:?}", manifest);
608
609                    // Load and parse manifest
610                    let manifest_str = match resolve_reference(&manifest).await {
611                        Ok(bytes) => match String::from_utf8(bytes) {
612                            Ok(s) => s,
613                            Err(e) => {
614                                error!("Invalid manifest encoding: {}", e);
615                                cmd_client_tx
616                                    .send(ManagementResponse::Error {
617                                        error: ManagementError::ActorInitializationError(format!(
618                                            "Invalid manifest encoding: {}",
619                                            e
620                                        )),
621                                    })
622                                    .await
623                                    .ok();
624                                continue;
625                            }
626                        },
627                        Err(e) => {
628                            error!("Failed to load manifest: {}", e);
629                            cmd_client_tx
630                                .send(ManagementResponse::Error {
631                                    error: ManagementError::ActorInitializationError(format!(
632                                        "Failed to load manifest: {}",
633                                        e
634                                    )),
635                                })
636                                .await
637                                .ok();
638                            continue;
639                        }
640                    };
641
642                    let manifest_config = match ManifestConfig::from_toml_str(&manifest_str) {
643                        Ok(m) => m,
644                        Err(e) => {
645                            error!("Failed to parse manifest: {}", e);
646                            cmd_client_tx
647                                .send(ManagementResponse::Error {
648                                    error: ManagementError::ActorInitializationError(format!(
649                                        "Failed to parse manifest: {}",
650                                        e
651                                    )),
652                                })
653                                .await
654                                .ok();
655                            continue;
656                        }
657                    };
658
659                    // Load wasm bytes. Cache-respecting when the manifest
660                    // opts in via `static_package = true`; same shared cache
661                    // as `ResumeActor` and the supervisor host fn.
662                    let wasm_bytes_result = if manifest_config.static_package {
663                        resolve_reference_cached(&manifest_config.package, &resource_cache)
664                            .await
665                            .map(|(arc, _)| (*arc).clone())
666                    } else {
667                        resolve_reference(&manifest_config.package).await
668                    };
669                    let wasm_bytes = match wasm_bytes_result {
670                        Ok(bytes) => bytes,
671                        Err(e) => {
672                            error!("Failed to load WASM: {}", e);
673                            cmd_client_tx
674                                .send(ManagementResponse::Error {
675                                    error: ManagementError::ActorInitializationError(format!(
676                                        "Failed to load WASM: {}",
677                                        e
678                                    )),
679                                })
680                                .await
681                                .ok();
682                            continue;
683                        }
684                    };
685
686                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
687                    debug!("Sending SpawnActor command to runtime");
688                    let supervisor_tx = if parent {
689                        let (supervisor_tx, mut supervisor_rx) = mpsc::channel(32);
690                        let cmd_client_tx = cmd_client_tx.clone();
691                        tokio::spawn(async move {
692                            while let Some(res) = supervisor_rx.recv().await {
693                                debug!("Received supervisor response: {:?}", res);
694                                if let Err(e) = cmd_client_tx
695                                    .send(ManagementResponse::ActorResult(res))
696                                    .await
697                                {
698                                    error!("Failed to send supervisor response: {}", e);
699                                    break;
700                                }
701                            }
702                        });
703                        Some(supervisor_tx)
704                    } else {
705                        None
706                    };
707                    let subscription_tx = if subscribe {
708                        let (event_tx, mut event_rx) = mpsc::channel(32);
709
710                        // set up a task to forward events to the client
711                        let cmd_client_tx = cmd_client_tx.clone();
712                        tokio::spawn(async move {
713                            while let Some((_actor_id, event)) = event_rx.recv().await {
714                                debug!("Received event for subscription");
715                                let response = ManagementResponse::ActorEvent { event };
716                                if let Err(e) = cmd_client_tx.send(response).await {
717                                    debug!("Failed to forward event to client: {}", e);
718                                    break;
719                                }
720                            }
721                            debug!("Event forwarder for subscription stopped");
722                        });
723
724                        Some(event_tx)
725                    } else {
726                        None
727                    };
728                    match runtime_tx
729                        .send(TheaterCommand::SetupActor {
730                            wasm_bytes,
731                            name: Some(manifest_config.name.clone()),
732                            manifest: Some(manifest_config),
733                            init_state: default_init_state(),
734                            response_tx: cmd_tx,
735                            supervisor_tx,
736                            subscription_tx,
737                        })
738                        .await
739                    {
740                        Ok(_) => {
741                            debug!("SpawnActor command sent to runtime, awaiting response");
742                            match cmd_rx.await {
743                                Ok(result) => match result {
744                                    Ok(actor_id) => {
745                                        info!("Actor started with ID: {:?}", actor_id);
746                                        ManagementResponse::ActorStarted { id: actor_id }
747                                    }
748                                    Err(e) => {
749                                        error!("Runtime failed to start actor: {}", e);
750                                        ManagementResponse::Error {
751                                            error: ManagementError::RuntimeError(format!(
752                                                "Failed to start actor: {}",
753                                                e
754                                            )),
755                                        }
756                                    }
757                                },
758                                Err(e) => {
759                                    error!("Failed to receive spawn response: {}", e);
760                                    ManagementResponse::Error {
761                                        error: ManagementError::CommunicationError(format!(
762                                            "Failed to receive spawn response: {}",
763                                            e
764                                        )),
765                                    }
766                                }
767                            }
768                        }
769                        Err(e) => {
770                            error!("Failed to send SpawnActor command: {}", e);
771                            ManagementResponse::Error {
772                                error: ManagementError::CommunicationError(format!(
773                                    "Failed to send spawn command: {}",
774                                    e
775                                )),
776                            }
777                        }
778                    }
779                }
780                ManagementCommand::StopActor { id } => {
781                    info!("Stopping actor: {:?}", id);
782                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
783                    runtime_tx
784                        .send(TheaterCommand::StopActor {
785                            actor_id: id,
786                            response_tx: cmd_tx,
787                        })
788                        .await?;
789
790                    match cmd_rx.await? {
791                        Ok(_) => {
792                            subscriptions.lock().await.remove(&id);
793                            ManagementResponse::ActorStopped { id }
794                        }
795                        Err(e) => ManagementResponse::Error {
796                            error: ManagementError::RuntimeError(format!(
797                                "Failed to stop actor: {}",
798                                e
799                            )),
800                        },
801                    }
802                }
803                ManagementCommand::TerminateActor { id } => {
804                    info!("Terminating actor: {:?}", id);
805                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
806                    runtime_tx
807                        .send(TheaterCommand::TerminateActor {
808                            actor_id: id,
809                            response_tx: cmd_tx,
810                        })
811                        .await?;
812
813                    match cmd_rx.await? {
814                        Ok(_) => {
815                            subscriptions.lock().await.remove(&id);
816                            ManagementResponse::ActorStopped { id }
817                        }
818                        Err(e) => ManagementResponse::Error {
819                            error: ManagementError::RuntimeError(format!(
820                                "Failed to terminate actor: {}",
821                                e
822                            )),
823                        },
824                    }
825                }
826                ManagementCommand::ListActors => {
827                    debug!("Listing actors");
828                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
829                    runtime_tx
830                        .send(TheaterCommand::GetActors {
831                            response_tx: cmd_tx,
832                        })
833                        .await?;
834
835                    match cmd_rx.await? {
836                        Ok(actors) => {
837                            info!("Found {} actors", actors.len());
838                            ManagementResponse::ActorList { actors }
839                        }
840                        Err(e) => ManagementResponse::Error {
841                            error: ManagementError::RuntimeError(format!(
842                                "Failed to list actors: {}",
843                                e
844                            )),
845                        },
846                    }
847                }
848                ManagementCommand::SubscribeToActor { id } => {
849                    info!("New subscription request for actor: {:?}", id);
850                    let subscription_id = Uuid::new_v4();
851                    let subscription = Subscription {
852                        id: subscription_id,
853                        client_tx: cmd_client_tx.clone(),
854                    };
855
856                    debug!("Subscription created with ID: {}", subscription_id);
857
858                    // Register the subscription in the global map
859                    subscriptions
860                        .lock()
861                        .await
862                        .entry(id)
863                        .or_default()
864                        .insert(subscription);
865
866                    // Set up the event channel for the subscription
867                    let (event_tx, mut event_rx) = mpsc::channel(32);
868                    runtime_tx
869                        .send(TheaterCommand::SubscribeToActor {
870                            actor_id: id,
871                            event_tx,
872                        })
873                        .await
874                        .map_err(|e| anyhow::anyhow!("Failed to subscribe: {}", e))?;
875
876                    // Add to the list of subscriptions for this connection
877                    connection_subscriptions.push((id, subscription_id));
878
879                    // Create a task to forward events to this client
880                    let client_tx_clone = cmd_client_tx.clone();
881                    tokio::spawn(async move {
882                        debug!(
883                            "Starting event forwarder for subscription {}",
884                            subscription_id
885                        );
886                        while let Some((_actor_id, event)) = event_rx.recv().await {
887                            debug!("Received event for subscription {}", subscription_id);
888                            let response = ManagementResponse::ActorEvent { event };
889                            if let Err(e) = client_tx_clone.send(response).await {
890                                debug!("Failed to forward event to client: {}", e);
891                                break;
892                            }
893                        }
894                        debug!(
895                            "Event forwarder for subscription {} stopped",
896                            subscription_id
897                        );
898                    });
899
900                    ManagementResponse::Subscribed {
901                        id,
902                        subscription_id,
903                    }
904                }
905                ManagementCommand::UnsubscribeFromActor {
906                    id,
907                    subscription_id,
908                } => {
909                    debug!(
910                        "Removing subscription {} for actor {:?}",
911                        subscription_id, id
912                    );
913
914                    // Remove subscription from the tracking list for this connection
915                    connection_subscriptions
916                        .retain(|(aid, sid)| *aid != id || *sid != subscription_id);
917
918                    // Remove from the global subscriptions map
919                    let mut subs = subscriptions.lock().await;
920                    if let Some(actor_subs) = subs.get_mut(&id) {
921                        actor_subs.retain(|sub| sub.id != subscription_id);
922
923                        // Remove the entry if no subscriptions remain
924                        if actor_subs.is_empty() {
925                            subs.remove(&id);
926                        }
927                    }
928
929                    debug!("Subscription removed");
930                    ManagementResponse::Unsubscribed { id }
931                }
932                ManagementCommand::SendActorMessage { id, data } => {
933                    info!("Sending message to actor: {:?}", id);
934
935                    // Create response channel for routing
936                    let (response_tx, response_rx) = tokio::sync::oneshot::channel();
937
938                    // Create ActorMessage
939                    let message = ActorMessage::Send(ActorSend { data });
940
941                    // Route via MessageRouter
942                    match message_router
943                        .route_message(theater::messages::MessageCommand::SendMessage {
944                            target_id: id,
945                            message,
946                            response_tx,
947                        })
948                        .await
949                    {
950                        Ok(_) => {
951                            // Wait for routing result
952                            match response_rx.await {
953                                Ok(Ok(())) => {
954                                    info!("Message sent successfully to actor: {:?}", id);
955                                    ManagementResponse::SentMessage { id }
956                                }
957                                Ok(Err(e)) => {
958                                    error!("Failed to send message to actor: {}", e);
959                                    ManagementResponse::Error {
960                                        error: ManagementError::RuntimeError(format!(
961                                            "Failed to send: {}",
962                                            e
963                                        )),
964                                    }
965                                }
966                                Err(e) => {
967                                    error!("Failed to receive routing response: {}", e);
968                                    ManagementResponse::Error {
969                                        error: ManagementError::CommunicationError(format!(
970                                            "Failed to receive routing response: {}",
971                                            e
972                                        )),
973                                    }
974                                }
975                            }
976                        }
977                        Err(e) => {
978                            error!("Failed to route message: {}", e);
979                            ManagementResponse::Error {
980                                error: ManagementError::RuntimeError(format!(
981                                    "Failed to route message: {}",
982                                    e
983                                )),
984                            }
985                        }
986                    }
987                }
988                ManagementCommand::RequestActorMessage { id, data } => {
989                    info!("Requesting message from actor: {:?}", id);
990
991                    // Create channels for request-response pattern
992                    let (route_tx, route_rx) = tokio::sync::oneshot::channel();
993                    let (response_tx, response_rx) = tokio::sync::oneshot::channel();
994
995                    // Create ActorMessage with response channel embedded
996                    let message = ActorMessage::Request(ActorRequest { data, response_tx });
997
998                    // Route via MessageRouter
999                    match message_router
1000                        .route_message(theater::messages::MessageCommand::SendMessage {
1001                            target_id: id,
1002                            message,
1003                            response_tx: route_tx,
1004                        })
1005                        .await
1006                    {
1007                        Ok(_) => {
1008                            // Wait for routing to complete
1009                            match route_rx.await {
1010                                Ok(Ok(())) => {
1011                                    // Routing succeeded, now wait for actor's response
1012                                    match response_rx.await {
1013                                        Ok(response_data) => {
1014                                            info!("Received response from actor: {:?}", id);
1015                                            ManagementResponse::RequestedMessage {
1016                                                id,
1017                                                message: response_data,
1018                                            }
1019                                        }
1020                                        Err(e) => {
1021                                            error!("Actor didn't respond: {}", e);
1022                                            ManagementResponse::Error {
1023                                                error: ManagementError::RuntimeError(format!(
1024                                                    "Actor didn't respond: {}",
1025                                                    e
1026                                                )),
1027                                            }
1028                                        }
1029                                    }
1030                                }
1031                                Ok(Err(e)) => {
1032                                    error!("Failed to route request to actor: {}", e);
1033                                    ManagementResponse::Error {
1034                                        error: ManagementError::RuntimeError(format!(
1035                                            "Failed to route: {}",
1036                                            e
1037                                        )),
1038                                    }
1039                                }
1040                                Err(e) => {
1041                                    error!("Failed to receive routing response: {}", e);
1042                                    ManagementResponse::Error {
1043                                        error: ManagementError::CommunicationError(format!(
1044                                            "Failed to receive routing response: {}",
1045                                            e
1046                                        )),
1047                                    }
1048                                }
1049                            }
1050                        }
1051                        Err(e) => {
1052                            error!("Failed to route request: {}", e);
1053                            ManagementResponse::Error {
1054                                error: ManagementError::RuntimeError(format!(
1055                                    "Failed to route request: {}",
1056                                    e
1057                                )),
1058                            }
1059                        }
1060                    }
1061                }
1062                ManagementCommand::GetActorManifest { id } => {
1063                    info!("Getting manifest for actor: {:?}", id);
1064                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1065                    runtime_tx
1066                        .send(TheaterCommand::GetActorManifest {
1067                            actor_id: id,
1068                            response_tx: cmd_tx,
1069                        })
1070                        .await?;
1071
1072                    let manifest = cmd_rx.await?;
1073                    ManagementResponse::ActorManifest {
1074                        id,
1075                        manifest: manifest?,
1076                    }
1077                }
1078                ManagementCommand::GetActorStatus { id } => {
1079                    info!("Getting status for actor: {:?}", id);
1080                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1081                    runtime_tx
1082                        .send(TheaterCommand::GetActorStatus {
1083                            actor_id: id,
1084                            response_tx: cmd_tx,
1085                        })
1086                        .await?;
1087
1088                    let status = cmd_rx.await?;
1089                    ManagementResponse::ActorStatus {
1090                        id,
1091                        status: status?,
1092                    }
1093                }
1094                ManagementCommand::RestartActor { id } => {
1095                    info!("Restarting actor: {:?}", id);
1096                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1097                    runtime_tx
1098                        .send(TheaterCommand::RestartActor {
1099                            actor_id: id,
1100                            response_tx: cmd_tx,
1101                        })
1102                        .await?;
1103
1104                    match cmd_rx.await? {
1105                        Ok(_) => ManagementResponse::Restarted { id },
1106                        Err(e) => ManagementResponse::Error {
1107                            error: ManagementError::RuntimeError(format!(
1108                                "Failed to restart actor: {}",
1109                                e
1110                            )),
1111                        },
1112                    }
1113                }
1114                ManagementCommand::GetActorState { id } => {
1115                    info!("Getting state for actor: {:?}", id);
1116                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1117                    runtime_tx
1118                        .send(TheaterCommand::GetActorState {
1119                            actor_id: id,
1120                            response_tx: cmd_tx,
1121                        })
1122                        .await?;
1123
1124                    let state = cmd_rx.await?;
1125                    ManagementResponse::ActorState { id, state: state? }
1126                }
1127                ManagementCommand::GetActorMetrics { id } => {
1128                    info!("Getting metrics for actor: {:?}", id);
1129                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1130                    runtime_tx
1131                        .send(TheaterCommand::GetActorMetrics {
1132                            actor_id: id,
1133                            response_tx: cmd_tx,
1134                        })
1135                        .await?;
1136
1137                    let metrics = cmd_rx.await?;
1138                    ManagementResponse::ActorMetrics {
1139                        id,
1140                        metrics: serde_json::to_value(metrics?)?,
1141                    }
1142                }
1143                ManagementCommand::UpdateActorPackage { id: _, package: _ } => {
1144                    // TODO: Re-implement actor package updates
1145                    ManagementResponse::Error {
1146                        error: ManagementError::RuntimeError(
1147                            "UpdateActorPackage not yet implemented".to_string(),
1148                        ),
1149                    }
1150                }
1151                // Handle channel management commands
1152                ManagementCommand::OpenChannel {
1153                    actor_id,
1154                    initial_message,
1155                } => {
1156                    info!("Opening channel to actor: {:?}", actor_id);
1157
1158                    // Create a response channel
1159                    let (response_tx, response_rx) = tokio::sync::oneshot::channel();
1160
1161                    // Generate a channel ID
1162                    let client_id = ChannelParticipant::External;
1163                    let channel_id = ChannelId::new(&client_id, &actor_id);
1164                    let channel_id_str = channel_id.0.clone();
1165
1166                    // Send the channel open command via MessageRouter
1167                    message_router
1168                        .route_message(theater::messages::MessageCommand::OpenChannel {
1169                            initiator_id: client_id.clone(),
1170                            target_id: actor_id.clone(),
1171                            channel_id: channel_id.clone(),
1172                            initial_message,
1173                            response_tx,
1174                        })
1175                        .await
1176                        .map_err(|e| {
1177                            anyhow::anyhow!("Failed to send channel open command: {}", e)
1178                        })?;
1179
1180                    // Wait for the response
1181                    match response_rx.await {
1182                        Ok(result) => {
1183                            match result {
1184                                Ok(accepted) => {
1185                                    if accepted {
1186                                        // Channel opened successfully
1187                                        info!("Channel opened successfully: {}", channel_id_str);
1188
1189                                        // Register the channel subscription to receive messages
1190                                        let channel_sub = ChannelSubscription {
1191                                            channel_id: channel_id_str.clone(),
1192                                            initiator_id: client_id.clone(),
1193                                            target_id: actor_id.clone(),
1194                                            client_tx: cmd_client_tx.clone(),
1195                                        };
1196
1197                                        channel_subscriptions
1198                                            .lock()
1199                                            .await
1200                                            .insert(channel_id_str.clone(), channel_sub);
1201
1202                                        // Track this channel for cleanup on disconnect
1203                                        connection_channel_subscriptions
1204                                            .push(channel_id_str.clone());
1205
1206                                        ManagementResponse::ChannelOpened {
1207                                            channel_id: channel_id_str,
1208                                            actor_id,
1209                                        }
1210                                    } else {
1211                                        // Channel rejected by target
1212                                        ManagementResponse::Error {
1213                                            error: ManagementError::ChannelRejected,
1214                                        }
1215                                    }
1216                                }
1217                                Err(e) => ManagementResponse::Error {
1218                                    error: ManagementError::RuntimeError(format!(
1219                                        "Error opening channel: {}",
1220                                        e
1221                                    )),
1222                                },
1223                            }
1224                        }
1225                        Err(e) => ManagementResponse::Error {
1226                            error: ManagementError::CommunicationError(format!(
1227                                "Failed to receive channel open response: {}",
1228                                e
1229                            )),
1230                        },
1231                    }
1232                }
1233                ManagementCommand::SendOnChannel {
1234                    channel_id,
1235                    message,
1236                } => {
1237                    info!("Sending message on channel: {}", channel_id);
1238
1239                    // Create response channel
1240                    let (response_tx, response_rx) = tokio::sync::oneshot::channel();
1241
1242                    // Parse the channel ID
1243                    let channel_id_parsed = ChannelId(channel_id.clone());
1244
1245                    // Send the message on the channel via MessageRouter
1246                    let sender_id = ChannelParticipant::External;
1247                    match message_router
1248                        .route_message(theater::messages::MessageCommand::ChannelMessage {
1249                            channel_id: channel_id_parsed,
1250                            sender_id,
1251                            message,
1252                            response_tx,
1253                        })
1254                        .await
1255                    {
1256                        Ok(_) => {
1257                            // Wait for routing result
1258                            match response_rx.await {
1259                                Ok(Ok(())) => {
1260                                    info!("Message sent successfully on channel: {}", channel_id);
1261                                    ManagementResponse::MessageSent { channel_id }
1262                                }
1263                                Ok(Err(e)) => {
1264                                    error!("Failed to send on channel: {}", e);
1265                                    ManagementResponse::Error {
1266                                        error: ManagementError::RuntimeError(format!(
1267                                            "Failed to send on channel: {}",
1268                                            e
1269                                        )),
1270                                    }
1271                                }
1272                                Err(e) => {
1273                                    error!("Failed to receive channel send response: {}", e);
1274                                    ManagementResponse::Error {
1275                                        error: ManagementError::CommunicationError(format!(
1276                                            "Failed to receive channel send response: {}",
1277                                            e
1278                                        )),
1279                                    }
1280                                }
1281                            }
1282                        }
1283                        Err(e) => {
1284                            error!("Failed to route channel message: {}", e);
1285                            ManagementResponse::Error {
1286                                error: ManagementError::RuntimeError(format!(
1287                                    "Failed to route channel message: {}",
1288                                    e
1289                                )),
1290                            }
1291                        }
1292                    }
1293                }
1294                ManagementCommand::CloseChannel { channel_id } => {
1295                    info!("Closing channel: {}", channel_id);
1296
1297                    // Create response channel
1298                    let (response_tx, response_rx) = tokio::sync::oneshot::channel();
1299
1300                    // Parse the channel ID
1301                    let channel_id_parsed = ChannelId(channel_id.clone());
1302
1303                    // Close the channel via MessageRouter
1304                    let sender_id = ChannelParticipant::External;
1305                    match message_router
1306                        .route_message(theater::messages::MessageCommand::ChannelClose {
1307                            channel_id: channel_id_parsed,
1308                            sender_id,
1309                            response_tx,
1310                        })
1311                        .await
1312                    {
1313                        Ok(_) => {
1314                            // Wait for routing result
1315                            match response_rx.await {
1316                                Ok(Ok(())) => {
1317                                    info!("Channel closed successfully: {}", channel_id);
1318
1319                                    // Remove from channel subscriptions
1320                                    channel_subscriptions.lock().await.remove(&channel_id);
1321                                    connection_channel_subscriptions.retain(|id| id != &channel_id);
1322
1323                                    ManagementResponse::ChannelClosed { channel_id }
1324                                }
1325                                Ok(Err(e)) => {
1326                                    error!("Failed to close channel: {}", e);
1327                                    ManagementResponse::Error {
1328                                        error: ManagementError::RuntimeError(format!(
1329                                            "Failed to close channel: {}",
1330                                            e
1331                                        )),
1332                                    }
1333                                }
1334                                Err(e) => {
1335                                    error!("Failed to receive channel close response: {}", e);
1336                                    ManagementResponse::Error {
1337                                        error: ManagementError::CommunicationError(format!(
1338                                            "Failed to receive channel close response: {}",
1339                                            e
1340                                        )),
1341                                    }
1342                                }
1343                            }
1344                        }
1345                        Err(e) => {
1346                            error!("Failed to route channel close: {}", e);
1347                            ManagementResponse::Error {
1348                                error: ManagementError::RuntimeError(format!(
1349                                    "Failed to route channel close: {}",
1350                                    e
1351                                )),
1352                            }
1353                        }
1354                    }
1355                }
1356                ManagementCommand::NewStore {} => {
1357                    info!("Creating new store");
1358                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1359                    runtime_tx
1360                        .send(TheaterCommand::NewStore {
1361                            response_tx: cmd_tx,
1362                        })
1363                        .await?;
1364
1365                    let store_id = cmd_rx.await?;
1366                    ManagementResponse::StoreCreated {
1367                        store_id: store_id?.id,
1368                    }
1369                }
1370            };
1371
1372            debug!("Sending response: {:?}", response);
1373            if let Err(e) = client_tx.send(response).await {
1374                error!("Failed to send response: {}", e);
1375                break;
1376            }
1377            debug!("Response sent");
1378        }
1379
1380        // Clean up all subscriptions for this connection
1381        debug!(
1382            "Connection closed, cleaning up {} subscriptions",
1383            connection_subscriptions.len()
1384        );
1385        let mut subs = subscriptions.lock().await;
1386
1387        for (actor_id, sub_id) in connection_subscriptions {
1388            if let Some(actor_subs) = subs.get_mut(&actor_id) {
1389                actor_subs.retain(|sub| sub.id != sub_id);
1390
1391                // Remove the entry if no subscriptions remain
1392                if actor_subs.is_empty() {
1393                    subs.remove(&actor_id);
1394                }
1395            }
1396        }
1397
1398        // Clean up channel subscriptions
1399        debug!(
1400            "Connection closed, cleaning up {} channel subscriptions",
1401            connection_channel_subscriptions.len()
1402        );
1403        let mut channel_subs = channel_subscriptions.lock().await;
1404
1405        for channel_id in connection_channel_subscriptions {
1406            channel_subs.remove(&channel_id);
1407        }
1408
1409        debug!("Cleaned up all subscriptions for the connection");
1410        Ok(())
1411    }
1412}