1use anyhow::Result;
2use bytes::Bytes;
3use futures::sink::SinkExt;
4use futures::stream::StreamExt;
5use serde::{Deserialize, Serialize};
6use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8use theater::messages::{
9 ActorMessage, ActorRequest, ActorResult, ActorSend, ActorStatus, ChannelEvent,
10 ChannelParticipant,
11};
12use theater::pack_bridge::Value;
13use theater::{ChainEvent, ManifestConfig};
14use tokio::net::{TcpListener, TcpStream};
15use tokio::sync::{mpsc, Mutex};
16use tokio_util::codec::Framed;
17use tracing::{debug, error, info};
18use uuid::Uuid;
19
20use theater::config::actor_manifest::{
21 RuntimeHostConfig, StoreHandlerConfig, SupervisorHostConfig, TcpHandlerConfig,
22};
23use theater::handler::HandlerRegistry;
24use theater::id::TheaterId;
25use theater::messages::{default_init_state, ChannelId, TheaterCommand};
26use theater::theater_runtime::TheaterRuntime;
27use theater::utils::{resolve_reference, resolve_reference_cached, ResourceCache};
28use theater::TheaterRuntimeError;
29
30use theater_handler_message_server::MessageServerHandler;
33use theater_handler_runtime::RuntimeHandler;
34use theater_handler_store::StoreHandler;
35use theater_handler_supervisor::SupervisorHandler;
36use theater_handler_tcp::TcpHandler;
37
38use crate::fragmenting_codec::FragmentingCodec;
39
40#[derive(Debug, Clone, Serialize, Deserialize)]
41pub enum ManagementCommand {
42 StartActor {
43 manifest: String,
44 initial_state: Option<Vec<u8>>,
45 parent: bool,
46 subscribe: bool,
47 },
48 StopActor {
49 id: TheaterId,
50 },
51 TerminateActor {
52 id: TheaterId,
53 },
54 ListActors,
55 SubscribeToActor {
56 id: TheaterId,
57 },
58 UnsubscribeFromActor {
59 id: TheaterId,
60 subscription_id: Uuid,
61 },
62 SendActorMessage {
63 id: TheaterId,
64 data: Vec<u8>,
65 },
66 RequestActorMessage {
67 id: TheaterId,
68 data: Vec<u8>,
69 },
70 GetActorManifest {
71 id: TheaterId,
72 },
73 GetActorStatus {
74 id: TheaterId,
75 },
76 RestartActor {
77 id: TheaterId,
78 },
79 GetActorState {
80 id: TheaterId,
81 },
82 GetActorMetrics {
83 id: TheaterId,
84 },
85 UpdateActorPackage {
86 id: TheaterId,
87 package: String,
88 },
89 OpenChannel {
91 actor_id: ChannelParticipant,
92 initial_message: Vec<u8>,
93 },
94 SendOnChannel {
95 channel_id: String,
96 message: Vec<u8>,
97 },
98 CloseChannel {
99 channel_id: String,
100 },
101
102 NewStore {},
104}
105
106#[derive(Debug, Clone, Serialize, Deserialize)]
107#[allow(clippy::large_enum_variant)]
108pub enum ManagementResponse {
109 ActorStarted {
110 id: TheaterId,
111 },
112 ActorStopped {
113 id: TheaterId,
114 },
115 ActorList {
116 actors: Vec<(TheaterId, String)>,
117 },
118 Subscribed {
119 id: TheaterId,
120 subscription_id: Uuid,
121 },
122 Unsubscribed {
123 id: TheaterId,
124 },
125 ActorEvent {
126 event: ChainEvent,
127 },
128 ActorResult(ActorResult),
129 Error {
130 error: ManagementError,
131 },
132 RequestedMessage {
133 id: TheaterId,
134 message: Vec<u8>,
135 },
136 SentMessage {
137 id: TheaterId,
138 },
139 ActorStatus {
140 id: TheaterId,
141 status: ActorStatus,
142 },
143 Restarted {
144 id: TheaterId,
145 },
146 ActorManifest {
147 id: TheaterId,
148 manifest: ManifestConfig,
149 },
150 ActorState {
151 id: TheaterId,
152 state: Value,
153 },
154 ActorMetrics {
155 id: TheaterId,
156 metrics: serde_json::Value,
157 },
158 ActorPackageUpdated {
159 id: TheaterId,
160 },
161 ChannelOpened {
163 channel_id: String,
164 actor_id: ChannelParticipant,
165 },
166 MessageSent {
167 channel_id: String,
168 },
169 ChannelMessage {
170 channel_id: String,
171 sender_id: ChannelParticipant,
172 message: Vec<u8>,
173 },
174 ChannelClosed {
175 channel_id: String,
176 },
177
178 StoreCreated {
180 store_id: String,
181 },
182}
183
184#[derive(Debug, Clone, Serialize, Deserialize)]
185pub enum ManagementError {
186 ActorNotFound,
188 ActorAlreadyExists,
189 ActorNotRunning,
190 ActorError(String),
191
192 ChannelNotFound,
194 ChannelClosed,
195 ChannelRejected,
196
197 StoreError(String),
199
200 CommunicationError(String),
202
203 InvalidRequest(String),
205 Timeout,
206
207 RuntimeError(String),
209 InternalError(String),
210
211 SerializationError(String),
213
214 ActorInitializationError(String),
216}
217
218impl From<TheaterRuntimeError> for ManagementError {
220 fn from(err: TheaterRuntimeError) -> Self {
221 match err {
222 TheaterRuntimeError::ActorNotFound(_) => ManagementError::ActorNotFound,
223 TheaterRuntimeError::ActorAlreadyExists(_) => ManagementError::ActorAlreadyExists,
224 TheaterRuntimeError::ActorNotRunning(_) => ManagementError::ActorNotRunning,
225 TheaterRuntimeError::ActorOperationFailed(msg) => {
226 ManagementError::RuntimeError(format!("Actor operation failed: {}", msg))
227 }
228 TheaterRuntimeError::ActorError(e) => ManagementError::ActorError(e.to_string()),
229 TheaterRuntimeError::ChannelError(msg) => ManagementError::CommunicationError(msg),
230 TheaterRuntimeError::ChannelNotFound(_) => ManagementError::ChannelNotFound,
231 TheaterRuntimeError::ChannelRejected => ManagementError::ChannelRejected,
232 TheaterRuntimeError::SerializationError(msg) => {
233 ManagementError::SerializationError(msg)
234 }
235 TheaterRuntimeError::InternalError(msg) => ManagementError::InternalError(msg),
236 TheaterRuntimeError::ActorInitializationError(msg) => {
237 ManagementError::ActorInitializationError(msg)
238 }
239 }
240 }
241}
242
243impl std::fmt::Display for ManagementError {
245 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
246 match self {
247 ManagementError::ActorNotFound => write!(f, "Actor not found"),
248 ManagementError::ActorAlreadyExists => write!(f, "Actor already exists"),
249 ManagementError::ActorNotRunning => write!(f, "Actor is not running"),
250 ManagementError::ActorError(msg) => write!(f, "Actor error: {}", msg),
251 ManagementError::ChannelNotFound => write!(f, "Channel not found"),
252 ManagementError::ChannelClosed => write!(f, "Channel is closed"),
253 ManagementError::ChannelRejected => write!(f, "Channel was rejected"),
254 ManagementError::StoreError(msg) => write!(f, "Store error: {}", msg),
255 ManagementError::CommunicationError(msg) => write!(f, "Communication error: {}", msg),
256 ManagementError::InvalidRequest(msg) => write!(f, "Invalid request: {}", msg),
257 ManagementError::Timeout => write!(f, "Operation timed out"),
258 ManagementError::RuntimeError(msg) => write!(f, "Runtime error: {}", msg),
259 ManagementError::InternalError(msg) => write!(f, "Internal error: {}", msg),
260 ManagementError::SerializationError(msg) => write!(f, "Serialization error: {}", msg),
261 ManagementError::ActorInitializationError(msg) => {
262 write!(f, "Actor initialization error: {}", msg)
263 }
264 }
265 }
266}
267
268impl std::error::Error for ManagementError {}
270
271#[derive(Debug)]
272#[allow(dead_code)]
273struct Subscription {
274 id: Uuid,
275 client_tx: mpsc::Sender<ManagementResponse>,
276}
277
278impl Eq for Subscription {}
279impl PartialEq for Subscription {
280 fn eq(&self, other: &Self) -> bool {
281 self.id == other.id
282 }
283}
284impl std::hash::Hash for Subscription {
285 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
286 self.id.hash(state);
287 }
288}
289
290#[derive(Debug)]
294#[allow(dead_code)]
295struct ChannelSubscription {
296 channel_id: String,
297 initiator_id: ChannelParticipant,
298 target_id: ChannelParticipant,
299 client_tx: mpsc::Sender<ManagementResponse>,
300}
301
302fn create_root_handler_registry(
311 theater_tx: mpsc::Sender<TheaterCommand>,
312 resource_cache: Arc<ResourceCache>,
313) -> (
314 HandlerRegistry,
315 theater_handler_message_server::MessageRouter,
316) {
317 let mut registry = HandlerRegistry::new();
318
319 info!("Initializing Theater server with Theater-specific handlers...");
320
321 let runtime_config = RuntimeHostConfig {};
323 registry.register(RuntimeHandler::new(
324 runtime_config,
325 theater_tx.clone(),
326 None,
327 ));
328
329 let store_config = StoreHandlerConfig::default();
331 registry.register(StoreHandler::new(store_config, None));
332
333 let supervisor_config = SupervisorHostConfig {};
338 registry.register(
339 SupervisorHandler::new(supervisor_config, None).with_resource_cache(resource_cache),
340 );
341
342 let message_router = theater_handler_message_server::MessageRouter::new();
344 registry.register(MessageServerHandler::new(None, message_router.clone()));
345
346 let tcp_config = TcpHandlerConfig {
348 listen: None,
349 max_connections: None,
350 ..Default::default()
351 };
352 registry.register(TcpHandler::new(tcp_config));
353
354 info!("✓ 5 Theater-specific handlers registered");
355 info!("NOTE: WASI handlers are deprecated - see crates/deprecated/");
356
357 (registry, message_router)
358}
359
360pub struct TheaterServer {
361 runtime: TheaterRuntime,
362 theater_tx: mpsc::Sender<TheaterCommand>,
363 management_socket: TcpListener,
364 subscriptions: Arc<Mutex<HashMap<TheaterId, HashSet<Subscription>>>>,
365 channel_subscriptions: Arc<Mutex<HashMap<String, ChannelSubscription>>>,
367 #[allow(dead_code)]
369 channel_events_tx: mpsc::Sender<ChannelEvent>,
370 message_router: theater_handler_message_server::MessageRouter,
372}
373
374impl TheaterServer {
375 async fn process_channel_events(
377 mut channel_events_rx: mpsc::Receiver<ChannelEvent>,
378 channel_subscriptions: Arc<Mutex<HashMap<String, ChannelSubscription>>>,
379 ) {
380 while let Some(event) = channel_events_rx.recv().await {
381 match event {
382 ChannelEvent::Message {
383 channel_id,
384 sender_id,
385 message,
386 } => {
387 tracing::debug!("Received channel message for {}", channel_id);
388 let subs = channel_subscriptions.lock().await;
390 if let Some(sub) = subs.get(&channel_id.0) {
391 let response = ManagementResponse::ChannelMessage {
392 channel_id: channel_id.0.clone(),
393 sender_id,
394 message,
395 };
396
397 tracing::debug!("Forwarding channel message to client: {:?}", response);
398
399 if let Err(e) = sub.client_tx.send(response).await {
400 tracing::warn!("Failed to forward channel message: {}", e);
401 } else {
402 tracing::debug!("Forwarded channel message to client");
403 }
404 }
405 }
406 ChannelEvent::Close { channel_id } => {
407 tracing::debug!("Received channel close event for {}", channel_id);
408 let mut subs = channel_subscriptions.lock().await;
410 if let Some(sub) = subs.remove(&channel_id.0) {
411 let response = ManagementResponse::ChannelClosed {
412 channel_id: channel_id.0.clone(),
413 };
414
415 if let Err(e) = sub.client_tx.send(response).await {
416 tracing::warn!("Failed to forward channel close event: {}", e);
417 } else {
418 tracing::debug!("Forwarded channel close event to client");
419 }
420 }
421 }
422 }
423 }
424 }
425
426 pub async fn new(address: std::net::SocketAddr) -> Result<Self> {
427 let (theater_tx, theater_rx) = mpsc::channel(32);
428
429 let (channel_events_tx, channel_events_rx) = mpsc::channel(32);
431
432 let resource_cache = Arc::new(ResourceCache::new());
440
441 let (handler_registry, message_router) =
444 create_root_handler_registry(theater_tx.clone(), resource_cache.clone());
445
446 let runtime = TheaterRuntime::new(
448 theater_tx.clone(),
449 theater_rx,
450 Some(channel_events_tx.clone()),
451 handler_registry,
452 resource_cache,
453 )
454 .await?;
455 let management_socket = TcpListener::bind(address).await?;
456
457 let channel_subscriptions = Arc::new(Mutex::new(HashMap::new()));
458
459 let channel_subs_clone = channel_subscriptions.clone();
461 tokio::spawn(async move {
462 Self::process_channel_events(channel_events_rx, channel_subs_clone).await;
463 });
464
465 Ok(Self {
466 runtime,
467 theater_tx,
468 management_socket,
469 subscriptions: Arc::new(Mutex::new(HashMap::new())),
470 channel_subscriptions,
471 channel_events_tx,
472 message_router,
473 })
474 }
475
476 pub async fn run(mut self) -> Result<()> {
477 info!(
478 "Theater server starting on {:?}",
479 self.management_socket.local_addr()?
480 );
481
482 let resource_cache = self.runtime.resource_cache().clone();
484
485 let runtime_handle = tokio::spawn(async move {
487 match self.runtime.run().await {
488 Ok(_) => Ok(()),
489 Err(e) => {
490 error!("Theater runtime failed: {}", e);
491 Err(e)
492 }
493 }
494 });
495
496 while let Ok((socket, addr)) = self.management_socket.accept().await {
498 info!("New management connection from {}", addr);
499 let runtime_tx = self.theater_tx.clone();
500 let subscriptions = self.subscriptions.clone();
501 let channel_subscriptions = self.channel_subscriptions.clone();
502 let message_router = self.message_router.clone();
503 let resource_cache = resource_cache.clone();
504
505 tokio::spawn(async move {
506 if let Err(e) = Self::handle_management_connection(
507 socket,
508 runtime_tx,
509 subscriptions,
510 channel_subscriptions,
511 message_router,
512 resource_cache,
513 )
514 .await
515 {
516 error!("Error handling management connection: {}", e);
517 }
518 });
519 }
520
521 runtime_handle.await??;
522 Ok(())
523 }
524
525 async fn handle_management_connection(
526 socket: TcpStream,
527 runtime_tx: mpsc::Sender<TheaterCommand>,
528 subscriptions: Arc<Mutex<HashMap<TheaterId, HashSet<Subscription>>>>,
529 channel_subscriptions: Arc<Mutex<HashMap<String, ChannelSubscription>>>,
530 message_router: theater_handler_message_server::MessageRouter,
531 resource_cache: Arc<ResourceCache>,
532 ) -> Result<()> {
533 let (client_tx, mut client_rx) = mpsc::channel::<ManagementResponse>(32);
535
536 let codec = FragmentingCodec::new();
537 let framed = Framed::new(socket, codec);
538
539 let (mut framed_sink, mut framed_stream) = framed.split();
541
542 let cmd_client_tx = client_tx.clone();
544
545 let _response_task = tokio::spawn(async move {
547 while let Some(response) = client_rx.recv().await {
548 match serde_json::to_vec(&response) {
549 Ok(data) => {
550 debug!("Serialized response: {} bytes", data.len());
551 if data.len() > 10 * 1024 * 1024 {
552 debug!("Large response detected: {} MB", data.len() / 1024 / 1024);
553 }
554 if let Err(e) = framed_sink.send(Bytes::from(data)).await {
555 debug!("Error sending response to client: {}", e);
556 break;
557 }
558 }
559 Err(e) => {
560 error!("Error serializing response: {}", e);
561 }
562 }
563 }
564 debug!("Response forwarder for client closed");
565 });
566
567 let mut connection_subscriptions: Vec<(TheaterId, Uuid)> = Vec::new();
569
570 let mut connection_channel_subscriptions: Vec<String> = Vec::new();
572
573 'connection: while let Some(msg) = framed_stream.next().await {
575 debug!("Received management message");
576 let msg = match msg {
577 Ok(m) => m,
578 Err(e) => {
579 error!("Error receiving message: {}", e);
580 break 'connection;
581 }
582 };
583
584 let cmd = match serde_json::from_slice::<ManagementCommand>(&msg) {
585 Ok(c) => c,
586 Err(e) => {
587 error!(
588 "Error parsing command: {} {}",
589 e,
590 String::from_utf8_lossy(&msg)
591 );
592 continue;
593 }
594 };
595 debug!("Parsed command: {:?}", cmd);
596
597 let _cmd_clone = cmd.clone();
599
600 let response = match cmd {
601 ManagementCommand::StartActor {
602 manifest,
603 initial_state: _initial_state,
604 parent,
605 subscribe,
606 } => {
607 info!("Starting actor from manifest: {:?}", manifest);
608
609 let manifest_str = match resolve_reference(&manifest).await {
611 Ok(bytes) => match String::from_utf8(bytes) {
612 Ok(s) => s,
613 Err(e) => {
614 error!("Invalid manifest encoding: {}", e);
615 cmd_client_tx
616 .send(ManagementResponse::Error {
617 error: ManagementError::ActorInitializationError(format!(
618 "Invalid manifest encoding: {}",
619 e
620 )),
621 })
622 .await
623 .ok();
624 continue;
625 }
626 },
627 Err(e) => {
628 error!("Failed to load manifest: {}", e);
629 cmd_client_tx
630 .send(ManagementResponse::Error {
631 error: ManagementError::ActorInitializationError(format!(
632 "Failed to load manifest: {}",
633 e
634 )),
635 })
636 .await
637 .ok();
638 continue;
639 }
640 };
641
642 let manifest_config = match ManifestConfig::from_toml_str(&manifest_str) {
643 Ok(m) => m,
644 Err(e) => {
645 error!("Failed to parse manifest: {}", e);
646 cmd_client_tx
647 .send(ManagementResponse::Error {
648 error: ManagementError::ActorInitializationError(format!(
649 "Failed to parse manifest: {}",
650 e
651 )),
652 })
653 .await
654 .ok();
655 continue;
656 }
657 };
658
659 let wasm_bytes_result = if manifest_config.static_package {
663 resolve_reference_cached(&manifest_config.package, &resource_cache)
664 .await
665 .map(|(arc, _)| (*arc).clone())
666 } else {
667 resolve_reference(&manifest_config.package).await
668 };
669 let wasm_bytes = match wasm_bytes_result {
670 Ok(bytes) => bytes,
671 Err(e) => {
672 error!("Failed to load WASM: {}", e);
673 cmd_client_tx
674 .send(ManagementResponse::Error {
675 error: ManagementError::ActorInitializationError(format!(
676 "Failed to load WASM: {}",
677 e
678 )),
679 })
680 .await
681 .ok();
682 continue;
683 }
684 };
685
686 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
687 debug!("Sending SpawnActor command to runtime");
688 let supervisor_tx = if parent {
689 let (supervisor_tx, mut supervisor_rx) = mpsc::channel(32);
690 let cmd_client_tx = cmd_client_tx.clone();
691 tokio::spawn(async move {
692 while let Some(res) = supervisor_rx.recv().await {
693 debug!("Received supervisor response: {:?}", res);
694 if let Err(e) = cmd_client_tx
695 .send(ManagementResponse::ActorResult(res))
696 .await
697 {
698 error!("Failed to send supervisor response: {}", e);
699 break;
700 }
701 }
702 });
703 Some(supervisor_tx)
704 } else {
705 None
706 };
707 let subscription_tx = if subscribe {
708 let (event_tx, mut event_rx) = mpsc::channel(32);
709
710 let cmd_client_tx = cmd_client_tx.clone();
712 tokio::spawn(async move {
713 while let Some((_actor_id, event)) = event_rx.recv().await {
714 debug!("Received event for subscription");
715 let response = ManagementResponse::ActorEvent { event };
716 if let Err(e) = cmd_client_tx.send(response).await {
717 debug!("Failed to forward event to client: {}", e);
718 break;
719 }
720 }
721 debug!("Event forwarder for subscription stopped");
722 });
723
724 Some(event_tx)
725 } else {
726 None
727 };
728 match runtime_tx
729 .send(TheaterCommand::SetupActor {
730 wasm_bytes,
731 name: Some(manifest_config.name.clone()),
732 manifest: Some(manifest_config),
733 init_state: default_init_state(),
734 response_tx: cmd_tx,
735 supervisor_tx,
736 subscription_tx,
737 })
738 .await
739 {
740 Ok(_) => {
741 debug!("SpawnActor command sent to runtime, awaiting response");
742 match cmd_rx.await {
743 Ok(result) => match result {
744 Ok(actor_id) => {
745 info!("Actor started with ID: {:?}", actor_id);
746 ManagementResponse::ActorStarted { id: actor_id }
747 }
748 Err(e) => {
749 error!("Runtime failed to start actor: {}", e);
750 ManagementResponse::Error {
751 error: ManagementError::RuntimeError(format!(
752 "Failed to start actor: {}",
753 e
754 )),
755 }
756 }
757 },
758 Err(e) => {
759 error!("Failed to receive spawn response: {}", e);
760 ManagementResponse::Error {
761 error: ManagementError::CommunicationError(format!(
762 "Failed to receive spawn response: {}",
763 e
764 )),
765 }
766 }
767 }
768 }
769 Err(e) => {
770 error!("Failed to send SpawnActor command: {}", e);
771 ManagementResponse::Error {
772 error: ManagementError::CommunicationError(format!(
773 "Failed to send spawn command: {}",
774 e
775 )),
776 }
777 }
778 }
779 }
780 ManagementCommand::StopActor { id } => {
781 info!("Stopping actor: {:?}", id);
782 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
783 runtime_tx
784 .send(TheaterCommand::StopActor {
785 actor_id: id,
786 response_tx: cmd_tx,
787 })
788 .await?;
789
790 match cmd_rx.await? {
791 Ok(_) => {
792 subscriptions.lock().await.remove(&id);
793 ManagementResponse::ActorStopped { id }
794 }
795 Err(e) => ManagementResponse::Error {
796 error: ManagementError::RuntimeError(format!(
797 "Failed to stop actor: {}",
798 e
799 )),
800 },
801 }
802 }
803 ManagementCommand::TerminateActor { id } => {
804 info!("Terminating actor: {:?}", id);
805 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
806 runtime_tx
807 .send(TheaterCommand::TerminateActor {
808 actor_id: id,
809 response_tx: cmd_tx,
810 })
811 .await?;
812
813 match cmd_rx.await? {
814 Ok(_) => {
815 subscriptions.lock().await.remove(&id);
816 ManagementResponse::ActorStopped { id }
817 }
818 Err(e) => ManagementResponse::Error {
819 error: ManagementError::RuntimeError(format!(
820 "Failed to terminate actor: {}",
821 e
822 )),
823 },
824 }
825 }
826 ManagementCommand::ListActors => {
827 debug!("Listing actors");
828 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
829 runtime_tx
830 .send(TheaterCommand::GetActors {
831 response_tx: cmd_tx,
832 })
833 .await?;
834
835 match cmd_rx.await? {
836 Ok(actors) => {
837 info!("Found {} actors", actors.len());
838 ManagementResponse::ActorList { actors }
839 }
840 Err(e) => ManagementResponse::Error {
841 error: ManagementError::RuntimeError(format!(
842 "Failed to list actors: {}",
843 e
844 )),
845 },
846 }
847 }
848 ManagementCommand::SubscribeToActor { id } => {
849 info!("New subscription request for actor: {:?}", id);
850 let subscription_id = Uuid::new_v4();
851 let subscription = Subscription {
852 id: subscription_id,
853 client_tx: cmd_client_tx.clone(),
854 };
855
856 debug!("Subscription created with ID: {}", subscription_id);
857
858 subscriptions
860 .lock()
861 .await
862 .entry(id)
863 .or_default()
864 .insert(subscription);
865
866 let (event_tx, mut event_rx) = mpsc::channel(32);
868 runtime_tx
869 .send(TheaterCommand::SubscribeToActor {
870 actor_id: id,
871 event_tx,
872 })
873 .await
874 .map_err(|e| anyhow::anyhow!("Failed to subscribe: {}", e))?;
875
876 connection_subscriptions.push((id, subscription_id));
878
879 let client_tx_clone = cmd_client_tx.clone();
881 tokio::spawn(async move {
882 debug!(
883 "Starting event forwarder for subscription {}",
884 subscription_id
885 );
886 while let Some((_actor_id, event)) = event_rx.recv().await {
887 debug!("Received event for subscription {}", subscription_id);
888 let response = ManagementResponse::ActorEvent { event };
889 if let Err(e) = client_tx_clone.send(response).await {
890 debug!("Failed to forward event to client: {}", e);
891 break;
892 }
893 }
894 debug!(
895 "Event forwarder for subscription {} stopped",
896 subscription_id
897 );
898 });
899
900 ManagementResponse::Subscribed {
901 id,
902 subscription_id,
903 }
904 }
905 ManagementCommand::UnsubscribeFromActor {
906 id,
907 subscription_id,
908 } => {
909 debug!(
910 "Removing subscription {} for actor {:?}",
911 subscription_id, id
912 );
913
914 connection_subscriptions
916 .retain(|(aid, sid)| *aid != id || *sid != subscription_id);
917
918 let mut subs = subscriptions.lock().await;
920 if let Some(actor_subs) = subs.get_mut(&id) {
921 actor_subs.retain(|sub| sub.id != subscription_id);
922
923 if actor_subs.is_empty() {
925 subs.remove(&id);
926 }
927 }
928
929 debug!("Subscription removed");
930 ManagementResponse::Unsubscribed { id }
931 }
932 ManagementCommand::SendActorMessage { id, data } => {
933 info!("Sending message to actor: {:?}", id);
934
935 let (response_tx, response_rx) = tokio::sync::oneshot::channel();
937
938 let message = ActorMessage::Send(ActorSend { data });
940
941 match message_router
943 .route_message(theater::messages::MessageCommand::SendMessage {
944 target_id: id,
945 message,
946 response_tx,
947 })
948 .await
949 {
950 Ok(_) => {
951 match response_rx.await {
953 Ok(Ok(())) => {
954 info!("Message sent successfully to actor: {:?}", id);
955 ManagementResponse::SentMessage { id }
956 }
957 Ok(Err(e)) => {
958 error!("Failed to send message to actor: {}", e);
959 ManagementResponse::Error {
960 error: ManagementError::RuntimeError(format!(
961 "Failed to send: {}",
962 e
963 )),
964 }
965 }
966 Err(e) => {
967 error!("Failed to receive routing response: {}", e);
968 ManagementResponse::Error {
969 error: ManagementError::CommunicationError(format!(
970 "Failed to receive routing response: {}",
971 e
972 )),
973 }
974 }
975 }
976 }
977 Err(e) => {
978 error!("Failed to route message: {}", e);
979 ManagementResponse::Error {
980 error: ManagementError::RuntimeError(format!(
981 "Failed to route message: {}",
982 e
983 )),
984 }
985 }
986 }
987 }
988 ManagementCommand::RequestActorMessage { id, data } => {
989 info!("Requesting message from actor: {:?}", id);
990
991 let (route_tx, route_rx) = tokio::sync::oneshot::channel();
993 let (response_tx, response_rx) = tokio::sync::oneshot::channel();
994
995 let message = ActorMessage::Request(ActorRequest { data, response_tx });
997
998 match message_router
1000 .route_message(theater::messages::MessageCommand::SendMessage {
1001 target_id: id,
1002 message,
1003 response_tx: route_tx,
1004 })
1005 .await
1006 {
1007 Ok(_) => {
1008 match route_rx.await {
1010 Ok(Ok(())) => {
1011 match response_rx.await {
1013 Ok(response_data) => {
1014 info!("Received response from actor: {:?}", id);
1015 ManagementResponse::RequestedMessage {
1016 id,
1017 message: response_data,
1018 }
1019 }
1020 Err(e) => {
1021 error!("Actor didn't respond: {}", e);
1022 ManagementResponse::Error {
1023 error: ManagementError::RuntimeError(format!(
1024 "Actor didn't respond: {}",
1025 e
1026 )),
1027 }
1028 }
1029 }
1030 }
1031 Ok(Err(e)) => {
1032 error!("Failed to route request to actor: {}", e);
1033 ManagementResponse::Error {
1034 error: ManagementError::RuntimeError(format!(
1035 "Failed to route: {}",
1036 e
1037 )),
1038 }
1039 }
1040 Err(e) => {
1041 error!("Failed to receive routing response: {}", e);
1042 ManagementResponse::Error {
1043 error: ManagementError::CommunicationError(format!(
1044 "Failed to receive routing response: {}",
1045 e
1046 )),
1047 }
1048 }
1049 }
1050 }
1051 Err(e) => {
1052 error!("Failed to route request: {}", e);
1053 ManagementResponse::Error {
1054 error: ManagementError::RuntimeError(format!(
1055 "Failed to route request: {}",
1056 e
1057 )),
1058 }
1059 }
1060 }
1061 }
1062 ManagementCommand::GetActorManifest { id } => {
1063 info!("Getting manifest for actor: {:?}", id);
1064 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1065 runtime_tx
1066 .send(TheaterCommand::GetActorManifest {
1067 actor_id: id,
1068 response_tx: cmd_tx,
1069 })
1070 .await?;
1071
1072 let manifest = cmd_rx.await?;
1073 ManagementResponse::ActorManifest {
1074 id,
1075 manifest: manifest?,
1076 }
1077 }
1078 ManagementCommand::GetActorStatus { id } => {
1079 info!("Getting status for actor: {:?}", id);
1080 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1081 runtime_tx
1082 .send(TheaterCommand::GetActorStatus {
1083 actor_id: id,
1084 response_tx: cmd_tx,
1085 })
1086 .await?;
1087
1088 let status = cmd_rx.await?;
1089 ManagementResponse::ActorStatus {
1090 id,
1091 status: status?,
1092 }
1093 }
1094 ManagementCommand::RestartActor { id } => {
1095 info!("Restarting actor: {:?}", id);
1096 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1097 runtime_tx
1098 .send(TheaterCommand::RestartActor {
1099 actor_id: id,
1100 response_tx: cmd_tx,
1101 })
1102 .await?;
1103
1104 match cmd_rx.await? {
1105 Ok(_) => ManagementResponse::Restarted { id },
1106 Err(e) => ManagementResponse::Error {
1107 error: ManagementError::RuntimeError(format!(
1108 "Failed to restart actor: {}",
1109 e
1110 )),
1111 },
1112 }
1113 }
1114 ManagementCommand::GetActorState { id } => {
1115 info!("Getting state for actor: {:?}", id);
1116 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1117 runtime_tx
1118 .send(TheaterCommand::GetActorState {
1119 actor_id: id,
1120 response_tx: cmd_tx,
1121 })
1122 .await?;
1123
1124 let state = cmd_rx.await?;
1125 ManagementResponse::ActorState { id, state: state? }
1126 }
1127 ManagementCommand::GetActorMetrics { id } => {
1128 info!("Getting metrics for actor: {:?}", id);
1129 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1130 runtime_tx
1131 .send(TheaterCommand::GetActorMetrics {
1132 actor_id: id,
1133 response_tx: cmd_tx,
1134 })
1135 .await?;
1136
1137 let metrics = cmd_rx.await?;
1138 ManagementResponse::ActorMetrics {
1139 id,
1140 metrics: serde_json::to_value(metrics?)?,
1141 }
1142 }
1143 ManagementCommand::UpdateActorPackage { id: _, package: _ } => {
1144 ManagementResponse::Error {
1146 error: ManagementError::RuntimeError(
1147 "UpdateActorPackage not yet implemented".to_string(),
1148 ),
1149 }
1150 }
1151 ManagementCommand::OpenChannel {
1153 actor_id,
1154 initial_message,
1155 } => {
1156 info!("Opening channel to actor: {:?}", actor_id);
1157
1158 let (response_tx, response_rx) = tokio::sync::oneshot::channel();
1160
1161 let client_id = ChannelParticipant::External;
1163 let channel_id = ChannelId::new(&client_id, &actor_id);
1164 let channel_id_str = channel_id.0.clone();
1165
1166 message_router
1168 .route_message(theater::messages::MessageCommand::OpenChannel {
1169 initiator_id: client_id.clone(),
1170 target_id: actor_id.clone(),
1171 channel_id: channel_id.clone(),
1172 initial_message,
1173 response_tx,
1174 })
1175 .await
1176 .map_err(|e| {
1177 anyhow::anyhow!("Failed to send channel open command: {}", e)
1178 })?;
1179
1180 match response_rx.await {
1182 Ok(result) => {
1183 match result {
1184 Ok(accepted) => {
1185 if accepted {
1186 info!("Channel opened successfully: {}", channel_id_str);
1188
1189 let channel_sub = ChannelSubscription {
1191 channel_id: channel_id_str.clone(),
1192 initiator_id: client_id.clone(),
1193 target_id: actor_id.clone(),
1194 client_tx: cmd_client_tx.clone(),
1195 };
1196
1197 channel_subscriptions
1198 .lock()
1199 .await
1200 .insert(channel_id_str.clone(), channel_sub);
1201
1202 connection_channel_subscriptions
1204 .push(channel_id_str.clone());
1205
1206 ManagementResponse::ChannelOpened {
1207 channel_id: channel_id_str,
1208 actor_id,
1209 }
1210 } else {
1211 ManagementResponse::Error {
1213 error: ManagementError::ChannelRejected,
1214 }
1215 }
1216 }
1217 Err(e) => ManagementResponse::Error {
1218 error: ManagementError::RuntimeError(format!(
1219 "Error opening channel: {}",
1220 e
1221 )),
1222 },
1223 }
1224 }
1225 Err(e) => ManagementResponse::Error {
1226 error: ManagementError::CommunicationError(format!(
1227 "Failed to receive channel open response: {}",
1228 e
1229 )),
1230 },
1231 }
1232 }
1233 ManagementCommand::SendOnChannel {
1234 channel_id,
1235 message,
1236 } => {
1237 info!("Sending message on channel: {}", channel_id);
1238
1239 let (response_tx, response_rx) = tokio::sync::oneshot::channel();
1241
1242 let channel_id_parsed = ChannelId(channel_id.clone());
1244
1245 let sender_id = ChannelParticipant::External;
1247 match message_router
1248 .route_message(theater::messages::MessageCommand::ChannelMessage {
1249 channel_id: channel_id_parsed,
1250 sender_id,
1251 message,
1252 response_tx,
1253 })
1254 .await
1255 {
1256 Ok(_) => {
1257 match response_rx.await {
1259 Ok(Ok(())) => {
1260 info!("Message sent successfully on channel: {}", channel_id);
1261 ManagementResponse::MessageSent { channel_id }
1262 }
1263 Ok(Err(e)) => {
1264 error!("Failed to send on channel: {}", e);
1265 ManagementResponse::Error {
1266 error: ManagementError::RuntimeError(format!(
1267 "Failed to send on channel: {}",
1268 e
1269 )),
1270 }
1271 }
1272 Err(e) => {
1273 error!("Failed to receive channel send response: {}", e);
1274 ManagementResponse::Error {
1275 error: ManagementError::CommunicationError(format!(
1276 "Failed to receive channel send response: {}",
1277 e
1278 )),
1279 }
1280 }
1281 }
1282 }
1283 Err(e) => {
1284 error!("Failed to route channel message: {}", e);
1285 ManagementResponse::Error {
1286 error: ManagementError::RuntimeError(format!(
1287 "Failed to route channel message: {}",
1288 e
1289 )),
1290 }
1291 }
1292 }
1293 }
1294 ManagementCommand::CloseChannel { channel_id } => {
1295 info!("Closing channel: {}", channel_id);
1296
1297 let (response_tx, response_rx) = tokio::sync::oneshot::channel();
1299
1300 let channel_id_parsed = ChannelId(channel_id.clone());
1302
1303 let sender_id = ChannelParticipant::External;
1305 match message_router
1306 .route_message(theater::messages::MessageCommand::ChannelClose {
1307 channel_id: channel_id_parsed,
1308 sender_id,
1309 response_tx,
1310 })
1311 .await
1312 {
1313 Ok(_) => {
1314 match response_rx.await {
1316 Ok(Ok(())) => {
1317 info!("Channel closed successfully: {}", channel_id);
1318
1319 channel_subscriptions.lock().await.remove(&channel_id);
1321 connection_channel_subscriptions.retain(|id| id != &channel_id);
1322
1323 ManagementResponse::ChannelClosed { channel_id }
1324 }
1325 Ok(Err(e)) => {
1326 error!("Failed to close channel: {}", e);
1327 ManagementResponse::Error {
1328 error: ManagementError::RuntimeError(format!(
1329 "Failed to close channel: {}",
1330 e
1331 )),
1332 }
1333 }
1334 Err(e) => {
1335 error!("Failed to receive channel close response: {}", e);
1336 ManagementResponse::Error {
1337 error: ManagementError::CommunicationError(format!(
1338 "Failed to receive channel close response: {}",
1339 e
1340 )),
1341 }
1342 }
1343 }
1344 }
1345 Err(e) => {
1346 error!("Failed to route channel close: {}", e);
1347 ManagementResponse::Error {
1348 error: ManagementError::RuntimeError(format!(
1349 "Failed to route channel close: {}",
1350 e
1351 )),
1352 }
1353 }
1354 }
1355 }
1356 ManagementCommand::NewStore {} => {
1357 info!("Creating new store");
1358 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1359 runtime_tx
1360 .send(TheaterCommand::NewStore {
1361 response_tx: cmd_tx,
1362 })
1363 .await?;
1364
1365 let store_id = cmd_rx.await?;
1366 ManagementResponse::StoreCreated {
1367 store_id: store_id?.id,
1368 }
1369 }
1370 };
1371
1372 debug!("Sending response: {:?}", response);
1373 if let Err(e) = client_tx.send(response).await {
1374 error!("Failed to send response: {}", e);
1375 break;
1376 }
1377 debug!("Response sent");
1378 }
1379
1380 debug!(
1382 "Connection closed, cleaning up {} subscriptions",
1383 connection_subscriptions.len()
1384 );
1385 let mut subs = subscriptions.lock().await;
1386
1387 for (actor_id, sub_id) in connection_subscriptions {
1388 if let Some(actor_subs) = subs.get_mut(&actor_id) {
1389 actor_subs.retain(|sub| sub.id != sub_id);
1390
1391 if actor_subs.is_empty() {
1393 subs.remove(&actor_id);
1394 }
1395 }
1396 }
1397
1398 debug!(
1400 "Connection closed, cleaning up {} channel subscriptions",
1401 connection_channel_subscriptions.len()
1402 );
1403 let mut channel_subs = channel_subscriptions.lock().await;
1404
1405 for channel_id in connection_channel_subscriptions {
1406 channel_subs.remove(&channel_id);
1407 }
1408
1409 debug!("Cleaned up all subscriptions for the connection");
1410 Ok(())
1411 }
1412}