Skip to main content

entelix_core/
chat.rs

1//! `ChatModel` — composes a `Codec` and a `Transport` into a layered
2//! `tower::Service<ModelInvocation, Response = ModelResponse>` plus
3//! a streaming surface.
4//!
5//! The composition is two-tiered:
6//!
7//! - An internal leaf service performs the raw
8//!   `codec.encode → transport.send → codec.decode` round trip,
9//!   implementing `tower::Service<ModelInvocation>`.
10//! - [`ChatModel<C, T>`] is the user-facing builder. It owns the
11//!   leaf inner service and a layer stack assembled via
12//!   [`ChatModel::layer`]. The composed `tower::Service` materialises
13//!   on each [`ChatModel::complete_full`] call.
14//!
15//! Cross-cutting concerns (PII redaction, quota gates, cost
16//! metering, OTel observability) live as `tower::Layer<S>` types in
17//! sibling crates (`entelix-policy::PolicyLayer`,
18//! `entelix-otel::OtelLayer`). Same layer wraps both this surface
19//! and [`crate::tools::ToolRegistry`] dispatch — one composition
20//! primitive across the whole agent stack.
21
22use std::sync::Arc;
23use std::task::{Context, Poll};
24
25use futures::StreamExt;
26use futures::future::BoxFuture;
27use tower::util::BoxCloneService;
28use tower::{Layer, Service, ServiceExt};
29
30use crate::codecs::{BoxDeltaStream, Codec};
31use crate::context::ExecutionContext;
32use crate::error::{Error, Result};
33use crate::ir::{
34    ContentPart, JsonSchemaSpec, Message, ModelRequest, ModelResponse, ReasoningEffort,
35    ResponseFormat, Role, SystemPrompt, ToolChoice, ToolSpec,
36};
37use crate::overrides::{RequestOverrides, RunOverrides};
38use crate::service::{
39    BoxedModelService, BoxedStreamingService, ModelInvocation, ModelStream, NamedLayer,
40    StreamingModelInvocation,
41};
42use crate::stream::{StreamDelta, tap_aggregator};
43use crate::transports::Transport;
44
45/// Patch `request` with any [`RunOverrides`] and [`RequestOverrides`]
46/// attached to `ctx`. Both `complete_full` and `stream_deltas` route
47/// through this helper so the override semantics stay identical
48/// across the two surfaces.
49///
50/// `RunOverrides` patches agent-loop-owned fields (`model`,
51/// `system`); `RequestOverrides` patches `ModelRequest`-shaped
52/// sampling knobs (`temperature`, `top_p`, `max_tokens`,
53/// `stop_sequences`, `reasoning_effort`, `tool_choice`,
54/// `response_format`). Either, both, or neither may be present.
55fn apply_overrides(request: &mut ModelRequest, ctx: &ExecutionContext) {
56    if let Some(run) = ctx.extension::<RunOverrides>() {
57        if let Some(model) = run.model() {
58            model.clone_into(&mut request.model);
59        }
60        if let Some(system) = run.system_prompt() {
61            request.system = system.clone();
62        }
63        if let Some(specs) = run.tool_specs() {
64            request.tools = Arc::clone(specs);
65        }
66    }
67    if let Some(req) = ctx.extension::<RequestOverrides>() {
68        if let Some(t) = req.temperature() {
69            request.temperature = Some(t);
70        }
71        if let Some(p) = req.top_p() {
72            request.top_p = Some(p);
73        }
74        if let Some(k) = req.top_k() {
75            request.top_k = Some(k);
76        }
77        if let Some(n) = req.max_tokens() {
78            request.max_tokens = Some(n);
79        }
80        if let Some(sequences) = req.stop_sequences() {
81            request.stop_sequences = sequences.to_vec();
82        }
83        if let Some(effort) = req.reasoning_effort() {
84            request.reasoning_effort = Some(effort.clone());
85        }
86        if let Some(choice) = req.tool_choice() {
87            request.tool_choice = choice.clone();
88        }
89        if let Some(format) = req.response_format() {
90            request.response_format = Some(format.clone());
91        }
92        if let Some(parallel) = req.parallel_tool_calls() {
93            request.parallel_tool_calls = Some(parallel);
94        }
95        if let Some(user_id) = req.end_user_id() {
96            request.end_user_id = Some(user_id.to_owned());
97        }
98        if let Some(seed) = req.seed() {
99            request.seed = Some(seed);
100        }
101    }
102}
103
104/// Builder-side configuration that flows into every `ModelRequest`
105/// the `ChatModel` issues. Stored separately from the leaf service
106/// so layers / streaming / future surfaces share one source of
107/// truth.
108///
109/// Fields are private. Construct via [`ChatModelConfig::new`];
110/// mutate through [`ChatModel`]'s `with_*` setters; inspect through
111/// the bare accessors on this type. `#[non_exhaustive]` plus the
112/// privatised fields together mean post-1.0 additions ship as MINOR
113/// without surprising any downstream consumer.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct ChatModelConfig {
    /// Provider model identifier sent on the wire.
    model: String,
    /// Per-call completion-token cap; `None` defers to the vendor.
    max_tokens: Option<u32>,
    /// System-prompt blocks prepended to every call.
    system: SystemPrompt,
    /// Sampling temperature; `None` defers to the vendor.
    temperature: Option<f32>,
    /// Nucleus-sampling parameter; `None` defers to the vendor.
    top_p: Option<f32>,
    /// Top-k sampling parameter; `None` defers to the vendor.
    top_k: Option<u32>,
    /// Stop sequences attached to every request.
    stop_sequences: Vec<String>,
    /// Advertised tools — `Arc<[_]>` so `build_request` clones are a
    /// refcount bump, not a deep copy of every tool's JSON schema.
    tools: Arc<[ToolSpec]>,
    /// Tool-choice mode attached to every request.
    tool_choice: ToolChoice,
    /// Cross-vendor reasoning-effort knob; `None` defers to vendor.
    reasoning_effort: Option<ReasoningEffort>,
    /// `complete_typed<O>` retry budget — schema-mismatch and
    /// [`crate::OutputValidator`] failures both reflect their hint
    /// to the model and re-prompt up to `validation_retries` times
    /// before surfacing [`Error::ModelRetry`] (invariant 20).
    /// Default `0` (no retry). Distinct from
    /// [`crate::Error::Provider`]'s transport retries (handled by
    /// `RetryService`).
    validation_retries: u32,
    /// Operator-supplied token counter for pre-flight budget checks
    /// and content-economy estimation. `None` (the default) means
    /// the SDK relies on the vendor's post-flight `Usage` block;
    /// pre-flight enforcement (refusing a call that would exceed
    /// the configured `RunBudget` ceiling before sending) requires
    /// an explicit counter. Concrete counters ship as companion
    /// crates (`entelix-tokenizer-tiktoken`,
    /// `entelix-tokenizer-hf`, locale-aware companions);
    /// [`crate::ByteCountTokenCounter`] is the zero-dependency
    /// English-biased default.
    token_counter: Option<std::sync::Arc<dyn crate::tokens::TokenCounter>>,
}
147
148impl ChatModelConfig {
149    /// Build a fresh config seeded with `model` and otherwise
150    /// defaulted.
151    #[must_use]
152    pub fn new(model: impl Into<String>) -> Self {
153        Self {
154            model: model.into(),
155            max_tokens: None,
156            system: SystemPrompt::default(),
157            temperature: None,
158            top_p: None,
159            top_k: None,
160            stop_sequences: Vec::new(),
161            tools: Arc::from([]),
162            tool_choice: ToolChoice::default(),
163            reasoning_effort: None,
164            validation_retries: 0,
165            token_counter: None,
166        }
167    }
168
169    /// Borrow the configured token counter, if any. Returns `None`
170    /// when the operator has not wired one — pre-flight budget
171    /// enforcement falls back to vendor `Usage` post-response.
172    #[must_use]
173    pub fn token_counter(&self) -> Option<&std::sync::Arc<dyn crate::tokens::TokenCounter>> {
174        self.token_counter.as_ref()
175    }
176
177    /// `complete_typed<O>` retry budget. Default `0` — the first
178    /// schema-mismatch fail surfaces unchanged. Operators that want
179    /// the loop to reflect the parse error to the model and ask for
180    /// a corrected JSON response set this to `1`–`3`.
181    pub const fn validation_retries(&self) -> u32 {
182        self.validation_retries
183    }
184
185    /// Provider model identifier sent on the wire.
186    pub fn model(&self) -> &str {
187        &self.model
188    }
189
190    /// Per-call `max_tokens` cap (`None` = vendor default).
191    pub const fn max_tokens(&self) -> Option<u32> {
192        self.max_tokens
193    }
194
195    /// System-prompt blocks prepended to every call. Supports
196    /// per-block prompt caching (Anthropic / Bedrock Converse).
197    pub const fn system(&self) -> &SystemPrompt {
198        &self.system
199    }
200
201    /// Sampling temperature.
202    pub const fn temperature(&self) -> Option<f32> {
203        self.temperature
204    }
205
206    /// Nucleus-sampling parameter.
207    pub const fn top_p(&self) -> Option<f32> {
208        self.top_p
209    }
210
211    /// Top-k sampling parameter (`None` ⇒ vendor default). Codec
212    /// support follows the IR mapping documented on
213    /// [`crate::ir::ModelRequest::top_k`].
214    pub const fn top_k(&self) -> Option<u32> {
215        self.top_k
216    }
217
218    /// Stop sequences.
219    pub fn stop_sequences(&self) -> &[String] {
220        &self.stop_sequences
221    }
222
223    /// Advertised tools.
224    pub fn tools(&self) -> &[ToolSpec] {
225        &self.tools
226    }
227
228    /// Tool-choice mode.
229    pub const fn tool_choice(&self) -> &ToolChoice {
230        &self.tool_choice
231    }
232
233    /// Cross-vendor reasoning-effort knob (`None` ⇒ vendor default).
234    /// Codecs translate onto their native wire shape per the
235    /// per-vendor mapping documented on
236    /// [`crate::ir::ReasoningEffort`]; lossy approximations emit
237    /// `ModelWarning::LossyEncode`.
238    pub const fn reasoning_effort(&self) -> Option<&ReasoningEffort> {
239        self.reasoning_effort.as_ref()
240    }
241
242    /// Combine config with caller-supplied messages into a full
243    /// [`ModelRequest`]. Only the fields the config carries are
244    /// projected; per-request knobs and provider extensions stay at
245    /// their `Default` (i.e. unset) and flow in via `RequestOverrides`
246    /// or direct `ExecutionContext::add_extension` instead.
247    #[must_use]
248    pub fn build_request(&self, messages: Vec<Message>) -> ModelRequest {
249        ModelRequest {
250            model: self.model.clone(),
251            messages,
252            system: self.system.clone(),
253            max_tokens: self.max_tokens,
254            temperature: self.temperature,
255            top_p: self.top_p,
256            top_k: self.top_k,
257            stop_sequences: self.stop_sequences.clone(),
258            tools: Arc::clone(&self.tools),
259            tool_choice: self.tool_choice.clone(),
260            reasoning_effort: self.reasoning_effort.clone(),
261            ..ModelRequest::default()
262        }
263    }
264}
265
266/// Leaf service: raw `codec.encode → transport.send → codec.decode`.
267/// `tower::Service<ModelInvocation, Response = ModelResponse>`.
268/// Cloning is cheap.
269///
270/// Internal composition primitive — users compose at the
271/// [`ChatModel`] level. Exposing the leaf service publicly would
272/// invite callers to bypass the layer stack and miss
273/// observability / policy / cost middleware.
pub(crate) struct InnerChatModel<C: Codec, T: Transport> {
    /// Shared wire-format translator; `Arc` so clones are refcount bumps.
    codec: Arc<C>,
    /// Shared carrier that moves encoded requests to the provider.
    transport: Arc<T>,
}
278
// Manual impl rather than `#[derive(Clone)]`: the derive would add
// `C: Clone` / `T: Clone` bounds, but only the `Arc` handles need
// cloning — each clone is an atomic refcount bump.
impl<C: Codec, T: Transport> Clone for InnerChatModel<C, T> {
    fn clone(&self) -> Self {
        Self {
            codec: Arc::clone(&self.codec),
            transport: Arc::clone(&self.transport),
        }
    }
}
287
impl<C: Codec, T: Transport> InnerChatModel<C, T> {
    /// Wrap shared `Arc`s into a leaf service handle.
    pub(crate) const fn from_arc(codec: Arc<C>, transport: Arc<T>) -> Self {
        Self { codec, transport }
    }

    /// Borrow the codec (surfaced publicly via [`ChatModel::codec`]).
    pub(crate) fn codec(&self) -> &C {
        &self.codec
    }

    /// Borrow the transport (surfaced publicly via
    /// [`ChatModel::transport`]).
    pub(crate) fn transport(&self) -> &T {
        &self.transport
    }
}
304
305impl<C: Codec + 'static, T: Transport + 'static> Service<ModelInvocation> for InnerChatModel<C, T> {
306    type Response = ModelResponse;
307    type Error = Error;
308    type Future = BoxFuture<'static, Result<ModelResponse>>;
309
310    #[inline]
311    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<()>> {
312        Poll::Ready(Ok(()))
313    }
314
315    fn call(&mut self, invocation: ModelInvocation) -> Self::Future {
316        let codec = Arc::clone(&self.codec);
317        let transport = Arc::clone(&self.transport);
318        Box::pin(async move {
319            let ModelInvocation { request, ctx } = invocation;
320            let encoded = codec.encode(&request)?;
321            let warnings = encoded.warnings.clone();
322            let response = transport.send(encoded, &ctx).await?;
323            if !(200..300).contains(&response.status) {
324                let body_text = String::from_utf8_lossy(&response.body).into_owned();
325                let mut err = Error::provider_http(response.status, body_text);
326                if let Some(after) =
327                    crate::transports::parse_retry_after(response.headers.get("retry-after"))
328                {
329                    err = err.with_retry_after(after);
330                }
331                return Err(err);
332            }
333            let rate_limit = codec.extract_rate_limit(&response.headers);
334            let mut decoded = codec.decode(&response.body, warnings)?;
335            decoded.rate_limit = rate_limit;
336            Ok(decoded)
337        })
338    }
339}
340
impl<C: Codec + 'static, T: Transport + 'static> Service<StreamingModelInvocation>
    for InnerChatModel<C, T>
{
    type Response = ModelStream;
    type Error = Error;
    type Future = BoxFuture<'static, Result<ModelStream>>;

    #[inline]
    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<()>> {
        // Always ready — backpressure, if any, comes from layers
        // stacked above the leaf.
        Poll::Ready(Ok(()))
    }

    /// Streaming round trip: encode, open the transport's streaming
    /// response, and hand the delta stream through the codec and
    /// aggregator.
    fn call(&mut self, invocation: StreamingModelInvocation) -> Self::Future {
        let codec = Arc::clone(&self.codec);
        let transport = Arc::clone(&self.transport);
        Box::pin(async move {
            let StreamingModelInvocation {
                inner: ModelInvocation { request, ctx },
            } = invocation;
            let encoded = codec.encode_streaming(&request)?;
            let warnings = encoded.warnings.clone();
            let stream = transport.send_streaming(encoded, &ctx).await?;
            if !(200..300).contains(&stream.status) {
                // Non-2xx: best-effort drain of the body so the error
                // carries the provider's message. Chunks that error
                // mid-drain are skipped rather than letting a
                // transport hiccup mask the HTTP status error.
                let mut buf = Vec::new();
                let mut body = stream.body;
                while let Some(chunk) = body.next().await {
                    if let Ok(b) = chunk {
                        buf.extend_from_slice(&b);
                    }
                }
                let body_text = String::from_utf8_lossy(&buf).into_owned();
                let mut err = Error::provider_http(stream.status, body_text);
                if let Some(after) =
                    crate::transports::parse_retry_after(stream.headers.get("retry-after"))
                {
                    err = err.with_retry_after(after);
                }
                return Err(err);
            }
            let rate_limit = codec.extract_rate_limit(&stream.headers);
            // The codec's `decode_stream` signature borrows
            // `&'a self`, so we tie the borrow's lifetime to the
            // owned `Arc<C>` by capturing it inside an
            // `async_stream::stream!` generator — the generator
            // becomes a `'static` future state that owns the Arc
            // for as long as the resulting stream lives. Without
            // this re-anchoring, the returned `BoxDeltaStream<'a>`
            // would borrow from a stack-local `codec` binding
            // that drops at the end of `call`.
            let codec_for_stream = Arc::clone(&codec);
            // The Rust 2024 tail-expr-drop-order change is benign
            // here — the `async_stream::stream!` block holds a
            // pinned mutable iterator (`inner`) over the codec's
            // decode pipeline; whether the temporaries inside drop
            // at end-of-block (Edition 2021) or end-of-statement
            // (Edition 2024) does not change observable
            // semantics, since `inner` is fully consumed before
            // the block returns.
            #[allow(tail_expr_drop_order)]
            let codec_stream: BoxDeltaStream<'static> = Box::pin(async_stream::stream! {
                let inner = codec_for_stream.decode_stream(stream.body, warnings);
                futures::pin_mut!(inner);
                while let Some(delta) = inner.next().await {
                    yield delta;
                }
            });
            // Prepend a synthetic `RateLimit` delta when the codec
            // could parse one from the response headers — the
            // aggregator captures it the same way it would a
            // mid-stream `Usage` delta, so the final
            // `ModelResponse` carries the snapshot operators
            // expect on `ModelResponse::rate_limit`.
            let prefixed: BoxDeltaStream<'static> = match rate_limit {
                Some(snapshot) => {
                    let prepend = futures::stream::iter(vec![Ok(StreamDelta::RateLimit(snapshot))]);
                    Box::pin(prepend.chain(codec_stream))
                }
                None => codec_stream,
            };
            Ok(tap_aggregator(prefixed))
        })
    }
}
424
/// Boxed factory: `InnerChatModel` → layered [`BoxedModelService`].
/// Stored on `ChatModel` so the type of the (potentially deeply
/// nested) layer stack stays opaque to users; one concrete
/// `ChatModel<C, T>` shape regardless of how many layers are
/// attached. `Arc` keeps `ChatModel: Clone` cheap.
type LayerFactory<C, T> = Arc<dyn Fn(InnerChatModel<C, T>) -> BoxedModelService + Send + Sync>;

/// Boxed factory for the streaming-side spine. Parallel to
/// [`LayerFactory`] but kept separate because the two spines erase
/// to different boxed service types; layers attached via
/// [`ChatModel::layer`] are stacked onto this factory the same way
/// they're stacked onto the one-shot factory, so a single
/// `.layer(OtelLayer::new(...))` call wraps both spines with one
/// observability stack.
type StreamingLayerFactory<C, T> =
    Arc<dyn Fn(InnerChatModel<C, T>) -> BoxedStreamingService + Send + Sync>;
439
/// Configurable chat model — codec + transport + layer stack.
///
/// Cheap to clone (handles are `Arc`-backed). Builder-style
/// configuration via `with_*` methods; layer attachment via
/// [`Self::layer`].
pub struct ChatModel<C: Codec + 'static, T: Transport + 'static> {
    /// Leaf service performing the raw encode → send → decode trip.
    inner: InnerChatModel<C, T>,
    /// Request shape projected into every `ModelRequest` issued.
    config: ChatModelConfig,
    /// One-shot layer stack; `None` until the first [`Self::layer`].
    factory: Option<LayerFactory<C, T>>,
    /// Streaming layer stack; kept in lockstep with `factory`.
    streaming_factory: Option<StreamingLayerFactory<C, T>>,
    /// Diagnostic stack — names captured at [`Self::layer`] compose
    /// time, in insertion order (innermost composed first → outermost
    /// composed last). The wrapped service stacks last-composed
    /// outermost, so this `Vec` reads bottom-to-top relative to
    /// request flow.
    layer_names: Vec<&'static str>,
}
457
// Manual impl rather than `#[derive(Clone)]`: the derive would add
// `C: Clone` / `T: Clone` bounds, but every field here is either an
// `Arc` handle or an owned config — cloning needs no bounds on the
// codec / transport themselves.
impl<C: Codec + 'static, T: Transport + 'static> Clone for ChatModel<C, T> {
    fn clone(&self) -> Self {
        Self {
            inner: self.inner.clone(),
            config: self.config.clone(),
            factory: self.factory.clone(),
            streaming_factory: self.streaming_factory.clone(),
            layer_names: self.layer_names.clone(),
        }
    }
}
469
470impl<C: Codec + 'static, T: Transport + 'static> ChatModel<C, T> {
    /// Build a model bundling owned codec + transport + model name.
    /// Convenience wrapper over [`Self::from_arc`] that wraps both
    /// halves in fresh `Arc`s.
    pub fn new(codec: C, transport: T, model: impl Into<String>) -> Self {
        Self::from_arc(Arc::new(codec), Arc::new(transport), model)
    }
475
    /// Build from already-shared `Arc`s — useful when many models
    /// share one transport / codec instance.
    ///
    /// Starts with a defaulted [`ChatModelConfig`] (only `model`
    /// set) and an empty layer stack.
    pub fn from_arc(codec: Arc<C>, transport: Arc<T>, model: impl Into<String>) -> Self {
        Self {
            inner: InnerChatModel::from_arc(codec, transport),
            config: ChatModelConfig::new(model),
            factory: None,
            streaming_factory: None,
            layer_names: Vec::new(),
        }
    }
487
488    /// Set per-call `max_tokens`.
489    #[must_use]
490    pub const fn with_max_tokens(mut self, n: u32) -> Self {
491        self.config.max_tokens = Some(n);
492        self
493    }
494
495    /// Attach a system / instruction prompt. Convenience
496    /// shorthand for a single-block uncached prompt — for
497    /// multi-block or cached prompts, set `config.system` to a
498    /// pre-built [`SystemPrompt`] directly.
499    #[must_use]
500    pub fn with_system(mut self, s: impl Into<String>) -> Self {
501        self.config.system = SystemPrompt::text(s);
502        self
503    }
504
505    /// Attach a pre-built [`SystemPrompt`] — supports multi-block
506    /// and per-block cache control.
507    #[must_use]
508    pub fn with_system_prompt(mut self, prompt: SystemPrompt) -> Self {
509        self.config.system = prompt;
510        self
511    }
512
513    /// Set sampling temperature.
514    #[must_use]
515    pub const fn with_temperature(mut self, t: f32) -> Self {
516        self.config.temperature = Some(t);
517        self
518    }
519
520    /// Set the [`complete_typed`](Self::complete_typed) validation
521    /// retry budget — number of times the loop reflects a
522    /// schema-mismatch or [`crate::OutputValidator`] failure back to
523    /// the model with corrective hint text before surfacing the
524    /// terminal [`Error::ModelRetry`]. Default `0` (no retry). Each
525    /// retry increments the conversation length by two messages
526    /// (assistant's failed reply + retry prompt). Both retry shapes
527    /// share one budget and route through `Error::ModelRetry`
528    /// (invariant 20).
529    #[must_use]
530    pub const fn with_validation_retries(mut self, n: u32) -> Self {
531        self.config.validation_retries = n;
532        self
533    }
534
535    /// Wire an operator-supplied [`TokenCounter`](crate::TokenCounter)
536    /// for pre-flight budget checks and content-economy estimation.
537    /// Each call replaces the configured counter — the last call wins.
538    ///
539    /// Vendor-accurate counters (tiktoken, HuggingFace, ko-mecab)
540    /// ship as companion crates so the core stays
541    /// zero-dependency. [`crate::ByteCountTokenCounter`] is the
542    /// English-biased zero-dependency default for development
543    /// scaffolding.
544    #[must_use]
545    pub fn with_token_counter(
546        mut self,
547        counter: std::sync::Arc<dyn crate::tokens::TokenCounter>,
548    ) -> Self {
549        self.config.token_counter = Some(counter);
550        self
551    }
552
553    /// Set nucleus sampling parameter.
554    #[must_use]
555    pub const fn with_top_p(mut self, p: f32) -> Self {
556        self.config.top_p = Some(p);
557        self
558    }
559
560    /// Set top-k sampling parameter. Native on Anthropic, Gemini,
561    /// and Bedrock-Anthropic; OpenAI codecs surface as
562    /// `LossyEncode`.
563    #[must_use]
564    pub const fn with_top_k(mut self, k: u32) -> Self {
565        self.config.top_k = Some(k);
566        self
567    }
568
569    /// Append one stop sequence.
570    #[must_use]
571    pub fn with_stop_sequence(mut self, s: impl Into<String>) -> Self {
572        self.config.stop_sequences.push(s.into());
573        self
574    }
575
576    /// Replace the full stop sequences list.
577    #[must_use]
578    pub fn with_stop_sequences(mut self, seqs: Vec<String>) -> Self {
579        self.config.stop_sequences = seqs;
580        self
581    }
582
583    /// Replace the advertised tools list. Accepts anything that
584    /// converts into `Arc<[ToolSpec]>` — `Vec<ToolSpec>` and
585    /// `[ToolSpec; N]` literals both qualify, so caller ergonomics
586    /// match the previous `Vec` shape while per-dispatch
587    /// `build_request` clones become an atomic refcount bump rather
588    /// than a deep walk of every tool's JSON schema.
589    #[must_use]
590    pub fn with_tools(mut self, tools: impl Into<Arc<[ToolSpec]>>) -> Self {
591        self.config.tools = tools.into();
592        self
593    }
594
595    /// Set the tool-choice mode.
596    #[must_use]
597    pub fn with_tool_choice(mut self, c: ToolChoice) -> Self {
598        self.config.tool_choice = c;
599        self
600    }
601
602    /// Set the cross-vendor reasoning-effort knob. Codecs translate
603    /// onto their native wire shape per the table on
604    /// [`crate::ir::ReasoningEffort`] — `Off` / `Minimal` / `Low`
605    /// / `Medium` / `High` / `Auto` snap to vendor buckets, lossy
606    /// approximations emit `ModelWarning::LossyEncode`, and
607    /// `VendorSpecific(s)` passes the literal vendor wire value
608    /// through.
609    #[must_use]
610    pub fn with_reasoning_effort(mut self, effort: ReasoningEffort) -> Self {
611        self.config.reasoning_effort = Some(effort);
612        self
613    }
614
    /// Append a `tower::Layer` to **both** dispatch spines — the
    /// one-shot path (`Service<ModelInvocation, Response =
    /// ModelResponse>`) and the streaming path
    /// (`Service<StreamingModelInvocation, Response =
    /// ModelStream>`). Each `.layer(L)` wraps `L` *around* the
    /// already-composed stack, so the **last-registered layer is
    /// outermost** (sees the request first, the response last) and
    /// the first-registered layer sits innermost against the leaf
    /// `InnerChatModel`.
    ///
    /// `PolicyLayer` and `OtelLayer` from the policy / otel
    /// crates satisfy both spines, so a single `.layer(...)` call
    /// wraps the agent's complete dispatch fan-out:
    ///
    /// - one-shot: `Service<ModelInvocation, Response = ModelResponse>`
    /// - streaming: `Service<StreamingModelInvocation, Response = ModelStream>`
    /// - tool dispatch (separate, on `ToolRegistry::layer`):
    ///   `Service<ToolInvocation, Response = Value>`
    ///
    /// The streaming-side layer wraps the [`ModelStream`]'s
    /// `completion` future so observability events (cost,
    /// latency, span close) fire only on the `Ok` branch — a
    /// stream that errors mid-flight surfaces the error and
    /// emits no charge (invariant 12).
    ///
    /// Layers that legitimately apply only to the one-shot spine
    /// (e.g. `RetryLayer` — retrying a partially-streamed
    /// response is meaningless) implement a pass-through
    /// `Layer<BoxedStreamingService>` so they satisfy the
    /// constraint without affecting streaming dispatch.
    ///
    /// ## Introspection
    ///
    /// Every layer is recorded in [`Self::layer_names`] at compose
    /// time via [`NamedLayer::layer_name`]. First-party entelix
    /// layers (`PolicyLayer`, `OtelLayer`) implement
    /// [`NamedLayer`] directly; external `tower::Layer` middleware
    /// wraps through [`crate::service::WithName`] to participate:
    ///
    /// ```ignore
    /// use entelix_core::WithName;
    ///
    /// chat_model
    ///     .layer(PolicyLayer::new(registry))
    ///     .layer(OtelLayer::new("anthropic"))
    ///     .layer(WithName::new("concurrency", tower::limit::ConcurrencyLimitLayer::new(10)));
    /// // chat_model.layer_names() == ["policy", "otel", "concurrency"]
    /// ```
    #[must_use]
    pub fn layer<L>(mut self, layer: L) -> Self
    where
        L: Layer<BoxedModelService>
            + Layer<BoxedStreamingService>
            + NamedLayer
            + Clone
            + Send
            + Sync
            + 'static,
        <L as Layer<BoxedModelService>>::Service: Service<ModelInvocation, Response = ModelResponse, Error = Error>
            + Clone
            + Send
            + 'static,
        <<L as Layer<BoxedModelService>>::Service as Service<ModelInvocation>>::Future:
            Send + 'static,
        <L as Layer<BoxedStreamingService>>::Service: Service<StreamingModelInvocation, Response = ModelStream, Error = Error>
            + Clone
            + Send
            + 'static,
        <<L as Layer<BoxedStreamingService>>::Service as Service<StreamingModelInvocation>>::Future:
            Send + 'static,
    {
        // Capture the static name BEFORE the type-erasing closure
        // swallows `L`. The factory chain (Arc<dyn Fn(InnerChatModel)
        // -> BoxedModelService>) discards concrete layer types, so
        // introspection has to happen here at compose time.
        self.layer_names.push(layer.layer_name());

        // Factory chaining: the new factory closes over the previous
        // one, so applying the newest factory rebuilds the full stack
        // from the leaf outward each time a service is materialised.
        let prev = self.factory.take();
        let prev_streaming = self.streaming_factory.take();
        // Two clones of `L`: one per spine, each moved into its own
        // boxed closure.
        let layer_one_shot = layer.clone();
        let layer_streaming = layer;
        let new_factory: LayerFactory<C, T> = Arc::new(move |inner: InnerChatModel<C, T>| {
            let stacked: BoxedModelService = match &prev {
                Some(prev_factory) => prev_factory(inner),
                None => BoxCloneService::new(inner),
            };
            BoxCloneService::new(<L as Layer<BoxedModelService>>::layer(
                &layer_one_shot,
                stacked,
            ))
        });
        let new_streaming: StreamingLayerFactory<C, T> =
            Arc::new(move |inner: InnerChatModel<C, T>| {
                let stacked: BoxedStreamingService = match &prev_streaming {
                    Some(prev_factory) => prev_factory(inner),
                    None => BoxCloneService::new(inner),
                };
                BoxCloneService::new(<L as Layer<BoxedStreamingService>>::layer(
                    &layer_streaming,
                    stacked,
                ))
            });
        self.factory = Some(new_factory);
        self.streaming_factory = Some(new_streaming);
        self
    }
721
    /// Compose a `tower::Layer` whose only difference from a
    /// first-party entelix layer is the missing [`NamedLayer`]
    /// impl. Equivalent to `self.layer(WithName::new(name, layer))`
    /// — the wrapper supplies the identity surfaced through
    /// [`Self::layer_names`].
    ///
    /// Use this for external middleware (`tower::limit`,
    /// `tower::timeout`, operator-defined wrappers) without
    /// implementing [`NamedLayer`] yourself. First-party entelix
    /// layers already implement [`NamedLayer`] and should reach for
    /// [`Self::layer`] directly so the canonical role name
    /// (e.g. `"policy"`, `"otel"`, `"retry"`) ships at the call
    /// site.
    #[must_use]
    pub fn layer_named<L>(self, name: &'static str, layer: L) -> Self
    where
        L: Layer<BoxedModelService> + Layer<BoxedStreamingService> + Clone + Send + Sync + 'static,
        <L as Layer<BoxedModelService>>::Service: Service<ModelInvocation, Response = ModelResponse, Error = Error>
            + Clone
            + Send
            + 'static,
        <<L as Layer<BoxedModelService>>::Service as Service<ModelInvocation>>::Future:
            Send + 'static,
        <L as Layer<BoxedStreamingService>>::Service: Service<StreamingModelInvocation, Response = ModelStream, Error = Error>
            + Clone
            + Send
            + 'static,
        <<L as Layer<BoxedStreamingService>>::Service as Service<StreamingModelInvocation>>::Future:
            Send + 'static,
    {
        // Pure delegation: `WithName` carries the `NamedLayer`
        // identity and forwards both `Layer` impls to `layer`.
        self.layer(crate::service::WithName::new(name, layer))
    }
754
755    /// Diagnostic snapshot of the composed layer stack, in
756    /// registration order: `[0]` is the first `.layer(...)` call
757    /// (innermost, against the leaf `InnerChatModel`); the last
758    /// element is the most recent registration (outermost, sees
759    /// requests first). Empty for a `ChatModel` with no layers
760    /// composed.
761    ///
762    /// Surfaced for boot-time wiring assertions, debug dashboards,
763    /// and conditional-layer audits. The values are
764    /// [`NamedLayer::layer_name`] outputs — `&'static str` and
765    /// patch-version-stable per the trait's contract.
766    #[must_use]
767    pub fn layer_names(&self) -> &[&'static str] {
768        &self.layer_names
769    }
770
771    /// Borrow the configured codec — exposes its `name()` and
772    /// `capabilities()` for diagnostics.
773    pub fn codec(&self) -> &C {
774        self.inner.codec()
775    }
776
777    /// Borrow the configured transport.
778    pub fn transport(&self) -> &T {
779        self.inner.transport()
780    }
781
782    /// Borrow the configured request shape — `model()`,
783    /// `max_tokens()`, `system()`, `temperature()`, `top_p()`,
784    /// `stop_sequences()`, `tools()`, `tool_choice()` accessors all
785    /// live on [`ChatModelConfig`].
786    pub const fn config(&self) -> &ChatModelConfig {
787        &self.config
788    }
789
790    /// Build the layered [`BoxedModelService`] — used by callers
791    /// who want to drive the service directly (e.g. wrap with
792    /// further `tower::ServiceBuilder` middleware externally).
793    #[must_use]
794    pub fn service(&self) -> BoxedModelService {
795        match &self.factory {
796            Some(factory) => factory(self.inner.clone()),
797            None => BoxCloneService::new(self.inner.clone()),
798        }
799    }
800
801    /// Build the layered [`BoxedStreamingService`] — the streaming
802    /// counterpart to [`Self::service`]. Layers attached via
803    /// [`Self::layer`] wrap this spine the same way they wrap the
804    /// one-shot service; consumers driving the service directly
805    /// (rather than through [`Self::stream_deltas`]) drive
806    /// `Service<StreamingModelInvocation, Response = ModelStream>`.
807    #[must_use]
808    pub fn streaming_service(&self) -> BoxedStreamingService {
809        match &self.streaming_factory {
810            Some(factory) => factory(self.inner.clone()),
811            None => BoxCloneService::new(self.inner.clone()),
812        }
813    }
814
815    /// Send a conversation and return the assistant reply as a single
816    /// [`Message`]. The full pipeline routes through the layer
817    /// stack: each layer's pre-call work runs before encode, each
818    /// layer's post-call work runs after decode.
819    pub async fn complete(
820        &self,
821        messages: Vec<Message>,
822        ctx: &ExecutionContext,
823    ) -> Result<Message> {
824        let response = self.complete_full(messages, ctx).await?;
825        Ok(Message::new(Role::Assistant, response.content))
826    }
827
828    /// Same pipeline as [`Self::complete`], but returns the full
829    /// [`ModelResponse`] — usage, stop reason, codec warnings, and
830    /// the provider rate-limit snapshot when the codec could parse
831    /// one from the response headers.
832    pub async fn complete_full(
833        &self,
834        messages: Vec<Message>,
835        ctx: &ExecutionContext,
836    ) -> Result<ModelResponse> {
837        let budget = ctx.run_budget();
838        if let Some(budget) = &budget {
839            // Pre-call axes — request count cap. Token caps fire
840            // post-decode below.
841            budget.check_pre_request()?;
842        }
843        let mut request = self.config.build_request(messages);
844        apply_overrides(&mut request, ctx);
845        let invocation = ModelInvocation::new(request, ctx.clone());
846        let response = self.service().oneshot(invocation).await?;
847        if let Some(budget) = &budget {
848            // Post-call accumulation — invariant 12 transactional
849            // semantics: only on the `Ok` branch does the budget
850            // see usage. The `?` above ensures the error branch
851            // never reaches this line.
852            budget.observe_usage(&response.usage)?;
853        }
854        Ok(response)
855    }
856
857    /// Send a conversation and return a typed `O` parsed from the
858    /// model's structured-output channel.
859    ///
860    /// The codec emits a `response_format` directive carrying the
861    /// schemars-derived schema for `O`; the dispatch shape (native
862    /// JSON-Schema vs forced tool call) is the codec's
863    /// [`Codec::auto_output_strategy`] for the configured model
864    /// when [`crate::ir::OutputStrategy::Auto`] is selected — see
865    /// [`crate::ir::OutputStrategy`] and for the cross-vendor mapping.
866    /// Operators that need to override the strategy build their
867    /// own [`ResponseFormat`] via
868    /// [`ResponseFormat::with_strategy`] and attach it to the
869    /// request through a custom flow.
870    ///
871    /// On `Native` dispatch, the codec produces a single text
872    /// `ContentPart` whose body parses as `O`. On `Tool` dispatch,
873    /// the codec emits one forced `ContentPart::ToolUse` whose
874    /// `input` is the JSON object the model produced; this method
875    /// extracts the input and parses it as `O`.
876    ///
877    /// `O: JsonSchema + DeserializeOwned + Send + 'static` —
878    /// schemars derives the JSON Schema at call time (zero-cost
879    /// after the first call thanks to schemars' static schema
880    /// caching). Production operators that cache the schema across
881    /// many calls build the [`JsonSchemaSpec`] once and attach it
882    /// to the request directly (no `O` type parameter on the
883    /// caller side).
884    pub async fn complete_typed<O>(
885        &self,
886        messages: Vec<Message>,
887        ctx: &ExecutionContext,
888    ) -> Result<O>
889    where
890        O: schemars::JsonSchema + serde::de::DeserializeOwned + Send + 'static,
891    {
892        self.complete_typed_validated(messages, |_: &O| Ok(()), ctx)
893            .await
894    }
895
    /// Send a conversation, parse the structured-output response as
    /// `O`, and run `validator` against the parsed value.
    ///
    /// Both failure modes — schema-mismatch (the model emitted JSON
    /// the deserialiser couldn't bind to `O`) and validator failure
    /// (the deserialised value broke a semantic invariant) — route
    /// through one channel: [`crate::Error::ModelRetry`]. The retry
    /// loop catches the variant, reflects the hint to the model as
    /// a corrective user message, and re-invokes within the same
    /// [`ChatModelConfig::validation_retries`](crate::ChatModelConfig::validation_retries)
    /// budget (invariant 20).
    ///
    /// [`Self::complete_typed`] is the no-validator shortcut — it
    /// calls into this method with an always-`Ok` validator so the
    /// schema-mismatch retry path stays uniform.
    ///
    /// The validator surface is sync ([`crate::OutputValidator::validate`])
    /// so simple closures (`|out: &O| -> Result<()>`) compose
    /// without `async-trait` ceremony. Validators that need to
    /// `.await` (DB lookup, external check) compose around the
    /// `complete_typed_validated` call boundary instead — run the
    /// async work after the typed response returns.
    ///
    /// # Errors
    ///
    /// Returns [`Error::ModelRetry`] once the retry budget is
    /// exhausted; budget violations, transport/codec errors, and
    /// non-retry validator errors propagate unchanged.
    pub async fn complete_typed_validated<O, V>(
        &self,
        messages: Vec<Message>,
        validator: V,
        ctx: &ExecutionContext,
    ) -> Result<O>
    where
        O: schemars::JsonSchema + serde::de::DeserializeOwned + Send + 'static,
        V: crate::output_validator::OutputValidator<O>,
    {
        use crate::llm_facing::RenderedForLlm;

        let schema_value = serde_json::to_value(schemars::schema_for!(O)).map_err(Error::Serde)?;
        let type_name = std::any::type_name::<O>();
        // Strip the module path so the wire-side `name` is short
        // and stable (`entelix_core::ir::request::ModelRequest` →
        // `ModelRequest`); vendors that surface the name in
        // observability ship a readable string.
        let short_name = type_name.rsplit("::").next().unwrap_or(type_name);
        let spec = JsonSchemaSpec::new(short_name, schema_value)?;
        let format = ResponseFormat::strict(spec);

        // The conversation grows across attempts: each failed turn
        // is echoed back followed by a corrective user message.
        let mut conversation = messages;
        let max_retries = self.config.validation_retries;
        // Counts corrective re-invocations already performed — the
        // first pass through the loop runs at attempt 0.
        let mut attempt: u32 = 0;
        loop {
            let budget = ctx.run_budget();
            if let Some(budget) = &budget {
                // Pre-call request-count gate; token caps fire
                // post-decode via observe_usage below.
                budget.check_pre_request()?;
            }
            let mut request = self.config.build_request(conversation.clone());
            apply_overrides(&mut request, ctx);
            request.response_format = Some(format.clone());

            let invocation = ModelInvocation::new(request, ctx.clone());
            let response = self.service().oneshot(invocation).await?;
            if let Some(budget) = &budget {
                // Invariant 12: usage reaches the budget only on
                // the `Ok` branch — the `?` above short-circuits
                // the error path before this point.
                budget.observe_usage(&response.usage)?;
            }
            // Capture the assistant's reply text for the retry path.
            // `parse_typed_response` consumes by value; clone the
            // text-block content first so a parse failure can
            // re-feed the model its own output as context.
            let assistant_text = response_text_for_retry(&response);

            // Both failure modes arrive as `Error::ModelRetry`:
            // schema-mismatch is wrapped inside `parse_typed_response`
            // at the parse site, and validator failures already
            // construct `Error::ModelRetry` themselves. A shared
            // budget governs both because they reflect the same
            // condition — the model emitted output the harness
            // cannot accept — and distinguishing them at the budget
            // level would add knobs without buying behaviour
            // operators commonly want to vary independently
            // (invariant 20).
            let retry_hint: RenderedForLlm<String> =
                match parse_typed_response::<O>(short_name, response) {
                    Ok(value) => match validator.validate(&value) {
                        Ok(()) => return Ok(value),
                        Err(Error::ModelRetry { hint, .. }) => hint,
                        Err(err) => return Err(err),
                    },
                    Err(Error::ModelRetry { hint, .. }) => hint,
                    Err(err) => return Err(err),
                };

            if attempt >= max_retries {
                return Err(Error::model_retry(retry_hint, attempt));
            }
            attempt += 1;

            // Echo the assistant's failed turn into the conversation
            // so the next call sees what it produced, then push the
            // corrective user message carrying the rendered hint.
            conversation.push(Message::new(
                crate::ir::Role::Assistant,
                vec![ContentPart::Text {
                    text: assistant_text.unwrap_or_default(),
                    cache_control: None,
                    provider_echoes: Vec::new(),
                }],
            ));
            conversation.push(Message::new(
                crate::ir::Role::User,
                vec![ContentPart::Text {
                    text: retry_hint.into_inner(),
                    cache_control: None,
                    provider_echoes: Vec::new(),
                }],
            ));
        }
    }
1010
1011    /// Open a streaming model call and return an IR `StreamDelta`
1012    /// stream.
1013    ///
1014    /// Pipeline: `codec.encode_streaming` → `transport.send_streaming`
1015    /// → `codec.decode_stream` → `tap_aggregator`, all driven
1016    /// through the same `tower::Service` spine as
1017    /// [`Self::complete_full`]. Layers attached via
1018    /// [`Self::layer`] (e.g. `OtelLayer`, `PolicyLayer`) wrap
1019    /// the streaming dispatch the same way they wrap the one-shot
1020    /// dispatch; observability events (cost, span close) fire on
1021    /// the streaming-side completion future's `Ok` branch only —
1022    /// invariant 12.
1023    ///
1024    /// The returned [`ModelStream`] carries both the delta stream
1025    /// (consumer-visible) and a `completion` future that resolves
1026    /// to the aggregated [`ModelResponse`] after the consumer
1027    /// drains the stream. Layers wrap `completion` to gate
1028    /// observability emission on the `Ok` branch — a stream that
1029    /// errors mid-flight surfaces the error on both the consumer
1030    /// side and the completion future, and no charge fires.
1031    ///
1032    /// Codecs without true streaming support fall back to a
1033    /// single-shot pseudo-stream the same way they did before
1034    /// the spine refactor.
1035    pub async fn stream_deltas(
1036        &self,
1037        messages: Vec<Message>,
1038        ctx: &ExecutionContext,
1039    ) -> Result<ModelStream> {
1040        let budget = ctx.run_budget();
1041        if let Some(budget) = &budget {
1042            budget.check_pre_request()?;
1043        }
1044        let mut request = self.config.build_request(messages);
1045        apply_overrides(&mut request, ctx);
1046        let invocation = StreamingModelInvocation::new(ModelInvocation::new(request, ctx.clone()));
1047        let model_stream = self.streaming_service().oneshot(invocation).await?;
1048        let ModelStream { stream, completion } = model_stream;
1049        // Wrap completion to observe usage on the streaming-side
1050        // `Ok` branch — invariant 12 transactional semantics: a
1051        // stream that errors mid-flight resolves `completion` to
1052        // `Err` and never reaches the budget. Mirrors the
1053        // OtelLayer / PolicyLayer streaming wrap from G-1.
1054        let budget_for_completion = budget.clone();
1055        let user_facing = async move {
1056            let result = completion.await;
1057            if let (Ok(response), Some(budget)) = (&result, budget_for_completion.as_ref()) {
1058                budget.observe_usage(&response.usage)?;
1059            }
1060            result
1061        };
1062        Ok(ModelStream {
1063            stream,
1064            completion: Box::pin(user_facing),
1065        })
1066    }
1067
1068    /// Streaming sibling of [`Self::complete_typed`]. Returns a
1069    /// [`TypedModelStream<O>`] whose `stream` field exposes raw
1070    /// [`StreamDelta`]s (text fragments operators echo to the user
1071    /// during generation) and whose `completion` future resolves to
1072    /// the aggregated, parsed `O`.
1073    ///
1074    /// `response_format = ResponseFormat::strict(JsonSchemaSpec::for::<O>())`
1075    /// is set on the request so the model emits the typed JSON
1076    /// payload natively (Native strategy via `text` deltas) or
1077    /// through a `tool_use` block (Tool strategy). The aggregator
1078    /// behind [`Self::stream_deltas`] already collects both shapes
1079    /// into the final [`ModelResponse`]; `stream_typed` parses the
1080    /// completion the same way [`Self::complete_typed`] does.
1081    ///
1082    /// `O: JsonSchema + DeserializeOwned + Send + 'static` —
1083    /// schemars derives the JSON Schema at call time.
1084    ///
1085    /// # Streaming + retry tradeoff
1086    ///
1087    /// `stream_typed` does **not** retry on parse failure: by the
1088    /// time `completion` resolves the deltas have already been
1089    /// surfaced to the consumer, so re-invoking with a corrective
1090    /// hint would emit a divergent second stream. Operators wanting
1091    /// the [`ChatModelConfig::validation_retries`] loop call
1092    /// [`Self::complete_typed`] / [`Self::complete_typed_validated`]
1093    /// instead — a parse failure there is fully recoverable because
1094    /// no partial output was surfaced.
1095    ///
1096    /// A validator that needs to inspect the parsed `O` runs after
1097    /// `completion` resolves: `let value = stream.completion.await?;
1098    /// validator.validate(&value)?;`. The validator's `Err` does not
1099    /// flow back into the stream — it surfaces alongside the typed
1100    /// completion at the call site.
1101    pub async fn stream_typed<O>(
1102        &self,
1103        messages: Vec<Message>,
1104        ctx: &ExecutionContext,
1105    ) -> Result<TypedModelStream<O>>
1106    where
1107        O: schemars::JsonSchema + serde::de::DeserializeOwned + Send + 'static,
1108    {
1109        if self.config.validation_retries > 0 {
1110            // Surface the contract divergence — operators wiring
1111            // with_validation_retries on a ChatModel and then calling
1112            // stream_typed almost certainly expect the retry loop to
1113            // also cover the streaming path. records the
1114            // design decision (no-retry on streaming because deltas
1115            // were already surfaced to the consumer); a debug log
1116            // makes the silent ignore visible at run time.
1117            tracing::debug!(
1118                validation_retries = self.config.validation_retries,
1119                "ChatModel::stream_typed ignores validation_retries — \
1120                 streaming + retry would emit a divergent second stream \
1121                 over already-surfaced deltas. Use complete_typed for \
1122                 the unified retry budget."
1123            );
1124        }
1125        let schema_value = serde_json::to_value(schemars::schema_for!(O)).map_err(Error::Serde)?;
1126        let type_name = std::any::type_name::<O>();
1127        let short_name = type_name.rsplit("::").next().unwrap_or(type_name);
1128        let spec = JsonSchemaSpec::new(short_name, schema_value)?;
1129        let format = ResponseFormat::strict(spec);
1130
1131        let budget = ctx.run_budget();
1132        if let Some(budget) = &budget {
1133            budget.check_pre_request()?;
1134        }
1135        let mut request = self.config.build_request(messages);
1136        apply_overrides(&mut request, ctx);
1137        request.response_format = Some(format);
1138
1139        let invocation = StreamingModelInvocation::new(ModelInvocation::new(request, ctx.clone()));
1140        let model_stream = self.streaming_service().oneshot(invocation).await?;
1141        let ModelStream { stream, completion } = model_stream;
1142
1143        let budget_for_completion = budget.clone();
1144        let short_name_owned = short_name.to_owned();
1145        let typed_completion = async move {
1146            let response = completion.await?;
1147            if let Some(budget) = &budget_for_completion {
1148                budget.observe_usage(&response.usage)?;
1149            }
1150            parse_typed_response::<O>(&short_name_owned, response)
1151        };
1152        Ok(TypedModelStream {
1153            stream,
1154            completion: Box::pin(typed_completion),
1155        })
1156    }
1157}
1158
/// Streaming counterpart to a `complete_typed` call — raw deltas on
/// `stream`, the aggregated typed payload on `completion`.
///
/// The `stream` field is identical to [`ModelStream::stream`]: raw
/// [`StreamDelta`]s the consumer surfaces to the user (text
/// fragments, thinking deltas, …) as the model emits them. The
/// `completion` future resolves to the typed `O` parsed from the
/// aggregated final response — operators await it once the stream
/// has been drained.
///
/// Mirrors [`crate::service::ModelStream`] for the typed-output
/// path; the underlying aggregator is the same `tap_aggregator`
/// used by [`ChatModel::stream_deltas`].
pub struct TypedModelStream<O> {
    /// Raw delta stream surfaced to the consumer.
    pub stream: BoxDeltaStream<'static>,
    /// Future resolving to the typed `O` once the stream has been
    /// consumed to its terminal `Stop`. Dropping the
    /// `TypedModelStream` before draining the stream surfaces an
    /// `Err` on `completion` instead.
    pub completion: BoxFuture<'static, Result<O>>,
}
1180
1181impl<O> std::fmt::Debug for TypedModelStream<O> {
1182    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1183        f.debug_struct("TypedModelStream")
1184            .field("stream", &"<BoxDeltaStream>")
1185            .field(
1186                "completion",
1187                &format_args!("<BoxFuture<Result<{}>>>", std::any::type_name::<O>()),
1188            )
1189            .finish()
1190    }
1191}
1192
1193impl<C: Codec + 'static, T: Transport + 'static> std::fmt::Debug for ChatModel<C, T> {
1194    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1195        f.debug_struct("ChatModel")
1196            .field("model", &self.config.model)
1197            .field("codec", &self.codec().name())
1198            .field("transport", &self.transport().name())
1199            .field("layers", &self.layer_names)
1200            .finish()
1201    }
1202}
1203
1204/// Parse a [`ModelResponse`] produced by a `complete_typed`
1205/// dispatch into the operator's typed `O`. The two dispatch
1206/// shapes both surface the JSON object the model produced —
1207/// `Native` strategy lands as a single text content part whose
1208/// body is the JSON document; `Tool` strategy lands as a single
1209/// `ContentPart::ToolUse` whose `input` is the JSON object. The
1210/// helper tries the tool path first (Anthropic / Bedrock-Anthropic
1211/// default), then falls through to the text path (OpenAI /
1212/// Gemini default).
1213// `response` is taken by value to express ownership of the
1214// response payload — the function consumes the response's
1215// stop reason / warnings semantics from the caller's
1216// perspective even though only `content` is read. Clippy's
1217// `needless_pass_by_value` would have us borrow, but a borrow
1218// would let the caller continue to inspect `response.warnings`
1219// (etc.) after `parse_typed_response` succeeded, masking the
1220// "this response has been consumed" intent the typed surface
1221// communicates.
1222/// Concatenate every `ContentPart::Text` block in `response` for
1223/// the retry-path conversation echo. Returns `None` when the model
1224/// surfaced no textual content (in that case the retry loop seeds
1225/// the assistant turn with an empty string — the diagnostic message
1226/// alone is enough context for the model to course-correct).
1227fn response_text_for_retry(response: &ModelResponse) -> Option<String> {
1228    let mut out = String::new();
1229    for part in &response.content {
1230        if let ContentPart::Text { text, .. } = part {
1231            out.push_str(text);
1232        }
1233    }
1234    if out.is_empty() { None } else { Some(out) }
1235}
1236
1237/// Parse a structured-output response into `O`, wrapping any
1238/// schema-mismatch directly into [`Error::ModelRetry`] at the parse
1239/// site so the unified retry loop never sees a raw [`Error::Serde`]
1240/// for schema-mismatch failures (invariant 20). The hint flows through
1241/// the [`crate::llm_facing::RenderedForLlm`] funnel so the model
1242/// receives a corrective message routed by the operator (invariant 16).
1243///
1244/// [`Error::invalid_request`] still surfaces unchanged when the
1245/// response carried no parseable content — that is an `OutputStrategy`
1246/// configuration error, not a retry condition.
1247#[allow(clippy::needless_pass_by_value)]
1248fn parse_typed_response<O>(short_name: &str, response: ModelResponse) -> Result<O>
1249where
1250    O: serde::de::DeserializeOwned,
1251{
1252    use crate::llm_facing::LlmRenderable;
1253    let wrap = |e: serde_json::Error| -> Error {
1254        Error::model_retry(schema_mismatch_diagnostic(short_name, &e).for_llm(), 0)
1255    };
1256    for part in &response.content {
1257        if let ContentPart::ToolUse { input, .. } = part {
1258            return serde_json::from_value(input.clone()).map_err(wrap);
1259        }
1260    }
1261    for part in &response.content {
1262        if let ContentPart::Text { text, .. } = part {
1263            return serde_json::from_str(text).map_err(wrap);
1264        }
1265    }
1266    Err(Error::invalid_request(
1267        "complete_typed: model response carried neither a `tool_use` block nor a text \
1268         block — the configured `OutputStrategy` did not produce typed output",
1269    ))
1270}
1271
1272/// Render a schema-mismatch parse error into a model-actionable hint.
1273/// Strips `serde_json::Error`'s trailing `at line N column M` position
1274/// noise — line / column offsets reference the raw bytes the parser
1275/// scanned and cannot help the model correct its output, but they leak
1276/// internal parser state into the LLM channel (invariant 16).
1277///
1278/// Schema-mismatch and validator-driven retries converge on the
1279/// returned text wrapped in the `RenderedForLlm` carrier (invariant 20).
1280fn schema_mismatch_diagnostic(short_name: &str, err: &serde_json::Error) -> String {
1281    let raw = err.to_string();
1282    let trimmed = raw
1283        .split(" at line ")
1284        .next()
1285        .unwrap_or(raw.as_str())
1286        .trim_end_matches('.');
1287    format!(
1288        "Your previous response did not match the required JSON schema for `{short_name}`. \
1289         Parser diagnostic: {trimmed}.\n\
1290         Re-emit the response as a single valid JSON object that conforms to the schema."
1291    )
1292}
1293
1294// ── Provider shortcuts ────────────────────────────────────────────
1295//
1296// One-call constructors that bundle the codec + transport + auth
1297// setup for the three first-party direct-API providers. These
1298// trade flexibility for ergonomics: callers wanting custom base
1299// URLs, per-call retry policies, or alternative auth shapes
1300// continue to use [`ChatModel::new`] with the explicit components.
1301//
1302// Why these three: they ship in `entelix-core` as built-in codecs
1303// against `DirectTransport`. Cloud transports (Bedrock, Vertex,
1304// Foundry) live in `entelix-cloud` and get their own shortcuts in
1305// that crate to avoid pulling AWS / GCP / Azure SDKs into the
1306// always-on dependency tree.
1307//
1308// Per Invariant 10, no credential ever lands on
1309// [`ExecutionContext`]; the API keys are wrapped in
1310// [`secrecy::SecretString`] and held by the credential provider,
1311// then read by the transport at request time.
1312
1313use crate::auth::{ApiKeyProvider, BearerProvider};
1314use crate::codecs::{AnthropicMessagesCodec, GeminiCodec, OpenAiChatCodec};
1315use crate::transports::DirectTransport;
1316use secrecy::SecretString;
1317
1318impl ChatModel<AnthropicMessagesCodec, DirectTransport> {
1319    /// One-call construction against `https://api.anthropic.com`.
1320    /// Bundles [`AnthropicMessagesCodec`] + [`DirectTransport`] +
1321    /// [`ApiKeyProvider`]. Mirrors LangChain's `ChatAnthropic(api_key=…)`
1322    /// surface so the 5-line agent path is achievable without
1323    /// hand-wiring four components.
1324    ///
1325    /// Returns [`Error::Config`] if the underlying HTTP client
1326    /// cannot be initialised.
1327    pub fn anthropic(api_key: impl Into<SecretString>, model: impl Into<String>) -> Result<Self> {
1328        let credentials = Arc::new(ApiKeyProvider::anthropic(api_key));
1329        let transport = DirectTransport::anthropic(credentials)?;
1330        Ok(Self::new(AnthropicMessagesCodec::new(), transport, model))
1331    }
1332}
1333
1334impl ChatModel<OpenAiChatCodec, DirectTransport> {
1335    /// One-call construction against `https://api.openai.com`.
1336    /// Bundles [`OpenAiChatCodec`] + [`DirectTransport`] +
1337    /// [`BearerProvider`].
1338    pub fn openai(api_key: impl Into<SecretString>, model: impl Into<String>) -> Result<Self> {
1339        let credentials = Arc::new(BearerProvider::new(api_key));
1340        let transport = DirectTransport::openai(credentials)?;
1341        Ok(Self::new(OpenAiChatCodec::new(), transport, model))
1342    }
1343}
1344
1345impl ChatModel<GeminiCodec, DirectTransport> {
1346    /// One-call construction against
1347    /// `https://generativelanguage.googleapis.com`. Bundles
1348    /// [`GeminiCodec`] + [`DirectTransport`] + [`BearerProvider`].
1349    pub fn gemini(api_key: impl Into<SecretString>, model: impl Into<String>) -> Result<Self> {
1350        let credentials = Arc::new(BearerProvider::new(api_key));
1351        let transport = DirectTransport::gemini(credentials)?;
1352        Ok(Self::new(GeminiCodec::new(), transport, model))
1353    }
1354}