Skip to main content

entelix_agents/agent/
tool_event_layer.rs

1//! `ToolEventLayer<S>` — `tower::Layer<Service<ToolInvocation>>`
2//! middleware emitting per-tool [`AgentEvent`] variants
3//! (`ToolStart` / `ToolComplete` / `ToolError`) to a configured
4//! `AgentEventSink`.
5//!
6//! Wired by recipes alongside the rest of the tool registry. The
7//! agent runtime itself never auto-installs this layer; recipe
8//! code reads as one explicit line:
9//!
10//! ```ignore
11//! let registry = ToolRegistry::new()
12//!     .layer(ToolEventLayer::new(sink.clone()))
13//!     .register(my_tool)?;
14//! ```
15//!
16//! When the layer fires, the `run_id` it stamps onto each event is
17//! read from `ExecutionContext::run_id()`. Absent context leaves
18//! events unstamped (the layer falls through to the inner service
19//! without emitting), so wiring the layer outside an agent run is
20//! a quiet no-op rather than a panic.
21//!
22//! ## Cancellation
23//!
24//! `ToolStart` is emitted before the inner dispatch begins. If the
25//! dispatch future is dropped while awaiting the sink (cooperative
26//! cancellation, deadline expiry, parent-future drop), observers may
27//! see a `ToolStart` for which no matching `ToolComplete` / `ToolError`
28//! ever arrives. This is inherent to any "emit-before-dispatch" shape
29//! over an async sink — durable correlation lives in the
30//! [`entelix_session::SessionAuditSink`] event-log channel
31//! (`ToolCall` + `ToolResult` are written through
32//! [`AgentEvent::to_graph_event`]), not in the in-flight `AgentEvent`
33//! stream. Consumers that need orphan detection time the
34//! `(ToolStart, run_id, tool_use_id)` triple against an absent
35//! terminal event in their own dashboard.
36
37use std::sync::Arc;
38use std::task::{Context, Poll};
39use std::time::Instant;
40
41use futures::future::BoxFuture;
42use serde_json::Value;
43use tower::{Layer, Service, ServiceExt};
44
45use entelix_core::CurrentToolInvocation;
46use entelix_core::LlmRenderable;
47use entelix_core::error::{Error, Result};
48use entelix_core::service::ToolInvocation;
49
50use crate::agent::event::AgentEvent;
51use crate::agent::sink::AgentEventSink;
52
53/// Layer that emits per-tool `AgentEvent` variants to the
54/// configured sink.
55///
56/// Generic over the agent state type `S` so the
57/// `AgentEvent::ToolStart` / `ToolComplete` / `ToolError` variants
58/// share their state-type parameter with the agent's other events
59/// on a single sink.
60pub struct ToolEventLayer<S>
61where
62    S: Clone + Send + Sync + 'static,
63{
64    sink: Arc<dyn AgentEventSink<S>>,
65}
66
67impl<S> ToolEventLayer<S>
68where
69    S: Clone + Send + Sync + 'static,
70{
71    /// Patch-version-stable identifier surfaced through
72    /// [`entelix_core::tools::ToolRegistry::layer_names`]. Renaming
73    /// this constant is a breaking change for dashboards keyed off
74    /// the value.
75    pub const NAME: &'static str = "tool_event";
76
77    /// Build with a sink. Cloning the layer is cheap (`Arc`-backed).
78    #[must_use]
79    pub fn new(sink: Arc<dyn AgentEventSink<S>>) -> Self {
80        Self { sink }
81    }
82}
83
84impl<S> Clone for ToolEventLayer<S>
85where
86    S: Clone + Send + Sync + 'static,
87{
88    fn clone(&self) -> Self {
89        Self {
90            sink: Arc::clone(&self.sink),
91        }
92    }
93}
94
95impl<S> std::fmt::Debug for ToolEventLayer<S>
96where
97    S: Clone + Send + Sync + 'static,
98{
99    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100        f.debug_struct("ToolEventLayer").finish_non_exhaustive()
101    }
102}
103
104impl<S, Inner> Layer<Inner> for ToolEventLayer<S>
105where
106    S: Clone + Send + Sync + 'static,
107{
108    type Service = ToolEventService<S, Inner>;
109    fn layer(&self, inner: Inner) -> Self::Service {
110        ToolEventService {
111            inner,
112            sink: Arc::clone(&self.sink),
113        }
114    }
115}
116
117impl<S> entelix_core::NamedLayer for ToolEventLayer<S>
118where
119    S: Clone + Send + Sync + 'static,
120{
121    fn layer_name(&self) -> &'static str {
122        Self::NAME
123    }
124}
125
126/// `Service` produced by [`ToolEventLayer`]. Generic over the inner
127/// service so the layer composes with any tower-stacked tool path.
128pub struct ToolEventService<S, Inner>
129where
130    S: Clone + Send + Sync + 'static,
131{
132    inner: Inner,
133    sink: Arc<dyn AgentEventSink<S>>,
134}
135
136impl<S, Inner: Clone> Clone for ToolEventService<S, Inner>
137where
138    S: Clone + Send + Sync + 'static,
139{
140    fn clone(&self) -> Self {
141        Self {
142            inner: self.inner.clone(),
143            sink: Arc::clone(&self.sink),
144        }
145    }
146}
147
148impl<S, Inner> Service<ToolInvocation> for ToolEventService<S, Inner>
149where
150    S: Clone + Send + Sync + 'static,
151    Inner: Service<ToolInvocation, Response = Value, Error = Error> + Clone + Send + 'static,
152    Inner::Future: Send + 'static,
153{
154    type Response = Value;
155    type Error = Error;
156    type Future = BoxFuture<'static, Result<Value>>;
157
158    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
159        self.inner.poll_ready(cx)
160    }
161
162    fn call(&mut self, mut invocation: ToolInvocation) -> Self::Future {
163        let inner = self.inner.clone();
164        let sink = Arc::clone(&self.sink);
165        Box::pin(async move {
166            // Run-id stamping is a no-op outside an agent run.
167            let run_id = invocation.ctx.run_id().map(str::to_owned);
168            let tenant_id = invocation.ctx.tenant_id().clone();
169            let tool = invocation.metadata.name.clone();
170            let tool_version = invocation.metadata.version.clone();
171            let tool_use_id = invocation.tool_use_id.clone();
172            let input = invocation.input.clone();
173
174            if let Some(rid) = &run_id {
175                let _ = sink
176                    .send(AgentEvent::ToolStart {
177                        run_id: rid.clone(),
178                        tenant_id: tenant_id.clone(),
179                        tool_use_id: tool_use_id.clone(),
180                        tool: tool.clone(),
181                        tool_version: tool_version.clone(),
182                        input,
183                    })
184                    .await;
185            }
186
187            // Stamp the per-dispatch identity marker just before
188            // dispatch so leaf-tool `ctx.record_phase(...)` calls
189            // resolve a stable (tool_use_id, tool_name, started_at)
190            // triple whose `started_at` aligns with the dispatch
191            // baseline below — `dispatch_elapsed_ms` measures
192            // tool-local work without layer-emit overhead leaking in.
193            // `tool_use_id` falls back to the tool name when the
194            // dispatch did not originate from a model `ToolUse` block.
195            let marker_use_id = if tool_use_id.is_empty() {
196                tool.clone()
197            } else {
198                tool_use_id.clone()
199            };
200            if let Ok(marker) = CurrentToolInvocation::new(marker_use_id, tool.clone()) {
201                invocation.ctx = invocation.ctx.clone().add_extension(marker);
202            }
203
204            let started_at = Instant::now();
205            let result = inner.oneshot(invocation).await;
206            let duration_ms = u64::try_from(started_at.elapsed().as_millis()).unwrap_or(u64::MAX);
207
208            match (&result, run_id) {
209                (Ok(output), Some(rid)) => {
210                    let _ = sink
211                        .send(AgentEvent::ToolComplete {
212                            run_id: rid,
213                            tenant_id,
214                            tool_use_id,
215                            tool,
216                            tool_version,
217                            duration_ms,
218                            output: output.clone(),
219                        })
220                        .await;
221                }
222                (Err(err), Some(rid)) => {
223                    let envelope = err.envelope();
224                    let _ = sink
225                        .send(AgentEvent::ToolError {
226                            run_id: rid,
227                            tenant_id,
228                            tool_use_id,
229                            tool,
230                            tool_version,
231                            error: err.to_string(),
232                            error_for_llm: err.for_llm(),
233                            envelope,
234                            duration_ms,
235                        })
236                        .await;
237                }
238                _ => {}
239            }
240            result
241        })
242    }
243}