dwctl 8.68.0

The Doubleword Control Layer - A self-hostable observability and analytics platform for LLM applications
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
//! Parse an OpenAI chat-completions request into the cache primitives:
//! the ordered content blocks, a cumulative content
//! hash at every block boundary, and the explicit `cache_control` breakpoints (≤4)
//! with their TTL tiers.
//!
//! Reads and writes derive from these: a breakpoint's prefix hash — plus up to a
//! 20-block walk-back of earlier boundary hashes — is looked up for read hits; the
//! span beyond the matched read is tokenized for the write. The hash **excludes** the
//! `cache_control` directive itself, so identical content carrying different markers
//! matches, and the hashed bytes are exactly what onwards forwards after stripping.
//!
//! v1 scope: text content blocks on chat-completions messages. Tools-level markers
//! and image-token caching are deferred — image blocks still contribute to the
//! prefix hash but carry no text, so their tokens fall into the uncached tail.

use sha2::{Digest, Sha256};

use super::index::{TierPolicy, TtlTier};

/// Per-request cap on explicit `cache_control` breakpoints. Mirrors Anthropic's public
/// limit of **4 `cache_control` breakpoints** per request — enough for the common
/// tools→system→history→latest-turn pattern; more than that is rejected as abuse.
/// Both [`parse_chat_completions`] and [`validate_markers`] report a request over the
/// cap as [`ParseError::TooManyBreakpoints`].
pub const MAX_BREAKPOINTS: usize = 4;
/// Walk-back window per breakpoint, counted in block boundaries. Mirrors Anthropic's
/// documented **20-block** look-back: [`ParsedPrompt::read_candidates`] yields at most
/// `WALK_BACK` boundary hashes — the breakpoint's own boundary plus up to
/// `WALK_BACK - 1` earlier ones — when searching for a prior write. A match further back
/// is a genuine miss (the documented fix is a second breakpoint). Bounds the per-request
/// lookup-candidate set.
pub const WALK_BACK: usize = 20;

/// Everything that can go wrong while parsing or validating the cache view of a request.
///
/// Returned by both [`parse_chat_completions`] and [`validate_markers`]; all marker-level
/// variants originate in the shared `parse_ttl` helper.
#[derive(Debug, thiserror::Error)]
pub enum ParseError {
    /// The request body could not be deserialized as JSON.
    #[error("request body is not valid JSON: {0}")]
    Json(#[from] serde_json::Error),
    /// The request carries more than [`MAX_BREAKPOINTS`] non-null `cache_control` markers.
    #[error("too many cache_control breakpoints (max {MAX_BREAKPOINTS})")]
    TooManyBreakpoints,
    /// `ttl` was a string that `TtlTier::parse` does not recognise; carries the raw value.
    #[error("invalid cache_control ttl: {0:?}")]
    InvalidTtl(String),
    /// `type` was a string other than `"ephemeral"`; carries the raw value.
    #[error("unsupported cache_control type: {0:?} (only \"ephemeral\")")]
    UnsupportedType(String),
    /// The requested (or defaulted) tier is valid but not enabled in the `TierPolicy`.
    #[error("cache_control ttl tier '{}' is not currently available", .0.as_str())]
    DisabledTier(TtlTier),
    /// The marker is not an object, `type` is missing or not a string, or `ttl` is present
    /// but not a string.
    #[error("cache_control must be an object with a string \"type\": \"ephemeral\" (and an optional string \"ttl\")")]
    MalformedCacheControl,
}

/// A single content block in canonical order (message order, then block order within
/// a message's `content` array).
#[derive(Debug, Clone)]
pub struct Block {
    /// `role` of the message this block came from; the empty string when the message has
    /// no string-valued `role`.
    pub role: String,
    /// Text content for tokenization (the write-side segment). Empty for non-text
    /// blocks (e.g. images), which then bill as uncached.
    pub text: String,
}

/// An explicit cache breakpoint (a `cache_control`-marked block).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Breakpoint {
    /// Index into `blocks` of the marked block (the inclusive prefix end).
    pub block_index: usize,
    /// TTL tier named by the marker's `ttl`, or the policy default when `ttl` is absent.
    pub ttl_tier: TtlTier,
}

/// The parsed cache view of a request, as produced by [`parse_chat_completions`].
#[derive(Debug, Clone)]
pub struct ParsedPrompt {
    /// Content blocks in canonical order.
    pub blocks: Vec<Block>,
    /// Cumulative content hash *after* each block (`len() == blocks.len()`). Entry `i` is
    /// the SHA-256 digest of the canonical bytes of blocks `0..=i`.
    pub cumulative_hashes: Vec<Vec<u8>>,
    /// Explicit breakpoints in block order, guaranteed `≤ MAX_BREAKPOINTS`.
    pub breakpoints: Vec<Breakpoint>,
}

impl ParsedPrompt {
    /// Boundary hashes to probe for a read hit at `bp`, ordered **longest prefix first**.
    ///
    /// The first entry is the cumulative hash at the breakpoint itself; the rest step back
    /// one block boundary at a time, for at most [`WALK_BACK`] entries in total. Because of
    /// the ordering, the first candidate found in the index is the longest cached prefix.
    ///
    /// # Panics
    ///
    /// Panics if `bp.block_index` is not a valid index into `cumulative_hashes` (i.e. the
    /// breakpoint does not belong to this prompt).
    pub fn read_candidates(&self, bp: &Breakpoint) -> Vec<Vec<u8>> {
        let last = bp.block_index;
        // The window is inclusive at both ends, so stepping back `WALK_BACK - 1` boundaries
        // from `last` gives exactly `WALK_BACK` entries (e.g. last=24 → 5..=24, 20 entries).
        let first = last.saturating_sub(WALK_BACK - 1);
        self.cumulative_hashes[first..=last].iter().rev().cloned().collect()
    }
}

/// Parse a chat-completions body into its cache primitives. Callers decide what a `ParseError`
/// means: the classifier degrades to "no cache" (the request is forwarded untouched), while the
/// request-path [`validate_markers`] surfaces it to the client as a 400.
///
/// `messages` is walked in order. A string `content` becomes one implicit text block (hashed
/// as `{"type": "text", "text": …}`, so it matches the equivalent single-block array form);
/// an array `content` contributes one block per element, each optionally carrying a
/// `cache_control` marker. Messages whose `content` is anything else are skipped. After every
/// block the running SHA-256 is snapshotted into `cumulative_hashes`.
///
/// # Errors
///
/// - [`ParseError::Json`] if `body` is not valid JSON.
/// - Any marker error from `parse_ttl` (malformed / unsupported type / invalid or disabled ttl).
/// - [`ParseError::TooManyBreakpoints`] as soon as a valid marker would exceed
///   [`MAX_BREAKPOINTS`]. The check runs right after the marker is validated — the same
///   order `validate_markers` uses — so both entry points report the same error for the same
///   body, and an over-cap request is rejected without hashing the rest of it.
pub fn parse_chat_completions(body: &[u8], policy: &TierPolicy) -> Result<ParsedPrompt, ParseError> {
    let v: serde_json::Value = serde_json::from_slice(body)?;

    let mut blocks: Vec<Block> = Vec::new();
    let mut breakpoints: Vec<Breakpoint> = Vec::new();
    let mut cumulative_hashes: Vec<Vec<u8>> = Vec::new();
    let mut hasher = Sha256::new();

    // Fold one block into the running hash, snapshot the boundary digest, record the block,
    // and hand back its index.
    let mut push_block = |role: &str, hashed: &serde_json::Value, text: String| -> usize {
        hasher.update(&canonical_block_bytes(role, hashed));
        cumulative_hashes.push(hasher.clone().finalize().to_vec());
        blocks.push(Block {
            role: role.to_string(),
            text,
        });
        blocks.len() - 1
    };

    if let Some(messages) = v.get("messages").and_then(|m| m.as_array()) {
        for msg in messages {
            let role = msg.get("role").and_then(|r| r.as_str()).unwrap_or("");

            match msg.get("content") {
                // String content: one implicit text block, no marker possible.
                Some(serde_json::Value::String(s)) => {
                    let implicit = serde_json::json!({ "type": "text", "text": s });
                    push_block(role, &implicit, s.clone());
                }
                // Array content: a sequence of blocks, each possibly marked.
                Some(serde_json::Value::Array(arr)) => {
                    for block in arr {
                        let marker = block.get("cache_control");
                        let ttl = match marker {
                            // An explicit `null` (or absent) is "no marker"; anything else is
                            // validated by `parse_ttl` (which rejects non-object values).
                            Some(cc) if !cc.is_null() => Some(parse_ttl(cc, policy)?),
                            _ => None,
                        };
                        // Enforce the cap the moment it would be exceeded rather than after
                        // hashing the whole request.
                        if ttl.is_some() && breakpoints.len() == MAX_BREAKPOINTS {
                            return Err(ParseError::TooManyBreakpoints);
                        }

                        // Only blocks that actually carry the key (even as `null`) need a
                        // stripped copy; everything else is hashed in place, avoiding a deep
                        // clone of e.g. large image blocks.
                        let stripped: std::borrow::Cow<'_, serde_json::Value> = if marker.is_some() {
                            std::borrow::Cow::Owned(strip_cache_control(block))
                        } else {
                            std::borrow::Cow::Borrowed(block)
                        };
                        // Stripping only removes `cache_control`, so `text` can be read from
                        // the original block.
                        let text = block.get("text").and_then(|t| t.as_str()).unwrap_or("").to_string();

                        let block_index = push_block(role, &*stripped, text);
                        if let Some(ttl_tier) = ttl {
                            breakpoints.push(Breakpoint { block_index, ttl_tier });
                        }
                    }
                }
                // Missing, null, or otherwise-shaped content contributes no block.
                _ => {}
            }
        }
    }

    Ok(ParsedPrompt {
        blocks,
        cumulative_hashes,
        breakpoints,
    })
}

/// Validate one non-null `cache_control` marker and resolve its TTL tier.
///
/// Expected shape: `{ type: "ephemeral"[, ttl: "5m"|"1h"|"24h"] }`. `type` is required
/// (Anthropic mandates it even though `"ephemeral"` is the only valid value); an absent `ttl`
/// falls back to `policy.default_ttl()`. An explicit `null` marker never reaches this
/// function — callers treat it as "no marker".
///
/// # Errors
///
/// - [`ParseError::MalformedCacheControl`] — the marker is not an object, `type` is missing
///   or not a string, or `ttl` is present but not a string. A non-string field is treated as
///   malformed rather than absent so that e.g. `ttl: 123` is not silently defaulted.
/// - [`ParseError::UnsupportedType`] — `type` is a string other than `"ephemeral"`.
/// - [`ParseError::InvalidTtl`] — `ttl` is a string `TtlTier::parse` rejects.
/// - [`ParseError::DisabledTier`] — the resolved tier is not enabled in `policy`.
fn parse_ttl(cache_control: &serde_json::Value, policy: &TierPolicy) -> Result<TtlTier, ParseError> {
    let marker = cache_control.as_object().ok_or(ParseError::MalformedCacheControl)?;

    // Missing and non-string `type` both collapse to `None` here and are reported as malformed.
    match marker.get("type").and_then(|t| t.as_str()) {
        Some("ephemeral") => {}
        Some(other) => return Err(ParseError::UnsupportedType(other.to_string())),
        None => return Err(ParseError::MalformedCacheControl),
    }

    let tier = match marker.get("ttl") {
        Some(serde_json::Value::String(raw)) => match TtlTier::parse(raw) {
            Some(parsed) => parsed,
            None => return Err(ParseError::InvalidTtl(raw.clone())),
        },
        // Present but not a string (number, bool, null, …).
        Some(_) => return Err(ParseError::MalformedCacheControl),
        None => policy.default_ttl(),
    };

    if policy.is_enabled(tier) {
        Ok(tier)
    } else {
        Err(ParseError::DisabledTier(tier))
    }
}

/// Request-path validation of `cache_control` markers, without any hashing.
///
/// Runs the same per-marker checks as [`parse_chat_completions`] (both go through
/// `parse_ttl`) but skips block hashing, so it is cheap enough to run synchronously before
/// forwarding. Every non-null marker on an array-content block must be ephemeral and name an
/// *enabled* tier (or default to one), and there may be at most [`MAX_BREAKPOINTS`] of them.
/// The layer turns a failure into a 400: the request is rejected like a bad parameter rather
/// than silently un-cached, so the client learns immediately and isn't billed full price
/// thinking it cached. Takes the body `Value` the layer already parsed to extract `model`,
/// so no extra deserialization happens here.
///
/// # Errors
///
/// Returns the first marker error encountered, or [`ParseError::TooManyBreakpoints`] as soon
/// as the cap is exceeded (scanning stops there, so the error reports "max N", not a count).
pub fn validate_markers(body: &serde_json::Value, policy: &TierPolicy) -> Result<(), ParseError> {
    // Every non-null `cache_control` value on an array-content block, in request order.
    // An explicit `null` (or an absent key) is "no marker".
    let markers = body
        .get("messages")
        .and_then(|m| m.as_array())
        .into_iter()
        .flatten()
        .filter_map(|msg| msg.get("content").and_then(|c| c.as_array()))
        .flatten()
        .filter_map(|block| block.get("cache_control"))
        .filter(|cc| !cc.is_null());

    for (already_seen, cc) in markers.enumerate() {
        // Validate first, then count: a malformed marker wins over the cap error.
        parse_ttl(cc, policy)?;
        if already_seen >= MAX_BREAKPOINTS {
            return Err(ParseError::TooManyBreakpoints);
        }
    }
    Ok(())
}

/// Return a copy of `block` with its top-level `cache_control` key removed.
///
/// Non-object values have no keys to strip and are cloned unchanged.
fn strip_cache_control(block: &serde_json::Value) -> serde_json::Value {
    match block {
        serde_json::Value::Object(fields) => {
            let mut kept = fields.clone();
            kept.remove("cache_control");
            serde_json::Value::Object(kept)
        }
        other => other.clone(),
    }
}

/// Bytes fed to the running hash for one block: `role`, a `0x00` separator, then the
/// canonical JSON of the marker-stripped block. Including the role makes the same text
/// under different roles hash differently.
///
/// "Canonical" relies on `serde_json` being built **without** the `preserve_order`
/// feature: `Value::Object` is then a `BTreeMap`, so `to_vec` emits keys in sorted order
/// and two blocks that differ only in key insertion order (common across SDKs/languages)
/// hash identically. If a dependency ever enables `preserve_order` (→ `IndexMap`,
/// insertion order), explicit key-sorting would be needed to keep the cache-hit rate up.
///
/// A serialization failure contributes an empty JSON part rather than an error.
fn canonical_block_bytes(role: &str, stripped_block: &serde_json::Value) -> Vec<u8> {
    let json = serde_json::to_vec(stripped_block).unwrap_or_default();
    let mut bytes = Vec::with_capacity(role.len() + 1 + json.len());
    bytes.extend_from_slice(role.as_bytes());
    bytes.push(0x00);
    bytes.extend_from_slice(&json);
    bytes
}

#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::{json, Value};

    /// Policy with all three tiers enabled and `5m` as the default.
    fn all_tiers() -> TierPolicy {
        TierPolicy::from_config(&["5m".to_string(), "1h".to_string(), "24h".to_string()], "5m")
    }

    /// Serialize `body` and run the full parse under `policy`, keeping the `Result`.
    fn try_parse(body: &Value, policy: &TierPolicy) -> Result<ParsedPrompt, ParseError> {
        parse_chat_completions(body.to_string().as_bytes(), policy)
    }

    /// Full parse under [`all_tiers`]; panics on any `ParseError`.
    fn parse(body: Value) -> ParsedPrompt {
        try_parse(&body, &all_tiers()).unwrap()
    }

    /// Request with a single message of the given role whose content is `blocks`.
    fn single_message(role: &str, blocks: Vec<Value>) -> Value {
        json!({ "messages": [{"role": role, "content": blocks}] })
    }

    /// Request with one system message holding one text block `"x"` carrying `cache_control`.
    fn marked_x(cache_control: Value) -> Value {
        single_message(
            "system",
            vec![json!({"type": "text", "text": "x", "cache_control": cache_control})],
        )
    }

    /// `n` text blocks `b0..b{n-1}`, every one marked ephemeral.
    fn ephemeral_blocks(n: usize) -> Vec<Value> {
        (0..n)
            .map(|i| json!({"type": "text", "text": format!("b{i}"), "cache_control": {"type": "ephemeral"}}))
            .collect()
    }

    #[test]
    fn no_markers_no_breakpoints() {
        let parsed = parse(json!({
            "model": "m",
            "messages": [
                {"role": "system", "content": "you are helpful"},
                {"role": "user", "content": "hi"}
            ]
        }));
        assert_eq!(parsed.blocks.len(), 2);
        assert_eq!(parsed.cumulative_hashes.len(), 2);
        assert!(parsed.breakpoints.is_empty());
        assert_ne!(parsed.cumulative_hashes[0], parsed.cumulative_hashes[1]);
    }

    #[test]
    fn single_marker_with_default_ttl() {
        let parsed = parse(json!({
            "messages": [
                {"role": "system", "content": [
                    {"type": "text", "text": "long ctx", "cache_control": {"type": "ephemeral"}}
                ]},
                {"role": "user", "content": "q"}
            ]
        }));
        assert_eq!(parsed.breakpoints.len(), 1);
        assert_eq!(parsed.breakpoints[0].block_index, 0);
        assert_eq!(parsed.breakpoints[0].ttl_tier, TtlTier::FiveMinutes);
    }

    #[test]
    fn ttl_tiers_parse() {
        let cases = [
            ("5m", TtlTier::FiveMinutes),
            ("1h", TtlTier::OneHour),
            ("24h", TtlTier::TwentyFourHours),
        ];
        for (ttl, tier) in cases {
            let parsed = parse(marked_x(json!({"type": "ephemeral", "ttl": ttl})));
            assert_eq!(parsed.breakpoints[0].ttl_tier, tier);
        }
    }

    #[test]
    fn invalid_ttl_errors() {
        let body = marked_x(json!({"type": "ephemeral", "ttl": "2h"}));
        let err = try_parse(&body, &all_tiers()).unwrap_err();
        assert!(matches!(err, ParseError::InvalidTtl(t) if t == "2h"));
    }

    #[test]
    fn unsupported_cache_control_type_errors() {
        let body = marked_x(json!({"type": "persistent"}));
        let err = try_parse(&body, &all_tiers()).unwrap_err();
        assert!(matches!(err, ParseError::UnsupportedType(t) if t == "persistent"));
    }

    #[test]
    fn more_than_four_breakpoints_errors() {
        let body = single_message("user", ephemeral_blocks(5));
        let err = try_parse(&body, &all_tiers()).unwrap_err();
        assert!(matches!(err, ParseError::TooManyBreakpoints));
    }

    #[test]
    fn disabled_tier_rejected_by_validate_and_parse() {
        // Only 5m + 1h are enabled, so a 24h marker names a valid-but-disabled tier.
        let policy = TierPolicy::from_config(&["5m".to_string(), "1h".to_string()], "5m");
        let body = marked_x(json!({"type": "ephemeral", "ttl": "24h"}));

        let from_validate = validate_markers(&body, &policy).unwrap_err();
        assert!(matches!(from_validate, ParseError::DisabledTier(TtlTier::TwentyFourHours)));
        // The full parse goes through the same parse_ttl and must reject it the same way.
        let from_parse = try_parse(&body, &policy).unwrap_err();
        assert!(matches!(from_parse, ParseError::DisabledTier(TtlTier::TwentyFourHours)));
    }

    #[test]
    fn validate_markers_default_ttl_honours_policy() {
        // A marker without `ttl` takes the policy default. With default "1h" it becomes a
        // 1h breakpoint, and it passes because 1h is enabled.
        let policy = TierPolicy::from_config(&["1h".to_string()], "1h");
        let body = marked_x(json!({"type": "ephemeral"}));

        assert!(validate_markers(&body, &policy).is_ok());
        let parsed = try_parse(&body, &policy).unwrap();
        assert_eq!(parsed.breakpoints[0].ttl_tier, TtlTier::OneHour);
    }

    #[test]
    fn validate_markers_ok_and_counts_breakpoints() {
        let ok = single_message(
            "user",
            vec![
                json!({"type": "text", "text": "a", "cache_control": {"type": "ephemeral", "ttl": "1h"}}),
                json!({"type": "text", "text": "q"}),
            ],
        );
        assert!(validate_markers(&ok, &all_tiers()).is_ok());

        // The breakpoint cap is enforced by validate_markers as well as by the full parse.
        let too_many = single_message("user", ephemeral_blocks(5));
        assert!(matches!(
            validate_markers(&too_many, &all_tiers()).unwrap_err(),
            ParseError::TooManyBreakpoints
        ));
    }

    #[test]
    fn non_object_cache_control_is_malformed() {
        // A bare string (or any other non-object) must not be accepted as the default tier.
        let body = marked_x(json!("persistent"));
        assert!(matches!(
            validate_markers(&body, &all_tiers()).unwrap_err(),
            ParseError::MalformedCacheControl
        ));
        assert!(matches!(
            try_parse(&body, &all_tiers()).unwrap_err(),
            ParseError::MalformedCacheControl
        ));
    }

    #[test]
    fn non_string_type_or_ttl_is_malformed() {
        // A present-but-non-string `type`/`ttl` is malformed, not absent — otherwise
        // `ttl: 123` would quietly land in the default tier.
        let bad_ttl = marked_x(json!({"type": "ephemeral", "ttl": 123}));
        assert!(matches!(
            validate_markers(&bad_ttl, &all_tiers()).unwrap_err(),
            ParseError::MalformedCacheControl
        ));

        let bad_type = marked_x(json!({"type": true}));
        assert!(matches!(
            validate_markers(&bad_type, &all_tiers()).unwrap_err(),
            ParseError::MalformedCacheControl
        ));
    }

    #[test]
    fn missing_type_is_malformed() {
        // `type` is mandatory, even though "ephemeral" is its only legal value.
        let no_type = marked_x(json!({"ttl": "1h"}));
        assert!(matches!(
            validate_markers(&no_type, &all_tiers()).unwrap_err(),
            ParseError::MalformedCacheControl
        ));
        // An empty marker object has no `type` either.
        let empty = marked_x(json!({}));
        assert!(matches!(
            try_parse(&empty, &all_tiers()).unwrap_err(),
            ParseError::MalformedCacheControl
        ));
    }

    #[test]
    fn null_cache_control_is_no_marker() {
        // Explicit `null` is "no marker": neither a breakpoint nor an error.
        let body = marked_x(Value::Null);
        assert!(validate_markers(&body, &all_tiers()).is_ok());
        let parsed = try_parse(&body, &all_tiers()).unwrap();
        assert!(parsed.breakpoints.is_empty());
    }

    #[test]
    fn cache_control_excluded_from_hash() {
        // Identical content with and without a marker must produce the same cumulative
        // hash at that block, because the marker is stripped before hashing.
        let marked = parse(single_message(
            "system",
            vec![json!({
                "type": "text",
                "text": "shared prefix",
                "cache_control": {"type": "ephemeral", "ttl": "1h"}
            })],
        ));
        let unmarked = parse(single_message(
            "system",
            vec![json!({"type": "text", "text": "shared prefix"})],
        ));
        assert_eq!(marked.cumulative_hashes[0], unmarked.cumulative_hashes[0]);

        // Different content, on the other hand, must hash differently.
        let other = parse(single_message(
            "system",
            vec![json!({"type": "text", "text": "different"})],
        ));
        assert_ne!(marked.cumulative_hashes[0], other.cumulative_hashes[0]);
    }

    #[test]
    fn walk_back_candidates_longest_first_bounded() {
        // 25 blocks with the breakpoint on the last one: the candidates are the 20 most
        // recent boundary hashes, starting with the breakpoint's own (the longest prefix).
        let mut blocks: Vec<Value> = (0..25)
            .map(|i| json!({"type": "text", "text": format!("b{i}")}))
            .collect();
        blocks[24]["cache_control"] = json!({"type": "ephemeral"});

        let parsed = parse(single_message("user", blocks));
        let cands = parsed.read_candidates(&parsed.breakpoints[0]);
        assert_eq!(cands.len(), WALK_BACK);
        assert_eq!(cands[0], parsed.cumulative_hashes[24]); // the breakpoint itself
        assert_eq!(cands[1], parsed.cumulative_hashes[23]);
        assert_eq!(cands[WALK_BACK - 1], parsed.cumulative_hashes[5]); // 24 - 19
    }

    #[test]
    fn deterministic() {
        let body = single_message("system", vec![json!({"type": "text", "text": "abc"})]);
        let first = parse(body.clone());
        let second = parse(body);
        assert_eq!(first.cumulative_hashes, second.cumulative_hashes);
    }
}