mod types;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
pub use types::*;
use async_trait::async_trait;
use crate::graph::builder::NodeContext;
use crate::graph::command::NodeResult;
use crate::graph::compiled::next_seq;
use crate::graph::recursion::ChildRun;
use crate::harness::events::EventSink;
use crate::harness::ids::{GraphId, RunId};
use crate::harness::subagent::SubAgent;
use crate::registry::{CapabilityRegistry, ComponentId};
use crate::{Result, TinyAgentsError};
/// Future produced by a `Handler`: a boxed, pinned, `Send` future that resolves to the
/// node's `Result<NodeResult<U>>`.
type HandlerFuture<U> = Pin<Box<dyn Future<Output = Result<NodeResult<U>>> + Send>>;

/// Type-erased node handler: a `Send + Sync` closure that receives the state by value
/// together with the `NodeContext` and returns a `HandlerFuture`.
type Handler<S, U> = Box<dyn Fn(S, NodeContext) -> HandlerFuture<U> + Send + Sync>;
impl<State, Update> SubAgentNode<State, Update> {
    /// Creates a node description for the agent identified by `agent`.
    ///
    /// `input_mapper` derives the sub-agent input from the state and `output_mapper`
    /// converts the sub-agent output into an update. The node starts with
    /// `SubAgentPolicy::default()` and no explicit event sink.
    pub fn new(
        agent: impl Into<String>,
        input_mapper: InputMapper<State>,
        output_mapper: OutputMapper<Update>,
    ) -> Self {
        let agent = ComponentId::new(agent);
        let policy = SubAgentPolicy::default();
        Self {
            agent,
            input_mapper,
            output_mapper,
            policy,
            events: None,
        }
    }

    /// Convenience constructor that wraps plain closures in `Arc`s and forwards to
    /// [`Self::new`].
    pub fn from_fns<I, O>(agent: impl Into<String>, input: I, output: O) -> Self
    where
        I: Fn(&State) -> SubAgentInput + Send + Sync + 'static,
        O: Fn(SubAgentOutput) -> Update + Send + Sync + 'static,
    {
        let input_mapper: InputMapper<State> = Arc::new(input);
        let output_mapper: OutputMapper<Update> = Arc::new(output);
        Self::new(agent, input_mapper, output_mapper)
    }

    /// Replaces the node's policy and returns the node for further chaining.
    pub fn with_policy(mut self, policy: SubAgentPolicy) -> Self {
        self.policy = policy;
        self
    }

    /// Sets the event sink used for the node's runs and returns the node for further
    /// chaining.
    pub fn with_events(mut self, events: EventSink) -> Self {
        self.events = Some(events);
        self
    }
}
pub fn subagent_node<State, Update, RState>(
node: SubAgentNode<State, Update>,
registry: Arc<CapabilityRegistry<RState>>,
) -> Handler<State, Update>
where
State: Clone + Send + Sync + 'static,
Update: Send + 'static,
RState: Send + Sync + 'static,
{
let node = Arc::new(node);
Box::new(move |state: State, ctx: NodeContext| {
let node = node.clone();
let registry = registry.clone();
Box::pin(async move {
let agent = registry.agent(node.agent.as_str()).ok_or_else(|| {
TinyAgentsError::Capability(format!(
"sub-agent `{}` is not a registered agent",
node.agent.as_str()
))
})?;
let input = (node.input_mapper)(&state);
let events = node.events.clone().unwrap_or_default();
let output = run_with_policy(&agent, input, events, &node.policy).await?;
record_child_run(&ctx, agent.name(), &output);
let update = (node.output_mapper)(output);
Ok(NodeResult::Update(update))
})
})
}
async fn run_with_policy(
agent: &Arc<dyn HarnessAgent>,
input: SubAgentInput,
events: EventSink,
policy: &SubAgentPolicy,
) -> Result<SubAgentOutput> {
let mut attempt = 0;
loop {
let fut = agent.run(input.clone(), events.clone());
let result = match policy.timeout {
Some(timeout) => match tokio::time::timeout(timeout, fut).await {
Ok(result) => result,
Err(_) => Err(TinyAgentsError::Timeout(format!(
"sub-agent `{}` timed out after {timeout:?}",
agent.name()
))),
},
None => fut.await,
};
match result {
Ok(output) => return policy.budget.check(&output, agent.name()).map(|()| output),
Err(err) => {
if policy.retry.should_retry(attempt) && crate::harness::retry::is_retryable(&err) {
attempt += 1;
let backoff = policy.retry.backoff_for_attempt(attempt);
if backoff > Duration::ZERO {
tokio::time::sleep(backoff).await;
}
continue;
}
return Err(err);
}
}
}
}
/// Reports a finished sub-agent run to the context's child-run sink, if one is attached.
///
/// The recorded entry carries the current node id, a graph id of the form `agent:<name>`,
/// a run id of the form `subagent-<seq>` built from `next_seq()`, the root run id (falling
/// back to the context's own run id when no root is set) and the usage taken from `output`.
fn record_child_run(ctx: &NodeContext, agent: &str, output: &SubAgentOutput) {
    if let Some(sink) = ctx.child_runs.as_ref() {
        let root_run_id = match &ctx.root_run_id {
            Some(root) => root.clone(),
            None => ctx.run_id.clone(),
        };
        let node = ctx.node_id.clone();
        let graph_id = GraphId::new(format!("agent:{agent}"));
        let run_id = RunId::new(format!("subagent-{}", next_seq()));
        sink.record(ChildRun {
            node,
            graph_id,
            run_id,
            root_run_id,
            usage: output.usage,
        });
    }
}
/// Adapter that wraps a shared [`SubAgent`] so it can be driven through the `HarnessAgent`
/// trait.
pub struct HarnessSubAgent<S: Send + Sync = (), C: Send + Sync = ()> {
    /// The wrapped sub-agent that performs the actual invocation.
    inner: Arc<SubAgent<S, C>>,
    /// Depth value forwarded to the sub-agent on every invocation.
    parent_depth: usize,
    /// State value lent to the sub-agent on every invocation.
    state: S,
}
impl<S, C> HarnessSubAgent<S, C>
where
    S: Send + Sync + Default,
    C: Send + Sync + Default,
{
    /// Wraps `inner` with a parent depth of zero and a default-constructed state.
    pub fn new(inner: Arc<SubAgent<S, C>>) -> Self {
        let state = S::default();
        Self {
            inner,
            parent_depth: 0,
            state,
        }
    }

    /// Overrides the parent depth forwarded to the wrapped sub-agent.
    pub fn with_parent_depth(self, parent_depth: usize) -> Self {
        Self {
            parent_depth,
            ..self
        }
    }

    /// Erases the concrete type, yielding a shared `dyn HarnessAgent` handle.
    pub fn into_dyn(self) -> Arc<dyn HarnessAgent>
    where
        S: 'static,
        C: 'static,
    {
        Arc::new(self)
    }
}
#[async_trait]
impl<S, C> HarnessAgent for HarnessSubAgent<S, C>
where
    S: Send + Sync + Default + 'static,
    C: Send + Sync + Default + 'static,
{
    /// Returns the name reported by the wrapped sub-agent.
    fn name(&self) -> &str {
        self.inner.name()
    }

    /// Invokes the wrapped sub-agent with the stored state, a default-constructed `C`, the
    /// configured parent depth and the prompt taken from `input`, then repackages the
    /// finished run as a `SubAgentOutput`.
    ///
    /// A run without text yields the default text value. Errors from the invocation are
    /// propagated unchanged.
    async fn run(&self, input: SubAgentInput, events: EventSink) -> Result<SubAgentOutput> {
        let finished = self
            .inner
            .invoke_with_events(
                &self.state,
                C::default(),
                self.parent_depth,
                input.prompt,
                &events,
            )
            .await?;

        let text = finished.text().unwrap_or_default();
        let structured = finished.structured.clone();
        Ok(SubAgentOutput {
            text,
            structured,
            usage: finished.usage,
            model_calls: finished.model_calls,
            tool_calls: finished.tool_calls,
        })
    }
}
#[cfg(test)]
mod test;