mod approval_layer;
mod approver;
mod event;
mod mode;
mod observer;
mod result;
mod sink;
mod tool_event_layer;
mod tool_hook_layer;
use std::sync::Arc;
use async_trait::async_trait;
use entelix_core::context::ExecutionContext;
use entelix_core::error::{Error, Result};
use entelix_runnable::Runnable;
use entelix_runnable::stream::BoxStream;
use tracing::Instrument;
pub use self::approval_layer::{
ApprovalLayer, ApprovalService, EffectGate, ToolApprovalEventSink, ToolApprovalEventSinkHandle,
};
pub use self::approver::{
AlwaysApprove, ApprovalDecision, ApprovalRequest, Approver, ChannelApprover,
ChannelApproverConfig, PendingApproval,
};
pub use self::event::AgentEvent;
pub use self::mode::ExecutionMode;
pub use self::observer::{AgentObserver, DynObserver};
pub use self::result::AgentRunResult;
pub use self::sink::{
AgentEventSink, BroadcastSink, CaptureSink, ChannelSink, DroppingSink, FailOpenSink,
FanOutSink, StateErasureSink,
};
pub use self::tool_event_layer::{ToolEventLayer, ToolEventService};
pub use self::tool_hook_layer::{
ToolHook, ToolHookDecision, ToolHookLayer, ToolHookRegistry, ToolHookRequest, ToolHookService,
};
/// A named wrapper that runs a `Runnable<S, S>` and reports each run's
/// lifecycle (`Started`, then `Complete` or `Failed`) to an event sink.
///
/// Instances are assembled through [`AgentBuilder`]; see [`Agent::builder`].
pub struct Agent<S>
where
S: Clone + Send + Sync + 'static,
{
/// Agent name; emitted in `AgentEvent::Started` and on the run span.
name: String,
/// The wrapped computation, invoked once per run.
runnable: Arc<dyn Runnable<S, S>>,
/// Destination for the lifecycle events this agent emits.
sink: Arc<dyn AgentEventSink<S>>,
/// Observers called before the run, after a successful run, and on
/// failures other than `Error::Interrupted`.
observers: Vec<DynObserver<S>>,
/// Execution mode chosen at build time; exposed via `execution_mode()`.
execution_mode: ExecutionMode,
/// Optional approver; the builder rejects a mode that requires approval
/// when this is `None`.
approver: Option<Arc<dyn Approver>>,
}
impl<S> Agent<S>
where
S: Clone + Send + Sync + 'static,
{
#[must_use]
pub fn builder() -> AgentBuilder<S> {
AgentBuilder::default()
}
/// Returns the agent's name as configured at build time.
#[must_use]
pub fn name(&self) -> &str {
    self.name.as_str()
}
/// Borrows the wrapped runnable that this agent drives on every run.
#[must_use]
pub fn inner(&self) -> &Arc<dyn Runnable<S, S>> {
&self.runnable
}
/// Runs the agent once and returns the final state together with run metadata.
///
/// A fresh run id is minted for the call (the caller's run id, if any, becomes
/// the parent run id), and a handle to this agent's sink is attached to the
/// context as a `ToolApprovalEventSinkHandle` extension before the run starts.
///
/// # Errors
///
/// Propagates any error produced by the run itself or by the event sink when
/// sending `Started` / `Complete`.
pub async fn execute(&self, input: S, ctx: &ExecutionContext) -> Result<AgentRunResult<S>> {
    let (run_id, run_ctx) = Self::scoped_run_context(ctx);
    let parent_run_id = run_ctx.parent_run_id().map(str::to_owned);

    // Expose the agent's sink to downstream layers through the context.
    let sink_handle = ToolApprovalEventSinkHandle::for_agent_sink(Arc::clone(&self.sink));
    let ctx_with_sink = run_ctx.clone().add_extension(sink_handle);

    self.execute_inner(input, run_id, parent_run_id, &ctx_with_sink)
        .await
}
/// Runs the agent once with per-call overrides.
///
/// The overrides are attached to a clone of `ctx` as an extension, and the
/// call is then forwarded to [`Agent::execute`].
///
/// # Errors
///
/// Same as [`Agent::execute`].
pub async fn execute_with(
    &self,
    input: S,
    overrides: entelix_core::RunOverrides,
    ctx: &ExecutionContext,
) -> Result<AgentRunResult<S>> {
    let ctx_with_overrides = ctx.clone().add_extension(overrides);
    self.execute(input, &ctx_with_overrides).await
}
async fn execute_inner(
&self,
input: S,
run_id: String,
parent_run_id: Option<String>,
ctx: &ExecutionContext,
) -> Result<AgentRunResult<S>> {
self.sink
.send(AgentEvent::Started {
run_id: run_id.clone(),
tenant_id: ctx.tenant_id().clone(),
parent_run_id: parent_run_id.clone(),
agent: self.name.clone(),
})
.await?;
let outcome = self
.run_inner(input, run_id.clone(), ctx)
.instrument(self.run_span(&run_id, ctx))
.await;
match outcome {
Ok(result) => {
self.sink
.send(AgentEvent::Complete {
run_id: result.run_id.clone(),
tenant_id: ctx.tenant_id().clone(),
state: result.state.clone(),
usage: result.usage,
})
.await?;
Ok(result)
}
Err(err) => {
if let entelix_core::Error::UsageLimitExceeded(breach) = &err
&& let Some(handle) = ctx.audit_sink()
{
handle.as_sink().record_usage_limit_exceeded(breach);
}
let envelope = err.envelope();
let _ = self
.sink
.send(AgentEvent::Failed {
run_id,
tenant_id: ctx.tenant_id().clone(),
error: err.to_string(),
envelope,
})
.await;
Err(err)
}
}
}
/// Creates the `entelix.agent.run` info span (target `gen_ai`) for one run.
///
/// The agent name, run id, tenant id and thread id are filled in here. The
/// usage fields are declared empty so they can be recorded later; `run_inner`
/// records them on the current span when the context carries a run budget.
fn run_span(&self, run_id: &str, ctx: &ExecutionContext) -> tracing::Span {
tracing::info_span!(
target: "gen_ai",
"entelix.agent.run",
gen_ai.agent.name = %self.name,
entelix.run_id = %run_id,
entelix.tenant_id = %ctx.tenant_id(),
entelix.thread_id = ctx.thread_id(),
// Placeholders: a field must be declared up front to be recordable later.
gen_ai.usage.input_tokens = tracing::field::Empty,
gen_ai.usage.output_tokens = tracing::field::Empty,
gen_ai.usage.total_tokens = tracing::field::Empty,
entelix.agent.usage.cost = tracing::field::Empty,
entelix.usage.requests = tracing::field::Empty,
entelix.usage.tool_calls = tracing::field::Empty,
)
}
async fn run_inner(
&self,
input: S,
run_id: String,
ctx: &ExecutionContext,
) -> Result<AgentRunResult<S>> {
for observer in &self.observers {
observer.pre_turn(&input, ctx).await?;
}
let state = match self.runnable.invoke(input, ctx).await {
Ok(state) => state,
Err(err) => {
if !matches!(err, Error::Interrupted { .. }) {
for observer in &self.observers {
if let Err(observer_err) = observer.on_error(&err, ctx).await {
tracing::warn!(
observer = %observer.name(),
source = %observer_err,
"AgentObserver::on_error returned an error; dropping"
);
}
}
}
return Err(err);
}
};
let usage = ctx.run_budget().map(|budget| budget.snapshot());
if let Some(snapshot) = usage {
let span = tracing::Span::current();
span.record("gen_ai.usage.input_tokens", snapshot.input_tokens);
span.record("gen_ai.usage.output_tokens", snapshot.output_tokens);
span.record("gen_ai.usage.total_tokens", snapshot.total_tokens());
span.record(
"entelix.agent.usage.cost",
tracing::field::display(&snapshot.cost_usd),
);
span.record("entelix.usage.requests", snapshot.requests);
span.record("entelix.usage.tool_calls", snapshot.tool_calls);
}
for observer in &self.observers {
observer.on_complete(&state, ctx).await?;
}
Ok(AgentRunResult::new(state, run_id, usage))
}
/// Derives the context for a new run from the caller's context.
///
/// Returns the freshly minted run id (UUID v7, as a string) and a clone of
/// `ctx` stamped with it. If `ctx` already had a run id, that id is recorded
/// as the parent run id of the new context.
fn scoped_run_context(ctx: &ExecutionContext) -> (String, ExecutionContext) {
    let run_id = uuid::Uuid::now_v7().to_string();
    let child = ctx.clone().with_run_id(run_id.clone());
    let child = match ctx.run_id() {
        Some(parent) => child.with_parent_run_id(parent.to_owned()),
        None => child,
    };
    (run_id, child)
}
/// Returns the execution mode this agent was built with.
#[must_use]
pub const fn execution_mode(&self) -> ExecutionMode {
self.execution_mode
}
/// Returns the approver configured at build time, if any.
#[must_use]
pub fn approver(&self) -> Option<&Arc<dyn Approver>> {
self.approver.as_ref()
}
/// Returns how many observers are registered on this agent.
#[must_use]
pub fn observer_count(&self) -> usize {
self.observers.len()
}
/// Runs the agent once and exposes the lifecycle events as a boxed stream.
///
/// The stream borrows both the agent and `ctx` for `'a`. Each event it yields
/// is also sent to the agent's sink.
#[must_use]
pub fn execute_stream<'a>(
    &'a self,
    input: S,
    ctx: &'a ExecutionContext,
) -> BoxStream<'a, Result<AgentEvent<S>>> {
    let events = self.book_end_stream(input, ctx);
    Box::pin(events)
}
/// Builds the stream behind [`Agent::execute_stream`].
///
/// Each lifecycle event is sent to the agent's sink and then yielded to the
/// caller: `Started`, followed by either `Complete`, or `Failed` and then the
/// run's error as a final `Err` item.
///
/// The run context is prepared the same way as in [`Agent::execute`]: the
/// agent's sink is attached as a `ToolApprovalEventSinkHandle` extension, and a
/// `UsageLimitExceeded` failure is recorded on the context's audit sink when
/// one is present. Keeping both paths aligned means streaming callers get the
/// same approval-event routing and audit trail as non-streaming callers.
///
/// A sink failure on `Started` or `Complete` ends the stream with that error;
/// a sink failure on `Failed` is ignored so the run's own error is reported.
#[allow(clippy::redundant_async_block)]
fn book_end_stream<'a>(
    &'a self,
    input: S,
    ctx: &'a ExecutionContext,
) -> impl futures::Stream<Item = Result<AgentEvent<S>>> + Send + 'a {
    async_stream::stream! {
        let (run_id, scoped) = Self::scoped_run_context(ctx);
        let parent_run_id = scoped.parent_run_id().map(str::to_owned);
        // Same wiring as `execute`: expose the agent's sink through the context.
        let scoped = scoped.add_extension(
            ToolApprovalEventSinkHandle::for_agent_sink(Arc::clone(&self.sink)),
        );
        let inner_ctx: &ExecutionContext = &scoped;

        let started = AgentEvent::Started {
            run_id: run_id.clone(),
            tenant_id: inner_ctx.tenant_id().clone(),
            parent_run_id,
            agent: self.name.clone(),
        };
        self.sink.send(started.clone()).await?;
        yield Ok(started);

        // Cloned up front because the scoped context is dropped before the
        // terminal event is built.
        let tenant_id = inner_ctx.tenant_id().clone();
        let outcome = self
            .run_inner(input, run_id.clone(), inner_ctx)
            .instrument(self.run_span(&run_id, inner_ctx))
            .await;

        // Same audit behaviour as `execute_inner`: budget breaches are written
        // to the audit sink when the context carries one.
        if let (Err(Error::UsageLimitExceeded(breach)), Some(handle)) =
            (&outcome, inner_ctx.audit_sink())
        {
            handle.as_sink().record_usage_limit_exceeded(breach);
        }
        drop(scoped);

        match outcome {
            Ok(result) => {
                let complete = AgentEvent::Complete {
                    run_id: result.run_id,
                    tenant_id,
                    state: result.state,
                    usage: result.usage,
                };
                self.sink.send(complete.clone()).await?;
                yield Ok(complete);
            }
            Err(err) => {
                let envelope = err.envelope();
                let failed = AgentEvent::Failed {
                    run_id,
                    tenant_id,
                    error: err.to_string(),
                    envelope,
                };
                // Best effort: a sink error here must not mask the run's error.
                let _ = self.sink.send(failed.clone()).await;
                yield Ok(failed);
                yield Err(err);
            }
        }
    }
}
}
/// Debug output shows only the agent's name; the remaining fields are
/// elided and the struct is rendered as non-exhaustive.
impl<S> std::fmt::Debug for Agent<S>
where
    S: Clone + Send + Sync + 'static,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("Agent");
        out.field("name", &self.name);
        out.finish_non_exhaustive()
    }
}
/// Lets an agent be used wherever a `Runnable<S, S>` is expected.
#[async_trait]
impl<S> Runnable<S, S> for Agent<S>
where
    S: Clone + Send + Sync + 'static,
{
    /// Runs [`Agent::execute`] and returns only the final state, discarding
    /// the rest of the run result.
    async fn invoke(&self, input: S, ctx: &ExecutionContext) -> Result<S> {
        let run = self.execute(input, ctx).await?;
        Ok(run.into_state())
    }
}
/// Step-by-step constructor for [`Agent`].
///
/// `build()` requires a non-empty name and a runnable; everything else is
/// optional.
pub struct AgentBuilder<S>
where
S: Clone + Send + Sync + 'static,
{
/// Agent name; `build()` rejects `None` and the empty string.
name: Option<String>,
/// The runnable the agent will drive; `build()` rejects `None`.
runnable: Option<Arc<dyn Runnable<S, S>>>,
/// Sinks registered so far; `build()` collapses them into a single sink.
sinks: Vec<Arc<dyn AgentEventSink<S>>>,
/// Observers registered so far, in registration order.
observers: Vec<DynObserver<S>>,
/// Execution mode; starts as `ExecutionMode::default()`.
execution_mode: ExecutionMode,
/// Approver; `build()` requires one when the mode requires approval.
approver: Option<Arc<dyn Approver>>,
}
// NOTE(review): presumably hand-written rather than derived because
// `#[derive(Default)]` would add an `S: Default` bound — confirm.
impl<S> Default for AgentBuilder<S>
where
S: Clone + Send + Sync + 'static,
{
/// Returns a builder with nothing configured: no name, no runnable, no
/// sinks, no observers, the default execution mode, and no approver.
fn default() -> Self {
Self {
name: None,
runnable: None,
sinks: Vec::new(),
observers: Vec::new(),
execution_mode: ExecutionMode::default(),
approver: None,
}
}
}
impl<S> AgentBuilder<S>
where
S: Clone + Send + Sync + 'static,
{
#[must_use]
pub fn with_name(mut self, name: impl Into<String>) -> Self {
self.name = Some(name.into());
self
}
/// Sets the runnable the agent will drive, taking ownership of `runnable`
/// and wrapping it in an `Arc`. Replaces any previously set runnable.
#[must_use]
pub fn with_runnable<R>(self, runnable: R) -> Self
where
    R: Runnable<S, S> + 'static,
{
    self.with_runnable_arc(Arc::new(runnable))
}
#[must_use]
pub fn with_runnable_arc(mut self, runnable: Arc<dyn Runnable<S, S>>) -> Self {
self.runnable = Some(runnable);
self
}
/// Registers an additional event sink, taking ownership of `sink` and
/// wrapping it in an `Arc`. Sinks accumulate; earlier ones are kept.
#[must_use]
pub fn add_sink<K>(self, sink: K) -> Self
where
    K: AgentEventSink<S> + 'static,
{
    self.add_sink_arc(Arc::new(sink))
}
/// Registers an additional event sink from an already shared handle.
/// Sinks accumulate in registration order; earlier ones are kept.
#[must_use]
pub fn add_sink_arc(mut self, sink: Arc<dyn AgentEventSink<S>>) -> Self {
self.sinks.push(sink);
self
}
/// Registers an observer, taking ownership of `observer` and wrapping it in
/// an `Arc`. Observers accumulate in registration order.
#[must_use]
pub fn with_observer<O>(self, observer: O) -> Self
where
    O: AgentObserver<S> + 'static,
{
    self.with_observer_arc(Arc::new(observer))
}
/// Registers an observer from an already type-erased handle.
/// Observers accumulate in registration order; earlier ones are kept.
#[must_use]
pub fn with_observer_arc(mut self, observer: DynObserver<S>) -> Self {
self.observers.push(observer);
self
}
/// Sets the execution mode, replacing the current one.
///
/// `build()` checks the mode against the configured approver: a mode that
/// requires approval cannot be built without one.
#[must_use]
pub const fn with_execution_mode(mut self, mode: ExecutionMode) -> Self {
self.execution_mode = mode;
self
}
/// Sets the approver, taking ownership of `approver` and wrapping it in an
/// `Arc`. Replaces any previously set approver.
#[must_use]
pub fn with_approver<A>(self, approver: A) -> Self
where
    A: Approver + 'static,
{
    self.with_approver_arc(Arc::new(approver))
}
#[must_use]
pub fn with_approver_arc(mut self, approver: Arc<dyn Approver>) -> Self {
self.approver = Some(approver);
self
}
pub fn build(self) -> Result<Agent<S>> {
let name = self.name.filter(|n| !n.is_empty()).ok_or_else(|| {
entelix_core::Error::config(
"AgentBuilder::build: name is required and must be non-empty \
(call .with_name(...) — surfaces in AgentEvent::Started and OTel spans)",
)
})?;
let runnable = self.runnable.ok_or_else(|| {
entelix_core::Error::config(
"AgentBuilder::build: runnable is required (call .with_runnable(...) or .with_runnable_arc(...))",
)
})?;
if self.execution_mode.requires_approval() && self.approver.is_none() {
return Err(entelix_core::Error::config(
"AgentBuilder::build: ExecutionMode::Supervised requires an Approver \
(call .with_approver(...) or .with_approver_arc(...))",
));
}
let sink: Arc<dyn AgentEventSink<S>> = match self.sinks.len() {
0 => Arc::new(DroppingSink),
1 => self
.sinks
.into_iter()
.next()
.unwrap_or_else(|| unreachable!("len()==1 guarantees a value")),
_ => {
let mut fan = FanOutSink::<S>::new();
for sink in self.sinks {
fan = fan.push(sink);
}
Arc::new(fan)
}
};
Ok(Agent {
name,
runnable,
sink,
observers: self.observers,
execution_mode: self.execution_mode,
approver: self.approver,
})
}
}
// Unit tests for builder validation and for the lifecycle events emitted by
// `execute` / `execute_stream`.
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::indexing_slicing)]
mod tests {
use entelix_runnable::RunnableLambda;
use futures::StreamExt;
use super::*;
// Minimal runnable used by every test: returns its input plus one.
fn echo_runnable() -> impl Runnable<i32, i32> {
RunnableLambda::new(|n: i32, _ctx| async move { Ok::<_, _>(n + 1) })
}
// A builder without a name must fail with the "name is required" message.
#[tokio::test]
async fn build_requires_name() {
let err = Agent::<i32>::builder()
.with_runnable(echo_runnable())
.build()
.unwrap_err();
assert!(format!("{err}").contains("name is required"));
}
// An empty name is treated the same as a missing one.
#[tokio::test]
async fn build_rejects_empty_name() {
let err = Agent::<i32>::builder()
.with_name("")
.with_runnable(echo_runnable())
.build()
.unwrap_err();
assert!(format!("{err}").contains("name is required"));
}
// A builder without a runnable must fail with the "runnable is required" message.
#[tokio::test]
async fn build_requires_runnable() {
let err = Agent::<i32>::builder()
.with_name("needs-runnable")
.build()
.unwrap_err();
assert!(format!("{err}").contains("runnable is required"));
}
// `execute` runs the runnable and the sink sees exactly Started then Complete.
#[tokio::test]
async fn execute_drives_inner_and_emits_book_ends() {
let sink = CaptureSink::<i32>::new();
let agent = Agent::<i32>::builder()
.with_name("test-agent")
.with_runnable(echo_runnable())
.add_sink(sink.clone())
.build()
.unwrap();
let result = agent.execute(41, &ExecutionContext::new()).await.unwrap();
assert_eq!(result.state, 42);
assert!(!result.run_id.is_empty(), "run_id must be minted");
assert!(
result.usage.is_none(),
"no RunBudget on ctx → envelope.usage is None"
);
let events = sink.events();
assert_eq!(events.len(), 2);
assert!(matches!(&events[0], AgentEvent::Started { agent, .. } if agent == "test-agent"));
assert!(matches!(events[1], AgentEvent::Complete { state: 42, .. }));
}
// With a budget on the context, the result carries a usage snapshot that does
// not change when the live budget is used afterwards, and the Complete event
// carries the same snapshot.
#[tokio::test]
async fn execute_envelope_carries_frozen_usage_snapshot_when_budget_is_attached() {
use entelix_core::RunBudget;
let sink = CaptureSink::<i32>::new();
let agent = Agent::<i32>::builder()
.with_name("budgeted-agent")
.with_runnable(echo_runnable())
.add_sink(sink.clone())
.build()
.unwrap();
let budget = RunBudget::unlimited().with_request_limit(100);
// Pre-stamp one request so the snapshot has a non-zero count to observe.
budget.check_pre_request().unwrap();
let ctx = ExecutionContext::new().with_run_budget(budget.clone());
let result = agent.execute(0, &ctx).await.unwrap();
let snapshot = result.usage.expect("budget attached → usage Some");
assert_eq!(snapshot.requests, 1, "snapshot reflects pre-stamped count");
// Touch the live budget again; the snapshot taken earlier must not move.
budget.check_pre_request().unwrap();
assert_eq!(
snapshot.requests, 1,
"snapshot is frozen — not Arc-shared with live counter",
);
let events = sink.events();
let complete = events
.iter()
.find_map(|event| match event {
AgentEvent::Complete { usage, .. } => Some(*usage),
_ => None,
})
.expect("Complete event must be emitted");
assert_eq!(complete, Some(snapshot));
}
// An agent can be type-erased to `dyn Runnable` and invoked through that trait.
#[tokio::test]
async fn agent_is_runnable_so_it_composes() {
let inner = Agent::<i32>::builder()
.with_name("composed-inner")
.with_runnable(echo_runnable())
.build()
.unwrap();
let composed: Arc<dyn Runnable<i32, i32>> = Arc::new(inner);
let result = composed.invoke(10, &ExecutionContext::new()).await.unwrap();
assert_eq!(result, 11);
}
// The stream yields Started first and Complete last, and the sink receives
// the same number of events as the stream consumer.
#[tokio::test]
async fn execute_stream_emits_started_and_complete() {
let sink = CaptureSink::<i32>::new();
let agent = Agent::<i32>::builder()
.with_name("streamer")
.with_runnable(echo_runnable())
.add_sink(sink.clone())
.build()
.unwrap();
let ctx = ExecutionContext::new();
let mut stream = agent.execute_stream(7, &ctx);
let mut received = Vec::new();
while let Some(event) = stream.next().await {
received.push(event.unwrap());
}
assert!(matches!(received[0], AgentEvent::Started { .. }));
assert!(matches!(
received.last(),
Some(AgentEvent::Complete {
state: 8,
usage: None,
..
})
));
assert_eq!(received.len(), sink.len());
}
// With no sink configured (the builder falls back to `DroppingSink`), the
// stream still runs to completion and yields its events.
#[tokio::test]
async fn execute_stream_with_dropping_sink_does_not_block() {
let agent = Agent::<i32>::builder()
.with_name("dropping-sink")
.with_runnable(echo_runnable())
.build()
.unwrap();
let ctx = ExecutionContext::new();
let mut stream = agent.execute_stream(0, &ctx);
let mut count = 0;
while stream.next().await.is_some() {
count += 1;
}
assert!(count >= 2, "expected at least Started + Complete");
}
}