Skip to main content

actr_hyper/
actr_ref.rs

1//! ActrRef - Lightweight reference to a running Actor
2//!
3//! # Key Characteristics
4//!
5//! - **Cloneable**: Can be shared across tasks
6//! - **Lightweight**: Contains only an `Arc` to shared state
7//! - **Auto-cleanup**: Last `ActrRef` drop triggers resource cleanup
8//!
9//! # Usage
10//!
11//! ```rust,ignore
12//! let actr = Node::from_config_file("actr.toml")
13//!     .await?
14//!     .attach(&package)
15//!     .await?
16//!     .register(&ais_endpoint)
17//!     .await?
18//!     .start()
19//!     .await?;
20//!
21//! println!("actor id = {:?}", actr.actor_id());
22//!
23//! // Wait for process signals and then perform a graceful shutdown.
24//! actr.wait_for_ctrl_c_and_shutdown().await?;
25//! ```
26//!
27//! The typestate chain is `Node<Init> → Node<Attached> → Node<Registered>
28//! → ActrRef`. `Node::from_hyper` is the escape hatch when you need to own
29//! `HyperConfig` construction yourself.
30
31use crate::context::{BootstrapContextBuilder, RuntimeContext};
32use crate::lifecycle::CredentialState;
33use actr_framework::{Context as _, Dest};
34use actr_protocol::{ActorResult, ActrError, ActrId, ActrType, RpcRequest};
35use std::sync::Arc;
36use std::time::Duration;
37use tokio::sync::Mutex;
38use tokio::task::JoinHandle;
39use tokio_util::sync::CancellationToken;
40
41/// Shared state between all `ActrRef` clones
42///
43/// This is an internal implementation detail. When the last `ActrRef` is dropped,
44/// this struct's `Drop` impl will trigger shutdown and cleanup all resources.
45pub(crate) struct ActrRefShared {
46    /// Actor ID
47    pub(crate) actor_id: ActrId,
48    /// Builder used to materialize application-side runtime contexts on demand.
49    pub(crate) bootstrap_ctx_builder: BootstrapContextBuilder,
50    /// Current credential state for building application-side contexts.
51    pub(crate) credential_state: CredentialState,
52    /// Shutdown signal
53    pub(crate) shutdown_token: CancellationToken,
54    /// Background task handles (receive loops, WebRTC coordinator, etc.)
55    pub(crate) task_handles: Mutex<Vec<JoinHandle<()>>>,
56}
57
58/// ActrRef - Lightweight reference to a running Actor
59///
60/// This is the primary handle returned by `ActrNode::start()`.
61pub struct ActrRef {
62    pub(crate) shared: Arc<ActrRefShared>,
63}
64
65impl Clone for ActrRef {
66    fn clone(&self) -> Self {
67        Self {
68            shared: Arc::clone(&self.shared),
69        }
70    }
71}
72
73impl ActrRef {
74    /// Get Actor ID
75    pub fn actor_id(&self) -> &ActrId {
76        &self.shared.actor_id
77    }
78
79    /// Call the local workload with a typed RPC request.
80    ///
81    /// Convenience wrapper around `app_context().call(&Dest::Local, request)`.
82    /// Use this from app-side code to invoke the local guest workload.
83    pub async fn call<R: RpcRequest>(&self, request: R) -> ActorResult<R::Response> {
84        self.app_context().await.call(&Dest::Local, request).await
85    }
86
87    /// Call a remote actor directly with a typed RPC request.
88    ///
89    /// Convenience wrapper around `app_context().call(&Dest::Actor(target), request)`.
90    /// Use this when the client has no local guest workload and calls the remote actor directly.
91    pub async fn call_remote<R: RpcRequest>(
92        &self,
93        target: ActrId,
94        request: R,
95    ) -> ActorResult<R::Response> {
96        self.app_context()
97            .await
98            .call(&Dest::Actor(target), request)
99            .await
100    }
101
102    /// Discover route candidates for the given actor type.
103    ///
104    /// Returns up to `count` actor IDs registered under `target_type`.
105    /// Convenience wrapper for app-side discovery without holding a `RuntimeContext`.
106    ///
107    /// Note: The signaling protocol currently returns one candidate per request.
108    /// This method will make up to `count` requests to collect multiple unique candidates.
109    pub async fn discover_route_candidates(
110        &self,
111        target_type: &ActrType,
112        count: usize,
113    ) -> ActorResult<Vec<ActrId>> {
114        let ctx = self.app_context().await;
115        let mut results = Vec::with_capacity(count);
116
117        for _ in 0..count {
118            match ctx.discover_route_candidate(target_type).await {
119                Ok(id) => {
120                    if !results.contains(&id) {
121                        results.push(id);
122                    }
123                }
124                Err(e) => {
125                    // Return partial results if we have any, otherwise propagate error
126                    if results.is_empty() {
127                        return Err(e);
128                    }
129                    break;
130                }
131            }
132        }
133        Ok(results)
134    }
135
136    /// Create an application-side runtime context bound to this running actor.
137    pub async fn app_context(&self) -> RuntimeContext {
138        let credential = self.shared.credential_state.credential().await;
139        self.shared
140            .bootstrap_ctx_builder
141            .build_bootstrap(&self.shared.actor_id, &credential)
142    }
143
144    /// Trigger Actor shutdown
145    ///
146    /// This signals the Actor to stop, but does not wait for completion.
147    /// Use `wait_for_shutdown()` to wait for cleanup to finish.
148    pub fn shutdown(&self) {
149        tracing::info!(
150            "๐Ÿ›‘ Shutdown requested for Actor {}",
151            actr_protocol::ActrId::to_string_repr(&self.shared.actor_id)
152        );
153        self.shared.shutdown_token.cancel();
154    }
155
156    /// Wait for Actor to fully shutdown
157    ///
158    /// This waits for the shutdown signal to be triggered.
159    /// All background tasks will be aborted when the last `ActrRef` is dropped.
160    pub async fn wait_for_shutdown(&self) {
161        self.shared.shutdown_token.cancelled().await;
162        // Take ownership of the current handles so we can await them as Futures.
163        let mut guard = self.shared.task_handles.lock().await;
164        let handles = std::mem::take(&mut *guard);
165        drop(guard);
166        tracing::debug!("Waiting for tasks to complete: {:?}", handles.len());
167
168        // All tasks have been asked to shut down; wait for them with a timeout,
169        // and abort any that don't finish in time to avoid leaking background work.
170        for handle in handles {
171            let sleep = tokio::time::sleep(Duration::from_secs(5));
172            tokio::pin!(handle);
173            tokio::pin!(sleep);
174
175            tokio::select! {
176                res = &mut handle => {
177                    match res {
178                        Ok(_) => {
179                            tracing::debug!("Task completed");
180                        }
181                        Err(e) => {
182                            tracing::error!("Task failed: {:?}", e);
183                        }
184                    }
185                }
186                _ = sleep => {
187                    tracing::warn!("Task timed out after 5s, aborting");
188                    handle.abort();
189                }
190            }
191        }
192    }
193
194    /// Check if Actor is shutting down
195    pub fn is_shutting_down(&self) -> bool {
196        self.shared.shutdown_token.is_cancelled()
197    }
198
199    /// This consumes the `ActrRef` and waits for signal (Ctrl+C / SIGTERM),
200    /// then triggers shutdown.
201    ///
202    /// # Example
203    ///
204    /// ```rust,ignore
205    /// let actr = node.start().await?;
206    /// actr.wait_for_ctrl_c_and_shutdown().await?;
207    /// ```
208    pub async fn wait_for_ctrl_c_and_shutdown(self) -> ActorResult<()> {
209        #[cfg(unix)]
210        {
211            use tokio::signal::unix::{SignalKind, signal};
212
213            let mut sigint = signal(SignalKind::interrupt()).map_err(|e| {
214                ActrError::Unavailable(format!("Signal handler error (SIGINT): {e}"))
215            })?;
216            let mut sigterm = signal(SignalKind::terminate()).map_err(|e| {
217                ActrError::Unavailable(format!("Signal handler error (SIGTERM): {e}"))
218            })?;
219
220            tokio::select! {
221                _ = sigint.recv() => tracing::info!("๐Ÿ“ก Received SIGINT (Ctrl+C) signal"),
222                _ = sigterm.recv() => tracing::info!("๐Ÿ“ก Received SIGTERM signal"),
223            }
224        }
225
226        #[cfg(not(unix))]
227        {
228            tokio::signal::ctrl_c()
229                .await
230                .map_err(|e| ActrError::Unavailable(format!("Ctrl+C signal error: {e}")))?;
231            tracing::info!("๐Ÿ“ก Received Ctrl+C signal");
232        }
233
234        self.shutdown();
235        self.wait_for_shutdown().await;
236        Ok(())
237    }
238}
239
240impl Drop for ActrRefShared {
241    fn drop(&mut self) {
242        tracing::info!(
243            "๐Ÿงน ActrRefShared dropping - cleaning up Actor {}",
244            actr_protocol::ActrId::to_string_repr(&self.actor_id)
245        );
246
247        // Cancel shutdown token
248        self.shutdown_token.cancel();
249        // Abort all background tasks (best-effort)
250        if let Ok(mut handles) = self.task_handles.try_lock() {
251            for handle in handles.drain(..) {
252                handle.abort();
253            }
254        } else {
255            tracing::warn!(
256                "โš ๏ธ Failed to lock task_handles mutex during Drop; some tasks may still be running"
257            );
258        }
259
260        tracing::debug!(
261            "โœ… All background tasks aborted for Actor {}",
262            actr_protocol::ActrId::to_string_repr(&self.actor_id)
263        );
264    }
265}