entelix_core/chat.rs
1//! `ChatModel` — composes a `Codec` and a `Transport` into a layered
2//! `tower::Service<ModelInvocation, Response = ModelResponse>` plus
3//! a streaming surface.
4//!
5//! The composition is two-tiered:
6//!
7//! - An internal leaf service performs the raw
8//! `codec.encode → transport.send → codec.decode` round trip,
9//! implementing `tower::Service<ModelInvocation>`.
10//! - [`ChatModel<C, T>`] is the user-facing builder. It owns the
11//! leaf inner service and a layer stack assembled via
12//! [`ChatModel::layer`]. The composed `tower::Service` materialises
13//! on each [`ChatModel::complete_full`] call.
14//!
15//! Cross-cutting concerns (PII redaction, quota gates, cost
16//! metering, OTel observability) live as `tower::Layer<S>` types in
17//! sibling crates (`entelix-policy::PolicyLayer`,
18//! `entelix-otel::OtelLayer`). Same layer wraps both this surface
19//! and [`crate::tools::ToolRegistry`] dispatch — one composition
20//! primitive across the whole agent stack.
21
22use std::sync::Arc;
23use std::task::{Context, Poll};
24
25use futures::StreamExt;
26use futures::future::BoxFuture;
27use tower::util::BoxCloneService;
28use tower::{Layer, Service, ServiceExt};
29
30use crate::codecs::{BoxDeltaStream, Codec};
31use crate::context::ExecutionContext;
32use crate::error::{Error, Result};
33use crate::ir::{
34 ContentPart, JsonSchemaSpec, Message, ModelRequest, ModelResponse, ReasoningEffort,
35 ResponseFormat, Role, SystemPrompt, ToolChoice, ToolSpec,
36};
37use crate::overrides::{RequestOverrides, RunOverrides};
38use crate::service::{
39 BoxedModelService, BoxedStreamingService, ModelInvocation, ModelStream, NamedLayer,
40 StreamingModelInvocation,
41};
42use crate::stream::{StreamDelta, tap_aggregator};
43use crate::transports::Transport;
44
45/// Patch `request` with any [`RunOverrides`] and [`RequestOverrides`]
46/// attached to `ctx`. Both `complete_full` and `stream_deltas` route
47/// through this helper so the override semantics stay identical
48/// across the two surfaces.
49///
50/// `RunOverrides` patches agent-loop-owned fields (`model`,
51/// `system`); `RequestOverrides` patches `ModelRequest`-shaped
52/// sampling knobs (`temperature`, `top_p`, `max_tokens`,
53/// `stop_sequences`, `reasoning_effort`, `tool_choice`,
54/// `response_format`). Either, both, or neither may be present.
55fn apply_overrides(request: &mut ModelRequest, ctx: &ExecutionContext) {
56 if let Some(run) = ctx.extension::<RunOverrides>() {
57 if let Some(model) = run.model() {
58 model.clone_into(&mut request.model);
59 }
60 if let Some(system) = run.system_prompt() {
61 request.system = system.clone();
62 }
63 if let Some(specs) = run.tool_specs() {
64 request.tools = Arc::clone(specs);
65 }
66 }
67 if let Some(req) = ctx.extension::<RequestOverrides>() {
68 if let Some(t) = req.temperature() {
69 request.temperature = Some(t);
70 }
71 if let Some(p) = req.top_p() {
72 request.top_p = Some(p);
73 }
74 if let Some(k) = req.top_k() {
75 request.top_k = Some(k);
76 }
77 if let Some(n) = req.max_tokens() {
78 request.max_tokens = Some(n);
79 }
80 if let Some(sequences) = req.stop_sequences() {
81 request.stop_sequences = sequences.to_vec();
82 }
83 if let Some(effort) = req.reasoning_effort() {
84 request.reasoning_effort = Some(effort.clone());
85 }
86 if let Some(choice) = req.tool_choice() {
87 request.tool_choice = choice.clone();
88 }
89 if let Some(format) = req.response_format() {
90 request.response_format = Some(format.clone());
91 }
92 if let Some(parallel) = req.parallel_tool_calls() {
93 request.parallel_tool_calls = Some(parallel);
94 }
95 if let Some(user_id) = req.end_user_id() {
96 request.end_user_id = Some(user_id.to_owned());
97 }
98 if let Some(seed) = req.seed() {
99 request.seed = Some(seed);
100 }
101 }
102}
103
/// Builder-side configuration that flows into every `ModelRequest`
/// the `ChatModel` issues. Stored separately from the leaf service
/// so layers / streaming / future surfaces share one source of
/// truth.
///
/// Fields are private. Construct via [`ChatModelConfig::new`];
/// mutate through [`ChatModel`]'s `with_*` setters; inspect through
/// the bare accessors on this type. `#[non_exhaustive]` plus the
/// privatised fields together mean post-1.0 additions ship as MINOR
/// without surprising any downstream consumer.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct ChatModelConfig {
    /// Provider model identifier sent on the wire.
    model: String,
    /// Per-call output-token cap (`None` ⇒ vendor default).
    max_tokens: Option<u32>,
    /// System-prompt blocks prepended to every call.
    system: SystemPrompt,
    /// Sampling temperature (`None` ⇒ vendor default).
    temperature: Option<f32>,
    /// Nucleus-sampling parameter (`None` ⇒ vendor default).
    top_p: Option<f32>,
    /// Top-k sampling parameter (`None` ⇒ vendor default).
    top_k: Option<u32>,
    /// Stop sequences attached to every request.
    stop_sequences: Vec<String>,
    /// Advertised tools — `Arc<[ToolSpec]>` so the per-request clone
    /// in `build_request` is a refcount bump, not a deep copy.
    tools: Arc<[ToolSpec]>,
    /// Tool-choice mode.
    tool_choice: ToolChoice,
    /// Cross-vendor reasoning-effort knob (`None` ⇒ vendor default).
    reasoning_effort: Option<ReasoningEffort>,
    /// `complete_typed<O>` retry budget — schema-mismatch and
    /// [`crate::OutputValidator`] failures both reflect their hint
    /// to the model and re-prompt up to `validation_retries` times
    /// before surfacing [`Error::ModelRetry`] (invariant 20).
    /// Default `0` (no retry). Distinct from
    /// [`crate::Error::Provider`]'s transport retries (handled by
    /// `RetryService`).
    validation_retries: u32,
    /// Operator-supplied token counter for pre-flight budget checks
    /// and content-economy estimation. `None` (the default) means
    /// the SDK relies on the vendor's post-flight `Usage` block;
    /// pre-flight enforcement (refusing a call that would exceed
    /// the configured `RunBudget` ceiling before sending) requires
    /// an explicit counter. Concrete counters ship as companion
    /// crates (`entelix-tokenizer-tiktoken`,
    /// `entelix-tokenizer-hf`, locale-aware companions);
    /// [`crate::ByteCountTokenCounter`] is the zero-dependency
    /// English-biased default.
    token_counter: Option<std::sync::Arc<dyn crate::tokens::TokenCounter>>,
}
147
148impl ChatModelConfig {
149 /// Build a fresh config seeded with `model` and otherwise
150 /// defaulted.
151 #[must_use]
152 pub fn new(model: impl Into<String>) -> Self {
153 Self {
154 model: model.into(),
155 max_tokens: None,
156 system: SystemPrompt::default(),
157 temperature: None,
158 top_p: None,
159 top_k: None,
160 stop_sequences: Vec::new(),
161 tools: Arc::from([]),
162 tool_choice: ToolChoice::default(),
163 reasoning_effort: None,
164 validation_retries: 0,
165 token_counter: None,
166 }
167 }
168
169 /// Borrow the configured token counter, if any. Returns `None`
170 /// when the operator has not wired one — pre-flight budget
171 /// enforcement falls back to vendor `Usage` post-response.
172 #[must_use]
173 pub fn token_counter(&self) -> Option<&std::sync::Arc<dyn crate::tokens::TokenCounter>> {
174 self.token_counter.as_ref()
175 }
176
177 /// `complete_typed<O>` retry budget. Default `0` — the first
178 /// schema-mismatch fail surfaces unchanged. Operators that want
179 /// the loop to reflect the parse error to the model and ask for
180 /// a corrected JSON response set this to `1`–`3`.
181 pub const fn validation_retries(&self) -> u32 {
182 self.validation_retries
183 }
184
185 /// Provider model identifier sent on the wire.
186 pub fn model(&self) -> &str {
187 &self.model
188 }
189
190 /// Per-call `max_tokens` cap (`None` = vendor default).
191 pub const fn max_tokens(&self) -> Option<u32> {
192 self.max_tokens
193 }
194
195 /// System-prompt blocks prepended to every call. Supports
196 /// per-block prompt caching (Anthropic / Bedrock Converse).
197 pub const fn system(&self) -> &SystemPrompt {
198 &self.system
199 }
200
201 /// Sampling temperature.
202 pub const fn temperature(&self) -> Option<f32> {
203 self.temperature
204 }
205
206 /// Nucleus-sampling parameter.
207 pub const fn top_p(&self) -> Option<f32> {
208 self.top_p
209 }
210
211 /// Top-k sampling parameter (`None` ⇒ vendor default). Codec
212 /// support follows the IR mapping documented on
213 /// [`crate::ir::ModelRequest::top_k`].
214 pub const fn top_k(&self) -> Option<u32> {
215 self.top_k
216 }
217
218 /// Stop sequences.
219 pub fn stop_sequences(&self) -> &[String] {
220 &self.stop_sequences
221 }
222
223 /// Advertised tools.
224 pub fn tools(&self) -> &[ToolSpec] {
225 &self.tools
226 }
227
228 /// Tool-choice mode.
229 pub const fn tool_choice(&self) -> &ToolChoice {
230 &self.tool_choice
231 }
232
233 /// Cross-vendor reasoning-effort knob (`None` ⇒ vendor default).
234 /// Codecs translate onto their native wire shape per the
235 /// per-vendor mapping documented on
236 /// [`crate::ir::ReasoningEffort`]; lossy approximations emit
237 /// `ModelWarning::LossyEncode`.
238 pub const fn reasoning_effort(&self) -> Option<&ReasoningEffort> {
239 self.reasoning_effort.as_ref()
240 }
241
242 /// Combine config with caller-supplied messages into a full
243 /// [`ModelRequest`]. Only the fields the config carries are
244 /// projected; per-request knobs and provider extensions stay at
245 /// their `Default` (i.e. unset) and flow in via `RequestOverrides`
246 /// or direct `ExecutionContext::add_extension` instead.
247 #[must_use]
248 pub fn build_request(&self, messages: Vec<Message>) -> ModelRequest {
249 ModelRequest {
250 model: self.model.clone(),
251 messages,
252 system: self.system.clone(),
253 max_tokens: self.max_tokens,
254 temperature: self.temperature,
255 top_p: self.top_p,
256 top_k: self.top_k,
257 stop_sequences: self.stop_sequences.clone(),
258 tools: Arc::clone(&self.tools),
259 tool_choice: self.tool_choice.clone(),
260 reasoning_effort: self.reasoning_effort.clone(),
261 ..ModelRequest::default()
262 }
263 }
264}
265
/// Leaf service: raw `codec.encode → transport.send → codec.decode`.
/// `tower::Service<ModelInvocation, Response = ModelResponse>`.
/// Cloning is cheap (two `Arc` refcount bumps).
///
/// Internal composition primitive — users compose at the
/// [`ChatModel`] level. Exposing the leaf service publicly would
/// invite callers to bypass the layer stack and miss
/// observability / policy / cost middleware.
pub(crate) struct InnerChatModel<C: Codec, T: Transport> {
    /// Shared codec handle — translates IR ↔ vendor wire format.
    codec: Arc<C>,
    /// Shared transport handle — moves encoded bytes to/from the provider.
    transport: Arc<T>,
}
278
279impl<C: Codec, T: Transport> Clone for InnerChatModel<C, T> {
280 fn clone(&self) -> Self {
281 Self {
282 codec: Arc::clone(&self.codec),
283 transport: Arc::clone(&self.transport),
284 }
285 }
286}
287
288impl<C: Codec, T: Transport> InnerChatModel<C, T> {
289 /// Wrap shared `Arc`s.
290 pub(crate) const fn from_arc(codec: Arc<C>, transport: Arc<T>) -> Self {
291 Self { codec, transport }
292 }
293
294 /// Borrow the codec.
295 pub(crate) fn codec(&self) -> &C {
296 &self.codec
297 }
298
299 /// Borrow the transport.
300 pub(crate) fn transport(&self) -> &T {
301 &self.transport
302 }
303}
304
305impl<C: Codec + 'static, T: Transport + 'static> Service<ModelInvocation> for InnerChatModel<C, T> {
306 type Response = ModelResponse;
307 type Error = Error;
308 type Future = BoxFuture<'static, Result<ModelResponse>>;
309
310 #[inline]
311 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<()>> {
312 Poll::Ready(Ok(()))
313 }
314
315 fn call(&mut self, invocation: ModelInvocation) -> Self::Future {
316 let codec = Arc::clone(&self.codec);
317 let transport = Arc::clone(&self.transport);
318 Box::pin(async move {
319 let ModelInvocation { request, ctx } = invocation;
320 let encoded = codec.encode(&request)?;
321 let warnings = encoded.warnings.clone();
322 let response = transport.send(encoded, &ctx).await?;
323 if !(200..300).contains(&response.status) {
324 let body_text = String::from_utf8_lossy(&response.body).into_owned();
325 let mut err = Error::provider_http(response.status, body_text);
326 if let Some(after) =
327 crate::transports::parse_retry_after(response.headers.get("retry-after"))
328 {
329 err = err.with_retry_after(after);
330 }
331 return Err(err);
332 }
333 let rate_limit = codec.extract_rate_limit(&response.headers);
334 let mut decoded = codec.decode(&response.body, warnings)?;
335 decoded.rate_limit = rate_limit;
336 Ok(decoded)
337 })
338 }
339}
340
impl<C: Codec + 'static, T: Transport + 'static> Service<StreamingModelInvocation>
    for InnerChatModel<C, T>
{
    type Response = ModelStream;
    type Error = Error;
    type Future = BoxFuture<'static, Result<ModelStream>>;

    #[inline]
    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<()>> {
        // The leaf holds no backpressure state — always ready.
        Poll::Ready(Ok(()))
    }

    /// Streaming round trip: encode, open the transport stream, and
    /// hand the delta stream to `tap_aggregator`. Non-2xx statuses
    /// drain the body for diagnostics and surface as an error before
    /// any delta is yielded.
    fn call(&mut self, invocation: StreamingModelInvocation) -> Self::Future {
        let codec = Arc::clone(&self.codec);
        let transport = Arc::clone(&self.transport);
        Box::pin(async move {
            let StreamingModelInvocation {
                inner: ModelInvocation { request, ctx },
            } = invocation;
            let encoded = codec.encode_streaming(&request)?;
            // Keep the encode warnings — `encoded` is consumed by the send.
            let warnings = encoded.warnings.clone();
            let stream = transport.send_streaming(encoded, &ctx).await?;
            if !(200..300).contains(&stream.status) {
                // Best-effort drain: chunk-level transport errors are
                // skipped so the diagnostic body carries whatever
                // bytes did arrive.
                let mut buf = Vec::new();
                let mut body = stream.body;
                while let Some(chunk) = body.next().await {
                    if let Ok(b) = chunk {
                        buf.extend_from_slice(&b);
                    }
                }
                let body_text = String::from_utf8_lossy(&buf).into_owned();
                let mut err = Error::provider_http(stream.status, body_text);
                if let Some(after) =
                    crate::transports::parse_retry_after(stream.headers.get("retry-after"))
                {
                    err = err.with_retry_after(after);
                }
                return Err(err);
            }
            let rate_limit = codec.extract_rate_limit(&stream.headers);
            // The codec's `decode_stream` signature borrows
            // `&'a self`, so we tie the borrow's lifetime to the
            // owned `Arc<C>` by capturing it inside an
            // `async_stream::stream!` generator — the generator
            // becomes a `'static` future state that owns the Arc
            // for as long as the resulting stream lives. Without
            // this re-anchoring, the returned `BoxDeltaStream<'a>`
            // would borrow from a stack-local `codec` binding
            // that drops at the end of `call`.
            let codec_for_stream = Arc::clone(&codec);
            // The Rust 2024 tail-expr-drop-order change is benign
            // here — the `async_stream::stream!` block holds a
            // pinned mutable iterator (`inner`) over the codec's
            // decode pipeline; whether the temporaries inside drop
            // at end-of-block (Edition 2021) or end-of-statement
            // (Edition 2024) does not change observable
            // semantics, since `inner` is fully consumed before
            // the block returns.
            #[allow(tail_expr_drop_order)]
            let codec_stream: BoxDeltaStream<'static> = Box::pin(async_stream::stream! {
                let inner = codec_for_stream.decode_stream(stream.body, warnings);
                futures::pin_mut!(inner);
                while let Some(delta) = inner.next().await {
                    yield delta;
                }
            });
            // Prepend a synthetic `RateLimit` delta when the codec
            // could parse one from the response headers — the
            // aggregator captures it the same way it would a
            // mid-stream `Usage` delta, so the final
            // `ModelResponse` carries the snapshot operators
            // expect on `ModelResponse::rate_limit`.
            let prefixed: BoxDeltaStream<'static> = match rate_limit {
                Some(snapshot) => {
                    let prepend = futures::stream::iter(vec![Ok(StreamDelta::RateLimit(snapshot))]);
                    Box::pin(prepend.chain(codec_stream))
                }
                None => codec_stream,
            };
            Ok(tap_aggregator(prefixed))
        })
    }
}
424
/// Boxed factory: `InnerChatModel` → layered [`BoxedModelService`].
/// Stored on `ChatModel` so the type of the (potentially deeply
/// nested) layer stack stays opaque to users; one concrete
/// `ChatModel<C, T>` shape regardless of how many layers are
/// attached. Each [`ChatModel::layer`] call wraps the previous
/// factory in a new closure.
type LayerFactory<C, T> = Arc<dyn Fn(InnerChatModel<C, T>) -> BoxedModelService + Send + Sync>;

/// Boxed factory for the streaming-side spine. Parallel to
/// [`LayerFactory`]; layers attached via [`ChatModel::layer`] are
/// stacked onto this factory the same way they're stacked onto
/// the one-shot factory, so a single `.layer(OtelLayer::new(...))`
/// call wraps both spines with one observability stack.
type StreamingLayerFactory<C, T> =
    Arc<dyn Fn(InnerChatModel<C, T>) -> BoxedStreamingService + Send + Sync>;
439
/// Configurable chat model — codec + transport + layer stack.
///
/// Cheap to clone (handles are `Arc`-backed). Builder-style
/// configuration via `with_*` methods; layer attachment via
/// [`Self::layer`].
pub struct ChatModel<C: Codec + 'static, T: Transport + 'static> {
    /// Leaf service performing the raw encode → send → decode trip.
    inner: InnerChatModel<C, T>,
    /// Request shape projected into every call via `build_request`.
    config: ChatModelConfig,
    /// One-shot layer stack; `None` until the first `.layer(...)`.
    factory: Option<LayerFactory<C, T>>,
    /// Streaming layer stack; `None` until the first `.layer(...)`.
    streaming_factory: Option<StreamingLayerFactory<C, T>>,
    /// Diagnostic stack — names captured at [`Self::layer`] compose
    /// time, in insertion order (innermost composed first → outermost
    /// composed last). The wrapped service stacks last-composed
    /// outermost, so this `Vec` reads bottom-to-top relative to
    /// request flow.
    layer_names: Vec<&'static str>,
}
457
458impl<C: Codec + 'static, T: Transport + 'static> Clone for ChatModel<C, T> {
459 fn clone(&self) -> Self {
460 Self {
461 inner: self.inner.clone(),
462 config: self.config.clone(),
463 factory: self.factory.clone(),
464 streaming_factory: self.streaming_factory.clone(),
465 layer_names: self.layer_names.clone(),
466 }
467 }
468}
469
470impl<C: Codec + 'static, T: Transport + 'static> ChatModel<C, T> {
471 /// Build a model bundling owned codec + transport + model name.
472 pub fn new(codec: C, transport: T, model: impl Into<String>) -> Self {
473 Self::from_arc(Arc::new(codec), Arc::new(transport), model)
474 }
475
476 /// Build from already-shared `Arc`s — useful when many models
477 /// share one transport / codec instance.
478 pub fn from_arc(codec: Arc<C>, transport: Arc<T>, model: impl Into<String>) -> Self {
479 Self {
480 inner: InnerChatModel::from_arc(codec, transport),
481 config: ChatModelConfig::new(model),
482 factory: None,
483 streaming_factory: None,
484 layer_names: Vec::new(),
485 }
486 }
487
    /// Set per-call `max_tokens`. Each call overwrites the previous
    /// value.
    #[must_use]
    pub const fn with_max_tokens(mut self, n: u32) -> Self {
        self.config.max_tokens = Some(n);
        self
    }

    /// Attach a system / instruction prompt. Convenience
    /// shorthand for a single-block uncached prompt — for
    /// multi-block or cached prompts, set `config.system` to a
    /// pre-built [`SystemPrompt`] directly.
    #[must_use]
    pub fn with_system(mut self, s: impl Into<String>) -> Self {
        self.config.system = SystemPrompt::text(s);
        self
    }

    /// Attach a pre-built [`SystemPrompt`] — supports multi-block
    /// and per-block cache control. Replaces any previously set
    /// system prompt.
    #[must_use]
    pub fn with_system_prompt(mut self, prompt: SystemPrompt) -> Self {
        self.config.system = prompt;
        self
    }

    /// Set sampling temperature.
    #[must_use]
    pub const fn with_temperature(mut self, t: f32) -> Self {
        self.config.temperature = Some(t);
        self
    }

    /// Set the [`complete_typed`](Self::complete_typed) validation
    /// retry budget — number of times the loop reflects a
    /// schema-mismatch or [`crate::OutputValidator`] failure back to
    /// the model with corrective hint text before surfacing the
    /// terminal [`Error::ModelRetry`]. Default `0` (no retry). Each
    /// retry increments the conversation length by two messages
    /// (assistant's failed reply + retry prompt). Both retry shapes
    /// share one budget and route through `Error::ModelRetry`
    /// (invariant 20).
    #[must_use]
    pub const fn with_validation_retries(mut self, n: u32) -> Self {
        self.config.validation_retries = n;
        self
    }

    /// Wire an operator-supplied [`TokenCounter`](crate::TokenCounter)
    /// for pre-flight budget checks and content-economy estimation.
    /// Each call replaces the configured counter — the last call wins.
    ///
    /// Vendor-accurate counters (tiktoken, HuggingFace, ko-mecab)
    /// ship as companion crates so the core stays
    /// zero-dependency. [`crate::ByteCountTokenCounter`] is the
    /// English-biased zero-dependency default for development
    /// scaffolding.
    #[must_use]
    pub fn with_token_counter(
        mut self,
        counter: std::sync::Arc<dyn crate::tokens::TokenCounter>,
    ) -> Self {
        self.config.token_counter = Some(counter);
        self
    }

    /// Set nucleus sampling parameter.
    #[must_use]
    pub const fn with_top_p(mut self, p: f32) -> Self {
        self.config.top_p = Some(p);
        self
    }

    /// Set top-k sampling parameter. Native on Anthropic, Gemini,
    /// and Bedrock-Anthropic; OpenAI codecs surface as
    /// `LossyEncode`.
    #[must_use]
    pub const fn with_top_k(mut self, k: u32) -> Self {
        self.config.top_k = Some(k);
        self
    }

    /// Append one stop sequence (keeps any previously added).
    #[must_use]
    pub fn with_stop_sequence(mut self, s: impl Into<String>) -> Self {
        self.config.stop_sequences.push(s.into());
        self
    }

    /// Replace the full stop sequences list.
    #[must_use]
    pub fn with_stop_sequences(mut self, seqs: Vec<String>) -> Self {
        self.config.stop_sequences = seqs;
        self
    }

    /// Replace the advertised tools list. Accepts anything that
    /// converts into `Arc<[ToolSpec]>` — `Vec<ToolSpec>` and
    /// `[ToolSpec; N]` literals both qualify, so caller ergonomics
    /// match the previous `Vec` shape while per-dispatch
    /// `build_request` clones become an atomic refcount bump rather
    /// than a deep walk of every tool's JSON schema.
    #[must_use]
    pub fn with_tools(mut self, tools: impl Into<Arc<[ToolSpec]>>) -> Self {
        self.config.tools = tools.into();
        self
    }

    /// Set the tool-choice mode.
    #[must_use]
    pub fn with_tool_choice(mut self, c: ToolChoice) -> Self {
        self.config.tool_choice = c;
        self
    }

    /// Set the cross-vendor reasoning-effort knob. Codecs translate
    /// onto their native wire shape per the table on
    /// [`crate::ir::ReasoningEffort`] — `Off` / `Minimal` / `Low`
    /// / `Medium` / `High` / `Auto` snap to vendor buckets, lossy
    /// approximations emit `ModelWarning::LossyEncode`, and
    /// `VendorSpecific(s)` passes the literal vendor wire value
    /// through.
    #[must_use]
    pub fn with_reasoning_effort(mut self, effort: ReasoningEffort) -> Self {
        self.config.reasoning_effort = Some(effort);
        self
    }
614
    /// Append a `tower::Layer` to **both** dispatch spines — the
    /// one-shot path (`Service<ModelInvocation, Response =
    /// ModelResponse>`) and the streaming path
    /// (`Service<StreamingModelInvocation, Response =
    /// ModelStream>`). Each `.layer(L)` wraps `L` *around* the
    /// already-composed stack, so the **last-registered layer is
    /// outermost** (sees the request first, the response last) and
    /// the first-registered layer sits innermost against the leaf
    /// `InnerChatModel`.
    ///
    /// `PolicyLayer` and `OtelLayer` from the policy / otel
    /// crates satisfy both spines, so a single `.layer(...)` call
    /// wraps the agent's complete dispatch fan-out:
    ///
    /// - one-shot: `Service<ModelInvocation, Response = ModelResponse>`
    /// - streaming: `Service<StreamingModelInvocation, Response = ModelStream>`
    /// - tool dispatch (separate, on `ToolRegistry::layer`):
    ///   `Service<ToolInvocation, Response = Value>`
    ///
    /// The streaming-side layer wraps the [`ModelStream`]'s
    /// `completion` future so observability events (cost,
    /// latency, span close) fire only on the `Ok` branch — a
    /// stream that errors mid-flight surfaces the error and
    /// emits no charge (invariant 12).
    ///
    /// Layers that legitimately apply only to the one-shot spine
    /// (e.g. `RetryLayer` — retrying a partially-streamed
    /// response is meaningless) implement a pass-through
    /// `Layer<BoxedStreamingService>` so they satisfy the
    /// constraint without affecting streaming dispatch.
    ///
    /// ## Introspection
    ///
    /// Every layer is recorded in [`Self::layer_names`] at compose
    /// time via [`NamedLayer::layer_name`]. First-party entelix
    /// layers (`PolicyLayer`, `OtelLayer`) implement
    /// [`NamedLayer`] directly; external `tower::Layer` middleware
    /// wraps through [`crate::service::WithName`] to participate:
    ///
    /// ```ignore
    /// use entelix_core::WithName;
    ///
    /// chat_model
    ///     .layer(PolicyLayer::new(registry))
    ///     .layer(OtelLayer::new("anthropic"))
    ///     .layer(WithName::new("concurrency", tower::limit::ConcurrencyLimitLayer::new(10)));
    /// // chat_model.layer_names() == ["policy", "otel", "concurrency"]
    /// ```
    #[must_use]
    pub fn layer<L>(mut self, layer: L) -> Self
    where
        L: Layer<BoxedModelService>
            + Layer<BoxedStreamingService>
            + NamedLayer
            + Clone
            + Send
            + Sync
            + 'static,
        <L as Layer<BoxedModelService>>::Service: Service<ModelInvocation, Response = ModelResponse, Error = Error>
            + Clone
            + Send
            + 'static,
        <<L as Layer<BoxedModelService>>::Service as Service<ModelInvocation>>::Future:
            Send + 'static,
        <L as Layer<BoxedStreamingService>>::Service: Service<StreamingModelInvocation, Response = ModelStream, Error = Error>
            + Clone
            + Send
            + 'static,
        <<L as Layer<BoxedStreamingService>>::Service as Service<StreamingModelInvocation>>::Future:
            Send + 'static,
    {
        // Capture the static name BEFORE the type-erasing closure
        // swallows `L`. The factory chain (Arc<dyn Fn(InnerChatModel)
        // -> BoxedModelService>) discards concrete layer types, so
        // introspection has to happen here at compose time.
        self.layer_names.push(layer.layer_name());

        // Take the previous factories so the new closures own them;
        // `layer` itself is split into one clone per spine.
        let prev = self.factory.take();
        let prev_streaming = self.streaming_factory.take();
        let layer_one_shot = layer.clone();
        let layer_streaming = layer;
        let new_factory: LayerFactory<C, T> = Arc::new(move |inner: InnerChatModel<C, T>| {
            // Rebuild the inner stack, then wrap this layer outermost.
            let stacked: BoxedModelService = match &prev {
                Some(prev_factory) => prev_factory(inner),
                None => BoxCloneService::new(inner),
            };
            BoxCloneService::new(<L as Layer<BoxedModelService>>::layer(
                &layer_one_shot,
                stacked,
            ))
        });
        let new_streaming: StreamingLayerFactory<C, T> =
            Arc::new(move |inner: InnerChatModel<C, T>| {
                let stacked: BoxedStreamingService = match &prev_streaming {
                    Some(prev_factory) => prev_factory(inner),
                    None => BoxCloneService::new(inner),
                };
                BoxCloneService::new(<L as Layer<BoxedStreamingService>>::layer(
                    &layer_streaming,
                    stacked,
                ))
            });
        self.factory = Some(new_factory);
        self.streaming_factory = Some(new_streaming);
        self
    }
721
    /// Compose a `tower::Layer` whose only difference from a
    /// first-party entelix layer is the missing [`NamedLayer`]
    /// impl. Equivalent to `self.layer(WithName::new(name, layer))`
    /// — the wrapper supplies the identity surfaced through
    /// [`Self::layer_names`].
    ///
    /// Use this for external middleware (`tower::limit`,
    /// `tower::timeout`, operator-defined wrappers) without
    /// implementing [`NamedLayer`] yourself. First-party entelix
    /// layers already implement [`NamedLayer`] and should reach for
    /// [`Self::layer`] directly so the canonical role name
    /// (e.g. `"policy"`, `"otel"`, `"retry"`) ships at the call
    /// site.
    #[must_use]
    pub fn layer_named<L>(self, name: &'static str, layer: L) -> Self
    where
        L: Layer<BoxedModelService> + Layer<BoxedStreamingService> + Clone + Send + Sync + 'static,
        <L as Layer<BoxedModelService>>::Service: Service<ModelInvocation, Response = ModelResponse, Error = Error>
            + Clone
            + Send
            + 'static,
        <<L as Layer<BoxedModelService>>::Service as Service<ModelInvocation>>::Future:
            Send + 'static,
        <L as Layer<BoxedStreamingService>>::Service: Service<StreamingModelInvocation, Response = ModelStream, Error = Error>
            + Clone
            + Send
            + 'static,
        <<L as Layer<BoxedStreamingService>>::Service as Service<StreamingModelInvocation>>::Future:
            Send + 'static,
    {
        // Delegate — `WithName` supplies the `NamedLayer` identity.
        self.layer(crate::service::WithName::new(name, layer))
    }
754
    /// Diagnostic snapshot of the composed layer stack, in
    /// registration order: `[0]` is the first `.layer(...)` call
    /// (innermost, against the leaf `InnerChatModel`); the last
    /// element is the most recent registration (outermost, sees
    /// requests first). Empty for a `ChatModel` with no layers
    /// composed.
    ///
    /// Surfaced for boot-time wiring assertions, debug dashboards,
    /// and conditional-layer audits. The values are
    /// [`NamedLayer::layer_name`] outputs — `&'static str` and
    /// patch-version-stable per the trait's contract.
    #[must_use]
    pub fn layer_names(&self) -> &[&'static str] {
        &self.layer_names
    }

    /// Borrow the configured codec — exposes its `name()` and
    /// `capabilities()` for diagnostics.
    pub fn codec(&self) -> &C {
        self.inner.codec()
    }

    /// Borrow the configured transport.
    pub fn transport(&self) -> &T {
        self.inner.transport()
    }

    /// Borrow the configured request shape — `model()`,
    /// `max_tokens()`, `system()`, `temperature()`, `top_p()`,
    /// `stop_sequences()`, `tools()`, `tool_choice()` accessors all
    /// live on [`ChatModelConfig`].
    pub const fn config(&self) -> &ChatModelConfig {
        &self.config
    }
789
790 /// Build the layered [`BoxedModelService`] — used by callers
791 /// who want to drive the service directly (e.g. wrap with
792 /// further `tower::ServiceBuilder` middleware externally).
793 #[must_use]
794 pub fn service(&self) -> BoxedModelService {
795 match &self.factory {
796 Some(factory) => factory(self.inner.clone()),
797 None => BoxCloneService::new(self.inner.clone()),
798 }
799 }
800
801 /// Build the layered [`BoxedStreamingService`] — the streaming
802 /// counterpart to [`Self::service`]. Layers attached via
803 /// [`Self::layer`] wrap this spine the same way they wrap the
804 /// one-shot service; consumers driving the service directly
805 /// (rather than through [`Self::stream_deltas`]) drive
806 /// `Service<StreamingModelInvocation, Response = ModelStream>`.
807 #[must_use]
808 pub fn streaming_service(&self) -> BoxedStreamingService {
809 match &self.streaming_factory {
810 Some(factory) => factory(self.inner.clone()),
811 None => BoxCloneService::new(self.inner.clone()),
812 }
813 }
814
815 /// Send a conversation and return the assistant reply as a single
816 /// [`Message`]. The full pipeline routes through the layer
817 /// stack: each layer's pre-call work runs before encode, each
818 /// layer's post-call work runs after decode.
819 pub async fn complete(
820 &self,
821 messages: Vec<Message>,
822 ctx: &ExecutionContext,
823 ) -> Result<Message> {
824 let response = self.complete_full(messages, ctx).await?;
825 Ok(Message::new(Role::Assistant, response.content))
826 }
827
828 /// Same pipeline as [`Self::complete`], but returns the full
829 /// [`ModelResponse`] — usage, stop reason, codec warnings, and
830 /// the provider rate-limit snapshot when the codec could parse
831 /// one from the response headers.
832 pub async fn complete_full(
833 &self,
834 messages: Vec<Message>,
835 ctx: &ExecutionContext,
836 ) -> Result<ModelResponse> {
837 let budget = ctx.run_budget();
838 if let Some(budget) = &budget {
839 // Pre-call axes — request count cap. Token caps fire
840 // post-decode below.
841 budget.check_pre_request()?;
842 }
843 let mut request = self.config.build_request(messages);
844 apply_overrides(&mut request, ctx);
845 let invocation = ModelInvocation::new(request, ctx.clone());
846 let response = self.service().oneshot(invocation).await?;
847 if let Some(budget) = &budget {
848 // Post-call accumulation — invariant 12 transactional
849 // semantics: only on the `Ok` branch does the budget
850 // see usage. The `?` above ensures the error branch
851 // never reaches this line.
852 budget.observe_usage(&response.usage)?;
853 }
854 Ok(response)
855 }
856
857 /// Send a conversation and return a typed `O` parsed from the
858 /// model's structured-output channel.
859 ///
860 /// The codec emits a `response_format` directive carrying the
861 /// schemars-derived schema for `O`; the dispatch shape (native
862 /// JSON-Schema vs forced tool call) is the codec's
863 /// [`Codec::auto_output_strategy`] for the configured model
864 /// when [`crate::ir::OutputStrategy::Auto`] is selected — see
865 /// [`crate::ir::OutputStrategy`] and for the cross-vendor mapping.
866 /// Operators that need to override the strategy build their
867 /// own [`ResponseFormat`] via
868 /// [`ResponseFormat::with_strategy`] and attach it to the
869 /// request through a custom flow.
870 ///
871 /// On `Native` dispatch, the codec produces a single text
872 /// `ContentPart` whose body parses as `O`. On `Tool` dispatch,
873 /// the codec emits one forced `ContentPart::ToolUse` whose
874 /// `input` is the JSON object the model produced; this method
875 /// extracts the input and parses it as `O`.
876 ///
877 /// `O: JsonSchema + DeserializeOwned + Send + 'static` —
878 /// schemars derives the JSON Schema at call time (zero-cost
879 /// after the first call thanks to schemars' static schema
880 /// caching). Production operators that cache the schema across
881 /// many calls build the [`JsonSchemaSpec`] once and attach it
882 /// to the request directly (no `O` type parameter on the
883 /// caller side).
884 pub async fn complete_typed<O>(
885 &self,
886 messages: Vec<Message>,
887 ctx: &ExecutionContext,
888 ) -> Result<O>
889 where
890 O: schemars::JsonSchema + serde::de::DeserializeOwned + Send + 'static,
891 {
892 self.complete_typed_validated(messages, |_: &O| Ok(()), ctx)
893 .await
894 }
895
    /// Send a conversation, parse the structured-output response as
    /// `O`, and run `validator` against the parsed value.
    ///
    /// Both failure modes — schema-mismatch (the model emitted JSON
    /// the deserialiser couldn't bind to `O`) and validator failure
    /// (the deserialised value broke a semantic invariant) — route
    /// through one channel: [`crate::Error::ModelRetry`]. The retry
    /// loop catches the variant, reflects the hint to the model as
    /// a corrective user message, and re-invokes within the same
    /// [`ChatModelConfig::validation_retries`](crate::ChatModelConfig::validation_retries)
    /// budget (invariant 20). With `validation_retries = N` the
    /// model is invoked at most `N + 1` times in total: the initial
    /// call plus up to `N` corrective re-invocations.
    ///
    /// [`Self::complete_typed`] is the no-validator shortcut — it
    /// calls into this method with an always-`Ok` validator so the
    /// schema-mismatch retry path stays uniform.
    ///
    /// The validator surface is sync ([`crate::OutputValidator::validate`])
    /// so simple closures (`|out: &O| -> Result<()>`) compose
    /// without `async-trait` ceremony. Validators that need to
    /// `.await` (DB lookup, external check) compose around the
    /// `complete_typed_validated` call boundary instead — run the
    /// async work after the typed response returns.
    pub async fn complete_typed_validated<O, V>(
        &self,
        messages: Vec<Message>,
        validator: V,
        ctx: &ExecutionContext,
    ) -> Result<O>
    where
        O: schemars::JsonSchema + serde::de::DeserializeOwned + Send + 'static,
        V: crate::output_validator::OutputValidator<O>,
    {
        use crate::llm_facing::RenderedForLlm;

        // Schema derivation happens once, outside the retry loop;
        // the resulting `format` is cloned onto each attempt's
        // request below.
        let schema_value = serde_json::to_value(schemars::schema_for!(O)).map_err(Error::Serde)?;
        let type_name = std::any::type_name::<O>();
        // Strip the module path so the wire-side `name` is short
        // and stable (`entelix_core::ir::request::ModelRequest` →
        // `ModelRequest`); vendors that surface the name in
        // observability ship a readable string.
        let short_name = type_name.rsplit("::").next().unwrap_or(type_name);
        let spec = JsonSchemaSpec::new(short_name, schema_value)?;
        let format = ResponseFormat::strict(spec);

        let mut conversation = messages;
        let max_retries = self.config.validation_retries;
        // `attempt` counts corrective re-invocations already
        // consumed; the first model call runs at `attempt == 0`.
        let mut attempt: u32 = 0;
        loop {
            // Budget gates re-apply on every attempt — each retry
            // is a real model call and must pass the same
            // pre-request check (and accrue usage below) as the
            // first one.
            let budget = ctx.run_budget();
            if let Some(budget) = &budget {
                budget.check_pre_request()?;
            }
            let mut request = self.config.build_request(conversation.clone());
            apply_overrides(&mut request, ctx);
            request.response_format = Some(format.clone());

            let invocation = ModelInvocation::new(request, ctx.clone());
            let response = self.service().oneshot(invocation).await?;
            if let Some(budget) = &budget {
                budget.observe_usage(&response.usage)?;
            }
            // Capture the assistant's reply text for the retry path.
            // `parse_typed_response` consumes by value; clone the
            // text-block content first so a parse failure can
            // re-feed the model its own output as context.
            let assistant_text = response_text_for_retry(&response);

            // Both failure modes arrive as `Error::ModelRetry`:
            // schema-mismatch is wrapped inside `parse_typed_response`
            // at the parse site, and validator failures already
            // construct `Error::ModelRetry` themselves. A shared
            // budget governs both because they reflect the same
            // condition — the model emitted output the harness
            // cannot accept — and distinguishing them at the budget
            // level would add knobs without buying behaviour
            // operators commonly want to vary independently
            // (invariant 20). Any other error variant propagates
            // immediately — it is not a retry condition.
            let retry_hint: RenderedForLlm<String> =
                match parse_typed_response::<O>(short_name, response) {
                    Ok(value) => match validator.validate(&value) {
                        Ok(()) => return Ok(value),
                        Err(Error::ModelRetry { hint, .. }) => hint,
                        Err(err) => return Err(err),
                    },
                    Err(Error::ModelRetry { hint, .. }) => hint,
                    Err(err) => return Err(err),
                };

            if attempt >= max_retries {
                return Err(Error::model_retry(retry_hint, attempt));
            }
            attempt += 1;

            // Echo the assistant's failed turn into the conversation
            // so the next call sees what it produced, then push the
            // corrective user message carrying the rendered hint.
            conversation.push(Message::new(
                crate::ir::Role::Assistant,
                vec![ContentPart::Text {
                    text: assistant_text.unwrap_or_default(),
                    cache_control: None,
                    provider_echoes: Vec::new(),
                }],
            ));
            conversation.push(Message::new(
                crate::ir::Role::User,
                vec![ContentPart::Text {
                    text: retry_hint.into_inner(),
                    cache_control: None,
                    provider_echoes: Vec::new(),
                }],
            ));
        }
    }
1009 }
1010
1011 /// Open a streaming model call and return an IR `StreamDelta`
1012 /// stream.
1013 ///
1014 /// Pipeline: `codec.encode_streaming` → `transport.send_streaming`
1015 /// → `codec.decode_stream` → `tap_aggregator`, all driven
1016 /// through the same `tower::Service` spine as
1017 /// [`Self::complete_full`]. Layers attached via
1018 /// [`Self::layer`] (e.g. `OtelLayer`, `PolicyLayer`) wrap
1019 /// the streaming dispatch the same way they wrap the one-shot
1020 /// dispatch; observability events (cost, span close) fire on
1021 /// the streaming-side completion future's `Ok` branch only —
1022 /// invariant 12.
1023 ///
1024 /// The returned [`ModelStream`] carries both the delta stream
1025 /// (consumer-visible) and a `completion` future that resolves
1026 /// to the aggregated [`ModelResponse`] after the consumer
1027 /// drains the stream. Layers wrap `completion` to gate
1028 /// observability emission on the `Ok` branch — a stream that
1029 /// errors mid-flight surfaces the error on both the consumer
1030 /// side and the completion future, and no charge fires.
1031 ///
1032 /// Codecs without true streaming support fall back to a
1033 /// single-shot pseudo-stream the same way they did before
1034 /// the spine refactor.
1035 pub async fn stream_deltas(
1036 &self,
1037 messages: Vec<Message>,
1038 ctx: &ExecutionContext,
1039 ) -> Result<ModelStream> {
1040 let budget = ctx.run_budget();
1041 if let Some(budget) = &budget {
1042 budget.check_pre_request()?;
1043 }
1044 let mut request = self.config.build_request(messages);
1045 apply_overrides(&mut request, ctx);
1046 let invocation = StreamingModelInvocation::new(ModelInvocation::new(request, ctx.clone()));
1047 let model_stream = self.streaming_service().oneshot(invocation).await?;
1048 let ModelStream { stream, completion } = model_stream;
1049 // Wrap completion to observe usage on the streaming-side
1050 // `Ok` branch — invariant 12 transactional semantics: a
1051 // stream that errors mid-flight resolves `completion` to
1052 // `Err` and never reaches the budget. Mirrors the
1053 // OtelLayer / PolicyLayer streaming wrap from G-1.
1054 let budget_for_completion = budget.clone();
1055 let user_facing = async move {
1056 let result = completion.await;
1057 if let (Ok(response), Some(budget)) = (&result, budget_for_completion.as_ref()) {
1058 budget.observe_usage(&response.usage)?;
1059 }
1060 result
1061 };
1062 Ok(ModelStream {
1063 stream,
1064 completion: Box::pin(user_facing),
1065 })
1066 }
1067
1068 /// Streaming sibling of [`Self::complete_typed`]. Returns a
1069 /// [`TypedModelStream<O>`] whose `stream` field exposes raw
1070 /// [`StreamDelta`]s (text fragments operators echo to the user
1071 /// during generation) and whose `completion` future resolves to
1072 /// the aggregated, parsed `O`.
1073 ///
1074 /// `response_format = ResponseFormat::strict(JsonSchemaSpec::for::<O>())`
1075 /// is set on the request so the model emits the typed JSON
1076 /// payload natively (Native strategy via `text` deltas) or
1077 /// through a `tool_use` block (Tool strategy). The aggregator
1078 /// behind [`Self::stream_deltas`] already collects both shapes
1079 /// into the final [`ModelResponse`]; `stream_typed` parses the
1080 /// completion the same way [`Self::complete_typed`] does.
1081 ///
1082 /// `O: JsonSchema + DeserializeOwned + Send + 'static` —
1083 /// schemars derives the JSON Schema at call time.
1084 ///
1085 /// # Streaming + retry tradeoff
1086 ///
1087 /// `stream_typed` does **not** retry on parse failure: by the
1088 /// time `completion` resolves the deltas have already been
1089 /// surfaced to the consumer, so re-invoking with a corrective
1090 /// hint would emit a divergent second stream. Operators wanting
1091 /// the [`ChatModelConfig::validation_retries`] loop call
1092 /// [`Self::complete_typed`] / [`Self::complete_typed_validated`]
1093 /// instead — a parse failure there is fully recoverable because
1094 /// no partial output was surfaced.
1095 ///
1096 /// A validator that needs to inspect the parsed `O` runs after
1097 /// `completion` resolves: `let value = stream.completion.await?;
1098 /// validator.validate(&value)?;`. The validator's `Err` does not
1099 /// flow back into the stream — it surfaces alongside the typed
1100 /// completion at the call site.
1101 pub async fn stream_typed<O>(
1102 &self,
1103 messages: Vec<Message>,
1104 ctx: &ExecutionContext,
1105 ) -> Result<TypedModelStream<O>>
1106 where
1107 O: schemars::JsonSchema + serde::de::DeserializeOwned + Send + 'static,
1108 {
1109 if self.config.validation_retries > 0 {
1110 // Surface the contract divergence — operators wiring
1111 // with_validation_retries on a ChatModel and then calling
1112 // stream_typed almost certainly expect the retry loop to
1113 // also cover the streaming path. records the
1114 // design decision (no-retry on streaming because deltas
1115 // were already surfaced to the consumer); a debug log
1116 // makes the silent ignore visible at run time.
1117 tracing::debug!(
1118 validation_retries = self.config.validation_retries,
1119 "ChatModel::stream_typed ignores validation_retries — \
1120 streaming + retry would emit a divergent second stream \
1121 over already-surfaced deltas. Use complete_typed for \
1122 the unified retry budget."
1123 );
1124 }
1125 let schema_value = serde_json::to_value(schemars::schema_for!(O)).map_err(Error::Serde)?;
1126 let type_name = std::any::type_name::<O>();
1127 let short_name = type_name.rsplit("::").next().unwrap_or(type_name);
1128 let spec = JsonSchemaSpec::new(short_name, schema_value)?;
1129 let format = ResponseFormat::strict(spec);
1130
1131 let budget = ctx.run_budget();
1132 if let Some(budget) = &budget {
1133 budget.check_pre_request()?;
1134 }
1135 let mut request = self.config.build_request(messages);
1136 apply_overrides(&mut request, ctx);
1137 request.response_format = Some(format);
1138
1139 let invocation = StreamingModelInvocation::new(ModelInvocation::new(request, ctx.clone()));
1140 let model_stream = self.streaming_service().oneshot(invocation).await?;
1141 let ModelStream { stream, completion } = model_stream;
1142
1143 let budget_for_completion = budget.clone();
1144 let short_name_owned = short_name.to_owned();
1145 let typed_completion = async move {
1146 let response = completion.await?;
1147 if let Some(budget) = &budget_for_completion {
1148 budget.observe_usage(&response.usage)?;
1149 }
1150 parse_typed_response::<O>(&short_name_owned, response)
1151 };
1152 Ok(TypedModelStream {
1153 stream,
1154 completion: Box::pin(typed_completion),
1155 })
1156 }
1157}
1158
/// Streaming counterpart to a `complete_typed` call — raw deltas on
/// `stream`, the aggregated typed payload on `completion`.
///
/// The `stream` field is identical to [`ModelStream::stream`]: raw
/// [`StreamDelta`]s the consumer surfaces to the user (text
/// fragments, thinking deltas, …) as the model emits them. The
/// `completion` future resolves to the typed `O` parsed from the
/// aggregated final response — operators await it once the stream
/// has been drained.
///
/// Mirrors [`crate::service::ModelStream`] for the typed-output
/// path; the underlying aggregator is the same `tap_aggregator`
/// used by [`ChatModel::stream_deltas`].
pub struct TypedModelStream<O> {
    /// Raw delta stream surfaced to the consumer.
    pub stream: BoxDeltaStream<'static>,
    /// Future resolving to the typed `O` after the stream has been
    /// consumed to its terminal `Stop`. Dropping the stream before
    /// draining it is expected to surface an `Err` here instead of
    /// the typed value — NOTE(review): confirm against the
    /// aggregator's abandonment contract.
    pub completion: BoxFuture<'static, Result<O>>,
}
1180
1181impl<O> std::fmt::Debug for TypedModelStream<O> {
1182 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1183 f.debug_struct("TypedModelStream")
1184 .field("stream", &"<BoxDeltaStream>")
1185 .field(
1186 "completion",
1187 &format_args!("<BoxFuture<Result<{}>>>", std::any::type_name::<O>()),
1188 )
1189 .finish()
1190 }
1191}
1192
1193impl<C: Codec + 'static, T: Transport + 'static> std::fmt::Debug for ChatModel<C, T> {
1194 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1195 f.debug_struct("ChatModel")
1196 .field("model", &self.config.model)
1197 .field("codec", &self.codec().name())
1198 .field("transport", &self.transport().name())
1199 .field("layers", &self.layer_names)
1200 .finish()
1201 }
1202}
1203
// NOTE: the commentary below describes `parse_typed_response`
// (defined further down), not the function that follows. It is kept
// as plain `//` comments so rustdoc does not attach it to
// `response_text_for_retry` — `//` lines between `///` runs do not
// break doc-comment attachment.
//
// Parse a [`ModelResponse`] produced by a `complete_typed`
// dispatch into the operator's typed `O`. The two dispatch
// shapes both surface the JSON object the model produced —
// `Native` strategy lands as a single text content part whose
// body is the JSON document; `Tool` strategy lands as a single
// `ContentPart::ToolUse` whose `input` is the JSON object. The
// helper tries the tool path first (Anthropic / Bedrock-Anthropic
// default), then falls through to the text path (OpenAI /
// Gemini default).
//
// `response` is taken by value to express ownership of the
// response payload — the function consumes the response's
// stop reason / warnings semantics from the caller's
// perspective even though only `content` is read. Clippy's
// `needless_pass_by_value` would have us borrow, but a borrow
// would let the caller continue to inspect `response.warnings`
// (etc.) after `parse_typed_response` succeeded, masking the
// "this response has been consumed" intent the typed surface
// communicates.
1222/// Concatenate every `ContentPart::Text` block in `response` for
1223/// the retry-path conversation echo. Returns `None` when the model
1224/// surfaced no textual content (in that case the retry loop seeds
1225/// the assistant turn with an empty string — the diagnostic message
1226/// alone is enough context for the model to course-correct).
1227fn response_text_for_retry(response: &ModelResponse) -> Option<String> {
1228 let mut out = String::new();
1229 for part in &response.content {
1230 if let ContentPart::Text { text, .. } = part {
1231 out.push_str(text);
1232 }
1233 }
1234 if out.is_empty() { None } else { Some(out) }
1235}
1236
1237/// Parse a structured-output response into `O`, wrapping any
1238/// schema-mismatch directly into [`Error::ModelRetry`] at the parse
1239/// site so the unified retry loop never sees a raw [`Error::Serde`]
1240/// for schema-mismatch failures (invariant 20). The hint flows through
1241/// the [`crate::llm_facing::RenderedForLlm`] funnel so the model
1242/// receives a corrective message routed by the operator (invariant 16).
1243///
1244/// [`Error::invalid_request`] still surfaces unchanged when the
1245/// response carried no parseable content — that is an `OutputStrategy`
1246/// configuration error, not a retry condition.
1247#[allow(clippy::needless_pass_by_value)]
1248fn parse_typed_response<O>(short_name: &str, response: ModelResponse) -> Result<O>
1249where
1250 O: serde::de::DeserializeOwned,
1251{
1252 use crate::llm_facing::LlmRenderable;
1253 let wrap = |e: serde_json::Error| -> Error {
1254 Error::model_retry(schema_mismatch_diagnostic(short_name, &e).for_llm(), 0)
1255 };
1256 for part in &response.content {
1257 if let ContentPart::ToolUse { input, .. } = part {
1258 return serde_json::from_value(input.clone()).map_err(wrap);
1259 }
1260 }
1261 for part in &response.content {
1262 if let ContentPart::Text { text, .. } = part {
1263 return serde_json::from_str(text).map_err(wrap);
1264 }
1265 }
1266 Err(Error::invalid_request(
1267 "complete_typed: model response carried neither a `tool_use` block nor a text \
1268 block — the configured `OutputStrategy` did not produce typed output",
1269 ))
1270}
1271
1272/// Render a schema-mismatch parse error into a model-actionable hint.
1273/// Strips `serde_json::Error`'s trailing `at line N column M` position
1274/// noise — line / column offsets reference the raw bytes the parser
1275/// scanned and cannot help the model correct its output, but they leak
1276/// internal parser state into the LLM channel (invariant 16).
1277///
1278/// Schema-mismatch and validator-driven retries converge on the
1279/// returned text wrapped in the `RenderedForLlm` carrier (invariant 20).
1280fn schema_mismatch_diagnostic(short_name: &str, err: &serde_json::Error) -> String {
1281 let raw = err.to_string();
1282 let trimmed = raw
1283 .split(" at line ")
1284 .next()
1285 .unwrap_or(raw.as_str())
1286 .trim_end_matches('.');
1287 format!(
1288 "Your previous response did not match the required JSON schema for `{short_name}`. \
1289 Parser diagnostic: {trimmed}.\n\
1290 Re-emit the response as a single valid JSON object that conforms to the schema."
1291 )
1292}
1293
1294// ── Provider shortcuts ────────────────────────────────────────────
1295//
1296// One-call constructors that bundle the codec + transport + auth
1297// setup for the three first-party direct-API providers. These
1298// trade flexibility for ergonomics: callers wanting custom base
1299// URLs, per-call retry policies, or alternative auth shapes
1300// continue to use [`ChatModel::new`] with the explicit components.
1301//
1302// Why these three: they ship in `entelix-core` as built-in codecs
1303// against `DirectTransport`. Cloud transports (Bedrock, Vertex,
1304// Foundry) live in `entelix-cloud` and get their own shortcuts in
1305// that crate to avoid pulling AWS / GCP / Azure SDKs into the
1306// always-on dependency tree.
1307//
1308// Per Invariant 10, no credential ever lands on
1309// [`ExecutionContext`]; the API keys are wrapped in
1310// [`secrecy::SecretString`] and held by the credential provider,
1311// then read by the transport at request time.
1312
1313use crate::auth::{ApiKeyProvider, BearerProvider};
1314use crate::codecs::{AnthropicMessagesCodec, GeminiCodec, OpenAiChatCodec};
1315use crate::transports::DirectTransport;
1316use secrecy::SecretString;
1317
1318impl ChatModel<AnthropicMessagesCodec, DirectTransport> {
1319 /// One-call construction against `https://api.anthropic.com`.
1320 /// Bundles [`AnthropicMessagesCodec`] + [`DirectTransport`] +
1321 /// [`ApiKeyProvider`]. Mirrors LangChain's `ChatAnthropic(api_key=…)`
1322 /// surface so the 5-line agent path is achievable without
1323 /// hand-wiring four components.
1324 ///
1325 /// Returns [`Error::Config`] if the underlying HTTP client
1326 /// cannot be initialised.
1327 pub fn anthropic(api_key: impl Into<SecretString>, model: impl Into<String>) -> Result<Self> {
1328 let credentials = Arc::new(ApiKeyProvider::anthropic(api_key));
1329 let transport = DirectTransport::anthropic(credentials)?;
1330 Ok(Self::new(AnthropicMessagesCodec::new(), transport, model))
1331 }
1332}
1333
1334impl ChatModel<OpenAiChatCodec, DirectTransport> {
1335 /// One-call construction against `https://api.openai.com`.
1336 /// Bundles [`OpenAiChatCodec`] + [`DirectTransport`] +
1337 /// [`BearerProvider`].
1338 pub fn openai(api_key: impl Into<SecretString>, model: impl Into<String>) -> Result<Self> {
1339 let credentials = Arc::new(BearerProvider::new(api_key));
1340 let transport = DirectTransport::openai(credentials)?;
1341 Ok(Self::new(OpenAiChatCodec::new(), transport, model))
1342 }
1343}
1344
1345impl ChatModel<GeminiCodec, DirectTransport> {
1346 /// One-call construction against
1347 /// `https://generativelanguage.googleapis.com`. Bundles
1348 /// [`GeminiCodec`] + [`DirectTransport`] + [`BearerProvider`].
1349 pub fn gemini(api_key: impl Into<SecretString>, model: impl Into<String>) -> Result<Self> {
1350 let credentials = Arc::new(BearerProvider::new(api_key));
1351 let transport = DirectTransport::gemini(credentials)?;
1352 Ok(Self::new(GeminiCodec::new(), transport, model))
1353 }
1354}