use anyhow::{Result, anyhow};
use async_stream::try_stream;
use axum::body::Bytes;
use futures::{Stream, StreamExt};
use memex_contracts::progress::{ReindexProgress, ReprocessProgress};
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value, json};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::io::{AsyncBufRead, AsyncBufReadExt, BufReader};

use crate::{
    ChromaDocument, ChunkerKind, PreprocessingConfig, Preprocessor, RAGPipeline, SliceMode,
    StorageManager, compute_content_hash, detect_default_chunker, path_utils,
};

const EXPORT_PAGE_SIZE: usize = 5_000;

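/// One exported document, serialized as a single JSON line. `embeddings` is
/// omitted from the output when it is `None`, so exports without embeddings
/// stay compact.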
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExportRecord {
    pub id: String,
    pub text: String,
    pub metadata: Value,
    pub content_hash: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub embeddings: Option<Vec<f32>>,
}

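/// Summary of a [`reprocess_jsonl_file`] run: how many source records were read,
/// how many canonical documents they collapsed into, and how indexing went.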
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct ReprocessOutcome {
    pub target_namespace: String,
    pub source_label: String,
    pub source_records: usize,
    pub canonical_documents: usize,
    pub indexed_documents: usize,
    pub replaced_documents: usize,
    pub skipped_existing_documents: usize,
    pub skipped_empty_documents: usize,
    pub skipped_preprocess_short_documents: usize,
    pub failed_documents: usize,
    pub failed_ids: Vec<String>,
    pub parse_errors: usize,
}

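/// Summary of a [`reindex_namespace`] run, mirroring [`ReprocessOutcome`] but with
/// the individual skip counters folded into a single `skipped_documents` field.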
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct ReindexOutcome {
    pub source_namespace: String,
    pub target_namespace: String,
    pub source_records: usize,
    pub canonical_documents: usize,
    pub indexed_documents: usize,
    pub replaced_documents: usize,
    pub skipped_documents: usize,
    pub failed_documents: usize,
    pub failed_ids: Vec<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct ImportOutcome {
    pub imported_count: usize,
    pub skipped_count: usize,
    pub error_count: usize,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct NamespaceMigrationOutcome {
    pub from_namespace: String,
    pub to_namespace: String,
    pub migrated_chunks: usize,
}

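/// Parameters for rebuilding a namespace from a JSONL export file, as consumed by
/// [`reprocess_jsonl_file`].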
#[derive(Debug, Clone)]
pub struct ReprocessJob {
    pub input_path: PathBuf,
    pub target_namespace: String,
    pub slice_mode: SliceMode,
    pub chunker: Option<ChunkerKind>,
    pub preprocess: bool,
    pub skip_existing: bool,
    pub allow_duplicates: bool,
    pub dry_run: bool,
}

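/// Parameters for rebuilding one stored namespace into another, as consumed by
/// [`reindex_namespace`].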
#[derive(Debug, Clone)]
pub struct ReindexJob {
    pub source_namespace: String,
    pub target_namespace: String,
    pub slice_mode: SliceMode,
    pub chunker: Option<ChunkerKind>,
    pub preprocess: bool,
    pub skip_existing: bool,
    pub allow_duplicates: bool,
    pub dry_run: bool,
}

#[derive(Debug, Clone)]
struct ReprocessDocument {
    canonical_id: String,
    source_record_id: String,
    text: String,
    metadata: Value,
    source_text_hash: String,
    collapsed_records: usize,
}

#[derive(Debug, Clone)]
struct RebuildPlan {
    namespace: String,
    source_label: String,
    source_records: usize,
    docs: Vec<ReprocessDocument>,
    slice_mode: SliceMode,
    chunker: Option<ChunkerKind>,
    preprocess: bool,
    skip_existing: bool,
    allow_duplicates: bool,
    dry_run: bool,
    parse_errors: usize,
}

#[derive(Debug, Clone, Default)]
struct RebuildProgress {
    processed_documents: usize,
    indexed_documents: usize,
    skipped_documents: usize,
    failed_documents: usize,
}

#[derive(Debug, Clone, Default)]
struct RebuildStats {
    indexed_documents: usize,
    replaced_documents: usize,
    skipped_existing_documents: usize,
    skipped_empty_documents: usize,
    skipped_preprocess_short_documents: usize,
    failed_ids: Vec<String>,
}

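/// Default target namespace for a reindex run: appends `-reindexed` to the source
/// namespace, e.g. `kodowanie` becomes `kodowanie-reindexed`.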
pub fn default_reindexed_namespace(namespace: &str) -> String {
    format!("{namespace}-reindexed")
}

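/// Streams a namespace as JSONL, one [`ExportRecord`] per yielded line (newline
/// included), paging through storage in batches of `EXPORT_PAGE_SIZE`. Embeddings
/// are attached only when `include_embeddings` is set.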
pub fn export_namespace_jsonl_stream(
    storage: Arc<StorageManager>,
    namespace: String,
    include_embeddings: bool,
) -> impl Stream<Item = Result<String>> + Send + 'static {
    try_stream! {
        let mut offset = 0usize;
        loop {
            let page = storage
                .all_documents_page(Some(&namespace), offset, EXPORT_PAGE_SIZE)
                .await?;
            let page_len = page.len();

            for doc in page {
                let record = ExportRecord {
                    id: doc.id,
                    text: doc.document,
                    metadata: doc.metadata,
                    content_hash: doc.content_hash,
                    embeddings: include_embeddings.then_some(doc.embedding),
                };

                let mut line = serde_json::to_string(&record)?;
                line.push('\n');
                yield line;
            }

            if page_len < EXPORT_PAGE_SIZE {
                break;
            }
            offset += page_len;
        }
    }
}

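/// Imports a JSONL export from disk: the path is validated and opened through
/// `path_utils::safe_open_file_async`, then handed to [`import_jsonl_reader`].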
pub async fn import_jsonl_file(
    rag: Arc<RAGPipeline>,
    namespace: String,
    input: &Path,
    skip_existing: bool,
) -> Result<ImportOutcome> {
    let (_validated, file) = path_utils::safe_open_file_async(input).await?;
    let reader = BufReader::new(file);
    import_jsonl_reader(rag, namespace, skip_existing, reader).await
}

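/// Imports JSONL records line by line from any buffered async reader. Lines that fail
/// to parse or index are counted in `error_count` rather than aborting the import;
/// records carrying embeddings are written straight to storage, the rest go through
/// `RAGPipeline::memory_upsert`.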
pub async fn import_jsonl_reader<R>(
    rag: Arc<RAGPipeline>,
    namespace: String,
    skip_existing: bool,
    reader: R,
) -> Result<ImportOutcome>
where
    R: AsyncBufRead + Unpin,
{
    let storage = rag.storage_manager();
    let mut outcome = ImportOutcome::default();
    let mut lines = reader.lines();

    while let Some(line) = lines.next_line().await? {
        if process_import_line(
            rag.as_ref(),
            storage.as_ref(),
            &namespace,
            skip_existing,
            line.as_bytes(),
            &mut outcome,
        )
        .await
        .is_err()
        {
            outcome.error_count += 1;
        }
    }

    Ok(outcome)
}

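/// Imports JSONL from a raw byte stream (e.g. a multipart upload): incoming bytes are
/// buffered and split on newlines, and any trailing bytes without a final newline are
/// processed once the stream ends.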
pub async fn import_jsonl_bytes_stream<S, E>(
    rag: Arc<RAGPipeline>,
    namespace: String,
    skip_existing: bool,
    mut stream: S,
) -> Result<ImportOutcome>
where
    S: Stream<Item = std::result::Result<Bytes, E>> + Unpin,
    E: std::fmt::Display,
{
    let storage = rag.storage_manager();
    let mut outcome = ImportOutcome::default();
    let mut buffer = Vec::new();

    while let Some(chunk) = stream.next().await {
        let chunk = chunk.map_err(|err| anyhow!("multipart stream error: {err}"))?;
        buffer.extend_from_slice(&chunk);

        while let Some(position) = buffer.iter().position(|byte| *byte == b'\n') {
            let line = buffer.drain(..=position).collect::<Vec<_>>();
            if process_import_line(
                rag.as_ref(),
                storage.as_ref(),
                &namespace,
                skip_existing,
                &line,
                &mut outcome,
            )
            .await
            .is_err()
            {
                outcome.error_count += 1;
            }
        }
    }

    if !buffer.is_empty()
        && process_import_line(
            rag.as_ref(),
            storage.as_ref(),
            &namespace,
            skip_existing,
            &buffer,
            &mut outcome,
        )
        .await
        .is_err()
    {
        outcome.error_count += 1;
    }

    Ok(outcome)
}

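/// Renames a namespace via `StorageManager::rename_namespace_atomic` and reports how
/// many chunks were moved.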
pub async fn migrate_namespace_atomic(
    storage: &StorageManager,
    from: &str,
    to: &str,
) -> Result<NamespaceMigrationOutcome> {
    let migrated_chunks = storage.rename_namespace_atomic(from, to).await?;
    Ok(NamespaceMigrationOutcome {
        from_namespace: from.to_string(),
        to_namespace: to.to_string(),
        migrated_chunks,
    })
}

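/// Rebuilds a namespace from a JSONL export file: parses the records, collapses chunk
/// records back into canonical documents, and re-indexes them into `target_namespace`,
/// reporting progress through `on_progress`.
///
/// A minimal usage sketch (assuming an `Arc<RAGPipeline>` named `rag` is already
/// available in the calling context):
///
/// ```ignore
/// let outcome = reprocess_jsonl_file(
///     rag.clone(),
///     ReprocessJob {
///         input_path: "export.jsonl".into(),
///         target_namespace: "kb:rebuilt".to_string(),
///         slice_mode: SliceMode::Onion,
///         chunker: None,
///         preprocess: true,
///         skip_existing: true,
///         allow_duplicates: false,
///         dry_run: false,
///     },
///     |p| println!("processed {} documents", p.processed_documents),
/// )
/// .await?;
/// ```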
pub async fn reprocess_jsonl_file<F>(
    rag: Arc<RAGPipeline>,
    job: ReprocessJob,
    mut on_progress: F,
) -> Result<ReprocessOutcome>
where
    F: FnMut(ReprocessProgress),
{
    let ReprocessJob {
        input_path,
        target_namespace,
        slice_mode,
        chunker,
        preprocess,
        skip_existing,
        allow_duplicates,
        dry_run,
    } = job;
    let (_validated, content) = path_utils::safe_read_to_string_async(&input_path).await?;
    let (records, parse_errors) = parse_export_records(&content);

    if records.is_empty() {
        return Ok(ReprocessOutcome {
            target_namespace,
            source_label: input_path.display().to_string(),
            parse_errors,
            ..ReprocessOutcome::default()
        });
    }

    let source_records = records.len();
    let docs = collapse_export_records(records);
    let source_label = input_path.display().to_string();
    let canonical_documents = docs.len();

    let stats = run_rebuild_documents(
        rag,
        RebuildPlan {
            namespace: target_namespace.clone(),
            source_label: source_label.clone(),
            source_records,
            docs,
            slice_mode,
            chunker,
            preprocess,
            skip_existing,
            allow_duplicates,
            dry_run,
            parse_errors,
        },
        |progress| {
            on_progress(ReprocessProgress {
                source_label: source_label.clone(),
                processed_documents: progress.processed_documents,
                indexed_documents: progress.indexed_documents,
                skipped_documents: progress.skipped_documents,
                failed_documents: progress.failed_documents,
            });
        },
    )
    .await?;

    Ok(ReprocessOutcome {
        target_namespace,
        source_label,
        source_records,
        canonical_documents,
        indexed_documents: stats.indexed_documents,
        replaced_documents: stats.replaced_documents,
        skipped_existing_documents: stats.skipped_existing_documents,
        skipped_empty_documents: stats.skipped_empty_documents,
        skipped_preprocess_short_documents: stats.skipped_preprocess_short_documents,
        failed_documents: stats.failed_ids.len(),
        failed_ids: stats.failed_ids,
        parse_errors,
    })
}

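/// Rebuilds `source_namespace` into `target_namespace` by paging every stored document
/// out of storage, collapsing chunks into canonical documents, and re-indexing them.
/// Refuses to run when source and target are the same, or when the target already
/// contains documents and `skip_existing` is not set.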
pub async fn reindex_namespace<F>(
    rag: Arc<RAGPipeline>,
    job: ReindexJob,
    mut on_progress: F,
) -> Result<ReindexOutcome>
where
    F: FnMut(ReindexProgress),
{
    let ReindexJob {
        source_namespace,
        target_namespace,
        slice_mode,
        chunker,
        preprocess,
        skip_existing,
        allow_duplicates,
        dry_run,
    } = job;
    if source_namespace == target_namespace {
        return Err(anyhow!(
            "Source and target namespace are the same ('{}'). Use a different target namespace.",
            source_namespace
        ));
    }

    let storage = rag.storage_manager();
    let target_count = storage.count_namespace(&target_namespace).await?;
    if target_count > 0 && !skip_existing {
        return Err(anyhow!(
            "Target namespace '{}' already contains {} documents. Pass skip_existing to resume, or use a fresh target namespace.",
            target_namespace,
            target_count
        ));
    }

    let mut records = Vec::new();
    let mut offset = 0usize;
    loop {
        let page = storage
            .all_documents_page(Some(&source_namespace), offset, EXPORT_PAGE_SIZE)
            .await?;
        let page_len = page.len();

        for doc in page {
            records.push(ExportRecord {
                id: doc.id,
                text: doc.document,
                metadata: doc.metadata,
                content_hash: doc.content_hash,
                embeddings: None,
            });
        }

        if page_len < EXPORT_PAGE_SIZE {
            break;
        }
        offset += page_len;
    }

    if records.is_empty() {
        return Ok(ReindexOutcome {
            source_namespace,
            target_namespace,
            ..ReindexOutcome::default()
        });
    }

    let source_records = records.len();
    let docs = collapse_export_records(records);
    let canonical_documents = docs.len();
    let progress_namespace = target_namespace.clone();

    let stats = run_rebuild_documents(
        rag,
        RebuildPlan {
            namespace: target_namespace.clone(),
            source_label: format!("namespace:{source_namespace}"),
            source_records,
            docs,
            slice_mode,
            chunker,
            preprocess,
            skip_existing,
            allow_duplicates,
            dry_run,
            parse_errors: 0,
        },
        |progress| {
            on_progress(ReindexProgress {
                namespace: progress_namespace.clone(),
                total_files: canonical_documents,
                processed_files: progress.processed_documents,
                indexed_files: progress.indexed_documents,
                skipped_files: progress.skipped_documents,
                failed_files: progress.failed_documents,
                total_chunks: source_records,
            });
        },
    )
    .await?;

    Ok(ReindexOutcome {
        source_namespace,
        target_namespace,
        source_records,
        canonical_documents,
        indexed_documents: stats.indexed_documents,
        replaced_documents: stats.replaced_documents,
        skipped_documents: stats.skipped_existing_documents
            + stats.skipped_empty_documents
            + stats.skipped_preprocess_short_documents,
        failed_documents: stats.failed_ids.len(),
        failed_ids: stats.failed_ids,
    })
}

fn preferred_reprocess_id(record: &ExportRecord) -> String {
    record
        .metadata
        .get("original_id")
        .and_then(Value::as_str)
        .filter(|value| !value.trim().is_empty())
        .or_else(|| {
            record
                .metadata
                .get("doc_id")
                .and_then(Value::as_str)
                .filter(|value| !value.trim().is_empty())
        })
        .map(ToOwned::to_owned)
        .unwrap_or_else(|| record.id.clone())
}

fn reprocess_layer_rank(metadata: &Value) -> u8 {
    match metadata.get("layer").and_then(Value::as_str) {
        Some("core") => 4,
        Some("inner") => 3,
        Some("middle") => 2,
        Some("outer") => 1,
        _ => 0,
    }
}

fn should_replace_reprocess_candidate(
    current: &ReprocessDocument,
    candidate: &ReprocessDocument,
) -> bool {
    let current_rank = (current.text.len(), reprocess_layer_rank(&current.metadata));
    let candidate_rank = (
        candidate.text.len(),
        reprocess_layer_rank(&candidate.metadata),
    );
    candidate_rank > current_rank
}

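/// Collapses exported chunk records back into one document per canonical id. When
/// several records share an id, the one with the longest text wins, with the onion
/// layer rank (`core` > `inner` > `middle` > `outer`) as a tie-breaker; the number of
/// collapsed records is tracked for later metadata.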
fn collapse_export_records(records: Vec<ExportRecord>) -> Vec<ReprocessDocument> {
    let mut grouped: HashMap<String, ReprocessDocument> = HashMap::new();

    for record in records {
        let text = record.text.trim();
        if text.is_empty() {
            continue;
        }

        let candidate = ReprocessDocument {
            canonical_id: preferred_reprocess_id(&record),
            source_record_id: record.id,
            text: text.to_string(),
            metadata: record.metadata,
            source_text_hash: compute_content_hash(text),
            collapsed_records: 1,
        };

        match grouped.entry(candidate.canonical_id.clone()) {
            std::collections::hash_map::Entry::Vacant(entry) => {
                entry.insert(candidate);
            }
            std::collections::hash_map::Entry::Occupied(mut entry) => {
                let total_records = entry.get().collapsed_records + 1;
                if should_replace_reprocess_candidate(entry.get(), &candidate) {
                    let mut replacement = candidate;
                    replacement.collapsed_records = total_records;
                    entry.insert(replacement);
                } else {
                    entry.get_mut().collapsed_records = total_records;
                }
            }
        }
    }

    let mut docs: Vec<ReprocessDocument> = grouped.into_values().collect();
    docs.sort_by(|left, right| left.canonical_id.cmp(&right.canonical_id));
    docs
}

fn reprocess_slice_mode_name(slice_mode: SliceMode) -> &'static str {
    match slice_mode {
        SliceMode::Onion => "onion",
        SliceMode::OnionFast => "onion-fast",
        SliceMode::Flat => "flat",
    }
}

struct ReprocessMetadataInput<'a> {
    metadata: &'a Value,
    source_record_id: &'a str,
    source_text_hash: &'a str,
    collapsed_records: usize,
    slice_mode: SliceMode,
    chunker: Option<ChunkerKind>,
    namespace: &'a str,
    source_label: &'a str,
    preprocess: bool,
}

fn prepare_reprocess_metadata(input: ReprocessMetadataInput<'_>) -> Value {
    let mut map = match input.metadata.clone() {
        Value::Object(map) => map,
        _ => Map::new(),
    };

    for key in [
        "layer",
        "parent_id",
        "children_ids",
        "original_id",
        "slice_mode",
        "chunker",
        "content_hash",
    ] {
        map.remove(key);
    }

    let selected_chunker = input
        .chunker
        .unwrap_or_else(|| detect_default_chunker(Path::new(input.source_label), input.namespace));
    map.insert(
        "slice_mode".to_string(),
        json!(reprocess_slice_mode_name(input.slice_mode)),
    );
    map.insert("chunker".to_string(), json!(selected_chunker.name()));
    map.insert(
        "reprocess_source_record_id".to_string(),
        json!(input.source_record_id),
    );
    map.insert(
        "reprocess_source_hash".to_string(),
        json!(input.source_text_hash),
    );
    map.insert(
        "reprocess_collapsed_records".to_string(),
        json!(input.collapsed_records),
    );
    map.insert("reprocess_source".to_string(), json!(input.source_label));
    if input.preprocess {
        map.insert("reprocess_preprocessed".to_string(), json!(true));
    }

    Value::Object(map)
}

fn parse_export_records(content: &str) -> (Vec<ExportRecord>, usize) {
    let mut parse_errors = 0usize;
    let mut records = Vec::new();

    for line in content.lines() {
        let trimmed = line.trim();
        if trimmed.is_empty() {
            continue;
        }

        match serde_json::from_str::<ExportRecord>(trimmed) {
            Ok(record) => records.push(record),
            Err(_) => {
                parse_errors += 1;
            }
        }
    }

    (records, parse_errors)
}

async fn process_import_line(
    rag: &RAGPipeline,
    storage: &StorageManager,
    namespace: &str,
    skip_existing: bool,
    raw_line: &[u8],
    outcome: &mut ImportOutcome,
) -> Result<()> {
    let text = std::str::from_utf8(raw_line)?.trim();
    if text.is_empty() {
        return Ok(());
    }

    let record: ExportRecord = serde_json::from_str(text)?;
    let content_hash = record
        .content_hash
        .clone()
        .unwrap_or_else(|| compute_content_hash(&record.text));

    if skip_existing && storage.has_content_hash(namespace, &content_hash).await? {
        outcome.skipped_count += 1;
        return Ok(());
    }

    if let Some(embedding) = record.embeddings {
        let document = ChromaDocument::new_flat_with_hash(
            record.id,
            namespace.to_string(),
            embedding,
            record.metadata,
            record.text,
            content_hash,
        );
        storage.add_to_store(vec![document]).await?;
    } else {
        rag.memory_upsert(namespace, record.id, record.text, record.metadata)
            .await?;
    }

    outcome.imported_count += 1;
    Ok(())
}

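/// Shared rebuild loop for reprocess and reindex: for each collapsed document it checks
/// the duplicate and skip-existing conditions, optionally preprocesses the text, rewrites
/// the chunk-level metadata, upserts into the target namespace, and emits progress after
/// every document. Dry runs and empty plans return default stats without touching storage.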
async fn run_rebuild_documents<F>(
    rag: Arc<RAGPipeline>,
    plan: RebuildPlan,
    mut emit_progress: F,
) -> Result<RebuildStats>
where
    F: FnMut(&RebuildProgress),
{
    let RebuildPlan {
        namespace,
        source_label,
        source_records,
        docs,
        slice_mode,
        chunker,
        preprocess,
        skip_existing,
        allow_duplicates,
        dry_run,
        parse_errors: _parse_errors,
    } = plan;

    if docs.is_empty() {
        return Ok(RebuildStats::default());
    }

    if dry_run {
        return Ok(RebuildStats::default());
    }

    rag.embedding_healthcheck().await?;

    let preprocessor = preprocess.then(|| Preprocessor::new(PreprocessingConfig::default()));
    let min_length = PreprocessingConfig::default().min_content_length;
    let storage = rag.storage_manager();
    let mut stats = RebuildStats::default();
    let mut progress = RebuildProgress::default();

    for (idx, doc) in docs.iter().enumerate() {
        if !allow_duplicates
            && storage
                .has_source_hash(&namespace, &doc.source_text_hash)
                .await?
        {
            tracing::info!(
                "Skip duplicate source during rebuild: {}#{} (source_hash {})",
                source_label,
                doc.source_record_id,
                &doc.source_text_hash[..16]
            );
            stats.skipped_existing_documents += 1;
            progress.processed_documents = idx + 1;
            progress.skipped_documents = stats.skipped_existing_documents
                + stats.skipped_empty_documents
                + stats.skipped_preprocess_short_documents;
            progress.failed_documents = stats.failed_ids.len();
            progress.indexed_documents = stats.indexed_documents;
            emit_progress(&progress);
            continue;
        }

        let existing = rag.lookup_memory(&namespace, &doc.canonical_id).await?;
        if let Some(existing_doc) = existing.as_ref()
            && skip_existing
            && existing_doc
                .metadata
                .get("reprocess_source_hash")
                .and_then(Value::as_str)
                == Some(doc.source_text_hash.as_str())
        {
            stats.skipped_existing_documents += 1;
            progress.processed_documents = idx + 1;
            progress.skipped_documents = stats.skipped_existing_documents
                + stats.skipped_empty_documents
                + stats.skipped_preprocess_short_documents;
            progress.failed_documents = stats.failed_ids.len();
            progress.indexed_documents = stats.indexed_documents;
            emit_progress(&progress);
            continue;
        }

        let text = if let Some(preprocessor) = &preprocessor {
            preprocessor.extract_semantic_content(&doc.text)
        } else {
            doc.text.clone()
        };

        if text.trim().is_empty() {
            stats.skipped_empty_documents += 1;
            progress.processed_documents = idx + 1;
            progress.skipped_documents = stats.skipped_existing_documents
                + stats.skipped_empty_documents
                + stats.skipped_preprocess_short_documents;
            progress.failed_documents = stats.failed_ids.len();
            progress.indexed_documents = stats.indexed_documents;
            emit_progress(&progress);
            continue;
        }

        if preprocess && text.trim().len() < min_length {
            stats.skipped_preprocess_short_documents += 1;
            progress.processed_documents = idx + 1;
            progress.skipped_documents = stats.skipped_existing_documents
                + stats.skipped_empty_documents
                + stats.skipped_preprocess_short_documents;
            progress.failed_documents = stats.failed_ids.len();
            progress.indexed_documents = stats.indexed_documents;
            emit_progress(&progress);
            continue;
        }

        let metadata = prepare_reprocess_metadata(ReprocessMetadataInput {
            metadata: &doc.metadata,
            source_record_id: &doc.source_record_id,
            source_text_hash: &doc.source_text_hash,
            collapsed_records: doc.collapsed_records,
            slice_mode,
            chunker,
            namespace: &namespace,
            source_label: &source_label,
            preprocess,
        });

        if existing.is_some() {
            stats.replaced_documents += 1;
        }

        match rag
            .memory_upsert(&namespace, doc.canonical_id.clone(), text, metadata)
            .await
        {
            Ok(()) => {
                stats.indexed_documents += 1;
            }
            Err(err) => {
                tracing::warn!(
                    "Failed to rebuild {}#{} into namespace '{}': {}",
                    source_label,
                    doc.canonical_id,
                    namespace,
                    err
                );
                eprintln!(
                    "Failed to rebuild {}#{} into namespace '{}': {}",
                    source_label, doc.canonical_id, namespace, err
                );
                stats.failed_ids.push(doc.canonical_id.clone());
            }
        }

        progress.processed_documents = idx + 1;
        progress.indexed_documents = stats.indexed_documents;
        progress.skipped_documents = stats.skipped_existing_documents
            + stats.skipped_empty_documents
            + stats.skipped_preprocess_short_documents;
        progress.failed_documents = stats.failed_ids.len();
        emit_progress(&progress);
    }

    let _ = source_records;
    Ok(stats)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn collapse_export_records_prefers_core_slice_for_same_original_id() {
        let records = vec![
            ExportRecord {
                id: "outer-1".to_string(),
                text: "Short summary".to_string(),
                metadata: json!({
                    "original_id": "doc-1",
                    "layer": "outer",
                    "project": "vista"
                }),
                content_hash: Some("outer-hash".to_string()),
                embeddings: None,
            },
            ExportRecord {
                id: "core-1".to_string(),
                text: "Longer full document content that should win during reprocess collapse."
                    .to_string(),
                metadata: json!({
                    "original_id": "doc-1",
                    "layer": "core",
                    "project": "vista"
                }),
                content_hash: Some("core-hash".to_string()),
                embeddings: None,
            },
        ];

        let docs = collapse_export_records(records);
        assert_eq!(docs.len(), 1);
        assert_eq!(docs[0].canonical_id, "doc-1");
        assert_eq!(docs[0].source_record_id, "core-1");
        assert_eq!(docs[0].collapsed_records, 2);
        assert!(docs[0].text.contains("full document content"));
    }

    #[test]
    fn collapse_export_records_falls_back_to_doc_id_then_record_id() {
        let records = vec![
            ExportRecord {
                id: "record-a".to_string(),
                text: "alpha".to_string(),
                metadata: json!({"doc_id": "doc-a"}),
                content_hash: None,
                embeddings: None,
            },
            ExportRecord {
                id: "record-b".to_string(),
                text: "beta".to_string(),
                metadata: json!({}),
                content_hash: None,
                embeddings: None,
            },
        ];

        let docs = collapse_export_records(records);
        assert_eq!(docs.len(), 2);
        assert_eq!(docs[0].canonical_id, "doc-a");
        assert_eq!(docs[1].canonical_id, "record-b");
    }

    #[test]
    fn prepare_reprocess_metadata_replaces_stale_chunk_fields() {
        let metadata = json!({
            "original_id": "old-doc",
            "layer": "outer",
            "slice_mode": "onion",
            "content_hash": "stale",
            "project": "vista"
        });

        let prepared = prepare_reprocess_metadata(ReprocessMetadataInput {
            metadata: &metadata,
            source_record_id: "core-1",
            source_text_hash: "fresh-hash",
            collapsed_records: 4,
            slice_mode: SliceMode::OnionFast,
            chunker: Some(ChunkerKind::Onion),
            namespace: "kb:test",
            source_label: "legacy.jsonl",
            preprocess: true,
        });

        assert_eq!(prepared["project"], "vista");
        assert_eq!(prepared["slice_mode"], "onion-fast");
        assert_eq!(prepared["chunker"], "onion");
        assert_eq!(prepared["reprocess_source_record_id"], "core-1");
        assert_eq!(prepared["reprocess_source_hash"], "fresh-hash");
        assert_eq!(prepared["reprocess_collapsed_records"], 4);
        assert_eq!(prepared["reprocess_source"], "legacy.jsonl");
        assert!(prepared["reprocess_preprocessed"].as_bool().unwrap());
        assert!(prepared.get("layer").is_none());
        assert!(prepared.get("original_id").is_none());
        assert!(prepared.get("content_hash").is_none());
    }

    #[test]
    fn default_reindexed_namespace_appends_suffix() {
        assert_eq!(
            default_reindexed_namespace("kodowanie"),
            "kodowanie-reindexed"
        );
    }
}