actr_runtime/lifecycle/
actr_system.rs1use actr_config::Config;
4use actr_framework::Workload;
5use actr_protocol::ActorResult;
6use std::sync::Arc;
7
8use super::ActrNode;
9use crate::context_factory::ContextFactory;
10
11use crate::wire::webrtc::{
13 ReconnectConfig, SignalingClient, SignalingConfig, WebSocketSignalingClient,
14};
15use actr_mailbox::{DeadLetterQueue, Mailbox};
16
17pub struct ActrSystem {
24 config: Config,
26
27 mailbox: Arc<dyn Mailbox>,
29
30 dlq: Arc<dyn DeadLetterQueue>,
32
33 context_factory: ContextFactory,
35
36 signaling_client: Arc<dyn SignalingClient>,
38}
39
40impl ActrSystem {
41 pub async fn new(config: Config) -> ActorResult<Self> {
47 tracing::info!("🚀 Initializing ActrSystem");
48
49 let mailbox_path = config
51 .mailbox_path
52 .as_ref()
53 .map(|p| p.to_string_lossy().to_string())
54 .unwrap_or_else(|| ":memory:".to_string());
55
56 tracing::info!("📂 Mailbox database path: {}", mailbox_path);
57
58 let mailbox: Arc<dyn Mailbox> = Arc::new(
59 actr_mailbox::SqliteMailbox::new(&mailbox_path)
60 .await
61 .map_err(|e| {
62 actr_protocol::ProtocolError::TransportError(format!(
63 "Mailbox init failed: {e}"
64 ))
65 })?,
66 );
67
68 let dlq_path = if mailbox_path == ":memory:" {
71 ":memory:".to_string() } else {
73 format!("{mailbox_path}.dlq") };
75
76 let dlq: Arc<dyn DeadLetterQueue> = Arc::new(
77 actr_mailbox::SqliteDeadLetterQueue::new_standalone(&dlq_path)
78 .await
79 .map_err(|e| {
80 actr_protocol::ProtocolError::TransportError(format!("DLQ init failed: {e}"))
81 })?,
82 );
83 tracing::info!("✅ Dead Letter Queue initialized");
84
85 use crate::outbound::{InprocOutGate, OutGate};
89 use crate::transport::InprocTransportManager;
90
91 let shell_to_workload = Arc::new(InprocTransportManager::new());
96 tracing::debug!("✨ Created shell_to_workload InprocTransportManager");
97
98 let workload_to_shell = Arc::new(InprocTransportManager::new());
100 tracing::debug!("✨ Created workload_to_shell InprocTransportManager");
101
102 let inproc_gate =
104 OutGate::InprocOut(Arc::new(InprocOutGate::new(shell_to_workload.clone())));
105
106 let data_stream_registry = Arc::new(crate::inbound::DataStreamRegistry::new());
108 tracing::debug!("✨ Created DataStreamRegistry");
109
110 let media_frame_registry = Arc::new(crate::inbound::MediaFrameRegistry::new());
112 tracing::debug!("✨ Created MediaFrameRegistry");
113
114 let context_factory = ContextFactory::new(
116 inproc_gate,
117 shell_to_workload.clone(),
118 workload_to_shell.clone(),
119 data_stream_registry,
120 media_frame_registry,
121 );
122
123 tracing::info!("✅ Inproc infrastructure initialized (bidirectional Shell ↔ Workload)");
124
125 let signaling_config = SignalingConfig {
127 server_url: config.signaling_url.clone(),
128 connection_timeout: 30,
129 heartbeat_interval: 30,
130 reconnect_config: ReconnectConfig::default(),
131 auth_config: None,
132 };
133 let signaling_client: Arc<dyn SignalingClient> =
134 Arc::new(WebSocketSignalingClient::new(signaling_config));
135
136 tracing::info!("✅ ActrSystem initialized");
137
138 Ok(Self {
139 config,
140 mailbox,
141 dlq,
142 context_factory,
143 signaling_client,
144 })
145 }
146
147 pub fn attach<W: Workload>(self, workload: W) -> ActrNode<W> {
158 tracing::info!("📦 Attaching workload");
159
160 ActrNode {
161 config: self.config,
162 workload: Arc::new(workload),
163 mailbox: self.mailbox,
164 dlq: self.dlq,
165 context_factory: Some(self.context_factory), signaling_client: self.signaling_client,
167 actor_id: None, credential: None, webrtc_coordinator: None, webrtc_gate: None, inproc_mgr: None, workload_to_shell_mgr: None, shutdown_token: tokio_util::sync::CancellationToken::new(),
174 }
175 }
176}