Skip to main content

theater_server/
server.rs

1use anyhow::Result;
2use bytes::Bytes;
3use futures::sink::SinkExt;
4use futures::stream::StreamExt;
5use serde::{Deserialize, Serialize};
6use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8use theater::messages::{
9    ActorMessage, ActorRequest, ActorResult, ActorSend, ActorStatus, ChannelEvent,
10    ChannelParticipant,
11};
12use theater::pack_bridge::Value;
13use theater::{ChainEvent, ManifestConfig};
14use tokio::net::{TcpListener, TcpStream};
15use tokio::sync::{mpsc, Mutex};
16use tokio_util::codec::Framed;
17use tracing::{debug, error, info};
18use uuid::Uuid;
19
20use theater::config::actor_manifest::{
21    RuntimeHostConfig, StoreHandlerConfig, SupervisorHostConfig, TcpHandlerConfig,
22};
23use theater::handler::HandlerRegistry;
24use theater::id::TheaterId;
25use theater::messages::{default_init_state, ChannelId, TheaterCommand};
26use theater::theater_runtime::TheaterRuntime;
27use theater::utils::resolve_reference;
28use theater::TheaterRuntimeError;
29
30// Import Theater-specific handlers only
31// DEPRECATED: WASI handlers (environment, filesystem, http, io, etc.) moved to crates/deprecated/
32use theater_handler_message_server::MessageServerHandler;
33use theater_handler_runtime::RuntimeHandler;
34use theater_handler_store::StoreHandler;
35use theater_handler_supervisor::SupervisorHandler;
36use theater_handler_tcp::TcpHandler;
37
38use crate::fragmenting_codec::FragmentingCodec;
39
40#[derive(Debug, Clone, Serialize, Deserialize)]
41pub enum ManagementCommand {
42    StartActor {
43        manifest: String,
44        initial_state: Option<Vec<u8>>,
45        parent: bool,
46        subscribe: bool,
47    },
48    StopActor {
49        id: TheaterId,
50    },
51    TerminateActor {
52        id: TheaterId,
53    },
54    ListActors,
55    SubscribeToActor {
56        id: TheaterId,
57    },
58    UnsubscribeFromActor {
59        id: TheaterId,
60        subscription_id: Uuid,
61    },
62    SendActorMessage {
63        id: TheaterId,
64        data: Vec<u8>,
65    },
66    RequestActorMessage {
67        id: TheaterId,
68        data: Vec<u8>,
69    },
70    GetActorManifest {
71        id: TheaterId,
72    },
73    GetActorStatus {
74        id: TheaterId,
75    },
76    RestartActor {
77        id: TheaterId,
78    },
79    GetActorState {
80        id: TheaterId,
81    },
82    GetActorMetrics {
83        id: TheaterId,
84    },
85    UpdateActorPackage {
86        id: TheaterId,
87        package: String,
88    },
89    // Channel management commands
90    OpenChannel {
91        actor_id: ChannelParticipant,
92        initial_message: Vec<u8>,
93    },
94    SendOnChannel {
95        channel_id: String,
96        message: Vec<u8>,
97    },
98    CloseChannel {
99        channel_id: String,
100    },
101
102    // Store commands
103    NewStore {},
104}
105
106#[derive(Debug, Clone, Serialize, Deserialize)]
107#[allow(clippy::large_enum_variant)]
108pub enum ManagementResponse {
109    ActorStarted {
110        id: TheaterId,
111    },
112    ActorStopped {
113        id: TheaterId,
114    },
115    ActorList {
116        actors: Vec<(TheaterId, String)>,
117    },
118    Subscribed {
119        id: TheaterId,
120        subscription_id: Uuid,
121    },
122    Unsubscribed {
123        id: TheaterId,
124    },
125    ActorEvent {
126        event: ChainEvent,
127    },
128    ActorResult(ActorResult),
129    Error {
130        error: ManagementError,
131    },
132    RequestedMessage {
133        id: TheaterId,
134        message: Vec<u8>,
135    },
136    SentMessage {
137        id: TheaterId,
138    },
139    ActorStatus {
140        id: TheaterId,
141        status: ActorStatus,
142    },
143    Restarted {
144        id: TheaterId,
145    },
146    ActorManifest {
147        id: TheaterId,
148        manifest: ManifestConfig,
149    },
150    ActorState {
151        id: TheaterId,
152        state: Value,
153    },
154    ActorMetrics {
155        id: TheaterId,
156        metrics: serde_json::Value,
157    },
158    ActorPackageUpdated {
159        id: TheaterId,
160    },
161    // Channel management responses
162    ChannelOpened {
163        channel_id: String,
164        actor_id: ChannelParticipant,
165    },
166    MessageSent {
167        channel_id: String,
168    },
169    ChannelMessage {
170        channel_id: String,
171        sender_id: ChannelParticipant,
172        message: Vec<u8>,
173    },
174    ChannelClosed {
175        channel_id: String,
176    },
177
178    // Store responses
179    StoreCreated {
180        store_id: String,
181    },
182}
183
184#[derive(Debug, Clone, Serialize, Deserialize)]
185pub enum ManagementError {
186    // Actor-related errors
187    ActorNotFound,
188    ActorAlreadyExists,
189    ActorNotRunning,
190    ActorError(String),
191
192    // Channel-related errors
193    ChannelNotFound,
194    ChannelClosed,
195    ChannelRejected,
196
197    // Store-related errors
198    StoreError(String),
199
200    // Communication errors
201    CommunicationError(String),
202
203    // Request handling errors
204    InvalidRequest(String),
205    Timeout,
206
207    // System errors
208    RuntimeError(String),
209    InternalError(String),
210
211    // Serialization/deserialization errors
212    SerializationError(String),
213
214    // Actor initialization errors
215    ActorInitializationError(String),
216}
217
218// Allow converting from TheaterRuntimeError to ManagementError
219impl From<TheaterRuntimeError> for ManagementError {
220    fn from(err: TheaterRuntimeError) -> Self {
221        match err {
222            TheaterRuntimeError::ActorNotFound(_) => ManagementError::ActorNotFound,
223            TheaterRuntimeError::ActorAlreadyExists(_) => ManagementError::ActorAlreadyExists,
224            TheaterRuntimeError::ActorNotRunning(_) => ManagementError::ActorNotRunning,
225            TheaterRuntimeError::ActorOperationFailed(msg) => {
226                ManagementError::RuntimeError(format!("Actor operation failed: {}", msg))
227            }
228            TheaterRuntimeError::ActorError(e) => ManagementError::ActorError(e.to_string()),
229            TheaterRuntimeError::ChannelError(msg) => ManagementError::CommunicationError(msg),
230            TheaterRuntimeError::ChannelNotFound(_) => ManagementError::ChannelNotFound,
231            TheaterRuntimeError::ChannelRejected => ManagementError::ChannelRejected,
232            TheaterRuntimeError::SerializationError(msg) => {
233                ManagementError::SerializationError(msg)
234            }
235            TheaterRuntimeError::InternalError(msg) => ManagementError::InternalError(msg),
236            TheaterRuntimeError::ActorInitializationError(msg) => {
237                ManagementError::ActorInitializationError(msg)
238            }
239        }
240    }
241}
242
243// Implement Display for ManagementError to provide better error messages
244impl std::fmt::Display for ManagementError {
245    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
246        match self {
247            ManagementError::ActorNotFound => write!(f, "Actor not found"),
248            ManagementError::ActorAlreadyExists => write!(f, "Actor already exists"),
249            ManagementError::ActorNotRunning => write!(f, "Actor is not running"),
250            ManagementError::ActorError(msg) => write!(f, "Actor error: {}", msg),
251            ManagementError::ChannelNotFound => write!(f, "Channel not found"),
252            ManagementError::ChannelClosed => write!(f, "Channel is closed"),
253            ManagementError::ChannelRejected => write!(f, "Channel was rejected"),
254            ManagementError::StoreError(msg) => write!(f, "Store error: {}", msg),
255            ManagementError::CommunicationError(msg) => write!(f, "Communication error: {}", msg),
256            ManagementError::InvalidRequest(msg) => write!(f, "Invalid request: {}", msg),
257            ManagementError::Timeout => write!(f, "Operation timed out"),
258            ManagementError::RuntimeError(msg) => write!(f, "Runtime error: {}", msg),
259            ManagementError::InternalError(msg) => write!(f, "Internal error: {}", msg),
260            ManagementError::SerializationError(msg) => write!(f, "Serialization error: {}", msg),
261            ManagementError::ActorInitializationError(msg) => {
262                write!(f, "Actor initialization error: {}", msg)
263            }
264        }
265    }
266}
267
268// Implement Error trait for ManagementError
269impl std::error::Error for ManagementError {}
270
271#[derive(Debug)]
272#[allow(dead_code)]
273struct Subscription {
274    id: Uuid,
275    client_tx: mpsc::Sender<ManagementResponse>,
276}
277
278impl Eq for Subscription {}
279impl PartialEq for Subscription {
280    fn eq(&self, other: &Self) -> bool {
281        self.id == other.id
282    }
283}
284impl std::hash::Hash for Subscription {
285    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
286        self.id.hash(state);
287    }
288}
289
290// ChannelEvent is now imported from theater::ChannelEvent
291
292// Structure to track active channel subscriptions
293#[derive(Debug)]
294#[allow(dead_code)]
295struct ChannelSubscription {
296    channel_id: String,
297    initiator_id: ChannelParticipant,
298    target_id: ChannelParticipant,
299    client_tx: mpsc::Sender<ManagementResponse>,
300}
301
302/// Creates a HandlerRegistry with Theater-specific handlers.
303///
304/// NOTE: WASI handlers (environment, filesystem, http, io, sockets, timing, random, process)
305/// have been deprecated and moved to crates/deprecated/. They will be redesigned for
306/// Composite runtime support later.
307///
308/// Returns both the HandlerRegistry and the MessageRouter, allowing the server
309/// to use the MessageRouter for external client messaging.
310fn create_root_handler_registry(
311    theater_tx: mpsc::Sender<TheaterCommand>,
312) -> (
313    HandlerRegistry,
314    theater_handler_message_server::MessageRouter,
315) {
316    let mut registry = HandlerRegistry::new();
317
318    info!("Initializing Theater server with Theater-specific handlers...");
319
320    // Runtime handler - provides actor runtime information and control
321    let runtime_config = RuntimeHostConfig {};
322    registry.register(RuntimeHandler::new(
323        runtime_config,
324        theater_tx.clone(),
325        None,
326    ));
327
328    // Store handler - provides key-value storage for actors
329    let store_config = StoreHandlerConfig::default();
330    registry.register(StoreHandler::new(store_config, None));
331
332    // Supervisor handler - allows actors to spawn and manage child actors
333    let supervisor_config = SupervisorHostConfig {};
334    registry.register(SupervisorHandler::new(supervisor_config, None));
335
336    // Message server handler - provides inter-actor messaging
337    let message_router = theater_handler_message_server::MessageRouter::new();
338    registry.register(MessageServerHandler::new(None, message_router.clone()));
339
340    // TCP handler - provides raw TCP networking for actors
341    let tcp_config = TcpHandlerConfig {
342        listen: None,
343        max_connections: None,
344        ..Default::default()
345    };
346    registry.register(TcpHandler::new(tcp_config));
347
348    info!("✓ 5 Theater-specific handlers registered");
349    info!("NOTE: WASI handlers are deprecated - see crates/deprecated/");
350
351    (registry, message_router)
352}
353
354pub struct TheaterServer {
355    runtime: TheaterRuntime,
356    theater_tx: mpsc::Sender<TheaterCommand>,
357    management_socket: TcpListener,
358    subscriptions: Arc<Mutex<HashMap<TheaterId, HashSet<Subscription>>>>,
359    // Field to track channel subscriptions
360    channel_subscriptions: Arc<Mutex<HashMap<String, ChannelSubscription>>>,
361    // Channel for runtime to send channel events back to server
362    #[allow(dead_code)]
363    channel_events_tx: mpsc::Sender<ChannelEvent>,
364    // MessageRouter for external client messaging
365    message_router: theater_handler_message_server::MessageRouter,
366}
367
368impl TheaterServer {
369    // Process channel events and forward them to subscribed clients
370    async fn process_channel_events(
371        mut channel_events_rx: mpsc::Receiver<ChannelEvent>,
372        channel_subscriptions: Arc<Mutex<HashMap<String, ChannelSubscription>>>,
373    ) {
374        while let Some(event) = channel_events_rx.recv().await {
375            match event {
376                ChannelEvent::Message {
377                    channel_id,
378                    sender_id,
379                    message,
380                } => {
381                    tracing::debug!("Received channel message for {}", channel_id);
382                    // Forward to subscribed clients
383                    let subs = channel_subscriptions.lock().await;
384                    if let Some(sub) = subs.get(&channel_id.0) {
385                        let response = ManagementResponse::ChannelMessage {
386                            channel_id: channel_id.0.clone(),
387                            sender_id,
388                            message,
389                        };
390
391                        tracing::debug!("Forwarding channel message to client: {:?}", response);
392
393                        if let Err(e) = sub.client_tx.send(response).await {
394                            tracing::warn!("Failed to forward channel message: {}", e);
395                        } else {
396                            tracing::debug!("Forwarded channel message to client");
397                        }
398                    }
399                }
400                ChannelEvent::Close { channel_id } => {
401                    tracing::debug!("Received channel close event for {}", channel_id);
402                    // Forward to subscribed clients
403                    let mut subs = channel_subscriptions.lock().await;
404                    if let Some(sub) = subs.remove(&channel_id.0) {
405                        let response = ManagementResponse::ChannelClosed {
406                            channel_id: channel_id.0.clone(),
407                        };
408
409                        if let Err(e) = sub.client_tx.send(response).await {
410                            tracing::warn!("Failed to forward channel close event: {}", e);
411                        } else {
412                            tracing::debug!("Forwarded channel close event to client");
413                        }
414                    }
415                }
416            }
417        }
418    }
419
420    pub async fn new(address: std::net::SocketAddr) -> Result<Self> {
421        let (theater_tx, theater_rx) = mpsc::channel(32);
422
423        // Create channel for runtime to send channel events back to server
424        let (channel_events_tx, channel_events_rx) = mpsc::channel(32);
425
426        // Create handler registry with all migrated handlers (root permissions)
427        // Also get the MessageRouter for external client messaging
428        let (handler_registry, message_router) = create_root_handler_registry(theater_tx.clone());
429
430        // Create the runtime with the handler registry
431        let runtime = TheaterRuntime::new(
432            theater_tx.clone(),
433            theater_rx,
434            Some(channel_events_tx.clone()),
435            handler_registry,
436        )
437        .await?;
438        let management_socket = TcpListener::bind(address).await?;
439
440        let channel_subscriptions = Arc::new(Mutex::new(HashMap::new()));
441
442        // Start task to process channel events
443        let channel_subs_clone = channel_subscriptions.clone();
444        tokio::spawn(async move {
445            Self::process_channel_events(channel_events_rx, channel_subs_clone).await;
446        });
447
448        Ok(Self {
449            runtime,
450            theater_tx,
451            management_socket,
452            subscriptions: Arc::new(Mutex::new(HashMap::new())),
453            channel_subscriptions,
454            channel_events_tx,
455            message_router,
456        })
457    }
458
459    pub async fn run(mut self) -> Result<()> {
460        info!(
461            "Theater server starting on {:?}",
462            self.management_socket.local_addr()?
463        );
464
465        // Start the theater runtime in its own task
466        let runtime_handle = tokio::spawn(async move {
467            match self.runtime.run().await {
468                Ok(_) => Ok(()),
469                Err(e) => {
470                    error!("Theater runtime failed: {}", e);
471                    Err(e)
472                }
473            }
474        });
475
476        // Accept and handle management connections
477        while let Ok((socket, addr)) = self.management_socket.accept().await {
478            info!("New management connection from {}", addr);
479            let runtime_tx = self.theater_tx.clone();
480            let subscriptions = self.subscriptions.clone();
481            let channel_subscriptions = self.channel_subscriptions.clone();
482            let message_router = self.message_router.clone();
483
484            tokio::spawn(async move {
485                if let Err(e) = Self::handle_management_connection(
486                    socket,
487                    runtime_tx,
488                    subscriptions,
489                    channel_subscriptions,
490                    message_router,
491                )
492                .await
493                {
494                    error!("Error handling management connection: {}", e);
495                }
496            });
497        }
498
499        runtime_handle.await??;
500        Ok(())
501    }
502
503    async fn handle_management_connection(
504        socket: TcpStream,
505        runtime_tx: mpsc::Sender<TheaterCommand>,
506        subscriptions: Arc<Mutex<HashMap<TheaterId, HashSet<Subscription>>>>,
507        channel_subscriptions: Arc<Mutex<HashMap<String, ChannelSubscription>>>,
508        message_router: theater_handler_message_server::MessageRouter,
509    ) -> Result<()> {
510        // Create a channel for sending responses to this client
511        let (client_tx, mut client_rx) = mpsc::channel::<ManagementResponse>(32);
512
513        let codec = FragmentingCodec::new();
514        let framed = Framed::new(socket, codec);
515
516        // Split the framed connection into read and write parts
517        let (mut framed_sink, mut framed_stream) = framed.split();
518
519        // Clone the client_tx for use in the command loop
520        let cmd_client_tx = client_tx.clone();
521
522        // Start a task to forward responses to the client
523        let _response_task = tokio::spawn(async move {
524            while let Some(response) = client_rx.recv().await {
525                match serde_json::to_vec(&response) {
526                    Ok(data) => {
527                        debug!("Serialized response: {} bytes", data.len());
528                        if data.len() > 10 * 1024 * 1024 {
529                            debug!("Large response detected: {} MB", data.len() / 1024 / 1024);
530                        }
531                        if let Err(e) = framed_sink.send(Bytes::from(data)).await {
532                            debug!("Error sending response to client: {}", e);
533                            break;
534                        }
535                    }
536                    Err(e) => {
537                        error!("Error serializing response: {}", e);
538                    }
539                }
540            }
541            debug!("Response forwarder for client closed");
542        });
543
544        // Store active subscriptions for this connection to clean up on disconnect
545        let mut connection_subscriptions: Vec<(TheaterId, Uuid)> = Vec::new();
546
547        // Store active channel subscriptions for cleanup
548        let mut connection_channel_subscriptions: Vec<String> = Vec::new();
549
550        // Loop until connection closes or an error occurs
551        'connection: while let Some(msg) = framed_stream.next().await {
552            debug!("Received management message");
553            let msg = match msg {
554                Ok(m) => m,
555                Err(e) => {
556                    error!("Error receiving message: {}", e);
557                    break 'connection;
558                }
559            };
560
561            let cmd = match serde_json::from_slice::<ManagementCommand>(&msg) {
562                Ok(c) => c,
563                Err(e) => {
564                    error!(
565                        "Error parsing command: {} {}",
566                        e,
567                        String::from_utf8_lossy(&msg)
568                    );
569                    continue;
570                }
571            };
572            debug!("Parsed command: {:?}", cmd);
573
574            // Store the command for reference (used for subscription tracking)
575            let _cmd_clone = cmd.clone();
576
577            let response = match cmd {
578                ManagementCommand::StartActor {
579                    manifest,
580                    initial_state: _initial_state,
581                    parent,
582                    subscribe,
583                } => {
584                    info!("Starting actor from manifest: {:?}", manifest);
585
586                    // Load and parse manifest
587                    let manifest_str = match resolve_reference(&manifest).await {
588                        Ok(bytes) => match String::from_utf8(bytes) {
589                            Ok(s) => s,
590                            Err(e) => {
591                                error!("Invalid manifest encoding: {}", e);
592                                cmd_client_tx
593                                    .send(ManagementResponse::Error {
594                                        error: ManagementError::ActorInitializationError(format!(
595                                            "Invalid manifest encoding: {}",
596                                            e
597                                        )),
598                                    })
599                                    .await
600                                    .ok();
601                                continue;
602                            }
603                        },
604                        Err(e) => {
605                            error!("Failed to load manifest: {}", e);
606                            cmd_client_tx
607                                .send(ManagementResponse::Error {
608                                    error: ManagementError::ActorInitializationError(format!(
609                                        "Failed to load manifest: {}",
610                                        e
611                                    )),
612                                })
613                                .await
614                                .ok();
615                            continue;
616                        }
617                    };
618
619                    let manifest_config = match ManifestConfig::from_toml_str(&manifest_str) {
620                        Ok(m) => m,
621                        Err(e) => {
622                            error!("Failed to parse manifest: {}", e);
623                            cmd_client_tx
624                                .send(ManagementResponse::Error {
625                                    error: ManagementError::ActorInitializationError(format!(
626                                        "Failed to parse manifest: {}",
627                                        e
628                                    )),
629                                })
630                                .await
631                                .ok();
632                            continue;
633                        }
634                    };
635
636                    // Load wasm bytes
637                    let wasm_bytes = match resolve_reference(&manifest_config.package).await {
638                        Ok(bytes) => bytes,
639                        Err(e) => {
640                            error!("Failed to load WASM: {}", e);
641                            cmd_client_tx
642                                .send(ManagementResponse::Error {
643                                    error: ManagementError::ActorInitializationError(format!(
644                                        "Failed to load WASM: {}",
645                                        e
646                                    )),
647                                })
648                                .await
649                                .ok();
650                            continue;
651                        }
652                    };
653
654                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
655                    debug!("Sending SpawnActor command to runtime");
656                    let supervisor_tx = if parent {
657                        let (supervisor_tx, mut supervisor_rx) = mpsc::channel(32);
658                        let cmd_client_tx = cmd_client_tx.clone();
659                        tokio::spawn(async move {
660                            while let Some(res) = supervisor_rx.recv().await {
661                                debug!("Received supervisor response: {:?}", res);
662                                if let Err(e) = cmd_client_tx
663                                    .send(ManagementResponse::ActorResult(res))
664                                    .await
665                                {
666                                    error!("Failed to send supervisor response: {}", e);
667                                    break;
668                                }
669                            }
670                        });
671                        Some(supervisor_tx)
672                    } else {
673                        None
674                    };
675                    let subscription_tx = if subscribe {
676                        let (event_tx, mut event_rx) = mpsc::channel(32);
677
678                        // set up a task to forward events to the client
679                        let cmd_client_tx = cmd_client_tx.clone();
680                        tokio::spawn(async move {
681                            while let Some((_actor_id, event)) = event_rx.recv().await {
682                                debug!("Received event for subscription");
683                                let response = ManagementResponse::ActorEvent { event };
684                                if let Err(e) = cmd_client_tx.send(response).await {
685                                    debug!("Failed to forward event to client: {}", e);
686                                    break;
687                                }
688                            }
689                            debug!("Event forwarder for subscription stopped");
690                        });
691
692                        Some(event_tx)
693                    } else {
694                        None
695                    };
696                    match runtime_tx
697                        .send(TheaterCommand::SetupActor {
698                            wasm_bytes,
699                            name: Some(manifest_config.name.clone()),
700                            manifest: Some(manifest_config),
701                            init_state: default_init_state(),
702                            response_tx: cmd_tx,
703                            supervisor_tx,
704                            subscription_tx,
705                        })
706                        .await
707                    {
708                        Ok(_) => {
709                            debug!("SpawnActor command sent to runtime, awaiting response");
710                            match cmd_rx.await {
711                                Ok(result) => match result {
712                                    Ok(actor_id) => {
713                                        info!("Actor started with ID: {:?}", actor_id);
714                                        ManagementResponse::ActorStarted { id: actor_id }
715                                    }
716                                    Err(e) => {
717                                        error!("Runtime failed to start actor: {}", e);
718                                        ManagementResponse::Error {
719                                            error: ManagementError::RuntimeError(format!(
720                                                "Failed to start actor: {}",
721                                                e
722                                            )),
723                                        }
724                                    }
725                                },
726                                Err(e) => {
727                                    error!("Failed to receive spawn response: {}", e);
728                                    ManagementResponse::Error {
729                                        error: ManagementError::CommunicationError(format!(
730                                            "Failed to receive spawn response: {}",
731                                            e
732                                        )),
733                                    }
734                                }
735                            }
736                        }
737                        Err(e) => {
738                            error!("Failed to send SpawnActor command: {}", e);
739                            ManagementResponse::Error {
740                                error: ManagementError::CommunicationError(format!(
741                                    "Failed to send spawn command: {}",
742                                    e
743                                )),
744                            }
745                        }
746                    }
747                }
748                ManagementCommand::StopActor { id } => {
749                    info!("Stopping actor: {:?}", id);
750                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
751                    runtime_tx
752                        .send(TheaterCommand::StopActor {
753                            actor_id: id,
754                            response_tx: cmd_tx,
755                        })
756                        .await?;
757
758                    match cmd_rx.await? {
759                        Ok(_) => {
760                            subscriptions.lock().await.remove(&id);
761                            ManagementResponse::ActorStopped { id }
762                        }
763                        Err(e) => ManagementResponse::Error {
764                            error: ManagementError::RuntimeError(format!(
765                                "Failed to stop actor: {}",
766                                e
767                            )),
768                        },
769                    }
770                }
771                ManagementCommand::TerminateActor { id } => {
772                    info!("Terminating actor: {:?}", id);
773                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
774                    runtime_tx
775                        .send(TheaterCommand::TerminateActor {
776                            actor_id: id,
777                            response_tx: cmd_tx,
778                        })
779                        .await?;
780
781                    match cmd_rx.await? {
782                        Ok(_) => {
783                            subscriptions.lock().await.remove(&id);
784                            ManagementResponse::ActorStopped { id }
785                        }
786                        Err(e) => ManagementResponse::Error {
787                            error: ManagementError::RuntimeError(format!(
788                                "Failed to terminate actor: {}",
789                                e
790                            )),
791                        },
792                    }
793                }
794                ManagementCommand::ListActors => {
795                    debug!("Listing actors");
796                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
797                    runtime_tx
798                        .send(TheaterCommand::GetActors {
799                            response_tx: cmd_tx,
800                        })
801                        .await?;
802
803                    match cmd_rx.await? {
804                        Ok(actors) => {
805                            info!("Found {} actors", actors.len());
806                            ManagementResponse::ActorList { actors }
807                        }
808                        Err(e) => ManagementResponse::Error {
809                            error: ManagementError::RuntimeError(format!(
810                                "Failed to list actors: {}",
811                                e
812                            )),
813                        },
814                    }
815                }
816                ManagementCommand::SubscribeToActor { id } => {
817                    info!("New subscription request for actor: {:?}", id);
818                    let subscription_id = Uuid::new_v4();
819                    let subscription = Subscription {
820                        id: subscription_id,
821                        client_tx: cmd_client_tx.clone(),
822                    };
823
824                    debug!("Subscription created with ID: {}", subscription_id);
825
826                    // Register the subscription in the global map
827                    subscriptions
828                        .lock()
829                        .await
830                        .entry(id)
831                        .or_default()
832                        .insert(subscription);
833
834                    // Set up the event channel for the subscription
835                    let (event_tx, mut event_rx) = mpsc::channel(32);
836                    runtime_tx
837                        .send(TheaterCommand::SubscribeToActor {
838                            actor_id: id,
839                            event_tx,
840                        })
841                        .await
842                        .map_err(|e| anyhow::anyhow!("Failed to subscribe: {}", e))?;
843
844                    // Add to the list of subscriptions for this connection
845                    connection_subscriptions.push((id, subscription_id));
846
847                    // Create a task to forward events to this client
848                    let client_tx_clone = cmd_client_tx.clone();
849                    tokio::spawn(async move {
850                        debug!(
851                            "Starting event forwarder for subscription {}",
852                            subscription_id
853                        );
854                        while let Some((_actor_id, event)) = event_rx.recv().await {
855                            debug!("Received event for subscription {}", subscription_id);
856                            let response = ManagementResponse::ActorEvent { event };
857                            if let Err(e) = client_tx_clone.send(response).await {
858                                debug!("Failed to forward event to client: {}", e);
859                                break;
860                            }
861                        }
862                        debug!(
863                            "Event forwarder for subscription {} stopped",
864                            subscription_id
865                        );
866                    });
867
868                    ManagementResponse::Subscribed {
869                        id,
870                        subscription_id,
871                    }
872                }
873                ManagementCommand::UnsubscribeFromActor {
874                    id,
875                    subscription_id,
876                } => {
877                    debug!(
878                        "Removing subscription {} for actor {:?}",
879                        subscription_id, id
880                    );
881
882                    // Remove subscription from the tracking list for this connection
883                    connection_subscriptions
884                        .retain(|(aid, sid)| *aid != id || *sid != subscription_id);
885
886                    // Remove from the global subscriptions map
887                    let mut subs = subscriptions.lock().await;
888                    if let Some(actor_subs) = subs.get_mut(&id) {
889                        actor_subs.retain(|sub| sub.id != subscription_id);
890
891                        // Remove the entry if no subscriptions remain
892                        if actor_subs.is_empty() {
893                            subs.remove(&id);
894                        }
895                    }
896
897                    debug!("Subscription removed");
898                    ManagementResponse::Unsubscribed { id }
899                }
900                ManagementCommand::SendActorMessage { id, data } => {
901                    info!("Sending message to actor: {:?}", id);
902
903                    // Create response channel for routing
904                    let (response_tx, response_rx) = tokio::sync::oneshot::channel();
905
906                    // Create ActorMessage
907                    let message = ActorMessage::Send(ActorSend { data });
908
909                    // Route via MessageRouter
910                    match message_router
911                        .route_message(theater::messages::MessageCommand::SendMessage {
912                            target_id: id,
913                            message,
914                            response_tx,
915                        })
916                        .await
917                    {
918                        Ok(_) => {
919                            // Wait for routing result
920                            match response_rx.await {
921                                Ok(Ok(())) => {
922                                    info!("Message sent successfully to actor: {:?}", id);
923                                    ManagementResponse::SentMessage { id }
924                                }
925                                Ok(Err(e)) => {
926                                    error!("Failed to send message to actor: {}", e);
927                                    ManagementResponse::Error {
928                                        error: ManagementError::RuntimeError(format!(
929                                            "Failed to send: {}",
930                                            e
931                                        )),
932                                    }
933                                }
934                                Err(e) => {
935                                    error!("Failed to receive routing response: {}", e);
936                                    ManagementResponse::Error {
937                                        error: ManagementError::CommunicationError(format!(
938                                            "Failed to receive routing response: {}",
939                                            e
940                                        )),
941                                    }
942                                }
943                            }
944                        }
945                        Err(e) => {
946                            error!("Failed to route message: {}", e);
947                            ManagementResponse::Error {
948                                error: ManagementError::RuntimeError(format!(
949                                    "Failed to route message: {}",
950                                    e
951                                )),
952                            }
953                        }
954                    }
955                }
956                ManagementCommand::RequestActorMessage { id, data } => {
957                    info!("Requesting message from actor: {:?}", id);
958
959                    // Create channels for request-response pattern
960                    let (route_tx, route_rx) = tokio::sync::oneshot::channel();
961                    let (response_tx, response_rx) = tokio::sync::oneshot::channel();
962
963                    // Create ActorMessage with response channel embedded
964                    let message = ActorMessage::Request(ActorRequest { data, response_tx });
965
966                    // Route via MessageRouter
967                    match message_router
968                        .route_message(theater::messages::MessageCommand::SendMessage {
969                            target_id: id,
970                            message,
971                            response_tx: route_tx,
972                        })
973                        .await
974                    {
975                        Ok(_) => {
976                            // Wait for routing to complete
977                            match route_rx.await {
978                                Ok(Ok(())) => {
979                                    // Routing succeeded, now wait for actor's response
980                                    match response_rx.await {
981                                        Ok(response_data) => {
982                                            info!("Received response from actor: {:?}", id);
983                                            ManagementResponse::RequestedMessage {
984                                                id,
985                                                message: response_data,
986                                            }
987                                        }
988                                        Err(e) => {
989                                            error!("Actor didn't respond: {}", e);
990                                            ManagementResponse::Error {
991                                                error: ManagementError::RuntimeError(format!(
992                                                    "Actor didn't respond: {}",
993                                                    e
994                                                )),
995                                            }
996                                        }
997                                    }
998                                }
999                                Ok(Err(e)) => {
1000                                    error!("Failed to route request to actor: {}", e);
1001                                    ManagementResponse::Error {
1002                                        error: ManagementError::RuntimeError(format!(
1003                                            "Failed to route: {}",
1004                                            e
1005                                        )),
1006                                    }
1007                                }
1008                                Err(e) => {
1009                                    error!("Failed to receive routing response: {}", e);
1010                                    ManagementResponse::Error {
1011                                        error: ManagementError::CommunicationError(format!(
1012                                            "Failed to receive routing response: {}",
1013                                            e
1014                                        )),
1015                                    }
1016                                }
1017                            }
1018                        }
1019                        Err(e) => {
1020                            error!("Failed to route request: {}", e);
1021                            ManagementResponse::Error {
1022                                error: ManagementError::RuntimeError(format!(
1023                                    "Failed to route request: {}",
1024                                    e
1025                                )),
1026                            }
1027                        }
1028                    }
1029                }
1030                ManagementCommand::GetActorManifest { id } => {
1031                    info!("Getting manifest for actor: {:?}", id);
1032                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1033                    runtime_tx
1034                        .send(TheaterCommand::GetActorManifest {
1035                            actor_id: id,
1036                            response_tx: cmd_tx,
1037                        })
1038                        .await?;
1039
1040                    let manifest = cmd_rx.await?;
1041                    ManagementResponse::ActorManifest {
1042                        id,
1043                        manifest: manifest?,
1044                    }
1045                }
1046                ManagementCommand::GetActorStatus { id } => {
1047                    info!("Getting status for actor: {:?}", id);
1048                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1049                    runtime_tx
1050                        .send(TheaterCommand::GetActorStatus {
1051                            actor_id: id,
1052                            response_tx: cmd_tx,
1053                        })
1054                        .await?;
1055
1056                    let status = cmd_rx.await?;
1057                    ManagementResponse::ActorStatus {
1058                        id,
1059                        status: status?,
1060                    }
1061                }
1062                ManagementCommand::RestartActor { id } => {
1063                    info!("Restarting actor: {:?}", id);
1064                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1065                    runtime_tx
1066                        .send(TheaterCommand::RestartActor {
1067                            actor_id: id,
1068                            response_tx: cmd_tx,
1069                        })
1070                        .await?;
1071
1072                    match cmd_rx.await? {
1073                        Ok(_) => ManagementResponse::Restarted { id },
1074                        Err(e) => ManagementResponse::Error {
1075                            error: ManagementError::RuntimeError(format!(
1076                                "Failed to restart actor: {}",
1077                                e
1078                            )),
1079                        },
1080                    }
1081                }
1082                ManagementCommand::GetActorState { id } => {
1083                    info!("Getting state for actor: {:?}", id);
1084                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1085                    runtime_tx
1086                        .send(TheaterCommand::GetActorState {
1087                            actor_id: id,
1088                            response_tx: cmd_tx,
1089                        })
1090                        .await?;
1091
1092                    let state = cmd_rx.await?;
1093                    ManagementResponse::ActorState { id, state: state? }
1094                }
1095                ManagementCommand::GetActorMetrics { id } => {
1096                    info!("Getting metrics for actor: {:?}", id);
1097                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1098                    runtime_tx
1099                        .send(TheaterCommand::GetActorMetrics {
1100                            actor_id: id,
1101                            response_tx: cmd_tx,
1102                        })
1103                        .await?;
1104
1105                    let metrics = cmd_rx.await?;
1106                    ManagementResponse::ActorMetrics {
1107                        id,
1108                        metrics: serde_json::to_value(metrics?)?,
1109                    }
1110                }
1111                ManagementCommand::UpdateActorPackage { id: _, package: _ } => {
1112                    // TODO: Re-implement actor package updates
1113                    ManagementResponse::Error {
1114                        error: ManagementError::RuntimeError(
1115                            "UpdateActorPackage not yet implemented".to_string(),
1116                        ),
1117                    }
1118                }
1119                // Handle channel management commands
1120                ManagementCommand::OpenChannel {
1121                    actor_id,
1122                    initial_message,
1123                } => {
1124                    info!("Opening channel to actor: {:?}", actor_id);
1125
1126                    // Create a response channel
1127                    let (response_tx, response_rx) = tokio::sync::oneshot::channel();
1128
1129                    // Generate a channel ID
1130                    let client_id = ChannelParticipant::External;
1131                    let channel_id = ChannelId::new(&client_id, &actor_id);
1132                    let channel_id_str = channel_id.0.clone();
1133
1134                    // Send the channel open command via MessageRouter
1135                    message_router
1136                        .route_message(theater::messages::MessageCommand::OpenChannel {
1137                            initiator_id: client_id.clone(),
1138                            target_id: actor_id.clone(),
1139                            channel_id: channel_id.clone(),
1140                            initial_message,
1141                            response_tx,
1142                        })
1143                        .await
1144                        .map_err(|e| {
1145                            anyhow::anyhow!("Failed to send channel open command: {}", e)
1146                        })?;
1147
1148                    // Wait for the response
1149                    match response_rx.await {
1150                        Ok(result) => {
1151                            match result {
1152                                Ok(accepted) => {
1153                                    if accepted {
1154                                        // Channel opened successfully
1155                                        info!("Channel opened successfully: {}", channel_id_str);
1156
1157                                        // Register the channel subscription to receive messages
1158                                        let channel_sub = ChannelSubscription {
1159                                            channel_id: channel_id_str.clone(),
1160                                            initiator_id: client_id.clone(),
1161                                            target_id: actor_id.clone(),
1162                                            client_tx: cmd_client_tx.clone(),
1163                                        };
1164
1165                                        channel_subscriptions
1166                                            .lock()
1167                                            .await
1168                                            .insert(channel_id_str.clone(), channel_sub);
1169
1170                                        // Track this channel for cleanup on disconnect
1171                                        connection_channel_subscriptions
1172                                            .push(channel_id_str.clone());
1173
1174                                        ManagementResponse::ChannelOpened {
1175                                            channel_id: channel_id_str,
1176                                            actor_id,
1177                                        }
1178                                    } else {
1179                                        // Channel rejected by target
1180                                        ManagementResponse::Error {
1181                                            error: ManagementError::ChannelRejected,
1182                                        }
1183                                    }
1184                                }
1185                                Err(e) => ManagementResponse::Error {
1186                                    error: ManagementError::RuntimeError(format!(
1187                                        "Error opening channel: {}",
1188                                        e
1189                                    )),
1190                                },
1191                            }
1192                        }
1193                        Err(e) => ManagementResponse::Error {
1194                            error: ManagementError::CommunicationError(format!(
1195                                "Failed to receive channel open response: {}",
1196                                e
1197                            )),
1198                        },
1199                    }
1200                }
1201                ManagementCommand::SendOnChannel {
1202                    channel_id,
1203                    message,
1204                } => {
1205                    info!("Sending message on channel: {}", channel_id);
1206
1207                    // Create response channel
1208                    let (response_tx, response_rx) = tokio::sync::oneshot::channel();
1209
1210                    // Parse the channel ID
1211                    let channel_id_parsed = ChannelId(channel_id.clone());
1212
1213                    // Send the message on the channel via MessageRouter
1214                    let sender_id = ChannelParticipant::External;
1215                    match message_router
1216                        .route_message(theater::messages::MessageCommand::ChannelMessage {
1217                            channel_id: channel_id_parsed,
1218                            sender_id,
1219                            message,
1220                            response_tx,
1221                        })
1222                        .await
1223                    {
1224                        Ok(_) => {
1225                            // Wait for routing result
1226                            match response_rx.await {
1227                                Ok(Ok(())) => {
1228                                    info!("Message sent successfully on channel: {}", channel_id);
1229                                    ManagementResponse::MessageSent { channel_id }
1230                                }
1231                                Ok(Err(e)) => {
1232                                    error!("Failed to send on channel: {}", e);
1233                                    ManagementResponse::Error {
1234                                        error: ManagementError::RuntimeError(format!(
1235                                            "Failed to send on channel: {}",
1236                                            e
1237                                        )),
1238                                    }
1239                                }
1240                                Err(e) => {
1241                                    error!("Failed to receive channel send response: {}", e);
1242                                    ManagementResponse::Error {
1243                                        error: ManagementError::CommunicationError(format!(
1244                                            "Failed to receive channel send response: {}",
1245                                            e
1246                                        )),
1247                                    }
1248                                }
1249                            }
1250                        }
1251                        Err(e) => {
1252                            error!("Failed to route channel message: {}", e);
1253                            ManagementResponse::Error {
1254                                error: ManagementError::RuntimeError(format!(
1255                                    "Failed to route channel message: {}",
1256                                    e
1257                                )),
1258                            }
1259                        }
1260                    }
1261                }
1262                ManagementCommand::CloseChannel { channel_id } => {
1263                    info!("Closing channel: {}", channel_id);
1264
1265                    // Create response channel
1266                    let (response_tx, response_rx) = tokio::sync::oneshot::channel();
1267
1268                    // Parse the channel ID
1269                    let channel_id_parsed = ChannelId(channel_id.clone());
1270
1271                    // Close the channel via MessageRouter
1272                    let sender_id = ChannelParticipant::External;
1273                    match message_router
1274                        .route_message(theater::messages::MessageCommand::ChannelClose {
1275                            channel_id: channel_id_parsed,
1276                            sender_id,
1277                            response_tx,
1278                        })
1279                        .await
1280                    {
1281                        Ok(_) => {
1282                            // Wait for routing result
1283                            match response_rx.await {
1284                                Ok(Ok(())) => {
1285                                    info!("Channel closed successfully: {}", channel_id);
1286
1287                                    // Remove from channel subscriptions
1288                                    channel_subscriptions.lock().await.remove(&channel_id);
1289                                    connection_channel_subscriptions.retain(|id| id != &channel_id);
1290
1291                                    ManagementResponse::ChannelClosed { channel_id }
1292                                }
1293                                Ok(Err(e)) => {
1294                                    error!("Failed to close channel: {}", e);
1295                                    ManagementResponse::Error {
1296                                        error: ManagementError::RuntimeError(format!(
1297                                            "Failed to close channel: {}",
1298                                            e
1299                                        )),
1300                                    }
1301                                }
1302                                Err(e) => {
1303                                    error!("Failed to receive channel close response: {}", e);
1304                                    ManagementResponse::Error {
1305                                        error: ManagementError::CommunicationError(format!(
1306                                            "Failed to receive channel close response: {}",
1307                                            e
1308                                        )),
1309                                    }
1310                                }
1311                            }
1312                        }
1313                        Err(e) => {
1314                            error!("Failed to route channel close: {}", e);
1315                            ManagementResponse::Error {
1316                                error: ManagementError::RuntimeError(format!(
1317                                    "Failed to route channel close: {}",
1318                                    e
1319                                )),
1320                            }
1321                        }
1322                    }
1323                }
1324                ManagementCommand::NewStore {} => {
1325                    info!("Creating new store");
1326                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1327                    runtime_tx
1328                        .send(TheaterCommand::NewStore {
1329                            response_tx: cmd_tx,
1330                        })
1331                        .await?;
1332
1333                    let store_id = cmd_rx.await?;
1334                    ManagementResponse::StoreCreated {
1335                        store_id: store_id?.id,
1336                    }
1337                }
1338            };
1339
1340            debug!("Sending response: {:?}", response);
1341            if let Err(e) = client_tx.send(response).await {
1342                error!("Failed to send response: {}", e);
1343                break;
1344            }
1345            debug!("Response sent");
1346        }
1347
1348        // Clean up all subscriptions for this connection
1349        debug!(
1350            "Connection closed, cleaning up {} subscriptions",
1351            connection_subscriptions.len()
1352        );
1353        let mut subs = subscriptions.lock().await;
1354
1355        for (actor_id, sub_id) in connection_subscriptions {
1356            if let Some(actor_subs) = subs.get_mut(&actor_id) {
1357                actor_subs.retain(|sub| sub.id != sub_id);
1358
1359                // Remove the entry if no subscriptions remain
1360                if actor_subs.is_empty() {
1361                    subs.remove(&actor_id);
1362                }
1363            }
1364        }
1365
1366        // Clean up channel subscriptions
1367        debug!(
1368            "Connection closed, cleaning up {} channel subscriptions",
1369            connection_channel_subscriptions.len()
1370        );
1371        let mut channel_subs = channel_subscriptions.lock().await;
1372
1373        for channel_id in connection_channel_subscriptions {
1374            channel_subs.remove(&channel_id);
1375        }
1376
1377        debug!("Cleaned up all subscriptions for the connection");
1378        Ok(())
1379    }
1380}