entelix_core/codecs/codec.rs
1//! `Codec` trait and `EncodedRequest` — the IR ⇄ wire boundary.
2//!
3//! `Codec` is intentionally narrow: it converts `ModelRequest`s into
4//! provider-shaped HTTP payloads and decodes the responses back into IR.
5//! Streaming is a first-class concern through `decode_stream` —
6//! incremental byte chunks become `StreamDelta` events, with the codec
7//! owning its own parser state machine (SSE event parser, NDJSON line
8//! splitter, AWS event-stream binary frame, etc.).
9
10use std::pin::Pin;
11
12use bytes::Bytes;
13use futures::Stream;
14
15use crate::error::Result;
16use crate::ir::{Capabilities, ModelRequest, ModelResponse, ModelWarning, OutputStrategy};
17use crate::rate_limit::RateLimitSnapshot;
18use crate::stream::StreamDelta;
19
/// Boxed byte-chunk stream produced by a `Transport` and consumed by a
/// `Codec` during streaming.
///
/// Each item is a `Result<Bytes>`, so a failure can be reported for an
/// individual chunk. The trait object is `Send` and may borrow data for
/// the lifetime `'a`.
pub type BoxByteStream<'a> = Pin<Box<dyn Stream<Item = Result<Bytes>> + Send + 'a>>;
23
/// Boxed `StreamDelta` stream produced by `Codec::decode_stream`.
///
/// Each item is a `Result<StreamDelta>`, so decode failures are reported
/// in-band. The trait object is `Send` and may borrow data for the
/// lifetime `'a`.
pub type BoxDeltaStream<'a> = Pin<Box<dyn Stream<Item = Result<StreamDelta>> + Send + 'a>>;
26
27/// Bytes the transport will send to the provider plus the metadata the codec
28/// learned during encoding.
29///
30/// `path` is the URL path the transport appends to its base URL (e.g.
31/// `/v1/messages` for Anthropic). `headers` carries codec-required fields
32/// like `content-type` and `anthropic-version`; transports add credentials
33/// at send time.
34#[derive(Clone, Debug)]
35pub struct EncodedRequest {
36 /// HTTP method. POST for every Phase-1 codec; pre-set as a forward hint.
37 pub method: http::Method,
38 /// URL path appended to the transport's base URL.
39 pub path: String,
40 /// Vendor-required HTTP headers (NOT credentials — transport adds those).
41 pub headers: http::HeaderMap,
42 /// JSON body bytes.
43 pub body: Bytes,
44 /// `true` if this request was produced by `encode_streaming` and the
45 /// transport should stream the response body (e.g. open an SSE
46 /// connection). Transports treat unset / `false` as a regular
47 /// request/response.
48 pub streaming: bool,
49 /// Non-fatal warnings the codec produced during encoding. Carried into
50 /// `ModelResponse::warnings` after the call returns.
51 pub warnings: Vec<ModelWarning>,
52}
53
54impl EncodedRequest {
55 /// Build a POST request with the given path and JSON body.
56 pub fn post_json(path: impl Into<String>, body: Bytes) -> Self {
57 let mut headers = http::HeaderMap::new();
58 headers.insert(
59 http::header::CONTENT_TYPE,
60 http::HeaderValue::from_static("application/json"),
61 );
62 Self {
63 method: http::Method::POST,
64 path: path.into(),
65 headers,
66 body,
67 streaming: false,
68 warnings: Vec::new(),
69 }
70 }
71
72 /// Mark this request as streaming-shaped. Codecs call this from
73 /// `encode_streaming` after appending any vendor-specific headers
74 /// (e.g. `Accept: text/event-stream`).
75 #[must_use]
76 pub const fn into_streaming(mut self) -> Self {
77 self.streaming = true;
78 self
79 }
80}
81
82/// Stateless encoder/decoder for ONE provider wire format.
83///
84/// A `Codec` knows nothing about HTTP, auth, or retries. It turns IR into
85/// bytes and bytes into IR. Streaming uses the same trait — `decode_stream`
86/// owns the codec's parser state machine.
87pub trait Codec: Send + Sync + 'static {
88 /// Stable codec identifier — `"anthropic-messages"`,
89 /// `"openai-chat"`, etc. Used in logs and metrics tags.
90 fn name(&self) -> &'static str;
91
92 /// Capability surface the codec advertises for the given model. Codecs
93 /// vary by model (small models lacking vision, etc.).
94 fn capabilities(&self, model: &str) -> Capabilities;
95
96 /// Resolve [`OutputStrategy::Auto`] to the codec's preferred
97 /// dispatch shape for `model`. Called once at codec-construction
98 /// time per request — never per-delta or per-retry, so the
99 /// resolved strategy is part of the SessionGraph event log's
100 /// deterministic-replay surface.
101 ///
102 /// Default returns [`OutputStrategy::Native`] — most codecs ship
103 /// vendor-native structured output (OpenAI Responses
104 /// `text.format=json_schema`, Gemini `responseJsonSchema`,
105 /// Bedrock Anthropic-passthrough). Codecs whose native channel
106 /// is newer / less mature than the tool-call surface
107 /// (Anthropic Messages today — `output_config` ships without
108 /// a strict toggle) override to [`OutputStrategy::Tool`].
109 fn auto_output_strategy(&self, _model: &str) -> OutputStrategy {
110 OutputStrategy::Native
111 }
112
113 /// Encode IR → wire body for a one-shot (non-streaming) call.
114 /// Implementors push warnings onto the returned
115 /// `EncodedRequest::warnings` for any IR field they had to drop or
116 /// coerce.
117 fn encode(&self, request: &ModelRequest) -> Result<EncodedRequest>;
118
119 /// Decode wire body → IR. `warnings_in` are the encode-time warnings
120 /// that should be carried forward into `ModelResponse::warnings` so the
121 /// caller sees the full advisory list in one place.
122 fn decode(&self, body: &[u8], warnings_in: Vec<ModelWarning>) -> Result<ModelResponse>;
123
124 /// Extract a [`RateLimitSnapshot`] from response headers, if the
125 /// vendor exposes rate-limit state in headers. Default returns
126 /// `None` — codecs whose providers publish rate-limit headers
127 /// override this and parse them.
128 fn extract_rate_limit(&self, _headers: &http::HeaderMap) -> Option<RateLimitSnapshot> {
129 None
130 }
131
132 /// Encode IR → wire body for a streaming call. Default impl delegates
133 /// to `encode` and marks the request as streaming; codecs that need a
134 /// different body shape (e.g. `stream: true` field) or extra headers
135 /// (e.g. `Accept: text/event-stream`) override.
136 fn encode_streaming(&self, request: &ModelRequest) -> Result<EncodedRequest> {
137 Ok(self.encode(request)?.into_streaming())
138 }
139
140 /// Decode an incremental byte stream → IR `StreamDelta` stream.
141 ///
142 /// Implementors own their parser state machine — Anthropic walks SSE
143 /// events, `OpenAI` splits `data:` lines, Gemini reads NDJSON, Bedrock
144 /// parses AWS event-stream frames. Default impl is a graceful
145 /// fallback: collects every chunk, runs `decode` once at the end, and
146 /// emits the resulting `ModelResponse` as a single
147 /// `StreamDelta::Stop`. Concrete codecs replace it as soon as they
148 /// support real token-level streaming.
149 #[allow(tail_expr_drop_order)]
150 fn decode_stream<'a>(
151 &'a self,
152 bytes: BoxByteStream<'a>,
153 warnings_in: Vec<ModelWarning>,
154 ) -> BoxDeltaStream<'a> {
155 Box::pin(async_stream::stream! {
156 let mut buf: Vec<u8> = Vec::new();
157 let mut bytes = bytes;
158 while let Some(chunk) = futures::StreamExt::next(&mut bytes).await {
159 let chunk = match chunk {
160 Ok(b) => b,
161 Err(e) => {
162 yield Err(e);
163 return;
164 }
165 };
166 buf.extend_from_slice(&chunk);
167 }
168 let response = match self.decode(&buf, warnings_in) {
169 Ok(r) => r,
170 Err(e) => {
171 yield Err(e);
172 return;
173 }
174 };
175 for delta in deltas_from_response(&response) {
176 yield Ok(delta);
177 }
178 })
179 }
180}
181
182/// Wire literal for the OpenAI `service_tier` request field. Shared
183/// between `OpenAiChatCodec` and `OpenAiResponsesCodec` so the
184/// rendered string matches the documented enum exactly across both
185/// endpoints.
186pub fn service_tier_str(tier: crate::ir::ServiceTier) -> &'static str {
187 match tier {
188 crate::ir::ServiceTier::Auto => "auto",
189 crate::ir::ServiceTier::Default => "default",
190 crate::ir::ServiceTier::Flex => "flex",
191 crate::ir::ServiceTier::Priority => "priority",
192 crate::ir::ServiceTier::Scale => "scale",
193 }
194}
195
196/// Shared OpenAI-style rate-limit header parser. Used by both
197/// `OpenAiChatCodec` and `OpenAiResponsesCodec` because the
198/// `x-ratelimit-*` family is identical across the two endpoints.
199pub fn extract_openai_rate_limit(headers: &http::HeaderMap) -> Option<RateLimitSnapshot> {
200 let mut snapshot = RateLimitSnapshot::default();
201 let mut populated = false;
202 let pairs: [(&str, &mut Option<u64>); 2] = [
203 (
204 "x-ratelimit-remaining-requests",
205 &mut snapshot.requests_remaining,
206 ),
207 (
208 "x-ratelimit-remaining-tokens",
209 &mut snapshot.tokens_remaining,
210 ),
211 ];
212 for (header_name, target) in pairs {
213 if let Some(v) = headers.get(header_name).and_then(|h| h.to_str().ok())
214 && let Ok(parsed) = v.parse::<u64>()
215 {
216 *target = Some(parsed);
217 snapshot.raw.insert(header_name.to_owned(), v.to_owned());
218 populated = true;
219 }
220 }
221 // OpenAI emits reset deltas as durations (e.g. `6m0s`); preserve the
222 // raw value for diagnostics rather than parsing into DateTime — vendor
223 // doesn't surface absolute reset timestamps.
224 for header_name in ["x-ratelimit-reset-requests", "x-ratelimit-reset-tokens"] {
225 if let Some(v) = headers.get(header_name).and_then(|h| h.to_str().ok()) {
226 snapshot.raw.insert(header_name.to_owned(), v.to_owned());
227 populated = true;
228 }
229 }
230 populated.then_some(snapshot)
231}
232
233fn deltas_from_response(response: &ModelResponse) -> Vec<StreamDelta> {
234 use crate::ir::ContentPart;
235
236 let mut deltas = Vec::new();
237 deltas.push(StreamDelta::Start {
238 id: response.id.clone(),
239 model: response.model.clone(),
240 provider_echoes: Vec::new(),
241 });
242 for part in &response.content {
243 match part {
244 ContentPart::Text {
245 text,
246 provider_echoes,
247 ..
248 } => {
249 deltas.push(StreamDelta::TextDelta {
250 text: text.clone(),
251 provider_echoes: provider_echoes.clone(),
252 });
253 }
254 ContentPart::ToolUse {
255 id,
256 name,
257 input,
258 provider_echoes,
259 } => {
260 deltas.push(StreamDelta::ToolUseStart {
261 id: id.clone(),
262 name: name.clone(),
263 provider_echoes: provider_echoes.clone(),
264 });
265 deltas.push(StreamDelta::ToolUseInputDelta {
266 partial_json: input.to_string(),
267 });
268 deltas.push(StreamDelta::ToolUseStop);
269 }
270 ContentPart::Thinking {
271 text,
272 provider_echoes,
273 ..
274 } => {
275 deltas.push(StreamDelta::ThinkingDelta {
276 text: text.clone(),
277 provider_echoes: provider_echoes.clone(),
278 });
279 }
280 // Multimodal inputs, citations, tool results never originate
281 // from the model on the assistant streaming path — text /
282 // thinking / tool_use / image_output / audio_output are the
283 // assistant-emitted shapes. Output media on the synthetic
284 // streaming fallback rides through `StreamDelta::Warning`
285 // (codecs that natively support multimodal output emit
286 // their own delta in the per-codec stream parser).
287 ContentPart::Image { .. }
288 | ContentPart::Audio { .. }
289 | ContentPart::Video { .. }
290 | ContentPart::Document { .. }
291 | ContentPart::RedactedThinking { .. }
292 | ContentPart::Citation { .. }
293 | ContentPart::ToolResult { .. }
294 | ContentPart::ImageOutput { .. }
295 | ContentPart::AudioOutput { .. } => {}
296 }
297 }
298 deltas.push(StreamDelta::Usage(response.usage.clone()));
299 for w in &response.warnings {
300 deltas.push(StreamDelta::Warning(w.clone()));
301 }
302 deltas.push(StreamDelta::Stop {
303 stop_reason: response.stop_reason.clone(),
304 });
305 deltas
306}
307
308/// Parse a wire-format response body into a `serde_json::Value`,
309/// wrapping the underlying serde error with operator-actionable
310/// context: the codec's name, the response body size, and a
311/// truncated peek at the first ~200 bytes. The bare
312/// `serde_json::Error` ("expected value at line 4 column 12") is
313/// useless for triage — operators need to know which provider
314/// returned the body and roughly what shape it had.
315///
316/// Used by every codec's `decode` entry point so the error story is
317/// uniform across Anthropic / OpenAI Chat / OpenAI Responses /
318/// Gemini / Bedrock Converse paths.
319pub(super) fn parse_response_body(
320 body: &[u8],
321 codec_name: &'static str,
322) -> Result<serde_json::Value> {
323 serde_json::from_slice(body).map_err(|e| {
324 const PEEK_BYTES: usize = 200;
325 let peek_end = peek_at_char_boundary(body, PEEK_BYTES);
326 let peek = body.get(..peek_end).map_or_else(String::new, |slice| {
327 String::from_utf8_lossy(slice).into_owned()
328 });
329 let suffix = if body.len() > PEEK_BYTES { "…" } else { "" };
330 crate::error::Error::provider_network(format!(
331 "{codec_name} codec failed to decode response: {e}; \
332 body was {} bytes; first {peek_end} bytes: {peek:?}{suffix} \
333 — the response did not parse as JSON; the upstream may have \
334 returned an HTML error page, a truncated body, or a wire \
335 format the codec does not yet understand",
336 body.len(),
337 ))
338 })
339}
340
/// Choose how many leading bytes of `body` (at most `max`) to show in a
/// diagnostic peek without cutting a multi-byte UTF-8 codepoint in half.
///
/// The window `body[..min(max, body.len())]` is returned in full unless
/// it *ends* inside an incomplete multi-byte sequence; in that case the
/// cut moves back to where that sequence starts, so a lossy rendering
/// does not end in a spurious U+FFFD replacement character.
///
/// Bytes that are invalid UTF-8 elsewhere in the window do NOT shorten
/// the peek. The caller renders the slice lossily, and a binary or
/// mis-encoded body (gzip magic, Latin-1 error page) is exactly when the
/// peek matters most; backing off to the first invalid byte would leave
/// it nearly empty.
///
/// For a window that is valid UTF-8 apart from a cut-through final
/// codepoint, the result is the largest `cut <= max` such that
/// `body[..cut]` is valid UTF-8.
fn peek_at_char_boundary(body: &[u8], max: usize) -> usize {
    let limit = max.min(body.len());
    let mut rest = body.get(..limit).unwrap_or_default();
    // Number of window bytes already examined and accepted.
    let mut consumed = 0;
    loop {
        let err = match std::str::from_utf8(rest) {
            Ok(_) => return limit,
            Err(err) => err,
        };
        // `error_len() == None` means the input ended mid-sequence: the
        // window cut through a codepoint, so stop right before it.
        let Some(invalid_len) = err.error_len() else {
            return consumed + err.valid_up_to();
        };
        // Genuinely invalid bytes: keep them (they render as U+FFFD) and
        // continue scanning after them.
        let skip = err.valid_up_to() + invalid_len;
        consumed += skip;
        rest = rest.get(skip..).unwrap_or_default();
    }
}