use crate::context::{BootstrapContextBuilder, RuntimeContext};
use crate::lifecycle::CredentialState;
use actr_framework::{Context as _, Dest};
use actr_protocol::{ActorResult, ActrError, ActrId, ActrType, RpcRequest};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
/// State shared by every clone of an [`ActrRef`].
///
/// Held behind an `Arc` by `ActrRef`; its `Drop` impl cancels
/// `shutdown_token` and aborts the tasks tracked in `task_handles`.
pub(crate) struct ActrRefShared {
/// Identity of the actor this handle refers to (returned by `ActrRef::actor_id`
/// and included in shutdown/drop log lines).
pub(crate) actor_id: ActrId,
/// Builder used by `ActrRef::app_context` to produce a `RuntimeContext`.
pub(crate) bootstrap_ctx_builder: BootstrapContextBuilder,
/// Source of the credential that `ActrRef::app_context` reads on every call.
pub(crate) credential_state: CredentialState,
/// Cancelled by `ActrRef::shutdown` and on drop; awaited by
/// `ActrRef::wait_for_shutdown`.
pub(crate) shutdown_token: CancellationToken,
/// Join handles of background tasks; drained and awaited by
/// `ActrRef::wait_for_shutdown`, aborted on drop.
pub(crate) task_handles: Mutex<Vec<JoinHandle<()>>>,
}
/// Cheaply cloneable handle to a running actor.
///
/// Cloning only bumps the reference count of the inner `Arc`, so every clone
/// refers to the same [`ActrRefShared`] state (same shutdown token, same task
/// handles). The derived `Clone` is equivalent to cloning the `Arc` by hand.
#[derive(Clone)]
pub struct ActrRef {
    /// Reference-counted state shared between all clones of this handle.
    pub(crate) shared: Arc<ActrRefShared>,
}
impl ActrRef {
pub fn actor_id(&self) -> &ActrId {
&self.shared.actor_id
}
pub async fn call<R: RpcRequest>(&self, request: R) -> ActorResult<R::Response> {
self.app_context().await.call(&Dest::Local, request).await
}
pub async fn call_remote<R: RpcRequest>(
&self,
target: ActrId,
request: R,
) -> ActorResult<R::Response> {
self.app_context()
.await
.call(&Dest::Actor(target), request)
.await
}
pub async fn discover_route_candidates(
&self,
target_type: &ActrType,
count: usize,
) -> ActorResult<Vec<ActrId>> {
let ctx = self.app_context().await;
let mut results = Vec::with_capacity(count);
for _ in 0..count {
match ctx.discover_route_candidate(target_type).await {
Ok(id) => {
if !results.contains(&id) {
results.push(id);
}
}
Err(e) => {
if results.is_empty() {
return Err(e);
}
break;
}
}
}
Ok(results)
}
pub async fn app_context(&self) -> RuntimeContext {
let credential = self.shared.credential_state.credential().await;
self.shared
.bootstrap_ctx_builder
.build_bootstrap(&self.shared.actor_id, &credential)
}
pub fn shutdown(&self) {
tracing::info!(
"🛑 Shutdown requested for Actor {}",
actr_protocol::ActrId::to_string_repr(&self.shared.actor_id)
);
self.shared.shutdown_token.cancel();
}
pub async fn wait_for_shutdown(&self) {
self.shared.shutdown_token.cancelled().await;
let mut guard = self.shared.task_handles.lock().await;
let handles = std::mem::take(&mut *guard);
drop(guard);
tracing::debug!("Waiting for tasks to complete: {:?}", handles.len());
for handle in handles {
let sleep = tokio::time::sleep(Duration::from_secs(5));
tokio::pin!(handle);
tokio::pin!(sleep);
tokio::select! {
res = &mut handle => {
match res {
Ok(_) => {
tracing::debug!("Task completed");
}
Err(e) => {
tracing::error!("Task failed: {:?}", e);
}
}
}
_ = sleep => {
tracing::warn!("Task timed out after 5s, aborting");
handle.abort();
}
}
}
}
pub fn is_shutting_down(&self) -> bool {
self.shared.shutdown_token.is_cancelled()
}
pub async fn wait_for_ctrl_c_and_shutdown(self) -> ActorResult<()> {
#[cfg(unix)]
{
use tokio::signal::unix::{SignalKind, signal};
let mut sigint = signal(SignalKind::interrupt()).map_err(|e| {
ActrError::Unavailable(format!("Signal handler error (SIGINT): {e}"))
})?;
let mut sigterm = signal(SignalKind::terminate()).map_err(|e| {
ActrError::Unavailable(format!("Signal handler error (SIGTERM): {e}"))
})?;
tokio::select! {
_ = sigint.recv() => tracing::info!("📡 Received SIGINT (Ctrl+C) signal"),
_ = sigterm.recv() => tracing::info!("📡 Received SIGTERM signal"),
}
}
#[cfg(not(unix))]
{
tokio::signal::ctrl_c()
.await
.map_err(|e| ActrError::Unavailable(format!("Ctrl+C signal error: {e}")))?;
tracing::info!("📡 Received Ctrl+C signal");
}
self.shutdown();
self.wait_for_shutdown().await;
Ok(())
}
}
impl Drop for ActrRefShared {
    /// Cancels the shutdown token and aborts every tracked background task
    /// when the last handle to the shared state goes away.
    fn drop(&mut self) {
        tracing::info!(
            "🧹 ActrRefShared dropping - cleaning up Actor {}",
            actr_protocol::ActrId::to_string_repr(&self.actor_id)
        );
        self.shutdown_token.cancel();

        // `&mut self` proves nothing else can hold the lock, so the handles
        // can be reached without locking. The previous `try_lock` had a
        // failure branch, after which success was still logged.
        for handle in self.task_handles.get_mut().drain(..) {
            handle.abort();
        }

        tracing::debug!(
            "✅ All background tasks aborted for Actor {}",
            actr_protocol::ActrId::to_string_repr(&self.actor_id)
        );
    }
}