// entelix_core/service.rs
//! `tower::Service` spine for entelix model + tool calls.
//!
//! Two invocation types — [`ModelInvocation`] (request + ctx for a
//! `Codec`/`Transport` model call) and [`ToolInvocation`] (name +
//! input + ctx for a `Tool` execution) — flow through a layered
//! `tower::Service<Request, Response = ...>` stack. Cross-cutting
//! concerns (PII redaction, rate limiting, cost metering, OTel
//! observability) are `tower::Layer<S>` middleware. The composition
//! primitive is `tower::ServiceBuilder`; the dyn-erased handle is
//! [`BoxedModelService`] / [`BoxedToolService`].
//!
//! ## Why `tower::Service` rather than a bespoke `Hook` trait
//!
//! `tower` is the de-facto Rust async-middleware ecosystem
//! (`axum`, `tonic`, `reqwest-middleware`, `tower-http`). Adopting
//! it means:
//!
//! - `ServiceBuilder::new().layer(PolicyLayer).layer(OtelLayer)
//!   .service(model)` is the composition contract — `model` is a
//!   [`ChatModel`](crate::ChatModel)-produced leaf service.
//! - `Service::poll_ready` gives backpressure for free; layers like
//!   `tower::limit::ConcurrencyLimitLayer`,
//!   `tower::retry::RetryLayer`, and `tower::timeout::TimeoutLayer`
//!   plug in unchanged.
//! - The same layer (e.g. `PolicyLayer`) wraps both model calls and
//!   tool calls because it has separate `Service<ModelInvocation>`
//!   and `Service<ToolInvocation>` impls behind the same struct.

use std::sync::Arc;
use std::task::{Context, Poll};

use futures::future::BoxFuture;
use serde_json::Value;
use tower::Service;
use tower::util::BoxCloneService;

use crate::codecs::BoxDeltaStream;
use crate::context::ExecutionContext;
use crate::error::Error;
use crate::ir::{ModelRequest, ModelResponse};
use crate::tools::ToolMetadata;

/// One model call's full request plus its request-scope context.
/// Layers in the `tower` stack read both fields; the leaf service
/// consumes them to perform the actual provider call.
#[derive(Clone, Debug)]
pub struct ModelInvocation {
    /// Provider-neutral model request the codec will encode.
    pub request: ModelRequest,
    /// Request-scope state (cancellation, deadline, tenant, thread).
    pub ctx: ExecutionContext,
}

53impl ModelInvocation {
54    /// Bundle `request` + `ctx` into one invocation.
55    pub const fn new(request: ModelRequest, ctx: ExecutionContext) -> Self {
56        Self { request, ctx }
57    }
58}
59
/// Streaming-side counterpart to [`ModelInvocation`] — the same
/// `request + ctx` payload behind a distinct request type, so the
/// `tower::Service` trait's associated `Response` can resolve to
/// [`ModelStream`] for the streaming spine while [`ModelInvocation`]
/// keeps resolving to [`ModelResponse`] for the one-shot spine.
///
/// Rust's `Service<Request>` carries `Response` as an associated
/// type — one trait impl per `(Self, Request)` pair. This wrapper
/// is the cleanest way to expose two response types from one
/// leaf service: the same `InnerChatModel<C, T>` implements
/// `Service<ModelInvocation, Response = ModelResponse>` and
/// `Service<StreamingModelInvocation, Response = ModelStream>`,
/// and layers stack onto each independently.
///
/// `#[non_exhaustive]` keeps room for streaming-only knobs
/// (chunk-size hints, partial-output buffers) post-1.0 without
/// breaking callers.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct StreamingModelInvocation {
    /// The wrapped one-shot invocation. Streaming layers read
    /// `request` and `ctx` through this — same fields, same
    /// semantics, different `Service` shape.
    pub inner: ModelInvocation,
}

impl StreamingModelInvocation {
    /// Wrap a [`ModelInvocation`] for the streaming dispatch path.
    #[must_use]
    pub const fn new(inner: ModelInvocation) -> Self {
        Self { inner }
    }

    /// Borrow the request — read-side shortcut so layers don't
    /// have to spell out `invocation.inner.request`.
    #[must_use]
    pub const fn request(&self) -> &ModelRequest {
        &self.inner.request
    }

    /// Borrow the execution context — read-side shortcut for
    /// `invocation.inner.ctx`.
    #[must_use]
    pub const fn ctx(&self) -> &ExecutionContext {
        &self.inner.ctx
    }
}

107impl From<ModelInvocation> for StreamingModelInvocation {
108    fn from(inner: ModelInvocation) -> Self {
109        Self::new(inner)
110    }
111}
112
/// One tool call's identifier + descriptor + input + request-scope
/// context.
///
/// `tool_use_id` carries the IR's stable id so observability layers
/// can correlate `ToolStart` / `ToolComplete` / `ToolError` events
/// for the *same* dispatch even when several parallel calls share
/// the same tool name. `metadata` is the dispatched tool's full
/// declarative descriptor — name / version / effect / idempotent /
/// retry hint flow through the layer stack from a single source so
/// `OtelLayer`, `PolicyLayer`, and retry middleware see one
/// authoritative struct. Layers may mutate `input` (e.g. PII
/// redaction) before the leaf tool executes.
#[derive(Clone, Debug)]
pub struct ToolInvocation {
    /// Stable tool-use id matching the originating
    /// `ContentPart::ToolUse::id`. Empty when the call did not
    /// originate from a model `ToolUse` block (e.g. recipe-driven
    /// direct dispatch); observability layers fall back to
    /// `metadata.name` in that case.
    pub tool_use_id: String,
    /// Full declarative descriptor of the tool being dispatched —
    /// shared via `Arc` so layers don't pay a clone per pass.
    pub metadata: Arc<ToolMetadata>,
    /// JSON input payload for the tool.
    pub input: Value,
    /// Request-scope state (cancellation, deadline, tenant, thread).
    pub ctx: ExecutionContext,
}

142impl ToolInvocation {
143    /// Bundle the fields.
144    pub const fn new(
145        tool_use_id: String,
146        metadata: Arc<ToolMetadata>,
147        input: Value,
148        ctx: ExecutionContext,
149    ) -> Self {
150        Self {
151            tool_use_id,
152            metadata,
153            input,
154            ctx,
155        }
156    }
157
158    /// Tool name (shortcut for `self.metadata.name.as_str()`).
159    #[must_use]
160    pub fn name(&self) -> &str {
161        &self.metadata.name
162    }
163
164    /// Tool version (shortcut for `self.metadata.version.as_deref()`).
165    #[must_use]
166    pub fn version(&self) -> Option<&str> {
167        self.metadata.version.as_deref()
168    }
169}
170
/// Type-erased, cloneable `Service<ModelInvocation>` handle — the
/// canonical pre-composed shape `ChatModel` exposes via its
/// `service()` accessor and that user code stores on agents.
pub type BoxedModelService = BoxCloneService<ModelInvocation, ModelResponse, Error>;

/// Type-erased, cloneable `Service<ToolInvocation>` handle. All tool
/// dispatch funnels through this; `ToolRegistry` builds it on demand
/// from a registered `Tool` plus the registry's layer stack.
pub type BoxedToolService = BoxCloneService<ToolInvocation, Value, Error>;

/// Streaming dispatch result returned by
/// `Service<StreamingModelInvocation, Response = ModelStream>` —
/// the caller-visible delta stream paired with a future that
/// resolves to the aggregated terminal response.
///
/// (The one-shot spine's `Service<ModelInvocation>` resolves to
/// [`ModelResponse`] instead; see [`StreamingModelInvocation`].)
///
/// The [`Self::stream`] field carries the raw `StreamDelta` flow
/// (text chunks, tool-use boundaries, usage, rate-limit, warnings,
/// terminal `Stop`). The [`Self::completion`] future resolves to
/// `Ok(ModelResponse)` after the stream has been fully consumed
/// AND a `StreamAggregator` has reconstructed the final response;
/// it resolves to `Err(...)` if the stream errored mid-flight, was
/// dropped before terminal `Stop`, or violated the aggregator's
/// protocol invariants.
///
/// Layers (`OtelLayer`, `PolicyLayer`) wrap `completion` to emit
/// observability / cost events on the **`Ok` branch only** —
/// invariant 12. A stream that errors mid-flight surfaces the
/// error through the consumer's stream-side `Err` *and* through
/// `completion` resolving to `Err`; either way, no cost charge
/// fires.
///
/// `completion` is internally driven by the same stream that
/// `stream` carries — consumers do not need to poll it
/// separately. The aggregator runs as the consumer drains the
/// stream; `completion` resolves naturally when the consumer
/// reads the terminal `Stop` (or drops the stream early, in which
/// case `completion` resolves `Err`).
pub struct ModelStream {
    /// Raw delta stream surfaced to the caller. The wrapper
    /// produced by `entelix_core::stream::tap_aggregator` taps
    /// each delta into a `StreamAggregator` as it flows past, so
    /// the caller sees an unmodified stream while
    /// [`Self::completion`] receives the aggregated final response
    /// without a second pass.
    pub stream: BoxDeltaStream<'static>,
    /// Future resolving to the aggregated `ModelResponse` after
    /// the stream has been consumed to its terminal `Stop`. Layers
    /// wrap this future to gate observability emission on success
    /// (invariant 12). Consumers that ignore the streaming-side
    /// completion (e.g. wire it into a fire-and-forget OTel layer)
    /// do not need to await it directly — dropping the
    /// `ModelStream` is the canonical "I'm done" signal that lets
    /// any wrapping layer observe stream-completion regardless of
    /// whether the consumer polled `completion` itself.
    pub completion: BoxFuture<'static, Result<ModelResponse, Error>>,
}

228impl std::fmt::Debug for ModelStream {
229    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
230        f.debug_struct("ModelStream")
231            .field("stream", &"<BoxDeltaStream>")
232            .field("completion", &"<BoxFuture<Result<ModelResponse>>>")
233            .finish()
234    }
235}
236
/// Type-erased, cloneable `Service<StreamingModelInvocation,
/// Response = ModelStream>` handle — the streaming parallel to
/// [`BoxedModelService`]. `ChatModel::streaming_service()`
/// produces this; `OtelLayer` / `PolicyLayer` wrap it the same way
/// they wrap [`BoxedModelService`] on the one-shot path.
pub type BoxedStreamingService = BoxCloneService<StreamingModelInvocation, ModelStream, Error>;

/// Convenience helper: a canned `poll_ready` body that returns
/// `Poll::Ready(Ok(()))` unconditionally. (This trait is not itself
/// a `Service` — it supplies a ready-made `poll_ready`
/// implementation.) Most leaf services have no internal queue and
/// delegate to this shape; layers inherit `poll_ready` from their
/// inner service instead.
pub trait AlwaysReady<Request>: Service<Request> {
    /// `poll_ready` impl that's always ready. Call as
    /// `Self::poll_ready_always(cx)` from a leaf service's
    /// `poll_ready` body.
    #[inline]
    fn poll_ready_always(_cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }
}

// Blanket impl: every `Service` gets the helper for free, no opt-in.
impl<S, Request> AlwaysReady<Request> for S where S: Service<Request> {}

/// Static identity for a [`tower::Layer`] participant in
/// [`crate::ChatModel::layer`] / `ToolRegistry::layer` composition.
///
/// [`crate::ChatModel::layer_names`] walks the composed stack and
/// surfaces the names through the typed introspection channel —
/// diagnostic dashboards read `entelix.chat_model.layers` without
/// parsing prose `Debug` output, conditional-wiring code asserts the
/// expected stack at boot ("did my staging `OtelLayer` actually
/// compose in?"), and audit trails distinguish runs whose policy
/// wiring drifted at deploy time.
///
/// **Conventions** (patch-version-stable — renaming is a breaking
/// change for dashboards keyed off the value):
///
/// - `snake_case` ASCII bucketing the layer's primary role
///   (`"policy"`, `"otel"`, `"retry"`, `"approval"`).
/// - One canonical name per concrete layer struct, surfaced as
///   `pub const NAME: &'static str` on the struct so renaming
///   happens in one place.
/// - The trait method returns `&'static str` because layer
///   composition is a startup-time event and the name is part of
///   the binary's identity — runtime-built names would defeat the
///   stable-key promise.
///
/// External `tower::Layer` middleware (e.g. `tower::limit`'s
/// `ConcurrencyLimitLayer`) wraps through [`WithName`] to
/// participate in the same channel.
pub trait NamedLayer {
    /// Stable, patch-version-stable identifier surfaced through
    /// [`crate::ChatModel::layer_names`]. See the trait doc for the
    /// naming convention.
    fn layer_name(&self) -> &'static str;
}

/// Attaches a static diagnostic name to any [`tower::Layer<S>`] so
/// external middleware can participate in the
/// [`crate::ChatModel::layer_names`] introspection channel. The
/// wrapper is transparent at the `tower::Layer` boundary —
/// `WithName::new("concurrency", ConcurrencyLimitLayer::new(10))
/// .layer(inner)` builds the same service the bare layer would.
///
/// First-party entelix layers (`PolicyLayer`, `OtelLayer`)
/// implement [`NamedLayer`] directly and do **not** need this
/// wrapper; it exists exclusively for external `tower` middleware.
#[derive(Clone, Copy, Debug)]
pub struct WithName<L> {
    name: &'static str,
    inner: L,
}

impl<L> WithName<L> {
    /// Stamp `inner` with the diagnostic name `name`. See
    /// [`NamedLayer`]'s trait doc for the snake_case convention.
    pub const fn new(name: &'static str, inner: L) -> Self {
        WithName { name, inner }
    }

    /// Borrow the wrapped layer.
    #[must_use]
    pub const fn inner(&self) -> &L {
        &self.inner
    }
}

impl<L> NamedLayer for WithName<L> {
    /// Surface the name stamped at construction time.
    fn layer_name(&self) -> &'static str {
        self.name
    }
}

// Transparent delegation: wrapping with `WithName` changes nothing
// at the `tower::Layer` boundary — the produced service is exactly
// what the underlying layer would build.
impl<L, S> tower::Layer<S> for WithName<L>
where
    L: tower::Layer<S>,
{
    type Service = L::Service;

    fn layer(&self, inner: S) -> Self::Service {
        self.inner.layer(inner)
    }
}