use std::fmt::Debug;
use std::future::Future;
use std::pin::Pin;
use acton_ern::Ern;
use futures::future::join_all;
use tokio::sync::oneshot;
use tracing::trace;
use crate::actor::{AgentConfig, Idle, ManagedAgent};
use crate::common::{ActonApp, AgentBroker, AgentHandle, BrokerRef};
use crate::common::acton_inner::ActonInner;
use crate::traits::Actor;
#[derive(Debug, Clone, Default)]
pub struct AgentRuntime(pub(crate) ActonInner);
impl AgentRuntime {
pub async fn new_agent_with_name<State>(&mut self, name: String) -> ManagedAgent<Idle, State>
where
State: Default + Send + Debug + 'static,
{
let actor_config = AgentConfig::new(
Ern::with_root(name).unwrap(),
None,
Some(self.0.broker.clone()),
).expect("Failed to create actor config");
let runtime = self.clone();
let new_actor = ManagedAgent::new(&Some(runtime), Some(actor_config)).await;
self.0.roots.insert(new_actor.id.clone(), new_actor.handle.clone());
new_actor
}
pub async fn new_agent<State>(&mut self) -> ManagedAgent<Idle, State>
where
State: Default + Send + Debug + 'static,
{
let actor_config = AgentConfig::new(
Ern::with_root("agent").unwrap(),
None,
Some(self.0.broker.clone()),
).expect("Failed to create actor config");
let runtime = self.clone();
let new_actor = ManagedAgent::new(&Some(runtime), Some(actor_config)).await;
self.0.roots.insert(new_actor.id.clone(), new_actor.handle.clone());
new_actor
}
pub fn agent_count(&self) -> usize {
self.0.roots.len()
}
pub async fn create_actor_with_config<State>(
&mut self,
mut config: AgentConfig,
) -> ManagedAgent<Idle, State>
where
State: Default + Send + Debug + 'static,
{
let acton_ready = self.clone();
if config.broker.is_none() {
config.broker = Some(self.0.broker.clone());
}
let new_agent = ManagedAgent::new(&Some(acton_ready), Some(config)).await;
trace!("Created new actor with id {}", new_agent.id);
self.0.roots.insert(new_agent.id.clone(), new_agent.handle.clone());
new_agent
}
pub fn broker(&self) -> BrokerRef {
self.0.broker.clone()
}
pub async fn spawn_agent_with_setup_fn<State>(
&mut self,
mut config: AgentConfig,
setup_fn: impl FnOnce(
ManagedAgent<Idle, State>,
) -> Pin<Box<dyn Future<Output=AgentHandle> + Send + 'static>>,
) -> anyhow::Result<AgentHandle>
where
State: Default + Send + Debug + 'static,
{
let acton_ready = self.clone();
if config.broker.is_none() {
config.broker = Some(self.0.broker.clone());
}
let new_agent = ManagedAgent::new(&Some(acton_ready), Some(config)).await;
let handle = setup_fn(new_agent).await;
self.0.roots.insert(handle.id.clone(), handle.clone());
Ok(handle)
}
pub async fn shutdown_all(&mut self) -> anyhow::Result<()> {
let suspend_futures = self.0.roots.iter().map(|item| {
let root_actor = item.value().clone(); async move {
root_actor.stop().await
}
});
let results: Vec<anyhow::Result<()>> = join_all(suspend_futures).await;
for result in results {
result?;
}
self.0.broker.stop().await?;
Ok(())
}
pub async fn spawn_actor<State>(
&mut self,
setup_fn: impl FnOnce(
ManagedAgent<Idle, State>,
) -> Pin<Box<dyn Future<Output=AgentHandle> + Send + 'static>>,
) -> anyhow::Result<AgentHandle>
where
State: Default + Send + Debug + 'static,
{
let broker = self.broker();
let mut config = AgentConfig::new(Ern::default(), None, Some(broker.clone()))?;
if config.broker.is_none() {
config.broker = Some(self.0.broker.clone());
}
let runtime = self.clone();
let new_agent = ManagedAgent::new(&Some(runtime), Some(config)).await;
let handle = setup_fn(new_agent).await;
self.0.roots.insert(handle.id.clone(), handle.clone());
Ok(handle)
}
}
impl From<ActonApp> for AgentRuntime {
fn from(_acton: ActonApp) -> Self {
let (sender, receiver) = oneshot::channel();
tokio::spawn(async move {
let broker = AgentBroker::initialize().await;
let _ = sender.send(broker);
});
let broker = tokio::task::block_in_place(|| {
tokio::runtime::Handle::current()
.block_on(async { receiver.await.expect("Broker initialization failed") })
});
AgentRuntime(ActonInner { broker: broker.clone(), ..Default::default() })
}
}