1use anyhow::Result;
2use bytes::Bytes;
3use futures::sink::SinkExt;
4use futures::stream::StreamExt;
5use serde::{Deserialize, Serialize};
6use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8use theater::messages::{
9 ActorMessage, ActorRequest, ActorResult, ActorSend, ActorStatus, ChannelEvent,
10 ChannelParticipant,
11};
12use theater::{ActorError, ChainEvent, ManifestConfig};
13use tokio::net::{TcpListener, TcpStream};
14use tokio::sync::{mpsc, Mutex};
15use tokio_util::codec::Framed;
16use tracing::{debug, error, info};
17use uuid::Uuid;
18
19use theater::id::TheaterId;
20use theater::messages::{ChannelId, TheaterCommand};
21use theater::theater_runtime::TheaterRuntime;
22use theater::TheaterRuntimeError;
23
24use crate::fragmenting_codec::FragmentingCodec;
25
26#[derive(Debug, Clone, Serialize, Deserialize)]
27pub enum ManagementCommand {
28 StartActor {
29 manifest: String,
30 initial_state: Option<Vec<u8>>,
31 parent: bool,
32 subscribe: bool,
33 },
34 StopActor {
35 id: TheaterId,
36 },
37 TerminateActor {
38 id: TheaterId,
39 },
40 ListActors,
41 SubscribeToActor {
42 id: TheaterId,
43 },
44 UnsubscribeFromActor {
45 id: TheaterId,
46 subscription_id: Uuid,
47 },
48 SendActorMessage {
49 id: TheaterId,
50 data: Vec<u8>,
51 },
52 RequestActorMessage {
53 id: TheaterId,
54 data: Vec<u8>,
55 },
56 GetActorManifest {
57 id: TheaterId,
58 },
59 GetActorStatus {
60 id: TheaterId,
61 },
62 RestartActor {
63 id: TheaterId,
64 },
65 GetActorState {
66 id: TheaterId,
67 },
68 GetActorEvents {
69 id: TheaterId,
70 },
71 GetActorMetrics {
72 id: TheaterId,
73 },
74 UpdateActorComponent {
75 id: TheaterId,
76 component: String,
77 },
78 OpenChannel {
80 actor_id: ChannelParticipant,
81 initial_message: Vec<u8>,
82 },
83 SendOnChannel {
84 channel_id: String,
85 message: Vec<u8>,
86 },
87 CloseChannel {
88 channel_id: String,
89 },
90
91 NewStore {},
93}
94
95#[derive(Debug, Clone, Serialize, Deserialize)]
96pub enum ManagementResponse {
97 ActorStarted {
98 id: TheaterId,
99 },
100 ActorStopped {
101 id: TheaterId,
102 },
103 ActorList {
104 actors: Vec<(TheaterId, String)>,
105 },
106 Subscribed {
107 id: TheaterId,
108 subscription_id: Uuid,
109 },
110 Unsubscribed {
111 id: TheaterId,
112 },
113 ActorEvent {
114 event: ChainEvent,
115 },
116 ActorResult(ActorResult),
117 ActorError {
118 error: ActorError,
119 },
120 Error {
121 error: ManagementError,
122 },
123 RequestedMessage {
124 id: TheaterId,
125 message: Vec<u8>,
126 },
127 SentMessage {
128 id: TheaterId,
129 },
130 ActorStatus {
131 id: TheaterId,
132 status: ActorStatus,
133 },
134 Restarted {
135 id: TheaterId,
136 },
137 ActorManifest {
138 id: TheaterId,
139 manifest: ManifestConfig,
140 },
141 ActorState {
142 id: TheaterId,
143 state: Option<Vec<u8>>,
144 },
145 ActorEvents {
146 id: TheaterId,
147 events: Vec<ChainEvent>,
148 },
149 ActorMetrics {
150 id: TheaterId,
151 metrics: serde_json::Value,
152 },
153 ActorComponentUpdated {
154 id: TheaterId,
155 },
156 ChannelOpened {
158 channel_id: String,
159 actor_id: ChannelParticipant,
160 },
161 MessageSent {
162 channel_id: String,
163 },
164 ChannelMessage {
165 channel_id: String,
166 sender_id: ChannelParticipant,
167 message: Vec<u8>,
168 },
169 ChannelClosed {
170 channel_id: String,
171 },
172
173 StoreCreated {
175 store_id: String,
176 },
177}
178
179#[derive(Debug, Clone, Serialize, Deserialize)]
180pub enum ManagementError {
181 ActorNotFound,
183 ActorAlreadyExists,
184 ActorNotRunning,
185 ActorError(String),
186
187 ChannelNotFound,
189 ChannelClosed,
190 ChannelRejected,
191
192 StoreError(String),
194
195 CommunicationError(String),
197
198 InvalidRequest(String),
200 Timeout,
201
202 RuntimeError(String),
204 InternalError(String),
205
206 SerializationError(String),
208}
209
210impl From<TheaterRuntimeError> for ManagementError {
212 fn from(err: TheaterRuntimeError) -> Self {
213 match err {
214 TheaterRuntimeError::ActorNotFound(_) => ManagementError::ActorNotFound,
215 TheaterRuntimeError::ActorAlreadyExists(_) => ManagementError::ActorAlreadyExists,
216 TheaterRuntimeError::ActorNotRunning(_) => ManagementError::ActorNotRunning,
217 TheaterRuntimeError::ActorOperationFailed(msg) => {
218 ManagementError::RuntimeError(format!("Actor operation failed: {}", msg))
219 }
220 TheaterRuntimeError::ActorError(e) => ManagementError::ActorError(e.to_string()),
221 TheaterRuntimeError::ChannelError(msg) => ManagementError::CommunicationError(msg),
222 TheaterRuntimeError::ChannelNotFound(_) => ManagementError::ChannelNotFound,
223 TheaterRuntimeError::ChannelRejected => ManagementError::ChannelRejected,
224 TheaterRuntimeError::SerializationError(msg) => {
225 ManagementError::SerializationError(msg)
226 }
227 TheaterRuntimeError::InternalError(msg) => ManagementError::InternalError(msg),
228 }
229 }
230}
231
232impl std::fmt::Display for ManagementError {
234 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
235 match self {
236 ManagementError::ActorNotFound => write!(f, "Actor not found"),
237 ManagementError::ActorAlreadyExists => write!(f, "Actor already exists"),
238 ManagementError::ActorNotRunning => write!(f, "Actor is not running"),
239 ManagementError::ActorError(msg) => write!(f, "Actor error: {}", msg),
240 ManagementError::ChannelNotFound => write!(f, "Channel not found"),
241 ManagementError::ChannelClosed => write!(f, "Channel is closed"),
242 ManagementError::ChannelRejected => write!(f, "Channel was rejected"),
243 ManagementError::StoreError(msg) => write!(f, "Store error: {}", msg),
244 ManagementError::CommunicationError(msg) => write!(f, "Communication error: {}", msg),
245 ManagementError::InvalidRequest(msg) => write!(f, "Invalid request: {}", msg),
246 ManagementError::Timeout => write!(f, "Operation timed out"),
247 ManagementError::RuntimeError(msg) => write!(f, "Runtime error: {}", msg),
248 ManagementError::InternalError(msg) => write!(f, "Internal error: {}", msg),
249 ManagementError::SerializationError(msg) => write!(f, "Serialization error: {}", msg),
250 }
251 }
252}
253
254impl std::error::Error for ManagementError {}
256
257#[derive(Debug)]
258#[allow(dead_code)]
259struct Subscription {
260 id: Uuid,
261 client_tx: mpsc::Sender<ManagementResponse>,
262}
263
264impl Eq for Subscription {}
265impl PartialEq for Subscription {
266 fn eq(&self, other: &Self) -> bool {
267 self.id == other.id
268 }
269}
270impl std::hash::Hash for Subscription {
271 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
272 self.id.hash(state);
273 }
274}
275
276#[derive(Debug)]
280#[allow(dead_code)]
281struct ChannelSubscription {
282 channel_id: String,
283 initiator_id: ChannelParticipant,
284 target_id: ChannelParticipant,
285 client_tx: mpsc::Sender<ManagementResponse>,
286}
287
288pub struct TheaterServer {
289 runtime: TheaterRuntime,
290 theater_tx: mpsc::Sender<TheaterCommand>,
291 management_socket: TcpListener,
292 subscriptions: Arc<Mutex<HashMap<TheaterId, HashSet<Subscription>>>>,
293 channel_subscriptions: Arc<Mutex<HashMap<String, ChannelSubscription>>>,
295 #[allow(dead_code)]
297 channel_events_tx: mpsc::Sender<ChannelEvent>,
298}
299
300impl TheaterServer {
301 async fn process_channel_events(
303 mut channel_events_rx: mpsc::Receiver<ChannelEvent>,
304 channel_subscriptions: Arc<Mutex<HashMap<String, ChannelSubscription>>>,
305 ) {
306 while let Some(event) = channel_events_rx.recv().await {
307 match event {
308 ChannelEvent::Message {
309 channel_id,
310 sender_id,
311 message,
312 } => {
313 tracing::debug!("Received channel message for {}", channel_id);
314 let subs = channel_subscriptions.lock().await;
316 if let Some(sub) = subs.get(&channel_id.0) {
317 let response = ManagementResponse::ChannelMessage {
318 channel_id: channel_id.0.clone(),
319 sender_id,
320 message,
321 };
322
323 tracing::debug!("Forwarding channel message to client: {:?}", response);
324
325 if let Err(e) = sub.client_tx.send(response).await {
326 tracing::warn!("Failed to forward channel message: {}", e);
327 } else {
328 tracing::debug!("Forwarded channel message to client");
329 }
330 }
331 }
332 ChannelEvent::Close { channel_id } => {
333 tracing::debug!("Received channel close event for {}", channel_id);
334 let mut subs = channel_subscriptions.lock().await;
336 if let Some(sub) = subs.remove(&channel_id.0) {
337 let response = ManagementResponse::ChannelClosed {
338 channel_id: channel_id.0.clone(),
339 };
340
341 if let Err(e) = sub.client_tx.send(response).await {
342 tracing::warn!("Failed to forward channel close event: {}", e);
343 } else {
344 tracing::debug!("Forwarded channel close event to client");
345 }
346 }
347 }
348 }
349 }
350 }
351
352 pub async fn new(address: std::net::SocketAddr) -> Result<Self> {
353 let (theater_tx, theater_rx) = mpsc::channel(32);
354
355 let (channel_events_tx, channel_events_rx) = mpsc::channel(32);
357
358 let runtime = TheaterRuntime::new(
360 theater_tx.clone(),
361 theater_rx,
362 Some(channel_events_tx.clone()),
363 theater::config::permissions::HandlerPermission::root(), )
365 .await?;
366 let management_socket = TcpListener::bind(address).await?;
367
368 let channel_subscriptions = Arc::new(Mutex::new(HashMap::new()));
369
370 let channel_subs_clone = channel_subscriptions.clone();
372 tokio::spawn(async move {
373 Self::process_channel_events(channel_events_rx, channel_subs_clone).await;
374 });
375
376 Ok(Self {
377 runtime,
378 theater_tx,
379 management_socket,
380 subscriptions: Arc::new(Mutex::new(HashMap::new())),
381 channel_subscriptions,
382 channel_events_tx,
383 })
384 }
385
386 pub async fn run(mut self) -> Result<()> {
387 info!(
388 "Theater server starting on {:?}",
389 self.management_socket.local_addr()?
390 );
391
392 let runtime_handle = tokio::spawn(async move {
394 match self.runtime.run().await {
395 Ok(_) => Ok(()),
396 Err(e) => {
397 error!("Theater runtime failed: {}", e);
398 Err(e)
399 }
400 }
401 });
402
403 while let Ok((socket, addr)) = self.management_socket.accept().await {
405 info!("New management connection from {}", addr);
406 let runtime_tx = self.theater_tx.clone();
407 let subscriptions = self.subscriptions.clone();
408 let channel_subscriptions = self.channel_subscriptions.clone();
409
410 tokio::spawn(async move {
411 if let Err(e) = Self::handle_management_connection(
412 socket,
413 runtime_tx,
414 subscriptions,
415 channel_subscriptions,
416 )
417 .await
418 {
419 error!("Error handling management connection: {}", e);
420 }
421 });
422 }
423
424 runtime_handle.await??;
425 Ok(())
426 }
427
428 async fn handle_management_connection(
429 socket: TcpStream,
430 runtime_tx: mpsc::Sender<TheaterCommand>,
431 subscriptions: Arc<Mutex<HashMap<TheaterId, HashSet<Subscription>>>>,
432 channel_subscriptions: Arc<Mutex<HashMap<String, ChannelSubscription>>>,
433 ) -> Result<()> {
434 let (client_tx, mut client_rx) = mpsc::channel::<ManagementResponse>(32);
436
437 let codec = FragmentingCodec::new();
438 let framed = Framed::new(socket, codec);
439
440 let (mut framed_sink, mut framed_stream) = framed.split();
442
443 let cmd_client_tx = client_tx.clone();
445
446 let _response_task = tokio::spawn(async move {
448 while let Some(response) = client_rx.recv().await {
449 match serde_json::to_vec(&response) {
450 Ok(data) => {
451 if let Err(e) = framed_sink.send(Bytes::from(data)).await {
452 debug!("Error sending response to client: {}", e);
453 break;
454 }
455 }
456 Err(e) => {
457 error!("Error serializing response: {}", e);
458 }
459 }
460 }
461 debug!("Response forwarder for client closed");
462 });
463
464 let mut connection_subscriptions: Vec<(TheaterId, Uuid)> = Vec::new();
466
467 let mut connection_channel_subscriptions: Vec<String> = Vec::new();
469
470 'connection: while let Some(msg) = framed_stream.next().await {
472 debug!("Received management message");
473 let msg = match msg {
474 Ok(m) => m,
475 Err(e) => {
476 error!("Error receiving message: {}", e);
477 break 'connection;
478 }
479 };
480
481 let cmd = match serde_json::from_slice::<ManagementCommand>(&msg) {
482 Ok(c) => c,
483 Err(e) => {
484 error!(
485 "Error parsing command: {} {}",
486 e,
487 String::from_utf8_lossy(&msg)
488 );
489 continue;
490 }
491 };
492 debug!("Parsed command: {:?}", cmd);
493
494 let _cmd_clone = cmd.clone();
496
497 let response = match cmd {
498 ManagementCommand::StartActor {
499 manifest,
500 initial_state,
501 parent,
502 subscribe,
503 } => {
504 info!("Starting actor from manifest: {:?}", manifest);
505 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
506 debug!("Sending SpawnActor command to runtime");
507 let supervisor_tx = if parent {
508 let (supervisor_tx, mut supervisor_rx) = mpsc::channel(32);
509 let cmd_client_tx = cmd_client_tx.clone();
510 tokio::spawn(async move {
511 while let Some(res) = supervisor_rx.recv().await {
512 debug!("Received supervisor response: {:?}", res);
513 if let Err(e) = cmd_client_tx
514 .send(ManagementResponse::ActorResult(res))
515 .await
516 {
517 error!("Failed to send supervisor response: {}", e);
518 break;
519 }
520 }
521 });
522 Some(supervisor_tx)
523 } else {
524 None
525 };
526 let subscription_tx = if subscribe {
527 let (event_tx, mut event_rx) = mpsc::channel(32);
528
529 let cmd_client_tx = cmd_client_tx.clone();
531 tokio::spawn(async move {
532 while let Some(event) = event_rx.recv().await {
533 debug!("Received event for subscription");
534 let response = match event {
535 Ok(event) => ManagementResponse::ActorEvent { event },
536 Err(e) => ManagementResponse::ActorError { error: e },
537 };
538 if let Err(e) = cmd_client_tx.send(response).await {
539 debug!("Failed to forward event to client: {}", e);
540 break;
541 }
542 }
543 debug!("Event forwarder for subscription stopped");
544 });
545
546 Some(event_tx)
547 } else {
548 None
549 };
550 match runtime_tx
551 .send(TheaterCommand::SpawnActor {
552 manifest_path: manifest.clone(),
553 init_bytes: initial_state,
554 response_tx: cmd_tx,
555 parent_id: None,
556 supervisor_tx,
557 subscription_tx,
558 })
559 .await
560 {
561 Ok(_) => {
562 debug!("SpawnActor command sent to runtime, awaiting response");
563 match cmd_rx.await {
564 Ok(result) => match result {
565 Ok(actor_id) => {
566 info!("Actor started with ID: {:?}", actor_id);
567 ManagementResponse::ActorStarted { id: actor_id }
568 }
569 Err(e) => {
570 error!("Runtime failed to start actor: {}", e);
571 ManagementResponse::Error {
572 error: ManagementError::RuntimeError(format!(
573 "Failed to start actor: {}",
574 e
575 )),
576 }
577 }
578 },
579 Err(e) => {
580 error!("Failed to receive spawn response: {}", e);
581 ManagementResponse::Error {
582 error: ManagementError::CommunicationError(format!(
583 "Failed to receive spawn response: {}",
584 e
585 )),
586 }
587 }
588 }
589 }
590 Err(e) => {
591 error!("Failed to send SpawnActor command: {}", e);
592 ManagementResponse::Error {
593 error: ManagementError::CommunicationError(format!(
594 "Failed to send spawn command: {}",
595 e
596 )),
597 }
598 }
599 }
600 }
601 ManagementCommand::StopActor { id } => {
602 info!("Stopping actor: {:?}", id);
603 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
604 runtime_tx
605 .send(TheaterCommand::StopActor {
606 actor_id: id.clone(),
607 response_tx: cmd_tx,
608 })
609 .await?;
610
611 match cmd_rx.await? {
612 Ok(_) => {
613 subscriptions.lock().await.remove(&id);
614 ManagementResponse::ActorStopped { id }
615 }
616 Err(e) => ManagementResponse::Error {
617 error: ManagementError::RuntimeError(format!(
618 "Failed to stop actor: {}",
619 e
620 )),
621 },
622 }
623 }
624 ManagementCommand::TerminateActor { id } => {
625 info!("Terminating actor: {:?}", id);
626 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
627 runtime_tx
628 .send(TheaterCommand::TerminateActor {
629 actor_id: id.clone(),
630 response_tx: cmd_tx,
631 })
632 .await?;
633
634 match cmd_rx.await? {
635 Ok(_) => {
636 subscriptions.lock().await.remove(&id);
637 ManagementResponse::ActorStopped { id }
638 }
639 Err(e) => ManagementResponse::Error {
640 error: ManagementError::RuntimeError(format!(
641 "Failed to terminate actor: {}",
642 e
643 )),
644 },
645 }
646 }
647 ManagementCommand::ListActors => {
648 debug!("Listing actors");
649 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
650 runtime_tx
651 .send(TheaterCommand::GetActors {
652 response_tx: cmd_tx,
653 })
654 .await?;
655
656 match cmd_rx.await? {
657 Ok(actors) => {
658 info!("Found {} actors", actors.len());
659 ManagementResponse::ActorList { actors }
660 }
661 Err(e) => ManagementResponse::Error {
662 error: ManagementError::RuntimeError(format!(
663 "Failed to list actors: {}",
664 e
665 )),
666 },
667 }
668 }
669 ManagementCommand::SubscribeToActor { id } => {
670 info!("New subscription request for actor: {:?}", id);
671 let subscription_id = Uuid::new_v4();
672 let subscription = Subscription {
673 id: subscription_id,
674 client_tx: cmd_client_tx.clone(),
675 };
676
677 debug!("Subscription created with ID: {}", subscription_id);
678
679 subscriptions
681 .lock()
682 .await
683 .entry(id.clone())
684 .or_default()
685 .insert(subscription);
686
687 let (event_tx, mut event_rx) = mpsc::channel(32);
689 runtime_tx
690 .send(TheaterCommand::SubscribeToActor {
691 actor_id: id.clone(),
692 event_tx,
693 })
694 .await
695 .map_err(|e| anyhow::anyhow!("Failed to subscribe: {}", e))?;
696
697 connection_subscriptions.push((id.clone(), subscription_id));
699
700 let client_tx_clone = cmd_client_tx.clone();
702 tokio::spawn(async move {
703 debug!(
704 "Starting event forwarder for subscription {}",
705 subscription_id
706 );
707 while let Some(event) = event_rx.recv().await {
708 debug!("Received event for subscription {}", subscription_id);
709 let response = match event {
710 Ok(event) => ManagementResponse::ActorEvent { event },
711 Err(e) => ManagementResponse::ActorError { error: e },
712 };
713 if let Err(e) = client_tx_clone.send(response).await {
714 debug!("Failed to forward event to client: {}", e);
715 break;
716 }
717 }
718 debug!(
719 "Event forwarder for subscription {} stopped",
720 subscription_id
721 );
722 });
723
724 ManagementResponse::Subscribed {
725 id,
726 subscription_id,
727 }
728 }
729 ManagementCommand::UnsubscribeFromActor {
730 id,
731 subscription_id,
732 } => {
733 debug!(
734 "Removing subscription {} for actor {:?}",
735 subscription_id, id
736 );
737
738 connection_subscriptions
740 .retain(|(aid, sid)| *aid != id || *sid != subscription_id);
741
742 let mut subs = subscriptions.lock().await;
744 if let Some(actor_subs) = subs.get_mut(&id) {
745 actor_subs.retain(|sub| sub.id != subscription_id);
746
747 if actor_subs.is_empty() {
749 subs.remove(&id);
750 }
751 }
752
753 debug!("Subscription removed");
754 ManagementResponse::Unsubscribed { id }
755 }
756 ManagementCommand::SendActorMessage { id, data } => {
757 info!("Sending message to actor: {:?}", id);
758 runtime_tx
759 .send(TheaterCommand::SendMessage {
760 actor_id: id.clone(),
761 actor_message: ActorMessage::Send(ActorSend { data: data.clone() }),
762 })
763 .await?;
764
765 ManagementResponse::SentMessage { id }
766 }
767 ManagementCommand::RequestActorMessage { id, data } => {
768 info!("Requesting message from actor: {:?}", id);
769 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
770 runtime_tx
771 .send(TheaterCommand::SendMessage {
772 actor_id: id.clone(),
773 actor_message: ActorMessage::Request(ActorRequest {
774 data: data.clone(),
775 response_tx: cmd_tx,
776 }),
777 })
778 .await?;
779
780 let response = cmd_rx.await?;
781 ManagementResponse::RequestedMessage {
782 id,
783 message: response,
784 }
785 }
786 ManagementCommand::GetActorManifest { id } => {
787 info!("Getting manifest for actor: {:?}", id);
788 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
789 runtime_tx
790 .send(TheaterCommand::GetActorManifest {
791 actor_id: id.clone(),
792 response_tx: cmd_tx,
793 })
794 .await?;
795
796 let manifest = cmd_rx.await?;
797 ManagementResponse::ActorManifest {
798 id,
799 manifest: manifest?,
800 }
801 }
802 ManagementCommand::GetActorStatus { id } => {
803 info!("Getting status for actor: {:?}", id);
804 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
805 runtime_tx
806 .send(TheaterCommand::GetActorStatus {
807 actor_id: id.clone(),
808 response_tx: cmd_tx,
809 })
810 .await?;
811
812 let status = cmd_rx.await?;
813 ManagementResponse::ActorStatus {
814 id,
815 status: status?,
816 }
817 }
818 ManagementCommand::RestartActor { id } => {
819 info!("Restarting actor: {:?}", id);
820 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
821 runtime_tx
822 .send(TheaterCommand::RestartActor {
823 actor_id: id.clone(),
824 response_tx: cmd_tx,
825 })
826 .await?;
827
828 match cmd_rx.await? {
829 Ok(_) => ManagementResponse::Restarted { id },
830 Err(e) => ManagementResponse::Error {
831 error: ManagementError::RuntimeError(format!(
832 "Failed to restart actor: {}",
833 e
834 )),
835 },
836 }
837 }
838 ManagementCommand::GetActorState { id } => {
839 info!("Getting state for actor: {:?}", id);
840 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
841 runtime_tx
842 .send(TheaterCommand::GetActorState {
843 actor_id: id.clone(),
844 response_tx: cmd_tx,
845 })
846 .await?;
847
848 let state = cmd_rx.await?;
849 ManagementResponse::ActorState { id, state: state? }
850 }
851 ManagementCommand::GetActorEvents { id } => {
852 info!("Getting events for actor: {:?}", id);
853 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
854 runtime_tx
855 .send(TheaterCommand::GetActorEvents {
856 actor_id: id.clone(),
857 response_tx: cmd_tx,
858 })
859 .await?;
860
861 match cmd_rx.await {
862 Ok(result) => match result {
863 Ok(events) => {
864 debug!(
865 "Successfully retrieved {} events for actor {}",
866 events.len(),
867 id
868 );
869 ManagementResponse::ActorEvents { id, events }
870 }
871 Err(e) => {
872 debug!("Error getting events for actor {}: {}", id, e);
873 ManagementResponse::Error { error: e.into() }
874 }
875 },
876 Err(e) => {
877 error!("Failed to receive events response: {}", e);
878 ManagementResponse::Error {
879 error: ManagementError::CommunicationError(format!(
880 "Failed to receive events response: {}",
881 e
882 )),
883 }
884 }
885 }
886 }
887 ManagementCommand::GetActorMetrics { id } => {
888 info!("Getting metrics for actor: {:?}", id);
889 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
890 runtime_tx
891 .send(TheaterCommand::GetActorMetrics {
892 actor_id: id.clone(),
893 response_tx: cmd_tx,
894 })
895 .await?;
896
897 let metrics = cmd_rx.await?;
898 ManagementResponse::ActorMetrics {
899 id,
900 metrics: serde_json::to_value(metrics?)?,
901 }
902 }
903 ManagementCommand::UpdateActorComponent { id, component } => {
904 info!("Updating component for actor {:?} to {}", id, component);
905 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
906 runtime_tx
907 .send(TheaterCommand::UpdateActorComponent {
908 actor_id: id.clone(),
909 component: component.clone(),
910 response_tx: cmd_tx,
911 })
912 .await?;
913
914 match cmd_rx.await? {
915 Ok(_) => ManagementResponse::ActorComponentUpdated { id },
916 Err(e) => ManagementResponse::Error {
917 error: ManagementError::RuntimeError(format!(
918 "Failed to update actor component: {}",
919 e
920 )),
921 },
922 }
923 }
924 ManagementCommand::OpenChannel {
926 actor_id,
927 initial_message,
928 } => {
929 info!("Opening channel to actor: {:?}", actor_id);
930
931 let (response_tx, response_rx) = tokio::sync::oneshot::channel();
933
934 let client_id = ChannelParticipant::External;
936 let channel_id = ChannelId::new(&client_id, &actor_id);
937 let channel_id_str = channel_id.0.clone();
938
939 runtime_tx
941 .send(TheaterCommand::ChannelOpen {
942 initiator_id: client_id.clone(),
943 target_id: actor_id.clone(),
944 channel_id: channel_id.clone(),
945 initial_message,
946 response_tx,
947 })
948 .await
949 .map_err(|e| {
950 anyhow::anyhow!("Failed to send channel open command: {}", e)
951 })?;
952
953 match response_rx.await {
955 Ok(result) => {
956 match result {
957 Ok(accepted) => {
958 if accepted {
959 info!("Channel opened successfully: {}", channel_id_str);
961
962 let channel_sub = ChannelSubscription {
964 channel_id: channel_id_str.clone(),
965 initiator_id: client_id.clone(),
966 target_id: actor_id.clone(),
967 client_tx: cmd_client_tx.clone(),
968 };
969
970 channel_subscriptions
971 .lock()
972 .await
973 .insert(channel_id_str.clone(), channel_sub);
974
975 connection_channel_subscriptions
977 .push(channel_id_str.clone());
978
979 ManagementResponse::ChannelOpened {
980 channel_id: channel_id_str,
981 actor_id,
982 }
983 } else {
984 ManagementResponse::Error {
986 error: ManagementError::ChannelRejected,
987 }
988 }
989 }
990 Err(e) => ManagementResponse::Error {
991 error: ManagementError::RuntimeError(format!(
992 "Error opening channel: {}",
993 e
994 )),
995 },
996 }
997 }
998 Err(e) => ManagementResponse::Error {
999 error: ManagementError::CommunicationError(format!(
1000 "Failed to receive channel open response: {}",
1001 e
1002 )),
1003 },
1004 }
1005 }
1006 ManagementCommand::SendOnChannel {
1007 channel_id,
1008 message,
1009 } => {
1010 info!("Sending message on channel: {}", channel_id);
1011
1012 let channel_id_parsed = ChannelId(channel_id.clone());
1014 let client_id = ChannelParticipant::External;
1015
1016 runtime_tx
1018 .send(TheaterCommand::ChannelMessage {
1019 channel_id: channel_id_parsed,
1020 message,
1021 sender_id: client_id,
1022 })
1023 .await
1024 .map_err(|e| anyhow::anyhow!("Failed to send message on channel: {}", e))?;
1025
1026 ManagementResponse::MessageSent { channel_id }
1027 }
1028 ManagementCommand::CloseChannel { channel_id } => {
1029 info!("Closing channel: {}", channel_id);
1030
1031 let channel_id_parsed = ChannelId(channel_id.clone());
1033
1034 runtime_tx
1036 .send(TheaterCommand::ChannelClose {
1037 channel_id: channel_id_parsed,
1038 })
1039 .await
1040 .map_err(|e| anyhow::anyhow!("Failed to close channel: {}", e))?;
1041
1042 channel_subscriptions.lock().await.remove(&channel_id);
1044 connection_channel_subscriptions.retain(|id| id != &channel_id);
1045
1046 ManagementResponse::ChannelClosed { channel_id }
1047 }
1048 ManagementCommand::NewStore {} => {
1049 info!("Creating new store");
1050 let (cmd_tx, cmd_rx) = tokio::sync::oneshot::channel();
1051 runtime_tx
1052 .send(TheaterCommand::NewStore {
1053 response_tx: cmd_tx,
1054 })
1055 .await?;
1056
1057 let store_id = cmd_rx.await?;
1058 ManagementResponse::StoreCreated {
1059 store_id: store_id?.id,
1060 }
1061 }
1062 };
1063
1064 debug!("Sending response: {:?}", response);
1065 if let Err(e) = client_tx.send(response).await {
1066 error!("Failed to send response: {}", e);
1067 break;
1068 }
1069 debug!("Response sent");
1070 }
1071
1072 debug!(
1074 "Connection closed, cleaning up {} subscriptions",
1075 connection_subscriptions.len()
1076 );
1077 let mut subs = subscriptions.lock().await;
1078
1079 for (actor_id, sub_id) in connection_subscriptions {
1080 if let Some(actor_subs) = subs.get_mut(&actor_id) {
1081 actor_subs.retain(|sub| sub.id != sub_id);
1082
1083 if actor_subs.is_empty() {
1085 subs.remove(&actor_id);
1086 }
1087 }
1088 }
1089
1090 debug!(
1092 "Connection closed, cleaning up {} channel subscriptions",
1093 connection_channel_subscriptions.len()
1094 );
1095 let mut channel_subs = channel_subscriptions.lock().await;
1096
1097 for channel_id in connection_channel_subscriptions {
1098 channel_subs.remove(&channel_id);
1099 }
1100
1101 debug!("Cleaned up all subscriptions for the connection");
1102 Ok(())
1103 }
1104}