use std::collections::{BTreeMap, BTreeSet, HashMap};

use anyhow::Result;
use chrono::{DateTime, Duration, NaiveDate, NaiveDateTime, TimeZone, Utc};
use memex_contracts::{
    audit::{AuditRecommendation, AuditResult},
    stats::{DatabaseStats, NamespaceStats},
    timeline::TimelineEntry,
};
use serde::{Deserialize, Serialize};

use crate::storage::ChromaDocument;
use crate::{IntegrityRecommendation, SliceLayer, StorageManager, TextIntegrityMetrics};

/// Which copy of a duplicate group survives deduplication.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum KeepStrategy {
    Oldest,
    Newest,
    HighestScore,
}

impl From<&str> for KeepStrategy {
    fn from(value: &str) -> Self {
        match value {
            "newest" => Self::Newest,
            "highest-score" => Self::HighestScore,
            _ => Self::Oldest,
        }
    }
}
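
// Minimal sanity check for the string-to-strategy mapping above; any unknown
// value falls back to `Oldest`.
#[cfg(test)]
mod keep_strategy_tests {
    use super::*;

    #[test]
    fn keep_strategy_parses_known_values_and_defaults_to_oldest() {
        assert_eq!(KeepStrategy::from("newest"), KeepStrategy::Newest);
        assert_eq!(KeepStrategy::from("highest-score"), KeepStrategy::HighestScore);
        assert_eq!(KeepStrategy::from("oldest"), KeepStrategy::Oldest);
        assert_eq!(KeepStrategy::from("anything-else"), KeepStrategy::Oldest);
    }
}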

/// How chunks are grouped when searching for duplicates.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum DedupGroupBy {
    /// Group by source document hash and onion layer (the default).
    #[default]
    SourceHashLayer,
    /// Group by source document hash only.
    SourceHash,
    /// Group by the per-chunk content hash.
    ContentHash,
}

impl DedupGroupBy {
    pub fn parse(value: &str) -> Self {
        match value {
            "content-hash" | "content_hash" => Self::ContentHash,
            "source-hash" | "source_hash" => Self::SourceHash,
            _ => Self::SourceHashLayer,
        }
    }

    pub fn label(self) -> &'static str {
        match self {
            Self::SourceHashLayer => "source-hash-layer",
            Self::SourceHash => "source-hash",
            Self::ContentHash => "content-hash",
        }
    }
}

impl From<&str> for DedupGroupBy {
    fn from(value: &str) -> Self {
        Self::parse(value)
    }
}
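
// Round-trip check: every `label()` string must parse back to the same variant,
// and the `Default` impl stays `SourceHashLayer`. The accepted aliases are
// exercised in `dedup_grouping_tests` at the bottom of this file.
#[cfg(test)]
mod dedup_group_by_label_tests {
    use super::*;

    #[test]
    fn label_and_parse_round_trip() {
        for group_by in [
            DedupGroupBy::SourceHashLayer,
            DedupGroupBy::SourceHash,
            DedupGroupBy::ContentHash,
        ] {
            assert_eq!(DedupGroupBy::parse(group_by.label()), group_by);
        }
        assert_eq!(DedupGroupBy::default(), DedupGroupBy::SourceHashLayer);
    }
}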

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DedupDuplicate {
    pub id: String,
    pub namespace: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DedupGroup {
    pub content_hash: String,
    #[serde(default)]
    pub group_key: String,
    pub kept_id: String,
    pub kept_namespace: String,
    pub removed: Vec<DedupDuplicate>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DedupResult {
    pub total_docs: usize,
    pub unique_docs: usize,
    pub duplicate_groups: usize,
    pub duplicates_removed: usize,
    pub docs_without_hash: usize,
    #[serde(default)]
    pub group_by: DedupGroupBy,
    pub groups: Vec<DedupGroup>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PurgeQualityCandidate {
    pub namespace: String,
    pub quality_score: f32,
    pub document_count: usize,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PurgeQualityResult {
    pub namespace_filter: Option<String>,
    pub threshold: u8,
    pub dry_run: bool,
    pub purged_namespaces: usize,
    pub candidates: Vec<PurgeQualityCandidate>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimelineCoverage {
    pub earliest: Option<String>,
    pub latest: Option<String>,
    pub total_days: usize,
    pub days_with_data: usize,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimelineReport {
    pub namespaces: Vec<String>,
    pub entries: Vec<TimelineEntry>,
    pub coverage: TimelineCoverage,
    pub gaps: Vec<String>,
}

#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum TimelineBucket {
    #[default]
    Day,
    Hour,
}

impl TimelineBucket {
    pub fn parse(value: &str) -> Self {
        match value {
            "hour" => Self::Hour,
            _ => Self::Day,
        }
    }
}
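
// Bucket parsing accepts "hour" and treats anything else as the default day bucket.
#[cfg(test)]
mod timeline_bucket_tests {
    use super::*;

    #[test]
    fn timeline_bucket_parses_hour_and_defaults_to_day() {
        assert_eq!(TimelineBucket::parse("hour"), TimelineBucket::Hour);
        assert_eq!(TimelineBucket::parse("day"), TimelineBucket::Day);
        assert_eq!(TimelineBucket::parse(""), TimelineBucket::Day);
        assert_eq!(TimelineBucket::default(), TimelineBucket::Day);
    }
}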

#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct TimelineQuery {
    pub namespace: Option<String>,
    pub since: Option<String>,
    pub until: Option<String>,
    pub bucket: TimelineBucket,
}

pub async fn audit_namespaces(
    storage: &StorageManager,
    namespace: Option<&str>,
    threshold: u8,
) -> Result<Vec<AuditResult>> {
    let namespaces: Vec<String> = if let Some(ns) = namespace {
        vec![ns.to_string()]
    } else {
        storage
            .list_namespaces()
            .await?
            .into_iter()
            .map(|(name, _count)| name)
            .collect()
    };

    let threshold_f32 = threshold as f32 / 100.0;
    let mut results = Vec::with_capacity(namespaces.len());

    for ns in namespaces {
        let docs = storage.get_all_in_namespace(&ns).await?;
        if docs.is_empty() {
            results.push(AuditResult {
                namespace: ns,
                document_count: 0,
                avg_chunk_length: 0,
                sentence_integrity: 0.0,
                word_integrity: 0.0,
                chunk_quality: 0.0,
                overall_score: 0.0,
                recommendation: AuditRecommendation::Empty,
                passes_threshold: false,
            });
            continue;
        }

        let chunks: Vec<String> = docs.iter().map(|doc| doc.document.clone()).collect();
        let combined_text = chunks.join(" ");
        let metrics = TextIntegrityMetrics::compute(&combined_text, &chunks);

        results.push(AuditResult {
            namespace: ns,
            document_count: docs.len(),
            avg_chunk_length: metrics.avg_chunk_length,
            sentence_integrity: metrics.sentence_integrity,
            word_integrity: metrics.word_integrity,
            chunk_quality: metrics.chunk_quality,
            overall_score: metrics.overall,
            recommendation: integrity_recommendation(metrics.recommendation()),
            passes_threshold: metrics.overall >= threshold_f32,
        });
    }

    Ok(results)
}

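/// Audit namespaces and, unless `dry_run` is set, purge all documents from
/// namespaces whose overall integrity score falls below `threshold` (0-100).
///
/// A minimal sketch (the `storage` handle is an assumption; any initialised
/// `StorageManager` works):
///
/// ```ignore
/// // Preview what would be purged below a 60% quality score.
/// let report = purge_quality_namespaces(&storage, None, 60, true).await?;
/// for candidate in &report.candidates {
///     println!("{} scored {:.2}", candidate.namespace, candidate.quality_score);
/// }
/// ```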
pub async fn purge_quality_namespaces(
    storage: &StorageManager,
    namespace: Option<&str>,
    threshold: u8,
    dry_run: bool,
) -> Result<PurgeQualityResult> {
    let candidates = audit_namespaces(storage, namespace, threshold)
        .await?
        .into_iter()
        .filter(|result| !result.passes_threshold)
        .map(|result| PurgeQualityCandidate {
            namespace: result.namespace,
            quality_score: result.overall_score,
            document_count: result.document_count,
        })
        .collect::<Vec<_>>();

    let mut purged_namespaces = 0usize;
    if !dry_run {
        for candidate in &candidates {
            storage
                .delete_namespace_documents(&candidate.namespace)
                .await?;
            purged_namespaces += 1;
        }
    }

    Ok(PurgeQualityResult {
        namespace_filter: namespace.map(ToOwned::to_owned),
        threshold,
        dry_run,
        purged_namespaces,
        candidates,
    })
}

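/// Find and (optionally) delete duplicate chunks, grouped according to `group_by`.
///
/// A minimal dry-run sketch (the `storage` handle is an assumption):
///
/// ```ignore
/// let report = deduplicate_documents(
///     &storage,
///     None,                          // all namespaces
///     true,                          // dry run: report only, delete nothing
///     KeepStrategy::Oldest,
///     false,                         // do not merge duplicates across namespaces
///     DedupGroupBy::SourceHashLayer,
/// )
/// .await?;
/// println!("{} duplicates in {} groups", report.duplicates_removed, report.duplicate_groups);
/// ```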
pub async fn deduplicate_documents(
    storage: &StorageManager,
    namespace: Option<&str>,
    dry_run: bool,
    keep_strategy: KeepStrategy,
    cross_namespace: bool,
    group_by: DedupGroupBy,
) -> Result<DedupResult> {
    let all_docs = storage.all_documents(namespace, 1_000_000).await?;

    let mut hash_groups: HashMap<String, Vec<_>> = HashMap::new();
    let mut docs_without_hash = 0usize;

    for doc in &all_docs {
        // Pick the grouping key for this document according to the requested
        // strategy; documents without a usable hash are counted but never grouped.
        let raw_key: Option<String> = match group_by {
            DedupGroupBy::ContentHash => doc
                .content_hash
                .as_deref()
                .filter(|hash| !hash.is_empty())
                .map(ToOwned::to_owned),
            DedupGroupBy::SourceHash => doc
                .source_hash
                .as_deref()
                .filter(|hash| !hash.is_empty())
                .map(ToOwned::to_owned),
            DedupGroupBy::SourceHashLayer => doc
                .source_hash
                .as_deref()
                .filter(|hash| !hash.is_empty())
                .map(|hash| format!("{}|layer{}", hash, doc.layer)),
        };

        let Some(key) = raw_key else {
            docs_without_hash += 1;
            continue;
        };

        let scoped_key = if cross_namespace {
            key
        } else {
            format!("{}:{}", doc.namespace, key)
        };
        hash_groups.entry(scoped_key).or_default().push(doc);
    }

    let mut result = DedupResult {
        total_docs: all_docs.len(),
        unique_docs: 0,
        duplicate_groups: 0,
        duplicates_removed: 0,
        docs_without_hash,
        group_by,
        groups: Vec::new(),
    };

    for (key, mut docs) in hash_groups {
        if docs.len() == 1 {
            result.unique_docs += 1;
            continue;
        }

        match keep_strategy {
            KeepStrategy::Oldest => docs.sort_by(|left, right| left.id.cmp(&right.id)),
            KeepStrategy::Newest => docs.sort_by(|left, right| right.id.cmp(&left.id)),
            // No score metadata is consulted here; the stored order is kept as-is.
            KeepStrategy::HighestScore => {}
        }

        let kept = docs[0];
        let removed_docs = docs.into_iter().skip(1).collect::<Vec<_>>();

        if !dry_run {
            for doc in &removed_docs {
                storage.delete_document(&doc.namespace, &doc.id).await?;
            }
        }

        // Strip the namespace scope from the reported key. Namespaces may
        // themselves contain ':', so split from the right.
        let display_key = if cross_namespace {
            key.clone()
        } else {
            key.rsplit_once(':')
                .map(|(_ns, rest)| rest.to_string())
                .unwrap_or_else(|| key.clone())
        };

        result.unique_docs += 1;
        result.duplicate_groups += 1;
        result.duplicates_removed += removed_docs.len();
        result.groups.push(DedupGroup {
            content_hash: display_key.clone(),
            group_key: display_key,
            kept_id: kept.id.clone(),
            kept_namespace: kept.namespace.clone(),
            removed: removed_docs
                .iter()
                .map(|doc| DedupDuplicate {
                    id: doc.id.clone(),
                    namespace: doc.namespace.clone(),
                })
                .collect(),
        });
    }

    Ok(result)
}

/// Summary of a `content_hash` / `source_hash` backfill pass.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BackfillHashesResult {
    pub total_docs: usize,
    /// Chunks whose `content_hash` was (re)computed from the chunk text.
    pub content_hash_backfilled: usize,
    /// Chunks whose `source_hash` was filled in, recovered from a legacy
    /// `content_hash` where possible.
    pub source_hash_backfilled: usize,
    /// Chunks that already carried consistent hashes and were left untouched.
    pub already_consistent: usize,
    /// Chunks skipped because they have no embedding to re-store.
    pub skipped_no_embedding: usize,
    pub dry_run: bool,
}

fn fmt_duration(secs: f64) -> String {
    if secs > 3600.0 {
        format!("{:.0}h{:02.0}m", secs / 3600.0, (secs % 3600.0) / 60.0)
    } else if secs > 60.0 {
        format!("{:.0}m{:02.0}s", secs / 60.0, secs % 60.0)
    } else {
        format!("{:.0}s", secs)
    }
}
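
// Spot checks for the human-readable duration formatter used in progress output.
#[cfg(test)]
mod fmt_duration_tests {
    use super::*;

    #[test]
    fn fmt_duration_picks_seconds_minutes_or_hours() {
        assert_eq!(fmt_duration(45.0), "45s");
        assert_eq!(fmt_duration(125.0), "2m05s");
        assert_eq!(fmt_duration(7500.0), "2h05m");
    }
}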

const SPINNER: &[char] = &['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏'];

fn emit_backfill_progress(
    processed: usize,
    total: usize,
    started: &std::time::Instant,
    last_report: &mut std::time::Instant,
) {
    if total == 0 || last_report.elapsed().as_secs() < 10 {
        return;
    }
    *last_report = std::time::Instant::now();
    let pct = (processed as f64 / total as f64 * 100.0).min(100.0);
    let elapsed = started.elapsed().as_secs_f64();
    let rate = processed as f64 / elapsed;
    let eta = if rate > 0.0 {
        (total - processed) as f64 / rate
    } else {
        0.0
    };
    let tick = (elapsed as usize / 10) % SPINNER.len();
    eprint!(
        "\r {} [{:>6}/{:>6}] {:5.1}% {:.0} docs/s ETA {} ",
        SPINNER[tick],
        processed,
        total,
        pct,
        rate,
        fmt_duration(eta)
    );
}

/// Backfill per-chunk `content_hash` values and recover document-level
/// `source_hash` values for older rows.
pub async fn backfill_chunk_and_source_hashes(
    storage: &StorageManager,
    namespace: Option<&str>,
    dry_run: bool,
) -> Result<BackfillHashesResult> {
    let mut result = BackfillHashesResult {
        dry_run,
        ..Default::default()
    };

    // The total count is only used for progress reporting.
    let total_count = match namespace {
        Some(ns) => storage.count_namespace(ns).await.unwrap_or(0),
        None => storage.stats().await.map(|s| s.row_count).unwrap_or(0),
    };

    const PAGE: usize = 5_000;
    let mut offset = 0;
    let mut processed = 0usize;
    let started = std::time::Instant::now();
    let mut last_report = started;

    loop {
        let page = storage.all_documents_page(namespace, offset, PAGE).await?;
        let page_len = page.len();
        if page_len == 0 {
            break;
        }
        result.total_docs += page_len;

        for doc in &page {
            processed += 1;

            if doc.embedding.is_empty() {
                result.skipped_no_embedding += 1;
                emit_backfill_progress(processed, total_count, &started, &mut last_report);
                continue;
            }

            let true_chunk_hash = crate::rag::compute_content_hash(&doc.document);
            let mut needs_content = false;
            let mut needs_source = false;

            let new_content_hash = match doc.content_hash.as_deref() {
                Some(stored) if stored == true_chunk_hash => stored.to_string(),
                Some(_) => {
                    needs_content = true;
                    true_chunk_hash.clone()
                }
                None => {
                    needs_content = true;
                    true_chunk_hash.clone()
                }
            };

            // A legacy `content_hash` that does not match the chunk text was
            // actually a hash of the source document: promote it to `source_hash`.
            let recovered_source_hash = match (&doc.source_hash, &doc.content_hash) {
                (Some(s), _) if !s.is_empty() => s.clone(),
                (_, Some(legacy)) if legacy.as_str() != true_chunk_hash.as_str() => {
                    needs_source = true;
                    legacy.clone()
                }
                _ => {
                    needs_source = doc.source_hash.is_none();
                    new_content_hash.clone()
                }
            };

            if !needs_content && !needs_source {
                result.already_consistent += 1;
                emit_backfill_progress(processed, total_count, &started, &mut last_report);
                continue;
            }

            if needs_content {
                result.content_hash_backfilled += 1;
            }
            if needs_source {
                result.source_hash_backfilled += 1;
            }

            if dry_run {
                emit_backfill_progress(processed, total_count, &started, &mut last_report);
                continue;
            }

            // Rewrite the row with the corrected hashes (delete + re-add).
            let new_doc = ChromaDocument {
                id: doc.id.clone(),
                namespace: doc.namespace.clone(),
                embedding: doc.embedding.clone(),
                metadata: doc.metadata.clone(),
                document: doc.document.clone(),
                layer: doc.layer,
                parent_id: doc.parent_id.clone(),
                children_ids: doc.children_ids.clone(),
                keywords: doc.keywords.clone(),
                content_hash: Some(new_content_hash.clone()),
                source_hash: Some(recovered_source_hash.clone()),
            };
            storage.delete_document(&doc.namespace, &doc.id).await?;
            storage.add_to_store(vec![new_doc]).await?;
            emit_backfill_progress(processed, total_count, &started, &mut last_report);
        }

        if page_len < PAGE {
            break;
        }
        offset += page_len;
    }

    if total_count > 0 && processed > 0 {
        eprintln!(
            "\r [{0}/{0}] 100.0% done in {1} ",
            processed,
            fmt_duration(started.elapsed().as_secs_f64())
        );
    }

    Ok(result)
}

pub async fn database_stats(storage: &StorageManager) -> Result<DatabaseStats> {
    match storage.stats().await {
        Ok(stats) => Ok(DatabaseStats {
            row_count: stats.row_count,
            version_count: stats.version_count,
            table_name: stats.table_name,
            db_path: stats.db_path,
        }),
        Err(_) => Ok(DatabaseStats {
            row_count: 0,
            version_count: 0,
            table_name: storage.get_collection_name().to_string(),
            db_path: storage.lance_path().to_string(),
        }),
    }
}

pub async fn namespace_stats(
    storage: &StorageManager,
    namespace: Option<&str>,
) -> Result<Vec<NamespaceStats>> {
    let all_docs = storage.all_documents(namespace, 100_000).await?;
    let mut by_namespace: HashMap<String, Vec<_>> = HashMap::new();
    for doc in &all_docs {
        by_namespace
            .entry(doc.namespace.clone())
            .or_default()
            .push(doc);
    }

    let mut stats_list = Vec::with_capacity(by_namespace.len());
    for (name, docs) in by_namespace {
        let total_chunks = docs.len();
        let mut layer_counts = HashMap::new();
        let mut keyword_counts = HashMap::new();
        let mut dates = Vec::new();

        for doc in docs {
            let layer_name = SliceLayer::from_u8(doc.layer)
                .map(|layer| layer.name().to_string())
                .unwrap_or_else(|| "flat".to_string());
            *layer_counts.entry(layer_name).or_insert(0) += 1;

            for keyword in &doc.keywords {
                *keyword_counts.entry(keyword.clone()).or_insert(0) += 1;
            }

            if let Some(timestamp) = extract_doc_timestamp_string(doc.metadata.as_object()) {
                dates.push(timestamp);
            }
        }

        let mut top_keywords = keyword_counts.into_iter().collect::<Vec<_>>();
        top_keywords.sort_by_key(|entry| std::cmp::Reverse(entry.1));
        top_keywords.truncate(10);
        dates.sort();

        stats_list.push(NamespaceStats {
            name,
            total_chunks,
            layer_counts,
            top_keywords,
            has_timestamps: !dates.is_empty(),
            earliest_indexed: dates.first().cloned(),
            latest_indexed: dates.last().cloned(),
        });
    }

    stats_list.sort_by(|left, right| left.name.cmp(&right.name));
    Ok(stats_list)
}

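/// Build a bucketed timeline of indexed chunks per namespace and source.
///
/// A minimal usage sketch (the `storage` handle and the namespace name are
/// assumptions; any initialised `StorageManager` works):
///
/// ```ignore
/// let query = TimelineQuery {
///     namespace: Some("kb:notes".to_string()),
///     since: Some("30d".to_string()),
///     until: None,
///     bucket: TimelineBucket::Day,
/// };
/// let report = timeline_report(&storage, &query).await?;
/// println!("{} buckets contain data", report.coverage.days_with_data);
/// ```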
pub async fn timeline_report(
    storage: &StorageManager,
    query: &TimelineQuery,
) -> Result<TimelineReport> {
    let namespaces: Vec<String> = if let Some(namespace) = query.namespace.as_deref() {
        vec![namespace.to_string()]
    } else {
        storage
            .list_namespaces()
            .await?
            .into_iter()
            .map(|(name, _count)| name)
            .collect()
    };

    let since = query.since.as_deref().and_then(parse_time_bound);
    let until = query.until.as_deref().and_then(parse_time_bound);

    let mut timeline: BTreeMap<String, BTreeMap<String, BTreeMap<String, usize>>> = BTreeMap::new();
    let mut all_dates = BTreeSet::new();

    for namespace in &namespaces {
        let docs = storage.get_all_in_namespace(namespace).await?;
        for doc in docs {
            let Some(timestamp) = extract_doc_timestamp(&doc) else {
                all_dates.insert("unknown".to_string());
                *timeline
                    .entry("unknown".to_string())
                    .or_default()
                    .entry(namespace.clone())
                    .or_default()
                    .entry("unknown".to_string())
                    .or_default() += 1;
                continue;
            };

            if since.is_some_and(|lower| timestamp < lower) {
                continue;
            }
            if until.is_some_and(|upper| timestamp > upper) {
                continue;
            }

            let bucket = match query.bucket {
                TimelineBucket::Day => timestamp.format("%Y-%m-%d").to_string(),
                TimelineBucket::Hour => timestamp.format("%Y-%m-%dT%H:00:00Z").to_string(),
            };
            let source = doc
                .metadata
                .get("source")
                .and_then(|value| value.as_str())
                .or_else(|| {
                    doc.metadata
                        .get("file_path")
                        .and_then(|value| value.as_str())
                })
                .map(filename_from_path)
                .unwrap_or_else(|| "unknown".to_string());

            all_dates.insert(bucket.clone());
            *timeline
                .entry(bucket)
                .or_default()
                .entry(namespace.clone())
                .or_default()
                .entry(source)
                .or_default() += 1;
        }
    }

    let entries = timeline
        .iter()
        .flat_map(|(date, namespace_map)| {
            namespace_map
                .iter()
                .flat_map(move |(namespace, source_map)| {
                    source_map
                        .iter()
                        .map(move |(source, chunk_count)| TimelineEntry {
                            date: date.clone(),
                            namespace: namespace.clone(),
                            source: Some(source.clone()),
                            chunk_count: *chunk_count,
                        })
                })
        })
        .collect::<Vec<_>>();

    let ordered_dates = all_dates
        .iter()
        .filter(|date| *date != "unknown")
        .collect::<Vec<_>>();
    let earliest = ordered_dates.first().map(|date| (*date).clone());
    let latest = ordered_dates.last().map(|date| (*date).clone());
    let gaps = compute_gaps(&ordered_dates, query.bucket);
    let total_days = match (ordered_dates.first(), ordered_dates.last()) {
        (Some(first), Some(last)) => {
            let first_date = timeline_gap_date(first, query.bucket);
            let last_date = timeline_gap_date(last, query.bucket);
            match (first_date, last_date) {
                (Some(first_date), Some(last_date)) => {
                    (last_date - first_date).num_days() as usize + 1
                }
                _ => 0,
            }
        }
        _ => 0,
    };

    Ok(TimelineReport {
        namespaces,
        entries,
        coverage: TimelineCoverage {
            earliest,
            latest,
            total_days,
            days_with_data: ordered_dates.len(),
        },
        gaps,
    })
}

fn integrity_recommendation(recommendation: IntegrityRecommendation) -> AuditRecommendation {
    match recommendation {
        IntegrityRecommendation::Excellent => AuditRecommendation::Excellent,
        IntegrityRecommendation::Good => AuditRecommendation::Good,
        IntegrityRecommendation::Warn => AuditRecommendation::Warn,
        IntegrityRecommendation::Purge => AuditRecommendation::Purge,
    }
}

fn extract_doc_timestamp(doc: &crate::ChromaDocument) -> Option<DateTime<Utc>> {
    doc.metadata
        .get("indexed_at")
        .and_then(|value| value.as_str())
        .or_else(|| {
            doc.metadata
                .get("timestamp")
                .and_then(|value| value.as_str())
        })
        .or_else(|| {
            doc.metadata
                .get("created_at")
                .and_then(|value| value.as_str())
        })
        .and_then(parse_iso_or_date)
}

fn extract_doc_timestamp_string(
    metadata: Option<&serde_json::Map<String, serde_json::Value>>,
) -> Option<String> {
    metadata.and_then(|object| {
        object.iter().find_map(|(key, value)| {
            if !(key.contains("date") || key.contains("timestamp") || key.contains("time")) {
                return None;
            }
            value.as_str().map(ToOwned::to_owned)
        })
    })
}
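
// The string extractor only looks at metadata keys mentioning "date", "time",
// or "timestamp"; everything else is ignored.
#[cfg(test)]
mod timestamp_string_tests {
    use super::*;

    #[test]
    fn extracts_date_like_metadata_values_only() {
        let with_timestamp = serde_json::json!({
            "timestamp": "2024-01-01T00:00:00Z",
            "path": "doc.md"
        });
        assert_eq!(
            extract_doc_timestamp_string(with_timestamp.as_object()),
            Some("2024-01-01T00:00:00Z".to_string())
        );

        let without = serde_json::json!({"path": "doc.md"});
        assert_eq!(extract_doc_timestamp_string(without.as_object()), None);
    }
}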

fn parse_time_bound(input: &str) -> Option<DateTime<Utc>> {
    if let Some(days_str) = input.strip_suffix('d')
        && let Ok(days) = days_str.parse::<i64>()
    {
        return Some(Utc::now() - Duration::days(days));
    }

    if input.len() == 7
        && input.chars().nth(4) == Some('-')
        && let Ok(date) = NaiveDate::parse_from_str(&format!("{input}-01"), "%Y-%m-%d")
    {
        return date
            .and_hms_opt(0, 0, 0)
            .map(|dt| Utc.from_utc_datetime(&dt));
    }

    parse_iso_or_date(input)
}

fn parse_iso_or_date(input: &str) -> Option<DateTime<Utc>> {
    DateTime::parse_from_rfc3339(input)
        .map(|dt| dt.with_timezone(&Utc))
        .ok()
        .or_else(|| {
            NaiveDateTime::parse_from_str(input, "%Y-%m-%dT%H:%M:%S")
                .ok()
                .map(|dt| Utc.from_utc_datetime(&dt))
        })
        .or_else(|| {
            NaiveDate::parse_from_str(input, "%Y-%m-%d")
                .ok()
                .and_then(|date| date.and_hms_opt(0, 0, 0))
                .map(|dt| Utc.from_utc_datetime(&dt))
        })
}
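
// Time bounds accept relative "<N>d" offsets, "YYYY-MM" month prefixes, and
// anything `parse_iso_or_date` understands (RFC 3339, naive datetimes, bare dates).
#[cfg(test)]
mod time_parsing_tests {
    use super::*;
    use chrono::{Datelike, Timelike, Utc};

    #[test]
    fn parse_time_bound_handles_relative_month_and_date_inputs() {
        // Relative bounds are anchored to "now", so only check they land in the past.
        let week_ago = parse_time_bound("7d").expect("relative bound");
        assert!(week_ago < Utc::now());

        // "YYYY-MM" expands to the first day of that month at midnight UTC.
        let month = parse_time_bound("2024-03").expect("month bound");
        assert_eq!((month.year(), month.month(), month.day()), (2024, 3, 1));

        let day = parse_iso_or_date("2024-03-15").expect("bare date");
        assert_eq!((day.hour(), day.minute()), (0, 0));

        let exact = parse_iso_or_date("2024-03-15T12:30:00Z").expect("rfc3339");
        assert_eq!(exact.hour(), 12);
    }
}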

fn filename_from_path(path: &str) -> String {
    std::path::Path::new(path)
        .file_name()
        .and_then(|name| name.to_str())
        .unwrap_or(path)
        .to_string()
}
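
// `filename_from_path` keeps only the final path component and falls back to
// the raw input when no file name can be extracted.
#[cfg(test)]
mod filename_tests {
    use super::*;

    #[test]
    fn filename_from_path_keeps_only_the_final_component() {
        assert_eq!(filename_from_path("notes/2024/meeting.md"), "meeting.md");
        assert_eq!(filename_from_path("meeting.md"), "meeting.md");
    }
}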

fn compute_gaps(dates: &[&String], bucket: TimelineBucket) -> Vec<String> {
    let parsed_dates = dates
        .iter()
        .filter_map(|date| timeline_gap_date(date, bucket))
        .collect::<Vec<_>>();

    let mut gaps = Vec::new();
    for window in parsed_dates.windows(2) {
        let diff = window[1] - window[0];
        let missing_units = match bucket {
            TimelineBucket::Day => diff.num_days() - 1,
            TimelineBucket::Hour => diff.num_hours() - 1,
        };
        if missing_units > 0 {
            gaps.push(format!(
                "{} to {} ({} missing {})",
                format_gap_date(window[0], bucket),
                format_gap_date(window[1], bucket),
                missing_units,
                match bucket {
                    TimelineBucket::Day => "day(s)",
                    TimelineBucket::Hour => "hour(s)",
                }
            ));
        }
    }
    gaps
}

fn timeline_gap_date(date: &str, bucket: TimelineBucket) -> Option<DateTime<Utc>> {
    match bucket {
        TimelineBucket::Day => NaiveDate::parse_from_str(date, "%Y-%m-%d")
            .ok()
            .and_then(|date| date.and_hms_opt(0, 0, 0))
            .map(|dt| Utc.from_utc_datetime(&dt)),
        TimelineBucket::Hour => DateTime::parse_from_rfc3339(date)
            .map(|dt| dt.with_timezone(&Utc))
            .ok()
            .or_else(|| parse_iso_or_date(date)),
    }
}

fn format_gap_date(date: DateTime<Utc>, bucket: TimelineBucket) -> String {
    match bucket {
        TimelineBucket::Day => date.format("%Y-%m-%d").to_string(),
        TimelineBucket::Hour => date.format("%Y-%m-%dT%H:00:00Z").to_string(),
    }
}
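
// A minimal gap-computation check: consecutive day buckets produce no gap entry,
// while a jump from Jan 2 to Jan 5 reports the two missing days.
#[cfg(test)]
mod gap_tests {
    use super::*;

    #[test]
    fn compute_gaps_reports_missing_days_between_buckets() {
        let dates = vec![
            "2024-01-01".to_string(),
            "2024-01-02".to_string(),
            "2024-01-05".to_string(),
        ];
        let refs: Vec<&String> = dates.iter().collect();
        assert_eq!(
            compute_gaps(&refs, TimelineBucket::Day),
            vec!["2024-01-02 to 2024-01-05 (2 missing day(s))".to_string()]
        );
    }
}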

#[cfg(test)]
mod backfill_tests {
    use super::*;
    use crate::rag::compute_content_hash;
    use crate::storage::ChromaDocument;
    use tempfile::TempDir;

    // Pre-v4 rows stored the source-document hash in `content_hash` and had no
    // `source_hash`. The backfill must recompute the per-chunk hash, promote the
    // legacy value to `source_hash`, and be idempotent.
    #[tokio::test]
    async fn backfill_promotes_legacy_content_hash_to_source_hash() {
        let tmp = TempDir::new().expect("temp dir");
        let db_path = tmp.path().join("lancedb");
        let storage = StorageManager::new_lance_only(db_path.to_str().expect("utf-8 temp db path"))
            .await
            .expect("storage");
        storage.ensure_collection().await.expect("collection");

        let namespace = "kb:transcripts-test".to_string();
        let source_text = "full source document text";
        let source_hash = compute_content_hash(source_text);

        let chunk_a_text = "outer summary text";
        let chunk_b_text = "inner detailed text";
        let doc_a = ChromaDocument {
            id: "chunk-a".to_string(),
            namespace: namespace.clone(),
            embedding: vec![0.1_f32; 8],
            metadata: serde_json::json!({"path": "doc.md"}),
            document: chunk_a_text.to_string(),
            layer: 1,
            parent_id: None,
            children_ids: vec![],
            keywords: vec![],
            content_hash: Some(source_hash.clone()),
            source_hash: None,
        };
        let doc_b = ChromaDocument {
            id: "chunk-b".to_string(),
            namespace: namespace.clone(),
            embedding: vec![0.2_f32; 8],
            metadata: serde_json::json!({"path": "doc.md"}),
            document: chunk_b_text.to_string(),
            layer: 3,
            parent_id: None,
            children_ids: vec![],
            keywords: vec![],
            content_hash: Some(source_hash.clone()),
            source_hash: None,
        };
        storage
            .add_to_store(vec![doc_a, doc_b])
            .await
            .expect("seed pre-v4 rows");

        let dry = backfill_chunk_and_source_hashes(&storage, Some(&namespace), true)
            .await
            .expect("dry run");
        assert!(dry.dry_run);
        assert_eq!(dry.total_docs, 2);
        assert_eq!(dry.content_hash_backfilled, 2);
        assert_eq!(dry.source_hash_backfilled, 2);
        assert_eq!(dry.already_consistent, 0);

        let live = backfill_chunk_and_source_hashes(&storage, Some(&namespace), false)
            .await
            .expect("live run");
        assert!(!live.dry_run);
        assert_eq!(live.content_hash_backfilled, 2);
        assert_eq!(live.source_hash_backfilled, 2);

        let after_a = storage
            .get_document(&namespace, "chunk-a")
            .await
            .expect("lookup a")
            .expect("doc-a present");
        assert_eq!(
            after_a.content_hash.as_deref(),
            Some(compute_content_hash(chunk_a_text)).as_deref()
        );
        assert_eq!(after_a.source_hash.as_deref(), Some(source_hash.as_str()));

        let after_b = storage
            .get_document(&namespace, "chunk-b")
            .await
            .expect("lookup b")
            .expect("doc-b present");
        assert_eq!(
            after_b.content_hash.as_deref(),
            Some(compute_content_hash(chunk_b_text)).as_deref()
        );
        assert_eq!(after_b.source_hash.as_deref(), Some(source_hash.as_str()));

        // A second live run must find nothing left to fix.
        let again = backfill_chunk_and_source_hashes(&storage, Some(&namespace), false)
            .await
            .expect("idempotent");
        assert_eq!(again.content_hash_backfilled, 0);
        assert_eq!(again.source_hash_backfilled, 0);
        assert_eq!(again.already_consistent, 2);
    }
}

#[cfg(test)]
mod dedup_grouping_tests {
    use super::*;
    use crate::rag::compute_content_hash;
    use crate::storage::ChromaDocument;
    use tempfile::TempDir;

    fn doc(
        id: &str,
        ns: &str,
        layer: u8,
        text: &str,
        source: &str,
        chunk_hash: &str,
    ) -> ChromaDocument {
        ChromaDocument {
            id: id.to_string(),
            namespace: ns.to_string(),
            embedding: vec![0.1_f32; 8],
            metadata: serde_json::json!({}),
            document: text.to_string(),
            layer,
            parent_id: None,
            children_ids: vec![],
            keywords: vec![],
            content_hash: Some(chunk_hash.to_string()),
            source_hash: Some(source.to_string()),
        }
    }

    #[tokio::test]
    async fn source_hash_layer_grouping_preserves_onion_structure() {
        let tmp = TempDir::new().expect("temp dir");
        let db_path = tmp.path().join("db");
        let storage = StorageManager::new_lance_only(db_path.to_str().expect("utf-8 temp db path"))
            .await
            .expect("storage");
        storage.ensure_collection().await.expect("collection");

        let ns = "kb:transcripts-test";
        let source_a = compute_content_hash("source document A — full transcript");
        let source_b = compute_content_hash("source document B — different transcript");

        let mut docs = Vec::new();
        // Two variants of source A per layer (so each layer holds exactly one
        // duplicate), plus one unique copy of source B per layer.
        for (suffix, _variant) in [("clean", "clean"), ("dupe", "dupe")].iter() {
            for layer in 0u8..4 {
                let text = format!("source-A layer-{layer} variant-{suffix}");
                let chunk_hash = compute_content_hash(&text);
                docs.push(doc(
                    &format!("a-{suffix}-l{layer}"),
                    ns,
                    layer,
                    &text,
                    &source_a,
                    &chunk_hash,
                ));
            }
        }
        for layer in 0u8..4 {
            let text = format!("source-B layer-{layer}");
            let chunk_hash = compute_content_hash(&text);
            docs.push(doc(
                &format!("b-l{layer}"),
                ns,
                layer,
                &text,
                &source_b,
                &chunk_hash,
            ));
        }
        storage
            .add_to_store(docs)
            .await
            .expect("seed dedup fixture");

        let result = deduplicate_documents(
            &storage,
            Some(ns),
            true,
            KeepStrategy::Oldest,
            false,
            DedupGroupBy::SourceHashLayer,
        )
        .await
        .expect("dedup");

        assert_eq!(result.total_docs, 12);
        assert_eq!(result.duplicate_groups, 4, "one group per onion layer");
        assert_eq!(
            result.duplicates_removed, 4,
            "remove one variant per layer, keep the other"
        );
        assert_eq!(
            result.docs_without_hash, 0,
            "every chunk has source_hash populated"
        );
        // Four kept source-A chunks plus four untouched source-B chunks.
        assert_eq!(result.unique_docs, 4 + 4);
        assert!(
            result.groups.iter().all(|g| g.group_key.contains("|layer")),
            "source-hash-layer keys must encode layer: {:?}",
            result
                .groups
                .iter()
                .map(|g| &g.group_key)
                .collect::<Vec<_>>()
        );
    }

    #[tokio::test]
    async fn content_hash_grouping_finds_zero_duplicates_on_fresh_onion() {
        let tmp = TempDir::new().expect("temp dir");
        let db_path = tmp.path().join("db");
        let storage = StorageManager::new_lance_only(db_path.to_str().expect("utf-8 temp db path"))
            .await
            .expect("storage");
        storage.ensure_collection().await.expect("collection");

        let ns = "kb:transcripts-test";
        let source_a = compute_content_hash("source document A");
        let mut docs = Vec::new();
        for layer in 0u8..4 {
            let text = format!("source-A unique-layer-{layer}");
            let chunk_hash = compute_content_hash(&text);
            docs.push(doc(
                &format!("a-l{layer}"),
                ns,
                layer,
                &text,
                &source_a,
                &chunk_hash,
            ));
        }
        storage
            .add_to_store(docs)
            .await
            .expect("seed unique chunks");

        let result = deduplicate_documents(
            &storage,
            Some(ns),
            true,
            KeepStrategy::Oldest,
            false,
            DedupGroupBy::ContentHash,
        )
        .await
        .expect("dedup");

        assert_eq!(result.total_docs, 4);
        assert_eq!(
            result.duplicate_groups, 0,
            "post-P0 each chunk has unique content_hash, legacy grouping must find none"
        );
        assert_eq!(result.duplicates_removed, 0);
    }

    #[test]
    fn dedup_group_by_parses_known_aliases_and_falls_back_to_default() {
        assert_eq!(
            DedupGroupBy::parse("source-hash-layer"),
            DedupGroupBy::SourceHashLayer
        );
        assert_eq!(DedupGroupBy::parse("source-hash"), DedupGroupBy::SourceHash);
        assert_eq!(
            DedupGroupBy::parse("source_hash"),
            DedupGroupBy::SourceHash,
            "underscore form accepted as alias"
        );
        assert_eq!(
            DedupGroupBy::parse("content-hash"),
            DedupGroupBy::ContentHash
        );
        assert_eq!(
            DedupGroupBy::parse("content_hash"),
            DedupGroupBy::ContentHash
        );
        assert_eq!(DedupGroupBy::parse(""), DedupGroupBy::SourceHashLayer);
        assert_eq!(DedupGroupBy::parse("nope"), DedupGroupBy::SourceHashLayer);
    }
}