Skip to main content

kbolt_core/engine/
update_ops.rs

1use super::*;
2use crate::retrieval_context::{
3    render_bm25_document, render_dense_document, ChunkRetrievalContext,
4};
5use kbolt_types::{UpdateDecision, UpdateDecisionKind};
6
7const CANONICAL_TEXT_GENERATION: u32 = 1;
8const CHUNKER_GENERATION: u32 = 2;
9
10impl Engine {
11    pub fn update(&self, options: UpdateOptions) -> Result<UpdateReport> {
12        let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
13        self.update_unlocked(options)
14    }
15
16    pub(super) fn update_unlocked(&self, options: UpdateOptions) -> Result<UpdateReport> {
17        let _profile = crate::profile::UpdateProfileGuard::start();
18        let update_result = self.run_update_unlocked(options);
19        let flush_result = crate::profile::timed_update_stage("usearch_save_dirty", || {
20            self.storage.save_dirty_usearch_indexes()
21        });
22        finish_update_with_usearch_flush(update_result, flush_result)
23    }
24
25    fn run_update_unlocked(&self, options: UpdateOptions) -> Result<UpdateReport> {
26        let started = Instant::now();
27        let mut report = UpdateReport {
28            scanned_docs: 0,
29            skipped_mtime_docs: 0,
30            skipped_hash_docs: 0,
31            added_docs: 0,
32            updated_docs: 0,
33            failed_docs: 0,
34            deactivated_docs: 0,
35            reactivated_docs: 0,
36            reaped_docs: 0,
37            embedded_chunks: 0,
38            decisions: Vec::new(),
39            errors: Vec::new(),
40            elapsed_ms: 0,
41        };
42        let mut failed_docs = HashSet::new();
43
44        let targets = crate::profile::timed_update_stage("resolve_targets", || {
45            self.resolve_targets(TargetScope {
46                space: options.space.as_deref(),
47                collections: &options.collections,
48            })
49        })?;
50        if targets.is_empty() {
51            report.elapsed_ms = started.elapsed().as_millis() as u64;
52            return Ok(report);
53        }
54        let repair_scope = UpdateRepairScope::from_options_and_targets(&options, &targets);
55
56        crate::profile::timed_update_stage("dense_integrity", || {
57            self.reconcile_dense_integrity(&targets, &repair_scope, &options)
58        })?;
59
60        if !options.dry_run {
61            crate::profile::timed_update_stage("fts_dirty_replay", || {
62                self.replay_fts_dirty_documents(&repair_scope, &mut report, &mut failed_docs)
63            })?;
64        }
65
66        let mut fts_dirty_by_space: HashMap<String, HashSet<i64>> = HashMap::new();
67        let mut pending_embeddings = Vec::new();
68        let mut failed_embedding_chunk_ids = HashSet::new();
69        for target in &targets {
70            self.update_collection_target(
71                target,
72                &options,
73                &mut report,
74                &mut fts_dirty_by_space,
75                &mut pending_embeddings,
76                &mut failed_embedding_chunk_ids,
77                &mut failed_docs,
78            )?;
79        }
80
81        if !options.dry_run {
82            crate::profile::timed_update_stage("embedding_flush_buffered", || {
83                self.flush_buffered_embeddings(
84                    &mut pending_embeddings,
85                    &mut failed_embedding_chunk_ids,
86                    &mut report,
87                    &mut failed_docs,
88                )
89            })?;
90
91            for (space, doc_ids) in fts_dirty_by_space {
92                if doc_ids.is_empty() {
93                    continue;
94                }
95
96                crate::profile::timed_update_stage("tantivy_commit", || {
97                    self.storage.commit_tantivy(&space)
98                })?;
99                let mut ids = doc_ids.into_iter().collect::<Vec<_>>();
100                ids.sort_unstable();
101                crate::profile::timed_update_stage("sqlite_clear_fts_dirty", || {
102                    self.storage.batch_clear_fts_dirty(&ids)
103                })?;
104            }
105
106            crate::profile::timed_update_stage("embedding_backlog", || {
107                self.embed_pending_chunks(
108                    &repair_scope,
109                    &options,
110                    &mut failed_embedding_chunk_ids,
111                    &mut report,
112                    &mut failed_docs,
113                )
114            })?;
115
116            let reaped = crate::profile::timed_update_stage("sqlite_list_reapable", || {
117                self.list_reapable_documents_for_scope(self.config.reaping.days, &repair_scope)
118            })?;
119            let mut reaped_doc_ids = Vec::with_capacity(reaped.len());
120            let mut chunk_ids_by_space: HashMap<String, Vec<i64>> = HashMap::new();
121            for document in reaped {
122                reaped_doc_ids.push(document.doc_id);
123                if !document.chunk_ids.is_empty() {
124                    chunk_ids_by_space
125                        .entry(document.space_name)
126                        .or_default()
127                        .extend(document.chunk_ids);
128                }
129            }
130            for (space, chunk_ids) in chunk_ids_by_space {
131                crate::profile::timed_update_stage("purge_reaped_indexes", || {
132                    self.purge_space_chunks(&space, &chunk_ids)
133                })?;
134            }
135            crate::profile::timed_update_stage("sqlite_delete_reaped_documents", || {
136                self.storage.delete_documents(&reaped_doc_ids)
137            })?;
138            report.reaped_docs = reaped_doc_ids.len();
139        }
140
141        report.failed_docs = failed_docs.len();
142        report.elapsed_ms = started.elapsed().as_millis() as u64;
143        Ok(report)
144    }
145
146    fn reconcile_dense_integrity(
147        &self,
148        targets: &[UpdateTarget],
149        repair_scope: &UpdateRepairScope,
150        options: &UpdateOptions,
151    ) -> Result<()> {
152        if options.no_embed || options.dry_run {
153            return Ok(());
154        }
155
156        let expected_model = self.embedding_model_key();
157        let mut visited_spaces = HashSet::new();
158        for target in targets {
159            if !visited_spaces.insert(target.collection.space_id) {
160                continue;
161            }
162
163            let models = self
164                .storage
165                .list_embedding_models_in_space(target.collection.space_id)?;
166            let mut reasons = Vec::new();
167            if models
168                .iter()
169                .any(|model| model.as_str() != expected_model.as_str())
170            {
171                reasons.push(format!(
172                    "stored embedding models {:?} do not match current model '{}'",
173                    models, expected_model
174                ));
175            }
176
177            let sqlite_count = self
178                .storage
179                .count_embedded_chunks(Some(target.collection.space_id))?;
180            let usearch_count = self.storage.count_usearch(&target.space)?;
181            if sqlite_count != usearch_count {
182                reasons.push(format!(
183                    "sqlite embedded chunk count {sqlite_count} does not match USearch vector count {usearch_count}"
184                ));
185            }
186
187            if reasons.is_empty() {
188                continue;
189            }
190
191            if !repair_scope.allows_space_dense_repair() {
192                return Err(KboltError::SpaceDenseRepairRequired {
193                    space: target.space.clone(),
194                    reason: reasons.join("; "),
195                }
196                .into());
197            }
198
199            self.storage
200                .delete_embeddings_for_space(target.collection.space_id)?;
201            self.storage.clear_usearch(&target.space)?;
202        }
203
204        Ok(())
205    }
206
207    fn embed_pending_chunks(
208        &self,
209        repair_scope: &UpdateRepairScope,
210        options: &UpdateOptions,
211        failed_chunk_ids: &mut HashSet<i64>,
212        report: &mut UpdateReport,
213        failed_docs: &mut HashSet<UpdateDocKey>,
214    ) -> Result<()> {
215        if options.no_embed || options.dry_run {
216            return Ok(());
217        }
218
219        let Some(embedder) = self.embedder.as_ref() else {
220            return Ok(());
221        };
222
223        let model = self.embedding_model_key();
224        let batch_window = embedder.preferred_document_batch_window().max(1);
225        let mut after_chunk_id = 0_i64;
226        loop {
227            let backlog =
228                crate::profile::timed_update_stage("sqlite_get_unembedded_chunks", || {
229                    self.get_unembedded_chunks_for_scope(
230                        model.as_str(),
231                        repair_scope,
232                        after_chunk_id,
233                        batch_window,
234                    )
235                })?;
236            if backlog.is_empty() {
237                break;
238            }
239
240            after_chunk_id = backlog
241                .last()
242                .map(|record| record.chunk.id)
243                .expect("non-empty backlog should have a last chunk id");
244
245            let backlog_doc_ids = backlog
246                .iter()
247                .filter(|record| !failed_chunk_ids.contains(&record.chunk.id))
248                .map(|record| record.chunk.doc_id)
249                .collect::<Vec<_>>();
250            let document_text_cache =
251                crate::profile::timed_update_stage("embedding_backlog_read", || {
252                    self.storage.get_existing_document_texts(&backlog_doc_ids)
253                })?;
254
255            let mut pending = Vec::new();
256            for record in backlog {
257                if failed_chunk_ids.contains(&record.chunk.id) {
258                    continue;
259                }
260
261                let full_path = record.collection_path.join(&record.doc_path);
262                let Some(document_text) = document_text_cache.get(&record.chunk.doc_id) else {
263                    report.errors.push(file_error(
264                        Some(full_path.clone()),
265                        format!(
266                            "embed canonical text load failed: {}",
267                            crate::storage::missing_document_text_error(record.chunk.doc_id)
268                        ),
269                    ));
270                    failed_docs.insert(update_doc_key(
271                        &record.space_name,
272                        &record.collection_path,
273                        &record.doc_path,
274                    ));
275                    continue;
276                };
277                let chunk_canonical_text = match crate::storage::chunk_text_from_canonical(
278                    document_text.text.as_str(),
279                    &record.chunk,
280                ) {
281                    Ok(text) => text,
282                    Err(err) => {
283                        report.errors.push(file_error(
284                            Some(full_path.clone()),
285                            format!("embed canonical text load failed: {err}"),
286                        ));
287                        failed_docs.insert(update_doc_key(
288                            &record.space_name,
289                            &record.collection_path,
290                            &record.doc_path,
291                        ));
292                        continue;
293                    }
294                };
295                let dense_input = render_dense_document(ChunkRetrievalContext {
296                    body: chunk_canonical_text.as_str(),
297                    retrieval_prefix: record.chunk.retrieval_prefix.as_deref(),
298                    title: record.doc_title.as_deref(),
299                    heading: record.chunk.heading.as_deref(),
300                });
301                let embedding_text =
302                    normalize_embedding_payload(self.format_embedding_document(&dense_input));
303                let policy = resolve_policy(
304                    &self.config.chunking,
305                    Some(document_text.extractor_key.as_str()),
306                    None,
307                );
308                let max_document_tokens = effective_chunk_hard_max(&policy);
309
310                pending.push(PendingChunkEmbedding {
311                    chunk_id: record.chunk.id,
312                    doc_key: update_doc_key(
313                        &record.space_name,
314                        &record.collection_path,
315                        &record.doc_path,
316                    ),
317                    space_name: record.space_name,
318                    path: full_path,
319                    embedding_text,
320                    max_document_tokens,
321                });
322            }
323
324            if pending.is_empty() {
325                continue;
326            }
327
328            let mut preflight_failed_chunk_ids = Vec::new();
329            let pending = self.preflight_pending_embeddings(
330                pending,
331                report,
332                failed_docs,
333                &mut preflight_failed_chunk_ids,
334            )?;
335            failed_chunk_ids.extend(preflight_failed_chunk_ids);
336            if pending.is_empty() {
337                continue;
338            }
339
340            let result = self.embed_preflighted_pending_batch_with_partial_failures(
341                embedder.as_ref(),
342                pending,
343                report,
344                failed_docs,
345            )?;
346            failed_chunk_ids.extend(result.failed_chunk_ids);
347            if !result.embeddings.is_empty() {
348                self.store_chunk_embeddings(model.as_str(), result.embeddings, report)?;
349            }
350        }
351
352        Ok(())
353    }
354
355    fn flush_buffered_embeddings(
356        &self,
357        pending_embeddings: &mut Vec<PendingChunkEmbedding>,
358        failed_chunk_ids: &mut HashSet<i64>,
359        report: &mut UpdateReport,
360        failed_docs: &mut HashSet<UpdateDocKey>,
361    ) -> Result<()> {
362        if pending_embeddings.is_empty() {
363            return Ok(());
364        }
365
366        let Some(embedder) = self.embedder.as_ref() else {
367            pending_embeddings.clear();
368            return Ok(());
369        };
370
371        let pending = std::mem::take(pending_embeddings);
372        let result = self.embed_preflighted_pending_batch_with_partial_failures(
373            embedder.as_ref(),
374            pending,
375            report,
376            failed_docs,
377        )?;
378        failed_chunk_ids.extend(result.failed_chunk_ids);
379        if result.embeddings.is_empty() {
380            return Ok(());
381        }
382
383        let model = self.embedding_model_key();
384        self.store_chunk_embeddings(model.as_str(), result.embeddings, report)
385    }
386
387    fn document_embedding_batch_window(&self) -> usize {
388        self.embedder
389            .as_ref()
390            .map(|embedder| embedder.preferred_document_batch_window())
391            .unwrap_or(crate::models::DEFAULT_DOCUMENT_BATCH_WINDOW)
392            .max(1)
393    }
394
395    fn embed_preflighted_pending_batch_with_partial_failures(
396        &self,
397        embedder: &dyn crate::models::Embedder,
398        pending: Vec<PendingChunkEmbedding>,
399        report: &mut UpdateReport,
400        failed_docs: &mut HashSet<UpdateDocKey>,
401    ) -> Result<EmbeddedPendingBatch> {
402        if pending.is_empty() {
403            return Ok(EmbeddedPendingBatch::default());
404        }
405
406        let texts = pending
407            .iter()
408            .map(|embedding| embedding.embedding_text.clone())
409            .collect::<Vec<_>>();
410        crate::profile::increment_update_count("embedding_batch_calls", 1);
411        crate::profile::increment_update_count("embedding_input_texts", texts.len() as u64);
412        match crate::profile::timed_update_stage("embedding_http", || {
413            embedder.embed_batch(crate::models::EmbeddingInputKind::Document, &texts)
414        }) {
415            Ok(vectors) => {
416                if let Some(detail) = invalid_pending_embedding_batch_detail(&pending, &vectors) {
417                    return self.split_pending_embedding_batch(
418                        embedder,
419                        pending,
420                        report,
421                        failed_docs,
422                        detail,
423                    );
424                }
425
426                let embeddings = pending
427                    .into_iter()
428                    .zip(vectors)
429                    .map(|(embedding, vector)| (embedding.chunk_id, embedding.space_name, vector))
430                    .collect::<Vec<_>>();
431                Ok(EmbeddedPendingBatch {
432                    embeddings,
433                    failed_chunk_ids: Vec::new(),
434                })
435            }
436            Err(err) => self.split_pending_embedding_batch(
437                embedder,
438                pending,
439                report,
440                failed_docs,
441                err.to_string(),
442            ),
443        }
444    }
445
446    fn split_pending_embedding_batch(
447        &self,
448        embedder: &dyn crate::models::Embedder,
449        pending: Vec<PendingChunkEmbedding>,
450        report: &mut UpdateReport,
451        failed_docs: &mut HashSet<UpdateDocKey>,
452        detail: String,
453    ) -> Result<EmbeddedPendingBatch> {
454        if pending.len() == 1 {
455            let embedding = pending
456                .into_iter()
457                .next()
458                .expect("single pending embedding should exist");
459            report.errors.push(file_error(
460                Some(embedding.path),
461                format!("embed failed: {detail}"),
462            ));
463            failed_docs.insert(embedding.doc_key);
464            return Ok(EmbeddedPendingBatch {
465                embeddings: Vec::new(),
466                failed_chunk_ids: vec![embedding.chunk_id],
467            });
468        }
469
470        let mid = pending.len() / 2;
471        let mut right = pending;
472        let left = right.drain(..mid).collect::<Vec<_>>();
473
474        let mut left_result = self.embed_preflighted_pending_batch_with_partial_failures(
475            embedder,
476            left,
477            report,
478            failed_docs,
479        )?;
480        let right_result = self.embed_preflighted_pending_batch_with_partial_failures(
481            embedder,
482            right,
483            report,
484            failed_docs,
485        )?;
486        left_result.embeddings.extend(right_result.embeddings);
487        left_result
488            .failed_chunk_ids
489            .extend(right_result.failed_chunk_ids);
490        Ok(left_result)
491    }
492
493    fn store_chunk_embeddings(
494        &self,
495        model: &str,
496        embeddings: Vec<(i64, String, Vec<f32>)>,
497        report: &mut UpdateReport,
498    ) -> Result<()> {
499        let mut grouped_vectors: HashMap<String, Vec<(i64, Vec<f32>)>> = HashMap::new();
500        let mut embedding_rows = Vec::with_capacity(embeddings.len());
501        for (chunk_id, space_name, vector) in embeddings {
502            if vector.is_empty() {
503                return Err(KboltError::Inference(format!(
504                    "embedder returned an empty vector for chunk {chunk_id}"
505                ))
506                .into());
507            }
508
509            grouped_vectors
510                .entry(space_name)
511                .or_default()
512                .push((chunk_id, vector));
513            embedding_rows.push((chunk_id, model));
514        }
515
516        for (space, vectors) in grouped_vectors {
517            let refs = vectors
518                .iter()
519                .map(|(chunk_id, vector)| (*chunk_id, vector.as_slice()))
520                .collect::<Vec<_>>();
521            crate::profile::increment_update_count("usearch_vectors", refs.len() as u64);
522            self.storage.batch_insert_usearch_deferred(&space, &refs)?;
523        }
524
525        crate::profile::timed_update_stage("sqlite_insert_embeddings", || {
526            self.storage.insert_embeddings(&embedding_rows)
527        })?;
528        report.embedded_chunks = report.embedded_chunks.saturating_add(embedding_rows.len());
529        Ok(())
530    }
531
532    fn preflight_pending_embeddings(
533        &self,
534        pending: Vec<PendingChunkEmbedding>,
535        report: &mut UpdateReport,
536        failed_docs: &mut HashSet<UpdateDocKey>,
537        failed_chunk_ids: &mut Vec<i64>,
538    ) -> Result<Vec<PendingChunkEmbedding>> {
539        let Some(sizer) = self.embedding_document_sizer.as_ref() else {
540            return Ok(pending);
541        };
542
543        let mut accepted = Vec::with_capacity(pending.len());
544        for embedding in pending {
545            match check_embedding_preflight(
546                sizer.as_ref(),
547                &embedding.embedding_text,
548                embedding.max_document_tokens,
549            ) {
550                EmbeddingPreflightDecision::Accept => {
551                    accepted.push(embedding);
552                }
553                EmbeddingPreflightDecision::Reject(detail) => {
554                    report
555                        .errors
556                        .push(file_error(Some(embedding.path.clone()), detail));
557                    failed_docs.insert(embedding.doc_key);
558                    failed_chunk_ids.push(embedding.chunk_id);
559                }
560            }
561        }
562
563        Ok(accepted)
564    }
565
566    fn preflight_prepared_embeddings(
567        &self,
568        prepared: Vec<PreparedChunkEmbedding>,
569        report: &mut UpdateReport,
570        failed_docs: &mut HashSet<UpdateDocKey>,
571    ) -> Result<PreparedEmbeddingPreflight> {
572        let Some(sizer) = self.embedding_document_sizer.as_ref() else {
573            return Ok(PreparedEmbeddingPreflight {
574                accepted: prepared,
575                rejected_chunk_indexes: Vec::new(),
576            });
577        };
578
579        let mut preflight = PreparedEmbeddingPreflight {
580            accepted: Vec::with_capacity(prepared.len()),
581            rejected_chunk_indexes: Vec::new(),
582        };
583        for embedding in prepared {
584            match check_embedding_preflight(
585                sizer.as_ref(),
586                &embedding.embedding_text,
587                embedding.max_document_tokens,
588            ) {
589                EmbeddingPreflightDecision::Accept => {
590                    preflight.accepted.push(embedding);
591                }
592                EmbeddingPreflightDecision::Reject(detail) => {
593                    report
594                        .errors
595                        .push(file_error(Some(embedding.path.clone()), detail));
596                    failed_docs.insert(embedding.doc_key);
597                    preflight.rejected_chunk_indexes.push(embedding.chunk_index);
598                }
599            }
600        }
601
602        Ok(preflight)
603    }
604
605    fn replay_fts_dirty_documents(
606        &self,
607        repair_scope: &UpdateRepairScope,
608        report: &mut UpdateReport,
609        failed_docs: &mut HashSet<UpdateDocKey>,
610    ) -> Result<()> {
611        let records = self.get_fts_dirty_documents_for_scope(repair_scope)?;
612        if records.is_empty() {
613            return Ok(());
614        }
615
616        let mut cleared_by_space: HashMap<String, Vec<i64>> = HashMap::new();
617        for record in records {
618            let space_name = record.space_name;
619            let doc_id = record.doc_id;
620
621            if record.chunks.is_empty() {
622                self.storage.delete_tantivy_by_doc(&space_name, doc_id)?;
623                cleared_by_space.entry(space_name).or_default().push(doc_id);
624                continue;
625            }
626
627            let document_text = match self.storage.get_document_text(doc_id) {
628                Ok(row) => row,
629                Err(err) => {
630                    failed_docs.insert(update_doc_key(
631                        &space_name,
632                        &record.collection_path,
633                        &record.doc_path,
634                    ));
635                    report.errors.push(file_error(
636                        Some(record.collection_path.join(&record.doc_path)),
637                        format!("fts replay canonical text load failed: {err}"),
638                    ));
639                    continue;
640                }
641            };
642            let policy = resolve_policy(
643                &self.config.chunking,
644                Some(document_text.extractor_key.as_str()),
645                None,
646            );
647
648            let entries = record
649                .chunks
650                .iter()
651                .map(|chunk| -> Result<TantivyEntry> {
652                    let chunk_text = crate::storage::chunk_text_from_canonical(
653                        document_text.text.as_str(),
654                        chunk,
655                    )?;
656                    Ok(TantivyEntry {
657                        chunk_id: chunk.id,
658                        doc_id,
659                        filepath: record.doc_path.clone(),
660                        title: record.doc_title.clone(),
661                        heading: chunk.heading.clone(),
662                        body: render_bm25_document(
663                            ChunkRetrievalContext {
664                                body: chunk_text.as_str(),
665                                retrieval_prefix: chunk.retrieval_prefix.as_deref(),
666                                title: record.doc_title.as_deref(),
667                                heading: chunk.heading.as_deref(),
668                            },
669                            policy.contextual_prefix,
670                        ),
671                    })
672                })
673                .collect::<Result<Vec<_>>>()?;
674
675            self.storage.delete_tantivy_by_doc(&space_name, doc_id)?;
676            self.storage.index_tantivy(&space_name, &entries)?;
677            cleared_by_space.entry(space_name).or_default().push(doc_id);
678        }
679
680        for (space_name, mut doc_ids) in cleared_by_space {
681            if doc_ids.is_empty() {
682                continue;
683            }
684
685            doc_ids.sort_unstable();
686            doc_ids.dedup();
687            self.storage.commit_tantivy(&space_name)?;
688            self.storage.batch_clear_fts_dirty(&doc_ids)?;
689        }
690
691        Ok(())
692    }
693
694    fn get_fts_dirty_documents_for_scope(
695        &self,
696        repair_scope: &UpdateRepairScope,
697    ) -> Result<Vec<crate::storage::FtsDirtyRecord>> {
698        match repair_scope {
699            UpdateRepairScope::Global => self.storage.get_fts_dirty_documents(),
700            UpdateRepairScope::Space { space_id } => {
701                self.storage.get_fts_dirty_documents_in_space(*space_id)
702            }
703            UpdateRepairScope::Collections { collection_ids } => self
704                .storage
705                .get_fts_dirty_documents_in_collections(collection_ids),
706        }
707    }
708
709    fn get_unembedded_chunks_for_scope(
710        &self,
711        model: &str,
712        repair_scope: &UpdateRepairScope,
713        after_chunk_id: i64,
714        limit: usize,
715    ) -> Result<Vec<crate::storage::EmbedRecord>> {
716        match repair_scope {
717            UpdateRepairScope::Global => {
718                self.storage
719                    .get_unembedded_chunks(model, after_chunk_id, limit)
720            }
721            UpdateRepairScope::Space { space_id } => {
722                self.storage
723                    .get_unembedded_chunks_in_space(model, *space_id, after_chunk_id, limit)
724            }
725            UpdateRepairScope::Collections { collection_ids } => self
726                .storage
727                .get_unembedded_chunks_in_collections(model, collection_ids, after_chunk_id, limit),
728        }
729    }
730
731    fn list_reapable_documents_for_scope(
732        &self,
733        older_than_days: u32,
734        repair_scope: &UpdateRepairScope,
735    ) -> Result<Vec<crate::storage::ReapableDocument>> {
736        match repair_scope {
737            UpdateRepairScope::Global => self.storage.list_reapable_documents(older_than_days),
738            UpdateRepairScope::Space { space_id } => self
739                .storage
740                .list_reapable_documents_in_space(older_than_days, *space_id),
741            UpdateRepairScope::Collections { collection_ids } => self
742                .storage
743                .list_reapable_documents_in_collections(older_than_days, collection_ids),
744        }
745    }
746
747    pub fn resolve_update_targets(&self, options: &UpdateOptions) -> Result<Vec<UpdateTarget>> {
748        self.resolve_targets(TargetScope {
749            space: options.space.as_deref(),
750            collections: &options.collections,
751        })
752    }
753
754    pub(super) fn resolve_targets(&self, scope: TargetScope<'_>) -> Result<Vec<UpdateTarget>> {
755        let mut targets = Vec::new();
756
757        if scope.collections.is_empty() {
758            return self.resolve_update_targets_for_all_collections(scope.space);
759        }
760
761        let mut seen = std::collections::HashSet::new();
762        for raw_collection_name in scope.collections {
763            let collection_name = raw_collection_name.trim();
764            if collection_name.is_empty() {
765                return Err(KboltError::InvalidInput(
766                    "collection names cannot be empty".to_string(),
767                )
768                .into());
769            }
770
771            let resolved_space = self.resolve_space_row(scope.space, Some(collection_name))?;
772            let collection = self
773                .storage
774                .get_collection(resolved_space.id, collection_name)?;
775
776            if seen.insert((collection.space_id, collection.name.clone())) {
777                targets.push(UpdateTarget {
778                    space: resolved_space.name,
779                    collection,
780                });
781            }
782        }
783
784        Ok(targets)
785    }
786
787    fn resolve_update_targets_for_all_collections(
788        &self,
789        space: Option<&str>,
790    ) -> Result<Vec<UpdateTarget>> {
791        let (space_id_filter, spaces_by_id) = if let Some(space_name) = space {
792            let resolved = self.resolve_space_row(Some(space_name), None)?;
793            let mut map = std::collections::HashMap::new();
794            map.insert(resolved.id, resolved.name.clone());
795            (Some(resolved.id), map)
796        } else {
797            let spaces = self.storage.list_spaces()?;
798            let map = spaces
799                .into_iter()
800                .map(|space| (space.id, space.name))
801                .collect::<std::collections::HashMap<_, _>>();
802            (None, map)
803        };
804
805        let collections = self.storage.list_collections(space_id_filter)?;
806        let mut targets = Vec::with_capacity(collections.len());
807        for collection in collections {
808            let space_name = spaces_by_id
809                .get(&collection.space_id)
810                .ok_or_else(|| {
811                    KboltError::Internal(format!(
812                        "missing space mapping for collection '{}'",
813                        collection.name
814                    ))
815                })?
816                .clone();
817            targets.push(UpdateTarget {
818                space: space_name,
819                collection,
820            });
821        }
822
823        Ok(targets)
824    }
825
826    fn update_collection_target(
827        &self,
828        target: &UpdateTarget,
829        options: &UpdateOptions,
830        report: &mut UpdateReport,
831        fts_dirty_by_space: &mut HashMap<String, HashSet<i64>>,
832        pending_embeddings: &mut Vec<PendingChunkEmbedding>,
833        failed_chunk_ids: &mut HashSet<i64>,
834        failed_docs: &mut HashSet<UpdateDocKey>,
835    ) -> Result<()> {
836        let all_documents = crate::profile::timed_update_stage("sqlite_list_documents", || {
837            self.storage.list_documents(target.collection.id, false)
838        })?;
839        let document_ids = all_documents.iter().map(|doc| doc.id).collect::<Vec<_>>();
840        let document_text_generations =
841            crate::profile::timed_update_stage("sqlite_list_document_text_generations", || {
842                self.storage
843                    .get_document_text_generation_keys(&document_ids)
844            })?;
845        let mut docs_by_path: HashMap<String, DocumentRow> = all_documents
846            .into_iter()
847            .map(|doc| (doc.path.clone(), doc))
848            .collect();
849        let mut seen_paths = HashSet::new();
850        let extension_filter = normalized_extension_filter(target.collection.extensions.as_deref());
851        let ignore_matcher = load_collection_ignore_matcher(
852            &self.config.config_dir,
853            &target.collection.path,
854            &target.space,
855            &target.collection.name,
856        )?;
857        let extractor_registry = default_registry();
858        let embedding_batch_window = self.document_embedding_batch_window();
859        let mut touched_collection = false;
860        let mut failed_walk_prefixes = Vec::new();
861        let mut collection_walk_incomplete = false;
862
863        for entry in super::ignore_helpers::build_collection_walk(&target.collection.path) {
864            let entry = match entry {
865                Ok(item) => item,
866                Err(err) => {
867                    let error_scope = collect_walk_error_scope(&err);
868                    let error_path = error_scope.paths.first().cloned();
869                    if error_scope.collection_incomplete || error_scope.paths.is_empty() {
870                        collection_walk_incomplete = true;
871                    }
872                    for path in &error_scope.paths {
873                        let failed_path =
874                            match collection_relative_path(&target.collection.path, path) {
875                                Ok(relative) => {
876                                    if relative.is_empty() || relative == "." {
877                                        collection_walk_incomplete = true;
878                                    } else {
879                                        failed_walk_prefixes.push(relative.clone());
880                                    }
881                                    relative
882                                }
883                                Err(_) => {
884                                    collection_walk_incomplete = true;
885                                    path.display().to_string()
886                                }
887                            };
888                        failed_docs.insert(update_doc_key(
889                            &target.space,
890                            &target.collection.path,
891                            &failed_path,
892                        ));
893                    }
894                    report
895                        .errors
896                        .push(file_error(error_path, format!("walk error: {err}")));
897                    continue;
898                }
899            };
900
901            if !entry
902                .file_type()
903                .is_some_and(|file_type| file_type.is_file())
904            {
905                continue;
906            }
907
908            if is_hard_ignored_file(entry.path()) {
909                continue;
910            }
911
912            let relative_path =
913                match collection_relative_path(&target.collection.path, entry.path()) {
914                    Ok(path) => path,
915                    Err(err) => {
916                        failed_docs.insert(update_doc_key(
917                            &target.space,
918                            &target.collection.path,
919                            &entry.path().display().to_string(),
920                        ));
921                        report.errors.push(file_error(
922                            Some(entry.path().to_path_buf()),
923                            err.to_string(),
924                        ));
925                        continue;
926                    }
927                };
928
929            if !extension_allowed(entry.path(), extension_filter.as_ref()) {
930                push_update_decision(
931                    report,
932                    options,
933                    target,
934                    &relative_path,
935                    UpdateDecisionKind::Unsupported,
936                    Some("extension not allowed".to_string()),
937                );
938                continue;
939            }
940
941            if let Some(matcher) = ignore_matcher.as_ref() {
942                if matcher
943                    .matched(Path::new(&relative_path), false)
944                    .is_ignore()
945                {
946                    push_update_decision(
947                        report,
948                        options,
949                        target,
950                        &relative_path,
951                        UpdateDecisionKind::Ignored,
952                        Some("matched ignore patterns".to_string()),
953                    );
954                    continue;
955                }
956            }
957
958            let Some(extractor) = extractor_registry.resolve_for_path(entry.path()) else {
959                push_update_decision(
960                    report,
961                    options,
962                    target,
963                    &relative_path,
964                    UpdateDecisionKind::Unsupported,
965                    Some("no extractor available".to_string()),
966                );
967                continue;
968            };
969            let policy = resolve_policy(&self.config.chunking, Some(extractor.profile_key()), None);
970            let generation_key =
971                ingestion_generation_key(extractor.profile_key(), extractor.version(), &policy);
972
973            report.scanned_docs += 1;
974            crate::profile::increment_update_count("docs_scanned", 1);
975            seen_paths.insert(relative_path.clone());
976
977            let scan_started = Instant::now();
978            let metadata = match entry.metadata() {
979                Ok(data) => data,
980                Err(err) => {
981                    let detail = format!("metadata error: {err}");
982                    failed_docs.insert(update_doc_key(
983                        &target.space,
984                        &target.collection.path,
985                        &relative_path,
986                    ));
987                    push_update_decision(
988                        report,
989                        options,
990                        target,
991                        &relative_path,
992                        UpdateDecisionKind::ReadFailed,
993                        Some(detail.clone()),
994                    );
995                    report
996                        .errors
997                        .push(file_error(Some(entry.path().to_path_buf()), detail));
998                    crate::profile::record_update_stage(
999                        "scan_metadata_mtime",
1000                        scan_started.elapsed(),
1001                    );
1002                    continue;
1003                }
1004            };
1005
1006            let modified = match modified_token(&metadata) {
1007                Ok(value) => value,
1008                Err(err) => {
1009                    let detail = format!("modified timestamp error: {err}");
1010                    failed_docs.insert(update_doc_key(
1011                        &target.space,
1012                        &target.collection.path,
1013                        &relative_path,
1014                    ));
1015                    push_update_decision(
1016                        report,
1017                        options,
1018                        target,
1019                        &relative_path,
1020                        UpdateDecisionKind::ReadFailed,
1021                        Some(detail.clone()),
1022                    );
1023                    report
1024                        .errors
1025                        .push(file_error(Some(entry.path().to_path_buf()), detail));
1026                    crate::profile::record_update_stage(
1027                        "scan_metadata_mtime",
1028                        scan_started.elapsed(),
1029                    );
1030                    continue;
1031                }
1032            };
1033
1034            if let Some(existing) = docs_by_path.get(&relative_path) {
1035                let has_current_canonical_text = document_has_current_canonical_text(
1036                    &document_text_generations,
1037                    existing.id,
1038                    generation_key.as_str(),
1039                );
1040                if existing.active && existing.modified == modified && has_current_canonical_text {
1041                    report.skipped_mtime_docs += 1;
1042                    push_update_decision(
1043                        report,
1044                        options,
1045                        target,
1046                        &relative_path,
1047                        UpdateDecisionKind::SkippedMtime,
1048                        None,
1049                    );
1050                    crate::profile::record_update_stage(
1051                        "scan_metadata_mtime",
1052                        scan_started.elapsed(),
1053                    );
1054                    continue;
1055                }
1056            }
1057            crate::profile::record_update_stage("scan_metadata_mtime", scan_started.elapsed());
1058
1059            let read_hash_started = Instant::now();
1060            let bytes = match std::fs::read(entry.path()) {
1061                Ok(data) => data,
1062                Err(err) => {
1063                    let detail = err.to_string();
1064                    failed_docs.insert(update_doc_key(
1065                        &target.space,
1066                        &target.collection.path,
1067                        &relative_path,
1068                    ));
1069                    push_update_decision(
1070                        report,
1071                        options,
1072                        target,
1073                        &relative_path,
1074                        UpdateDecisionKind::ReadFailed,
1075                        Some(detail.clone()),
1076                    );
1077                    report
1078                        .errors
1079                        .push(file_error(Some(entry.path().to_path_buf()), detail));
1080                    crate::profile::record_update_stage("read_hash", read_hash_started.elapsed());
1081                    continue;
1082                }
1083            };
1084            let hash = sha256_hex(&bytes);
1085            crate::profile::record_update_stage("read_hash", read_hash_started.elapsed());
1086            let mut title = None::<String>;
1087
1088            let existing = docs_by_path.get(&relative_path).cloned();
1089            let pending_decision;
1090            let pending_indexing;
1091            if let Some(doc) = existing.as_ref() {
1092                let has_current_canonical_text = document_has_current_canonical_text(
1093                    &document_text_generations,
1094                    doc.id,
1095                    generation_key.as_str(),
1096                );
1097                if doc.hash == hash && has_current_canonical_text {
1098                    if doc.active {
1099                        report.skipped_hash_docs += 1;
1100                        push_update_decision(
1101                            report,
1102                            options,
1103                            target,
1104                            &relative_path,
1105                            UpdateDecisionKind::SkippedHash,
1106                            None,
1107                        );
1108                    } else {
1109                        report.reactivated_docs += 1;
1110                        push_update_decision(
1111                            report,
1112                            options,
1113                            target,
1114                            &relative_path,
1115                            UpdateDecisionKind::Reactivated,
1116                            None,
1117                        );
1118                    }
1119
1120                    if !options.dry_run {
1121                        crate::profile::timed_update_stage("sqlite_refresh_document", || {
1122                            self.storage.refresh_document_activity(doc.id, &modified)
1123                        })?;
1124                    }
1125                    continue;
1126                }
1127
1128                pending_indexing = PendingDocumentIndexing::Updated {
1129                    reactivated: !doc.active,
1130                };
1131                pending_decision = (
1132                    UpdateDecisionKind::Changed,
1133                    (!doc.active).then_some("reactivated".to_string()),
1134                );
1135            } else {
1136                pending_indexing = PendingDocumentIndexing::Added;
1137                pending_decision = (UpdateDecisionKind::New, None);
1138            }
1139
1140            if options.dry_run {
1141                pending_indexing.record(report);
1142                let (kind, detail) = pending_decision;
1143                push_update_decision(report, options, target, &relative_path, kind, detail);
1144                continue;
1145            }
1146
1147            let extracted = match crate::profile::timed_update_stage("extract", || {
1148                extractor.extract(entry.path(), &bytes)
1149            }) {
1150                Ok(document) => document,
1151                Err(err) => {
1152                    let detail = format!("extract failed: {err}");
1153                    failed_docs.insert(update_doc_key(
1154                        &target.space,
1155                        &target.collection.path,
1156                        &relative_path,
1157                    ));
1158                    push_update_decision(
1159                        report,
1160                        options,
1161                        target,
1162                        &relative_path,
1163                        UpdateDecisionKind::ExtractFailed,
1164                        Some(detail.clone()),
1165                    );
1166                    report
1167                        .errors
1168                        .push(file_error(Some(entry.path().to_path_buf()), detail));
1169                    continue;
1170                }
1171            };
1172            if let Some(extracted_title) = extracted
1173                .title
1174                .as_deref()
1175                .map(str::trim)
1176                .filter(|title| !title.is_empty())
1177            {
1178                title = Some(extracted_title.to_string());
1179            }
1180
1181            let canonical = crate::profile::timed_update_stage("canonical_text", || {
1182                build_canonical_document(&extracted)
1183            });
1184            let text_hash = sha256_hex(canonical.text.as_bytes());
1185            let max_document_tokens = effective_chunk_hard_max(&policy);
1186            let chunk_started = Instant::now();
1187            let final_chunks_result = match self.embedding_document_sizer.as_ref() {
1188                Some(sizer) => {
1189                    let sizer_counter = EmbeddingDocumentSizerCounter {
1190                        inner: sizer.as_ref(),
1191                    };
1192                    chunk_canonical_document_with_counter(
1193                        &canonical.document,
1194                        &policy,
1195                        &sizer_counter,
1196                    )
1197                }
1198                None => Ok(chunk_canonical_document(&canonical.document, &policy)),
1199            };
1200            crate::profile::record_update_stage("chunk", chunk_started.elapsed());
1201            let final_chunks = match final_chunks_result {
1202                Ok(chunks) => chunks,
1203                Err(err) => {
1204                    let detail = format!("chunking failed: {err}");
1205                    failed_docs.insert(update_doc_key(
1206                        &target.space,
1207                        &target.collection.path,
1208                        &relative_path,
1209                    ));
1210                    push_update_decision(
1211                        report,
1212                        options,
1213                        target,
1214                        &relative_path,
1215                        UpdateDecisionKind::ExtractFailed,
1216                        Some(detail.clone()),
1217                    );
1218                    report
1219                        .errors
1220                        .push(file_error(Some(entry.path().to_path_buf()), detail));
1221                    continue;
1222                }
1223            };
1224            crate::profile::increment_update_count("chunks_created", final_chunks.len() as u64);
1225
1226            let doc_key = update_doc_key(&target.space, &target.collection.path, &relative_path);
1227            let chunk_inserts = final_chunks
1228                .iter()
1229                .enumerate()
1230                .map(|(index, chunk)| ChunkInsert {
1231                    seq: index as i32,
1232                    offset: chunk.offset,
1233                    length: chunk.length,
1234                    heading: chunk.heading.clone(),
1235                    kind: chunk.kind,
1236                    retrieval_prefix: chunk.retrieval_prefix.clone(),
1237                })
1238                .collect::<Vec<_>>();
1239            let mut prepared_embeddings = Vec::new();
1240            if !chunk_inserts.is_empty() && !options.no_embed && self.embedder.is_some() {
1241                let prepared = crate::profile::timed_update_stage("embedding_prepare_text", || {
1242                    prepare_chunk_embeddings(
1243                        &final_chunks,
1244                        &doc_key,
1245                        target,
1246                        entry.path(),
1247                        max_document_tokens,
1248                        title.as_deref(),
1249                        self.embedding_input_formatter(),
1250                    )
1251                });
1252                let preflight =
1253                    self.preflight_prepared_embeddings(prepared, report, failed_docs)?;
1254                prepared_embeddings = preflight.accepted;
1255                if !preflight.rejected_chunk_indexes.is_empty() {
1256                    push_update_decision(
1257                        report,
1258                        options,
1259                        target,
1260                        &relative_path,
1261                        UpdateDecisionKind::ExtractFailed,
1262                        Some("embed preflight failed".to_string()),
1263                    );
1264                    continue;
1265                }
1266            }
1267
1268            let replacement =
1269                crate::profile::timed_update_stage("sqlite_replace_document_generation", || {
1270                    self.storage
1271                        .replace_document_generation(DocumentGenerationReplace {
1272                            collection_id: target.collection.id,
1273                            path: &relative_path,
1274                            title: title.as_deref(),
1275                            hash: &hash,
1276                            modified: &modified,
1277                            extractor_key: extractor.profile_key(),
1278                            source_hash: &hash,
1279                            text_hash: &text_hash,
1280                            generation_key: &generation_key,
1281                            text: canonical.text.as_str(),
1282                            chunks: &chunk_inserts,
1283                        })
1284                })?;
1285            let doc_id = replacement.doc_id;
1286            let chunk_ids = replacement.chunk_ids;
1287
1288            if !replacement.old_chunk_ids.is_empty() {
1289                crate::profile::timed_update_stage("tantivy_delete", || {
1290                    self.storage
1291                        .delete_tantivy(&target.space, &replacement.old_chunk_ids)
1292                })?;
1293                crate::profile::timed_update_stage("usearch_delete", || {
1294                    self.storage
1295                        .delete_usearch_deferred(&target.space, &replacement.old_chunk_ids)
1296                })?;
1297            }
1298
1299            fts_dirty_by_space
1300                .entry(target.space.clone())
1301                .or_default()
1302                .insert(doc_id);
1303
1304            if !chunk_ids.is_empty() {
1305                if !options.no_embed && self.embedder.is_some() {
1306                    for prepared in prepared_embeddings {
1307                        pending_embeddings.push(PendingChunkEmbedding {
1308                            chunk_id: chunk_ids[prepared.chunk_index],
1309                            doc_key: prepared.doc_key,
1310                            space_name: prepared.space_name,
1311                            path: prepared.path,
1312                            embedding_text: prepared.embedding_text,
1313                            max_document_tokens: prepared.max_document_tokens,
1314                        });
1315                    }
1316                    if pending_embeddings.len() >= embedding_batch_window {
1317                        self.flush_buffered_embeddings(
1318                            pending_embeddings,
1319                            failed_chunk_ids,
1320                            report,
1321                            failed_docs,
1322                        )?;
1323                    }
1324                }
1325
1326                let entries = chunk_ids
1327                    .iter()
1328                    .zip(final_chunks.iter())
1329                    .map(|(chunk_id, chunk)| TantivyEntry {
1330                        chunk_id: *chunk_id,
1331                        doc_id,
1332                        filepath: relative_path.clone(),
1333                        title: title.clone(),
1334                        heading: chunk.heading.clone(),
1335                        body: render_bm25_document(
1336                            ChunkRetrievalContext {
1337                                body: chunk.text.as_str(),
1338                                retrieval_prefix: chunk.retrieval_prefix.as_deref(),
1339                                title: title.as_deref(),
1340                                heading: chunk.heading.as_deref(),
1341                            },
1342                            policy.contextual_prefix,
1343                        ),
1344                    })
1345                    .collect::<Vec<_>>();
1346                crate::profile::increment_update_count("tantivy_entries", entries.len() as u64);
1347                crate::profile::timed_update_stage("tantivy_add", || {
1348                    self.storage.index_tantivy(&target.space, &entries)
1349                })?;
1350            }
1351
1352            docs_by_path.insert(
1353                relative_path.clone(),
1354                crate::profile::timed_update_stage("sqlite_get_document_after_write", || {
1355                    self.storage
1356                        .get_document_by_path(target.collection.id, &relative_path)
1357                })?
1358                .ok_or_else(|| {
1359                    KboltError::Internal(format!(
1360                        "document missing after upsert: collection={}, path={relative_path}",
1361                        target.collection.id
1362                    ))
1363                })?,
1364            );
1365            pending_indexing.record(report);
1366            let (kind, detail) = pending_decision;
1367            push_update_decision(report, options, target, &relative_path, kind, detail);
1368            touched_collection = true;
1369        }
1370
1371        let mut missing_docs = docs_by_path
1372            .values()
1373            .filter(|doc| {
1374                doc.active
1375                    && !seen_paths.contains(&doc.path)
1376                    && !path_is_under_failed_walk(
1377                        doc.path.as_str(),
1378                        &failed_walk_prefixes,
1379                        collection_walk_incomplete,
1380                    )
1381            })
1382            .cloned()
1383            .collect::<Vec<_>>();
1384        missing_docs.sort_by(|left, right| left.path.cmp(&right.path));
1385
1386        for doc in missing_docs {
1387            if doc.active && !seen_paths.contains(&doc.path) {
1388                report.deactivated_docs += 1;
1389                push_update_decision(
1390                    report,
1391                    options,
1392                    target,
1393                    &doc.path,
1394                    UpdateDecisionKind::Deactivated,
1395                    None,
1396                );
1397                if !options.dry_run {
1398                    crate::profile::timed_update_stage("sqlite_deactivate_document", || {
1399                        self.storage.deactivate_document(doc.id)
1400                    })?;
1401                    touched_collection = true;
1402                }
1403            }
1404        }
1405
1406        if touched_collection && !options.dry_run {
1407            crate::profile::timed_update_stage("sqlite_update_collection_timestamp", || {
1408                self.storage
1409                    .update_collection_timestamp(target.collection.id)
1410            })?;
1411        }
1412
1413        Ok(())
1414    }
1415}
1416
1417#[derive(Debug, Default)]
1418struct WalkErrorScope {
1419    paths: Vec<std::path::PathBuf>,
1420    collection_incomplete: bool,
1421}
1422
1423fn collect_walk_error_scope(err: &ignore::Error) -> WalkErrorScope {
1424    let mut scope = WalkErrorScope::default();
1425    collect_walk_error_scope_into(err, &mut scope, false);
1426    scope
1427}
1428
1429fn collect_walk_error_scope_into(
1430    err: &ignore::Error,
1431    scope: &mut WalkErrorScope,
1432    has_path_context: bool,
1433) {
1434    match err {
1435        ignore::Error::Partial(errors) => {
1436            if errors.is_empty() {
1437                scope.collection_incomplete = true;
1438            }
1439            for err in errors {
1440                collect_walk_error_scope_into(err, scope, has_path_context);
1441            }
1442        }
1443        ignore::Error::WithLineNumber { err, .. } | ignore::Error::WithDepth { err, .. } => {
1444            collect_walk_error_scope_into(err, scope, has_path_context);
1445        }
1446        ignore::Error::WithPath { path, err } => {
1447            scope.paths.push(path.clone());
1448            collect_walk_error_scope_into(err, scope, true);
1449        }
1450        ignore::Error::Loop { child, .. } => {
1451            scope.paths.push(child.clone());
1452        }
1453        ignore::Error::Io(_) => {
1454            if !has_path_context {
1455                scope.collection_incomplete = true;
1456            }
1457        }
1458        ignore::Error::Glob { .. }
1459        | ignore::Error::UnrecognizedFileType(_)
1460        | ignore::Error::InvalidDefinition => {
1461            scope.collection_incomplete = true;
1462        }
1463    }
1464}
1465
1466fn path_is_under_failed_walk(
1467    doc_path: &str,
1468    failed_walk_prefixes: &[String],
1469    collection_walk_incomplete: bool,
1470) -> bool {
1471    if collection_walk_incomplete {
1472        return true;
1473    }
1474
1475    failed_walk_prefixes.iter().any(|prefix| {
1476        prefix.is_empty()
1477            || prefix == "."
1478            || doc_path == prefix
1479            || doc_path
1480                .strip_prefix(prefix.as_str())
1481                .is_some_and(|suffix| suffix.starts_with('/'))
1482    })
1483}
1484
1485#[cfg(test)]
1486mod tests {
1487    use super::*;
1488    use std::io;
1489    use std::path::PathBuf;
1490
1491    #[test]
1492    fn walk_error_scope_collects_every_partial_path() {
1493        let err = ignore::Error::Partial(vec![
1494            ignore::Error::WithPath {
1495                path: PathBuf::from("/collection/blocked-a"),
1496                err: Box::new(ignore::Error::Io(io::Error::new(
1497                    io::ErrorKind::PermissionDenied,
1498                    "blocked",
1499                ))),
1500            },
1501            ignore::Error::WithDepth {
1502                depth: 2,
1503                err: Box::new(ignore::Error::Loop {
1504                    ancestor: PathBuf::from("/collection"),
1505                    child: PathBuf::from("/collection/blocked-b"),
1506                }),
1507            },
1508        ]);
1509
1510        let scope = collect_walk_error_scope(&err);
1511
1512        assert_eq!(
1513            scope.paths,
1514            vec![
1515                PathBuf::from("/collection/blocked-a"),
1516                PathBuf::from("/collection/blocked-b")
1517            ]
1518        );
1519        assert!(!scope.collection_incomplete);
1520    }
1521
1522    #[test]
1523    fn walk_error_scope_marks_pathless_errors_incomplete() {
1524        let err = ignore::Error::Partial(vec![
1525            ignore::Error::WithPath {
1526                path: PathBuf::from("/collection/blocked"),
1527                err: Box::new(ignore::Error::Io(io::Error::new(
1528                    io::ErrorKind::PermissionDenied,
1529                    "blocked",
1530                ))),
1531            },
1532            ignore::Error::Io(io::Error::new(io::ErrorKind::Other, "unknown")),
1533        ]);
1534
1535        let scope = collect_walk_error_scope(&err);
1536
1537        assert_eq!(scope.paths, vec![PathBuf::from("/collection/blocked")]);
1538        assert!(scope.collection_incomplete);
1539    }
1540
1541    #[test]
1542    fn walk_error_scope_treats_ignore_rule_errors_as_incomplete() {
1543        let err = ignore::Error::WithPath {
1544            path: PathBuf::from("/collection/.gitignore"),
1545            err: Box::new(ignore::Error::WithLineNumber {
1546                line: 4,
1547                err: Box::new(ignore::Error::Glob {
1548                    glob: Some("[".to_string()),
1549                    err: "invalid glob".to_string(),
1550                }),
1551            }),
1552        };
1553
1554        let scope = collect_walk_error_scope(&err);
1555
1556        assert_eq!(scope.paths, vec![PathBuf::from("/collection/.gitignore")]);
1557        assert!(scope.collection_incomplete);
1558    }
1559}
1560
1561#[derive(Debug)]
1562struct PreparedChunkEmbedding {
1563    chunk_index: usize,
1564    doc_key: UpdateDocKey,
1565    space_name: String,
1566    path: std::path::PathBuf,
1567    embedding_text: String,
1568    max_document_tokens: usize,
1569}
1570
1571#[derive(Debug, Default)]
1572struct PreparedEmbeddingPreflight {
1573    accepted: Vec<PreparedChunkEmbedding>,
1574    rejected_chunk_indexes: Vec<usize>,
1575}
1576
1577#[derive(Debug)]
1578struct PendingChunkEmbedding {
1579    chunk_id: i64,
1580    doc_key: UpdateDocKey,
1581    space_name: String,
1582    path: std::path::PathBuf,
1583    embedding_text: String,
1584    max_document_tokens: usize,
1585}
1586
1587#[derive(Default)]
1588struct EmbeddedPendingBatch {
1589    embeddings: Vec<(i64, String, Vec<f32>)>,
1590    failed_chunk_ids: Vec<i64>,
1591}
1592
1593#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1594struct UpdateDocKey {
1595    space: String,
1596    collection_path: std::path::PathBuf,
1597    path: String,
1598}
1599
1600struct EmbeddingDocumentSizerCounter<'a> {
1601    inner: &'a dyn crate::models::EmbeddingDocumentSizer,
1602}
1603
1604impl crate::ingest::chunk::TokenCounter for EmbeddingDocumentSizerCounter<'_> {
1605    fn count(&self, text: &str) -> Result<usize> {
1606        count_document_tokens_profiled(
1607            self.inner,
1608            text,
1609            "tokenize_chunking",
1610            "tokenize_chunking_calls",
1611        )
1612    }
1613
1614    fn fits_within_token_limit_by_byte_len(&self, byte_len: usize, max_tokens: usize) -> bool {
1615        let fits = self
1616            .inner
1617            .fits_within_token_limit_by_byte_len(byte_len, max_tokens);
1618        if fits {
1619            crate::profile::increment_update_count("tokenize_chunking_byte_skips", 1);
1620        }
1621        fits
1622    }
1623}
1624
1625fn count_document_tokens_profiled(
1626    sizer: &dyn crate::models::EmbeddingDocumentSizer,
1627    text: &str,
1628    stage: &'static str,
1629    count: &'static str,
1630) -> Result<usize> {
1631    let started = Instant::now();
1632    let result = sizer.count_document_tokens(text);
1633    crate::profile::record_update_stage(stage, started.elapsed());
1634    crate::profile::increment_update_count(count, 1);
1635    result
1636}
1637
1638enum EmbeddingPreflightDecision {
1639    Accept,
1640    Reject(String),
1641}
1642
1643fn check_embedding_preflight(
1644    sizer: &dyn crate::models::EmbeddingDocumentSizer,
1645    embedding_text: &str,
1646    max_document_tokens: usize,
1647) -> EmbeddingPreflightDecision {
1648    if sizer.fits_within_token_limit_by_byte_len(embedding_text.len(), max_document_tokens) {
1649        crate::profile::increment_update_count("tokenize_preflight_byte_skips", 1);
1650        return EmbeddingPreflightDecision::Accept;
1651    }
1652
1653    match count_document_tokens_profiled(
1654        sizer,
1655        embedding_text,
1656        "tokenize_preflight",
1657        "tokenize_preflight_calls",
1658    ) {
1659        Ok(token_count) if token_count <= max_document_tokens => EmbeddingPreflightDecision::Accept,
1660        Ok(token_count) => EmbeddingPreflightDecision::Reject(format!(
1661            "embed preflight failed: payload has {token_count} tokens, exceeding hard_max_tokens {max_document_tokens}"
1662        )),
1663        Err(err) => EmbeddingPreflightDecision::Reject(format!(
1664            "embed preflight token count failed: {err}"
1665        )),
1666    }
1667}
1668
1669fn prepare_chunk_embeddings(
1670    final_chunks: &[crate::ingest::chunk::FinalChunk],
1671    doc_key: &UpdateDocKey,
1672    target: &UpdateTarget,
1673    path: &Path,
1674    max_document_tokens: usize,
1675    title: Option<&str>,
1676    formatter: crate::models::EmbeddingInputFormatter,
1677) -> Vec<PreparedChunkEmbedding> {
1678    final_chunks
1679        .iter()
1680        .enumerate()
1681        .map(|(chunk_index, chunk)| {
1682            let dense_input = render_dense_document(ChunkRetrievalContext {
1683                body: chunk.text.as_str(),
1684                retrieval_prefix: chunk.retrieval_prefix.as_deref(),
1685                title,
1686                heading: chunk.heading.as_deref(),
1687            });
1688            let embedding_text =
1689                normalize_embedding_payload(formatter.format_document(&dense_input));
1690            PreparedChunkEmbedding {
1691                chunk_index,
1692                doc_key: doc_key.clone(),
1693                space_name: target.space.clone(),
1694                path: path.to_path_buf(),
1695                embedding_text,
1696                max_document_tokens,
1697            }
1698        })
1699        .collect()
1700}
1701
1702fn normalize_embedding_payload(text: String) -> String {
1703    if text.trim().is_empty() {
1704        " ".to_string()
1705    } else {
1706        text
1707    }
1708}
1709
1710#[derive(Debug, Clone)]
1711enum UpdateRepairScope {
1712    Global,
1713    Space { space_id: i64 },
1714    Collections { collection_ids: Vec<i64> },
1715}
1716
1717impl UpdateRepairScope {
1718    fn from_options_and_targets(options: &UpdateOptions, targets: &[UpdateTarget]) -> Self {
1719        if options.collections.is_empty() {
1720            if let Some(target) = targets.first() {
1721                if options.space.is_some() {
1722                    return Self::Space {
1723                        space_id: target.collection.space_id,
1724                    };
1725                }
1726            }
1727            return Self::Global;
1728        }
1729
1730        let mut collection_ids = targets
1731            .iter()
1732            .map(|target| target.collection.id)
1733            .collect::<Vec<_>>();
1734        collection_ids.sort_unstable();
1735        collection_ids.dedup();
1736        Self::Collections { collection_ids }
1737    }
1738
1739    fn allows_space_dense_repair(&self) -> bool {
1740        !matches!(self, Self::Collections { .. })
1741    }
1742}
1743
1744#[derive(Debug, Clone, Copy)]
1745enum PendingDocumentIndexing {
1746    Added,
1747    Updated { reactivated: bool },
1748}
1749
1750impl PendingDocumentIndexing {
1751    fn record(self, report: &mut UpdateReport) {
1752        match self {
1753            Self::Added => {
1754                report.added_docs += 1;
1755            }
1756            Self::Updated { reactivated } => {
1757                report.updated_docs += 1;
1758                if reactivated {
1759                    report.reactivated_docs += 1;
1760                }
1761            }
1762        }
1763    }
1764}
1765
1766fn document_has_current_canonical_text(
1767    document_text_generations: &HashMap<i64, String>,
1768    doc_id: i64,
1769    generation_key: &str,
1770) -> bool {
1771    document_text_generations
1772        .get(&doc_id)
1773        .is_some_and(|stored| stored == generation_key)
1774}
1775
1776fn finish_update_with_usearch_flush(
1777    update_result: Result<UpdateReport>,
1778    flush_result: Result<()>,
1779) -> Result<UpdateReport> {
1780    match (update_result, flush_result) {
1781        (Ok(report), Ok(())) => Ok(report),
1782        (Err(err), Ok(())) => Err(err),
1783        (Ok(_), Err(err)) => Err(err),
1784        (Err(update_err), Err(flush_err)) => Err(KboltError::Internal(format!(
1785            "update failed: {update_err}; additionally failed to save dirty vector indexes: {flush_err}"
1786        ))
1787        .into()),
1788    }
1789}
1790
1791fn invalid_pending_embedding_batch_detail(
1792    pending: &[PendingChunkEmbedding],
1793    vectors: &[Vec<f32>],
1794) -> Option<String> {
1795    if vectors.len() != pending.len() {
1796        return Some(format!(
1797            "embedder returned {} vectors for {} chunks",
1798            vectors.len(),
1799            pending.len()
1800        ));
1801    }
1802
1803    if vectors.iter().any(|vector| vector.is_empty()) {
1804        if pending.len() == 1 {
1805            return Some(format!(
1806                "embedder returned an empty vector for chunk {}",
1807                pending[0].chunk_id
1808            ));
1809        }
1810        return Some("embedder returned an empty vector".to_string());
1811    }
1812
1813    None
1814}
1815
1816fn push_update_decision(
1817    report: &mut UpdateReport,
1818    options: &UpdateOptions,
1819    target: &UpdateTarget,
1820    path: &str,
1821    kind: UpdateDecisionKind,
1822    detail: Option<String>,
1823) {
1824    if !options.verbose {
1825        return;
1826    }
1827
1828    report.decisions.push(UpdateDecision {
1829        space: target.space.clone(),
1830        collection: target.collection.name.clone(),
1831        path: path.to_string(),
1832        kind,
1833        detail,
1834    });
1835}
1836
1837fn update_doc_key(space: &str, collection_path: &Path, path: &str) -> UpdateDocKey {
1838    UpdateDocKey {
1839        space: space.to_string(),
1840        collection_path: collection_path.to_path_buf(),
1841        path: path.to_string(),
1842    }
1843}
1844
1845fn effective_chunk_hard_max(policy: &crate::config::ChunkPolicy) -> usize {
1846    policy
1847        .hard_max_tokens
1848        .max(policy.soft_max_tokens)
1849        .max(policy.target_tokens)
1850        .max(1)
1851}
1852
1853fn ingestion_generation_key(
1854    extractor_key: &str,
1855    extractor_version: u32,
1856    policy: &crate::config::ChunkPolicy,
1857) -> String {
1858    format!(
1859        "canonical=v{CANONICAL_TEXT_GENERATION};chunker=v{CHUNKER_GENERATION};extractor={extractor_key}:v{extractor_version};chunk=target:{}:soft:{}:hard:{}:overlap:{}:prefix:{}",
1860        policy.target_tokens,
1861        policy.soft_max_tokens,
1862        policy.hard_max_tokens,
1863        policy.boundary_overlap_tokens,
1864        policy.contextual_prefix
1865    )
1866}