1use anyhow::Result;
2use bytes::Bytes;
3use futures::sink::SinkExt;
4use futures::stream::StreamExt;
5use serde::{Deserialize, Serialize};
6use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8use theater::messages::{
9 ActorMessage, ActorRequest, ActorResult, ActorSend, ActorStatus, ChannelEvent,
10 ChannelParticipant,
11};
12use theater::pack_bridge::Value;
13use theater::{ChainEvent, ManifestConfig};
14use tokio::net::{TcpListener, TcpStream};
15use tokio::sync::{mpsc, Mutex};
16use tokio_util::codec::Framed;
17use tracing::{debug, error, info};
18use uuid::Uuid;
19
20use theater::config::actor_manifest::{
21 RuntimeHostConfig, StoreHandlerConfig, SupervisorHostConfig, TcpHandlerConfig,
22};
23use theater::handler::HandlerRegistry;
24use theater::id::TheaterId;
25use theater::messages::{default_init_state, ChannelId, TheaterCommand};
26use theater::theater_runtime::TheaterRuntime;
27use theater::utils::{resolve_reference, ResourceCache};
28use theater::TheaterRuntimeError;
29
30use theater_handler_message_server::MessageServerHandler;
33use theater_handler_runtime::RuntimeHandler;
34use theater_handler_store::StoreHandler;
35use theater_handler_supervisor::SupervisorHandler;
36use theater_handler_tcp::TcpHandler;
37
38use crate::fragmenting_codec::FragmentingCodec;
39
40#[derive(Debug, Clone, Serialize, Deserialize)]
41pub enum ManagementCommand {
42 StartActor {
43 manifest: String,
44 initial_state: Option<Vec<u8>>,
45 parent: bool,
46 subscribe: bool,
47 },
48 StopActor {
49 id: TheaterId,
50 },
51 TerminateActor {
52 id: TheaterId,
53 },
54 ListActors,
55 SubscribeToActor {
56 id: TheaterId,
57 },
58 UnsubscribeFromActor {
59 id: TheaterId,
60 subscription_id: Uuid,
61 },
62 SendActorMessage {
63 id: TheaterId,
64 data: Vec<u8>,
65 },
66 RequestActorMessage {
67 id: TheaterId,
68 data: Vec<u8>,
69 },
70 GetActorManifest {
71 id: TheaterId,
72 },
73 GetActorStatus {
74 id: TheaterId,
75 },
76 RestartActor {
77 id: TheaterId,
78 },
79 GetActorState {
80 id: TheaterId,
81 },
82 GetActorMetrics {
83 id: TheaterId,
84 },
85 UpdateActorPackage {
86 id: TheaterId,
87 package: String,
88 },
89 OpenChannel {
91 actor_id: ChannelParticipant,
92 initial_message: Vec<u8>,
93 },
94 SendOnChannel {
95 channel_id: String,
96 message: Vec<u8>,
97 },
98 CloseChannel {
99 channel_id: String,
100 },
101
102 NewStore {},
104}
105
106#[derive(Debug, Clone, Serialize, Deserialize)]
107#[allow(clippy::large_enum_variant)]
108pub enum ManagementResponse {
109 ActorStarted {
110 id: TheaterId,
111 },
112 ActorStopped {
113 id: TheaterId,
114 },
115 ActorList {
116 actors: Vec<(TheaterId, String)>,
117 },
118 Subscribed {
119 id: TheaterId,
120 subscription_id: Uuid,
121 },
122 Unsubscribed {
123 id: TheaterId,
124 },
125 ActorEvent {
126 event: ChainEvent,
127 },
128 ActorResult(ActorResult),
129 Error {
130 error: ManagementError,
131 },
132 RequestedMessage {
133 id: TheaterId,
134 message: Vec<u8>,
135 },
136 SentMessage {
137 id: TheaterId,
138 },
139 ActorStatus {
140 id: TheaterId,
141 status: ActorStatus,
142 },
143 Restarted {
144 id: TheaterId,
145 },
146 ActorManifest {
147 id: TheaterId,
148 manifest: ManifestConfig,
149 },
150 ActorState {
151 id: TheaterId,
152 state: Value,
153 },
154 ActorMetrics {
155 id: TheaterId,
156 metrics: serde_json::Value,
157 },
158 ActorPackageUpdated {
159 id: TheaterId,
160 },
161 ChannelOpened {
163 channel_id: String,
164 actor_id: ChannelParticipant,
165 },
166 MessageSent {
167 channel_id: String,
168 },
169 ChannelMessage {
170 channel_id: String,
171 sender_id: ChannelParticipant,
172 message: Vec<u8>,
173 },
174 ChannelClosed {
175 channel_id: String,
176 },
177
178 StoreCreated {
180 store_id: String,
181 },
182}
183
184#[derive(Debug, Clone, Serialize, Deserialize)]
185pub enum ManagementError {
186 ActorNotFound,
188 ActorAlreadyExists,
189 ActorNotRunning,
190 ActorError(String),
191
192 ChannelNotFound,
194 ChannelClosed,
195 ChannelRejected,
196
197 StoreError(String),
199
200 CommunicationError(String),
202
203 InvalidRequest(String),
205 Timeout,
206
207 RuntimeError(String),
209 InternalError(String),
210
211 SerializationError(String),
213
214 ActorInitializationError(String),
216}
217
218impl From<TheaterRuntimeError> for ManagementError {
220 fn from(err: TheaterRuntimeError) -> Self {
221 match err {
222 TheaterRuntimeError::ActorNotFound(_) => ManagementError::ActorNotFound,
223 TheaterRuntimeError::ActorAlreadyExists(_) => ManagementError::ActorAlreadyExists,
224 TheaterRuntimeError::ActorNotRunning(_) => ManagementError::ActorNotRunning,
225 TheaterRuntimeError::ActorOperationFailed(msg) => {
226 ManagementError::RuntimeError(format!("Actor operation failed: {}", msg))
227 }
228 TheaterRuntimeError::ActorError(e) => ManagementError::ActorError(e.to_string()),
229 TheaterRuntimeError::ChannelError(msg) => ManagementError::CommunicationError(msg),
230 TheaterRuntimeError::ChannelNotFound(_) => ManagementError::ChannelNotFound,
231 TheaterRuntimeError::ChannelRejected => ManagementError::ChannelRejected,
232 TheaterRuntimeError::SerializationError(msg) => {
233 ManagementError::SerializationError(msg)
234 }
235 TheaterRuntimeError::InternalError(msg) => ManagementError::InternalError(msg),
236 TheaterRuntimeError::ActorInitializationError(msg) => {
237 ManagementError::ActorInitializationError(msg)
238 }
239 }
240 }
241}
242
243impl std::fmt::Display for ManagementError {
245 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
246 match self {
247 ManagementError::ActorNotFound => write!(f, "Actor not found"),
248 ManagementError::ActorAlreadyExists => write!(f, "Actor already exists"),
249 ManagementError::ActorNotRunning => write!(f, "Actor is not running"),
250 ManagementError::ActorError(msg) => write!(f, "Actor error: {}", msg),
251 ManagementError::ChannelNotFound => write!(f, "Channel not found"),
252 ManagementError::ChannelClosed => write!(f, "Channel is closed"),
253 ManagementError::ChannelRejected => write!(f, "Channel was rejected"),
254 ManagementError::StoreError(msg) => write!(f, "Store error: {}", msg),
255 ManagementError::CommunicationError(msg) => write!(f, "Communication error: {}", msg),
256 ManagementError::InvalidRequest(msg) => write!(f, "Invalid request: {}", msg),
257 ManagementError::Timeout => write!(f, "Operation timed out"),
258 ManagementError::RuntimeError(msg) => write!(f, "Runtime error: {}", msg),
259 ManagementError::InternalError(msg) => write!(f, "Internal error: {}", msg),
260 ManagementError::SerializationError(msg) => write!(f, "Serialization error: {}", msg),
261 ManagementError::ActorInitializationError(msg) => {
262 write!(f, "Actor initialization error: {}", msg)
263 }
264 }
265 }
266}
267
268impl std::error::Error for ManagementError {}
270
271#[derive(Debug)]
272#[allow(dead_code)]
273struct Subscription {
274 id: Uuid,
275 client_tx: mpsc::Sender<ManagementResponse>,
276}
277
278impl Eq for Subscription {}
279impl PartialEq for Subscription {
280 fn eq(&self, other: &Self) -> bool {
281 self.id == other.id
282 }
283}
284impl std::hash::Hash for Subscription {
285 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
286 self.id.hash(state);
287 }
288}
289
290#[derive(Debug)]
294#[allow(dead_code)]
295struct ChannelSubscription {
296 channel_id: String,
297 initiator_id: ChannelParticipant,
298 target_id: ChannelParticipant,
299 client_tx: mpsc::Sender<ManagementResponse>,
300}
301
302fn create_root_handler_registry(
311 theater_tx: mpsc::Sender<TheaterCommand>,
312 resource_cache: Arc<ResourceCache>,
313) -> (
314 HandlerRegistry,
315 theater_handler_message_server::MessageRouter,
316) {
317 let mut registry = HandlerRegistry::new();
318
319 info!("Initializing Theater server with Theater-specific handlers...");
320
321 let runtime_config = RuntimeHostConfig {};
323 registry.register(RuntimeHandler::new(
324 runtime_config,
325 theater_tx.clone(),
326 None,
327 ));
328
329 let store_config = StoreHandlerConfig::default();
331 registry.register(StoreHandler::new(store_config, None));
332
333 let supervisor_config = SupervisorHostConfig {};
338 registry.register(
339 SupervisorHandler::new(supervisor_config, None).with_resource_cache(resource_cache),
340 );
341
342 let message_router = theater_handler_message_server::MessageRouter::new();
344 registry.register(MessageServerHandler::new(None, message_router.clone()));
345
346 let tcp_config = TcpHandlerConfig {
348 listen: None,
349 max_connections: None,
350 ..Default::default()
351 };
352 registry.register(TcpHandler::new(tcp_config));
353
354 info!("✓ 5 Theater-specific handlers registered");
355 info!("NOTE: WASI handlers are deprecated - see crates/deprecated/");
356
357 (registry, message_router)
358}
359
360pub struct TheaterServer {
361 runtime: TheaterRuntime,
362 theater_tx: mpsc::Sender<TheaterCommand>,
363 management_socket: TcpListener,
364 subscriptions: Arc<Mutex<HashMap<TheaterId, HashSet<Subscription>>>>,
365 channel_subscriptions: Arc<Mutex<HashMap<String, ChannelSubscription>>>,
367 #[allow(dead_code)]
369 channel_events_tx: mpsc::Sender<ChannelEvent>,
370 message_router: theater_handler_message_server::MessageRouter,
372}
373
374impl TheaterServer {
375 async fn process_channel_events(
377 mut channel_events_rx: mpsc::Receiver<ChannelEvent>,
378 channel_subscriptions: Arc<Mutex<HashMap<String, ChannelSubscription>>>,
379 ) {
380 while let Some(event) = channel_events_rx.recv().await {
381 match event {
382 ChannelEvent::Message {
383 channel_id,
384 sender_id,
385 message,
386 } => {
387 tracing::debug!("Received channel message for {}", channel_id);
388 let subs = channel_subscriptions.lock().await;
390 if let Some(sub) = subs.get(&channel_id.0) {
391 let response = ManagementResponse::ChannelMessage {
392 channel_id: channel_id.0.clone(),
393 sender_id,
394 message,
395 };
396
397 tracing::debug!("Forwarding channel message to client: {:?}", response);
398
399 if let Err(e) = sub.client_tx.send(response).await {
400 tracing::warn!("Failed to forward channel message: {}", e);
401 } else {
402 tracing::debug!("Forwarded channel message to client");
403 }
404 }
405 }
406 ChannelEvent::Close { channel_id } => {
407 tracing::debug!("Received channel close event for {}", channel_id);
408 let mut subs = channel_subscriptions.lock().await;
410 if let Some(sub) = subs.remove(&channel_id.0) {
411 let response = ManagementResponse::ChannelClosed {
412 channel_id: channel_id.0.clone(),
413 };
414
415 if let Err(e) = sub.client_tx.send(response).await {
416 tracing::warn!("Failed to forward channel close event: {}", e);
417 } else {
418 tracing::debug!("Forwarded channel close event to client");
419 }
420 }
421 }
422 }
423 }
424 }
425
426 pub async fn new(address: std::net::SocketAddr) -> Result<Self> {
427 let (theater_tx, theater_rx) = mpsc::channel(32);
428
429 let (channel_events_tx, channel_events_rx) = mpsc::channel(32);
431
432 let resource_cache = Arc::new(ResourceCache::new());
444
445 let (handler_registry, message_router) =
448 create_root_handler_registry(theater_tx.clone(), resource_cache.clone());
449
450 let runtime = TheaterRuntime::new(
452 theater_tx.clone(),
453 theater_rx,
454 Some(channel_events_tx.clone()),
455 handler_registry,
456 )
457 .await?;
458 let management_socket = TcpListener::bind(address).await?;
459
460 let channel_subscriptions = Arc::new(Mutex::new(HashMap::new()));
461
462 let channel_subs_clone = channel_subscriptions.clone();
464 tokio::spawn(async move {
465 Self::process_channel_events(channel_events_rx, channel_subs_clone).await;
466 });
467
468 Ok(Self {
469 runtime,
470 theater_tx,
471 management_socket,
472 subscriptions: Arc::new(Mutex::new(HashMap::new())),
473 channel_subscriptions,
474 channel_events_tx,
475 message_router,
476 })
477 }
478
479 pub async fn run(mut self) -> Result<()> {
480 info!(
481 "Theater server starting on {:?}",
482 self.management_socket.local_addr()?
483 );
484
485 let runtime_handle = tokio::spawn(async move {
487 match self.runtime.run().await {
488 Ok(_) => Ok(()),
489 Err(e) => {
490 error!("Theater runtime failed: {}", e);
491 Err(e)
492 }
493 }
494 });
495
496 while let Ok((socket, addr)) = self.management_socket.accept().await {
498 info!("New management connection from {}", addr);
499 let runtime_tx = self.theater_tx.clone();
500 let subscriptions = self.subscriptions.clone();
501 let channel_subscriptions = self.channel_subscriptions.clone();
502 let message_router = self.message_router.clone();
503
504 tokio::spawn(async move {
505 if let Err(e) = Self::handle_management_connection(
506 socket,
507 runtime_tx,
508 subscriptions,
509 channel_subscriptions,
510 message_router,
511 )
512 .await
513 {
514 error!("Error handling management connection: {}", e);
515 }
516 });
517 }
518
519 runtime_handle.await??;
520 Ok(())
521 }
522
523 async fn handle_management_connection(
524 socket: TcpStream,
525 runtime_tx: mpsc::Sender<TheaterCommand>,
526 subscriptions: Arc<Mutex<HashMap<TheaterId, HashSet<Subscription>>>>,
527 channel_subscriptions: Arc<Mutex<HashMap<String, ChannelSubscription>>>,
528 message_router: theater_handler_message_server::MessageRouter,
529 ) -> Result<()> {
530 let (client_tx, mut client_rx) = mpsc::channel::<ManagementResponse>(32);
532
533 let codec = FragmentingCodec::new();
534 let framed = Framed::new(socket, codec);
535
536 let (mut framed_sink, mut framed_stream) = framed.split();
538
539 let cmd_client_tx = client_tx.clone();
541
542 let _response_task = tokio::spawn(async move {
544 while let Some(response) = client_rx.recv().await {
545 match serde_json::to_vec(&response) {
546 Ok(data) => {
547 debug!("Serialized response: {} bytes", data.len());
548 if data.len() > 10 * 1024 * 1024 {
549 debug!("Large response detected: {} MB", data.len() / 1024 / 1024);
550 }
551 if let Err(e) = framed_sink.send(Bytes::from(data)).await {
552 debug!("Error sending response to client: {}", e);
553 break;
554 }
555 }
556 Err(e) => {
557 error!("Error serializing response: {}", e);
558 }
559 }
560 }
561 debug!("Response forwarder for client closed");
562 });
563
564 let mut connection_subscriptions: Vec<(TheaterId, Uuid)> = Vec::new();
566
567 let mut connection_channel_subscriptions: Vec<String> = Vec::new();
569
570 'connection: while let Some(msg) = framed_stream.next().await {
572 debug!("Received management message");
573 let msg = match msg {
574 Ok(m) => m,
575 Err(e) => {
576 error!("Error receiving message: {}", e);
577 break 'connection;
578 }
579 };
580
581 let cmd = match serde_json::from_slice::<ManagementCommand>(&msg) {
582 Ok(c) => c,
583 Err(e) => {
584 error!(
585 "Error parsing command: {} {}",
586 e,
587 String::from_utf8_lossy(&msg)
588 );
589 continue;
590 }
591 };
592 debug!("Parsed command: {:?}", cmd);
593
594 let _cmd_clone = cmd.clone();
596
597 let response = match cmd {
598 ManagementCommand::StartActor {
599 manifest,
600 initial_state: _initial_state,
601 parent,
602 subscribe,
603 } => {
604 info!("Starting actor from manifest: {:?}", manifest);
605
606 let manifest_str = match resolve_reference(&manifest).await {
608 Ok(bytes) => match String::from_utf8(bytes) {
609 Ok(s) => s,
610 Err(e) => {
611 error!("Invalid manifest encoding: {}", e);
612 cmd_client_tx
613 .send(ManagementResponse::Error {
614 error: ManagementError::ActorInitializationError(format!(
615 "Invalid manifest encoding: {}",
616 e
617 )),
618 })
619 .await
620 .ok();
621 continue;
622 }
623 },
624 Err(e) => {
625 error!("Failed to load manifest: {}", e);
626 cmd_client_tx
627 .send(ManagementResponse::Error {
628 error: ManagementError::ActorInitializationError(format!(
629 "Failed to load manifest: {}",
630 e
631 )),
632 })
633 .await
634 .ok();
635 continue;
636 }
637 };
638
639 let manifest_config = match ManifestConfig::from_toml_str(&manifest_str) {
640 Ok(m) => m,
641 Err(e) => {
642 error!("Failed to parse manifest: {}", e);
643 cmd_client_tx
644 .send(ManagementResponse::Error {
645 error: ManagementError::ActorInitializationError(format!(
646 "Failed to parse manifest: {}",
647 e
648 )),
649 })
650 .await
651 .ok();
652 continue;
653 }
654 };
655
656 let wasm_bytes = match resolve_reference(&manifest_config.package).await {
658 Ok(bytes) => bytes,
659 Err(e) => {
660 error!("Failed to load WASM: {}", e);
661 cmd_client_tx
662 .send(ManagementResponse::Error {
663 error: ManagementError::ActorInitializationError(format!(
664 "Failed to load WASM: {}",
665 e
666 )),
667 })
668 .await
669 .ok();
670 continue;
671 }
672 };
673
674 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
675 debug!("Sending SpawnActor command to runtime");
676 let supervisor_tx = if parent {
677 let (supervisor_tx, mut supervisor_rx) = mpsc::channel(32);
678 let cmd_client_tx = cmd_client_tx.clone();
679 tokio::spawn(async move {
680 while let Some(res) = supervisor_rx.recv().await {
681 debug!("Received supervisor response: {:?}", res);
682 if let Err(e) = cmd_client_tx
683 .send(ManagementResponse::ActorResult(res))
684 .await
685 {
686 error!("Failed to send supervisor response: {}", e);
687 break;
688 }
689 }
690 });
691 Some(supervisor_tx)
692 } else {
693 None
694 };
695 let subscription_tx = if subscribe {
696 let (event_tx, mut event_rx) = mpsc::channel(32);
697
698 let cmd_client_tx = cmd_client_tx.clone();
700 tokio::spawn(async move {
701 while let Some((_actor_id, event)) = event_rx.recv().await {
702 debug!("Received event for subscription");
703 let response = ManagementResponse::ActorEvent { event };
704 if let Err(e) = cmd_client_tx.send(response).await {
705 debug!("Failed to forward event to client: {}", e);
706 break;
707 }
708 }
709 debug!("Event forwarder for subscription stopped");
710 });
711
712 Some(event_tx)
713 } else {
714 None
715 };
716 match runtime_tx
717 .send(TheaterCommand::SetupActor {
718 wasm_bytes,
719 name: Some(manifest_config.name.clone()),
720 manifest: Some(manifest_config),
721 init_state: default_init_state(),
722 response_tx: cmd_tx,
723 supervisor_tx,
724 subscription_tx,
725 })
726 .await
727 {
728 Ok(_) => {
729 debug!("SpawnActor command sent to runtime, awaiting response");
730 match cmd_rx.await {
731 Ok(result) => match result {
732 Ok(actor_id) => {
733 info!("Actor started with ID: {:?}", actor_id);
734 ManagementResponse::ActorStarted { id: actor_id }
735 }
736 Err(e) => {
737 error!("Runtime failed to start actor: {}", e);
738 ManagementResponse::Error {
739 error: ManagementError::RuntimeError(format!(
740 "Failed to start actor: {}",
741 e
742 )),
743 }
744 }
745 },
746 Err(e) => {
747 error!("Failed to receive spawn response: {}", e);
748 ManagementResponse::Error {
749 error: ManagementError::CommunicationError(format!(
750 "Failed to receive spawn response: {}",
751 e
752 )),
753 }
754 }
755 }
756 }
757 Err(e) => {
758 error!("Failed to send SpawnActor command: {}", e);
759 ManagementResponse::Error {
760 error: ManagementError::CommunicationError(format!(
761 "Failed to send spawn command: {}",
762 e
763 )),
764 }
765 }
766 }
767 }
768 ManagementCommand::StopActor { id } => {
769 info!("Stopping actor: {:?}", id);
770 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
771 runtime_tx
772 .send(TheaterCommand::StopActor {
773 actor_id: id,
774 response_tx: cmd_tx,
775 })
776 .await?;
777
778 match cmd_rx.await? {
779 Ok(_) => {
780 subscriptions.lock().await.remove(&id);
781 ManagementResponse::ActorStopped { id }
782 }
783 Err(e) => ManagementResponse::Error {
784 error: ManagementError::RuntimeError(format!(
785 "Failed to stop actor: {}",
786 e
787 )),
788 },
789 }
790 }
791 ManagementCommand::TerminateActor { id } => {
792 info!("Terminating actor: {:?}", id);
793 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
794 runtime_tx
795 .send(TheaterCommand::TerminateActor {
796 actor_id: id,
797 response_tx: cmd_tx,
798 })
799 .await?;
800
801 match cmd_rx.await? {
802 Ok(_) => {
803 subscriptions.lock().await.remove(&id);
804 ManagementResponse::ActorStopped { id }
805 }
806 Err(e) => ManagementResponse::Error {
807 error: ManagementError::RuntimeError(format!(
808 "Failed to terminate actor: {}",
809 e
810 )),
811 },
812 }
813 }
814 ManagementCommand::ListActors => {
815 debug!("Listing actors");
816 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
817 runtime_tx
818 .send(TheaterCommand::GetActors {
819 response_tx: cmd_tx,
820 })
821 .await?;
822
823 match cmd_rx.await? {
824 Ok(actors) => {
825 info!("Found {} actors", actors.len());
826 ManagementResponse::ActorList { actors }
827 }
828 Err(e) => ManagementResponse::Error {
829 error: ManagementError::RuntimeError(format!(
830 "Failed to list actors: {}",
831 e
832 )),
833 },
834 }
835 }
836 ManagementCommand::SubscribeToActor { id } => {
837 info!("New subscription request for actor: {:?}", id);
838 let subscription_id = Uuid::new_v4();
839 let subscription = Subscription {
840 id: subscription_id,
841 client_tx: cmd_client_tx.clone(),
842 };
843
844 debug!("Subscription created with ID: {}", subscription_id);
845
846 subscriptions
848 .lock()
849 .await
850 .entry(id)
851 .or_default()
852 .insert(subscription);
853
854 let (event_tx, mut event_rx) = mpsc::channel(32);
856 runtime_tx
857 .send(TheaterCommand::SubscribeToActor {
858 actor_id: id,
859 event_tx,
860 })
861 .await
862 .map_err(|e| anyhow::anyhow!("Failed to subscribe: {}", e))?;
863
864 connection_subscriptions.push((id, subscription_id));
866
867 let client_tx_clone = cmd_client_tx.clone();
869 tokio::spawn(async move {
870 debug!(
871 "Starting event forwarder for subscription {}",
872 subscription_id
873 );
874 while let Some((_actor_id, event)) = event_rx.recv().await {
875 debug!("Received event for subscription {}", subscription_id);
876 let response = ManagementResponse::ActorEvent { event };
877 if let Err(e) = client_tx_clone.send(response).await {
878 debug!("Failed to forward event to client: {}", e);
879 break;
880 }
881 }
882 debug!(
883 "Event forwarder for subscription {} stopped",
884 subscription_id
885 );
886 });
887
888 ManagementResponse::Subscribed {
889 id,
890 subscription_id,
891 }
892 }
893 ManagementCommand::UnsubscribeFromActor {
894 id,
895 subscription_id,
896 } => {
897 debug!(
898 "Removing subscription {} for actor {:?}",
899 subscription_id, id
900 );
901
902 connection_subscriptions
904 .retain(|(aid, sid)| *aid != id || *sid != subscription_id);
905
906 let mut subs = subscriptions.lock().await;
908 if let Some(actor_subs) = subs.get_mut(&id) {
909 actor_subs.retain(|sub| sub.id != subscription_id);
910
911 if actor_subs.is_empty() {
913 subs.remove(&id);
914 }
915 }
916
917 debug!("Subscription removed");
918 ManagementResponse::Unsubscribed { id }
919 }
920 ManagementCommand::SendActorMessage { id, data } => {
921 info!("Sending message to actor: {:?}", id);
922
923 let (response_tx, response_rx) = tokio::sync::oneshot::channel();
925
926 let message = ActorMessage::Send(ActorSend { data });
928
929 match message_router
931 .route_message(theater::messages::MessageCommand::SendMessage {
932 target_id: id,
933 message,
934 response_tx,
935 })
936 .await
937 {
938 Ok(_) => {
939 match response_rx.await {
941 Ok(Ok(())) => {
942 info!("Message sent successfully to actor: {:?}", id);
943 ManagementResponse::SentMessage { id }
944 }
945 Ok(Err(e)) => {
946 error!("Failed to send message to actor: {}", e);
947 ManagementResponse::Error {
948 error: ManagementError::RuntimeError(format!(
949 "Failed to send: {}",
950 e
951 )),
952 }
953 }
954 Err(e) => {
955 error!("Failed to receive routing response: {}", e);
956 ManagementResponse::Error {
957 error: ManagementError::CommunicationError(format!(
958 "Failed to receive routing response: {}",
959 e
960 )),
961 }
962 }
963 }
964 }
965 Err(e) => {
966 error!("Failed to route message: {}", e);
967 ManagementResponse::Error {
968 error: ManagementError::RuntimeError(format!(
969 "Failed to route message: {}",
970 e
971 )),
972 }
973 }
974 }
975 }
976 ManagementCommand::RequestActorMessage { id, data } => {
977 info!("Requesting message from actor: {:?}", id);
978
979 let (route_tx, route_rx) = tokio::sync::oneshot::channel();
981 let (response_tx, response_rx) = tokio::sync::oneshot::channel();
982
983 let message = ActorMessage::Request(ActorRequest { data, response_tx });
985
986 match message_router
988 .route_message(theater::messages::MessageCommand::SendMessage {
989 target_id: id,
990 message,
991 response_tx: route_tx,
992 })
993 .await
994 {
995 Ok(_) => {
996 match route_rx.await {
998 Ok(Ok(())) => {
999 match response_rx.await {
1001 Ok(response_data) => {
1002 info!("Received response from actor: {:?}", id);
1003 ManagementResponse::RequestedMessage {
1004 id,
1005 message: response_data,
1006 }
1007 }
1008 Err(e) => {
1009 error!("Actor didn't respond: {}", e);
1010 ManagementResponse::Error {
1011 error: ManagementError::RuntimeError(format!(
1012 "Actor didn't respond: {}",
1013 e
1014 )),
1015 }
1016 }
1017 }
1018 }
1019 Ok(Err(e)) => {
1020 error!("Failed to route request to actor: {}", e);
1021 ManagementResponse::Error {
1022 error: ManagementError::RuntimeError(format!(
1023 "Failed to route: {}",
1024 e
1025 )),
1026 }
1027 }
1028 Err(e) => {
1029 error!("Failed to receive routing response: {}", e);
1030 ManagementResponse::Error {
1031 error: ManagementError::CommunicationError(format!(
1032 "Failed to receive routing response: {}",
1033 e
1034 )),
1035 }
1036 }
1037 }
1038 }
1039 Err(e) => {
1040 error!("Failed to route request: {}", e);
1041 ManagementResponse::Error {
1042 error: ManagementError::RuntimeError(format!(
1043 "Failed to route request: {}",
1044 e
1045 )),
1046 }
1047 }
1048 }
1049 }
1050 ManagementCommand::GetActorManifest { id } => {
1051 info!("Getting manifest for actor: {:?}", id);
1052 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1053 runtime_tx
1054 .send(TheaterCommand::GetActorManifest {
1055 actor_id: id,
1056 response_tx: cmd_tx,
1057 })
1058 .await?;
1059
1060 let manifest = cmd_rx.await?;
1061 ManagementResponse::ActorManifest {
1062 id,
1063 manifest: manifest?,
1064 }
1065 }
1066 ManagementCommand::GetActorStatus { id } => {
1067 info!("Getting status for actor: {:?}", id);
1068 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1069 runtime_tx
1070 .send(TheaterCommand::GetActorStatus {
1071 actor_id: id,
1072 response_tx: cmd_tx,
1073 })
1074 .await?;
1075
1076 let status = cmd_rx.await?;
1077 ManagementResponse::ActorStatus {
1078 id,
1079 status: status?,
1080 }
1081 }
1082 ManagementCommand::RestartActor { id } => {
1083 info!("Restarting actor: {:?}", id);
1084 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1085 runtime_tx
1086 .send(TheaterCommand::RestartActor {
1087 actor_id: id,
1088 response_tx: cmd_tx,
1089 })
1090 .await?;
1091
1092 match cmd_rx.await? {
1093 Ok(_) => ManagementResponse::Restarted { id },
1094 Err(e) => ManagementResponse::Error {
1095 error: ManagementError::RuntimeError(format!(
1096 "Failed to restart actor: {}",
1097 e
1098 )),
1099 },
1100 }
1101 }
1102 ManagementCommand::GetActorState { id } => {
1103 info!("Getting state for actor: {:?}", id);
1104 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1105 runtime_tx
1106 .send(TheaterCommand::GetActorState {
1107 actor_id: id,
1108 response_tx: cmd_tx,
1109 })
1110 .await?;
1111
1112 let state = cmd_rx.await?;
1113 ManagementResponse::ActorState { id, state: state? }
1114 }
1115 ManagementCommand::GetActorMetrics { id } => {
1116 info!("Getting metrics for actor: {:?}", id);
1117 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1118 runtime_tx
1119 .send(TheaterCommand::GetActorMetrics {
1120 actor_id: id,
1121 response_tx: cmd_tx,
1122 })
1123 .await?;
1124
1125 let metrics = cmd_rx.await?;
1126 ManagementResponse::ActorMetrics {
1127 id,
1128 metrics: serde_json::to_value(metrics?)?,
1129 }
1130 }
1131 ManagementCommand::UpdateActorPackage { id: _, package: _ } => {
1132 ManagementResponse::Error {
1134 error: ManagementError::RuntimeError(
1135 "UpdateActorPackage not yet implemented".to_string(),
1136 ),
1137 }
1138 }
1139 ManagementCommand::OpenChannel {
1141 actor_id,
1142 initial_message,
1143 } => {
1144 info!("Opening channel to actor: {:?}", actor_id);
1145
1146 let (response_tx, response_rx) = tokio::sync::oneshot::channel();
1148
1149 let client_id = ChannelParticipant::External;
1151 let channel_id = ChannelId::new(&client_id, &actor_id);
1152 let channel_id_str = channel_id.0.clone();
1153
1154 message_router
1156 .route_message(theater::messages::MessageCommand::OpenChannel {
1157 initiator_id: client_id.clone(),
1158 target_id: actor_id.clone(),
1159 channel_id: channel_id.clone(),
1160 initial_message,
1161 response_tx,
1162 })
1163 .await
1164 .map_err(|e| {
1165 anyhow::anyhow!("Failed to send channel open command: {}", e)
1166 })?;
1167
1168 match response_rx.await {
1170 Ok(result) => {
1171 match result {
1172 Ok(accepted) => {
1173 if accepted {
1174 info!("Channel opened successfully: {}", channel_id_str);
1176
1177 let channel_sub = ChannelSubscription {
1179 channel_id: channel_id_str.clone(),
1180 initiator_id: client_id.clone(),
1181 target_id: actor_id.clone(),
1182 client_tx: cmd_client_tx.clone(),
1183 };
1184
1185 channel_subscriptions
1186 .lock()
1187 .await
1188 .insert(channel_id_str.clone(), channel_sub);
1189
1190 connection_channel_subscriptions
1192 .push(channel_id_str.clone());
1193
1194 ManagementResponse::ChannelOpened {
1195 channel_id: channel_id_str,
1196 actor_id,
1197 }
1198 } else {
1199 ManagementResponse::Error {
1201 error: ManagementError::ChannelRejected,
1202 }
1203 }
1204 }
1205 Err(e) => ManagementResponse::Error {
1206 error: ManagementError::RuntimeError(format!(
1207 "Error opening channel: {}",
1208 e
1209 )),
1210 },
1211 }
1212 }
1213 Err(e) => ManagementResponse::Error {
1214 error: ManagementError::CommunicationError(format!(
1215 "Failed to receive channel open response: {}",
1216 e
1217 )),
1218 },
1219 }
1220 }
1221 ManagementCommand::SendOnChannel {
1222 channel_id,
1223 message,
1224 } => {
1225 info!("Sending message on channel: {}", channel_id);
1226
1227 let (response_tx, response_rx) = tokio::sync::oneshot::channel();
1229
1230 let channel_id_parsed = ChannelId(channel_id.clone());
1232
1233 let sender_id = ChannelParticipant::External;
1235 match message_router
1236 .route_message(theater::messages::MessageCommand::ChannelMessage {
1237 channel_id: channel_id_parsed,
1238 sender_id,
1239 message,
1240 response_tx,
1241 })
1242 .await
1243 {
1244 Ok(_) => {
1245 match response_rx.await {
1247 Ok(Ok(())) => {
1248 info!("Message sent successfully on channel: {}", channel_id);
1249 ManagementResponse::MessageSent { channel_id }
1250 }
1251 Ok(Err(e)) => {
1252 error!("Failed to send on channel: {}", e);
1253 ManagementResponse::Error {
1254 error: ManagementError::RuntimeError(format!(
1255 "Failed to send on channel: {}",
1256 e
1257 )),
1258 }
1259 }
1260 Err(e) => {
1261 error!("Failed to receive channel send response: {}", e);
1262 ManagementResponse::Error {
1263 error: ManagementError::CommunicationError(format!(
1264 "Failed to receive channel send response: {}",
1265 e
1266 )),
1267 }
1268 }
1269 }
1270 }
1271 Err(e) => {
1272 error!("Failed to route channel message: {}", e);
1273 ManagementResponse::Error {
1274 error: ManagementError::RuntimeError(format!(
1275 "Failed to route channel message: {}",
1276 e
1277 )),
1278 }
1279 }
1280 }
1281 }
1282 ManagementCommand::CloseChannel { channel_id } => {
1283 info!("Closing channel: {}", channel_id);
1284
1285 let (response_tx, response_rx) = tokio::sync::oneshot::channel();
1287
1288 let channel_id_parsed = ChannelId(channel_id.clone());
1290
1291 let sender_id = ChannelParticipant::External;
1293 match message_router
1294 .route_message(theater::messages::MessageCommand::ChannelClose {
1295 channel_id: channel_id_parsed,
1296 sender_id,
1297 response_tx,
1298 })
1299 .await
1300 {
1301 Ok(_) => {
1302 match response_rx.await {
1304 Ok(Ok(())) => {
1305 info!("Channel closed successfully: {}", channel_id);
1306
1307 channel_subscriptions.lock().await.remove(&channel_id);
1309 connection_channel_subscriptions.retain(|id| id != &channel_id);
1310
1311 ManagementResponse::ChannelClosed { channel_id }
1312 }
1313 Ok(Err(e)) => {
1314 error!("Failed to close channel: {}", e);
1315 ManagementResponse::Error {
1316 error: ManagementError::RuntimeError(format!(
1317 "Failed to close channel: {}",
1318 e
1319 )),
1320 }
1321 }
1322 Err(e) => {
1323 error!("Failed to receive channel close response: {}", e);
1324 ManagementResponse::Error {
1325 error: ManagementError::CommunicationError(format!(
1326 "Failed to receive channel close response: {}",
1327 e
1328 )),
1329 }
1330 }
1331 }
1332 }
1333 Err(e) => {
1334 error!("Failed to route channel close: {}", e);
1335 ManagementResponse::Error {
1336 error: ManagementError::RuntimeError(format!(
1337 "Failed to route channel close: {}",
1338 e
1339 )),
1340 }
1341 }
1342 }
1343 }
1344 ManagementCommand::NewStore {} => {
1345 info!("Creating new store");
1346 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1347 runtime_tx
1348 .send(TheaterCommand::NewStore {
1349 response_tx: cmd_tx,
1350 })
1351 .await?;
1352
1353 let store_id = cmd_rx.await?;
1354 ManagementResponse::StoreCreated {
1355 store_id: store_id?.id,
1356 }
1357 }
1358 };
1359
1360 debug!("Sending response: {:?}", response);
1361 if let Err(e) = client_tx.send(response).await {
1362 error!("Failed to send response: {}", e);
1363 break;
1364 }
1365 debug!("Response sent");
1366 }
1367
1368 debug!(
1370 "Connection closed, cleaning up {} subscriptions",
1371 connection_subscriptions.len()
1372 );
1373 let mut subs = subscriptions.lock().await;
1374
1375 for (actor_id, sub_id) in connection_subscriptions {
1376 if let Some(actor_subs) = subs.get_mut(&actor_id) {
1377 actor_subs.retain(|sub| sub.id != sub_id);
1378
1379 if actor_subs.is_empty() {
1381 subs.remove(&actor_id);
1382 }
1383 }
1384 }
1385
1386 debug!(
1388 "Connection closed, cleaning up {} channel subscriptions",
1389 connection_channel_subscriptions.len()
1390 );
1391 let mut channel_subs = channel_subscriptions.lock().await;
1392
1393 for channel_id in connection_channel_subscriptions {
1394 channel_subs.remove(&channel_id);
1395 }
1396
1397 debug!("Cleaned up all subscriptions for the connection");
1398 Ok(())
1399 }
1400}