Skip to main content

mati_core/store/
session.rs

1//! Analytics and session lifecycle functions for the hook pipeline.
2//!
3//! These functions are called from two paths:
4//! - `cli/hooks.rs` fallback (when daemon is not running, direct store open)
5//! - `mcp/server.rs` daemon socket (when MCP server holds the exclusive lock)
6//!
7//! Having them here avoids code duplication and ensures both paths are
8//! behaviourally identical.
9
10use std::path::Path;
11use std::time::{SystemTime, UNIX_EPOCH};
12
13use anyhow::{Context, Result};
14
15use super::{
16    Category, ConfidenceScore, FileRecord, GotchaRecord, Priority, QualityScore, Record,
17    RecordLifecycle, RecordSource, RecordVersion, StaleReviewEntry, StaleReviewPayload,
18    StalenessScore, StalenessTier, Store,
19};
20use crate::health::staleness::StalenessAnalyzer;
21
22// ── Internal helpers ──────────────────────────────────────────────────────────
23
24pub fn now_secs() -> u64 {
25    SystemTime::now()
26        .duration_since(UNIX_EPOCH)
27        .unwrap_or_default()
28        .as_secs()
29}
30
31pub fn today_key(prefix: &str) -> String {
32    let now = chrono::Utc::now().format("%Y-%m-%d");
33    format!("{prefix}{now}")
34}
35
36pub fn session_record(key: &str, value: String) -> Record {
37    let now = now_secs();
38    Record {
39        key: key.to_string(),
40        value,
41        category: Category::Session,
42        priority: Priority::Normal,
43        tags: vec![],
44        created_at: now,
45        updated_at: now,
46        ref_url: None,
47        staleness: StalenessScore::fresh(),
48        lifecycle: RecordLifecycle::Active,
49        version: RecordVersion {
50            device_id: crate::store::stable_device_id(),
51            logical_clock: 1,
52            wall_clock: now,
53        },
54        quality: QualityScore::layer0_default(),
55        access_count: 0,
56        last_accessed: 0,
57        source: RecordSource::SessionHook,
58        confidence: ConfidenceScore::for_new_record(&RecordSource::SessionHook),
59        gap_analysis_score: 0.0,
60        payload: None,
61    }
62}
63
64pub fn analytics_record(key: &str, value: String) -> Record {
65    let mut r = session_record(key, value);
66    r.category = Category::Analytics;
67    r
68}
69
70/// Daily aggregation record value.
71#[derive(serde::Serialize, serde::Deserialize, Debug)]
72pub struct DailyAgg {
73    pub count: u64,
74    pub keys: Vec<String>,
75}
76
77pub const MAX_AGG_KEYS: usize = 100;
78
79/// Minimum staleness value for stale review inclusion.
80const STALE_REVIEW_MIN: f32 = 0.4;
81/// Maximum staleness value for stale review inclusion (Liability and above excluded).
82const STALE_REVIEW_MAX: f32 = 0.7;
83/// Default TTL for recent consultation receipts (15 minutes).
84pub const CONSULTED_RECENT_TTL_SECS: u64 = 900;
85/// Maximum entries in a single daily stale review record.
86pub const MAX_STALE_REVIEW_ENTRIES: usize = 20;
87/// Minimum access count before an unconfirmed gotcha is auto-promoted.
88pub const GOTCHA_PROMOTION_ACCESS_THRESHOLD: u32 = 3;
89
90pub async fn upsert_daily_agg(store: &Store, agg_key: &str, target_key: &str) -> Result<()> {
91    let now = now_secs();
92
93    match store.get(agg_key).await? {
94        Some(mut record) => {
95            let mut agg: DailyAgg = record.payload_as::<DailyAgg>().unwrap_or(DailyAgg {
96                count: 0,
97                keys: vec![],
98            });
99            agg.count += 1;
100            if agg.keys.len() < MAX_AGG_KEYS && !agg.keys.iter().any(|k| k == target_key) {
101                agg.keys.push(target_key.to_string());
102            }
103            record.payload = serde_json::to_value(&agg).ok();
104            record.updated_at = now;
105            record.version.logical_clock += 1;
106            record.version.wall_clock = now;
107            store.put(agg_key, &record).await?;
108        }
109        None => {
110            let agg = DailyAgg {
111                count: 1,
112                keys: vec![target_key.to_string()],
113            };
114            let mut record = analytics_record(agg_key, String::new());
115            record.payload = serde_json::to_value(&agg).ok();
116            store.put(agg_key, &record).await?;
117        }
118    }
119
120    Ok(())
121}
122
123/// Compute the daily aggregation upsert WITHOUT persisting.
124///
125/// Returns `(key, serialized_record_bytes)` for staging into a
126/// `transact_sessions_raw` call. The caller commits this alongside
127/// other writes (e.g., audit) in one atomic transaction.
128pub async fn upsert_daily_agg_staged(
129    store: &Store,
130    agg_key: &str,
131    target_key: &str,
132) -> Result<(String, Vec<u8>)> {
133    let now = now_secs();
134
135    let record = match store.get(agg_key).await? {
136        Some(mut record) => {
137            let mut agg: DailyAgg = record.payload_as::<DailyAgg>().unwrap_or(DailyAgg {
138                count: 0,
139                keys: vec![],
140            });
141            agg.count += 1;
142            if agg.keys.len() < MAX_AGG_KEYS && !agg.keys.iter().any(|k| k == target_key) {
143                agg.keys.push(target_key.to_string());
144            }
145            record.payload = serde_json::to_value(&agg).ok();
146            record.updated_at = now;
147            record.version.logical_clock += 1;
148            record.version.wall_clock = now;
149            record
150        }
151        None => {
152            let agg = DailyAgg {
153                count: 1,
154                keys: vec![target_key.to_string()],
155            };
156            let mut record = analytics_record(agg_key, String::new());
157            record.payload = serde_json::to_value(&agg).ok();
158            record
159        }
160    };
161
162    let bytes = rmp_serde::to_vec_named(&record)
163        .with_context(|| format!("failed to serialize agg record for {agg_key}"))?;
164    Ok((agg_key.to_string(), bytes))
165}
166
167fn receipt_key(key: &str, actor: Option<&str>) -> String {
168    match actor {
169        Some(a) => format!("session:consulted:{a}:{key}"),
170        None => format!("session:consulted:{key}"),
171    }
172}
173
174/// Compute the consultation receipt record WITHOUT persisting.
175///
176/// When `actor` is `Some`, writes an actor-scoped key `session:consulted:<actor>:<key>`
177/// alongside the global key path. Pass `None` for all existing callers (global).
178pub fn consultation_receipt_staged(key: &str, actor: Option<&str>) -> Result<(String, Vec<u8>)> {
179    let consulted_key = receipt_key(key, actor);
180    let record = session_record(&consulted_key, String::new());
181    let bytes = rmp_serde::to_vec_named(&record)
182        .with_context(|| format!("failed to serialize consulted receipt for {consulted_key}"))?;
183    Ok((consulted_key, bytes))
184}
185
186/// Compute the session:current flush record WITHOUT persisting.
187///
188/// Returns `(key, serialized_record_bytes)` for staging.
189pub async fn session_flush_staged(store: &Store) -> Result<Option<(String, Vec<u8>)>> {
190    let now = now_secs();
191    let consulted_keys = store.scan_keys("session:consulted:").await?;
192    let stripped: Vec<String> = consulted_keys
193        .iter()
194        .map(|k| {
195            k.strip_prefix("session:consulted:")
196                .unwrap_or(k)
197                .to_string()
198        })
199        .collect();
200
201    let session_data = serde_json::json!({
202        "consulted_keys": stripped,
203        "flushed_at": now,
204    });
205    let mut rec = session_record("session:current", String::new());
206    rec.payload = Some(session_data);
207    let bytes = rmp_serde::to_vec_named(&rec)?;
208    Ok(Some(("session:current".to_string(), bytes)))
209}
210
211// ── log_hit ───────────────────────────────────────────────────────────────────
212
213/// Record a cache hit: write consulted marker, bump access_count, update daily agg.
214pub async fn log_hit(store: &Store, key: &str) -> Result<()> {
215    let now = now_secs();
216
217    // 1. Daily hit aggregation
218    let agg_key = today_key("analytics:hit_");
219    upsert_daily_agg(store, &agg_key, key).await?;
220
221    // 2. Mark as consulted for session tracking
222    let consulted_key = receipt_key(key, None);
223    store
224        .put(
225            &consulted_key,
226            &session_record(&consulted_key, String::new()),
227        )
228        .await?;
229
230    // 3. Bump access_count and last_accessed on the target record
231    if let Some(mut record) = store.get(key).await? {
232        record.access_count += 1;
233        record.last_accessed = now;
234        store.put(key, &record).await?;
235    }
236
237    // 4. Best-effort enforcement event: ReceiptMinted.
238    //
239    // Mirrors the socket-mode path in `dispatch_v2::ConsultationHit` so the
240    // direct-mode CLI path (`mati explain` without a daemon, or any code
241    // calling `session::log_hit` against an open Store) produces the same
242    // `receipt_minted` row in `mati history --enforcement`. Without this
243    // parity, the enforcement audit log has gaps depending on whether the
244    // mint happened over socket or direct mode.
245    let _ = crate::store::enforcement::record_event(
246        store,
247        crate::store::enforcement::EnforcementEventType::ReceiptMinted,
248        crate::store::enforcement::SubjectKind::File,
249        key.to_string(),
250        "claude".to_string(),
251        None,
252        "consultation_requested".to_string(),
253        None,
254    )
255    .await;
256
257    Ok(())
258}
259
260// ── log_miss ──────────────────────────────────────────────────────────────────
261
262/// Record a cache miss: update daily miss aggregation.
263pub async fn log_miss(store: &Store, key: &str) -> Result<()> {
264    let agg_key = today_key("analytics:miss_");
265    upsert_daily_agg(store, &agg_key, key).await
266}
267
268// ── log_compliance_miss ───────────────────────────────────────────────────────
269
270/// Record a compliance miss: file read without prior mati consultation.
271pub async fn log_compliance_miss(store: &Store, key: &str) -> Result<()> {
272    let agg_key = today_key("compliance:miss_");
273    upsert_daily_agg(store, &agg_key, key).await
274}
275
276/// Record a compliance hit: file access allowed because a valid consultation
277/// receipt existed. Platform-neutral — incremented for both Claude pre-read
278/// `AlreadyConsulted` allow and Codex post-bash confirmed consultation.
279pub async fn log_compliance_hit(store: &Store, key: &str) -> Result<()> {
280    let agg_key = today_key("compliance:allow_after_receipt_");
281    upsert_daily_agg(store, &agg_key, key).await
282}
283
284/// Record a Codex shell compliance miss: Bash file inspection without consultation.
285pub async fn log_codex_shell_miss(store: &Store, key: &str) -> Result<()> {
286    let agg_key = today_key("compliance:codex_shell_miss_");
287    upsert_daily_agg(store, &agg_key, key).await
288}
289
290/// Record a Codex prompt nudge: prompt indicated code work before clear consultation.
291pub async fn log_prompt_nudge(store: &Store, key: &str) -> Result<()> {
292    let agg_key = today_key("analytics:codex_prompt_nudge_");
293    upsert_daily_agg(store, &agg_key, key).await
294}
295
296/// Record a bootstrap event. Used to measure Codex/agent bootstrap adoption.
297pub async fn log_bootstrap(store: &Store, key: &str) -> Result<()> {
298    let agg_key = today_key("analytics:bootstrap_");
299    upsert_daily_agg(store, &agg_key, key).await
300}
301
302// ── check_consulted ───────────────────────────────────────────────────────────
303
304/// Return true if the consulted marker exists (set by `log_hit` / capture hook).
305///
306/// When `actor` is `Some(id)`, reads the actor-scoped key
307/// `session:consulted:<id>:<key>` (subagent path); `None` reads the global key
308/// `session:consulted:<key>` (main-thread path — unchanged).
309pub async fn check_consulted(store: &Store, key: &str, actor: Option<&str>) -> Result<bool> {
310    let consulted_key = receipt_key(key, actor);
311    Ok(store.get(&consulted_key).await?.is_some())
312}
313
314/// Return true if the consulted marker exists and is newer than `ttl_secs`.
315///
316/// When `actor` is `Some(id)`, reads the actor-scoped key
317/// `session:consulted:<id>:<key>` (subagent enforcement path).
318/// When `actor` is `None`, reads the global key `session:consulted:<key>`
319/// (main-thread path — unchanged behaviour).
320pub async fn check_consulted_recent(
321    store: &Store,
322    key: &str,
323    ttl_secs: u64,
324    actor: Option<&str>,
325) -> Result<bool> {
326    let consulted_key = receipt_key(key, actor);
327    let Some(record) = store.get(&consulted_key).await? else {
328        return Ok(false);
329    };
330    let age = now_secs().saturating_sub(record.updated_at);
331    Ok(age <= ttl_secs)
332}
333
334// ── session_flush ─────────────────────────────────────────────────────────────
335
336/// Collect all consulted markers into `session:current` for harvest.
337pub async fn session_flush(store: &Store) -> Result<()> {
338    let now = now_secs();
339
340    let consulted_keys = store.scan_keys("session:consulted:").await?;
341    let stripped: Vec<String> = consulted_keys
342        .iter()
343        .map(|k| {
344            k.strip_prefix("session:consulted:")
345                .unwrap_or(k)
346                .to_string()
347        })
348        .collect();
349
350    let session_data = serde_json::json!({
351        "consulted_keys": stripped,
352        "flushed_at": now,
353    });
354    let mut rec = session_record("session:current", String::new());
355    rec.payload = Some(session_data);
356    store.put("session:current", &rec).await?;
357    Ok(())
358}
359
360/// Delete all consult receipts (`session:consulted:*`) from the store.
361///
362/// Shared by `session_clear_consults` (PostCompact) and the end-of-session
363/// `session_harvest` / `session_harvest_no_staleness` cleanup. Propagates store
364/// errors; the daemon-startup stale-marker sweep keeps its own fail-soft loop.
365async fn delete_all_receipts(store: &Store) -> Result<()> {
366    let consulted_keys = store.scan_keys("session:consulted:").await?;
367    for k in &consulted_keys {
368        store.delete(k).await?;
369    }
370    Ok(())
371}
372
373/// Clear all consult receipts for the session.
374///
375/// Used by the PostCompact hook: compaction wipes the agent's memory of consulted
376/// gotchas, but receipts are time-based and survive, so PreToolUse would not
377/// re-block. Clearing them forces a fresh mem_get on next access.
378pub async fn session_clear_consults(store: &Store) -> Result<()> {
379    delete_all_receipts(store).await
380}
381
382// ── session_harvest ───────────────────────────────────────────────────────────
383
384/// Archive session, run staleness analysis, auto-promote gotchas.
385///
386/// Full version: includes git-based staleness analysis. Used from CLI path.
387/// For the daemon socket path (tokio::spawn, !Send constraint), use
388/// `session_harvest_no_staleness` instead.
389pub async fn session_harvest(store: &Store, cwd: &Path) -> Result<()> {
390    let now = now_secs();
391
392    // M-12-D: promote gotcha candidates before archiving
393    match promote_gotcha_candidates(store).await {
394        Ok(n) if n > 0 => tracing::info!(promoted = n, "gotcha candidates auto-promoted"),
395        Ok(_) => {}
396        Err(e) => tracing::warn!(error = %e, "gotcha promotion failed"),
397    }
398
399    // M-13-A: run full staleness analysis
400    match StalenessAnalyzer::new(cwd).analyze_all(store).await {
401        Ok(report) if report.updated > 0 => {
402            tracing::info!(
403                scanned = report.scanned,
404                updated = report.updated,
405                tombstoned = report.tombstoned,
406                liability = report.liability,
407                "staleness analysis complete"
408            );
409        }
410        Ok(_) => {}
411        Err(e) => tracing::warn!(error = %e, "staleness analysis failed"),
412    }
413
414    // Read session:current (written by session-flush)
415    let session_rec = match store.get("session:current").await? {
416        Some(r) => r,
417        None => return Ok(()),
418    };
419
420    let session_value = match session_rec.payload.as_ref() {
421        Some(p) => serde_json::to_string(p).unwrap_or_default(),
422        None => session_rec.value.clone(),
423    };
424
425    // M-13-C: collect and store stale reviews for consulted keys
426    match collect_and_store_stale_reviews(store, &session_value, now).await {
427        Ok(n) if n > 0 => tracing::info!(entries = n, "stale review entries collected"),
428        Ok(_) => {}
429        Err(e) => tracing::warn!(error = %e, "stale review collection failed"),
430    }
431
432    // Write permanent session record
433    let session_key = format!("session:{now}");
434    let mut perm = session_record(&session_key, session_value);
435    perm.payload = session_rec.payload;
436    store.put(&session_key, &perm).await?;
437
438    // Clean up session:consulted:* markers
439    delete_all_receipts(store).await?;
440
441    // Update stage:current with last session timestamp
442    if let Some(mut stage) = store.get("stage:current").await? {
443        stage.updated_at = now;
444        stage.version.logical_clock += 1;
445        stage.version.wall_clock = now;
446        let base = stage
447            .value
448            .lines()
449            .filter(|l| !l.starts_with("last_session:"))
450            .collect::<Vec<_>>()
451            .join("\n");
452        stage.value = if base.is_empty() {
453            format!("last_session: {session_key}")
454        } else {
455            format!("{base}\nlast_session: {session_key}")
456        };
457        store.put("stage:current", &stage).await?;
458    }
459
460    Ok(())
461}
462
463/// Session harvest without git-based staleness analysis.
464///
465/// Used from the daemon socket (tokio::spawn requires Send, but StalenessAnalyzer
466/// contains git2::Repository which is !Send). Staleness analysis is deferred to
467/// the next CLI-path harvest (when the MCP server is not holding the lock).
468pub async fn session_harvest_no_staleness(store: &Store) -> Result<()> {
469    let now = now_secs();
470
471    // M-12-D: promote gotcha candidates
472    match promote_gotcha_candidates(store).await {
473        Ok(n) if n > 0 => tracing::info!(promoted = n, "gotcha candidates auto-promoted"),
474        Ok(_) => {}
475        Err(e) => tracing::warn!(error = %e, "gotcha promotion failed"),
476    }
477
478    // Read session:current (written by session-flush)
479    let session_rec = match store.get("session:current").await? {
480        Some(r) => r,
481        None => return Ok(()),
482    };
483
484    let session_value = match session_rec.payload.as_ref() {
485        Some(p) => serde_json::to_string(p).unwrap_or_default(),
486        None => session_rec.value.clone(),
487    };
488
489    // M-13-C: collect stale reviews (no git analysis — uses existing staleness values)
490    match collect_and_store_stale_reviews(store, &session_value, now).await {
491        Ok(n) if n > 0 => tracing::info!(entries = n, "stale review entries collected"),
492        Ok(_) => {}
493        Err(e) => tracing::warn!(error = %e, "stale review collection failed"),
494    }
495
496    // Write permanent session record
497    let session_key = format!("session:{now}");
498    let mut perm = session_record(&session_key, session_value);
499    perm.payload = session_rec.payload;
500    store.put(&session_key, &perm).await?;
501
502    // Clean up consulted markers
503    delete_all_receipts(store).await?;
504
505    // Update stage:current
506    if let Some(mut stage) = store.get("stage:current").await? {
507        stage.updated_at = now;
508        stage.version.logical_clock += 1;
509        stage.version.wall_clock = now;
510        let base = stage
511            .value
512            .lines()
513            .filter(|l| !l.starts_with("last_session:"))
514            .collect::<Vec<_>>()
515            .join("\n");
516        stage.value = if base.is_empty() {
517            format!("last_session: {session_key}")
518        } else {
519            format!("{base}\nlast_session: {session_key}")
520        };
521        store.put("stage:current", &stage).await?;
522    }
523
524    Ok(())
525}
526
527// ── doc_capture ───────────────────────────────────────────────────────────────
528
529/// Extract a canonical doc comment from `content` and update `file:{path}` record.
530///
531/// No-ops when: no record exists, record source is not StaticAnalysis, or no
532/// doc comment found in content.
533pub async fn doc_capture(store: &Store, path: &str, content: &str) -> Result<()> {
534    let purpose = extract_doc_comment(path, content);
535    if purpose.is_empty() {
536        return Ok(());
537    }
538
539    let file_key = format!("file:{path}");
540    let mut record = match store.get(&file_key).await? {
541        Some(r) => r,
542        None => return Ok(()),
543    };
544
545    // Only update when the record's current source is static analysis
546    // (Layer 0 stub) — don't overwrite developer-manual or higher-quality records.
547    if record.source != RecordSource::StaticAnalysis {
548        return Ok(());
549    }
550
551    if let Some(mut fr) = record.payload_as::<FileRecord>() {
552        fr.purpose = purpose.clone();
553        record.payload = serde_json::to_value(&fr).ok();
554    } else {
555        return Ok(());
556    }
557
558    let now = now_secs();
559    record.value = purpose;
560    record.source = RecordSource::SessionHook;
561    record.confidence.value = 0.65;
562    record.quality = QualityScore::doc_comment_default();
563    record.updated_at = now;
564    record.version.logical_clock += 1;
565    record.version.wall_clock = now;
566
567    if let Err(e) = store.put(&file_key, &record).await {
568        tracing::warn!(path, "doc-capture put failed: {e}");
569    }
570    Ok(())
571}
572
573// ── Doc comment extraction ────────────────────────────────────────────────────
574
575pub fn extract_doc_comment(path: &str, content: &str) -> String {
576    let ext = std::path::Path::new(path)
577        .extension()
578        .and_then(|e| e.to_str())
579        .unwrap_or("");
580
581    match ext {
582        "rs" => extract_rust_module_doc(content),
583        "py" => extract_python_docstring(content),
584        "go" => extract_go_package_doc_comment(content),
585        "ts" | "tsx" | "js" | "jsx" | "mjs" | "cjs" => extract_jsdoc(content),
586        _ => String::new(),
587    }
588}
589
590fn extract_rust_module_doc(content: &str) -> String {
591    let lines: Vec<&str> = content
592        .lines()
593        .take_while(|l| l.trim_start().starts_with("//!"))
594        .map(|l| l.trim_start().trim_start_matches("//!").trim())
595        .collect();
596    lines.join(" ").trim().to_string()
597}
598
599fn extract_python_docstring(content: &str) -> String {
600    let trimmed = content.trim_start();
601    for delim in &[r#"""""#, "'''"] {
602        if let Some(rest) = trimmed.strip_prefix(delim) {
603            if let Some(end) = rest.find(delim) {
604                return rest[..end]
605                    .trim()
606                    .lines()
607                    .next()
608                    .unwrap_or("")
609                    .trim()
610                    .to_string();
611            }
612        }
613    }
614    String::new()
615}
616
617fn extract_go_package_doc_comment(content: &str) -> String {
618    let mut lines: Vec<String> = Vec::new();
619    for line in content.lines() {
620        let t = line.trim();
621        if t.starts_with("//") {
622            lines.push(t.trim_start_matches("//").trim().to_string());
623        } else if t.starts_with("package ") {
624            break;
625        } else if !t.is_empty() {
626            lines.clear();
627        }
628    }
629    lines.join(" ").trim().to_string()
630}
631
632fn extract_jsdoc(content: &str) -> String {
633    let trimmed = content.trim_start();
634    if let Some(rest) = trimmed.strip_prefix("/**") {
635        if let Some(end) = rest.find("*/") {
636            let text: Vec<&str> = rest[..end]
637                .lines()
638                .map(|l| l.trim().trim_start_matches('*').trim())
639                .filter(|l| !l.is_empty())
640                .collect();
641            return text.join(" ").trim().to_string();
642        }
643    }
644    String::new()
645}
646
647// ── M-12-D: Gotcha auto-promotion ────────────────────────────────────────────
648
649pub async fn promote_gotcha_candidates(store: &Store) -> Result<u32> {
650    let gotchas = store.scan_prefix("gotcha:").await?;
651    let now = now_secs();
652    let mut promoted = 0u32;
653
654    for mut record in gotchas {
655        if record.access_count < GOTCHA_PROMOTION_ACCESS_THRESHOLD {
656            continue;
657        }
658        let mut gotcha: GotchaRecord = match record.payload_as::<GotchaRecord>() {
659            Some(g) => g,
660            None => continue,
661        };
662        if gotcha.confirmed {
663            continue;
664        }
665        gotcha.confirmed = true;
666        record.payload = serde_json::to_value(&gotcha).ok();
667        // NOTE: confirmation_count includes auto-promotions. Downstream consumers
668        // should not assume this counter reflects only human confirmations.
669        record.confidence.confirmation_count += 1;
670        record.updated_at = now;
671        record.version.logical_clock += 1;
672        record.version.wall_clock = now;
673        store.put(&record.key, &record).await?;
674        promoted += 1;
675    }
676
677    Ok(promoted)
678}
679
680// ── M-13-C: Stale review collection ──────────────────────────────────────────
681
682pub fn format_review_date(now_secs: u64) -> String {
683    let dt = chrono::DateTime::from_timestamp(now_secs as i64, 0).unwrap_or_else(chrono::Utc::now);
684    dt.format("%Y-%m-%d").to_string()
685}
686
687pub async fn collect_and_store_stale_reviews(
688    store: &Store,
689    session_value: &str,
690    now: u64,
691) -> Result<usize> {
692    let session: serde_json::Value = serde_json::from_str(session_value)?;
693    let consulted_keys = match session["consulted_keys"].as_array() {
694        Some(arr) => arr
695            .iter()
696            .filter_map(|v| v.as_str().map(|s| s.to_string()))
697            .collect::<Vec<_>>(),
698        None => return Ok(0),
699    };
700    if consulted_keys.is_empty() {
701        return Ok(0);
702    }
703
704    let new_entries = collect_stale_entries(store, &consulted_keys).await?;
705    if new_entries.is_empty() {
706        return Ok(0);
707    }
708
709    let date = format_review_date(now);
710    let review_key = format!("analytics:stale_review_{date}");
711    let new_count = new_entries.len();
712
713    let mut payload = match store.get(&review_key).await? {
714        Some(existing) => {
715            existing
716                .payload_as::<StaleReviewPayload>()
717                .unwrap_or(StaleReviewPayload {
718                    session_timestamp: now,
719                    entries: vec![],
720                })
721        }
722        None => StaleReviewPayload {
723            session_timestamp: now,
724            entries: vec![],
725        },
726    };
727
728    // Merge: new entries take priority, dedup by key
729    let mut seen_keys = std::collections::HashSet::new();
730    let mut merged = Vec::new();
731    for entry in new_entries {
732        if seen_keys.insert(entry.key.clone()) {
733            merged.push(entry);
734        }
735    }
736    for entry in payload.entries {
737        if seen_keys.insert(entry.key.clone()) {
738            merged.push(entry);
739        }
740    }
741
742    // Sort descending by staleness, truncate
743    merged.sort_by(|a, b| {
744        b.staleness_value
745            .partial_cmp(&a.staleness_value)
746            .unwrap_or(std::cmp::Ordering::Equal)
747    });
748    merged.truncate(MAX_STALE_REVIEW_ENTRIES);
749
750    payload.session_timestamp = now;
751    payload.entries = merged;
752
753    let mut record = analytics_record(&review_key, String::new());
754    record.payload = serde_json::to_value(&payload).ok();
755    store.put(&review_key, &record).await?;
756
757    Ok(new_count)
758}
759
760pub async fn collect_stale_entries(
761    store: &Store,
762    consulted_keys: &[String],
763) -> Result<Vec<StaleReviewEntry>> {
764    let mut entries = Vec::new();
765
766    for key in consulted_keys {
767        let record = match store.get(key).await? {
768            Some(r) => r,
769            None => continue,
770        };
771
772        // Exclude non-Active lifecycle
773        if !matches!(record.lifecycle, RecordLifecycle::Active) {
774            continue;
775        }
776
777        // Exclude Liability and Tombstone tiers
778        if matches!(
779            record.staleness.tier,
780            StalenessTier::Liability | StalenessTier::Tombstone
781        ) {
782            continue;
783        }
784
785        // Filter to [STALE_REVIEW_MIN, STALE_REVIEW_MAX) range
786        if record.staleness.value < STALE_REVIEW_MIN || record.staleness.value >= STALE_REVIEW_MAX {
787            continue;
788        }
789
790        let top_signals: Vec<String> = record
791            .staleness
792            .signals
793            .iter()
794            .take(3)
795            .map(|s| s.to_string())
796            .collect();
797
798        entries.push(StaleReviewEntry {
799            key: key.clone(),
800            staleness_value: record.staleness.value,
801            tier: record.staleness.tier.clone(),
802            last_updated: record.updated_at,
803            signals: top_signals,
804        });
805    }
806
807    entries.sort_by(|a, b| {
808        b.staleness_value
809            .partial_cmp(&a.staleness_value)
810            .unwrap_or(std::cmp::Ordering::Equal)
811    });
812    entries.truncate(MAX_STALE_REVIEW_ENTRIES);
813
814    Ok(entries)
815}
816
817#[cfg(test)]
818mod tests {
819    use tempfile::TempDir;
820
821    use super::*;
822
823    async fn temp_store() -> (TempDir, Store) {
824        let dir = TempDir::new().expect("tempdir");
825        let store = Store::open(dir.path()).await.expect("open store");
826        (dir, store)
827    }
828
829    #[tokio::test]
830    async fn log_bootstrap_creates_daily_aggregate() {
831        let (_dir, store) = temp_store().await;
832
833        log_bootstrap(&store, "__bootstrap__")
834            .await
835            .expect("log bootstrap");
836
837        let key = today_key("analytics:bootstrap_");
838        let record = store
839            .get(&key)
840            .await
841            .expect("get bootstrap aggregate")
842            .expect("bootstrap record exists");
843        let agg = record.payload_as::<DailyAgg>().expect("daily agg payload");
844        assert_eq!(agg.count, 1);
845        assert_eq!(agg.keys, vec!["__bootstrap__".to_string()]);
846    }
847
848    #[tokio::test]
849    async fn check_consulted_recent_uses_receipt_ttl() {
850        let (_dir, store) = temp_store().await;
851        let key = "file:src/main.rs";
852
853        assert!(!check_consulted_recent(&store, key, 900, None)
854            .await
855            .expect("no receipt yet"));
856
857        log_hit(&store, key).await.expect("log consultation hit");
858
859        assert!(check_consulted_recent(&store, key, 900, None)
860            .await
861            .expect("fresh receipt should be valid"));
862    }
863
864    #[tokio::test]
865    async fn consult_receipt_is_actor_scoped_when_actor_present() {
866        let (_dir, store) = temp_store().await;
867
868        // Actor-scoped receipt: actor Some("agentA").
869        let (k, v) = consultation_receipt_staged("file:x", Some("agentA")).unwrap();
870        store.transact_sessions_raw(&[(&k, &v)]).await.unwrap();
871
872        let keys = store.scan_keys("session:consulted:").await.unwrap();
873        assert!(
874            keys.iter().any(|k| k == "session:consulted:agentA:file:x"),
875            "actor-scoped key must be present, got: {keys:?}"
876        );
877        assert!(
878            !keys.iter().any(|k| k == "session:consulted:file:x"),
879            "global key must NOT be written by actor-scoped call, got: {keys:?}"
880        );
881
882        // Global receipt: actor None.
883        let (k2, v2) = consultation_receipt_staged("file:x", None).unwrap();
884        store.transact_sessions_raw(&[(&k2, &v2)]).await.unwrap();
885
886        let keys2 = store.scan_keys("session:consulted:").await.unwrap();
887        assert!(
888            keys2.iter().any(|k| k == "session:consulted:file:x"),
889            "global key must be present with actor=None, got: {keys2:?}"
890        );
891    }
892
893    #[tokio::test]
894    async fn gate_requires_actor_scoped_receipt_for_subagent() {
895        let (_dir, store) = temp_store().await;
896
897        // Write an actor-scoped receipt for agentA / file:x.
898        let (k, v) = consultation_receipt_staged("file:x", Some("agentA")).unwrap();
899        store.transact_sessions_raw(&[(&k, &v)]).await.unwrap();
900
901        // agentA's own receipt is found.
902        assert!(
903            check_consulted_recent(&store, "file:x", 900, Some("agentA"))
904                .await
905                .expect("agentA receipt lookup"),
906            "agentA should see its own actor-scoped receipt"
907        );
908
909        // A DIFFERENT subagent (agentB) does NOT see agentA's receipt.
910        assert!(
911            !check_consulted_recent(&store, "file:x", 900, Some("agentB"))
912                .await
913                .expect("agentB receipt lookup"),
914            "agentB must NOT ride agentA's receipt"
915        );
916
917        // Write a GLOBAL receipt for file:y (main-thread path).
918        let (k2, v2) = consultation_receipt_staged("file:y", None).unwrap();
919        store.transact_sessions_raw(&[(&k2, &v2)]).await.unwrap();
920
921        // Main thread (actor=None) sees the global receipt unchanged.
922        assert!(
923            check_consulted_recent(&store, "file:y", 900, None)
924                .await
925                .expect("global receipt lookup"),
926            "main thread must still see the global receipt"
927        );
928
929        // A subagent does NOT ride the global (main-thread) receipt.
930        assert!(
931            !check_consulted_recent(&store, "file:y", 900, Some("agentA"))
932                .await
933                .expect("agentA vs global receipt lookup"),
934            "subagent must NOT ride the global main-thread receipt"
935        );
936    }
937
938    #[tokio::test]
939    async fn session_clear_consults_deletes_all_receipts() {
940        let (_dir, store) = temp_store().await;
941        let key1 = "file:src/main.rs";
942        let key2 = "file:src/lib.rs";
943
944        log_hit(&store, key1).await.expect("log first hit");
945        log_hit(&store, key2).await.expect("log second hit");
946
947        // Verify receipts exist before clearing.
948        let before = store
949            .scan_keys("session:consulted:")
950            .await
951            .expect("scan before");
952        assert_eq!(before.len(), 2, "expected two receipts before clear");
953
954        session_clear_consults(&store)
955            .await
956            .expect("clear_consults should succeed");
957
958        let after = store
959            .scan_keys("session:consulted:")
960            .await
961            .expect("scan after");
962        assert!(after.is_empty(), "all receipts should be gone after clear");
963    }
964}