1use anyhow::Result;
2use bytes::Bytes;
3use futures::sink::SinkExt;
4use futures::stream::StreamExt;
5use serde::{Deserialize, Serialize};
6use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8use theater::messages::{
9 ActorMessage, ActorRequest, ActorResult, ActorSend, ActorStatus, ChannelEvent,
10 ChannelParticipant,
11};
12use theater::pack_bridge::Value;
13use theater::{ChainEvent, ManifestConfig};
14use tokio::net::{TcpListener, TcpStream};
15use tokio::sync::{mpsc, Mutex};
16use tokio_util::codec::Framed;
17use tracing::{debug, error, info};
18use uuid::Uuid;
19
20use theater::config::actor_manifest::{
21 RuntimeHostConfig, StoreHandlerConfig, SupervisorHostConfig, TcpHandlerConfig,
22};
23use theater::handler::HandlerRegistry;
24use theater::id::TheaterId;
25use theater::messages::{default_init_state, ChannelId, TheaterCommand};
26use theater::theater_runtime::TheaterRuntime;
27use theater::utils::resolve_reference;
28use theater::TheaterRuntimeError;
29
30use theater_handler_message_server::MessageServerHandler;
33use theater_handler_runtime::RuntimeHandler;
34use theater_handler_store::StoreHandler;
35use theater_handler_supervisor::SupervisorHandler;
36use theater_handler_tcp::TcpHandler;
37
38use crate::fragmenting_codec::FragmentingCodec;
39
/// Commands a management client can send to the Theater server.
///
/// Each command is read from the management socket as one JSON-encoded frame and is
/// normally answered with a [`ManagementResponse`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ManagementCommand {
    /// Start a new actor. Answered with `ManagementResponse::ActorStarted`.
    StartActor {
        /// Reference to the actor manifest; the server resolves it with
        /// `resolve_reference` and parses the result as a TOML manifest.
        manifest: String,
        /// Initial state supplied by the client.
        // NOTE(review): the connection handler in this file does not forward this value;
        // it sets actors up with `default_init_state()` — confirm whether that is intended.
        initial_state: Option<Vec<u8>>,
        /// When `true`, supervisor results for the actor are forwarded to this client
        /// as `ManagementResponse::ActorResult`.
        parent: bool,
        /// When `true`, the actor's chain events are forwarded to this client as
        /// `ManagementResponse::ActorEvent`.
        subscribe: bool,
    },
    /// Stop the given actor. Answered with `ManagementResponse::ActorStopped`.
    StopActor {
        id: TheaterId,
    },
    /// Terminate the given actor. Also answered with `ManagementResponse::ActorStopped`.
    TerminateActor {
        id: TheaterId,
    },
    /// List the actors known to the runtime. Answered with `ManagementResponse::ActorList`.
    ListActors,
    /// Subscribe this connection to the given actor's events. Answered with
    /// `ManagementResponse::Subscribed`, which carries the new subscription id.
    SubscribeToActor {
        id: TheaterId,
    },
    /// Remove a subscription previously created with `SubscribeToActor`.
    UnsubscribeFromActor {
        id: TheaterId,
        /// Id returned in `ManagementResponse::Subscribed`.
        subscription_id: Uuid,
    },
    /// Send a one-way message to an actor. Answered with `ManagementResponse::SentMessage`.
    SendActorMessage {
        id: TheaterId,
        /// Opaque message payload.
        data: Vec<u8>,
    },
    /// Send a request to an actor and wait for its reply. Answered with
    /// `ManagementResponse::RequestedMessage`.
    RequestActorMessage {
        id: TheaterId,
        /// Opaque request payload.
        data: Vec<u8>,
    },
    /// Fetch the manifest of an actor. Answered with `ManagementResponse::ActorManifest`.
    GetActorManifest {
        id: TheaterId,
    },
    /// Fetch the status of an actor. Answered with `ManagementResponse::ActorStatus`.
    GetActorStatus {
        id: TheaterId,
    },
    /// Restart an actor. Answered with `ManagementResponse::Restarted`.
    RestartActor {
        id: TheaterId,
    },
    /// Fetch the state of an actor. Answered with `ManagementResponse::ActorState`.
    GetActorState {
        id: TheaterId,
    },
    /// Fetch the metrics of an actor. Answered with `ManagementResponse::ActorMetrics`.
    GetActorMetrics {
        id: TheaterId,
    },
    /// Replace an actor's package. The connection handler in this file answers this
    /// with a "not yet implemented" runtime error.
    UpdateActorPackage {
        id: TheaterId,
        package: String,
    },
    /// Open a channel from this client (acting as `ChannelParticipant::External`) to
    /// the given participant. Answered with `ManagementResponse::ChannelOpened`.
    OpenChannel {
        /// Target participant of the channel.
        actor_id: ChannelParticipant,
        /// Payload delivered with the open request.
        initial_message: Vec<u8>,
    },
    /// Send a message on an open channel. Answered with `ManagementResponse::MessageSent`.
    SendOnChannel {
        channel_id: String,
        message: Vec<u8>,
    },
    /// Close a channel. Answered with `ManagementResponse::ChannelClosed`.
    CloseChannel {
        channel_id: String,
    },

    /// Create a new store. Answered with `ManagementResponse::StoreCreated`.
    NewStore {},
}
105
/// Messages the Theater server sends to a management client.
///
/// Most variants are direct replies to a [`ManagementCommand`]; `ActorEvent`,
/// `ActorResult`, `ChannelMessage` and `ChannelClosed` can also be pushed to the client
/// asynchronously by forwarder tasks.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[allow(clippy::large_enum_variant)]
pub enum ManagementResponse {
    /// Reply to `StartActor`: the actor was set up under the given id.
    ActorStarted {
        id: TheaterId,
    },
    /// Reply to both `StopActor` and `TerminateActor`.
    ActorStopped {
        id: TheaterId,
    },
    /// Reply to `ListActors`.
    ActorList {
        // NOTE(review): the String is presumably the actor's name — confirm against the runtime.
        actors: Vec<(TheaterId, String)>,
    },
    /// Reply to `SubscribeToActor`.
    Subscribed {
        id: TheaterId,
        /// Id to pass to `UnsubscribeFromActor` later.
        subscription_id: Uuid,
    },
    /// Reply to `UnsubscribeFromActor`.
    Unsubscribed {
        id: TheaterId,
    },
    /// A chain event forwarded from an actor this client subscribed to.
    ActorEvent {
        event: ChainEvent,
    },
    /// A supervisor result forwarded for an actor started with `parent: true`.
    ActorResult(ActorResult),
    /// The command could not be completed.
    Error {
        error: ManagementError,
    },
    /// Reply to `RequestActorMessage`, carrying the actor's response payload.
    RequestedMessage {
        id: TheaterId,
        message: Vec<u8>,
    },
    /// Reply to `SendActorMessage`.
    SentMessage {
        id: TheaterId,
    },
    /// Reply to `GetActorStatus`.
    ActorStatus {
        id: TheaterId,
        status: ActorStatus,
    },
    /// Reply to `RestartActor`.
    Restarted {
        id: TheaterId,
    },
    /// Reply to `GetActorManifest`.
    ActorManifest {
        id: TheaterId,
        manifest: ManifestConfig,
    },
    /// Reply to `GetActorState`.
    ActorState {
        id: TheaterId,
        state: Value,
    },
    /// Reply to `GetActorMetrics`; the metrics are re-encoded as a JSON value.
    ActorMetrics {
        id: TheaterId,
        metrics: serde_json::Value,
    },
    /// Acknowledges a package update.
    ActorPackageUpdated {
        id: TheaterId,
    },
    /// Reply to `OpenChannel` when the target accepted the channel.
    ChannelOpened {
        channel_id: String,
        /// The participant the channel was opened to.
        actor_id: ChannelParticipant,
    },
    /// Reply to `SendOnChannel`.
    MessageSent {
        channel_id: String,
    },
    /// A message that arrived on a channel this client opened.
    ChannelMessage {
        channel_id: String,
        sender_id: ChannelParticipant,
        message: Vec<u8>,
    },
    /// Reply to `CloseChannel`, or a notification that the channel was closed.
    ChannelClosed {
        channel_id: String,
    },

    /// Reply to `NewStore`.
    StoreCreated {
        store_id: String,
    },
}
183
/// Error categories reported to management clients inside `ManagementResponse::Error`.
///
/// `String` payloads carry a human-readable detail message that the `Display`
/// implementation appends after the category text.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ManagementError {
    // --- Actor errors ---
    /// The referenced actor does not exist.
    ActorNotFound,
    /// An actor with that identity already exists.
    ActorAlreadyExists,
    /// The actor exists but is not running.
    ActorNotRunning,
    /// An error reported by the actor itself.
    ActorError(String),

    // --- Channel errors ---
    /// The referenced channel does not exist.
    ChannelNotFound,
    /// The channel is closed.
    ChannelClosed,
    /// The target declined to open the channel.
    ChannelRejected,

    // --- Store errors ---
    StoreError(String),

    // --- Communication errors ---
    /// A message could not be delivered or a reply could not be received.
    CommunicationError(String),

    // --- Request errors ---
    /// The request was malformed or not acceptable.
    InvalidRequest(String),
    /// The operation timed out.
    Timeout,

    // --- Runtime and internal errors ---
    /// The runtime reported a failure while executing the command.
    RuntimeError(String),
    InternalError(String),

    // --- Serialization errors ---
    SerializationError(String),

    // --- Actor initialization errors ---
    /// The actor could not be set up (for example manifest or WASM loading failed).
    ActorInitializationError(String),
}
217
218impl From<TheaterRuntimeError> for ManagementError {
220 fn from(err: TheaterRuntimeError) -> Self {
221 match err {
222 TheaterRuntimeError::ActorNotFound(_) => ManagementError::ActorNotFound,
223 TheaterRuntimeError::ActorAlreadyExists(_) => ManagementError::ActorAlreadyExists,
224 TheaterRuntimeError::ActorNotRunning(_) => ManagementError::ActorNotRunning,
225 TheaterRuntimeError::ActorOperationFailed(msg) => {
226 ManagementError::RuntimeError(format!("Actor operation failed: {}", msg))
227 }
228 TheaterRuntimeError::ActorError(e) => ManagementError::ActorError(e.to_string()),
229 TheaterRuntimeError::ChannelError(msg) => ManagementError::CommunicationError(msg),
230 TheaterRuntimeError::ChannelNotFound(_) => ManagementError::ChannelNotFound,
231 TheaterRuntimeError::ChannelRejected => ManagementError::ChannelRejected,
232 TheaterRuntimeError::SerializationError(msg) => {
233 ManagementError::SerializationError(msg)
234 }
235 TheaterRuntimeError::InternalError(msg) => ManagementError::InternalError(msg),
236 TheaterRuntimeError::ActorInitializationError(msg) => {
237 ManagementError::ActorInitializationError(msg)
238 }
239 }
240 }
241}
242
243impl std::fmt::Display for ManagementError {
245 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
246 match self {
247 ManagementError::ActorNotFound => write!(f, "Actor not found"),
248 ManagementError::ActorAlreadyExists => write!(f, "Actor already exists"),
249 ManagementError::ActorNotRunning => write!(f, "Actor is not running"),
250 ManagementError::ActorError(msg) => write!(f, "Actor error: {}", msg),
251 ManagementError::ChannelNotFound => write!(f, "Channel not found"),
252 ManagementError::ChannelClosed => write!(f, "Channel is closed"),
253 ManagementError::ChannelRejected => write!(f, "Channel was rejected"),
254 ManagementError::StoreError(msg) => write!(f, "Store error: {}", msg),
255 ManagementError::CommunicationError(msg) => write!(f, "Communication error: {}", msg),
256 ManagementError::InvalidRequest(msg) => write!(f, "Invalid request: {}", msg),
257 ManagementError::Timeout => write!(f, "Operation timed out"),
258 ManagementError::RuntimeError(msg) => write!(f, "Runtime error: {}", msg),
259 ManagementError::InternalError(msg) => write!(f, "Internal error: {}", msg),
260 ManagementError::SerializationError(msg) => write!(f, "Serialization error: {}", msg),
261 ManagementError::ActorInitializationError(msg) => {
262 write!(f, "Actor initialization error: {}", msg)
263 }
264 }
265 }
266}
267
// Marker impl so `ManagementError` can be used wherever a `std::error::Error` is
// expected; the default trait methods are used, so no `source()` is reported.
impl std::error::Error for ManagementError {}
270
/// Bookkeeping record for one client subscription to an actor's events.
///
/// Records are stored in a `HashSet` per actor; equality and hashing consider only
/// `id`, so two records with the same id are treated as the same subscription.
#[derive(Debug)]
#[allow(dead_code)]
struct Subscription {
    /// Identifier returned to the client in `ManagementResponse::Subscribed`.
    id: Uuid,
    /// Response queue of the connection that created the subscription.
    client_tx: mpsc::Sender<ManagementResponse>,
}
277
278impl Eq for Subscription {}
279impl PartialEq for Subscription {
280 fn eq(&self, other: &Self) -> bool {
281 self.id == other.id
282 }
283}
284impl std::hash::Hash for Subscription {
285 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
286 self.id.hash(state);
287 }
288}
289
/// Bookkeeping record tying an open channel to the management connection that opened
/// it, so channel events can be forwarded to that client.
#[derive(Debug)]
#[allow(dead_code)]
struct ChannelSubscription {
    /// String form of the channel id; also the key under which the record is stored.
    channel_id: String,
    /// Participant that opened the channel (the management client).
    initiator_id: ChannelParticipant,
    /// Participant the channel was opened to.
    target_id: ChannelParticipant,
    /// Response queue of the owning connection; channel messages and close
    /// notifications are pushed here.
    client_tx: mpsc::Sender<ManagementResponse>,
}
301
302fn create_root_handler_registry(
311 theater_tx: mpsc::Sender<TheaterCommand>,
312) -> (
313 HandlerRegistry,
314 theater_handler_message_server::MessageRouter,
315) {
316 let mut registry = HandlerRegistry::new();
317
318 info!("Initializing Theater server with Theater-specific handlers...");
319
320 let runtime_config = RuntimeHostConfig {};
322 registry.register(RuntimeHandler::new(
323 runtime_config,
324 theater_tx.clone(),
325 None,
326 ));
327
328 let store_config = StoreHandlerConfig::default();
330 registry.register(StoreHandler::new(store_config, None));
331
332 let supervisor_config = SupervisorHostConfig {};
334 registry.register(SupervisorHandler::new(supervisor_config, None));
335
336 let message_router = theater_handler_message_server::MessageRouter::new();
338 registry.register(MessageServerHandler::new(None, message_router.clone()));
339
340 let tcp_config = TcpHandlerConfig {
342 listen: None,
343 max_connections: None,
344 ..Default::default()
345 };
346 registry.register(TcpHandler::new(tcp_config));
347
348 info!("✓ 5 Theater-specific handlers registered");
349 info!("NOTE: WASI handlers are deprecated - see crates/deprecated/");
350
351 (registry, message_router)
352}
353
/// Management server that owns a Theater runtime and exposes it to clients over TCP.
pub struct TheaterServer {
    /// The actor runtime; moved onto its own task when the server runs.
    runtime: TheaterRuntime,
    /// Command sender for the runtime; cloned into every client connection.
    theater_tx: mpsc::Sender<TheaterCommand>,
    /// Listening socket for management clients.
    management_socket: TcpListener,
    /// Per-actor bookkeeping of client event subscriptions, shared by all connections.
    subscriptions: Arc<Mutex<HashMap<TheaterId, HashSet<Subscription>>>>,
    /// Open channels keyed by channel id string, shared by all connections and by the
    /// channel-event forwarding task.
    channel_subscriptions: Arc<Mutex<HashMap<String, ChannelSubscription>>>,
    /// Sender side of the channel-event queue; a clone is handed to the runtime at
    /// construction. Not read elsewhere in the visible code, hence the `dead_code` allowance.
    #[allow(dead_code)]
    channel_events_tx: mpsc::Sender<ChannelEvent>,
    /// Router used to deliver client messages and channel operations to actors.
    message_router: theater_handler_message_server::MessageRouter,
}
367
368impl TheaterServer {
369 async fn process_channel_events(
371 mut channel_events_rx: mpsc::Receiver<ChannelEvent>,
372 channel_subscriptions: Arc<Mutex<HashMap<String, ChannelSubscription>>>,
373 ) {
374 while let Some(event) = channel_events_rx.recv().await {
375 match event {
376 ChannelEvent::Message {
377 channel_id,
378 sender_id,
379 message,
380 } => {
381 tracing::debug!("Received channel message for {}", channel_id);
382 let subs = channel_subscriptions.lock().await;
384 if let Some(sub) = subs.get(&channel_id.0) {
385 let response = ManagementResponse::ChannelMessage {
386 channel_id: channel_id.0.clone(),
387 sender_id,
388 message,
389 };
390
391 tracing::debug!("Forwarding channel message to client: {:?}", response);
392
393 if let Err(e) = sub.client_tx.send(response).await {
394 tracing::warn!("Failed to forward channel message: {}", e);
395 } else {
396 tracing::debug!("Forwarded channel message to client");
397 }
398 }
399 }
400 ChannelEvent::Close { channel_id } => {
401 tracing::debug!("Received channel close event for {}", channel_id);
402 let mut subs = channel_subscriptions.lock().await;
404 if let Some(sub) = subs.remove(&channel_id.0) {
405 let response = ManagementResponse::ChannelClosed {
406 channel_id: channel_id.0.clone(),
407 };
408
409 if let Err(e) = sub.client_tx.send(response).await {
410 tracing::warn!("Failed to forward channel close event: {}", e);
411 } else {
412 tracing::debug!("Forwarded channel close event to client");
413 }
414 }
415 }
416 }
417 }
418 }
419
420 pub async fn new(address: std::net::SocketAddr) -> Result<Self> {
421 let (theater_tx, theater_rx) = mpsc::channel(32);
422
423 let (channel_events_tx, channel_events_rx) = mpsc::channel(32);
425
426 let (handler_registry, message_router) = create_root_handler_registry(theater_tx.clone());
429
430 let runtime = TheaterRuntime::new(
432 theater_tx.clone(),
433 theater_rx,
434 Some(channel_events_tx.clone()),
435 handler_registry,
436 )
437 .await?;
438 let management_socket = TcpListener::bind(address).await?;
439
440 let channel_subscriptions = Arc::new(Mutex::new(HashMap::new()));
441
442 let channel_subs_clone = channel_subscriptions.clone();
444 tokio::spawn(async move {
445 Self::process_channel_events(channel_events_rx, channel_subs_clone).await;
446 });
447
448 Ok(Self {
449 runtime,
450 theater_tx,
451 management_socket,
452 subscriptions: Arc::new(Mutex::new(HashMap::new())),
453 channel_subscriptions,
454 channel_events_tx,
455 message_router,
456 })
457 }
458
459 pub async fn run(mut self) -> Result<()> {
460 info!(
461 "Theater server starting on {:?}",
462 self.management_socket.local_addr()?
463 );
464
465 let runtime_handle = tokio::spawn(async move {
467 match self.runtime.run().await {
468 Ok(_) => Ok(()),
469 Err(e) => {
470 error!("Theater runtime failed: {}", e);
471 Err(e)
472 }
473 }
474 });
475
476 while let Ok((socket, addr)) = self.management_socket.accept().await {
478 info!("New management connection from {}", addr);
479 let runtime_tx = self.theater_tx.clone();
480 let subscriptions = self.subscriptions.clone();
481 let channel_subscriptions = self.channel_subscriptions.clone();
482 let message_router = self.message_router.clone();
483
484 tokio::spawn(async move {
485 if let Err(e) = Self::handle_management_connection(
486 socket,
487 runtime_tx,
488 subscriptions,
489 channel_subscriptions,
490 message_router,
491 )
492 .await
493 {
494 error!("Error handling management connection: {}", e);
495 }
496 });
497 }
498
499 runtime_handle.await??;
500 Ok(())
501 }
502
503 async fn handle_management_connection(
504 socket: TcpStream,
505 runtime_tx: mpsc::Sender<TheaterCommand>,
506 subscriptions: Arc<Mutex<HashMap<TheaterId, HashSet<Subscription>>>>,
507 channel_subscriptions: Arc<Mutex<HashMap<String, ChannelSubscription>>>,
508 message_router: theater_handler_message_server::MessageRouter,
509 ) -> Result<()> {
510 let (client_tx, mut client_rx) = mpsc::channel::<ManagementResponse>(32);
512
513 let codec = FragmentingCodec::new();
514 let framed = Framed::new(socket, codec);
515
516 let (mut framed_sink, mut framed_stream) = framed.split();
518
519 let cmd_client_tx = client_tx.clone();
521
522 let _response_task = tokio::spawn(async move {
524 while let Some(response) = client_rx.recv().await {
525 match serde_json::to_vec(&response) {
526 Ok(data) => {
527 debug!("Serialized response: {} bytes", data.len());
528 if data.len() > 10 * 1024 * 1024 {
529 debug!("Large response detected: {} MB", data.len() / 1024 / 1024);
530 }
531 if let Err(e) = framed_sink.send(Bytes::from(data)).await {
532 debug!("Error sending response to client: {}", e);
533 break;
534 }
535 }
536 Err(e) => {
537 error!("Error serializing response: {}", e);
538 }
539 }
540 }
541 debug!("Response forwarder for client closed");
542 });
543
544 let mut connection_subscriptions: Vec<(TheaterId, Uuid)> = Vec::new();
546
547 let mut connection_channel_subscriptions: Vec<String> = Vec::new();
549
550 'connection: while let Some(msg) = framed_stream.next().await {
552 debug!("Received management message");
553 let msg = match msg {
554 Ok(m) => m,
555 Err(e) => {
556 error!("Error receiving message: {}", e);
557 break 'connection;
558 }
559 };
560
561 let cmd = match serde_json::from_slice::<ManagementCommand>(&msg) {
562 Ok(c) => c,
563 Err(e) => {
564 error!(
565 "Error parsing command: {} {}",
566 e,
567 String::from_utf8_lossy(&msg)
568 );
569 continue;
570 }
571 };
572 debug!("Parsed command: {:?}", cmd);
573
574 let _cmd_clone = cmd.clone();
576
577 let response = match cmd {
578 ManagementCommand::StartActor {
579 manifest,
580 initial_state: _initial_state,
581 parent,
582 subscribe,
583 } => {
584 info!("Starting actor from manifest: {:?}", manifest);
585
586 let manifest_str = match resolve_reference(&manifest).await {
588 Ok(bytes) => match String::from_utf8(bytes) {
589 Ok(s) => s,
590 Err(e) => {
591 error!("Invalid manifest encoding: {}", e);
592 cmd_client_tx
593 .send(ManagementResponse::Error {
594 error: ManagementError::ActorInitializationError(format!(
595 "Invalid manifest encoding: {}",
596 e
597 )),
598 })
599 .await
600 .ok();
601 continue;
602 }
603 },
604 Err(e) => {
605 error!("Failed to load manifest: {}", e);
606 cmd_client_tx
607 .send(ManagementResponse::Error {
608 error: ManagementError::ActorInitializationError(format!(
609 "Failed to load manifest: {}",
610 e
611 )),
612 })
613 .await
614 .ok();
615 continue;
616 }
617 };
618
619 let manifest_config = match ManifestConfig::from_toml_str(&manifest_str) {
620 Ok(m) => m,
621 Err(e) => {
622 error!("Failed to parse manifest: {}", e);
623 cmd_client_tx
624 .send(ManagementResponse::Error {
625 error: ManagementError::ActorInitializationError(format!(
626 "Failed to parse manifest: {}",
627 e
628 )),
629 })
630 .await
631 .ok();
632 continue;
633 }
634 };
635
636 let wasm_bytes = match resolve_reference(&manifest_config.package).await {
638 Ok(bytes) => bytes,
639 Err(e) => {
640 error!("Failed to load WASM: {}", e);
641 cmd_client_tx
642 .send(ManagementResponse::Error {
643 error: ManagementError::ActorInitializationError(format!(
644 "Failed to load WASM: {}",
645 e
646 )),
647 })
648 .await
649 .ok();
650 continue;
651 }
652 };
653
654 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
655 debug!("Sending SpawnActor command to runtime");
656 let supervisor_tx = if parent {
657 let (supervisor_tx, mut supervisor_rx) = mpsc::channel(32);
658 let cmd_client_tx = cmd_client_tx.clone();
659 tokio::spawn(async move {
660 while let Some(res) = supervisor_rx.recv().await {
661 debug!("Received supervisor response: {:?}", res);
662 if let Err(e) = cmd_client_tx
663 .send(ManagementResponse::ActorResult(res))
664 .await
665 {
666 error!("Failed to send supervisor response: {}", e);
667 break;
668 }
669 }
670 });
671 Some(supervisor_tx)
672 } else {
673 None
674 };
675 let subscription_tx = if subscribe {
676 let (event_tx, mut event_rx) = mpsc::channel(32);
677
678 let cmd_client_tx = cmd_client_tx.clone();
680 tokio::spawn(async move {
681 while let Some((_actor_id, event)) = event_rx.recv().await {
682 debug!("Received event for subscription");
683 let response = ManagementResponse::ActorEvent { event };
684 if let Err(e) = cmd_client_tx.send(response).await {
685 debug!("Failed to forward event to client: {}", e);
686 break;
687 }
688 }
689 debug!("Event forwarder for subscription stopped");
690 });
691
692 Some(event_tx)
693 } else {
694 None
695 };
696 match runtime_tx
697 .send(TheaterCommand::SetupActor {
698 wasm_bytes,
699 name: Some(manifest_config.name.clone()),
700 manifest: Some(manifest_config),
701 init_state: default_init_state(),
702 response_tx: cmd_tx,
703 supervisor_tx,
704 subscription_tx,
705 })
706 .await
707 {
708 Ok(_) => {
709 debug!("SpawnActor command sent to runtime, awaiting response");
710 match cmd_rx.await {
711 Ok(result) => match result {
712 Ok(actor_id) => {
713 info!("Actor started with ID: {:?}", actor_id);
714 ManagementResponse::ActorStarted { id: actor_id }
715 }
716 Err(e) => {
717 error!("Runtime failed to start actor: {}", e);
718 ManagementResponse::Error {
719 error: ManagementError::RuntimeError(format!(
720 "Failed to start actor: {}",
721 e
722 )),
723 }
724 }
725 },
726 Err(e) => {
727 error!("Failed to receive spawn response: {}", e);
728 ManagementResponse::Error {
729 error: ManagementError::CommunicationError(format!(
730 "Failed to receive spawn response: {}",
731 e
732 )),
733 }
734 }
735 }
736 }
737 Err(e) => {
738 error!("Failed to send SpawnActor command: {}", e);
739 ManagementResponse::Error {
740 error: ManagementError::CommunicationError(format!(
741 "Failed to send spawn command: {}",
742 e
743 )),
744 }
745 }
746 }
747 }
748 ManagementCommand::StopActor { id } => {
749 info!("Stopping actor: {:?}", id);
750 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
751 runtime_tx
752 .send(TheaterCommand::StopActor {
753 actor_id: id,
754 response_tx: cmd_tx,
755 })
756 .await?;
757
758 match cmd_rx.await? {
759 Ok(_) => {
760 subscriptions.lock().await.remove(&id);
761 ManagementResponse::ActorStopped { id }
762 }
763 Err(e) => ManagementResponse::Error {
764 error: ManagementError::RuntimeError(format!(
765 "Failed to stop actor: {}",
766 e
767 )),
768 },
769 }
770 }
771 ManagementCommand::TerminateActor { id } => {
772 info!("Terminating actor: {:?}", id);
773 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
774 runtime_tx
775 .send(TheaterCommand::TerminateActor {
776 actor_id: id,
777 response_tx: cmd_tx,
778 })
779 .await?;
780
781 match cmd_rx.await? {
782 Ok(_) => {
783 subscriptions.lock().await.remove(&id);
784 ManagementResponse::ActorStopped { id }
785 }
786 Err(e) => ManagementResponse::Error {
787 error: ManagementError::RuntimeError(format!(
788 "Failed to terminate actor: {}",
789 e
790 )),
791 },
792 }
793 }
794 ManagementCommand::ListActors => {
795 debug!("Listing actors");
796 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
797 runtime_tx
798 .send(TheaterCommand::GetActors {
799 response_tx: cmd_tx,
800 })
801 .await?;
802
803 match cmd_rx.await? {
804 Ok(actors) => {
805 info!("Found {} actors", actors.len());
806 ManagementResponse::ActorList { actors }
807 }
808 Err(e) => ManagementResponse::Error {
809 error: ManagementError::RuntimeError(format!(
810 "Failed to list actors: {}",
811 e
812 )),
813 },
814 }
815 }
816 ManagementCommand::SubscribeToActor { id } => {
817 info!("New subscription request for actor: {:?}", id);
818 let subscription_id = Uuid::new_v4();
819 let subscription = Subscription {
820 id: subscription_id,
821 client_tx: cmd_client_tx.clone(),
822 };
823
824 debug!("Subscription created with ID: {}", subscription_id);
825
826 subscriptions
828 .lock()
829 .await
830 .entry(id)
831 .or_default()
832 .insert(subscription);
833
834 let (event_tx, mut event_rx) = mpsc::channel(32);
836 runtime_tx
837 .send(TheaterCommand::SubscribeToActor {
838 actor_id: id,
839 event_tx,
840 })
841 .await
842 .map_err(|e| anyhow::anyhow!("Failed to subscribe: {}", e))?;
843
844 connection_subscriptions.push((id, subscription_id));
846
847 let client_tx_clone = cmd_client_tx.clone();
849 tokio::spawn(async move {
850 debug!(
851 "Starting event forwarder for subscription {}",
852 subscription_id
853 );
854 while let Some((_actor_id, event)) = event_rx.recv().await {
855 debug!("Received event for subscription {}", subscription_id);
856 let response = ManagementResponse::ActorEvent { event };
857 if let Err(e) = client_tx_clone.send(response).await {
858 debug!("Failed to forward event to client: {}", e);
859 break;
860 }
861 }
862 debug!(
863 "Event forwarder for subscription {} stopped",
864 subscription_id
865 );
866 });
867
868 ManagementResponse::Subscribed {
869 id,
870 subscription_id,
871 }
872 }
873 ManagementCommand::UnsubscribeFromActor {
874 id,
875 subscription_id,
876 } => {
877 debug!(
878 "Removing subscription {} for actor {:?}",
879 subscription_id, id
880 );
881
882 connection_subscriptions
884 .retain(|(aid, sid)| *aid != id || *sid != subscription_id);
885
886 let mut subs = subscriptions.lock().await;
888 if let Some(actor_subs) = subs.get_mut(&id) {
889 actor_subs.retain(|sub| sub.id != subscription_id);
890
891 if actor_subs.is_empty() {
893 subs.remove(&id);
894 }
895 }
896
897 debug!("Subscription removed");
898 ManagementResponse::Unsubscribed { id }
899 }
900 ManagementCommand::SendActorMessage { id, data } => {
901 info!("Sending message to actor: {:?}", id);
902
903 let (response_tx, response_rx) = tokio::sync::oneshot::channel();
905
906 let message = ActorMessage::Send(ActorSend { data });
908
909 match message_router
911 .route_message(theater::messages::MessageCommand::SendMessage {
912 target_id: id,
913 message,
914 response_tx,
915 })
916 .await
917 {
918 Ok(_) => {
919 match response_rx.await {
921 Ok(Ok(())) => {
922 info!("Message sent successfully to actor: {:?}", id);
923 ManagementResponse::SentMessage { id }
924 }
925 Ok(Err(e)) => {
926 error!("Failed to send message to actor: {}", e);
927 ManagementResponse::Error {
928 error: ManagementError::RuntimeError(format!(
929 "Failed to send: {}",
930 e
931 )),
932 }
933 }
934 Err(e) => {
935 error!("Failed to receive routing response: {}", e);
936 ManagementResponse::Error {
937 error: ManagementError::CommunicationError(format!(
938 "Failed to receive routing response: {}",
939 e
940 )),
941 }
942 }
943 }
944 }
945 Err(e) => {
946 error!("Failed to route message: {}", e);
947 ManagementResponse::Error {
948 error: ManagementError::RuntimeError(format!(
949 "Failed to route message: {}",
950 e
951 )),
952 }
953 }
954 }
955 }
956 ManagementCommand::RequestActorMessage { id, data } => {
957 info!("Requesting message from actor: {:?}", id);
958
959 let (route_tx, route_rx) = tokio::sync::oneshot::channel();
961 let (response_tx, response_rx) = tokio::sync::oneshot::channel();
962
963 let message = ActorMessage::Request(ActorRequest { data, response_tx });
965
966 match message_router
968 .route_message(theater::messages::MessageCommand::SendMessage {
969 target_id: id,
970 message,
971 response_tx: route_tx,
972 })
973 .await
974 {
975 Ok(_) => {
976 match route_rx.await {
978 Ok(Ok(())) => {
979 match response_rx.await {
981 Ok(response_data) => {
982 info!("Received response from actor: {:?}", id);
983 ManagementResponse::RequestedMessage {
984 id,
985 message: response_data,
986 }
987 }
988 Err(e) => {
989 error!("Actor didn't respond: {}", e);
990 ManagementResponse::Error {
991 error: ManagementError::RuntimeError(format!(
992 "Actor didn't respond: {}",
993 e
994 )),
995 }
996 }
997 }
998 }
999 Ok(Err(e)) => {
1000 error!("Failed to route request to actor: {}", e);
1001 ManagementResponse::Error {
1002 error: ManagementError::RuntimeError(format!(
1003 "Failed to route: {}",
1004 e
1005 )),
1006 }
1007 }
1008 Err(e) => {
1009 error!("Failed to receive routing response: {}", e);
1010 ManagementResponse::Error {
1011 error: ManagementError::CommunicationError(format!(
1012 "Failed to receive routing response: {}",
1013 e
1014 )),
1015 }
1016 }
1017 }
1018 }
1019 Err(e) => {
1020 error!("Failed to route request: {}", e);
1021 ManagementResponse::Error {
1022 error: ManagementError::RuntimeError(format!(
1023 "Failed to route request: {}",
1024 e
1025 )),
1026 }
1027 }
1028 }
1029 }
1030 ManagementCommand::GetActorManifest { id } => {
1031 info!("Getting manifest for actor: {:?}", id);
1032 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1033 runtime_tx
1034 .send(TheaterCommand::GetActorManifest {
1035 actor_id: id,
1036 response_tx: cmd_tx,
1037 })
1038 .await?;
1039
1040 let manifest = cmd_rx.await?;
1041 ManagementResponse::ActorManifest {
1042 id,
1043 manifest: manifest?,
1044 }
1045 }
1046 ManagementCommand::GetActorStatus { id } => {
1047 info!("Getting status for actor: {:?}", id);
1048 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1049 runtime_tx
1050 .send(TheaterCommand::GetActorStatus {
1051 actor_id: id,
1052 response_tx: cmd_tx,
1053 })
1054 .await?;
1055
1056 let status = cmd_rx.await?;
1057 ManagementResponse::ActorStatus {
1058 id,
1059 status: status?,
1060 }
1061 }
1062 ManagementCommand::RestartActor { id } => {
1063 info!("Restarting actor: {:?}", id);
1064 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1065 runtime_tx
1066 .send(TheaterCommand::RestartActor {
1067 actor_id: id,
1068 response_tx: cmd_tx,
1069 })
1070 .await?;
1071
1072 match cmd_rx.await? {
1073 Ok(_) => ManagementResponse::Restarted { id },
1074 Err(e) => ManagementResponse::Error {
1075 error: ManagementError::RuntimeError(format!(
1076 "Failed to restart actor: {}",
1077 e
1078 )),
1079 },
1080 }
1081 }
1082 ManagementCommand::GetActorState { id } => {
1083 info!("Getting state for actor: {:?}", id);
1084 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1085 runtime_tx
1086 .send(TheaterCommand::GetActorState {
1087 actor_id: id,
1088 response_tx: cmd_tx,
1089 })
1090 .await?;
1091
1092 let state = cmd_rx.await?;
1093 ManagementResponse::ActorState { id, state: state? }
1094 }
1095 ManagementCommand::GetActorMetrics { id } => {
1096 info!("Getting metrics for actor: {:?}", id);
1097 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1098 runtime_tx
1099 .send(TheaterCommand::GetActorMetrics {
1100 actor_id: id,
1101 response_tx: cmd_tx,
1102 })
1103 .await?;
1104
1105 let metrics = cmd_rx.await?;
1106 ManagementResponse::ActorMetrics {
1107 id,
1108 metrics: serde_json::to_value(metrics?)?,
1109 }
1110 }
1111 ManagementCommand::UpdateActorPackage { id: _, package: _ } => {
1112 ManagementResponse::Error {
1114 error: ManagementError::RuntimeError(
1115 "UpdateActorPackage not yet implemented".to_string(),
1116 ),
1117 }
1118 }
1119 ManagementCommand::OpenChannel {
1121 actor_id,
1122 initial_message,
1123 } => {
1124 info!("Opening channel to actor: {:?}", actor_id);
1125
1126 let (response_tx, response_rx) = tokio::sync::oneshot::channel();
1128
1129 let client_id = ChannelParticipant::External;
1131 let channel_id = ChannelId::new(&client_id, &actor_id);
1132 let channel_id_str = channel_id.0.clone();
1133
1134 message_router
1136 .route_message(theater::messages::MessageCommand::OpenChannel {
1137 initiator_id: client_id.clone(),
1138 target_id: actor_id.clone(),
1139 channel_id: channel_id.clone(),
1140 initial_message,
1141 response_tx,
1142 })
1143 .await
1144 .map_err(|e| {
1145 anyhow::anyhow!("Failed to send channel open command: {}", e)
1146 })?;
1147
1148 match response_rx.await {
1150 Ok(result) => {
1151 match result {
1152 Ok(accepted) => {
1153 if accepted {
1154 info!("Channel opened successfully: {}", channel_id_str);
1156
1157 let channel_sub = ChannelSubscription {
1159 channel_id: channel_id_str.clone(),
1160 initiator_id: client_id.clone(),
1161 target_id: actor_id.clone(),
1162 client_tx: cmd_client_tx.clone(),
1163 };
1164
1165 channel_subscriptions
1166 .lock()
1167 .await
1168 .insert(channel_id_str.clone(), channel_sub);
1169
1170 connection_channel_subscriptions
1172 .push(channel_id_str.clone());
1173
1174 ManagementResponse::ChannelOpened {
1175 channel_id: channel_id_str,
1176 actor_id,
1177 }
1178 } else {
1179 ManagementResponse::Error {
1181 error: ManagementError::ChannelRejected,
1182 }
1183 }
1184 }
1185 Err(e) => ManagementResponse::Error {
1186 error: ManagementError::RuntimeError(format!(
1187 "Error opening channel: {}",
1188 e
1189 )),
1190 },
1191 }
1192 }
1193 Err(e) => ManagementResponse::Error {
1194 error: ManagementError::CommunicationError(format!(
1195 "Failed to receive channel open response: {}",
1196 e
1197 )),
1198 },
1199 }
1200 }
1201 ManagementCommand::SendOnChannel {
1202 channel_id,
1203 message,
1204 } => {
1205 info!("Sending message on channel: {}", channel_id);
1206
1207 let (response_tx, response_rx) = tokio::sync::oneshot::channel();
1209
1210 let channel_id_parsed = ChannelId(channel_id.clone());
1212
1213 let sender_id = ChannelParticipant::External;
1215 match message_router
1216 .route_message(theater::messages::MessageCommand::ChannelMessage {
1217 channel_id: channel_id_parsed,
1218 sender_id,
1219 message,
1220 response_tx,
1221 })
1222 .await
1223 {
1224 Ok(_) => {
1225 match response_rx.await {
1227 Ok(Ok(())) => {
1228 info!("Message sent successfully on channel: {}", channel_id);
1229 ManagementResponse::MessageSent { channel_id }
1230 }
1231 Ok(Err(e)) => {
1232 error!("Failed to send on channel: {}", e);
1233 ManagementResponse::Error {
1234 error: ManagementError::RuntimeError(format!(
1235 "Failed to send on channel: {}",
1236 e
1237 )),
1238 }
1239 }
1240 Err(e) => {
1241 error!("Failed to receive channel send response: {}", e);
1242 ManagementResponse::Error {
1243 error: ManagementError::CommunicationError(format!(
1244 "Failed to receive channel send response: {}",
1245 e
1246 )),
1247 }
1248 }
1249 }
1250 }
1251 Err(e) => {
1252 error!("Failed to route channel message: {}", e);
1253 ManagementResponse::Error {
1254 error: ManagementError::RuntimeError(format!(
1255 "Failed to route channel message: {}",
1256 e
1257 )),
1258 }
1259 }
1260 }
1261 }
1262 ManagementCommand::CloseChannel { channel_id } => {
1263 info!("Closing channel: {}", channel_id);
1264
1265 let (response_tx, response_rx) = tokio::sync::oneshot::channel();
1267
1268 let channel_id_parsed = ChannelId(channel_id.clone());
1270
1271 let sender_id = ChannelParticipant::External;
1273 match message_router
1274 .route_message(theater::messages::MessageCommand::ChannelClose {
1275 channel_id: channel_id_parsed,
1276 sender_id,
1277 response_tx,
1278 })
1279 .await
1280 {
1281 Ok(_) => {
1282 match response_rx.await {
1284 Ok(Ok(())) => {
1285 info!("Channel closed successfully: {}", channel_id);
1286
1287 channel_subscriptions.lock().await.remove(&channel_id);
1289 connection_channel_subscriptions.retain(|id| id != &channel_id);
1290
1291 ManagementResponse::ChannelClosed { channel_id }
1292 }
1293 Ok(Err(e)) => {
1294 error!("Failed to close channel: {}", e);
1295 ManagementResponse::Error {
1296 error: ManagementError::RuntimeError(format!(
1297 "Failed to close channel: {}",
1298 e
1299 )),
1300 }
1301 }
1302 Err(e) => {
1303 error!("Failed to receive channel close response: {}", e);
1304 ManagementResponse::Error {
1305 error: ManagementError::CommunicationError(format!(
1306 "Failed to receive channel close response: {}",
1307 e
1308 )),
1309 }
1310 }
1311 }
1312 }
1313 Err(e) => {
1314 error!("Failed to route channel close: {}", e);
1315 ManagementResponse::Error {
1316 error: ManagementError::RuntimeError(format!(
1317 "Failed to route channel close: {}",
1318 e
1319 )),
1320 }
1321 }
1322 }
1323 }
1324 ManagementCommand::NewStore {} => {
1325 info!("Creating new store");
1326 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1327 runtime_tx
1328 .send(TheaterCommand::NewStore {
1329 response_tx: cmd_tx,
1330 })
1331 .await?;
1332
1333 let store_id = cmd_rx.await?;
1334 ManagementResponse::StoreCreated {
1335 store_id: store_id?.id,
1336 }
1337 }
1338 };
1339
1340 debug!("Sending response: {:?}", response);
1341 if let Err(e) = client_tx.send(response).await {
1342 error!("Failed to send response: {}", e);
1343 break;
1344 }
1345 debug!("Response sent");
1346 }
1347
1348 debug!(
1350 "Connection closed, cleaning up {} subscriptions",
1351 connection_subscriptions.len()
1352 );
1353 let mut subs = subscriptions.lock().await;
1354
1355 for (actor_id, sub_id) in connection_subscriptions {
1356 if let Some(actor_subs) = subs.get_mut(&actor_id) {
1357 actor_subs.retain(|sub| sub.id != sub_id);
1358
1359 if actor_subs.is_empty() {
1361 subs.remove(&actor_id);
1362 }
1363 }
1364 }
1365
1366 debug!(
1368 "Connection closed, cleaning up {} channel subscriptions",
1369 connection_channel_subscriptions.len()
1370 );
1371 let mut channel_subs = channel_subscriptions.lock().await;
1372
1373 for channel_id in connection_channel_subscriptions {
1374 channel_subs.remove(&channel_id);
1375 }
1376
1377 debug!("Cleaned up all subscriptions for the connection");
1378 Ok(())
1379 }
1380}