Skip to main content

theater_server/
server.rs

1use anyhow::Result;
2use bytes::Bytes;
3use futures::sink::SinkExt;
4use futures::stream::StreamExt;
5use serde::{Deserialize, Serialize};
6use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8use theater::messages::{
9    ActorMessage, ActorRequest, ActorResult, ActorSend, ActorStatus, ChannelEvent,
10    ChannelParticipant,
11};
12use theater::pack_bridge::Value;
13use theater::{ChainEvent, ManifestConfig};
14use tokio::net::{TcpListener, TcpStream};
15use tokio::sync::{mpsc, Mutex};
16use tokio_util::codec::Framed;
17use tracing::{debug, error, info};
18use uuid::Uuid;
19
20use theater::config::actor_manifest::{
21    RuntimeHostConfig, StoreHandlerConfig, SupervisorHostConfig, TcpHandlerConfig,
22};
23use theater::handler::HandlerRegistry;
24use theater::id::TheaterId;
25use theater::messages::{default_init_state, ChannelId, TheaterCommand};
26use theater::theater_runtime::TheaterRuntime;
27use theater::utils::{resolve_reference, ResourceCache};
28use theater::TheaterRuntimeError;
29
30// Import Theater-specific handlers only
31// DEPRECATED: WASI handlers (environment, filesystem, http, io, etc.) moved to crates/deprecated/
32use theater_handler_message_server::MessageServerHandler;
33use theater_handler_runtime::RuntimeHandler;
34use theater_handler_store::StoreHandler;
35use theater_handler_supervisor::SupervisorHandler;
36use theater_handler_tcp::TcpHandler;
37
38use crate::fragmenting_codec::FragmentingCodec;
39
40#[derive(Debug, Clone, Serialize, Deserialize)]
41pub enum ManagementCommand {
42    StartActor {
43        manifest: String,
44        initial_state: Option<Vec<u8>>,
45        parent: bool,
46        subscribe: bool,
47    },
48    StopActor {
49        id: TheaterId,
50    },
51    TerminateActor {
52        id: TheaterId,
53    },
54    ListActors,
55    SubscribeToActor {
56        id: TheaterId,
57    },
58    UnsubscribeFromActor {
59        id: TheaterId,
60        subscription_id: Uuid,
61    },
62    SendActorMessage {
63        id: TheaterId,
64        data: Vec<u8>,
65    },
66    RequestActorMessage {
67        id: TheaterId,
68        data: Vec<u8>,
69    },
70    GetActorManifest {
71        id: TheaterId,
72    },
73    GetActorStatus {
74        id: TheaterId,
75    },
76    RestartActor {
77        id: TheaterId,
78    },
79    GetActorState {
80        id: TheaterId,
81    },
82    GetActorMetrics {
83        id: TheaterId,
84    },
85    UpdateActorPackage {
86        id: TheaterId,
87        package: String,
88    },
89    // Channel management commands
90    OpenChannel {
91        actor_id: ChannelParticipant,
92        initial_message: Vec<u8>,
93    },
94    SendOnChannel {
95        channel_id: String,
96        message: Vec<u8>,
97    },
98    CloseChannel {
99        channel_id: String,
100    },
101
102    // Store commands
103    NewStore {},
104}
105
106#[derive(Debug, Clone, Serialize, Deserialize)]
107#[allow(clippy::large_enum_variant)]
108pub enum ManagementResponse {
109    ActorStarted {
110        id: TheaterId,
111    },
112    ActorStopped {
113        id: TheaterId,
114    },
115    ActorList {
116        actors: Vec<(TheaterId, String)>,
117    },
118    Subscribed {
119        id: TheaterId,
120        subscription_id: Uuid,
121    },
122    Unsubscribed {
123        id: TheaterId,
124    },
125    ActorEvent {
126        event: ChainEvent,
127    },
128    ActorResult(ActorResult),
129    Error {
130        error: ManagementError,
131    },
132    RequestedMessage {
133        id: TheaterId,
134        message: Vec<u8>,
135    },
136    SentMessage {
137        id: TheaterId,
138    },
139    ActorStatus {
140        id: TheaterId,
141        status: ActorStatus,
142    },
143    Restarted {
144        id: TheaterId,
145    },
146    ActorManifest {
147        id: TheaterId,
148        manifest: ManifestConfig,
149    },
150    ActorState {
151        id: TheaterId,
152        state: Value,
153    },
154    ActorMetrics {
155        id: TheaterId,
156        metrics: serde_json::Value,
157    },
158    ActorPackageUpdated {
159        id: TheaterId,
160    },
161    // Channel management responses
162    ChannelOpened {
163        channel_id: String,
164        actor_id: ChannelParticipant,
165    },
166    MessageSent {
167        channel_id: String,
168    },
169    ChannelMessage {
170        channel_id: String,
171        sender_id: ChannelParticipant,
172        message: Vec<u8>,
173    },
174    ChannelClosed {
175        channel_id: String,
176    },
177
178    // Store responses
179    StoreCreated {
180        store_id: String,
181    },
182}
183
184#[derive(Debug, Clone, Serialize, Deserialize)]
185pub enum ManagementError {
186    // Actor-related errors
187    ActorNotFound,
188    ActorAlreadyExists,
189    ActorNotRunning,
190    ActorError(String),
191
192    // Channel-related errors
193    ChannelNotFound,
194    ChannelClosed,
195    ChannelRejected,
196
197    // Store-related errors
198    StoreError(String),
199
200    // Communication errors
201    CommunicationError(String),
202
203    // Request handling errors
204    InvalidRequest(String),
205    Timeout,
206
207    // System errors
208    RuntimeError(String),
209    InternalError(String),
210
211    // Serialization/deserialization errors
212    SerializationError(String),
213
214    // Actor initialization errors
215    ActorInitializationError(String),
216}
217
218// Allow converting from TheaterRuntimeError to ManagementError
219impl From<TheaterRuntimeError> for ManagementError {
220    fn from(err: TheaterRuntimeError) -> Self {
221        match err {
222            TheaterRuntimeError::ActorNotFound(_) => ManagementError::ActorNotFound,
223            TheaterRuntimeError::ActorAlreadyExists(_) => ManagementError::ActorAlreadyExists,
224            TheaterRuntimeError::ActorNotRunning(_) => ManagementError::ActorNotRunning,
225            TheaterRuntimeError::ActorOperationFailed(msg) => {
226                ManagementError::RuntimeError(format!("Actor operation failed: {}", msg))
227            }
228            TheaterRuntimeError::ActorError(e) => ManagementError::ActorError(e.to_string()),
229            TheaterRuntimeError::ChannelError(msg) => ManagementError::CommunicationError(msg),
230            TheaterRuntimeError::ChannelNotFound(_) => ManagementError::ChannelNotFound,
231            TheaterRuntimeError::ChannelRejected => ManagementError::ChannelRejected,
232            TheaterRuntimeError::SerializationError(msg) => {
233                ManagementError::SerializationError(msg)
234            }
235            TheaterRuntimeError::InternalError(msg) => ManagementError::InternalError(msg),
236            TheaterRuntimeError::ActorInitializationError(msg) => {
237                ManagementError::ActorInitializationError(msg)
238            }
239        }
240    }
241}
242
243// Implement Display for ManagementError to provide better error messages
244impl std::fmt::Display for ManagementError {
245    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
246        match self {
247            ManagementError::ActorNotFound => write!(f, "Actor not found"),
248            ManagementError::ActorAlreadyExists => write!(f, "Actor already exists"),
249            ManagementError::ActorNotRunning => write!(f, "Actor is not running"),
250            ManagementError::ActorError(msg) => write!(f, "Actor error: {}", msg),
251            ManagementError::ChannelNotFound => write!(f, "Channel not found"),
252            ManagementError::ChannelClosed => write!(f, "Channel is closed"),
253            ManagementError::ChannelRejected => write!(f, "Channel was rejected"),
254            ManagementError::StoreError(msg) => write!(f, "Store error: {}", msg),
255            ManagementError::CommunicationError(msg) => write!(f, "Communication error: {}", msg),
256            ManagementError::InvalidRequest(msg) => write!(f, "Invalid request: {}", msg),
257            ManagementError::Timeout => write!(f, "Operation timed out"),
258            ManagementError::RuntimeError(msg) => write!(f, "Runtime error: {}", msg),
259            ManagementError::InternalError(msg) => write!(f, "Internal error: {}", msg),
260            ManagementError::SerializationError(msg) => write!(f, "Serialization error: {}", msg),
261            ManagementError::ActorInitializationError(msg) => {
262                write!(f, "Actor initialization error: {}", msg)
263            }
264        }
265    }
266}
267
268// Implement Error trait for ManagementError
269impl std::error::Error for ManagementError {}
270
271#[derive(Debug)]
272#[allow(dead_code)]
273struct Subscription {
274    id: Uuid,
275    client_tx: mpsc::Sender<ManagementResponse>,
276}
277
278impl Eq for Subscription {}
279impl PartialEq for Subscription {
280    fn eq(&self, other: &Self) -> bool {
281        self.id == other.id
282    }
283}
284impl std::hash::Hash for Subscription {
285    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
286        self.id.hash(state);
287    }
288}
289
290// ChannelEvent is now imported from theater::ChannelEvent
291
292// Structure to track active channel subscriptions
293#[derive(Debug)]
294#[allow(dead_code)]
295struct ChannelSubscription {
296    channel_id: String,
297    initiator_id: ChannelParticipant,
298    target_id: ChannelParticipant,
299    client_tx: mpsc::Sender<ManagementResponse>,
300}
301
302/// Creates a HandlerRegistry with Theater-specific handlers.
303///
304/// NOTE: WASI handlers (environment, filesystem, http, io, sockets, timing, random, process)
305/// have been deprecated and moved to crates/deprecated/. They will be redesigned for
306/// Composite runtime support later.
307///
308/// Returns both the HandlerRegistry and the MessageRouter, allowing the server
309/// to use the MessageRouter for external client messaging.
310fn create_root_handler_registry(
311    theater_tx: mpsc::Sender<TheaterCommand>,
312    resource_cache: Arc<ResourceCache>,
313) -> (
314    HandlerRegistry,
315    theater_handler_message_server::MessageRouter,
316) {
317    let mut registry = HandlerRegistry::new();
318
319    info!("Initializing Theater server with Theater-specific handlers...");
320
321    // Runtime handler - provides actor runtime information and control
322    let runtime_config = RuntimeHostConfig {};
323    registry.register(RuntimeHandler::new(
324        runtime_config,
325        theater_tx.clone(),
326        None,
327    ));
328
329    // Store handler - provides key-value storage for actors
330    let store_config = StoreHandlerConfig::default();
331    registry.register(StoreHandler::new(store_config, None));
332
333    // Supervisor handler - allows actors to spawn and manage child actors.
334    // The resource cache is shared across every supervisor-capable actor;
335    // children whose manifests opt in via `static_package = true` skip
336    // the wasm-bytes fetch on repeat spawns.
337    let supervisor_config = SupervisorHostConfig {};
338    registry.register(
339        SupervisorHandler::new(supervisor_config, None).with_resource_cache(resource_cache),
340    );
341
342    // Message server handler - provides inter-actor messaging
343    let message_router = theater_handler_message_server::MessageRouter::new();
344    registry.register(MessageServerHandler::new(None, message_router.clone()));
345
346    // TCP handler - provides raw TCP networking for actors
347    let tcp_config = TcpHandlerConfig {
348        listen: None,
349        max_connections: None,
350        ..Default::default()
351    };
352    registry.register(TcpHandler::new(tcp_config));
353
354    info!("✓ 5 Theater-specific handlers registered");
355    info!("NOTE: WASI handlers are deprecated - see crates/deprecated/");
356
357    (registry, message_router)
358}
359
360pub struct TheaterServer {
361    runtime: TheaterRuntime,
362    theater_tx: mpsc::Sender<TheaterCommand>,
363    management_socket: TcpListener,
364    subscriptions: Arc<Mutex<HashMap<TheaterId, HashSet<Subscription>>>>,
365    // Field to track channel subscriptions
366    channel_subscriptions: Arc<Mutex<HashMap<String, ChannelSubscription>>>,
367    // Channel for runtime to send channel events back to server
368    #[allow(dead_code)]
369    channel_events_tx: mpsc::Sender<ChannelEvent>,
370    // MessageRouter for external client messaging
371    message_router: theater_handler_message_server::MessageRouter,
372}
373
374impl TheaterServer {
375    // Process channel events and forward them to subscribed clients
376    async fn process_channel_events(
377        mut channel_events_rx: mpsc::Receiver<ChannelEvent>,
378        channel_subscriptions: Arc<Mutex<HashMap<String, ChannelSubscription>>>,
379    ) {
380        while let Some(event) = channel_events_rx.recv().await {
381            match event {
382                ChannelEvent::Message {
383                    channel_id,
384                    sender_id,
385                    message,
386                } => {
387                    tracing::debug!("Received channel message for {}", channel_id);
388                    // Forward to subscribed clients
389                    let subs = channel_subscriptions.lock().await;
390                    if let Some(sub) = subs.get(&channel_id.0) {
391                        let response = ManagementResponse::ChannelMessage {
392                            channel_id: channel_id.0.clone(),
393                            sender_id,
394                            message,
395                        };
396
397                        tracing::debug!("Forwarding channel message to client: {:?}", response);
398
399                        if let Err(e) = sub.client_tx.send(response).await {
400                            tracing::warn!("Failed to forward channel message: {}", e);
401                        } else {
402                            tracing::debug!("Forwarded channel message to client");
403                        }
404                    }
405                }
406                ChannelEvent::Close { channel_id } => {
407                    tracing::debug!("Received channel close event for {}", channel_id);
408                    // Forward to subscribed clients
409                    let mut subs = channel_subscriptions.lock().await;
410                    if let Some(sub) = subs.remove(&channel_id.0) {
411                        let response = ManagementResponse::ChannelClosed {
412                            channel_id: channel_id.0.clone(),
413                        };
414
415                        if let Err(e) = sub.client_tx.send(response).await {
416                            tracing::warn!("Failed to forward channel close event: {}", e);
417                        } else {
418                            tracing::debug!("Forwarded channel close event to client");
419                        }
420                    }
421                }
422            }
423        }
424    }
425
426    pub async fn new(address: std::net::SocketAddr) -> Result<Self> {
427        let (theater_tx, theater_rx) = mpsc::channel(32);
428
429        // Create channel for runtime to send channel events back to server
430        let (channel_events_tx, channel_events_rx) = mpsc::channel(32);
431
432        // Shared URL→bytes cache; one per theater process. Wired into
433        // the supervisor host fn only — children spawned via
434        // `theater:simple/supervisor.spawn` whose manifest sets
435        // `static_package = true` pay the wasm fetch once per process.
436        // The runtime's other entry points (the top-level `StartActor`
437        // management command below, and `TheaterCommand::ResumeActor`
438        // in `theater_runtime.rs`) still call `resolve_reference`
439        // directly and do not consult this cache. Threading it onto
440        // `TheaterRuntime` is the obvious next layer; the motivating
441        // consumer (frontdoor per-conn-child) goes through the
442        // supervisor path, so it is not in scope here.
443        let resource_cache = Arc::new(ResourceCache::new());
444
445        // Create handler registry with all migrated handlers (root permissions)
446        // Also get the MessageRouter for external client messaging
447        let (handler_registry, message_router) =
448            create_root_handler_registry(theater_tx.clone(), resource_cache.clone());
449
450        // Create the runtime with the handler registry
451        let runtime = TheaterRuntime::new(
452            theater_tx.clone(),
453            theater_rx,
454            Some(channel_events_tx.clone()),
455            handler_registry,
456        )
457        .await?;
458        let management_socket = TcpListener::bind(address).await?;
459
460        let channel_subscriptions = Arc::new(Mutex::new(HashMap::new()));
461
462        // Start task to process channel events
463        let channel_subs_clone = channel_subscriptions.clone();
464        tokio::spawn(async move {
465            Self::process_channel_events(channel_events_rx, channel_subs_clone).await;
466        });
467
468        Ok(Self {
469            runtime,
470            theater_tx,
471            management_socket,
472            subscriptions: Arc::new(Mutex::new(HashMap::new())),
473            channel_subscriptions,
474            channel_events_tx,
475            message_router,
476        })
477    }
478
479    pub async fn run(mut self) -> Result<()> {
480        info!(
481            "Theater server starting on {:?}",
482            self.management_socket.local_addr()?
483        );
484
485        // Start the theater runtime in its own task
486        let runtime_handle = tokio::spawn(async move {
487            match self.runtime.run().await {
488                Ok(_) => Ok(()),
489                Err(e) => {
490                    error!("Theater runtime failed: {}", e);
491                    Err(e)
492                }
493            }
494        });
495
496        // Accept and handle management connections
497        while let Ok((socket, addr)) = self.management_socket.accept().await {
498            info!("New management connection from {}", addr);
499            let runtime_tx = self.theater_tx.clone();
500            let subscriptions = self.subscriptions.clone();
501            let channel_subscriptions = self.channel_subscriptions.clone();
502            let message_router = self.message_router.clone();
503
504            tokio::spawn(async move {
505                if let Err(e) = Self::handle_management_connection(
506                    socket,
507                    runtime_tx,
508                    subscriptions,
509                    channel_subscriptions,
510                    message_router,
511                )
512                .await
513                {
514                    error!("Error handling management connection: {}", e);
515                }
516            });
517        }
518
519        runtime_handle.await??;
520        Ok(())
521    }
522
523    async fn handle_management_connection(
524        socket: TcpStream,
525        runtime_tx: mpsc::Sender<TheaterCommand>,
526        subscriptions: Arc<Mutex<HashMap<TheaterId, HashSet<Subscription>>>>,
527        channel_subscriptions: Arc<Mutex<HashMap<String, ChannelSubscription>>>,
528        message_router: theater_handler_message_server::MessageRouter,
529    ) -> Result<()> {
530        // Create a channel for sending responses to this client
531        let (client_tx, mut client_rx) = mpsc::channel::<ManagementResponse>(32);
532
533        let codec = FragmentingCodec::new();
534        let framed = Framed::new(socket, codec);
535
536        // Split the framed connection into read and write parts
537        let (mut framed_sink, mut framed_stream) = framed.split();
538
539        // Clone the client_tx for use in the command loop
540        let cmd_client_tx = client_tx.clone();
541
542        // Start a task to forward responses to the client
543        let _response_task = tokio::spawn(async move {
544            while let Some(response) = client_rx.recv().await {
545                match serde_json::to_vec(&response) {
546                    Ok(data) => {
547                        debug!("Serialized response: {} bytes", data.len());
548                        if data.len() > 10 * 1024 * 1024 {
549                            debug!("Large response detected: {} MB", data.len() / 1024 / 1024);
550                        }
551                        if let Err(e) = framed_sink.send(Bytes::from(data)).await {
552                            debug!("Error sending response to client: {}", e);
553                            break;
554                        }
555                    }
556                    Err(e) => {
557                        error!("Error serializing response: {}", e);
558                    }
559                }
560            }
561            debug!("Response forwarder for client closed");
562        });
563
564        // Store active subscriptions for this connection to clean up on disconnect
565        let mut connection_subscriptions: Vec<(TheaterId, Uuid)> = Vec::new();
566
567        // Store active channel subscriptions for cleanup
568        let mut connection_channel_subscriptions: Vec<String> = Vec::new();
569
570        // Loop until connection closes or an error occurs
571        'connection: while let Some(msg) = framed_stream.next().await {
572            debug!("Received management message");
573            let msg = match msg {
574                Ok(m) => m,
575                Err(e) => {
576                    error!("Error receiving message: {}", e);
577                    break 'connection;
578                }
579            };
580
581            let cmd = match serde_json::from_slice::<ManagementCommand>(&msg) {
582                Ok(c) => c,
583                Err(e) => {
584                    error!(
585                        "Error parsing command: {} {}",
586                        e,
587                        String::from_utf8_lossy(&msg)
588                    );
589                    continue;
590                }
591            };
592            debug!("Parsed command: {:?}", cmd);
593
594            // Store the command for reference (used for subscription tracking)
595            let _cmd_clone = cmd.clone();
596
597            let response = match cmd {
598                ManagementCommand::StartActor {
599                    manifest,
600                    initial_state: _initial_state,
601                    parent,
602                    subscribe,
603                } => {
604                    info!("Starting actor from manifest: {:?}", manifest);
605
606                    // Load and parse manifest
607                    let manifest_str = match resolve_reference(&manifest).await {
608                        Ok(bytes) => match String::from_utf8(bytes) {
609                            Ok(s) => s,
610                            Err(e) => {
611                                error!("Invalid manifest encoding: {}", e);
612                                cmd_client_tx
613                                    .send(ManagementResponse::Error {
614                                        error: ManagementError::ActorInitializationError(format!(
615                                            "Invalid manifest encoding: {}",
616                                            e
617                                        )),
618                                    })
619                                    .await
620                                    .ok();
621                                continue;
622                            }
623                        },
624                        Err(e) => {
625                            error!("Failed to load manifest: {}", e);
626                            cmd_client_tx
627                                .send(ManagementResponse::Error {
628                                    error: ManagementError::ActorInitializationError(format!(
629                                        "Failed to load manifest: {}",
630                                        e
631                                    )),
632                                })
633                                .await
634                                .ok();
635                            continue;
636                        }
637                    };
638
639                    let manifest_config = match ManifestConfig::from_toml_str(&manifest_str) {
640                        Ok(m) => m,
641                        Err(e) => {
642                            error!("Failed to parse manifest: {}", e);
643                            cmd_client_tx
644                                .send(ManagementResponse::Error {
645                                    error: ManagementError::ActorInitializationError(format!(
646                                        "Failed to parse manifest: {}",
647                                        e
648                                    )),
649                                })
650                                .await
651                                .ok();
652                            continue;
653                        }
654                    };
655
656                    // Load wasm bytes
657                    let wasm_bytes = match resolve_reference(&manifest_config.package).await {
658                        Ok(bytes) => bytes,
659                        Err(e) => {
660                            error!("Failed to load WASM: {}", e);
661                            cmd_client_tx
662                                .send(ManagementResponse::Error {
663                                    error: ManagementError::ActorInitializationError(format!(
664                                        "Failed to load WASM: {}",
665                                        e
666                                    )),
667                                })
668                                .await
669                                .ok();
670                            continue;
671                        }
672                    };
673
674                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
675                    debug!("Sending SpawnActor command to runtime");
676                    let supervisor_tx = if parent {
677                        let (supervisor_tx, mut supervisor_rx) = mpsc::channel(32);
678                        let cmd_client_tx = cmd_client_tx.clone();
679                        tokio::spawn(async move {
680                            while let Some(res) = supervisor_rx.recv().await {
681                                debug!("Received supervisor response: {:?}", res);
682                                if let Err(e) = cmd_client_tx
683                                    .send(ManagementResponse::ActorResult(res))
684                                    .await
685                                {
686                                    error!("Failed to send supervisor response: {}", e);
687                                    break;
688                                }
689                            }
690                        });
691                        Some(supervisor_tx)
692                    } else {
693                        None
694                    };
695                    let subscription_tx = if subscribe {
696                        let (event_tx, mut event_rx) = mpsc::channel(32);
697
698                        // set up a task to forward events to the client
699                        let cmd_client_tx = cmd_client_tx.clone();
700                        tokio::spawn(async move {
701                            while let Some((_actor_id, event)) = event_rx.recv().await {
702                                debug!("Received event for subscription");
703                                let response = ManagementResponse::ActorEvent { event };
704                                if let Err(e) = cmd_client_tx.send(response).await {
705                                    debug!("Failed to forward event to client: {}", e);
706                                    break;
707                                }
708                            }
709                            debug!("Event forwarder for subscription stopped");
710                        });
711
712                        Some(event_tx)
713                    } else {
714                        None
715                    };
716                    match runtime_tx
717                        .send(TheaterCommand::SetupActor {
718                            wasm_bytes,
719                            name: Some(manifest_config.name.clone()),
720                            manifest: Some(manifest_config),
721                            init_state: default_init_state(),
722                            response_tx: cmd_tx,
723                            supervisor_tx,
724                            subscription_tx,
725                        })
726                        .await
727                    {
728                        Ok(_) => {
729                            debug!("SpawnActor command sent to runtime, awaiting response");
730                            match cmd_rx.await {
731                                Ok(result) => match result {
732                                    Ok(actor_id) => {
733                                        info!("Actor started with ID: {:?}", actor_id);
734                                        ManagementResponse::ActorStarted { id: actor_id }
735                                    }
736                                    Err(e) => {
737                                        error!("Runtime failed to start actor: {}", e);
738                                        ManagementResponse::Error {
739                                            error: ManagementError::RuntimeError(format!(
740                                                "Failed to start actor: {}",
741                                                e
742                                            )),
743                                        }
744                                    }
745                                },
746                                Err(e) => {
747                                    error!("Failed to receive spawn response: {}", e);
748                                    ManagementResponse::Error {
749                                        error: ManagementError::CommunicationError(format!(
750                                            "Failed to receive spawn response: {}",
751                                            e
752                                        )),
753                                    }
754                                }
755                            }
756                        }
757                        Err(e) => {
758                            error!("Failed to send SpawnActor command: {}", e);
759                            ManagementResponse::Error {
760                                error: ManagementError::CommunicationError(format!(
761                                    "Failed to send spawn command: {}",
762                                    e
763                                )),
764                            }
765                        }
766                    }
767                }
768                ManagementCommand::StopActor { id } => {
769                    info!("Stopping actor: {:?}", id);
770                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
771                    runtime_tx
772                        .send(TheaterCommand::StopActor {
773                            actor_id: id,
774                            response_tx: cmd_tx,
775                        })
776                        .await?;
777
778                    match cmd_rx.await? {
779                        Ok(_) => {
780                            subscriptions.lock().await.remove(&id);
781                            ManagementResponse::ActorStopped { id }
782                        }
783                        Err(e) => ManagementResponse::Error {
784                            error: ManagementError::RuntimeError(format!(
785                                "Failed to stop actor: {}",
786                                e
787                            )),
788                        },
789                    }
790                }
791                ManagementCommand::TerminateActor { id } => {
792                    info!("Terminating actor: {:?}", id);
793                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
794                    runtime_tx
795                        .send(TheaterCommand::TerminateActor {
796                            actor_id: id,
797                            response_tx: cmd_tx,
798                        })
799                        .await?;
800
801                    match cmd_rx.await? {
802                        Ok(_) => {
803                            subscriptions.lock().await.remove(&id);
804                            ManagementResponse::ActorStopped { id }
805                        }
806                        Err(e) => ManagementResponse::Error {
807                            error: ManagementError::RuntimeError(format!(
808                                "Failed to terminate actor: {}",
809                                e
810                            )),
811                        },
812                    }
813                }
814                ManagementCommand::ListActors => {
815                    debug!("Listing actors");
816                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
817                    runtime_tx
818                        .send(TheaterCommand::GetActors {
819                            response_tx: cmd_tx,
820                        })
821                        .await?;
822
823                    match cmd_rx.await? {
824                        Ok(actors) => {
825                            info!("Found {} actors", actors.len());
826                            ManagementResponse::ActorList { actors }
827                        }
828                        Err(e) => ManagementResponse::Error {
829                            error: ManagementError::RuntimeError(format!(
830                                "Failed to list actors: {}",
831                                e
832                            )),
833                        },
834                    }
835                }
836                ManagementCommand::SubscribeToActor { id } => {
837                    info!("New subscription request for actor: {:?}", id);
838                    let subscription_id = Uuid::new_v4();
839                    let subscription = Subscription {
840                        id: subscription_id,
841                        client_tx: cmd_client_tx.clone(),
842                    };
843
844                    debug!("Subscription created with ID: {}", subscription_id);
845
846                    // Register the subscription in the global map
847                    subscriptions
848                        .lock()
849                        .await
850                        .entry(id)
851                        .or_default()
852                        .insert(subscription);
853
854                    // Set up the event channel for the subscription
855                    let (event_tx, mut event_rx) = mpsc::channel(32);
856                    runtime_tx
857                        .send(TheaterCommand::SubscribeToActor {
858                            actor_id: id,
859                            event_tx,
860                        })
861                        .await
862                        .map_err(|e| anyhow::anyhow!("Failed to subscribe: {}", e))?;
863
864                    // Add to the list of subscriptions for this connection
865                    connection_subscriptions.push((id, subscription_id));
866
867                    // Create a task to forward events to this client
868                    let client_tx_clone = cmd_client_tx.clone();
869                    tokio::spawn(async move {
870                        debug!(
871                            "Starting event forwarder for subscription {}",
872                            subscription_id
873                        );
874                        while let Some((_actor_id, event)) = event_rx.recv().await {
875                            debug!("Received event for subscription {}", subscription_id);
876                            let response = ManagementResponse::ActorEvent { event };
877                            if let Err(e) = client_tx_clone.send(response).await {
878                                debug!("Failed to forward event to client: {}", e);
879                                break;
880                            }
881                        }
882                        debug!(
883                            "Event forwarder for subscription {} stopped",
884                            subscription_id
885                        );
886                    });
887
888                    ManagementResponse::Subscribed {
889                        id,
890                        subscription_id,
891                    }
892                }
893                ManagementCommand::UnsubscribeFromActor {
894                    id,
895                    subscription_id,
896                } => {
897                    debug!(
898                        "Removing subscription {} for actor {:?}",
899                        subscription_id, id
900                    );
901
902                    // Remove subscription from the tracking list for this connection
903                    connection_subscriptions
904                        .retain(|(aid, sid)| *aid != id || *sid != subscription_id);
905
906                    // Remove from the global subscriptions map
907                    let mut subs = subscriptions.lock().await;
908                    if let Some(actor_subs) = subs.get_mut(&id) {
909                        actor_subs.retain(|sub| sub.id != subscription_id);
910
911                        // Remove the entry if no subscriptions remain
912                        if actor_subs.is_empty() {
913                            subs.remove(&id);
914                        }
915                    }
916
917                    debug!("Subscription removed");
918                    ManagementResponse::Unsubscribed { id }
919                }
920                ManagementCommand::SendActorMessage { id, data } => {
921                    info!("Sending message to actor: {:?}", id);
922
923                    // Create response channel for routing
924                    let (response_tx, response_rx) = tokio::sync::oneshot::channel();
925
926                    // Create ActorMessage
927                    let message = ActorMessage::Send(ActorSend { data });
928
929                    // Route via MessageRouter
930                    match message_router
931                        .route_message(theater::messages::MessageCommand::SendMessage {
932                            target_id: id,
933                            message,
934                            response_tx,
935                        })
936                        .await
937                    {
938                        Ok(_) => {
939                            // Wait for routing result
940                            match response_rx.await {
941                                Ok(Ok(())) => {
942                                    info!("Message sent successfully to actor: {:?}", id);
943                                    ManagementResponse::SentMessage { id }
944                                }
945                                Ok(Err(e)) => {
946                                    error!("Failed to send message to actor: {}", e);
947                                    ManagementResponse::Error {
948                                        error: ManagementError::RuntimeError(format!(
949                                            "Failed to send: {}",
950                                            e
951                                        )),
952                                    }
953                                }
954                                Err(e) => {
955                                    error!("Failed to receive routing response: {}", e);
956                                    ManagementResponse::Error {
957                                        error: ManagementError::CommunicationError(format!(
958                                            "Failed to receive routing response: {}",
959                                            e
960                                        )),
961                                    }
962                                }
963                            }
964                        }
965                        Err(e) => {
966                            error!("Failed to route message: {}", e);
967                            ManagementResponse::Error {
968                                error: ManagementError::RuntimeError(format!(
969                                    "Failed to route message: {}",
970                                    e
971                                )),
972                            }
973                        }
974                    }
975                }
976                ManagementCommand::RequestActorMessage { id, data } => {
977                    info!("Requesting message from actor: {:?}", id);
978
979                    // Create channels for request-response pattern
980                    let (route_tx, route_rx) = tokio::sync::oneshot::channel();
981                    let (response_tx, response_rx) = tokio::sync::oneshot::channel();
982
983                    // Create ActorMessage with response channel embedded
984                    let message = ActorMessage::Request(ActorRequest { data, response_tx });
985
986                    // Route via MessageRouter
987                    match message_router
988                        .route_message(theater::messages::MessageCommand::SendMessage {
989                            target_id: id,
990                            message,
991                            response_tx: route_tx,
992                        })
993                        .await
994                    {
995                        Ok(_) => {
996                            // Wait for routing to complete
997                            match route_rx.await {
998                                Ok(Ok(())) => {
999                                    // Routing succeeded, now wait for actor's response
1000                                    match response_rx.await {
1001                                        Ok(response_data) => {
1002                                            info!("Received response from actor: {:?}", id);
1003                                            ManagementResponse::RequestedMessage {
1004                                                id,
1005                                                message: response_data,
1006                                            }
1007                                        }
1008                                        Err(e) => {
1009                                            error!("Actor didn't respond: {}", e);
1010                                            ManagementResponse::Error {
1011                                                error: ManagementError::RuntimeError(format!(
1012                                                    "Actor didn't respond: {}",
1013                                                    e
1014                                                )),
1015                                            }
1016                                        }
1017                                    }
1018                                }
1019                                Ok(Err(e)) => {
1020                                    error!("Failed to route request to actor: {}", e);
1021                                    ManagementResponse::Error {
1022                                        error: ManagementError::RuntimeError(format!(
1023                                            "Failed to route: {}",
1024                                            e
1025                                        )),
1026                                    }
1027                                }
1028                                Err(e) => {
1029                                    error!("Failed to receive routing response: {}", e);
1030                                    ManagementResponse::Error {
1031                                        error: ManagementError::CommunicationError(format!(
1032                                            "Failed to receive routing response: {}",
1033                                            e
1034                                        )),
1035                                    }
1036                                }
1037                            }
1038                        }
1039                        Err(e) => {
1040                            error!("Failed to route request: {}", e);
1041                            ManagementResponse::Error {
1042                                error: ManagementError::RuntimeError(format!(
1043                                    "Failed to route request: {}",
1044                                    e
1045                                )),
1046                            }
1047                        }
1048                    }
1049                }
1050                ManagementCommand::GetActorManifest { id } => {
1051                    info!("Getting manifest for actor: {:?}", id);
1052                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1053                    runtime_tx
1054                        .send(TheaterCommand::GetActorManifest {
1055                            actor_id: id,
1056                            response_tx: cmd_tx,
1057                        })
1058                        .await?;
1059
1060                    let manifest = cmd_rx.await?;
1061                    ManagementResponse::ActorManifest {
1062                        id,
1063                        manifest: manifest?,
1064                    }
1065                }
1066                ManagementCommand::GetActorStatus { id } => {
1067                    info!("Getting status for actor: {:?}", id);
1068                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1069                    runtime_tx
1070                        .send(TheaterCommand::GetActorStatus {
1071                            actor_id: id,
1072                            response_tx: cmd_tx,
1073                        })
1074                        .await?;
1075
1076                    let status = cmd_rx.await?;
1077                    ManagementResponse::ActorStatus {
1078                        id,
1079                        status: status?,
1080                    }
1081                }
1082                ManagementCommand::RestartActor { id } => {
1083                    info!("Restarting actor: {:?}", id);
1084                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1085                    runtime_tx
1086                        .send(TheaterCommand::RestartActor {
1087                            actor_id: id,
1088                            response_tx: cmd_tx,
1089                        })
1090                        .await?;
1091
1092                    match cmd_rx.await? {
1093                        Ok(_) => ManagementResponse::Restarted { id },
1094                        Err(e) => ManagementResponse::Error {
1095                            error: ManagementError::RuntimeError(format!(
1096                                "Failed to restart actor: {}",
1097                                e
1098                            )),
1099                        },
1100                    }
1101                }
1102                ManagementCommand::GetActorState { id } => {
1103                    info!("Getting state for actor: {:?}", id);
1104                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1105                    runtime_tx
1106                        .send(TheaterCommand::GetActorState {
1107                            actor_id: id,
1108                            response_tx: cmd_tx,
1109                        })
1110                        .await?;
1111
1112                    let state = cmd_rx.await?;
1113                    ManagementResponse::ActorState { id, state: state? }
1114                }
1115                ManagementCommand::GetActorMetrics { id } => {
1116                    info!("Getting metrics for actor: {:?}", id);
1117                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1118                    runtime_tx
1119                        .send(TheaterCommand::GetActorMetrics {
1120                            actor_id: id,
1121                            response_tx: cmd_tx,
1122                        })
1123                        .await?;
1124
1125                    let metrics = cmd_rx.await?;
1126                    ManagementResponse::ActorMetrics {
1127                        id,
1128                        metrics: serde_json::to_value(metrics?)?,
1129                    }
1130                }
1131                ManagementCommand::UpdateActorPackage { id: _, package: _ } => {
1132                    // TODO: Re-implement actor package updates
1133                    ManagementResponse::Error {
1134                        error: ManagementError::RuntimeError(
1135                            "UpdateActorPackage not yet implemented".to_string(),
1136                        ),
1137                    }
1138                }
1139                // Handle channel management commands
1140                ManagementCommand::OpenChannel {
1141                    actor_id,
1142                    initial_message,
1143                } => {
1144                    info!("Opening channel to actor: {:?}", actor_id);
1145
1146                    // Create a response channel
1147                    let (response_tx, response_rx) = tokio::sync::oneshot::channel();
1148
1149                    // Generate a channel ID
1150                    let client_id = ChannelParticipant::External;
1151                    let channel_id = ChannelId::new(&client_id, &actor_id);
1152                    let channel_id_str = channel_id.0.clone();
1153
1154                    // Send the channel open command via MessageRouter
1155                    message_router
1156                        .route_message(theater::messages::MessageCommand::OpenChannel {
1157                            initiator_id: client_id.clone(),
1158                            target_id: actor_id.clone(),
1159                            channel_id: channel_id.clone(),
1160                            initial_message,
1161                            response_tx,
1162                        })
1163                        .await
1164                        .map_err(|e| {
1165                            anyhow::anyhow!("Failed to send channel open command: {}", e)
1166                        })?;
1167
1168                    // Wait for the response
1169                    match response_rx.await {
1170                        Ok(result) => {
1171                            match result {
1172                                Ok(accepted) => {
1173                                    if accepted {
1174                                        // Channel opened successfully
1175                                        info!("Channel opened successfully: {}", channel_id_str);
1176
1177                                        // Register the channel subscription to receive messages
1178                                        let channel_sub = ChannelSubscription {
1179                                            channel_id: channel_id_str.clone(),
1180                                            initiator_id: client_id.clone(),
1181                                            target_id: actor_id.clone(),
1182                                            client_tx: cmd_client_tx.clone(),
1183                                        };
1184
1185                                        channel_subscriptions
1186                                            .lock()
1187                                            .await
1188                                            .insert(channel_id_str.clone(), channel_sub);
1189
1190                                        // Track this channel for cleanup on disconnect
1191                                        connection_channel_subscriptions
1192                                            .push(channel_id_str.clone());
1193
1194                                        ManagementResponse::ChannelOpened {
1195                                            channel_id: channel_id_str,
1196                                            actor_id,
1197                                        }
1198                                    } else {
1199                                        // Channel rejected by target
1200                                        ManagementResponse::Error {
1201                                            error: ManagementError::ChannelRejected,
1202                                        }
1203                                    }
1204                                }
1205                                Err(e) => ManagementResponse::Error {
1206                                    error: ManagementError::RuntimeError(format!(
1207                                        "Error opening channel: {}",
1208                                        e
1209                                    )),
1210                                },
1211                            }
1212                        }
1213                        Err(e) => ManagementResponse::Error {
1214                            error: ManagementError::CommunicationError(format!(
1215                                "Failed to receive channel open response: {}",
1216                                e
1217                            )),
1218                        },
1219                    }
1220                }
1221                ManagementCommand::SendOnChannel {
1222                    channel_id,
1223                    message,
1224                } => {
1225                    info!("Sending message on channel: {}", channel_id);
1226
1227                    // Create response channel
1228                    let (response_tx, response_rx) = tokio::sync::oneshot::channel();
1229
1230                    // Parse the channel ID
1231                    let channel_id_parsed = ChannelId(channel_id.clone());
1232
1233                    // Send the message on the channel via MessageRouter
1234                    let sender_id = ChannelParticipant::External;
1235                    match message_router
1236                        .route_message(theater::messages::MessageCommand::ChannelMessage {
1237                            channel_id: channel_id_parsed,
1238                            sender_id,
1239                            message,
1240                            response_tx,
1241                        })
1242                        .await
1243                    {
1244                        Ok(_) => {
1245                            // Wait for routing result
1246                            match response_rx.await {
1247                                Ok(Ok(())) => {
1248                                    info!("Message sent successfully on channel: {}", channel_id);
1249                                    ManagementResponse::MessageSent { channel_id }
1250                                }
1251                                Ok(Err(e)) => {
1252                                    error!("Failed to send on channel: {}", e);
1253                                    ManagementResponse::Error {
1254                                        error: ManagementError::RuntimeError(format!(
1255                                            "Failed to send on channel: {}",
1256                                            e
1257                                        )),
1258                                    }
1259                                }
1260                                Err(e) => {
1261                                    error!("Failed to receive channel send response: {}", e);
1262                                    ManagementResponse::Error {
1263                                        error: ManagementError::CommunicationError(format!(
1264                                            "Failed to receive channel send response: {}",
1265                                            e
1266                                        )),
1267                                    }
1268                                }
1269                            }
1270                        }
1271                        Err(e) => {
1272                            error!("Failed to route channel message: {}", e);
1273                            ManagementResponse::Error {
1274                                error: ManagementError::RuntimeError(format!(
1275                                    "Failed to route channel message: {}",
1276                                    e
1277                                )),
1278                            }
1279                        }
1280                    }
1281                }
1282                ManagementCommand::CloseChannel { channel_id } => {
1283                    info!("Closing channel: {}", channel_id);
1284
1285                    // Create response channel
1286                    let (response_tx, response_rx) = tokio::sync::oneshot::channel();
1287
1288                    // Parse the channel ID
1289                    let channel_id_parsed = ChannelId(channel_id.clone());
1290
1291                    // Close the channel via MessageRouter
1292                    let sender_id = ChannelParticipant::External;
1293                    match message_router
1294                        .route_message(theater::messages::MessageCommand::ChannelClose {
1295                            channel_id: channel_id_parsed,
1296                            sender_id,
1297                            response_tx,
1298                        })
1299                        .await
1300                    {
1301                        Ok(_) => {
1302                            // Wait for routing result
1303                            match response_rx.await {
1304                                Ok(Ok(())) => {
1305                                    info!("Channel closed successfully: {}", channel_id);
1306
1307                                    // Remove from channel subscriptions
1308                                    channel_subscriptions.lock().await.remove(&channel_id);
1309                                    connection_channel_subscriptions.retain(|id| id != &channel_id);
1310
1311                                    ManagementResponse::ChannelClosed { channel_id }
1312                                }
1313                                Ok(Err(e)) => {
1314                                    error!("Failed to close channel: {}", e);
1315                                    ManagementResponse::Error {
1316                                        error: ManagementError::RuntimeError(format!(
1317                                            "Failed to close channel: {}",
1318                                            e
1319                                        )),
1320                                    }
1321                                }
1322                                Err(e) => {
1323                                    error!("Failed to receive channel close response: {}", e);
1324                                    ManagementResponse::Error {
1325                                        error: ManagementError::CommunicationError(format!(
1326                                            "Failed to receive channel close response: {}",
1327                                            e
1328                                        )),
1329                                    }
1330                                }
1331                            }
1332                        }
1333                        Err(e) => {
1334                            error!("Failed to route channel close: {}", e);
1335                            ManagementResponse::Error {
1336                                error: ManagementError::RuntimeError(format!(
1337                                    "Failed to route channel close: {}",
1338                                    e
1339                                )),
1340                            }
1341                        }
1342                    }
1343                }
1344                ManagementCommand::NewStore {} => {
1345                    info!("Creating new store");
1346                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1347                    runtime_tx
1348                        .send(TheaterCommand::NewStore {
1349                            response_tx: cmd_tx,
1350                        })
1351                        .await?;
1352
1353                    let store_id = cmd_rx.await?;
1354                    ManagementResponse::StoreCreated {
1355                        store_id: store_id?.id,
1356                    }
1357                }
1358            };
1359
1360            debug!("Sending response: {:?}", response);
1361            if let Err(e) = client_tx.send(response).await {
1362                error!("Failed to send response: {}", e);
1363                break;
1364            }
1365            debug!("Response sent");
1366        }
1367
1368        // Clean up all subscriptions for this connection
1369        debug!(
1370            "Connection closed, cleaning up {} subscriptions",
1371            connection_subscriptions.len()
1372        );
1373        let mut subs = subscriptions.lock().await;
1374
1375        for (actor_id, sub_id) in connection_subscriptions {
1376            if let Some(actor_subs) = subs.get_mut(&actor_id) {
1377                actor_subs.retain(|sub| sub.id != sub_id);
1378
1379                // Remove the entry if no subscriptions remain
1380                if actor_subs.is_empty() {
1381                    subs.remove(&actor_id);
1382                }
1383            }
1384        }
1385
1386        // Clean up channel subscriptions
1387        debug!(
1388            "Connection closed, cleaning up {} channel subscriptions",
1389            connection_channel_subscriptions.len()
1390        );
1391        let mut channel_subs = channel_subscriptions.lock().await;
1392
1393        for channel_id in connection_channel_subscriptions {
1394            channel_subs.remove(&channel_id);
1395        }
1396
1397        debug!("Cleaned up all subscriptions for the connection");
1398        Ok(())
1399    }
1400}