use std::sync::Arc;
use adk_core::{AdkError, Agent, EventStream, Result};
use futures::StreamExt;
use tokio::sync::{Notify, RwLock};
use tokio::task::JoinHandle;
use super::event_source::EventSource;
/// Shared, thread-safe callback used to react to a trigger event.
///
/// The callback receives the trigger event together with a handle to the
/// agent and returns a boxed, `Send` future. That future resolves to either
/// an [`EventStream`] of the events produced by the invocation or an error.
pub type TriggerHandler = Arc<
    dyn Fn(
            super::event_source::TriggerEvent,
            Arc<dyn Agent>,
        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<EventStream>> + Send>>
        + Send
        + Sync,
>;
/// Lifecycle state of an ambient agent.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum AmbientAgentStatus {
    /// Trigger events are being processed as they arrive.
    Running,
    /// Processing is suspended until the agent is resumed or stopped.
    Paused,
    /// No trigger events are being processed.
    Stopped,
}
/// An agent that is driven by events from an [`EventSource`] instead of
/// direct invocation.
///
/// Event processing happens on a spawned Tokio task; the struct keeps the
/// handle to that task plus the state it shares with it.
pub struct AmbientAgent {
    /// Agent handed to the trigger handler for every event.
    agent: Arc<dyn Agent>,
    /// Source that is subscribed to when the agent is started.
    source: Arc<dyn EventSource>,
    /// Optional callback run for each trigger event; when absent, events are
    /// only logged.
    trigger_handler: Option<TriggerHandler>,
    /// Lifecycle state, shared with the background task.
    status: Arc<RwLock<AmbientAgentStatus>>,
    /// Wakes the background task when it is parked in the paused state.
    resume_notify: Arc<Notify>,
    /// Handle of the background task, if one has been spawned.
    handle: Option<JoinHandle<()>>,
}
impl AmbientAgent {
pub fn new(agent: Arc<dyn Agent>, source: Arc<dyn EventSource>) -> Self {
Self {
agent,
source,
trigger_handler: None,
status: Arc::new(RwLock::new(AmbientAgentStatus::Stopped)),
resume_notify: Arc::new(Notify::new()),
handle: None,
}
}
pub fn with_trigger_handler(mut self, handler: TriggerHandler) -> Self {
self.trigger_handler = Some(handler);
self
}
pub async fn start(&mut self) -> Result<()> {
let current = *self.status.read().await;
if current != AmbientAgentStatus::Stopped {
return Err(AdkError::agent("agent already running"));
}
let stream = self.source.subscribe().await?;
let status = Arc::clone(&self.status);
let resume_notify = Arc::clone(&self.resume_notify);
let agent = Arc::clone(&self.agent);
let trigger_handler = self.trigger_handler.clone();
*self.status.write().await = AmbientAgentStatus::Running;
let handle = tokio::spawn(async move {
let mut stream = stream;
while let Some(event) = stream.next().await {
loop {
let current_status = *status.read().await;
match current_status {
AmbientAgentStatus::Running => break,
AmbientAgentStatus::Paused => {
resume_notify.notified().await;
}
AmbientAgentStatus::Stopped => return,
}
}
tracing::info!(
agent = agent.name(),
source = %event.source,
"ambient agent triggered"
);
tracing::debug!(payload = %event.payload, "trigger event payload");
if let Some(ref handler) = trigger_handler {
match handler(event, agent.clone()).await {
Ok(mut event_stream) => {
while let Some(result) = event_stream.next().await {
match result {
Ok(ev) => {
tracing::debug!(
author = %ev.author,
"ambient agent produced event"
);
}
Err(e) => {
tracing::warn!(
error = %e,
"ambient agent invocation error"
);
break;
}
}
}
}
Err(e) => {
tracing::warn!(
error = %e,
"ambient agent trigger handler failed"
);
}
}
}
}
});
self.handle = Some(handle);
Ok(())
}
pub async fn stop(&mut self) -> Result<()> {
let current = *self.status.read().await;
if current == AmbientAgentStatus::Stopped {
return Err(AdkError::agent("agent already stopped"));
}
*self.status.write().await = AmbientAgentStatus::Stopped;
self.resume_notify.notify_one();
if let Some(handle) = self.handle.take() {
handle.abort();
}
Ok(())
}
pub async fn pause(&mut self) -> Result<()> {
let current = *self.status.read().await;
if current != AmbientAgentStatus::Running {
return Err(AdkError::agent("can only pause a running agent"));
}
*self.status.write().await = AmbientAgentStatus::Paused;
Ok(())
}
pub async fn resume(&mut self) -> Result<()> {
let current = *self.status.read().await;
if current != AmbientAgentStatus::Paused {
return Err(AdkError::agent("can only resume a paused agent"));
}
*self.status.write().await = AmbientAgentStatus::Running;
self.resume_notify.notify_one();
Ok(())
}
pub async fn status(&self) -> AmbientAgentStatus {
*self.status.read().await
}
}
impl Drop for AmbientAgent {
    /// Aborts the background task, if one was spawned, so it does not
    /// outlive the agent.
    fn drop(&mut self) {
        let Some(task) = self.handle.take() else {
            return;
        };
        task.abort();
    }
}
impl std::fmt::Debug for AmbientAgent {
    /// Formats the agent as a struct showing only the agent name and the
    /// event source name.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("AmbientAgent");
        repr.field("agent", &self.agent.name());
        repr.field("source", &self.source.name());
        repr.finish()
    }
}