
rust_memex/lifecycle.rs

//! Namespace lifecycle flows shared by the CLI and HTTP layers: JSONL
//! export/import, reprocessing from an exported file, namespace-to-namespace
//! reindexing, and atomic namespace renames.

use anyhow::{Result, anyhow};
use async_stream::try_stream;
use axum::body::Bytes;
use futures::{Stream, StreamExt};
use memex_contracts::progress::{ReindexProgress, ReprocessProgress};
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value, json};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::io::{AsyncBufRead, AsyncBufReadExt, BufReader};

use crate::{
    ChromaDocument, ChunkerKind, PreprocessingConfig, Preprocessor, RAGPipeline, SliceMode,
    StorageManager, compute_content_hash, detect_default_chunker, path_utils,
};

const EXPORT_PAGE_SIZE: usize = 5_000;

/// JSONL export record structure shared across CLI and HTTP lifecycle flows.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExportRecord {
    pub id: String,
    pub text: String,
    pub metadata: Value,
    pub content_hash: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub embeddings: Option<Vec<f32>>,
}

/// Summary of a reprocess run over an exported JSONL file.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct ReprocessOutcome {
    pub target_namespace: String,
    pub source_label: String,
    pub source_records: usize,
    pub canonical_documents: usize,
    pub indexed_documents: usize,
    pub replaced_documents: usize,
    pub skipped_existing_documents: usize,
    pub skipped_empty_documents: usize,
    pub skipped_preprocess_short_documents: usize,
    pub failed_documents: usize,
    pub failed_ids: Vec<String>,
    pub parse_errors: usize,
}

/// Summary of a namespace-to-namespace reindex run.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct ReindexOutcome {
    pub source_namespace: String,
    pub target_namespace: String,
    pub source_records: usize,
    pub canonical_documents: usize,
    pub indexed_documents: usize,
    pub replaced_documents: usize,
    pub skipped_documents: usize,
    pub failed_documents: usize,
    pub failed_ids: Vec<String>,
}

/// Counters for a JSONL import.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct ImportOutcome {
    pub imported_count: usize,
    pub skipped_count: usize,
    pub error_count: usize,
}

/// Result of an atomic namespace rename.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct NamespaceMigrationOutcome {
    pub from_namespace: String,
    pub to_namespace: String,
    pub migrated_chunks: usize,
}

/// Parameters for rebuilding a namespace from an exported JSONL file.
#[derive(Debug, Clone)]
pub struct ReprocessJob {
    pub input_path: PathBuf,
    pub target_namespace: String,
    pub slice_mode: SliceMode,
    pub chunker: Option<ChunkerKind>,
    pub preprocess: bool,
    pub skip_existing: bool,
    pub allow_duplicates: bool,
    pub dry_run: bool,
}

/// Parameters for rebuilding one namespace's documents into another.
#[derive(Debug, Clone)]
pub struct ReindexJob {
    pub source_namespace: String,
    pub target_namespace: String,
    pub slice_mode: SliceMode,
    pub chunker: Option<ChunkerKind>,
    pub preprocess: bool,
    pub skip_existing: bool,
    pub allow_duplicates: bool,
    pub dry_run: bool,
}

#[derive(Debug, Clone)]
struct ReprocessDocument {
    canonical_id: String,
    source_record_id: String,
    text: String,
    metadata: Value,
    source_text_hash: String,
    collapsed_records: usize,
}

#[derive(Debug, Clone)]
struct RebuildPlan {
    namespace: String,
    source_label: String,
    source_records: usize,
    docs: Vec<ReprocessDocument>,
    slice_mode: SliceMode,
    chunker: Option<ChunkerKind>,
    preprocess: bool,
    skip_existing: bool,
    allow_duplicates: bool,
    dry_run: bool,
    parse_errors: usize,
}

#[derive(Debug, Clone, Default)]
struct RebuildProgress {
    processed_documents: usize,
    indexed_documents: usize,
    skipped_documents: usize,
    failed_documents: usize,
}

#[derive(Debug, Clone, Default)]
struct RebuildStats {
    indexed_documents: usize,
    replaced_documents: usize,
    skipped_existing_documents: usize,
    skipped_empty_documents: usize,
    skipped_preprocess_short_documents: usize,
    failed_ids: Vec<String>,
}

pub fn default_reindexed_namespace(namespace: &str) -> String {
    format!("{namespace}-reindexed")
}

/// Streams a namespace export as JSONL lines, paging `EXPORT_PAGE_SIZE`
/// documents at a time so large namespaces never sit fully in memory.
pub fn export_namespace_jsonl_stream(
    storage: Arc<StorageManager>,
    namespace: String,
    include_embeddings: bool,
) -> impl Stream<Item = Result<String>> + Send + 'static {
    try_stream! {
        let mut offset = 0usize;
        loop {
            let page = storage
                .all_documents_page(Some(&namespace), offset, EXPORT_PAGE_SIZE)
                .await?;
            let page_len = page.len();

            for doc in page {
                let record = ExportRecord {
                    id: doc.id,
                    text: doc.document,
                    metadata: doc.metadata,
                    content_hash: doc.content_hash,
                    embeddings: include_embeddings.then_some(doc.embedding),
                };

                let mut line = serde_json::to_string(&record)?;
                line.push('\n');
                yield line;
            }

            if page_len < EXPORT_PAGE_SIZE {
                break;
            }
            offset += page_len;
        }
    }
}
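
// Sketch: one way a caller might drain the export stream into any async
// writer (a file, an HTTP response body, ...). The `storage` handle and the
// `out` writer are assumed to be supplied by the caller; this helper is
// illustrative and not referenced elsewhere in the crate.
#[allow(dead_code)]
async fn example_export_to_writer<W>(
    storage: Arc<StorageManager>,
    namespace: String,
    mut out: W,
) -> Result<()>
where
    W: tokio::io::AsyncWrite + Unpin,
{
    use tokio::io::AsyncWriteExt;

    // The stream is lazy: pages are only fetched as lines are consumed.
    let mut lines = Box::pin(export_namespace_jsonl_stream(
        storage,
        namespace,
        /* include_embeddings */ false,
    ));
    while let Some(line) = lines.next().await {
        out.write_all(line?.as_bytes()).await?;
    }
    out.flush().await?;
    Ok(())
}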

/// Imports a JSONL export file from disk into `namespace`.
pub async fn import_jsonl_file(
    rag: Arc<RAGPipeline>,
    namespace: String,
    input: &Path,
    skip_existing: bool,
) -> Result<ImportOutcome> {
    let (_validated, file) = path_utils::safe_open_file_async(input).await?;
    let reader = BufReader::new(file);
    import_jsonl_reader(rag, namespace, skip_existing, reader).await
}

/// Imports JSONL records from any buffered async reader; malformed lines are
/// counted in `error_count` rather than aborting the run.
pub async fn import_jsonl_reader<R>(
    rag: Arc<RAGPipeline>,
    namespace: String,
    skip_existing: bool,
    reader: R,
) -> Result<ImportOutcome>
where
    R: AsyncBufRead + Unpin,
{
    let storage = rag.storage_manager();
    let mut outcome = ImportOutcome::default();
    let mut lines = reader.lines();

    while let Some(line) = lines.next_line().await? {
        if process_import_line(
            rag.as_ref(),
            storage.as_ref(),
            &namespace,
            skip_existing,
            line.as_bytes(),
            &mut outcome,
        )
        .await
        .is_err()
        {
            outcome.error_count += 1;
        }
    }

    Ok(outcome)
}
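
// Sketch: because `import_jsonl_reader` accepts any `AsyncBufRead`, an import
// can be driven from an in-memory byte slice as easily as from a file. The
// `rag` handle is assumed to come from the caller; the record and the
// "kb:demo" namespace below are made-up test data.
#[allow(dead_code)]
async fn example_import_from_bytes(rag: Arc<RAGPipeline>) -> Result<ImportOutcome> {
    let jsonl: &[u8] = br#"{"id":"doc-1","text":"hello world","metadata":{"project":"demo"}}"#;
    // `BufReader<&[u8]>` satisfies the `AsyncBufRead + Unpin` bound.
    import_jsonl_reader(rag, "kb:demo".to_string(), false, BufReader::new(jsonl)).await
}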

/// Imports JSONL from a raw byte stream (e.g. a multipart upload), splitting
/// on newlines as chunks arrive and flushing any trailing partial line at the
/// end of the stream.
pub async fn import_jsonl_bytes_stream<S, E>(
    rag: Arc<RAGPipeline>,
    namespace: String,
    skip_existing: bool,
    mut stream: S,
) -> Result<ImportOutcome>
where
    S: Stream<Item = std::result::Result<Bytes, E>> + Unpin,
    E: std::fmt::Display,
{
    let storage = rag.storage_manager();
    let mut outcome = ImportOutcome::default();
    let mut buffer = Vec::new();

    while let Some(chunk) = stream.next().await {
        let chunk = chunk.map_err(|err| anyhow!("multipart stream error: {err}"))?;
        buffer.extend_from_slice(&chunk);

        while let Some(position) = buffer.iter().position(|byte| *byte == b'\n') {
            let line = buffer.drain(..=position).collect::<Vec<_>>();
            if process_import_line(
                rag.as_ref(),
                storage.as_ref(),
                &namespace,
                skip_existing,
                &line,
                &mut outcome,
            )
            .await
            .is_err()
            {
                outcome.error_count += 1;
            }
        }
    }

    if !buffer.is_empty()
        && process_import_line(
            rag.as_ref(),
            storage.as_ref(),
            &namespace,
            skip_existing,
            &buffer,
            &mut outcome,
        )
        .await
        .is_err()
    {
        outcome.error_count += 1;
    }

    Ok(outcome)
}
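
// Sketch: feeding `import_jsonl_bytes_stream` from an in-memory chunk list,
// as a test might. Chunk boundaries need not align with line boundaries: the
// buffer reassembles lines across chunks, and the trailing record without a
// newline is flushed at end of stream. `Infallible` stands in for a real
// transport error type.
#[allow(dead_code)]
async fn example_import_from_chunks(rag: Arc<RAGPipeline>) -> Result<ImportOutcome> {
    use std::convert::Infallible;

    let chunks: Vec<std::result::Result<Bytes, Infallible>> = vec![
        Ok(Bytes::from_static(b"{\"id\":\"a\",\"text\":\"al")),
        Ok(Bytes::from_static(b"pha\",\"metadata\":{}}\n")),
        Ok(Bytes::from_static(b"{\"id\":\"b\",\"text\":\"beta\",\"metadata\":{}}")),
    ];
    import_jsonl_bytes_stream(rag, "kb:demo".to_string(), false, futures::stream::iter(chunks))
        .await
}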

/// Atomically renames every chunk in `from` to `to` via the storage layer.
pub async fn migrate_namespace_atomic(
    storage: &StorageManager,
    from: &str,
    to: &str,
) -> Result<NamespaceMigrationOutcome> {
    let migrated_chunks = storage.rename_namespace_atomic(from, to).await?;
    Ok(NamespaceMigrationOutcome {
        from_namespace: from.to_string(),
        to_namespace: to.to_string(),
        migrated_chunks,
    })
}

/// Rebuilds `job.target_namespace` from an exported JSONL file: parses the
/// records, collapses chunk rows back into canonical documents, and re-indexes
/// them, reporting progress through `on_progress`.
pub async fn reprocess_jsonl_file<F>(
    rag: Arc<RAGPipeline>,
    job: ReprocessJob,
    mut on_progress: F,
) -> Result<ReprocessOutcome>
where
    F: FnMut(ReprocessProgress),
{
    let ReprocessJob {
        input_path,
        target_namespace,
        slice_mode,
        chunker,
        preprocess,
        skip_existing,
        allow_duplicates,
        dry_run,
    } = job;
    let (_validated, content) = path_utils::safe_read_to_string_async(&input_path).await?;
    let (records, parse_errors) = parse_export_records(&content);

    if records.is_empty() {
        return Ok(ReprocessOutcome {
            target_namespace,
            source_label: input_path.display().to_string(),
            parse_errors,
            ..ReprocessOutcome::default()
        });
    }

    let source_records = records.len();
    let docs = collapse_export_records(records);
    let source_label = input_path.display().to_string();
    let canonical_documents = docs.len();

    let stats = run_rebuild_documents(
        rag,
        RebuildPlan {
            namespace: target_namespace.clone(),
            source_label: source_label.clone(),
            source_records,
            docs,
            slice_mode,
            chunker,
            preprocess,
            skip_existing,
            allow_duplicates,
            dry_run,
            parse_errors,
        },
        |progress| {
            on_progress(ReprocessProgress {
                source_label: source_label.clone(),
                processed_documents: progress.processed_documents,
                indexed_documents: progress.indexed_documents,
                skipped_documents: progress.skipped_documents,
                failed_documents: progress.failed_documents,
            });
        },
    )
    .await?;

    Ok(ReprocessOutcome {
        target_namespace,
        source_label,
        source_records,
        canonical_documents,
        indexed_documents: stats.indexed_documents,
        replaced_documents: stats.replaced_documents,
        skipped_existing_documents: stats.skipped_existing_documents,
        skipped_empty_documents: stats.skipped_empty_documents,
        skipped_preprocess_short_documents: stats.skipped_preprocess_short_documents,
        failed_documents: stats.failed_ids.len(),
        failed_ids: stats.failed_ids,
        parse_errors,
    })
}
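
// Sketch: driving `reprocess_jsonl_file` from a CLI-style caller. The path and
// namespace are placeholders; the progress callback just logs here, but an
// HTTP handler could forward each `ReprocessProgress` to the client instead.
#[allow(dead_code)]
async fn example_reprocess(rag: Arc<RAGPipeline>) -> Result<ReprocessOutcome> {
    let job = ReprocessJob {
        input_path: PathBuf::from("export.jsonl"),
        target_namespace: "kb:rebuilt".to_string(),
        slice_mode: SliceMode::Flat,
        chunker: None, // fall back to detect_default_chunker
        preprocess: false,
        skip_existing: true,
        allow_duplicates: false,
        dry_run: false,
    };
    reprocess_jsonl_file(rag, job, |progress| {
        tracing::info!(
            "reprocess: {} indexed / {} processed",
            progress.indexed_documents,
            progress.processed_documents
        );
    })
    .await
}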

/// Rebuilds the documents of `source_namespace` into `target_namespace`.
/// Refuses to run in place, and refuses a non-empty target unless
/// `skip_existing` is set (which turns the run into a resume).
pub async fn reindex_namespace<F>(
    rag: Arc<RAGPipeline>,
    job: ReindexJob,
    mut on_progress: F,
) -> Result<ReindexOutcome>
where
    F: FnMut(ReindexProgress),
{
    let ReindexJob {
        source_namespace,
        target_namespace,
        slice_mode,
        chunker,
        preprocess,
        skip_existing,
        allow_duplicates,
        dry_run,
    } = job;
    if source_namespace == target_namespace {
        return Err(anyhow!(
            "Source and target namespace are the same ('{}'). Use a different target namespace.",
            source_namespace
        ));
    }

    let storage = rag.storage_manager();
    let target_count = storage.count_namespace(&target_namespace).await?;
    if target_count > 0 && !skip_existing {
        return Err(anyhow!(
            "Target namespace '{}' already contains {} documents. Pass skip_existing to resume, or use a fresh target namespace.",
            target_namespace,
            target_count
        ));
    }

    let mut records = Vec::new();
    let mut offset = 0usize;
    loop {
        let page = storage
            .all_documents_page(Some(&source_namespace), offset, EXPORT_PAGE_SIZE)
            .await?;
        let page_len = page.len();

        for doc in page {
            records.push(ExportRecord {
                id: doc.id,
                text: doc.document,
                metadata: doc.metadata,
                content_hash: doc.content_hash,
                embeddings: None,
            });
        }

        if page_len < EXPORT_PAGE_SIZE {
            break;
        }
        offset += page_len;
    }

    if records.is_empty() {
        return Ok(ReindexOutcome {
            source_namespace,
            target_namespace,
            ..ReindexOutcome::default()
        });
    }

    let source_records = records.len();
    let docs = collapse_export_records(records);
    let canonical_documents = docs.len();
    let progress_namespace = target_namespace.clone();

    let stats = run_rebuild_documents(
        rag,
        RebuildPlan {
            namespace: target_namespace.clone(),
            source_label: format!("namespace:{source_namespace}"),
            source_records,
            docs,
            slice_mode,
            chunker,
            preprocess,
            skip_existing,
            allow_duplicates,
            dry_run,
            parse_errors: 0,
        },
        |progress| {
            on_progress(ReindexProgress {
                namespace: progress_namespace.clone(),
                total_files: canonical_documents,
                processed_files: progress.processed_documents,
                indexed_files: progress.indexed_documents,
                skipped_files: progress.skipped_documents,
                failed_files: progress.failed_documents,
                total_chunks: source_records,
            });
        },
    )
    .await?;

    Ok(ReindexOutcome {
        source_namespace,
        target_namespace,
        source_records,
        canonical_documents,
        indexed_documents: stats.indexed_documents,
        replaced_documents: stats.replaced_documents,
        skipped_documents: stats.skipped_existing_documents
            + stats.skipped_empty_documents
            + stats.skipped_preprocess_short_documents,
        failed_documents: stats.failed_ids.len(),
        failed_ids: stats.failed_ids,
    })
}
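
// Sketch: rebuilding a namespace's documents under new slicing settings into
// the default "-reindexed" sibling. `skip_existing: true` lets an interrupted
// run resume against a partially filled target instead of erroring out.
#[allow(dead_code)]
async fn example_reindex(rag: Arc<RAGPipeline>, namespace: &str) -> Result<ReindexOutcome> {
    let job = ReindexJob {
        source_namespace: namespace.to_string(),
        target_namespace: default_reindexed_namespace(namespace),
        slice_mode: SliceMode::Onion,
        chunker: None,
        preprocess: true,
        skip_existing: true,
        allow_duplicates: false,
        dry_run: false,
    };
    reindex_namespace(rag, job, |progress| {
        tracing::info!(
            "reindex '{}': {}/{} documents",
            progress.namespace,
            progress.processed_files,
            progress.total_files
        );
    })
    .await
}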

/// Canonical document id for a record: prefer `original_id`, then `doc_id`
/// from the metadata, falling back to the record id itself.
fn preferred_reprocess_id(record: &ExportRecord) -> String {
    record
        .metadata
        .get("original_id")
        .and_then(Value::as_str)
        .filter(|value| !value.trim().is_empty())
        .or_else(|| {
            record
                .metadata
                .get("doc_id")
                .and_then(Value::as_str)
                .filter(|value| !value.trim().is_empty())
        })
        .map(ToOwned::to_owned)
        .unwrap_or_else(|| record.id.clone())
}

fn reprocess_layer_rank(metadata: &Value) -> u8 {
    match metadata.get("layer").and_then(Value::as_str) {
        Some("core") => 4,
        Some("inner") => 3,
        Some("middle") => 2,
        Some("outer") => 1,
        _ => 0,
    }
}

/// Prefers longer text first, breaking ties with the onion layer rank, so the
/// fullest slice of a document wins the collapse.
fn should_replace_reprocess_candidate(
    current: &ReprocessDocument,
    candidate: &ReprocessDocument,
) -> bool {
    let current_rank = (current.text.len(), reprocess_layer_rank(&current.metadata));
    let candidate_rank = (
        candidate.text.len(),
        reprocess_layer_rank(&candidate.metadata),
    );
    candidate_rank > current_rank
}

/// Collapses exported chunk records back into one candidate document per
/// canonical id, keeping the best slice and counting how many records folded
/// into it.
fn collapse_export_records(records: Vec<ExportRecord>) -> Vec<ReprocessDocument> {
    let mut grouped: HashMap<String, ReprocessDocument> = HashMap::new();

    for record in records {
        let text = record.text.trim();
        if text.is_empty() {
            continue;
        }

        let candidate = ReprocessDocument {
            canonical_id: preferred_reprocess_id(&record),
            source_record_id: record.id,
            text: text.to_string(),
            metadata: record.metadata,
            source_text_hash: compute_content_hash(text),
            collapsed_records: 1,
        };

        match grouped.entry(candidate.canonical_id.clone()) {
            std::collections::hash_map::Entry::Vacant(entry) => {
                entry.insert(candidate);
            }
            std::collections::hash_map::Entry::Occupied(mut entry) => {
                let total_records = entry.get().collapsed_records + 1;
                if should_replace_reprocess_candidate(entry.get(), &candidate) {
                    let mut replacement = candidate;
                    replacement.collapsed_records = total_records;
                    entry.insert(replacement);
                } else {
                    entry.get_mut().collapsed_records = total_records;
                }
            }
        }
    }

    let mut docs: Vec<ReprocessDocument> = grouped.into_values().collect();
    docs.sort_by(|left, right| left.canonical_id.cmp(&right.canonical_id));
    docs
}

fn reprocess_slice_mode_name(slice_mode: SliceMode) -> &'static str {
    match slice_mode {
        SliceMode::Onion => "onion",
        SliceMode::OnionFast => "onion-fast",
        SliceMode::Flat => "flat",
    }
}

struct ReprocessMetadataInput<'a> {
    metadata: &'a Value,
    source_record_id: &'a str,
    source_text_hash: &'a str,
    collapsed_records: usize,
    slice_mode: SliceMode,
    chunker: Option<ChunkerKind>,
    namespace: &'a str,
    source_label: &'a str,
    preprocess: bool,
}

fn prepare_reprocess_metadata(input: ReprocessMetadataInput<'_>) -> Value {
    let mut map = match input.metadata.clone() {
        Value::Object(map) => map,
        _ => Map::new(),
    };

    for key in [
        "layer",
        "parent_id",
        "children_ids",
        "original_id",
        "slice_mode",
        "chunker",
        "content_hash",
    ] {
        map.remove(key);
    }

    let selected_chunker = input
        .chunker
        .unwrap_or_else(|| detect_default_chunker(Path::new(input.source_label), input.namespace));
    map.insert(
        "slice_mode".to_string(),
        json!(reprocess_slice_mode_name(input.slice_mode)),
    );
    map.insert("chunker".to_string(), json!(selected_chunker.name()));
    map.insert(
        "reprocess_source_record_id".to_string(),
        json!(input.source_record_id),
    );
    map.insert(
        "reprocess_source_hash".to_string(),
        json!(input.source_text_hash),
    );
    map.insert(
        "reprocess_collapsed_records".to_string(),
        json!(input.collapsed_records),
    );
    map.insert("reprocess_source".to_string(), json!(input.source_label));
    if input.preprocess {
        map.insert("reprocess_preprocessed".to_string(), json!(true));
    }

    Value::Object(map)
}

fn parse_export_records(content: &str) -> (Vec<ExportRecord>, usize) {
    let mut parse_errors = 0usize;
    let mut records = Vec::new();

    for line in content.lines() {
        let trimmed = line.trim();
        if trimmed.is_empty() {
            continue;
        }

        match serde_json::from_str::<ExportRecord>(trimmed) {
            Ok(record) => records.push(record),
            Err(_) => {
                parse_errors += 1;
            }
        }
    }

    (records, parse_errors)
}

async fn process_import_line(
    rag: &RAGPipeline,
    storage: &StorageManager,
    namespace: &str,
    skip_existing: bool,
    raw_line: &[u8],
    outcome: &mut ImportOutcome,
) -> Result<()> {
    let text = std::str::from_utf8(raw_line)?.trim();
    if text.is_empty() {
        return Ok(());
    }

    let record: ExportRecord = serde_json::from_str(text)?;
    let content_hash = record
        .content_hash
        .clone()
        .unwrap_or_else(|| compute_content_hash(&record.text));

    if skip_existing && storage.has_content_hash(namespace, &content_hash).await? {
        outcome.skipped_count += 1;
        return Ok(());
    }

    // Records that carry embeddings are stored directly; all other records go
    // through the regular upsert path.
    if let Some(embedding) = record.embeddings {
        let document = ChromaDocument::new_flat_with_hash(
            record.id,
            namespace.to_string(),
            embedding,
            record.metadata,
            record.text,
            content_hash,
        );
        storage.add_to_store(vec![document]).await?;
    } else {
        rag.memory_upsert(namespace, record.id, record.text, record.metadata)
            .await?;
    }

    outcome.imported_count += 1;
    Ok(())
}

async fn run_rebuild_documents<F>(
    rag: Arc<RAGPipeline>,
    plan: RebuildPlan,
    mut emit_progress: F,
) -> Result<RebuildStats>
where
    F: FnMut(&RebuildProgress),
{
    let RebuildPlan {
        namespace,
        source_label,
        source_records: _,
        docs,
        slice_mode,
        chunker,
        preprocess,
        skip_existing,
        allow_duplicates,
        dry_run,
        parse_errors: _,
    } = plan;

    if docs.is_empty() {
        return Ok(RebuildStats::default());
    }

    // A dry run stops after parsing and collapsing; nothing is written.
    if dry_run {
        return Ok(RebuildStats::default());
    }

    rag.embedding_healthcheck().await?;

    let preprocessor = preprocess.then(|| Preprocessor::new(PreprocessingConfig::default()));
    let min_length = PreprocessingConfig::default().min_content_length;
    let storage = rag.storage_manager();
    let mut stats = RebuildStats::default();
    let mut progress = RebuildProgress::default();

    for (idx, doc) in docs.iter().enumerate() {
        if !allow_duplicates
            && storage
                .has_source_hash(&namespace, &doc.source_text_hash)
                .await?
        {
            tracing::info!(
                "Skip duplicate source during rebuild: {}#{} (source_hash {})",
                source_label,
                doc.source_record_id,
                &doc.source_text_hash[..16]
            );
            stats.skipped_existing_documents += 1;
            progress.processed_documents = idx + 1;
            progress.skipped_documents = stats.skipped_existing_documents
                + stats.skipped_empty_documents
                + stats.skipped_preprocess_short_documents;
            progress.failed_documents = stats.failed_ids.len();
            progress.indexed_documents = stats.indexed_documents;
            emit_progress(&progress);
            continue;
        }

        let existing = rag.lookup_memory(&namespace, &doc.canonical_id).await?;
        if let Some(existing_doc) = existing.as_ref()
            && skip_existing
            && existing_doc
                .metadata
                .get("reprocess_source_hash")
                .and_then(Value::as_str)
                == Some(doc.source_text_hash.as_str())
        {
            stats.skipped_existing_documents += 1;
            progress.processed_documents = idx + 1;
            progress.skipped_documents = stats.skipped_existing_documents
                + stats.skipped_empty_documents
                + stats.skipped_preprocess_short_documents;
            progress.failed_documents = stats.failed_ids.len();
            progress.indexed_documents = stats.indexed_documents;
            emit_progress(&progress);
            continue;
        }

        let text = if let Some(preprocessor) = &preprocessor {
            preprocessor.extract_semantic_content(&doc.text)
        } else {
            doc.text.clone()
        };

        if text.trim().is_empty() {
            stats.skipped_empty_documents += 1;
            progress.processed_documents = idx + 1;
            progress.skipped_documents = stats.skipped_existing_documents
                + stats.skipped_empty_documents
                + stats.skipped_preprocess_short_documents;
            progress.failed_documents = stats.failed_ids.len();
            progress.indexed_documents = stats.indexed_documents;
            emit_progress(&progress);
            continue;
        }

        if preprocess && text.trim().len() < min_length {
            stats.skipped_preprocess_short_documents += 1;
            progress.processed_documents = idx + 1;
            progress.skipped_documents = stats.skipped_existing_documents
                + stats.skipped_empty_documents
                + stats.skipped_preprocess_short_documents;
            progress.failed_documents = stats.failed_ids.len();
            progress.indexed_documents = stats.indexed_documents;
            emit_progress(&progress);
            continue;
        }

        let metadata = prepare_reprocess_metadata(ReprocessMetadataInput {
            metadata: &doc.metadata,
            source_record_id: &doc.source_record_id,
            source_text_hash: &doc.source_text_hash,
            collapsed_records: doc.collapsed_records,
            slice_mode,
            chunker,
            namespace: &namespace,
            source_label: &source_label,
            preprocess,
        });

        match rag
            .memory_upsert(&namespace, doc.canonical_id.clone(), text, metadata)
            .await
        {
            Ok(()) => {
                stats.indexed_documents += 1;
                // Count a replacement only once the upsert actually succeeded.
                if existing.is_some() {
                    stats.replaced_documents += 1;
                }
            }
            Err(err) => {
                tracing::warn!(
                    "Failed to rebuild {}#{} into namespace '{}': {}",
                    source_label,
                    doc.canonical_id,
                    namespace,
                    err
                );
                eprintln!(
                    "Failed to rebuild {}#{} into namespace '{}': {}",
                    source_label, doc.canonical_id, namespace, err
                );
                stats.failed_ids.push(doc.canonical_id.clone());
            }
        }

        progress.processed_documents = idx + 1;
        progress.indexed_documents = stats.indexed_documents;
        progress.skipped_documents = stats.skipped_existing_documents
            + stats.skipped_empty_documents
            + stats.skipped_preprocess_short_documents;
        progress.failed_documents = stats.failed_ids.len();
        emit_progress(&progress);
    }

    Ok(stats)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn collapse_export_records_prefers_core_slice_for_same_original_id() {
        let records = vec![
            ExportRecord {
                id: "outer-1".to_string(),
                text: "Short summary".to_string(),
                metadata: json!({
                    "original_id": "doc-1",
                    "layer": "outer",
                    "project": "vista"
                }),
                content_hash: Some("outer-hash".to_string()),
                embeddings: None,
            },
            ExportRecord {
                id: "core-1".to_string(),
                text: "Longer full document content that should win during reprocess collapse."
                    .to_string(),
                metadata: json!({
                    "original_id": "doc-1",
                    "layer": "core",
                    "project": "vista"
                }),
                content_hash: Some("core-hash".to_string()),
                embeddings: None,
            },
        ];

        let docs = collapse_export_records(records);
        assert_eq!(docs.len(), 1);
        assert_eq!(docs[0].canonical_id, "doc-1");
        assert_eq!(docs[0].source_record_id, "core-1");
        assert_eq!(docs[0].collapsed_records, 2);
        assert!(docs[0].text.contains("full document content"));
    }

    #[test]
    fn collapse_export_records_falls_back_to_doc_id_then_record_id() {
        let records = vec![
            ExportRecord {
                id: "record-a".to_string(),
                text: "alpha".to_string(),
                metadata: json!({"doc_id": "doc-a"}),
                content_hash: None,
                embeddings: None,
            },
            ExportRecord {
                id: "record-b".to_string(),
                text: "beta".to_string(),
                metadata: json!({}),
                content_hash: None,
                embeddings: None,
            },
        ];

        let docs = collapse_export_records(records);
        assert_eq!(docs.len(), 2);
        assert_eq!(docs[0].canonical_id, "doc-a");
        assert_eq!(docs[1].canonical_id, "record-b");
    }

    #[test]
    fn prepare_reprocess_metadata_replaces_stale_chunk_fields() {
        let metadata = json!({
            "original_id": "old-doc",
            "layer": "outer",
            "slice_mode": "onion",
            "content_hash": "stale",
            "project": "vista"
        });

        let prepared = prepare_reprocess_metadata(ReprocessMetadataInput {
            metadata: &metadata,
            source_record_id: "core-1",
            source_text_hash: "fresh-hash",
            collapsed_records: 4,
            slice_mode: SliceMode::OnionFast,
            chunker: Some(ChunkerKind::Onion),
            namespace: "kb:test",
            source_label: "legacy.jsonl",
            preprocess: true,
        });

        assert_eq!(prepared["project"], "vista");
        assert_eq!(prepared["slice_mode"], "onion-fast");
        assert_eq!(prepared["chunker"], "onion");
        assert_eq!(prepared["reprocess_source_record_id"], "core-1");
        assert_eq!(prepared["reprocess_source_hash"], "fresh-hash");
        assert_eq!(prepared["reprocess_collapsed_records"], 4);
        assert_eq!(prepared["reprocess_source"], "legacy.jsonl");
        assert!(prepared["reprocess_preprocessed"].as_bool().unwrap());
        assert!(prepared.get("layer").is_none());
        assert!(prepared.get("original_id").is_none());
        assert!(prepared.get("content_hash").is_none());
    }

    #[test]
    fn default_reindexed_namespace_appends_suffix() {
        assert_eq!(
            default_reindexed_namespace("kodowanie"),
            "kodowanie-reindexed"
        );
    }
}