use std::fmt::Debug;
use std::future::Future;
use std::pin::Pin;
use acton_ern::Ern;
use futures::future::join_all;
use tokio::sync::oneshot;
use tracing::{trace, error};
use crate::actor::{AgentConfig, Idle, ManagedAgent};
use crate::common::{ActonApp, AgentBroker, AgentHandle, BrokerRef};
use crate::common::acton_inner::ActonInner;
use crate::traits::AgentHandleInterface;
/// Handle to an initialized agent system.
///
/// This is a newtype around [`ActonInner`]. The methods in this file use two of
/// the wrapped value's fields: `broker` (the system broker handle handed to new
/// agents) and `roots` (the registry of top-level agent handles, keyed by agent id).
///
/// Cloning clones the wrapped `ActonInner`.
// NOTE(review): whether clones share the `roots` registry or get an independent
// copy depends on `ActonInner`'s `Clone` impl, which is not visible here — confirm.
#[derive(Debug, Clone, Default)]
pub struct AgentRuntime(pub(crate) ActonInner);
impl AgentRuntime {
/// Creates a new top-level agent in the [`Idle`] state whose root `Ern` is built
/// from `name`, and registers its handle in this runtime's root registry.
///
/// The agent's configuration has no parent and uses this runtime's broker.
///
/// # Panics
///
/// Panics if `Ern::with_root(name)` or `AgentConfig::new` returns an error.
pub async fn new_agent_with_name<State>(&mut self, name: String) -> ManagedAgent<Idle, State>
where
    State: Default + Send + Debug + 'static,
{
    // Build the configuration step by step: identity first, then the broker.
    let root_ern = Ern::with_root(name).expect("Failed to create root Ern for new agent");
    let system_broker = Some(self.0.broker.clone());
    let config =
        AgentConfig::new(root_ern, None, system_broker).expect("Failed to create actor config");

    // The agent receives its own clone of the runtime.
    let agent = ManagedAgent::new(&Some(self.clone()), Some(config)).await;

    trace!("Registering new top-level agent: {}", agent.id());
    let (key, handle) = (agent.id.clone(), agent.handle.clone());
    self.0.roots.insert(key, handle);

    agent
}
/// Creates a new top-level agent in the [`Idle`] state using the default name
/// `"agent"`.
///
/// This delegates to `new_agent_with_name`, so the same registration and panic
/// behavior applies.
pub async fn new_agent<State>(&mut self) -> ManagedAgent<Idle, State>
where
    State: Default + Send + Debug + 'static,
{
    let default_name = String::from("agent");
    self.new_agent_with_name(default_name).await
}
#[inline]
pub fn agent_count(&self) -> usize {
self.0.roots.len()
}
/// Creates a new top-level agent in the [`Idle`] state from a caller-supplied
/// configuration and registers its handle in the root registry.
///
/// If `config` carries no broker, this runtime's broker is filled in; a broker
/// already present in `config` is left untouched.
pub async fn new_agent_with_config<State>(
    &mut self,
    mut config: AgentConfig,
) -> ManagedAgent<Idle, State>
where
    State: Default + Send + Debug + 'static,
{
    let runtime_snapshot = self.clone();

    // Keep an explicitly configured broker; otherwise fall back to the system broker.
    config.broker = config
        .broker
        .take()
        .or_else(|| Some(self.0.broker.clone()));

    let agent = ManagedAgent::new(&Some(runtime_snapshot), Some(config)).await;
    trace!("Created new agent builder with config, id: {}", agent.id());

    self.0.roots.insert(agent.id.clone(), agent.handle.clone());
    agent
}
/// Returns a clone of this runtime's broker handle.
#[inline]
pub fn broker(&self) -> BrokerRef {
    Clone::clone(&self.0.broker)
}
/// Builds an [`Idle`] agent from `config`, passes it to `setup_fn`, and registers
/// the handle that `setup_fn` produces in the root registry.
///
/// If `config` carries no broker, this runtime's broker is filled in first.
/// `setup_fn` takes ownership of the idle agent and returns a boxed future that
/// resolves to the agent's [`AgentHandle`].
///
/// # Errors
///
/// The body has no failing path of its own; as written it always returns `Ok`
/// with the handle yielded by `setup_fn`.
pub async fn spawn_agent_with_setup_fn<State>(
    &mut self,
    mut config: AgentConfig,
    setup_fn: impl FnOnce(
        ManagedAgent<Idle, State>,
    ) -> Pin<Box<dyn Future<Output = AgentHandle> + Send + 'static>>,
) -> anyhow::Result<AgentHandle>
where
    State: Default + Send + Debug + 'static,
{
    let runtime_snapshot = self.clone();

    // Keep an explicitly configured broker; otherwise fall back to the system broker.
    config.broker = config
        .broker
        .take()
        .or_else(|| Some(self.0.broker.clone()));

    let idle_agent = ManagedAgent::new(&Some(runtime_snapshot), Some(config)).await;
    // Capture the id now: the agent itself is moved into `setup_fn` below.
    let agent_id = idle_agent.id().clone();

    trace!("Running setup function for agent: {}", agent_id);
    let started = setup_fn(idle_agent).await;

    trace!("Agent {} setup complete, registering handle.", agent_id);
    self.0.roots.insert(started.id.clone(), started.clone());

    Ok(started)
}
pub async fn shutdown_all(&mut self) -> anyhow::Result<()> {
trace!("Initiating shutdown of all top-level agents...");
let stop_futures = self.0.roots.iter().map(|item| {
let root_handle = item.value().clone();
async move {
trace!("Sending stop signal to root agent: {}", root_handle.id());
root_handle.stop().await }
});
let results: Vec<anyhow::Result<()>> = join_all(stop_futures).await;
trace!("All root agent stop futures completed.");
for result in results {
if let Err(e) = result {
error!("Error stopping agent during shutdown: {:?}", e);
}
}
trace!("Stopping the system broker...");
self.0.broker.stop().await?; trace!("System shutdown complete.");
Ok(())
}
/// Spawns a top-level agent using a default configuration: `Ern::default()` as
/// its identity, no parent, and this runtime's broker.
///
/// The rest of the work is delegated to `spawn_agent_with_setup_fn`.
///
/// # Errors
///
/// Returns an error if `AgentConfig::new` fails, or whatever error
/// `spawn_agent_with_setup_fn` reports.
pub async fn spawn_agent<State>(
    &mut self,
    setup_fn: impl FnOnce(
        ManagedAgent<Idle, State>,
    ) -> Pin<Box<dyn Future<Output = AgentHandle> + Send + 'static>>,
) -> anyhow::Result<AgentHandle>
where
    State: Default + Send + Debug + 'static,
{
    let default_ern = Ern::default();
    let system_broker = Some(self.broker());
    let config = AgentConfig::new(default_ern, None, system_broker)?;

    self.spawn_agent_with_setup_fn(config, setup_fn).await
}
}
/// Builds an [`AgentRuntime`] from an [`ActonApp`] by initializing the system
/// broker and blocking the current thread until it is ready.
///
/// The `ActonApp` value itself is not read. Broker initialization runs on a
/// spawned Tokio task and its handle is delivered over a oneshot channel.
///
/// # Panics
///
/// Panics with "Broker initialization failed" if the initialization task ends
/// without sending a broker. Because this uses `tokio::spawn`,
/// `tokio::runtime::Handle::current` and `tokio::task::block_in_place`, it must
/// be called from within a Tokio runtime; per Tokio's documentation,
/// `block_in_place` also panics on a `current_thread` runtime.
impl From<ActonApp> for AgentRuntime {
    fn from(_acton: ActonApp) -> Self {
        trace!("Starting Acton system initialization (From<ActonApp>)");
        let (broker_tx, broker_rx) = oneshot::channel();

        // Initialize the broker on its own task and hand the result back over the channel.
        tokio::spawn(async move {
            trace!("Broker initialization task started.");
            let initialized = AgentBroker::initialize().await;
            trace!("Broker initialization task finished, sending handle.");
            // A send error only means the receiver is gone; nothing is left to notify.
            let _ = broker_tx.send(initialized);
        });

        trace!("Blocking current thread to wait for broker initialization...");
        let wait_for_broker = async { broker_rx.await.expect("Broker initialization failed") };
        // `block_in_place` lets this synchronous `from` wait on the async result.
        let broker = tokio::task::block_in_place(|| {
            let runtime_handle = tokio::runtime::Handle::current();
            runtime_handle.block_on(wait_for_broker)
        });

        trace!("Broker handle received, constructing AgentRuntime.");
        let inner = ActonInner {
            broker,
            ..Default::default()
        };
        AgentRuntime(inner)
    }
}