entelix_agents/agent/tool_event_layer.rs
1//! `ToolEventLayer<S>` — `tower::Layer<Service<ToolInvocation>>`
2//! middleware emitting per-tool [`AgentEvent`] variants
3//! (`ToolStart` / `ToolComplete` / `ToolError`) to a configured
4//! `AgentEventSink`.
5//!
6//! Wired by recipes alongside the rest of the tool registry. The
7//! agent runtime itself never auto-installs this layer; recipe
8//! code reads as one explicit line:
9//!
10//! ```ignore
11//! let registry = ToolRegistry::new()
12//! .layer(ToolEventLayer::new(sink.clone()))
13//! .register(my_tool)?;
14//! ```
15//!
16//! When the layer fires, the `run_id` it stamps onto each event is
17//! read from `ExecutionContext::run_id()`. Absent context leaves
18//! events unstamped (the layer falls through to the inner service
19//! without emitting), so wiring the layer outside an agent run is
20//! a quiet no-op rather than a panic.
21//!
22//! ## Cancellation
23//!
//! `ToolStart` is emitted before the inner dispatch begins. If the
//! returned future is dropped after that emit has completed — during
//! the inner dispatch or while awaiting the terminal emit (cooperative
//! cancellation, deadline expiry, parent-future drop) — observers may
27//! see a `ToolStart` for which no matching `ToolComplete` / `ToolError`
28//! ever arrives. This is inherent to any "emit-before-dispatch" shape
29//! over an async sink — durable correlation lives in the
30//! [`entelix_session::SessionAuditSink`] event-log channel
31//! (`ToolCall` + `ToolResult` are written through
32//! [`AgentEvent::to_graph_event`]), not in the in-flight `AgentEvent`
33//! stream. Consumers that need orphan detection time the
34//! `(ToolStart, run_id, tool_use_id)` triple against an absent
35//! terminal event in their own dashboard.
36
37use std::sync::Arc;
38use std::task::{Context, Poll};
39use std::time::Instant;
40
41use futures::future::BoxFuture;
42use serde_json::Value;
43use tower::{Layer, Service, ServiceExt};
44
45use entelix_core::CurrentToolInvocation;
46use entelix_core::LlmRenderable;
47use entelix_core::error::{Error, Result};
48use entelix_core::service::ToolInvocation;
49
50use crate::agent::event::AgentEvent;
51use crate::agent::sink::AgentEventSink;
52
53/// Layer that emits per-tool `AgentEvent` variants to the
54/// configured sink.
55///
56/// Generic over the agent state type `S` so the
57/// `AgentEvent::ToolStart` / `ToolComplete` / `ToolError` variants
58/// share their state-type parameter with the agent's other events
59/// on a single sink.
60pub struct ToolEventLayer<S>
61where
62 S: Clone + Send + Sync + 'static,
63{
64 sink: Arc<dyn AgentEventSink<S>>,
65}
66
67impl<S> ToolEventLayer<S>
68where
69 S: Clone + Send + Sync + 'static,
70{
71 /// Patch-version-stable identifier surfaced through
72 /// [`entelix_core::tools::ToolRegistry::layer_names`]. Renaming
73 /// this constant is a breaking change for dashboards keyed off
74 /// the value.
75 pub const NAME: &'static str = "tool_event";
76
77 /// Build with a sink. Cloning the layer is cheap (`Arc`-backed).
78 #[must_use]
79 pub fn new(sink: Arc<dyn AgentEventSink<S>>) -> Self {
80 Self { sink }
81 }
82}
83
84impl<S> Clone for ToolEventLayer<S>
85where
86 S: Clone + Send + Sync + 'static,
87{
88 fn clone(&self) -> Self {
89 Self {
90 sink: Arc::clone(&self.sink),
91 }
92 }
93}
94
95impl<S> std::fmt::Debug for ToolEventLayer<S>
96where
97 S: Clone + Send + Sync + 'static,
98{
99 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100 f.debug_struct("ToolEventLayer").finish_non_exhaustive()
101 }
102}
103
104impl<S, Inner> Layer<Inner> for ToolEventLayer<S>
105where
106 S: Clone + Send + Sync + 'static,
107{
108 type Service = ToolEventService<S, Inner>;
109 fn layer(&self, inner: Inner) -> Self::Service {
110 ToolEventService {
111 inner,
112 sink: Arc::clone(&self.sink),
113 }
114 }
115}
116
117impl<S> entelix_core::NamedLayer for ToolEventLayer<S>
118where
119 S: Clone + Send + Sync + 'static,
120{
121 fn layer_name(&self) -> &'static str {
122 Self::NAME
123 }
124}
125
126/// `Service` produced by [`ToolEventLayer`]. Generic over the inner
127/// service so the layer composes with any tower-stacked tool path.
128pub struct ToolEventService<S, Inner>
129where
130 S: Clone + Send + Sync + 'static,
131{
132 inner: Inner,
133 sink: Arc<dyn AgentEventSink<S>>,
134}
135
136impl<S, Inner: Clone> Clone for ToolEventService<S, Inner>
137where
138 S: Clone + Send + Sync + 'static,
139{
140 fn clone(&self) -> Self {
141 Self {
142 inner: self.inner.clone(),
143 sink: Arc::clone(&self.sink),
144 }
145 }
146}
147
148impl<S, Inner> Service<ToolInvocation> for ToolEventService<S, Inner>
149where
150 S: Clone + Send + Sync + 'static,
151 Inner: Service<ToolInvocation, Response = Value, Error = Error> + Clone + Send + 'static,
152 Inner::Future: Send + 'static,
153{
154 type Response = Value;
155 type Error = Error;
156 type Future = BoxFuture<'static, Result<Value>>;
157
158 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
159 self.inner.poll_ready(cx)
160 }
161
162 fn call(&mut self, mut invocation: ToolInvocation) -> Self::Future {
163 let inner = self.inner.clone();
164 let sink = Arc::clone(&self.sink);
165 Box::pin(async move {
166 // Run-id stamping is a no-op outside an agent run.
167 let run_id = invocation.ctx.run_id().map(str::to_owned);
168 let tenant_id = invocation.ctx.tenant_id().clone();
169 let tool = invocation.metadata.name.clone();
170 let tool_version = invocation.metadata.version.clone();
171 let tool_use_id = invocation.tool_use_id.clone();
172 let input = invocation.input.clone();
173
174 if let Some(rid) = &run_id {
175 let _ = sink
176 .send(AgentEvent::ToolStart {
177 run_id: rid.clone(),
178 tenant_id: tenant_id.clone(),
179 tool_use_id: tool_use_id.clone(),
180 tool: tool.clone(),
181 tool_version: tool_version.clone(),
182 input,
183 })
184 .await;
185 }
186
187 // Stamp the per-dispatch identity marker just before
188 // dispatch so leaf-tool `ctx.record_phase(...)` calls
189 // resolve a stable (tool_use_id, tool_name, started_at)
190 // triple whose `started_at` aligns with the dispatch
191 // baseline below — `dispatch_elapsed_ms` measures
192 // tool-local work without layer-emit overhead leaking in.
193 // `tool_use_id` falls back to the tool name when the
194 // dispatch did not originate from a model `ToolUse` block.
195 let marker_use_id = if tool_use_id.is_empty() {
196 tool.clone()
197 } else {
198 tool_use_id.clone()
199 };
200 if let Ok(marker) = CurrentToolInvocation::new(marker_use_id, tool.clone()) {
201 invocation.ctx = invocation.ctx.clone().add_extension(marker);
202 }
203
204 let started_at = Instant::now();
205 let result = inner.oneshot(invocation).await;
206 let duration_ms = u64::try_from(started_at.elapsed().as_millis()).unwrap_or(u64::MAX);
207
208 match (&result, run_id) {
209 (Ok(output), Some(rid)) => {
210 let _ = sink
211 .send(AgentEvent::ToolComplete {
212 run_id: rid,
213 tenant_id,
214 tool_use_id,
215 tool,
216 tool_version,
217 duration_ms,
218 output: output.clone(),
219 })
220 .await;
221 }
222 (Err(err), Some(rid)) => {
223 let envelope = err.envelope();
224 let _ = sink
225 .send(AgentEvent::ToolError {
226 run_id: rid,
227 tenant_id,
228 tool_use_id,
229 tool,
230 tool_version,
231 error: err.to_string(),
232 error_for_llm: err.for_llm(),
233 envelope,
234 duration_ms,
235 })
236 .await;
237 }
238 _ => {}
239 }
240 result
241 })
242 }
243}