Skip to main content

devboy_format_pipeline/
layered_pipeline.rs

1//! `LayeredPipeline` — generic tool-response compressor.
2//!
3//! Implements the 4-layer architecture from Paper 2 (see
4//! `docs/research/paper-2-mckp-format-adaptive.md`):
5//!
6//! ```text
7//!     L0: Hint-based dedup (content-hash LRU + mutation invalidation)
8//!       ↓ fresh
9//!     L1: Per-endpoint templates (csv_from_md, pipeline_deep_mckp, ...)
10//!       ↓ no template match
11//!     L2: Generic MCKP (shape classifier → format encoder)
12//!       ↓ no gain
13//!     L3: As-is passthrough
14//! ```
15//!
16//! A single `LayeredPipeline` instance is scoped to one session. It is
17//! **not** thread-safe by value — wrap in `Arc<Mutex<...>>` if shared
18//! across threads.
19//!
20//! # Usage
21//!
22//! ```
23//! use devboy_format_pipeline::adaptive_config::AdaptiveConfig;
24//! use devboy_format_pipeline::layered_pipeline::{LayeredPipeline, ToolResponseInput};
25//!
26//! let mut p = LayeredPipeline::new("sess_abcd".into(), AdaptiveConfig::default());
27//! // Response must be ≥ min_body_chars (default 200) for L0 to engage.
28//! let body = "fn main() { println!(\"hello\"); }\n".repeat(10);
29//! let out1 = p.process(ToolResponseInput {
30//!     tool_call_id: "tc_1",
31//!     tool_name: "Read",
32//!     file_path: Some("/src/main.rs"),
33//!     content: &body,
34//!     is_sidechain: false,
35//!     ts_ms: 0,
36//!     enricher_prefetched: false,
37//!     enricher_predicted_cost_tokens: 0,
38//! });
39//! // Second call with identical content emits a hint.
40//! let out2 = p.process(ToolResponseInput {
41//!     tool_call_id: "tc_2",
42//!     tool_name: "Read",
43//!     file_path: Some("/src/main.rs"),
44//!     content: &body,
45//!     is_sidechain: false,
46//!     ts_ms: 10,
47//!     enricher_prefetched: false,
48//!     enricher_predicted_cost_tokens: 0,
49//! });
50//! assert!(matches!(out2.layer, devboy_format_pipeline::telemetry::Layer::L0));
51//! ```
52
53use std::sync::Arc;
54
55use crate::adaptive_config::AdaptiveConfig;
56use crate::dedup::{DedupCache, DedupDecision, ToolKind, content_hash, render_reference_hint_with};
57use crate::shape::{ClassifiedResponse, classify};
58use crate::telemetry::{Layer, PipelineEvent, Shape, TelemetrySink};
59
/// Input to [`LayeredPipeline::process`].
///
/// Derives [`Default`] so hosts can fill only the fields they care about:
///
/// ```ignore
/// let input = ToolResponseInput { tool_call_id: "tc_1", tool_name: "Read", content: body, ..Default::default() };
/// ```
///
/// Defaults are: empty strings, no file path, not a sidechain, `ts_ms = 0`,
/// and the LLM-emitted (non-prefetch) enricher values `false` / `0`.
#[derive(Debug, Clone, Copy, Default)]
pub struct ToolResponseInput<'a> {
    /// Raw `tool_use_id` from the MCP server (we hash it internally before
    /// storing in cache or emitting in hints).
    pub tool_call_id: &'a str,
    /// Tool name — used for endpoint classification and ToolKind lookup.
    pub tool_name: &'a str,
    /// Primary file path argument for file-operating tools (`Read`, `Edit`, …).
    /// Hashed internally for cache invalidation.
    pub file_path: Option<&'a str>,
    /// Raw response bytes.
    pub content: &'a str,
    /// True for subagent (sidechain) responses.
    pub is_sidechain: bool,
    /// Unix milliseconds when the response was produced.
    pub ts_ms: i64,
    /// Paper 3 — `true` when this response landed via the speculative
    /// pre-fetch dispatcher (host called the tool ahead of the LLM
    /// asking). Sets `PipelineEvent.enricher_prefetched` so the
    /// offline post-pass can attribute citations.
    /// Default `false` — the LLM emitted the call directly.
    pub enricher_prefetched: bool,
    /// Paper 3 — `cost_model.typical_kb`-derived prediction (in
    /// tokens) that the planner committed to when admitting this
    /// call. `0` (the default) when not a prefetch (LLM-emitted) — the
    /// cost-overrun rate denominator skips events with
    /// `enricher_predicted_cost_tokens = 0`.
    pub enricher_predicted_cost_tokens: u32,
}
90
91/// Output from [`LayeredPipeline::process`].
92#[derive(Debug, Clone)]
93pub struct ProcessedResponse {
94    /// The text to send back to the LLM agent (either the original content,
95    /// a hint, or a re-encoded body).
96    pub output: String,
97    /// Which layer terminated the decision.
98    pub layer: Layer,
99    /// Optional identifier for L1/L2 format or template used.
100    pub format_or_template: Option<String>,
101    /// Tokens baseline (original) minus tokens final.
102    pub tokens_saved: i64,
103    /// Token count of the `output` we're returning.
104    pub tokens_final: u32,
105}
106
107/// Session-scoped layered pipeline.
108///
109/// Holds mutable dedup state plus the full configuration. Each
110/// `process` call advances the internal event counter and, if a
111/// telemetry sink is attached, emits a [`PipelineEvent`].
112pub struct LayeredPipeline {
113    session_hash: String,
114    config: AdaptiveConfig,
115    dedup: DedupCache,
116    telemetry: Option<Arc<dyn TelemetrySink>>,
117    event_counter: u64,
118    /// Counts events that reached the sink — used to trigger periodic flush
119    /// according to `telemetry.flush_every_n`.
120    recorded_counter: u64,
121}
122
123impl LayeredPipeline {
124    /// Construct a new session pipeline.
125    ///
126    /// The shared LRU cache is sized to the maximum capacity requested by any
127    /// endpoint-level override (or the global `dedup.lru_size` otherwise), so
128    /// per-endpoint hints that ask for a larger working set are respected.
129    pub fn new(session_hash: String, config: AdaptiveConfig) -> Self {
130        let lru = config.max_lru_size();
131        Self {
132            session_hash,
133            dedup: DedupCache::with_capacity(lru),
134            config,
135            telemetry: None,
136            event_counter: 0,
137            recorded_counter: 0,
138        }
139    }
140
141    /// Attach a telemetry sink. Every subsequent `process` emits an event.
142    pub fn with_telemetry(mut self, sink: Arc<dyn TelemetrySink>) -> Self {
143        self.telemetry = Some(sink);
144        self
145    }
146
147    /// Signal a compaction boundary — clears the dedup cache and advances
148    /// the partition counter.
149    pub fn on_compaction_boundary(&mut self) {
150        self.dedup.on_compaction_boundary();
151    }
152
153    /// Current dedup cache partition number.
154    pub fn partition(&self) -> u64 {
155        self.dedup.partition()
156    }
157
158    /// Drop every cache entry tagged with `file_path`. Called by the host
159    /// after a mutating tool (`Edit` / `Write` / `MultiEdit` / …) so the
160    /// next `Read` of the same file returns the fresh body, not a hint.
161    pub fn invalidate_file(&mut self, file_path: &str) -> usize {
162        let hash = crate::dedup_util::file_path_hash(file_path);
163        self.dedup.invalidate_file(&hash)
164    }
165
166    /// Token count for `text` under the active tokenizer profile.
167    ///
168    /// Always routed through [`TokenizerProfile::count_tokens`] so a
169    /// custom `chars_per_token` set in `[profiles.tokenizer.variants]`
170    /// actually takes effect — earlier versions hard-coded `chars/4`
171    /// for the heuristic branch and silently ignored the config knob.
172    /// For Python-extractor parity, the matching variant ships
173    /// `chars_per_token = 4.0`.
174    fn tokens(&self, text: &str) -> u32 {
175        self.config.effective_tokenizer_profile().count_tokens(text) as u32
176    }
177
178    /// Dispatch a tool response through the 4-layer pipeline.
179    pub fn process(&mut self, input: ToolResponseInput<'_>) -> ProcessedResponse {
180        self.event_counter += 1;
181        let baseline_tokens = self.tokens(input.content);
182
183        // Mutation-aware invalidation must fire even for tiny Edit responses —
184        // the invalidation itself is correctness-critical, not an optimization.
185        let tool_kind = ToolKind::from_tool_name(input.tool_name);
186        let file_path_hash = input
187            .file_path
188            .filter(|_| matches!(tool_kind, ToolKind::FileRead | ToolKind::FileMutate))
189            .map(crate::dedup_util::file_path_hash);
190
191        if tool_kind == ToolKind::FileMutate
192            && let Some(ref fh) = file_path_hash
193        {
194            self.dedup.invalidate_file(fh);
195        }
196
197        // Short-circuit: L3 for too-small bodies (not worth any optimization).
198        let min_chars = self.config.effective_min_body_chars(input.tool_name).max(1);
199        if input.content.len() < min_chars {
200            let out = ProcessedResponse {
201                output: input.content.to_string(),
202                layer: Layer::L3,
203                format_or_template: None,
204                tokens_saved: 0,
205                tokens_final: baseline_tokens,
206            };
207            self.emit_event(&input, &out, None, None, None, None);
208            return out;
209        }
210
211        // === L0: content-hash dedup ===
212        let endpoint_ok = self.config.effective_dedup_enabled(input.tool_name);
213        let content_hash_value = content_hash(input.content.as_bytes());
214        let content_sha_hex = hex_of(&content_hash_value);
215
216        if endpoint_ok
217            && let DedupDecision::Hint {
218                reference_tool_call_id,
219            } = self.dedup.check(&content_hash_value)
220        {
221            let hint = render_reference_hint_with(
222                &reference_tool_call_id,
223                self.config.dedup.hint_verbosity.to_runtime(),
224                Some(tool_kind),
225            );
226            let tokens_final = self.tokens(&hint);
227            let out = ProcessedResponse {
228                output: hint,
229                layer: Layer::L0,
230                format_or_template: Some("hint_exact".into()),
231                tokens_saved: baseline_tokens as i64 - tokens_final as i64,
232                tokens_final,
233            };
234            self.emit_event(
235                &input,
236                &out,
237                None,
238                Some(&content_sha_hex),
239                file_path_hash.as_deref(),
240                None,
241            );
242            return out;
243        }
244
245        // Not byte-identical — try Type-2 (near-duplicate) hint when
246        // enabled. The cache must hold a body snapshot for at least one
247        // matching prior entry; otherwise this is a no-op fall-through
248        // to L1.
249        if endpoint_ok && self.config.dedup.near_ref_enabled {
250            let near_cfg = crate::near_ref::NearRefConfig::default();
251            if let Some((reference_tool_call_id, deltas)) =
252                self.dedup.find_near_ref(input.content, &near_cfg)
253            {
254                let hint = crate::near_ref::render_near_ref_hint(&reference_tool_call_id, &deltas);
255                let tokens_final = self.tokens(&hint);
256                let out = ProcessedResponse {
257                    output: hint,
258                    layer: Layer::L0,
259                    format_or_template: Some("hint_near".into()),
260                    tokens_saved: baseline_tokens as i64 - tokens_final as i64,
261                    tokens_final,
262                };
263                self.emit_event(
264                    &input,
265                    &out,
266                    None,
267                    Some(&content_sha_hex),
268                    file_path_hash.as_deref(),
269                    None,
270                );
271                // Even on a near-ref hit, still cache the *new* body so
272                // a later turn that drifts further can build a delta off
273                // the most recent state.
274                let tc_hash = short_hash(input.tool_call_id);
275                self.dedup.insert_with_body(
276                    content_hash_value,
277                    tc_hash,
278                    tool_kind,
279                    file_path_hash.clone(),
280                    std::sync::Arc::new(input.content.to_string()),
281                    input.tool_name,
282                );
283                return out;
284            }
285        }
286
287        // Not a duplicate — insert into cache for future reference. When
288        // near-ref is enabled, also retain the body so future turns can
289        // diff against it; otherwise stick to the cheaper hash-only entry.
290        //
291        // `tool_name` is recorded so `DedupCache::invalidate_by_tool`
292        // (Paper 3 §Cross-tool invalidation) can later drop entries for
293        // tools that this writer's `ToolValueModel.invalidates` lists.
294        //
295        // We pass `input.tool_name` *as-is* (e.g. `mcp__gitlab__get_issue`).
296        // Anonymization (`mcp__p<hash6>__verb`) only applies to the
297        // public corpus aggregates in `docs/research/data/paper3_*.csv`;
298        // `[tools.<name>]` keys, `effective_tool_value_model` lookups,
299        // and the dedup cache all use the live runtime name. Otherwise
300        // a user override `[tools."mcp__gitlab__update_issue"]` would
301        // never be found, and `invalidates = ["mcp__gitlab__get_issue"]`
302        // would never match a cached entry.
303        let tc_hash = short_hash(input.tool_call_id);
304        if self.config.dedup.near_ref_enabled {
305            self.dedup.insert_with_body(
306                content_hash_value,
307                tc_hash.clone(),
308                tool_kind,
309                file_path_hash.clone(),
310                std::sync::Arc::new(input.content.to_string()),
311                input.tool_name,
312            );
313        } else {
314            self.dedup.insert(
315                content_hash_value,
316                tc_hash.clone(),
317                tool_kind,
318                file_path_hash.clone(),
319                input.tool_name,
320            );
321        }
322
323        // Paper 3 cross-tool invalidation: if the tool that just ran
324        // declares an `invalidates` list in its value model, drop every
325        // matching cached entry now so the next call returns fresh.
326        if let Some(model) = self.config.effective_tool_value_model(input.tool_name)
327            && !model.invalidates.is_empty()
328        {
329            self.dedup.invalidate_by_tool(&model.invalidates);
330        }
331
332        // === Shape classification (used by L1/L2) ===
333        let classified = classify(input.content);
334
335        // === L1: per-endpoint templates ===
336        if let Some(t_id) = self
337            .config
338            .effective_template(input.tool_name)
339            .map(str::to_string)
340            && self.config.templates.is_template_active(&t_id)
341            && let Some(body) = crate::templates::apply_by_id(&t_id, input.content, &classified)
342        {
343            let tokens_final = self.tokens(&body);
344            if tokens_final < baseline_tokens {
345                let out = ProcessedResponse {
346                    output: body,
347                    layer: Layer::L1,
348                    format_or_template: Some(t_id.clone()),
349                    tokens_saved: baseline_tokens as i64 - tokens_final as i64,
350                    tokens_final,
351                };
352                self.emit_event(
353                    &input,
354                    &out,
355                    Some(&classified),
356                    Some(&content_sha_hex),
357                    file_path_hash.as_deref(),
358                    Some(&t_id),
359                );
360                return out;
361            }
362        }
363
364        // === L2: generic MCKP ===
365        if let Some((fmt_id, body)) =
366            crate::mckp_router::route(&self.config.mckp, input.content, &classified)
367        {
368            let tokens_final = self.tokens(&body);
369            if tokens_final < baseline_tokens {
370                let out = ProcessedResponse {
371                    output: body,
372                    layer: Layer::L2,
373                    format_or_template: Some(fmt_id.to_string()),
374                    tokens_saved: baseline_tokens as i64 - tokens_final as i64,
375                    tokens_final,
376                };
377                self.emit_event(
378                    &input,
379                    &out,
380                    Some(&classified),
381                    Some(&content_sha_hex),
382                    file_path_hash.as_deref(),
383                    None,
384                );
385                return out;
386            }
387        }
388
389        // === L3: passthrough ===
390        let out = ProcessedResponse {
391            output: input.content.to_string(),
392            layer: Layer::L3,
393            format_or_template: None,
394            tokens_saved: 0,
395            tokens_final: baseline_tokens,
396        };
397        self.emit_event(
398            &input,
399            &out,
400            Some(&classified),
401            Some(&content_sha_hex),
402            file_path_hash.as_deref(),
403            None,
404        );
405        out
406    }
407
408    /// Deterministic sampler — returns `true` iff the current event should be
409    /// written to the sink under the configured `telemetry.sample_rate`.
410    ///
411    /// Deterministic so replays over the same session are reproducible.
412    fn should_sample(&self) -> bool {
413        let rate = self.config.telemetry.sample_rate.clamp(0.0, 1.0);
414        if rate >= 1.0 {
415            return true;
416        }
417        if rate <= 0.0 {
418            return false;
419        }
420        // Fixed-interval stride matching the requested rate.
421        let stride = (1.0 / rate).round().max(1.0) as u64;
422        self.event_counter.is_multiple_of(stride)
423    }
424
425    fn emit_event(
426        &mut self,
427        input: &ToolResponseInput<'_>,
428        out: &ProcessedResponse,
429        classified: Option<&ClassifiedResponse>,
430        content_sha_hex: Option<&str>,
431        file_path_hash: Option<&str>,
432        template_id: Option<&str>,
433    ) {
434        let Some(sink) = self.telemetry.clone() else {
435            return;
436        };
437        if !self.should_sample() {
438            return;
439        }
440        let shape = classified.map(|c| c.shape).unwrap_or(Shape::Unknown);
441        let inner_formats = classified
442            .map(|c| {
443                c.inner_formats
444                    .iter()
445                    .map(|f| f.as_tag().to_string())
446                    .collect::<Vec<_>>()
447            })
448            .unwrap_or_default();
449        let evt = PipelineEvent {
450            session_hash: self.session_hash.clone(),
451            tool_call_id_hash: short_hash(input.tool_call_id),
452            tool_name_anon: anonymize_tool_name(input.tool_name),
453            endpoint_class: input.tool_name.to_string(),
454            response_chars: input.content.len() as u64,
455            shape,
456            inner_formats,
457            content_sha_prefix_hex: content_sha_hex.unwrap_or_default().to_string(),
458            file_path_hash: file_path_hash.map(String::from),
459            is_dedup_hit: matches!(out.layer, Layer::L0),
460            layer_used: out.layer,
461            template_id: template_id.map(String::from),
462            tokens_baseline: self.tokens(input.content),
463            tokens_final: out.tokens_final,
464            context_partition: self.dedup.partition() as u32,
465            is_sidechain: input.is_sidechain,
466            ts_ms: input.ts_ms,
467            sample_rate_applied: self.config.telemetry.sample_rate,
468            // Paper 3 enricher fields — set on the input by the host
469            // when the call was issued by the speculative dispatcher.
470            // LLM-emitted calls leave both fields default (false / 0).
471            enricher_prefetched: input.enricher_prefetched,
472            enricher_predicted_cost_tokens: input.enricher_predicted_cost_tokens,
473            enricher_decline_reason: None,
474            cited_in_next_n_turns: None,
475        };
476        if sink.record(&evt).is_err() {
477            return; // Best-effort; do not fail the pipeline.
478        }
479        self.recorded_counter += 1;
480        let flush_every = self.config.telemetry.flush_every_n.max(1) as u64;
481        if self.recorded_counter.is_multiple_of(flush_every) {
482            let _ = sink.flush();
483        }
484    }
485}
486
487// ─── INLINE HELPERS (internal; no external API surface) ─────────────────
488
/// 32-char lowercase hex of a 16-byte (128-bit) content hash prefix. Matches
/// the `content_sha_prefix_hex` field documented in
/// `docs/research/paper-2-mckp-format-adaptive.md` §Telemetry.
///
/// Each byte is written straight into a pre-sized buffer, so no temporary
/// `String` is allocated per byte.
fn hex_of(bytes: &[u8; 16]) -> String {
    use std::fmt::Write as _;

    let mut out = String::with_capacity(32);
    for b in bytes {
        write!(out, "{b:02x}").expect("writing to a String cannot fail");
    }
    out
}
499
500/// Short hash for tool_call_id / other anon identifiers.
501pub(crate) fn short_hash(s: &str) -> String {
502    use sha2::{Digest, Sha256};
503    let digest = Sha256::digest(s.as_bytes());
504    let mut out = String::with_capacity(8);
505    for b in &digest[..4] {
506        out.push_str(&format!("{:02x}", b));
507    }
508    out
509}
510
511/// Redact MCP slugs the same way `docs/research/scripts/analyze_sessions.py` does.
512pub(crate) fn anonymize_tool_name(name: &str) -> String {
513    if !name.starts_with("mcp__") {
514        return name.to_string();
515    }
516    let inner = &name[5..];
517    if let Some(verb_start) = inner.rfind("__") {
518        let slug = &inner[..verb_start];
519        let verb = &inner[verb_start + 2..];
520        let slug_hash = short_hash(slug);
521        return format!("mcp__p{}__{}", &slug_hash[..6], verb);
522    }
523    name.to_string()
524}
525
526// ─── TESTS ──────────────────────────────────────────────────────────────
527
528#[cfg(test)]
529mod tests {
530    use super::*;
531    use crate::telemetry::MemorySink;
532    use std::sync::Arc;
533
534    fn input<'a>(
535        tc: &'a str,
536        tool: &'a str,
537        file: Option<&'a str>,
538        content: &'a str,
539    ) -> ToolResponseInput<'a> {
540        ToolResponseInput {
541            tool_call_id: tc,
542            tool_name: tool,
543            file_path: file,
544            content,
545            is_sidechain: false,
546            ts_ms: 0,
547            enricher_prefetched: false,
548            enricher_predicted_cost_tokens: 0,
549        }
550    }
551
552    #[test]
553    fn first_read_is_fresh_second_is_dedup() {
554        let mut p = LayeredPipeline::new("s1".into(), AdaptiveConfig::default());
555        let body = "fn main() { println!(\"hello\"); }\n".repeat(20);
556        let o1 = p.process(input("tc_1", "Read", Some("/src/main.rs"), &body));
557        assert_eq!(o1.layer, Layer::L3); // prose content, no MCKP gain
558        let o2 = p.process(input("tc_2", "Read", Some("/src/main.rs"), &body));
559        assert_eq!(o2.layer, Layer::L0);
560        assert!(o2.tokens_saved > 0);
561        assert!(o2.output.starts_with("> [ref:"));
562    }
563
564    #[test]
565    fn edit_invalidates_file_read() {
566        let mut p = LayeredPipeline::new("s2".into(), AdaptiveConfig::default());
567        let body = "original content ".repeat(30);
568        // First read: cache
569        p.process(input("tc_1", "Read", Some("/src/x.rs"), &body));
570        // Edit invalidates
571        p.process(input(
572            "tc_2",
573            "Edit",
574            Some("/src/x.rs"),
575            "edit response body long enough to not be skipped",
576        ));
577        // Re-read with same content — should now be Fresh, not a hint
578        let o = p.process(input("tc_3", "Read", Some("/src/x.rs"), &body));
579        assert_eq!(o.layer, Layer::L3);
580    }
581
582    #[test]
583    fn compaction_clears_cache() {
584        let mut p = LayeredPipeline::new("s3".into(), AdaptiveConfig::default());
585        let body = "x".repeat(300);
586        p.process(input("tc_1", "Bash", None, &body));
587        p.on_compaction_boundary();
588        let o = p.process(input("tc_2", "Bash", None, &body));
589        assert_eq!(o.layer, Layer::L3);
590    }
591
592    #[test]
593    fn tiny_body_goes_straight_to_l3() {
594        let mut p = LayeredPipeline::new("s4".into(), AdaptiveConfig::default());
595        let o = p.process(input("tc_1", "Bash", None, "short"));
596        assert_eq!(o.layer, Layer::L3);
597        assert_eq!(o.tokens_saved, 0);
598    }
599
600    #[test]
601    fn telemetry_sink_receives_events() {
602        let sink = Arc::new(MemorySink::new());
603        let mut p = LayeredPipeline::new("s5".into(), AdaptiveConfig::default())
604            .with_telemetry(sink.clone());
605        let body = "x".repeat(500);
606        p.process(input("tc_1", "Bash", None, &body));
607        p.process(input("tc_2", "Bash", None, &body));
608        let events = sink.events();
609        assert_eq!(events.len(), 2);
610        assert_eq!(events[1].layer_used, Layer::L0);
611        assert!(events[1].is_dedup_hit);
612    }
613
614    #[test]
615    fn mcp_tool_names_are_anonymized() {
616        let name = "mcp__super_secret_internal_slug__get_issues";
617        let anon = anonymize_tool_name(name);
618        assert!(anon.starts_with("mcp__p"));
619        assert!(anon.ends_with("__get_issues"));
620        assert!(!anon.contains("super_secret"));
621    }
622
623    #[test]
624    fn markdown_table_gets_l2_csv_encoding() {
625        let mut p = LayeredPipeline::new("s6".into(), AdaptiveConfig::default());
626        let md = "| id | name | status |\n|----|------|--------|\n| 1 | a | ok |\n| 2 | b | ok |\n| 3 | c | bad |\n| 4 | d | ok |\n| 5 | e | bad |\n";
627        // Need body >= min_body_chars (200). Pad the table.
628        let padded = md.repeat(3);
629        let o = p.process(input("tc_1", "Bash", None, &padded));
630        // csv_from_md might apply if body is short enough, or L3 for plain text
631        // We just assert the pipeline returns something valid.
632        assert!(o.output.len() <= padded.len() + 100);
633    }
634
635    #[test]
636    fn json_flat_object_passthrough_when_small() {
637        let mut p = LayeredPipeline::new("s7".into(), AdaptiveConfig::default());
638        let body = r#"{"id":1,"name":"test","status":"ok"}"#;
639        let o = p.process(input("tc_1", "Bash", None, body));
640        // Small body → L3
641        assert_eq!(o.layer, Layer::L3);
642    }
643
644    #[test]
645    fn multi_session_cache_isolation() {
646        // Same content in different pipelines must NOT hit.
647        let body = "x".repeat(500);
648        let mut p1 = LayeredPipeline::new("s_a".into(), AdaptiveConfig::default());
649        let mut p2 = LayeredPipeline::new("s_b".into(), AdaptiveConfig::default());
650        p1.process(input("tc_1", "Bash", None, &body));
651        let o2 = p2.process(input("tc_1", "Bash", None, &body));
652        assert_eq!(o2.layer, Layer::L3);
653    }
654
655    #[test]
656    fn partition_counter_advances_on_compaction() {
657        let mut p = LayeredPipeline::new("s_part".into(), AdaptiveConfig::default());
658        assert_eq!(p.partition(), 0);
659        p.on_compaction_boundary();
660        assert_eq!(p.partition(), 1);
661        p.on_compaction_boundary();
662        assert_eq!(p.partition(), 2);
663    }
664
665    #[test]
666    fn endpoint_override_disables_dedup() {
667        let mut cfg = AdaptiveConfig::default();
668        cfg.endpoint_overrides.insert(
669            "Bash".into(),
670            crate::adaptive_config::EndpointOverride {
671                dedup_enabled: Some(false),
672                ..Default::default()
673            },
674        );
675        let sink = Arc::new(MemorySink::new());
676        let mut p = LayeredPipeline::new("s_disabled".into(), cfg).with_telemetry(sink.clone());
677        let body = "y".repeat(500);
678        p.process(input("tc_1", "Bash", None, &body));
679        let o2 = p.process(input("tc_2", "Bash", None, &body));
680        // dedup disabled → no L0 hint even on identical content
681        assert_eq!(o2.layer, Layer::L3);
682        assert!(!sink.events()[1].is_dedup_hit);
683    }
684
685    #[test]
686    fn per_endpoint_min_body_chars_override() {
687        let mut cfg = AdaptiveConfig::default();
688        cfg.endpoint_overrides.insert(
689            "Bash".into(),
690            crate::adaptive_config::EndpointOverride {
691                min_body_chars: Some(50),
692                ..Default::default()
693            },
694        );
695        let mut p = LayeredPipeline::new("s_min".into(), cfg);
696        // 60-char body would be skipped under default 200 but processed under override 50.
697        let body = "z".repeat(60);
698        p.process(input("tc_1", "Bash", None, &body));
699        let o2 = p.process(input("tc_2", "Bash", None, &body));
700        assert_eq!(o2.layer, Layer::L0); // dedup fires because we cached tc_1
701    }
702
703    #[test]
704    fn sample_rate_zero_skips_all_events() {
705        let mut cfg = AdaptiveConfig::default();
706        cfg.telemetry.sample_rate = 0.0;
707        let sink = Arc::new(MemorySink::new());
708        let mut p = LayeredPipeline::new("s_rate0".into(), cfg).with_telemetry(sink.clone());
709        let body = "q".repeat(500);
710        for i in 0..5 {
711            p.process(input(&format!("tc_{i}"), "Bash", None, &body));
712        }
713        assert_eq!(sink.events().len(), 0);
714    }
715
716    #[test]
717    fn sample_rate_half_keeps_every_other() {
718        let mut cfg = AdaptiveConfig::default();
719        cfg.telemetry.sample_rate = 0.5;
720        let sink = Arc::new(MemorySink::new());
721        let mut p = LayeredPipeline::new("s_half".into(), cfg).with_telemetry(sink.clone());
722        let body = "w".repeat(500);
723        for i in 0..10 {
724            p.process(input(&format!("tc_{i}"), "Bash", None, &body));
725        }
726        // Stride sampler keeps 1-in-2 events (events 2, 4, 6, 8, 10).
727        assert_eq!(sink.events().len(), 5);
728    }
729
730    #[test]
731    fn hint_verbosity_terse_is_honoured() {
732        let mut cfg = AdaptiveConfig::default();
733        cfg.dedup.hint_verbosity = crate::adaptive_config::HintVerbosity::Terse;
734        let mut p = LayeredPipeline::new("s_terse".into(), cfg);
735        let body = "terse body of sufficient length ".repeat(20);
736        p.process(input("tc_1", "Bash", None, &body));
737        let o2 = p.process(input("tc_2", "Bash", None, &body));
738        assert_eq!(o2.layer, Layer::L0);
739        assert!(!o2.output.contains("byte-identical"));
740    }
741
742    #[test]
743    fn inner_formats_populated_in_telemetry() {
744        let sink = Arc::new(MemorySink::new());
745        let mut p = LayeredPipeline::new("s_inner".into(), AdaptiveConfig::default())
746            .with_telemetry(sink.clone());
747        // Response with embedded URL — classifier should populate inner_formats.
748        let body = format!(
749            "Line 1\nSee https://example.com/resource for details.\n{}",
750            "filler ".repeat(50)
751        );
752        p.process(input("tc_1", "Bash", None, &body));
753        let events = sink.events();
754        assert!(
755            !events[0].inner_formats.is_empty(),
756            "inner_formats should populate from classifier"
757        );
758        assert!(events[0].inner_formats.iter().any(|f| f == "url"));
759    }
760
761    #[test]
762    fn cache_capacity_grows_via_endpoint_lru_override() {
763        let mut cfg = AdaptiveConfig::default();
764        cfg.endpoint_overrides.insert(
765            "ep".into(),
766            crate::adaptive_config::EndpointOverride {
767                lru_size: Some(12),
768                ..Default::default()
769            },
770        );
771        // Pipeline is constructed with the larger capacity — no direct getter,
772        // but we can verify by inserting many distinct responses and confirming
773        // that the 12th+ entry evicts the 1st.
774        let mut p = LayeredPipeline::new("s_lru".into(), cfg);
775        let distinct: Vec<String> = (0..13)
776            .map(|i| format!("{}{}", i, "x".repeat(300)))
777            .collect();
778        for (i, b) in distinct.iter().enumerate() {
779            p.process(input(&format!("tc_{i}"), "Bash", None, b));
780        }
781        // First entry now evicted (capacity 12), so recalling it should be Fresh.
782        let o = p.process(input("tc_recheck", "Bash", None, &distinct[0]));
783        assert_eq!(o.layer, Layer::L3);
784    }
785
786    #[test]
787    fn tokens_method_falls_back_to_heuristic_by_default() {
788        let cfg = AdaptiveConfig::default();
789        // Default profile is "auto" → resolves to anthropic_class which now
790        // selects O200kBase BPE. Pipeline.tokens should therefore *not* match
791        // the legacy chars/4 estimate on a sufficiently long body.
792        let p = LayeredPipeline::new("s_tk".into(), cfg);
793        let body = "a".repeat(2_000);
794        let bpe_count = p.tokens(&body);
795        let heuristic = (body.len() / 4) as u32;
796        // BPE on a long run of a single character compresses much better than
797        // chars/4 — assert they differ (and BPE is smaller, the whole point of
798        // bringing in tiktoken-rs).
799        assert!(
800            bpe_count < heuristic,
801            "expected BPE count {bpe_count} < heuristic {heuristic} on a degenerate input"
802        );
803    }
804
805    #[test]
806    fn near_ref_enabled_emits_delta_hint_for_pipeline_polling() {
807        // Two responses to the same MCP endpoint differing only in
808        // `status` and `duration` — the canonical Type-2 case.
809        let body_a = format!(
810            r#"{{"id":42,"name":"deploy","status":"pending","duration":10,"url":"https://example.com/p/42","commit_sha":"abcd","triggered_by":"webhook","preview":"{}"}}"#,
811            "x".repeat(500)
812        );
813        let body_b = format!(
814            r#"{{"id":42,"name":"deploy","status":"success","duration":42,"url":"https://example.com/p/42","commit_sha":"abcd","triggered_by":"webhook","preview":"{}"}}"#,
815            "x".repeat(500)
816        );
817
818        let mut cfg = AdaptiveConfig::default();
819        cfg.dedup.near_ref_enabled = true;
820        let mut p = LayeredPipeline::new("s_near".into(), cfg);
821
822        let r1 = p.process(input("tc_pipeline_1", "Bash", None, &body_a));
823        assert_eq!(r1.layer, Layer::L3, "first call must be fresh");
824
825        let r2 = p.process(input("tc_pipeline_2", "Bash", None, &body_b));
826        assert_eq!(r2.layer, Layer::L0, "second call must hit L0 via near-ref");
827        assert_eq!(r2.format_or_template.as_deref(), Some("hint_near"));
828        assert!(
829            r2.output.contains("near-ref"),
830            "expected near-ref hint, got `{}`",
831            r2.output
832        );
833        // Both differing scalar fields must show in the hint.
834        assert!(r2.output.contains("status"));
835        assert!(r2.output.contains("duration"));
836        // Compaction-friendly bound — the rendered hint must be tiny.
837        assert!(
838            r2.output.len() < body_b.len() / 5,
839            "near-ref hint should be far smaller than the body"
840        );
841    }
842
843    #[test]
844    fn near_ref_disabled_falls_through_when_bodies_drift() {
845        let body_a = format!(
846            r#"{{"id":42,"status":"pending","preview":"{}"}}"#,
847            "x".repeat(500)
848        );
849        let body_b = format!(
850            r#"{{"id":42,"status":"success","preview":"{}"}}"#,
851            "x".repeat(500)
852        );
853
854        let mut cfg = AdaptiveConfig::default();
855        cfg.dedup.near_ref_enabled = false; // explicit
856        let mut p = LayeredPipeline::new("s_no_near".into(), cfg);
857        let _ = p.process(input("tc_a", "Bash", None, &body_a));
858        let r2 = p.process(input("tc_b", "Bash", None, &body_b));
859        // Without near-ref, the second call goes to L3 (or L2 if shape
860        // happens to match) — but never to L0/hint_near.
861        assert_ne!(r2.format_or_template.as_deref(), Some("hint_near"));
862    }
863
864    #[test]
865    fn tokens_method_honours_profile_chars_per_token() {
866        let mut cfg = AdaptiveConfig::default();
867        // Force the active tokenizer to `ollama_bpe`, which ships
868        // `chars_per_token = 3.8` and `bpe = Heuristic`. The pipeline
869        // must respect the configured ratio (8 / 3.8 ≈ 2.1 → 3 tokens
870        // after ceil); earlier versions silently hard-coded chars/4
871        // and produced 2 instead — Copilot review on PR #207.
872        cfg.profiles.tokenizer.active = "ollama_bpe".into();
873        let p = LayeredPipeline::new("s_h".into(), cfg);
874        let body = "abcdefgh"; // 8 chars
875        assert_eq!(p.tokens(body), 3);
876    }
877
878    #[test]
879    fn cross_tool_invalidation_drops_cached_response() {
880        // Paper 3 P-3-07 — `update_issue` declares
881        // `invalidates = ["get_issue"]`. After `get_issue` is cached
882        // and `update_issue` runs, a re-read of the same `get_issue`
883        // body must come back fresh (not as a hint).
884        use devboy_core::ToolValueModel;
885        let mut cfg = AdaptiveConfig::default();
886        cfg.tools.insert(
887            "update_issue".into(),
888            ToolValueModel {
889                invalidates: vec!["get_issue".into()],
890                ..ToolValueModel::default()
891            },
892        );
893        let mut p = LayeredPipeline::new("s_invl".into(), cfg);
894        let body = "x".repeat(400);
895
896        // Turn 1 — get_issue caches.
897        let r1 = p.process(input("tc_1", "get_issue", None, &body));
898        assert_eq!(r1.layer, Layer::L3);
899
900        // Turn 2 — same body, would dedup ...
901        let r2 = p.process(input("tc_2", "get_issue", None, &body));
902        assert_eq!(r2.layer, Layer::L0, "second call should dedup");
903
904        // Turn 3 — `update_issue` runs (different body!), declares
905        // `invalidates = ["get_issue"]`. Its run drops the cached
906        // get_issue entry.
907        let _ = p.process(input("tc_3", "update_issue", None, &"u".repeat(400)));
908
909        // Turn 4 — re-issuing get_issue with the *same* body must now
910        // come back fresh; the cache was busted by update_issue.
911        let r4 = p.process(input("tc_4", "get_issue", None, &body));
912        assert_eq!(
913            r4.layer,
914            Layer::L3,
915            "get_issue cache must be invalidated by update_issue"
916        );
917    }
918}