theater_server/
server.rs

1use anyhow::Result;
2use bytes::Bytes;
3use futures::sink::SinkExt;
4use futures::stream::StreamExt;
5use serde::{Deserialize, Serialize};
6use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8use theater::messages::{
9    ActorMessage, ActorRequest, ActorResult, ActorSend, ActorStatus, ChannelEvent,
10    ChannelParticipant,
11};
12use theater::{ActorError, ChainEvent, ManifestConfig};
13use tokio::net::{TcpListener, TcpStream};
14use tokio::sync::{mpsc, Mutex};
15use tokio_util::codec::Framed;
16use tracing::{debug, error, info};
17use uuid::Uuid;
18
19use theater::id::TheaterId;
20use theater::messages::{ChannelId, TheaterCommand};
21use theater::theater_runtime::TheaterRuntime;
22use theater::TheaterRuntimeError;
23
24use crate::fragmenting_codec::FragmentingCodec;
25
/// Commands a management client can send to the server.
///
/// Each incoming frame is decoded from JSON into one of these variants by
/// `TheaterServer::handle_management_connection`, which answers with a
/// `ManagementResponse`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ManagementCommand {
    /// Spawn a new actor. Answered with `ActorStarted` on success.
    StartActor {
        /// Forwarded to the runtime as the `manifest_path` of `SpawnActor`.
        manifest: String,
        /// Forwarded to the runtime as the actor's `init_bytes`.
        initial_state: Option<Vec<u8>>,
        /// When true, the connection receives the actor's supervisor results
        /// as `ManagementResponse::ActorResult` messages.
        parent: bool,
        /// When true, the actor's events are streamed to the connection as
        /// `ActorEvent` / `ActorError` responses.
        subscribe: bool,
    },
    /// Stop an actor. On success the server drops all subscriptions recorded
    /// for it and answers with `ActorStopped`.
    StopActor {
        id: TheaterId,
    },
    /// Terminate an actor. Handled like `StopActor` but sent to the runtime as
    /// `TerminateActor`; also answered with `ActorStopped`.
    TerminateActor {
        id: TheaterId,
    },
    /// List the actors known to the runtime. Answered with `ActorList`.
    ListActors,
    /// Start streaming an actor's events to this connection. Answered with
    /// `Subscribed`, which carries the generated subscription id.
    SubscribeToActor {
        id: TheaterId,
    },
    /// Remove a subscription from the server's bookkeeping. Answered with
    /// `Unsubscribed`.
    UnsubscribeFromActor {
        id: TheaterId,
        /// The id returned in the matching `Subscribed` response.
        subscription_id: Uuid,
    },
    /// Send a one-way message to an actor. Answered with `SentMessage` as soon
    /// as the message has been handed to the runtime.
    SendActorMessage {
        id: TheaterId,
        data: Vec<u8>,
    },
    /// Send a request to an actor and wait for its reply. Answered with
    /// `RequestedMessage` carrying the reply bytes.
    RequestActorMessage {
        id: TheaterId,
        data: Vec<u8>,
    },
    /// Fetch an actor's manifest. Answered with `ActorManifest`.
    GetActorManifest {
        id: TheaterId,
    },
    /// Fetch an actor's status. Answered with `ActorStatus`.
    GetActorStatus {
        id: TheaterId,
    },
    /// Restart an actor. Answered with `Restarted` on success.
    RestartActor {
        id: TheaterId,
    },
    /// Fetch an actor's current state. Answered with `ActorState`.
    GetActorState {
        id: TheaterId,
    },
    /// Fetch an actor's events from the runtime.
    // NOTE(review): presumably answered with `ActorEvents` - confirm in the handler.
    GetActorEvents {
        id: TheaterId,
    },
    // NOTE(review): presumably answered with `ActorMetrics` - confirm in the handler.
    GetActorMetrics {
        id: TheaterId,
    },
    // NOTE(review): presumably answered with `ActorComponentUpdated` - confirm in the handler.
    UpdateActorComponent {
        id: TheaterId,
        component: String,
    },
    // Channel management commands
    // NOTE(review): the matching responses are presumably `ChannelOpened`,
    // `MessageSent` and `ChannelClosed` - confirm in the handler.
    OpenChannel {
        actor_id: ChannelParticipant,
        initial_message: Vec<u8>,
    },
    SendOnChannel {
        channel_id: String,
        message: Vec<u8>,
    },
    CloseChannel {
        channel_id: String,
    },

    // Store commands
    // NOTE(review): presumably answered with `StoreCreated` - confirm in the handler.
    NewStore {},
}
94
/// Messages the server sends to a management client.
///
/// A response is either the direct answer to a `ManagementCommand` or an
/// unsolicited push (actor events, supervisor results, channel traffic). The
/// per-connection response task serializes each value to JSON before writing
/// it to the socket.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ManagementResponse {
    /// The actor was spawned; `id` is the id reported by the runtime.
    ActorStarted {
        id: TheaterId,
    },
    /// Answer to both `StopActor` and `TerminateActor` on success.
    ActorStopped {
        id: TheaterId,
    },
    /// Answer to `ListActors`.
    ActorList {
        actors: Vec<(TheaterId, String)>,
    },
    /// Answer to `SubscribeToActor`; `subscription_id` identifies the new
    /// subscription in a later `UnsubscribeFromActor`.
    Subscribed {
        id: TheaterId,
        subscription_id: Uuid,
    },
    /// Answer to `UnsubscribeFromActor`.
    Unsubscribed {
        id: TheaterId,
    },
    /// Pushed for every successful item received on an event subscription.
    ActorEvent {
        event: ChainEvent,
    },
    /// Pushed for every result received on the supervisor channel of an actor
    /// started with `parent: true`.
    ActorResult(ActorResult),
    /// Pushed when an event subscription yields an error instead of an event.
    ActorError {
        error: ActorError,
    },
    /// A command could not be completed.
    Error {
        error: ManagementError,
    },
    /// Answer to `RequestActorMessage`; `message` is the actor's reply.
    RequestedMessage {
        id: TheaterId,
        message: Vec<u8>,
    },
    /// Answer to `SendActorMessage`.
    SentMessage {
        id: TheaterId,
    },
    /// Answer to `GetActorStatus`.
    ActorStatus {
        id: TheaterId,
        status: ActorStatus,
    },
    /// Answer to `RestartActor` on success.
    Restarted {
        id: TheaterId,
    },
    /// Answer to `GetActorManifest`.
    ActorManifest {
        id: TheaterId,
        manifest: ManifestConfig,
    },
    /// Answer to `GetActorState`.
    ActorState {
        id: TheaterId,
        state: Option<Vec<u8>>,
    },
    // NOTE(review): presumably the answer to `GetActorEvents` - confirm in the handler.
    ActorEvents {
        id: TheaterId,
        events: Vec<ChainEvent>,
    },
    // NOTE(review): presumably the answer to `GetActorMetrics` - confirm in the handler.
    ActorMetrics {
        id: TheaterId,
        metrics: serde_json::Value,
    },
    // NOTE(review): presumably the answer to `UpdateActorComponent` - confirm in the handler.
    ActorComponentUpdated {
        id: TheaterId,
    },
    // Channel management responses
    // NOTE(review): `ChannelOpened` and `MessageSent` are presumably the answers
    // to `OpenChannel` and `SendOnChannel` - confirm in the handler.
    ChannelOpened {
        channel_id: String,
        actor_id: ChannelParticipant,
    },
    MessageSent {
        channel_id: String,
    },
    /// Pushed by `process_channel_events` when the runtime reports a message
    /// on a channel that has a registered subscription.
    ChannelMessage {
        channel_id: String,
        sender_id: ChannelParticipant,
        message: Vec<u8>,
    },
    /// Pushed by `process_channel_events` when the runtime reports that a
    /// subscribed channel was closed.
    ChannelClosed {
        channel_id: String,
    },

    // Store responses
    // NOTE(review): presumably the answer to `NewStore` - confirm in the handler.
    StoreCreated {
        store_id: String,
    },
}
178
/// Error payload carried by `ManagementResponse::Error`.
///
/// Serializable so it can be sent to clients. Runtime errors are converted
/// into this type through `From<TheaterRuntimeError>`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ManagementError {
    // Actor-related errors
    ActorNotFound,
    ActorAlreadyExists,
    ActorNotRunning,
    /// String form of an error reported by an actor.
    ActorError(String),

    // Channel-related errors
    ChannelNotFound,
    ChannelClosed,
    ChannelRejected,

    // Store-related errors
    StoreError(String),

    // Communication errors
    /// Failure to exchange a message with the runtime; also used for the
    /// runtime's `ChannelError`.
    CommunicationError(String),

    // Request handling errors
    InvalidRequest(String),
    Timeout,

    // System errors
    /// The runtime reported that an operation failed.
    RuntimeError(String),
    InternalError(String),

    // Serialization/deserialization errors
    SerializationError(String),
}
209
210// Allow converting from TheaterRuntimeError to ManagementError
211impl From<TheaterRuntimeError> for ManagementError {
212    fn from(err: TheaterRuntimeError) -> Self {
213        match err {
214            TheaterRuntimeError::ActorNotFound(_) => ManagementError::ActorNotFound,
215            TheaterRuntimeError::ActorAlreadyExists(_) => ManagementError::ActorAlreadyExists,
216            TheaterRuntimeError::ActorNotRunning(_) => ManagementError::ActorNotRunning,
217            TheaterRuntimeError::ActorOperationFailed(msg) => {
218                ManagementError::RuntimeError(format!("Actor operation failed: {}", msg))
219            }
220            TheaterRuntimeError::ActorError(e) => ManagementError::ActorError(e.to_string()),
221            TheaterRuntimeError::ChannelError(msg) => ManagementError::CommunicationError(msg),
222            TheaterRuntimeError::ChannelNotFound(_) => ManagementError::ChannelNotFound,
223            TheaterRuntimeError::ChannelRejected => ManagementError::ChannelRejected,
224            TheaterRuntimeError::SerializationError(msg) => {
225                ManagementError::SerializationError(msg)
226            }
227            TheaterRuntimeError::InternalError(msg) => ManagementError::InternalError(msg),
228        }
229    }
230}
231
232#[derive(Debug)]
233#[allow(dead_code)]
234struct Subscription {
235    id: Uuid,
236    client_tx: mpsc::Sender<ManagementResponse>,
237}
238
239impl Eq for Subscription {}
240impl PartialEq for Subscription {
241    fn eq(&self, other: &Self) -> bool {
242        self.id == other.id
243    }
244}
245impl std::hash::Hash for Subscription {
246    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
247        self.id.hash(state);
248    }
249}
250
// ChannelEvent is imported from theater::messages.
252
/// Tracks the management client attached to one open channel.
///
/// Entries live in `TheaterServer::channel_subscriptions`, keyed by the
/// channel id string. `process_channel_events` uses `client_tx` to push
/// `ChannelMessage` and `ChannelClosed` responses to the client.
#[derive(Debug)]
#[allow(dead_code)]
struct ChannelSubscription {
    /// Id of the channel this entry belongs to.
    channel_id: String,
    // NOTE(review): `initiator_id` and `target_id` are not read in the code
    // visible here (hence `dead_code`); presumably the two channel endpoints.
    initiator_id: ChannelParticipant,
    target_id: ChannelParticipant,
    /// Sender for the response queue of the client that owns the channel.
    client_tx: mpsc::Sender<ManagementResponse>,
}
262
/// TCP management server wrapped around a `TheaterRuntime`.
///
/// Built with `TheaterServer::new` and driven by `TheaterServer::run`, which
/// consumes the server.
pub struct TheaterServer {
    /// The actor runtime; `run` moves it onto its own spawned task.
    runtime: TheaterRuntime,
    /// Command channel into the runtime; cloned for every management connection.
    theater_tx: mpsc::Sender<TheaterCommand>,
    /// Listener on which management clients connect.
    management_socket: TcpListener,
    /// Event subscriptions registered by clients, keyed by the observed actor.
    subscriptions: Arc<Mutex<HashMap<TheaterId, HashSet<Subscription>>>>,
    // Channel subscriptions keyed by channel id string; shared with the
    // `process_channel_events` task and with every connection handler.
    channel_subscriptions: Arc<Mutex<HashMap<String, ChannelSubscription>>>,
    // Channel for the runtime to send channel events back to the server.
    // `new` hands a clone of this sender to the runtime; this copy is not
    // read in the code visible here, hence `dead_code`.
    #[allow(dead_code)]
    channel_events_tx: mpsc::Sender<ChannelEvent>,
}
274
275impl TheaterServer {
276    // Process channel events and forward them to subscribed clients
277    async fn process_channel_events(
278        mut channel_events_rx: mpsc::Receiver<ChannelEvent>,
279        channel_subscriptions: Arc<Mutex<HashMap<String, ChannelSubscription>>>,
280    ) {
281        while let Some(event) = channel_events_rx.recv().await {
282            match event {
283                ChannelEvent::Message {
284                    channel_id,
285                    sender_id,
286                    message,
287                } => {
288                    tracing::debug!("Received channel message for {}", channel_id);
289                    // Forward to subscribed clients
290                    let subs = channel_subscriptions.lock().await;
291                    if let Some(sub) = subs.get(&channel_id.0) {
292                        let response = ManagementResponse::ChannelMessage {
293                            channel_id: channel_id.0.clone(),
294                            sender_id,
295                            message,
296                        };
297
298                        tracing::debug!("Forwarding channel message to client: {:?}", response);
299
300                        if let Err(e) = sub.client_tx.send(response).await {
301                            tracing::warn!("Failed to forward channel message: {}", e);
302                        } else {
303                            tracing::debug!("Forwarded channel message to client");
304                        }
305                    }
306                }
307                ChannelEvent::Close { channel_id } => {
308                    tracing::debug!("Received channel close event for {}", channel_id);
309                    // Forward to subscribed clients
310                    let mut subs = channel_subscriptions.lock().await;
311                    if let Some(sub) = subs.remove(&channel_id.0) {
312                        let response = ManagementResponse::ChannelClosed {
313                            channel_id: channel_id.0.clone(),
314                        };
315
316                        if let Err(e) = sub.client_tx.send(response).await {
317                            tracing::warn!("Failed to forward channel close event: {}", e);
318                        } else {
319                            tracing::debug!("Forwarded channel close event to client");
320                        }
321                    }
322                }
323            }
324        }
325    }
326
327    pub async fn new(address: std::net::SocketAddr) -> Result<Self> {
328        let (theater_tx, theater_rx) = mpsc::channel(32);
329
330        // Create channel for runtime to send channel events back to server
331        let (channel_events_tx, channel_events_rx) = mpsc::channel(32);
332
333        // Pass channel_events_tx to runtime during initialization
334        let runtime = TheaterRuntime::new(
335            theater_tx.clone(),
336            theater_rx,
337            Some(channel_events_tx.clone()),
338            theater::config::permissions::HandlerPermission::root(), // Root permissions for server
339        )
340        .await?;
341        let management_socket = TcpListener::bind(address).await?;
342
343        let channel_subscriptions = Arc::new(Mutex::new(HashMap::new()));
344
345        // Start task to process channel events
346        let channel_subs_clone = channel_subscriptions.clone();
347        tokio::spawn(async move {
348            Self::process_channel_events(channel_events_rx, channel_subs_clone).await;
349        });
350
351        Ok(Self {
352            runtime,
353            theater_tx,
354            management_socket,
355            subscriptions: Arc::new(Mutex::new(HashMap::new())),
356            channel_subscriptions,
357            channel_events_tx,
358        })
359    }
360
361    pub async fn run(mut self) -> Result<()> {
362        info!(
363            "Theater server starting on {:?}",
364            self.management_socket.local_addr()?
365        );
366
367        // Start the theater runtime in its own task
368        let runtime_handle = tokio::spawn(async move {
369            match self.runtime.run().await {
370                Ok(_) => Ok(()),
371                Err(e) => {
372                    error!("Theater runtime failed: {}", e);
373                    Err(e)
374                }
375            }
376        });
377
378        // Accept and handle management connections
379        while let Ok((socket, addr)) = self.management_socket.accept().await {
380            info!("New management connection from {}", addr);
381            let runtime_tx = self.theater_tx.clone();
382            let subscriptions = self.subscriptions.clone();
383            let channel_subscriptions = self.channel_subscriptions.clone();
384
385            tokio::spawn(async move {
386                if let Err(e) = Self::handle_management_connection(
387                    socket,
388                    runtime_tx,
389                    subscriptions,
390                    channel_subscriptions,
391                )
392                .await
393                {
394                    error!("Error handling management connection: {}", e);
395                }
396            });
397        }
398
399        runtime_handle.await??;
400        Ok(())
401    }
402
403    async fn handle_management_connection(
404        socket: TcpStream,
405        runtime_tx: mpsc::Sender<TheaterCommand>,
406        subscriptions: Arc<Mutex<HashMap<TheaterId, HashSet<Subscription>>>>,
407        channel_subscriptions: Arc<Mutex<HashMap<String, ChannelSubscription>>>,
408    ) -> Result<()> {
409        // Create a channel for sending responses to this client
410        let (client_tx, mut client_rx) = mpsc::channel::<ManagementResponse>(32);
411
412        let codec = FragmentingCodec::new();
413        let framed = Framed::new(socket, codec);
414
415        // Split the framed connection into read and write parts
416        let (mut framed_sink, mut framed_stream) = framed.split();
417
418        // Clone the client_tx for use in the command loop
419        let cmd_client_tx = client_tx.clone();
420
421        // Start a task to forward responses to the client
422        let _response_task = tokio::spawn(async move {
423            while let Some(response) = client_rx.recv().await {
424                match serde_json::to_vec(&response) {
425                    Ok(data) => {
426                        if let Err(e) = framed_sink.send(Bytes::from(data)).await {
427                            debug!("Error sending response to client: {}", e);
428                            break;
429                        }
430                    }
431                    Err(e) => {
432                        error!("Error serializing response: {}", e);
433                    }
434                }
435            }
436            debug!("Response forwarder for client closed");
437        });
438
439        // Store active subscriptions for this connection to clean up on disconnect
440        let mut connection_subscriptions: Vec<(TheaterId, Uuid)> = Vec::new();
441
442        // Store active channel subscriptions for cleanup
443        let mut connection_channel_subscriptions: Vec<String> = Vec::new();
444
445        // Loop until connection closes or an error occurs
446        'connection: while let Some(msg) = framed_stream.next().await {
447            debug!("Received management message");
448            let msg = match msg {
449                Ok(m) => m,
450                Err(e) => {
451                    error!("Error receiving message: {}", e);
452                    break 'connection;
453                }
454            };
455
456            let cmd = match serde_json::from_slice::<ManagementCommand>(&msg) {
457                Ok(c) => c,
458                Err(e) => {
459                    error!(
460                        "Error parsing command: {} {}",
461                        e,
462                        String::from_utf8_lossy(&msg)
463                    );
464                    continue;
465                }
466            };
467            debug!("Parsed command: {:?}", cmd);
468
469            // Store the command for reference (used for subscription tracking)
470            let _cmd_clone = cmd.clone();
471
472            let response = match cmd {
473                ManagementCommand::StartActor {
474                    manifest,
475                    initial_state,
476                    parent,
477                    subscribe,
478                } => {
479                    info!("Starting actor from manifest: {:?}", manifest);
480                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
481                    debug!("Sending SpawnActor command to runtime");
482                    let supervisor_tx = if parent {
483                        let (supervisor_tx, mut supervisor_rx) = mpsc::channel(32);
484                        let cmd_client_tx = cmd_client_tx.clone();
485                        tokio::spawn(async move {
486                            while let Some(res) = supervisor_rx.recv().await {
487                                debug!("Received supervisor response: {:?}", res);
488                                if let Err(e) = cmd_client_tx
489                                    .send(ManagementResponse::ActorResult(res))
490                                    .await
491                                {
492                                    error!("Failed to send supervisor response: {}", e);
493                                    break;
494                                }
495                            }
496                        });
497                        Some(supervisor_tx)
498                    } else {
499                        None
500                    };
501                    let subscription_tx = if subscribe {
502                        let (event_tx, mut event_rx) = mpsc::channel(32);
503
504                        // set up a task to forward events to the client
505                        let cmd_client_tx = cmd_client_tx.clone();
506                        tokio::spawn(async move {
507                            while let Some(event) = event_rx.recv().await {
508                                debug!("Received event for subscription");
509                                let response = match event {
510                                    Ok(event) => ManagementResponse::ActorEvent { event },
511                                    Err(e) => ManagementResponse::ActorError { error: e },
512                                };
513                                if let Err(e) = cmd_client_tx.send(response).await {
514                                    debug!("Failed to forward event to client: {}", e);
515                                    break;
516                                }
517                            }
518                            debug!("Event forwarder for subscription stopped");
519                        });
520
521                        Some(event_tx)
522                    } else {
523                        None
524                    };
525                    match runtime_tx
526                        .send(TheaterCommand::SpawnActor {
527                            manifest_path: manifest.clone(),
528                            init_bytes: initial_state,
529                            response_tx: cmd_tx,
530                            parent_id: None,
531                            supervisor_tx,
532                            subscription_tx,
533                        })
534                        .await
535                    {
536                        Ok(_) => {
537                            debug!("SpawnActor command sent to runtime, awaiting response");
538                            match cmd_rx.await {
539                                Ok(result) => match result {
540                                    Ok(actor_id) => {
541                                        info!("Actor started with ID: {:?}", actor_id);
542                                        ManagementResponse::ActorStarted { id: actor_id }
543                                    }
544                                    Err(e) => {
545                                        error!("Runtime failed to start actor: {}", e);
546                                        ManagementResponse::Error {
547                                            error: ManagementError::RuntimeError(format!(
548                                                "Failed to start actor: {}",
549                                                e
550                                            )),
551                                        }
552                                    }
553                                },
554                                Err(e) => {
555                                    error!("Failed to receive spawn response: {}", e);
556                                    ManagementResponse::Error {
557                                        error: ManagementError::CommunicationError(format!(
558                                            "Failed to receive spawn response: {}",
559                                            e
560                                        )),
561                                    }
562                                }
563                            }
564                        }
565                        Err(e) => {
566                            error!("Failed to send SpawnActor command: {}", e);
567                            ManagementResponse::Error {
568                                error: ManagementError::CommunicationError(format!(
569                                    "Failed to send spawn command: {}",
570                                    e
571                                )),
572                            }
573                        }
574                    }
575                }
576                ManagementCommand::StopActor { id } => {
577                    info!("Stopping actor: {:?}", id);
578                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
579                    runtime_tx
580                        .send(TheaterCommand::StopActor {
581                            actor_id: id.clone(),
582                            response_tx: cmd_tx,
583                        })
584                        .await?;
585
586                    match cmd_rx.await? {
587                        Ok(_) => {
588                            subscriptions.lock().await.remove(&id);
589                            ManagementResponse::ActorStopped { id }
590                        }
591                        Err(e) => ManagementResponse::Error {
592                            error: ManagementError::RuntimeError(format!(
593                                "Failed to stop actor: {}",
594                                e
595                            )),
596                        },
597                    }
598                }
599                ManagementCommand::TerminateActor { id } => {
600                    info!("Terminating actor: {:?}", id);
601                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
602                    runtime_tx
603                        .send(TheaterCommand::TerminateActor {
604                            actor_id: id.clone(),
605                            response_tx: cmd_tx,
606                        })
607                        .await?;
608
609                    match cmd_rx.await? {
610                        Ok(_) => {
611                            subscriptions.lock().await.remove(&id);
612                            ManagementResponse::ActorStopped { id }
613                        }
614                        Err(e) => ManagementResponse::Error {
615                            error: ManagementError::RuntimeError(format!(
616                                "Failed to terminate actor: {}",
617                                e
618                            )),
619                        },
620                    }
621                }
622                ManagementCommand::ListActors => {
623                    debug!("Listing actors");
624                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
625                    runtime_tx
626                        .send(TheaterCommand::GetActors {
627                            response_tx: cmd_tx,
628                        })
629                        .await?;
630
631                    match cmd_rx.await? {
632                        Ok(actors) => {
633                            info!("Found {} actors", actors.len());
634                            ManagementResponse::ActorList { actors }
635                        }
636                        Err(e) => ManagementResponse::Error {
637                            error: ManagementError::RuntimeError(format!(
638                                "Failed to list actors: {}",
639                                e
640                            )),
641                        },
642                    }
643                }
644                ManagementCommand::SubscribeToActor { id } => {
645                    info!("New subscription request for actor: {:?}", id);
646                    let subscription_id = Uuid::new_v4();
647                    let subscription = Subscription {
648                        id: subscription_id,
649                        client_tx: cmd_client_tx.clone(),
650                    };
651
652                    debug!("Subscription created with ID: {}", subscription_id);
653
654                    // Register the subscription in the global map
655                    subscriptions
656                        .lock()
657                        .await
658                        .entry(id.clone())
659                        .or_default()
660                        .insert(subscription);
661
662                    // Set up the event channel for the subscription
663                    let (event_tx, mut event_rx) = mpsc::channel(32);
664                    runtime_tx
665                        .send(TheaterCommand::SubscribeToActor {
666                            actor_id: id.clone(),
667                            event_tx,
668                        })
669                        .await
670                        .map_err(|e| anyhow::anyhow!("Failed to subscribe: {}", e))?;
671
672                    // Add to the list of subscriptions for this connection
673                    connection_subscriptions.push((id.clone(), subscription_id));
674
675                    // Create a task to forward events to this client
676                    let client_tx_clone = cmd_client_tx.clone();
677                    tokio::spawn(async move {
678                        debug!(
679                            "Starting event forwarder for subscription {}",
680                            subscription_id
681                        );
682                        while let Some(event) = event_rx.recv().await {
683                            debug!("Received event for subscription {}", subscription_id);
684                            let response = match event {
685                                Ok(event) => ManagementResponse::ActorEvent { event },
686                                Err(e) => ManagementResponse::ActorError { error: e },
687                            };
688                            if let Err(e) = client_tx_clone.send(response).await {
689                                debug!("Failed to forward event to client: {}", e);
690                                break;
691                            }
692                        }
693                        debug!(
694                            "Event forwarder for subscription {} stopped",
695                            subscription_id
696                        );
697                    });
698
699                    ManagementResponse::Subscribed {
700                        id,
701                        subscription_id,
702                    }
703                }
704                ManagementCommand::UnsubscribeFromActor {
705                    id,
706                    subscription_id,
707                } => {
708                    debug!(
709                        "Removing subscription {} for actor {:?}",
710                        subscription_id, id
711                    );
712
713                    // Remove subscription from the tracking list for this connection
714                    connection_subscriptions
715                        .retain(|(aid, sid)| *aid != id || *sid != subscription_id);
716
717                    // Remove from the global subscriptions map
718                    let mut subs = subscriptions.lock().await;
719                    if let Some(actor_subs) = subs.get_mut(&id) {
720                        actor_subs.retain(|sub| sub.id != subscription_id);
721
722                        // Remove the entry if no subscriptions remain
723                        if actor_subs.is_empty() {
724                            subs.remove(&id);
725                        }
726                    }
727
728                    debug!("Subscription removed");
729                    ManagementResponse::Unsubscribed { id }
730                }
731                ManagementCommand::SendActorMessage { id, data } => {
732                    info!("Sending message to actor: {:?}", id);
733                    runtime_tx
734                        .send(TheaterCommand::SendMessage {
735                            actor_id: id.clone(),
736                            actor_message: ActorMessage::Send(ActorSend { data: data.clone() }),
737                        })
738                        .await?;
739
740                    ManagementResponse::SentMessage { id }
741                }
742                ManagementCommand::RequestActorMessage { id, data } => {
743                    info!("Requesting message from actor: {:?}", id);
744                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
745                    runtime_tx
746                        .send(TheaterCommand::SendMessage {
747                            actor_id: id.clone(),
748                            actor_message: ActorMessage::Request(ActorRequest {
749                                data: data.clone(),
750                                response_tx: cmd_tx,
751                            }),
752                        })
753                        .await?;
754
755                    let response = cmd_rx.await?;
756                    ManagementResponse::RequestedMessage {
757                        id,
758                        message: response,
759                    }
760                }
761                ManagementCommand::GetActorManifest { id } => {
762                    info!("Getting manifest for actor: {:?}", id);
763                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
764                    runtime_tx
765                        .send(TheaterCommand::GetActorManifest {
766                            actor_id: id.clone(),
767                            response_tx: cmd_tx,
768                        })
769                        .await?;
770
771                    let manifest = cmd_rx.await?;
772                    ManagementResponse::ActorManifest {
773                        id,
774                        manifest: manifest?,
775                    }
776                }
777                ManagementCommand::GetActorStatus { id } => {
778                    info!("Getting status for actor: {:?}", id);
779                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
780                    runtime_tx
781                        .send(TheaterCommand::GetActorStatus {
782                            actor_id: id.clone(),
783                            response_tx: cmd_tx,
784                        })
785                        .await?;
786
787                    let status = cmd_rx.await?;
788                    ManagementResponse::ActorStatus {
789                        id,
790                        status: status?,
791                    }
792                }
793                ManagementCommand::RestartActor { id } => {
794                    info!("Restarting actor: {:?}", id);
795                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
796                    runtime_tx
797                        .send(TheaterCommand::RestartActor {
798                            actor_id: id.clone(),
799                            response_tx: cmd_tx,
800                        })
801                        .await?;
802
803                    match cmd_rx.await? {
804                        Ok(_) => ManagementResponse::Restarted { id },
805                        Err(e) => ManagementResponse::Error {
806                            error: ManagementError::RuntimeError(format!(
807                                "Failed to restart actor: {}",
808                                e
809                            )),
810                        },
811                    }
812                }
813                ManagementCommand::GetActorState { id } => {
814                    info!("Getting state for actor: {:?}", id);
815                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
816                    runtime_tx
817                        .send(TheaterCommand::GetActorState {
818                            actor_id: id.clone(),
819                            response_tx: cmd_tx,
820                        })
821                        .await?;
822
823                    let state = cmd_rx.await?;
824                    ManagementResponse::ActorState { id, state: state? }
825                }
826                ManagementCommand::GetActorEvents { id } => {
827                    info!("Getting events for actor: {:?}", id);
828                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
829                    runtime_tx
830                        .send(TheaterCommand::GetActorEvents {
831                            actor_id: id.clone(),
832                            response_tx: cmd_tx,
833                        })
834                        .await?;
835
836                    match cmd_rx.await {
837                        Ok(result) => match result {
838                            Ok(events) => {
839                                debug!(
840                                    "Successfully retrieved {} events for actor {}",
841                                    events.len(),
842                                    id
843                                );
844                                ManagementResponse::ActorEvents { id, events }
845                            }
846                            Err(e) => {
847                                debug!("Error getting events for actor {}: {}", id, e);
848                                ManagementResponse::Error { error: e.into() }
849                            }
850                        },
851                        Err(e) => {
852                            error!("Failed to receive events response: {}", e);
853                            ManagementResponse::Error {
854                                error: ManagementError::CommunicationError(format!(
855                                    "Failed to receive events response: {}",
856                                    e
857                                )),
858                            }
859                        }
860                    }
861                }
862                ManagementCommand::GetActorMetrics { id } => {
863                    info!("Getting metrics for actor: {:?}", id);
864                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
865                    runtime_tx
866                        .send(TheaterCommand::GetActorMetrics {
867                            actor_id: id.clone(),
868                            response_tx: cmd_tx,
869                        })
870                        .await?;
871
872                    let metrics = cmd_rx.await?;
873                    ManagementResponse::ActorMetrics {
874                        id,
875                        metrics: serde_json::to_value(metrics?)?,
876                    }
877                }
878                ManagementCommand::UpdateActorComponent { id, component } => {
879                    info!("Updating component for actor {:?} to {}", id, component);
880                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
881                    runtime_tx
882                        .send(TheaterCommand::UpdateActorComponent {
883                            actor_id: id.clone(),
884                            component: component.clone(),
885                            response_tx: cmd_tx,
886                        })
887                        .await?;
888
889                    match cmd_rx.await? {
890                        Ok(_) => ManagementResponse::ActorComponentUpdated { id },
891                        Err(e) => ManagementResponse::Error {
892                            error: ManagementError::RuntimeError(format!(
893                                "Failed to update actor component: {}",
894                                e
895                            )),
896                        },
897                    }
898                }
899                // Handle channel management commands
900                ManagementCommand::OpenChannel {
901                    actor_id,
902                    initial_message,
903                } => {
904                    info!("Opening channel to actor: {:?}", actor_id);
905
906                    // Create a response channel
907                    let (response_tx, response_rx) = tokio::sync::oneshot::channel();
908
909                    // Generate a channel ID
910                    let client_id = ChannelParticipant::External;
911                    let channel_id = ChannelId::new(&client_id, &actor_id);
912                    let channel_id_str = channel_id.0.clone();
913
914                    // Send the channel open command to the runtime
915                    runtime_tx
916                        .send(TheaterCommand::ChannelOpen {
917                            initiator_id: client_id.clone(),
918                            target_id: actor_id.clone(),
919                            channel_id: channel_id.clone(),
920                            initial_message,
921                            response_tx,
922                        })
923                        .await
924                        .map_err(|e| {
925                            anyhow::anyhow!("Failed to send channel open command: {}", e)
926                        })?;
927
928                    // Wait for the response
929                    match response_rx.await {
930                        Ok(result) => {
931                            match result {
932                                Ok(accepted) => {
933                                    if accepted {
934                                        // Channel opened successfully
935                                        info!("Channel opened successfully: {}", channel_id_str);
936
937                                        // Register the channel subscription to receive messages
938                                        let channel_sub = ChannelSubscription {
939                                            channel_id: channel_id_str.clone(),
940                                            initiator_id: client_id.clone(),
941                                            target_id: actor_id.clone(),
942                                            client_tx: cmd_client_tx.clone(),
943                                        };
944
945                                        channel_subscriptions
946                                            .lock()
947                                            .await
948                                            .insert(channel_id_str.clone(), channel_sub);
949
950                                        // Track this channel for cleanup on disconnect
951                                        connection_channel_subscriptions
952                                            .push(channel_id_str.clone());
953
954                                        ManagementResponse::ChannelOpened {
955                                            channel_id: channel_id_str,
956                                            actor_id,
957                                        }
958                                    } else {
959                                        // Channel rejected by target
960                                        ManagementResponse::Error {
961                                            error: ManagementError::ChannelRejected,
962                                        }
963                                    }
964                                }
965                                Err(e) => ManagementResponse::Error {
966                                    error: ManagementError::RuntimeError(format!(
967                                        "Error opening channel: {}",
968                                        e
969                                    )),
970                                },
971                            }
972                        }
973                        Err(e) => ManagementResponse::Error {
974                            error: ManagementError::CommunicationError(format!(
975                                "Failed to receive channel open response: {}",
976                                e
977                            )),
978                        },
979                    }
980                }
981                ManagementCommand::SendOnChannel {
982                    channel_id,
983                    message,
984                } => {
985                    info!("Sending message on channel: {}", channel_id);
986
987                    // Parse the channel ID
988                    let channel_id_parsed = ChannelId(channel_id.clone());
989                    let client_id = ChannelParticipant::External;
990
991                    // Send the message on the channel
992                    runtime_tx
993                        .send(TheaterCommand::ChannelMessage {
994                            channel_id: channel_id_parsed,
995                            message,
996                            sender_id: client_id,
997                        })
998                        .await
999                        .map_err(|e| anyhow::anyhow!("Failed to send message on channel: {}", e))?;
1000
1001                    ManagementResponse::MessageSent { channel_id }
1002                }
1003                ManagementCommand::CloseChannel { channel_id } => {
1004                    info!("Closing channel: {}", channel_id);
1005
1006                    // Parse the channel ID
1007                    let channel_id_parsed = ChannelId(channel_id.clone());
1008
1009                    // Close the channel
1010                    runtime_tx
1011                        .send(TheaterCommand::ChannelClose {
1012                            channel_id: channel_id_parsed,
1013                        })
1014                        .await
1015                        .map_err(|e| anyhow::anyhow!("Failed to close channel: {}", e))?;
1016
1017                    // Remove from channel subscriptions
1018                    channel_subscriptions.lock().await.remove(&channel_id);
1019                    connection_channel_subscriptions.retain(|id| id != &channel_id);
1020
1021                    ManagementResponse::ChannelClosed { channel_id }
1022                }
1023                ManagementCommand::NewStore {} => {
1024                    info!("Creating new store");
1025                    let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1026                    runtime_tx
1027                        .send(TheaterCommand::NewStore {
1028                            response_tx: cmd_tx,
1029                        })
1030                        .await?;
1031
1032                    let store_id = cmd_rx.await?;
1033                    ManagementResponse::StoreCreated {
1034                        store_id: store_id?.id,
1035                    }
1036                }
1037            };
1038
1039            debug!("Sending response: {:?}", response);
1040            if let Err(e) = client_tx.send(response).await {
1041                error!("Failed to send response: {}", e);
1042                break;
1043            }
1044            debug!("Response sent");
1045        }
1046
1047        // Clean up all subscriptions for this connection
1048        debug!(
1049            "Connection closed, cleaning up {} subscriptions",
1050            connection_subscriptions.len()
1051        );
1052        let mut subs = subscriptions.lock().await;
1053
1054        for (actor_id, sub_id) in connection_subscriptions {
1055            if let Some(actor_subs) = subs.get_mut(&actor_id) {
1056                actor_subs.retain(|sub| sub.id != sub_id);
1057
1058                // Remove the entry if no subscriptions remain
1059                if actor_subs.is_empty() {
1060                    subs.remove(&actor_id);
1061                }
1062            }
1063        }
1064
1065        // Clean up channel subscriptions
1066        debug!(
1067            "Connection closed, cleaning up {} channel subscriptions",
1068            connection_channel_subscriptions.len()
1069        );
1070        let mut channel_subs = channel_subscriptions.lock().await;
1071
1072        for channel_id in connection_channel_subscriptions {
1073            channel_subs.remove(&channel_id);
1074        }
1075
1076        debug!("Cleaned up all subscriptions for the connection");
1077        Ok(())
1078    }
1079}