1use anyhow::Result;
2use bytes::Bytes;
3use futures::sink::SinkExt;
4use futures::stream::StreamExt;
5use serde::{Deserialize, Serialize};
6use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8use theater::messages::{
9 ActorMessage, ActorRequest, ActorResult, ActorSend, ActorStatus, ChannelEvent,
10 ChannelParticipant,
11};
12use theater::pack_bridge::Value;
13use theater::{ChainEvent, ManifestConfig};
14use tokio::net::{TcpListener, TcpStream};
15use tokio::sync::{mpsc, Mutex};
16use tokio_util::codec::Framed;
17use tracing::{debug, error, info};
18use uuid::Uuid;
19
20use theater::config::actor_manifest::{
21 RuntimeHostConfig, StoreHandlerConfig, SupervisorHostConfig, TcpHandlerConfig,
22};
23use theater::handler::HandlerRegistry;
24use theater::id::TheaterId;
25use theater::messages::{default_init_state, ChannelId, TheaterCommand};
26use theater::theater_runtime::TheaterRuntime;
27use theater::utils::resolve_reference;
28use theater::TheaterRuntimeError;
29
30use theater_handler_message_server::MessageServerHandler;
33use theater_handler_runtime::RuntimeHandler;
34use theater_handler_store::StoreHandler;
35use theater_handler_supervisor::SupervisorHandler;
36use theater_handler_tcp::TcpHandler;
37
38use crate::fragmenting_codec::FragmentingCodec;
39
40#[derive(Debug, Clone, Serialize, Deserialize)]
41pub enum ManagementCommand {
42 StartActor {
43 manifest: String,
44 initial_state: Option<Vec<u8>>,
45 parent: bool,
46 subscribe: bool,
47 },
48 StopActor {
49 id: TheaterId,
50 },
51 TerminateActor {
52 id: TheaterId,
53 },
54 ListActors,
55 SubscribeToActor {
56 id: TheaterId,
57 },
58 UnsubscribeFromActor {
59 id: TheaterId,
60 subscription_id: Uuid,
61 },
62 SendActorMessage {
63 id: TheaterId,
64 data: Vec<u8>,
65 },
66 RequestActorMessage {
67 id: TheaterId,
68 data: Vec<u8>,
69 },
70 GetActorManifest {
71 id: TheaterId,
72 },
73 GetActorStatus {
74 id: TheaterId,
75 },
76 RestartActor {
77 id: TheaterId,
78 },
79 GetActorState {
80 id: TheaterId,
81 },
82 GetActorMetrics {
83 id: TheaterId,
84 },
85 UpdateActorPackage {
86 id: TheaterId,
87 package: String,
88 },
89 OpenChannel {
91 actor_id: ChannelParticipant,
92 initial_message: Vec<u8>,
93 },
94 SendOnChannel {
95 channel_id: String,
96 message: Vec<u8>,
97 },
98 CloseChannel {
99 channel_id: String,
100 },
101
102 NewStore {},
104}
105
106#[derive(Debug, Clone, Serialize, Deserialize)]
107#[allow(clippy::large_enum_variant)]
108pub enum ManagementResponse {
109 ActorStarted {
110 id: TheaterId,
111 },
112 ActorStopped {
113 id: TheaterId,
114 },
115 ActorList {
116 actors: Vec<(TheaterId, String)>,
117 },
118 Subscribed {
119 id: TheaterId,
120 subscription_id: Uuid,
121 },
122 Unsubscribed {
123 id: TheaterId,
124 },
125 ActorEvent {
126 event: ChainEvent,
127 },
128 ActorResult(ActorResult),
129 Error {
130 error: ManagementError,
131 },
132 RequestedMessage {
133 id: TheaterId,
134 message: Vec<u8>,
135 },
136 SentMessage {
137 id: TheaterId,
138 },
139 ActorStatus {
140 id: TheaterId,
141 status: ActorStatus,
142 },
143 Restarted {
144 id: TheaterId,
145 },
146 ActorManifest {
147 id: TheaterId,
148 manifest: ManifestConfig,
149 },
150 ActorState {
151 id: TheaterId,
152 state: Value,
153 },
154 ActorMetrics {
155 id: TheaterId,
156 metrics: serde_json::Value,
157 },
158 ActorPackageUpdated {
159 id: TheaterId,
160 },
161 ChannelOpened {
163 channel_id: String,
164 actor_id: ChannelParticipant,
165 },
166 MessageSent {
167 channel_id: String,
168 },
169 ChannelMessage {
170 channel_id: String,
171 sender_id: ChannelParticipant,
172 message: Vec<u8>,
173 },
174 ChannelClosed {
175 channel_id: String,
176 },
177
178 StoreCreated {
180 store_id: String,
181 },
182}
183
184#[derive(Debug, Clone, Serialize, Deserialize)]
185pub enum ManagementError {
186 ActorNotFound,
188 ActorAlreadyExists,
189 ActorNotRunning,
190 ActorError(String),
191
192 ChannelNotFound,
194 ChannelClosed,
195 ChannelRejected,
196
197 StoreError(String),
199
200 CommunicationError(String),
202
203 InvalidRequest(String),
205 Timeout,
206
207 RuntimeError(String),
209 InternalError(String),
210
211 SerializationError(String),
213
214 ActorInitializationError(String),
216}
217
218impl From<TheaterRuntimeError> for ManagementError {
220 fn from(err: TheaterRuntimeError) -> Self {
221 match err {
222 TheaterRuntimeError::ActorNotFound(_) => ManagementError::ActorNotFound,
223 TheaterRuntimeError::ActorAlreadyExists(_) => ManagementError::ActorAlreadyExists,
224 TheaterRuntimeError::ActorNotRunning(_) => ManagementError::ActorNotRunning,
225 TheaterRuntimeError::ActorOperationFailed(msg) => {
226 ManagementError::RuntimeError(format!("Actor operation failed: {}", msg))
227 }
228 TheaterRuntimeError::ActorError(e) => ManagementError::ActorError(e.to_string()),
229 TheaterRuntimeError::ChannelError(msg) => ManagementError::CommunicationError(msg),
230 TheaterRuntimeError::ChannelNotFound(_) => ManagementError::ChannelNotFound,
231 TheaterRuntimeError::ChannelRejected => ManagementError::ChannelRejected,
232 TheaterRuntimeError::SerializationError(msg) => {
233 ManagementError::SerializationError(msg)
234 }
235 TheaterRuntimeError::InternalError(msg) => ManagementError::InternalError(msg),
236 TheaterRuntimeError::ActorInitializationError(msg) => {
237 ManagementError::ActorInitializationError(msg)
238 }
239 }
240 }
241}
242
243impl std::fmt::Display for ManagementError {
245 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
246 match self {
247 ManagementError::ActorNotFound => write!(f, "Actor not found"),
248 ManagementError::ActorAlreadyExists => write!(f, "Actor already exists"),
249 ManagementError::ActorNotRunning => write!(f, "Actor is not running"),
250 ManagementError::ActorError(msg) => write!(f, "Actor error: {}", msg),
251 ManagementError::ChannelNotFound => write!(f, "Channel not found"),
252 ManagementError::ChannelClosed => write!(f, "Channel is closed"),
253 ManagementError::ChannelRejected => write!(f, "Channel was rejected"),
254 ManagementError::StoreError(msg) => write!(f, "Store error: {}", msg),
255 ManagementError::CommunicationError(msg) => write!(f, "Communication error: {}", msg),
256 ManagementError::InvalidRequest(msg) => write!(f, "Invalid request: {}", msg),
257 ManagementError::Timeout => write!(f, "Operation timed out"),
258 ManagementError::RuntimeError(msg) => write!(f, "Runtime error: {}", msg),
259 ManagementError::InternalError(msg) => write!(f, "Internal error: {}", msg),
260 ManagementError::SerializationError(msg) => write!(f, "Serialization error: {}", msg),
261 ManagementError::ActorInitializationError(msg) => {
262 write!(f, "Actor initialization error: {}", msg)
263 }
264 }
265 }
266}
267
268impl std::error::Error for ManagementError {}
270
271#[derive(Debug)]
272#[allow(dead_code)]
273struct Subscription {
274 id: Uuid,
275 client_tx: mpsc::Sender<ManagementResponse>,
276}
277
278impl Eq for Subscription {}
279impl PartialEq for Subscription {
280 fn eq(&self, other: &Self) -> bool {
281 self.id == other.id
282 }
283}
284impl std::hash::Hash for Subscription {
285 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
286 self.id.hash(state);
287 }
288}
289
290#[derive(Debug)]
294#[allow(dead_code)]
295struct ChannelSubscription {
296 channel_id: String,
297 initiator_id: ChannelParticipant,
298 target_id: ChannelParticipant,
299 client_tx: mpsc::Sender<ManagementResponse>,
300}
301
302fn create_root_handler_registry(
311 theater_tx: mpsc::Sender<TheaterCommand>,
312) -> (
313 HandlerRegistry,
314 theater_handler_message_server::MessageRouter,
315) {
316 let mut registry = HandlerRegistry::new();
317
318 info!("Initializing Theater server with Theater-specific handlers...");
319
320 let runtime_config = RuntimeHostConfig {};
322 registry.register(RuntimeHandler::new(
323 runtime_config,
324 theater_tx.clone(),
325 None,
326 ));
327
328 let store_config = StoreHandlerConfig::default();
330 registry.register(StoreHandler::new(store_config, None));
331
332 let supervisor_config = SupervisorHostConfig {};
334 registry.register(SupervisorHandler::new(supervisor_config, None));
335
336 let message_router = theater_handler_message_server::MessageRouter::new();
338 registry.register(MessageServerHandler::new(None, message_router.clone()));
339
340 let tcp_config = TcpHandlerConfig {
342 listen: None,
343 max_connections: None,
344 ..Default::default()
345 };
346 registry.register(TcpHandler::new(tcp_config));
347
348 info!("✓ 5 Theater-specific handlers registered");
349 info!("NOTE: WASI handlers are deprecated - see crates/deprecated/");
350
351 (registry, message_router)
352}
353
354pub struct TheaterServer {
355 runtime: TheaterRuntime,
356 theater_tx: mpsc::Sender<TheaterCommand>,
357 management_socket: TcpListener,
358 subscriptions: Arc<Mutex<HashMap<TheaterId, HashSet<Subscription>>>>,
359 channel_subscriptions: Arc<Mutex<HashMap<String, ChannelSubscription>>>,
361 #[allow(dead_code)]
363 channel_events_tx: mpsc::Sender<ChannelEvent>,
364 message_router: theater_handler_message_server::MessageRouter,
366}
367
368impl TheaterServer {
369 async fn process_channel_events(
371 mut channel_events_rx: mpsc::Receiver<ChannelEvent>,
372 channel_subscriptions: Arc<Mutex<HashMap<String, ChannelSubscription>>>,
373 ) {
374 while let Some(event) = channel_events_rx.recv().await {
375 match event {
376 ChannelEvent::Message {
377 channel_id,
378 sender_id,
379 message,
380 } => {
381 tracing::debug!("Received channel message for {}", channel_id);
382 let subs = channel_subscriptions.lock().await;
384 if let Some(sub) = subs.get(&channel_id.0) {
385 let response = ManagementResponse::ChannelMessage {
386 channel_id: channel_id.0.clone(),
387 sender_id,
388 message,
389 };
390
391 tracing::debug!("Forwarding channel message to client: {:?}", response);
392
393 if let Err(e) = sub.client_tx.send(response).await {
394 tracing::warn!("Failed to forward channel message: {}", e);
395 } else {
396 tracing::debug!("Forwarded channel message to client");
397 }
398 }
399 }
400 ChannelEvent::Close { channel_id } => {
401 tracing::debug!("Received channel close event for {}", channel_id);
402 let mut subs = channel_subscriptions.lock().await;
404 if let Some(sub) = subs.remove(&channel_id.0) {
405 let response = ManagementResponse::ChannelClosed {
406 channel_id: channel_id.0.clone(),
407 };
408
409 if let Err(e) = sub.client_tx.send(response).await {
410 tracing::warn!("Failed to forward channel close event: {}", e);
411 } else {
412 tracing::debug!("Forwarded channel close event to client");
413 }
414 }
415 }
416 }
417 }
418 }
419
420 pub async fn new(address: std::net::SocketAddr) -> Result<Self> {
421 let (theater_tx, theater_rx) = mpsc::channel(32);
422
423 let (channel_events_tx, channel_events_rx) = mpsc::channel(32);
425
426 let (handler_registry, message_router) = create_root_handler_registry(theater_tx.clone());
429
430 let runtime = TheaterRuntime::new(
432 theater_tx.clone(),
433 theater_rx,
434 Some(channel_events_tx.clone()),
435 handler_registry,
436 )
437 .await?;
438 let management_socket = TcpListener::bind(address).await?;
439
440 let channel_subscriptions = Arc::new(Mutex::new(HashMap::new()));
441
442 let channel_subs_clone = channel_subscriptions.clone();
444 tokio::spawn(async move {
445 Self::process_channel_events(channel_events_rx, channel_subs_clone).await;
446 });
447
448 Ok(Self {
449 runtime,
450 theater_tx,
451 management_socket,
452 subscriptions: Arc::new(Mutex::new(HashMap::new())),
453 channel_subscriptions,
454 channel_events_tx,
455 message_router,
456 })
457 }
458
459 pub async fn run(mut self) -> Result<()> {
460 info!(
461 "Theater server starting on {:?}",
462 self.management_socket.local_addr()?
463 );
464
465 let runtime_handle = tokio::spawn(async move {
467 match self.runtime.run().await {
468 Ok(_) => Ok(()),
469 Err(e) => {
470 error!("Theater runtime failed: {}", e);
471 Err(e)
472 }
473 }
474 });
475
476 while let Ok((socket, addr)) = self.management_socket.accept().await {
478 info!("New management connection from {}", addr);
479 let runtime_tx = self.theater_tx.clone();
480 let subscriptions = self.subscriptions.clone();
481 let channel_subscriptions = self.channel_subscriptions.clone();
482 let message_router = self.message_router.clone();
483
484 tokio::spawn(async move {
485 if let Err(e) = Self::handle_management_connection(
486 socket,
487 runtime_tx,
488 subscriptions,
489 channel_subscriptions,
490 message_router,
491 )
492 .await
493 {
494 error!("Error handling management connection: {}", e);
495 }
496 });
497 }
498
499 runtime_handle.await??;
500 Ok(())
501 }
502
503 async fn handle_management_connection(
504 socket: TcpStream,
505 runtime_tx: mpsc::Sender<TheaterCommand>,
506 subscriptions: Arc<Mutex<HashMap<TheaterId, HashSet<Subscription>>>>,
507 channel_subscriptions: Arc<Mutex<HashMap<String, ChannelSubscription>>>,
508 message_router: theater_handler_message_server::MessageRouter,
509 ) -> Result<()> {
510 let (client_tx, mut client_rx) = mpsc::channel::<ManagementResponse>(32);
512
513 let codec = FragmentingCodec::new();
514 let framed = Framed::new(socket, codec);
515
516 let (mut framed_sink, mut framed_stream) = framed.split();
518
519 let cmd_client_tx = client_tx.clone();
521
522 let _response_task = tokio::spawn(async move {
524 while let Some(response) = client_rx.recv().await {
525 match serde_json::to_vec(&response) {
526 Ok(data) => {
527 debug!("Serialized response: {} bytes", data.len());
528 if data.len() > 10 * 1024 * 1024 {
529 debug!("Large response detected: {} MB", data.len() / 1024 / 1024);
530 }
531 if let Err(e) = framed_sink.send(Bytes::from(data)).await {
532 debug!("Error sending response to client: {}", e);
533 break;
534 }
535 }
536 Err(e) => {
537 error!("Error serializing response: {}", e);
538 }
539 }
540 }
541 debug!("Response forwarder for client closed");
542 });
543
544 let mut connection_subscriptions: Vec<(TheaterId, Uuid)> = Vec::new();
546
547 let mut connection_channel_subscriptions: Vec<String> = Vec::new();
549
550 'connection: while let Some(msg) = framed_stream.next().await {
552 debug!("Received management message");
553 let msg = match msg {
554 Ok(m) => m,
555 Err(e) => {
556 error!("Error receiving message: {}", e);
557 break 'connection;
558 }
559 };
560
561 let cmd = match serde_json::from_slice::<ManagementCommand>(&msg) {
562 Ok(c) => c,
563 Err(e) => {
564 error!(
565 "Error parsing command: {} {}",
566 e,
567 String::from_utf8_lossy(&msg)
568 );
569 continue;
570 }
571 };
572 debug!("Parsed command: {:?}", cmd);
573
574 let _cmd_clone = cmd.clone();
576
577 let response = match cmd {
578 ManagementCommand::StartActor {
579 manifest,
580 initial_state: _initial_state,
581 parent,
582 subscribe,
583 } => {
584 info!("Starting actor from manifest: {:?}", manifest);
585
586 let manifest_str = match resolve_reference(&manifest).await {
588 Ok(bytes) => match String::from_utf8(bytes) {
589 Ok(s) => s,
590 Err(e) => {
591 error!("Invalid manifest encoding: {}", e);
592 cmd_client_tx
593 .send(ManagementResponse::Error {
594 error: ManagementError::ActorInitializationError(format!(
595 "Invalid manifest encoding: {}",
596 e
597 )),
598 })
599 .await
600 .ok();
601 continue;
602 }
603 },
604 Err(e) => {
605 error!("Failed to load manifest: {}", e);
606 cmd_client_tx
607 .send(ManagementResponse::Error {
608 error: ManagementError::ActorInitializationError(format!(
609 "Failed to load manifest: {}",
610 e
611 )),
612 })
613 .await
614 .ok();
615 continue;
616 }
617 };
618
619 let manifest_config = match ManifestConfig::from_toml_str(&manifest_str) {
620 Ok(m) => m,
621 Err(e) => {
622 error!("Failed to parse manifest: {}", e);
623 cmd_client_tx
624 .send(ManagementResponse::Error {
625 error: ManagementError::ActorInitializationError(format!(
626 "Failed to parse manifest: {}",
627 e
628 )),
629 })
630 .await
631 .ok();
632 continue;
633 }
634 };
635
636 let wasm_bytes = match resolve_reference(&manifest_config.package).await {
638 Ok(bytes) => bytes,
639 Err(e) => {
640 error!("Failed to load WASM: {}", e);
641 cmd_client_tx
642 .send(ManagementResponse::Error {
643 error: ManagementError::ActorInitializationError(format!(
644 "Failed to load WASM: {}",
645 e
646 )),
647 })
648 .await
649 .ok();
650 continue;
651 }
652 };
653
654 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
655 debug!("Sending SpawnActor command to runtime");
656 let supervisor_tx = if parent {
657 let (supervisor_tx, mut supervisor_rx) = mpsc::channel(32);
658 let cmd_client_tx = cmd_client_tx.clone();
659 tokio::spawn(async move {
660 while let Some(res) = supervisor_rx.recv().await {
661 debug!("Received supervisor response: {:?}", res);
662 if let Err(e) = cmd_client_tx
663 .send(ManagementResponse::ActorResult(res))
664 .await
665 {
666 error!("Failed to send supervisor response: {}", e);
667 break;
668 }
669 }
670 });
671 Some(supervisor_tx)
672 } else {
673 None
674 };
675 let subscription_tx = if subscribe {
676 let (event_tx, mut event_rx) = mpsc::channel(32);
677
678 let cmd_client_tx = cmd_client_tx.clone();
680 tokio::spawn(async move {
681 while let Some(event) = event_rx.recv().await {
682 debug!("Received event for subscription");
683 let response = match event {
684 Ok(event) => ManagementResponse::ActorEvent { event },
685 Err(_e) => {
686 debug!("Actor error received, but already in event chain");
689 continue;
690 }
691 };
692 if let Err(e) = cmd_client_tx.send(response).await {
693 debug!("Failed to forward event to client: {}", e);
694 break;
695 }
696 }
697 debug!("Event forwarder for subscription stopped");
698 });
699
700 Some(event_tx)
701 } else {
702 None
703 };
704 match runtime_tx
705 .send(TheaterCommand::SetupActor {
706 wasm_bytes,
707 name: Some(manifest_config.name.clone()),
708 manifest: Some(manifest_config),
709 init_state: default_init_state(),
710 response_tx: cmd_tx,
711 supervisor_tx,
712 subscription_tx,
713 })
714 .await
715 {
716 Ok(_) => {
717 debug!("SpawnActor command sent to runtime, awaiting response");
718 match cmd_rx.await {
719 Ok(result) => match result {
720 Ok(actor_id) => {
721 info!("Actor started with ID: {:?}", actor_id);
722 ManagementResponse::ActorStarted { id: actor_id }
723 }
724 Err(e) => {
725 error!("Runtime failed to start actor: {}", e);
726 ManagementResponse::Error {
727 error: ManagementError::RuntimeError(format!(
728 "Failed to start actor: {}",
729 e
730 )),
731 }
732 }
733 },
734 Err(e) => {
735 error!("Failed to receive spawn response: {}", e);
736 ManagementResponse::Error {
737 error: ManagementError::CommunicationError(format!(
738 "Failed to receive spawn response: {}",
739 e
740 )),
741 }
742 }
743 }
744 }
745 Err(e) => {
746 error!("Failed to send SpawnActor command: {}", e);
747 ManagementResponse::Error {
748 error: ManagementError::CommunicationError(format!(
749 "Failed to send spawn command: {}",
750 e
751 )),
752 }
753 }
754 }
755 }
756 ManagementCommand::StopActor { id } => {
757 info!("Stopping actor: {:?}", id);
758 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
759 runtime_tx
760 .send(TheaterCommand::StopActor {
761 actor_id: id,
762 response_tx: cmd_tx,
763 })
764 .await?;
765
766 match cmd_rx.await? {
767 Ok(_) => {
768 subscriptions.lock().await.remove(&id);
769 ManagementResponse::ActorStopped { id }
770 }
771 Err(e) => ManagementResponse::Error {
772 error: ManagementError::RuntimeError(format!(
773 "Failed to stop actor: {}",
774 e
775 )),
776 },
777 }
778 }
779 ManagementCommand::TerminateActor { id } => {
780 info!("Terminating actor: {:?}", id);
781 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
782 runtime_tx
783 .send(TheaterCommand::TerminateActor {
784 actor_id: id,
785 response_tx: cmd_tx,
786 })
787 .await?;
788
789 match cmd_rx.await? {
790 Ok(_) => {
791 subscriptions.lock().await.remove(&id);
792 ManagementResponse::ActorStopped { id }
793 }
794 Err(e) => ManagementResponse::Error {
795 error: ManagementError::RuntimeError(format!(
796 "Failed to terminate actor: {}",
797 e
798 )),
799 },
800 }
801 }
802 ManagementCommand::ListActors => {
803 debug!("Listing actors");
804 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
805 runtime_tx
806 .send(TheaterCommand::GetActors {
807 response_tx: cmd_tx,
808 })
809 .await?;
810
811 match cmd_rx.await? {
812 Ok(actors) => {
813 info!("Found {} actors", actors.len());
814 ManagementResponse::ActorList { actors }
815 }
816 Err(e) => ManagementResponse::Error {
817 error: ManagementError::RuntimeError(format!(
818 "Failed to list actors: {}",
819 e
820 )),
821 },
822 }
823 }
824 ManagementCommand::SubscribeToActor { id } => {
825 info!("New subscription request for actor: {:?}", id);
826 let subscription_id = Uuid::new_v4();
827 let subscription = Subscription {
828 id: subscription_id,
829 client_tx: cmd_client_tx.clone(),
830 };
831
832 debug!("Subscription created with ID: {}", subscription_id);
833
834 subscriptions
836 .lock()
837 .await
838 .entry(id)
839 .or_default()
840 .insert(subscription);
841
842 let (event_tx, mut event_rx) = mpsc::channel(32);
844 runtime_tx
845 .send(TheaterCommand::SubscribeToActor {
846 actor_id: id,
847 event_tx,
848 })
849 .await
850 .map_err(|e| anyhow::anyhow!("Failed to subscribe: {}", e))?;
851
852 connection_subscriptions.push((id, subscription_id));
854
855 let client_tx_clone = cmd_client_tx.clone();
857 tokio::spawn(async move {
858 debug!(
859 "Starting event forwarder for subscription {}",
860 subscription_id
861 );
862 while let Some(event) = event_rx.recv().await {
863 debug!("Received event for subscription {}", subscription_id);
864 let response = match event {
865 Ok(event) => ManagementResponse::ActorEvent { event },
866 Err(_e) => {
867 debug!("Actor error received, but already in event chain");
870 continue;
871 }
872 };
873 if let Err(e) = client_tx_clone.send(response).await {
874 debug!("Failed to forward event to client: {}", e);
875 break;
876 }
877 }
878 debug!(
879 "Event forwarder for subscription {} stopped",
880 subscription_id
881 );
882 });
883
884 ManagementResponse::Subscribed {
885 id,
886 subscription_id,
887 }
888 }
889 ManagementCommand::UnsubscribeFromActor {
890 id,
891 subscription_id,
892 } => {
893 debug!(
894 "Removing subscription {} for actor {:?}",
895 subscription_id, id
896 );
897
898 connection_subscriptions
900 .retain(|(aid, sid)| *aid != id || *sid != subscription_id);
901
902 let mut subs = subscriptions.lock().await;
904 if let Some(actor_subs) = subs.get_mut(&id) {
905 actor_subs.retain(|sub| sub.id != subscription_id);
906
907 if actor_subs.is_empty() {
909 subs.remove(&id);
910 }
911 }
912
913 debug!("Subscription removed");
914 ManagementResponse::Unsubscribed { id }
915 }
916 ManagementCommand::SendActorMessage { id, data } => {
917 info!("Sending message to actor: {:?}", id);
918
919 let (response_tx, response_rx) = tokio::sync::oneshot::channel();
921
922 let message = ActorMessage::Send(ActorSend { data });
924
925 match message_router
927 .route_message(theater::messages::MessageCommand::SendMessage {
928 target_id: id,
929 message,
930 response_tx,
931 })
932 .await
933 {
934 Ok(_) => {
935 match response_rx.await {
937 Ok(Ok(())) => {
938 info!("Message sent successfully to actor: {:?}", id);
939 ManagementResponse::SentMessage { id }
940 }
941 Ok(Err(e)) => {
942 error!("Failed to send message to actor: {}", e);
943 ManagementResponse::Error {
944 error: ManagementError::RuntimeError(format!(
945 "Failed to send: {}",
946 e
947 )),
948 }
949 }
950 Err(e) => {
951 error!("Failed to receive routing response: {}", e);
952 ManagementResponse::Error {
953 error: ManagementError::CommunicationError(format!(
954 "Failed to receive routing response: {}",
955 e
956 )),
957 }
958 }
959 }
960 }
961 Err(e) => {
962 error!("Failed to route message: {}", e);
963 ManagementResponse::Error {
964 error: ManagementError::RuntimeError(format!(
965 "Failed to route message: {}",
966 e
967 )),
968 }
969 }
970 }
971 }
972 ManagementCommand::RequestActorMessage { id, data } => {
973 info!("Requesting message from actor: {:?}", id);
974
975 let (route_tx, route_rx) = tokio::sync::oneshot::channel();
977 let (response_tx, response_rx) = tokio::sync::oneshot::channel();
978
979 let message = ActorMessage::Request(ActorRequest { data, response_tx });
981
982 match message_router
984 .route_message(theater::messages::MessageCommand::SendMessage {
985 target_id: id,
986 message,
987 response_tx: route_tx,
988 })
989 .await
990 {
991 Ok(_) => {
992 match route_rx.await {
994 Ok(Ok(())) => {
995 match response_rx.await {
997 Ok(response_data) => {
998 info!("Received response from actor: {:?}", id);
999 ManagementResponse::RequestedMessage {
1000 id,
1001 message: response_data,
1002 }
1003 }
1004 Err(e) => {
1005 error!("Actor didn't respond: {}", e);
1006 ManagementResponse::Error {
1007 error: ManagementError::RuntimeError(format!(
1008 "Actor didn't respond: {}",
1009 e
1010 )),
1011 }
1012 }
1013 }
1014 }
1015 Ok(Err(e)) => {
1016 error!("Failed to route request to actor: {}", e);
1017 ManagementResponse::Error {
1018 error: ManagementError::RuntimeError(format!(
1019 "Failed to route: {}",
1020 e
1021 )),
1022 }
1023 }
1024 Err(e) => {
1025 error!("Failed to receive routing response: {}", e);
1026 ManagementResponse::Error {
1027 error: ManagementError::CommunicationError(format!(
1028 "Failed to receive routing response: {}",
1029 e
1030 )),
1031 }
1032 }
1033 }
1034 }
1035 Err(e) => {
1036 error!("Failed to route request: {}", e);
1037 ManagementResponse::Error {
1038 error: ManagementError::RuntimeError(format!(
1039 "Failed to route request: {}",
1040 e
1041 )),
1042 }
1043 }
1044 }
1045 }
1046 ManagementCommand::GetActorManifest { id } => {
1047 info!("Getting manifest for actor: {:?}", id);
1048 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1049 runtime_tx
1050 .send(TheaterCommand::GetActorManifest {
1051 actor_id: id,
1052 response_tx: cmd_tx,
1053 })
1054 .await?;
1055
1056 let manifest = cmd_rx.await?;
1057 ManagementResponse::ActorManifest {
1058 id,
1059 manifest: manifest?,
1060 }
1061 }
1062 ManagementCommand::GetActorStatus { id } => {
1063 info!("Getting status for actor: {:?}", id);
1064 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1065 runtime_tx
1066 .send(TheaterCommand::GetActorStatus {
1067 actor_id: id,
1068 response_tx: cmd_tx,
1069 })
1070 .await?;
1071
1072 let status = cmd_rx.await?;
1073 ManagementResponse::ActorStatus {
1074 id,
1075 status: status?,
1076 }
1077 }
1078 ManagementCommand::RestartActor { id } => {
1079 info!("Restarting actor: {:?}", id);
1080 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1081 runtime_tx
1082 .send(TheaterCommand::RestartActor {
1083 actor_id: id,
1084 response_tx: cmd_tx,
1085 })
1086 .await?;
1087
1088 match cmd_rx.await? {
1089 Ok(_) => ManagementResponse::Restarted { id },
1090 Err(e) => ManagementResponse::Error {
1091 error: ManagementError::RuntimeError(format!(
1092 "Failed to restart actor: {}",
1093 e
1094 )),
1095 },
1096 }
1097 }
1098 ManagementCommand::GetActorState { id } => {
1099 info!("Getting state for actor: {:?}", id);
1100 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1101 runtime_tx
1102 .send(TheaterCommand::GetActorState {
1103 actor_id: id,
1104 response_tx: cmd_tx,
1105 })
1106 .await?;
1107
1108 let state = cmd_rx.await?;
1109 ManagementResponse::ActorState { id, state: state? }
1110 }
1111 ManagementCommand::GetActorMetrics { id } => {
1112 info!("Getting metrics for actor: {:?}", id);
1113 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1114 runtime_tx
1115 .send(TheaterCommand::GetActorMetrics {
1116 actor_id: id,
1117 response_tx: cmd_tx,
1118 })
1119 .await?;
1120
1121 let metrics = cmd_rx.await?;
1122 ManagementResponse::ActorMetrics {
1123 id,
1124 metrics: serde_json::to_value(metrics?)?,
1125 }
1126 }
1127 ManagementCommand::UpdateActorPackage { id: _, package: _ } => {
1128 ManagementResponse::Error {
1130 error: ManagementError::RuntimeError(
1131 "UpdateActorPackage not yet implemented".to_string(),
1132 ),
1133 }
1134 }
1135 ManagementCommand::OpenChannel {
1137 actor_id,
1138 initial_message,
1139 } => {
1140 info!("Opening channel to actor: {:?}", actor_id);
1141
1142 let (response_tx, response_rx) = tokio::sync::oneshot::channel();
1144
1145 let client_id = ChannelParticipant::External;
1147 let channel_id = ChannelId::new(&client_id, &actor_id);
1148 let channel_id_str = channel_id.0.clone();
1149
1150 message_router
1152 .route_message(theater::messages::MessageCommand::OpenChannel {
1153 initiator_id: client_id.clone(),
1154 target_id: actor_id.clone(),
1155 channel_id: channel_id.clone(),
1156 initial_message,
1157 response_tx,
1158 })
1159 .await
1160 .map_err(|e| {
1161 anyhow::anyhow!("Failed to send channel open command: {}", e)
1162 })?;
1163
1164 match response_rx.await {
1166 Ok(result) => {
1167 match result {
1168 Ok(accepted) => {
1169 if accepted {
1170 info!("Channel opened successfully: {}", channel_id_str);
1172
1173 let channel_sub = ChannelSubscription {
1175 channel_id: channel_id_str.clone(),
1176 initiator_id: client_id.clone(),
1177 target_id: actor_id.clone(),
1178 client_tx: cmd_client_tx.clone(),
1179 };
1180
1181 channel_subscriptions
1182 .lock()
1183 .await
1184 .insert(channel_id_str.clone(), channel_sub);
1185
1186 connection_channel_subscriptions
1188 .push(channel_id_str.clone());
1189
1190 ManagementResponse::ChannelOpened {
1191 channel_id: channel_id_str,
1192 actor_id,
1193 }
1194 } else {
1195 ManagementResponse::Error {
1197 error: ManagementError::ChannelRejected,
1198 }
1199 }
1200 }
1201 Err(e) => ManagementResponse::Error {
1202 error: ManagementError::RuntimeError(format!(
1203 "Error opening channel: {}",
1204 e
1205 )),
1206 },
1207 }
1208 }
1209 Err(e) => ManagementResponse::Error {
1210 error: ManagementError::CommunicationError(format!(
1211 "Failed to receive channel open response: {}",
1212 e
1213 )),
1214 },
1215 }
1216 }
1217 ManagementCommand::SendOnChannel {
1218 channel_id,
1219 message,
1220 } => {
1221 info!("Sending message on channel: {}", channel_id);
1222
1223 let (response_tx, response_rx) = tokio::sync::oneshot::channel();
1225
1226 let channel_id_parsed = ChannelId(channel_id.clone());
1228
1229 let sender_id = ChannelParticipant::External;
1231 match message_router
1232 .route_message(theater::messages::MessageCommand::ChannelMessage {
1233 channel_id: channel_id_parsed,
1234 sender_id,
1235 message,
1236 response_tx,
1237 })
1238 .await
1239 {
1240 Ok(_) => {
1241 match response_rx.await {
1243 Ok(Ok(())) => {
1244 info!("Message sent successfully on channel: {}", channel_id);
1245 ManagementResponse::MessageSent { channel_id }
1246 }
1247 Ok(Err(e)) => {
1248 error!("Failed to send on channel: {}", e);
1249 ManagementResponse::Error {
1250 error: ManagementError::RuntimeError(format!(
1251 "Failed to send on channel: {}",
1252 e
1253 )),
1254 }
1255 }
1256 Err(e) => {
1257 error!("Failed to receive channel send response: {}", e);
1258 ManagementResponse::Error {
1259 error: ManagementError::CommunicationError(format!(
1260 "Failed to receive channel send response: {}",
1261 e
1262 )),
1263 }
1264 }
1265 }
1266 }
1267 Err(e) => {
1268 error!("Failed to route channel message: {}", e);
1269 ManagementResponse::Error {
1270 error: ManagementError::RuntimeError(format!(
1271 "Failed to route channel message: {}",
1272 e
1273 )),
1274 }
1275 }
1276 }
1277 }
1278 ManagementCommand::CloseChannel { channel_id } => {
1279 info!("Closing channel: {}", channel_id);
1280
1281 let (response_tx, response_rx) = tokio::sync::oneshot::channel();
1283
1284 let channel_id_parsed = ChannelId(channel_id.clone());
1286
1287 let sender_id = ChannelParticipant::External;
1289 match message_router
1290 .route_message(theater::messages::MessageCommand::ChannelClose {
1291 channel_id: channel_id_parsed,
1292 sender_id,
1293 response_tx,
1294 })
1295 .await
1296 {
1297 Ok(_) => {
1298 match response_rx.await {
1300 Ok(Ok(())) => {
1301 info!("Channel closed successfully: {}", channel_id);
1302
1303 channel_subscriptions.lock().await.remove(&channel_id);
1305 connection_channel_subscriptions.retain(|id| id != &channel_id);
1306
1307 ManagementResponse::ChannelClosed { channel_id }
1308 }
1309 Ok(Err(e)) => {
1310 error!("Failed to close channel: {}", e);
1311 ManagementResponse::Error {
1312 error: ManagementError::RuntimeError(format!(
1313 "Failed to close channel: {}",
1314 e
1315 )),
1316 }
1317 }
1318 Err(e) => {
1319 error!("Failed to receive channel close response: {}", e);
1320 ManagementResponse::Error {
1321 error: ManagementError::CommunicationError(format!(
1322 "Failed to receive channel close response: {}",
1323 e
1324 )),
1325 }
1326 }
1327 }
1328 }
1329 Err(e) => {
1330 error!("Failed to route channel close: {}", e);
1331 ManagementResponse::Error {
1332 error: ManagementError::RuntimeError(format!(
1333 "Failed to route channel close: {}",
1334 e
1335 )),
1336 }
1337 }
1338 }
1339 }
1340 ManagementCommand::NewStore {} => {
1341 info!("Creating new store");
1342 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1343 runtime_tx
1344 .send(TheaterCommand::NewStore {
1345 response_tx: cmd_tx,
1346 })
1347 .await?;
1348
1349 let store_id = cmd_rx.await?;
1350 ManagementResponse::StoreCreated {
1351 store_id: store_id?.id,
1352 }
1353 }
1354 };
1355
1356 debug!("Sending response: {:?}", response);
1357 if let Err(e) = client_tx.send(response).await {
1358 error!("Failed to send response: {}", e);
1359 break;
1360 }
1361 debug!("Response sent");
1362 }
1363
1364 debug!(
1366 "Connection closed, cleaning up {} subscriptions",
1367 connection_subscriptions.len()
1368 );
1369 let mut subs = subscriptions.lock().await;
1370
1371 for (actor_id, sub_id) in connection_subscriptions {
1372 if let Some(actor_subs) = subs.get_mut(&actor_id) {
1373 actor_subs.retain(|sub| sub.id != sub_id);
1374
1375 if actor_subs.is_empty() {
1377 subs.remove(&actor_id);
1378 }
1379 }
1380 }
1381
1382 debug!(
1384 "Connection closed, cleaning up {} channel subscriptions",
1385 connection_channel_subscriptions.len()
1386 );
1387 let mut channel_subs = channel_subscriptions.lock().await;
1388
1389 for channel_id in connection_channel_subscriptions {
1390 channel_subs.remove(&channel_id);
1391 }
1392
1393 debug!("Cleaned up all subscriptions for the connection");
1394 Ok(())
1395 }
1396}