actr_hyper/actr_ref.rs
1//! ActrRef - Lightweight reference to a running Actor
2//!
3//! # Key Characteristics
4//!
5//! - **Cloneable**: Can be shared across tasks
6//! - **Lightweight**: Contains only an `Arc` to shared state
7//! - **Auto-cleanup**: Last `ActrRef` drop triggers resource cleanup
8//!
9//! # Usage
10//!
11//! ```rust,ignore
12//! let actr = Node::from_config_file("actr.toml")
13//! .await?
14//! .attach(&package)
15//! .await?
16//! .register(&ais_endpoint)
17//! .await?
18//! .start()
19//! .await?;
20//!
21//! println!("actor id = {:?}", actr.actor_id());
22//!
23//! // Wait for process signals and then perform a graceful shutdown.
24//! actr.wait_for_ctrl_c_and_shutdown().await?;
25//! ```
26//!
//! The typestate chain is `Node<Init> → Node<Attached> → Node<Registered>
//! → ActrRef`. `Node::from_hyper` is the escape hatch when you need to own
//! `HyperConfig` construction yourself.
30
31use crate::context::{BootstrapContextBuilder, RuntimeContext};
32use crate::lifecycle::CredentialState;
33use actr_framework::{Context as _, Dest};
34use actr_protocol::{ActorResult, ActrError, ActrId, ActrType, RpcRequest};
35use std::sync::Arc;
36use std::time::Duration;
37use tokio::sync::Mutex;
38use tokio::task::JoinHandle;
39use tokio_util::sync::CancellationToken;
40
41/// Shared state between all `ActrRef` clones
42///
43/// This is an internal implementation detail. When the last `ActrRef` is dropped,
44/// this struct's `Drop` impl will trigger shutdown and cleanup all resources.
45pub(crate) struct ActrRefShared {
46 /// Actor ID
47 pub(crate) actor_id: ActrId,
48 /// Builder used to materialize application-side runtime contexts on demand.
49 pub(crate) bootstrap_ctx_builder: BootstrapContextBuilder,
50 /// Current credential state for building application-side contexts.
51 pub(crate) credential_state: CredentialState,
52 /// Shutdown signal
53 pub(crate) shutdown_token: CancellationToken,
54 /// Background task handles (receive loops, WebRTC coordinator, etc.)
55 pub(crate) task_handles: Mutex<Vec<JoinHandle<()>>>,
56}
57
58/// ActrRef - Lightweight reference to a running Actor
59///
60/// This is the primary handle returned by `ActrNode::start()`.
61pub struct ActrRef {
62 pub(crate) shared: Arc<ActrRefShared>,
63}
64
65impl Clone for ActrRef {
66 fn clone(&self) -> Self {
67 Self {
68 shared: Arc::clone(&self.shared),
69 }
70 }
71}
72
73impl ActrRef {
74 /// Get Actor ID
75 pub fn actor_id(&self) -> &ActrId {
76 &self.shared.actor_id
77 }
78
79 /// Call the local workload with a typed RPC request.
80 ///
81 /// Convenience wrapper around `app_context().call(&Dest::Local, request)`.
82 /// Use this from app-side code to invoke the local guest workload.
83 pub async fn call<R: RpcRequest>(&self, request: R) -> ActorResult<R::Response> {
84 self.app_context().await.call(&Dest::Local, request).await
85 }
86
87 /// Call a remote actor directly with a typed RPC request.
88 ///
89 /// Convenience wrapper around `app_context().call(&Dest::Actor(target), request)`.
90 /// Use this when the client has no local guest workload and calls the remote actor directly.
91 pub async fn call_remote<R: RpcRequest>(
92 &self,
93 target: ActrId,
94 request: R,
95 ) -> ActorResult<R::Response> {
96 self.app_context()
97 .await
98 .call(&Dest::Actor(target), request)
99 .await
100 }
101
102 /// Discover route candidates for the given actor type.
103 ///
104 /// Returns up to `count` actor IDs registered under `target_type`.
105 /// Convenience wrapper for app-side discovery without holding a `RuntimeContext`.
106 ///
107 /// Note: The signaling protocol currently returns one candidate per request.
108 /// This method will make up to `count` requests to collect multiple unique candidates.
109 pub async fn discover_route_candidates(
110 &self,
111 target_type: &ActrType,
112 count: usize,
113 ) -> ActorResult<Vec<ActrId>> {
114 let ctx = self.app_context().await;
115 let mut results = Vec::with_capacity(count);
116
117 for _ in 0..count {
118 match ctx.discover_route_candidate(target_type).await {
119 Ok(id) => {
120 if !results.contains(&id) {
121 results.push(id);
122 }
123 }
124 Err(e) => {
125 // Return partial results if we have any, otherwise propagate error
126 if results.is_empty() {
127 return Err(e);
128 }
129 break;
130 }
131 }
132 }
133 Ok(results)
134 }
135
136 /// Create an application-side runtime context bound to this running actor.
137 pub async fn app_context(&self) -> RuntimeContext {
138 let credential = self.shared.credential_state.credential().await;
139 self.shared
140 .bootstrap_ctx_builder
141 .build_bootstrap(&self.shared.actor_id, &credential)
142 }
143
144 /// Trigger Actor shutdown
145 ///
146 /// This signals the Actor to stop, but does not wait for completion.
147 /// Use `wait_for_shutdown()` to wait for cleanup to finish.
148 pub fn shutdown(&self) {
149 tracing::info!(
150 "๐ Shutdown requested for Actor {}",
151 actr_protocol::ActrId::to_string_repr(&self.shared.actor_id)
152 );
153 self.shared.shutdown_token.cancel();
154 }
155
156 /// Wait for Actor to fully shutdown
157 ///
158 /// This waits for the shutdown signal to be triggered.
159 /// All background tasks will be aborted when the last `ActrRef` is dropped.
160 pub async fn wait_for_shutdown(&self) {
161 self.shared.shutdown_token.cancelled().await;
162 // Take ownership of the current handles so we can await them as Futures.
163 let mut guard = self.shared.task_handles.lock().await;
164 let handles = std::mem::take(&mut *guard);
165 drop(guard);
166 tracing::debug!("Waiting for tasks to complete: {:?}", handles.len());
167
168 // All tasks have been asked to shut down; wait for them with a timeout,
169 // and abort any that don't finish in time to avoid leaking background work.
170 for handle in handles {
171 let sleep = tokio::time::sleep(Duration::from_secs(5));
172 tokio::pin!(handle);
173 tokio::pin!(sleep);
174
175 tokio::select! {
176 res = &mut handle => {
177 match res {
178 Ok(_) => {
179 tracing::debug!("Task completed");
180 }
181 Err(e) => {
182 tracing::error!("Task failed: {:?}", e);
183 }
184 }
185 }
186 _ = sleep => {
187 tracing::warn!("Task timed out after 5s, aborting");
188 handle.abort();
189 }
190 }
191 }
192 }
193
194 /// Check if Actor is shutting down
195 pub fn is_shutting_down(&self) -> bool {
196 self.shared.shutdown_token.is_cancelled()
197 }
198
199 /// This consumes the `ActrRef` and waits for signal (Ctrl+C / SIGTERM),
200 /// then triggers shutdown.
201 ///
202 /// # Example
203 ///
204 /// ```rust,ignore
205 /// let actr = node.start().await?;
206 /// actr.wait_for_ctrl_c_and_shutdown().await?;
207 /// ```
208 pub async fn wait_for_ctrl_c_and_shutdown(self) -> ActorResult<()> {
209 #[cfg(unix)]
210 {
211 use tokio::signal::unix::{SignalKind, signal};
212
213 let mut sigint = signal(SignalKind::interrupt()).map_err(|e| {
214 ActrError::Unavailable(format!("Signal handler error (SIGINT): {e}"))
215 })?;
216 let mut sigterm = signal(SignalKind::terminate()).map_err(|e| {
217 ActrError::Unavailable(format!("Signal handler error (SIGTERM): {e}"))
218 })?;
219
220 tokio::select! {
221 _ = sigint.recv() => tracing::info!("๐ก Received SIGINT (Ctrl+C) signal"),
222 _ = sigterm.recv() => tracing::info!("๐ก Received SIGTERM signal"),
223 }
224 }
225
226 #[cfg(not(unix))]
227 {
228 tokio::signal::ctrl_c()
229 .await
230 .map_err(|e| ActrError::Unavailable(format!("Ctrl+C signal error: {e}")))?;
231 tracing::info!("๐ก Received Ctrl+C signal");
232 }
233
234 self.shutdown();
235 self.wait_for_shutdown().await;
236 Ok(())
237 }
238}
239
240impl Drop for ActrRefShared {
241 fn drop(&mut self) {
242 tracing::info!(
243 "๐งน ActrRefShared dropping - cleaning up Actor {}",
244 actr_protocol::ActrId::to_string_repr(&self.actor_id)
245 );
246
247 // Cancel shutdown token
248 self.shutdown_token.cancel();
249 // Abort all background tasks (best-effort)
250 if let Ok(mut handles) = self.task_handles.try_lock() {
251 for handle in handles.drain(..) {
252 handle.abort();
253 }
254 } else {
255 tracing::warn!(
256 "โ ๏ธ Failed to lock task_handles mutex during Drop; some tasks may still be running"
257 );
258 }
259
260 tracing::debug!(
261 "โ
All background tasks aborted for Actor {}",
262 actr_protocol::ActrId::to_string_repr(&self.actor_id)
263 );
264 }
265}