dwctl 8.63.0

The Doubleword Control Layer - A self-hostable observability and analytics platform for LLM applications
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
//! The classify orchestration: turn a request into a neutral
//! [`CacheStats`] split plus the [`PendingWrite`] to commit on success.
//!
//! Flow: resolve principal → per-model gate → parse markers → find the longest cached
//! prefix (read, via the 20-block walk-back) → tokenize the new suffix (write) →
//! enforce the min-prefix floor → assemble. Reads need no tokenization (the count is
//! stored on the entry); only the new write span is tokenized, and it runs in parallel
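//! with generation. Any recoverable failure degrades to "no caching" rather than an
//! error to the customer. Before the per-model gate (no key, no principal, model not
//! cache-enabled), the response is left untouched. After it (parse error, no markers,
//! model unmapped in tokenizer-svc, tokenizer down, below the min-prefix floor), the
//! usage fields are uniform zeros (best-effort; a reconciliation pass backstops
//! residual overcharges).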
//!
//! v1 scope: chat-completions message bodies. Tool-using multi-step Responses are a
//! fast-follow; image tokens fall into the uncached tail.

use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use chrono::Utc;

use super::index::{CacheEntry, CacheIndex, CacheResult, IndexScope, PrefixHash};
use super::model_config::ModelConfigResolver;
use super::parse::parse_chat_completions;
use super::principal::PrincipalResolver;
use super::stats::{CacheStats, PendingWrite};
use super::tokenizer::TokenizerClient;

/// Inputs to [`Classifier::classify`], borrowed from the in-flight request.
pub struct ClassifyRequest<'a> {
    /// Virtual model name: the `deployed_models.alias`, which is also one
    /// dimension of the cache key.
    pub virtual_model: &'a str,
    /// Raw request body, still carrying its `cache_control` markers.
    pub body: &'a [u8],
    /// The validated bearer token. `None` means the request cannot be scoped to a
    /// principal, so nothing is cached for it.
    pub api_key: Option<&'a str>,
}

/// What [`Classifier::classify`] hands back to the cache layer.
///
/// `active` reports whether the per-model gate passed (the model is cache-enabled).
/// It does not depend on whether this particular prompt produced any cache hit or
/// write. It controls uniform-zeros injection:
/// - an enabled model always gets the `cache_*` usage fields on its response,
///   zeroed when nothing was cached, so every response in the cohort has the same shape;
/// - a disabled model's response is not modified.
///
/// `stats` and `pending` hold the real read/write split. Both are zero when `active`
/// is set but nothing was cached.
pub struct ClassifyOutcome {
    pub stats: CacheStats,
    pub pending: PendingWrite,
    pub active: bool,
}

impl ClassifyOutcome {
    /// The model is not cache-enabled (or the request is unscopable), so the
    /// response is left untouched.
    pub(crate) fn inactive() -> Self {
        Self::zeroed_with(false)
    }

    /// The model is enabled but this prompt cached nothing (no markers, below floor,
    /// tokenizer degraded, …). Uniform zeros are injected and nothing is committed.
    fn zero_active() -> Self {
        Self::zeroed_with(true)
    }

    /// The model is enabled and this prompt carries a real read/write split.
    fn active(stats: CacheStats, pending: PendingWrite) -> Self {
        Self {
            active: true,
            stats,
            pending,
        }
    }

    /// Shared shape of the two "nothing cached" outcomes: empty stats, empty
    /// pending write, and the given `active` flag.
    fn zeroed_with(active: bool) -> Self {
        Self {
            active,
            stats: CacheStats::default(),
            pending: PendingWrite::default(),
        }
    }
}

/// The classify engine. It bundles everything [`Classifier::classify`] and
/// [`Classifier::commit`] depend on.
///
/// Cloning is cheap because each field is itself a handle (`Arc`, connection pool,
/// or shared cache) rather than owned data.
#[derive(Clone)]
pub struct Classifier {
    /// Turns a bearer token into the principal that scopes cache entries.
    principal: PrincipalResolver,
    /// Per-model cache settings: the enable gate and the min-prefix floor.
    model_config: ModelConfigResolver,
    /// Client for tokenizer-svc (`/v1/models` and `/v1/tokenize`).
    tokenizer: TokenizerClient,
    /// Store of cached prefixes: looked up on classify, written/refreshed on commit.
    index: Arc<dyn CacheIndex>,
    /// Memo of alias → tokenizer_version from tokenizer-svc `/v1/models`; a `None`
    /// value records that the alias is unmapped.
    versions: moka::future::Cache<String, Option<String>>,
}

impl Classifier {
    pub fn new(
        principal: PrincipalResolver,
        model_config: ModelConfigResolver,
        tokenizer: TokenizerClient,
        index: Arc<dyn CacheIndex>,
    ) -> Self {
        let versions = moka::future::Cache::builder()
            .max_capacity(10_000)
            .time_to_live(std::time::Duration::from_secs(300))
            .build();
        Self {
            principal,
            model_config,
            tokenizer,
            index,
            versions,
        }
    }

    /// Classify a request into its read/write split + the entries to commit on success.
    ///
    /// Pre-`cfg.enabled` bails are `inactive` (unscopable / disabled model → response
    /// untouched). Once the model is enabled, every bail is `zero_active` (uniform
    /// zeros injected, nothing committed) — so enabled models present one shape.
    pub async fn classify(&self, req: ClassifyRequest<'_>) -> CacheResult<ClassifyOutcome> {
        // Gates that fire *before* we know the model is cache-enabled → inactive.
        let Some(api_key) = req.api_key else {
            return Ok(ClassifyOutcome::inactive());
        };
        let Some(principal_id) = self.principal.resolve(api_key).await? else {
            return Ok(ClassifyOutcome::inactive());
        };
        let cfg = self.model_config.resolve(req.virtual_model).await?;
        if !cfg.enabled {
            return Ok(ClassifyOutcome::inactive());
        }

        // From here the model is cache-enabled: any bail is `zero_active`.
        let parsed = match parse_chat_completions(req.body) {
            Ok(p) => p,
            Err(e) => {
                // Best-effort: degrade to no caching (uniform zeros). Log at debug so the
                // silent degradation is diagnosable without warn-level noise on every odd body.
                tracing::debug!(error = %e, virtual_model = req.virtual_model, "cache classify: body not cacheable (unparseable / >4 breakpoints)");
                return Ok(ClassifyOutcome::zero_active());
            }
        };
        if parsed.breakpoints.is_empty() {
            return Ok(ClassifyOutcome::zero_active()); // markers are required to cache
        }
        let Some(tokenizer_version) = self.tokenizer_version(req.virtual_model).await? else {
            return Ok(ClassifyOutcome::zero_active()); // model not mapped in tokenizer-svc
        };
        let scope = IndexScope {
            principal_id,
            virtual_model: req.virtual_model.to_string(),
            tokenizer_version,
        };

        // Longest cached prefix across all breakpoints' walk-back windows.
        let read = self.find_longest_read(&scope, &parsed).await?;
        let read_block = read.as_ref().map(|r| r.block); // index into parsed.blocks
        let read_tokens = read.as_ref().map(|r| r.tokens).unwrap_or(0);

        // Parse guarantees ≥1 breakpoint here (the is-empty check above bails early), but
        // degrade to no-caching rather than panic if that invariant ever drifts in a refactor.
        let Some(deepest_bp) = parsed.breakpoints.last() else {
            return Ok(ClassifyOutcome::zero_active());
        };
        let deepest = deepest_bp.block_index;

        let mut stats = CacheStats {
            read: read_tokens as u64,
            ..Default::default()
        };
        let mut pending = PendingWrite::default();

        // Refresh the matched read entry's TTL (sliding window).
        if let Some(r) = &read {
            pending.refresh = Some((scope.clone(), r.hash.clone(), Utc::now() + r.duration));
        }

        // Pure read: the deepest declared prefix is already cached → no write.
        if read_block == Some(deepest) {
            // Floor is a write-time gate; a live read entry was already above it.
            return Ok(ClassifyOutcome::active(stats, pending));
        }

        // Write span: blocks after the matched read, up to the deepest breakpoint.
        let write_start = read_block.map(|b| b + 1).unwrap_or(0);
        let segments: Vec<String> = parsed.blocks[write_start..=deepest].iter().map(|b| b.text.clone()).collect();

        // Tokenize the suffix (the only tokenization; reads needed none). Failure →
        // degrade to no caching (safe under the best-effort contract).
        let tok = match self.tokenizer.tokenize(req.virtual_model, &segments).await {
            Ok(tok) => tok,
            Err(e) => {
                tracing::debug!(error = %e, virtual_model = req.virtual_model, "cache classify: tokenize failed, degrading to no write");
                return Ok(ClassifyOutcome::zero_active());
            }
        };
        if tok.cumulative.len() != segments.len() {
            // The tokenizer returned a different number of cumulative counts than segments we
            // sent — we can't map tokens to blocks safely, so bail (no write) rather than guess.
            tracing::debug!(
                segments = segments.len(),
                cumulative = tok.cumulative.len(),
                virtual_model = req.virtual_model,
                "cache classify: tokenizer segment-count mismatch, degrading to no write"
            );
            return Ok(ClassifyOutcome::zero_active());
        }

        // cumulative token count *at* each block in the write span (with the read offset).
        let cumulative_at = |block: usize| -> u64 { read_tokens as u64 + tok.cumulative[block - write_start] as u64 };
        let total_prefix = cumulative_at(deepest);
        if total_prefix < cfg.min_prefix_tokens as u64 {
            return Ok(ClassifyOutcome::zero_active()); // below the per-model floor → no caching
        }

        // Each breakpoint beyond the read is its own cached prefix; the segment it
        // closes is creation under its tier. (`block_index > read_block`, treating a
        // no-read as -1, selects exactly the breakpoints within the write span.)
        let mut prev_boundary = read_tokens as u64;
        let now = Utc::now();
        let read_block_idx: isize = read_block.map(|b| b as isize).unwrap_or(-1);
        for bp in parsed.breakpoints.iter().filter(|bp| bp.block_index as isize > read_block_idx) {
            let bp_cumulative = cumulative_at(bp.block_index);
            let segment_tokens = bp_cumulative.saturating_sub(prev_boundary);
            stats.add_creation(bp.ttl_tier, segment_tokens);
            pending.writes.push(CacheEntry {
                scope: scope.clone(),
                prefix_hash: parsed.cumulative_hashes[bp.block_index].clone(),
                // Cap at u32::MAX — a prefix exceeding ~4.3B tokens is beyond any model's
                // context window; if that ever becomes realistic the column needs BIGINT.
                cumulative_token_count: bp_cumulative.min(u32::MAX as u64) as u32,
                ttl_tier: bp.ttl_tier,
                expires_at: now + bp.ttl_tier.duration(),
            });
            prev_boundary = bp_cumulative;
        }

        Ok(ClassifyOutcome::active(stats, pending))
    }

    /// Commit a [`PendingWrite`] to the index — the success-gated, post-response step
    /// the cache layer runs on a 2xx: upsert the new write entries and
    /// slide the matched read's TTL.
    pub async fn commit(&self, pending: &PendingWrite) -> CacheResult<()> {
        for entry in &pending.writes {
            self.index.write(entry).await?;
        }
        if let Some((scope, hash, new_expires_at)) = &pending.refresh {
            self.index.refresh(scope, hash, *new_expires_at).await?;
        }
        Ok(())
    }

    /// alias → tokenizer_version (cached from `/v1/models`); `None` if unmapped or the
    /// service is unreachable (→ no caching).
    async fn tokenizer_version(&self, alias: &str) -> CacheResult<Option<String>> {
        if let Some(v) = self.versions.get(alias).await {
            return Ok(v);
        }
        let Ok(models) = self.tokenizer.models().await else {
            // Service unreachable → deliberately NOT memoised. A genuine "model not in the
            // list" result IS cached below (a stable fact), but an outage is transient: leaving
            // it uncached means caching resumes the instant tokenizer-svc recovers rather than
            // staying dark for the cache TTL. The cost is one cheap failed `/v1/models` GET per
            // cacheable request during the outage — best-effort, off the user path. (Caching
            // None at the 300s TTL here would instead blind the cache for up to 5 min after
            // recovery, which is worse than the redundant probes.)
            return Ok(None);
        };
        let mut found = None;
        for m in models {
            if m.alias == alias {
                found = Some(m.tokenizer_version.clone());
            }
            self.versions.insert(m.alias, Some(m.tokenizer_version)).await;
        }
        if found.is_none() {
            self.versions.insert(alias.to_string(), None).await;
        }
        Ok(found)
    }

    /// Find the longest cached prefix: union the walk-back candidates across all
    /// breakpoints, look them up, and pick the match at the deepest block.
    async fn find_longest_read(&self, scope: &IndexScope, parsed: &super::parse::ParsedPrompt) -> CacheResult<Option<ReadHit>> {
        let mut candidates: Vec<PrefixHash> = Vec::new();
        let mut seen: HashSet<PrefixHash> = HashSet::new();
        for bp in &parsed.breakpoints {
            for h in parsed.read_candidates(bp) {
                if seen.insert(h.clone()) {
                    candidates.push(h);
                }
            }
        }
        let matches = self.index.lookup(scope, &candidates).await?;
        if matches.is_empty() {
            return Ok(None);
        }
        let hash_to_block: HashMap<&[u8], usize> = parsed
            .cumulative_hashes
            .iter()
            .enumerate()
            .map(|(i, h)| (h.as_slice(), i))
            .collect();

        let mut best: Option<ReadHit> = None;
        for m in matches {
            if let Some(&block) = hash_to_block.get(m.prefix_hash.as_slice())
                && best.as_ref().is_none_or(|b| block > b.block)
            {
                best = Some(ReadHit {
                    block,
                    tokens: m.cumulative_token_count,
                    hash: m.prefix_hash,
                    duration: m.ttl_tier.duration(),
                });
            }
        }
        Ok(best)
    }
}

/// The deepest cached prefix that `find_longest_read` matched for a request.
struct ReadHit {
    /// Prefix hash of the matched index entry; `classify` uses it to slide the TTL.
    hash: PrefixHash,
    /// Index into the parsed prompt's blocks: the last block the cached prefix covers.
    block: usize,
    /// Cumulative token count stored on the matched entry (the read size).
    tokens: u32,
    /// Duration of the matched entry's TTL tier; the refresh re-arms expiry to
    /// "now + this".
    duration: chrono::Duration,
}

// Database-backed tests for `Classifier::classify`. Each test runs against a fresh
// `#[sqlx::test]` pool plus a wiremock stand-in for tokenizer-svc (see `harness`).
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::models::users::Role;
    use crate::prompt_cache::{CacheEntry, IndexScope, PostgresIndex, TtlTier, parse_chat_completions};
    use crate::test::utils::{create_test_api_key_for_user, create_test_endpoint, create_test_model, create_test_user};
    use sqlx::PgPool;
    use wiremock::matchers::{method, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    /// Alias registered both as the deployed model and in the `/v1/models` mock.
    const ALIAS: &str = "cache-model";
    /// Tokenizer version the `/v1/models` mock reports for `ALIAS`.
    const TOK_VER: &str = "sha256:v1";

    /// One marked system block (1h) + an unmarked user block. The prefix is block 0.
    fn body() -> Vec<u8> {
        serde_json::json!({
            "model": ALIAS,
            "messages": [
                {"role":"system","content":[
                    {"type":"text","text":"a long static system prompt","cache_control":{"type":"ephemeral","ttl":"1h"}}
                ]},
                {"role":"user","content":"hello"}
            ]
        })
        .to_string()
        .into_bytes()
    }

    /// Cumulative hash of block 0 of `body()`: the prefix that request writes or reads.
    fn prefix_hash() -> PrefixHash {
        parse_chat_completions(&body()).unwrap().cumulative_hashes[0].clone()
    }

    /// Per-test fixture returned by `harness`.
    struct H {
        classifier: Classifier,
        /// API-key secret for the test user; passed as the request's bearer token.
        secret: String,
        /// The test user's id, i.e. the principal that scopes index entries.
        principal_id: uuid::Uuid,
        pool: PgPool,
        /// Never read; held so the mock tokenizer-svc lives as long as the harness.
        _server: MockServer,
    }

    /// Build a classifier over a fresh user/key/endpoint/model and a mocked tokenizer-svc.
    ///
    /// - `enabled`: whether to insert the model's `model_cache_tariffs` row (the enable gate).
    /// - `tokenize_total`: the mock `/v1/tokenize` always answers with this single count as
    ///   its one segment, cumulative and total, whatever segments were sent.
    /// - `min_prefix`: the model's `min_prefix_tokens` floor (only used when `enabled`).
    async fn harness(pool: &PgPool, enabled: bool, tokenize_total: u32, min_prefix: i32) -> H {
        let user = create_test_user(pool, Role::StandardUser).await;
        let key = create_test_api_key_for_user(pool, user.id).await;
        let endpoint = create_test_endpoint(pool, "ep", user.id).await;
        let id = create_test_model(pool, "m", ALIAS, endpoint, user.id).await;
        // Presence of a cache-tariff row IS the enable gate: insert one only when enabled.
        if enabled {
            sqlx::query!(
                r#"INSERT INTO model_cache_tariffs
                     (deployed_model_id, write_multiplier_5m, write_multiplier_1h, write_multiplier_24h, min_prefix_tokens)
                   VALUES ($1, 1.25, 2.0, 2.5, $2)"#,
                id,
                min_prefix
            )
            .execute(pool)
            .await
            .unwrap();
        }

        let server = MockServer::start().await;
        // `/v1/models`: maps ALIAS to TOK_VER, so `tokenizer_version` resolves.
        Mock::given(method("GET"))
            .and(path("/v1/models"))
            .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
                "models": [{"alias": ALIAS, "hf_repo": "org/m", "tokenizer_version": TOK_VER}]
            })))
            .mount(&server)
            .await;
        // `/v1/tokenize`: fixed single-segment answer (see `tokenize_total` above).
        Mock::given(method("POST"))
            .and(path("/v1/tokenize"))
            .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
                "virtual_model": ALIAS, "tokenizer_version": TOK_VER,
                "segment_counts": [tokenize_total], "cumulative": [tokenize_total], "total": tokenize_total
            })))
            .mount(&server)
            .await;

        let classifier = Classifier::new(
            PrincipalResolver::new(pool.clone()),
            ModelConfigResolver::new(pool.clone()),
            TokenizerClient::new(server.uri()),
            Arc::new(PostgresIndex::new(pool.clone())),
        );
        H {
            classifier,
            secret: key.secret,
            principal_id: user.id,
            pool: pool.clone(),
            _server: server,
        }
    }

    /// A `ClassifyRequest` for `ALIAS` authenticated with `secret`.
    fn req<'a>(secret: &'a str, body: &'a [u8]) -> ClassifyRequest<'a> {
        ClassifyRequest {
            virtual_model: ALIAS,
            body,
            api_key: Some(secret),
        }
    }

    // Empty index, 1500 tokens ≥ 1024 floor → the whole marked prefix is a 1h write.
    #[sqlx::test]
    async fn no_prior_entry_is_all_creation(pool: PgPool) {
        let h = harness(&pool, true, 1500, 1024).await;
        let b = body();
        let ClassifyOutcome { stats, pending, active } = h.classifier.classify(req(&h.secret, &b)).await.unwrap();

        assert!(active, "enabled model is active");
        assert_eq!(stats.read, 0);
        assert_eq!(stats.creation_1h, 1500);
        assert_eq!(stats.creation_total(), 1500);
        assert_eq!(pending.writes.len(), 1);
        assert_eq!(pending.writes[0].cumulative_token_count, 1500);
        assert_eq!(pending.writes[0].ttl_tier, TtlTier::OneHour);
        assert_eq!(pending.writes[0].prefix_hash, prefix_hash());
        assert!(pending.refresh.is_none());
    }

    // A live entry for the deepest prefix → pure read: no writes, TTL refresh scheduled.
    #[sqlx::test]
    async fn read_hit_is_pure_read(pool: PgPool) {
        let h = harness(&pool, true, 1500, 1024).await;
        // Seed the entry this prefix would write, as if a prior request created it.
        let scope = IndexScope {
            principal_id: h.principal_id,
            virtual_model: ALIAS.to_string(),
            tokenizer_version: TOK_VER.to_string(),
        };
        PostgresIndex::new(h.pool.clone())
            .write(&CacheEntry {
                scope: scope.clone(),
                prefix_hash: prefix_hash(),
                cumulative_token_count: 1500,
                ttl_tier: TtlTier::OneHour,
                expires_at: Utc::now() + chrono::Duration::hours(1),
            })
            .await
            .unwrap();

        let b = body();
        let ClassifyOutcome { stats, pending, active } = h.classifier.classify(req(&h.secret, &b)).await.unwrap();
        assert!(active);
        assert_eq!(stats.read, 1500);
        assert_eq!(stats.creation_total(), 0, "a full read writes nothing");
        assert!(pending.writes.is_empty());
        assert!(pending.refresh.is_some(), "a read slides the entry's TTL");
    }

    // Prefix below the per-model min_prefix_tokens floor → zero_active.
    #[sqlx::test]
    async fn below_floor_is_no_cache(pool: PgPool) {
        let h = harness(&pool, true, 500, 1024).await; // 500 < 1024
        let b = body();
        // Enabled but below the floor → active (uniform zeros) with nothing to commit.
        let out = h.classifier.classify(req(&h.secret, &b)).await.unwrap();
        assert!(out.active, "an enabled model stays active even below the floor");
        assert!(out.stats.is_zero());
        assert!(out.pending.is_empty());
    }

    // No tariff row → model gate fails → inactive (response shape untouched).
    #[sqlx::test]
    async fn disabled_model_is_inactive(pool: PgPool) {
        let h = harness(&pool, false, 1500, 1024).await; // not enabled
        let b = body();
        let out = h.classifier.classify(req(&h.secret, &b)).await.unwrap();
        assert!(!out.active, "a disabled model is inactive → response left untouched");
        assert!(out.stats.is_zero());
        assert!(out.pending.is_empty());
    }

    // Enabled model but no `cache_control` markers in the body → zero_active.
    #[sqlx::test]
    async fn no_markers_is_zero_active(pool: PgPool) {
        let h = harness(&pool, true, 1500, 1024).await;
        let b = serde_json::json!({
            "model": ALIAS,
            "messages": [{"role":"user","content":"hi, no markers here"}]
        })
        .to_string()
        .into_bytes();
        // Enabled model, no markers → active (uniform zeros), nothing committed.
        let out = h.classifier.classify(req(&h.secret, &b)).await.unwrap();
        assert!(out.active, "enabled model with no markers still presents zero cache fields");
        assert!(out.stats.is_zero());
        assert!(out.pending.is_empty());
    }
}