Skip to main content

kbolt_core/engine/
update_ops.rs

1use super::*;
2use kbolt_types::{UpdateDecision, UpdateDecisionKind};
3
// NOTE(review): presumably a version stamp for the stored canonical-text format — confirm
// against the code that reads it; it is not referenced in the part of this file reviewed here.
const CANONICAL_TEXT_GENERATION: u32 = 1;
// NOTE(review): presumably a version stamp for the chunker's output — confirm against the code
// that reads it; it is not referenced in the part of this file reviewed here.
const CHUNKER_GENERATION: u32 = 2;
6
7impl Engine {
8    pub fn update(&self, options: UpdateOptions) -> Result<UpdateReport> {
9        let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
10        self.update_unlocked(options)
11    }
12
13    pub(super) fn update_unlocked(&self, options: UpdateOptions) -> Result<UpdateReport> {
14        let _profile = crate::profile::UpdateProfileGuard::start();
15        let update_result = self.run_update_unlocked(options);
16        let flush_result = crate::profile::timed_update_stage("usearch_save_dirty", || {
17            self.storage.save_dirty_usearch_indexes()
18        });
19        finish_update_with_usearch_flush(update_result, flush_result)
20    }
21
    /// Runs one update pass over the targets selected by `options` and returns its report.
    ///
    /// Stages, in order:
    /// 1. Resolve the targets; when there are none, return a report carrying only the elapsed
    ///    time.
    /// 2. Reconcile dense (embedding) integrity for the targeted spaces.
    /// 3. Unless `options.dry_run`, replay documents that are flagged FTS-dirty.
    /// 4. Call `update_collection_target` for every target.
    /// 5. Unless `options.dry_run`: flush buffered embeddings, commit Tantivy and clear the
    ///    FTS-dirty flags per space, embed the remaining backlog, and reap documents.
    ///
    /// # Errors
    ///
    /// Any stage error propagated with `?` aborts the pass and is returned to the caller.
    fn run_update_unlocked(&self, options: UpdateOptions) -> Result<UpdateReport> {
        let started = Instant::now();
        let mut report = UpdateReport {
            scanned_docs: 0,
            skipped_mtime_docs: 0,
            skipped_hash_docs: 0,
            added_docs: 0,
            updated_docs: 0,
            failed_docs: 0,
            deactivated_docs: 0,
            reactivated_docs: 0,
            reaped_docs: 0,
            embedded_chunks: 0,
            decisions: Vec::new(),
            errors: Vec::new(),
            elapsed_ms: 0,
        };
        // Shared across all stages; its size becomes `report.failed_docs` at the end.
        let mut failed_docs = HashSet::new();

        let targets = crate::profile::timed_update_stage("resolve_targets", || {
            self.resolve_targets(TargetScope {
                space: options.space.as_deref(),
                collections: &options.collections,
            })
        })?;
        if targets.is_empty() {
            // Nothing to update: only the elapsed time is filled in.
            report.elapsed_ms = started.elapsed().as_millis() as u64;
            return Ok(report);
        }
        let repair_scope = UpdateRepairScope::from_options_and_targets(&options, &targets);

        crate::profile::timed_update_stage("dense_integrity", || {
            self.reconcile_dense_integrity(&targets, &repair_scope, &options)
        })?;

        if !options.dry_run {
            crate::profile::timed_update_stage("fts_dirty_replay", || {
                self.replay_fts_dirty_documents(&repair_scope, &mut report, &mut failed_docs)
            })?;
        }

        // Accumulators handed to `update_collection_target` for every target and consumed by
        // the post-processing stages below.
        let mut fts_dirty_by_space: HashMap<String, HashSet<i64>> = HashMap::new();
        let mut pending_embeddings = Vec::new();
        let mut failed_embedding_chunk_ids = HashSet::new();
        for target in &targets {
            self.update_collection_target(
                target,
                &options,
                &mut report,
                &mut fts_dirty_by_space,
                &mut pending_embeddings,
                &mut failed_embedding_chunk_ids,
                &mut failed_docs,
            )?;
        }

        if !options.dry_run {
            crate::profile::timed_update_stage("embedding_flush_buffered", || {
                self.flush_buffered_embeddings(
                    &mut pending_embeddings,
                    &mut failed_embedding_chunk_ids,
                    &mut report,
                    &mut failed_docs,
                )
            })?;

            for (space, doc_ids) in fts_dirty_by_space {
                if doc_ids.is_empty() {
                    continue;
                }

                // The Tantivy commit runs before the dirty flags are cleared; if the commit
                // fails, `?` returns before the flags are touched.
                crate::profile::timed_update_stage("tantivy_commit", || {
                    self.storage.commit_tantivy(&space)
                })?;
                let mut ids = doc_ids.into_iter().collect::<Vec<_>>();
                ids.sort_unstable();
                crate::profile::timed_update_stage("sqlite_clear_fts_dirty", || {
                    self.storage.batch_clear_fts_dirty(&ids)
                })?;
            }

            crate::profile::timed_update_stage("embedding_backlog", || {
                self.embed_pending_chunks(
                    &repair_scope,
                    &options,
                    &mut failed_embedding_chunk_ids,
                    &mut report,
                    &mut failed_docs,
                )
            })?;

            // Reaping: list documents eligible under `config.reaping.days`, purge their chunks
            // per space, then delete the document rows.
            let reaped = crate::profile::timed_update_stage("sqlite_list_reapable", || {
                self.list_reapable_documents_for_scope(self.config.reaping.days, &repair_scope)
            })?;
            let mut reaped_doc_ids = Vec::with_capacity(reaped.len());
            let mut chunk_ids_by_space: HashMap<String, Vec<i64>> = HashMap::new();
            for document in reaped {
                reaped_doc_ids.push(document.doc_id);
                if !document.chunk_ids.is_empty() {
                    chunk_ids_by_space
                        .entry(document.space_name)
                        .or_default()
                        .extend(document.chunk_ids);
                }
            }
            for (space, chunk_ids) in chunk_ids_by_space {
                crate::profile::timed_update_stage("purge_reaped_indexes", || {
                    self.purge_space_chunks(&space, &chunk_ids)
                })?;
            }
            // Called even when `reaped_doc_ids` is empty.
            crate::profile::timed_update_stage("sqlite_delete_reaped_documents", || {
                self.storage.delete_documents(&reaped_doc_ids)
            })?;
            report.reaped_docs = reaped_doc_ids.len();
        }

        report.failed_docs = failed_docs.len();
        report.elapsed_ms = started.elapsed().as_millis() as u64;
        Ok(report)
    }
142
143    fn reconcile_dense_integrity(
144        &self,
145        targets: &[UpdateTarget],
146        repair_scope: &UpdateRepairScope,
147        options: &UpdateOptions,
148    ) -> Result<()> {
149        if options.no_embed || options.dry_run {
150            return Ok(());
151        }
152
153        let expected_model = self.embedding_model_key();
154        let mut visited_spaces = HashSet::new();
155        for target in targets {
156            if !visited_spaces.insert(target.collection.space_id) {
157                continue;
158            }
159
160            let models = self
161                .storage
162                .list_embedding_models_in_space(target.collection.space_id)?;
163            let mut reasons = Vec::new();
164            if models.iter().any(|model| model != expected_model) {
165                reasons.push(format!(
166                    "stored embedding models {:?} do not match current model '{}'",
167                    models, expected_model
168                ));
169            }
170
171            let sqlite_count = self
172                .storage
173                .count_embedded_chunks(Some(target.collection.space_id))?;
174            let usearch_count = self.storage.count_usearch(&target.space)?;
175            if sqlite_count != usearch_count {
176                reasons.push(format!(
177                    "sqlite embedded chunk count {sqlite_count} does not match USearch vector count {usearch_count}"
178                ));
179            }
180
181            if reasons.is_empty() {
182                continue;
183            }
184
185            if !repair_scope.allows_space_dense_repair() {
186                return Err(KboltError::SpaceDenseRepairRequired {
187                    space: target.space.clone(),
188                    reason: reasons.join("; "),
189                }
190                .into());
191            }
192
193            self.storage
194                .delete_embeddings_for_space(target.collection.space_id)?;
195            self.storage.clear_usearch(&target.space)?;
196        }
197
198        Ok(())
199    }
200
    /// Embeds the backlog of chunks returned by `get_unembedded_chunks_for_scope` for the
    /// current embedding model and stores the resulting vectors.
    ///
    /// The backlog is fetched one page at a time (`batch_window` records per page), using the
    /// last chunk id of each page as the cursor for the next. Chunks whose ids are already in
    /// `failed_chunk_ids` are skipped; chunks that fail preflight or embedding are added to it.
    /// Per-chunk problems are recorded in `report.errors` and `failed_docs` and do not stop
    /// the loop.
    ///
    /// Does nothing when `options.no_embed` or `options.dry_run` is set, or when no embedder
    /// is configured.
    ///
    /// # Errors
    ///
    /// Propagates errors from storage, preflight, batch embedding and vector storage.
    fn embed_pending_chunks(
        &self,
        repair_scope: &UpdateRepairScope,
        options: &UpdateOptions,
        failed_chunk_ids: &mut HashSet<i64>,
        report: &mut UpdateReport,
        failed_docs: &mut HashSet<UpdateDocKey>,
    ) -> Result<()> {
        if options.no_embed || options.dry_run {
            return Ok(());
        }

        let Some(embedder) = self.embedder.as_ref() else {
            return Ok(());
        };

        let model = self.embedding_model_key();
        // Page size; clamped so it is never zero.
        let batch_window = embedder.preferred_document_batch_window().max(1);
        // Pagination cursor: each page is requested with the last chunk id already seen.
        let mut after_chunk_id = 0_i64;
        loop {
            let backlog =
                crate::profile::timed_update_stage("sqlite_get_unembedded_chunks", || {
                    self.get_unembedded_chunks_for_scope(
                        model,
                        repair_scope,
                        after_chunk_id,
                        batch_window,
                    )
                })?;
            if backlog.is_empty() {
                break;
            }

            // Advance the cursor before any filtering so every `continue` below moves on to
            // the next page instead of refetching this one.
            after_chunk_id = backlog
                .last()
                .map(|record| record.chunk.id)
                .expect("non-empty backlog should have a last chunk id");

            // Document ids for the chunks that will actually be processed (may repeat when
            // several chunks belong to the same document).
            let backlog_doc_ids = backlog
                .iter()
                .filter(|record| !failed_chunk_ids.contains(&record.chunk.id))
                .map(|record| record.chunk.doc_id)
                .collect::<Vec<_>>();
            let document_text_cache =
                crate::profile::timed_update_stage("embedding_backlog_read", || {
                    self.storage.get_existing_document_texts(&backlog_doc_ids)
                })?;

            let mut pending = Vec::new();
            for record in backlog {
                if failed_chunk_ids.contains(&record.chunk.id) {
                    continue;
                }

                let full_path = record.collection_path.join(&record.doc_path);
                // No text for this document: record the failure and skip the chunk.
                let Some(document_text) = document_text_cache.get(&record.chunk.doc_id) else {
                    report.errors.push(file_error(
                        Some(full_path.clone()),
                        format!(
                            "embed canonical text load failed: {}",
                            crate::storage::missing_document_text_error(record.chunk.doc_id)
                        ),
                    ));
                    failed_docs.insert(update_doc_key(
                        &record.space_name,
                        &record.collection_path,
                        &record.doc_path,
                    ));
                    continue;
                };
                // The chunk's text could not be derived from the document's canonical text:
                // record the failure and skip the chunk.
                let chunk_canonical_text = match crate::storage::chunk_text_from_canonical(
                    document_text.text.as_str(),
                    &record.chunk,
                ) {
                    Ok(text) => text,
                    Err(err) => {
                        report.errors.push(file_error(
                            Some(full_path.clone()),
                            format!("embed canonical text load failed: {err}"),
                        ));
                        failed_docs.insert(update_doc_key(
                            &record.space_name,
                            &record.collection_path,
                            &record.doc_path,
                        ));
                        continue;
                    }
                };
                let mut text = crate::ingest::chunk::chunk_retrieval_body(
                    chunk_canonical_text.as_str(),
                    record.chunk.retrieval_prefix.as_deref(),
                );
                // Whitespace-only text is replaced by a single space (presumably so the
                // embedder never receives an empty payload — TODO confirm).
                if text.trim().is_empty() {
                    text = " ".to_string();
                }
                let policy = resolve_policy(
                    &self.config.chunking,
                    Some(document_text.extractor_key.as_str()),
                    None,
                );
                let max_document_tokens = effective_chunk_hard_max(&policy);

                pending.push(PendingChunkEmbedding {
                    chunk_id: record.chunk.id,
                    doc_key: update_doc_key(
                        &record.space_name,
                        &record.collection_path,
                        &record.doc_path,
                    ),
                    space_name: record.space_name,
                    path: full_path,
                    text,
                    max_document_tokens,
                });
            }

            if pending.is_empty() {
                continue;
            }

            // Reject payloads that exceed their token limit before calling the embedder.
            let mut preflight_failed_chunk_ids = Vec::new();
            let pending = self.preflight_pending_embeddings(
                pending,
                report,
                failed_docs,
                &mut preflight_failed_chunk_ids,
            )?;
            failed_chunk_ids.extend(preflight_failed_chunk_ids);
            if pending.is_empty() {
                continue;
            }

            let result = self.embed_preflighted_pending_batch_with_partial_failures(
                embedder.as_ref(),
                pending,
                report,
                failed_docs,
            )?;
            failed_chunk_ids.extend(result.failed_chunk_ids);
            if !result.embeddings.is_empty() {
                self.store_chunk_embeddings(model, result.embeddings, report)?;
            }
        }

        Ok(())
    }
347
348    fn flush_buffered_embeddings(
349        &self,
350        pending_embeddings: &mut Vec<PendingChunkEmbedding>,
351        failed_chunk_ids: &mut HashSet<i64>,
352        report: &mut UpdateReport,
353        failed_docs: &mut HashSet<UpdateDocKey>,
354    ) -> Result<()> {
355        if pending_embeddings.is_empty() {
356            return Ok(());
357        }
358
359        let Some(embedder) = self.embedder.as_ref() else {
360            pending_embeddings.clear();
361            return Ok(());
362        };
363
364        let pending = std::mem::take(pending_embeddings);
365        let result = self.embed_preflighted_pending_batch_with_partial_failures(
366            embedder.as_ref(),
367            pending,
368            report,
369            failed_docs,
370        )?;
371        failed_chunk_ids.extend(result.failed_chunk_ids);
372        if result.embeddings.is_empty() {
373            return Ok(());
374        }
375
376        self.store_chunk_embeddings(self.embedding_model_key(), result.embeddings, report)
377    }
378
379    fn document_embedding_batch_window(&self) -> usize {
380        self.embedder
381            .as_ref()
382            .map(|embedder| embedder.preferred_document_batch_window())
383            .unwrap_or(crate::models::DEFAULT_DOCUMENT_BATCH_WINDOW)
384            .max(1)
385    }
386
387    fn embed_preflighted_pending_batch_with_partial_failures(
388        &self,
389        embedder: &dyn crate::models::Embedder,
390        pending: Vec<PendingChunkEmbedding>,
391        report: &mut UpdateReport,
392        failed_docs: &mut HashSet<UpdateDocKey>,
393    ) -> Result<EmbeddedPendingBatch> {
394        if pending.is_empty() {
395            return Ok(EmbeddedPendingBatch::default());
396        }
397
398        let texts = pending
399            .iter()
400            .map(|embedding| embedding.text.clone())
401            .collect::<Vec<_>>();
402        crate::profile::increment_update_count("embedding_batch_calls", 1);
403        crate::profile::increment_update_count("embedding_input_texts", texts.len() as u64);
404        match crate::profile::timed_update_stage("embedding_http", || {
405            embedder.embed_batch(crate::models::EmbeddingInputKind::Document, &texts)
406        }) {
407            Ok(vectors) => {
408                if let Some(detail) = invalid_pending_embedding_batch_detail(&pending, &vectors) {
409                    return self.split_pending_embedding_batch(
410                        embedder,
411                        pending,
412                        report,
413                        failed_docs,
414                        detail,
415                    );
416                }
417
418                let embeddings = pending
419                    .into_iter()
420                    .zip(vectors)
421                    .map(|(embedding, vector)| (embedding.chunk_id, embedding.space_name, vector))
422                    .collect::<Vec<_>>();
423                Ok(EmbeddedPendingBatch {
424                    embeddings,
425                    failed_chunk_ids: Vec::new(),
426                })
427            }
428            Err(err) => self.split_pending_embedding_batch(
429                embedder,
430                pending,
431                report,
432                failed_docs,
433                err.to_string(),
434            ),
435        }
436    }
437
438    fn split_pending_embedding_batch(
439        &self,
440        embedder: &dyn crate::models::Embedder,
441        pending: Vec<PendingChunkEmbedding>,
442        report: &mut UpdateReport,
443        failed_docs: &mut HashSet<UpdateDocKey>,
444        detail: String,
445    ) -> Result<EmbeddedPendingBatch> {
446        if pending.len() == 1 {
447            let embedding = pending
448                .into_iter()
449                .next()
450                .expect("single pending embedding should exist");
451            report.errors.push(file_error(
452                Some(embedding.path),
453                format!("embed failed: {detail}"),
454            ));
455            failed_docs.insert(embedding.doc_key);
456            return Ok(EmbeddedPendingBatch {
457                embeddings: Vec::new(),
458                failed_chunk_ids: vec![embedding.chunk_id],
459            });
460        }
461
462        let mid = pending.len() / 2;
463        let mut right = pending;
464        let left = right.drain(..mid).collect::<Vec<_>>();
465
466        let mut left_result = self.embed_preflighted_pending_batch_with_partial_failures(
467            embedder,
468            left,
469            report,
470            failed_docs,
471        )?;
472        let right_result = self.embed_preflighted_pending_batch_with_partial_failures(
473            embedder,
474            right,
475            report,
476            failed_docs,
477        )?;
478        left_result.embeddings.extend(right_result.embeddings);
479        left_result
480            .failed_chunk_ids
481            .extend(right_result.failed_chunk_ids);
482        Ok(left_result)
483    }
484
485    fn store_chunk_embeddings(
486        &self,
487        model: &str,
488        embeddings: Vec<(i64, String, Vec<f32>)>,
489        report: &mut UpdateReport,
490    ) -> Result<()> {
491        let mut grouped_vectors: HashMap<String, Vec<(i64, Vec<f32>)>> = HashMap::new();
492        let mut embedding_rows = Vec::with_capacity(embeddings.len());
493        for (chunk_id, space_name, vector) in embeddings {
494            if vector.is_empty() {
495                return Err(KboltError::Inference(format!(
496                    "embedder returned an empty vector for chunk {chunk_id}"
497                ))
498                .into());
499            }
500
501            grouped_vectors
502                .entry(space_name)
503                .or_default()
504                .push((chunk_id, vector));
505            embedding_rows.push((chunk_id, model));
506        }
507
508        for (space, vectors) in grouped_vectors {
509            let refs = vectors
510                .iter()
511                .map(|(chunk_id, vector)| (*chunk_id, vector.as_slice()))
512                .collect::<Vec<_>>();
513            crate::profile::increment_update_count("usearch_vectors", refs.len() as u64);
514            self.storage.batch_insert_usearch_deferred(&space, &refs)?;
515        }
516
517        crate::profile::timed_update_stage("sqlite_insert_embeddings", || {
518            self.storage.insert_embeddings(&embedding_rows)
519        })?;
520        report.embedded_chunks = report.embedded_chunks.saturating_add(embedding_rows.len());
521        Ok(())
522    }
523
524    fn preflight_pending_embeddings(
525        &self,
526        pending: Vec<PendingChunkEmbedding>,
527        report: &mut UpdateReport,
528        failed_docs: &mut HashSet<UpdateDocKey>,
529        failed_chunk_ids: &mut Vec<i64>,
530    ) -> Result<Vec<PendingChunkEmbedding>> {
531        let Some(sizer) = self.embedding_document_sizer.as_ref() else {
532            return Ok(pending);
533        };
534
535        let mut accepted = Vec::with_capacity(pending.len());
536        for embedding in pending {
537            if sizer.fits_within_token_limit_by_byte_len(
538                embedding.text.len(),
539                embedding.max_document_tokens,
540            ) {
541                crate::profile::increment_update_count("tokenize_preflight_byte_skips", 1);
542                accepted.push(embedding);
543                continue;
544            }
545
546            match count_document_tokens_profiled(
547                sizer.as_ref(),
548                &embedding.text,
549                "tokenize_preflight",
550                "tokenize_preflight_calls",
551            ) {
552                Ok(token_count) if token_count <= embedding.max_document_tokens => {
553                    accepted.push(embedding);
554                }
555                Ok(token_count) => {
556                    report.errors.push(file_error(
557                        Some(embedding.path.clone()),
558                        format!(
559                            "embed preflight failed: payload has {token_count} tokens, exceeding hard_max_tokens {}",
560                            embedding.max_document_tokens
561                        ),
562                    ));
563                    failed_docs.insert(embedding.doc_key);
564                    failed_chunk_ids.push(embedding.chunk_id);
565                }
566                Err(err) => {
567                    report.errors.push(file_error(
568                        Some(embedding.path.clone()),
569                        format!("embed preflight token count failed: {err}"),
570                    ));
571                    failed_docs.insert(embedding.doc_key);
572                    failed_chunk_ids.push(embedding.chunk_id);
573                }
574            }
575        }
576
577        Ok(accepted)
578    }
579
580    fn preflight_prepared_embeddings(
581        &self,
582        prepared: Vec<PreparedChunkEmbedding>,
583        report: &mut UpdateReport,
584        failed_docs: &mut HashSet<UpdateDocKey>,
585    ) -> Result<PreparedEmbeddingPreflight> {
586        let Some(sizer) = self.embedding_document_sizer.as_ref() else {
587            return Ok(PreparedEmbeddingPreflight {
588                accepted: prepared,
589                rejected_chunk_indexes: Vec::new(),
590            });
591        };
592
593        let mut preflight = PreparedEmbeddingPreflight {
594            accepted: Vec::with_capacity(prepared.len()),
595            rejected_chunk_indexes: Vec::new(),
596        };
597        for embedding in prepared {
598            if sizer.fits_within_token_limit_by_byte_len(
599                embedding.text.len(),
600                embedding.max_document_tokens,
601            ) {
602                crate::profile::increment_update_count("tokenize_preflight_byte_skips", 1);
603                preflight.accepted.push(embedding);
604                continue;
605            }
606
607            match count_document_tokens_profiled(
608                sizer.as_ref(),
609                &embedding.text,
610                "tokenize_preflight",
611                "tokenize_preflight_calls",
612            ) {
613                Ok(token_count) if token_count <= embedding.max_document_tokens => {
614                    preflight.accepted.push(embedding);
615                }
616                Ok(token_count) => {
617                    report.errors.push(file_error(
618                        Some(embedding.path.clone()),
619                        format!(
620                            "embed preflight failed: payload has {token_count} tokens, exceeding hard_max_tokens {}",
621                            embedding.max_document_tokens
622                        ),
623                    ));
624                    failed_docs.insert(embedding.doc_key);
625                    preflight.rejected_chunk_indexes.push(embedding.chunk_index);
626                }
627                Err(err) => {
628                    report.errors.push(file_error(
629                        Some(embedding.path.clone()),
630                        format!("embed preflight token count failed: {err}"),
631                    ));
632                    failed_docs.insert(embedding.doc_key);
633                    preflight.rejected_chunk_indexes.push(embedding.chunk_index);
634                }
635            }
636        }
637
638        Ok(preflight)
639    }
640
    /// Rebuilds the Tantivy entries of every FTS-dirty document in `repair_scope`, then commits
    /// the affected indexes and clears the dirty flags.
    ///
    /// For each dirty record:
    /// - with no chunks, its Tantivy entries are deleted;
    /// - otherwise its canonical text is loaded, one `TantivyEntry` is built per chunk, and the
    ///   document's entries are deleted and re-indexed.
    ///
    /// A document whose canonical text cannot be loaded is recorded in `report.errors` and
    /// `failed_docs`, skipped, and keeps its dirty flag.
    ///
    /// # Errors
    ///
    /// Storage and index errors are propagated. Unlike a failed text load, a failure to derive
    /// a chunk's text (`chunk_text_from_canonical`) is also propagated with `?` and aborts the
    /// whole replay.
    fn replay_fts_dirty_documents(
        &self,
        repair_scope: &UpdateRepairScope,
        report: &mut UpdateReport,
        failed_docs: &mut HashSet<UpdateDocKey>,
    ) -> Result<()> {
        let records = self.get_fts_dirty_documents_for_scope(repair_scope)?;
        if records.is_empty() {
            return Ok(());
        }

        // Documents successfully replayed, grouped by space, so that each space is committed
        // once at the end.
        let mut cleared_by_space: HashMap<String, Vec<i64>> = HashMap::new();
        for record in records {
            let space_name = record.space_name;
            let doc_id = record.doc_id;

            // A dirty document without chunks only needs its index entries removed.
            if record.chunks.is_empty() {
                self.storage.delete_tantivy_by_doc(&space_name, doc_id)?;
                cleared_by_space.entry(space_name).or_default().push(doc_id);
                continue;
            }

            let document_text = match self.storage.get_document_text(doc_id) {
                Ok(row) => row,
                Err(err) => {
                    // Recorded per document; the document is not added to `cleared_by_space`,
                    // so its dirty flag is left in place.
                    failed_docs.insert(update_doc_key(
                        &space_name,
                        &record.collection_path,
                        &record.doc_path,
                    ));
                    report.errors.push(file_error(
                        Some(record.collection_path.join(&record.doc_path)),
                        format!("fts replay canonical text load failed: {err}"),
                    ));
                    continue;
                }
            };
            let policy = resolve_policy(
                &self.config.chunking,
                Some(document_text.extractor_key.as_str()),
                None,
            );

            // Build all entries before touching the index; the first chunk error aborts here.
            let entries = record
                .chunks
                .iter()
                .map(|chunk| -> Result<TantivyEntry> {
                    let chunk_text = crate::storage::chunk_text_from_canonical(
                        document_text.text.as_str(),
                        chunk,
                    )?;
                    let chunk_body = crate::ingest::chunk::chunk_retrieval_body(
                        chunk_text.as_str(),
                        chunk.retrieval_prefix.as_deref(),
                    );
                    Ok(TantivyEntry {
                        chunk_id: chunk.id,
                        doc_id,
                        filepath: record.doc_path.clone(),
                        semantic_title: record
                            .doc_title_source
                            .semantic_title(record.doc_title.as_str())
                            .map(ToString::to_string),
                        heading: chunk.heading.clone(),
                        body: retrieval_text_with_prefix(
                            chunk_body.as_str(),
                            record
                                .doc_title_source
                                .semantic_title(record.doc_title.as_str()),
                            chunk.heading.as_deref(),
                            policy.contextual_prefix,
                        ),
                    })
                })
                .collect::<Result<Vec<_>>>()?;

            // Replace the document's entries: delete the old ones, then index the new ones.
            self.storage.delete_tantivy_by_doc(&space_name, doc_id)?;
            self.storage.index_tantivy(&space_name, &entries)?;
            cleared_by_space.entry(space_name).or_default().push(doc_id);
        }

        for (space_name, mut doc_ids) in cleared_by_space {
            if doc_ids.is_empty() {
                continue;
            }

            doc_ids.sort_unstable();
            doc_ids.dedup();
            // Commit the index before clearing the flags; if the commit fails, `?` returns
            // before the flags are touched.
            self.storage.commit_tantivy(&space_name)?;
            self.storage.batch_clear_fts_dirty(&doc_ids)?;
        }

        Ok(())
    }
735
736    fn get_fts_dirty_documents_for_scope(
737        &self,
738        repair_scope: &UpdateRepairScope,
739    ) -> Result<Vec<crate::storage::FtsDirtyRecord>> {
740        match repair_scope {
741            UpdateRepairScope::Global => self.storage.get_fts_dirty_documents(),
742            UpdateRepairScope::Space { space_id } => {
743                self.storage.get_fts_dirty_documents_in_space(*space_id)
744            }
745            UpdateRepairScope::Collections { collection_ids } => self
746                .storage
747                .get_fts_dirty_documents_in_collections(collection_ids),
748        }
749    }
750
751    fn get_unembedded_chunks_for_scope(
752        &self,
753        model: &str,
754        repair_scope: &UpdateRepairScope,
755        after_chunk_id: i64,
756        limit: usize,
757    ) -> Result<Vec<crate::storage::EmbedRecord>> {
758        match repair_scope {
759            UpdateRepairScope::Global => {
760                self.storage
761                    .get_unembedded_chunks(model, after_chunk_id, limit)
762            }
763            UpdateRepairScope::Space { space_id } => {
764                self.storage
765                    .get_unembedded_chunks_in_space(model, *space_id, after_chunk_id, limit)
766            }
767            UpdateRepairScope::Collections { collection_ids } => self
768                .storage
769                .get_unembedded_chunks_in_collections(model, collection_ids, after_chunk_id, limit),
770        }
771    }
772
773    fn list_reapable_documents_for_scope(
774        &self,
775        older_than_days: u32,
776        repair_scope: &UpdateRepairScope,
777    ) -> Result<Vec<crate::storage::ReapableDocument>> {
778        match repair_scope {
779            UpdateRepairScope::Global => self.storage.list_reapable_documents(older_than_days),
780            UpdateRepairScope::Space { space_id } => self
781                .storage
782                .list_reapable_documents_in_space(older_than_days, *space_id),
783            UpdateRepairScope::Collections { collection_ids } => self
784                .storage
785                .list_reapable_documents_in_collections(older_than_days, collection_ids),
786        }
787    }
788
789    pub fn resolve_update_targets(&self, options: &UpdateOptions) -> Result<Vec<UpdateTarget>> {
790        self.resolve_targets(TargetScope {
791            space: options.space.as_deref(),
792            collections: &options.collections,
793        })
794    }
795
796    pub(super) fn resolve_targets(&self, scope: TargetScope<'_>) -> Result<Vec<UpdateTarget>> {
797        let mut targets = Vec::new();
798
799        if scope.collections.is_empty() {
800            return self.resolve_update_targets_for_all_collections(scope.space);
801        }
802
803        let mut seen = std::collections::HashSet::new();
804        for raw_collection_name in scope.collections {
805            let collection_name = raw_collection_name.trim();
806            if collection_name.is_empty() {
807                return Err(KboltError::InvalidInput(
808                    "collection names cannot be empty".to_string(),
809                )
810                .into());
811            }
812
813            let resolved_space = self.resolve_space_row(scope.space, Some(collection_name))?;
814            let collection = self
815                .storage
816                .get_collection(resolved_space.id, collection_name)?;
817
818            if seen.insert((collection.space_id, collection.name.clone())) {
819                targets.push(UpdateTarget {
820                    space: resolved_space.name,
821                    collection,
822                });
823            }
824        }
825
826        Ok(targets)
827    }
828
829    fn resolve_update_targets_for_all_collections(
830        &self,
831        space: Option<&str>,
832    ) -> Result<Vec<UpdateTarget>> {
833        let (space_id_filter, spaces_by_id) = if let Some(space_name) = space {
834            let resolved = self.resolve_space_row(Some(space_name), None)?;
835            let mut map = std::collections::HashMap::new();
836            map.insert(resolved.id, resolved.name.clone());
837            (Some(resolved.id), map)
838        } else {
839            let spaces = self.storage.list_spaces()?;
840            let map = spaces
841                .into_iter()
842                .map(|space| (space.id, space.name))
843                .collect::<std::collections::HashMap<_, _>>();
844            (None, map)
845        };
846
847        let collections = self.storage.list_collections(space_id_filter)?;
848        let mut targets = Vec::with_capacity(collections.len());
849        for collection in collections {
850            let space_name = spaces_by_id
851                .get(&collection.space_id)
852                .ok_or_else(|| {
853                    KboltError::Internal(format!(
854                        "missing space mapping for collection '{}'",
855                        collection.name
856                    ))
857                })?
858                .clone();
859            targets.push(UpdateTarget {
860                space: space_name,
861                collection,
862            });
863        }
864
865        Ok(targets)
866    }
867
/// Scans the directory tree of one collection and reconciles the stored documents with the
/// files found on disk.
///
/// For every walked file the function, in order:
/// 1. skips non-files, hard-ignored files, disallowed extensions, ignore-pattern matches and
///    files without an extractor;
/// 2. skips active documents whose modified token is unchanged and whose canonical text is of
///    the current generation, without reading the file;
/// 3. reads and hashes the file; an unchanged hash with current canonical text only refreshes
///    the document's activity (reactivating it when it was inactive);
/// 4. in dry-run mode records the would-be decision and stops before any write;
/// 5. otherwise extracts, canonicalizes and chunks the file, replaces the stored document
///    generation, removes the old chunk ids from Tantivy and usearch, indexes the new chunks
///    in Tantivy, and queues chunk embeddings.
///
/// After the walk, active documents that were not seen are deactivated unless they lie under
/// a path whose walk failed (or the whole walk was incomplete).
///
/// # Parameters
/// * `target` – the collection to scan together with its space name.
/// * `options` – `dry_run` and `no_embed` are read here; also passed to
///   `push_update_decision`.
/// * `report` – counters, decisions and per-file errors are appended to it.
/// * `fts_dirty_by_space` – receives the id of every document whose generation was replaced,
///   keyed by space name.
/// * `pending_embeddings` – buffer of queued chunk embeddings; flushed through
///   `flush_buffered_embeddings` once it reaches `document_embedding_batch_window()` entries.
/// * `failed_chunk_ids` – only forwarded to `flush_buffered_embeddings` by this function.
/// * `failed_docs` – receives a key for each document or path that failed.
///
/// # Errors
/// Per-file problems (walk, metadata, read, extract, chunking) are recorded in `report` and
/// `failed_docs` and the scan continues. Failures from loading the ignore matcher, from
/// storage/index calls, from embedding preflight, or from flushing embeddings are returned.
fn update_collection_target(
    &self,
    target: &UpdateTarget,
    options: &UpdateOptions,
    report: &mut UpdateReport,
    fts_dirty_by_space: &mut HashMap<String, HashSet<i64>>,
    pending_embeddings: &mut Vec<PendingChunkEmbedding>,
    failed_chunk_ids: &mut HashSet<i64>,
    failed_docs: &mut HashSet<UpdateDocKey>,
) -> Result<()> {
    // Snapshot of the collection's stored documents, later keyed by relative path.
    // NOTE(review): the `false` flag presumably means inactive documents are included (the
    // code below handles `!doc.active` rows) — confirm against `list_documents`.
    let all_documents = crate::profile::timed_update_stage("sqlite_list_documents", || {
        self.storage.list_documents(target.collection.id, false)
    })?;
    let document_ids = all_documents.iter().map(|doc| doc.id).collect::<Vec<_>>();
    let document_text_generations =
        crate::profile::timed_update_stage("sqlite_list_document_text_generations", || {
            self.storage
                .get_document_text_generation_keys(&document_ids)
        })?;
    let mut docs_by_path: HashMap<String, DocumentRow> = all_documents
        .into_iter()
        .map(|doc| (doc.path.clone(), doc))
        .collect();
    // Relative paths of every file that reached the scanning stage in this walk.
    let mut seen_paths = HashSet::new();
    let extension_filter = normalized_extension_filter(target.collection.extensions.as_deref());
    let ignore_matcher = load_collection_ignore_matcher(
        &self.config.config_dir,
        &target.collection.path,
        &target.space,
        &target.collection.name,
    )?;
    let extractor_registry = default_registry();
    let embedding_batch_window = self.document_embedding_batch_window();
    let mut touched_collection = false;
    // Relative paths whose walk failed; documents at or below them are not deactivated.
    let mut failed_walk_prefixes = Vec::new();
    // Set when a walk failure cannot be narrowed to paths inside the collection.
    let mut collection_walk_incomplete = false;

    for entry in super::ignore_helpers::build_collection_walk(&target.collection.path) {
        let entry = match entry {
            Ok(item) => item,
            Err(err) => {
                // A walk error does not abort the update: remember what could not be
                // scanned, report it, and move on to the next entry.
                let error_scope = collect_walk_error_scope(&err);
                let error_path = error_scope.paths.first().cloned();
                if error_scope.collection_incomplete || error_scope.paths.is_empty() {
                    collection_walk_incomplete = true;
                }
                for path in &error_scope.paths {
                    let failed_path =
                        match collection_relative_path(&target.collection.path, path) {
                            Ok(relative) => {
                                // A failure at the collection root covers everything.
                                if relative.is_empty() || relative == "." {
                                    collection_walk_incomplete = true;
                                } else {
                                    failed_walk_prefixes.push(relative.clone());
                                }
                                relative
                            }
                            Err(_) => {
                                // The path cannot be expressed relative to the collection,
                                // so treat the whole walk as incomplete.
                                collection_walk_incomplete = true;
                                path.display().to_string()
                            }
                        };
                    failed_docs.insert(update_doc_key(
                        &target.space,
                        &target.collection.path,
                        &failed_path,
                    ));
                }
                report
                    .errors
                    .push(file_error(error_path, format!("walk error: {err}")));
                continue;
            }
        };

        // Only regular files are considered; directories and other entry types are skipped.
        if !entry
            .file_type()
            .is_some_and(|file_type| file_type.is_file())
        {
            continue;
        }

        if is_hard_ignored_file(entry.path()) {
            continue;
        }

        let relative_path =
            match collection_relative_path(&target.collection.path, entry.path()) {
                Ok(path) => path,
                Err(err) => {
                    // No relative path is available, so the failure is keyed by the
                    // displayed entry path instead.
                    failed_docs.insert(update_doc_key(
                        &target.space,
                        &target.collection.path,
                        &entry.path().display().to_string(),
                    ));
                    report.errors.push(file_error(
                        Some(entry.path().to_path_buf()),
                        err.to_string(),
                    ));
                    continue;
                }
            };

        if !extension_allowed(entry.path(), extension_filter.as_ref()) {
            push_update_decision(
                report,
                options,
                target,
                &relative_path,
                UpdateDecisionKind::Unsupported,
                Some("extension not allowed".to_string()),
            );
            continue;
        }

        if let Some(matcher) = ignore_matcher.as_ref() {
            if matcher
                .matched(Path::new(&relative_path), false)
                .is_ignore()
            {
                push_update_decision(
                    report,
                    options,
                    target,
                    &relative_path,
                    UpdateDecisionKind::Ignored,
                    Some("matched ignore patterns".to_string()),
                );
                continue;
            }
        }

        let Some(extractor) = extractor_registry.resolve_for_path(entry.path()) else {
            push_update_decision(
                report,
                options,
                target,
                &relative_path,
                UpdateDecisionKind::Unsupported,
                Some("no extractor available".to_string()),
            );
            continue;
        };
        let policy = resolve_policy(&self.config.chunking, Some(extractor.profile_key()), None);
        // Key built from extractor profile, extractor version and chunking policy; it is
        // compared below against the generation stored for the document.
        let generation_key =
            ingestion_generation_key(extractor.profile_key(), extractor.version(), &policy);

        // From here on the file counts as scanned and as present on disk, even if a later
        // step fails, so it will not be deactivated after the walk.
        report.scanned_docs += 1;
        crate::profile::increment_update_count("docs_scanned", 1);
        seen_paths.insert(relative_path.clone());

        // The "scan_metadata_mtime" stage is recorded on every exit path of this section.
        let scan_started = Instant::now();
        let metadata = match entry.metadata() {
            Ok(data) => data,
            Err(err) => {
                let detail = format!("metadata error: {err}");
                failed_docs.insert(update_doc_key(
                    &target.space,
                    &target.collection.path,
                    &relative_path,
                ));
                push_update_decision(
                    report,
                    options,
                    target,
                    &relative_path,
                    UpdateDecisionKind::ReadFailed,
                    Some(detail.clone()),
                );
                report
                    .errors
                    .push(file_error(Some(entry.path().to_path_buf()), detail));
                crate::profile::record_update_stage(
                    "scan_metadata_mtime",
                    scan_started.elapsed(),
                );
                continue;
            }
        };

        let modified = match modified_token(&metadata) {
            Ok(value) => value,
            Err(err) => {
                let detail = format!("modified timestamp error: {err}");
                failed_docs.insert(update_doc_key(
                    &target.space,
                    &target.collection.path,
                    &relative_path,
                ));
                push_update_decision(
                    report,
                    options,
                    target,
                    &relative_path,
                    UpdateDecisionKind::ReadFailed,
                    Some(detail.clone()),
                );
                report
                    .errors
                    .push(file_error(Some(entry.path().to_path_buf()), detail));
                crate::profile::record_update_stage(
                    "scan_metadata_mtime",
                    scan_started.elapsed(),
                );
                continue;
            }
        };

        // Fast path: an active document with the same modified token and current canonical
        // text is skipped without reading the file.
        if let Some(existing) = docs_by_path.get(&relative_path) {
            let has_current_canonical_text = document_has_current_canonical_text(
                &document_text_generations,
                existing.id,
                generation_key.as_str(),
            );
            if existing.active && existing.modified == modified && has_current_canonical_text {
                report.skipped_mtime_docs += 1;
                push_update_decision(
                    report,
                    options,
                    target,
                    &relative_path,
                    UpdateDecisionKind::SkippedMtime,
                    None,
                );
                crate::profile::record_update_stage(
                    "scan_metadata_mtime",
                    scan_started.elapsed(),
                );
                continue;
            }
        }
        crate::profile::record_update_stage("scan_metadata_mtime", scan_started.elapsed());

        let read_hash_started = Instant::now();
        let bytes = match std::fs::read(entry.path()) {
            Ok(data) => data,
            Err(err) => {
                let detail = err.to_string();
                failed_docs.insert(update_doc_key(
                    &target.space,
                    &target.collection.path,
                    &relative_path,
                ));
                push_update_decision(
                    report,
                    options,
                    target,
                    &relative_path,
                    UpdateDecisionKind::ReadFailed,
                    Some(detail.clone()),
                );
                report
                    .errors
                    .push(file_error(Some(entry.path().to_path_buf()), detail));
                crate::profile::record_update_stage("read_hash", read_hash_started.elapsed());
                continue;
            }
        };
        let hash = sha256_hex(&bytes);
        crate::profile::record_update_stage("read_hash", read_hash_started.elapsed());
        // Default title comes from the file path; an extracted title may replace it below.
        let mut title = file_title(entry.path());
        let mut title_source = DocumentTitleSource::FilenameFallback;

        let existing = docs_by_path.get(&relative_path).cloned();
        // Counter update and decision are deferred until the write has succeeded (or until
        // the dry-run branch), so failed documents are not counted as added/updated.
        let pending_decision;
        let pending_indexing;
        if let Some(doc) = existing.as_ref() {
            let has_current_canonical_text = document_has_current_canonical_text(
                &document_text_generations,
                doc.id,
                generation_key.as_str(),
            );
            if doc.hash == hash && has_current_canonical_text {
                // Content is unchanged: no re-indexing, only an activity refresh.
                if doc.active {
                    report.skipped_hash_docs += 1;
                    push_update_decision(
                        report,
                        options,
                        target,
                        &relative_path,
                        UpdateDecisionKind::SkippedHash,
                        None,
                    );
                } else {
                    report.reactivated_docs += 1;
                    push_update_decision(
                        report,
                        options,
                        target,
                        &relative_path,
                        UpdateDecisionKind::Reactivated,
                        None,
                    );
                }

                if !options.dry_run {
                    crate::profile::timed_update_stage("sqlite_refresh_document", || {
                        self.storage.refresh_document_activity(doc.id, &modified)
                    })?;
                }
                continue;
            }

            pending_indexing = PendingDocumentIndexing::Updated {
                reactivated: !doc.active,
            };
            pending_decision = (
                UpdateDecisionKind::Changed,
                (!doc.active).then_some("reactivated".to_string()),
            );
        } else {
            pending_indexing = PendingDocumentIndexing::Added;
            pending_decision = (UpdateDecisionKind::New, None);
        }

        // Dry run: report what would happen and stop before extraction or any write.
        if options.dry_run {
            pending_indexing.record(report);
            let (kind, detail) = pending_decision;
            push_update_decision(report, options, target, &relative_path, kind, detail);
            continue;
        }

        let extracted = match crate::profile::timed_update_stage("extract", || {
            extractor.extract(entry.path(), &bytes)
        }) {
            Ok(document) => document,
            Err(err) => {
                let detail = format!("extract failed: {err}");
                failed_docs.insert(update_doc_key(
                    &target.space,
                    &target.collection.path,
                    &relative_path,
                ));
                push_update_decision(
                    report,
                    options,
                    target,
                    &relative_path,
                    UpdateDecisionKind::ExtractFailed,
                    Some(detail.clone()),
                );
                report
                    .errors
                    .push(file_error(Some(entry.path().to_path_buf()), detail));
                continue;
            }
        };
        // Prefer the extracted title when it is non-empty after trimming.
        if let Some(extracted_title) = extracted
            .title
            .as_deref()
            .map(str::trim)
            .filter(|title| !title.is_empty())
        {
            title = extracted_title.to_string();
            title_source = DocumentTitleSource::Extracted;
        }

        let canonical = crate::profile::timed_update_stage("canonical_text", || {
            build_canonical_document(&extracted)
        });
        let text_hash = sha256_hex(canonical.text.as_bytes());
        let max_document_tokens = effective_chunk_hard_max(&policy);
        let chunk_started = Instant::now();
        // With an embedding document sizer configured, chunking uses it as the counter and
        // can fail; without one the plain chunker is used and always yields `Ok`.
        let final_chunks_result = match self.embedding_document_sizer.as_ref() {
            Some(sizer) => {
                let sizer_counter = EmbeddingDocumentSizerCounter {
                    inner: sizer.as_ref(),
                };
                chunk_canonical_document_with_counter(
                    &canonical.document,
                    &policy,
                    &sizer_counter,
                )
            }
            None => Ok(chunk_canonical_document(&canonical.document, &policy)),
        };
        crate::profile::record_update_stage("chunk", chunk_started.elapsed());
        let final_chunks = match final_chunks_result {
            Ok(chunks) => chunks,
            Err(err) => {
                // NOTE(review): chunking failures are reported with the `ExtractFailed`
                // decision kind; only the detail text distinguishes them.
                let detail = format!("chunking failed: {err}");
                failed_docs.insert(update_doc_key(
                    &target.space,
                    &target.collection.path,
                    &relative_path,
                ));
                push_update_decision(
                    report,
                    options,
                    target,
                    &relative_path,
                    UpdateDecisionKind::ExtractFailed,
                    Some(detail.clone()),
                );
                report
                    .errors
                    .push(file_error(Some(entry.path().to_path_buf()), detail));
                continue;
            }
        };
        crate::profile::increment_update_count("chunks_created", final_chunks.len() as u64);

        let doc_key = update_doc_key(&target.space, &target.collection.path, &relative_path);
        let chunk_inserts = final_chunks
            .iter()
            .enumerate()
            .map(|(index, chunk)| ChunkInsert {
                seq: index as i32,
                offset: chunk.offset,
                length: chunk.length,
                heading: chunk.heading.clone(),
                kind: chunk.kind,
                retrieval_prefix: chunk.retrieval_prefix.clone(),
            })
            .collect::<Vec<_>>();
        let mut prepared_embeddings = Vec::new();
        if !chunk_inserts.is_empty() && !options.no_embed && self.embedder.is_some() {
            let prepared = crate::profile::timed_update_stage("embedding_prepare_text", || {
                prepare_chunk_embeddings(
                    &final_chunks,
                    &doc_key,
                    target,
                    entry.path(),
                    max_document_tokens,
                )
            });
            // Preflight runs only when an existing document is being replaced. If any chunk
            // is rejected we `continue` before the replace below, so the stored generation
            // is left as it was. New documents skip preflight.
            if existing.is_some() {
                let preflight =
                    self.preflight_prepared_embeddings(prepared, report, failed_docs)?;
                prepared_embeddings = preflight.accepted;
                if !preflight.rejected_chunk_indexes.is_empty() {
                    push_update_decision(
                        report,
                        options,
                        target,
                        &relative_path,
                        UpdateDecisionKind::ExtractFailed,
                        Some("embed preflight failed".to_string()),
                    );
                    continue;
                }
            } else {
                prepared_embeddings = prepared;
            }
        }

        let replacement =
            crate::profile::timed_update_stage("sqlite_replace_document_generation", || {
                self.storage
                    .replace_document_generation(DocumentGenerationReplace {
                        collection_id: target.collection.id,
                        path: &relative_path,
                        title: &title,
                        title_source,
                        hash: &hash,
                        modified: &modified,
                        extractor_key: extractor.profile_key(),
                        source_hash: &hash,
                        text_hash: &text_hash,
                        generation_key: &generation_key,
                        text: canonical.text.as_str(),
                        chunks: &chunk_inserts,
                    })
            })?;
        let doc_id = replacement.doc_id;
        let chunk_ids = replacement.chunk_ids;

        // Chunks of the previous generation are removed from both search indexes.
        if !replacement.old_chunk_ids.is_empty() {
            crate::profile::timed_update_stage("tantivy_delete", || {
                self.storage
                    .delete_tantivy(&target.space, &replacement.old_chunk_ids)
            })?;
            crate::profile::timed_update_stage("usearch_delete", || {
                self.storage
                    .delete_usearch_deferred(&target.space, &replacement.old_chunk_ids)
            })?;
        }

        fts_dirty_by_space
            .entry(target.space.clone())
            .or_default()
            .insert(doc_id);

        if !chunk_ids.is_empty() {
            if !options.no_embed && self.embedder.is_some() {
                // `chunk_index` refers to the position in `final_chunks`; translate it to
                // the chunk id returned by the replace.
                for prepared in prepared_embeddings {
                    pending_embeddings.push(PendingChunkEmbedding {
                        chunk_id: chunk_ids[prepared.chunk_index],
                        doc_key: prepared.doc_key,
                        space_name: prepared.space_name,
                        path: prepared.path,
                        text: prepared.text,
                        max_document_tokens: prepared.max_document_tokens,
                    });
                }
                // Flush once the shared buffer has reached the batch window.
                if pending_embeddings.len() >= embedding_batch_window {
                    self.flush_buffered_embeddings(
                        pending_embeddings,
                        failed_chunk_ids,
                        report,
                        failed_docs,
                    )?;
                }
            }

            // One Tantivy entry per chunk, pairing chunk ids with chunks by position.
            let entries = chunk_ids
                .iter()
                .zip(final_chunks.iter())
                .map(|(chunk_id, chunk)| TantivyEntry {
                    chunk_id: *chunk_id,
                    doc_id,
                    filepath: relative_path.clone(),
                    semantic_title: title_source
                        .semantic_title(title.as_str())
                        .map(ToString::to_string),
                    heading: chunk.heading.clone(),
                    body: retrieval_text_with_prefix(
                        chunk.retrieval_text().as_str(),
                        title_source.semantic_title(title.as_str()),
                        chunk.heading.as_deref(),
                        policy.contextual_prefix,
                    ),
                })
                .collect::<Vec<_>>();
            crate::profile::increment_update_count("tantivy_entries", entries.len() as u64);
            crate::profile::timed_update_stage("tantivy_add", || {
                self.storage.index_tantivy(&target.space, &entries)
            })?;
        }

        // Re-read the row so `docs_by_path` holds the document as it was just written.
        docs_by_path.insert(
            relative_path.clone(),
            crate::profile::timed_update_stage("sqlite_get_document_after_write", || {
                self.storage
                    .get_document_by_path(target.collection.id, &relative_path)
            })?
            .ok_or_else(|| {
                KboltError::Internal(format!(
                    "document missing after upsert: collection={}, path={relative_path}",
                    target.collection.id
                ))
            })?,
        );
        pending_indexing.record(report);
        let (kind, detail) = pending_decision;
        push_update_decision(report, options, target, &relative_path, kind, detail);
        touched_collection = true;
    }

    // Active documents that were not seen in this walk are candidates for deactivation,
    // except those under a path whose walk failed (their absence proves nothing).
    let mut missing_docs = docs_by_path
        .values()
        .filter(|doc| {
            doc.active
                && !seen_paths.contains(&doc.path)
                && !path_is_under_failed_walk(
                    doc.path.as_str(),
                    &failed_walk_prefixes,
                    collection_walk_incomplete,
                )
        })
        .cloned()
        .collect::<Vec<_>>();
    // Sorted by path so decisions are emitted in a stable order (map iteration is not).
    missing_docs.sort_by(|left, right| left.path.cmp(&right.path));

    for doc in missing_docs {
        // NOTE(review): this condition repeats part of the filter above and is always true
        // for the documents collected there.
        if doc.active && !seen_paths.contains(&doc.path) {
            report.deactivated_docs += 1;
            push_update_decision(
                report,
                options,
                target,
                &doc.path,
                UpdateDecisionKind::Deactivated,
                None,
            );
            if !options.dry_run {
                crate::profile::timed_update_stage("sqlite_deactivate_document", || {
                    self.storage.deactivate_document(doc.id)
                })?;
                touched_collection = true;
            }
        }
    }

    // The collection timestamp is bumped only when a document was written or deactivated.
    if touched_collection && !options.dry_run {
        crate::profile::timed_update_stage("sqlite_update_collection_timestamp", || {
            self.storage
                .update_collection_timestamp(target.collection.id)
        })?;
    }

    Ok(())
}
1461}
1462
/// Summary of what a single directory-walk error says about which paths could not be scanned.
#[derive(Debug, Default)]
struct WalkErrorScope {
    /// Paths named by the error: the `path` of each `WithPath` wrapper and the `child` of
    /// each `Loop` error, in the order they are encountered.
    paths: Vec<std::path::PathBuf>,
    /// Set when some part of the error cannot be tied to a path (an I/O error without a
    /// surrounding path, an empty `Partial`, or a glob/file-type/definition error).
    collection_incomplete: bool,
}
1468
1469fn collect_walk_error_scope(err: &ignore::Error) -> WalkErrorScope {
1470    let mut scope = WalkErrorScope::default();
1471    collect_walk_error_scope_into(err, &mut scope, false);
1472    scope
1473}
1474
1475fn collect_walk_error_scope_into(
1476    err: &ignore::Error,
1477    scope: &mut WalkErrorScope,
1478    has_path_context: bool,
1479) {
1480    match err {
1481        ignore::Error::Partial(errors) => {
1482            if errors.is_empty() {
1483                scope.collection_incomplete = true;
1484            }
1485            for err in errors {
1486                collect_walk_error_scope_into(err, scope, has_path_context);
1487            }
1488        }
1489        ignore::Error::WithLineNumber { err, .. } | ignore::Error::WithDepth { err, .. } => {
1490            collect_walk_error_scope_into(err, scope, has_path_context);
1491        }
1492        ignore::Error::WithPath { path, err } => {
1493            scope.paths.push(path.clone());
1494            collect_walk_error_scope_into(err, scope, true);
1495        }
1496        ignore::Error::Loop { child, .. } => {
1497            scope.paths.push(child.clone());
1498        }
1499        ignore::Error::Io(_) => {
1500            if !has_path_context {
1501                scope.collection_incomplete = true;
1502            }
1503        }
1504        ignore::Error::Glob { .. }
1505        | ignore::Error::UnrecognizedFileType(_)
1506        | ignore::Error::InvalidDefinition => {
1507            scope.collection_incomplete = true;
1508        }
1509    }
1510}
1511
/// Tells whether `doc_path` lies in a part of the collection that could not be walked.
///
/// Returns `true` when the whole walk is flagged incomplete, or when some failed prefix is
/// empty, is `"."`, equals `doc_path`, or is a leading directory of `doc_path` (the prefix
/// must be followed by `/`, so `"a/b"` covers `"a/b/c"` but not `"a/bc"`).
fn path_is_under_failed_walk(
    doc_path: &str,
    failed_walk_prefixes: &[String],
    collection_walk_incomplete: bool,
) -> bool {
    if collection_walk_incomplete {
        return true;
    }

    for prefix in failed_walk_prefixes {
        let covered = match prefix.as_str() {
            // A failure at the collection root covers every document.
            "" | "." => true,
            root => {
                doc_path == root
                    || matches!(doc_path.strip_prefix(root), Some(rest) if rest.starts_with('/'))
            }
        };
        if covered {
            return true;
        }
    }

    false
}
1530
#[cfg(test)]
mod tests {
    use super::*;
    use std::io;
    use std::path::PathBuf;

    /// Builds a permission-denied I/O error wrapped with the path it was reported for.
    fn blocked_at(path: &str) -> ignore::Error {
        ignore::Error::WithPath {
            path: PathBuf::from(path),
            err: Box::new(ignore::Error::Io(io::Error::new(
                io::ErrorKind::PermissionDenied,
                "blocked",
            ))),
        }
    }

    /// Builds a symlink-loop error at `child`, wrapped in a depth marker.
    fn loop_at_depth(depth: usize, ancestor: &str, child: &str) -> ignore::Error {
        ignore::Error::WithDepth {
            depth,
            err: Box::new(ignore::Error::Loop {
                ancestor: PathBuf::from(ancestor),
                child: PathBuf::from(child),
            }),
        }
    }

    #[test]
    fn walk_error_scope_collects_every_partial_path() {
        let err = ignore::Error::Partial(vec![
            blocked_at("/collection/blocked-a"),
            loop_at_depth(2, "/collection", "/collection/blocked-b"),
        ]);

        let scope = collect_walk_error_scope(&err);

        let expected_paths = vec![
            PathBuf::from("/collection/blocked-a"),
            PathBuf::from("/collection/blocked-b"),
        ];
        assert_eq!(scope.paths, expected_paths);
        assert!(!scope.collection_incomplete);
    }

    #[test]
    fn walk_error_scope_marks_pathless_errors_incomplete() {
        // The second member has no path wrapper, so the whole walk is suspect.
        let pathless = ignore::Error::Io(io::Error::new(io::ErrorKind::Other, "unknown"));
        let err = ignore::Error::Partial(vec![blocked_at("/collection/blocked"), pathless]);

        let scope = collect_walk_error_scope(&err);

        assert_eq!(scope.paths, vec![PathBuf::from("/collection/blocked")]);
        assert!(scope.collection_incomplete);
    }

    #[test]
    fn walk_error_scope_treats_ignore_rule_errors_as_incomplete() {
        let bad_glob = ignore::Error::Glob {
            glob: Some("[".to_string()),
            err: "invalid glob".to_string(),
        };
        let err = ignore::Error::WithPath {
            path: PathBuf::from("/collection/.gitignore"),
            err: Box::new(ignore::Error::WithLineNumber {
                line: 4,
                err: Box::new(bad_glob),
            }),
        };

        let scope = collect_walk_error_scope(&err);

        // The ignore file's path is recorded, yet a broken rule still taints the walk.
        assert_eq!(scope.paths, vec![PathBuf::from("/collection/.gitignore")]);
        assert!(scope.collection_incomplete);
    }
}
1606
1607#[derive(Debug)]
1608struct PreparedChunkEmbedding {
1609    chunk_index: usize,
1610    doc_key: UpdateDocKey,
1611    space_name: String,
1612    path: std::path::PathBuf,
1613    text: String,
1614    max_document_tokens: usize,
1615}
1616
1617#[derive(Debug, Default)]
1618struct PreparedEmbeddingPreflight {
1619    accepted: Vec<PreparedChunkEmbedding>,
1620    rejected_chunk_indexes: Vec<usize>,
1621}
1622
1623#[derive(Debug)]
1624struct PendingChunkEmbedding {
1625    chunk_id: i64,
1626    doc_key: UpdateDocKey,
1627    space_name: String,
1628    path: std::path::PathBuf,
1629    text: String,
1630    max_document_tokens: usize,
1631}
1632
/// Result of embedding a batch of pending chunks; starts out empty via `Default`.
#[derive(Default)]
struct EmbeddedPendingBatch {
    /// One entry per successfully embedded chunk.
    /// NOTE(review): presumably `(chunk id, space name, vector)`; confirm against the producer.
    embeddings: Vec<(i64, String, Vec<f32>)>,
    /// Ids of the chunks that could not be embedded.
    failed_chunk_ids: Vec<i64>,
}
1638
/// Identity of a document within an update run.
///
/// Derives `Eq` and `Hash`, so it can serve as a set or map key; two keys are
/// equal only when all three components match.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct UpdateDocKey {
    /// Name of the space.
    space: String,
    /// Path of the collection the document belongs to.
    collection_path: std::path::PathBuf,
    /// Document path, stored as a string.
    path: String,
}
1645
1646struct EmbeddingDocumentSizerCounter<'a> {
1647    inner: &'a dyn crate::models::EmbeddingDocumentSizer,
1648}
1649
1650impl crate::ingest::chunk::TokenCounter for EmbeddingDocumentSizerCounter<'_> {
1651    fn count(&self, text: &str) -> Result<usize> {
1652        count_document_tokens_profiled(
1653            self.inner,
1654            text,
1655            "tokenize_chunking",
1656            "tokenize_chunking_calls",
1657        )
1658    }
1659
1660    fn fits_within_token_limit_by_byte_len(&self, byte_len: usize, max_tokens: usize) -> bool {
1661        let fits = self
1662            .inner
1663            .fits_within_token_limit_by_byte_len(byte_len, max_tokens);
1664        if fits {
1665            crate::profile::increment_update_count("tokenize_chunking_byte_skips", 1);
1666        }
1667        fits
1668    }
1669}
1670
1671fn count_document_tokens_profiled(
1672    sizer: &dyn crate::models::EmbeddingDocumentSizer,
1673    text: &str,
1674    stage: &'static str,
1675    count: &'static str,
1676) -> Result<usize> {
1677    let started = Instant::now();
1678    let result = sizer.count_document_tokens(text);
1679    crate::profile::record_update_stage(stage, started.elapsed());
1680    crate::profile::increment_update_count(count, 1);
1681    result
1682}
1683
1684fn prepare_chunk_embeddings(
1685    final_chunks: &[crate::ingest::chunk::FinalChunk],
1686    doc_key: &UpdateDocKey,
1687    target: &UpdateTarget,
1688    path: &Path,
1689    max_document_tokens: usize,
1690) -> Vec<PreparedChunkEmbedding> {
1691    final_chunks
1692        .iter()
1693        .enumerate()
1694        .map(|(chunk_index, chunk)| {
1695            let mut text = chunk.retrieval_text();
1696            if text.trim().is_empty() {
1697                text = " ".to_string();
1698            }
1699            PreparedChunkEmbedding {
1700                chunk_index,
1701                doc_key: doc_key.clone(),
1702                space_name: target.space.clone(),
1703                path: path.to_path_buf(),
1704                text,
1705                max_document_tokens,
1706            }
1707        })
1708        .collect()
1709}
1710
1711#[derive(Debug, Clone)]
1712enum UpdateRepairScope {
1713    Global,
1714    Space { space_id: i64 },
1715    Collections { collection_ids: Vec<i64> },
1716}
1717
1718impl UpdateRepairScope {
1719    fn from_options_and_targets(options: &UpdateOptions, targets: &[UpdateTarget]) -> Self {
1720        if options.collections.is_empty() {
1721            if let Some(target) = targets.first() {
1722                if options.space.is_some() {
1723                    return Self::Space {
1724                        space_id: target.collection.space_id,
1725                    };
1726                }
1727            }
1728            return Self::Global;
1729        }
1730
1731        let mut collection_ids = targets
1732            .iter()
1733            .map(|target| target.collection.id)
1734            .collect::<Vec<_>>();
1735        collection_ids.sort_unstable();
1736        collection_ids.dedup();
1737        Self::Collections { collection_ids }
1738    }
1739
1740    fn allows_space_dense_repair(&self) -> bool {
1741        !matches!(self, Self::Collections { .. })
1742    }
1743}
1744
1745#[derive(Debug, Clone, Copy)]
1746enum PendingDocumentIndexing {
1747    Added,
1748    Updated { reactivated: bool },
1749}
1750
1751impl PendingDocumentIndexing {
1752    fn record(self, report: &mut UpdateReport) {
1753        match self {
1754            Self::Added => {
1755                report.added_docs += 1;
1756            }
1757            Self::Updated { reactivated } => {
1758                report.updated_docs += 1;
1759                if reactivated {
1760                    report.reactivated_docs += 1;
1761                }
1762            }
1763        }
1764    }
1765}
1766
/// Returns `true` when the generation key stored for `doc_id` equals `generation_key`.
///
/// A document with no stored entry is never considered current.
fn document_has_current_canonical_text(
    document_text_generations: &HashMap<i64, String>,
    doc_id: i64,
    generation_key: &str,
) -> bool {
    let stored_key = document_text_generations.get(&doc_id).map(String::as_str);
    stored_key == Some(generation_key)
}
1776
1777fn finish_update_with_usearch_flush(
1778    update_result: Result<UpdateReport>,
1779    flush_result: Result<()>,
1780) -> Result<UpdateReport> {
1781    match (update_result, flush_result) {
1782        (Ok(report), Ok(())) => Ok(report),
1783        (Err(err), Ok(())) => Err(err),
1784        (Ok(_), Err(err)) => Err(err),
1785        (Err(update_err), Err(flush_err)) => Err(KboltError::Internal(format!(
1786            "update failed: {update_err}; additionally failed to save dirty vector indexes: {flush_err}"
1787        ))
1788        .into()),
1789    }
1790}
1791
1792fn invalid_pending_embedding_batch_detail(
1793    pending: &[PendingChunkEmbedding],
1794    vectors: &[Vec<f32>],
1795) -> Option<String> {
1796    if vectors.len() != pending.len() {
1797        return Some(format!(
1798            "embedder returned {} vectors for {} chunks",
1799            vectors.len(),
1800            pending.len()
1801        ));
1802    }
1803
1804    if vectors.iter().any(|vector| vector.is_empty()) {
1805        if pending.len() == 1 {
1806            return Some(format!(
1807                "embedder returned an empty vector for chunk {}",
1808                pending[0].chunk_id
1809            ));
1810        }
1811        return Some("embedder returned an empty vector".to_string());
1812    }
1813
1814    None
1815}
1816
1817fn push_update_decision(
1818    report: &mut UpdateReport,
1819    options: &UpdateOptions,
1820    target: &UpdateTarget,
1821    path: &str,
1822    kind: UpdateDecisionKind,
1823    detail: Option<String>,
1824) {
1825    if !options.verbose {
1826        return;
1827    }
1828
1829    report.decisions.push(UpdateDecision {
1830        space: target.space.clone(),
1831        collection: target.collection.name.clone(),
1832        path: path.to_string(),
1833        kind,
1834        detail,
1835    });
1836}
1837
1838fn update_doc_key(space: &str, collection_path: &Path, path: &str) -> UpdateDocKey {
1839    UpdateDocKey {
1840        space: space.to_string(),
1841        collection_path: collection_path.to_path_buf(),
1842        path: path.to_string(),
1843    }
1844}
1845
1846fn effective_chunk_hard_max(policy: &crate::config::ChunkPolicy) -> usize {
1847    policy
1848        .hard_max_tokens
1849        .max(policy.soft_max_tokens)
1850        .max(policy.target_tokens)
1851        .max(1)
1852}
1853
1854fn ingestion_generation_key(
1855    extractor_key: &str,
1856    extractor_version: u32,
1857    policy: &crate::config::ChunkPolicy,
1858) -> String {
1859    format!(
1860        "canonical=v{CANONICAL_TEXT_GENERATION};chunker=v{CHUNKER_GENERATION};extractor={extractor_key}:v{extractor_version};chunk=target:{}:soft:{}:hard:{}:overlap:{}:prefix:{}",
1861        policy.target_tokens,
1862        policy.soft_max_tokens,
1863        policy.hard_max_tokens,
1864        policy.boundary_overlap_tokens,
1865        policy.contextual_prefix
1866    )
1867}