actr_runtime/lifecycle/
actr_system.rs

1//! ActrSystem - Generic-free infrastructure
2
3use actr_config::Config;
4use actr_framework::Workload;
5use actr_protocol::ActorResult;
6use std::sync::Arc;
7
8use super::ActrNode;
9use crate::context_factory::ContextFactory;
10
11// Use types from sub-crates
12use crate::wire::webrtc::{
13    ReconnectConfig, SignalingClient, SignalingConfig, WebSocketSignalingClient,
14};
15use actr_mailbox::{DeadLetterQueue, Mailbox};
16
/// ActrSystem - Runtime infrastructure (generic-free)
///
/// Holds everything built by [`ActrSystem::new`] before a workload type is
/// known: configuration, mailbox, dead letter queue, context factory and the
/// signaling client.
///
/// # Design Philosophy
/// - Phase 1: Create pure runtime framework
/// - Knows nothing about business logic types
/// - Transforms into ActrNode<W> via attach()
pub struct ActrSystem {
    /// Runtime configuration
    config: Config,

    /// SQLite persistent mailbox
    mailbox: Arc<dyn Mailbox>,

    /// Dead Letter Queue for poison messages
    dlq: Arc<dyn DeadLetterQueue>,

    /// Context factory (with inproc_gate ready, outproc_gate deferred)
    context_factory: ContextFactory,

    /// Signaling client
    signaling_client: Arc<dyn SignalingClient>,

    /// Network event channels, created lazily.
    ///
    /// Starts as `None` in `new()`. `create_network_event_handle()` stores the
    /// event receiver and the result sender here (through `&self`, hence the
    /// mutex), and `attach()` takes them out and passes them to `ActrNode`.
    network_event_channels: std::sync::Mutex<
        Option<(
            tokio::sync::mpsc::Receiver<crate::lifecycle::network_event::NetworkEvent>,
            tokio::sync::mpsc::Sender<crate::lifecycle::network_event::NetworkEventResult>,
        )>,
    >,
}
48
49impl ActrSystem {
50    /// Create new ActrSystem
51    ///
52    /// # Errors
53    /// - Mailbox initialization failed
54    /// - Transport initialization failed
55    pub async fn new(config: Config) -> ActorResult<Self> {
56        tracing::info!("🚀 Initializing ActrSystem");
57
58        // Initialize Mailbox (using SqliteMailbox implementation)
59        let mailbox_path = config
60            .mailbox_path
61            .as_ref()
62            .map(|p| p.to_string_lossy().to_string())
63            .unwrap_or_else(|| ":memory:".to_string());
64
65        tracing::info!("📂 Mailbox database path: {}", mailbox_path);
66
67        let mailbox: Arc<dyn Mailbox> = Arc::new(
68            actr_mailbox::SqliteMailbox::new(&mailbox_path)
69                .await
70                .map_err(|e| {
71                    actr_protocol::ProtocolError::TransportError(format!(
72                        "Mailbox init failed: {e}"
73                    ))
74                })?,
75        );
76
77        // Initialize Dead Letter Queue
78        // Use same path as mailbox, DLQ will create separate table in same database
79        let dlq_path = if mailbox_path == ":memory:" {
80            ":memory:".to_string() // Separate in-memory DB for DLQ
81        } else {
82            format!("{mailbox_path}.dlq") // Separate file for DLQ
83        };
84
85        let dlq: Arc<dyn DeadLetterQueue> = Arc::new(
86            actr_mailbox::SqliteDeadLetterQueue::new_standalone(&dlq_path)
87                .await
88                .map_err(|e| {
89                    actr_protocol::ProtocolError::TransportError(format!("DLQ init failed: {e}"))
90                })?,
91        );
92        tracing::info!("✅ Dead Letter Queue initialized");
93
94        // Initialize signaling client (using WebSocketSignalingClient implementation)
95        let signaling_config = SignalingConfig {
96            server_url: config.signaling_url.clone(),
97            connection_timeout: 30,
98            heartbeat_interval: 30,
99            reconnect_config: ReconnectConfig::default(),
100            auth_config: None,
101        };
102
103        let client = Arc::new(WebSocketSignalingClient::new(signaling_config));
104        client.start_auto_reconnector(); // Start if reconnect_config.enabled = true
105        let signaling_client: Arc<dyn SignalingClient> = client;
106
107        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
108        // Initialize inproc infrastructure (Shell/Local communication - immediately available)
109        // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
110        use crate::outbound::{InprocOutGate, OutGate};
111        use crate::transport::InprocTransportManager;
112
113        // Create TWO separate InprocTransportManager instances for bidirectional communication
114        // This ensures Shell's pending_requests and Workload's pending_requests are separate
115
116        // Direction 1: Shell → Workload (REQUEST)
117        let shell_to_workload = Arc::new(InprocTransportManager::new());
118        tracing::debug!("✨ Created shell_to_workload InprocTransportManager");
119
120        // Direction 2: Workload → Shell (RESPONSE)
121        let workload_to_shell = Arc::new(InprocTransportManager::new());
122        tracing::debug!("✨ Created workload_to_shell InprocTransportManager");
123
124        // Shell uses shell_to_workload for sending
125        let inproc_gate =
126            OutGate::InprocOut(Arc::new(InprocOutGate::new(shell_to_workload.clone())));
127
128        // Create DataStreamRegistry for DataStream callbacks
129        let data_stream_registry = Arc::new(crate::inbound::DataStreamRegistry::new());
130        tracing::debug!("✨ Created DataStreamRegistry");
131
132        // Create MediaFrameRegistry for MediaTrack callbacks
133        let media_frame_registry = Arc::new(crate::inbound::MediaFrameRegistry::new());
134        tracing::debug!("✨ Created MediaFrameRegistry");
135
136        // ContextFactory holds both managers and registries
137        let context_factory = ContextFactory::new(
138            inproc_gate,
139            shell_to_workload.clone(),
140            workload_to_shell.clone(),
141            data_stream_registry,
142            media_frame_registry,
143            signaling_client.clone(),
144        );
145
146        tracing::info!("✅ Inproc infrastructure initialized (bidirectional Shell ↔ Workload)");
147
148        tracing::info!("✅ ActrSystem initialized");
149
150        Ok(Self {
151            config,
152            mailbox,
153            dlq,
154            context_factory,
155            signaling_client,
156            network_event_channels: std::sync::Mutex::new(None),
157        })
158    }
159
160    /// 创建网络事件处理基础设施(按需调用)
161    ///
162    /// 创建 NetworkEventHandle 和内部 channels。
163    /// channels 会存储在结构体中,供 attach() 使用。
164    ///
165    /// # 注意
166    /// - 只能调用一次
167    /// - 如果不调用此方法,网络事件功能将不可用
168    ///
169    /// # Panics
170    /// 如果已经调用过此方法,会 panic
171    pub fn create_network_event_handle(&self) -> crate::lifecycle::NetworkEventHandle {
172        // 创建双向 channels
173        let (event_tx, event_rx) = tokio::sync::mpsc::channel(100);
174        let (result_tx, result_rx) = tokio::sync::mpsc::channel(100);
175
176        // 存储 channels(供 attach() 使用)
177        let mut channels = self
178            .network_event_channels
179            .lock()
180            .expect("Failed to lock network_event_channels");
181
182        if channels.is_some() {
183            panic!("create_network_event_handle() can only be called once");
184        }
185
186        *channels = Some((event_rx, result_tx));
187
188        // 创建并返回 NetworkEventHandle
189        crate::lifecycle::NetworkEventHandle::new(event_tx, result_rx)
190    }
191
192    /// Attach Workload, transform into ActrNode<W>
193    ///
194    /// # Type Inference
195    /// - Infer W::Dispatcher from W
196    /// - Compiler monomorphizes ActrNode<W>
197    /// - Completely zero-dyn, full inline chain
198    ///
199    /// # Consumes self
200    /// - Move ensures can only be called once
201    /// - Embodies one-actor-per-instance principle
202    pub fn attach<W: Workload>(self, workload: W) -> ActrNode<W> {
203        tracing::info!("📦 Attaching workload");
204
205        // Load Actr.lock.toml from config directory (REQUIRED)
206        let actr_lock_path = self.config.config_dir.join("Actr.lock.toml");
207        let actr_lock = match actr_config::lock::LockFile::from_file(&actr_lock_path) {
208            Ok(lock) => {
209                tracing::info!(
210                    "📋 Loaded Actr.lock.toml with {} dependencies",
211                    lock.dependencies.len()
212                );
213                Some(lock)
214            }
215            Err(e) => {
216                // Terminate if lock file is missing or invalid
217                tracing::error!(
218                    "❌ FAILED TO LOAD Actr.lock.toml with path {:?}, ERR: {}",
219                    actr_lock_path,
220                    e
221                );
222                panic!("Actr.lock.toml is missing or invalid. Run 'actr install' to generate it.");
223            }
224        };
225        // 从 network_event_channels 中 take channels(如果存在)
226        let (network_event_rx, network_event_result_tx) = self
227            .network_event_channels
228            .lock()
229            .expect("Failed to lock network_event_channels")
230            .take()
231            .map(|(rx, tx)| (Some(rx), Some(tx)))
232            .unwrap_or((None, None));
233
234        ActrNode {
235            config: self.config,
236            workload: Arc::new(workload),
237            mailbox: self.mailbox,
238            dlq: self.dlq,
239            context_factory: Some(self.context_factory), // Initialized with inproc_gate ready
240            signaling_client: self.signaling_client,
241            actor_id: None,              // Obtained after startup
242            credential_state: None,      // Obtained after startup
243            psk: None,                   // Obtained after startup (for TURN auth)
244            webrtc_coordinator: None,    // Pass shared coordinator
245            webrtc_gate: None,           // Created after startup
246            inproc_mgr: None,            // Set after startup
247            workload_to_shell_mgr: None, // Set after startup
248            shutdown_token: tokio_util::sync::CancellationToken::new(),
249            actr_lock,
250            network_event_rx,
251            network_event_result_tx,
252        }
253    }
254}