entelix_agents/agent/mod.rs
1//! `Agent<S>` — production agent runtime wrapping any `Runnable<S, S>`
2//! with an event sink, execution mode, and lifecycle observers.
3//!
4//! `Agent<S>` is the surface every production caller targets:
5//!
6//! ```ignore
7//! let agent = create_react_agent(model, tools)?;
8//! let final_state = agent.execute(initial, &ctx).await?; // sync drain
9//!
10//! let mut stream = agent.execute_stream(initial, &ctx);
11//! while let Some(event) = stream.next().await {
12//! match event? { /* AgentEvent variants */ }
13//! }
14//! ```
15//!
16//! The runtime is deliberately a *thin wrapper* over the inner
17//! `Runnable<S, S>` (typically a `CompiledGraph<S>`):
18//!
//! - **`execute`** drives the inner runnable via
//!   [`Runnable::invoke`] and emits a `Started` opener followed by
//!   exactly one terminal event (`Complete` on success, `Failed` on
//!   error) on the sink for observability — returns the terminal state.
//! - **`execute_stream`** uses the same lifecycle and inner
//!   primitive (`Runnable::invoke`) but returns a stream of
//!   `AgentEvent` values for callers wiring to SSE or other
//!   incremental consumers. Both surfaces observe one canonical
//!   `Started` → `Complete(state)` (or `Started` → `Failed`)
//!   sequence regardless of which the caller picks.
28//!
29//! ## Composability
30//!
31//! `Agent<S>` itself implements `Runnable<S, S>`, so an agent is a
32//! valid node in a parent `StateGraph<ParentState>` that maps state
33//! across the boundary. Recursive sub-agent dispatch follows the
34//! same pattern as any other `Runnable` composition.
35//!
36//! ## Information density
37//!
38//! Events emitted to the sink carry the full observability surface
39//! (ids, durations, classifications). The agent's own re-feed path
40//! to the model — when consuming a `ToolComplete` to synthesize the
41//! next message — reads only the LLM-facing fields (`output`,
42//! `delta`, lean `error_message`). Test fixtures verify both
43//! surfaces independently.
44
45mod approval_layer;
46mod approver;
47mod event;
48mod mode;
49mod observer;
50mod result;
51mod sink;
52mod tool_event_layer;
53mod tool_hook_layer;
54
55use std::sync::Arc;
56
57use async_trait::async_trait;
58use entelix_core::context::ExecutionContext;
59use entelix_core::error::{Error, Result};
60use entelix_runnable::Runnable;
61use entelix_runnable::stream::BoxStream;
62use tracing::Instrument;
63
64pub use self::approval_layer::{
65 ApprovalLayer, ApprovalService, EffectGate, ToolApprovalEventSink, ToolApprovalEventSinkHandle,
66};
67pub use self::approver::{
68 AlwaysApprove, ApprovalDecision, ApprovalRequest, Approver, ChannelApprover,
69 ChannelApproverConfig, PendingApproval,
70};
71pub use self::event::AgentEvent;
72pub use self::mode::ExecutionMode;
73pub use self::observer::{AgentObserver, DynObserver};
74pub use self::result::AgentRunResult;
75pub use self::sink::{
76 AgentEventSink, BroadcastSink, CaptureSink, ChannelSink, DroppingSink, FailOpenSink,
77 FanOutSink, StateErasureSink,
78};
79pub use self::tool_event_layer::{ToolEventLayer, ToolEventService};
80pub use self::tool_hook_layer::{
81 ToolHook, ToolHookDecision, ToolHookLayer, ToolHookRegistry, ToolHookRequest, ToolHookService,
82};
83
84/// Production agent runtime.
85///
86/// Construct via [`Agent::builder`]; finalize with
87/// [`AgentBuilder::build`]. See module docs for the abstraction
88/// model and information-density discipline.
89pub struct Agent<S>
90where
91 S: Clone + Send + Sync + 'static,
92{
93 name: String,
94 runnable: Arc<dyn Runnable<S, S>>,
95 sink: Arc<dyn AgentEventSink<S>>,
96 observers: Vec<DynObserver<S>>,
97 execution_mode: ExecutionMode,
98 approver: Option<Arc<dyn Approver>>,
99}
100
101impl<S> Agent<S>
102where
103 S: Clone + Send + Sync + 'static,
104{
105 /// Start a fluent builder.
106 #[must_use]
107 pub fn builder() -> AgentBuilder<S> {
108 AgentBuilder::default()
109 }
110
111 /// Borrow the agent's stable name. Always non-empty —
112 /// `AgentBuilder::build` rejects empty / unset names so trace
113 /// correlation never silently breaks.
114 #[must_use]
115 pub fn name(&self) -> &str {
116 &self.name
117 }
118
119 /// Borrow the underlying runnable. Useful when an agent is
120 /// embedded as a node in a larger graph and the parent needs
121 /// direct access to the inner shape (e.g. for checkpointing).
122 #[must_use]
123 pub fn inner(&self) -> &Arc<dyn Runnable<S, S>> {
124 &self.runnable
125 }
126
127 /// Run to completion, returning the terminal state.
128 ///
129 /// Emits a `Started{run_id}` opener, fires every registered
130 /// observer at the appropriate lifecycle point, then emits
131 /// either `Complete{run_id, state}` or `Failed{run_id, error}`
132 /// on the sink — every run produces exactly one terminal event
133 ///.
134 ///
135 /// The `run_id` is inherited from `ctx.run_id()` when present,
136 /// otherwise a fresh UUID v7 is generated and propagated to the
137 /// inner runnable through a cloned context.
138 pub async fn execute(&self, input: S, ctx: &ExecutionContext) -> Result<AgentRunResult<S>> {
139 let (run_id, scoped_ctx) = Self::scoped_run_context(ctx);
140 let parent_run_id = scoped_ctx.parent_run_id().map(str::to_owned);
141 let ctx = &scoped_ctx;
142
143 // Attach the type-erased approval-event sink handle so any
144 // `ApprovalLayer` deeper in the dispatch stack can emit
145 // `ToolCallApproved` / `ToolCallDenied` through the agent's
146 // typed sink without taking it as a constructor arg
147 // (which would tie the layer to a specific `S`). The
148 // attachment is unconditional — operators without an
149 // `ApprovalLayer` pay only the cost of an unused
150 // `Extensions` slot.
151 let scoped_with_sink =
152 ctx.clone()
153 .add_extension(ToolApprovalEventSinkHandle::for_agent_sink(Arc::clone(
154 &self.sink,
155 )));
156
157 self.execute_inner(input, run_id, parent_run_id, &scoped_with_sink)
158 .await
159 }
160
161 /// Convenience entry that attaches a [`entelix_core::RunOverrides`] extension
162 /// to the context for the duration of the call. Equivalent to
163 /// `agent.execute(input, &ctx.add_extension(overrides))` —
164 /// shorter at the call site and signals the per-call shape at
165 /// a glance.
166 ///
167 /// **Asymmetric by design** — `RunOverrides` is the
168 /// agent-loop carrier (model / system prompt / max iterations
169 /// owned by the loop the `Agent` itself drives) so a typed
170 /// convenience belongs on `Agent`. `RequestOverrides`
171 /// (temperature / top_p / top_k / max_tokens / stop_sequences /
172 /// reasoning_effort / tool_choice / response_format /
173 /// parallel_tool_calls) is `ChatModel`-shaped — the dispatch
174 /// layer downstream picks it up via
175 /// `ExecutionContext::add_extension(RequestOverrides::new()…)`,
176 /// no agent-side convenience is needed (and adding one would
177 /// duplicate the orthogonality that the carrier split S99
178 /// established).
179 ///
180 /// Operators threading multiple per-call extensions stay on
181 /// [`Self::execute`] with their own `add_extension` chain.
182 pub async fn execute_with(
183 &self,
184 input: S,
185 overrides: entelix_core::RunOverrides,
186 ctx: &ExecutionContext,
187 ) -> Result<AgentRunResult<S>> {
188 let scoped = ctx.clone().add_extension(overrides);
189 self.execute(input, &scoped).await
190 }
191
192 async fn execute_inner(
193 &self,
194 input: S,
195 run_id: String,
196 parent_run_id: Option<String>,
197 ctx: &ExecutionContext,
198 ) -> Result<AgentRunResult<S>> {
199 self.sink
200 .send(AgentEvent::Started {
201 run_id: run_id.clone(),
202 tenant_id: ctx.tenant_id().clone(),
203 parent_run_id: parent_run_id.clone(),
204 agent: self.name.clone(),
205 })
206 .await?;
207
208 // The model + tool service-layer spans fire inside
209 // `run_inner`. Instrumenting *that* future with the
210 // agent-run span makes those layer spans children of the
211 // agent root in the OTel trace tree — operators see one
212 // tree per agent run instead of layer spans floating
213 // side by side. `Started` / `Failed` / `Complete` are
214 // sink emissions, not tracing spans, so they live
215 // outside the instrumented future without losing
216 // observability.
217 let outcome = self
218 .run_inner(input, run_id.clone(), ctx)
219 .instrument(self.run_span(&run_id, ctx))
220 .await;
221 match outcome {
222 Ok(result) => {
223 self.sink
224 .send(AgentEvent::Complete {
225 run_id: result.run_id.clone(),
226 tenant_id: ctx.tenant_id().clone(),
227 state: result.state.clone(),
228 usage: result.usage,
229 })
230 .await?;
231 Ok(result)
232 }
233 Err(err) => {
234 if let entelix_core::Error::UsageLimitExceeded(breach) = &err
235 && let Some(handle) = ctx.audit_sink()
236 {
237 handle.as_sink().record_usage_limit_exceeded(breach);
238 }
239 // Best-effort `Failed` emission — if the sink itself
240 // errors (dropped receiver), swallow the secondary
241 // error so the original surfaces unchanged.
242 let envelope = err.envelope();
243 let _ = self
244 .sink
245 .send(AgentEvent::Failed {
246 run_id,
247 tenant_id: ctx.tenant_id().clone(),
248 error: err.to_string(),
249 envelope,
250 })
251 .await;
252 Err(err)
253 }
254 }
255 }
256
257 /// Build the `entelix.agent.run` tracing span for one
258 /// `execute` / `execute_stream` invocation. Span fields use
259 /// the `gen_ai.agent.*` / `entelix.*` namespaces that match
260 /// the rest of the OTel surface so dashboards joining on
261 /// run_id stay consistent across the layer / agent stack.
262 ///
263 /// The six `gen_ai.usage.*` / `entelix.usage.*` fields are
264 /// declared as [`tracing::field::Empty`] placeholders and
265 /// populated by [`Self::run_inner`] at the
266 /// [`AgentRunResult::usage`] freeze point — a `RunBudget`
267 /// attached to the [`ExecutionContext`] surfaces its frozen
268 /// counters as span attributes so OTel dashboards filter
269 /// per-run consumption without consumers having to harvest
270 /// the envelope return value. Runs without a budget keep the
271 /// fields `Empty`; the `tracing-opentelemetry` bridge omits
272 /// empty fields from the exported span attributes.
273 fn run_span(&self, run_id: &str, ctx: &ExecutionContext) -> tracing::Span {
274 tracing::info_span!(
275 target: "gen_ai",
276 "entelix.agent.run",
277 gen_ai.agent.name = %self.name,
278 entelix.run_id = %run_id,
279 entelix.tenant_id = %ctx.tenant_id(),
280 entelix.thread_id = ctx.thread_id(),
281 gen_ai.usage.input_tokens = tracing::field::Empty,
282 gen_ai.usage.output_tokens = tracing::field::Empty,
283 gen_ai.usage.total_tokens = tracing::field::Empty,
284 entelix.agent.usage.cost = tracing::field::Empty,
285 entelix.usage.requests = tracing::field::Empty,
286 entelix.usage.tool_calls = tracing::field::Empty,
287 )
288 }
289
290 /// Drive observers + inner runnable as one cohesive unit so the
291 /// outer terminal-event branching (`Complete` vs `Failed`) only
292 /// matches once.
293 ///
294 /// The [`AgentRunResult::usage`] snapshot is captured *between*
295 /// `runnable.invoke` returning and `on_complete` firing —
296 /// observer dispatches may themselves consume budget through
297 /// downstream `ChatModel` calls (memory consolidation, summary
298 /// writes), and the envelope must reflect the agent run only
299 ///.
300 async fn run_inner(
301 &self,
302 input: S,
303 run_id: String,
304 ctx: &ExecutionContext,
305 ) -> Result<AgentRunResult<S>> {
306 for observer in &self.observers {
307 observer.pre_turn(&input, ctx).await?;
308 }
309 let state = match self.runnable.invoke(input, ctx).await {
310 Ok(state) => state,
311 Err(err) => {
312 // HITL pause-and-resume is a control signal, not a
313 // failure — observers wanting interrupt observation
314 // consume `AgentEvent::Interrupted` from the sink.
315 if !matches!(err, Error::Interrupted { .. }) {
316 for observer in &self.observers {
317 // Failure-path observability is one-way:
318 // observer errors raised from on_error get
319 // dropped (with a tracing warn) so they don't
320 // replace the original error in flight.
321 // Mirrors the audit-sink contract from
322 // invariant 18 /
323 if let Err(observer_err) = observer.on_error(&err, ctx).await {
324 tracing::warn!(
325 observer = %observer.name(),
326 source = %observer_err,
327 "AgentObserver::on_error returned an error; dropping"
328 );
329 }
330 }
331 }
332 return Err(err);
333 }
334 };
335 let usage = ctx.run_budget().map(|budget| budget.snapshot());
336 // Mirror the frozen snapshot onto the `entelix.agent.run`
337 // span declared in `run_span` — `Span::current()` here is
338 // that root span (this future is `.instrument`-ed by
339 // `execute_inner` / `book_end_stream`). Runs without a
340 // budget leave the fields as `tracing::field::Empty`, which
341 // the `tracing-opentelemetry` bridge drops from the
342 // exported span (no zero-valued attributes ride through).
343 if let Some(snapshot) = usage {
344 let span = tracing::Span::current();
345 span.record("gen_ai.usage.input_tokens", snapshot.input_tokens);
346 span.record("gen_ai.usage.output_tokens", snapshot.output_tokens);
347 span.record("gen_ai.usage.total_tokens", snapshot.total_tokens());
348 // `Decimal` lacks a `tracing::Value` impl; serialise to
349 // string. OTel exporters round-trip the cost as a
350 // string-typed attribute (the `Decimal` representation
351 // is the most accurate; consumers parse on read).
352 //
353 // Attribute name `entelix.agent.usage.cost` is distinct
354 // from `OtelLayer`'s per-model-call `gen_ai.usage.cost`
355 // — that one is a per-charge increment (`f64`), this one
356 // is the per-run cumulative roll-up (`Decimal` Display).
357 // Same key would conflate two different metrics on the
358 // dashboard; the namespace split mirrors the existing
359 // `entelix.usage.requests` / `entelix.usage.tool_calls`
360 // pattern (per-run aggregates ride the `entelix.*`
361 // namespace; per-call vendor signals ride `gen_ai.*`).
362 span.record(
363 "entelix.agent.usage.cost",
364 tracing::field::display(&snapshot.cost_usd),
365 );
366 span.record("entelix.usage.requests", snapshot.requests);
367 span.record("entelix.usage.tool_calls", snapshot.tool_calls);
368 }
369 // on_complete fires before any terminal event so observers
370 // can mutate side-channel state (vector store writes,
371 // summary persistence) and have those writes reflected in
372 // the same audit trail row.
373 for observer in &self.observers {
374 observer.on_complete(&state, ctx).await?;
375 }
376 Ok(AgentRunResult::new(state, run_id, usage))
377 }
378
379 /// Compute `(run_id, ctx_with_run_id_when_minted)` for an entry
380 /// Mint a fresh run id for this `Agent::execute` and rebase the
381 /// context so the inner runnable sees `(run_id = fresh,
382 /// parent_run_id = Some(caller's run_id))`. Top-level runs land
383 /// with `parent_run_id = None`; sub-agent dispatches preserve
384 /// the parent's id under `parent_run_id` so `(run_id,
385 /// parent_run_id)` edges reconstruct the trace tree.
386 ///
387 /// Always mints — never reuses the caller's id. Recipes that
388 /// pre-allocated a `run_id` for an external reservation see
389 /// their id flow through as `parent_run_id` of the agent's run,
390 /// keeping a deterministic correlation without flattening the
391 /// hierarchy.
392 fn scoped_run_context(ctx: &ExecutionContext) -> (String, ExecutionContext) {
393 let fresh = uuid::Uuid::now_v7().to_string();
394 let mut scoped = ctx.clone().with_run_id(fresh.clone());
395 if let Some(parent) = ctx.run_id() {
396 scoped = scoped.with_parent_run_id(parent.to_owned());
397 }
398 (fresh, scoped)
399 }
400
401 /// Borrow the configured execution mode.
402 #[must_use]
403 pub const fn execution_mode(&self) -> ExecutionMode {
404 self.execution_mode
405 }
406
407 /// Borrow the configured approver (`None` in `Auto` mode).
408 #[must_use]
409 pub fn approver(&self) -> Option<&Arc<dyn Approver>> {
410 self.approver.as_ref()
411 }
412
413 /// Number of registered lifecycle observers.
414 #[must_use]
415 pub fn observer_count(&self) -> usize {
416 self.observers.len()
417 }
418
419 /// Run with `AgentEvent` book-ends as a stream. Sinks
420 /// attached at construction time receive the same events for
421 /// fan-out telemetry.
422 ///
423 /// The returned stream is the caller-facing view; the sink
424 /// is the observability-facing view. **Both observe the same
425 /// `Started` → `Complete(state)` sequence** that
426 /// [`Self::execute`] produces — the only difference is the
427 /// return shape (a stream of events vs the awaited terminal
428 /// state).
429 ///
430 /// Drives the inner runnable via [`Runnable::invoke`] rather
431 /// than [`Runnable::stream`] so the lifecycle is identical to
432 /// `execute` and the `Complete` event always fires on
433 /// successful runs (a previous design routed through
434 /// `Runnable::stream`'s `Updates` mode and could silently skip
435 /// `Complete` for runnables that emit no per-node updates).
436 ///
437 /// Construction is synchronous and infallible — every event
438 /// (including the initial `Started`) yields lazily as the
439 /// stream is polled. Callers consume with `.next().await` like
440 /// any `Stream`; no extra `.await` on the constructor itself.
441 #[must_use]
442 pub fn execute_stream<'a>(
443 &'a self,
444 input: S,
445 ctx: &'a ExecutionContext,
446 ) -> BoxStream<'a, Result<AgentEvent<S>>> {
447 Box::pin(self.book_end_stream(input, ctx))
448 }
449
450 /// Async-stream body for `execute_stream`. Mirrors
451 /// [`Self::execute`] exactly so observers / sinks see one
452 /// canonical lifecycle regardless of which surface the caller
453 /// picked.
454 #[allow(clippy::redundant_async_block)]
455 fn book_end_stream<'a>(
456 &'a self,
457 input: S,
458 ctx: &'a ExecutionContext,
459 ) -> impl futures::Stream<Item = Result<AgentEvent<S>>> + Send + 'a {
460 async_stream::stream! {
461 let (run_id, scoped) = Self::scoped_run_context(ctx);
462 let parent_run_id = scoped.parent_run_id().map(str::to_owned);
463 // Keep `scoped` alive across the `await` boundaries below
464 // so the run-id-stamped child context lives for the whole call.
465 let inner_ctx: &ExecutionContext = &scoped;
466
467 // Started book-end (sink + caller stream).
468 let started = AgentEvent::Started {
469 run_id: run_id.clone(),
470 tenant_id: inner_ctx.tenant_id().clone(),
471 parent_run_id,
472 agent: self.name.clone(),
473 };
474 self.sink.send(started.clone()).await?;
475 yield Ok(started);
476
477 // Same instrument pattern as `execute` — model +
478 // tool layer spans inside `run_inner` nest under the
479 // agent-run span. Sink emissions (book-end events)
480 // stay outside the span; they're sink-only, not
481 // tracing events.
482 let tenant_id = inner_ctx.tenant_id().clone();
483 let outcome = self
484 .run_inner(input, run_id.clone(), inner_ctx)
485 .instrument(self.run_span(&run_id, inner_ctx))
486 .await;
487 // `scoped` lives at least until here — the borrow above is
488 // valid for the whole stream body.
489 drop(scoped);
490 match outcome {
491 Ok(result) => {
492 let complete = AgentEvent::Complete {
493 run_id: result.run_id,
494 tenant_id,
495 state: result.state,
496 usage: result.usage,
497 };
498 self.sink.send(complete.clone()).await?;
499 yield Ok(complete);
500 }
501 Err(err) => {
502 let envelope = err.envelope();
503 let failed = AgentEvent::Failed {
504 run_id,
505 tenant_id,
506 error: err.to_string(),
507 envelope,
508 };
509 let _ = self.sink.send(failed.clone()).await;
510 yield Ok(failed);
511 yield Err(err);
512 }
513 }
514 }
515 }
516}
517
518impl<S> std::fmt::Debug for Agent<S>
519where
520 S: Clone + Send + Sync + 'static,
521{
522 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
523 f.debug_struct("Agent")
524 .field("name", &self.name)
525 .finish_non_exhaustive()
526 }
527}
528
529/// `Agent<S>` is itself a [`Runnable<S, S>`] so it composes inside
530/// larger graphs (recursive sub-agent dispatch).
531///
532/// The composition contract is `S → S`, so the [`AgentRunResult`]
533/// envelope is unwrapped here — composing graphs see only the
534/// terminal state. Callers that need the per-run `UsageSnapshot`
535/// or `run_id` go through [`Agent::execute`] directly.
536#[async_trait]
537impl<S> Runnable<S, S> for Agent<S>
538where
539 S: Clone + Send + Sync + 'static,
540{
541 async fn invoke(&self, input: S, ctx: &ExecutionContext) -> Result<S> {
542 self.execute(input, ctx)
543 .await
544 .map(AgentRunResult::into_state)
545 }
546}
547
548/// Fluent builder for [`Agent<S>`].
549///
550/// Required fields (build fails otherwise):
551/// - `name` — non-empty, surfaces in `AgentEvent::Started { agent }`
552/// and `OTel` spans for trace correlation
553/// - `runnable` — the inner state machine the agent drives
554///
555/// Optional fields with sensible defaults:
556/// - `sink`: [`DroppingSink`] (telemetry-free)
557/// - `observers`: empty (no lifecycle hooks fire)
558/// - `execution_mode`: [`ExecutionMode::Auto`]
559/// - `approver`: `None` (only meaningful in `Supervised` mode)
560pub struct AgentBuilder<S>
561where
562 S: Clone + Send + Sync + 'static,
563{
564 name: Option<String>,
565 runnable: Option<Arc<dyn Runnable<S, S>>>,
566 sinks: Vec<Arc<dyn AgentEventSink<S>>>,
567 observers: Vec<DynObserver<S>>,
568 execution_mode: ExecutionMode,
569 approver: Option<Arc<dyn Approver>>,
570}
571
572impl<S> Default for AgentBuilder<S>
573where
574 S: Clone + Send + Sync + 'static,
575{
576 fn default() -> Self {
577 Self {
578 name: None,
579 runnable: None,
580 sinks: Vec::new(),
581 observers: Vec::new(),
582 execution_mode: ExecutionMode::default(),
583 approver: None,
584 }
585 }
586}
587
588impl<S> AgentBuilder<S>
589where
590 S: Clone + Send + Sync + 'static,
591{
592 /// Set the agent's stable identifier — required, surfaces in
593 /// `AgentEvent::Started { agent }` and `OTel` spans for trace
594 /// correlation. Must be non-empty after `Into::into`; the
595 /// build call returns `Error::Config` otherwise.
596 #[must_use]
597 pub fn with_name(mut self, name: impl Into<String>) -> Self {
598 self.name = Some(name.into());
599 self
600 }
601
602 /// Required — the agent's underlying [`Runnable<S, S>`]. The
603 /// build call returns `Error::Config` otherwise.
604 #[must_use]
605 pub fn with_runnable<R>(mut self, runnable: R) -> Self
606 where
607 R: Runnable<S, S> + 'static,
608 {
609 self.runnable = Some(Arc::new(runnable));
610 self
611 }
612
613 /// Reuse an `Arc<dyn Runnable<S, S>>` directly — useful when
614 /// the inner has already been boxed elsewhere (recipes do this
615 /// to share a `CompiledGraph<S>` between the agent and other
616 /// composition sites).
617 #[must_use]
618 pub fn with_runnable_arc(mut self, runnable: Arc<dyn Runnable<S, S>>) -> Self {
619 self.runnable = Some(runnable);
620 self
621 }
622
623 /// Append an event sink. Multiple calls accumulate — the agent
624 /// dispatches each emitted event to every registered sink in
625 /// registration order via an internal [`FanOutSink`]. Sinks
626 /// added earlier run first; a sink that returns `Err` halts the
627 /// run before later sinks see the event, so operators add
628 /// must-succeed sinks (audit, compliance) first and wrap
629 /// best-effort sinks (telemetry, embedding indexer) with
630 /// [`FailOpenSink`]. Empty registrations resolve to
631 /// [`DroppingSink`] at build time.
632 #[must_use]
633 pub fn add_sink<K>(mut self, sink: K) -> Self
634 where
635 K: AgentEventSink<S> + 'static,
636 {
637 self.sinks.push(Arc::new(sink));
638 self
639 }
640
641 /// Append a pre-erased `Arc<dyn AgentEventSink<S>>` — useful
642 /// when the sink has already been boxed elsewhere.
643 #[must_use]
644 pub fn add_sink_arc(mut self, sink: Arc<dyn AgentEventSink<S>>) -> Self {
645 self.sinks.push(sink);
646 self
647 }
648
649 /// Register a lifecycle observer. Observers are appended in
650 /// registration order; the agent fires them in that order at
651 /// each lifecycle event. Multiple observers are supported via
652 /// repeated calls.
653 #[must_use]
654 pub fn with_observer<O>(mut self, observer: O) -> Self
655 where
656 O: AgentObserver<S> + 'static,
657 {
658 self.observers.push(Arc::new(observer));
659 self
660 }
661
662 /// Register an `Arc<dyn AgentObserver<S>>` directly — useful
663 /// when the observer is also held by another consumer (e.g.
664 /// the same observer drives both an agent and an HTTP route
665 /// for direct inspection).
666 #[must_use]
667 pub fn with_observer_arc(mut self, observer: DynObserver<S>) -> Self {
668 self.observers.push(observer);
669 self
670 }
671
672 /// Defaults to [`ExecutionMode::Auto`]. `Supervised` requires an
673 /// [`Approver`] (set via [`Self::with_approver`]); the call to
674 /// [`Self::build`] returns `Error::Config` if mode is
675 /// `Supervised` and no approver was registered.
676 #[must_use]
677 pub const fn with_execution_mode(mut self, mode: ExecutionMode) -> Self {
678 self.execution_mode = mode;
679 self
680 }
681
682 /// Attach the approver used in `Supervised` mode. Has no
683 /// effect in `Auto` mode but is preserved across builder
684 /// calls so a single fluent chain can configure both modes
685 /// before deciding.
686 #[must_use]
687 pub fn with_approver<A>(mut self, approver: A) -> Self
688 where
689 A: Approver + 'static,
690 {
691 self.approver = Some(Arc::new(approver));
692 self
693 }
694
695 /// Reuse an `Arc<dyn Approver>` directly.
696 #[must_use]
697 pub fn with_approver_arc(mut self, approver: Arc<dyn Approver>) -> Self {
698 self.approver = Some(approver);
699 self
700 }
701
702 /// Finalize. Returns [`entelix_core::Error::Config`] when:
703 /// - `name` was not set or is empty (every agent must be
704 /// identifiable in traces — empty-string defaults silently
705 /// destroy correlation), or
706 /// - `runnable` was not set, or
707 /// - `execution_mode` is `Supervised` but no `approver` was
708 /// registered (supervised mode without an approver is a
709 /// programming error — there is no decision-maker).
710 pub fn build(self) -> Result<Agent<S>> {
711 let name = self.name.filter(|n| !n.is_empty()).ok_or_else(|| {
712 entelix_core::Error::config(
713 "AgentBuilder::build: name is required and must be non-empty \
714 (call .with_name(...) — surfaces in AgentEvent::Started and OTel spans)",
715 )
716 })?;
717 let runnable = self.runnable.ok_or_else(|| {
718 entelix_core::Error::config(
719 "AgentBuilder::build: runnable is required (call .with_runnable(...) or .with_runnable_arc(...))",
720 )
721 })?;
722 if self.execution_mode.requires_approval() && self.approver.is_none() {
723 return Err(entelix_core::Error::config(
724 "AgentBuilder::build: ExecutionMode::Supervised requires an Approver \
725 (call .with_approver(...) or .with_approver_arc(...))",
726 ));
727 }
728 let sink: Arc<dyn AgentEventSink<S>> = match self.sinks.len() {
729 0 => Arc::new(DroppingSink),
730 1 => self
731 .sinks
732 .into_iter()
733 .next()
734 .unwrap_or_else(|| unreachable!("len()==1 guarantees a value")),
735 _ => {
736 let mut fan = FanOutSink::<S>::new();
737 for sink in self.sinks {
738 fan = fan.push(sink);
739 }
740 Arc::new(fan)
741 }
742 };
743 Ok(Agent {
744 name,
745 runnable,
746 sink,
747 observers: self.observers,
748 execution_mode: self.execution_mode,
749 approver: self.approver,
750 })
751 }
752}
753
754#[cfg(test)]
755#[allow(clippy::unwrap_used, clippy::expect_used, clippy::indexing_slicing)]
756mod tests {
757 use entelix_runnable::RunnableLambda;
758 use futures::StreamExt;
759
760 use super::*;
761
762 fn echo_runnable() -> impl Runnable<i32, i32> {
763 RunnableLambda::new(|n: i32, _ctx| async move { Ok::<_, _>(n + 1) })
764 }
765
766 #[tokio::test]
767 async fn build_requires_name() {
768 let err = Agent::<i32>::builder()
769 .with_runnable(echo_runnable())
770 .build()
771 .unwrap_err();
772 assert!(format!("{err}").contains("name is required"));
773 }
774
775 #[tokio::test]
776 async fn build_rejects_empty_name() {
777 // Empty string defaults destroy trace correlation — guard
778 // against the silent-failure mode at build time.
779 let err = Agent::<i32>::builder()
780 .with_name("")
781 .with_runnable(echo_runnable())
782 .build()
783 .unwrap_err();
784 assert!(format!("{err}").contains("name is required"));
785 }
786
787 #[tokio::test]
788 async fn build_requires_runnable() {
789 let err = Agent::<i32>::builder()
790 .with_name("needs-runnable")
791 .build()
792 .unwrap_err();
793 assert!(format!("{err}").contains("runnable is required"));
794 }
795
796 #[tokio::test]
797 async fn execute_drives_inner_and_emits_book_ends() {
798 let sink = CaptureSink::<i32>::new();
799 let agent = Agent::<i32>::builder()
800 .with_name("test-agent")
801 .with_runnable(echo_runnable())
802 .add_sink(sink.clone())
803 .build()
804 .unwrap();
805
806 let result = agent.execute(41, &ExecutionContext::new()).await.unwrap();
807 assert_eq!(result.state, 42);
808 assert!(!result.run_id.is_empty(), "run_id must be minted");
809 assert!(
810 result.usage.is_none(),
811 "no RunBudget on ctx → envelope.usage is None"
812 );
813 let events = sink.events();
814 assert_eq!(events.len(), 2);
815 assert!(matches!(&events[0], AgentEvent::Started { agent, .. } if agent == "test-agent"));
816 assert!(matches!(events[1], AgentEvent::Complete { state: 42, .. }));
817 }
818
819 #[tokio::test]
820 async fn execute_envelope_carries_frozen_usage_snapshot_when_budget_is_attached() {
821 // With a RunBudget on the context, the envelope's `usage`
822 // is `Some(snapshot)` — frozen at the moment the inner
823 // runnable returned. Subsequent budget mutations (which
824 // would happen in a real downstream layer) MUST NOT be
825 // reflected in the snapshot we already returned.
826 use entelix_core::RunBudget;
827
828 let sink = CaptureSink::<i32>::new();
829 let agent = Agent::<i32>::builder()
830 .with_name("budgeted-agent")
831 .with_runnable(echo_runnable())
832 .add_sink(sink.clone())
833 .build()
834 .unwrap();
835
836 // `unlimited()` plus a non-`None` `request_limit` so the
837 // pre-call CAS actually increments — `check_pre_request`
838 // early-returns when no cap is attached and never touches
839 // the counter (the runtime path that consumes a budget
840 // always sets at least one cap, so this is the realistic
841 // shape).
842 let budget = RunBudget::unlimited().with_request_limit(100);
843 budget.check_pre_request().unwrap();
844 let ctx = ExecutionContext::new().with_run_budget(budget.clone());
845
846 let result = agent.execute(0, &ctx).await.unwrap();
847 let snapshot = result.usage.expect("budget attached → usage Some");
848 assert_eq!(snapshot.requests, 1, "snapshot reflects pre-stamped count");
849
850 // Mutate the budget after the run — the snapshot must NOT
851 // change. This is the frozen-at-terminal contract.
852 budget.check_pre_request().unwrap();
853 assert_eq!(
854 snapshot.requests, 1,
855 "snapshot is frozen — not Arc-shared with live counter",
856 );
857
858 // The matching `Complete` sink event carries the same
859 // snapshot — operators wiring telemetry through the sink
860 // see the same artifact as direct callers (one-shot vs
861 // streaming surfaces match).
862 let events = sink.events();
863 let complete = events
864 .iter()
865 .find_map(|event| match event {
866 AgentEvent::Complete { usage, .. } => Some(*usage),
867 _ => None,
868 })
869 .expect("Complete event must be emitted");
870 assert_eq!(complete, Some(snapshot));
871 }
872
873 #[tokio::test]
874 async fn agent_is_runnable_so_it_composes() {
875 // Demonstrate that Agent<S>: Runnable<S, S> — the agent is
876 // itself usable as a node in a larger composition.
877 let inner = Agent::<i32>::builder()
878 .with_name("composed-inner")
879 .with_runnable(echo_runnable())
880 .build()
881 .unwrap();
882 let composed: Arc<dyn Runnable<i32, i32>> = Arc::new(inner);
883 let result = composed.invoke(10, &ExecutionContext::new()).await.unwrap();
884 assert_eq!(result, 11);
885 }
886
887 #[tokio::test]
888 async fn execute_stream_emits_started_and_complete() {
889 let sink = CaptureSink::<i32>::new();
890 let agent = Agent::<i32>::builder()
891 .with_name("streamer")
892 .with_runnable(echo_runnable())
893 .add_sink(sink.clone())
894 .build()
895 .unwrap();
896
897 let ctx = ExecutionContext::new();
898 let mut stream = agent.execute_stream(7, &ctx);
899 let mut received = Vec::new();
900 while let Some(event) = stream.next().await {
901 received.push(event.unwrap());
902 }
903
904 // Events should be: Started → Complete(8). Both surfaces
905 // (caller stream + sink) see the same sequence.
906 assert!(matches!(received[0], AgentEvent::Started { .. }));
907 assert!(matches!(
908 received.last(),
909 Some(AgentEvent::Complete {
910 state: 8,
911 usage: None,
912 ..
913 })
914 ));
915 assert_eq!(received.len(), sink.len());
916 }
917
918 #[tokio::test]
919 async fn execute_stream_with_dropping_sink_does_not_block() {
920 // Caller-facing stream still works when sink is a no-op.
921 let agent = Agent::<i32>::builder()
922 .with_name("dropping-sink")
923 .with_runnable(echo_runnable())
924 .build()
925 .unwrap();
926 let ctx = ExecutionContext::new();
927 let mut stream = agent.execute_stream(0, &ctx);
928 let mut count = 0;
929 while stream.next().await.is_some() {
930 count += 1;
931 }
932 assert!(count >= 2, "expected at least Started + Complete");
933 }
934}