Skip to main content

entelix_agents/agent/
mod.rs

1//! `Agent<S>` — production agent runtime wrapping any `Runnable<S, S>`
2//! with an event sink, execution mode, and lifecycle observers.
3//!
4//! `Agent<S>` is the surface every production caller targets:
5//!
6//! ```ignore
7//! let agent = create_react_agent(model, tools)?;
8//! let final_state = agent.execute(initial, &ctx).await?;  // sync drain
9//!
10//! let mut stream = agent.execute_stream(initial, &ctx);
11//! while let Some(event) = stream.next().await {
12//!     match event? { /* AgentEvent variants */ }
13//! }
14//! ```
15//!
16//! The runtime is deliberately a *thin wrapper* over the inner
17//! `Runnable<S, S>` (typically a `CompiledGraph<S>`):
18//!
19//! - **`execute`** drives the inner runnable via
20//!   [`Runnable::invoke`] and emits a `Started` / `Complete` pair
21//!   on the sink for observability — returns the terminal state.
22//! - **`execute_stream`** uses the same lifecycle and inner
23//!   primitive (`Runnable::invoke`) but returns a stream of
24//!   `AgentEvent` values for callers wiring to SSE or other
25//!   incremental consumers. Both surfaces observe one canonical
26//!   `Started` → `Complete(state)` sequence regardless of which
27//!   the caller picks.
28//!
29//! ## Composability
30//!
31//! `Agent<S>` itself implements `Runnable<S, S>`, so an agent is a
32//! valid node in a parent `StateGraph<ParentState>` that maps state
33//! across the boundary. Recursive sub-agent dispatch follows the
34//! same pattern as any other `Runnable` composition.
35//!
36//! ## Information density
37//!
38//! Events emitted to the sink carry the full observability surface
39//! (ids, durations, classifications). The agent's own re-feed path
40//! to the model — when consuming a `ToolComplete` to synthesize the
41//! next message — reads only the LLM-facing fields (`output`,
42//! `delta`, lean `error_message`). Test fixtures verify both
43//! surfaces independently.
44
45mod approval_layer;
46mod approver;
47mod event;
48mod mode;
49mod observer;
50mod result;
51mod sink;
52mod tool_event_layer;
53mod tool_hook_layer;
54
55use std::sync::Arc;
56
57use async_trait::async_trait;
58use entelix_core::context::ExecutionContext;
59use entelix_core::error::{Error, Result};
60use entelix_runnable::Runnable;
61use entelix_runnable::stream::BoxStream;
62use tracing::Instrument;
63
64pub use self::approval_layer::{
65    ApprovalLayer, ApprovalService, EffectGate, ToolApprovalEventSink, ToolApprovalEventSinkHandle,
66};
67pub use self::approver::{
68    AlwaysApprove, ApprovalDecision, ApprovalRequest, Approver, ChannelApprover,
69    ChannelApproverConfig, PendingApproval,
70};
71pub use self::event::AgentEvent;
72pub use self::mode::ExecutionMode;
73pub use self::observer::{AgentObserver, DynObserver};
74pub use self::result::AgentRunResult;
75pub use self::sink::{
76    AgentEventSink, BroadcastSink, CaptureSink, ChannelSink, DroppingSink, FailOpenSink,
77    FanOutSink, StateErasureSink,
78};
79pub use self::tool_event_layer::{ToolEventLayer, ToolEventService};
80pub use self::tool_hook_layer::{
81    ToolHook, ToolHookDecision, ToolHookLayer, ToolHookRegistry, ToolHookRequest, ToolHookService,
82};
83
84/// Production agent runtime.
85///
86/// Construct via [`Agent::builder`]; finalize with
87/// [`AgentBuilder::build`]. See module docs for the abstraction
88/// model and information-density discipline.
89pub struct Agent<S>
90where
91    S: Clone + Send + Sync + 'static,
92{
93    name: String,
94    runnable: Arc<dyn Runnable<S, S>>,
95    sink: Arc<dyn AgentEventSink<S>>,
96    observers: Vec<DynObserver<S>>,
97    execution_mode: ExecutionMode,
98    approver: Option<Arc<dyn Approver>>,
99}
100
101impl<S> Agent<S>
102where
103    S: Clone + Send + Sync + 'static,
104{
105    /// Start a fluent builder.
106    #[must_use]
107    pub fn builder() -> AgentBuilder<S> {
108        AgentBuilder::default()
109    }
110
111    /// Borrow the agent's stable name. Always non-empty —
112    /// `AgentBuilder::build` rejects empty / unset names so trace
113    /// correlation never silently breaks.
114    #[must_use]
115    pub fn name(&self) -> &str {
116        &self.name
117    }
118
119    /// Borrow the underlying runnable. Useful when an agent is
120    /// embedded as a node in a larger graph and the parent needs
121    /// direct access to the inner shape (e.g. for checkpointing).
122    #[must_use]
123    pub fn inner(&self) -> &Arc<dyn Runnable<S, S>> {
124        &self.runnable
125    }
126
127    /// Run to completion, returning the terminal state.
128    ///
129    /// Emits a `Started{run_id}` opener, fires every registered
130    /// observer at the appropriate lifecycle point, then emits
131    /// either `Complete{run_id, state}` or `Failed{run_id, error}`
132    /// on the sink — every run produces exactly one terminal event
133    ///.
134    ///
135    /// The `run_id` is inherited from `ctx.run_id()` when present,
136    /// otherwise a fresh UUID v7 is generated and propagated to the
137    /// inner runnable through a cloned context.
138    pub async fn execute(&self, input: S, ctx: &ExecutionContext) -> Result<AgentRunResult<S>> {
139        let (run_id, scoped_ctx) = Self::scoped_run_context(ctx);
140        let parent_run_id = scoped_ctx.parent_run_id().map(str::to_owned);
141        let ctx = &scoped_ctx;
142
143        // Attach the type-erased approval-event sink handle so any
144        // `ApprovalLayer` deeper in the dispatch stack can emit
145        // `ToolCallApproved` / `ToolCallDenied` through the agent's
146        // typed sink without taking it as a constructor arg
147        // (which would tie the layer to a specific `S`). The
148        // attachment is unconditional — operators without an
149        // `ApprovalLayer` pay only the cost of an unused
150        // `Extensions` slot.
151        let scoped_with_sink =
152            ctx.clone()
153                .add_extension(ToolApprovalEventSinkHandle::for_agent_sink(Arc::clone(
154                    &self.sink,
155                )));
156
157        self.execute_inner(input, run_id, parent_run_id, &scoped_with_sink)
158            .await
159    }
160
161    /// Convenience entry that attaches a [`entelix_core::RunOverrides`] extension
162    /// to the context for the duration of the call. Equivalent to
163    /// `agent.execute(input, &ctx.add_extension(overrides))` —
164    /// shorter at the call site and signals the per-call shape at
165    /// a glance.
166    ///
167    /// **Asymmetric by design** — `RunOverrides` is the
168    /// agent-loop carrier (model / system prompt / max iterations
169    /// owned by the loop the `Agent` itself drives) so a typed
170    /// convenience belongs on `Agent`. `RequestOverrides`
171    /// (temperature / top_p / top_k / max_tokens / stop_sequences /
172    /// reasoning_effort / tool_choice / response_format /
173    /// parallel_tool_calls) is `ChatModel`-shaped — the dispatch
174    /// layer downstream picks it up via
175    /// `ExecutionContext::add_extension(RequestOverrides::new()…)`,
176    /// no agent-side convenience is needed (and adding one would
177    /// duplicate the orthogonality that the carrier split S99
178    /// established).
179    ///
180    /// Operators threading multiple per-call extensions stay on
181    /// [`Self::execute`] with their own `add_extension` chain.
182    pub async fn execute_with(
183        &self,
184        input: S,
185        overrides: entelix_core::RunOverrides,
186        ctx: &ExecutionContext,
187    ) -> Result<AgentRunResult<S>> {
188        let scoped = ctx.clone().add_extension(overrides);
189        self.execute(input, &scoped).await
190    }
191
192    async fn execute_inner(
193        &self,
194        input: S,
195        run_id: String,
196        parent_run_id: Option<String>,
197        ctx: &ExecutionContext,
198    ) -> Result<AgentRunResult<S>> {
199        self.sink
200            .send(AgentEvent::Started {
201                run_id: run_id.clone(),
202                tenant_id: ctx.tenant_id().clone(),
203                parent_run_id: parent_run_id.clone(),
204                agent: self.name.clone(),
205            })
206            .await?;
207
208        // The model + tool service-layer spans fire inside
209        // `run_inner`. Instrumenting *that* future with the
210        // agent-run span makes those layer spans children of the
211        // agent root in the OTel trace tree — operators see one
212        // tree per agent run instead of layer spans floating
213        // side by side. `Started` / `Failed` / `Complete` are
214        // sink emissions, not tracing spans, so they live
215        // outside the instrumented future without losing
216        // observability.
217        let outcome = self
218            .run_inner(input, run_id.clone(), ctx)
219            .instrument(self.run_span(&run_id, ctx))
220            .await;
221        match outcome {
222            Ok(result) => {
223                self.sink
224                    .send(AgentEvent::Complete {
225                        run_id: result.run_id.clone(),
226                        tenant_id: ctx.tenant_id().clone(),
227                        state: result.state.clone(),
228                        usage: result.usage,
229                    })
230                    .await?;
231                Ok(result)
232            }
233            Err(err) => {
234                if let entelix_core::Error::UsageLimitExceeded(breach) = &err
235                    && let Some(handle) = ctx.audit_sink()
236                {
237                    handle.as_sink().record_usage_limit_exceeded(breach);
238                }
239                // Best-effort `Failed` emission — if the sink itself
240                // errors (dropped receiver), swallow the secondary
241                // error so the original surfaces unchanged.
242                let envelope = err.envelope();
243                let _ = self
244                    .sink
245                    .send(AgentEvent::Failed {
246                        run_id,
247                        tenant_id: ctx.tenant_id().clone(),
248                        error: err.to_string(),
249                        envelope,
250                    })
251                    .await;
252                Err(err)
253            }
254        }
255    }
256
257    /// Build the `entelix.agent.run` tracing span for one
258    /// `execute` / `execute_stream` invocation. Span fields use
259    /// the `gen_ai.agent.*` / `entelix.*` namespaces that match
260    /// the rest of the OTel surface so dashboards joining on
261    /// run_id stay consistent across the layer / agent stack.
262    ///
263    /// The six `gen_ai.usage.*` / `entelix.usage.*` fields are
264    /// declared as [`tracing::field::Empty`] placeholders and
265    /// populated by [`Self::run_inner`] at the
266    /// [`AgentRunResult::usage`] freeze point — a `RunBudget`
267    /// attached to the [`ExecutionContext`] surfaces its frozen
268    /// counters as span attributes so OTel dashboards filter
269    /// per-run consumption without consumers having to harvest
270    /// the envelope return value. Runs without a budget keep the
271    /// fields `Empty`; the `tracing-opentelemetry` bridge omits
272    /// empty fields from the exported span attributes.
273    fn run_span(&self, run_id: &str, ctx: &ExecutionContext) -> tracing::Span {
274        tracing::info_span!(
275            target: "gen_ai",
276            "entelix.agent.run",
277            gen_ai.agent.name = %self.name,
278            entelix.run_id = %run_id,
279            entelix.tenant_id = %ctx.tenant_id(),
280            entelix.thread_id = ctx.thread_id(),
281            gen_ai.usage.input_tokens = tracing::field::Empty,
282            gen_ai.usage.output_tokens = tracing::field::Empty,
283            gen_ai.usage.total_tokens = tracing::field::Empty,
284            entelix.agent.usage.cost = tracing::field::Empty,
285            entelix.usage.requests = tracing::field::Empty,
286            entelix.usage.tool_calls = tracing::field::Empty,
287        )
288    }
289
290    /// Drive observers + inner runnable as one cohesive unit so the
291    /// outer terminal-event branching (`Complete` vs `Failed`) only
292    /// matches once.
293    ///
294    /// The [`AgentRunResult::usage`] snapshot is captured *between*
295    /// `runnable.invoke` returning and `on_complete` firing —
296    /// observer dispatches may themselves consume budget through
297    /// downstream `ChatModel` calls (memory consolidation, summary
298    /// writes), and the envelope must reflect the agent run only
299    ///.
300    async fn run_inner(
301        &self,
302        input: S,
303        run_id: String,
304        ctx: &ExecutionContext,
305    ) -> Result<AgentRunResult<S>> {
306        for observer in &self.observers {
307            observer.pre_turn(&input, ctx).await?;
308        }
309        let state = match self.runnable.invoke(input, ctx).await {
310            Ok(state) => state,
311            Err(err) => {
312                // HITL pause-and-resume is a control signal, not a
313                // failure — observers wanting interrupt observation
314                // consume `AgentEvent::Interrupted` from the sink.
315                if !matches!(err, Error::Interrupted { .. }) {
316                    for observer in &self.observers {
317                        // Failure-path observability is one-way:
318                        // observer errors raised from on_error get
319                        // dropped (with a tracing warn) so they don't
320                        // replace the original error in flight.
321                        // Mirrors the audit-sink contract from
322                        // invariant 18 /
323                        if let Err(observer_err) = observer.on_error(&err, ctx).await {
324                            tracing::warn!(
325                                observer = %observer.name(),
326                                source = %observer_err,
327                                "AgentObserver::on_error returned an error; dropping"
328                            );
329                        }
330                    }
331                }
332                return Err(err);
333            }
334        };
335        let usage = ctx.run_budget().map(|budget| budget.snapshot());
336        // Mirror the frozen snapshot onto the `entelix.agent.run`
337        // span declared in `run_span` — `Span::current()` here is
338        // that root span (this future is `.instrument`-ed by
339        // `execute_inner` / `book_end_stream`). Runs without a
340        // budget leave the fields as `tracing::field::Empty`, which
341        // the `tracing-opentelemetry` bridge drops from the
342        // exported span (no zero-valued attributes ride through).
343        if let Some(snapshot) = usage {
344            let span = tracing::Span::current();
345            span.record("gen_ai.usage.input_tokens", snapshot.input_tokens);
346            span.record("gen_ai.usage.output_tokens", snapshot.output_tokens);
347            span.record("gen_ai.usage.total_tokens", snapshot.total_tokens());
348            // `Decimal` lacks a `tracing::Value` impl; serialise to
349            // string. OTel exporters round-trip the cost as a
350            // string-typed attribute (the `Decimal` representation
351            // is the most accurate; consumers parse on read).
352            //
353            // Attribute name `entelix.agent.usage.cost` is distinct
354            // from `OtelLayer`'s per-model-call `gen_ai.usage.cost`
355            // — that one is a per-charge increment (`f64`), this one
356            // is the per-run cumulative roll-up (`Decimal` Display).
357            // Same key would conflate two different metrics on the
358            // dashboard; the namespace split mirrors the existing
359            // `entelix.usage.requests` / `entelix.usage.tool_calls`
360            // pattern (per-run aggregates ride the `entelix.*`
361            // namespace; per-call vendor signals ride `gen_ai.*`).
362            span.record(
363                "entelix.agent.usage.cost",
364                tracing::field::display(&snapshot.cost_usd),
365            );
366            span.record("entelix.usage.requests", snapshot.requests);
367            span.record("entelix.usage.tool_calls", snapshot.tool_calls);
368        }
369        // on_complete fires before any terminal event so observers
370        // can mutate side-channel state (vector store writes,
371        // summary persistence) and have those writes reflected in
372        // the same audit trail row.
373        for observer in &self.observers {
374            observer.on_complete(&state, ctx).await?;
375        }
376        Ok(AgentRunResult::new(state, run_id, usage))
377    }
378
379    /// Compute `(run_id, ctx_with_run_id_when_minted)` for an entry
380    /// Mint a fresh run id for this `Agent::execute` and rebase the
381    /// context so the inner runnable sees `(run_id = fresh,
382    /// parent_run_id = Some(caller's run_id))`. Top-level runs land
383    /// with `parent_run_id = None`; sub-agent dispatches preserve
384    /// the parent's id under `parent_run_id` so `(run_id,
385    /// parent_run_id)` edges reconstruct the trace tree.
386    ///
387    /// Always mints — never reuses the caller's id. Recipes that
388    /// pre-allocated a `run_id` for an external reservation see
389    /// their id flow through as `parent_run_id` of the agent's run,
390    /// keeping a deterministic correlation without flattening the
391    /// hierarchy.
392    fn scoped_run_context(ctx: &ExecutionContext) -> (String, ExecutionContext) {
393        let fresh = uuid::Uuid::now_v7().to_string();
394        let mut scoped = ctx.clone().with_run_id(fresh.clone());
395        if let Some(parent) = ctx.run_id() {
396            scoped = scoped.with_parent_run_id(parent.to_owned());
397        }
398        (fresh, scoped)
399    }
400
401    /// Borrow the configured execution mode.
402    #[must_use]
403    pub const fn execution_mode(&self) -> ExecutionMode {
404        self.execution_mode
405    }
406
407    /// Borrow the configured approver (`None` in `Auto` mode).
408    #[must_use]
409    pub fn approver(&self) -> Option<&Arc<dyn Approver>> {
410        self.approver.as_ref()
411    }
412
413    /// Number of registered lifecycle observers.
414    #[must_use]
415    pub fn observer_count(&self) -> usize {
416        self.observers.len()
417    }
418
419    /// Run with `AgentEvent` book-ends as a stream. Sinks
420    /// attached at construction time receive the same events for
421    /// fan-out telemetry.
422    ///
423    /// The returned stream is the caller-facing view; the sink
424    /// is the observability-facing view. **Both observe the same
425    /// `Started` → `Complete(state)` sequence** that
426    /// [`Self::execute`] produces — the only difference is the
427    /// return shape (a stream of events vs the awaited terminal
428    /// state).
429    ///
430    /// Drives the inner runnable via [`Runnable::invoke`] rather
431    /// than [`Runnable::stream`] so the lifecycle is identical to
432    /// `execute` and the `Complete` event always fires on
433    /// successful runs (a previous design routed through
434    /// `Runnable::stream`'s `Updates` mode and could silently skip
435    /// `Complete` for runnables that emit no per-node updates).
436    ///
437    /// Construction is synchronous and infallible — every event
438    /// (including the initial `Started`) yields lazily as the
439    /// stream is polled. Callers consume with `.next().await` like
440    /// any `Stream`; no extra `.await` on the constructor itself.
441    #[must_use]
442    pub fn execute_stream<'a>(
443        &'a self,
444        input: S,
445        ctx: &'a ExecutionContext,
446    ) -> BoxStream<'a, Result<AgentEvent<S>>> {
447        Box::pin(self.book_end_stream(input, ctx))
448    }
449
450    /// Async-stream body for `execute_stream`. Mirrors
451    /// [`Self::execute`] exactly so observers / sinks see one
452    /// canonical lifecycle regardless of which surface the caller
453    /// picked.
454    #[allow(clippy::redundant_async_block)]
455    fn book_end_stream<'a>(
456        &'a self,
457        input: S,
458        ctx: &'a ExecutionContext,
459    ) -> impl futures::Stream<Item = Result<AgentEvent<S>>> + Send + 'a {
460        async_stream::stream! {
461            let (run_id, scoped) = Self::scoped_run_context(ctx);
462            let parent_run_id = scoped.parent_run_id().map(str::to_owned);
463            // Keep `scoped` alive across the `await` boundaries below
464            // so the run-id-stamped child context lives for the whole call.
465            let inner_ctx: &ExecutionContext = &scoped;
466
467            // Started book-end (sink + caller stream).
468            let started = AgentEvent::Started {
469                run_id: run_id.clone(),
470                tenant_id: inner_ctx.tenant_id().clone(),
471                parent_run_id,
472                agent: self.name.clone(),
473            };
474            self.sink.send(started.clone()).await?;
475            yield Ok(started);
476
477            // Same instrument pattern as `execute` — model +
478            // tool layer spans inside `run_inner` nest under the
479            // agent-run span. Sink emissions (book-end events)
480            // stay outside the span; they're sink-only, not
481            // tracing events.
482            let tenant_id = inner_ctx.tenant_id().clone();
483            let outcome = self
484                .run_inner(input, run_id.clone(), inner_ctx)
485                .instrument(self.run_span(&run_id, inner_ctx))
486                .await;
487            // `scoped` lives at least until here — the borrow above is
488            // valid for the whole stream body.
489            drop(scoped);
490            match outcome {
491                Ok(result) => {
492                    let complete = AgentEvent::Complete {
493                        run_id: result.run_id,
494                        tenant_id,
495                        state: result.state,
496                        usage: result.usage,
497                    };
498                    self.sink.send(complete.clone()).await?;
499                    yield Ok(complete);
500                }
501                Err(err) => {
502                    let envelope = err.envelope();
503                    let failed = AgentEvent::Failed {
504                        run_id,
505                        tenant_id,
506                        error: err.to_string(),
507                        envelope,
508                    };
509                    let _ = self.sink.send(failed.clone()).await;
510                    yield Ok(failed);
511                    yield Err(err);
512                }
513            }
514        }
515    }
516}
517
518impl<S> std::fmt::Debug for Agent<S>
519where
520    S: Clone + Send + Sync + 'static,
521{
522    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
523        f.debug_struct("Agent")
524            .field("name", &self.name)
525            .finish_non_exhaustive()
526    }
527}
528
529/// `Agent<S>` is itself a [`Runnable<S, S>`] so it composes inside
530/// larger graphs (recursive sub-agent dispatch).
531///
532/// The composition contract is `S → S`, so the [`AgentRunResult`]
533/// envelope is unwrapped here — composing graphs see only the
534/// terminal state. Callers that need the per-run `UsageSnapshot`
535/// or `run_id` go through [`Agent::execute`] directly.
536#[async_trait]
537impl<S> Runnable<S, S> for Agent<S>
538where
539    S: Clone + Send + Sync + 'static,
540{
541    async fn invoke(&self, input: S, ctx: &ExecutionContext) -> Result<S> {
542        self.execute(input, ctx)
543            .await
544            .map(AgentRunResult::into_state)
545    }
546}
547
548/// Fluent builder for [`Agent<S>`].
549///
550/// Required fields (build fails otherwise):
551/// - `name` — non-empty, surfaces in `AgentEvent::Started { agent }`
552///   and `OTel` spans for trace correlation
553/// - `runnable` — the inner state machine the agent drives
554///
555/// Optional fields with sensible defaults:
556/// - `sink`: [`DroppingSink`] (telemetry-free)
557/// - `observers`: empty (no lifecycle hooks fire)
558/// - `execution_mode`: [`ExecutionMode::Auto`]
559/// - `approver`: `None` (only meaningful in `Supervised` mode)
560pub struct AgentBuilder<S>
561where
562    S: Clone + Send + Sync + 'static,
563{
564    name: Option<String>,
565    runnable: Option<Arc<dyn Runnable<S, S>>>,
566    sinks: Vec<Arc<dyn AgentEventSink<S>>>,
567    observers: Vec<DynObserver<S>>,
568    execution_mode: ExecutionMode,
569    approver: Option<Arc<dyn Approver>>,
570}
571
572impl<S> Default for AgentBuilder<S>
573where
574    S: Clone + Send + Sync + 'static,
575{
576    fn default() -> Self {
577        Self {
578            name: None,
579            runnable: None,
580            sinks: Vec::new(),
581            observers: Vec::new(),
582            execution_mode: ExecutionMode::default(),
583            approver: None,
584        }
585    }
586}
587
588impl<S> AgentBuilder<S>
589where
590    S: Clone + Send + Sync + 'static,
591{
592    /// Set the agent's stable identifier — required, surfaces in
593    /// `AgentEvent::Started { agent }` and `OTel` spans for trace
594    /// correlation. Must be non-empty after `Into::into`; the
595    /// build call returns `Error::Config` otherwise.
596    #[must_use]
597    pub fn with_name(mut self, name: impl Into<String>) -> Self {
598        self.name = Some(name.into());
599        self
600    }
601
602    /// Required — the agent's underlying [`Runnable<S, S>`]. The
603    /// build call returns `Error::Config` otherwise.
604    #[must_use]
605    pub fn with_runnable<R>(mut self, runnable: R) -> Self
606    where
607        R: Runnable<S, S> + 'static,
608    {
609        self.runnable = Some(Arc::new(runnable));
610        self
611    }
612
613    /// Reuse an `Arc<dyn Runnable<S, S>>` directly — useful when
614    /// the inner has already been boxed elsewhere (recipes do this
615    /// to share a `CompiledGraph<S>` between the agent and other
616    /// composition sites).
617    #[must_use]
618    pub fn with_runnable_arc(mut self, runnable: Arc<dyn Runnable<S, S>>) -> Self {
619        self.runnable = Some(runnable);
620        self
621    }
622
623    /// Append an event sink. Multiple calls accumulate — the agent
624    /// dispatches each emitted event to every registered sink in
625    /// registration order via an internal [`FanOutSink`]. Sinks
626    /// added earlier run first; a sink that returns `Err` halts the
627    /// run before later sinks see the event, so operators add
628    /// must-succeed sinks (audit, compliance) first and wrap
629    /// best-effort sinks (telemetry, embedding indexer) with
630    /// [`FailOpenSink`]. Empty registrations resolve to
631    /// [`DroppingSink`] at build time.
632    #[must_use]
633    pub fn add_sink<K>(mut self, sink: K) -> Self
634    where
635        K: AgentEventSink<S> + 'static,
636    {
637        self.sinks.push(Arc::new(sink));
638        self
639    }
640
641    /// Append a pre-erased `Arc<dyn AgentEventSink<S>>` — useful
642    /// when the sink has already been boxed elsewhere.
643    #[must_use]
644    pub fn add_sink_arc(mut self, sink: Arc<dyn AgentEventSink<S>>) -> Self {
645        self.sinks.push(sink);
646        self
647    }
648
649    /// Register a lifecycle observer. Observers are appended in
650    /// registration order; the agent fires them in that order at
651    /// each lifecycle event. Multiple observers are supported via
652    /// repeated calls.
653    #[must_use]
654    pub fn with_observer<O>(mut self, observer: O) -> Self
655    where
656        O: AgentObserver<S> + 'static,
657    {
658        self.observers.push(Arc::new(observer));
659        self
660    }
661
662    /// Register an `Arc<dyn AgentObserver<S>>` directly — useful
663    /// when the observer is also held by another consumer (e.g.
664    /// the same observer drives both an agent and an HTTP route
665    /// for direct inspection).
666    #[must_use]
667    pub fn with_observer_arc(mut self, observer: DynObserver<S>) -> Self {
668        self.observers.push(observer);
669        self
670    }
671
672    /// Defaults to [`ExecutionMode::Auto`]. `Supervised` requires an
673    /// [`Approver`] (set via [`Self::with_approver`]); the call to
674    /// [`Self::build`] returns `Error::Config` if mode is
675    /// `Supervised` and no approver was registered.
676    #[must_use]
677    pub const fn with_execution_mode(mut self, mode: ExecutionMode) -> Self {
678        self.execution_mode = mode;
679        self
680    }
681
682    /// Attach the approver used in `Supervised` mode. Has no
683    /// effect in `Auto` mode but is preserved across builder
684    /// calls so a single fluent chain can configure both modes
685    /// before deciding.
686    #[must_use]
687    pub fn with_approver<A>(mut self, approver: A) -> Self
688    where
689        A: Approver + 'static,
690    {
691        self.approver = Some(Arc::new(approver));
692        self
693    }
694
695    /// Reuse an `Arc<dyn Approver>` directly.
696    #[must_use]
697    pub fn with_approver_arc(mut self, approver: Arc<dyn Approver>) -> Self {
698        self.approver = Some(approver);
699        self
700    }
701
702    /// Finalize. Returns [`entelix_core::Error::Config`] when:
703    /// - `name` was not set or is empty (every agent must be
704    ///   identifiable in traces — empty-string defaults silently
705    ///   destroy correlation), or
706    /// - `runnable` was not set, or
707    /// - `execution_mode` is `Supervised` but no `approver` was
708    ///   registered (supervised mode without an approver is a
709    ///   programming error — there is no decision-maker).
710    pub fn build(self) -> Result<Agent<S>> {
711        let name = self.name.filter(|n| !n.is_empty()).ok_or_else(|| {
712            entelix_core::Error::config(
713                "AgentBuilder::build: name is required and must be non-empty \
714                     (call .with_name(...) — surfaces in AgentEvent::Started and OTel spans)",
715            )
716        })?;
717        let runnable = self.runnable.ok_or_else(|| {
718            entelix_core::Error::config(
719                "AgentBuilder::build: runnable is required (call .with_runnable(...) or .with_runnable_arc(...))",
720            )
721        })?;
722        if self.execution_mode.requires_approval() && self.approver.is_none() {
723            return Err(entelix_core::Error::config(
724                "AgentBuilder::build: ExecutionMode::Supervised requires an Approver \
725                 (call .with_approver(...) or .with_approver_arc(...))",
726            ));
727        }
728        let sink: Arc<dyn AgentEventSink<S>> = match self.sinks.len() {
729            0 => Arc::new(DroppingSink),
730            1 => self
731                .sinks
732                .into_iter()
733                .next()
734                .unwrap_or_else(|| unreachable!("len()==1 guarantees a value")),
735            _ => {
736                let mut fan = FanOutSink::<S>::new();
737                for sink in self.sinks {
738                    fan = fan.push(sink);
739                }
740                Arc::new(fan)
741            }
742        };
743        Ok(Agent {
744            name,
745            runnable,
746            sink,
747            observers: self.observers,
748            execution_mode: self.execution_mode,
749            approver: self.approver,
750        })
751    }
752}
753
754#[cfg(test)]
755#[allow(clippy::unwrap_used, clippy::expect_used, clippy::indexing_slicing)]
756mod tests {
757    use entelix_runnable::RunnableLambda;
758    use futures::StreamExt;
759
760    use super::*;
761
762    fn echo_runnable() -> impl Runnable<i32, i32> {
763        RunnableLambda::new(|n: i32, _ctx| async move { Ok::<_, _>(n + 1) })
764    }
765
766    #[tokio::test]
767    async fn build_requires_name() {
768        let err = Agent::<i32>::builder()
769            .with_runnable(echo_runnable())
770            .build()
771            .unwrap_err();
772        assert!(format!("{err}").contains("name is required"));
773    }
774
775    #[tokio::test]
776    async fn build_rejects_empty_name() {
777        // Empty string defaults destroy trace correlation — guard
778        // against the silent-failure mode at build time.
779        let err = Agent::<i32>::builder()
780            .with_name("")
781            .with_runnable(echo_runnable())
782            .build()
783            .unwrap_err();
784        assert!(format!("{err}").contains("name is required"));
785    }
786
787    #[tokio::test]
788    async fn build_requires_runnable() {
789        let err = Agent::<i32>::builder()
790            .with_name("needs-runnable")
791            .build()
792            .unwrap_err();
793        assert!(format!("{err}").contains("runnable is required"));
794    }
795
796    #[tokio::test]
797    async fn execute_drives_inner_and_emits_book_ends() {
798        let sink = CaptureSink::<i32>::new();
799        let agent = Agent::<i32>::builder()
800            .with_name("test-agent")
801            .with_runnable(echo_runnable())
802            .add_sink(sink.clone())
803            .build()
804            .unwrap();
805
806        let result = agent.execute(41, &ExecutionContext::new()).await.unwrap();
807        assert_eq!(result.state, 42);
808        assert!(!result.run_id.is_empty(), "run_id must be minted");
809        assert!(
810            result.usage.is_none(),
811            "no RunBudget on ctx → envelope.usage is None"
812        );
813        let events = sink.events();
814        assert_eq!(events.len(), 2);
815        assert!(matches!(&events[0], AgentEvent::Started { agent, .. } if agent == "test-agent"));
816        assert!(matches!(events[1], AgentEvent::Complete { state: 42, .. }));
817    }
818
819    #[tokio::test]
820    async fn execute_envelope_carries_frozen_usage_snapshot_when_budget_is_attached() {
821        // With a RunBudget on the context, the envelope's `usage`
822        // is `Some(snapshot)` — frozen at the moment the inner
823        // runnable returned. Subsequent budget mutations (which
824        // would happen in a real downstream layer) MUST NOT be
825        // reflected in the snapshot we already returned.
826        use entelix_core::RunBudget;
827
828        let sink = CaptureSink::<i32>::new();
829        let agent = Agent::<i32>::builder()
830            .with_name("budgeted-agent")
831            .with_runnable(echo_runnable())
832            .add_sink(sink.clone())
833            .build()
834            .unwrap();
835
836        // `unlimited()` plus a non-`None` `request_limit` so the
837        // pre-call CAS actually increments — `check_pre_request`
838        // early-returns when no cap is attached and never touches
839        // the counter (the runtime path that consumes a budget
840        // always sets at least one cap, so this is the realistic
841        // shape).
842        let budget = RunBudget::unlimited().with_request_limit(100);
843        budget.check_pre_request().unwrap();
844        let ctx = ExecutionContext::new().with_run_budget(budget.clone());
845
846        let result = agent.execute(0, &ctx).await.unwrap();
847        let snapshot = result.usage.expect("budget attached → usage Some");
848        assert_eq!(snapshot.requests, 1, "snapshot reflects pre-stamped count");
849
850        // Mutate the budget after the run — the snapshot must NOT
851        // change. This is the frozen-at-terminal contract.
852        budget.check_pre_request().unwrap();
853        assert_eq!(
854            snapshot.requests, 1,
855            "snapshot is frozen — not Arc-shared with live counter",
856        );
857
858        // The matching `Complete` sink event carries the same
859        // snapshot — operators wiring telemetry through the sink
860        // see the same artifact as direct callers (one-shot vs
861        // streaming surfaces match).
862        let events = sink.events();
863        let complete = events
864            .iter()
865            .find_map(|event| match event {
866                AgentEvent::Complete { usage, .. } => Some(*usage),
867                _ => None,
868            })
869            .expect("Complete event must be emitted");
870        assert_eq!(complete, Some(snapshot));
871    }
872
873    #[tokio::test]
874    async fn agent_is_runnable_so_it_composes() {
875        // Demonstrate that Agent<S>: Runnable<S, S> — the agent is
876        // itself usable as a node in a larger composition.
877        let inner = Agent::<i32>::builder()
878            .with_name("composed-inner")
879            .with_runnable(echo_runnable())
880            .build()
881            .unwrap();
882        let composed: Arc<dyn Runnable<i32, i32>> = Arc::new(inner);
883        let result = composed.invoke(10, &ExecutionContext::new()).await.unwrap();
884        assert_eq!(result, 11);
885    }
886
887    #[tokio::test]
888    async fn execute_stream_emits_started_and_complete() {
889        let sink = CaptureSink::<i32>::new();
890        let agent = Agent::<i32>::builder()
891            .with_name("streamer")
892            .with_runnable(echo_runnable())
893            .add_sink(sink.clone())
894            .build()
895            .unwrap();
896
897        let ctx = ExecutionContext::new();
898        let mut stream = agent.execute_stream(7, &ctx);
899        let mut received = Vec::new();
900        while let Some(event) = stream.next().await {
901            received.push(event.unwrap());
902        }
903
904        // Events should be: Started → Complete(8). Both surfaces
905        // (caller stream + sink) see the same sequence.
906        assert!(matches!(received[0], AgentEvent::Started { .. }));
907        assert!(matches!(
908            received.last(),
909            Some(AgentEvent::Complete {
910                state: 8,
911                usage: None,
912                ..
913            })
914        ));
915        assert_eq!(received.len(), sink.len());
916    }
917
918    #[tokio::test]
919    async fn execute_stream_with_dropping_sink_does_not_block() {
920        // Caller-facing stream still works when sink is a no-op.
921        let agent = Agent::<i32>::builder()
922            .with_name("dropping-sink")
923            .with_runnable(echo_runnable())
924            .build()
925            .unwrap();
926        let ctx = ExecutionContext::new();
927        let mut stream = agent.execute_stream(0, &ctx);
928        let mut count = 0;
929        while stream.next().await.is_some() {
930            count += 1;
931        }
932        assert!(count >= 2, "expected at least Started + Complete");
933    }
934}