1use std::path::Path;
11use std::time::{SystemTime, UNIX_EPOCH};
12
13use anyhow::{Context, Result};
14
15use super::{
16 Category, ConfidenceScore, FileRecord, GotchaRecord, Priority, QualityScore, Record,
17 RecordLifecycle, RecordSource, RecordVersion, StaleReviewEntry, StaleReviewPayload,
18 StalenessScore, StalenessTier, Store,
19};
20use crate::health::staleness::StalenessAnalyzer;
21
22pub fn now_secs() -> u64 {
25 SystemTime::now()
26 .duration_since(UNIX_EPOCH)
27 .unwrap_or_default()
28 .as_secs()
29}
30
31pub fn today_key(prefix: &str) -> String {
32 let now = chrono::Utc::now().format("%Y-%m-%d");
33 format!("{prefix}{now}")
34}
35
36pub fn session_record(key: &str, value: String) -> Record {
37 let now = now_secs();
38 Record {
39 key: key.to_string(),
40 value,
41 category: Category::Session,
42 priority: Priority::Normal,
43 tags: vec![],
44 created_at: now,
45 updated_at: now,
46 ref_url: None,
47 staleness: StalenessScore::fresh(),
48 lifecycle: RecordLifecycle::Active,
49 version: RecordVersion {
50 device_id: crate::store::stable_device_id(),
51 logical_clock: 1,
52 wall_clock: now,
53 },
54 quality: QualityScore::layer0_default(),
55 access_count: 0,
56 last_accessed: 0,
57 source: RecordSource::SessionHook,
58 confidence: ConfidenceScore::for_new_record(&RecordSource::SessionHook),
59 gap_analysis_score: 0.0,
60 payload: None,
61 }
62}
63
64pub fn analytics_record(key: &str, value: String) -> Record {
65 let mut r = session_record(key, value);
66 r.category = Category::Analytics;
67 r
68}
69
70#[derive(serde::Serialize, serde::Deserialize, Debug)]
72pub struct DailyAgg {
73 pub count: u64,
74 pub keys: Vec<String>,
75}
76
77pub const MAX_AGG_KEYS: usize = 100;
78
79const STALE_REVIEW_MIN: f32 = 0.4;
81const STALE_REVIEW_MAX: f32 = 0.7;
83pub const CONSULTED_RECENT_TTL_SECS: u64 = 900;
85pub const MAX_STALE_REVIEW_ENTRIES: usize = 20;
87pub const GOTCHA_PROMOTION_ACCESS_THRESHOLD: u32 = 3;
89
90pub async fn upsert_daily_agg(store: &Store, agg_key: &str, target_key: &str) -> Result<()> {
91 let now = now_secs();
92
93 match store.get(agg_key).await? {
94 Some(mut record) => {
95 let mut agg: DailyAgg = record.payload_as::<DailyAgg>().unwrap_or(DailyAgg {
96 count: 0,
97 keys: vec![],
98 });
99 agg.count += 1;
100 if agg.keys.len() < MAX_AGG_KEYS && !agg.keys.iter().any(|k| k == target_key) {
101 agg.keys.push(target_key.to_string());
102 }
103 record.payload = serde_json::to_value(&agg).ok();
104 record.updated_at = now;
105 record.version.logical_clock += 1;
106 record.version.wall_clock = now;
107 store.put(agg_key, &record).await?;
108 }
109 None => {
110 let agg = DailyAgg {
111 count: 1,
112 keys: vec![target_key.to_string()],
113 };
114 let mut record = analytics_record(agg_key, String::new());
115 record.payload = serde_json::to_value(&agg).ok();
116 store.put(agg_key, &record).await?;
117 }
118 }
119
120 Ok(())
121}
122
123pub async fn upsert_daily_agg_staged(
129 store: &Store,
130 agg_key: &str,
131 target_key: &str,
132) -> Result<(String, Vec<u8>)> {
133 let now = now_secs();
134
135 let record = match store.get(agg_key).await? {
136 Some(mut record) => {
137 let mut agg: DailyAgg = record.payload_as::<DailyAgg>().unwrap_or(DailyAgg {
138 count: 0,
139 keys: vec![],
140 });
141 agg.count += 1;
142 if agg.keys.len() < MAX_AGG_KEYS && !agg.keys.iter().any(|k| k == target_key) {
143 agg.keys.push(target_key.to_string());
144 }
145 record.payload = serde_json::to_value(&agg).ok();
146 record.updated_at = now;
147 record.version.logical_clock += 1;
148 record.version.wall_clock = now;
149 record
150 }
151 None => {
152 let agg = DailyAgg {
153 count: 1,
154 keys: vec![target_key.to_string()],
155 };
156 let mut record = analytics_record(agg_key, String::new());
157 record.payload = serde_json::to_value(&agg).ok();
158 record
159 }
160 };
161
162 let bytes = rmp_serde::to_vec_named(&record)
163 .with_context(|| format!("failed to serialize agg record for {agg_key}"))?;
164 Ok((agg_key.to_string(), bytes))
165}
166
167fn receipt_key(key: &str, actor: Option<&str>) -> String {
168 match actor {
169 Some(a) => format!("session:consulted:{a}:{key}"),
170 None => format!("session:consulted:{key}"),
171 }
172}
173
174pub fn consultation_receipt_staged(key: &str, actor: Option<&str>) -> Result<(String, Vec<u8>)> {
179 let consulted_key = receipt_key(key, actor);
180 let record = session_record(&consulted_key, String::new());
181 let bytes = rmp_serde::to_vec_named(&record)
182 .with_context(|| format!("failed to serialize consulted receipt for {consulted_key}"))?;
183 Ok((consulted_key, bytes))
184}
185
186pub async fn session_flush_staged(store: &Store) -> Result<Option<(String, Vec<u8>)>> {
190 let now = now_secs();
191 let consulted_keys = store.scan_keys("session:consulted:").await?;
192 let stripped: Vec<String> = consulted_keys
193 .iter()
194 .map(|k| {
195 k.strip_prefix("session:consulted:")
196 .unwrap_or(k)
197 .to_string()
198 })
199 .collect();
200
201 let session_data = serde_json::json!({
202 "consulted_keys": stripped,
203 "flushed_at": now,
204 });
205 let mut rec = session_record("session:current", String::new());
206 rec.payload = Some(session_data);
207 let bytes = rmp_serde::to_vec_named(&rec)?;
208 Ok(Some(("session:current".to_string(), bytes)))
209}
210
211pub async fn log_hit(store: &Store, key: &str) -> Result<()> {
215 let now = now_secs();
216
217 let agg_key = today_key("analytics:hit_");
219 upsert_daily_agg(store, &agg_key, key).await?;
220
221 let consulted_key = receipt_key(key, None);
223 store
224 .put(
225 &consulted_key,
226 &session_record(&consulted_key, String::new()),
227 )
228 .await?;
229
230 if let Some(mut record) = store.get(key).await? {
232 record.access_count += 1;
233 record.last_accessed = now;
234 store.put(key, &record).await?;
235 }
236
237 let _ = crate::store::enforcement::record_event(
246 store,
247 crate::store::enforcement::EnforcementEventType::ReceiptMinted,
248 crate::store::enforcement::SubjectKind::File,
249 key.to_string(),
250 "claude".to_string(),
251 None,
252 "consultation_requested".to_string(),
253 None,
254 )
255 .await;
256
257 Ok(())
258}
259
260pub async fn log_miss(store: &Store, key: &str) -> Result<()> {
264 let agg_key = today_key("analytics:miss_");
265 upsert_daily_agg(store, &agg_key, key).await
266}
267
268pub async fn log_compliance_miss(store: &Store, key: &str) -> Result<()> {
272 let agg_key = today_key("compliance:miss_");
273 upsert_daily_agg(store, &agg_key, key).await
274}
275
276pub async fn log_compliance_hit(store: &Store, key: &str) -> Result<()> {
280 let agg_key = today_key("compliance:allow_after_receipt_");
281 upsert_daily_agg(store, &agg_key, key).await
282}
283
284pub async fn log_codex_shell_miss(store: &Store, key: &str) -> Result<()> {
286 let agg_key = today_key("compliance:codex_shell_miss_");
287 upsert_daily_agg(store, &agg_key, key).await
288}
289
290pub async fn log_prompt_nudge(store: &Store, key: &str) -> Result<()> {
292 let agg_key = today_key("analytics:codex_prompt_nudge_");
293 upsert_daily_agg(store, &agg_key, key).await
294}
295
296pub async fn log_bootstrap(store: &Store, key: &str) -> Result<()> {
298 let agg_key = today_key("analytics:bootstrap_");
299 upsert_daily_agg(store, &agg_key, key).await
300}
301
302pub async fn check_consulted(store: &Store, key: &str, actor: Option<&str>) -> Result<bool> {
310 let consulted_key = receipt_key(key, actor);
311 Ok(store.get(&consulted_key).await?.is_some())
312}
313
314pub async fn check_consulted_recent(
321 store: &Store,
322 key: &str,
323 ttl_secs: u64,
324 actor: Option<&str>,
325) -> Result<bool> {
326 let consulted_key = receipt_key(key, actor);
327 let Some(record) = store.get(&consulted_key).await? else {
328 return Ok(false);
329 };
330 let age = now_secs().saturating_sub(record.updated_at);
331 Ok(age <= ttl_secs)
332}
333
334pub async fn session_flush(store: &Store) -> Result<()> {
338 let now = now_secs();
339
340 let consulted_keys = store.scan_keys("session:consulted:").await?;
341 let stripped: Vec<String> = consulted_keys
342 .iter()
343 .map(|k| {
344 k.strip_prefix("session:consulted:")
345 .unwrap_or(k)
346 .to_string()
347 })
348 .collect();
349
350 let session_data = serde_json::json!({
351 "consulted_keys": stripped,
352 "flushed_at": now,
353 });
354 let mut rec = session_record("session:current", String::new());
355 rec.payload = Some(session_data);
356 store.put("session:current", &rec).await?;
357 Ok(())
358}
359
360async fn delete_all_receipts(store: &Store) -> Result<()> {
366 let consulted_keys = store.scan_keys("session:consulted:").await?;
367 for k in &consulted_keys {
368 store.delete(k).await?;
369 }
370 Ok(())
371}
372
373pub async fn session_clear_consults(store: &Store) -> Result<()> {
379 delete_all_receipts(store).await
380}
381
382pub async fn session_harvest(store: &Store, cwd: &Path) -> Result<()> {
390 let now = now_secs();
391
392 match promote_gotcha_candidates(store).await {
394 Ok(n) if n > 0 => tracing::info!(promoted = n, "gotcha candidates auto-promoted"),
395 Ok(_) => {}
396 Err(e) => tracing::warn!(error = %e, "gotcha promotion failed"),
397 }
398
399 match StalenessAnalyzer::new(cwd).analyze_all(store).await {
401 Ok(report) if report.updated > 0 => {
402 tracing::info!(
403 scanned = report.scanned,
404 updated = report.updated,
405 tombstoned = report.tombstoned,
406 liability = report.liability,
407 "staleness analysis complete"
408 );
409 }
410 Ok(_) => {}
411 Err(e) => tracing::warn!(error = %e, "staleness analysis failed"),
412 }
413
414 let session_rec = match store.get("session:current").await? {
416 Some(r) => r,
417 None => return Ok(()),
418 };
419
420 let session_value = match session_rec.payload.as_ref() {
421 Some(p) => serde_json::to_string(p).unwrap_or_default(),
422 None => session_rec.value.clone(),
423 };
424
425 match collect_and_store_stale_reviews(store, &session_value, now).await {
427 Ok(n) if n > 0 => tracing::info!(entries = n, "stale review entries collected"),
428 Ok(_) => {}
429 Err(e) => tracing::warn!(error = %e, "stale review collection failed"),
430 }
431
432 let session_key = format!("session:{now}");
434 let mut perm = session_record(&session_key, session_value);
435 perm.payload = session_rec.payload;
436 store.put(&session_key, &perm).await?;
437
438 delete_all_receipts(store).await?;
440
441 if let Some(mut stage) = store.get("stage:current").await? {
443 stage.updated_at = now;
444 stage.version.logical_clock += 1;
445 stage.version.wall_clock = now;
446 let base = stage
447 .value
448 .lines()
449 .filter(|l| !l.starts_with("last_session:"))
450 .collect::<Vec<_>>()
451 .join("\n");
452 stage.value = if base.is_empty() {
453 format!("last_session: {session_key}")
454 } else {
455 format!("{base}\nlast_session: {session_key}")
456 };
457 store.put("stage:current", &stage).await?;
458 }
459
460 Ok(())
461}
462
463pub async fn session_harvest_no_staleness(store: &Store) -> Result<()> {
469 let now = now_secs();
470
471 match promote_gotcha_candidates(store).await {
473 Ok(n) if n > 0 => tracing::info!(promoted = n, "gotcha candidates auto-promoted"),
474 Ok(_) => {}
475 Err(e) => tracing::warn!(error = %e, "gotcha promotion failed"),
476 }
477
478 let session_rec = match store.get("session:current").await? {
480 Some(r) => r,
481 None => return Ok(()),
482 };
483
484 let session_value = match session_rec.payload.as_ref() {
485 Some(p) => serde_json::to_string(p).unwrap_or_default(),
486 None => session_rec.value.clone(),
487 };
488
489 match collect_and_store_stale_reviews(store, &session_value, now).await {
491 Ok(n) if n > 0 => tracing::info!(entries = n, "stale review entries collected"),
492 Ok(_) => {}
493 Err(e) => tracing::warn!(error = %e, "stale review collection failed"),
494 }
495
496 let session_key = format!("session:{now}");
498 let mut perm = session_record(&session_key, session_value);
499 perm.payload = session_rec.payload;
500 store.put(&session_key, &perm).await?;
501
502 delete_all_receipts(store).await?;
504
505 if let Some(mut stage) = store.get("stage:current").await? {
507 stage.updated_at = now;
508 stage.version.logical_clock += 1;
509 stage.version.wall_clock = now;
510 let base = stage
511 .value
512 .lines()
513 .filter(|l| !l.starts_with("last_session:"))
514 .collect::<Vec<_>>()
515 .join("\n");
516 stage.value = if base.is_empty() {
517 format!("last_session: {session_key}")
518 } else {
519 format!("{base}\nlast_session: {session_key}")
520 };
521 store.put("stage:current", &stage).await?;
522 }
523
524 Ok(())
525}
526
527pub async fn doc_capture(store: &Store, path: &str, content: &str) -> Result<()> {
534 let purpose = extract_doc_comment(path, content);
535 if purpose.is_empty() {
536 return Ok(());
537 }
538
539 let file_key = format!("file:{path}");
540 let mut record = match store.get(&file_key).await? {
541 Some(r) => r,
542 None => return Ok(()),
543 };
544
545 if record.source != RecordSource::StaticAnalysis {
548 return Ok(());
549 }
550
551 if let Some(mut fr) = record.payload_as::<FileRecord>() {
552 fr.purpose = purpose.clone();
553 record.payload = serde_json::to_value(&fr).ok();
554 } else {
555 return Ok(());
556 }
557
558 let now = now_secs();
559 record.value = purpose;
560 record.source = RecordSource::SessionHook;
561 record.confidence.value = 0.65;
562 record.quality = QualityScore::doc_comment_default();
563 record.updated_at = now;
564 record.version.logical_clock += 1;
565 record.version.wall_clock = now;
566
567 if let Err(e) = store.put(&file_key, &record).await {
568 tracing::warn!(path, "doc-capture put failed: {e}");
569 }
570 Ok(())
571}
572
573pub fn extract_doc_comment(path: &str, content: &str) -> String {
576 let ext = std::path::Path::new(path)
577 .extension()
578 .and_then(|e| e.to_str())
579 .unwrap_or("");
580
581 match ext {
582 "rs" => extract_rust_module_doc(content),
583 "py" => extract_python_docstring(content),
584 "go" => extract_go_package_doc_comment(content),
585 "ts" | "tsx" | "js" | "jsx" | "mjs" | "cjs" => extract_jsdoc(content),
586 _ => String::new(),
587 }
588}
589
590fn extract_rust_module_doc(content: &str) -> String {
591 let lines: Vec<&str> = content
592 .lines()
593 .take_while(|l| l.trim_start().starts_with("//!"))
594 .map(|l| l.trim_start().trim_start_matches("//!").trim())
595 .collect();
596 lines.join(" ").trim().to_string()
597}
598
599fn extract_python_docstring(content: &str) -> String {
600 let trimmed = content.trim_start();
601 for delim in &[r#"""""#, "'''"] {
602 if let Some(rest) = trimmed.strip_prefix(delim) {
603 if let Some(end) = rest.find(delim) {
604 return rest[..end]
605 .trim()
606 .lines()
607 .next()
608 .unwrap_or("")
609 .trim()
610 .to_string();
611 }
612 }
613 }
614 String::new()
615}
616
617fn extract_go_package_doc_comment(content: &str) -> String {
618 let mut lines: Vec<String> = Vec::new();
619 for line in content.lines() {
620 let t = line.trim();
621 if t.starts_with("//") {
622 lines.push(t.trim_start_matches("//").trim().to_string());
623 } else if t.starts_with("package ") {
624 break;
625 } else if !t.is_empty() {
626 lines.clear();
627 }
628 }
629 lines.join(" ").trim().to_string()
630}
631
632fn extract_jsdoc(content: &str) -> String {
633 let trimmed = content.trim_start();
634 if let Some(rest) = trimmed.strip_prefix("/**") {
635 if let Some(end) = rest.find("*/") {
636 let text: Vec<&str> = rest[..end]
637 .lines()
638 .map(|l| l.trim().trim_start_matches('*').trim())
639 .filter(|l| !l.is_empty())
640 .collect();
641 return text.join(" ").trim().to_string();
642 }
643 }
644 String::new()
645}
646
647pub async fn promote_gotcha_candidates(store: &Store) -> Result<u32> {
650 let gotchas = store.scan_prefix("gotcha:").await?;
651 let now = now_secs();
652 let mut promoted = 0u32;
653
654 for mut record in gotchas {
655 if record.access_count < GOTCHA_PROMOTION_ACCESS_THRESHOLD {
656 continue;
657 }
658 let mut gotcha: GotchaRecord = match record.payload_as::<GotchaRecord>() {
659 Some(g) => g,
660 None => continue,
661 };
662 if gotcha.confirmed {
663 continue;
664 }
665 gotcha.confirmed = true;
666 record.payload = serde_json::to_value(&gotcha).ok();
667 record.confidence.confirmation_count += 1;
670 record.updated_at = now;
671 record.version.logical_clock += 1;
672 record.version.wall_clock = now;
673 store.put(&record.key, &record).await?;
674 promoted += 1;
675 }
676
677 Ok(promoted)
678}
679
680pub fn format_review_date(now_secs: u64) -> String {
683 let dt = chrono::DateTime::from_timestamp(now_secs as i64, 0).unwrap_or_else(chrono::Utc::now);
684 dt.format("%Y-%m-%d").to_string()
685}
686
687pub async fn collect_and_store_stale_reviews(
688 store: &Store,
689 session_value: &str,
690 now: u64,
691) -> Result<usize> {
692 let session: serde_json::Value = serde_json::from_str(session_value)?;
693 let consulted_keys = match session["consulted_keys"].as_array() {
694 Some(arr) => arr
695 .iter()
696 .filter_map(|v| v.as_str().map(|s| s.to_string()))
697 .collect::<Vec<_>>(),
698 None => return Ok(0),
699 };
700 if consulted_keys.is_empty() {
701 return Ok(0);
702 }
703
704 let new_entries = collect_stale_entries(store, &consulted_keys).await?;
705 if new_entries.is_empty() {
706 return Ok(0);
707 }
708
709 let date = format_review_date(now);
710 let review_key = format!("analytics:stale_review_{date}");
711 let new_count = new_entries.len();
712
713 let mut payload = match store.get(&review_key).await? {
714 Some(existing) => {
715 existing
716 .payload_as::<StaleReviewPayload>()
717 .unwrap_or(StaleReviewPayload {
718 session_timestamp: now,
719 entries: vec![],
720 })
721 }
722 None => StaleReviewPayload {
723 session_timestamp: now,
724 entries: vec![],
725 },
726 };
727
728 let mut seen_keys = std::collections::HashSet::new();
730 let mut merged = Vec::new();
731 for entry in new_entries {
732 if seen_keys.insert(entry.key.clone()) {
733 merged.push(entry);
734 }
735 }
736 for entry in payload.entries {
737 if seen_keys.insert(entry.key.clone()) {
738 merged.push(entry);
739 }
740 }
741
742 merged.sort_by(|a, b| {
744 b.staleness_value
745 .partial_cmp(&a.staleness_value)
746 .unwrap_or(std::cmp::Ordering::Equal)
747 });
748 merged.truncate(MAX_STALE_REVIEW_ENTRIES);
749
750 payload.session_timestamp = now;
751 payload.entries = merged;
752
753 let mut record = analytics_record(&review_key, String::new());
754 record.payload = serde_json::to_value(&payload).ok();
755 store.put(&review_key, &record).await?;
756
757 Ok(new_count)
758}
759
760pub async fn collect_stale_entries(
761 store: &Store,
762 consulted_keys: &[String],
763) -> Result<Vec<StaleReviewEntry>> {
764 let mut entries = Vec::new();
765
766 for key in consulted_keys {
767 let record = match store.get(key).await? {
768 Some(r) => r,
769 None => continue,
770 };
771
772 if !matches!(record.lifecycle, RecordLifecycle::Active) {
774 continue;
775 }
776
777 if matches!(
779 record.staleness.tier,
780 StalenessTier::Liability | StalenessTier::Tombstone
781 ) {
782 continue;
783 }
784
785 if record.staleness.value < STALE_REVIEW_MIN || record.staleness.value >= STALE_REVIEW_MAX {
787 continue;
788 }
789
790 let top_signals: Vec<String> = record
791 .staleness
792 .signals
793 .iter()
794 .take(3)
795 .map(|s| s.to_string())
796 .collect();
797
798 entries.push(StaleReviewEntry {
799 key: key.clone(),
800 staleness_value: record.staleness.value,
801 tier: record.staleness.tier.clone(),
802 last_updated: record.updated_at,
803 signals: top_signals,
804 });
805 }
806
807 entries.sort_by(|a, b| {
808 b.staleness_value
809 .partial_cmp(&a.staleness_value)
810 .unwrap_or(std::cmp::Ordering::Equal)
811 });
812 entries.truncate(MAX_STALE_REVIEW_ENTRIES);
813
814 Ok(entries)
815}
816
817#[cfg(test)]
818mod tests {
819 use tempfile::TempDir;
820
821 use super::*;
822
823 async fn temp_store() -> (TempDir, Store) {
824 let dir = TempDir::new().expect("tempdir");
825 let store = Store::open(dir.path()).await.expect("open store");
826 (dir, store)
827 }
828
829 #[tokio::test]
830 async fn log_bootstrap_creates_daily_aggregate() {
831 let (_dir, store) = temp_store().await;
832
833 log_bootstrap(&store, "__bootstrap__")
834 .await
835 .expect("log bootstrap");
836
837 let key = today_key("analytics:bootstrap_");
838 let record = store
839 .get(&key)
840 .await
841 .expect("get bootstrap aggregate")
842 .expect("bootstrap record exists");
843 let agg = record.payload_as::<DailyAgg>().expect("daily agg payload");
844 assert_eq!(agg.count, 1);
845 assert_eq!(agg.keys, vec!["__bootstrap__".to_string()]);
846 }
847
848 #[tokio::test]
849 async fn check_consulted_recent_uses_receipt_ttl() {
850 let (_dir, store) = temp_store().await;
851 let key = "file:src/main.rs";
852
853 assert!(!check_consulted_recent(&store, key, 900, None)
854 .await
855 .expect("no receipt yet"));
856
857 log_hit(&store, key).await.expect("log consultation hit");
858
859 assert!(check_consulted_recent(&store, key, 900, None)
860 .await
861 .expect("fresh receipt should be valid"));
862 }
863
864 #[tokio::test]
865 async fn consult_receipt_is_actor_scoped_when_actor_present() {
866 let (_dir, store) = temp_store().await;
867
868 let (k, v) = consultation_receipt_staged("file:x", Some("agentA")).unwrap();
870 store.transact_sessions_raw(&[(&k, &v)]).await.unwrap();
871
872 let keys = store.scan_keys("session:consulted:").await.unwrap();
873 assert!(
874 keys.iter().any(|k| k == "session:consulted:agentA:file:x"),
875 "actor-scoped key must be present, got: {keys:?}"
876 );
877 assert!(
878 !keys.iter().any(|k| k == "session:consulted:file:x"),
879 "global key must NOT be written by actor-scoped call, got: {keys:?}"
880 );
881
882 let (k2, v2) = consultation_receipt_staged("file:x", None).unwrap();
884 store.transact_sessions_raw(&[(&k2, &v2)]).await.unwrap();
885
886 let keys2 = store.scan_keys("session:consulted:").await.unwrap();
887 assert!(
888 keys2.iter().any(|k| k == "session:consulted:file:x"),
889 "global key must be present with actor=None, got: {keys2:?}"
890 );
891 }
892
893 #[tokio::test]
894 async fn gate_requires_actor_scoped_receipt_for_subagent() {
895 let (_dir, store) = temp_store().await;
896
897 let (k, v) = consultation_receipt_staged("file:x", Some("agentA")).unwrap();
899 store.transact_sessions_raw(&[(&k, &v)]).await.unwrap();
900
901 assert!(
903 check_consulted_recent(&store, "file:x", 900, Some("agentA"))
904 .await
905 .expect("agentA receipt lookup"),
906 "agentA should see its own actor-scoped receipt"
907 );
908
909 assert!(
911 !check_consulted_recent(&store, "file:x", 900, Some("agentB"))
912 .await
913 .expect("agentB receipt lookup"),
914 "agentB must NOT ride agentA's receipt"
915 );
916
917 let (k2, v2) = consultation_receipt_staged("file:y", None).unwrap();
919 store.transact_sessions_raw(&[(&k2, &v2)]).await.unwrap();
920
921 assert!(
923 check_consulted_recent(&store, "file:y", 900, None)
924 .await
925 .expect("global receipt lookup"),
926 "main thread must still see the global receipt"
927 );
928
929 assert!(
931 !check_consulted_recent(&store, "file:y", 900, Some("agentA"))
932 .await
933 .expect("agentA vs global receipt lookup"),
934 "subagent must NOT ride the global main-thread receipt"
935 );
936 }
937
938 #[tokio::test]
939 async fn session_clear_consults_deletes_all_receipts() {
940 let (_dir, store) = temp_store().await;
941 let key1 = "file:src/main.rs";
942 let key2 = "file:src/lib.rs";
943
944 log_hit(&store, key1).await.expect("log first hit");
945 log_hit(&store, key2).await.expect("log second hit");
946
947 let before = store
949 .scan_keys("session:consulted:")
950 .await
951 .expect("scan before");
952 assert_eq!(before.len(), 2, "expected two receipts before clear");
953
954 session_clear_consults(&store)
955 .await
956 .expect("clear_consults should succeed");
957
958 let after = store
959 .scan_keys("session:consulted:")
960 .await
961 .expect("scan after");
962 assert!(after.is_empty(), "all receipts should be gone after clear");
963 }
964}