theater_server/
server.rs

1use anyhow::Result;
2use bytes::Bytes;
3use futures::sink::SinkExt;
4use futures::stream::StreamExt;
5use serde::{Deserialize, Serialize};
6use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8use theater::messages::{
9    ActorMessage, ActorRequest, ActorResult, ActorSend, ActorStatus, ChannelEvent,
10    ChannelParticipant,
11};
12use theater::{ActorError, ChainEvent, ManifestConfig};
13use tokio::net::{TcpListener, TcpStream};
14use tokio::sync::{mpsc, Mutex};
15use tokio_util::codec::Framed;
16use tracing::{debug, error, info};
17use uuid::Uuid;
18
19use theater::id::TheaterId;
20use theater::messages::{ChannelId, TheaterCommand};
21use theater::theater_runtime::TheaterRuntime;
22use theater::TheaterRuntimeError;
23
24use crate::fragmenting_codec::FragmentingCodec;
25
26#[derive(Debug, Clone, Serialize, Deserialize)]
27pub enum ManagementCommand {
28    StartActor {
29        manifest: String,
30        initial_state: Option<Vec<u8>>,
31        parent: bool,
32        subscribe: bool,
33    },
34    StopActor {
35        id: TheaterId,
36    },
37    TerminateActor {
38        id: TheaterId,
39    },
40    ListActors,
41    SubscribeToActor {
42        id: TheaterId,
43    },
44    UnsubscribeFromActor {
45        id: TheaterId,
46        subscription_id: Uuid,
47    },
48    SendActorMessage {
49        id: TheaterId,
50        data: Vec<u8>,
51    },
52    RequestActorMessage {
53        id: TheaterId,
54        data: Vec<u8>,
55    },
56    GetActorManifest {
57        id: TheaterId,
58    },
59    GetActorStatus {
60        id: TheaterId,
61    },
62    RestartActor {
63        id: TheaterId,
64    },
65    GetActorState {
66        id: TheaterId,
67    },
68    GetActorEvents {
69        id: TheaterId,
70    },
71    GetActorMetrics {
72        id: TheaterId,
73    },
74    UpdateActorComponent {
75        id: TheaterId,
76        component: String,
77    },
78    // Channel management commands
79    OpenChannel {
80        actor_id: ChannelParticipant,
81        initial_message: Vec<u8>,
82    },
83    SendOnChannel {
84        channel_id: String,
85        message: Vec<u8>,
86    },
87    CloseChannel {
88        channel_id: String,
89    },
90
91    // Store commands
92    NewStore {},
93}
94
95#[derive(Debug, Clone, Serialize, Deserialize)]
96pub enum ManagementResponse {
97    ActorStarted {
98        id: TheaterId,
99    },
100    ActorStopped {
101        id: TheaterId,
102    },
103    ActorList {
104        actors: Vec<(TheaterId, String)>,
105    },
106    Subscribed {
107        id: TheaterId,
108        subscription_id: Uuid,
109    },
110    Unsubscribed {
111        id: TheaterId,
112    },
113    ActorEvent {
114        event: ChainEvent,
115    },
116    ActorResult(ActorResult),
117    ActorError {
118        error: ActorError,
119    },
120    Error {
121        error: ManagementError,
122    },
123    RequestedMessage {
124        id: TheaterId,
125        message: Vec<u8>,
126    },
127    SentMessage {
128        id: TheaterId,
129    },
130    ActorStatus {
131        id: TheaterId,
132        status: ActorStatus,
133    },
134    Restarted {
135        id: TheaterId,
136    },
137    ActorManifest {
138        id: TheaterId,
139        manifest: ManifestConfig,
140    },
141    ActorState {
142        id: TheaterId,
143        state: Option<Vec<u8>>,
144    },
145    ActorEvents {
146        id: TheaterId,
147        events: Vec<ChainEvent>,
148    },
149    ActorMetrics {
150        id: TheaterId,
151        metrics: serde_json::Value,
152    },
153    ActorComponentUpdated {
154        id: TheaterId,
155    },
156    // Channel management responses
157    ChannelOpened {
158        channel_id: String,
159        actor_id: ChannelParticipant,
160    },
161    MessageSent {
162        channel_id: String,
163    },
164    ChannelMessage {
165        channel_id: String,
166        sender_id: ChannelParticipant,
167        message: Vec<u8>,
168    },
169    ChannelClosed {
170        channel_id: String,
171    },
172
173    // Store responses
174    StoreCreated {
175        store_id: String,
176    },
177}
178
179#[derive(Debug, Clone, Serialize, Deserialize)]
180pub enum ManagementError {
181    // Actor-related errors
182    ActorNotFound,
183    ActorAlreadyExists,
184    ActorNotRunning,
185    ActorError(String),
186
187    // Channel-related errors
188    ChannelNotFound,
189    ChannelClosed,
190    ChannelRejected,
191
192    // Store-related errors
193    StoreError(String),
194
195    // Communication errors
196    CommunicationError(String),
197
198    // Request handling errors
199    InvalidRequest(String),
200    Timeout,
201
202    // System errors
203    RuntimeError(String),
204    InternalError(String),
205
206    // Serialization/deserialization errors
207    SerializationError(String),
208}
209
210// Allow converting from TheaterRuntimeError to ManagementError
211impl From<TheaterRuntimeError> for ManagementError {
212    fn from(err: TheaterRuntimeError) -> Self {
213        match err {
214            TheaterRuntimeError::ActorNotFound(_) => ManagementError::ActorNotFound,
215            TheaterRuntimeError::ActorAlreadyExists(_) => ManagementError::ActorAlreadyExists,
216            TheaterRuntimeError::ActorNotRunning(_) => ManagementError::ActorNotRunning,
217            TheaterRuntimeError::ActorOperationFailed(msg) => {
218                ManagementError::RuntimeError(format!("Actor operation failed: {}", msg))
219            }
220            TheaterRuntimeError::ActorError(e) => ManagementError::ActorError(e.to_string()),
221            TheaterRuntimeError::ChannelError(msg) => ManagementError::CommunicationError(msg),
222            TheaterRuntimeError::ChannelNotFound(_) => ManagementError::ChannelNotFound,
223            TheaterRuntimeError::ChannelRejected => ManagementError::ChannelRejected,
224            TheaterRuntimeError::SerializationError(msg) => {
225                ManagementError::SerializationError(msg)
226            }
227            TheaterRuntimeError::InternalError(msg) => ManagementError::InternalError(msg),
228        }
229    }
230}
231
232// Implement Display for ManagementError to provide better error messages
233impl std::fmt::Display for ManagementError {
234    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
235        match self {
236            ManagementError::ActorNotFound => write!(f, "Actor not found"),
237            ManagementError::ActorAlreadyExists => write!(f, "Actor already exists"),
238            ManagementError::ActorNotRunning => write!(f, "Actor is not running"),
239            ManagementError::ActorError(msg) => write!(f, "Actor error: {}", msg),
240            ManagementError::ChannelNotFound => write!(f, "Channel not found"),
241            ManagementError::ChannelClosed => write!(f, "Channel is closed"),
242            ManagementError::ChannelRejected => write!(f, "Channel was rejected"),
243            ManagementError::StoreError(msg) => write!(f, "Store error: {}", msg),
244            ManagementError::CommunicationError(msg) => write!(f, "Communication error: {}", msg),
245            ManagementError::InvalidRequest(msg) => write!(f, "Invalid request: {}", msg),
246            ManagementError::Timeout => write!(f, "Operation timed out"),
247            ManagementError::RuntimeError(msg) => write!(f, "Runtime error: {}", msg),
248            ManagementError::InternalError(msg) => write!(f, "Internal error: {}", msg),
249            ManagementError::SerializationError(msg) => write!(f, "Serialization error: {}", msg),
250        }
251    }
252}
253
254// Implement Error trait for ManagementError
255impl std::error::Error for ManagementError {}
256
257#[derive(Debug)]
258#[allow(dead_code)]
259struct Subscription {
260    id: Uuid,
261    client_tx: mpsc::Sender<ManagementResponse>,
262}
263
264impl Eq for Subscription {}
265impl PartialEq for Subscription {
266    fn eq(&self, other: &Self) -> bool {
267        self.id == other.id
268    }
269}
270impl std::hash::Hash for Subscription {
271    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
272        self.id.hash(state);
273    }
274}
275
// ChannelEvent is imported from theater::messages (see the `use` list at the top of this file).
277
278// Structure to track active channel subscriptions
279#[derive(Debug)]
280#[allow(dead_code)]
281struct ChannelSubscription {
282    channel_id: String,
283    initiator_id: ChannelParticipant,
284    target_id: ChannelParticipant,
285    client_tx: mpsc::Sender<ManagementResponse>,
286}
287
288pub struct TheaterServer {
289    runtime: TheaterRuntime,
290    theater_tx: mpsc::Sender<TheaterCommand>,
291    management_socket: TcpListener,
292    subscriptions: Arc<Mutex<HashMap<TheaterId, HashSet<Subscription>>>>,
293    // Field to track channel subscriptions
294    channel_subscriptions: Arc<Mutex<HashMap<String, ChannelSubscription>>>,
295    // Channel for runtime to send channel events back to server
296    #[allow(dead_code)]
297    channel_events_tx: mpsc::Sender<ChannelEvent>,
298}
299
300impl TheaterServer {
301    // Process channel events and forward them to subscribed clients
302    async fn process_channel_events(
303        mut channel_events_rx: mpsc::Receiver<ChannelEvent>,
304        channel_subscriptions: Arc<Mutex<HashMap<String, ChannelSubscription>>>,
305    ) {
306        while let Some(event) = channel_events_rx.recv().await {
307            match event {
308                ChannelEvent::Message {
309                    channel_id,
310                    sender_id,
311                    message,
312                } => {
313                    tracing::debug!("Received channel message for {}", channel_id);
314                    // Forward to subscribed clients
315                    let subs = channel_subscriptions.lock().await;
316                    if let Some(sub) = subs.get(&channel_id.0) {
317                        let response = ManagementResponse::ChannelMessage {
318                            channel_id: channel_id.0.clone(),
319                            sender_id,
320                            message,
321                        };
322
323                        tracing::debug!("Forwarding channel message to client: {:?}", response);
324
325                        if let Err(e) = sub.client_tx.send(response).await {
326                            tracing::warn!("Failed to forward channel message: {}", e);
327                        } else {
328                            tracing::debug!("Forwarded channel message to client");
329                        }
330                    }
331                }
332                ChannelEvent::Close { channel_id } => {
333                    tracing::debug!("Received channel close event for {}", channel_id);
334                    // Forward to subscribed clients
335                    let mut subs = channel_subscriptions.lock().await;
336                    if let Some(sub) = subs.remove(&channel_id.0) {
337                        let response = ManagementResponse::ChannelClosed {
338                            channel_id: channel_id.0.clone(),
339                        };
340
341                        if let Err(e) = sub.client_tx.send(response).await {
342                            tracing::warn!("Failed to forward channel close event: {}", e);
343                        } else {
344                            tracing::debug!("Forwarded channel close event to client");
345                        }
346                    }
347                }
348            }
349        }
350    }
351
352    pub async fn new(address: std::net::SocketAddr) -> Result<Self> {
353        let (theater_tx, theater_rx) = mpsc::channel(32);
354
355        // Create channel for runtime to send channel events back to server
356        let (channel_events_tx, channel_events_rx) = mpsc::channel(32);
357
358        // Pass channel_events_tx to runtime during initialization
359        let runtime = TheaterRuntime::new(
360            theater_tx.clone(),
361            theater_rx,
362            Some(channel_events_tx.clone()),
363            theater::config::permissions::HandlerPermission::root(), // Root permissions for server
364        )
365        .await?;
366        let management_socket = TcpListener::bind(address).await?;
367
368        let channel_subscriptions = Arc::new(Mutex::new(HashMap::new()));
369
370        // Start task to process channel events
371        let channel_subs_clone = channel_subscriptions.clone();
372        tokio::spawn(async move {
373            Self::process_channel_events(channel_events_rx, channel_subs_clone).await;
374        });
375
376        Ok(Self {
377            runtime,
378            theater_tx,
379            management_socket,
380            subscriptions: Arc::new(Mutex::new(HashMap::new())),
381            channel_subscriptions,
382            channel_events_tx,
383        })
384    }
385
386    pub async fn run(mut self) -> Result<()> {
387        info!(
388            "Theater server starting on {:?}",
389            self.management_socket.local_addr()?
390        );
391
392        // Start the theater runtime in its own task
393        let runtime_handle = tokio::spawn(async move {
394            match self.runtime.run().await {
395                Ok(_) => Ok(()),
396                Err(e) => {
397                    error!("Theater runtime failed: {}", e);
398                    Err(e)
399                }
400            }
401        });
402
403        // Accept and handle management connections
404        while let Ok((socket, addr)) = self.management_socket.accept().await {
405            info!("New management connection from {}", addr);
406            let runtime_tx = self.theater_tx.clone();
407            let subscriptions = self.subscriptions.clone();
408            let channel_subscriptions = self.channel_subscriptions.clone();
409
410            tokio::spawn(async move {
411                if let Err(e) = Self::handle_management_connection(
412                    socket,
413                    runtime_tx,
414                    subscriptions,
415                    channel_subscriptions,
416                )
417                .await
418                {
419                    error!("Error handling management connection: {}", e);
420                }
421            });
422        }
423
424        runtime_handle.await??;
425        Ok(())
426    }
427
428    async fn handle_management_connection(
429        socket: TcpStream,
430        runtime_tx: mpsc::Sender<TheaterCommand>,
431        subscriptions: Arc<Mutex<HashMap<TheaterId, HashSet<Subscription>>>>,
432        channel_subscriptions: Arc<Mutex<HashMap<String, ChannelSubscription>>>,
433    ) -> Result<()> {
434        // Create a channel for sending responses to this client
435        let (client_tx, mut client_rx) = mpsc::channel::<ManagementResponse>(32);
436
437        let codec = FragmentingCodec::new();
438        let framed = Framed::new(socket, codec);
439
440        // Split the framed connection into read and write parts
441        let (mut framed_sink, mut framed_stream) = framed.split();
442
443        // Clone the client_tx for use in the command loop
444        let cmd_client_tx = client_tx.clone();
445
446        // Start a task to forward responses to the client
447        let _response_task = tokio::spawn(async move {
448            while let Some(response) = client_rx.recv().await {
449                match serde_json::to_vec(&response) {
450                    Ok(data) => {
451                        if let Err(e) = framed_sink.send(Bytes::from(data)).await {
452                            debug!("Error sending response to client: {}", e);
453                            break;
454                        }
455                    }
456                    Err(e) => {
457                        error!("Error serializing response: {}", e);
458                    }
459                }
460            }
461            debug!("Response forwarder for client closed");
462        });
463
464        // Store active subscriptions for this connection to clean up on disconnect
465        let mut connection_subscriptions: Vec<(TheaterId, Uuid)> = Vec::new();
466
467        // Store active channel subscriptions for cleanup
468        let mut connection_channel_subscriptions: Vec<String> = Vec::new();
469
470        // Loop until connection closes or an error occurs
471        'connection: while let Some(msg) = framed_stream.next().await {
472            debug!("Received management message");
473            let msg = match msg {
474                Ok(m) => m,
475                Err(e) => {
476                    error!("Error receiving message: {}", e);
477                    break 'connection;
478                }
479            };
480
481            let cmd = match serde_json::from_slice::<ManagementCommand>(&msg) {
482                Ok(c) => c,
483                Err(e) => {
484                    error!(
485                        "Error parsing command: {} {}",
486                        e,
487                        String::from_utf8_lossy(&msg)
488                    );
489                    continue;
490                }
491            };
492            debug!("Parsed command: {:?}", cmd);
493
494            // Store the command for reference (used for subscription tracking)
495            let _cmd_clone = cmd.clone();
496
497            let response = match cmd {
498                ManagementCommand::StartActor {
499                    manifest,
500                    initial_state,
501                    parent,
502                    subscribe,
503                } => {
504                    info!("Starting actor from manifest: {:?}", manifest);
505                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
506                    debug!("Sending SpawnActor command to runtime");
507                    let supervisor_tx = if parent {
508                        let (supervisor_tx, mut supervisor_rx) = mpsc::channel(32);
509                        let cmd_client_tx = cmd_client_tx.clone();
510                        tokio::spawn(async move {
511                            while let Some(res) = supervisor_rx.recv().await {
512                                debug!("Received supervisor response: {:?}", res);
513                                if let Err(e) = cmd_client_tx
514                                    .send(ManagementResponse::ActorResult(res))
515                                    .await
516                                {
517                                    error!("Failed to send supervisor response: {}", e);
518                                    break;
519                                }
520                            }
521                        });
522                        Some(supervisor_tx)
523                    } else {
524                        None
525                    };
526                    let subscription_tx = if subscribe {
527                        let (event_tx, mut event_rx) = mpsc::channel(32);
528
529                        // set up a task to forward events to the client
530                        let cmd_client_tx = cmd_client_tx.clone();
531                        tokio::spawn(async move {
532                            while let Some(event) = event_rx.recv().await {
533                                debug!("Received event for subscription");
534                                let response = match event {
535                                    Ok(event) => ManagementResponse::ActorEvent { event },
536                                    Err(e) => ManagementResponse::ActorError { error: e },
537                                };
538                                if let Err(e) = cmd_client_tx.send(response).await {
539                                    debug!("Failed to forward event to client: {}", e);
540                                    break;
541                                }
542                            }
543                            debug!("Event forwarder for subscription stopped");
544                        });
545
546                        Some(event_tx)
547                    } else {
548                        None
549                    };
550                    match runtime_tx
551                        .send(TheaterCommand::SpawnActor {
552                            manifest_path: manifest.clone(),
553                            init_bytes: initial_state,
554                            response_tx: cmd_tx,
555                            parent_id: None,
556                            supervisor_tx,
557                            subscription_tx,
558                        })
559                        .await
560                    {
561                        Ok(_) => {
562                            debug!("SpawnActor command sent to runtime, awaiting response");
563                            match cmd_rx.await {
564                                Ok(result) => match result {
565                                    Ok(actor_id) => {
566                                        info!("Actor started with ID: {:?}", actor_id);
567                                        ManagementResponse::ActorStarted { id: actor_id }
568                                    }
569                                    Err(e) => {
570                                        error!("Runtime failed to start actor: {}", e);
571                                        ManagementResponse::Error {
572                                            error: ManagementError::RuntimeError(format!(
573                                                "Failed to start actor: {}",
574                                                e
575                                            )),
576                                        }
577                                    }
578                                },
579                                Err(e) => {
580                                    error!("Failed to receive spawn response: {}", e);
581                                    ManagementResponse::Error {
582                                        error: ManagementError::CommunicationError(format!(
583                                            "Failed to receive spawn response: {}",
584                                            e
585                                        )),
586                                    }
587                                }
588                            }
589                        }
590                        Err(e) => {
591                            error!("Failed to send SpawnActor command: {}", e);
592                            ManagementResponse::Error {
593                                error: ManagementError::CommunicationError(format!(
594                                    "Failed to send spawn command: {}",
595                                    e
596                                )),
597                            }
598                        }
599                    }
600                }
601                ManagementCommand::StopActor { id } => {
602                    info!("Stopping actor: {:?}", id);
603                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
604                    runtime_tx
605                        .send(TheaterCommand::StopActor {
606                            actor_id: id.clone(),
607                            response_tx: cmd_tx,
608                        })
609                        .await?;
610
611                    match cmd_rx.await? {
612                        Ok(_) => {
613                            subscriptions.lock().await.remove(&id);
614                            ManagementResponse::ActorStopped { id }
615                        }
616                        Err(e) => ManagementResponse::Error {
617                            error: ManagementError::RuntimeError(format!(
618                                "Failed to stop actor: {}",
619                                e
620                            )),
621                        },
622                    }
623                }
624                ManagementCommand::TerminateActor { id } => {
625                    info!("Terminating actor: {:?}", id);
626                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
627                    runtime_tx
628                        .send(TheaterCommand::TerminateActor {
629                            actor_id: id.clone(),
630                            response_tx: cmd_tx,
631                        })
632                        .await?;
633
634                    match cmd_rx.await? {
635                        Ok(_) => {
636                            subscriptions.lock().await.remove(&id);
637                            ManagementResponse::ActorStopped { id }
638                        }
639                        Err(e) => ManagementResponse::Error {
640                            error: ManagementError::RuntimeError(format!(
641                                "Failed to terminate actor: {}",
642                                e
643                            )),
644                        },
645                    }
646                }
647                ManagementCommand::ListActors => {
648                    debug!("Listing actors");
649                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
650                    runtime_tx
651                        .send(TheaterCommand::GetActors {
652                            response_tx: cmd_tx,
653                        })
654                        .await?;
655
656                    match cmd_rx.await? {
657                        Ok(actors) => {
658                            info!("Found {} actors", actors.len());
659                            ManagementResponse::ActorList { actors }
660                        }
661                        Err(e) => ManagementResponse::Error {
662                            error: ManagementError::RuntimeError(format!(
663                                "Failed to list actors: {}",
664                                e
665                            )),
666                        },
667                    }
668                }
669                ManagementCommand::SubscribeToActor { id } => {
670                    info!("New subscription request for actor: {:?}", id);
671                    let subscription_id = Uuid::new_v4();
672                    let subscription = Subscription {
673                        id: subscription_id,
674                        client_tx: cmd_client_tx.clone(),
675                    };
676
677                    debug!("Subscription created with ID: {}", subscription_id);
678
679                    // Register the subscription in the global map
680                    subscriptions
681                        .lock()
682                        .await
683                        .entry(id.clone())
684                        .or_default()
685                        .insert(subscription);
686
687                    // Set up the event channel for the subscription
688                    let (event_tx, mut event_rx) = mpsc::channel(32);
689                    runtime_tx
690                        .send(TheaterCommand::SubscribeToActor {
691                            actor_id: id.clone(),
692                            event_tx,
693                        })
694                        .await
695                        .map_err(|e| anyhow::anyhow!("Failed to subscribe: {}", e))?;
696
697                    // Add to the list of subscriptions for this connection
698                    connection_subscriptions.push((id.clone(), subscription_id));
699
700                    // Create a task to forward events to this client
701                    let client_tx_clone = cmd_client_tx.clone();
702                    tokio::spawn(async move {
703                        debug!(
704                            "Starting event forwarder for subscription {}",
705                            subscription_id
706                        );
707                        while let Some(event) = event_rx.recv().await {
708                            debug!("Received event for subscription {}", subscription_id);
709                            let response = match event {
710                                Ok(event) => ManagementResponse::ActorEvent { event },
711                                Err(e) => ManagementResponse::ActorError { error: e },
712                            };
713                            if let Err(e) = client_tx_clone.send(response).await {
714                                debug!("Failed to forward event to client: {}", e);
715                                break;
716                            }
717                        }
718                        debug!(
719                            "Event forwarder for subscription {} stopped",
720                            subscription_id
721                        );
722                    });
723
724                    ManagementResponse::Subscribed {
725                        id,
726                        subscription_id,
727                    }
728                }
729                ManagementCommand::UnsubscribeFromActor {
730                    id,
731                    subscription_id,
732                } => {
733                    debug!(
734                        "Removing subscription {} for actor {:?}",
735                        subscription_id, id
736                    );
737
738                    // Remove subscription from the tracking list for this connection
739                    connection_subscriptions
740                        .retain(|(aid, sid)| *aid != id || *sid != subscription_id);
741
742                    // Remove from the global subscriptions map
743                    let mut subs = subscriptions.lock().await;
744                    if let Some(actor_subs) = subs.get_mut(&id) {
745                        actor_subs.retain(|sub| sub.id != subscription_id);
746
747                        // Remove the entry if no subscriptions remain
748                        if actor_subs.is_empty() {
749                            subs.remove(&id);
750                        }
751                    }
752
753                    debug!("Subscription removed");
754                    ManagementResponse::Unsubscribed { id }
755                }
756                ManagementCommand::SendActorMessage { id, data } => {
757                    info!("Sending message to actor: {:?}", id);
758                    runtime_tx
759                        .send(TheaterCommand::SendMessage {
760                            actor_id: id.clone(),
761                            actor_message: ActorMessage::Send(ActorSend { data: data.clone() }),
762                        })
763                        .await?;
764
765                    ManagementResponse::SentMessage { id }
766                }
767                ManagementCommand::RequestActorMessage { id, data } => {
768                    info!("Requesting message from actor: {:?}", id);
769                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
770                    runtime_tx
771                        .send(TheaterCommand::SendMessage {
772                            actor_id: id.clone(),
773                            actor_message: ActorMessage::Request(ActorRequest {
774                                data: data.clone(),
775                                response_tx: cmd_tx,
776                            }),
777                        })
778                        .await?;
779
780                    let response = cmd_rx.await?;
781                    ManagementResponse::RequestedMessage {
782                        id,
783                        message: response,
784                    }
785                }
786                ManagementCommand::GetActorManifest { id } => {
787                    info!("Getting manifest for actor: {:?}", id);
788                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
789                    runtime_tx
790                        .send(TheaterCommand::GetActorManifest {
791                            actor_id: id.clone(),
792                            response_tx: cmd_tx,
793                        })
794                        .await?;
795
796                    let manifest = cmd_rx.await?;
797                    ManagementResponse::ActorManifest {
798                        id,
799                        manifest: manifest?,
800                    }
801                }
802                ManagementCommand::GetActorStatus { id } => {
803                    info!("Getting status for actor: {:?}", id);
804                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
805                    runtime_tx
806                        .send(TheaterCommand::GetActorStatus {
807                            actor_id: id.clone(),
808                            response_tx: cmd_tx,
809                        })
810                        .await?;
811
812                    let status = cmd_rx.await?;
813                    ManagementResponse::ActorStatus {
814                        id,
815                        status: status?,
816                    }
817                }
818                ManagementCommand::RestartActor { id } => {
819                    info!("Restarting actor: {:?}", id);
820                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
821                    runtime_tx
822                        .send(TheaterCommand::RestartActor {
823                            actor_id: id.clone(),
824                            response_tx: cmd_tx,
825                        })
826                        .await?;
827
828                    match cmd_rx.await? {
829                        Ok(_) => ManagementResponse::Restarted { id },
830                        Err(e) => ManagementResponse::Error {
831                            error: ManagementError::RuntimeError(format!(
832                                "Failed to restart actor: {}",
833                                e
834                            )),
835                        },
836                    }
837                }
838                ManagementCommand::GetActorState { id } => {
839                    info!("Getting state for actor: {:?}", id);
840                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
841                    runtime_tx
842                        .send(TheaterCommand::GetActorState {
843                            actor_id: id.clone(),
844                            response_tx: cmd_tx,
845                        })
846                        .await?;
847
848                    let state = cmd_rx.await?;
849                    ManagementResponse::ActorState { id, state: state? }
850                }
851                ManagementCommand::GetActorEvents { id } => {
852                    info!("Getting events for actor: {:?}", id);
853                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
854                    runtime_tx
855                        .send(TheaterCommand::GetActorEvents {
856                            actor_id: id.clone(),
857                            response_tx: cmd_tx,
858                        })
859                        .await?;
860
861                    match cmd_rx.await {
862                        Ok(result) => match result {
863                            Ok(events) => {
864                                debug!(
865                                    "Successfully retrieved {} events for actor {}",
866                                    events.len(),
867                                    id
868                                );
869                                ManagementResponse::ActorEvents { id, events }
870                            }
871                            Err(e) => {
872                                debug!("Error getting events for actor {}: {}", id, e);
873                                ManagementResponse::Error { error: e.into() }
874                            }
875                        },
876                        Err(e) => {
877                            error!("Failed to receive events response: {}", e);
878                            ManagementResponse::Error {
879                                error: ManagementError::CommunicationError(format!(
880                                    "Failed to receive events response: {}",
881                                    e
882                                )),
883                            }
884                        }
885                    }
886                }
887                ManagementCommand::GetActorMetrics { id } => {
888                    info!("Getting metrics for actor: {:?}", id);
889                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
890                    runtime_tx
891                        .send(TheaterCommand::GetActorMetrics {
892                            actor_id: id.clone(),
893                            response_tx: cmd_tx,
894                        })
895                        .await?;
896
897                    let metrics = cmd_rx.await?;
898                    ManagementResponse::ActorMetrics {
899                        id,
900                        metrics: serde_json::to_value(metrics?)?,
901                    }
902                }
903                ManagementCommand::UpdateActorComponent { id, component } => {
904                    info!("Updating component for actor {:?} to {}", id, component);
905                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
906                    runtime_tx
907                        .send(TheaterCommand::UpdateActorComponent {
908                            actor_id: id.clone(),
909                            component: component.clone(),
910                            response_tx: cmd_tx,
911                        })
912                        .await?;
913
914                    match cmd_rx.await? {
915                        Ok(_) => ManagementResponse::ActorComponentUpdated { id },
916                        Err(e) => ManagementResponse::Error {
917                            error: ManagementError::RuntimeError(format!(
918                                "Failed to update actor component: {}",
919                                e
920                            )),
921                        },
922                    }
923                }
924                // Handle channel management commands
925                ManagementCommand::OpenChannel {
926                    actor_id,
927                    initial_message,
928                } => {
929                    info!("Opening channel to actor: {:?}", actor_id);
930
931                    // Create a response channel
932                    let (response_tx, response_rx) = tokio::sync::oneshot::channel();
933
934                    // Generate a channel ID
935                    let client_id = ChannelParticipant::External;
936                    let channel_id = ChannelId::new(&client_id, &actor_id);
937                    let channel_id_str = channel_id.0.clone();
938
939                    // Send the channel open command to the runtime
940                    runtime_tx
941                        .send(TheaterCommand::ChannelOpen {
942                            initiator_id: client_id.clone(),
943                            target_id: actor_id.clone(),
944                            channel_id: channel_id.clone(),
945                            initial_message,
946                            response_tx,
947                        })
948                        .await
949                        .map_err(|e| {
950                            anyhow::anyhow!("Failed to send channel open command: {}", e)
951                        })?;
952
953                    // Wait for the response
954                    match response_rx.await {
955                        Ok(result) => {
956                            match result {
957                                Ok(accepted) => {
958                                    if accepted {
959                                        // Channel opened successfully
960                                        info!("Channel opened successfully: {}", channel_id_str);
961
962                                        // Register the channel subscription to receive messages
963                                        let channel_sub = ChannelSubscription {
964                                            channel_id: channel_id_str.clone(),
965                                            initiator_id: client_id.clone(),
966                                            target_id: actor_id.clone(),
967                                            client_tx: cmd_client_tx.clone(),
968                                        };
969
970                                        channel_subscriptions
971                                            .lock()
972                                            .await
973                                            .insert(channel_id_str.clone(), channel_sub);
974
975                                        // Track this channel for cleanup on disconnect
976                                        connection_channel_subscriptions
977                                            .push(channel_id_str.clone());
978
979                                        ManagementResponse::ChannelOpened {
980                                            channel_id: channel_id_str,
981                                            actor_id,
982                                        }
983                                    } else {
984                                        // Channel rejected by target
985                                        ManagementResponse::Error {
986                                            error: ManagementError::ChannelRejected,
987                                        }
988                                    }
989                                }
990                                Err(e) => ManagementResponse::Error {
991                                    error: ManagementError::RuntimeError(format!(
992                                        "Error opening channel: {}",
993                                        e
994                                    )),
995                                },
996                            }
997                        }
998                        Err(e) => ManagementResponse::Error {
999                            error: ManagementError::CommunicationError(format!(
1000                                "Failed to receive channel open response: {}",
1001                                e
1002                            )),
1003                        },
1004                    }
1005                }
1006                ManagementCommand::SendOnChannel {
1007                    channel_id,
1008                    message,
1009                } => {
1010                    info!("Sending message on channel: {}", channel_id);
1011
1012                    // Wrap the raw channel ID string in a ChannelId (no parsing or validation happens here)
1013                    let channel_id_parsed = ChannelId(channel_id.clone());
1014                    let client_id = ChannelParticipant::External;
1015
1016                    // Send the message on the channel
1017                    runtime_tx
1018                        .send(TheaterCommand::ChannelMessage {
1019                            channel_id: channel_id_parsed,
1020                            message,
1021                            sender_id: client_id,
1022                        })
1023                        .await
1024                        .map_err(|e| anyhow::anyhow!("Failed to send message on channel: {}", e))?;
1025
1026                    ManagementResponse::MessageSent { channel_id }
1027                }
1028                ManagementCommand::CloseChannel { channel_id } => {
1029                    info!("Closing channel: {}", channel_id);
1030
1031                    // Wrap the raw channel ID string in a ChannelId (no parsing or validation happens here)
1032                    let channel_id_parsed = ChannelId(channel_id.clone());
1033
1034                    // Close the channel
1035                    runtime_tx
1036                        .send(TheaterCommand::ChannelClose {
1037                            channel_id: channel_id_parsed,
1038                        })
1039                        .await
1040                        .map_err(|e| anyhow::anyhow!("Failed to close channel: {}", e))?;
1041
1042                    // Remove from channel subscriptions
1043                    channel_subscriptions.lock().await.remove(&channel_id);
1044                    connection_channel_subscriptions.retain(|id| id != &channel_id);
1045
1046                    ManagementResponse::ChannelClosed { channel_id }
1047                }
1048                ManagementCommand::NewStore {} => {
1049                    info!("Creating new store");
1050                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1051                    runtime_tx
1052                        .send(TheaterCommand::NewStore {
1053                            response_tx: cmd_tx,
1054                        })
1055                        .await?;
1056
1057                    let store_id = cmd_rx.await?;
1058                    ManagementResponse::StoreCreated {
1059                        store_id: store_id?.id,
1060                    }
1061                }
1062            };
1063
1064            debug!("Sending response: {:?}", response);
1065            if let Err(e) = client_tx.send(response).await {
1066                error!("Failed to send response: {}", e);
1067                break;
1068            }
1069            debug!("Response sent");
1070        }
1071
1072        // Clean up all subscriptions for this connection
1073        debug!(
1074            "Connection closed, cleaning up {} subscriptions",
1075            connection_subscriptions.len()
1076        );
1077        let mut subs = subscriptions.lock().await;
1078
1079        for (actor_id, sub_id) in connection_subscriptions {
1080            if let Some(actor_subs) = subs.get_mut(&actor_id) {
1081                actor_subs.retain(|sub| sub.id != sub_id);
1082
1083                // Remove the entry if no subscriptions remain
1084                if actor_subs.is_empty() {
1085                    subs.remove(&actor_id);
1086                }
1087            }
1088        }
1089
1090        // Clean up channel subscriptions
1091        debug!(
1092            "Connection closed, cleaning up {} channel subscriptions",
1093            connection_channel_subscriptions.len()
1094        );
1095        let mut channel_subs = channel_subscriptions.lock().await;
1096
1097        for channel_id in connection_channel_subscriptions {
1098            channel_subs.remove(&channel_id);
1099        }
1100
1101        debug!("Cleaned up all subscriptions for the connection");
1102        Ok(())
1103    }
1104}