entelix_core/service.rs
1//! `tower::Service` spine for entelix model + tool calls.
2//!
3//! Two invocation types — [`ModelInvocation`] (request + ctx for a
4//! `Codec`/`Transport` model call) and [`ToolInvocation`] (name +
5//! input + ctx for a `Tool` execution) — flow through a layered
6//! `tower::Service<Request, Response = ...>` stack. Cross-cutting
7//! concerns (PII redaction, rate limiting, cost metering, OTel
8//! observability) are `tower::Layer<S>` middleware. The composition
9//! primitive is `tower::ServiceBuilder`; the dyn-erased handle is
10//! [`BoxedModelService`] / [`BoxedToolService`].
11//!
12//! ## Why `tower::Service` rather than a bespoke `Hook` trait
13//!
14//! `tower` is the de-facto Rust async-middleware ecosystem
15//! (`axum`, `tonic`, `reqwest-middleware`, `tower-http`). Adopting
16//! it means:
17//!
18//! - `ServiceBuilder::new().layer(PolicyLayer).layer(OtelLayer)
19//! .service(model)` is the composition contract — `model` is a
20//! [`ChatModel`](crate::ChatModel)-produced leaf service.
21//! - `Service::poll_ready` gives backpressure for free; layers like
22//! `tower::limit::ConcurrencyLimitLayer`,
23//! `tower::retry::RetryLayer`, and `tower::timeout::TimeoutLayer`
24//! plug in unchanged.
25//! - The same layer (e.g. `PolicyLayer`) wraps both model calls and
26//! tool calls because it has separate `Service<ModelInvocation>`
27//! and `Service<ToolInvocation>` impls behind the same struct.
28
29use std::sync::Arc;
30use std::task::{Context, Poll};
31
32use futures::future::BoxFuture;
33use serde_json::Value;
34use tower::Service;
35use tower::util::BoxCloneService;
36
37use crate::codecs::BoxDeltaStream;
38use crate::context::ExecutionContext;
39use crate::error::Error;
40use crate::ir::{ModelRequest, ModelResponse};
41use crate::tools::ToolMetadata;
42
/// One model call's full request + request-scope context. Layers
/// read both fields; the leaf service consumes them.
///
/// Both fields are public so middleware can inspect (and, where a
/// layer's contract allows, rewrite) them without accessor
/// indirection.
#[derive(Clone, Debug)]
pub struct ModelInvocation {
    /// Provider-neutral model request the codec will encode.
    pub request: ModelRequest,
    /// Request-scope state (cancellation, deadline, tenant, thread).
    pub ctx: ExecutionContext,
}
52
53impl ModelInvocation {
54 /// Bundle `request` + `ctx` into one invocation.
55 pub const fn new(request: ModelRequest, ctx: ExecutionContext) -> Self {
56 Self { request, ctx }
57 }
58}
59
/// Streaming-side counterpart to [`ModelInvocation`] — the same
/// `request + ctx` payload but a distinct request type so the
/// `tower::Service` trait's associated `Response` can resolve to
/// [`ModelStream`] for the streaming spine while [`ModelInvocation`]
/// keeps resolving to [`ModelResponse`] for the one-shot spine.
///
/// Rust's `Service<Request>` carries `Response` as an associated
/// type — one trait impl per `(Self, Request)` pair. The newtype
/// wrapper here is the cleanest way to expose two response types
/// from one leaf service: the same `InnerChatModel<C, T>` implements
/// `Service<ModelInvocation, Response = ModelResponse>` and
/// `Service<StreamingModelInvocation, Response = ModelStream>`,
/// and layers stack onto each independently.
///
/// `#[non_exhaustive]` keeps room for streaming-only knobs
/// (chunk size hints, partial-output buffers) post-1.0 without
/// breaking callers; it also means code outside this crate must
/// construct the type through [`Self::new`] / `From`, never a
/// struct literal.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct StreamingModelInvocation {
    /// The wrapped one-shot invocation. Streaming layers read
    /// `request` and `ctx` through this — same fields, same
    /// semantics, different `Service` shape.
    pub inner: ModelInvocation,
}
85
86impl StreamingModelInvocation {
87 /// Wrap a [`ModelInvocation`].
88 #[must_use]
89 pub const fn new(inner: ModelInvocation) -> Self {
90 Self { inner }
91 }
92
93 /// Borrow the request — read-side shortcut so layers don't
94 /// have to write `invocation.inner.request`.
95 #[must_use]
96 pub const fn request(&self) -> &ModelRequest {
97 &self.inner.request
98 }
99
100 /// Borrow the context — read-side shortcut.
101 #[must_use]
102 pub const fn ctx(&self) -> &ExecutionContext {
103 &self.inner.ctx
104 }
105}
106
107impl From<ModelInvocation> for StreamingModelInvocation {
108 fn from(inner: ModelInvocation) -> Self {
109 Self::new(inner)
110 }
111}
112
/// One tool call's identifier + descriptor + input + request-scope
/// context.
///
/// `tool_use_id` carries the IR's stable id so observability layers
/// can correlate `ToolStart` / `ToolComplete` / `ToolError` events
/// for the *same* dispatch even when several parallel calls share
/// the same tool name. `metadata` is the dispatched tool's full
/// declarative descriptor — name / version / effect / idempotent /
/// retry hint flow through the layer stack from a single source so
/// `OtelLayer`, `PolicyLayer`, and retry middleware see one
/// authoritative struct. Layers may mutate `input` (e.g. PII
/// redaction); `metadata` is shared read-only via `Arc`.
#[derive(Clone, Debug)]
pub struct ToolInvocation {
    /// Stable tool-use id matching the originating
    /// `ContentPart::ToolUse::id`. Empty when the call did not
    /// originate from a model `ToolUse` block (e.g. recipe-driven
    /// direct dispatch); observability layers fall back to
    /// `metadata.name` in that case.
    pub tool_use_id: String,
    /// Full declarative descriptor of the tool being dispatched —
    /// shared via `Arc` so layers don't pay a clone per pass.
    pub metadata: Arc<ToolMetadata>,
    /// JSON input payload. The only field layers are expected to
    /// rewrite in place.
    pub input: Value,
    /// Request-scope state (cancellation, deadline, tenant, thread).
    pub ctx: ExecutionContext,
}
141
142impl ToolInvocation {
143 /// Bundle the fields.
144 pub const fn new(
145 tool_use_id: String,
146 metadata: Arc<ToolMetadata>,
147 input: Value,
148 ctx: ExecutionContext,
149 ) -> Self {
150 Self {
151 tool_use_id,
152 metadata,
153 input,
154 ctx,
155 }
156 }
157
158 /// Tool name (shortcut for `self.metadata.name.as_str()`).
159 #[must_use]
160 pub fn name(&self) -> &str {
161 &self.metadata.name
162 }
163
164 /// Tool version (shortcut for `self.metadata.version.as_deref()`).
165 #[must_use]
166 pub fn version(&self) -> Option<&str> {
167 self.metadata.version.as_deref()
168 }
169}
170
/// Type-erased, cloneable `Service<ModelInvocation>` handle. The
/// canonical pre-composed shape `ChatModel` exposes via its
/// `service()` accessor and that user code stores on agents.
/// Success type is [`ModelResponse`]; error type is [`Error`].
pub type BoxedModelService = BoxCloneService<ModelInvocation, ModelResponse, Error>;

/// Type-erased, cloneable `Service<ToolInvocation>` handle. Tool
/// dispatch funnels through this; `ToolRegistry` builds it on
/// demand from a registered `Tool` + the registry's layer stack.
/// Success type is the tool's JSON output `Value`.
pub type BoxedToolService = BoxCloneService<ToolInvocation, Value, Error>;
180
/// Streaming dispatch result returned by
/// `Service<StreamingModelInvocation, Response = ModelStream>` —
/// the caller-visible delta stream paired with a future that
/// resolves to the aggregated terminal response.
///
/// The [`Self::stream`] field carries the raw `StreamDelta` flow
/// (text chunks, tool-use boundaries, usage, rate-limit, warnings,
/// terminal `Stop`). The [`Self::completion`] future resolves to
/// `Ok(ModelResponse)` after the stream has been fully consumed
/// AND a `StreamAggregator` has reconstructed the final response;
/// it resolves to `Err(...)` if the stream errored mid-flight, was
/// dropped before terminal `Stop`, or violated the aggregator's
/// protocol invariants.
///
/// Layers (`OtelLayer`, `PolicyLayer`) wrap `completion` to emit
/// observability / cost events on the **`Ok` branch only** —
/// invariant 12. A stream that errors mid-flight surfaces the
/// error through the consumer's stream-side `Err` *and* through
/// `completion` resolving to `Err`; either way, no cost charge
/// fires.
///
/// `completion` is internally driven by the same deltas that
/// `stream` carries — consumers do not need to poll it
/// separately. The aggregator runs as the consumer drains the
/// stream; `completion` resolves naturally when the consumer
/// reads the terminal `Stop` (or drops the stream early, in which
/// case `completion` resolves `Err`).
pub struct ModelStream {
    /// Raw delta stream surfaced to the caller. The wrapper
    /// produced by `entelix_core::stream::tap_aggregator` taps
    /// each delta into a `StreamAggregator` as it flows past, so
    /// the caller sees an unmodified stream while
    /// [`Self::completion`] receives the aggregated final response
    /// without a second pass.
    pub stream: BoxDeltaStream<'static>,
    /// Future resolving to the aggregated `ModelResponse` after
    /// the stream has been consumed to its terminal `Stop`. Layers
    /// wrap this future to gate observability emission on success
    /// (invariant 12). Consumers that ignore the streaming-side
    /// completion (e.g. wire it into a fire-and-forget OTel layer)
    /// do not need to await it directly — dropping the
    /// `ModelStream` is the canonical "I'm done" signal that lets
    /// any wrapping layer observe stream-completion regardless of
    /// whether the consumer polled `completion` itself.
    pub completion: BoxFuture<'static, Result<ModelResponse, Error>>,
}
227
228impl std::fmt::Debug for ModelStream {
229 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
230 f.debug_struct("ModelStream")
231 .field("stream", &"<BoxDeltaStream>")
232 .field("completion", &"<BoxFuture<Result<ModelResponse>>>")
233 .finish()
234 }
235}
236
/// Type-erased, cloneable `Service<StreamingModelInvocation,
/// Response = ModelStream>` handle. Parallel to
/// [`BoxedModelService`] for the streaming dispatch path.
/// `ChatModel::streaming_service()` produces this; `OtelLayer` /
/// `PolicyLayer` wrap it the same way they wrap
/// [`BoxedModelService`] for the one-shot path.
pub type BoxedStreamingService = BoxCloneService<StreamingModelInvocation, ModelStream, Error>;
244
/// Convenience: an always-ready `Service` whose `poll_ready` returns
/// `Poll::Ready(Ok(()))` unconditionally. Most leaf services have
/// no internal queue and use this shape; layers inherit
/// `poll_ready` from their inner service.
///
/// Blanket-implemented for every `Service<Request>` below, so the
/// helper is available on any leaf without opting in.
pub trait AlwaysReady<Request>: Service<Request> {
    /// `poll_ready` impl that's always ready. Call from a leaf
    /// service's `poll_ready` body. The waker context is ignored
    /// because readiness never needs a wakeup.
    #[inline]
    fn poll_ready_always(_cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }
}

/// Blanket impl: every `Service` gets [`AlwaysReady::poll_ready_always`].
impl<S, Request> AlwaysReady<Request> for S where S: Service<Request> {}
259
/// Static identity for a [`tower::Layer`] participant in
/// [`crate::ChatModel::layer`] / `ToolRegistry::layer` composition.
///
/// [`crate::ChatModel::layer_names`] walks the composed stack and
/// surfaces the names through the typed introspection channel —
/// diagnostic dashboards read `entelix.chat_model.layers` without
/// parsing prose `Debug` output, conditional-wiring code asserts the
/// expected stack at boot ("did my staging `OtelLayer` actually
/// compose in?"), and audit trails distinguish runs whose policy
/// wiring drifted at deploy time.
///
/// **Conventions** (patch-version-stable — renaming is a breaking
/// change for dashboards keyed off the value):
///
/// - `snake_case` ASCII bucketing the layer's primary role
///   (`"policy"`, `"otel"`, `"retry"`, `"approval"`).
/// - One canonical name per concrete layer struct, surfaced as
///   `pub const NAME: &'static str` on the struct so renaming
///   happens in one place.
/// - The trait method returns `&'static str` because layer
///   composition is a startup-time event and the name is part of
///   the binary's identity — runtime-built names would defeat the
///   stable-key promise.
///
/// External `tower::Layer` middleware (e.g. `tower::limit`'s
/// `ConcurrencyLimitLayer`) wraps through [`WithName`] to
/// participate in the same channel.
pub trait NamedLayer {
    /// Stable, patch-version-stable identifier surfaced through
    /// [`crate::ChatModel::layer_names`]. See the trait doc for the
    /// naming convention.
    fn layer_name(&self) -> &'static str;
}
293
/// Wraps any [`tower::Layer<S>`] with a static name so external
/// middleware participates in the [`crate::ChatModel::layer_names`]
/// introspection channel. The wrapper is transparent at the
/// `tower::Layer` boundary — `WithName::new("concurrency",
/// ConcurrencyLimitLayer::new(10)).layer(inner)` produces the same
/// service the underlying layer would.
///
/// First-party entelix layers (`PolicyLayer`, `OtelLayer`)
/// implement [`NamedLayer`] directly and do **not** need this
/// wrapper; it exists exclusively for external `tower` middleware.
#[derive(Clone, Copy, Debug)]
pub struct WithName<L> {
    /// Diagnostic name reported via `NamedLayer::layer_name`.
    name: &'static str,
    /// The wrapped layer; all `tower::Layer` behavior delegates here.
    inner: L,
}

impl<L> WithName<L> {
    /// Stamp `inner` with the diagnostic name `name`. See
    /// [`NamedLayer`]'s trait doc for the snake_case convention.
    ///
    /// `#[must_use]` added for consistency with [`Self::inner`] —
    /// constructing a wrapper and discarding it does nothing.
    #[must_use]
    pub const fn new(name: &'static str, inner: L) -> Self {
        Self { name, inner }
    }

    /// Borrow the underlying layer.
    #[must_use]
    pub const fn inner(&self) -> &L {
        &self.inner
    }
}
323
impl<L> NamedLayer for WithName<L> {
    /// Returns the stamped name verbatim — `WithName` applies no
    /// prefixing or normalization; the caller-supplied string is
    /// the canonical identity.
    fn layer_name(&self) -> &'static str {
        self.name
    }
}
329
/// Transparent delegation: the produced service is exactly what the
/// wrapped layer would produce, so `WithName` is invisible at the
/// `tower::Layer` boundary and only adds the [`NamedLayer`] identity.
impl<L, S> tower::Layer<S> for WithName<L>
where
    L: tower::Layer<S>,
{
    type Service = L::Service;

    fn layer(&self, inner: S) -> Self::Service {
        self.inner.layer(inner)
    }
}
339}