Skip to main content

mati_core/store/
session.rs

//! Analytics and session lifecycle functions for the hook pipeline.
//!
//! These functions are called from two paths:
//! - `cli/hooks.rs` fallback (when the daemon is not running; the store is opened directly)
//! - `mcp/server.rs` daemon socket (when the MCP server holds the exclusive lock)
//!
//! Keeping them here avoids code duplication and ensures both paths are
//! behaviourally identical.
9
10use std::path::Path;
11use std::time::{SystemTime, UNIX_EPOCH};
12
13use anyhow::{Context, Result};
14
15use super::{
16    Category, ConfidenceScore, FileRecord, GotchaRecord, Priority, QualityScore, Record,
17    RecordLifecycle, RecordSource, RecordVersion, StaleReviewEntry, StaleReviewPayload,
18    StalenessScore, StalenessTier, Store,
19};
20use crate::health::staleness::StalenessAnalyzer;
21
22// ── Internal helpers ──────────────────────────────────────────────────────────
23
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
pub fn now_secs() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        // Clock is set before 1970: fall back to the zero duration.
        Err(_) => 0,
    }
}
30
31pub fn today_key(prefix: &str) -> String {
32    let now = chrono::Utc::now().format("%Y-%m-%d");
33    format!("{prefix}{now}")
34}
35
36pub fn session_record(key: &str, value: String) -> Record {
37    let now = now_secs();
38    Record {
39        key: key.to_string(),
40        value,
41        category: Category::Session,
42        priority: Priority::Normal,
43        tags: vec![],
44        created_at: now,
45        updated_at: now,
46        ref_url: None,
47        staleness: StalenessScore::fresh(),
48        lifecycle: RecordLifecycle::Active,
49        version: RecordVersion {
50            device_id: uuid::Uuid::new_v4(),
51            logical_clock: 1,
52            wall_clock: now,
53        },
54        quality: QualityScore::layer0_default(),
55        access_count: 0,
56        last_accessed: 0,
57        source: RecordSource::SessionHook,
58        confidence: ConfidenceScore::for_new_record(&RecordSource::SessionHook),
59        gap_analysis_score: 0.0,
60        payload: None,
61    }
62}
63
64pub fn analytics_record(key: &str, value: String) -> Record {
65    let mut r = session_record(key, value);
66    r.category = Category::Analytics;
67    r
68}
69
/// Daily aggregation record value.
///
/// Serialized to JSON and stored as the `payload` of a per-day analytics
/// record (see `upsert_daily_agg`).
#[derive(serde::Serialize, serde::Deserialize, Debug)]
pub struct DailyAgg {
    /// Total number of events recorded for the day, including repeats of the same key.
    pub count: u64,
    /// Distinct target keys in first-seen order; stops growing at `MAX_AGG_KEYS`.
    pub keys: Vec<String>,
}
76
/// Cap on the number of distinct keys remembered in a single [`DailyAgg`].
/// The aggregate's `count` keeps increasing after the cap is reached.
pub const MAX_AGG_KEYS: usize = 100;

/// Minimum staleness value for stale review inclusion (inclusive lower bound).
const STALE_REVIEW_MIN: f32 = 0.4;
/// Maximum staleness value for stale review inclusion (Liability and above excluded).
/// Exclusive: records whose staleness is at or above this value are skipped.
const STALE_REVIEW_MAX: f32 = 0.7;
/// Default TTL for recent consultation receipts (15 minutes).
pub const CONSULTED_RECENT_TTL_SECS: u64 = 900;
/// Maximum entries in a single daily stale review record.
pub const MAX_STALE_REVIEW_ENTRIES: usize = 20;
/// Minimum access count before an unconfirmed gotcha is auto-promoted.
/// Records with fewer accesses than this are left untouched.
pub const GOTCHA_PROMOTION_ACCESS_THRESHOLD: u32 = 3;
89
90pub async fn upsert_daily_agg(store: &Store, agg_key: &str, target_key: &str) -> Result<()> {
91    let now = now_secs();
92
93    match store.get(agg_key).await? {
94        Some(mut record) => {
95            let mut agg: DailyAgg = record.payload_as::<DailyAgg>().unwrap_or(DailyAgg {
96                count: 0,
97                keys: vec![],
98            });
99            agg.count += 1;
100            if agg.keys.len() < MAX_AGG_KEYS && !agg.keys.iter().any(|k| k == target_key) {
101                agg.keys.push(target_key.to_string());
102            }
103            record.payload = serde_json::to_value(&agg).ok();
104            record.updated_at = now;
105            record.version.logical_clock += 1;
106            record.version.wall_clock = now;
107            store.put(agg_key, &record).await?;
108        }
109        None => {
110            let agg = DailyAgg {
111                count: 1,
112                keys: vec![target_key.to_string()],
113            };
114            let mut record = analytics_record(agg_key, String::new());
115            record.payload = serde_json::to_value(&agg).ok();
116            store.put(agg_key, &record).await?;
117        }
118    }
119
120    Ok(())
121}
122
123/// Compute the daily aggregation upsert WITHOUT persisting.
124///
125/// Returns `(key, serialized_record_bytes)` for staging into a
126/// `transact_sessions_raw` call. The caller commits this alongside
127/// other writes (e.g., audit) in one atomic transaction.
128pub async fn upsert_daily_agg_staged(
129    store: &Store,
130    agg_key: &str,
131    target_key: &str,
132) -> Result<(String, Vec<u8>)> {
133    let now = now_secs();
134
135    let record = match store.get(agg_key).await? {
136        Some(mut record) => {
137            let mut agg: DailyAgg = record.payload_as::<DailyAgg>().unwrap_or(DailyAgg {
138                count: 0,
139                keys: vec![],
140            });
141            agg.count += 1;
142            if agg.keys.len() < MAX_AGG_KEYS && !agg.keys.iter().any(|k| k == target_key) {
143                agg.keys.push(target_key.to_string());
144            }
145            record.payload = serde_json::to_value(&agg).ok();
146            record.updated_at = now;
147            record.version.logical_clock += 1;
148            record.version.wall_clock = now;
149            record
150        }
151        None => {
152            let agg = DailyAgg {
153                count: 1,
154                keys: vec![target_key.to_string()],
155            };
156            let mut record = analytics_record(agg_key, String::new());
157            record.payload = serde_json::to_value(&agg).ok();
158            record
159        }
160    };
161
162    let bytes = rmp_serde::to_vec_named(&record)
163        .with_context(|| format!("failed to serialize agg record for {agg_key}"))?;
164    Ok((agg_key.to_string(), bytes))
165}
166
167/// Compute the consultation receipt record WITHOUT persisting.
168///
169/// Returns `(key, serialized_record_bytes)` for staging.
170pub fn consultation_receipt_staged(key: &str) -> Result<(String, Vec<u8>)> {
171    let consulted_key = format!("session:consulted:{key}");
172    let record = session_record(&consulted_key, String::new());
173    let bytes = rmp_serde::to_vec_named(&record)
174        .with_context(|| format!("failed to serialize consulted receipt for {consulted_key}"))?;
175    Ok((consulted_key, bytes))
176}
177
178/// Compute the session:current flush record WITHOUT persisting.
179///
180/// Returns `(key, serialized_record_bytes)` for staging.
181pub async fn session_flush_staged(store: &Store) -> Result<Option<(String, Vec<u8>)>> {
182    let now = now_secs();
183    let consulted_keys = store.scan_keys("session:consulted:").await?;
184    let stripped: Vec<String> = consulted_keys
185        .iter()
186        .map(|k| {
187            k.strip_prefix("session:consulted:")
188                .unwrap_or(k)
189                .to_string()
190        })
191        .collect();
192
193    let session_data = serde_json::json!({
194        "consulted_keys": stripped,
195        "flushed_at": now,
196    });
197    let mut rec = session_record("session:current", String::new());
198    rec.payload = Some(session_data);
199    let bytes = rmp_serde::to_vec_named(&rec)?;
200    Ok(Some(("session:current".to_string(), bytes)))
201}
202
203// ── log_hit ───────────────────────────────────────────────────────────────────
204
205/// Record a cache hit: write consulted marker, bump access_count, update daily agg.
206pub async fn log_hit(store: &Store, key: &str) -> Result<()> {
207    let now = now_secs();
208
209    // 1. Daily hit aggregation
210    let agg_key = today_key("analytics:hit_");
211    upsert_daily_agg(store, &agg_key, key).await?;
212
213    // 2. Mark as consulted for session tracking
214    let consulted_key = format!("session:consulted:{key}");
215    store
216        .put(
217            &consulted_key,
218            &session_record(&consulted_key, String::new()),
219        )
220        .await?;
221
222    // 3. Bump access_count and last_accessed on the target record
223    if let Some(mut record) = store.get(key).await? {
224        record.access_count += 1;
225        record.last_accessed = now;
226        store.put(key, &record).await?;
227    }
228
229    // 4. Best-effort enforcement event: ReceiptMinted.
230    //
231    // Mirrors the socket-mode path in `dispatch_v2::ConsultationHit` so the
232    // direct-mode CLI path (`mati explain` without a daemon, or any code
233    // calling `session::log_hit` against an open Store) produces the same
234    // `receipt_minted` row in `mati history --enforcement`. Without this
235    // parity, the enforcement audit log has gaps depending on whether the
236    // mint happened over socket or direct mode.
237    let _ = crate::store::enforcement::record_event(
238        store,
239        crate::store::enforcement::EnforcementEventType::ReceiptMinted,
240        crate::store::enforcement::SubjectKind::File,
241        key.to_string(),
242        "claude".to_string(),
243        None,
244        "consultation_requested".to_string(),
245        None,
246    )
247    .await;
248
249    Ok(())
250}
251
252// ── log_miss ──────────────────────────────────────────────────────────────────
253
254/// Record a cache miss: update daily miss aggregation.
255pub async fn log_miss(store: &Store, key: &str) -> Result<()> {
256    let agg_key = today_key("analytics:miss_");
257    upsert_daily_agg(store, &agg_key, key).await
258}
259
260// ── log_compliance_miss ───────────────────────────────────────────────────────
261
262/// Record a compliance miss: file read without prior mati consultation.
263pub async fn log_compliance_miss(store: &Store, key: &str) -> Result<()> {
264    let agg_key = today_key("compliance:miss_");
265    upsert_daily_agg(store, &agg_key, key).await
266}
267
268/// Record a compliance hit: file access allowed because a valid consultation
269/// receipt existed. Platform-neutral — incremented for both Claude pre-read
270/// `AlreadyConsulted` allow and Codex post-bash confirmed consultation.
271pub async fn log_compliance_hit(store: &Store, key: &str) -> Result<()> {
272    let agg_key = today_key("compliance:allow_after_receipt_");
273    upsert_daily_agg(store, &agg_key, key).await
274}
275
276/// Record a Codex shell compliance miss: Bash file inspection without consultation.
277pub async fn log_codex_shell_miss(store: &Store, key: &str) -> Result<()> {
278    let agg_key = today_key("compliance:codex_shell_miss_");
279    upsert_daily_agg(store, &agg_key, key).await
280}
281
282/// Record a Codex prompt nudge: prompt indicated code work before clear consultation.
283pub async fn log_prompt_nudge(store: &Store, key: &str) -> Result<()> {
284    let agg_key = today_key("analytics:codex_prompt_nudge_");
285    upsert_daily_agg(store, &agg_key, key).await
286}
287
288/// Record a bootstrap event. Used to measure Codex/agent bootstrap adoption.
289pub async fn log_bootstrap(store: &Store, key: &str) -> Result<()> {
290    let agg_key = today_key("analytics:bootstrap_");
291    upsert_daily_agg(store, &agg_key, key).await
292}
293
294// ── check_consulted ───────────────────────────────────────────────────────────
295
296/// Return true if `session:consulted:{key}` exists (set by `log_hit`).
297pub async fn check_consulted(store: &Store, key: &str) -> Result<bool> {
298    let consulted_key = format!("session:consulted:{key}");
299    Ok(store.get(&consulted_key).await?.is_some())
300}
301
302/// Return true if the consulted marker exists and is newer than `ttl_secs`.
303pub async fn check_consulted_recent(store: &Store, key: &str, ttl_secs: u64) -> Result<bool> {
304    let consulted_key = format!("session:consulted:{key}");
305    let Some(record) = store.get(&consulted_key).await? else {
306        return Ok(false);
307    };
308    let age = now_secs().saturating_sub(record.updated_at);
309    Ok(age <= ttl_secs)
310}
311
312// ── session_flush ─────────────────────────────────────────────────────────────
313
314/// Collect all consulted markers into `session:current` for harvest.
315pub async fn session_flush(store: &Store) -> Result<()> {
316    let now = now_secs();
317
318    let consulted_keys = store.scan_keys("session:consulted:").await?;
319    let stripped: Vec<String> = consulted_keys
320        .iter()
321        .map(|k| {
322            k.strip_prefix("session:consulted:")
323                .unwrap_or(k)
324                .to_string()
325        })
326        .collect();
327
328    let session_data = serde_json::json!({
329        "consulted_keys": stripped,
330        "flushed_at": now,
331    });
332    let mut rec = session_record("session:current", String::new());
333    rec.payload = Some(session_data);
334    store.put("session:current", &rec).await?;
335    Ok(())
336}
337
338// ── session_harvest ───────────────────────────────────────────────────────────
339
340/// Archive session, run staleness analysis, auto-promote gotchas.
341///
342/// Full version: includes git-based staleness analysis. Used from CLI path.
343/// For the daemon socket path (tokio::spawn, !Send constraint), use
344/// `session_harvest_no_staleness` instead.
345pub async fn session_harvest(store: &Store, cwd: &Path) -> Result<()> {
346    let now = now_secs();
347
348    // M-12-D: promote gotcha candidates before archiving
349    match promote_gotcha_candidates(store).await {
350        Ok(n) if n > 0 => tracing::info!(promoted = n, "gotcha candidates auto-promoted"),
351        Ok(_) => {}
352        Err(e) => tracing::warn!(error = %e, "gotcha promotion failed"),
353    }
354
355    // M-13-A: run full staleness analysis
356    match StalenessAnalyzer::new(cwd).analyze_all(store).await {
357        Ok(report) if report.updated > 0 => {
358            tracing::info!(
359                scanned = report.scanned,
360                updated = report.updated,
361                tombstoned = report.tombstoned,
362                liability = report.liability,
363                "staleness analysis complete"
364            );
365        }
366        Ok(_) => {}
367        Err(e) => tracing::warn!(error = %e, "staleness analysis failed"),
368    }
369
370    // Read session:current (written by session-flush)
371    let session_rec = match store.get("session:current").await? {
372        Some(r) => r,
373        None => return Ok(()),
374    };
375
376    let session_value = match session_rec.payload.as_ref() {
377        Some(p) => serde_json::to_string(p).unwrap_or_default(),
378        None => session_rec.value.clone(),
379    };
380
381    // M-13-C: collect and store stale reviews for consulted keys
382    match collect_and_store_stale_reviews(store, &session_value, now).await {
383        Ok(n) if n > 0 => tracing::info!(entries = n, "stale review entries collected"),
384        Ok(_) => {}
385        Err(e) => tracing::warn!(error = %e, "stale review collection failed"),
386    }
387
388    // Write permanent session record
389    let session_key = format!("session:{now}");
390    let mut perm = session_record(&session_key, session_value);
391    perm.payload = session_rec.payload;
392    store.put(&session_key, &perm).await?;
393
394    // Clean up session:consulted:* markers
395    let consulted_keys = store.scan_keys("session:consulted:").await?;
396    for k in &consulted_keys {
397        store.delete(k).await?;
398    }
399
400    // Update stage:current with last session timestamp
401    if let Some(mut stage) = store.get("stage:current").await? {
402        stage.updated_at = now;
403        stage.version.logical_clock += 1;
404        stage.version.wall_clock = now;
405        let base = stage
406            .value
407            .lines()
408            .filter(|l| !l.starts_with("last_session:"))
409            .collect::<Vec<_>>()
410            .join("\n");
411        stage.value = if base.is_empty() {
412            format!("last_session: {session_key}")
413        } else {
414            format!("{base}\nlast_session: {session_key}")
415        };
416        store.put("stage:current", &stage).await?;
417    }
418
419    Ok(())
420}
421
422/// Session harvest without git-based staleness analysis.
423///
424/// Used from the daemon socket (tokio::spawn requires Send, but StalenessAnalyzer
425/// contains git2::Repository which is !Send). Staleness analysis is deferred to
426/// the next CLI-path harvest (when the MCP server is not holding the lock).
427pub async fn session_harvest_no_staleness(store: &Store) -> Result<()> {
428    let now = now_secs();
429
430    // M-12-D: promote gotcha candidates
431    match promote_gotcha_candidates(store).await {
432        Ok(n) if n > 0 => tracing::info!(promoted = n, "gotcha candidates auto-promoted"),
433        Ok(_) => {}
434        Err(e) => tracing::warn!(error = %e, "gotcha promotion failed"),
435    }
436
437    // Read session:current (written by session-flush)
438    let session_rec = match store.get("session:current").await? {
439        Some(r) => r,
440        None => return Ok(()),
441    };
442
443    let session_value = match session_rec.payload.as_ref() {
444        Some(p) => serde_json::to_string(p).unwrap_or_default(),
445        None => session_rec.value.clone(),
446    };
447
448    // M-13-C: collect stale reviews (no git analysis — uses existing staleness values)
449    match collect_and_store_stale_reviews(store, &session_value, now).await {
450        Ok(n) if n > 0 => tracing::info!(entries = n, "stale review entries collected"),
451        Ok(_) => {}
452        Err(e) => tracing::warn!(error = %e, "stale review collection failed"),
453    }
454
455    // Write permanent session record
456    let session_key = format!("session:{now}");
457    let mut perm = session_record(&session_key, session_value);
458    perm.payload = session_rec.payload;
459    store.put(&session_key, &perm).await?;
460
461    // Clean up consulted markers
462    let consulted_keys = store.scan_keys("session:consulted:").await?;
463    for k in &consulted_keys {
464        store.delete(k).await?;
465    }
466
467    // Update stage:current
468    if let Some(mut stage) = store.get("stage:current").await? {
469        stage.updated_at = now;
470        stage.version.logical_clock += 1;
471        stage.version.wall_clock = now;
472        let base = stage
473            .value
474            .lines()
475            .filter(|l| !l.starts_with("last_session:"))
476            .collect::<Vec<_>>()
477            .join("\n");
478        stage.value = if base.is_empty() {
479            format!("last_session: {session_key}")
480        } else {
481            format!("{base}\nlast_session: {session_key}")
482        };
483        store.put("stage:current", &stage).await?;
484    }
485
486    Ok(())
487}
488
489// ── doc_capture ───────────────────────────────────────────────────────────────
490
491/// Extract a canonical doc comment from `content` and update `file:{path}` record.
492///
493/// No-ops when: no record exists, record source is not StaticAnalysis, or no
494/// doc comment found in content.
495pub async fn doc_capture(store: &Store, path: &str, content: &str) -> Result<()> {
496    let purpose = extract_doc_comment(path, content);
497    if purpose.is_empty() {
498        return Ok(());
499    }
500
501    let file_key = format!("file:{path}");
502    let mut record = match store.get(&file_key).await? {
503        Some(r) => r,
504        None => return Ok(()),
505    };
506
507    // Only update when the record's current source is static analysis
508    // (Layer 0 stub) — don't overwrite developer-manual or higher-quality records.
509    if record.source != RecordSource::StaticAnalysis {
510        return Ok(());
511    }
512
513    if let Some(mut fr) = record.payload_as::<FileRecord>() {
514        fr.purpose = purpose.clone();
515        record.payload = serde_json::to_value(&fr).ok();
516    } else {
517        return Ok(());
518    }
519
520    let now = now_secs();
521    record.value = purpose;
522    record.source = RecordSource::SessionHook;
523    record.confidence.value = 0.65;
524    record.quality = QualityScore::doc_comment_default();
525    record.updated_at = now;
526    record.version.logical_clock += 1;
527    record.version.wall_clock = now;
528
529    if let Err(e) = store.put(&file_key, &record).await {
530        tracing::warn!(path, "doc-capture put failed: {e}");
531    }
532    Ok(())
533}
534
535// ── Doc comment extraction ────────────────────────────────────────────────────
536
537pub fn extract_doc_comment(path: &str, content: &str) -> String {
538    let ext = std::path::Path::new(path)
539        .extension()
540        .and_then(|e| e.to_str())
541        .unwrap_or("");
542
543    match ext {
544        "rs" => extract_rust_module_doc(content),
545        "py" => extract_python_docstring(content),
546        "go" => extract_go_package_doc_comment(content),
547        "ts" | "tsx" | "js" | "jsx" | "mjs" | "cjs" => extract_jsdoc(content),
548        _ => String::new(),
549    }
550}
551
/// Extract the leading `//!` module documentation from Rust source as a single
/// space-joined line.
///
/// Leading blank lines are skipped. Collection stops at the first line that is
/// not a `//!` comment. Empty `//!` lines (paragraph breaks) are dropped so the
/// result never contains doubled spaces. Returns an empty string when the file
/// does not start with module docs.
fn extract_rust_module_doc(content: &str) -> String {
    content
        .lines()
        .skip_while(|line| line.trim().is_empty())
        // Stop at the first line that is not an inner doc comment.
        .map_while(|line| line.trim_start().strip_prefix("//!"))
        .map(str::trim)
        .filter(|text| !text.is_empty())
        .collect::<Vec<_>>()
        .join(" ")
}
560
/// Extract the first line of a Python module docstring.
///
/// Leading blank lines and `#` comment lines (shebang, encoding cookie, licence
/// header) are skipped, since comments may legally precede the docstring. The
/// docstring must then open with `"""` or `'''` and be terminated by the same
/// delimiter. Returns an empty string when no such docstring is found.
fn extract_python_docstring(content: &str) -> String {
    let mut rest = content.trim_start();
    while rest.starts_with('#') {
        rest = match rest.find('\n') {
            // '\n' is one byte, so `idx + 1` is always a char boundary.
            Some(idx) => rest[idx + 1..].trim_start(),
            None => "",
        };
    }

    for delim in &[r#"""""#, "'''"] {
        let body = match rest.strip_prefix(*delim) {
            Some(body) => body,
            None => continue,
        };
        if let Some(end) = body.find(*delim) {
            return body[..end]
                .trim()
                .lines()
                .next()
                .unwrap_or("")
                .trim()
                .to_string();
        }
    }
    String::new()
}
578
/// Extract the Go package doc comment: the `//` comment block that sits
/// directly above the `package` clause, joined into one line.
///
/// A blank line or any non-comment line detaches everything collected so far,
/// so licence headers and build constraints separated from the package
/// comment are not mistaken for documentation. Compiler directives
/// (`//go:...`, `// +build ...`) and empty `//` lines are skipped. Scanning
/// stops at the first line starting with `package `.
fn extract_go_package_doc_comment(content: &str) -> String {
    let mut doc: Vec<&str> = Vec::new();
    for line in content.lines() {
        let t = line.trim();
        if t.starts_with("package ") {
            break;
        }
        match t.strip_prefix("//") {
            // Directives are instructions to the toolchain, not documentation.
            Some(rest) if rest.starts_with("go:") || rest.trim_start().starts_with("+build") => {}
            Some(_) => {
                let text = t.trim_start_matches("//").trim();
                if !text.is_empty() {
                    doc.push(text);
                }
            }
            // Blank line or code: whatever came before is not adjacent to `package`.
            None => doc.clear(),
        }
    }
    doc.join(" ")
}
593
/// Extract the text of a leading `/** ... */` JSDoc block as one space-joined
/// line.
///
/// The block must be the first thing in `content` after optional whitespace and
/// must be terminated. Each line is stripped of surrounding whitespace and
/// leading `*` characters; lines that end up empty are dropped. Returns an
/// empty string when there is no such block.
fn extract_jsdoc(content: &str) -> String {
    let block = content
        .trim_start()
        .strip_prefix("/**")
        .and_then(|after| after.find("*/").map(|close| &after[..close]));

    let mut joined = String::new();
    if let Some(block) = block {
        for raw in block.lines() {
            let piece = raw.trim().trim_start_matches('*').trim();
            if piece.is_empty() {
                continue;
            }
            if !joined.is_empty() {
                joined.push(' ');
            }
            joined.push_str(piece);
        }
    }
    joined
}
608
609// ── M-12-D: Gotcha auto-promotion ────────────────────────────────────────────
610
611pub async fn promote_gotcha_candidates(store: &Store) -> Result<u32> {
612    let gotchas = store.scan_prefix("gotcha:").await?;
613    let now = now_secs();
614    let mut promoted = 0u32;
615
616    for mut record in gotchas {
617        if record.access_count < GOTCHA_PROMOTION_ACCESS_THRESHOLD {
618            continue;
619        }
620        let mut gotcha: GotchaRecord = match record.payload_as::<GotchaRecord>() {
621            Some(g) => g,
622            None => continue,
623        };
624        if gotcha.confirmed {
625            continue;
626        }
627        gotcha.confirmed = true;
628        record.payload = serde_json::to_value(&gotcha).ok();
629        // NOTE: confirmation_count includes auto-promotions. Downstream consumers
630        // should not assume this counter reflects only human confirmations.
631        record.confidence.confirmation_count += 1;
632        record.updated_at = now;
633        record.version.logical_clock += 1;
634        record.version.wall_clock = now;
635        store.put(&record.key, &record).await?;
636        promoted += 1;
637    }
638
639    Ok(promoted)
640}
641
642// ── M-13-C: Stale review collection ──────────────────────────────────────────
643
644pub fn format_review_date(now_secs: u64) -> String {
645    let dt = chrono::DateTime::from_timestamp(now_secs as i64, 0).unwrap_or_else(chrono::Utc::now);
646    dt.format("%Y-%m-%d").to_string()
647}
648
649pub async fn collect_and_store_stale_reviews(
650    store: &Store,
651    session_value: &str,
652    now: u64,
653) -> Result<usize> {
654    let session: serde_json::Value = serde_json::from_str(session_value)?;
655    let consulted_keys = match session["consulted_keys"].as_array() {
656        Some(arr) => arr
657            .iter()
658            .filter_map(|v| v.as_str().map(|s| s.to_string()))
659            .collect::<Vec<_>>(),
660        None => return Ok(0),
661    };
662    if consulted_keys.is_empty() {
663        return Ok(0);
664    }
665
666    let new_entries = collect_stale_entries(store, &consulted_keys).await?;
667    if new_entries.is_empty() {
668        return Ok(0);
669    }
670
671    let date = format_review_date(now);
672    let review_key = format!("analytics:stale_review_{date}");
673    let new_count = new_entries.len();
674
675    let mut payload = match store.get(&review_key).await? {
676        Some(existing) => {
677            existing
678                .payload_as::<StaleReviewPayload>()
679                .unwrap_or(StaleReviewPayload {
680                    session_timestamp: now,
681                    entries: vec![],
682                })
683        }
684        None => StaleReviewPayload {
685            session_timestamp: now,
686            entries: vec![],
687        },
688    };
689
690    // Merge: new entries take priority, dedup by key
691    let mut seen_keys = std::collections::HashSet::new();
692    let mut merged = Vec::new();
693    for entry in new_entries {
694        if seen_keys.insert(entry.key.clone()) {
695            merged.push(entry);
696        }
697    }
698    for entry in payload.entries {
699        if seen_keys.insert(entry.key.clone()) {
700            merged.push(entry);
701        }
702    }
703
704    // Sort descending by staleness, truncate
705    merged.sort_by(|a, b| {
706        b.staleness_value
707            .partial_cmp(&a.staleness_value)
708            .unwrap_or(std::cmp::Ordering::Equal)
709    });
710    merged.truncate(MAX_STALE_REVIEW_ENTRIES);
711
712    payload.session_timestamp = now;
713    payload.entries = merged;
714
715    let mut record = analytics_record(&review_key, String::new());
716    record.payload = serde_json::to_value(&payload).ok();
717    store.put(&review_key, &record).await?;
718
719    Ok(new_count)
720}
721
722pub async fn collect_stale_entries(
723    store: &Store,
724    consulted_keys: &[String],
725) -> Result<Vec<StaleReviewEntry>> {
726    let mut entries = Vec::new();
727
728    for key in consulted_keys {
729        let record = match store.get(key).await? {
730            Some(r) => r,
731            None => continue,
732        };
733
734        // Exclude non-Active lifecycle
735        if !matches!(record.lifecycle, RecordLifecycle::Active) {
736            continue;
737        }
738
739        // Exclude Liability and Tombstone tiers
740        if matches!(
741            record.staleness.tier,
742            StalenessTier::Liability | StalenessTier::Tombstone
743        ) {
744            continue;
745        }
746
747        // Filter to [STALE_REVIEW_MIN, STALE_REVIEW_MAX) range
748        if record.staleness.value < STALE_REVIEW_MIN || record.staleness.value >= STALE_REVIEW_MAX {
749            continue;
750        }
751
752        let top_signals: Vec<String> = record
753            .staleness
754            .signals
755            .iter()
756            .take(3)
757            .map(|s| s.to_string())
758            .collect();
759
760        entries.push(StaleReviewEntry {
761            key: key.clone(),
762            staleness_value: record.staleness.value,
763            tier: record.staleness.tier.clone(),
764            last_updated: record.updated_at,
765            signals: top_signals,
766        });
767    }
768
769    entries.sort_by(|a, b| {
770        b.staleness_value
771            .partial_cmp(&a.staleness_value)
772            .unwrap_or(std::cmp::Ordering::Equal)
773    });
774    entries.truncate(MAX_STALE_REVIEW_ENTRIES);
775
776    Ok(entries)
777}
778
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Open a store in a fresh temporary directory. The `TempDir` is returned
    /// alongside the store so the caller keeps the directory alive.
    async fn temp_store() -> (TempDir, Store) {
        let tmp = TempDir::new().expect("tempdir");
        let opened = Store::open(tmp.path()).await.expect("open store");
        (tmp, opened)
    }

    /// The first bootstrap event of the day creates an aggregate with a count
    /// of one that remembers the key.
    #[tokio::test]
    async fn log_bootstrap_creates_daily_aggregate() {
        let (_dir, store) = temp_store().await;

        log_bootstrap(&store, "__bootstrap__")
            .await
            .expect("log bootstrap");

        let agg_key = today_key("analytics:bootstrap_");
        let stored = store
            .get(&agg_key)
            .await
            .expect("get bootstrap aggregate")
            .expect("bootstrap record exists");
        let agg = stored.payload_as::<DailyAgg>().expect("daily agg payload");

        assert_eq!(agg.count, 1);
        assert_eq!(agg.keys, vec!["__bootstrap__".to_string()]);
    }

    /// No receipt means "not recently consulted"; a receipt just written by
    /// `log_hit` is within the TTL.
    #[tokio::test]
    async fn check_consulted_recent_uses_receipt_ttl() {
        let (_dir, store) = temp_store().await;
        let key = "file:src/main.rs";

        let before = check_consulted_recent(&store, key, 900)
            .await
            .expect("no receipt yet");
        assert!(!before);

        log_hit(&store, key).await.expect("log consultation hit");

        let after = check_consulted_recent(&store, key, 900)
            .await
            .expect("fresh receipt should be valid");
        assert!(after);
    }
}
825}