actr_runtime/
actr_ref.rs

1//! ActrRef - Lightweight reference to a running Actor
2//!
3//! # Design Philosophy
4//!
5//! `ActrRef` is the primary handle for interacting with a running Actor.
6//! It provides:
7//!
//! - **RPC calls**: Call Actor methods (Shell → Workload), or send one-way messages with `tell`
9//! - **Event subscription**: Subscribe to Actor events (Workload → Shell)
10//! - **Lifecycle control**: Shutdown and wait for completion
11//!
12//! # Key Characteristics
13//!
14//! - **Cloneable**: Can be shared across tasks
//! - **Lightweight**: Holds only `Arc` handles (shared state and the owning node), so clones are cheap
16//! - **Auto-cleanup**: Last `ActrRef` drop triggers resource cleanup
17//! - **Code-gen friendly**: RPC methods will be generated and bound to this type
18//!
19//! # Usage
20//!
21//! ```rust,ignore
22//! let actr = node.start().await?;
23//!
24//! // Clone and use in different tasks
25//! let actr1 = actr.clone();
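//! tokio::spawn(async move {
//!     // `?` is unavailable here: the spawned block returns `()`.
//!     if let Err(e) = actr1.call(SomeRequest { ... }).await {
//!         eprintln!("call failed: {e:?}");
//!     }
//! });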
29//!
30//! // Subscribe to events
31//! let mut events = actr.events();
32//! while let Some(event) = events.next().await {
33//!     println!("Event: {:?}", event);
34//! }
35//!
36//! // Shutdown
37//! actr.shutdown();
38//! actr.wait_for_shutdown().await;
39//! ```
40
41use crate::lifecycle::ActrNode;
42use crate::outbound::InprocOutGate;
43use actr_framework::{Bytes, Workload};
44use actr_protocol::prost::Message as ProstMessage;
45use actr_protocol::{ActorResult, ActrError, ActrId, PayloadType, ProtocolError, RpcEnvelope};
46use std::sync::Arc;
47use std::time::Duration;
48use tokio::sync::Mutex;
49use tokio::task::JoinHandle;
50use tokio_util::sync::CancellationToken;
51
52/// ActrRef - Lightweight reference to a running Actor
53///
54/// This is the primary handle returned by `ActrNode::start()`.
55///
56/// # Code Generation Pattern
57///
58/// `actr-cli` code generator will generate type-safe RPC methods for `ActrRef`.
59///
60/// ## Proto Definition
61///
62/// ```protobuf
63/// service EchoService {
64///   rpc Echo(EchoRequest) returns (EchoResponse);
65///   rpc Ping(PingRequest) returns (PingResponse);
66/// }
67/// ```
68///
69/// ## Generated Code (in `generated/echo_service_actr_ref.rs`)
70///
71/// ```rust,ignore
72/// use actr_runtime::ActrRef;
73/// use super::echo_service_actor::{EchoServiceWorkload, EchoServiceHandler};
74/// use super::echo::{EchoRequest, EchoResponse, PingRequest, PingResponse};
75///
76/// impl<T: EchoServiceHandler> ActrRef<EchoServiceWorkload<T>> {
77///     /// Call Echo RPC method
78///     pub async fn echo(&self, request: EchoRequest) -> ActorResult<EchoResponse> {
79///         self.call(request).await
80///     }
81///
82///     /// Call Ping RPC method
83///     pub async fn ping(&self, request: PingRequest) -> ActorResult<PingResponse> {
84///         self.call(request).await
85///     }
86/// }
87/// ```
88///
89/// ## Usage in Shell
90///
91/// ```rust,ignore
92/// use generated::echo_service_actr_ref::*;  // Import ActrRef extensions
93///
94/// let actr = node.start().await?;
95///
96/// // Type-safe RPC calls (generated methods)
97/// let response = actr.echo(EchoRequest {
98///     message: "Hello".to_string(),
99/// }).await?;
100///
101/// // Or use generic call() method
102/// let response: EchoResponse = actr.call(EchoRequest { ... }).await?;
103/// ```
104///
105/// # Design Rationale
106///
107/// **Why bind RPC methods to ActrRef?**
108///
109/// 1. **Type Safety**: Compiler checks request/response types
110/// 2. **Auto-completion**: IDE shows available RPC methods
111/// 3. **No target needed**: ActrRef already knows its target Actor
112/// 4. **Symmetric to Context**: Similar to Context extension pattern
113///
114/// **Comparison with Context pattern:**
115///
116/// | Aspect | Context (in Workload) | ActrRef (in Shell) |
117/// |--------|----------------------|-------------------|
118/// | Caller | Workload | Shell |
119/// | Target | Any Actor (needs `target` param) | This Workload (fixed) |
120/// | Method | `ctx.call(target, req)` | `actr.echo(req)` |
121/// | Generation | Extension trait | Concrete impl |
122pub struct ActrRef<W: Workload> {
123    pub(crate) shared: Arc<ActrRefShared>,
124    pub(crate) node: Arc<ActrNode<W>>,
125}
126
127impl<W: Workload> Clone for ActrRef<W> {
128    fn clone(&self) -> Self {
129        Self {
130            shared: Arc::clone(&self.shared),
131            node: Arc::clone(&self.node),
132        }
133    }
134}
135
136/// Shared state between all ActrRef clones
137///
138/// This is an internal implementation detail. When the last `ActrRef` is dropped,
139/// this struct's `Drop` impl will trigger shutdown and cleanup all resources.
140pub(crate) struct ActrRefShared {
141    /// Actor ID
142    pub(crate) actor_id: ActrId,
143
144    /// Inproc gate for Shell → Workload RPC
145    pub(crate) inproc_gate: Arc<InprocOutGate>,
146
147    /// Shutdown signal
148    pub(crate) shutdown_token: CancellationToken,
149
150    /// Background task handles (receive loops, WebRTC coordinator, etc.)
151    pub(crate) task_handles: Mutex<Vec<JoinHandle<()>>>,
152}
153
154impl<W: Workload> ActrRef<W> {
155    /// Create new ActrRef from shared state
156    ///
157    /// This is an internal API used by `ActrNode::start()`.
158    pub(crate) fn new(shared: Arc<ActrRefShared>, node: Arc<ActrNode<W>>) -> Self {
159        Self { shared, node }
160    }
161
162    /// Get Actor ID
163    pub fn actor_id(&self) -> &ActrId {
164        &self.shared.actor_id
165    }
166
167    /// Discover remote actors of the specified type via signaling server.
168    ///
169    /// This method implements the full runtime compatibility negotiation workflow:
170    ///
171    /// 1. **Fast Path**: Check `compat.lock.toml` for cached negotiation results
172    /// 2. **Ideal Path**: Read fingerprint from `Actr.lock.toml` and request exact match
173    /// 3. **Negotiation**: If no exact match, server performs compatibility analysis
174    /// 4. **Result**: Returns candidates with compatibility info, updates caches
175    ///
176    /// The fingerprint is automatically obtained from the `Actr.lock.toml` file
177    /// loaded during `ActrSystem::attach()`.
178    ///
179    /// # Arguments
180    /// - `target_type`: The ActrType of the target service to discover
181    /// - `candidate_count`: Maximum number of candidates to return
182    ///
183    /// # Returns
184    /// A list of compatible `ActrId` candidates.
185    #[cfg_attr(feature = "opentelemetry", tracing::instrument(skip_all))]
186    pub async fn discover_route_candidates(
187        &self,
188        target_type: &actr_protocol::ActrType,
189        candidate_count: u32,
190    ) -> ActorResult<Vec<ActrId>> {
191        let result = self
192            .node
193            .discover_route_candidates(target_type, candidate_count)
194            .await?;
195        Ok(result.candidates)
196    }
197
198    /// Call Actor method (Shell → Workload RPC)
199    ///
200    /// This is a generic method used by code-generated RPC methods.
201    /// Most users should use the generated methods instead.
202    ///
203    /// # Example
204    ///
205    /// ```rust,ignore
206    /// // Generic call
207    /// let response: EchoResponse = actr.call(EchoRequest {
208    ///     message: "Hello".to_string(),
209    /// }).await?;
210    ///
211    /// // Generated method (preferred)
212    /// let response = actr.echo(EchoRequest {
213    ///     message: "Hello".to_string(),
214    /// }).await?;
215    /// ```
216    #[cfg_attr(
217        feature = "opentelemetry",
218        tracing::instrument(skip_all, name = "ActrRef.call")
219    )]
220    pub async fn call<R>(&self, request: R) -> ActorResult<R::Response>
221    where
222        R: actr_protocol::RpcRequest,
223    {
224        // Encode request
225        let payload: Bytes = request.encode_to_vec().into();
226
227        // Create envelope
228        #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
229        let mut envelope = RpcEnvelope {
230            route_key: R::route_key().to_string(),
231            payload: Some(payload),
232            error: None,
233            traceparent: None,
234            tracestate: None,
235            request_id: uuid::Uuid::new_v4().to_string(),
236            metadata: vec![],
237            timeout_ms: 30000,
238        };
239        // Inject tracing context
240        #[cfg(feature = "opentelemetry")]
241        {
242            use crate::wire::webrtc::trace::inject_span_context_to_rpc;
243            inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
244        }
245
246        // Send request and wait for response (target is our actor_id for logging)
247        let response_bytes = self
248            .shared
249            .inproc_gate
250            .send_request(&self.shared.actor_id, envelope)
251            .await?;
252
253        // Decode response
254        R::Response::decode(&*response_bytes).map_err(|e| {
255            ProtocolError::Actr(ActrError::DecodeFailure {
256                message: format!("Failed to decode response: {e}"),
257            })
258        })
259    }
260
261    /// Call Actor method using route_key and request bytes (for language bindings)
262    ///
263    /// This is a non-generic version of `call()` that accepts route_key and raw bytes,
264    /// making it suitable for language bindings (e.g., Python) that don't have access
265    /// to Rust's generic `RpcRequest` trait.
266    ///
267    /// # Parameters
268    /// - `route_key`: Route key string (e.g., "package.Service.Method")
269    /// - `request_bytes`: Request protobuf bytes
270    /// - `timeout_ms`: Timeout in milliseconds
271    /// - `payload_type`: Payload transmission type
272    ///
273    /// # Returns
274    /// Response protobuf bytes
275    pub async fn call_raw(
276        &self,
277        route_key: String,
278        request_bytes: Bytes,
279        timeout_ms: i64,
280        payload_type: PayloadType,
281    ) -> ActorResult<Bytes> {
282        // Create envelope
283        #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
284        let mut envelope = RpcEnvelope {
285            route_key,
286            payload: Some(request_bytes),
287            error: None,
288            traceparent: None,
289            tracestate: None,
290            request_id: uuid::Uuid::new_v4().to_string(),
291            metadata: vec![],
292            timeout_ms,
293        };
294        // Inject tracing context
295        #[cfg(feature = "opentelemetry")]
296        {
297            use crate::wire::webrtc::trace::inject_span_context_to_rpc;
298            inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
299        }
300
301        // Send request and wait for response
302        self.shared
303            .inproc_gate
304            .send_request_with_type(&self.shared.actor_id, payload_type, None, envelope)
305            .await
306    }
307
308    /// Send one-way message using route_key and message bytes (for language bindings)
309    ///
310    /// This is a non-generic version of `tell()` that accepts route_key and raw bytes,
311    /// making it suitable for language bindings (e.g., Python) that don't have access
312    /// to Rust's generic `RpcRequest` trait.
313    ///
314    /// # Parameters
315    /// - `route_key`: Route key string (e.g., "package.Service.Method")
316    /// - `message_bytes`: Message protobuf bytes
317    /// - `payload_type`: Payload transmission type
318    ///
319    /// # Returns
320    /// Unit (fire-and-forget, no response)
321    pub async fn tell_raw(
322        &self,
323        route_key: String,
324        message_bytes: Bytes,
325        payload_type: PayloadType,
326    ) -> ActorResult<()> {
327        // Create envelope
328        #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
329        let mut envelope = RpcEnvelope {
330            route_key,
331            payload: Some(message_bytes),
332            error: None,
333            traceparent: None,
334            tracestate: None,
335            request_id: uuid::Uuid::new_v4().to_string(),
336            metadata: vec![],
337            timeout_ms: 0, // No timeout for one-way messages
338        };
339        // Inject tracing context
340        #[cfg(feature = "opentelemetry")]
341        {
342            use crate::wire::webrtc::trace::inject_span_context_to_rpc;
343            inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
344        }
345
346        // Send message without waiting for response
347        self.shared
348            .inproc_gate
349            .send_message_with_type(&self.shared.actor_id, payload_type, None, envelope)
350            .await
351    }
352
353    /// Send one-way message to Actor (Shell → Workload, fire-and-forget)
354    ///
355    /// Unlike `call()`, this method does not wait for a response.
356    /// Use this for notifications or commands that don't need acknowledgment.
357    ///
358    /// # Example
359    ///
360    /// ```rust,ignore
361    /// // Send notification without waiting for response
362    /// actr.tell(LogEvent {
363    ///     level: "INFO".to_string(),
364    ///     message: "User logged in".to_string(),
365    /// }).await?;
366    ///
367    /// // Generated method (if codegen supports tell)
368    /// actr.log_event(LogEvent { ... }).await?;
369    /// ```
370    ///
371    /// # Performance
372    ///
373    /// - **Latency**: ~10μs (in-process, zero serialization)
374    /// - **No blocking**: Returns immediately after sending
375    /// - **No response**: Caller won't know if message was processed
376    #[cfg_attr(feature = "opentelemetry", tracing::instrument(skip_all))]
377    pub async fn tell<R>(&self, message: R) -> ActorResult<()>
378    where
379        R: actr_protocol::RpcRequest + ProstMessage,
380    {
381        // Encode message
382        let payload: Bytes = message.encode_to_vec().into();
383
384        // Create envelope (note: request_id still included for tracing)
385        #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
386        let mut envelope = RpcEnvelope {
387            route_key: R::route_key().to_string(),
388            payload: Some(payload),
389            error: None,
390            traceparent: None,
391            tracestate: None,
392            request_id: uuid::Uuid::new_v4().to_string(),
393            metadata: vec![],
394            timeout_ms: 0, // No timeout for one-way messages
395        };
396        // Inject tracing context
397        #[cfg(feature = "opentelemetry")]
398        {
399            use crate::wire::webrtc::trace::inject_span_context_to_rpc;
400            inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
401        }
402
403        // Send message without waiting for response
404        self.shared
405            .inproc_gate
406            .send_message(&self.shared.actor_id, envelope)
407            .await
408    }
409
410    /// Trigger Actor shutdown
411    ///
412    /// This signals the Actor to stop, but does not wait for completion.
413    /// Use `wait_for_shutdown()` to wait for cleanup to finish.
414    pub fn shutdown(&self) {
415        tracing::info!("🛑 Shutdown requested for Actor {:?}", self.shared.actor_id);
416        self.shared.shutdown_token.cancel();
417    }
418
419    /// Wait for Actor to fully shutdown
420    ///
421    /// This waits for the shutdown signal to be triggered.
422    /// All background tasks will be aborted when the last `ActrRef` is dropped.
423    pub async fn wait_for_shutdown(&self) {
424        self.shared.shutdown_token.cancelled().await;
425        // Take ownership of the current handles so we can await them as Futures.
426        let mut guard = self.shared.task_handles.lock().await;
427        let handles = std::mem::take(&mut *guard);
428        drop(guard);
429        tracing::debug!("Waiting for tasks to complete: {:?}", handles.len());
430        // All tasks have been asked to shut down; wait for them with a timeout,
431        // and abort any that don't finish in time to avoid leaking background work.
432        for handle in handles {
433            let sleep = tokio::time::sleep(Duration::from_secs(5));
434            tokio::pin!(handle);
435            tokio::pin!(sleep);
436
437            tokio::select! {
438                res = &mut handle => {
439                    match res {
440                        Ok(_) => {
441                            tracing::debug!("Task completed");
442                        }
443                        Err(e) => {
444                            tracing::error!("Task failed: {:?}", e);
445                        }
446                    }
447                }
448                _ = sleep => {
449                    tracing::warn!("Task timed out after 5s, aborting");
450                    handle.abort();
451                }
452            }
453        }
454    }
455
456    /// Check if Actor is shutting down
457    pub fn is_shutting_down(&self) -> bool {
458        self.shared.shutdown_token.is_cancelled()
459    }
460
461    ///
462    /// This consumes the `ActrRef` and waits for signal (Ctrl+C / SIGTERM) , then triggers shutdown.
463    ///
464    /// # Example
465    ///
466    /// ```rust,ignore
467    /// let actr = node.start().await?;
468    /// actr.wait_for_ctrl_c_and_shutdown().await?;
469    /// ```
470    pub async fn wait_for_ctrl_c_and_shutdown(self) -> ActorResult<()> {
471        #[cfg(unix)]
472        {
473            use tokio::signal::unix::{SignalKind, signal};
474
475            let mut sigint = signal(SignalKind::interrupt()).map_err(|e| {
476                ProtocolError::TransportError(format!("Signal handler error (SIGINT): {e}"))
477            })?;
478            let mut sigterm = signal(SignalKind::terminate()).map_err(|e| {
479                ProtocolError::TransportError(format!("Signal handler error (SIGTERM): {e}"))
480            })?;
481
482            tokio::select! {
483                _ = sigint.recv() => {
484                    tracing::info!("📡 Received SIGINT (Ctrl+C) signal");
485                }
486                _ = sigterm.recv() => {
487                    tracing::info!("📡 Received SIGTERM signal");
488                }
489            }
490        }
491
492        #[cfg(not(unix))]
493        {
494            tokio::signal::ctrl_c()
495                .await
496                .map_err(|e| ProtocolError::TransportError(format!("Ctrl+C signal error: {e}")))?;
497
498            tracing::info!("📡 Received Ctrl+C signal");
499        }
500
501        self.shutdown();
502        self.wait_for_shutdown().await;
503
504        Ok(())
505    }
506}
507
508impl Drop for ActrRefShared {
509    fn drop(&mut self) {
510        tracing::info!(
511            "🧹 ActrRefShared dropping - cleaning up Actor {:?}",
512            self.actor_id
513        );
514
515        // Cancel shutdown token
516        self.shutdown_token.cancel();
517
518        // Abort all background tasks (best-effort)
519        if let Ok(mut handles) = self.task_handles.try_lock() {
520            for handle in handles.drain(..) {
521                handle.abort();
522            }
523        } else {
524            tracing::warn!(
525                "⚠️ Failed to lock task_handles mutex during Drop; some tasks may still be running"
526            );
527        }
528
529        tracing::debug!(
530            "✅ All background tasks aborted for Actor {:?}",
531            self.actor_id
532        );
533    }
534}