Skip to main content

entelix_core/codecs/
codec.rs

//! `Codec` trait and `EncodedRequest` — the IR ⇄ wire boundary.
//!
//! `Codec` is intentionally narrow: it converts `ModelRequest`s into
//! provider-shaped HTTP payloads and decodes the responses back into IR.
//! Streaming is a first-class concern through `decode_stream` —
//! incremental byte chunks become `StreamDelta` events, with the codec
//! owning its own parser state machine (SSE event parser, NDJSON line
//! splitter, AWS event-stream binary frame decoder, etc.).
9
10use std::pin::Pin;
11
12use bytes::Bytes;
13use futures::Stream;
14
15use crate::error::Result;
16use crate::ir::{Capabilities, ModelRequest, ModelResponse, ModelWarning, OutputStrategy};
17use crate::rate_limit::RateLimitSnapshot;
18use crate::stream::StreamDelta;
19
/// Boxed byte-chunk stream produced by a `Transport` and consumed by a
/// `Codec` during streaming.
///
/// Every item is a `Result<Bytes>`, so a failure can surface part-way
/// through the body. The trait object is pinned, heap-allocated, `Send`,
/// and may borrow data for the lifetime `'a`.
pub type BoxByteStream<'a> = Pin<Box<dyn Stream<Item = Result<Bytes>> + Send + 'a>>;
23
/// Boxed `StreamDelta` stream produced by `Codec::decode_stream`.
///
/// Every item is a `Result<StreamDelta>`. The trait object is pinned,
/// heap-allocated, `Send`, and may borrow data for the lifetime `'a`
/// (`decode_stream` uses the same `'a` for the codec borrow and the input
/// byte stream).
pub type BoxDeltaStream<'a> = Pin<Box<dyn Stream<Item = Result<StreamDelta>> + Send + 'a>>;
26
/// Bytes the transport will send to the provider plus the metadata the codec
/// learned during encoding.
///
/// `path` is the URL path the transport appends to its base URL (e.g.
/// `/v1/messages` for Anthropic). `headers` carries codec-required fields
/// like `content-type` and `anthropic-version`; transports add credentials
/// at send time.
///
/// All fields are public, so a codec can adjust a value built by
/// [`EncodedRequest::post_json`] (extra headers, warnings) before handing
/// it to the transport.
#[derive(Clone, Debug)]
pub struct EncodedRequest {
    /// HTTP method. POST for every Phase-1 codec; pre-set as a forward hint.
    pub method: http::Method,
    /// URL path appended to the transport's base URL.
    pub path: String,
    /// Vendor-required HTTP headers (NOT credentials — transport adds those).
    pub headers: http::HeaderMap,
    /// JSON body bytes.
    pub body: Bytes,
    /// `true` if this request was produced by `encode_streaming` and the
    /// transport should stream the response body (e.g. open an SSE
    /// connection). `false` — the value `post_json` starts with — means a
    /// regular request/response exchange.
    pub streaming: bool,
    /// Non-fatal warnings the codec produced during encoding. Carried into
    /// `ModelResponse::warnings` after the call returns.
    pub warnings: Vec<ModelWarning>,
}
53
54impl EncodedRequest {
55    /// Build a POST request with the given path and JSON body.
56    pub fn post_json(path: impl Into<String>, body: Bytes) -> Self {
57        let mut headers = http::HeaderMap::new();
58        headers.insert(
59            http::header::CONTENT_TYPE,
60            http::HeaderValue::from_static("application/json"),
61        );
62        Self {
63            method: http::Method::POST,
64            path: path.into(),
65            headers,
66            body,
67            streaming: false,
68            warnings: Vec::new(),
69        }
70    }
71
72    /// Mark this request as streaming-shaped. Codecs call this from
73    /// `encode_streaming` after appending any vendor-specific headers
74    /// (e.g. `Accept: text/event-stream`).
75    #[must_use]
76    pub const fn into_streaming(mut self) -> Self {
77        self.streaming = true;
78        self
79    }
80}
81
82/// Stateless encoder/decoder for ONE provider wire format.
83///
84/// A `Codec` knows nothing about HTTP, auth, or retries. It turns IR into
85/// bytes and bytes into IR. Streaming uses the same trait — `decode_stream`
86/// owns the codec's parser state machine.
87pub trait Codec: Send + Sync + 'static {
88    /// Stable codec identifier — `"anthropic-messages"`,
89    /// `"openai-chat"`, etc. Used in logs and metrics tags.
90    fn name(&self) -> &'static str;
91
92    /// Capability surface the codec advertises for the given model. Codecs
93    /// vary by model (small models lacking vision, etc.).
94    fn capabilities(&self, model: &str) -> Capabilities;
95
96    /// Resolve [`OutputStrategy::Auto`] to the codec's preferred
97    /// dispatch shape for `model`. Called once at codec-construction
98    /// time per request — never per-delta or per-retry, so the
99    /// resolved strategy is part of the SessionGraph event log's
100    /// deterministic-replay surface.
101    ///
102    /// Default returns [`OutputStrategy::Native`] — most codecs ship
103    /// vendor-native structured output (OpenAI Responses
104    /// `text.format=json_schema`, Gemini `responseJsonSchema`,
105    /// Bedrock Anthropic-passthrough). Codecs whose native channel
106    /// is newer / less mature than the tool-call surface
107    /// (Anthropic Messages today — `output_config` ships without
108    /// a strict toggle) override to [`OutputStrategy::Tool`].
109    fn auto_output_strategy(&self, _model: &str) -> OutputStrategy {
110        OutputStrategy::Native
111    }
112
113    /// Encode IR → wire body for a one-shot (non-streaming) call.
114    /// Implementors push warnings onto the returned
115    /// `EncodedRequest::warnings` for any IR field they had to drop or
116    /// coerce.
117    fn encode(&self, request: &ModelRequest) -> Result<EncodedRequest>;
118
119    /// Decode wire body → IR. `warnings_in` are the encode-time warnings
120    /// that should be carried forward into `ModelResponse::warnings` so the
121    /// caller sees the full advisory list in one place.
122    fn decode(&self, body: &[u8], warnings_in: Vec<ModelWarning>) -> Result<ModelResponse>;
123
124    /// Extract a [`RateLimitSnapshot`] from response headers, if the
125    /// vendor exposes rate-limit state in headers. Default returns
126    /// `None` — codecs whose providers publish rate-limit headers
127    /// override this and parse them.
128    fn extract_rate_limit(&self, _headers: &http::HeaderMap) -> Option<RateLimitSnapshot> {
129        None
130    }
131
132    /// Encode IR → wire body for a streaming call. Default impl delegates
133    /// to `encode` and marks the request as streaming; codecs that need a
134    /// different body shape (e.g. `stream: true` field) or extra headers
135    /// (e.g. `Accept: text/event-stream`) override.
136    fn encode_streaming(&self, request: &ModelRequest) -> Result<EncodedRequest> {
137        Ok(self.encode(request)?.into_streaming())
138    }
139
140    /// Decode an incremental byte stream → IR `StreamDelta` stream.
141    ///
142    /// Implementors own their parser state machine — Anthropic walks SSE
143    /// events, `OpenAI` splits `data:` lines, Gemini reads NDJSON, Bedrock
144    /// parses AWS event-stream frames. Default impl is a graceful
145    /// fallback: collects every chunk, runs `decode` once at the end, and
146    /// emits the resulting `ModelResponse` as a single
147    /// `StreamDelta::Stop`. Concrete codecs replace it as soon as they
148    /// support real token-level streaming.
149    #[allow(tail_expr_drop_order)]
150    fn decode_stream<'a>(
151        &'a self,
152        bytes: BoxByteStream<'a>,
153        warnings_in: Vec<ModelWarning>,
154    ) -> BoxDeltaStream<'a> {
155        Box::pin(async_stream::stream! {
156            let mut buf: Vec<u8> = Vec::new();
157            let mut bytes = bytes;
158            while let Some(chunk) = futures::StreamExt::next(&mut bytes).await {
159                let chunk = match chunk {
160                    Ok(b) => b,
161                    Err(e) => {
162                        yield Err(e);
163                        return;
164                    }
165                };
166                buf.extend_from_slice(&chunk);
167            }
168            let response = match self.decode(&buf, warnings_in) {
169                Ok(r) => r,
170                Err(e) => {
171                    yield Err(e);
172                    return;
173                }
174            };
175            for delta in deltas_from_response(&response) {
176                yield Ok(delta);
177            }
178        })
179    }
180}
181
182/// Wire literal for the OpenAI `service_tier` request field. Shared
183/// between `OpenAiChatCodec` and `OpenAiResponsesCodec` so the
184/// rendered string matches the documented enum exactly across both
185/// endpoints.
186pub fn service_tier_str(tier: crate::ir::ServiceTier) -> &'static str {
187    match tier {
188        crate::ir::ServiceTier::Auto => "auto",
189        crate::ir::ServiceTier::Default => "default",
190        crate::ir::ServiceTier::Flex => "flex",
191        crate::ir::ServiceTier::Priority => "priority",
192        crate::ir::ServiceTier::Scale => "scale",
193    }
194}
195
196/// Shared OpenAI-style rate-limit header parser. Used by both
197/// `OpenAiChatCodec` and `OpenAiResponsesCodec` because the
198/// `x-ratelimit-*` family is identical across the two endpoints.
199pub fn extract_openai_rate_limit(headers: &http::HeaderMap) -> Option<RateLimitSnapshot> {
200    let mut snapshot = RateLimitSnapshot::default();
201    let mut populated = false;
202    let pairs: [(&str, &mut Option<u64>); 2] = [
203        (
204            "x-ratelimit-remaining-requests",
205            &mut snapshot.requests_remaining,
206        ),
207        (
208            "x-ratelimit-remaining-tokens",
209            &mut snapshot.tokens_remaining,
210        ),
211    ];
212    for (header_name, target) in pairs {
213        if let Some(v) = headers.get(header_name).and_then(|h| h.to_str().ok())
214            && let Ok(parsed) = v.parse::<u64>()
215        {
216            *target = Some(parsed);
217            snapshot.raw.insert(header_name.to_owned(), v.to_owned());
218            populated = true;
219        }
220    }
221    // OpenAI emits reset deltas as durations (e.g. `6m0s`); preserve the
222    // raw value for diagnostics rather than parsing into DateTime — vendor
223    // doesn't surface absolute reset timestamps.
224    for header_name in ["x-ratelimit-reset-requests", "x-ratelimit-reset-tokens"] {
225        if let Some(v) = headers.get(header_name).and_then(|h| h.to_str().ok()) {
226            snapshot.raw.insert(header_name.to_owned(), v.to_owned());
227            populated = true;
228        }
229    }
230    populated.then_some(snapshot)
231}
232
233fn deltas_from_response(response: &ModelResponse) -> Vec<StreamDelta> {
234    use crate::ir::ContentPart;
235
236    let mut deltas = Vec::new();
237    deltas.push(StreamDelta::Start {
238        id: response.id.clone(),
239        model: response.model.clone(),
240        provider_echoes: Vec::new(),
241    });
242    for part in &response.content {
243        match part {
244            ContentPart::Text {
245                text,
246                provider_echoes,
247                ..
248            } => {
249                deltas.push(StreamDelta::TextDelta {
250                    text: text.clone(),
251                    provider_echoes: provider_echoes.clone(),
252                });
253            }
254            ContentPart::ToolUse {
255                id,
256                name,
257                input,
258                provider_echoes,
259            } => {
260                deltas.push(StreamDelta::ToolUseStart {
261                    id: id.clone(),
262                    name: name.clone(),
263                    provider_echoes: provider_echoes.clone(),
264                });
265                deltas.push(StreamDelta::ToolUseInputDelta {
266                    partial_json: input.to_string(),
267                });
268                deltas.push(StreamDelta::ToolUseStop);
269            }
270            ContentPart::Thinking {
271                text,
272                provider_echoes,
273                ..
274            } => {
275                deltas.push(StreamDelta::ThinkingDelta {
276                    text: text.clone(),
277                    provider_echoes: provider_echoes.clone(),
278                });
279            }
280            // Multimodal inputs, citations, tool results never originate
281            // from the model on the assistant streaming path — text /
282            // thinking / tool_use / image_output / audio_output are the
283            // assistant-emitted shapes. Output media on the synthetic
284            // streaming fallback rides through `StreamDelta::Warning`
285            // (codecs that natively support multimodal output emit
286            // their own delta in the per-codec stream parser).
287            ContentPart::Image { .. }
288            | ContentPart::Audio { .. }
289            | ContentPart::Video { .. }
290            | ContentPart::Document { .. }
291            | ContentPart::RedactedThinking { .. }
292            | ContentPart::Citation { .. }
293            | ContentPart::ToolResult { .. }
294            | ContentPart::ImageOutput { .. }
295            | ContentPart::AudioOutput { .. } => {}
296        }
297    }
298    deltas.push(StreamDelta::Usage(response.usage.clone()));
299    for w in &response.warnings {
300        deltas.push(StreamDelta::Warning(w.clone()));
301    }
302    deltas.push(StreamDelta::Stop {
303        stop_reason: response.stop_reason.clone(),
304    });
305    deltas
306}
307
308/// Parse a wire-format response body into a `serde_json::Value`,
309/// wrapping the underlying serde error with operator-actionable
310/// context: the codec's name, the response body size, and a
311/// truncated peek at the first ~200 bytes. The bare
312/// `serde_json::Error` ("expected value at line 4 column 12") is
313/// useless for triage — operators need to know which provider
314/// returned the body and roughly what shape it had.
315///
316/// Used by every codec's `decode` entry point so the error story is
317/// uniform across Anthropic / OpenAI Chat / OpenAI Responses /
318/// Gemini / Bedrock Converse paths.
319pub(super) fn parse_response_body(
320    body: &[u8],
321    codec_name: &'static str,
322) -> Result<serde_json::Value> {
323    serde_json::from_slice(body).map_err(|e| {
324        const PEEK_BYTES: usize = 200;
325        let peek_end = peek_at_char_boundary(body, PEEK_BYTES);
326        let peek = body.get(..peek_end).map_or_else(String::new, |slice| {
327            String::from_utf8_lossy(slice).into_owned()
328        });
329        let suffix = if body.len() > PEEK_BYTES { "…" } else { "" };
330        crate::error::Error::provider_network(format!(
331            "{codec_name} codec failed to decode response: {e}; \
332             body was {} bytes; first {peek_end} bytes: {peek:?}{suffix} \
333             — the response did not parse as JSON; the upstream may have \
334             returned an HTML error page, a truncated body, or a wire \
335             format the codec does not yet understand",
336            body.len(),
337        ))
338    })
339}
340
/// Find the largest `cut <= max` such that `body[..cut]` is valid UTF-8.
///
/// Cutting a non-ASCII body at a fixed byte offset can land in the middle
/// of a multi-byte codepoint, which a lossy decode would render as a
/// trailing U+FFFD replacement character. Backing the cut off to the end
/// of the valid prefix avoids that.
///
/// `max` is clamped to `body.len()`. The result is `0` for an empty body,
/// for `max == 0`, or when the very first byte is not valid UTF-8.
///
/// The prefix is validated in a single pass: on failure
/// `Utf8Error::valid_up_to` already reports the length of the longest
/// valid prefix, so there is no need to retry one byte shorter each time.
fn peek_at_char_boundary(body: &[u8], max: usize) -> usize {
    let limit = max.min(body.len());
    // `limit <= body.len()`, so `get` always succeeds; the fallback only
    // keeps this free of panicking slice indexing.
    body.get(..limit)
        .map_or(limit, |window| match std::str::from_utf8(window) {
            Ok(_) => limit,
            Err(err) => err.valid_up_to(),
        })
}