rust_memex/diagnostics.rs

use std::collections::{BTreeMap, BTreeSet, HashMap};

use anyhow::Result;
use chrono::{DateTime, Duration, NaiveDate, NaiveDateTime, TimeZone, Utc};
use memex_contracts::{
    audit::{AuditRecommendation, AuditResult},
    stats::{DatabaseStats, NamespaceStats},
    timeline::TimelineEntry,
};
use serde::{Deserialize, Serialize};

use crate::storage::ChromaDocument;
use crate::{IntegrityRecommendation, SliceLayer, StorageManager, TextIntegrityMetrics};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum KeepStrategy {
    /// Keep the document with the earliest ID (lexicographic).
    Oldest,
    /// Keep the document with the latest ID (lexicographic).
    Newest,
    /// Keep the first document returned from storage iteration.
    HighestScore,
}

impl From<&str> for KeepStrategy {
    fn from(value: &str) -> Self {
        match value {
            "newest" => Self::Newest,
            "highest-score" => Self::HighestScore,
            _ => Self::Oldest,
        }
    }
}

/// How to group chunks when looking for duplicates.
///
/// After the v4 schema (`source_hash` + per-chunk `content_hash`) the legacy
/// `content_hash`-only grouping is broken for onion namespaces: every onion
/// layer of the same source has a unique chunk hash, so naive grouping reports
/// zero duplicates. Spec `2026-04-27_kb-transcripts-onion-slicer-fix-spec.md`,
/// P4 fixes this by making `(source_hash, layer)` the default.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum DedupGroupBy {
    /// `(source_hash, layer)` — preserves onion structure: keeps one chunk
    /// per layer per source, removing only true repeats (e.g. `__dupe__` +
    /// `__clean__` variants of the same file). This is the post-v4 default.
    #[default]
    SourceHashLayer,
    /// `source_hash` alone — collapses all layers of one source into a single
    /// group. Useful when callers already plan to re-slice and only want to
    /// deduplicate at the source-document level.
    SourceHash,
    /// `content_hash` (per-chunk text SHA256) — legacy v3 grouping. After P0
    /// every chunk is unique, so this finds duplicates only when two chunk
    /// texts are byte-identical. Kept as opt-in for force-reindex edge cases.
    ContentHash,
}

impl DedupGroupBy {
    /// Parse the CLI / HTTP string form. Unknown values fall through to the
    /// default (`source-hash-layer`) so older clients keep working safely.
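    ///
    /// A minimal sketch of the accepted aliases (marked `ignore` because the
    /// public import path of this module is not shown in this file):
    ///
    /// ```ignore
    /// assert_eq!(DedupGroupBy::parse("source-hash"), DedupGroupBy::SourceHash);
    /// assert_eq!(DedupGroupBy::parse("content_hash"), DedupGroupBy::ContentHash);
    /// // Unknown or empty input falls back to the post-v4 default.
    /// assert_eq!(DedupGroupBy::parse("bogus"), DedupGroupBy::SourceHashLayer);
    /// ```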
    pub fn parse(value: &str) -> Self {
        match value {
            "content-hash" | "content_hash" => Self::ContentHash,
            "source-hash" | "source_hash" => Self::SourceHash,
            _ => Self::SourceHashLayer,
        }
    }

    /// Stable string label for logging / CLI display.
    pub fn label(self) -> &'static str {
        match self {
            Self::SourceHashLayer => "source-hash-layer",
            Self::SourceHash => "source-hash",
            Self::ContentHash => "content-hash",
        }
    }
}

impl From<&str> for DedupGroupBy {
    fn from(value: &str) -> Self {
        Self::parse(value)
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DedupDuplicate {
    pub id: String,
    pub namespace: String,
}

/// One duplicate cluster.
///
/// `content_hash` keeps its legacy field name for wire-compat with older
/// callers. Its semantic is now "the value of the grouping key" — for the
/// post-v4 default this is `<source_hash>|layer<N>`, for `SourceHash` it is
/// the source hash alone, and for the legacy `ContentHash` mode it is the
/// per-chunk text hash. The new `group_key` field carries the same value with
/// a clearer name; new clients should prefer it.
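///
/// Key shapes by grouping mode (schematic, not literal hashes):
///
/// ```text
/// SourceHashLayer  ->  <source_hash>|layer<N>
/// SourceHash       ->  <source_hash>
/// ContentHash      ->  <per-chunk sha256>
/// ```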
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DedupGroup {
    pub content_hash: String,
    #[serde(default)]
    pub group_key: String,
    pub kept_id: String,
    pub kept_namespace: String,
    pub removed: Vec<DedupDuplicate>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DedupResult {
    pub total_docs: usize,
    pub unique_docs: usize,
    pub duplicate_groups: usize,
    pub duplicates_removed: usize,
    pub docs_without_hash: usize,
    /// Strategy used to bucket chunks into duplicate groups.
    /// Defaults to `SourceHashLayer` so legacy serialized payloads keep
    /// deserializing into the spec-default shape.
    #[serde(default)]
    pub group_by: DedupGroupBy,
    pub groups: Vec<DedupGroup>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PurgeQualityCandidate {
    pub namespace: String,
    pub quality_score: f32,
    pub document_count: usize,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PurgeQualityResult {
    pub namespace_filter: Option<String>,
    pub threshold: u8,
    pub dry_run: bool,
    pub purged_namespaces: usize,
    pub candidates: Vec<PurgeQualityCandidate>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimelineCoverage {
    pub earliest: Option<String>,
    pub latest: Option<String>,
    pub total_days: usize,
    pub days_with_data: usize,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimelineReport {
    pub namespaces: Vec<String>,
    pub entries: Vec<TimelineEntry>,
    pub coverage: TimelineCoverage,
    pub gaps: Vec<String>,
}

#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum TimelineBucket {
    #[default]
    Day,
    Hour,
}

impl TimelineBucket {
    pub fn parse(value: &str) -> Self {
        match value {
            "hour" => Self::Hour,
            _ => Self::Day,
        }
    }
}

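/// Filters for `timeline_report`. `since` / `until` accept the same forms as
/// `parse_time_bound` below: a relative window like `"30d"`, a month like
/// `"2024-03"`, or a full date / RFC3339 timestamp.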
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct TimelineQuery {
    pub namespace: Option<String>,
    pub since: Option<String>,
    pub until: Option<String>,
    pub bucket: TimelineBucket,
}

pub async fn audit_namespaces(
    storage: &StorageManager,
    namespace: Option<&str>,
    threshold: u8,
) -> Result<Vec<AuditResult>> {
    let namespaces: Vec<String> = if let Some(ns) = namespace {
        vec![ns.to_string()]
    } else {
        storage
            .list_namespaces()
            .await?
            .into_iter()
            .map(|(name, _count)| name)
            .collect()
    };

    let threshold_f32 = threshold as f32 / 100.0;
    let mut results = Vec::with_capacity(namespaces.len());

    for ns in namespaces {
        let docs = storage.get_all_in_namespace(&ns).await?;
        if docs.is_empty() {
            results.push(AuditResult {
                namespace: ns,
                document_count: 0,
                avg_chunk_length: 0,
                sentence_integrity: 0.0,
                word_integrity: 0.0,
                chunk_quality: 0.0,
                overall_score: 0.0,
                recommendation: AuditRecommendation::Empty,
                passes_threshold: false,
            });
            continue;
        }

        let chunks: Vec<String> = docs.iter().map(|doc| doc.document.clone()).collect();
        let combined_text = chunks.join(" ");
        let metrics = TextIntegrityMetrics::compute(&combined_text, &chunks);

        results.push(AuditResult {
            namespace: ns,
            document_count: docs.len(),
            avg_chunk_length: metrics.avg_chunk_length,
            sentence_integrity: metrics.sentence_integrity,
            word_integrity: metrics.word_integrity,
            chunk_quality: metrics.chunk_quality,
            overall_score: metrics.overall,
            recommendation: integrity_recommendation(metrics.recommendation()),
            passes_threshold: metrics.overall >= threshold_f32,
        });
    }

    Ok(results)
}

pub async fn purge_quality_namespaces(
    storage: &StorageManager,
    namespace: Option<&str>,
    threshold: u8,
    dry_run: bool,
) -> Result<PurgeQualityResult> {
    let candidates = audit_namespaces(storage, namespace, threshold)
        .await?
        .into_iter()
        .filter(|result| !result.passes_threshold)
        .map(|result| PurgeQualityCandidate {
            namespace: result.namespace,
            quality_score: result.overall_score,
            document_count: result.document_count,
        })
        .collect::<Vec<_>>();

    let mut purged_namespaces = 0usize;
    if !dry_run {
        for candidate in &candidates {
            storage
                .delete_namespace_documents(&candidate.namespace)
                .await?;
            purged_namespaces += 1;
        }
    }

    Ok(PurgeQualityResult {
        namespace_filter: namespace.map(ToOwned::to_owned),
        threshold,
        dry_run,
        purged_namespaces,
        candidates,
    })
}

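/// Remove duplicate chunks, bucketing them according to `group_by` and
/// keeping one survivor per bucket according to `keep_strategy`. With
/// `cross_namespace = false`, grouping is scoped per namespace.
///
/// Typical invocation (sketch; `storage` construction elided, marked `ignore`
/// so it is not compiled as a doc-test):
///
/// ```ignore
/// let report = deduplicate_documents(
///     &storage,
///     Some("kb:transcripts"),        // namespace filter
///     true,                          // dry_run: report only, delete nothing
///     KeepStrategy::Oldest,
///     false,                         // cross_namespace
///     DedupGroupBy::SourceHashLayer,
/// )
/// .await?;
/// println!("{} duplicates in {} groups", report.duplicates_removed, report.duplicate_groups);
/// ```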
pub async fn deduplicate_documents(
    storage: &StorageManager,
    namespace: Option<&str>,
    dry_run: bool,
    keep_strategy: KeepStrategy,
    cross_namespace: bool,
    group_by: DedupGroupBy,
) -> Result<DedupResult> {
    let all_docs = storage.all_documents(namespace, 1_000_000).await?;

    let mut hash_groups: HashMap<String, Vec<_>> = HashMap::new();
    let mut docs_without_hash = 0usize;

    for doc in &all_docs {
        // Build the bucket key per requested strategy. A doc is "without hash"
        // if the strategy's required field is empty for this row — the caller
        // can then decide whether to backfill or fall back.
        let raw_key: Option<String> = match group_by {
            DedupGroupBy::ContentHash => doc
                .content_hash
                .as_deref()
                .filter(|hash| !hash.is_empty())
                .map(ToOwned::to_owned),
            DedupGroupBy::SourceHash => doc
                .source_hash
                .as_deref()
                .filter(|hash| !hash.is_empty())
                .map(ToOwned::to_owned),
            DedupGroupBy::SourceHashLayer => doc
                .source_hash
                .as_deref()
                .filter(|hash| !hash.is_empty())
                .map(|hash| format!("{}|layer{}", hash, doc.layer)),
        };

        let Some(key) = raw_key else {
            docs_without_hash += 1;
            continue;
        };

        let scoped_key = if cross_namespace {
            key
        } else {
            format!("{}:{}", doc.namespace, key)
        };
        hash_groups.entry(scoped_key).or_default().push(doc);
    }

    let mut result = DedupResult {
        total_docs: all_docs.len(),
        unique_docs: 0,
        duplicate_groups: 0,
        duplicates_removed: 0,
        docs_without_hash,
        group_by,
        groups: Vec::new(),
    };

    for (key, mut docs) in hash_groups {
        if docs.len() == 1 {
            result.unique_docs += 1;
            continue;
        }

        match keep_strategy {
            KeepStrategy::Oldest => docs.sort_by(|left, right| left.id.cmp(&right.id)),
            KeepStrategy::Newest => docs.sort_by(|left, right| right.id.cmp(&left.id)),
            KeepStrategy::HighestScore => {}
        }

        let kept = docs[0];
        let removed_docs = docs.into_iter().skip(1).collect::<Vec<_>>();

        if !dry_run {
            for doc in &removed_docs {
                storage.delete_document(&doc.namespace, &doc.id).await?;
            }
        }

        // Strip the namespace prefix when reporting the group key so consumers
        // see the strategy-native value (e.g. `<source_hash>|layer3`). Use the
        // kept doc's namespace as the prefix: namespaces may themselves
        // contain `:`, so splitting on the first `:` would truncate wrongly.
        let display_key = if cross_namespace {
            key.clone()
        } else {
            key.strip_prefix(&format!("{}:", kept.namespace))
                .map(ToOwned::to_owned)
                .unwrap_or_else(|| key.clone())
        };

        result.unique_docs += 1;
        result.duplicate_groups += 1;
        result.duplicates_removed += removed_docs.len();
        result.groups.push(DedupGroup {
            content_hash: display_key.clone(),
            group_key: display_key,
            kept_id: kept.id.clone(),
            kept_namespace: kept.namespace.clone(),
            removed: removed_docs
                .iter()
                .map(|doc| DedupDuplicate {
                    id: doc.id.clone(),
                    namespace: doc.namespace.clone(),
                })
                .collect(),
        });
    }

    Ok(result)
}

/// Result of a backfill-hashes pass.
///
/// `content_hash` is per-chunk SHA256 (semantics introduced in schema v4).
/// `source_hash` is the SHA256 of the source document text — same value across
/// all four onion layers from one source. Pre-v4 namespaces stored the source
/// hash in `content_hash`; this backfill (a) re-derives a true per-chunk
/// `content_hash`, and (b) preserves the legacy hash as `source_hash` so
/// pre-index dedup works without re-reading source files.
///
/// Spec: `2026-04-27_kb-transcripts-onion-slicer-fix-spec.md`, P0 backfill.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BackfillHashesResult {
    /// Total documents inspected in the namespace (or across all namespaces).
    pub total_docs: usize,
    /// Documents that needed a `content_hash` write because the column was
    /// either empty or stored the source hash (pre-v4 schema).
    pub content_hash_backfilled: usize,
    /// Documents that needed a `source_hash` write because the column was
    /// empty (pre-v4 schema, or v4 chunk written before the field was wired
    /// up end-to-end).
    pub source_hash_backfilled: usize,
    /// Documents skipped because they already have correct per-chunk
    /// `content_hash` AND a populated `source_hash`.
    pub already_consistent: usize,
    /// Documents that could not be backfilled because the embedding column
    /// was missing or zero-length (extremely unlikely; logged in storage warn).
    pub skipped_no_embedding: usize,
    /// `true` when the caller asked for a dry run (no writes performed).
    pub dry_run: bool,
}

fn fmt_duration(secs: f64) -> String {
    if secs > 3600.0 {
        format!("{:.0}h{:02.0}m", secs / 3600.0, (secs % 3600.0) / 60.0)
    } else if secs > 60.0 {
        format!("{:.0}m{:02.0}s", secs / 60.0, secs % 60.0)
    } else {
        format!("{:.0}s", secs)
    }
}

const SPINNER: &[char] = &['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏'];

fn emit_backfill_progress(
    processed: usize,
    total: usize,
    started: &std::time::Instant,
    last_report: &mut std::time::Instant,
) {
    if total == 0 || last_report.elapsed().as_secs() < 10 {
        return;
    }
    *last_report = std::time::Instant::now();
    let pct = (processed as f64 / total as f64 * 100.0).min(100.0);
    let elapsed = started.elapsed().as_secs_f64();
    let rate = processed as f64 / elapsed;
    let eta = if rate > 0.0 {
        // `processed` can overshoot `total` when rows are added mid-run (the
        // pre-count is only a snapshot), so avoid usize underflow here.
        total.saturating_sub(processed) as f64 / rate
    } else {
        0.0
    };
    let tick = (elapsed as usize / 10) % SPINNER.len();
    eprint!(
        "\r  {} [{:>6}/{:>6}] {:5.1}%  {:.0} docs/s  ETA {}   ",
        SPINNER[tick],
        processed,
        total,
        pct,
        rate,
        fmt_duration(eta)
    );
}

/// Recompute per-chunk `content_hash` and recover `source_hash` for legacy
/// chunks. Safe to run repeatedly; chunks that already match the v4 contract
/// are counted under `already_consistent` and left alone.
///
/// Strategy:
/// 1. For each chunk, recompute `chunk_hash = SHA256(document_text)`.
/// 2. If the stored `content_hash` differs from `chunk_hash`, the row is
///    pre-v4 — its `content_hash` was actually the source hash. Move it into
///    `source_hash` (when empty) before overwriting `content_hash`.
/// 3. If `source_hash` is still empty after step 2, fall back to copying the
///    new `content_hash` so dedup has *something* to key on (better than
///    nothing — operators can re-index from source for true provenance).
/// 4. Re-write the row by deleting + inserting (LanceDB has no per-row update
///    that accepts a fixed-size vector update for our schema).
///
/// Spec: `2026-04-27_kb-transcripts-onion-slicer-fix-spec.md`, P0 backfill.
pub async fn backfill_chunk_and_source_hashes(
    storage: &StorageManager,
    namespace: Option<&str>,
    dry_run: bool,
) -> Result<BackfillHashesResult> {
    let mut result = BackfillHashesResult {
        dry_run,
        ..Default::default()
    };

    // Pre-count total documents for progress reporting.
    let total_count = match namespace {
        Some(ns) => storage.count_namespace(ns).await.unwrap_or(0),
        None => storage.stats().await.map(|s| s.row_count).unwrap_or(0),
    };

    const PAGE: usize = 5_000;
    let mut offset = 0;
    let mut processed = 0usize;
    let started = std::time::Instant::now();
    let mut last_report = started;

    loop {
        let page = storage.all_documents_page(namespace, offset, PAGE).await?;
        let page_len = page.len();
        if page_len == 0 {
            break;
        }
        result.total_docs += page_len;

        for doc in &page {
            processed += 1;

            if doc.embedding.is_empty() {
                result.skipped_no_embedding += 1;
                emit_backfill_progress(processed, total_count, &started, &mut last_report);
                continue;
            }

            let true_chunk_hash = crate::rag::compute_content_hash(&doc.document);
            let mut needs_content = false;
            let mut needs_source = false;

            let new_content_hash = match doc.content_hash.as_deref() {
                Some(stored) if stored == true_chunk_hash => stored.to_string(),
                Some(_) => {
                    needs_content = true;
                    true_chunk_hash.clone()
                }
                None => {
                    needs_content = true;
                    true_chunk_hash.clone()
                }
            };

            // Recover source_hash. Pre-v4 chunks stored the source hash under
            // `content_hash`; if that was the case, prefer it over the freshly
            // computed chunk hash. Otherwise fall back to chunk_hash so dedup
            // has a key (better than `None`).
            let recovered_source_hash = match (&doc.source_hash, &doc.content_hash) {
                (Some(s), _) if !s.is_empty() => s.clone(),
                (_, Some(legacy)) if legacy.as_str() != true_chunk_hash.as_str() => {
                    // Legacy content_hash was actually the source hash.
                    needs_source = true;
                    legacy.clone()
                }
                _ => {
                    needs_source = doc.source_hash.is_none();
                    new_content_hash.clone()
                }
            };

            if !needs_content && !needs_source {
                result.already_consistent += 1;
                emit_backfill_progress(processed, total_count, &started, &mut last_report);
                continue;
            }

            if needs_content {
                result.content_hash_backfilled += 1;
            }
            if needs_source {
                result.source_hash_backfilled += 1;
            }

            if dry_run {
                emit_backfill_progress(processed, total_count, &started, &mut last_report);
                continue;
            }

            // Atomic-per-row swap: delete then re-insert with corrected
            // hashes. LanceDB lacks an update-with-vector path for our schema.
            let new_doc = ChromaDocument {
                id: doc.id.clone(),
                namespace: doc.namespace.clone(),
                embedding: doc.embedding.clone(),
                metadata: doc.metadata.clone(),
                document: doc.document.clone(),
                layer: doc.layer,
                parent_id: doc.parent_id.clone(),
                children_ids: doc.children_ids.clone(),
                keywords: doc.keywords.clone(),
                content_hash: Some(new_content_hash.clone()),
                source_hash: Some(recovered_source_hash.clone()),
            };
            storage.delete_document(&doc.namespace, &doc.id).await?;
            storage.add_to_store(vec![new_doc]).await?;
            emit_backfill_progress(processed, total_count, &started, &mut last_report);
        }

        if page_len < PAGE {
            break;
        }
        offset += page_len;
    }

    if total_count > 0 && processed > 0 {
        eprintln!(
            "\r  [{0}/{0}] 100.0%  done in {1}                    ",
            processed,
            fmt_duration(started.elapsed().as_secs_f64())
        );
    }

    Ok(result)
}

pub async fn database_stats(storage: &StorageManager) -> Result<DatabaseStats> {
    match storage.stats().await {
        Ok(stats) => Ok(DatabaseStats {
            row_count: stats.row_count,
            version_count: stats.version_count,
            table_name: stats.table_name,
            db_path: stats.db_path,
        }),
        Err(_) => Ok(DatabaseStats {
            row_count: 0,
            version_count: 0,
            table_name: storage.get_collection_name().to_string(),
            db_path: storage.lance_path().to_string(),
        }),
    }
}

pub async fn namespace_stats(
    storage: &StorageManager,
    namespace: Option<&str>,
) -> Result<Vec<NamespaceStats>> {
    let all_docs = storage.all_documents(namespace, 100_000).await?;
    let mut by_namespace: HashMap<String, Vec<_>> = HashMap::new();
    for doc in &all_docs {
        by_namespace
            .entry(doc.namespace.clone())
            .or_default()
            .push(doc);
    }

    let mut stats_list = Vec::with_capacity(by_namespace.len());
    for (name, docs) in by_namespace {
        let total_chunks = docs.len();
        let mut layer_counts = HashMap::new();
        let mut keyword_counts = HashMap::new();
        let mut dates = Vec::new();

        for doc in docs {
            let layer_name = SliceLayer::from_u8(doc.layer)
                .map(|layer| layer.name().to_string())
                .unwrap_or_else(|| "flat".to_string());
            *layer_counts.entry(layer_name).or_insert(0) += 1;

            for keyword in &doc.keywords {
                *keyword_counts.entry(keyword.clone()).or_insert(0) += 1;
            }

            if let Some(timestamp) = extract_doc_timestamp_string(doc.metadata.as_object()) {
                dates.push(timestamp);
            }
        }

        let mut top_keywords = keyword_counts.into_iter().collect::<Vec<_>>();
        top_keywords.sort_by_key(|entry| std::cmp::Reverse(entry.1));
        top_keywords.truncate(10);
        dates.sort();

        stats_list.push(NamespaceStats {
            name,
            total_chunks,
            layer_counts,
            top_keywords,
            has_timestamps: !dates.is_empty(),
            earliest_indexed: dates.first().cloned(),
            latest_indexed: dates.last().cloned(),
        });
    }

    stats_list.sort_by(|left, right| left.name.cmp(&right.name));
    Ok(stats_list)
}

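/// Build a per-bucket (day or hour) chunk-count timeline across namespaces,
/// plus coverage bounds and gap annotations. Documents with no recognizable
/// timestamp metadata are bucketed under `"unknown"`.
///
/// Sketch of a typical call (marked `ignore`; `storage` setup elided):
///
/// ```ignore
/// let report = timeline_report(
///     &storage,
///     &TimelineQuery {
///         namespace: Some("kb:transcripts".to_string()),
///         since: Some("30d".to_string()),
///         until: None,
///         bucket: TimelineBucket::Day,
///     },
/// )
/// .await?;
/// ```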
pub async fn timeline_report(
    storage: &StorageManager,
    query: &TimelineQuery,
) -> Result<TimelineReport> {
    let namespaces: Vec<String> = if let Some(namespace) = query.namespace.as_deref() {
        vec![namespace.to_string()]
    } else {
        storage
            .list_namespaces()
            .await?
            .into_iter()
            .map(|(name, _count)| name)
            .collect()
    };

    let since = query.since.as_deref().and_then(parse_time_bound);
    let until = query.until.as_deref().and_then(parse_time_bound);

    let mut timeline: BTreeMap<String, BTreeMap<String, BTreeMap<String, usize>>> = BTreeMap::new();
    let mut all_dates = BTreeSet::new();

    for namespace in &namespaces {
        let docs = storage.get_all_in_namespace(namespace).await?;
        for doc in docs {
            let Some(timestamp) = extract_doc_timestamp(&doc) else {
                all_dates.insert("unknown".to_string());
                *timeline
                    .entry("unknown".to_string())
                    .or_default()
                    .entry(namespace.clone())
                    .or_default()
                    .entry("unknown".to_string())
                    .or_default() += 1;
                continue;
            };

            if since.is_some_and(|lower| timestamp < lower) {
                continue;
            }
            if until.is_some_and(|upper| timestamp > upper) {
                continue;
            }

            let bucket = match query.bucket {
                TimelineBucket::Day => timestamp.format("%Y-%m-%d").to_string(),
                TimelineBucket::Hour => timestamp.format("%Y-%m-%dT%H:00:00Z").to_string(),
            };
            let source = doc
                .metadata
                .get("source")
                .and_then(|value| value.as_str())
                .or_else(|| {
                    doc.metadata
                        .get("file_path")
                        .and_then(|value| value.as_str())
                })
                .map(filename_from_path)
                .unwrap_or_else(|| "unknown".to_string());

            all_dates.insert(bucket.clone());
            *timeline
                .entry(bucket)
                .or_default()
                .entry(namespace.clone())
                .or_default()
                .entry(source)
                .or_default() += 1;
        }
    }

    let entries = timeline
        .iter()
        .flat_map(|(date, namespace_map)| {
            namespace_map
                .iter()
                .flat_map(move |(namespace, source_map)| {
                    source_map
                        .iter()
                        .map(move |(source, chunk_count)| TimelineEntry {
                            date: date.clone(),
                            namespace: namespace.clone(),
                            source: Some(source.clone()),
                            chunk_count: *chunk_count,
                        })
                })
        })
        .collect::<Vec<_>>();

    let ordered_dates = all_dates
        .iter()
        .filter(|date| *date != "unknown")
        .collect::<Vec<_>>();
    let earliest = ordered_dates.first().map(|date| (*date).clone());
    let latest = ordered_dates.last().map(|date| (*date).clone());
    let gaps = compute_gaps(&ordered_dates, query.bucket);
    let total_days = match (ordered_dates.first(), ordered_dates.last()) {
        (Some(first), Some(last)) => {
            let first_date = timeline_gap_date(first, query.bucket);
            let last_date = timeline_gap_date(last, query.bucket);
            match (first_date, last_date) {
                (Some(first_date), Some(last_date)) => {
                    (last_date - first_date).num_days() as usize + 1
                }
                _ => 0,
            }
        }
        _ => 0,
    };

    Ok(TimelineReport {
        namespaces,
        entries,
        coverage: TimelineCoverage {
            earliest,
            latest,
            total_days,
            days_with_data: ordered_dates.len(),
        },
        gaps,
    })
}

fn integrity_recommendation(recommendation: IntegrityRecommendation) -> AuditRecommendation {
    match recommendation {
        IntegrityRecommendation::Excellent => AuditRecommendation::Excellent,
        IntegrityRecommendation::Good => AuditRecommendation::Good,
        IntegrityRecommendation::Warn => AuditRecommendation::Warn,
        IntegrityRecommendation::Purge => AuditRecommendation::Purge,
    }
}

fn extract_doc_timestamp(doc: &crate::ChromaDocument) -> Option<DateTime<Utc>> {
    doc.metadata
        .get("indexed_at")
        .and_then(|value| value.as_str())
        .or_else(|| {
            doc.metadata
                .get("timestamp")
                .and_then(|value| value.as_str())
        })
        .or_else(|| {
            doc.metadata
                .get("created_at")
                .and_then(|value| value.as_str())
        })
        .and_then(parse_iso_or_date)
}

fn extract_doc_timestamp_string(
    metadata: Option<&serde_json::Map<String, serde_json::Value>>,
) -> Option<String> {
    metadata.and_then(|object| {
        object.iter().find_map(|(key, value)| {
            if !(key.contains("date") || key.contains("timestamp") || key.contains("time")) {
                return None;
            }
            value.as_str().map(ToOwned::to_owned)
        })
    })
}

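/// Parse a user-supplied time bound. Accepted forms, tried in order:
/// a relative window `"<N>d"` (N days back from now), a month `"YYYY-MM"`
/// (expanded to the first of the month), and finally whatever
/// `parse_iso_or_date` accepts (RFC3339, `YYYY-MM-DDTHH:MM:SS`, or a bare
/// `YYYY-MM-DD`).
///
/// Sketch (marked `ignore`; these helpers are private to this module):
///
/// ```ignore
/// assert!(parse_time_bound("30d").is_some());
/// assert!(parse_time_bound("2024-03").is_some());      // 2024-03-01T00:00:00Z
/// assert!(parse_time_bound("2024-03-15").is_some());
/// assert!(parse_time_bound("not-a-date").is_none());
/// ```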
fn parse_time_bound(input: &str) -> Option<DateTime<Utc>> {
    if let Some(days_str) = input.strip_suffix('d')
        && let Ok(days) = days_str.parse::<i64>()
    {
        return Some(Utc::now() - Duration::days(days));
    }

    if input.len() == 7
        && input.chars().nth(4) == Some('-')
        && let Ok(date) = NaiveDate::parse_from_str(&format!("{input}-01"), "%Y-%m-%d")
    {
        return date
            .and_hms_opt(0, 0, 0)
            .map(|dt| Utc.from_utc_datetime(&dt));
    }

    parse_iso_or_date(input)
}

fn parse_iso_or_date(input: &str) -> Option<DateTime<Utc>> {
    DateTime::parse_from_rfc3339(input)
        .map(|dt| dt.with_timezone(&Utc))
        .ok()
        .or_else(|| {
            NaiveDateTime::parse_from_str(input, "%Y-%m-%dT%H:%M:%S")
                .ok()
                .map(|dt| Utc.from_utc_datetime(&dt))
        })
        .or_else(|| {
            NaiveDate::parse_from_str(input, "%Y-%m-%d")
                .ok()
                .and_then(|date| date.and_hms_opt(0, 0, 0))
                .map(|dt| Utc.from_utc_datetime(&dt))
        })
}

fn filename_from_path(path: &str) -> String {
    std::path::Path::new(path)
        .file_name()
        .and_then(|name| name.to_str())
        .unwrap_or(path)
        .to_string()
}

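/// Report the gaps between consecutive dates/hours that have data. Each entry
/// spans from one populated bucket to the next and counts the empty buckets
/// in between.
///
/// Sketch (marked `ignore`; private helper):
///
/// ```ignore
/// let a = "2024-01-01".to_string();
/// let b = "2024-01-04".to_string();
/// assert_eq!(
///     compute_gaps(&[&a, &b], TimelineBucket::Day),
///     vec!["2024-01-01 to 2024-01-04 (2 missing day(s))".to_string()],
/// );
/// ```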
fn compute_gaps(dates: &[&String], bucket: TimelineBucket) -> Vec<String> {
    let parsed_dates = dates
        .iter()
        .filter_map(|date| timeline_gap_date(date, bucket))
        .collect::<Vec<_>>();

    let mut gaps = Vec::new();
    for window in parsed_dates.windows(2) {
        let diff = window[1] - window[0];
        let missing_units = match bucket {
            TimelineBucket::Day => diff.num_days() - 1,
            TimelineBucket::Hour => diff.num_hours() - 1,
        };
        if missing_units > 0 {
            gaps.push(format!(
                "{} to {} ({} missing {})",
                format_gap_date(window[0], bucket),
                format_gap_date(window[1], bucket),
                missing_units,
                match bucket {
                    TimelineBucket::Day => "day(s)",
                    TimelineBucket::Hour => "hour(s)",
                }
            ));
        }
    }
    gaps
}

fn timeline_gap_date(date: &str, bucket: TimelineBucket) -> Option<DateTime<Utc>> {
    match bucket {
        TimelineBucket::Day => NaiveDate::parse_from_str(date, "%Y-%m-%d")
            .ok()
            .and_then(|date| date.and_hms_opt(0, 0, 0))
            .map(|dt| Utc.from_utc_datetime(&dt)),
        TimelineBucket::Hour => DateTime::parse_from_rfc3339(date)
            .map(|dt| dt.with_timezone(&Utc))
            .ok()
            .or_else(|| parse_iso_or_date(date)),
    }
}

fn format_gap_date(date: DateTime<Utc>, bucket: TimelineBucket) -> String {
    match bucket {
        TimelineBucket::Day => date.format("%Y-%m-%d").to_string(),
        TimelineBucket::Hour => date.format("%Y-%m-%dT%H:00:00Z").to_string(),
    }
}

#[cfg(test)]
mod backfill_tests {
    use super::*;
    use crate::rag::compute_content_hash;
    use crate::storage::ChromaDocument;
    use tempfile::TempDir;

    /// Pre-v4 row stored `content_hash = SHA256(source_doc)` for every layer
    /// of the same source. After backfill, `content_hash` must become
    /// SHA256(chunk_text) and `source_hash` must hold the legacy value so
    /// pre-index dedup keeps working.
    #[tokio::test]
    async fn backfill_promotes_legacy_content_hash_to_source_hash() {
        let tmp = TempDir::new().expect("temp dir");
        let db_path = tmp.path().join("lancedb");
        let storage = StorageManager::new_lance_only(db_path.to_str().expect("utf-8 temp db path"))
            .await
            .expect("storage");
        storage.ensure_collection().await.expect("collection");

        let namespace = "kb:transcripts-test".to_string();
        let source_text = "full source document text";
        let source_hash = compute_content_hash(source_text);

        // Two chunks from one source, written under the v3 contract:
        // both rows carry the source-text hash in `content_hash` and
        // leave `source_hash` empty.
        let chunk_a_text = "outer summary text";
        let chunk_b_text = "inner detailed text";
        let doc_a = ChromaDocument {
            id: "chunk-a".to_string(),
            namespace: namespace.clone(),
            embedding: vec![0.1_f32; 8],
            metadata: serde_json::json!({"path": "doc.md"}),
            document: chunk_a_text.to_string(),
            layer: 1,
            parent_id: None,
            children_ids: vec![],
            keywords: vec![],
            content_hash: Some(source_hash.clone()),
            source_hash: None,
        };
        let doc_b = ChromaDocument {
            id: "chunk-b".to_string(),
            namespace: namespace.clone(),
            embedding: vec![0.2_f32; 8],
            metadata: serde_json::json!({"path": "doc.md"}),
            document: chunk_b_text.to_string(),
            layer: 3,
            parent_id: None,
            children_ids: vec![],
            keywords: vec![],
            content_hash: Some(source_hash.clone()),
            source_hash: None,
        };
        storage
            .add_to_store(vec![doc_a, doc_b])
            .await
            .expect("seed pre-v4 rows");

        let dry = backfill_chunk_and_source_hashes(&storage, Some(&namespace), true)
            .await
            .expect("dry run");
        assert!(dry.dry_run);
        assert_eq!(dry.total_docs, 2);
        assert_eq!(dry.content_hash_backfilled, 2);
        assert_eq!(dry.source_hash_backfilled, 2);
        assert_eq!(dry.already_consistent, 0);

        let live = backfill_chunk_and_source_hashes(&storage, Some(&namespace), false)
            .await
            .expect("live run");
        assert!(!live.dry_run);
        assert_eq!(live.content_hash_backfilled, 2);
        assert_eq!(live.source_hash_backfilled, 2);

        let after_a = storage
            .get_document(&namespace, "chunk-a")
            .await
            .expect("lookup a")
            .expect("doc-a present");
        assert_eq!(
            after_a.content_hash.as_deref(),
            Some(compute_content_hash(chunk_a_text)).as_deref()
        );
        assert_eq!(after_a.source_hash.as_deref(), Some(source_hash.as_str()));

        let after_b = storage
            .get_document(&namespace, "chunk-b")
            .await
            .expect("lookup b")
            .expect("doc-b present");
        assert_eq!(
            after_b.content_hash.as_deref(),
            Some(compute_content_hash(chunk_b_text)).as_deref()
        );
        assert_eq!(after_b.source_hash.as_deref(), Some(source_hash.as_str()));

        // Idempotency: running again leaves the rows alone.
        let again = backfill_chunk_and_source_hashes(&storage, Some(&namespace), false)
            .await
            .expect("idempotent");
        assert_eq!(again.content_hash_backfilled, 0);
        assert_eq!(again.source_hash_backfilled, 0);
        assert_eq!(again.already_consistent, 2);
    }
}

#[cfg(test)]
mod dedup_grouping_tests {
    use super::*;
    use crate::rag::compute_content_hash;
    use crate::storage::ChromaDocument;
    use tempfile::TempDir;

    fn doc(
        id: &str,
        ns: &str,
        layer: u8,
        text: &str,
        source: &str,
        chunk_hash: &str,
    ) -> ChromaDocument {
        ChromaDocument {
            id: id.to_string(),
            namespace: ns.to_string(),
            embedding: vec![0.1_f32; 8],
            metadata: serde_json::json!({}),
            document: text.to_string(),
            layer,
            parent_id: None,
            children_ids: vec![],
            keywords: vec![],
            content_hash: Some(chunk_hash.to_string()),
            source_hash: Some(source.to_string()),
        }
    }

    /// Spec P4: post-v4 default (`source-hash-layer`) must keep the onion
    /// intact: with two `__dupe__` + `__clean__` variants of the same source,
    /// each layer collapses to a single survivor (4 dups removed for a 4-layer
    /// onion), while distinct sources remain untouched.
    #[tokio::test]
    async fn source_hash_layer_grouping_preserves_onion_structure() {
        let tmp = TempDir::new().expect("temp dir");
        let db_path = tmp.path().join("db");
        let storage = StorageManager::new_lance_only(db_path.to_str().expect("utf-8 temp db path"))
            .await
            .expect("storage");
        storage.ensure_collection().await.expect("collection");

        let ns = "kb:transcripts-test";
        let source_a = compute_content_hash("source document A — full transcript");
        let source_b = compute_content_hash("source document B — different transcript");

        // Source A indexed twice (`__dupe__` + `__clean__`): 4 layers × 2 = 8 chunks.
        // After source-hash-layer grouping: 4 groups of 2 → 4 duplicates removed.
        let mut docs = Vec::new();
        for suffix in ["clean", "dupe"] {
            for layer in 0u8..4 {
                let text = format!("source-A layer-{layer} variant-{suffix}");
                let chunk_hash = compute_content_hash(&text);
                docs.push(doc(
                    &format!("a-{suffix}-l{layer}"),
                    ns,
                    layer,
                    &text,
                    &source_a,
                    &chunk_hash,
                ));
            }
        }
        // Source B indexed once: 4 unique chunks, must survive.
        for layer in 0u8..4 {
            let text = format!("source-B layer-{layer}");
            let chunk_hash = compute_content_hash(&text);
            docs.push(doc(
                &format!("b-l{layer}"),
                ns,
                layer,
                &text,
                &source_b,
                &chunk_hash,
            ));
        }
        storage
            .add_to_store(docs)
            .await
            .expect("seed dedup fixture");

        let result = deduplicate_documents(
            &storage,
            Some(ns),
            true,
            KeepStrategy::Oldest,
            false,
            DedupGroupBy::SourceHashLayer,
        )
        .await
        .expect("dedup");

        assert_eq!(result.total_docs, 12);
        assert_eq!(result.duplicate_groups, 4, "one group per onion layer");
        assert_eq!(
            result.duplicates_removed, 4,
            "remove one variant per layer, keep the other"
        );
        assert_eq!(
            result.docs_without_hash, 0,
            "every chunk has source_hash populated"
        );
        // Source B contributes 4 singleton groups counted as unique_docs.
        assert_eq!(result.unique_docs, 4 + 4);
        // Group keys must contain the layer suffix so consumers can verify
        // per-layer onion preservation.
        assert!(
            result.groups.iter().all(|g| g.group_key.contains("|layer")),
            "source-hash-layer keys must encode layer: {:?}",
            result
                .groups
                .iter()
                .map(|g| &g.group_key)
                .collect::<Vec<_>>()
        );
    }

    /// Spec P4 invariant: post-P0 every chunk has a unique `content_hash`, so
    /// the legacy grouping must report zero duplicates on a freshly-indexed
    /// onion namespace. This test exists to lock that semantic in place — it
    /// is the symptom that motivated adding `--group-by`.
    #[tokio::test]
    async fn content_hash_grouping_finds_zero_duplicates_on_fresh_onion() {
        let tmp = TempDir::new().expect("temp dir");
        let db_path = tmp.path().join("db");
        let storage = StorageManager::new_lance_only(db_path.to_str().expect("utf-8 temp db path"))
            .await
            .expect("storage");
        storage.ensure_collection().await.expect("collection");

        let ns = "kb:transcripts-test";
        let source_a = compute_content_hash("source document A");
        let mut docs = Vec::new();
        for layer in 0u8..4 {
            // Distinct chunk text per layer → unique per-chunk content_hash.
            let text = format!("source-A unique-layer-{layer}");
            let chunk_hash = compute_content_hash(&text);
            docs.push(doc(
                &format!("a-l{layer}"),
                ns,
                layer,
                &text,
                &source_a,
                &chunk_hash,
            ));
        }
        storage
            .add_to_store(docs)
            .await
            .expect("seed unique chunks");

        let result = deduplicate_documents(
            &storage,
            Some(ns),
            true,
            KeepStrategy::Oldest,
            false,
            DedupGroupBy::ContentHash,
        )
        .await
        .expect("dedup");

        assert_eq!(result.total_docs, 4);
        assert_eq!(
            result.duplicate_groups, 0,
            "post-P0 each chunk has unique content_hash, legacy grouping must find none"
        );
        assert_eq!(result.duplicates_removed, 0);
    }

    #[test]
    fn dedup_group_by_parses_known_aliases_and_falls_back_to_default() {
        assert_eq!(
            DedupGroupBy::parse("source-hash-layer"),
            DedupGroupBy::SourceHashLayer
        );
        assert_eq!(DedupGroupBy::parse("source-hash"), DedupGroupBy::SourceHash);
        assert_eq!(
            DedupGroupBy::parse("source_hash"),
            DedupGroupBy::SourceHash,
            "underscore form accepted as alias"
        );
        assert_eq!(
            DedupGroupBy::parse("content-hash"),
            DedupGroupBy::ContentHash
        );
        assert_eq!(
            DedupGroupBy::parse("content_hash"),
            DedupGroupBy::ContentHash
        );
        // Unknown / empty values resolve to the post-v4 default rather than
        // erroring out — keeps older HTTP/CLI clients working.
        assert_eq!(DedupGroupBy::parse(""), DedupGroupBy::SourceHashLayer);
        assert_eq!(DedupGroupBy::parse("nope"), DedupGroupBy::SourceHashLayer);
    }
}
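
#[cfg(test)]
mod helper_tests {
    //! Sanity checks over the pure helpers in this module. They assert only
    //! behavior that follows directly from the implementations above and
    //! touch no storage.

    use super::*;

    #[test]
    fn fmt_duration_covers_seconds_minutes_and_hours() {
        assert_eq!(fmt_duration(45.0), "45s");
        assert_eq!(fmt_duration(125.0), "2m05s");
        assert_eq!(fmt_duration(3725.0), "1h02m");
    }

    #[test]
    fn parse_time_bound_accepts_relative_month_and_date_forms() {
        // Relative window ("Nd") and month ("YYYY-MM") shorthands.
        assert!(parse_time_bound("30d").is_some());
        let month = parse_time_bound("2024-03").expect("month shorthand");
        assert_eq!(month.format("%Y-%m-%d").to_string(), "2024-03-01");
        // Plain dates and RFC3339 timestamps fall through to parse_iso_or_date.
        assert!(parse_time_bound("2024-03-15").is_some());
        assert!(parse_time_bound("2024-03-15T10:30:00Z").is_some());
        assert!(parse_time_bound("not-a-date").is_none());
    }

    #[test]
    fn compute_gaps_counts_missing_days_between_populated_buckets() {
        let a = "2024-01-01".to_string();
        let b = "2024-01-02".to_string();
        let c = "2024-01-05".to_string();
        // Consecutive days produce no gap; the two-day hole is reported once.
        let gaps = compute_gaps(&[&a, &b, &c], TimelineBucket::Day);
        assert_eq!(
            gaps,
            vec!["2024-01-02 to 2024-01-05 (2 missing day(s))".to_string()]
        );
    }

    #[test]
    fn string_enums_fall_back_to_their_defaults() {
        assert_eq!(KeepStrategy::from("newest"), KeepStrategy::Newest);
        assert_eq!(KeepStrategy::from("highest-score"), KeepStrategy::HighestScore);
        assert_eq!(KeepStrategy::from("anything-else"), KeepStrategy::Oldest);
        assert_eq!(TimelineBucket::parse("hour"), TimelineBucket::Hour);
        assert_eq!(TimelineBucket::parse(""), TimelineBucket::Day);
        // label() / parse() round-trip for every grouping mode.
        for mode in [
            DedupGroupBy::SourceHashLayer,
            DedupGroupBy::SourceHash,
            DedupGroupBy::ContentHash,
        ] {
            assert_eq!(DedupGroupBy::parse(mode.label()), mode);
        }
    }

    #[test]
    fn filename_from_path_keeps_the_final_component() {
        assert_eq!(filename_from_path("/tmp/notes/2024-01-01.md"), "2024-01-01.md");
        assert_eq!(filename_from_path("bare-file.txt"), "bare-file.txt");
    }
}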