actr_runtime/lifecycle/
actr_system.rs1use actr_config::Config;
4use actr_framework::Workload;
5use actr_protocol::ActorResult;
6use std::sync::Arc;
7
8use super::ActrNode;
9use crate::context_factory::ContextFactory;
10
11use crate::wire::webrtc::{
13 ReconnectConfig, SignalingClient, SignalingConfig, WebSocketSignalingClient,
14};
15use actr_mailbox::{DeadLetterQueue, Mailbox};
16
17pub struct ActrSystem {
24 config: Config,
26
27 mailbox: Arc<dyn Mailbox>,
29
30 dlq: Arc<dyn DeadLetterQueue>,
32
33 context_factory: ContextFactory,
35
36 signaling_client: Arc<dyn SignalingClient>,
38
39 network_event_channels: std::sync::Mutex<
42 Option<(
43 tokio::sync::mpsc::Receiver<crate::lifecycle::network_event::NetworkEvent>,
44 tokio::sync::mpsc::Sender<crate::lifecycle::network_event::NetworkEventResult>,
45 )>,
46 >,
47}
48
49impl ActrSystem {
50 pub async fn new(config: Config) -> ActorResult<Self> {
56 tracing::info!("🚀 Initializing ActrSystem");
57
58 let mailbox_path = config
60 .mailbox_path
61 .as_ref()
62 .map(|p| p.to_string_lossy().to_string())
63 .unwrap_or_else(|| ":memory:".to_string());
64
65 tracing::info!("📂 Mailbox database path: {}", mailbox_path);
66
67 let mailbox: Arc<dyn Mailbox> = Arc::new(
68 actr_mailbox::SqliteMailbox::new(&mailbox_path)
69 .await
70 .map_err(|e| {
71 actr_protocol::ProtocolError::TransportError(format!(
72 "Mailbox init failed: {e}"
73 ))
74 })?,
75 );
76
77 let dlq_path = if mailbox_path == ":memory:" {
80 ":memory:".to_string() } else {
82 format!("{mailbox_path}.dlq") };
84
85 let dlq: Arc<dyn DeadLetterQueue> = Arc::new(
86 actr_mailbox::SqliteDeadLetterQueue::new_standalone(&dlq_path)
87 .await
88 .map_err(|e| {
89 actr_protocol::ProtocolError::TransportError(format!("DLQ init failed: {e}"))
90 })?,
91 );
92 tracing::info!("✅ Dead Letter Queue initialized");
93
94 let signaling_config = SignalingConfig {
96 server_url: config.signaling_url.clone(),
97 connection_timeout: 30,
98 heartbeat_interval: 30,
99 reconnect_config: ReconnectConfig::default(),
100 auth_config: None,
101 };
102
103 let client = Arc::new(WebSocketSignalingClient::new(signaling_config));
104 client.start_auto_reconnector(); let signaling_client: Arc<dyn SignalingClient> = client;
106
107 use crate::outbound::{InprocOutGate, OutGate};
111 use crate::transport::InprocTransportManager;
112
113 let shell_to_workload = Arc::new(InprocTransportManager::new());
118 tracing::debug!("✨ Created shell_to_workload InprocTransportManager");
119
120 let workload_to_shell = Arc::new(InprocTransportManager::new());
122 tracing::debug!("✨ Created workload_to_shell InprocTransportManager");
123
124 let inproc_gate =
126 OutGate::InprocOut(Arc::new(InprocOutGate::new(shell_to_workload.clone())));
127
128 let data_stream_registry = Arc::new(crate::inbound::DataStreamRegistry::new());
130 tracing::debug!("✨ Created DataStreamRegistry");
131
132 let media_frame_registry = Arc::new(crate::inbound::MediaFrameRegistry::new());
134 tracing::debug!("✨ Created MediaFrameRegistry");
135
136 let context_factory = ContextFactory::new(
138 inproc_gate,
139 shell_to_workload.clone(),
140 workload_to_shell.clone(),
141 data_stream_registry,
142 media_frame_registry,
143 signaling_client.clone(),
144 );
145
146 tracing::info!("✅ Inproc infrastructure initialized (bidirectional Shell ↔ Workload)");
147
148 tracing::info!("✅ ActrSystem initialized");
149
150 Ok(Self {
151 config,
152 mailbox,
153 dlq,
154 context_factory,
155 signaling_client,
156 network_event_channels: std::sync::Mutex::new(None),
157 })
158 }
159
160 pub fn create_network_event_handle(&self) -> crate::lifecycle::NetworkEventHandle {
172 let (event_tx, event_rx) = tokio::sync::mpsc::channel(100);
174 let (result_tx, result_rx) = tokio::sync::mpsc::channel(100);
175
176 let mut channels = self
178 .network_event_channels
179 .lock()
180 .expect("Failed to lock network_event_channels");
181
182 if channels.is_some() {
183 panic!("create_network_event_handle() can only be called once");
184 }
185
186 *channels = Some((event_rx, result_tx));
187
188 crate::lifecycle::NetworkEventHandle::new(event_tx, result_rx)
190 }
191
192 pub fn attach<W: Workload>(self, workload: W) -> ActrNode<W> {
203 tracing::info!("📦 Attaching workload");
204
205 let actr_lock_path = self.config.config_dir.join("Actr.lock.toml");
207 let actr_lock = match actr_config::lock::LockFile::from_file(&actr_lock_path) {
208 Ok(lock) => {
209 tracing::info!(
210 "📋 Loaded Actr.lock.toml with {} dependencies",
211 lock.dependencies.len()
212 );
213 Some(lock)
214 }
215 Err(e) => {
216 tracing::error!(
218 "❌ FAILED TO LOAD Actr.lock.toml with path {:?}, ERR: {}",
219 actr_lock_path,
220 e
221 );
222 panic!("Actr.lock.toml is missing or invalid. Run 'actr install' to generate it.");
223 }
224 };
225 let (network_event_rx, network_event_result_tx) = self
227 .network_event_channels
228 .lock()
229 .expect("Failed to lock network_event_channels")
230 .take()
231 .map(|(rx, tx)| (Some(rx), Some(tx)))
232 .unwrap_or((None, None));
233
234 ActrNode {
235 config: self.config,
236 workload: Arc::new(workload),
237 mailbox: self.mailbox,
238 dlq: self.dlq,
239 context_factory: Some(self.context_factory), signaling_client: self.signaling_client,
241 actor_id: None, credential_state: None, psk: None, webrtc_coordinator: None, webrtc_gate: None, inproc_mgr: None, workload_to_shell_mgr: None, shutdown_token: tokio_util::sync::CancellationToken::new(),
249 actr_lock,
250 network_event_rx,
251 network_event_result_tx,
252 }
253 }
254}