// theater_server/server.rs

1use anyhow::Result;
2use bytes::Bytes;
3use futures::sink::SinkExt;
4use futures::stream::StreamExt;
5use serde::{Deserialize, Serialize};
6use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8use theater::messages::{
9    ActorMessage, ActorRequest, ActorResult, ActorSend, ActorStatus, ChannelEvent,
10    ChannelParticipant,
11};
12use theater::pack_bridge::Value;
13use theater::{ChainEvent, ManifestConfig};
14use tokio::net::{TcpListener, TcpStream};
15use tokio::sync::{mpsc, Mutex};
16use tokio_util::codec::Framed;
17use tracing::{debug, error, info};
18use uuid::Uuid;
19
20use theater::config::actor_manifest::{
21    RuntimeHostConfig, StoreHandlerConfig, SupervisorHostConfig, TcpHandlerConfig,
22};
23use theater::handler::HandlerRegistry;
24use theater::id::TheaterId;
25use theater::messages::{ChannelId, TheaterCommand};
26use theater::theater_runtime::TheaterRuntime;
27use theater::utils::resolve_reference;
28use theater::TheaterRuntimeError;
29
30// Import Theater-specific handlers only
31// DEPRECATED: WASI handlers (environment, filesystem, http, io, etc.) moved to crates/deprecated/
32use theater_handler_message_server::MessageServerHandler;
33use theater_handler_runtime::RuntimeHandler;
34use theater_handler_store::StoreHandler;
35use theater_handler_supervisor::SupervisorHandler;
36use theater_handler_tcp::TcpHandler;
37
38use crate::fragmenting_codec::FragmentingCodec;
39
40#[derive(Debug, Clone, Serialize, Deserialize)]
41pub enum ManagementCommand {
42    StartActor {
43        manifest: String,
44        initial_state: Option<Vec<u8>>,
45        parent: bool,
46        subscribe: bool,
47    },
48    StopActor {
49        id: TheaterId,
50    },
51    TerminateActor {
52        id: TheaterId,
53    },
54    ListActors,
55    SubscribeToActor {
56        id: TheaterId,
57    },
58    UnsubscribeFromActor {
59        id: TheaterId,
60        subscription_id: Uuid,
61    },
62    SendActorMessage {
63        id: TheaterId,
64        data: Vec<u8>,
65    },
66    RequestActorMessage {
67        id: TheaterId,
68        data: Vec<u8>,
69    },
70    GetActorManifest {
71        id: TheaterId,
72    },
73    GetActorStatus {
74        id: TheaterId,
75    },
76    RestartActor {
77        id: TheaterId,
78    },
79    GetActorState {
80        id: TheaterId,
81    },
82    GetActorEvents {
83        id: TheaterId,
84    },
85    GetActorMetrics {
86        id: TheaterId,
87    },
88    UpdateActorPackage {
89        id: TheaterId,
90        package: String,
91    },
92    // Channel management commands
93    OpenChannel {
94        actor_id: ChannelParticipant,
95        initial_message: Vec<u8>,
96    },
97    SendOnChannel {
98        channel_id: String,
99        message: Vec<u8>,
100    },
101    CloseChannel {
102        channel_id: String,
103    },
104
105    // Store commands
106    NewStore {},
107}
108
109#[derive(Debug, Clone, Serialize, Deserialize)]
110#[allow(clippy::large_enum_variant)]
111pub enum ManagementResponse {
112    ActorStarted {
113        id: TheaterId,
114    },
115    ActorStopped {
116        id: TheaterId,
117    },
118    ActorList {
119        actors: Vec<(TheaterId, String)>,
120    },
121    Subscribed {
122        id: TheaterId,
123        subscription_id: Uuid,
124    },
125    Unsubscribed {
126        id: TheaterId,
127    },
128    ActorEvent {
129        event: ChainEvent,
130    },
131    ActorResult(ActorResult),
132    Error {
133        error: ManagementError,
134    },
135    RequestedMessage {
136        id: TheaterId,
137        message: Vec<u8>,
138    },
139    SentMessage {
140        id: TheaterId,
141    },
142    ActorStatus {
143        id: TheaterId,
144        status: ActorStatus,
145    },
146    Restarted {
147        id: TheaterId,
148    },
149    ActorManifest {
150        id: TheaterId,
151        manifest: ManifestConfig,
152    },
153    ActorState {
154        id: TheaterId,
155        state: Value,
156    },
157    ActorEvents {
158        id: TheaterId,
159        events: Vec<ChainEvent>,
160    },
161    ActorMetrics {
162        id: TheaterId,
163        metrics: serde_json::Value,
164    },
165    ActorPackageUpdated {
166        id: TheaterId,
167    },
168    // Channel management responses
169    ChannelOpened {
170        channel_id: String,
171        actor_id: ChannelParticipant,
172    },
173    MessageSent {
174        channel_id: String,
175    },
176    ChannelMessage {
177        channel_id: String,
178        sender_id: ChannelParticipant,
179        message: Vec<u8>,
180    },
181    ChannelClosed {
182        channel_id: String,
183    },
184
185    // Store responses
186    StoreCreated {
187        store_id: String,
188    },
189}
190
191#[derive(Debug, Clone, Serialize, Deserialize)]
192pub enum ManagementError {
193    // Actor-related errors
194    ActorNotFound,
195    ActorAlreadyExists,
196    ActorNotRunning,
197    ActorError(String),
198
199    // Channel-related errors
200    ChannelNotFound,
201    ChannelClosed,
202    ChannelRejected,
203
204    // Store-related errors
205    StoreError(String),
206
207    // Communication errors
208    CommunicationError(String),
209
210    // Request handling errors
211    InvalidRequest(String),
212    Timeout,
213
214    // System errors
215    RuntimeError(String),
216    InternalError(String),
217
218    // Serialization/deserialization errors
219    SerializationError(String),
220
221    // Actor initialization errors
222    ActorInitializationError(String),
223}
224
225// Allow converting from TheaterRuntimeError to ManagementError
226impl From<TheaterRuntimeError> for ManagementError {
227    fn from(err: TheaterRuntimeError) -> Self {
228        match err {
229            TheaterRuntimeError::ActorNotFound(_) => ManagementError::ActorNotFound,
230            TheaterRuntimeError::ActorAlreadyExists(_) => ManagementError::ActorAlreadyExists,
231            TheaterRuntimeError::ActorNotRunning(_) => ManagementError::ActorNotRunning,
232            TheaterRuntimeError::ActorOperationFailed(msg) => {
233                ManagementError::RuntimeError(format!("Actor operation failed: {}", msg))
234            }
235            TheaterRuntimeError::ActorError(e) => ManagementError::ActorError(e.to_string()),
236            TheaterRuntimeError::ChannelError(msg) => ManagementError::CommunicationError(msg),
237            TheaterRuntimeError::ChannelNotFound(_) => ManagementError::ChannelNotFound,
238            TheaterRuntimeError::ChannelRejected => ManagementError::ChannelRejected,
239            TheaterRuntimeError::SerializationError(msg) => {
240                ManagementError::SerializationError(msg)
241            }
242            TheaterRuntimeError::InternalError(msg) => ManagementError::InternalError(msg),
243            TheaterRuntimeError::ActorInitializationError(msg) => {
244                ManagementError::ActorInitializationError(msg)
245            }
246        }
247    }
248}
249
250// Implement Display for ManagementError to provide better error messages
251impl std::fmt::Display for ManagementError {
252    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
253        match self {
254            ManagementError::ActorNotFound => write!(f, "Actor not found"),
255            ManagementError::ActorAlreadyExists => write!(f, "Actor already exists"),
256            ManagementError::ActorNotRunning => write!(f, "Actor is not running"),
257            ManagementError::ActorError(msg) => write!(f, "Actor error: {}", msg),
258            ManagementError::ChannelNotFound => write!(f, "Channel not found"),
259            ManagementError::ChannelClosed => write!(f, "Channel is closed"),
260            ManagementError::ChannelRejected => write!(f, "Channel was rejected"),
261            ManagementError::StoreError(msg) => write!(f, "Store error: {}", msg),
262            ManagementError::CommunicationError(msg) => write!(f, "Communication error: {}", msg),
263            ManagementError::InvalidRequest(msg) => write!(f, "Invalid request: {}", msg),
264            ManagementError::Timeout => write!(f, "Operation timed out"),
265            ManagementError::RuntimeError(msg) => write!(f, "Runtime error: {}", msg),
266            ManagementError::InternalError(msg) => write!(f, "Internal error: {}", msg),
267            ManagementError::SerializationError(msg) => write!(f, "Serialization error: {}", msg),
268            ManagementError::ActorInitializationError(msg) => {
269                write!(f, "Actor initialization error: {}", msg)
270            }
271        }
272    }
273}
274
275// Implement Error trait for ManagementError
276impl std::error::Error for ManagementError {}
277
278#[derive(Debug)]
279#[allow(dead_code)]
280struct Subscription {
281    id: Uuid,
282    client_tx: mpsc::Sender<ManagementResponse>,
283}
284
285impl Eq for Subscription {}
286impl PartialEq for Subscription {
287    fn eq(&self, other: &Self) -> bool {
288        self.id == other.id
289    }
290}
291impl std::hash::Hash for Subscription {
292    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
293        self.id.hash(state);
294    }
295}
296
// ChannelEvent is imported from theater::messages (see the imports at the top of this file).
298
299// Structure to track active channel subscriptions
300#[derive(Debug)]
301#[allow(dead_code)]
302struct ChannelSubscription {
303    channel_id: String,
304    initiator_id: ChannelParticipant,
305    target_id: ChannelParticipant,
306    client_tx: mpsc::Sender<ManagementResponse>,
307}
308
309/// Creates a HandlerRegistry with Theater-specific handlers.
310///
311/// NOTE: WASI handlers (environment, filesystem, http, io, sockets, timing, random, process)
312/// have been deprecated and moved to crates/deprecated/. They will be redesigned for
313/// Composite runtime support later.
314///
315/// Returns both the HandlerRegistry and the MessageRouter, allowing the server
316/// to use the MessageRouter for external client messaging.
317fn create_root_handler_registry(
318    theater_tx: mpsc::Sender<TheaterCommand>,
319) -> (
320    HandlerRegistry,
321    theater_handler_message_server::MessageRouter,
322) {
323    let mut registry = HandlerRegistry::new();
324
325    info!("Initializing Theater server with Theater-specific handlers...");
326
327    // Runtime handler - provides actor runtime information and control
328    let runtime_config = RuntimeHostConfig {};
329    registry.register(RuntimeHandler::new(
330        runtime_config,
331        theater_tx.clone(),
332        None,
333    ));
334
335    // Store handler - provides key-value storage for actors
336    let store_config = StoreHandlerConfig::default();
337    registry.register(StoreHandler::new(store_config, None));
338
339    // Supervisor handler - allows actors to spawn and manage child actors
340    let supervisor_config = SupervisorHostConfig {};
341    registry.register(SupervisorHandler::new(supervisor_config, None));
342
343    // Message server handler - provides inter-actor messaging
344    let message_router = theater_handler_message_server::MessageRouter::new();
345    registry.register(MessageServerHandler::new(None, message_router.clone()));
346
347    // TCP handler - provides raw TCP networking for actors
348    let tcp_config = TcpHandlerConfig {
349        listen: None,
350        max_connections: None,
351        ..Default::default()
352    };
353    registry.register(TcpHandler::new(tcp_config));
354
355    info!("✓ 5 Theater-specific handlers registered");
356    info!("NOTE: WASI handlers are deprecated - see crates/deprecated/");
357
358    (registry, message_router)
359}
360
361pub struct TheaterServer {
362    runtime: TheaterRuntime,
363    theater_tx: mpsc::Sender<TheaterCommand>,
364    management_socket: TcpListener,
365    subscriptions: Arc<Mutex<HashMap<TheaterId, HashSet<Subscription>>>>,
366    // Field to track channel subscriptions
367    channel_subscriptions: Arc<Mutex<HashMap<String, ChannelSubscription>>>,
368    // Channel for runtime to send channel events back to server
369    #[allow(dead_code)]
370    channel_events_tx: mpsc::Sender<ChannelEvent>,
371    // MessageRouter for external client messaging
372    message_router: theater_handler_message_server::MessageRouter,
373}
374
375impl TheaterServer {
376    // Process channel events and forward them to subscribed clients
377    async fn process_channel_events(
378        mut channel_events_rx: mpsc::Receiver<ChannelEvent>,
379        channel_subscriptions: Arc<Mutex<HashMap<String, ChannelSubscription>>>,
380    ) {
381        while let Some(event) = channel_events_rx.recv().await {
382            match event {
383                ChannelEvent::Message {
384                    channel_id,
385                    sender_id,
386                    message,
387                } => {
388                    tracing::debug!("Received channel message for {}", channel_id);
389                    // Forward to subscribed clients
390                    let subs = channel_subscriptions.lock().await;
391                    if let Some(sub) = subs.get(&channel_id.0) {
392                        let response = ManagementResponse::ChannelMessage {
393                            channel_id: channel_id.0.clone(),
394                            sender_id,
395                            message,
396                        };
397
398                        tracing::debug!("Forwarding channel message to client: {:?}", response);
399
400                        if let Err(e) = sub.client_tx.send(response).await {
401                            tracing::warn!("Failed to forward channel message: {}", e);
402                        } else {
403                            tracing::debug!("Forwarded channel message to client");
404                        }
405                    }
406                }
407                ChannelEvent::Close { channel_id } => {
408                    tracing::debug!("Received channel close event for {}", channel_id);
409                    // Forward to subscribed clients
410                    let mut subs = channel_subscriptions.lock().await;
411                    if let Some(sub) = subs.remove(&channel_id.0) {
412                        let response = ManagementResponse::ChannelClosed {
413                            channel_id: channel_id.0.clone(),
414                        };
415
416                        if let Err(e) = sub.client_tx.send(response).await {
417                            tracing::warn!("Failed to forward channel close event: {}", e);
418                        } else {
419                            tracing::debug!("Forwarded channel close event to client");
420                        }
421                    }
422                }
423            }
424        }
425    }
426
427    pub async fn new(address: std::net::SocketAddr) -> Result<Self> {
428        let (theater_tx, theater_rx) = mpsc::channel(32);
429
430        // Create channel for runtime to send channel events back to server
431        let (channel_events_tx, channel_events_rx) = mpsc::channel(32);
432
433        // Create handler registry with all migrated handlers (root permissions)
434        // Also get the MessageRouter for external client messaging
435        let (handler_registry, message_router) = create_root_handler_registry(theater_tx.clone());
436
437        // Create the runtime with the handler registry
438        let runtime = TheaterRuntime::new(
439            theater_tx.clone(),
440            theater_rx,
441            Some(channel_events_tx.clone()),
442            handler_registry,
443        )
444        .await?;
445        let management_socket = TcpListener::bind(address).await?;
446
447        let channel_subscriptions = Arc::new(Mutex::new(HashMap::new()));
448
449        // Start task to process channel events
450        let channel_subs_clone = channel_subscriptions.clone();
451        tokio::spawn(async move {
452            Self::process_channel_events(channel_events_rx, channel_subs_clone).await;
453        });
454
455        Ok(Self {
456            runtime,
457            theater_tx,
458            management_socket,
459            subscriptions: Arc::new(Mutex::new(HashMap::new())),
460            channel_subscriptions,
461            channel_events_tx,
462            message_router,
463        })
464    }
465
466    pub async fn run(mut self) -> Result<()> {
467        info!(
468            "Theater server starting on {:?}",
469            self.management_socket.local_addr()?
470        );
471
472        // Start the theater runtime in its own task
473        let runtime_handle = tokio::spawn(async move {
474            match self.runtime.run().await {
475                Ok(_) => Ok(()),
476                Err(e) => {
477                    error!("Theater runtime failed: {}", e);
478                    Err(e)
479                }
480            }
481        });
482
483        // Accept and handle management connections
484        while let Ok((socket, addr)) = self.management_socket.accept().await {
485            info!("New management connection from {}", addr);
486            let runtime_tx = self.theater_tx.clone();
487            let subscriptions = self.subscriptions.clone();
488            let channel_subscriptions = self.channel_subscriptions.clone();
489            let message_router = self.message_router.clone();
490
491            tokio::spawn(async move {
492                if let Err(e) = Self::handle_management_connection(
493                    socket,
494                    runtime_tx,
495                    subscriptions,
496                    channel_subscriptions,
497                    message_router,
498                )
499                .await
500                {
501                    error!("Error handling management connection: {}", e);
502                }
503            });
504        }
505
506        runtime_handle.await??;
507        Ok(())
508    }
509
510    async fn handle_management_connection(
511        socket: TcpStream,
512        runtime_tx: mpsc::Sender<TheaterCommand>,
513        subscriptions: Arc<Mutex<HashMap<TheaterId, HashSet<Subscription>>>>,
514        channel_subscriptions: Arc<Mutex<HashMap<String, ChannelSubscription>>>,
515        message_router: theater_handler_message_server::MessageRouter,
516    ) -> Result<()> {
517        // Create a channel for sending responses to this client
518        let (client_tx, mut client_rx) = mpsc::channel::<ManagementResponse>(32);
519
520        let codec = FragmentingCodec::new();
521        let framed = Framed::new(socket, codec);
522
523        // Split the framed connection into read and write parts
524        let (mut framed_sink, mut framed_stream) = framed.split();
525
526        // Clone the client_tx for use in the command loop
527        let cmd_client_tx = client_tx.clone();
528
529        // Start a task to forward responses to the client
530        let _response_task = tokio::spawn(async move {
531            while let Some(response) = client_rx.recv().await {
532                match serde_json::to_vec(&response) {
533                    Ok(data) => {
534                        debug!("Serialized response: {} bytes", data.len());
535                        if data.len() > 10 * 1024 * 1024 {
536                            debug!("Large response detected: {} MB", data.len() / 1024 / 1024);
537                        }
538                        if let Err(e) = framed_sink.send(Bytes::from(data)).await {
539                            debug!("Error sending response to client: {}", e);
540                            break;
541                        }
542                    }
543                    Err(e) => {
544                        error!("Error serializing response: {}", e);
545                    }
546                }
547            }
548            debug!("Response forwarder for client closed");
549        });
550
551        // Store active subscriptions for this connection to clean up on disconnect
552        let mut connection_subscriptions: Vec<(TheaterId, Uuid)> = Vec::new();
553
554        // Store active channel subscriptions for cleanup
555        let mut connection_channel_subscriptions: Vec<String> = Vec::new();
556
557        // Loop until connection closes or an error occurs
558        'connection: while let Some(msg) = framed_stream.next().await {
559            debug!("Received management message");
560            let msg = match msg {
561                Ok(m) => m,
562                Err(e) => {
563                    error!("Error receiving message: {}", e);
564                    break 'connection;
565                }
566            };
567
568            let cmd = match serde_json::from_slice::<ManagementCommand>(&msg) {
569                Ok(c) => c,
570                Err(e) => {
571                    error!(
572                        "Error parsing command: {} {}",
573                        e,
574                        String::from_utf8_lossy(&msg)
575                    );
576                    continue;
577                }
578            };
579            debug!("Parsed command: {:?}", cmd);
580
581            // Store the command for reference (used for subscription tracking)
582            let _cmd_clone = cmd.clone();
583
584            let response = match cmd {
585                ManagementCommand::StartActor {
586                    manifest,
587                    initial_state: _initial_state,
588                    parent,
589                    subscribe,
590                } => {
591                    info!("Starting actor from manifest: {:?}", manifest);
592
593                    // Load and parse manifest
594                    let manifest_str = match resolve_reference(&manifest).await {
595                        Ok(bytes) => match String::from_utf8(bytes) {
596                            Ok(s) => s,
597                            Err(e) => {
598                                error!("Invalid manifest encoding: {}", e);
599                                cmd_client_tx
600                                    .send(ManagementResponse::Error {
601                                        error: ManagementError::ActorInitializationError(format!(
602                                            "Invalid manifest encoding: {}",
603                                            e
604                                        )),
605                                    })
606                                    .await
607                                    .ok();
608                                continue;
609                            }
610                        },
611                        Err(e) => {
612                            error!("Failed to load manifest: {}", e);
613                            cmd_client_tx
614                                .send(ManagementResponse::Error {
615                                    error: ManagementError::ActorInitializationError(format!(
616                                        "Failed to load manifest: {}",
617                                        e
618                                    )),
619                                })
620                                .await
621                                .ok();
622                            continue;
623                        }
624                    };
625
626                    let manifest_config = match ManifestConfig::from_toml_str(&manifest_str) {
627                        Ok(m) => m,
628                        Err(e) => {
629                            error!("Failed to parse manifest: {}", e);
630                            cmd_client_tx
631                                .send(ManagementResponse::Error {
632                                    error: ManagementError::ActorInitializationError(format!(
633                                        "Failed to parse manifest: {}",
634                                        e
635                                    )),
636                                })
637                                .await
638                                .ok();
639                            continue;
640                        }
641                    };
642
643                    // Load wasm bytes
644                    let wasm_bytes = match resolve_reference(&manifest_config.package).await {
645                        Ok(bytes) => bytes,
646                        Err(e) => {
647                            error!("Failed to load WASM: {}", e);
648                            cmd_client_tx
649                                .send(ManagementResponse::Error {
650                                    error: ManagementError::ActorInitializationError(format!(
651                                        "Failed to load WASM: {}",
652                                        e
653                                    )),
654                                })
655                                .await
656                                .ok();
657                            continue;
658                        }
659                    };
660
661                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
662                    debug!("Sending SpawnActor command to runtime");
663                    let supervisor_tx = if parent {
664                        let (supervisor_tx, mut supervisor_rx) = mpsc::channel(32);
665                        let cmd_client_tx = cmd_client_tx.clone();
666                        tokio::spawn(async move {
667                            while let Some(res) = supervisor_rx.recv().await {
668                                debug!("Received supervisor response: {:?}", res);
669                                if let Err(e) = cmd_client_tx
670                                    .send(ManagementResponse::ActorResult(res))
671                                    .await
672                                {
673                                    error!("Failed to send supervisor response: {}", e);
674                                    break;
675                                }
676                            }
677                        });
678                        Some(supervisor_tx)
679                    } else {
680                        None
681                    };
682                    let subscription_tx = if subscribe {
683                        let (event_tx, mut event_rx) = mpsc::channel(32);
684
685                        // set up a task to forward events to the client
686                        let cmd_client_tx = cmd_client_tx.clone();
687                        tokio::spawn(async move {
688                            while let Some(event) = event_rx.recv().await {
689                                debug!("Received event for subscription");
690                                let response = match event {
691                                    Ok(event) => ManagementResponse::ActorEvent { event },
692                                    Err(_e) => {
693                                        // Actor errors are already captured in the event chain
694                                        // No need to send duplicate error responses
695                                        debug!("Actor error received, but already in event chain");
696                                        continue;
697                                    }
698                                };
699                                if let Err(e) = cmd_client_tx.send(response).await {
700                                    debug!("Failed to forward event to client: {}", e);
701                                    break;
702                                }
703                            }
704                            debug!("Event forwarder for subscription stopped");
705                        });
706
707                        Some(event_tx)
708                    } else {
709                        None
710                    };
711                    match runtime_tx
712                        .send(TheaterCommand::SpawnActor {
713                            wasm_bytes,
714                            name: Some(manifest_config.name.clone()),
715                            manifest: Some(manifest_config),
716                            init_bytes: None,
717                            response_tx: cmd_tx,
718                            supervisor_tx,
719                            subscription_tx,
720                        })
721                        .await
722                    {
723                        Ok(_) => {
724                            debug!("SpawnActor command sent to runtime, awaiting response");
725                            match cmd_rx.await {
726                                Ok(result) => match result {
727                                    Ok(actor_id) => {
728                                        info!("Actor started with ID: {:?}", actor_id);
729                                        ManagementResponse::ActorStarted { id: actor_id }
730                                    }
731                                    Err(e) => {
732                                        error!("Runtime failed to start actor: {}", e);
733                                        ManagementResponse::Error {
734                                            error: ManagementError::RuntimeError(format!(
735                                                "Failed to start actor: {}",
736                                                e
737                                            )),
738                                        }
739                                    }
740                                },
741                                Err(e) => {
742                                    error!("Failed to receive spawn response: {}", e);
743                                    ManagementResponse::Error {
744                                        error: ManagementError::CommunicationError(format!(
745                                            "Failed to receive spawn response: {}",
746                                            e
747                                        )),
748                                    }
749                                }
750                            }
751                        }
752                        Err(e) => {
753                            error!("Failed to send SpawnActor command: {}", e);
754                            ManagementResponse::Error {
755                                error: ManagementError::CommunicationError(format!(
756                                    "Failed to send spawn command: {}",
757                                    e
758                                )),
759                            }
760                        }
761                    }
762                }
763                ManagementCommand::StopActor { id } => {
764                    info!("Stopping actor: {:?}", id);
765                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
766                    runtime_tx
767                        .send(TheaterCommand::StopActor {
768                            actor_id: id,
769                            response_tx: cmd_tx,
770                        })
771                        .await?;
772
773                    match cmd_rx.await? {
774                        Ok(_) => {
775                            subscriptions.lock().await.remove(&id);
776                            ManagementResponse::ActorStopped { id }
777                        }
778                        Err(e) => ManagementResponse::Error {
779                            error: ManagementError::RuntimeError(format!(
780                                "Failed to stop actor: {}",
781                                e
782                            )),
783                        },
784                    }
785                }
786                ManagementCommand::TerminateActor { id } => {
787                    info!("Terminating actor: {:?}", id);
788                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
789                    runtime_tx
790                        .send(TheaterCommand::TerminateActor {
791                            actor_id: id,
792                            response_tx: cmd_tx,
793                        })
794                        .await?;
795
796                    match cmd_rx.await? {
797                        Ok(_) => {
798                            subscriptions.lock().await.remove(&id);
799                            ManagementResponse::ActorStopped { id }
800                        }
801                        Err(e) => ManagementResponse::Error {
802                            error: ManagementError::RuntimeError(format!(
803                                "Failed to terminate actor: {}",
804                                e
805                            )),
806                        },
807                    }
808                }
809                ManagementCommand::ListActors => {
810                    debug!("Listing actors");
811                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
812                    runtime_tx
813                        .send(TheaterCommand::GetActors {
814                            response_tx: cmd_tx,
815                        })
816                        .await?;
817
818                    match cmd_rx.await? {
819                        Ok(actors) => {
820                            info!("Found {} actors", actors.len());
821                            ManagementResponse::ActorList { actors }
822                        }
823                        Err(e) => ManagementResponse::Error {
824                            error: ManagementError::RuntimeError(format!(
825                                "Failed to list actors: {}",
826                                e
827                            )),
828                        },
829                    }
830                }
831                ManagementCommand::SubscribeToActor { id } => {
832                    info!("New subscription request for actor: {:?}", id);
833                    let subscription_id = Uuid::new_v4();
834                    let subscription = Subscription {
835                        id: subscription_id,
836                        client_tx: cmd_client_tx.clone(),
837                    };
838
839                    debug!("Subscription created with ID: {}", subscription_id);
840
841                    // Register the subscription in the global map
842                    subscriptions
843                        .lock()
844                        .await
845                        .entry(id)
846                        .or_default()
847                        .insert(subscription);
848
849                    // Set up the event channel for the subscription
850                    let (event_tx, mut event_rx) = mpsc::channel(32);
851                    runtime_tx
852                        .send(TheaterCommand::SubscribeToActor {
853                            actor_id: id,
854                            event_tx,
855                        })
856                        .await
857                        .map_err(|e| anyhow::anyhow!("Failed to subscribe: {}", e))?;
858
859                    // Add to the list of subscriptions for this connection
860                    connection_subscriptions.push((id, subscription_id));
861
862                    // Create a task to forward events to this client
863                    let client_tx_clone = cmd_client_tx.clone();
864                    tokio::spawn(async move {
865                        debug!(
866                            "Starting event forwarder for subscription {}",
867                            subscription_id
868                        );
869                        while let Some(event) = event_rx.recv().await {
870                            debug!("Received event for subscription {}", subscription_id);
871                            let response = match event {
872                                Ok(event) => ManagementResponse::ActorEvent { event },
873                                Err(_e) => {
874                                    // Actor errors are already captured in the event chain
875                                    // No need to send duplicate error responses
876                                    debug!("Actor error received, but already in event chain");
877                                    continue;
878                                }
879                            };
880                            if let Err(e) = client_tx_clone.send(response).await {
881                                debug!("Failed to forward event to client: {}", e);
882                                break;
883                            }
884                        }
885                        debug!(
886                            "Event forwarder for subscription {} stopped",
887                            subscription_id
888                        );
889                    });
890
891                    ManagementResponse::Subscribed {
892                        id,
893                        subscription_id,
894                    }
895                }
896                ManagementCommand::UnsubscribeFromActor {
897                    id,
898                    subscription_id,
899                } => {
900                    debug!(
901                        "Removing subscription {} for actor {:?}",
902                        subscription_id, id
903                    );
904
905                    // Remove subscription from the tracking list for this connection
906                    connection_subscriptions
907                        .retain(|(aid, sid)| *aid != id || *sid != subscription_id);
908
909                    // Remove from the global subscriptions map
910                    let mut subs = subscriptions.lock().await;
911                    if let Some(actor_subs) = subs.get_mut(&id) {
912                        actor_subs.retain(|sub| sub.id != subscription_id);
913
914                        // Remove the entry if no subscriptions remain
915                        if actor_subs.is_empty() {
916                            subs.remove(&id);
917                        }
918                    }
919
920                    debug!("Subscription removed");
921                    ManagementResponse::Unsubscribed { id }
922                }
923                ManagementCommand::SendActorMessage { id, data } => {
924                    info!("Sending message to actor: {:?}", id);
925
926                    // Create response channel for routing
927                    let (response_tx, response_rx) = tokio::sync::oneshot::channel();
928
929                    // Create ActorMessage
930                    let message = ActorMessage::Send(ActorSend { data });
931
932                    // Route via MessageRouter
933                    match message_router
934                        .route_message(theater::messages::MessageCommand::SendMessage {
935                            target_id: id,
936                            message,
937                            response_tx,
938                        })
939                        .await
940                    {
941                        Ok(_) => {
942                            // Wait for routing result
943                            match response_rx.await {
944                                Ok(Ok(())) => {
945                                    info!("Message sent successfully to actor: {:?}", id);
946                                    ManagementResponse::SentMessage { id }
947                                }
948                                Ok(Err(e)) => {
949                                    error!("Failed to send message to actor: {}", e);
950                                    ManagementResponse::Error {
951                                        error: ManagementError::RuntimeError(format!(
952                                            "Failed to send: {}",
953                                            e
954                                        )),
955                                    }
956                                }
957                                Err(e) => {
958                                    error!("Failed to receive routing response: {}", e);
959                                    ManagementResponse::Error {
960                                        error: ManagementError::CommunicationError(format!(
961                                            "Failed to receive routing response: {}",
962                                            e
963                                        )),
964                                    }
965                                }
966                            }
967                        }
968                        Err(e) => {
969                            error!("Failed to route message: {}", e);
970                            ManagementResponse::Error {
971                                error: ManagementError::RuntimeError(format!(
972                                    "Failed to route message: {}",
973                                    e
974                                )),
975                            }
976                        }
977                    }
978                }
979                ManagementCommand::RequestActorMessage { id, data } => {
980                    info!("Requesting message from actor: {:?}", id);
981
982                    // Create channels for request-response pattern
983                    let (route_tx, route_rx) = tokio::sync::oneshot::channel();
984                    let (response_tx, response_rx) = tokio::sync::oneshot::channel();
985
986                    // Create ActorMessage with response channel embedded
987                    let message = ActorMessage::Request(ActorRequest { data, response_tx });
988
989                    // Route via MessageRouter
990                    match message_router
991                        .route_message(theater::messages::MessageCommand::SendMessage {
992                            target_id: id,
993                            message,
994                            response_tx: route_tx,
995                        })
996                        .await
997                    {
998                        Ok(_) => {
999                            // Wait for routing to complete
1000                            match route_rx.await {
1001                                Ok(Ok(())) => {
1002                                    // Routing succeeded, now wait for actor's response
1003                                    match response_rx.await {
1004                                        Ok(response_data) => {
1005                                            info!("Received response from actor: {:?}", id);
1006                                            ManagementResponse::RequestedMessage {
1007                                                id,
1008                                                message: response_data,
1009                                            }
1010                                        }
1011                                        Err(e) => {
1012                                            error!("Actor didn't respond: {}", e);
1013                                            ManagementResponse::Error {
1014                                                error: ManagementError::RuntimeError(format!(
1015                                                    "Actor didn't respond: {}",
1016                                                    e
1017                                                )),
1018                                            }
1019                                        }
1020                                    }
1021                                }
1022                                Ok(Err(e)) => {
1023                                    error!("Failed to route request to actor: {}", e);
1024                                    ManagementResponse::Error {
1025                                        error: ManagementError::RuntimeError(format!(
1026                                            "Failed to route: {}",
1027                                            e
1028                                        )),
1029                                    }
1030                                }
1031                                Err(e) => {
1032                                    error!("Failed to receive routing response: {}", e);
1033                                    ManagementResponse::Error {
1034                                        error: ManagementError::CommunicationError(format!(
1035                                            "Failed to receive routing response: {}",
1036                                            e
1037                                        )),
1038                                    }
1039                                }
1040                            }
1041                        }
1042                        Err(e) => {
1043                            error!("Failed to route request: {}", e);
1044                            ManagementResponse::Error {
1045                                error: ManagementError::RuntimeError(format!(
1046                                    "Failed to route request: {}",
1047                                    e
1048                                )),
1049                            }
1050                        }
1051                    }
1052                }
1053                ManagementCommand::GetActorManifest { id } => {
1054                    info!("Getting manifest for actor: {:?}", id);
1055                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1056                    runtime_tx
1057                        .send(TheaterCommand::GetActorManifest {
1058                            actor_id: id,
1059                            response_tx: cmd_tx,
1060                        })
1061                        .await?;
1062
1063                    let manifest = cmd_rx.await?;
1064                    ManagementResponse::ActorManifest {
1065                        id,
1066                        manifest: manifest?,
1067                    }
1068                }
1069                ManagementCommand::GetActorStatus { id } => {
1070                    info!("Getting status for actor: {:?}", id);
1071                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1072                    runtime_tx
1073                        .send(TheaterCommand::GetActorStatus {
1074                            actor_id: id,
1075                            response_tx: cmd_tx,
1076                        })
1077                        .await?;
1078
1079                    let status = cmd_rx.await?;
1080                    ManagementResponse::ActorStatus {
1081                        id,
1082                        status: status?,
1083                    }
1084                }
1085                ManagementCommand::RestartActor { id } => {
1086                    info!("Restarting actor: {:?}", id);
1087                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1088                    runtime_tx
1089                        .send(TheaterCommand::RestartActor {
1090                            actor_id: id,
1091                            response_tx: cmd_tx,
1092                        })
1093                        .await?;
1094
1095                    match cmd_rx.await? {
1096                        Ok(_) => ManagementResponse::Restarted { id },
1097                        Err(e) => ManagementResponse::Error {
1098                            error: ManagementError::RuntimeError(format!(
1099                                "Failed to restart actor: {}",
1100                                e
1101                            )),
1102                        },
1103                    }
1104                }
1105                ManagementCommand::GetActorState { id } => {
1106                    info!("Getting state for actor: {:?}", id);
1107                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1108                    runtime_tx
1109                        .send(TheaterCommand::GetActorState {
1110                            actor_id: id,
1111                            response_tx: cmd_tx,
1112                        })
1113                        .await?;
1114
1115                    let state = cmd_rx.await?;
1116                    ManagementResponse::ActorState { id, state: state? }
1117                }
1118                ManagementCommand::GetActorEvents { id } => {
1119                    info!("Getting events for actor: {:?}", id);
1120                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1121                    runtime_tx
1122                        .send(TheaterCommand::GetActorEvents {
1123                            actor_id: id,
1124                            response_tx: cmd_tx,
1125                        })
1126                        .await?;
1127
1128                    match cmd_rx.await {
1129                        Ok(result) => match result {
1130                            Ok(events) => {
1131                                debug!(
1132                                    "Successfully retrieved {} events for actor {}",
1133                                    events.len(),
1134                                    id
1135                                );
1136                                ManagementResponse::ActorEvents { id, events }
1137                            }
1138                            Err(e) => {
1139                                debug!("Error getting events for actor {}: {}", id, e);
1140                                ManagementResponse::Error { error: e.into() }
1141                            }
1142                        },
1143                        Err(e) => {
1144                            error!("Failed to receive events response: {}", e);
1145                            ManagementResponse::Error {
1146                                error: ManagementError::CommunicationError(format!(
1147                                    "Failed to receive events response: {}",
1148                                    e
1149                                )),
1150                            }
1151                        }
1152                    }
1153                }
1154                ManagementCommand::GetActorMetrics { id } => {
1155                    info!("Getting metrics for actor: {:?}", id);
1156                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1157                    runtime_tx
1158                        .send(TheaterCommand::GetActorMetrics {
1159                            actor_id: id,
1160                            response_tx: cmd_tx,
1161                        })
1162                        .await?;
1163
1164                    let metrics = cmd_rx.await?;
1165                    ManagementResponse::ActorMetrics {
1166                        id,
1167                        metrics: serde_json::to_value(metrics?)?,
1168                    }
1169                }
1170                ManagementCommand::UpdateActorPackage { id: _, package: _ } => {
1171                    // TODO: Re-implement actor package updates
1172                    ManagementResponse::Error {
1173                        error: ManagementError::RuntimeError(
1174                            "UpdateActorPackage not yet implemented".to_string(),
1175                        ),
1176                    }
1177                }
1178                // Handle channel management commands
1179                ManagementCommand::OpenChannel {
1180                    actor_id,
1181                    initial_message,
1182                } => {
1183                    info!("Opening channel to actor: {:?}", actor_id);
1184
1185                    // Create a response channel
1186                    let (response_tx, response_rx) = tokio::sync::oneshot::channel();
1187
1188                    // Generate a channel ID
1189                    let client_id = ChannelParticipant::External;
1190                    let channel_id = ChannelId::new(&client_id, &actor_id);
1191                    let channel_id_str = channel_id.0.clone();
1192
1193                    // Send the channel open command via MessageRouter
1194                    message_router
1195                        .route_message(theater::messages::MessageCommand::OpenChannel {
1196                            initiator_id: client_id.clone(),
1197                            target_id: actor_id.clone(),
1198                            channel_id: channel_id.clone(),
1199                            initial_message,
1200                            response_tx,
1201                        })
1202                        .await
1203                        .map_err(|e| {
1204                            anyhow::anyhow!("Failed to send channel open command: {}", e)
1205                        })?;
1206
1207                    // Wait for the response
1208                    match response_rx.await {
1209                        Ok(result) => {
1210                            match result {
1211                                Ok(accepted) => {
1212                                    if accepted {
1213                                        // Channel opened successfully
1214                                        info!("Channel opened successfully: {}", channel_id_str);
1215
1216                                        // Register the channel subscription to receive messages
1217                                        let channel_sub = ChannelSubscription {
1218                                            channel_id: channel_id_str.clone(),
1219                                            initiator_id: client_id.clone(),
1220                                            target_id: actor_id.clone(),
1221                                            client_tx: cmd_client_tx.clone(),
1222                                        };
1223
1224                                        channel_subscriptions
1225                                            .lock()
1226                                            .await
1227                                            .insert(channel_id_str.clone(), channel_sub);
1228
1229                                        // Track this channel for cleanup on disconnect
1230                                        connection_channel_subscriptions
1231                                            .push(channel_id_str.clone());
1232
1233                                        ManagementResponse::ChannelOpened {
1234                                            channel_id: channel_id_str,
1235                                            actor_id,
1236                                        }
1237                                    } else {
1238                                        // Channel rejected by target
1239                                        ManagementResponse::Error {
1240                                            error: ManagementError::ChannelRejected,
1241                                        }
1242                                    }
1243                                }
1244                                Err(e) => ManagementResponse::Error {
1245                                    error: ManagementError::RuntimeError(format!(
1246                                        "Error opening channel: {}",
1247                                        e
1248                                    )),
1249                                },
1250                            }
1251                        }
1252                        Err(e) => ManagementResponse::Error {
1253                            error: ManagementError::CommunicationError(format!(
1254                                "Failed to receive channel open response: {}",
1255                                e
1256                            )),
1257                        },
1258                    }
1259                }
1260                ManagementCommand::SendOnChannel {
1261                    channel_id,
1262                    message,
1263                } => {
1264                    info!("Sending message on channel: {}", channel_id);
1265
1266                    // Create response channel
1267                    let (response_tx, response_rx) = tokio::sync::oneshot::channel();
1268
1269                    // Parse the channel ID
1270                    let channel_id_parsed = ChannelId(channel_id.clone());
1271
1272                    // Send the message on the channel via MessageRouter
1273                    let sender_id = ChannelParticipant::External;
1274                    match message_router
1275                        .route_message(theater::messages::MessageCommand::ChannelMessage {
1276                            channel_id: channel_id_parsed,
1277                            sender_id,
1278                            message,
1279                            response_tx,
1280                        })
1281                        .await
1282                    {
1283                        Ok(_) => {
1284                            // Wait for routing result
1285                            match response_rx.await {
1286                                Ok(Ok(())) => {
1287                                    info!("Message sent successfully on channel: {}", channel_id);
1288                                    ManagementResponse::MessageSent { channel_id }
1289                                }
1290                                Ok(Err(e)) => {
1291                                    error!("Failed to send on channel: {}", e);
1292                                    ManagementResponse::Error {
1293                                        error: ManagementError::RuntimeError(format!(
1294                                            "Failed to send on channel: {}",
1295                                            e
1296                                        )),
1297                                    }
1298                                }
1299                                Err(e) => {
1300                                    error!("Failed to receive channel send response: {}", e);
1301                                    ManagementResponse::Error {
1302                                        error: ManagementError::CommunicationError(format!(
1303                                            "Failed to receive channel send response: {}",
1304                                            e
1305                                        )),
1306                                    }
1307                                }
1308                            }
1309                        }
1310                        Err(e) => {
1311                            error!("Failed to route channel message: {}", e);
1312                            ManagementResponse::Error {
1313                                error: ManagementError::RuntimeError(format!(
1314                                    "Failed to route channel message: {}",
1315                                    e
1316                                )),
1317                            }
1318                        }
1319                    }
1320                }
1321                ManagementCommand::CloseChannel { channel_id } => {
1322                    info!("Closing channel: {}", channel_id);
1323
1324                    // Create response channel
1325                    let (response_tx, response_rx) = tokio::sync::oneshot::channel();
1326
1327                    // Parse the channel ID
1328                    let channel_id_parsed = ChannelId(channel_id.clone());
1329
1330                    // Close the channel via MessageRouter
1331                    let sender_id = ChannelParticipant::External;
1332                    match message_router
1333                        .route_message(theater::messages::MessageCommand::ChannelClose {
1334                            channel_id: channel_id_parsed,
1335                            sender_id,
1336                            response_tx,
1337                        })
1338                        .await
1339                    {
1340                        Ok(_) => {
1341                            // Wait for routing result
1342                            match response_rx.await {
1343                                Ok(Ok(())) => {
1344                                    info!("Channel closed successfully: {}", channel_id);
1345
1346                                    // Remove from channel subscriptions
1347                                    channel_subscriptions.lock().await.remove(&channel_id);
1348                                    connection_channel_subscriptions.retain(|id| id != &channel_id);
1349
1350                                    ManagementResponse::ChannelClosed { channel_id }
1351                                }
1352                                Ok(Err(e)) => {
1353                                    error!("Failed to close channel: {}", e);
1354                                    ManagementResponse::Error {
1355                                        error: ManagementError::RuntimeError(format!(
1356                                            "Failed to close channel: {}",
1357                                            e
1358                                        )),
1359                                    }
1360                                }
1361                                Err(e) => {
1362                                    error!("Failed to receive channel close response: {}", e);
1363                                    ManagementResponse::Error {
1364                                        error: ManagementError::CommunicationError(format!(
1365                                            "Failed to receive channel close response: {}",
1366                                            e
1367                                        )),
1368                                    }
1369                                }
1370                            }
1371                        }
1372                        Err(e) => {
1373                            error!("Failed to route channel close: {}", e);
1374                            ManagementResponse::Error {
1375                                error: ManagementError::RuntimeError(format!(
1376                                    "Failed to route channel close: {}",
1377                                    e
1378                                )),
1379                            }
1380                        }
1381                    }
1382                }
1383                ManagementCommand::NewStore {} => {
1384                    info!("Creating new store");
1385                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1386                    runtime_tx
1387                        .send(TheaterCommand::NewStore {
1388                            response_tx: cmd_tx,
1389                        })
1390                        .await?;
1391
1392                    let store_id = cmd_rx.await?;
1393                    ManagementResponse::StoreCreated {
1394                        store_id: store_id?.id,
1395                    }
1396                }
1397            };
1398
1399            debug!("Sending response: {:?}", response);
1400            if let Err(e) = client_tx.send(response).await {
1401                error!("Failed to send response: {}", e);
1402                break;
1403            }
1404            debug!("Response sent");
1405        }
1406
1407        // Clean up all subscriptions for this connection
1408        debug!(
1409            "Connection closed, cleaning up {} subscriptions",
1410            connection_subscriptions.len()
1411        );
1412        let mut subs = subscriptions.lock().await;
1413
1414        for (actor_id, sub_id) in connection_subscriptions {
1415            if let Some(actor_subs) = subs.get_mut(&actor_id) {
1416                actor_subs.retain(|sub| sub.id != sub_id);
1417
1418                // Remove the entry if no subscriptions remain
1419                if actor_subs.is_empty() {
1420                    subs.remove(&actor_id);
1421                }
1422            }
1423        }
1424
1425        // Clean up channel subscriptions
1426        debug!(
1427            "Connection closed, cleaning up {} channel subscriptions",
1428            connection_channel_subscriptions.len()
1429        );
1430        let mut channel_subs = channel_subscriptions.lock().await;
1431
1432        for channel_id in connection_channel_subscriptions {
1433            channel_subs.remove(&channel_id);
1434        }
1435
1436        debug!("Cleaned up all subscriptions for the connection");
1437        Ok(())
1438    }
1439}