use std::fmt::Debug;
use std::future::Future;
use std::pin::Pin;
use acton_ern::Ern;
use futures::future::join_all;
use tokio::sync::oneshot;
use tracing::{error, trace};
use crate::actor::{AgentConfig, Idle, ManagedAgent};
use crate::common::acton_inner::ActonInner;
use crate::common::{ActonApp, AgentBroker, AgentHandle, BrokerRef};
use crate::traits::AgentHandleInterface;
#[derive(Debug, Clone, Default)]
pub struct AgentRuntime(pub(crate) ActonInner);
impl AgentRuntime {
pub async fn new_agent_with_name<State>(&mut self, name: String) -> ManagedAgent<Idle, State>
where
State: Default + Send + Debug + 'static,
{
let actor_config = AgentConfig::new(
Ern::with_root(name).expect("Failed to create root Ern for new agent"), None, Some(self.0.broker.clone()), )
.expect("Failed to create actor config");
let runtime = self.clone();
let new_actor = ManagedAgent::new(&Some(runtime), Some(actor_config)).await;
trace!("Registering new top-level agent: {}", new_actor.id());
self.0
.roots
.insert(new_actor.id.clone(), new_actor.handle.clone());
new_actor
}
pub async fn new_agent<State>(&mut self) -> ManagedAgent<Idle, State>
where
State: Default + Send + Debug + 'static,
{
self.new_agent_with_name("agent".to_string()).await }
#[inline]
pub fn agent_count(&self) -> usize {
self.0.roots.len()
}
pub async fn new_agent_with_config<State>(
&mut self,
mut config: AgentConfig,
) -> ManagedAgent<Idle, State>
where
State: Default + Send + Debug + 'static,
{
let acton_ready = self.clone();
if config.broker.is_none() {
config.broker = Some(self.0.broker.clone());
}
let new_agent = ManagedAgent::new(&Some(acton_ready), Some(config)).await;
trace!(
"Created new agent builder with config, id: {}",
new_agent.id()
);
self.0
.roots
.insert(new_agent.id.clone(), new_agent.handle.clone());
new_agent
}
#[inline]
pub fn broker(&self) -> BrokerRef {
self.0.broker.clone()
}
pub async fn spawn_agent_with_setup_fn<State>(
&mut self,
mut config: AgentConfig,
setup_fn: impl FnOnce(
ManagedAgent<Idle, State>,
) -> Pin<Box<dyn Future<Output = AgentHandle> + Send + 'static>>,
) -> anyhow::Result<AgentHandle>
where
State: Default + Send + Debug + 'static,
{
let acton_ready = self.clone();
if config.broker.is_none() {
config.broker = Some(self.0.broker.clone());
}
let new_agent = ManagedAgent::new(&Some(acton_ready), Some(config)).await;
let agent_id = new_agent.id().clone(); trace!("Running setup function for agent: {}", agent_id);
let handle = setup_fn(new_agent).await; trace!("Agent {} setup complete, registering handle.", agent_id);
self.0.roots.insert(handle.id.clone(), handle.clone()); Ok(handle)
}
pub async fn shutdown_all(&mut self) -> anyhow::Result<()> {
use std::env;
use std::time::Duration;
use tokio::time::timeout as tokio_timeout;
trace!("Sending Terminate signal to all root agents.");
let stop_futures: Vec<_> = self
.0
.roots
.iter()
.map(|item| {
let handle = item.value().clone();
async move {
if let Err(e) = handle.stop().await {
error!("Error stopping agent {}: {:?}", handle.id(), e);
}
}
})
.collect();
let timeout_ms: u64 = env::var("ACTON_SYSTEM_SHUTDOWN_TIMEOUT_MS")
.ok()
.and_then(|val| val.parse().ok())
.unwrap_or(30_000);
trace!("Waiting for all agents to finish gracefully...");
if tokio_timeout(Duration::from_millis(timeout_ms), join_all(stop_futures))
.await
.is_err()
{
error!("System-wide shutdown timeout expired after {} ms. Forcefully cancelling remaining tasks.", timeout_ms);
self.0.cancellation_token.cancel(); } else {
trace!("All agents completed gracefully.");
}
trace!("Stopping the system broker...");
match tokio_timeout(Duration::from_millis(timeout_ms), self.0.broker.stop()).await {
Ok(res) => res?,
Err(_) => {
error!(
"Timeout waiting for broker to shut down after {} ms",
timeout_ms
);
return Err(anyhow::anyhow!(
"Timeout while waiting for system broker to shut down after {} ms",
timeout_ms
));
}
}
trace!("System shutdown complete.");
Ok(())
}
pub async fn spawn_agent<State>(
&mut self,
setup_fn: impl FnOnce(
ManagedAgent<Idle, State>,
) -> Pin<Box<dyn Future<Output = AgentHandle> + Send + 'static>>,
) -> anyhow::Result<AgentHandle>
where
State: Default + Send + Debug + 'static,
{
let config = AgentConfig::new(Ern::default(), None, Some(self.broker()))?;
self.spawn_agent_with_setup_fn(config, setup_fn).await
}
}
impl From<ActonApp> for AgentRuntime {
fn from(_acton: ActonApp) -> Self {
trace!("Starting Acton system initialization (From<ActonApp>)");
let (sender, receiver) = oneshot::channel();
let mut runtime = AgentRuntime(ActonInner::default());
let runtime_clone = runtime.clone();
assert!(
!runtime_clone.0.cancellation_token.is_cancelled(),
"ActonInner cancellation_token must be present and active before Broker initialization"
);
tokio::spawn(async move {
trace!("Broker initialization task started.");
let broker = AgentBroker::initialize(runtime_clone).await;
trace!("Broker initialization task finished, sending handle.");
let _ = sender.send(broker); });
trace!("Blocking current thread to wait for broker initialization...");
let broker = tokio::task::block_in_place(|| {
tokio::runtime::Handle::current()
.block_on(async { receiver.await.expect("Broker initialization failed") })
});
trace!("Broker handle received, constructing AgentRuntime.");
runtime.0.broker = broker;
runtime
}
}