1use anyhow::Result;
2use bytes::Bytes;
3use futures::sink::SinkExt;
4use futures::stream::StreamExt;
5use serde::{Deserialize, Serialize};
6use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8use theater::messages::{
9 ActorMessage, ActorRequest, ActorResult, ActorSend, ActorStatus, ChannelEvent,
10 ChannelParticipant,
11};
12use theater::pack_bridge::Value;
13use theater::{ChainEvent, ManifestConfig};
14use tokio::net::{TcpListener, TcpStream};
15use tokio::sync::{mpsc, Mutex};
16use tokio_util::codec::Framed;
17use tracing::{debug, error, info};
18use uuid::Uuid;
19
20use theater::config::actor_manifest::{
21 RuntimeHostConfig, StoreHandlerConfig, SupervisorHostConfig, TcpHandlerConfig,
22};
23use theater::handler::HandlerRegistry;
24use theater::id::TheaterId;
25use theater::messages::{ChannelId, TheaterCommand};
26use theater::theater_runtime::TheaterRuntime;
27use theater::utils::resolve_reference;
28use theater::TheaterRuntimeError;
29
30use theater_handler_message_server::MessageServerHandler;
33use theater_handler_runtime::RuntimeHandler;
34use theater_handler_store::StoreHandler;
35use theater_handler_supervisor::SupervisorHandler;
36use theater_handler_tcp::TcpHandler;
37
38use crate::fragmenting_codec::FragmentingCodec;
39
40#[derive(Debug, Clone, Serialize, Deserialize)]
41pub enum ManagementCommand {
42 StartActor {
43 manifest: String,
44 initial_state: Option<Vec<u8>>,
45 parent: bool,
46 subscribe: bool,
47 },
48 StopActor {
49 id: TheaterId,
50 },
51 TerminateActor {
52 id: TheaterId,
53 },
54 ListActors,
55 SubscribeToActor {
56 id: TheaterId,
57 },
58 UnsubscribeFromActor {
59 id: TheaterId,
60 subscription_id: Uuid,
61 },
62 SendActorMessage {
63 id: TheaterId,
64 data: Vec<u8>,
65 },
66 RequestActorMessage {
67 id: TheaterId,
68 data: Vec<u8>,
69 },
70 GetActorManifest {
71 id: TheaterId,
72 },
73 GetActorStatus {
74 id: TheaterId,
75 },
76 RestartActor {
77 id: TheaterId,
78 },
79 GetActorState {
80 id: TheaterId,
81 },
82 GetActorEvents {
83 id: TheaterId,
84 },
85 GetActorMetrics {
86 id: TheaterId,
87 },
88 UpdateActorPackage {
89 id: TheaterId,
90 package: String,
91 },
92 OpenChannel {
94 actor_id: ChannelParticipant,
95 initial_message: Vec<u8>,
96 },
97 SendOnChannel {
98 channel_id: String,
99 message: Vec<u8>,
100 },
101 CloseChannel {
102 channel_id: String,
103 },
104
105 NewStore {},
107}
108
109#[derive(Debug, Clone, Serialize, Deserialize)]
110#[allow(clippy::large_enum_variant)]
111pub enum ManagementResponse {
112 ActorStarted {
113 id: TheaterId,
114 },
115 ActorStopped {
116 id: TheaterId,
117 },
118 ActorList {
119 actors: Vec<(TheaterId, String)>,
120 },
121 Subscribed {
122 id: TheaterId,
123 subscription_id: Uuid,
124 },
125 Unsubscribed {
126 id: TheaterId,
127 },
128 ActorEvent {
129 event: ChainEvent,
130 },
131 ActorResult(ActorResult),
132 Error {
133 error: ManagementError,
134 },
135 RequestedMessage {
136 id: TheaterId,
137 message: Vec<u8>,
138 },
139 SentMessage {
140 id: TheaterId,
141 },
142 ActorStatus {
143 id: TheaterId,
144 status: ActorStatus,
145 },
146 Restarted {
147 id: TheaterId,
148 },
149 ActorManifest {
150 id: TheaterId,
151 manifest: ManifestConfig,
152 },
153 ActorState {
154 id: TheaterId,
155 state: Value,
156 },
157 ActorEvents {
158 id: TheaterId,
159 events: Vec<ChainEvent>,
160 },
161 ActorMetrics {
162 id: TheaterId,
163 metrics: serde_json::Value,
164 },
165 ActorPackageUpdated {
166 id: TheaterId,
167 },
168 ChannelOpened {
170 channel_id: String,
171 actor_id: ChannelParticipant,
172 },
173 MessageSent {
174 channel_id: String,
175 },
176 ChannelMessage {
177 channel_id: String,
178 sender_id: ChannelParticipant,
179 message: Vec<u8>,
180 },
181 ChannelClosed {
182 channel_id: String,
183 },
184
185 StoreCreated {
187 store_id: String,
188 },
189}
190
191#[derive(Debug, Clone, Serialize, Deserialize)]
192pub enum ManagementError {
193 ActorNotFound,
195 ActorAlreadyExists,
196 ActorNotRunning,
197 ActorError(String),
198
199 ChannelNotFound,
201 ChannelClosed,
202 ChannelRejected,
203
204 StoreError(String),
206
207 CommunicationError(String),
209
210 InvalidRequest(String),
212 Timeout,
213
214 RuntimeError(String),
216 InternalError(String),
217
218 SerializationError(String),
220
221 ActorInitializationError(String),
223}
224
225impl From<TheaterRuntimeError> for ManagementError {
227 fn from(err: TheaterRuntimeError) -> Self {
228 match err {
229 TheaterRuntimeError::ActorNotFound(_) => ManagementError::ActorNotFound,
230 TheaterRuntimeError::ActorAlreadyExists(_) => ManagementError::ActorAlreadyExists,
231 TheaterRuntimeError::ActorNotRunning(_) => ManagementError::ActorNotRunning,
232 TheaterRuntimeError::ActorOperationFailed(msg) => {
233 ManagementError::RuntimeError(format!("Actor operation failed: {}", msg))
234 }
235 TheaterRuntimeError::ActorError(e) => ManagementError::ActorError(e.to_string()),
236 TheaterRuntimeError::ChannelError(msg) => ManagementError::CommunicationError(msg),
237 TheaterRuntimeError::ChannelNotFound(_) => ManagementError::ChannelNotFound,
238 TheaterRuntimeError::ChannelRejected => ManagementError::ChannelRejected,
239 TheaterRuntimeError::SerializationError(msg) => {
240 ManagementError::SerializationError(msg)
241 }
242 TheaterRuntimeError::InternalError(msg) => ManagementError::InternalError(msg),
243 TheaterRuntimeError::ActorInitializationError(msg) => {
244 ManagementError::ActorInitializationError(msg)
245 }
246 }
247 }
248}
249
250impl std::fmt::Display for ManagementError {
252 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
253 match self {
254 ManagementError::ActorNotFound => write!(f, "Actor not found"),
255 ManagementError::ActorAlreadyExists => write!(f, "Actor already exists"),
256 ManagementError::ActorNotRunning => write!(f, "Actor is not running"),
257 ManagementError::ActorError(msg) => write!(f, "Actor error: {}", msg),
258 ManagementError::ChannelNotFound => write!(f, "Channel not found"),
259 ManagementError::ChannelClosed => write!(f, "Channel is closed"),
260 ManagementError::ChannelRejected => write!(f, "Channel was rejected"),
261 ManagementError::StoreError(msg) => write!(f, "Store error: {}", msg),
262 ManagementError::CommunicationError(msg) => write!(f, "Communication error: {}", msg),
263 ManagementError::InvalidRequest(msg) => write!(f, "Invalid request: {}", msg),
264 ManagementError::Timeout => write!(f, "Operation timed out"),
265 ManagementError::RuntimeError(msg) => write!(f, "Runtime error: {}", msg),
266 ManagementError::InternalError(msg) => write!(f, "Internal error: {}", msg),
267 ManagementError::SerializationError(msg) => write!(f, "Serialization error: {}", msg),
268 ManagementError::ActorInitializationError(msg) => {
269 write!(f, "Actor initialization error: {}", msg)
270 }
271 }
272 }
273}
274
275impl std::error::Error for ManagementError {}
277
278#[derive(Debug)]
279#[allow(dead_code)]
280struct Subscription {
281 id: Uuid,
282 client_tx: mpsc::Sender<ManagementResponse>,
283}
284
285impl Eq for Subscription {}
286impl PartialEq for Subscription {
287 fn eq(&self, other: &Self) -> bool {
288 self.id == other.id
289 }
290}
291impl std::hash::Hash for Subscription {
292 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
293 self.id.hash(state);
294 }
295}
296
297#[derive(Debug)]
301#[allow(dead_code)]
302struct ChannelSubscription {
303 channel_id: String,
304 initiator_id: ChannelParticipant,
305 target_id: ChannelParticipant,
306 client_tx: mpsc::Sender<ManagementResponse>,
307}
308
309fn create_root_handler_registry(
318 theater_tx: mpsc::Sender<TheaterCommand>,
319) -> (
320 HandlerRegistry,
321 theater_handler_message_server::MessageRouter,
322) {
323 let mut registry = HandlerRegistry::new();
324
325 info!("Initializing Theater server with Theater-specific handlers...");
326
327 let runtime_config = RuntimeHostConfig {};
329 registry.register(RuntimeHandler::new(
330 runtime_config,
331 theater_tx.clone(),
332 None,
333 ));
334
335 let store_config = StoreHandlerConfig::default();
337 registry.register(StoreHandler::new(store_config, None));
338
339 let supervisor_config = SupervisorHostConfig {};
341 registry.register(SupervisorHandler::new(supervisor_config, None));
342
343 let message_router = theater_handler_message_server::MessageRouter::new();
345 registry.register(MessageServerHandler::new(None, message_router.clone()));
346
347 let tcp_config = TcpHandlerConfig {
349 listen: None,
350 max_connections: None,
351 ..Default::default()
352 };
353 registry.register(TcpHandler::new(tcp_config));
354
355 info!("✓ 5 Theater-specific handlers registered");
356 info!("NOTE: WASI handlers are deprecated - see crates/deprecated/");
357
358 (registry, message_router)
359}
360
361pub struct TheaterServer {
362 runtime: TheaterRuntime,
363 theater_tx: mpsc::Sender<TheaterCommand>,
364 management_socket: TcpListener,
365 subscriptions: Arc<Mutex<HashMap<TheaterId, HashSet<Subscription>>>>,
366 channel_subscriptions: Arc<Mutex<HashMap<String, ChannelSubscription>>>,
368 #[allow(dead_code)]
370 channel_events_tx: mpsc::Sender<ChannelEvent>,
371 message_router: theater_handler_message_server::MessageRouter,
373}
374
375impl TheaterServer {
376 async fn process_channel_events(
378 mut channel_events_rx: mpsc::Receiver<ChannelEvent>,
379 channel_subscriptions: Arc<Mutex<HashMap<String, ChannelSubscription>>>,
380 ) {
381 while let Some(event) = channel_events_rx.recv().await {
382 match event {
383 ChannelEvent::Message {
384 channel_id,
385 sender_id,
386 message,
387 } => {
388 tracing::debug!("Received channel message for {}", channel_id);
389 let subs = channel_subscriptions.lock().await;
391 if let Some(sub) = subs.get(&channel_id.0) {
392 let response = ManagementResponse::ChannelMessage {
393 channel_id: channel_id.0.clone(),
394 sender_id,
395 message,
396 };
397
398 tracing::debug!("Forwarding channel message to client: {:?}", response);
399
400 if let Err(e) = sub.client_tx.send(response).await {
401 tracing::warn!("Failed to forward channel message: {}", e);
402 } else {
403 tracing::debug!("Forwarded channel message to client");
404 }
405 }
406 }
407 ChannelEvent::Close { channel_id } => {
408 tracing::debug!("Received channel close event for {}", channel_id);
409 let mut subs = channel_subscriptions.lock().await;
411 if let Some(sub) = subs.remove(&channel_id.0) {
412 let response = ManagementResponse::ChannelClosed {
413 channel_id: channel_id.0.clone(),
414 };
415
416 if let Err(e) = sub.client_tx.send(response).await {
417 tracing::warn!("Failed to forward channel close event: {}", e);
418 } else {
419 tracing::debug!("Forwarded channel close event to client");
420 }
421 }
422 }
423 }
424 }
425 }
426
427 pub async fn new(address: std::net::SocketAddr) -> Result<Self> {
428 let (theater_tx, theater_rx) = mpsc::channel(32);
429
430 let (channel_events_tx, channel_events_rx) = mpsc::channel(32);
432
433 let (handler_registry, message_router) = create_root_handler_registry(theater_tx.clone());
436
437 let runtime = TheaterRuntime::new(
439 theater_tx.clone(),
440 theater_rx,
441 Some(channel_events_tx.clone()),
442 handler_registry,
443 )
444 .await?;
445 let management_socket = TcpListener::bind(address).await?;
446
447 let channel_subscriptions = Arc::new(Mutex::new(HashMap::new()));
448
449 let channel_subs_clone = channel_subscriptions.clone();
451 tokio::spawn(async move {
452 Self::process_channel_events(channel_events_rx, channel_subs_clone).await;
453 });
454
455 Ok(Self {
456 runtime,
457 theater_tx,
458 management_socket,
459 subscriptions: Arc::new(Mutex::new(HashMap::new())),
460 channel_subscriptions,
461 channel_events_tx,
462 message_router,
463 })
464 }
465
466 pub async fn run(mut self) -> Result<()> {
467 info!(
468 "Theater server starting on {:?}",
469 self.management_socket.local_addr()?
470 );
471
472 let runtime_handle = tokio::spawn(async move {
474 match self.runtime.run().await {
475 Ok(_) => Ok(()),
476 Err(e) => {
477 error!("Theater runtime failed: {}", e);
478 Err(e)
479 }
480 }
481 });
482
483 while let Ok((socket, addr)) = self.management_socket.accept().await {
485 info!("New management connection from {}", addr);
486 let runtime_tx = self.theater_tx.clone();
487 let subscriptions = self.subscriptions.clone();
488 let channel_subscriptions = self.channel_subscriptions.clone();
489 let message_router = self.message_router.clone();
490
491 tokio::spawn(async move {
492 if let Err(e) = Self::handle_management_connection(
493 socket,
494 runtime_tx,
495 subscriptions,
496 channel_subscriptions,
497 message_router,
498 )
499 .await
500 {
501 error!("Error handling management connection: {}", e);
502 }
503 });
504 }
505
506 runtime_handle.await??;
507 Ok(())
508 }
509
510 async fn handle_management_connection(
511 socket: TcpStream,
512 runtime_tx: mpsc::Sender<TheaterCommand>,
513 subscriptions: Arc<Mutex<HashMap<TheaterId, HashSet<Subscription>>>>,
514 channel_subscriptions: Arc<Mutex<HashMap<String, ChannelSubscription>>>,
515 message_router: theater_handler_message_server::MessageRouter,
516 ) -> Result<()> {
517 let (client_tx, mut client_rx) = mpsc::channel::<ManagementResponse>(32);
519
520 let codec = FragmentingCodec::new();
521 let framed = Framed::new(socket, codec);
522
523 let (mut framed_sink, mut framed_stream) = framed.split();
525
526 let cmd_client_tx = client_tx.clone();
528
529 let _response_task = tokio::spawn(async move {
531 while let Some(response) = client_rx.recv().await {
532 match serde_json::to_vec(&response) {
533 Ok(data) => {
534 debug!("Serialized response: {} bytes", data.len());
535 if data.len() > 10 * 1024 * 1024 {
536 debug!("Large response detected: {} MB", data.len() / 1024 / 1024);
537 }
538 if let Err(e) = framed_sink.send(Bytes::from(data)).await {
539 debug!("Error sending response to client: {}", e);
540 break;
541 }
542 }
543 Err(e) => {
544 error!("Error serializing response: {}", e);
545 }
546 }
547 }
548 debug!("Response forwarder for client closed");
549 });
550
551 let mut connection_subscriptions: Vec<(TheaterId, Uuid)> = Vec::new();
553
554 let mut connection_channel_subscriptions: Vec<String> = Vec::new();
556
557 'connection: while let Some(msg) = framed_stream.next().await {
559 debug!("Received management message");
560 let msg = match msg {
561 Ok(m) => m,
562 Err(e) => {
563 error!("Error receiving message: {}", e);
564 break 'connection;
565 }
566 };
567
568 let cmd = match serde_json::from_slice::<ManagementCommand>(&msg) {
569 Ok(c) => c,
570 Err(e) => {
571 error!(
572 "Error parsing command: {} {}",
573 e,
574 String::from_utf8_lossy(&msg)
575 );
576 continue;
577 }
578 };
579 debug!("Parsed command: {:?}", cmd);
580
581 let _cmd_clone = cmd.clone();
583
584 let response = match cmd {
585 ManagementCommand::StartActor {
586 manifest,
587 initial_state: _initial_state,
588 parent,
589 subscribe,
590 } => {
591 info!("Starting actor from manifest: {:?}", manifest);
592
593 let manifest_str = match resolve_reference(&manifest).await {
595 Ok(bytes) => match String::from_utf8(bytes) {
596 Ok(s) => s,
597 Err(e) => {
598 error!("Invalid manifest encoding: {}", e);
599 cmd_client_tx
600 .send(ManagementResponse::Error {
601 error: ManagementError::ActorInitializationError(format!(
602 "Invalid manifest encoding: {}",
603 e
604 )),
605 })
606 .await
607 .ok();
608 continue;
609 }
610 },
611 Err(e) => {
612 error!("Failed to load manifest: {}", e);
613 cmd_client_tx
614 .send(ManagementResponse::Error {
615 error: ManagementError::ActorInitializationError(format!(
616 "Failed to load manifest: {}",
617 e
618 )),
619 })
620 .await
621 .ok();
622 continue;
623 }
624 };
625
626 let manifest_config = match ManifestConfig::from_toml_str(&manifest_str) {
627 Ok(m) => m,
628 Err(e) => {
629 error!("Failed to parse manifest: {}", e);
630 cmd_client_tx
631 .send(ManagementResponse::Error {
632 error: ManagementError::ActorInitializationError(format!(
633 "Failed to parse manifest: {}",
634 e
635 )),
636 })
637 .await
638 .ok();
639 continue;
640 }
641 };
642
643 let wasm_bytes = match resolve_reference(&manifest_config.package).await {
645 Ok(bytes) => bytes,
646 Err(e) => {
647 error!("Failed to load WASM: {}", e);
648 cmd_client_tx
649 .send(ManagementResponse::Error {
650 error: ManagementError::ActorInitializationError(format!(
651 "Failed to load WASM: {}",
652 e
653 )),
654 })
655 .await
656 .ok();
657 continue;
658 }
659 };
660
661 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
662 debug!("Sending SpawnActor command to runtime");
663 let supervisor_tx = if parent {
664 let (supervisor_tx, mut supervisor_rx) = mpsc::channel(32);
665 let cmd_client_tx = cmd_client_tx.clone();
666 tokio::spawn(async move {
667 while let Some(res) = supervisor_rx.recv().await {
668 debug!("Received supervisor response: {:?}", res);
669 if let Err(e) = cmd_client_tx
670 .send(ManagementResponse::ActorResult(res))
671 .await
672 {
673 error!("Failed to send supervisor response: {}", e);
674 break;
675 }
676 }
677 });
678 Some(supervisor_tx)
679 } else {
680 None
681 };
682 let subscription_tx = if subscribe {
683 let (event_tx, mut event_rx) = mpsc::channel(32);
684
685 let cmd_client_tx = cmd_client_tx.clone();
687 tokio::spawn(async move {
688 while let Some(event) = event_rx.recv().await {
689 debug!("Received event for subscription");
690 let response = match event {
691 Ok(event) => ManagementResponse::ActorEvent { event },
692 Err(_e) => {
693 debug!("Actor error received, but already in event chain");
696 continue;
697 }
698 };
699 if let Err(e) = cmd_client_tx.send(response).await {
700 debug!("Failed to forward event to client: {}", e);
701 break;
702 }
703 }
704 debug!("Event forwarder for subscription stopped");
705 });
706
707 Some(event_tx)
708 } else {
709 None
710 };
711 match runtime_tx
712 .send(TheaterCommand::SpawnActor {
713 wasm_bytes,
714 name: Some(manifest_config.name.clone()),
715 manifest: Some(manifest_config),
716 init_bytes: None,
717 response_tx: cmd_tx,
718 supervisor_tx,
719 subscription_tx,
720 })
721 .await
722 {
723 Ok(_) => {
724 debug!("SpawnActor command sent to runtime, awaiting response");
725 match cmd_rx.await {
726 Ok(result) => match result {
727 Ok(actor_id) => {
728 info!("Actor started with ID: {:?}", actor_id);
729 ManagementResponse::ActorStarted { id: actor_id }
730 }
731 Err(e) => {
732 error!("Runtime failed to start actor: {}", e);
733 ManagementResponse::Error {
734 error: ManagementError::RuntimeError(format!(
735 "Failed to start actor: {}",
736 e
737 )),
738 }
739 }
740 },
741 Err(e) => {
742 error!("Failed to receive spawn response: {}", e);
743 ManagementResponse::Error {
744 error: ManagementError::CommunicationError(format!(
745 "Failed to receive spawn response: {}",
746 e
747 )),
748 }
749 }
750 }
751 }
752 Err(e) => {
753 error!("Failed to send SpawnActor command: {}", e);
754 ManagementResponse::Error {
755 error: ManagementError::CommunicationError(format!(
756 "Failed to send spawn command: {}",
757 e
758 )),
759 }
760 }
761 }
762 }
763 ManagementCommand::StopActor { id } => {
764 info!("Stopping actor: {:?}", id);
765 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
766 runtime_tx
767 .send(TheaterCommand::StopActor {
768 actor_id: id,
769 response_tx: cmd_tx,
770 })
771 .await?;
772
773 match cmd_rx.await? {
774 Ok(_) => {
775 subscriptions.lock().await.remove(&id);
776 ManagementResponse::ActorStopped { id }
777 }
778 Err(e) => ManagementResponse::Error {
779 error: ManagementError::RuntimeError(format!(
780 "Failed to stop actor: {}",
781 e
782 )),
783 },
784 }
785 }
786 ManagementCommand::TerminateActor { id } => {
787 info!("Terminating actor: {:?}", id);
788 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
789 runtime_tx
790 .send(TheaterCommand::TerminateActor {
791 actor_id: id,
792 response_tx: cmd_tx,
793 })
794 .await?;
795
796 match cmd_rx.await? {
797 Ok(_) => {
798 subscriptions.lock().await.remove(&id);
799 ManagementResponse::ActorStopped { id }
800 }
801 Err(e) => ManagementResponse::Error {
802 error: ManagementError::RuntimeError(format!(
803 "Failed to terminate actor: {}",
804 e
805 )),
806 },
807 }
808 }
809 ManagementCommand::ListActors => {
810 debug!("Listing actors");
811 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
812 runtime_tx
813 .send(TheaterCommand::GetActors {
814 response_tx: cmd_tx,
815 })
816 .await?;
817
818 match cmd_rx.await? {
819 Ok(actors) => {
820 info!("Found {} actors", actors.len());
821 ManagementResponse::ActorList { actors }
822 }
823 Err(e) => ManagementResponse::Error {
824 error: ManagementError::RuntimeError(format!(
825 "Failed to list actors: {}",
826 e
827 )),
828 },
829 }
830 }
831 ManagementCommand::SubscribeToActor { id } => {
832 info!("New subscription request for actor: {:?}", id);
833 let subscription_id = Uuid::new_v4();
834 let subscription = Subscription {
835 id: subscription_id,
836 client_tx: cmd_client_tx.clone(),
837 };
838
839 debug!("Subscription created with ID: {}", subscription_id);
840
841 subscriptions
843 .lock()
844 .await
845 .entry(id)
846 .or_default()
847 .insert(subscription);
848
849 let (event_tx, mut event_rx) = mpsc::channel(32);
851 runtime_tx
852 .send(TheaterCommand::SubscribeToActor {
853 actor_id: id,
854 event_tx,
855 })
856 .await
857 .map_err(|e| anyhow::anyhow!("Failed to subscribe: {}", e))?;
858
859 connection_subscriptions.push((id, subscription_id));
861
862 let client_tx_clone = cmd_client_tx.clone();
864 tokio::spawn(async move {
865 debug!(
866 "Starting event forwarder for subscription {}",
867 subscription_id
868 );
869 while let Some(event) = event_rx.recv().await {
870 debug!("Received event for subscription {}", subscription_id);
871 let response = match event {
872 Ok(event) => ManagementResponse::ActorEvent { event },
873 Err(_e) => {
874 debug!("Actor error received, but already in event chain");
877 continue;
878 }
879 };
880 if let Err(e) = client_tx_clone.send(response).await {
881 debug!("Failed to forward event to client: {}", e);
882 break;
883 }
884 }
885 debug!(
886 "Event forwarder for subscription {} stopped",
887 subscription_id
888 );
889 });
890
891 ManagementResponse::Subscribed {
892 id,
893 subscription_id,
894 }
895 }
896 ManagementCommand::UnsubscribeFromActor {
897 id,
898 subscription_id,
899 } => {
900 debug!(
901 "Removing subscription {} for actor {:?}",
902 subscription_id, id
903 );
904
905 connection_subscriptions
907 .retain(|(aid, sid)| *aid != id || *sid != subscription_id);
908
909 let mut subs = subscriptions.lock().await;
911 if let Some(actor_subs) = subs.get_mut(&id) {
912 actor_subs.retain(|sub| sub.id != subscription_id);
913
914 if actor_subs.is_empty() {
916 subs.remove(&id);
917 }
918 }
919
920 debug!("Subscription removed");
921 ManagementResponse::Unsubscribed { id }
922 }
923 ManagementCommand::SendActorMessage { id, data } => {
924 info!("Sending message to actor: {:?}", id);
925
926 let (response_tx, response_rx) = tokio::sync::oneshot::channel();
928
929 let message = ActorMessage::Send(ActorSend { data });
931
932 match message_router
934 .route_message(theater::messages::MessageCommand::SendMessage {
935 target_id: id,
936 message,
937 response_tx,
938 })
939 .await
940 {
941 Ok(_) => {
942 match response_rx.await {
944 Ok(Ok(())) => {
945 info!("Message sent successfully to actor: {:?}", id);
946 ManagementResponse::SentMessage { id }
947 }
948 Ok(Err(e)) => {
949 error!("Failed to send message to actor: {}", e);
950 ManagementResponse::Error {
951 error: ManagementError::RuntimeError(format!(
952 "Failed to send: {}",
953 e
954 )),
955 }
956 }
957 Err(e) => {
958 error!("Failed to receive routing response: {}", e);
959 ManagementResponse::Error {
960 error: ManagementError::CommunicationError(format!(
961 "Failed to receive routing response: {}",
962 e
963 )),
964 }
965 }
966 }
967 }
968 Err(e) => {
969 error!("Failed to route message: {}", e);
970 ManagementResponse::Error {
971 error: ManagementError::RuntimeError(format!(
972 "Failed to route message: {}",
973 e
974 )),
975 }
976 }
977 }
978 }
979 ManagementCommand::RequestActorMessage { id, data } => {
980 info!("Requesting message from actor: {:?}", id);
981
982 let (route_tx, route_rx) = tokio::sync::oneshot::channel();
984 let (response_tx, response_rx) = tokio::sync::oneshot::channel();
985
986 let message = ActorMessage::Request(ActorRequest { data, response_tx });
988
989 match message_router
991 .route_message(theater::messages::MessageCommand::SendMessage {
992 target_id: id,
993 message,
994 response_tx: route_tx,
995 })
996 .await
997 {
998 Ok(_) => {
999 match route_rx.await {
1001 Ok(Ok(())) => {
1002 match response_rx.await {
1004 Ok(response_data) => {
1005 info!("Received response from actor: {:?}", id);
1006 ManagementResponse::RequestedMessage {
1007 id,
1008 message: response_data,
1009 }
1010 }
1011 Err(e) => {
1012 error!("Actor didn't respond: {}", e);
1013 ManagementResponse::Error {
1014 error: ManagementError::RuntimeError(format!(
1015 "Actor didn't respond: {}",
1016 e
1017 )),
1018 }
1019 }
1020 }
1021 }
1022 Ok(Err(e)) => {
1023 error!("Failed to route request to actor: {}", e);
1024 ManagementResponse::Error {
1025 error: ManagementError::RuntimeError(format!(
1026 "Failed to route: {}",
1027 e
1028 )),
1029 }
1030 }
1031 Err(e) => {
1032 error!("Failed to receive routing response: {}", e);
1033 ManagementResponse::Error {
1034 error: ManagementError::CommunicationError(format!(
1035 "Failed to receive routing response: {}",
1036 e
1037 )),
1038 }
1039 }
1040 }
1041 }
1042 Err(e) => {
1043 error!("Failed to route request: {}", e);
1044 ManagementResponse::Error {
1045 error: ManagementError::RuntimeError(format!(
1046 "Failed to route request: {}",
1047 e
1048 )),
1049 }
1050 }
1051 }
1052 }
1053 ManagementCommand::GetActorManifest { id } => {
1054 info!("Getting manifest for actor: {:?}", id);
1055 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1056 runtime_tx
1057 .send(TheaterCommand::GetActorManifest {
1058 actor_id: id,
1059 response_tx: cmd_tx,
1060 })
1061 .await?;
1062
1063 let manifest = cmd_rx.await?;
1064 ManagementResponse::ActorManifest {
1065 id,
1066 manifest: manifest?,
1067 }
1068 }
1069 ManagementCommand::GetActorStatus { id } => {
1070 info!("Getting status for actor: {:?}", id);
1071 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1072 runtime_tx
1073 .send(TheaterCommand::GetActorStatus {
1074 actor_id: id,
1075 response_tx: cmd_tx,
1076 })
1077 .await?;
1078
1079 let status = cmd_rx.await?;
1080 ManagementResponse::ActorStatus {
1081 id,
1082 status: status?,
1083 }
1084 }
1085 ManagementCommand::RestartActor { id } => {
1086 info!("Restarting actor: {:?}", id);
1087 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1088 runtime_tx
1089 .send(TheaterCommand::RestartActor {
1090 actor_id: id,
1091 response_tx: cmd_tx,
1092 })
1093 .await?;
1094
1095 match cmd_rx.await? {
1096 Ok(_) => ManagementResponse::Restarted { id },
1097 Err(e) => ManagementResponse::Error {
1098 error: ManagementError::RuntimeError(format!(
1099 "Failed to restart actor: {}",
1100 e
1101 )),
1102 },
1103 }
1104 }
1105 ManagementCommand::GetActorState { id } => {
1106 info!("Getting state for actor: {:?}", id);
1107 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1108 runtime_tx
1109 .send(TheaterCommand::GetActorState {
1110 actor_id: id,
1111 response_tx: cmd_tx,
1112 })
1113 .await?;
1114
1115 let state = cmd_rx.await?;
1116 ManagementResponse::ActorState { id, state: state? }
1117 }
1118 ManagementCommand::GetActorEvents { id } => {
1119 info!("Getting events for actor: {:?}", id);
1120 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1121 runtime_tx
1122 .send(TheaterCommand::GetActorEvents {
1123 actor_id: id,
1124 response_tx: cmd_tx,
1125 })
1126 .await?;
1127
1128 match cmd_rx.await {
1129 Ok(result) => match result {
1130 Ok(events) => {
1131 debug!(
1132 "Successfully retrieved {} events for actor {}",
1133 events.len(),
1134 id
1135 );
1136 ManagementResponse::ActorEvents { id, events }
1137 }
1138 Err(e) => {
1139 debug!("Error getting events for actor {}: {}", id, e);
1140 ManagementResponse::Error { error: e.into() }
1141 }
1142 },
1143 Err(e) => {
1144 error!("Failed to receive events response: {}", e);
1145 ManagementResponse::Error {
1146 error: ManagementError::CommunicationError(format!(
1147 "Failed to receive events response: {}",
1148 e
1149 )),
1150 }
1151 }
1152 }
1153 }
1154 ManagementCommand::GetActorMetrics { id } => {
1155 info!("Getting metrics for actor: {:?}", id);
1156 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1157 runtime_tx
1158 .send(TheaterCommand::GetActorMetrics {
1159 actor_id: id,
1160 response_tx: cmd_tx,
1161 })
1162 .await?;
1163
1164 let metrics = cmd_rx.await?;
1165 ManagementResponse::ActorMetrics {
1166 id,
1167 metrics: serde_json::to_value(metrics?)?,
1168 }
1169 }
1170 ManagementCommand::UpdateActorPackage { id: _, package: _ } => {
1171 ManagementResponse::Error {
1173 error: ManagementError::RuntimeError(
1174 "UpdateActorPackage not yet implemented".to_string(),
1175 ),
1176 }
1177 }
1178 ManagementCommand::OpenChannel {
1180 actor_id,
1181 initial_message,
1182 } => {
1183 info!("Opening channel to actor: {:?}", actor_id);
1184
1185 let (response_tx, response_rx) = tokio::sync::oneshot::channel();
1187
1188 let client_id = ChannelParticipant::External;
1190 let channel_id = ChannelId::new(&client_id, &actor_id);
1191 let channel_id_str = channel_id.0.clone();
1192
1193 message_router
1195 .route_message(theater::messages::MessageCommand::OpenChannel {
1196 initiator_id: client_id.clone(),
1197 target_id: actor_id.clone(),
1198 channel_id: channel_id.clone(),
1199 initial_message,
1200 response_tx,
1201 })
1202 .await
1203 .map_err(|e| {
1204 anyhow::anyhow!("Failed to send channel open command: {}", e)
1205 })?;
1206
1207 match response_rx.await {
1209 Ok(result) => {
1210 match result {
1211 Ok(accepted) => {
1212 if accepted {
1213 info!("Channel opened successfully: {}", channel_id_str);
1215
1216 let channel_sub = ChannelSubscription {
1218 channel_id: channel_id_str.clone(),
1219 initiator_id: client_id.clone(),
1220 target_id: actor_id.clone(),
1221 client_tx: cmd_client_tx.clone(),
1222 };
1223
1224 channel_subscriptions
1225 .lock()
1226 .await
1227 .insert(channel_id_str.clone(), channel_sub);
1228
1229 connection_channel_subscriptions
1231 .push(channel_id_str.clone());
1232
1233 ManagementResponse::ChannelOpened {
1234 channel_id: channel_id_str,
1235 actor_id,
1236 }
1237 } else {
1238 ManagementResponse::Error {
1240 error: ManagementError::ChannelRejected,
1241 }
1242 }
1243 }
1244 Err(e) => ManagementResponse::Error {
1245 error: ManagementError::RuntimeError(format!(
1246 "Error opening channel: {}",
1247 e
1248 )),
1249 },
1250 }
1251 }
1252 Err(e) => ManagementResponse::Error {
1253 error: ManagementError::CommunicationError(format!(
1254 "Failed to receive channel open response: {}",
1255 e
1256 )),
1257 },
1258 }
1259 }
1260 ManagementCommand::SendOnChannel {
1261 channel_id,
1262 message,
1263 } => {
1264 info!("Sending message on channel: {}", channel_id);
1265
1266 let (response_tx, response_rx) = tokio::sync::oneshot::channel();
1268
1269 let channel_id_parsed = ChannelId(channel_id.clone());
1271
1272 let sender_id = ChannelParticipant::External;
1274 match message_router
1275 .route_message(theater::messages::MessageCommand::ChannelMessage {
1276 channel_id: channel_id_parsed,
1277 sender_id,
1278 message,
1279 response_tx,
1280 })
1281 .await
1282 {
1283 Ok(_) => {
1284 match response_rx.await {
1286 Ok(Ok(())) => {
1287 info!("Message sent successfully on channel: {}", channel_id);
1288 ManagementResponse::MessageSent { channel_id }
1289 }
1290 Ok(Err(e)) => {
1291 error!("Failed to send on channel: {}", e);
1292 ManagementResponse::Error {
1293 error: ManagementError::RuntimeError(format!(
1294 "Failed to send on channel: {}",
1295 e
1296 )),
1297 }
1298 }
1299 Err(e) => {
1300 error!("Failed to receive channel send response: {}", e);
1301 ManagementResponse::Error {
1302 error: ManagementError::CommunicationError(format!(
1303 "Failed to receive channel send response: {}",
1304 e
1305 )),
1306 }
1307 }
1308 }
1309 }
1310 Err(e) => {
1311 error!("Failed to route channel message: {}", e);
1312 ManagementResponse::Error {
1313 error: ManagementError::RuntimeError(format!(
1314 "Failed to route channel message: {}",
1315 e
1316 )),
1317 }
1318 }
1319 }
1320 }
1321 ManagementCommand::CloseChannel { channel_id } => {
1322 info!("Closing channel: {}", channel_id);
1323
1324 let (response_tx, response_rx) = tokio::sync::oneshot::channel();
1326
1327 let channel_id_parsed = ChannelId(channel_id.clone());
1329
1330 let sender_id = ChannelParticipant::External;
1332 match message_router
1333 .route_message(theater::messages::MessageCommand::ChannelClose {
1334 channel_id: channel_id_parsed,
1335 sender_id,
1336 response_tx,
1337 })
1338 .await
1339 {
1340 Ok(_) => {
1341 match response_rx.await {
1343 Ok(Ok(())) => {
1344 info!("Channel closed successfully: {}", channel_id);
1345
1346 channel_subscriptions.lock().await.remove(&channel_id);
1348 connection_channel_subscriptions.retain(|id| id != &channel_id);
1349
1350 ManagementResponse::ChannelClosed { channel_id }
1351 }
1352 Ok(Err(e)) => {
1353 error!("Failed to close channel: {}", e);
1354 ManagementResponse::Error {
1355 error: ManagementError::RuntimeError(format!(
1356 "Failed to close channel: {}",
1357 e
1358 )),
1359 }
1360 }
1361 Err(e) => {
1362 error!("Failed to receive channel close response: {}", e);
1363 ManagementResponse::Error {
1364 error: ManagementError::CommunicationError(format!(
1365 "Failed to receive channel close response: {}",
1366 e
1367 )),
1368 }
1369 }
1370 }
1371 }
1372 Err(e) => {
1373 error!("Failed to route channel close: {}", e);
1374 ManagementResponse::Error {
1375 error: ManagementError::RuntimeError(format!(
1376 "Failed to route channel close: {}",
1377 e
1378 )),
1379 }
1380 }
1381 }
1382 }
1383 ManagementCommand::NewStore {} => {
1384 info!("Creating new store");
1385 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1386 runtime_tx
1387 .send(TheaterCommand::NewStore {
1388 response_tx: cmd_tx,
1389 })
1390 .await?;
1391
1392 let store_id = cmd_rx.await?;
1393 ManagementResponse::StoreCreated {
1394 store_id: store_id?.id,
1395 }
1396 }
1397 };
1398
1399 debug!("Sending response: {:?}", response);
1400 if let Err(e) = client_tx.send(response).await {
1401 error!("Failed to send response: {}", e);
1402 break;
1403 }
1404 debug!("Response sent");
1405 }
1406
1407 debug!(
1409 "Connection closed, cleaning up {} subscriptions",
1410 connection_subscriptions.len()
1411 );
1412 let mut subs = subscriptions.lock().await;
1413
1414 for (actor_id, sub_id) in connection_subscriptions {
1415 if let Some(actor_subs) = subs.get_mut(&actor_id) {
1416 actor_subs.retain(|sub| sub.id != sub_id);
1417
1418 if actor_subs.is_empty() {
1420 subs.remove(&actor_id);
1421 }
1422 }
1423 }
1424
1425 debug!(
1427 "Connection closed, cleaning up {} channel subscriptions",
1428 connection_channel_subscriptions.len()
1429 );
1430 let mut channel_subs = channel_subscriptions.lock().await;
1431
1432 for channel_id in connection_channel_subscriptions {
1433 channel_subs.remove(&channel_id);
1434 }
1435
1436 debug!("Cleaned up all subscriptions for the connection");
1437 Ok(())
1438 }
1439}