Skip to main content

theater_server/
server.rs

1use anyhow::Result;
2use bytes::Bytes;
3use futures::sink::SinkExt;
4use futures::stream::StreamExt;
5use serde::{Deserialize, Serialize};
6use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8use theater::messages::{
9    ActorMessage, ActorRequest, ActorResult, ActorSend, ActorStatus, ChannelEvent,
10    ChannelParticipant,
11};
12use theater::pack_bridge::Value;
13use theater::{ChainEvent, ManifestConfig};
14use tokio::net::{TcpListener, TcpStream};
15use tokio::sync::{mpsc, Mutex};
16use tokio_util::codec::Framed;
17use tracing::{debug, error, info};
18use uuid::Uuid;
19
20use theater::config::actor_manifest::{
21    RuntimeHostConfig, StoreHandlerConfig, SupervisorHostConfig, TcpHandlerConfig,
22};
23use theater::handler::HandlerRegistry;
24use theater::id::TheaterId;
25use theater::messages::{default_init_state, ChannelId, TheaterCommand};
26use theater::theater_runtime::TheaterRuntime;
27use theater::utils::resolve_reference;
28use theater::TheaterRuntimeError;
29
30// Import Theater-specific handlers only
31// DEPRECATED: WASI handlers (environment, filesystem, http, io, etc.) moved to crates/deprecated/
32use theater_handler_message_server::MessageServerHandler;
33use theater_handler_runtime::RuntimeHandler;
34use theater_handler_store::StoreHandler;
35use theater_handler_supervisor::SupervisorHandler;
36use theater_handler_tcp::TcpHandler;
37
38use crate::fragmenting_codec::FragmentingCodec;
39
/// Commands accepted from a management client.
///
/// Incoming frames are decoded with `serde_json::from_slice`, so the variant
/// and field names below are part of the wire format.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ManagementCommand {
    /// Start an actor described by a manifest.
    StartActor {
        /// Manifest reference; loaded with `resolve_reference` and parsed as TOML.
        manifest: String,
        // NOTE(review): the StartActor handler in this file binds this field as
        // `_initial_state` and passes `default_init_state()` to the runtime, so
        // the supplied value appears to be ignored — confirm that is intended.
        initial_state: Option<Vec<u8>>,
        /// When true, `ActorResult`s arriving on the actor's supervisor channel
        /// are forwarded to this connection.
        parent: bool,
        /// When true, the actor's chain events are forwarded to this connection
        /// as `ManagementResponse::ActorEvent`.
        subscribe: bool,
    },
    /// Forwarded to the runtime as `TheaterCommand::StopActor`.
    StopActor {
        id: TheaterId,
    },
    /// Forwarded to the runtime as `TheaterCommand::TerminateActor`.
    TerminateActor {
        id: TheaterId,
    },
    /// Forwarded to the runtime as `TheaterCommand::GetActors`.
    ListActors,
    SubscribeToActor {
        id: TheaterId,
    },
    UnsubscribeFromActor {
        id: TheaterId,
        subscription_id: Uuid,
    },
    SendActorMessage {
        id: TheaterId,
        data: Vec<u8>,
    },
    RequestActorMessage {
        id: TheaterId,
        data: Vec<u8>,
    },
    GetActorManifest {
        id: TheaterId,
    },
    GetActorStatus {
        id: TheaterId,
    },
    RestartActor {
        id: TheaterId,
    },
    GetActorState {
        id: TheaterId,
    },
    GetActorMetrics {
        id: TheaterId,
    },
    UpdateActorPackage {
        id: TheaterId,
        package: String,
    },
    // Channel management commands
    OpenChannel {
        actor_id: ChannelParticipant,
        initial_message: Vec<u8>,
    },
    SendOnChannel {
        channel_id: String,
        message: Vec<u8>,
    },
    CloseChannel {
        channel_id: String,
    },

    // Store commands
    NewStore {},
}
105
/// Responses and asynchronous notifications sent to a management client.
///
/// The per-connection response forwarder encodes each value with
/// `serde_json::to_vec`, so variant and field names are part of the wire
/// format.
#[derive(Debug, Clone, Serialize, Deserialize)]
// NOTE(review): presumably allowed because a few variants (e.g. the manifest or
// event payloads) are much larger than the rest — confirm before boxing any.
#[allow(clippy::large_enum_variant)]
pub enum ManagementResponse {
    /// Sent when the runtime reports a successful start, carrying the new actor's id.
    ActorStarted {
        id: TheaterId,
    },
    /// Sent after a successful `StopActor` or `TerminateActor`.
    ActorStopped {
        id: TheaterId,
    },
    /// Reply to `ListActors`.
    ActorList {
        actors: Vec<(TheaterId, String)>,
    },
    Subscribed {
        id: TheaterId,
        subscription_id: Uuid,
    },
    Unsubscribed {
        id: TheaterId,
    },
    /// A chain event forwarded from an actor the client subscribed to.
    ActorEvent {
        event: ChainEvent,
    },
    /// A result forwarded from the supervisor channel of an actor started
    /// with `parent: true`.
    ActorResult(ActorResult),
    /// A command failed; see [`ManagementError`] for the cause.
    Error {
        error: ManagementError,
    },
    RequestedMessage {
        id: TheaterId,
        message: Vec<u8>,
    },
    SentMessage {
        id: TheaterId,
    },
    ActorStatus {
        id: TheaterId,
        status: ActorStatus,
    },
    Restarted {
        id: TheaterId,
    },
    ActorManifest {
        id: TheaterId,
        manifest: ManifestConfig,
    },
    ActorState {
        id: TheaterId,
        state: Value,
    },
    ActorMetrics {
        id: TheaterId,
        metrics: serde_json::Value,
    },
    ActorPackageUpdated {
        id: TheaterId,
    },
    // Channel management responses
    ChannelOpened {
        channel_id: String,
        actor_id: ChannelParticipant,
    },
    MessageSent {
        channel_id: String,
    },
    /// A message that arrived on a channel this client is subscribed to.
    ChannelMessage {
        channel_id: String,
        sender_id: ChannelParticipant,
        message: Vec<u8>,
    },
    /// The channel was closed; the server has dropped the matching subscription.
    ChannelClosed {
        channel_id: String,
    },

    // Store responses
    StoreCreated {
        store_id: String,
    },
}
183
/// Error payload carried by `ManagementResponse::Error`.
///
/// Derives `Serialize`/`Deserialize`, so variant names are part of the wire
/// format. `String` payloads hold a human-readable detail message.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ManagementError {
    // Actor-related errors
    ActorNotFound,
    ActorAlreadyExists,
    ActorNotRunning,
    ActorError(String),

    // Channel-related errors
    ChannelNotFound,
    ChannelClosed,
    ChannelRejected,

    // Store-related errors
    StoreError(String),

    // Communication errors
    CommunicationError(String),

    // Request handling errors
    InvalidRequest(String),
    Timeout,

    // System errors
    RuntimeError(String),
    InternalError(String),

    // Serialization/deserialization errors
    SerializationError(String),

    // Actor initialization errors
    ActorInitializationError(String),
}
217
218// Allow converting from TheaterRuntimeError to ManagementError
219impl From<TheaterRuntimeError> for ManagementError {
220    fn from(err: TheaterRuntimeError) -> Self {
221        match err {
222            TheaterRuntimeError::ActorNotFound(_) => ManagementError::ActorNotFound,
223            TheaterRuntimeError::ActorAlreadyExists(_) => ManagementError::ActorAlreadyExists,
224            TheaterRuntimeError::ActorNotRunning(_) => ManagementError::ActorNotRunning,
225            TheaterRuntimeError::ActorOperationFailed(msg) => {
226                ManagementError::RuntimeError(format!("Actor operation failed: {}", msg))
227            }
228            TheaterRuntimeError::ActorError(e) => ManagementError::ActorError(e.to_string()),
229            TheaterRuntimeError::ChannelError(msg) => ManagementError::CommunicationError(msg),
230            TheaterRuntimeError::ChannelNotFound(_) => ManagementError::ChannelNotFound,
231            TheaterRuntimeError::ChannelRejected => ManagementError::ChannelRejected,
232            TheaterRuntimeError::SerializationError(msg) => {
233                ManagementError::SerializationError(msg)
234            }
235            TheaterRuntimeError::InternalError(msg) => ManagementError::InternalError(msg),
236            TheaterRuntimeError::ActorInitializationError(msg) => {
237                ManagementError::ActorInitializationError(msg)
238            }
239        }
240    }
241}
242
243// Implement Display for ManagementError to provide better error messages
244impl std::fmt::Display for ManagementError {
245    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
246        match self {
247            ManagementError::ActorNotFound => write!(f, "Actor not found"),
248            ManagementError::ActorAlreadyExists => write!(f, "Actor already exists"),
249            ManagementError::ActorNotRunning => write!(f, "Actor is not running"),
250            ManagementError::ActorError(msg) => write!(f, "Actor error: {}", msg),
251            ManagementError::ChannelNotFound => write!(f, "Channel not found"),
252            ManagementError::ChannelClosed => write!(f, "Channel is closed"),
253            ManagementError::ChannelRejected => write!(f, "Channel was rejected"),
254            ManagementError::StoreError(msg) => write!(f, "Store error: {}", msg),
255            ManagementError::CommunicationError(msg) => write!(f, "Communication error: {}", msg),
256            ManagementError::InvalidRequest(msg) => write!(f, "Invalid request: {}", msg),
257            ManagementError::Timeout => write!(f, "Operation timed out"),
258            ManagementError::RuntimeError(msg) => write!(f, "Runtime error: {}", msg),
259            ManagementError::InternalError(msg) => write!(f, "Internal error: {}", msg),
260            ManagementError::SerializationError(msg) => write!(f, "Serialization error: {}", msg),
261            ManagementError::ActorInitializationError(msg) => {
262                write!(f, "Actor initialization error: {}", msg)
263            }
264        }
265    }
266}
267
268// Implement Error trait for ManagementError
269impl std::error::Error for ManagementError {}
270
/// Entry in the per-actor subscription sets held by `TheaterServer`
/// (`subscriptions: HashMap<TheaterId, HashSet<Subscription>>`).
#[derive(Debug)]
#[allow(dead_code)]
struct Subscription {
    /// Identifies the subscription; equality and hashing look at this field only.
    id: Uuid,
    // presumably the subscribing connection's response queue — not read in the
    // visible code; confirm against the subscribe handler
    client_tx: mpsc::Sender<ManagementResponse>,
}
277
278impl Eq for Subscription {}
279impl PartialEq for Subscription {
280    fn eq(&self, other: &Self) -> bool {
281        self.id == other.id
282    }
283}
284impl std::hash::Hash for Subscription {
285    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
286        self.id.hash(state);
287    }
288}
289
290// ChannelEvent is now imported from theater::ChannelEvent
291
/// Tracks one active channel and the client that receives its traffic.
///
/// Entries live in `TheaterServer::channel_subscriptions`, keyed by channel id.
/// `process_channel_events` looks entries up there to deliver
/// `ManagementResponse::ChannelMessage` and `ManagementResponse::ChannelClosed`.
#[derive(Debug)]
#[allow(dead_code)]
struct ChannelSubscription {
    channel_id: String,
    // presumably the two endpoints of the channel — neither is read in the
    // visible code; confirm against the OpenChannel handler
    initiator_id: ChannelParticipant,
    target_id: ChannelParticipant,
    /// Sender used to push channel responses to the owning client.
    client_tx: mpsc::Sender<ManagementResponse>,
}
301
302/// Creates a HandlerRegistry with Theater-specific handlers.
303///
304/// NOTE: WASI handlers (environment, filesystem, http, io, sockets, timing, random, process)
305/// have been deprecated and moved to crates/deprecated/. They will be redesigned for
306/// Composite runtime support later.
307///
308/// Returns both the HandlerRegistry and the MessageRouter, allowing the server
309/// to use the MessageRouter for external client messaging.
310fn create_root_handler_registry(
311    theater_tx: mpsc::Sender<TheaterCommand>,
312) -> (
313    HandlerRegistry,
314    theater_handler_message_server::MessageRouter,
315) {
316    let mut registry = HandlerRegistry::new();
317
318    info!("Initializing Theater server with Theater-specific handlers...");
319
320    // Runtime handler - provides actor runtime information and control
321    let runtime_config = RuntimeHostConfig {};
322    registry.register(RuntimeHandler::new(
323        runtime_config,
324        theater_tx.clone(),
325        None,
326    ));
327
328    // Store handler - provides key-value storage for actors
329    let store_config = StoreHandlerConfig::default();
330    registry.register(StoreHandler::new(store_config, None));
331
332    // Supervisor handler - allows actors to spawn and manage child actors
333    let supervisor_config = SupervisorHostConfig {};
334    registry.register(SupervisorHandler::new(supervisor_config, None));
335
336    // Message server handler - provides inter-actor messaging
337    let message_router = theater_handler_message_server::MessageRouter::new();
338    registry.register(MessageServerHandler::new(None, message_router.clone()));
339
340    // TCP handler - provides raw TCP networking for actors
341    let tcp_config = TcpHandlerConfig {
342        listen: None,
343        max_connections: None,
344        ..Default::default()
345    };
346    registry.register(TcpHandler::new(tcp_config));
347
348    info!("✓ 5 Theater-specific handlers registered");
349    info!("NOTE: WASI handlers are deprecated - see crates/deprecated/");
350
351    (registry, message_router)
352}
353
/// Management server: owns the actor runtime and the TCP listener that
/// management clients connect to.
pub struct TheaterServer {
    /// The actor runtime; `run` moves it into its own task.
    runtime: TheaterRuntime,
    /// Command sender into the runtime; cloned for every management connection.
    theater_tx: mpsc::Sender<TheaterCommand>,
    /// Listener on which management connections are accepted.
    management_socket: TcpListener,
    /// Subscriptions per actor id, shared with every connection handler.
    subscriptions: Arc<Mutex<HashMap<TheaterId, HashSet<Subscription>>>>,
    // Field to track channel subscriptions
    channel_subscriptions: Arc<Mutex<HashMap<String, ChannelSubscription>>>,
    // Channel for runtime to send channel events back to server
    // (a clone of this sender is handed to the runtime in `new`)
    #[allow(dead_code)]
    channel_events_tx: mpsc::Sender<ChannelEvent>,
    // MessageRouter for external client messaging
    message_router: theater_handler_message_server::MessageRouter,
}
367
368impl TheaterServer {
369    // Process channel events and forward them to subscribed clients
370    async fn process_channel_events(
371        mut channel_events_rx: mpsc::Receiver<ChannelEvent>,
372        channel_subscriptions: Arc<Mutex<HashMap<String, ChannelSubscription>>>,
373    ) {
374        while let Some(event) = channel_events_rx.recv().await {
375            match event {
376                ChannelEvent::Message {
377                    channel_id,
378                    sender_id,
379                    message,
380                } => {
381                    tracing::debug!("Received channel message for {}", channel_id);
382                    // Forward to subscribed clients
383                    let subs = channel_subscriptions.lock().await;
384                    if let Some(sub) = subs.get(&channel_id.0) {
385                        let response = ManagementResponse::ChannelMessage {
386                            channel_id: channel_id.0.clone(),
387                            sender_id,
388                            message,
389                        };
390
391                        tracing::debug!("Forwarding channel message to client: {:?}", response);
392
393                        if let Err(e) = sub.client_tx.send(response).await {
394                            tracing::warn!("Failed to forward channel message: {}", e);
395                        } else {
396                            tracing::debug!("Forwarded channel message to client");
397                        }
398                    }
399                }
400                ChannelEvent::Close { channel_id } => {
401                    tracing::debug!("Received channel close event for {}", channel_id);
402                    // Forward to subscribed clients
403                    let mut subs = channel_subscriptions.lock().await;
404                    if let Some(sub) = subs.remove(&channel_id.0) {
405                        let response = ManagementResponse::ChannelClosed {
406                            channel_id: channel_id.0.clone(),
407                        };
408
409                        if let Err(e) = sub.client_tx.send(response).await {
410                            tracing::warn!("Failed to forward channel close event: {}", e);
411                        } else {
412                            tracing::debug!("Forwarded channel close event to client");
413                        }
414                    }
415                }
416            }
417        }
418    }
419
420    pub async fn new(address: std::net::SocketAddr) -> Result<Self> {
421        let (theater_tx, theater_rx) = mpsc::channel(32);
422
423        // Create channel for runtime to send channel events back to server
424        let (channel_events_tx, channel_events_rx) = mpsc::channel(32);
425
426        // Create handler registry with all migrated handlers (root permissions)
427        // Also get the MessageRouter for external client messaging
428        let (handler_registry, message_router) = create_root_handler_registry(theater_tx.clone());
429
430        // Create the runtime with the handler registry
431        let runtime = TheaterRuntime::new(
432            theater_tx.clone(),
433            theater_rx,
434            Some(channel_events_tx.clone()),
435            handler_registry,
436        )
437        .await?;
438        let management_socket = TcpListener::bind(address).await?;
439
440        let channel_subscriptions = Arc::new(Mutex::new(HashMap::new()));
441
442        // Start task to process channel events
443        let channel_subs_clone = channel_subscriptions.clone();
444        tokio::spawn(async move {
445            Self::process_channel_events(channel_events_rx, channel_subs_clone).await;
446        });
447
448        Ok(Self {
449            runtime,
450            theater_tx,
451            management_socket,
452            subscriptions: Arc::new(Mutex::new(HashMap::new())),
453            channel_subscriptions,
454            channel_events_tx,
455            message_router,
456        })
457    }
458
459    pub async fn run(mut self) -> Result<()> {
460        info!(
461            "Theater server starting on {:?}",
462            self.management_socket.local_addr()?
463        );
464
465        // Start the theater runtime in its own task
466        let runtime_handle = tokio::spawn(async move {
467            match self.runtime.run().await {
468                Ok(_) => Ok(()),
469                Err(e) => {
470                    error!("Theater runtime failed: {}", e);
471                    Err(e)
472                }
473            }
474        });
475
476        // Accept and handle management connections
477        while let Ok((socket, addr)) = self.management_socket.accept().await {
478            info!("New management connection from {}", addr);
479            let runtime_tx = self.theater_tx.clone();
480            let subscriptions = self.subscriptions.clone();
481            let channel_subscriptions = self.channel_subscriptions.clone();
482            let message_router = self.message_router.clone();
483
484            tokio::spawn(async move {
485                if let Err(e) = Self::handle_management_connection(
486                    socket,
487                    runtime_tx,
488                    subscriptions,
489                    channel_subscriptions,
490                    message_router,
491                )
492                .await
493                {
494                    error!("Error handling management connection: {}", e);
495                }
496            });
497        }
498
499        runtime_handle.await??;
500        Ok(())
501    }
502
503    async fn handle_management_connection(
504        socket: TcpStream,
505        runtime_tx: mpsc::Sender<TheaterCommand>,
506        subscriptions: Arc<Mutex<HashMap<TheaterId, HashSet<Subscription>>>>,
507        channel_subscriptions: Arc<Mutex<HashMap<String, ChannelSubscription>>>,
508        message_router: theater_handler_message_server::MessageRouter,
509    ) -> Result<()> {
510        // Create a channel for sending responses to this client
511        let (client_tx, mut client_rx) = mpsc::channel::<ManagementResponse>(32);
512
513        let codec = FragmentingCodec::new();
514        let framed = Framed::new(socket, codec);
515
516        // Split the framed connection into read and write parts
517        let (mut framed_sink, mut framed_stream) = framed.split();
518
519        // Clone the client_tx for use in the command loop
520        let cmd_client_tx = client_tx.clone();
521
522        // Start a task to forward responses to the client
523        let _response_task = tokio::spawn(async move {
524            while let Some(response) = client_rx.recv().await {
525                match serde_json::to_vec(&response) {
526                    Ok(data) => {
527                        debug!("Serialized response: {} bytes", data.len());
528                        if data.len() > 10 * 1024 * 1024 {
529                            debug!("Large response detected: {} MB", data.len() / 1024 / 1024);
530                        }
531                        if let Err(e) = framed_sink.send(Bytes::from(data)).await {
532                            debug!("Error sending response to client: {}", e);
533                            break;
534                        }
535                    }
536                    Err(e) => {
537                        error!("Error serializing response: {}", e);
538                    }
539                }
540            }
541            debug!("Response forwarder for client closed");
542        });
543
544        // Store active subscriptions for this connection to clean up on disconnect
545        let mut connection_subscriptions: Vec<(TheaterId, Uuid)> = Vec::new();
546
547        // Store active channel subscriptions for cleanup
548        let mut connection_channel_subscriptions: Vec<String> = Vec::new();
549
550        // Loop until connection closes or an error occurs
551        'connection: while let Some(msg) = framed_stream.next().await {
552            debug!("Received management message");
553            let msg = match msg {
554                Ok(m) => m,
555                Err(e) => {
556                    error!("Error receiving message: {}", e);
557                    break 'connection;
558                }
559            };
560
561            let cmd = match serde_json::from_slice::<ManagementCommand>(&msg) {
562                Ok(c) => c,
563                Err(e) => {
564                    error!(
565                        "Error parsing command: {} {}",
566                        e,
567                        String::from_utf8_lossy(&msg)
568                    );
569                    continue;
570                }
571            };
572            debug!("Parsed command: {:?}", cmd);
573
574            // Store the command for reference (used for subscription tracking)
575            let _cmd_clone = cmd.clone();
576
577            let response = match cmd {
578                ManagementCommand::StartActor {
579                    manifest,
580                    initial_state: _initial_state,
581                    parent,
582                    subscribe,
583                } => {
584                    info!("Starting actor from manifest: {:?}", manifest);
585
586                    // Load and parse manifest
587                    let manifest_str = match resolve_reference(&manifest).await {
588                        Ok(bytes) => match String::from_utf8(bytes) {
589                            Ok(s) => s,
590                            Err(e) => {
591                                error!("Invalid manifest encoding: {}", e);
592                                cmd_client_tx
593                                    .send(ManagementResponse::Error {
594                                        error: ManagementError::ActorInitializationError(format!(
595                                            "Invalid manifest encoding: {}",
596                                            e
597                                        )),
598                                    })
599                                    .await
600                                    .ok();
601                                continue;
602                            }
603                        },
604                        Err(e) => {
605                            error!("Failed to load manifest: {}", e);
606                            cmd_client_tx
607                                .send(ManagementResponse::Error {
608                                    error: ManagementError::ActorInitializationError(format!(
609                                        "Failed to load manifest: {}",
610                                        e
611                                    )),
612                                })
613                                .await
614                                .ok();
615                            continue;
616                        }
617                    };
618
619                    let manifest_config = match ManifestConfig::from_toml_str(&manifest_str) {
620                        Ok(m) => m,
621                        Err(e) => {
622                            error!("Failed to parse manifest: {}", e);
623                            cmd_client_tx
624                                .send(ManagementResponse::Error {
625                                    error: ManagementError::ActorInitializationError(format!(
626                                        "Failed to parse manifest: {}",
627                                        e
628                                    )),
629                                })
630                                .await
631                                .ok();
632                            continue;
633                        }
634                    };
635
636                    // Load wasm bytes
637                    let wasm_bytes = match resolve_reference(&manifest_config.package).await {
638                        Ok(bytes) => bytes,
639                        Err(e) => {
640                            error!("Failed to load WASM: {}", e);
641                            cmd_client_tx
642                                .send(ManagementResponse::Error {
643                                    error: ManagementError::ActorInitializationError(format!(
644                                        "Failed to load WASM: {}",
645                                        e
646                                    )),
647                                })
648                                .await
649                                .ok();
650                            continue;
651                        }
652                    };
653
654                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
655                    debug!("Sending SpawnActor command to runtime");
656                    let supervisor_tx = if parent {
657                        let (supervisor_tx, mut supervisor_rx) = mpsc::channel(32);
658                        let cmd_client_tx = cmd_client_tx.clone();
659                        tokio::spawn(async move {
660                            while let Some(res) = supervisor_rx.recv().await {
661                                debug!("Received supervisor response: {:?}", res);
662                                if let Err(e) = cmd_client_tx
663                                    .send(ManagementResponse::ActorResult(res))
664                                    .await
665                                {
666                                    error!("Failed to send supervisor response: {}", e);
667                                    break;
668                                }
669                            }
670                        });
671                        Some(supervisor_tx)
672                    } else {
673                        None
674                    };
675                    let subscription_tx = if subscribe {
676                        let (event_tx, mut event_rx) = mpsc::channel(32);
677
678                        // set up a task to forward events to the client
679                        let cmd_client_tx = cmd_client_tx.clone();
680                        tokio::spawn(async move {
681                            while let Some(event) = event_rx.recv().await {
682                                debug!("Received event for subscription");
683                                let response = match event {
684                                    Ok(event) => ManagementResponse::ActorEvent { event },
685                                    Err(_e) => {
686                                        // Actor errors are already captured in the event chain
687                                        // No need to send duplicate error responses
688                                        debug!("Actor error received, but already in event chain");
689                                        continue;
690                                    }
691                                };
692                                if let Err(e) = cmd_client_tx.send(response).await {
693                                    debug!("Failed to forward event to client: {}", e);
694                                    break;
695                                }
696                            }
697                            debug!("Event forwarder for subscription stopped");
698                        });
699
700                        Some(event_tx)
701                    } else {
702                        None
703                    };
704                    match runtime_tx
705                        .send(TheaterCommand::SetupActor {
706                            wasm_bytes,
707                            name: Some(manifest_config.name.clone()),
708                            manifest: Some(manifest_config),
709                            init_state: default_init_state(),
710                            response_tx: cmd_tx,
711                            supervisor_tx,
712                            subscription_tx,
713                        })
714                        .await
715                    {
716                        Ok(_) => {
717                            debug!("SpawnActor command sent to runtime, awaiting response");
718                            match cmd_rx.await {
719                                Ok(result) => match result {
720                                    Ok(actor_id) => {
721                                        info!("Actor started with ID: {:?}", actor_id);
722                                        ManagementResponse::ActorStarted { id: actor_id }
723                                    }
724                                    Err(e) => {
725                                        error!("Runtime failed to start actor: {}", e);
726                                        ManagementResponse::Error {
727                                            error: ManagementError::RuntimeError(format!(
728                                                "Failed to start actor: {}",
729                                                e
730                                            )),
731                                        }
732                                    }
733                                },
734                                Err(e) => {
735                                    error!("Failed to receive spawn response: {}", e);
736                                    ManagementResponse::Error {
737                                        error: ManagementError::CommunicationError(format!(
738                                            "Failed to receive spawn response: {}",
739                                            e
740                                        )),
741                                    }
742                                }
743                            }
744                        }
745                        Err(e) => {
746                            error!("Failed to send SpawnActor command: {}", e);
747                            ManagementResponse::Error {
748                                error: ManagementError::CommunicationError(format!(
749                                    "Failed to send spawn command: {}",
750                                    e
751                                )),
752                            }
753                        }
754                    }
755                }
756                ManagementCommand::StopActor { id } => {
757                    info!("Stopping actor: {:?}", id);
758                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
759                    runtime_tx
760                        .send(TheaterCommand::StopActor {
761                            actor_id: id,
762                            response_tx: cmd_tx,
763                        })
764                        .await?;
765
766                    match cmd_rx.await? {
767                        Ok(_) => {
768                            subscriptions.lock().await.remove(&id);
769                            ManagementResponse::ActorStopped { id }
770                        }
771                        Err(e) => ManagementResponse::Error {
772                            error: ManagementError::RuntimeError(format!(
773                                "Failed to stop actor: {}",
774                                e
775                            )),
776                        },
777                    }
778                }
779                ManagementCommand::TerminateActor { id } => {
780                    info!("Terminating actor: {:?}", id);
781                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
782                    runtime_tx
783                        .send(TheaterCommand::TerminateActor {
784                            actor_id: id,
785                            response_tx: cmd_tx,
786                        })
787                        .await?;
788
789                    match cmd_rx.await? {
790                        Ok(_) => {
791                            subscriptions.lock().await.remove(&id);
792                            ManagementResponse::ActorStopped { id }
793                        }
794                        Err(e) => ManagementResponse::Error {
795                            error: ManagementError::RuntimeError(format!(
796                                "Failed to terminate actor: {}",
797                                e
798                            )),
799                        },
800                    }
801                }
802                ManagementCommand::ListActors => {
803                    debug!("Listing actors");
804                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
805                    runtime_tx
806                        .send(TheaterCommand::GetActors {
807                            response_tx: cmd_tx,
808                        })
809                        .await?;
810
811                    match cmd_rx.await? {
812                        Ok(actors) => {
813                            info!("Found {} actors", actors.len());
814                            ManagementResponse::ActorList { actors }
815                        }
816                        Err(e) => ManagementResponse::Error {
817                            error: ManagementError::RuntimeError(format!(
818                                "Failed to list actors: {}",
819                                e
820                            )),
821                        },
822                    }
823                }
824                ManagementCommand::SubscribeToActor { id } => {
825                    info!("New subscription request for actor: {:?}", id);
826                    let subscription_id = Uuid::new_v4();
827                    let subscription = Subscription {
828                        id: subscription_id,
829                        client_tx: cmd_client_tx.clone(),
830                    };
831
832                    debug!("Subscription created with ID: {}", subscription_id);
833
834                    // Register the subscription in the global map
835                    subscriptions
836                        .lock()
837                        .await
838                        .entry(id)
839                        .or_default()
840                        .insert(subscription);
841
842                    // Set up the event channel for the subscription
843                    let (event_tx, mut event_rx) = mpsc::channel(32);
844                    runtime_tx
845                        .send(TheaterCommand::SubscribeToActor {
846                            actor_id: id,
847                            event_tx,
848                        })
849                        .await
850                        .map_err(|e| anyhow::anyhow!("Failed to subscribe: {}", e))?;
851
852                    // Add to the list of subscriptions for this connection
853                    connection_subscriptions.push((id, subscription_id));
854
855                    // Create a task to forward events to this client
856                    let client_tx_clone = cmd_client_tx.clone();
857                    tokio::spawn(async move {
858                        debug!(
859                            "Starting event forwarder for subscription {}",
860                            subscription_id
861                        );
862                        while let Some(event) = event_rx.recv().await {
863                            debug!("Received event for subscription {}", subscription_id);
864                            let response = match event {
865                                Ok(event) => ManagementResponse::ActorEvent { event },
866                                Err(_e) => {
867                                    // Actor errors are already captured in the event chain
868                                    // No need to send duplicate error responses
869                                    debug!("Actor error received, but already in event chain");
870                                    continue;
871                                }
872                            };
873                            if let Err(e) = client_tx_clone.send(response).await {
874                                debug!("Failed to forward event to client: {}", e);
875                                break;
876                            }
877                        }
878                        debug!(
879                            "Event forwarder for subscription {} stopped",
880                            subscription_id
881                        );
882                    });
883
884                    ManagementResponse::Subscribed {
885                        id,
886                        subscription_id,
887                    }
888                }
889                ManagementCommand::UnsubscribeFromActor {
890                    id,
891                    subscription_id,
892                } => {
893                    debug!(
894                        "Removing subscription {} for actor {:?}",
895                        subscription_id, id
896                    );
897
898                    // Remove subscription from the tracking list for this connection
899                    connection_subscriptions
900                        .retain(|(aid, sid)| *aid != id || *sid != subscription_id);
901
902                    // Remove from the global subscriptions map
903                    let mut subs = subscriptions.lock().await;
904                    if let Some(actor_subs) = subs.get_mut(&id) {
905                        actor_subs.retain(|sub| sub.id != subscription_id);
906
907                        // Remove the entry if no subscriptions remain
908                        if actor_subs.is_empty() {
909                            subs.remove(&id);
910                        }
911                    }
912
913                    debug!("Subscription removed");
914                    ManagementResponse::Unsubscribed { id }
915                }
916                ManagementCommand::SendActorMessage { id, data } => {
917                    info!("Sending message to actor: {:?}", id);
918
919                    // Create response channel for routing
920                    let (response_tx, response_rx) = tokio::sync::oneshot::channel();
921
922                    // Create ActorMessage
923                    let message = ActorMessage::Send(ActorSend { data });
924
925                    // Route via MessageRouter
926                    match message_router
927                        .route_message(theater::messages::MessageCommand::SendMessage {
928                            target_id: id,
929                            message,
930                            response_tx,
931                        })
932                        .await
933                    {
934                        Ok(_) => {
935                            // Wait for routing result
936                            match response_rx.await {
937                                Ok(Ok(())) => {
938                                    info!("Message sent successfully to actor: {:?}", id);
939                                    ManagementResponse::SentMessage { id }
940                                }
941                                Ok(Err(e)) => {
942                                    error!("Failed to send message to actor: {}", e);
943                                    ManagementResponse::Error {
944                                        error: ManagementError::RuntimeError(format!(
945                                            "Failed to send: {}",
946                                            e
947                                        )),
948                                    }
949                                }
950                                Err(e) => {
951                                    error!("Failed to receive routing response: {}", e);
952                                    ManagementResponse::Error {
953                                        error: ManagementError::CommunicationError(format!(
954                                            "Failed to receive routing response: {}",
955                                            e
956                                        )),
957                                    }
958                                }
959                            }
960                        }
961                        Err(e) => {
962                            error!("Failed to route message: {}", e);
963                            ManagementResponse::Error {
964                                error: ManagementError::RuntimeError(format!(
965                                    "Failed to route message: {}",
966                                    e
967                                )),
968                            }
969                        }
970                    }
971                }
972                ManagementCommand::RequestActorMessage { id, data } => {
973                    info!("Requesting message from actor: {:?}", id);
974
975                    // Create channels for request-response pattern
976                    let (route_tx, route_rx) = tokio::sync::oneshot::channel();
977                    let (response_tx, response_rx) = tokio::sync::oneshot::channel();
978
979                    // Create ActorMessage with response channel embedded
980                    let message = ActorMessage::Request(ActorRequest { data, response_tx });
981
982                    // Route via MessageRouter
983                    match message_router
984                        .route_message(theater::messages::MessageCommand::SendMessage {
985                            target_id: id,
986                            message,
987                            response_tx: route_tx,
988                        })
989                        .await
990                    {
991                        Ok(_) => {
992                            // Wait for routing to complete
993                            match route_rx.await {
994                                Ok(Ok(())) => {
995                                    // Routing succeeded, now wait for actor's response
996                                    match response_rx.await {
997                                        Ok(response_data) => {
998                                            info!("Received response from actor: {:?}", id);
999                                            ManagementResponse::RequestedMessage {
1000                                                id,
1001                                                message: response_data,
1002                                            }
1003                                        }
1004                                        Err(e) => {
1005                                            error!("Actor didn't respond: {}", e);
1006                                            ManagementResponse::Error {
1007                                                error: ManagementError::RuntimeError(format!(
1008                                                    "Actor didn't respond: {}",
1009                                                    e
1010                                                )),
1011                                            }
1012                                        }
1013                                    }
1014                                }
1015                                Ok(Err(e)) => {
1016                                    error!("Failed to route request to actor: {}", e);
1017                                    ManagementResponse::Error {
1018                                        error: ManagementError::RuntimeError(format!(
1019                                            "Failed to route: {}",
1020                                            e
1021                                        )),
1022                                    }
1023                                }
1024                                Err(e) => {
1025                                    error!("Failed to receive routing response: {}", e);
1026                                    ManagementResponse::Error {
1027                                        error: ManagementError::CommunicationError(format!(
1028                                            "Failed to receive routing response: {}",
1029                                            e
1030                                        )),
1031                                    }
1032                                }
1033                            }
1034                        }
1035                        Err(e) => {
1036                            error!("Failed to route request: {}", e);
1037                            ManagementResponse::Error {
1038                                error: ManagementError::RuntimeError(format!(
1039                                    "Failed to route request: {}",
1040                                    e
1041                                )),
1042                            }
1043                        }
1044                    }
1045                }
1046                ManagementCommand::GetActorManifest { id } => {
1047                    info!("Getting manifest for actor: {:?}", id);
1048                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1049                    runtime_tx
1050                        .send(TheaterCommand::GetActorManifest {
1051                            actor_id: id,
1052                            response_tx: cmd_tx,
1053                        })
1054                        .await?;
1055
1056                    let manifest = cmd_rx.await?;
1057                    ManagementResponse::ActorManifest {
1058                        id,
1059                        manifest: manifest?,
1060                    }
1061                }
1062                ManagementCommand::GetActorStatus { id } => {
1063                    info!("Getting status for actor: {:?}", id);
1064                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1065                    runtime_tx
1066                        .send(TheaterCommand::GetActorStatus {
1067                            actor_id: id,
1068                            response_tx: cmd_tx,
1069                        })
1070                        .await?;
1071
1072                    let status = cmd_rx.await?;
1073                    ManagementResponse::ActorStatus {
1074                        id,
1075                        status: status?,
1076                    }
1077                }
1078                ManagementCommand::RestartActor { id } => {
1079                    info!("Restarting actor: {:?}", id);
1080                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1081                    runtime_tx
1082                        .send(TheaterCommand::RestartActor {
1083                            actor_id: id,
1084                            response_tx: cmd_tx,
1085                        })
1086                        .await?;
1087
1088                    match cmd_rx.await? {
1089                        Ok(_) => ManagementResponse::Restarted { id },
1090                        Err(e) => ManagementResponse::Error {
1091                            error: ManagementError::RuntimeError(format!(
1092                                "Failed to restart actor: {}",
1093                                e
1094                            )),
1095                        },
1096                    }
1097                }
1098                ManagementCommand::GetActorState { id } => {
1099                    info!("Getting state for actor: {:?}", id);
1100                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1101                    runtime_tx
1102                        .send(TheaterCommand::GetActorState {
1103                            actor_id: id,
1104                            response_tx: cmd_tx,
1105                        })
1106                        .await?;
1107
1108                    let state = cmd_rx.await?;
1109                    ManagementResponse::ActorState { id, state: state? }
1110                }
1111                ManagementCommand::GetActorMetrics { id } => {
1112                    info!("Getting metrics for actor: {:?}", id);
1113                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1114                    runtime_tx
1115                        .send(TheaterCommand::GetActorMetrics {
1116                            actor_id: id,
1117                            response_tx: cmd_tx,
1118                        })
1119                        .await?;
1120
1121                    let metrics = cmd_rx.await?;
1122                    ManagementResponse::ActorMetrics {
1123                        id,
1124                        metrics: serde_json::to_value(metrics?)?,
1125                    }
1126                }
1127                ManagementCommand::UpdateActorPackage { id: _, package: _ } => {
1128                    // TODO: Re-implement actor package updates
1129                    ManagementResponse::Error {
1130                        error: ManagementError::RuntimeError(
1131                            "UpdateActorPackage not yet implemented".to_string(),
1132                        ),
1133                    }
1134                }
1135                // Handle channel management commands
1136                ManagementCommand::OpenChannel {
1137                    actor_id,
1138                    initial_message,
1139                } => {
1140                    info!("Opening channel to actor: {:?}", actor_id);
1141
1142                    // Create a response channel
1143                    let (response_tx, response_rx) = tokio::sync::oneshot::channel();
1144
1145                    // Generate a channel ID
1146                    let client_id = ChannelParticipant::External;
1147                    let channel_id = ChannelId::new(&client_id, &actor_id);
1148                    let channel_id_str = channel_id.0.clone();
1149
1150                    // Send the channel open command via MessageRouter
1151                    message_router
1152                        .route_message(theater::messages::MessageCommand::OpenChannel {
1153                            initiator_id: client_id.clone(),
1154                            target_id: actor_id.clone(),
1155                            channel_id: channel_id.clone(),
1156                            initial_message,
1157                            response_tx,
1158                        })
1159                        .await
1160                        .map_err(|e| {
1161                            anyhow::anyhow!("Failed to send channel open command: {}", e)
1162                        })?;
1163
1164                    // Wait for the response
1165                    match response_rx.await {
1166                        Ok(result) => {
1167                            match result {
1168                                Ok(accepted) => {
1169                                    if accepted {
1170                                        // Channel opened successfully
1171                                        info!("Channel opened successfully: {}", channel_id_str);
1172
1173                                        // Register the channel subscription to receive messages
1174                                        let channel_sub = ChannelSubscription {
1175                                            channel_id: channel_id_str.clone(),
1176                                            initiator_id: client_id.clone(),
1177                                            target_id: actor_id.clone(),
1178                                            client_tx: cmd_client_tx.clone(),
1179                                        };
1180
1181                                        channel_subscriptions
1182                                            .lock()
1183                                            .await
1184                                            .insert(channel_id_str.clone(), channel_sub);
1185
1186                                        // Track this channel for cleanup on disconnect
1187                                        connection_channel_subscriptions
1188                                            .push(channel_id_str.clone());
1189
1190                                        ManagementResponse::ChannelOpened {
1191                                            channel_id: channel_id_str,
1192                                            actor_id,
1193                                        }
1194                                    } else {
1195                                        // Channel rejected by target
1196                                        ManagementResponse::Error {
1197                                            error: ManagementError::ChannelRejected,
1198                                        }
1199                                    }
1200                                }
1201                                Err(e) => ManagementResponse::Error {
1202                                    error: ManagementError::RuntimeError(format!(
1203                                        "Error opening channel: {}",
1204                                        e
1205                                    )),
1206                                },
1207                            }
1208                        }
1209                        Err(e) => ManagementResponse::Error {
1210                            error: ManagementError::CommunicationError(format!(
1211                                "Failed to receive channel open response: {}",
1212                                e
1213                            )),
1214                        },
1215                    }
1216                }
1217                ManagementCommand::SendOnChannel {
1218                    channel_id,
1219                    message,
1220                } => {
1221                    info!("Sending message on channel: {}", channel_id);
1222
1223                    // Create response channel
1224                    let (response_tx, response_rx) = tokio::sync::oneshot::channel();
1225
1226                    // Parse the channel ID
1227                    let channel_id_parsed = ChannelId(channel_id.clone());
1228
1229                    // Send the message on the channel via MessageRouter
1230                    let sender_id = ChannelParticipant::External;
1231                    match message_router
1232                        .route_message(theater::messages::MessageCommand::ChannelMessage {
1233                            channel_id: channel_id_parsed,
1234                            sender_id,
1235                            message,
1236                            response_tx,
1237                        })
1238                        .await
1239                    {
1240                        Ok(_) => {
1241                            // Wait for routing result
1242                            match response_rx.await {
1243                                Ok(Ok(())) => {
1244                                    info!("Message sent successfully on channel: {}", channel_id);
1245                                    ManagementResponse::MessageSent { channel_id }
1246                                }
1247                                Ok(Err(e)) => {
1248                                    error!("Failed to send on channel: {}", e);
1249                                    ManagementResponse::Error {
1250                                        error: ManagementError::RuntimeError(format!(
1251                                            "Failed to send on channel: {}",
1252                                            e
1253                                        )),
1254                                    }
1255                                }
1256                                Err(e) => {
1257                                    error!("Failed to receive channel send response: {}", e);
1258                                    ManagementResponse::Error {
1259                                        error: ManagementError::CommunicationError(format!(
1260                                            "Failed to receive channel send response: {}",
1261                                            e
1262                                        )),
1263                                    }
1264                                }
1265                            }
1266                        }
1267                        Err(e) => {
1268                            error!("Failed to route channel message: {}", e);
1269                            ManagementResponse::Error {
1270                                error: ManagementError::RuntimeError(format!(
1271                                    "Failed to route channel message: {}",
1272                                    e
1273                                )),
1274                            }
1275                        }
1276                    }
1277                }
1278                ManagementCommand::CloseChannel { channel_id } => {
1279                    info!("Closing channel: {}", channel_id);
1280
1281                    // Create response channel
1282                    let (response_tx, response_rx) = tokio::sync::oneshot::channel();
1283
1284                    // Parse the channel ID
1285                    let channel_id_parsed = ChannelId(channel_id.clone());
1286
1287                    // Close the channel via MessageRouter
1288                    let sender_id = ChannelParticipant::External;
1289                    match message_router
1290                        .route_message(theater::messages::MessageCommand::ChannelClose {
1291                            channel_id: channel_id_parsed,
1292                            sender_id,
1293                            response_tx,
1294                        })
1295                        .await
1296                    {
1297                        Ok(_) => {
1298                            // Wait for routing result
1299                            match response_rx.await {
1300                                Ok(Ok(())) => {
1301                                    info!("Channel closed successfully: {}", channel_id);
1302
1303                                    // Remove from channel subscriptions
1304                                    channel_subscriptions.lock().await.remove(&channel_id);
1305                                    connection_channel_subscriptions.retain(|id| id != &channel_id);
1306
1307                                    ManagementResponse::ChannelClosed { channel_id }
1308                                }
1309                                Ok(Err(e)) => {
1310                                    error!("Failed to close channel: {}", e);
1311                                    ManagementResponse::Error {
1312                                        error: ManagementError::RuntimeError(format!(
1313                                            "Failed to close channel: {}",
1314                                            e
1315                                        )),
1316                                    }
1317                                }
1318                                Err(e) => {
1319                                    error!("Failed to receive channel close response: {}", e);
1320                                    ManagementResponse::Error {
1321                                        error: ManagementError::CommunicationError(format!(
1322                                            "Failed to receive channel close response: {}",
1323                                            e
1324                                        )),
1325                                    }
1326                                }
1327                            }
1328                        }
1329                        Err(e) => {
1330                            error!("Failed to route channel close: {}", e);
1331                            ManagementResponse::Error {
1332                                error: ManagementError::RuntimeError(format!(
1333                                    "Failed to route channel close: {}",
1334                                    e
1335                                )),
1336                            }
1337                        }
1338                    }
1339                }
1340                ManagementCommand::NewStore {} => {
1341                    info!("Creating new store");
1342                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1343                    runtime_tx
1344                        .send(TheaterCommand::NewStore {
1345                            response_tx: cmd_tx,
1346                        })
1347                        .await?;
1348
1349                    let store_id = cmd_rx.await?;
1350                    ManagementResponse::StoreCreated {
1351                        store_id: store_id?.id,
1352                    }
1353                }
1354            };
1355
1356            debug!("Sending response: {:?}", response);
1357            if let Err(e) = client_tx.send(response).await {
1358                error!("Failed to send response: {}", e);
1359                break;
1360            }
1361            debug!("Response sent");
1362        }
1363
1364        // Clean up all subscriptions for this connection
1365        debug!(
1366            "Connection closed, cleaning up {} subscriptions",
1367            connection_subscriptions.len()
1368        );
1369        let mut subs = subscriptions.lock().await;
1370
1371        for (actor_id, sub_id) in connection_subscriptions {
1372            if let Some(actor_subs) = subs.get_mut(&actor_id) {
1373                actor_subs.retain(|sub| sub.id != sub_id);
1374
1375                // Remove the entry if no subscriptions remain
1376                if actor_subs.is_empty() {
1377                    subs.remove(&actor_id);
1378                }
1379            }
1380        }
1381
1382        // Clean up channel subscriptions
1383        debug!(
1384            "Connection closed, cleaning up {} channel subscriptions",
1385            connection_channel_subscriptions.len()
1386        );
1387        let mut channel_subs = channel_subscriptions.lock().await;
1388
1389        for channel_id in connection_channel_subscriptions {
1390            channel_subs.remove(&channel_id);
1391        }
1392
1393        debug!("Cleaned up all subscriptions for the connection");
1394        Ok(())
1395    }
1396}