1use anyhow::Result;
2use bytes::Bytes;
3use futures::sink::SinkExt;
4use futures::stream::StreamExt;
5use serde::{Deserialize, Serialize};
6use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8use theater::messages::{
9 ActorMessage, ActorRequest, ActorResult, ActorSend, ActorStatus, ChannelEvent,
10 ChannelParticipant,
11};
12use theater::{ActorError, ChainEvent, ManifestConfig};
13use tokio::net::{TcpListener, TcpStream};
14use tokio::sync::{mpsc, Mutex};
15use tokio_util::codec::Framed;
16use tracing::{debug, error, info};
17use uuid::Uuid;
18
19use theater::id::TheaterId;
20use theater::messages::{ChannelId, TheaterCommand};
21use theater::theater_runtime::TheaterRuntime;
22use theater::TheaterRuntimeError;
23
24use crate::fragmenting_codec::FragmentingCodec;
25
26#[derive(Debug, Clone, Serialize, Deserialize)]
27pub enum ManagementCommand {
28 StartActor {
29 manifest: String,
30 initial_state: Option<Vec<u8>>,
31 parent: bool,
32 subscribe: bool,
33 },
34 StopActor {
35 id: TheaterId,
36 },
37 TerminateActor {
38 id: TheaterId,
39 },
40 ListActors,
41 SubscribeToActor {
42 id: TheaterId,
43 },
44 UnsubscribeFromActor {
45 id: TheaterId,
46 subscription_id: Uuid,
47 },
48 SendActorMessage {
49 id: TheaterId,
50 data: Vec<u8>,
51 },
52 RequestActorMessage {
53 id: TheaterId,
54 data: Vec<u8>,
55 },
56 GetActorManifest {
57 id: TheaterId,
58 },
59 GetActorStatus {
60 id: TheaterId,
61 },
62 RestartActor {
63 id: TheaterId,
64 },
65 GetActorState {
66 id: TheaterId,
67 },
68 GetActorEvents {
69 id: TheaterId,
70 },
71 GetActorMetrics {
72 id: TheaterId,
73 },
74 UpdateActorComponent {
75 id: TheaterId,
76 component: String,
77 },
78 OpenChannel {
80 actor_id: ChannelParticipant,
81 initial_message: Vec<u8>,
82 },
83 SendOnChannel {
84 channel_id: String,
85 message: Vec<u8>,
86 },
87 CloseChannel {
88 channel_id: String,
89 },
90
91 NewStore {},
93}
94
95#[derive(Debug, Clone, Serialize, Deserialize)]
96pub enum ManagementResponse {
97 ActorStarted {
98 id: TheaterId,
99 },
100 ActorStopped {
101 id: TheaterId,
102 },
103 ActorList {
104 actors: Vec<(TheaterId, String)>,
105 },
106 Subscribed {
107 id: TheaterId,
108 subscription_id: Uuid,
109 },
110 Unsubscribed {
111 id: TheaterId,
112 },
113 ActorEvent {
114 event: ChainEvent,
115 },
116 ActorResult(ActorResult),
117 ActorError {
118 error: ActorError,
119 },
120 Error {
121 error: ManagementError,
122 },
123 RequestedMessage {
124 id: TheaterId,
125 message: Vec<u8>,
126 },
127 SentMessage {
128 id: TheaterId,
129 },
130 ActorStatus {
131 id: TheaterId,
132 status: ActorStatus,
133 },
134 Restarted {
135 id: TheaterId,
136 },
137 ActorManifest {
138 id: TheaterId,
139 manifest: ManifestConfig,
140 },
141 ActorState {
142 id: TheaterId,
143 state: Option<Vec<u8>>,
144 },
145 ActorEvents {
146 id: TheaterId,
147 events: Vec<ChainEvent>,
148 },
149 ActorMetrics {
150 id: TheaterId,
151 metrics: serde_json::Value,
152 },
153 ActorComponentUpdated {
154 id: TheaterId,
155 },
156 ChannelOpened {
158 channel_id: String,
159 actor_id: ChannelParticipant,
160 },
161 MessageSent {
162 channel_id: String,
163 },
164 ChannelMessage {
165 channel_id: String,
166 sender_id: ChannelParticipant,
167 message: Vec<u8>,
168 },
169 ChannelClosed {
170 channel_id: String,
171 },
172
173 StoreCreated {
175 store_id: String,
176 },
177}
178
179#[derive(Debug, Clone, Serialize, Deserialize)]
180pub enum ManagementError {
181 ActorNotFound,
183 ActorAlreadyExists,
184 ActorNotRunning,
185 ActorError(String),
186
187 ChannelNotFound,
189 ChannelClosed,
190 ChannelRejected,
191
192 StoreError(String),
194
195 CommunicationError(String),
197
198 InvalidRequest(String),
200 Timeout,
201
202 RuntimeError(String),
204 InternalError(String),
205
206 SerializationError(String),
208}
209
210impl From<TheaterRuntimeError> for ManagementError {
212 fn from(err: TheaterRuntimeError) -> Self {
213 match err {
214 TheaterRuntimeError::ActorNotFound(_) => ManagementError::ActorNotFound,
215 TheaterRuntimeError::ActorAlreadyExists(_) => ManagementError::ActorAlreadyExists,
216 TheaterRuntimeError::ActorNotRunning(_) => ManagementError::ActorNotRunning,
217 TheaterRuntimeError::ActorOperationFailed(msg) => {
218 ManagementError::RuntimeError(format!("Actor operation failed: {}", msg))
219 }
220 TheaterRuntimeError::ActorError(e) => ManagementError::ActorError(e.to_string()),
221 TheaterRuntimeError::ChannelError(msg) => ManagementError::CommunicationError(msg),
222 TheaterRuntimeError::ChannelNotFound(_) => ManagementError::ChannelNotFound,
223 TheaterRuntimeError::ChannelRejected => ManagementError::ChannelRejected,
224 TheaterRuntimeError::SerializationError(msg) => {
225 ManagementError::SerializationError(msg)
226 }
227 TheaterRuntimeError::InternalError(msg) => ManagementError::InternalError(msg),
228 }
229 }
230}
231
232#[derive(Debug)]
233#[allow(dead_code)]
234struct Subscription {
235 id: Uuid,
236 client_tx: mpsc::Sender<ManagementResponse>,
237}
238
239impl Eq for Subscription {}
240impl PartialEq for Subscription {
241 fn eq(&self, other: &Self) -> bool {
242 self.id == other.id
243 }
244}
245impl std::hash::Hash for Subscription {
246 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
247 self.id.hash(state);
248 }
249}
250
251#[derive(Debug)]
255#[allow(dead_code)]
256struct ChannelSubscription {
257 channel_id: String,
258 initiator_id: ChannelParticipant,
259 target_id: ChannelParticipant,
260 client_tx: mpsc::Sender<ManagementResponse>,
261}
262
263pub struct TheaterServer {
264 runtime: TheaterRuntime,
265 theater_tx: mpsc::Sender<TheaterCommand>,
266 management_socket: TcpListener,
267 subscriptions: Arc<Mutex<HashMap<TheaterId, HashSet<Subscription>>>>,
268 channel_subscriptions: Arc<Mutex<HashMap<String, ChannelSubscription>>>,
270 #[allow(dead_code)]
272 channel_events_tx: mpsc::Sender<ChannelEvent>,
273}
274
275impl TheaterServer {
276 async fn process_channel_events(
278 mut channel_events_rx: mpsc::Receiver<ChannelEvent>,
279 channel_subscriptions: Arc<Mutex<HashMap<String, ChannelSubscription>>>,
280 ) {
281 while let Some(event) = channel_events_rx.recv().await {
282 match event {
283 ChannelEvent::Message {
284 channel_id,
285 sender_id,
286 message,
287 } => {
288 tracing::debug!("Received channel message for {}", channel_id);
289 let subs = channel_subscriptions.lock().await;
291 if let Some(sub) = subs.get(&channel_id.0) {
292 let response = ManagementResponse::ChannelMessage {
293 channel_id: channel_id.0.clone(),
294 sender_id,
295 message,
296 };
297
298 tracing::debug!("Forwarding channel message to client: {:?}", response);
299
300 if let Err(e) = sub.client_tx.send(response).await {
301 tracing::warn!("Failed to forward channel message: {}", e);
302 } else {
303 tracing::debug!("Forwarded channel message to client");
304 }
305 }
306 }
307 ChannelEvent::Close { channel_id } => {
308 tracing::debug!("Received channel close event for {}", channel_id);
309 let mut subs = channel_subscriptions.lock().await;
311 if let Some(sub) = subs.remove(&channel_id.0) {
312 let response = ManagementResponse::ChannelClosed {
313 channel_id: channel_id.0.clone(),
314 };
315
316 if let Err(e) = sub.client_tx.send(response).await {
317 tracing::warn!("Failed to forward channel close event: {}", e);
318 } else {
319 tracing::debug!("Forwarded channel close event to client");
320 }
321 }
322 }
323 }
324 }
325 }
326
327 pub async fn new(address: std::net::SocketAddr) -> Result<Self> {
328 let (theater_tx, theater_rx) = mpsc::channel(32);
329
330 let (channel_events_tx, channel_events_rx) = mpsc::channel(32);
332
333 let runtime = TheaterRuntime::new(
335 theater_tx.clone(),
336 theater_rx,
337 Some(channel_events_tx.clone()),
338 theater::config::permissions::HandlerPermission::root(), )
340 .await?;
341 let management_socket = TcpListener::bind(address).await?;
342
343 let channel_subscriptions = Arc::new(Mutex::new(HashMap::new()));
344
345 let channel_subs_clone = channel_subscriptions.clone();
347 tokio::spawn(async move {
348 Self::process_channel_events(channel_events_rx, channel_subs_clone).await;
349 });
350
351 Ok(Self {
352 runtime,
353 theater_tx,
354 management_socket,
355 subscriptions: Arc::new(Mutex::new(HashMap::new())),
356 channel_subscriptions,
357 channel_events_tx,
358 })
359 }
360
361 pub async fn run(mut self) -> Result<()> {
362 info!(
363 "Theater server starting on {:?}",
364 self.management_socket.local_addr()?
365 );
366
367 let runtime_handle = tokio::spawn(async move {
369 match self.runtime.run().await {
370 Ok(_) => Ok(()),
371 Err(e) => {
372 error!("Theater runtime failed: {}", e);
373 Err(e)
374 }
375 }
376 });
377
378 while let Ok((socket, addr)) = self.management_socket.accept().await {
380 info!("New management connection from {}", addr);
381 let runtime_tx = self.theater_tx.clone();
382 let subscriptions = self.subscriptions.clone();
383 let channel_subscriptions = self.channel_subscriptions.clone();
384
385 tokio::spawn(async move {
386 if let Err(e) = Self::handle_management_connection(
387 socket,
388 runtime_tx,
389 subscriptions,
390 channel_subscriptions,
391 )
392 .await
393 {
394 error!("Error handling management connection: {}", e);
395 }
396 });
397 }
398
399 runtime_handle.await??;
400 Ok(())
401 }
402
403 async fn handle_management_connection(
404 socket: TcpStream,
405 runtime_tx: mpsc::Sender<TheaterCommand>,
406 subscriptions: Arc<Mutex<HashMap<TheaterId, HashSet<Subscription>>>>,
407 channel_subscriptions: Arc<Mutex<HashMap<String, ChannelSubscription>>>,
408 ) -> Result<()> {
409 let (client_tx, mut client_rx) = mpsc::channel::<ManagementResponse>(32);
411
412 let codec = FragmentingCodec::new();
413 let framed = Framed::new(socket, codec);
414
415 let (mut framed_sink, mut framed_stream) = framed.split();
417
418 let cmd_client_tx = client_tx.clone();
420
421 let _response_task = tokio::spawn(async move {
423 while let Some(response) = client_rx.recv().await {
424 match serde_json::to_vec(&response) {
425 Ok(data) => {
426 if let Err(e) = framed_sink.send(Bytes::from(data)).await {
427 debug!("Error sending response to client: {}", e);
428 break;
429 }
430 }
431 Err(e) => {
432 error!("Error serializing response: {}", e);
433 }
434 }
435 }
436 debug!("Response forwarder for client closed");
437 });
438
439 let mut connection_subscriptions: Vec<(TheaterId, Uuid)> = Vec::new();
441
442 let mut connection_channel_subscriptions: Vec<String> = Vec::new();
444
445 'connection: while let Some(msg) = framed_stream.next().await {
447 debug!("Received management message");
448 let msg = match msg {
449 Ok(m) => m,
450 Err(e) => {
451 error!("Error receiving message: {}", e);
452 break 'connection;
453 }
454 };
455
456 let cmd = match serde_json::from_slice::<ManagementCommand>(&msg) {
457 Ok(c) => c,
458 Err(e) => {
459 error!(
460 "Error parsing command: {} {}",
461 e,
462 String::from_utf8_lossy(&msg)
463 );
464 continue;
465 }
466 };
467 debug!("Parsed command: {:?}", cmd);
468
469 let _cmd_clone = cmd.clone();
471
472 let response = match cmd {
473 ManagementCommand::StartActor {
474 manifest,
475 initial_state,
476 parent,
477 subscribe,
478 } => {
479 info!("Starting actor from manifest: {:?}", manifest);
480 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
481 debug!("Sending SpawnActor command to runtime");
482 let supervisor_tx = if parent {
483 let (supervisor_tx, mut supervisor_rx) = mpsc::channel(32);
484 let cmd_client_tx = cmd_client_tx.clone();
485 tokio::spawn(async move {
486 while let Some(res) = supervisor_rx.recv().await {
487 debug!("Received supervisor response: {:?}", res);
488 if let Err(e) = cmd_client_tx
489 .send(ManagementResponse::ActorResult(res))
490 .await
491 {
492 error!("Failed to send supervisor response: {}", e);
493 break;
494 }
495 }
496 });
497 Some(supervisor_tx)
498 } else {
499 None
500 };
501 let subscription_tx = if subscribe {
502 let (event_tx, mut event_rx) = mpsc::channel(32);
503
504 let cmd_client_tx = cmd_client_tx.clone();
506 tokio::spawn(async move {
507 while let Some(event) = event_rx.recv().await {
508 debug!("Received event for subscription");
509 let response = match event {
510 Ok(event) => ManagementResponse::ActorEvent { event },
511 Err(e) => ManagementResponse::ActorError { error: e },
512 };
513 if let Err(e) = cmd_client_tx.send(response).await {
514 debug!("Failed to forward event to client: {}", e);
515 break;
516 }
517 }
518 debug!("Event forwarder for subscription stopped");
519 });
520
521 Some(event_tx)
522 } else {
523 None
524 };
525 match runtime_tx
526 .send(TheaterCommand::SpawnActor {
527 manifest_path: manifest.clone(),
528 init_bytes: initial_state,
529 response_tx: cmd_tx,
530 parent_id: None,
531 supervisor_tx,
532 subscription_tx,
533 })
534 .await
535 {
536 Ok(_) => {
537 debug!("SpawnActor command sent to runtime, awaiting response");
538 match cmd_rx.await {
539 Ok(result) => match result {
540 Ok(actor_id) => {
541 info!("Actor started with ID: {:?}", actor_id);
542 ManagementResponse::ActorStarted { id: actor_id }
543 }
544 Err(e) => {
545 error!("Runtime failed to start actor: {}", e);
546 ManagementResponse::Error {
547 error: ManagementError::RuntimeError(format!(
548 "Failed to start actor: {}",
549 e
550 )),
551 }
552 }
553 },
554 Err(e) => {
555 error!("Failed to receive spawn response: {}", e);
556 ManagementResponse::Error {
557 error: ManagementError::CommunicationError(format!(
558 "Failed to receive spawn response: {}",
559 e
560 )),
561 }
562 }
563 }
564 }
565 Err(e) => {
566 error!("Failed to send SpawnActor command: {}", e);
567 ManagementResponse::Error {
568 error: ManagementError::CommunicationError(format!(
569 "Failed to send spawn command: {}",
570 e
571 )),
572 }
573 }
574 }
575 }
576 ManagementCommand::StopActor { id } => {
577 info!("Stopping actor: {:?}", id);
578 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
579 runtime_tx
580 .send(TheaterCommand::StopActor {
581 actor_id: id.clone(),
582 response_tx: cmd_tx,
583 })
584 .await?;
585
586 match cmd_rx.await? {
587 Ok(_) => {
588 subscriptions.lock().await.remove(&id);
589 ManagementResponse::ActorStopped { id }
590 }
591 Err(e) => ManagementResponse::Error {
592 error: ManagementError::RuntimeError(format!(
593 "Failed to stop actor: {}",
594 e
595 )),
596 },
597 }
598 }
599 ManagementCommand::TerminateActor { id } => {
600 info!("Terminating actor: {:?}", id);
601 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
602 runtime_tx
603 .send(TheaterCommand::TerminateActor {
604 actor_id: id.clone(),
605 response_tx: cmd_tx,
606 })
607 .await?;
608
609 match cmd_rx.await? {
610 Ok(_) => {
611 subscriptions.lock().await.remove(&id);
612 ManagementResponse::ActorStopped { id }
613 }
614 Err(e) => ManagementResponse::Error {
615 error: ManagementError::RuntimeError(format!(
616 "Failed to terminate actor: {}",
617 e
618 )),
619 },
620 }
621 }
622 ManagementCommand::ListActors => {
623 debug!("Listing actors");
624 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
625 runtime_tx
626 .send(TheaterCommand::GetActors {
627 response_tx: cmd_tx,
628 })
629 .await?;
630
631 match cmd_rx.await? {
632 Ok(actors) => {
633 info!("Found {} actors", actors.len());
634 ManagementResponse::ActorList { actors }
635 }
636 Err(e) => ManagementResponse::Error {
637 error: ManagementError::RuntimeError(format!(
638 "Failed to list actors: {}",
639 e
640 )),
641 },
642 }
643 }
644 ManagementCommand::SubscribeToActor { id } => {
645 info!("New subscription request for actor: {:?}", id);
646 let subscription_id = Uuid::new_v4();
647 let subscription = Subscription {
648 id: subscription_id,
649 client_tx: cmd_client_tx.clone(),
650 };
651
652 debug!("Subscription created with ID: {}", subscription_id);
653
654 subscriptions
656 .lock()
657 .await
658 .entry(id.clone())
659 .or_default()
660 .insert(subscription);
661
662 let (event_tx, mut event_rx) = mpsc::channel(32);
664 runtime_tx
665 .send(TheaterCommand::SubscribeToActor {
666 actor_id: id.clone(),
667 event_tx,
668 })
669 .await
670 .map_err(|e| anyhow::anyhow!("Failed to subscribe: {}", e))?;
671
672 connection_subscriptions.push((id.clone(), subscription_id));
674
675 let client_tx_clone = cmd_client_tx.clone();
677 tokio::spawn(async move {
678 debug!(
679 "Starting event forwarder for subscription {}",
680 subscription_id
681 );
682 while let Some(event) = event_rx.recv().await {
683 debug!("Received event for subscription {}", subscription_id);
684 let response = match event {
685 Ok(event) => ManagementResponse::ActorEvent { event },
686 Err(e) => ManagementResponse::ActorError { error: e },
687 };
688 if let Err(e) = client_tx_clone.send(response).await {
689 debug!("Failed to forward event to client: {}", e);
690 break;
691 }
692 }
693 debug!(
694 "Event forwarder for subscription {} stopped",
695 subscription_id
696 );
697 });
698
699 ManagementResponse::Subscribed {
700 id,
701 subscription_id,
702 }
703 }
704 ManagementCommand::UnsubscribeFromActor {
705 id,
706 subscription_id,
707 } => {
708 debug!(
709 "Removing subscription {} for actor {:?}",
710 subscription_id, id
711 );
712
713 connection_subscriptions
715 .retain(|(aid, sid)| *aid != id || *sid != subscription_id);
716
717 let mut subs = subscriptions.lock().await;
719 if let Some(actor_subs) = subs.get_mut(&id) {
720 actor_subs.retain(|sub| sub.id != subscription_id);
721
722 if actor_subs.is_empty() {
724 subs.remove(&id);
725 }
726 }
727
728 debug!("Subscription removed");
729 ManagementResponse::Unsubscribed { id }
730 }
731 ManagementCommand::SendActorMessage { id, data } => {
732 info!("Sending message to actor: {:?}", id);
733 runtime_tx
734 .send(TheaterCommand::SendMessage {
735 actor_id: id.clone(),
736 actor_message: ActorMessage::Send(ActorSend { data: data.clone() }),
737 })
738 .await?;
739
740 ManagementResponse::SentMessage { id }
741 }
742 ManagementCommand::RequestActorMessage { id, data } => {
743 info!("Requesting message from actor: {:?}", id);
744 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
745 runtime_tx
746 .send(TheaterCommand::SendMessage {
747 actor_id: id.clone(),
748 actor_message: ActorMessage::Request(ActorRequest {
749 data: data.clone(),
750 response_tx: cmd_tx,
751 }),
752 })
753 .await?;
754
755 let response = cmd_rx.await?;
756 ManagementResponse::RequestedMessage {
757 id,
758 message: response,
759 }
760 }
761 ManagementCommand::GetActorManifest { id } => {
762 info!("Getting manifest for actor: {:?}", id);
763 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
764 runtime_tx
765 .send(TheaterCommand::GetActorManifest {
766 actor_id: id.clone(),
767 response_tx: cmd_tx,
768 })
769 .await?;
770
771 let manifest = cmd_rx.await?;
772 ManagementResponse::ActorManifest {
773 id,
774 manifest: manifest?,
775 }
776 }
777 ManagementCommand::GetActorStatus { id } => {
778 info!("Getting status for actor: {:?}", id);
779 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
780 runtime_tx
781 .send(TheaterCommand::GetActorStatus {
782 actor_id: id.clone(),
783 response_tx: cmd_tx,
784 })
785 .await?;
786
787 let status = cmd_rx.await?;
788 ManagementResponse::ActorStatus {
789 id,
790 status: status?,
791 }
792 }
793 ManagementCommand::RestartActor { id } => {
794 info!("Restarting actor: {:?}", id);
795 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
796 runtime_tx
797 .send(TheaterCommand::RestartActor {
798 actor_id: id.clone(),
799 response_tx: cmd_tx,
800 })
801 .await?;
802
803 match cmd_rx.await? {
804 Ok(_) => ManagementResponse::Restarted { id },
805 Err(e) => ManagementResponse::Error {
806 error: ManagementError::RuntimeError(format!(
807 "Failed to restart actor: {}",
808 e
809 )),
810 },
811 }
812 }
813 ManagementCommand::GetActorState { id } => {
814 info!("Getting state for actor: {:?}", id);
815 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
816 runtime_tx
817 .send(TheaterCommand::GetActorState {
818 actor_id: id.clone(),
819 response_tx: cmd_tx,
820 })
821 .await?;
822
823 let state = cmd_rx.await?;
824 ManagementResponse::ActorState { id, state: state? }
825 }
826 ManagementCommand::GetActorEvents { id } => {
827 info!("Getting events for actor: {:?}", id);
828 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
829 runtime_tx
830 .send(TheaterCommand::GetActorEvents {
831 actor_id: id.clone(),
832 response_tx: cmd_tx,
833 })
834 .await?;
835
836 match cmd_rx.await {
837 Ok(result) => match result {
838 Ok(events) => {
839 debug!(
840 "Successfully retrieved {} events for actor {}",
841 events.len(),
842 id
843 );
844 ManagementResponse::ActorEvents { id, events }
845 }
846 Err(e) => {
847 debug!("Error getting events for actor {}: {}", id, e);
848 ManagementResponse::Error { error: e.into() }
849 }
850 },
851 Err(e) => {
852 error!("Failed to receive events response: {}", e);
853 ManagementResponse::Error {
854 error: ManagementError::CommunicationError(format!(
855 "Failed to receive events response: {}",
856 e
857 )),
858 }
859 }
860 }
861 }
862 ManagementCommand::GetActorMetrics { id } => {
863 info!("Getting metrics for actor: {:?}", id);
864 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
865 runtime_tx
866 .send(TheaterCommand::GetActorMetrics {
867 actor_id: id.clone(),
868 response_tx: cmd_tx,
869 })
870 .await?;
871
872 let metrics = cmd_rx.await?;
873 ManagementResponse::ActorMetrics {
874 id,
875 metrics: serde_json::to_value(metrics?)?,
876 }
877 }
878 ManagementCommand::UpdateActorComponent { id, component } => {
879 info!("Updating component for actor {:?} to {}", id, component);
880 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
881 runtime_tx
882 .send(TheaterCommand::UpdateActorComponent {
883 actor_id: id.clone(),
884 component: component.clone(),
885 response_tx: cmd_tx,
886 })
887 .await?;
888
889 match cmd_rx.await? {
890 Ok(_) => ManagementResponse::ActorComponentUpdated { id },
891 Err(e) => ManagementResponse::Error {
892 error: ManagementError::RuntimeError(format!(
893 "Failed to update actor component: {}",
894 e
895 )),
896 },
897 }
898 }
899 ManagementCommand::OpenChannel {
901 actor_id,
902 initial_message,
903 } => {
904 info!("Opening channel to actor: {:?}", actor_id);
905
906 let (response_tx, response_rx) = tokio::sync::oneshot::channel();
908
909 let client_id = ChannelParticipant::External;
911 let channel_id = ChannelId::new(&client_id, &actor_id);
912 let channel_id_str = channel_id.0.clone();
913
914 runtime_tx
916 .send(TheaterCommand::ChannelOpen {
917 initiator_id: client_id.clone(),
918 target_id: actor_id.clone(),
919 channel_id: channel_id.clone(),
920 initial_message,
921 response_tx,
922 })
923 .await
924 .map_err(|e| {
925 anyhow::anyhow!("Failed to send channel open command: {}", e)
926 })?;
927
928 match response_rx.await {
930 Ok(result) => {
931 match result {
932 Ok(accepted) => {
933 if accepted {
934 info!("Channel opened successfully: {}", channel_id_str);
936
937 let channel_sub = ChannelSubscription {
939 channel_id: channel_id_str.clone(),
940 initiator_id: client_id.clone(),
941 target_id: actor_id.clone(),
942 client_tx: cmd_client_tx.clone(),
943 };
944
945 channel_subscriptions
946 .lock()
947 .await
948 .insert(channel_id_str.clone(), channel_sub);
949
950 connection_channel_subscriptions
952 .push(channel_id_str.clone());
953
954 ManagementResponse::ChannelOpened {
955 channel_id: channel_id_str,
956 actor_id,
957 }
958 } else {
959 ManagementResponse::Error {
961 error: ManagementError::ChannelRejected,
962 }
963 }
964 }
965 Err(e) => ManagementResponse::Error {
966 error: ManagementError::RuntimeError(format!(
967 "Error opening channel: {}",
968 e
969 )),
970 },
971 }
972 }
973 Err(e) => ManagementResponse::Error {
974 error: ManagementError::CommunicationError(format!(
975 "Failed to receive channel open response: {}",
976 e
977 )),
978 },
979 }
980 }
981 ManagementCommand::SendOnChannel {
982 channel_id,
983 message,
984 } => {
985 info!("Sending message on channel: {}", channel_id);
986
987 let channel_id_parsed = ChannelId(channel_id.clone());
989 let client_id = ChannelParticipant::External;
990
991 runtime_tx
993 .send(TheaterCommand::ChannelMessage {
994 channel_id: channel_id_parsed,
995 message,
996 sender_id: client_id,
997 })
998 .await
999 .map_err(|e| anyhow::anyhow!("Failed to send message on channel: {}", e))?;
1000
1001 ManagementResponse::MessageSent { channel_id }
1002 }
1003 ManagementCommand::CloseChannel { channel_id } => {
1004 info!("Closing channel: {}", channel_id);
1005
1006 let channel_id_parsed = ChannelId(channel_id.clone());
1008
1009 runtime_tx
1011 .send(TheaterCommand::ChannelClose {
1012 channel_id: channel_id_parsed,
1013 })
1014 .await
1015 .map_err(|e| anyhow::anyhow!("Failed to close channel: {}", e))?;
1016
1017 channel_subscriptions.lock().await.remove(&channel_id);
1019 connection_channel_subscriptions.retain(|id| id != &channel_id);
1020
1021 ManagementResponse::ChannelClosed { channel_id }
1022 }
1023 ManagementCommand::NewStore {} => {
1024 info!("Creating new store");
1025 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1026 runtime_tx
1027 .send(TheaterCommand::NewStore {
1028 response_tx: cmd_tx,
1029 })
1030 .await?;
1031
1032 let store_id = cmd_rx.await?;
1033 ManagementResponse::StoreCreated {
1034 store_id: store_id?.id,
1035 }
1036 }
1037 };
1038
1039 debug!("Sending response: {:?}", response);
1040 if let Err(e) = client_tx.send(response).await {
1041 error!("Failed to send response: {}", e);
1042 break;
1043 }
1044 debug!("Response sent");
1045 }
1046
1047 debug!(
1049 "Connection closed, cleaning up {} subscriptions",
1050 connection_subscriptions.len()
1051 );
1052 let mut subs = subscriptions.lock().await;
1053
1054 for (actor_id, sub_id) in connection_subscriptions {
1055 if let Some(actor_subs) = subs.get_mut(&actor_id) {
1056 actor_subs.retain(|sub| sub.id != sub_id);
1057
1058 if actor_subs.is_empty() {
1060 subs.remove(&actor_id);
1061 }
1062 }
1063 }
1064
1065 debug!(
1067 "Connection closed, cleaning up {} channel subscriptions",
1068 connection_channel_subscriptions.len()
1069 );
1070 let mut channel_subs = channel_subscriptions.lock().await;
1071
1072 for channel_id in connection_channel_subscriptions {
1073 channel_subs.remove(&channel_id);
1074 }
1075
1076 debug!("Cleaned up all subscriptions for the connection");
1077 Ok(())
1078 }
1079}