actr_runtime/actr_ref.rs
1//! ActrRef - Lightweight reference to a running Actor
2//!
3//! # Design Philosophy
4//!
5//! `ActrRef` is the primary handle for interacting with a running Actor.
6//! It provides:
7//!
8//! - **RPC calls**: Call Actor methods (Shell → Workload)
9//! - **Event subscription**: Subscribe to Actor events (Workload → Shell)
10//! - **Lifecycle control**: Shutdown and wait for completion
11//!
12//! # Key Characteristics
13//!
14//! - **Cloneable**: Can be shared across tasks
15//! - **Lightweight**: Contains only an `Arc` to shared state
16//! - **Auto-cleanup**: Last `ActrRef` drop triggers resource cleanup
17//! - **Code-gen friendly**: RPC methods will be generated and bound to this type
18//!
19//! # Usage
20//!
21//! ```rust,ignore
22//! let actr = node.start().await?;
23//!
24//! // Clone and use in different tasks
25//! let actr1 = actr.clone();
26//! tokio::spawn(async move {
27//! actr1.call(SomeRequest { ... }).await?;
28//! });
29//!
30//! // Subscribe to events
31//! let mut events = actr.events();
32//! while let Some(event) = events.next().await {
33//! println!("Event: {:?}", event);
34//! }
35//!
36//! // Shutdown
37//! actr.shutdown();
38//! actr.wait_for_shutdown().await;
39//! ```
40
41use crate::lifecycle::ActrNode;
42use crate::outbound::InprocOutGate;
43use actr_framework::{Bytes, Workload};
44use actr_protocol::prost::Message as ProstMessage;
45use actr_protocol::{ActorResult, ActrError, ActrId, PayloadType, ProtocolError, RpcEnvelope};
46use std::sync::Arc;
47use std::time::Duration;
48use tokio::sync::Mutex;
49use tokio::task::JoinHandle;
50use tokio_util::sync::CancellationToken;
51
52/// ActrRef - Lightweight reference to a running Actor
53///
54/// This is the primary handle returned by `ActrNode::start()`.
55///
56/// # Code Generation Pattern
57///
58/// `actr-cli` code generator will generate type-safe RPC methods for `ActrRef`.
59///
60/// ## Proto Definition
61///
62/// ```protobuf
63/// service EchoService {
64/// rpc Echo(EchoRequest) returns (EchoResponse);
65/// rpc Ping(PingRequest) returns (PingResponse);
66/// }
67/// ```
68///
69/// ## Generated Code (in `generated/echo_service_actr_ref.rs`)
70///
71/// ```rust,ignore
72/// use actr_runtime::ActrRef;
73/// use super::echo_service_actor::{EchoServiceWorkload, EchoServiceHandler};
74/// use super::echo::{EchoRequest, EchoResponse, PingRequest, PingResponse};
75///
76/// impl<T: EchoServiceHandler> ActrRef<EchoServiceWorkload<T>> {
77/// /// Call Echo RPC method
78/// pub async fn echo(&self, request: EchoRequest) -> ActorResult<EchoResponse> {
79/// self.call(request).await
80/// }
81///
82/// /// Call Ping RPC method
83/// pub async fn ping(&self, request: PingRequest) -> ActorResult<PingResponse> {
84/// self.call(request).await
85/// }
86/// }
87/// ```
88///
89/// ## Usage in Shell
90///
91/// ```rust,ignore
92/// use generated::echo_service_actr_ref::*; // Import ActrRef extensions
93///
94/// let actr = node.start().await?;
95///
96/// // Type-safe RPC calls (generated methods)
97/// let response = actr.echo(EchoRequest {
98/// message: "Hello".to_string(),
99/// }).await?;
100///
101/// // Or use generic call() method
102/// let response: EchoResponse = actr.call(EchoRequest { ... }).await?;
103/// ```
104///
105/// # Design Rationale
106///
107/// **Why bind RPC methods to ActrRef?**
108///
109/// 1. **Type Safety**: Compiler checks request/response types
110/// 2. **Auto-completion**: IDE shows available RPC methods
111/// 3. **No target needed**: ActrRef already knows its target Actor
112/// 4. **Symmetric to Context**: Similar to Context extension pattern
113///
114/// **Comparison with Context pattern:**
115///
116/// | Aspect | Context (in Workload) | ActrRef (in Shell) |
117/// |--------|----------------------|-------------------|
118/// | Caller | Workload | Shell |
119/// | Target | Any Actor (needs `target` param) | This Workload (fixed) |
120/// | Method | `ctx.call(target, req)` | `actr.echo(req)` |
121/// | Generation | Extension trait | Concrete impl |
pub struct ActrRef<W: Workload> {
    /// State shared by every clone of this handle: the actor id, the inproc gate used for
    /// Shell → Workload traffic, the shutdown token and the background task handles.
    pub(crate) shared: Arc<ActrRefShared>,
    /// The node this handle was created for; `discover_route_candidates` is forwarded to it.
    pub(crate) node: Arc<ActrNode<W>>,
}
126
127impl<W: Workload> Clone for ActrRef<W> {
128 fn clone(&self) -> Self {
129 Self {
130 shared: Arc::clone(&self.shared),
131 node: Arc::clone(&self.node),
132 }
133 }
134}
135
/// Shared state between all ActrRef clones
///
/// This is an internal implementation detail. Every `ActrRef` clone holds an `Arc` to one
/// instance of this struct; when the last such `Arc` is released (normally when the last
/// `ActrRef` is dropped), this struct's `Drop` impl cancels the shutdown token and aborts
/// the background tasks.
pub(crate) struct ActrRefShared {
    /// Actor ID, returned by `ActrRef::actor_id` and passed to the inproc gate as the
    /// target of every Shell → Workload send.
    pub(crate) actor_id: ActrId,

    /// Inproc gate for Shell → Workload RPC (used by `call`, `call_raw`, `tell` and
    /// `tell_raw`).
    pub(crate) inproc_gate: Arc<InprocOutGate>,

    /// Shutdown signal: cancelled by `ActrRef::shutdown` and by `Drop`, awaited by
    /// `ActrRef::wait_for_shutdown`.
    pub(crate) shutdown_token: CancellationToken,

    /// Background task handles (receive loops, WebRTC coordinator, etc.)
    ///
    /// `ActrRef::wait_for_shutdown` takes the handles out of this list to join them;
    /// whatever is still stored here when the struct is dropped gets aborted.
    pub(crate) task_handles: Mutex<Vec<JoinHandle<()>>>,
}
153
154impl<W: Workload> ActrRef<W> {
155 /// Create new ActrRef from shared state
156 ///
157 /// This is an internal API used by `ActrNode::start()`.
158 pub(crate) fn new(shared: Arc<ActrRefShared>, node: Arc<ActrNode<W>>) -> Self {
159 Self { shared, node }
160 }
161
162 /// Get Actor ID
163 pub fn actor_id(&self) -> &ActrId {
164 &self.shared.actor_id
165 }
166
167 /// Discover remote actors of the specified type via signaling server.
168 #[cfg_attr(feature = "opentelemetry", tracing::instrument(skip_all))]
169 pub async fn discover_route_candidates(
170 &self,
171 target_type: &actr_protocol::ActrType,
172 candidate_count: u32,
173 ) -> ActorResult<Vec<ActrId>> {
174 self.node
175 .discover_route_candidates(target_type, candidate_count)
176 .await
177 }
178
179 /// Call Actor method (Shell → Workload RPC)
180 ///
181 /// This is a generic method used by code-generated RPC methods.
182 /// Most users should use the generated methods instead.
183 ///
184 /// # Example
185 ///
186 /// ```rust,ignore
187 /// // Generic call
188 /// let response: EchoResponse = actr.call(EchoRequest {
189 /// message: "Hello".to_string(),
190 /// }).await?;
191 ///
192 /// // Generated method (preferred)
193 /// let response = actr.echo(EchoRequest {
194 /// message: "Hello".to_string(),
195 /// }).await?;
196 /// ```
197 #[cfg_attr(
198 feature = "opentelemetry",
199 tracing::instrument(skip_all, name = "ActrRef.call")
200 )]
201 pub async fn call<R>(&self, request: R) -> ActorResult<R::Response>
202 where
203 R: actr_protocol::RpcRequest,
204 {
205 // Encode request
206 let payload: Bytes = request.encode_to_vec().into();
207
208 // Create envelope
209 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
210 let mut envelope = RpcEnvelope {
211 route_key: R::route_key().to_string(),
212 payload: Some(payload),
213 error: None,
214 traceparent: None,
215 tracestate: None,
216 request_id: uuid::Uuid::new_v4().to_string(),
217 metadata: vec![],
218 timeout_ms: 30000,
219 };
220 // Inject tracing context
221 #[cfg(feature = "opentelemetry")]
222 {
223 use crate::wire::webrtc::trace::inject_span_context_to_rpc;
224 inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
225 }
226
227 // Send request and wait for response (target is our actor_id for logging)
228 let response_bytes = self
229 .shared
230 .inproc_gate
231 .send_request(&self.shared.actor_id, envelope)
232 .await?;
233
234 // Decode response
235 R::Response::decode(&*response_bytes).map_err(|e| {
236 ProtocolError::Actr(ActrError::DecodeFailure {
237 message: format!("Failed to decode response: {e}"),
238 })
239 })
240 }
241
242 /// Call Actor method using route_key and request bytes (for language bindings)
243 ///
244 /// This is a non-generic version of `call()` that accepts route_key and raw bytes,
245 /// making it suitable for language bindings (e.g., Python) that don't have access
246 /// to Rust's generic `RpcRequest` trait.
247 ///
248 /// # Parameters
249 /// - `route_key`: Route key string (e.g., "package.Service.Method")
250 /// - `request_bytes`: Request protobuf bytes
251 /// - `timeout_ms`: Timeout in milliseconds
252 /// - `payload_type`: Payload transmission type
253 ///
254 /// # Returns
255 /// Response protobuf bytes
256 pub async fn call_raw(
257 &self,
258 route_key: String,
259 request_bytes: Bytes,
260 timeout_ms: i64,
261 payload_type: PayloadType,
262 ) -> ActorResult<Bytes> {
263 // Create envelope
264 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
265 let mut envelope = RpcEnvelope {
266 route_key,
267 payload: Some(request_bytes),
268 error: None,
269 traceparent: None,
270 tracestate: None,
271 request_id: uuid::Uuid::new_v4().to_string(),
272 metadata: vec![],
273 timeout_ms,
274 };
275 // Inject tracing context
276 #[cfg(feature = "opentelemetry")]
277 {
278 use crate::wire::webrtc::trace::inject_span_context_to_rpc;
279 inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
280 }
281
282 // Send request and wait for response
283 self.shared
284 .inproc_gate
285 .send_request_with_type(&self.shared.actor_id, payload_type, None, envelope)
286 .await
287 }
288
289 /// Send one-way message using route_key and message bytes (for language bindings)
290 ///
291 /// This is a non-generic version of `tell()` that accepts route_key and raw bytes,
292 /// making it suitable for language bindings (e.g., Python) that don't have access
293 /// to Rust's generic `RpcRequest` trait.
294 ///
295 /// # Parameters
296 /// - `route_key`: Route key string (e.g., "package.Service.Method")
297 /// - `message_bytes`: Message protobuf bytes
298 /// - `payload_type`: Payload transmission type
299 ///
300 /// # Returns
301 /// Unit (fire-and-forget, no response)
302 pub async fn tell_raw(
303 &self,
304 route_key: String,
305 message_bytes: Bytes,
306 payload_type: PayloadType,
307 ) -> ActorResult<()> {
308 // Create envelope
309 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
310 let mut envelope = RpcEnvelope {
311 route_key,
312 payload: Some(message_bytes),
313 error: None,
314 traceparent: None,
315 tracestate: None,
316 request_id: uuid::Uuid::new_v4().to_string(),
317 metadata: vec![],
318 timeout_ms: 0, // No timeout for one-way messages
319 };
320 // Inject tracing context
321 #[cfg(feature = "opentelemetry")]
322 {
323 use crate::wire::webrtc::trace::inject_span_context_to_rpc;
324 inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
325 }
326
327 // Send message without waiting for response
328 self.shared
329 .inproc_gate
330 .send_message_with_type(&self.shared.actor_id, payload_type, None, envelope)
331 .await
332 }
333
334 /// Send one-way message to Actor (Shell → Workload, fire-and-forget)
335 ///
336 /// Unlike `call()`, this method does not wait for a response.
337 /// Use this for notifications or commands that don't need acknowledgment.
338 ///
339 /// # Example
340 ///
341 /// ```rust,ignore
342 /// // Send notification without waiting for response
343 /// actr.tell(LogEvent {
344 /// level: "INFO".to_string(),
345 /// message: "User logged in".to_string(),
346 /// }).await?;
347 ///
348 /// // Generated method (if codegen supports tell)
349 /// actr.log_event(LogEvent { ... }).await?;
350 /// ```
351 ///
352 /// # Performance
353 ///
354 /// - **Latency**: ~10μs (in-process, zero serialization)
355 /// - **No blocking**: Returns immediately after sending
356 /// - **No response**: Caller won't know if message was processed
357 #[cfg_attr(feature = "opentelemetry", tracing::instrument(skip_all))]
358 pub async fn tell<R>(&self, message: R) -> ActorResult<()>
359 where
360 R: actr_protocol::RpcRequest + ProstMessage,
361 {
362 // Encode message
363 let payload: Bytes = message.encode_to_vec().into();
364
365 // Create envelope (note: request_id still included for tracing)
366 #[cfg_attr(not(feature = "opentelemetry"), allow(unused_mut))]
367 let mut envelope = RpcEnvelope {
368 route_key: R::route_key().to_string(),
369 payload: Some(payload),
370 error: None,
371 traceparent: None,
372 tracestate: None,
373 request_id: uuid::Uuid::new_v4().to_string(),
374 metadata: vec![],
375 timeout_ms: 0, // No timeout for one-way messages
376 };
377 // Inject tracing context
378 #[cfg(feature = "opentelemetry")]
379 {
380 use crate::wire::webrtc::trace::inject_span_context_to_rpc;
381 inject_span_context_to_rpc(&tracing::Span::current(), &mut envelope);
382 }
383
384 // Send message without waiting for response
385 self.shared
386 .inproc_gate
387 .send_message(&self.shared.actor_id, envelope)
388 .await
389 }
390
391 /// Trigger Actor shutdown
392 ///
393 /// This signals the Actor to stop, but does not wait for completion.
394 /// Use `wait_for_shutdown()` to wait for cleanup to finish.
395 pub fn shutdown(&self) {
396 tracing::info!("🛑 Shutdown requested for Actor {:?}", self.shared.actor_id);
397 self.shared.shutdown_token.cancel();
398 }
399
400 /// Wait for Actor to fully shutdown
401 ///
402 /// This waits for the shutdown signal to be triggered.
403 /// All background tasks will be aborted when the last `ActrRef` is dropped.
404 pub async fn wait_for_shutdown(&self) {
405 self.shared.shutdown_token.cancelled().await;
406 // Take ownership of the current handles so we can await them as Futures.
407 let mut guard = self.shared.task_handles.lock().await;
408 let handles = std::mem::take(&mut *guard);
409 drop(guard);
410 tracing::debug!("Waiting for tasks to complete: {:?}", handles.len());
411 // All tasks have been asked to shut down; wait for them with a timeout,
412 // and abort any that don't finish in time to avoid leaking background work.
413 for handle in handles {
414 let sleep = tokio::time::sleep(Duration::from_secs(5));
415 tokio::pin!(handle);
416 tokio::pin!(sleep);
417
418 tokio::select! {
419 res = &mut handle => {
420 match res {
421 Ok(_) => {
422 tracing::debug!("Task completed");
423 }
424 Err(e) => {
425 tracing::error!("Task failed: {:?}", e);
426 }
427 }
428 }
429 _ = sleep => {
430 tracing::warn!("Task timed out after 5s, aborting");
431 handle.abort();
432 }
433 }
434 }
435 }
436
437 /// Check if Actor is shutting down
438 pub fn is_shutting_down(&self) -> bool {
439 self.shared.shutdown_token.is_cancelled()
440 }
441
442 ///
443 /// This consumes the `ActrRef` and waits for signal (Ctrl+C / SIGTERM) , then triggers shutdown.
444 ///
445 /// # Example
446 ///
447 /// ```rust,ignore
448 /// let actr = node.start().await?;
449 /// actr.wait_for_ctrl_c_and_shutdown().await?;
450 /// ```
451 pub async fn wait_for_ctrl_c_and_shutdown(self) -> ActorResult<()> {
452 #[cfg(unix)]
453 {
454 use tokio::signal::unix::{SignalKind, signal};
455
456 let mut sigint = signal(SignalKind::interrupt()).map_err(|e| {
457 ProtocolError::TransportError(format!("Signal handler error (SIGINT): {e}"))
458 })?;
459 let mut sigterm = signal(SignalKind::terminate()).map_err(|e| {
460 ProtocolError::TransportError(format!("Signal handler error (SIGTERM): {e}"))
461 })?;
462
463 tokio::select! {
464 _ = sigint.recv() => {
465 tracing::info!("📡 Received SIGINT (Ctrl+C) signal");
466 }
467 _ = sigterm.recv() => {
468 tracing::info!("📡 Received SIGTERM signal");
469 }
470 }
471 }
472
473 #[cfg(not(unix))]
474 {
475 tokio::signal::ctrl_c()
476 .await
477 .map_err(|e| ProtocolError::TransportError(format!("Ctrl+C signal error: {e}")))?;
478
479 tracing::info!("📡 Received Ctrl+C signal");
480 }
481
482 self.shutdown();
483 self.wait_for_shutdown().await;
484
485 Ok(())
486 }
487}
488
489impl Drop for ActrRefShared {
490 fn drop(&mut self) {
491 tracing::info!(
492 "🧹 ActrRefShared dropping - cleaning up Actor {:?}",
493 self.actor_id
494 );
495
496 // Cancel shutdown token
497 self.shutdown_token.cancel();
498
499 // Abort all background tasks (best-effort)
500 if let Ok(mut handles) = self.task_handles.try_lock() {
501 for handle in handles.drain(..) {
502 handle.abort();
503 }
504 } else {
505 tracing::warn!(
506 "⚠️ Failed to lock task_handles mutex during Drop; some tasks may still be running"
507 );
508 }
509
510 tracing::debug!(
511 "✅ All background tasks aborted for Actor {:?}",
512 self.actor_id
513 );
514 }
515}