actr_runtime/
actr_ref.rs

//! ActrRef - Lightweight reference to a running Actor
//!
//! # Design Philosophy
//!
//! `ActrRef` is the primary handle for interacting with a running Actor.
//! It provides:
//!
//! - **RPC calls**: Call Actor methods (Shell → Workload)
//! - **Event subscription**: Subscribe to Actor events (Workload → Shell)
//! - **Lifecycle control**: Trigger shutdown and wait for the shutdown signal
//!
//! # Key Characteristics
//!
//! - **Cloneable**: Can be shared across tasks
//! - **Lightweight**: Contains only an `Arc` to shared state
//! - **Auto-cleanup**: Dropping the last `ActrRef` triggers resource cleanup
//! - **Code-gen friendly**: RPC methods will be generated and bound to this type
//!
//! # Usage
//!
//! ```rust,ignore
//! let actr = node.start().await?;
//!
//! // Clone and use in different tasks
//! let actr1 = actr.clone();
//! tokio::spawn(async move {
//!     actr1.call(SomeRequest { ... }).await?;
//! });
//!
//! // Subscribe to events
//! let mut events = actr.events();
//! while let Some(event) = events.next().await {
//!     println!("Event: {:?}", event);
//! }
//!
//! // Shutdown
//! actr.shutdown();
//! actr.wait_for_shutdown().await;
//! ```
40
41use std::marker::PhantomData;
42use std::sync::Arc;
43use tokio::task::JoinHandle;
44use tokio_util::sync::CancellationToken;
45
46use actr_framework::{Bytes, Workload};
47use actr_protocol::prost::Message as ProstMessage;
48use actr_protocol::{ActorResult, ActrError, ActrId, ProtocolError, RpcEnvelope};
49
50use crate::outbound::InprocOutGate;
51
52/// ActrRef - Lightweight reference to a running Actor
53///
54/// This is the primary handle returned by `ActrNode::start()`.
55///
56/// # Code Generation Pattern
57///
58/// `actr-cli` code generator will generate type-safe RPC methods for `ActrRef`.
59///
60/// ## Proto Definition
61///
62/// ```protobuf
63/// service EchoService {
64///   rpc Echo(EchoRequest) returns (EchoResponse);
65///   rpc Ping(PingRequest) returns (PingResponse);
66/// }
67/// ```
68///
69/// ## Generated Code (in `generated/echo_service_actr_ref.rs`)
70///
71/// ```rust,ignore
72/// use actr_runtime::ActrRef;
73/// use super::echo_service_actor::{EchoServiceWorkload, EchoServiceHandler};
74/// use super::echo::{EchoRequest, EchoResponse, PingRequest, PingResponse};
75///
76/// impl<T: EchoServiceHandler> ActrRef<EchoServiceWorkload<T>> {
77///     /// Call Echo RPC method
78///     pub async fn echo(&self, request: EchoRequest) -> ActorResult<EchoResponse> {
79///         self.call(request).await
80///     }
81///
82///     /// Call Ping RPC method
83///     pub async fn ping(&self, request: PingRequest) -> ActorResult<PingResponse> {
84///         self.call(request).await
85///     }
86/// }
87/// ```
88///
89/// ## Usage in Shell
90///
91/// ```rust,ignore
92/// use generated::echo_service_actr_ref::*;  // Import ActrRef extensions
93///
94/// let actr = node.start().await?;
95///
96/// // Type-safe RPC calls (generated methods)
97/// let response = actr.echo(EchoRequest {
98///     message: "Hello".to_string(),
99/// }).await?;
100///
101/// // Or use generic call() method
102/// let response: EchoResponse = actr.call(EchoRequest { ... }).await?;
103/// ```
104///
105/// # Design Rationale
106///
107/// **Why bind RPC methods to ActrRef?**
108///
109/// 1. **Type Safety**: Compiler checks request/response types
110/// 2. **Auto-completion**: IDE shows available RPC methods
111/// 3. **No target needed**: ActrRef already knows its target Actor
112/// 4. **Symmetric to Context**: Similar to Context extension pattern
113///
114/// **Comparison with Context pattern:**
115///
116/// | Aspect | Context (in Workload) | ActrRef (in Shell) |
117/// |--------|----------------------|-------------------|
118/// | Caller | Workload | Shell |
119/// | Target | Any Actor (needs `target` param) | This Workload (fixed) |
120/// | Method | `ctx.call(target, req)` | `actr.echo(req)` |
121/// | Generation | Extension trait | Concrete impl |
122pub struct ActrRef<W: Workload> {
123    pub(crate) shared: Arc<ActrRefShared>,
124    _phantom: PhantomData<W>,
125}
126
127impl<W: Workload> Clone for ActrRef<W> {
128    fn clone(&self) -> Self {
129        Self {
130            shared: Arc::clone(&self.shared),
131            _phantom: PhantomData,
132        }
133    }
134}
135
136/// Shared state between all ActrRef clones
137///
138/// This is an internal implementation detail. When the last `ActrRef` is dropped,
139/// this struct's `Drop` impl will trigger shutdown and cleanup all resources.
140pub(crate) struct ActrRefShared {
141    /// Actor ID
142    pub(crate) actor_id: ActrId,
143
144    /// Inproc gate for Shell → Workload RPC
145    pub(crate) inproc_gate: Arc<InprocOutGate>,
146
147    /// Shutdown signal
148    pub(crate) shutdown_token: CancellationToken,
149
150    /// Background task handles (receive loops, WebRTC coordinator, etc.)
151    pub(crate) task_handles: Vec<JoinHandle<()>>,
152}
153
154impl<W: Workload> ActrRef<W> {
155    /// Create new ActrRef from shared state
156    ///
157    /// This is an internal API used by `ActrNode::start()`.
158    pub(crate) fn new(shared: Arc<ActrRefShared>) -> Self {
159        Self {
160            shared,
161            _phantom: PhantomData,
162        }
163    }
164
165    /// Get Actor ID
166    pub fn actor_id(&self) -> &ActrId {
167        &self.shared.actor_id
168    }
169
170    /// Call Actor method (Shell → Workload RPC)
171    ///
172    /// This is a generic method used by code-generated RPC methods.
173    /// Most users should use the generated methods instead.
174    ///
175    /// # Example
176    ///
177    /// ```rust,ignore
178    /// // Generic call
179    /// let response: EchoResponse = actr.call(EchoRequest {
180    ///     message: "Hello".to_string(),
181    /// }).await?;
182    ///
183    /// // Generated method (preferred)
184    /// let response = actr.echo(EchoRequest {
185    ///     message: "Hello".to_string(),
186    /// }).await?;
187    /// ```
188    pub async fn call<R>(&self, request: R) -> ActorResult<R::Response>
189    where
190        R: actr_protocol::RpcRequest + ProstMessage,
191    {
192        // Encode request
193        let payload: Bytes = request.encode_to_vec().into();
194
195        // Create envelope
196        let envelope = RpcEnvelope {
197            route_key: R::route_key().to_string(),
198            payload: Some(payload),
199            error: None,
200            trace_id: uuid::Uuid::new_v4().to_string(),
201            request_id: uuid::Uuid::new_v4().to_string(),
202            metadata: vec![],
203            timeout_ms: 30000,
204        };
205
206        // Send request and wait for response (target is our actor_id for logging)
207        let response_bytes = self
208            .shared
209            .inproc_gate
210            .send_request(&self.shared.actor_id, envelope)
211            .await?;
212
213        // Decode response
214        R::Response::decode(&*response_bytes).map_err(|e| {
215            ProtocolError::Actr(ActrError::DecodeFailure {
216                message: format!("Failed to decode response: {e}"),
217            })
218        })
219    }
220
221    /// Send one-way message to Actor (Shell → Workload, fire-and-forget)
222    ///
223    /// Unlike `call()`, this method does not wait for a response.
224    /// Use this for notifications or commands that don't need acknowledgment.
225    ///
226    /// # Example
227    ///
228    /// ```rust,ignore
229    /// // Send notification without waiting for response
230    /// actr.tell(LogEvent {
231    ///     level: "INFO".to_string(),
232    ///     message: "User logged in".to_string(),
233    /// }).await?;
234    ///
235    /// // Generated method (if codegen supports tell)
236    /// actr.log_event(LogEvent { ... }).await?;
237    /// ```
238    ///
239    /// # Performance
240    ///
241    /// - **Latency**: ~10μs (in-process, zero serialization)
242    /// - **No blocking**: Returns immediately after sending
243    /// - **No response**: Caller won't know if message was processed
244    pub async fn tell<R>(&self, message: R) -> ActorResult<()>
245    where
246        R: actr_protocol::RpcRequest + ProstMessage,
247    {
248        // Encode message
249        let payload: Bytes = message.encode_to_vec().into();
250
251        // Create envelope (note: request_id still included for tracing)
252        let envelope = RpcEnvelope {
253            route_key: R::route_key().to_string(),
254            payload: Some(payload),
255            error: None,
256            trace_id: uuid::Uuid::new_v4().to_string(),
257            request_id: uuid::Uuid::new_v4().to_string(),
258            metadata: vec![],
259            timeout_ms: 0, // No timeout for one-way messages
260        };
261
262        // Send message without waiting for response
263        self.shared
264            .inproc_gate
265            .send_message(&self.shared.actor_id, envelope)
266            .await
267    }
268
269    /// Trigger Actor shutdown
270    ///
271    /// This signals the Actor to stop, but does not wait for completion.
272    /// Use `wait_for_shutdown()` to wait for cleanup to finish.
273    pub fn shutdown(&self) {
274        tracing::info!("🛑 Shutdown requested for Actor {:?}", self.shared.actor_id);
275        self.shared.shutdown_token.cancel();
276    }
277
278    /// Wait for Actor to fully shutdown
279    ///
280    /// This waits for the shutdown signal to be triggered.
281    /// All background tasks will be aborted when the last `ActrRef` is dropped.
282    pub async fn wait_for_shutdown(&self) {
283        self.shared.shutdown_token.cancelled().await;
284    }
285
286    /// Check if Actor is shutting down
287    pub fn is_shutting_down(&self) -> bool {
288        self.shared.shutdown_token.is_cancelled()
289    }
290
291    /// Convenience: wait for Ctrl+C and shutdown
292    ///
293    /// This consumes the `ActrRef` and waits for Ctrl+C, then triggers shutdown.
294    ///
295    /// # Example
296    ///
297    /// ```rust,ignore
298    /// let actr = node.start().await?;
299    /// actr.wait_for_ctrl_c_and_shutdown().await?;
300    /// ```
301    pub async fn wait_for_ctrl_c_and_shutdown(self) -> ActorResult<()> {
302        tokio::signal::ctrl_c()
303            .await
304            .map_err(|e| ProtocolError::TransportError(format!("Ctrl+C signal error: {e}")))?;
305
306        tracing::info!("📡 Received Ctrl+C signal");
307        self.shutdown();
308        self.wait_for_shutdown().await;
309
310        Ok(())
311    }
312}
313
314impl Drop for ActrRefShared {
315    fn drop(&mut self) {
316        tracing::info!(
317            "🧹 ActrRefShared dropping - cleaning up Actor {:?}",
318            self.actor_id
319        );
320
321        // Cancel shutdown token
322        self.shutdown_token.cancel();
323
324        // Abort all background tasks
325        for handle in self.task_handles.drain(..) {
326            handle.abort();
327        }
328
329        tracing::debug!(
330            "✅ All background tasks aborted for Actor {:?}",
331            self.actor_id
332        );
333    }
334}