dwctl 8.68.2

The Doubleword Control Layer - A self-hostable observability and analytics platform for LLM applications
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
//! OpenAI-shaped request sanitisation and response usage injection,
//! relocated into dwctl for the dwctl-owned cache layer.
//!
//! Two jobs, both run by the cache tower layer (only when a cacheable request is
//! classified):
//!
//! 1. **Outbound request sanitisation** ([`strip_cache_control`]): recursively remove
//!    every `cache_control` marker from the request body, and ensure
//!    `stream_options.include_usage = true` so a streaming response carries a terminal
//!    usage frame to edit. Markers are a billing signal consumed here, not forwarded.
//! 2. **Response usage injection**: splice the neutral [`CacheStats`] into the OpenAI `usage`
//!    object — `prompt_tokens_details.cached_tokens` plus the doubleword extension fields.
//!    Non-streaming ([`inject_into_response_nonstreaming`]) buffers + edits the JSON body;
//!    streaming ([`scan_inject_sse`]) edits *only* the terminal usage frame before `[DONE]`,
//!    never buffering the whole stream (the cache layer drives it so the classify-await is
//!    deferred to that frame and never holds the first token).

use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use bytes::Bytes;
use serde_json::Value;
use tracing::error;

use super::stats::CacheStats;

/// Walk a JSON tree and delete every `cache_control` key, at any depth.
///
/// Returns `(rewrote, had_marker)`:
/// - `rewrote`: at least one key was removed, so the body differs from what was parsed and
///   must be re-serialised before forwarding (a `cache_control` field would otherwise leak
///   to the upstream).
/// - `had_marker`: at least one removed value was non-null — the adoption signal. An explicit
///   `cache_control: null` counts as "no marker" (matching parse/validation) but is still
///   stripped.
fn remove_cache_control(value: &mut Value) -> (bool, bool) {
    // Fold one child's flags into the running totals. Every child is always visited: the
    // recursive call happens before the (non-short-circuiting) OR.
    fn merge(acc: (bool, bool), child: &mut Value) -> (bool, bool) {
        let (child_rewrote, child_marker) = remove_cache_control(child);
        (acc.0 | child_rewrote, acc.1 | child_marker)
    }

    match value {
        Value::Object(fields) => {
            // Handle this object's own marker first, then descend into what remains.
            let own = match fields.remove("cache_control") {
                Some(removed) => (true, !removed.is_null()),
                None => (false, false),
            };
            fields.values_mut().fold(own, merge)
        }
        Value::Array(elements) => elements.iter_mut().fold((false, false), merge),
        // Scalars carry no keys and have no children.
        _ => (false, false),
    }
}

/// Sanitise an outbound request body: strip every `cache_control` marker and, for
/// streaming requests (`"stream": true`), ensure `stream_options.include_usage = true`.
///
/// Returns `(body, had_markers)`:
/// - `body`: the rewritten bytes when anything changed, or `None` to leave the original
///   untouched (also `None` when the body is not valid JSON).
/// - `had_markers`: whether the client actually sent non-null `cache_control` markers — the
///   adoption signal, kept distinct from the body changing, since a stream gets
///   `include_usage` injected even when no markers were present.
///
/// `stream_options` is a nullable field in the OpenAI API, so an explicit
/// `"stream_options": null` is treated exactly like an absent key and replaced with
/// `{"include_usage": true}`. A `stream_options` of any other non-object type is left as-is.
pub fn strip_cache_control(body: &[u8]) -> (Option<Bytes>, bool) {
    let Ok(mut json) = serde_json::from_slice::<Value>(body) else {
        return (None, false);
    };
    let (rewrote, had_markers) = remove_cache_control(&mut json);

    let mut usage_set = false;
    if let Some(obj) = json.as_object_mut() {
        let is_streaming = obj.get("stream").and_then(Value::as_bool) == Some(true);
        if is_streaming {
            let opts = obj.entry("stream_options").or_insert_with(|| serde_json::json!({}));
            // An explicit null means "no options": without this the `as_object_mut` guard
            // below would skip the injection and the stream would carry no usage frame.
            if opts.is_null() {
                *opts = serde_json::json!({});
            }
            if let Some(opts_obj) = opts.as_object_mut() {
                let already = opts_obj.get("include_usage").and_then(Value::as_bool) == Some(true);
                if !already {
                    opts_obj.insert("include_usage".to_string(), serde_json::json!(true));
                    usage_set = true;
                }
            }
        }
    }

    // Only re-serialise when something actually changed, so untouched bodies are forwarded
    // byte-for-byte.
    let body = if rewrote || usage_set {
        serde_json::to_vec(&json).ok().map(Bytes::from)
    } else {
        None
    };
    (body, had_markers)
}

/// Splice the OpenAI-shaped cache fields into a `usage` object in place.
///
/// `prompt_tokens` is left as the full input count; only the cache breakdown is added:
/// - `prompt_tokens_details.cached_tokens` ← `stats.read`
/// - `cache_read_input_tokens` ← `stats.read`
/// - `cache_creation_input_tokens` ← `stats.creation_total()`
/// - `cache_creation.ephemeral_{5m,1h,24h}_input_tokens` ← the per-tier creation counts
///
/// A missing or explicitly-null `prompt_tokens_details` is created as an empty object first,
/// so `cached_tokens` is always present alongside the extension fields. A
/// `prompt_tokens_details` of any other non-object type is left untouched.
fn splice_cache_fields(usage: &mut serde_json::Map<String, Value>, stats: &CacheStats) {
    let details = usage.entry("prompt_tokens_details").or_insert_with(|| serde_json::json!({}));
    // Treat `"prompt_tokens_details": null` as absent — otherwise the `as_object_mut` guard
    // below silently drops `cached_tokens` while the extension fields are still written.
    if details.is_null() {
        *details = serde_json::json!({});
    }
    if let Some(details_obj) = details.as_object_mut() {
        details_obj.insert("cached_tokens".to_string(), serde_json::json!(stats.read));
    }
    usage.insert("cache_read_input_tokens".to_string(), serde_json::json!(stats.read));
    usage.insert("cache_creation_input_tokens".to_string(), serde_json::json!(stats.creation_total()));
    usage.insert(
        "cache_creation".to_string(),
        serde_json::json!({
            "ephemeral_5m_input_tokens": stats.creation_5m,
            "ephemeral_1h_input_tokens": stats.creation_1h,
            "ephemeral_24h_input_tokens": stats.creation_24h,
        }),
    );
}

/// Inject the cache stats into a non-streaming chat-completion JSON body. Returns the
/// rewritten body, or `None` if it can't be parsed or has no `usage` object.
pub fn inject_into_usage_json(body: &[u8], stats: &CacheStats) -> Option<Bytes> {
    let mut json: Value = serde_json::from_slice(body).ok()?;
    let obj = json.as_object_mut()?;
    let usage = obj.get_mut("usage")?.as_object_mut()?;
    splice_cache_fields(usage, stats);
    serde_json::to_vec(&json).ok().map(Bytes::from)
}

/// The outcome of scanning one SSE body chunk: the (optionally) rewritten bytes plus the
/// two billing-success signals observed in it. Accumulated across chunks by the streaming
/// path so the cache-commit gate matches what billing sees.
///
/// Produced by `scan_inject_sse`; a non-UTF-8 chunk yields `None`/`false`/`false`.
pub(crate) struct SseScan {
    /// `Some` only if a usage frame was found *and* injected this call. Always `None` when
    /// the scan ran with `already_edited = true` (scan-only mode never rewrites).
    pub rewritten: Option<Bytes>,
    /// A `data:` frame carrying an `error` payload (mid-stream provider failure). Set by the
    /// mere presence of an `error` key in the frame's JSON object.
    pub saw_error: bool,
    /// A `data:` frame carrying a `usage` object (the terminal usage frame). A `usage` key
    /// whose value is not a JSON object does not count.
    pub saw_usage: bool,
}

/// Scan an SSE body for error/usage frames and, unless `already_edited`, inject the cache
/// fields into the first usage frame found. Editing touches only that one frame; every
/// other line (deltas, `[DONE]`) is preserved byte-for-byte. Assumes uncompressed UTF-8
/// `text/event-stream`; non-UTF-8 bodies are a graceful no-op (no scan, no edit).
///
/// Each `data:` line is parsed as a complete JSON object. The SSE spec permits one object
/// to span several `data:` lines (joined by `\n`), but every OpenAI-compatible
/// chat-completions provider emits one compact line per frame, so we don't reassemble.
/// This deliberately matches the billing-path scanner `extract_cache_tokens`
/// (request_logging::serializers) line-for-line: the commit gate's "saw a usage frame" and
/// billing's "found usage" must make the *same* call, or the cache could commit a write for
/// a frame billing reads as zero. If a multi-line provider ever appears, both must learn to
/// reassemble together — not this one alone.
pub(crate) fn scan_inject_sse(body: &[u8], stats: &CacheStats, already_edited: bool) -> SseScan {
    let Ok(body_str) = std::str::from_utf8(body) else {
        return SseScan {
            rewritten: None,
            saw_error: false,
            saw_usage: false,
        };
    };

    // Fast path: the streaming layer probes every frame with `already_edited=true` purely to
    // collect the commit-gate signals — it can never rewrite, so skip the output-buffer rebuild
    // (an allocation + full-body copy per SSE frame otherwise).
    if already_edited {
        let mut saw_error = false;
        let mut saw_usage = false;
        for line in body_str.split('\n') {
            if let Some(chunk) = sse_data_json(line) {
                saw_error |= chunk.get("error").is_some();
                saw_usage |= chunk.get("usage").is_some_and(Value::is_object);
            }
        }
        return SseScan {
            rewritten: None,
            saw_error,
            saw_usage,
        };
    }

    let mut out = String::with_capacity(body_str.len() + 256);
    let mut edited = false;
    let mut saw_error = false;
    let mut saw_usage = false;

    let mut first = true;
    for line in body_str.split('\n') {
        if !first {
            out.push('\n');
        }
        first = false;

        if let Some(mut chunk) = sse_data_json(line) {
            let chunk_obj = chunk.as_object_mut().expect("sse_data_json returns only objects");
            // Observe billing signals on every frame, even after we've injected.
            if chunk_obj.contains_key("error") {
                saw_error = true;
            }
            if let Some(usage) = chunk_obj.get_mut("usage")
                && let Some(usage_obj) = usage.as_object_mut()
            {
                saw_usage = true;
                if !edited {
                    // Preserve the line's terminator style: on a CRLF stream this `line`
                    // (split on '\n') ends with '\r', which the reserialized JSON drops —
                    // re-append it so we don't emit a lone '\n' amid '\r\n' framing.
                    let has_cr = line.ends_with('\r');
                    splice_cache_fields(usage_obj, stats);
                    if let Ok(reserialized) = serde_json::to_string(&chunk) {
                        out.push_str("data: ");
                        out.push_str(&reserialized);
                        if has_cr {
                            out.push('\r');
                        }
                        edited = true;
                        continue;
                    }
                }
            }
        }
        out.push_str(line);
    }

    SseScan {
        rewritten: if edited { Some(Bytes::from(out)) } else { None },
        saw_error,
        saw_usage,
    }
}

/// Extract the JSON object carried by a single SSE `data:` line.
///
/// Yields `None` for lines without the `data:` prefix, for the `[DONE]` sentinel, for payloads
/// that fail to parse as JSON, and for JSON that is not an object. Both the scan-only fast path
/// and the editing path go through this one function so they make the *identical* "is this a
/// usage/error frame" call — the same invariant the module doc requires against the billing
/// scanner.
fn sse_data_json(line: &str) -> Option<Value> {
    // SSE allows both `data:<value>` and `data: <value>`. Trimming the remainder covers the
    // optional single space as well as any trailing '\r' left by CRLF framing.
    let payload = line.strip_prefix("data:")?.trim();
    match payload {
        "[DONE]" => None,
        json => match serde_json::from_str::<Value>(json) {
            Ok(object @ Value::Object(_)) => Some(object),
            _ => None,
        },
    }
}

/// Inject the cache stats into the terminal usage frame of an SSE body. `None` if no usage
/// frame is found. (Thin wrapper over [`scan_inject_sse`]; the streaming path uses the
/// scan directly to also collect the commit-gate signals.)
pub fn inject_into_sse_body(body: &[u8], stats: &CacheStats) -> Option<Bytes> {
    scan_inject_sse(body, stats, false).rewritten
}

/// **Non-streaming** response path: buffer the body, splice the cache fields into `usage`, and
/// report whether the request counts as a billing success.
///
/// The returned flag is `true` only for a 2xx response whose JSON body had a `usage` object
/// (and was therefore rewritten); the caller gates the index write on it. Every other outcome
/// yields `false`:
/// - an explicitly non-JSON `content-type` → the response is passed through without buffering;
/// - the body cannot be buffered → a structured 500 replaces the response;
/// - the body has no `usage` object, or is not parseable JSON → the original bytes are re-emitted.
///
/// Streaming responses are handled separately by the cache layer, which defers the
/// classify-await into the SSE stream so it never holds the first token.
pub async fn inject_into_response_nonstreaming(response: Response, stats: &CacheStats) -> (Response, bool) {
    use axum::http::HeaderValue;
    use axum::http::header::{CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING};

    let status_ok = response.status().is_success();

    // Only JSON can carry a chat-completion `usage`, so bodies that *declare* another media
    // type are passed through untouched. Media types are case-insensitive and may carry
    // parameters, hence the trimmed, case-insensitive comparison of the base type. A missing
    // or unreadable content-type is given the benefit of the doubt and tried as JSON.
    let declared_non_json = match response.headers().get("content-type").and_then(|raw| raw.to_str().ok()) {
        Some(content_type) => {
            let base_type = content_type.split(';').next().map(str::trim);
            !base_type.is_some_and(|ct| ct.eq_ignore_ascii_case("application/json"))
        }
        None => false,
    };
    if declared_non_json {
        return (response, false);
    }

    let (mut parts, body) = response.into_parts();
    let buffered = axum::body::to_bytes(body, usize::MAX).await;
    let body_bytes = match buffered {
        Ok(bytes) => bytes,
        Err(e) => {
            // The upstream body could not be read in full (e.g. the connection broke
            // mid-read). Forwarding an empty body would look like a successful-but-empty 200,
            // so answer with a structured 5xx and veto the commit instead.
            error!("Failed to buffer response body for cache injection: {}", e);
            let err_body = serde_json::json!({
                "error": {
                    "message": format!("failed to read upstream response body: {e}"),
                    "type": "internal_error",
                    "code": "response_body_read_failed",
                }
            });
            let failure = (StatusCode::INTERNAL_SERVER_ERROR, axum::Json(err_body)).into_response();
            return (failure, false);
        }
    };

    // A present `usage` object is billing's success signal for a non-streamed call (it is
    // where the token counts come from); together with a 2xx status it gates the write.
    let injected = inject_into_usage_json(&body_bytes, stats);

    // The body is re-emitted as one fully-buffered payload either way, so any chunked framing
    // header is stale.
    parts.headers.remove(TRANSFER_ENCODING);
    let (payload, billing_ok) = match injected {
        Some(rewritten) => {
            // Parsing succeeded, so what we emit is plain JSON: drop any stale Content-Encoding.
            parts.headers.remove(CONTENT_ENCODING);
            (rewritten, status_ok)
        }
        // No usage object (error body, or non-completion JSON) → forward as-is, never commit.
        None => (body_bytes, false),
    };
    parts
        .headers
        .insert(CONTENT_LENGTH, HeaderValue::from(payload.len() as u64));
    (Response::from_parts(parts, axum::body::Body::from(payload)), billing_ok)
}

#[cfg(test)]
mod tests {
    use super::*;
    use axum::body::Body;

    /// Fixed stats shared by every test: 1024 read tokens plus 10/20/30 creation tokens in the
    /// 5m/1h/24h tiers.
    fn stats() -> CacheStats {
        CacheStats {
            read: 1024,
            creation_5m: 10,
            creation_1h: 20,
            creation_24h: 30,
        }
    }

    /// True when the serialised bytes still contain the literal key `cache_control`.
    fn mentions_cache_control(bytes: &[u8]) -> bool {
        bytes.windows(13).any(|w| w == b"cache_control")
    }

    /// View bytes produced by the code under test as UTF-8 text.
    fn text(bytes: &[u8]) -> &str {
        std::str::from_utf8(bytes).unwrap()
    }

    /// Build an `application/json` response with the given status and JSON payload.
    fn json_response(status: u16, payload: Value) -> Response {
        Response::builder()
            .status(status)
            .header("content-type", "application/json")
            .body(Body::from(payload.to_string()))
            .unwrap()
    }

    // ---- request sanitisation -------------------------------------------------------------

    #[test]
    fn strip_removes_nested_cache_control_and_sets_include_usage() {
        let request = serde_json::json!({
            "stream": true,
            "messages": [{"role":"system","content":[{"type":"text","text":"x","cache_control":{"type":"ephemeral"}}]}]
        });
        let (rewritten, had_markers) = strip_cache_control(request.to_string().as_bytes());
        assert!(had_markers, "body had cache_control");
        let rewritten = rewritten.expect("changed");
        let parsed: Value = serde_json::from_slice(&rewritten).unwrap();
        assert!(!mentions_cache_control(&rewritten));
        assert_eq!(parsed["stream_options"]["include_usage"], true);
    }

    #[test]
    fn strip_none_when_nothing_to_do() {
        let request = serde_json::json!({"messages":[{"role":"user","content":"hi"}]});
        let (rewritten, had_markers) = strip_cache_control(request.to_string().as_bytes());
        assert!(rewritten.is_none());
        assert!(!had_markers);
    }

    #[test]
    fn strip_stream_without_markers_changes_body_but_not_marked() {
        // Guards against overcounting adoption: a marker-free stream still has include_usage
        // injected (so the body changes), yet had_markers has to remain false.
        let request = serde_json::json!({"stream": true, "messages":[{"role":"user","content":"hi"}]});
        let (rewritten, had_markers) = strip_cache_control(request.to_string().as_bytes());
        assert!(rewritten.is_some(), "include_usage injected");
        assert!(!had_markers, "no markers present");
    }

    #[test]
    fn strip_null_cache_control_is_removed_but_not_marked() {
        // `cache_control: null` does not count as a marker for the adoption metric (matching
        // parse/validation), but the key is stripped regardless so it cannot reach the upstream.
        let request = serde_json::json!({
            "messages": [{"role":"system","content":[{"type":"text","text":"x","cache_control":null}]}]
        });
        let (rewritten, had_markers) = strip_cache_control(request.to_string().as_bytes());
        assert!(!had_markers, "null cache_control is not a marker");
        let rewritten = rewritten.expect("body rewritten to drop the null cache_control key");
        assert!(!mentions_cache_control(&rewritten), "cache_control key removed");
    }

    // ---- non-streaming JSON injection -----------------------------------------------------

    #[test]
    fn inject_non_streaming_adds_cache_fields() {
        let response = serde_json::json!({"usage":{"prompt_tokens":2000,"completion_tokens":5}});
        let injected = inject_into_usage_json(response.to_string().as_bytes(), &stats()).unwrap();
        let parsed: Value = serde_json::from_slice(&injected).unwrap();
        let usage = &parsed["usage"];
        assert_eq!(usage["prompt_tokens"], 2000, "total preserved");
        assert_eq!(usage["prompt_tokens_details"]["cached_tokens"], 1024);
        assert_eq!(usage["cache_read_input_tokens"], 1024);
        assert_eq!(usage["cache_creation_input_tokens"], 60);
        assert_eq!(usage["cache_creation"]["ephemeral_1h_input_tokens"], 20);
    }

    #[test]
    fn inject_non_streaming_none_when_no_usage() {
        let response = serde_json::json!({"choices":[]});
        let injected = inject_into_usage_json(response.to_string().as_bytes(), &stats());
        assert!(injected.is_none());
    }

    // ---- SSE injection --------------------------------------------------------------------

    #[test]
    fn inject_sse_preserves_crlf_on_edited_frame() {
        // On a CRLF-framed stream the rewritten usage frame has to keep its trailing '\r',
        // otherwise the '\r\n\r\n' event boundary breaks (a lone '\n' amid CRLF).
        let sse = "data: {\"choices\":[],\"usage\":{\"prompt_tokens\":2000}}\r\n\r\ndata: [DONE]\r\n\r\n";
        let out = inject_into_sse_body(sse.as_bytes(), &stats()).unwrap();
        let s = text(&out);
        assert!(s.contains("\"cache_read_input_tokens\":1024"), "got: {s}");
        // CRLF, not a bare LF, still terminates the injected frame.
        assert!(s.contains("}\r\n\r\n"), "edited frame must keep CRLF framing, got: {s}");
        assert!(!s.contains("}\n\r"), "must not produce a malformed \\n\\r, got: {s}");
    }

    #[test]
    fn inject_sse_edits_only_terminal_usage_frame() {
        let sse = "data: {\"choices\":[{\"delta\":{\"content\":\"hi\"}}]}\n\ndata: {\"choices\":[],\"usage\":{\"prompt_tokens\":2000}}\n\ndata: [DONE]\n\n";
        let out = inject_into_sse_body(sse.as_bytes(), &stats()).unwrap();
        let s = text(&out);
        assert!(s.contains("\"cached_tokens\":1024"));
        assert!(s.contains("\"cache_read_input_tokens\":1024"));
        // One injected frame only; the delta frame and [DONE] pass through unchanged.
        assert_eq!(s.matches("cached_tokens").count(), 1);
        assert!(s.contains("data: {\"choices\":[{\"delta\":{\"content\":\"hi\"}}]}"));
        assert!(s.contains("data: [DONE]"));
    }

    #[test]
    fn inject_sse_none_when_no_usage_frame() {
        let sse = "data: {\"choices\":[{\"delta\":{\"content\":\"hi\"}}]}\n\ndata: [DONE]\n\n";
        let out = inject_into_sse_body(sse.as_bytes(), &stats());
        assert!(out.is_none());
    }

    #[test]
    fn inject_sse_handles_data_prefix_without_space() {
        // SSE permits `data:{…}` with no space after the colon; such frames must be injected too.
        let sse = "data:{\"choices\":[],\"usage\":{\"prompt_tokens\":2000}}\n\ndata:[DONE]\n\n";
        let out = inject_into_sse_body(sse.as_bytes(), &stats()).expect("no-space data: frame is injected");
        let s = text(&out);
        assert!(s.contains("\"cache_read_input_tokens\":1024"), "got: {s}");
    }

    #[test]
    fn inject_into_sse_body_edits_the_usage_frame() {
        // Exercises the injection primitive on a byte-literal body: the cache fields land in the
        // terminal usage frame while the deltas and `[DONE]` are left alone. (The streaming
        // orchestration — deferred classify resolve + the commit gate — is covered end-to-end
        // by the layer tests.)
        let body = b"data: {\"choices\":[{\"delta\":{\"content\":\"hi\"}}]}\n\ndata: {\"choices\":[],\"usage\":{\"prompt_tokens\":2000}}\n\ndata: [DONE]\n\n";
        let out = inject_into_sse_body(body, &stats()).expect("usage frame present → edited");
        let s = text(&out);
        assert!(s.contains("\"cached_tokens\":1024"), "got: {s}");
        assert!(s.contains("data: [DONE]"), "DONE preserved");
        assert!(s.contains("\"content\":\"hi\""), "delta preserved");
    }

    #[test]
    fn inject_into_sse_body_none_without_usage() {
        let body = b"data: {\"choices\":[{\"delta\":{\"content\":\"hi\"}}]}\n\ndata: [DONE]\n\n";
        let out = inject_into_sse_body(body, &stats());
        assert!(out.is_none(), "no usage frame → nothing to edit");
    }

    // ---- non-streaming response wrapper ---------------------------------------------------

    #[tokio::test]
    async fn inject_nonstreaming_error_body_vetoes_commit() {
        // A 400 whose JSON error body lacks a usage object: nothing is injected, nothing commits.
        let resp = json_response(400, serde_json::json!({"error":{"message":"bad request"}}));
        let (_, billing_ok) = inject_into_response_nonstreaming(resp, &stats()).await;
        assert!(!billing_ok, "error body → no commit");
    }

    #[tokio::test]
    async fn inject_nonstreaming_success_injects_and_allows_commit() {
        let resp = json_response(200, serde_json::json!({"usage":{"prompt_tokens":2000}}));
        let (out, billing_ok) = inject_into_response_nonstreaming(resp, &stats()).await;
        assert!(billing_ok, "2xx with usage → commit allowed");
        let collected = axum::body::to_bytes(out.into_body(), usize::MAX).await.unwrap();
        let s = text(&collected);
        assert!(s.contains("\"cached_tokens\":1024"), "got: {s}");
    }
}