1use std::path::Path;
11use std::time::{SystemTime, UNIX_EPOCH};
12
13use anyhow::{Context, Result};
14
15use super::{
16 Category, ConfidenceScore, FileRecord, GotchaRecord, Priority, QualityScore, Record,
17 RecordLifecycle, RecordSource, RecordVersion, StaleReviewEntry, StaleReviewPayload,
18 StalenessScore, StalenessTier, Store,
19};
20use crate::health::staleness::StalenessAnalyzer;
21
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, `0` is returned
/// instead of an error.
pub fn now_secs() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        // Clock is set before 1970: fall back to the zero duration.
        Err(_) => 0,
    }
}
30
31pub fn today_key(prefix: &str) -> String {
32 let now = chrono::Utc::now().format("%Y-%m-%d");
33 format!("{prefix}{now}")
34}
35
36pub fn session_record(key: &str, value: String) -> Record {
37 let now = now_secs();
38 Record {
39 key: key.to_string(),
40 value,
41 category: Category::Session,
42 priority: Priority::Normal,
43 tags: vec![],
44 created_at: now,
45 updated_at: now,
46 ref_url: None,
47 staleness: StalenessScore::fresh(),
48 lifecycle: RecordLifecycle::Active,
49 version: RecordVersion {
50 device_id: uuid::Uuid::new_v4(),
51 logical_clock: 1,
52 wall_clock: now,
53 },
54 quality: QualityScore::layer0_default(),
55 access_count: 0,
56 last_accessed: 0,
57 source: RecordSource::SessionHook,
58 confidence: ConfidenceScore::for_new_record(&RecordSource::SessionHook),
59 gap_analysis_score: 0.0,
60 payload: None,
61 }
62}
63
64pub fn analytics_record(key: &str, value: String) -> Record {
65 let mut r = session_record(key, value);
66 r.category = Category::Analytics;
67 r
68}
69
70#[derive(serde::Serialize, serde::Deserialize, Debug)]
72pub struct DailyAgg {
73 pub count: u64,
74 pub keys: Vec<String>,
75}
76
/// Upper bound on how many distinct keys a [`DailyAgg`] keeps track of.
pub const MAX_AGG_KEYS: usize = 100;

/// Inclusive lower bound of the staleness band collected for review.
const STALE_REVIEW_MIN: f32 = 0.4;

/// Exclusive upper bound of the staleness band collected for review.
const STALE_REVIEW_MAX: f32 = 0.7;

/// Consultation-receipt lifetime in seconds (15 minutes).
// NOTE(review): presumably the default `ttl_secs` for `check_consulted_recent` — confirm at call sites.
pub const CONSULTED_RECENT_TTL_SECS: u64 = 15 * 60;

/// Maximum number of entries kept in a stale-review list.
pub const MAX_STALE_REVIEW_ENTRIES: usize = 20;

/// Access count a gotcha record must reach before it is auto-confirmed.
pub const GOTCHA_PROMOTION_ACCESS_THRESHOLD: u32 = 3;
89
90pub async fn upsert_daily_agg(store: &Store, agg_key: &str, target_key: &str) -> Result<()> {
91 let now = now_secs();
92
93 match store.get(agg_key).await? {
94 Some(mut record) => {
95 let mut agg: DailyAgg = record.payload_as::<DailyAgg>().unwrap_or(DailyAgg {
96 count: 0,
97 keys: vec![],
98 });
99 agg.count += 1;
100 if agg.keys.len() < MAX_AGG_KEYS && !agg.keys.iter().any(|k| k == target_key) {
101 agg.keys.push(target_key.to_string());
102 }
103 record.payload = serde_json::to_value(&agg).ok();
104 record.updated_at = now;
105 record.version.logical_clock += 1;
106 record.version.wall_clock = now;
107 store.put(agg_key, &record).await?;
108 }
109 None => {
110 let agg = DailyAgg {
111 count: 1,
112 keys: vec![target_key.to_string()],
113 };
114 let mut record = analytics_record(agg_key, String::new());
115 record.payload = serde_json::to_value(&agg).ok();
116 store.put(agg_key, &record).await?;
117 }
118 }
119
120 Ok(())
121}
122
123pub async fn upsert_daily_agg_staged(
129 store: &Store,
130 agg_key: &str,
131 target_key: &str,
132) -> Result<(String, Vec<u8>)> {
133 let now = now_secs();
134
135 let record = match store.get(agg_key).await? {
136 Some(mut record) => {
137 let mut agg: DailyAgg = record.payload_as::<DailyAgg>().unwrap_or(DailyAgg {
138 count: 0,
139 keys: vec![],
140 });
141 agg.count += 1;
142 if agg.keys.len() < MAX_AGG_KEYS && !agg.keys.iter().any(|k| k == target_key) {
143 agg.keys.push(target_key.to_string());
144 }
145 record.payload = serde_json::to_value(&agg).ok();
146 record.updated_at = now;
147 record.version.logical_clock += 1;
148 record.version.wall_clock = now;
149 record
150 }
151 None => {
152 let agg = DailyAgg {
153 count: 1,
154 keys: vec![target_key.to_string()],
155 };
156 let mut record = analytics_record(agg_key, String::new());
157 record.payload = serde_json::to_value(&agg).ok();
158 record
159 }
160 };
161
162 let bytes = rmp_serde::to_vec_named(&record)
163 .with_context(|| format!("failed to serialize agg record for {agg_key}"))?;
164 Ok((agg_key.to_string(), bytes))
165}
166
167pub fn consultation_receipt_staged(key: &str) -> Result<(String, Vec<u8>)> {
171 let consulted_key = format!("session:consulted:{key}");
172 let record = session_record(&consulted_key, String::new());
173 let bytes = rmp_serde::to_vec_named(&record)
174 .with_context(|| format!("failed to serialize consulted receipt for {consulted_key}"))?;
175 Ok((consulted_key, bytes))
176}
177
178pub async fn session_flush_staged(store: &Store) -> Result<Option<(String, Vec<u8>)>> {
182 let now = now_secs();
183 let consulted_keys = store.scan_keys("session:consulted:").await?;
184 let stripped: Vec<String> = consulted_keys
185 .iter()
186 .map(|k| {
187 k.strip_prefix("session:consulted:")
188 .unwrap_or(k)
189 .to_string()
190 })
191 .collect();
192
193 let session_data = serde_json::json!({
194 "consulted_keys": stripped,
195 "flushed_at": now,
196 });
197 let mut rec = session_record("session:current", String::new());
198 rec.payload = Some(session_data);
199 let bytes = rmp_serde::to_vec_named(&rec)?;
200 Ok(Some(("session:current".to_string(), bytes)))
201}
202
203pub async fn log_hit(store: &Store, key: &str) -> Result<()> {
207 let now = now_secs();
208
209 let agg_key = today_key("analytics:hit_");
211 upsert_daily_agg(store, &agg_key, key).await?;
212
213 let consulted_key = format!("session:consulted:{key}");
215 store
216 .put(
217 &consulted_key,
218 &session_record(&consulted_key, String::new()),
219 )
220 .await?;
221
222 if let Some(mut record) = store.get(key).await? {
224 record.access_count += 1;
225 record.last_accessed = now;
226 store.put(key, &record).await?;
227 }
228
229 let _ = crate::store::enforcement::record_event(
238 store,
239 crate::store::enforcement::EnforcementEventType::ReceiptMinted,
240 crate::store::enforcement::SubjectKind::File,
241 key.to_string(),
242 "claude".to_string(),
243 None,
244 "consultation_requested".to_string(),
245 None,
246 )
247 .await;
248
249 Ok(())
250}
251
252pub async fn log_miss(store: &Store, key: &str) -> Result<()> {
256 let agg_key = today_key("analytics:miss_");
257 upsert_daily_agg(store, &agg_key, key).await
258}
259
260pub async fn log_compliance_miss(store: &Store, key: &str) -> Result<()> {
264 let agg_key = today_key("compliance:miss_");
265 upsert_daily_agg(store, &agg_key, key).await
266}
267
268pub async fn log_compliance_hit(store: &Store, key: &str) -> Result<()> {
272 let agg_key = today_key("compliance:allow_after_receipt_");
273 upsert_daily_agg(store, &agg_key, key).await
274}
275
276pub async fn log_codex_shell_miss(store: &Store, key: &str) -> Result<()> {
278 let agg_key = today_key("compliance:codex_shell_miss_");
279 upsert_daily_agg(store, &agg_key, key).await
280}
281
282pub async fn log_prompt_nudge(store: &Store, key: &str) -> Result<()> {
284 let agg_key = today_key("analytics:codex_prompt_nudge_");
285 upsert_daily_agg(store, &agg_key, key).await
286}
287
288pub async fn log_bootstrap(store: &Store, key: &str) -> Result<()> {
290 let agg_key = today_key("analytics:bootstrap_");
291 upsert_daily_agg(store, &agg_key, key).await
292}
293
294pub async fn check_consulted(store: &Store, key: &str) -> Result<bool> {
298 let consulted_key = format!("session:consulted:{key}");
299 Ok(store.get(&consulted_key).await?.is_some())
300}
301
302pub async fn check_consulted_recent(store: &Store, key: &str, ttl_secs: u64) -> Result<bool> {
304 let consulted_key = format!("session:consulted:{key}");
305 let Some(record) = store.get(&consulted_key).await? else {
306 return Ok(false);
307 };
308 let age = now_secs().saturating_sub(record.updated_at);
309 Ok(age <= ttl_secs)
310}
311
312pub async fn session_flush(store: &Store) -> Result<()> {
316 let now = now_secs();
317
318 let consulted_keys = store.scan_keys("session:consulted:").await?;
319 let stripped: Vec<String> = consulted_keys
320 .iter()
321 .map(|k| {
322 k.strip_prefix("session:consulted:")
323 .unwrap_or(k)
324 .to_string()
325 })
326 .collect();
327
328 let session_data = serde_json::json!({
329 "consulted_keys": stripped,
330 "flushed_at": now,
331 });
332 let mut rec = session_record("session:current", String::new());
333 rec.payload = Some(session_data);
334 store.put("session:current", &rec).await?;
335 Ok(())
336}
337
338pub async fn session_harvest(store: &Store, cwd: &Path) -> Result<()> {
346 let now = now_secs();
347
348 match promote_gotcha_candidates(store).await {
350 Ok(n) if n > 0 => tracing::info!(promoted = n, "gotcha candidates auto-promoted"),
351 Ok(_) => {}
352 Err(e) => tracing::warn!(error = %e, "gotcha promotion failed"),
353 }
354
355 match StalenessAnalyzer::new(cwd).analyze_all(store).await {
357 Ok(report) if report.updated > 0 => {
358 tracing::info!(
359 scanned = report.scanned,
360 updated = report.updated,
361 tombstoned = report.tombstoned,
362 liability = report.liability,
363 "staleness analysis complete"
364 );
365 }
366 Ok(_) => {}
367 Err(e) => tracing::warn!(error = %e, "staleness analysis failed"),
368 }
369
370 let session_rec = match store.get("session:current").await? {
372 Some(r) => r,
373 None => return Ok(()),
374 };
375
376 let session_value = match session_rec.payload.as_ref() {
377 Some(p) => serde_json::to_string(p).unwrap_or_default(),
378 None => session_rec.value.clone(),
379 };
380
381 match collect_and_store_stale_reviews(store, &session_value, now).await {
383 Ok(n) if n > 0 => tracing::info!(entries = n, "stale review entries collected"),
384 Ok(_) => {}
385 Err(e) => tracing::warn!(error = %e, "stale review collection failed"),
386 }
387
388 let session_key = format!("session:{now}");
390 let mut perm = session_record(&session_key, session_value);
391 perm.payload = session_rec.payload;
392 store.put(&session_key, &perm).await?;
393
394 let consulted_keys = store.scan_keys("session:consulted:").await?;
396 for k in &consulted_keys {
397 store.delete(k).await?;
398 }
399
400 if let Some(mut stage) = store.get("stage:current").await? {
402 stage.updated_at = now;
403 stage.version.logical_clock += 1;
404 stage.version.wall_clock = now;
405 let base = stage
406 .value
407 .lines()
408 .filter(|l| !l.starts_with("last_session:"))
409 .collect::<Vec<_>>()
410 .join("\n");
411 stage.value = if base.is_empty() {
412 format!("last_session: {session_key}")
413 } else {
414 format!("{base}\nlast_session: {session_key}")
415 };
416 store.put("stage:current", &stage).await?;
417 }
418
419 Ok(())
420}
421
422pub async fn session_harvest_no_staleness(store: &Store) -> Result<()> {
428 let now = now_secs();
429
430 match promote_gotcha_candidates(store).await {
432 Ok(n) if n > 0 => tracing::info!(promoted = n, "gotcha candidates auto-promoted"),
433 Ok(_) => {}
434 Err(e) => tracing::warn!(error = %e, "gotcha promotion failed"),
435 }
436
437 let session_rec = match store.get("session:current").await? {
439 Some(r) => r,
440 None => return Ok(()),
441 };
442
443 let session_value = match session_rec.payload.as_ref() {
444 Some(p) => serde_json::to_string(p).unwrap_or_default(),
445 None => session_rec.value.clone(),
446 };
447
448 match collect_and_store_stale_reviews(store, &session_value, now).await {
450 Ok(n) if n > 0 => tracing::info!(entries = n, "stale review entries collected"),
451 Ok(_) => {}
452 Err(e) => tracing::warn!(error = %e, "stale review collection failed"),
453 }
454
455 let session_key = format!("session:{now}");
457 let mut perm = session_record(&session_key, session_value);
458 perm.payload = session_rec.payload;
459 store.put(&session_key, &perm).await?;
460
461 let consulted_keys = store.scan_keys("session:consulted:").await?;
463 for k in &consulted_keys {
464 store.delete(k).await?;
465 }
466
467 if let Some(mut stage) = store.get("stage:current").await? {
469 stage.updated_at = now;
470 stage.version.logical_clock += 1;
471 stage.version.wall_clock = now;
472 let base = stage
473 .value
474 .lines()
475 .filter(|l| !l.starts_with("last_session:"))
476 .collect::<Vec<_>>()
477 .join("\n");
478 stage.value = if base.is_empty() {
479 format!("last_session: {session_key}")
480 } else {
481 format!("{base}\nlast_session: {session_key}")
482 };
483 store.put("stage:current", &stage).await?;
484 }
485
486 Ok(())
487}
488
489pub async fn doc_capture(store: &Store, path: &str, content: &str) -> Result<()> {
496 let purpose = extract_doc_comment(path, content);
497 if purpose.is_empty() {
498 return Ok(());
499 }
500
501 let file_key = format!("file:{path}");
502 let mut record = match store.get(&file_key).await? {
503 Some(r) => r,
504 None => return Ok(()),
505 };
506
507 if record.source != RecordSource::StaticAnalysis {
510 return Ok(());
511 }
512
513 if let Some(mut fr) = record.payload_as::<FileRecord>() {
514 fr.purpose = purpose.clone();
515 record.payload = serde_json::to_value(&fr).ok();
516 } else {
517 return Ok(());
518 }
519
520 let now = now_secs();
521 record.value = purpose;
522 record.source = RecordSource::SessionHook;
523 record.confidence.value = 0.65;
524 record.quality = QualityScore::doc_comment_default();
525 record.updated_at = now;
526 record.version.logical_clock += 1;
527 record.version.wall_clock = now;
528
529 if let Err(e) = store.put(&file_key, &record).await {
530 tracing::warn!(path, "doc-capture put failed: {e}");
531 }
532 Ok(())
533}
534
535pub fn extract_doc_comment(path: &str, content: &str) -> String {
538 let ext = std::path::Path::new(path)
539 .extension()
540 .and_then(|e| e.to_str())
541 .unwrap_or("");
542
543 match ext {
544 "rs" => extract_rust_module_doc(content),
545 "py" => extract_python_docstring(content),
546 "go" => extract_go_package_doc_comment(content),
547 "ts" | "tsx" | "js" | "jsx" | "mjs" | "cjs" => extract_jsdoc(content),
548 _ => String::new(),
549 }
550}
551
/// Extracts the leading `//!` module documentation from Rust source.
///
/// Blank lines before the first `//!` line are skipped. The doc block ends at
/// the first line that does not start with `//!` (after leading whitespace).
/// Each line's text is trimmed, empty doc lines are dropped, and the rest is
/// joined with single spaces. Returns an empty string when the file does not
/// open with module docs.
fn extract_rust_module_doc(content: &str) -> String {
    content
        .lines()
        // Tolerate leading blank lines, as the Python and JSDoc extractors do.
        .skip_while(|line| line.trim().is_empty())
        .map_while(|line| line.trim_start().strip_prefix("//!"))
        .map(str::trim)
        // A bare `//!` separator line must not produce a double space.
        .filter(|text| !text.is_empty())
        .collect::<Vec<_>>()
        .join(" ")
}
560
/// Extracts the first line of a Python module docstring.
///
/// The source (ignoring leading whitespace) must open with `"""` or `'''`
/// and contain the matching closing delimiter. The docstring body is trimmed
/// and only its first line, itself trimmed, is returned. Anything else
/// yields an empty string.
fn extract_python_docstring(content: &str) -> String {
    let source = content.trim_start();

    [r#"""""#, "'''"]
        .iter()
        .find_map(|quote| {
            let after_open = source.strip_prefix(*quote)?;
            let close = after_open.find(*quote)?;
            let summary = after_open[..close]
                .trim()
                .lines()
                .next()
                .unwrap_or("")
                .trim();
            Some(summary.to_string())
        })
        .unwrap_or_default()
}
578
/// Extracts the `//` comment block that directly precedes the `package`
/// clause of a Go source file.
///
/// Only comment lines immediately adjacent to the `package` line count: a
/// blank line or any other non-comment line discards what was collected so
/// far, so a licence header separated from the package clause by a blank
/// line is not mistaken for (or merged into) the package documentation.
/// Empty `//` lines are dropped and the remaining lines are joined with
/// single spaces.
fn extract_go_package_doc_comment(content: &str) -> String {
    let mut doc: Vec<&str> = Vec::new();

    for line in content.lines() {
        let t = line.trim();
        if t.starts_with("//") {
            let text = t.trim_start_matches("//").trim();
            // A bare `//` paragraph separator must not produce a double space.
            if !text.is_empty() {
                doc.push(text);
            }
        } else if t.starts_with("package ") {
            break;
        } else {
            // Blank line or other code: whatever came before is detached
            // from the package clause.
            doc.clear();
        }
    }

    doc.join(" ")
}
593
/// Extracts the text of a leading `/** ... */` block comment.
///
/// The source (ignoring leading whitespace) must start with `/**` and
/// contain a closing `*/`. Each line inside is trimmed, leading `*`
/// characters are removed, empty lines are dropped and the rest is joined
/// with single spaces. Anything else yields an empty string.
fn extract_jsdoc(content: &str) -> String {
    let after_open = match content.trim_start().strip_prefix("/**") {
        Some(rest) => rest,
        None => return String::new(),
    };
    let close = match after_open.find("*/") {
        Some(index) => index,
        None => return String::new(),
    };

    let mut summary = String::new();
    for raw in after_open[..close].lines() {
        let text = raw.trim().trim_start_matches('*').trim();
        if text.is_empty() {
            continue;
        }
        if !summary.is_empty() {
            summary.push(' ');
        }
        summary.push_str(text);
    }
    summary
}
608
609pub async fn promote_gotcha_candidates(store: &Store) -> Result<u32> {
612 let gotchas = store.scan_prefix("gotcha:").await?;
613 let now = now_secs();
614 let mut promoted = 0u32;
615
616 for mut record in gotchas {
617 if record.access_count < GOTCHA_PROMOTION_ACCESS_THRESHOLD {
618 continue;
619 }
620 let mut gotcha: GotchaRecord = match record.payload_as::<GotchaRecord>() {
621 Some(g) => g,
622 None => continue,
623 };
624 if gotcha.confirmed {
625 continue;
626 }
627 gotcha.confirmed = true;
628 record.payload = serde_json::to_value(&gotcha).ok();
629 record.confidence.confirmation_count += 1;
632 record.updated_at = now;
633 record.version.logical_clock += 1;
634 record.version.wall_clock = now;
635 store.put(&record.key, &record).await?;
636 promoted += 1;
637 }
638
639 Ok(promoted)
640}
641
642pub fn format_review_date(now_secs: u64) -> String {
645 let dt = chrono::DateTime::from_timestamp(now_secs as i64, 0).unwrap_or_else(chrono::Utc::now);
646 dt.format("%Y-%m-%d").to_string()
647}
648
649pub async fn collect_and_store_stale_reviews(
650 store: &Store,
651 session_value: &str,
652 now: u64,
653) -> Result<usize> {
654 let session: serde_json::Value = serde_json::from_str(session_value)?;
655 let consulted_keys = match session["consulted_keys"].as_array() {
656 Some(arr) => arr
657 .iter()
658 .filter_map(|v| v.as_str().map(|s| s.to_string()))
659 .collect::<Vec<_>>(),
660 None => return Ok(0),
661 };
662 if consulted_keys.is_empty() {
663 return Ok(0);
664 }
665
666 let new_entries = collect_stale_entries(store, &consulted_keys).await?;
667 if new_entries.is_empty() {
668 return Ok(0);
669 }
670
671 let date = format_review_date(now);
672 let review_key = format!("analytics:stale_review_{date}");
673 let new_count = new_entries.len();
674
675 let mut payload = match store.get(&review_key).await? {
676 Some(existing) => {
677 existing
678 .payload_as::<StaleReviewPayload>()
679 .unwrap_or(StaleReviewPayload {
680 session_timestamp: now,
681 entries: vec![],
682 })
683 }
684 None => StaleReviewPayload {
685 session_timestamp: now,
686 entries: vec![],
687 },
688 };
689
690 let mut seen_keys = std::collections::HashSet::new();
692 let mut merged = Vec::new();
693 for entry in new_entries {
694 if seen_keys.insert(entry.key.clone()) {
695 merged.push(entry);
696 }
697 }
698 for entry in payload.entries {
699 if seen_keys.insert(entry.key.clone()) {
700 merged.push(entry);
701 }
702 }
703
704 merged.sort_by(|a, b| {
706 b.staleness_value
707 .partial_cmp(&a.staleness_value)
708 .unwrap_or(std::cmp::Ordering::Equal)
709 });
710 merged.truncate(MAX_STALE_REVIEW_ENTRIES);
711
712 payload.session_timestamp = now;
713 payload.entries = merged;
714
715 let mut record = analytics_record(&review_key, String::new());
716 record.payload = serde_json::to_value(&payload).ok();
717 store.put(&review_key, &record).await?;
718
719 Ok(new_count)
720}
721
722pub async fn collect_stale_entries(
723 store: &Store,
724 consulted_keys: &[String],
725) -> Result<Vec<StaleReviewEntry>> {
726 let mut entries = Vec::new();
727
728 for key in consulted_keys {
729 let record = match store.get(key).await? {
730 Some(r) => r,
731 None => continue,
732 };
733
734 if !matches!(record.lifecycle, RecordLifecycle::Active) {
736 continue;
737 }
738
739 if matches!(
741 record.staleness.tier,
742 StalenessTier::Liability | StalenessTier::Tombstone
743 ) {
744 continue;
745 }
746
747 if record.staleness.value < STALE_REVIEW_MIN || record.staleness.value >= STALE_REVIEW_MAX {
749 continue;
750 }
751
752 let top_signals: Vec<String> = record
753 .staleness
754 .signals
755 .iter()
756 .take(3)
757 .map(|s| s.to_string())
758 .collect();
759
760 entries.push(StaleReviewEntry {
761 key: key.clone(),
762 staleness_value: record.staleness.value,
763 tier: record.staleness.tier.clone(),
764 last_updated: record.updated_at,
765 signals: top_signals,
766 });
767 }
768
769 entries.sort_by(|a, b| {
770 b.staleness_value
771 .partial_cmp(&a.staleness_value)
772 .unwrap_or(std::cmp::Ordering::Equal)
773 });
774 entries.truncate(MAX_STALE_REVIEW_ENTRIES);
775
776 Ok(entries)
777}
778
#[cfg(test)]
mod tests {
    use tempfile::TempDir;

    use super::*;

    /// Opens a store inside a fresh temporary directory. The directory handle
    /// is returned alongside the store so it stays alive for the test.
    async fn temp_store() -> (TempDir, Store) {
        let tmp = TempDir::new().expect("tempdir");
        let opened = Store::open(tmp.path()).await.expect("open store");
        (tmp, opened)
    }

    /// The first bootstrap log of the day creates an aggregate with one event.
    #[tokio::test]
    async fn log_bootstrap_creates_daily_aggregate() {
        let (_dir, store) = temp_store().await;

        log_bootstrap(&store, "__bootstrap__")
            .await
            .expect("log bootstrap");

        let stored = store
            .get(&today_key("analytics:bootstrap_"))
            .await
            .expect("get bootstrap aggregate")
            .expect("bootstrap record exists");
        let agg = stored.payload_as::<DailyAgg>().expect("daily agg payload");

        assert_eq!(agg.count, 1);
        assert_eq!(agg.keys, vec!["__bootstrap__".to_string()]);
    }

    /// A receipt is absent before `log_hit` and fresh right after it.
    #[tokio::test]
    async fn check_consulted_recent_uses_receipt_ttl() {
        let (_dir, store) = temp_store().await;
        let key = "file:src/main.rs";

        let before = check_consulted_recent(&store, key, 900)
            .await
            .expect("no receipt yet");
        assert!(!before);

        log_hit(&store, key).await.expect("log consultation hit");

        let after = check_consulted_recent(&store, key, 900)
            .await
            .expect("fresh receipt should be valid");
        assert!(after);
    }
}