actr_runtime/
actr_ref.rs

1//! ActrRef - Lightweight reference to a running Actor
2//!
3//! # Design Philosophy
4//!
5//! `ActrRef` is the primary handle for interacting with a running Actor.
6//! It provides:
7//!
8//! - **RPC calls**: Call Actor methods (Shell → Workload)
9//! - **Event subscription**: Subscribe to Actor events (Workload → Shell)
10//! - **Lifecycle control**: Shutdown and wait for completion
11//!
12//! # Key Characteristics
13//!
14//! - **Cloneable**: Can be shared across tasks
15//! - **Lightweight**: Holds only two `Arc`s (shared state and the owning node), so clones are cheap
16//! - **Auto-cleanup**: Last `ActrRef` drop triggers resource cleanup
17//! - **Code-gen friendly**: RPC methods will be generated and bound to this type
18//!
19//! # Usage
20//!
21//! ```rust,ignore
22//! let actr = node.start().await?;
23//!
24//! // Clone and use in different tasks
25//! let actr1 = actr.clone();
26//! tokio::spawn(async move {
27//!     actr1.call(SomeRequest { ... }).await?;
28//! });
29//!
30//! // Subscribe to events
31//! let mut events = actr.events();
32//! while let Some(event) = events.next().await {
33//!     println!("Event: {:?}", event);
34//! }
35//!
36//! // Shutdown
37//! actr.shutdown();
38//! actr.wait_for_shutdown().await;
39//! ```
40
41use std::sync::Arc;
42use std::time::Duration;
43use tokio::sync::Mutex;
44use tokio::task::JoinHandle;
45use tokio_util::sync::CancellationToken;
46
47use actr_framework::{Bytes, Workload};
48use actr_protocol::prost::Message as ProstMessage;
49use actr_protocol::{ActorResult, ActrError, ActrId, ProtocolError, RpcEnvelope};
50
51use crate::lifecycle::ActrNode;
52use crate::outbound::InprocOutGate;
53
54/// ActrRef - Lightweight reference to a running Actor
55///
56/// This is the primary handle returned by `ActrNode::start()`.
57///
58/// # Code Generation Pattern
59///
60/// `actr-cli` code generator will generate type-safe RPC methods for `ActrRef`.
61///
62/// ## Proto Definition
63///
64/// ```protobuf
65/// service EchoService {
66///   rpc Echo(EchoRequest) returns (EchoResponse);
67///   rpc Ping(PingRequest) returns (PingResponse);
68/// }
69/// ```
70///
71/// ## Generated Code (in `generated/echo_service_actr_ref.rs`)
72///
73/// ```rust,ignore
74/// use actr_runtime::ActrRef;
75/// use super::echo_service_actor::{EchoServiceWorkload, EchoServiceHandler};
76/// use super::echo::{EchoRequest, EchoResponse, PingRequest, PingResponse};
77///
78/// impl<T: EchoServiceHandler> ActrRef<EchoServiceWorkload<T>> {
79///     /// Call Echo RPC method
80///     pub async fn echo(&self, request: EchoRequest) -> ActorResult<EchoResponse> {
81///         self.call(request).await
82///     }
83///
84///     /// Call Ping RPC method
85///     pub async fn ping(&self, request: PingRequest) -> ActorResult<PingResponse> {
86///         self.call(request).await
87///     }
88/// }
89/// ```
90///
91/// ## Usage in Shell
92///
93/// ```rust,ignore
94/// use generated::echo_service_actr_ref::*;  // Import ActrRef extensions
95///
96/// let actr = node.start().await?;
97///
98/// // Type-safe RPC calls (generated methods)
99/// let response = actr.echo(EchoRequest {
100///     message: "Hello".to_string(),
101/// }).await?;
102///
103/// // Or use generic call() method
104/// let response: EchoResponse = actr.call(EchoRequest { ... }).await?;
105/// ```
106///
107/// # Design Rationale
108///
109/// **Why bind RPC methods to ActrRef?**
110///
111/// 1. **Type Safety**: Compiler checks request/response types
112/// 2. **Auto-completion**: IDE shows available RPC methods
113/// 3. **No target needed**: ActrRef already knows its target Actor
114/// 4. **Symmetric to Context**: Similar to Context extension pattern
115///
116/// **Comparison with Context pattern:**
117///
118/// | Aspect | Context (in Workload) | ActrRef (in Shell) |
119/// |--------|----------------------|-------------------|
120/// | Caller | Workload | Shell |
121/// | Target | Any Actor (needs `target` param) | This Workload (fixed) |
122/// | Method | `ctx.call(target, req)` | `actr.echo(req)` |
123/// | Generation | Extension trait | Concrete impl |
124pub struct ActrRef<W: Workload> {
125    pub(crate) shared: Arc<ActrRefShared>,
126    pub(crate) node: Arc<ActrNode<W>>,
127}
128
129impl<W: Workload> Clone for ActrRef<W> {
130    fn clone(&self) -> Self {
131        Self {
132            shared: Arc::clone(&self.shared),
133            node: Arc::clone(&self.node),
134        }
135    }
136}
137
138/// Shared state between all ActrRef clones
139///
140/// This is an internal implementation detail. When the last `ActrRef` is dropped,
141/// this struct's `Drop` impl will trigger shutdown and cleanup all resources.
142pub(crate) struct ActrRefShared {
143    /// Actor ID
144    pub(crate) actor_id: ActrId,
145
146    /// Inproc gate for Shell → Workload RPC
147    pub(crate) inproc_gate: Arc<InprocOutGate>,
148
149    /// Shutdown signal
150    pub(crate) shutdown_token: CancellationToken,
151
152    /// Background task handles (receive loops, WebRTC coordinator, etc.)
153    pub(crate) task_handles: Mutex<Vec<JoinHandle<()>>>,
154}
155
156impl<W: Workload> ActrRef<W> {
157    /// Create new ActrRef from shared state
158    ///
159    /// This is an internal API used by `ActrNode::start()`.
160    pub(crate) fn new(shared: Arc<ActrRefShared>, node: Arc<ActrNode<W>>) -> Self {
161        Self { shared, node }
162    }
163
164    /// Get Actor ID
165    pub fn actor_id(&self) -> &ActrId {
166        &self.shared.actor_id
167    }
168
169    /// Discover remote actors of the specified type via signaling server.
170    pub async fn discover_route_candidates(
171        &self,
172        target_type: &actr_protocol::ActrType,
173        candidate_count: u32,
174    ) -> ActorResult<Vec<ActrId>> {
175        self.node
176            .discover_route_candidates(target_type, candidate_count)
177            .await
178    }
179
180    /// Call Actor method (Shell → Workload RPC)
181    ///
182    /// This is a generic method used by code-generated RPC methods.
183    /// Most users should use the generated methods instead.
184    ///
185    /// # Example
186    ///
187    /// ```rust,ignore
188    /// // Generic call
189    /// let response: EchoResponse = actr.call(EchoRequest {
190    ///     message: "Hello".to_string(),
191    /// }).await?;
192    ///
193    /// // Generated method (preferred)
194    /// let response = actr.echo(EchoRequest {
195    ///     message: "Hello".to_string(),
196    /// }).await?;
197    /// ```
198    pub async fn call<R>(&self, request: R) -> ActorResult<R::Response>
199    where
200        R: actr_protocol::RpcRequest,
201    {
202        // Encode request
203        let payload: Bytes = request.encode_to_vec().into();
204
205        // Create envelope
206        #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
207        let mut envelope = RpcEnvelope {
208            route_key: R::route_key().to_string(),
209            payload: Some(payload),
210            error: None,
211            traceparent: None,
212            tracestate: None,
213            request_id: uuid::Uuid::new_v4().to_string(),
214            metadata: vec![],
215            timeout_ms: 30000,
216        };
217        // Inject tracing context
218        #[cfg(feature = "opentelemetry")]
219        {
220            use crate::wire::webrtc::trace::inject_span_context_to_rpc;
221            inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
222        }
223
224        // Send request and wait for response (target is our actor_id for logging)
225        let response_bytes = self
226            .shared
227            .inproc_gate
228            .send_request(&self.shared.actor_id, envelope)
229            .await?;
230
231        // Decode response
232        R::Response::decode(&*response_bytes).map_err(|e| {
233            ProtocolError::Actr(ActrError::DecodeFailure {
234                message: format!("Failed to decode response: {e}"),
235            })
236        })
237    }
238
239    /// Send one-way message to Actor (Shell → Workload, fire-and-forget)
240    ///
241    /// Unlike `call()`, this method does not wait for a response.
242    /// Use this for notifications or commands that don't need acknowledgment.
243    ///
244    /// # Example
245    ///
246    /// ```rust,ignore
247    /// // Send notification without waiting for response
248    /// actr.tell(LogEvent {
249    ///     level: "INFO".to_string(),
250    ///     message: "User logged in".to_string(),
251    /// }).await?;
252    ///
253    /// // Generated method (if codegen supports tell)
254    /// actr.log_event(LogEvent { ... }).await?;
255    /// ```
256    ///
257    /// # Performance
258    ///
259    /// - **Latency**: ~10μs (in-process, zero serialization)
260    /// - **No blocking**: Returns immediately after sending
261    /// - **No response**: Caller won't know if message was processed
262    pub async fn tell<R>(&self, message: R) -> ActorResult<()>
263    where
264        R: actr_protocol::RpcRequest + ProstMessage,
265    {
266        // Encode message
267        let payload: Bytes = message.encode_to_vec().into();
268
269        // Create envelope (note: request_id still included for tracing)
270        #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
271        let mut envelope = RpcEnvelope {
272            route_key: R::route_key().to_string(),
273            payload: Some(payload),
274            error: None,
275            traceparent: None,
276            tracestate: None,
277            request_id: uuid::Uuid::new_v4().to_string(),
278            metadata: vec![],
279            timeout_ms: 0, // No timeout for one-way messages
280        };
281        // Inject tracing context
282        #[cfg(feature = "opentelemetry")]
283        {
284            use crate::wire::webrtc::trace::inject_span_context_to_rpc;
285            inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
286        }
287
288        // Send message without waiting for response
289        self.shared
290            .inproc_gate
291            .send_message(&self.shared.actor_id, envelope)
292            .await
293    }
294
295    /// Trigger Actor shutdown
296    ///
297    /// This signals the Actor to stop, but does not wait for completion.
298    /// Use `wait_for_shutdown()` to wait for cleanup to finish.
299    pub fn shutdown(&self) {
300        tracing::info!("🛑 Shutdown requested for Actor {:?}", self.shared.actor_id);
301        self.shared.shutdown_token.cancel();
302    }
303
304    /// Wait for Actor to fully shutdown
305    ///
306    /// This waits for the shutdown signal to be triggered.
307    /// All background tasks will be aborted when the last `ActrRef` is dropped.
308    pub async fn wait_for_shutdown(&self) {
309        self.shared.shutdown_token.cancelled().await;
310        // Take ownership of the current handles so we can await them as Futures.
311        let mut guard = self.shared.task_handles.lock().await;
312        let handles = std::mem::take(&mut *guard);
313        drop(guard);
314        tracing::debug!("Waiting for tasks to complete: {:?}", handles.len());
315        // All tasks have been asked to shut down; wait for them with a timeout,
316        // and abort any that don't finish in time to avoid leaking background work.
317        for handle in handles {
318            let sleep = tokio::time::sleep(Duration::from_secs(5));
319            tokio::pin!(handle);
320            tokio::pin!(sleep);
321
322            tokio::select! {
323                res = &mut handle => {
324                    match res {
325                        Ok(_) => {
326                            tracing::debug!("Task completed");
327                        }
328                        Err(e) => {
329                            tracing::error!("Task failed: {:?}", e);
330                        }
331                    }
332                }
333                _ = sleep => {
334                    tracing::warn!("Task timed out after 5s, aborting");
335                    handle.abort();
336                }
337            }
338        }
339    }
340
341    /// Check if Actor is shutting down
342    pub fn is_shutting_down(&self) -> bool {
343        self.shared.shutdown_token.is_cancelled()
344    }
345
346    ///
347    /// This consumes the `ActrRef` and waits for signal (Ctrl+C / SIGTERM) , then triggers shutdown.
348    ///
349    /// # Example
350    ///
351    /// ```rust,ignore
352    /// let actr = node.start().await?;
353    /// actr.wait_for_ctrl_c_and_shutdown().await?;
354    /// ```
355    pub async fn wait_for_ctrl_c_and_shutdown(self) -> ActorResult<()> {
356        #[cfg(unix)]
357        {
358            use tokio::signal::unix::{SignalKind, signal};
359
360            let mut sigint = signal(SignalKind::interrupt()).map_err(|e| {
361                ProtocolError::TransportError(format!("Signal handler error (SIGINT): {e}"))
362            })?;
363            let mut sigterm = signal(SignalKind::terminate()).map_err(|e| {
364                ProtocolError::TransportError(format!("Signal handler error (SIGTERM): {e}"))
365            })?;
366
367            tokio::select! {
368                _ = sigint.recv() => {
369                    tracing::info!("📡 Received SIGINT (Ctrl+C) signal");
370                }
371                _ = sigterm.recv() => {
372                    tracing::info!("📡 Received SIGTERM signal");
373                }
374            }
375        }
376
377        #[cfg(not(unix))]
378        {
379            tokio::signal::ctrl_c()
380                .await
381                .map_err(|e| ProtocolError::TransportError(format!("Ctrl+C signal error: {e}")))?;
382
383            tracing::info!("📡 Received Ctrl+C signal");
384        }
385
386        self.shutdown();
387        self.wait_for_shutdown().await;
388
389        Ok(())
390    }
391}
392
393impl Drop for ActrRefShared {
394    fn drop(&mut self) {
395        tracing::info!(
396            "🧹 ActrRefShared dropping - cleaning up Actor {:?}",
397            self.actor_id
398        );
399
400        // Cancel shutdown token
401        self.shutdown_token.cancel();
402
403        // Abort all background tasks (best-effort)
404        if let Ok(mut handles) = self.task_handles.try_lock() {
405            for handle in handles.drain(..) {
406                handle.abort();
407            }
408        } else {
409            tracing::warn!(
410                "⚠️ Failed to lock task_handles mutex during Drop; some tasks may still be running"
411            );
412        }
413
414        tracing::debug!(
415            "✅ All background tasks aborted for Actor {:?}",
416            self.actor_id
417        );
418    }
419}