Skip to main content

kbolt_core/engine/
update_ops.rs

1use super::*;
2use kbolt_types::{UpdateDecision, UpdateDecisionKind};
3
4const CANONICAL_TEXT_GENERATION: u32 = 1;
5const CHUNKER_GENERATION: u32 = 2;
6
7impl Engine {
8    pub fn update(&self, options: UpdateOptions) -> Result<UpdateReport> {
9        let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
10        self.update_unlocked(options)
11    }
12
13    pub(super) fn update_unlocked(&self, options: UpdateOptions) -> Result<UpdateReport> {
14        let _profile = crate::profile::UpdateProfileGuard::start();
15        let update_result = self.run_update_unlocked(options);
16        let flush_result = crate::profile::timed_update_stage("usearch_save_dirty", || {
17            self.storage.save_dirty_usearch_indexes()
18        });
19        finish_update_with_usearch_flush(update_result, flush_result)
20    }
21
22    fn run_update_unlocked(&self, options: UpdateOptions) -> Result<UpdateReport> {
23        let started = Instant::now();
24        let mut report = UpdateReport {
25            scanned_docs: 0,
26            skipped_mtime_docs: 0,
27            skipped_hash_docs: 0,
28            added_docs: 0,
29            updated_docs: 0,
30            failed_docs: 0,
31            deactivated_docs: 0,
32            reactivated_docs: 0,
33            reaped_docs: 0,
34            embedded_chunks: 0,
35            decisions: Vec::new(),
36            errors: Vec::new(),
37            elapsed_ms: 0,
38        };
39        let mut failed_docs = HashSet::new();
40
41        let targets = crate::profile::timed_update_stage("resolve_targets", || {
42            self.resolve_targets(TargetScope {
43                space: options.space.as_deref(),
44                collections: &options.collections,
45            })
46        })?;
47        if targets.is_empty() {
48            report.elapsed_ms = started.elapsed().as_millis() as u64;
49            return Ok(report);
50        }
51        let repair_scope = UpdateRepairScope::from_options_and_targets(&options, &targets);
52
53        crate::profile::timed_update_stage("dense_integrity", || {
54            self.reconcile_dense_integrity(&targets, &repair_scope, &options)
55        })?;
56
57        if !options.dry_run {
58            crate::profile::timed_update_stage("fts_dirty_replay", || {
59                self.replay_fts_dirty_documents(&repair_scope, &mut report, &mut failed_docs)
60            })?;
61        }
62
63        let mut fts_dirty_by_space: HashMap<String, HashSet<i64>> = HashMap::new();
64        let mut pending_embeddings = Vec::new();
65        let mut failed_embedding_chunk_ids = HashSet::new();
66        for target in &targets {
67            self.update_collection_target(
68                target,
69                &options,
70                &mut report,
71                &mut fts_dirty_by_space,
72                &mut pending_embeddings,
73                &mut failed_embedding_chunk_ids,
74                &mut failed_docs,
75            )?;
76        }
77
78        if !options.dry_run {
79            crate::profile::timed_update_stage("embedding_flush_buffered", || {
80                self.flush_buffered_embeddings(
81                    &mut pending_embeddings,
82                    &mut failed_embedding_chunk_ids,
83                    &mut report,
84                    &mut failed_docs,
85                )
86            })?;
87
88            for (space, doc_ids) in fts_dirty_by_space {
89                if doc_ids.is_empty() {
90                    continue;
91                }
92
93                crate::profile::timed_update_stage("tantivy_commit", || {
94                    self.storage.commit_tantivy(&space)
95                })?;
96                let mut ids = doc_ids.into_iter().collect::<Vec<_>>();
97                ids.sort_unstable();
98                crate::profile::timed_update_stage("sqlite_clear_fts_dirty", || {
99                    self.storage.batch_clear_fts_dirty(&ids)
100                })?;
101            }
102
103            crate::profile::timed_update_stage("embedding_backlog", || {
104                self.embed_pending_chunks(
105                    &repair_scope,
106                    &options,
107                    &mut failed_embedding_chunk_ids,
108                    &mut report,
109                    &mut failed_docs,
110                )
111            })?;
112
113            let reaped = crate::profile::timed_update_stage("sqlite_list_reapable", || {
114                self.list_reapable_documents_for_scope(self.config.reaping.days, &repair_scope)
115            })?;
116            let mut reaped_doc_ids = Vec::with_capacity(reaped.len());
117            let mut chunk_ids_by_space: HashMap<String, Vec<i64>> = HashMap::new();
118            for document in reaped {
119                reaped_doc_ids.push(document.doc_id);
120                if !document.chunk_ids.is_empty() {
121                    chunk_ids_by_space
122                        .entry(document.space_name)
123                        .or_default()
124                        .extend(document.chunk_ids);
125                }
126            }
127            for (space, chunk_ids) in chunk_ids_by_space {
128                crate::profile::timed_update_stage("purge_reaped_indexes", || {
129                    self.purge_space_chunks(&space, &chunk_ids)
130                })?;
131            }
132            crate::profile::timed_update_stage("sqlite_delete_reaped_documents", || {
133                self.storage.delete_documents(&reaped_doc_ids)
134            })?;
135            report.reaped_docs = reaped_doc_ids.len();
136        }
137
138        report.failed_docs = failed_docs.len();
139        report.elapsed_ms = started.elapsed().as_millis() as u64;
140        Ok(report)
141    }
142
143    fn reconcile_dense_integrity(
144        &self,
145        targets: &[UpdateTarget],
146        repair_scope: &UpdateRepairScope,
147        options: &UpdateOptions,
148    ) -> Result<()> {
149        if options.no_embed || options.dry_run {
150            return Ok(());
151        }
152
153        let expected_model = self.embedding_model_key();
154        let mut visited_spaces = HashSet::new();
155        for target in targets {
156            if !visited_spaces.insert(target.collection.space_id) {
157                continue;
158            }
159
160            let models = self
161                .storage
162                .list_embedding_models_in_space(target.collection.space_id)?;
163            let mut reasons = Vec::new();
164            if models.iter().any(|model| model != expected_model) {
165                reasons.push(format!(
166                    "stored embedding models {:?} do not match current model '{}'",
167                    models, expected_model
168                ));
169            }
170
171            let sqlite_count = self
172                .storage
173                .count_embedded_chunks(Some(target.collection.space_id))?;
174            let usearch_count = self.storage.count_usearch(&target.space)?;
175            if sqlite_count != usearch_count {
176                reasons.push(format!(
177                    "sqlite embedded chunk count {sqlite_count} does not match USearch vector count {usearch_count}"
178                ));
179            }
180
181            if reasons.is_empty() {
182                continue;
183            }
184
185            if !repair_scope.allows_space_dense_repair() {
186                return Err(KboltError::SpaceDenseRepairRequired {
187                    space: target.space.clone(),
188                    reason: reasons.join("; "),
189                }
190                .into());
191            }
192
193            self.storage
194                .delete_embeddings_for_space(target.collection.space_id)?;
195            self.storage.clear_usearch(&target.space)?;
196        }
197
198        Ok(())
199    }
200
201    fn embed_pending_chunks(
202        &self,
203        repair_scope: &UpdateRepairScope,
204        options: &UpdateOptions,
205        failed_chunk_ids: &mut HashSet<i64>,
206        report: &mut UpdateReport,
207        failed_docs: &mut HashSet<UpdateDocKey>,
208    ) -> Result<()> {
209        if options.no_embed || options.dry_run {
210            return Ok(());
211        }
212
213        let Some(embedder) = self.embedder.as_ref() else {
214            return Ok(());
215        };
216
217        let model = self.embedding_model_key();
218        let mut after_chunk_id = 0_i64;
219        loop {
220            let backlog =
221                crate::profile::timed_update_stage("sqlite_get_unembedded_chunks", || {
222                    self.get_unembedded_chunks_for_scope(model, repair_scope, after_chunk_id, 64)
223                })?;
224            if backlog.is_empty() {
225                break;
226            }
227
228            after_chunk_id = backlog
229                .last()
230                .map(|record| record.chunk.id)
231                .expect("non-empty backlog should have a last chunk id");
232
233            let backlog_doc_ids = backlog
234                .iter()
235                .filter(|record| !failed_chunk_ids.contains(&record.chunk.id))
236                .map(|record| record.chunk.doc_id)
237                .collect::<Vec<_>>();
238            let document_text_cache =
239                crate::profile::timed_update_stage("embedding_backlog_read", || {
240                    self.storage.get_existing_document_texts(&backlog_doc_ids)
241                })?;
242
243            let mut pending = Vec::new();
244            for record in backlog {
245                if failed_chunk_ids.contains(&record.chunk.id) {
246                    continue;
247                }
248
249                let full_path = record.collection_path.join(&record.doc_path);
250                let Some(document_text) = document_text_cache.get(&record.chunk.doc_id) else {
251                    report.errors.push(file_error(
252                        Some(full_path.clone()),
253                        format!(
254                            "embed canonical text load failed: {}",
255                            crate::storage::missing_document_text_error(record.chunk.doc_id)
256                        ),
257                    ));
258                    failed_docs.insert(update_doc_key(
259                        &record.space_name,
260                        &record.collection_path,
261                        &record.doc_path,
262                    ));
263                    continue;
264                };
265                let chunk_canonical_text = match crate::storage::chunk_text_from_canonical(
266                    document_text.text.as_str(),
267                    &record.chunk,
268                ) {
269                    Ok(text) => text,
270                    Err(err) => {
271                        report.errors.push(file_error(
272                            Some(full_path.clone()),
273                            format!("embed canonical text load failed: {err}"),
274                        ));
275                        failed_docs.insert(update_doc_key(
276                            &record.space_name,
277                            &record.collection_path,
278                            &record.doc_path,
279                        ));
280                        continue;
281                    }
282                };
283                let mut text = crate::ingest::chunk::chunk_retrieval_body(
284                    chunk_canonical_text.as_str(),
285                    record.chunk.retrieval_prefix.as_deref(),
286                );
287                if text.trim().is_empty() {
288                    text = " ".to_string();
289                }
290                let policy = resolve_policy(
291                    &self.config.chunking,
292                    Some(document_text.extractor_key.as_str()),
293                    None,
294                );
295                let max_document_tokens = effective_chunk_hard_max(&policy);
296
297                pending.push(PendingChunkEmbedding {
298                    chunk_id: record.chunk.id,
299                    doc_key: update_doc_key(
300                        &record.space_name,
301                        &record.collection_path,
302                        &record.doc_path,
303                    ),
304                    space_name: record.space_name,
305                    path: full_path,
306                    text,
307                    max_document_tokens,
308                });
309            }
310
311            if pending.is_empty() {
312                continue;
313            }
314
315            let mut preflight_failed_chunk_ids = Vec::new();
316            let pending = self.preflight_pending_embeddings(
317                pending,
318                report,
319                failed_docs,
320                &mut preflight_failed_chunk_ids,
321            )?;
322            failed_chunk_ids.extend(preflight_failed_chunk_ids);
323            if pending.is_empty() {
324                continue;
325            }
326
327            let result = self.embed_preflighted_pending_batch_with_partial_failures(
328                embedder.as_ref(),
329                pending,
330                report,
331                failed_docs,
332            )?;
333            failed_chunk_ids.extend(result.failed_chunk_ids);
334            if !result.embeddings.is_empty() {
335                self.store_chunk_embeddings(model, result.embeddings, report)?;
336            }
337        }
338
339        Ok(())
340    }
341
342    fn flush_buffered_embeddings(
343        &self,
344        pending_embeddings: &mut Vec<PendingChunkEmbedding>,
345        failed_chunk_ids: &mut HashSet<i64>,
346        report: &mut UpdateReport,
347        failed_docs: &mut HashSet<UpdateDocKey>,
348    ) -> Result<()> {
349        if pending_embeddings.is_empty() {
350            return Ok(());
351        }
352
353        let Some(embedder) = self.embedder.as_ref() else {
354            pending_embeddings.clear();
355            return Ok(());
356        };
357
358        let pending = std::mem::take(pending_embeddings);
359        let result = self.embed_preflighted_pending_batch_with_partial_failures(
360            embedder.as_ref(),
361            pending,
362            report,
363            failed_docs,
364        )?;
365        failed_chunk_ids.extend(result.failed_chunk_ids);
366        if result.embeddings.is_empty() {
367            return Ok(());
368        }
369
370        self.store_chunk_embeddings(self.embedding_model_key(), result.embeddings, report)
371    }
372
373    fn embed_preflighted_pending_batch_with_partial_failures(
374        &self,
375        embedder: &dyn crate::models::Embedder,
376        pending: Vec<PendingChunkEmbedding>,
377        report: &mut UpdateReport,
378        failed_docs: &mut HashSet<UpdateDocKey>,
379    ) -> Result<EmbeddedPendingBatch> {
380        if pending.is_empty() {
381            return Ok(EmbeddedPendingBatch::default());
382        }
383
384        let texts = pending
385            .iter()
386            .map(|embedding| embedding.text.clone())
387            .collect::<Vec<_>>();
388        crate::profile::increment_update_count("embedding_batch_calls", 1);
389        crate::profile::increment_update_count("embedding_input_texts", texts.len() as u64);
390        match crate::profile::timed_update_stage("embedding_http", || {
391            embedder.embed_batch(crate::models::EmbeddingInputKind::Document, &texts)
392        }) {
393            Ok(vectors) => {
394                if let Some(detail) = invalid_pending_embedding_batch_detail(&pending, &vectors) {
395                    return self.split_pending_embedding_batch(
396                        embedder,
397                        pending,
398                        report,
399                        failed_docs,
400                        detail,
401                    );
402                }
403
404                let embeddings = pending
405                    .into_iter()
406                    .zip(vectors)
407                    .map(|(embedding, vector)| (embedding.chunk_id, embedding.space_name, vector))
408                    .collect::<Vec<_>>();
409                Ok(EmbeddedPendingBatch {
410                    embeddings,
411                    failed_chunk_ids: Vec::new(),
412                })
413            }
414            Err(err) => self.split_pending_embedding_batch(
415                embedder,
416                pending,
417                report,
418                failed_docs,
419                err.to_string(),
420            ),
421        }
422    }
423
424    fn split_pending_embedding_batch(
425        &self,
426        embedder: &dyn crate::models::Embedder,
427        pending: Vec<PendingChunkEmbedding>,
428        report: &mut UpdateReport,
429        failed_docs: &mut HashSet<UpdateDocKey>,
430        detail: String,
431    ) -> Result<EmbeddedPendingBatch> {
432        if pending.len() == 1 {
433            let embedding = pending
434                .into_iter()
435                .next()
436                .expect("single pending embedding should exist");
437            report.errors.push(file_error(
438                Some(embedding.path),
439                format!("embed failed: {detail}"),
440            ));
441            failed_docs.insert(embedding.doc_key);
442            return Ok(EmbeddedPendingBatch {
443                embeddings: Vec::new(),
444                failed_chunk_ids: vec![embedding.chunk_id],
445            });
446        }
447
448        let mid = pending.len() / 2;
449        let mut right = pending;
450        let left = right.drain(..mid).collect::<Vec<_>>();
451
452        let mut left_result = self.embed_preflighted_pending_batch_with_partial_failures(
453            embedder,
454            left,
455            report,
456            failed_docs,
457        )?;
458        let right_result = self.embed_preflighted_pending_batch_with_partial_failures(
459            embedder,
460            right,
461            report,
462            failed_docs,
463        )?;
464        left_result.embeddings.extend(right_result.embeddings);
465        left_result
466            .failed_chunk_ids
467            .extend(right_result.failed_chunk_ids);
468        Ok(left_result)
469    }
470
471    fn store_chunk_embeddings(
472        &self,
473        model: &str,
474        embeddings: Vec<(i64, String, Vec<f32>)>,
475        report: &mut UpdateReport,
476    ) -> Result<()> {
477        let mut grouped_vectors: HashMap<String, Vec<(i64, Vec<f32>)>> = HashMap::new();
478        let mut embedding_rows = Vec::with_capacity(embeddings.len());
479        for (chunk_id, space_name, vector) in embeddings {
480            if vector.is_empty() {
481                return Err(KboltError::Inference(format!(
482                    "embedder returned an empty vector for chunk {chunk_id}"
483                ))
484                .into());
485            }
486
487            grouped_vectors
488                .entry(space_name)
489                .or_default()
490                .push((chunk_id, vector));
491            embedding_rows.push((chunk_id, model));
492        }
493
494        for (space, vectors) in grouped_vectors {
495            let refs = vectors
496                .iter()
497                .map(|(chunk_id, vector)| (*chunk_id, vector.as_slice()))
498                .collect::<Vec<_>>();
499            crate::profile::increment_update_count("usearch_vectors", refs.len() as u64);
500            self.storage.batch_insert_usearch_deferred(&space, &refs)?;
501        }
502
503        crate::profile::timed_update_stage("sqlite_insert_embeddings", || {
504            self.storage.insert_embeddings(&embedding_rows)
505        })?;
506        report.embedded_chunks = report.embedded_chunks.saturating_add(embedding_rows.len());
507        Ok(())
508    }
509
510    fn preflight_pending_embeddings(
511        &self,
512        pending: Vec<PendingChunkEmbedding>,
513        report: &mut UpdateReport,
514        failed_docs: &mut HashSet<UpdateDocKey>,
515        failed_chunk_ids: &mut Vec<i64>,
516    ) -> Result<Vec<PendingChunkEmbedding>> {
517        let Some(sizer) = self.embedding_document_sizer.as_ref() else {
518            return Ok(pending);
519        };
520
521        let mut accepted = Vec::with_capacity(pending.len());
522        for embedding in pending {
523            match count_document_tokens_profiled(
524                sizer.as_ref(),
525                &embedding.text,
526                "tokenize_preflight",
527                "tokenize_preflight_calls",
528            ) {
529                Ok(token_count) if token_count <= embedding.max_document_tokens => {
530                    accepted.push(embedding);
531                }
532                Ok(token_count) => {
533                    report.errors.push(file_error(
534                        Some(embedding.path.clone()),
535                        format!(
536                            "embed preflight failed: payload has {token_count} tokens, exceeding hard_max_tokens {}",
537                            embedding.max_document_tokens
538                        ),
539                    ));
540                    failed_docs.insert(embedding.doc_key);
541                    failed_chunk_ids.push(embedding.chunk_id);
542                }
543                Err(err) => {
544                    report.errors.push(file_error(
545                        Some(embedding.path.clone()),
546                        format!("embed preflight token count failed: {err}"),
547                    ));
548                    failed_docs.insert(embedding.doc_key);
549                    failed_chunk_ids.push(embedding.chunk_id);
550                }
551            }
552        }
553
554        Ok(accepted)
555    }
556
557    fn preflight_prepared_embeddings(
558        &self,
559        prepared: Vec<PreparedChunkEmbedding>,
560        report: &mut UpdateReport,
561        failed_docs: &mut HashSet<UpdateDocKey>,
562    ) -> Result<PreparedEmbeddingPreflight> {
563        let Some(sizer) = self.embedding_document_sizer.as_ref() else {
564            return Ok(PreparedEmbeddingPreflight {
565                accepted: prepared,
566                rejected_chunk_indexes: Vec::new(),
567            });
568        };
569
570        let mut preflight = PreparedEmbeddingPreflight {
571            accepted: Vec::with_capacity(prepared.len()),
572            rejected_chunk_indexes: Vec::new(),
573        };
574        for embedding in prepared {
575            match count_document_tokens_profiled(
576                sizer.as_ref(),
577                &embedding.text,
578                "tokenize_preflight",
579                "tokenize_preflight_calls",
580            ) {
581                Ok(token_count) if token_count <= embedding.max_document_tokens => {
582                    preflight.accepted.push(embedding);
583                }
584                Ok(token_count) => {
585                    report.errors.push(file_error(
586                        Some(embedding.path.clone()),
587                        format!(
588                            "embed preflight failed: payload has {token_count} tokens, exceeding hard_max_tokens {}",
589                            embedding.max_document_tokens
590                        ),
591                    ));
592                    failed_docs.insert(embedding.doc_key);
593                    preflight.rejected_chunk_indexes.push(embedding.chunk_index);
594                }
595                Err(err) => {
596                    report.errors.push(file_error(
597                        Some(embedding.path.clone()),
598                        format!("embed preflight token count failed: {err}"),
599                    ));
600                    failed_docs.insert(embedding.doc_key);
601                    preflight.rejected_chunk_indexes.push(embedding.chunk_index);
602                }
603            }
604        }
605
606        Ok(preflight)
607    }
608
609    fn replay_fts_dirty_documents(
610        &self,
611        repair_scope: &UpdateRepairScope,
612        report: &mut UpdateReport,
613        failed_docs: &mut HashSet<UpdateDocKey>,
614    ) -> Result<()> {
615        let records = self.get_fts_dirty_documents_for_scope(repair_scope)?;
616        if records.is_empty() {
617            return Ok(());
618        }
619
620        let mut cleared_by_space: HashMap<String, Vec<i64>> = HashMap::new();
621        for record in records {
622            let space_name = record.space_name;
623            let doc_id = record.doc_id;
624
625            if record.chunks.is_empty() {
626                self.storage.delete_tantivy_by_doc(&space_name, doc_id)?;
627                cleared_by_space.entry(space_name).or_default().push(doc_id);
628                continue;
629            }
630
631            let document_text = match self.storage.get_document_text(doc_id) {
632                Ok(row) => row,
633                Err(err) => {
634                    failed_docs.insert(update_doc_key(
635                        &space_name,
636                        &record.collection_path,
637                        &record.doc_path,
638                    ));
639                    report.errors.push(file_error(
640                        Some(record.collection_path.join(&record.doc_path)),
641                        format!("fts replay canonical text load failed: {err}"),
642                    ));
643                    continue;
644                }
645            };
646            let policy = resolve_policy(
647                &self.config.chunking,
648                Some(document_text.extractor_key.as_str()),
649                None,
650            );
651
652            let entries = record
653                .chunks
654                .iter()
655                .map(|chunk| -> Result<TantivyEntry> {
656                    let chunk_text = crate::storage::chunk_text_from_canonical(
657                        document_text.text.as_str(),
658                        chunk,
659                    )?;
660                    let chunk_body = crate::ingest::chunk::chunk_retrieval_body(
661                        chunk_text.as_str(),
662                        chunk.retrieval_prefix.as_deref(),
663                    );
664                    Ok(TantivyEntry {
665                        chunk_id: chunk.id,
666                        doc_id,
667                        filepath: record.doc_path.clone(),
668                        semantic_title: record
669                            .doc_title_source
670                            .semantic_title(record.doc_title.as_str())
671                            .map(ToString::to_string),
672                        heading: chunk.heading.clone(),
673                        body: retrieval_text_with_prefix(
674                            chunk_body.as_str(),
675                            record
676                                .doc_title_source
677                                .semantic_title(record.doc_title.as_str()),
678                            chunk.heading.as_deref(),
679                            policy.contextual_prefix,
680                        ),
681                    })
682                })
683                .collect::<Result<Vec<_>>>()?;
684
685            self.storage.delete_tantivy_by_doc(&space_name, doc_id)?;
686            self.storage.index_tantivy(&space_name, &entries)?;
687            cleared_by_space.entry(space_name).or_default().push(doc_id);
688        }
689
690        for (space_name, mut doc_ids) in cleared_by_space {
691            if doc_ids.is_empty() {
692                continue;
693            }
694
695            doc_ids.sort_unstable();
696            doc_ids.dedup();
697            self.storage.commit_tantivy(&space_name)?;
698            self.storage.batch_clear_fts_dirty(&doc_ids)?;
699        }
700
701        Ok(())
702    }
703
704    fn get_fts_dirty_documents_for_scope(
705        &self,
706        repair_scope: &UpdateRepairScope,
707    ) -> Result<Vec<crate::storage::FtsDirtyRecord>> {
708        match repair_scope {
709            UpdateRepairScope::Global => self.storage.get_fts_dirty_documents(),
710            UpdateRepairScope::Space { space_id } => {
711                self.storage.get_fts_dirty_documents_in_space(*space_id)
712            }
713            UpdateRepairScope::Collections { collection_ids } => self
714                .storage
715                .get_fts_dirty_documents_in_collections(collection_ids),
716        }
717    }
718
719    fn get_unembedded_chunks_for_scope(
720        &self,
721        model: &str,
722        repair_scope: &UpdateRepairScope,
723        after_chunk_id: i64,
724        limit: usize,
725    ) -> Result<Vec<crate::storage::EmbedRecord>> {
726        match repair_scope {
727            UpdateRepairScope::Global => {
728                self.storage
729                    .get_unembedded_chunks(model, after_chunk_id, limit)
730            }
731            UpdateRepairScope::Space { space_id } => {
732                self.storage
733                    .get_unembedded_chunks_in_space(model, *space_id, after_chunk_id, limit)
734            }
735            UpdateRepairScope::Collections { collection_ids } => self
736                .storage
737                .get_unembedded_chunks_in_collections(model, collection_ids, after_chunk_id, limit),
738        }
739    }
740
741    fn list_reapable_documents_for_scope(
742        &self,
743        older_than_days: u32,
744        repair_scope: &UpdateRepairScope,
745    ) -> Result<Vec<crate::storage::ReapableDocument>> {
746        match repair_scope {
747            UpdateRepairScope::Global => self.storage.list_reapable_documents(older_than_days),
748            UpdateRepairScope::Space { space_id } => self
749                .storage
750                .list_reapable_documents_in_space(older_than_days, *space_id),
751            UpdateRepairScope::Collections { collection_ids } => self
752                .storage
753                .list_reapable_documents_in_collections(older_than_days, collection_ids),
754        }
755    }
756
757    pub fn resolve_update_targets(&self, options: &UpdateOptions) -> Result<Vec<UpdateTarget>> {
758        self.resolve_targets(TargetScope {
759            space: options.space.as_deref(),
760            collections: &options.collections,
761        })
762    }
763
764    pub(super) fn resolve_targets(&self, scope: TargetScope<'_>) -> Result<Vec<UpdateTarget>> {
765        let mut targets = Vec::new();
766
767        if scope.collections.is_empty() {
768            return self.resolve_update_targets_for_all_collections(scope.space);
769        }
770
771        let mut seen = std::collections::HashSet::new();
772        for raw_collection_name in scope.collections {
773            let collection_name = raw_collection_name.trim();
774            if collection_name.is_empty() {
775                return Err(KboltError::InvalidInput(
776                    "collection names cannot be empty".to_string(),
777                )
778                .into());
779            }
780
781            let resolved_space = self.resolve_space_row(scope.space, Some(collection_name))?;
782            let collection = self
783                .storage
784                .get_collection(resolved_space.id, collection_name)?;
785
786            if seen.insert((collection.space_id, collection.name.clone())) {
787                targets.push(UpdateTarget {
788                    space: resolved_space.name,
789                    collection,
790                });
791            }
792        }
793
794        Ok(targets)
795    }
796
797    fn resolve_update_targets_for_all_collections(
798        &self,
799        space: Option<&str>,
800    ) -> Result<Vec<UpdateTarget>> {
801        let (space_id_filter, spaces_by_id) = if let Some(space_name) = space {
802            let resolved = self.resolve_space_row(Some(space_name), None)?;
803            let mut map = std::collections::HashMap::new();
804            map.insert(resolved.id, resolved.name.clone());
805            (Some(resolved.id), map)
806        } else {
807            let spaces = self.storage.list_spaces()?;
808            let map = spaces
809                .into_iter()
810                .map(|space| (space.id, space.name))
811                .collect::<std::collections::HashMap<_, _>>();
812            (None, map)
813        };
814
815        let collections = self.storage.list_collections(space_id_filter)?;
816        let mut targets = Vec::with_capacity(collections.len());
817        for collection in collections {
818            let space_name = spaces_by_id
819                .get(&collection.space_id)
820                .ok_or_else(|| {
821                    KboltError::Internal(format!(
822                        "missing space mapping for collection '{}'",
823                        collection.name
824                    ))
825                })?
826                .clone();
827            targets.push(UpdateTarget {
828                space: space_name,
829                collection,
830            });
831        }
832
833        Ok(targets)
834    }
835
836    fn update_collection_target(
837        &self,
838        target: &UpdateTarget,
839        options: &UpdateOptions,
840        report: &mut UpdateReport,
841        fts_dirty_by_space: &mut HashMap<String, HashSet<i64>>,
842        pending_embeddings: &mut Vec<PendingChunkEmbedding>,
843        failed_chunk_ids: &mut HashSet<i64>,
844        failed_docs: &mut HashSet<UpdateDocKey>,
845    ) -> Result<()> {
846        let all_documents = crate::profile::timed_update_stage("sqlite_list_documents", || {
847            self.storage.list_documents(target.collection.id, false)
848        })?;
849        let document_ids = all_documents.iter().map(|doc| doc.id).collect::<Vec<_>>();
850        let document_text_generations =
851            crate::profile::timed_update_stage("sqlite_list_document_text_generations", || {
852                self.storage
853                    .get_document_text_generation_keys(&document_ids)
854            })?;
855        let mut docs_by_path: HashMap<String, DocumentRow> = all_documents
856            .into_iter()
857            .map(|doc| (doc.path.clone(), doc))
858            .collect();
859        let mut seen_paths = HashSet::new();
860        let extension_filter = normalized_extension_filter(target.collection.extensions.as_deref());
861        let ignore_matcher = load_collection_ignore_matcher(
862            &self.config.config_dir,
863            &target.collection.path,
864            &target.space,
865            &target.collection.name,
866        )?;
867        let extractor_registry = default_registry();
868        let mut touched_collection = false;
869        let mut failed_walk_prefixes = Vec::new();
870        let mut collection_walk_incomplete = false;
871
872        for entry in super::ignore_helpers::build_collection_walk(&target.collection.path) {
873            let entry = match entry {
874                Ok(item) => item,
875                Err(err) => {
876                    let error_scope = collect_walk_error_scope(&err);
877                    let error_path = error_scope.paths.first().cloned();
878                    if error_scope.collection_incomplete || error_scope.paths.is_empty() {
879                        collection_walk_incomplete = true;
880                    }
881                    for path in &error_scope.paths {
882                        let failed_path =
883                            match collection_relative_path(&target.collection.path, path) {
884                                Ok(relative) => {
885                                    if relative.is_empty() || relative == "." {
886                                        collection_walk_incomplete = true;
887                                    } else {
888                                        failed_walk_prefixes.push(relative.clone());
889                                    }
890                                    relative
891                                }
892                                Err(_) => {
893                                    collection_walk_incomplete = true;
894                                    path.display().to_string()
895                                }
896                            };
897                        failed_docs.insert(update_doc_key(
898                            &target.space,
899                            &target.collection.path,
900                            &failed_path,
901                        ));
902                    }
903                    report
904                        .errors
905                        .push(file_error(error_path, format!("walk error: {err}")));
906                    continue;
907                }
908            };
909
910            if !entry
911                .file_type()
912                .is_some_and(|file_type| file_type.is_file())
913            {
914                continue;
915            }
916
917            if is_hard_ignored_file(entry.path()) {
918                continue;
919            }
920
921            let relative_path =
922                match collection_relative_path(&target.collection.path, entry.path()) {
923                    Ok(path) => path,
924                    Err(err) => {
925                        failed_docs.insert(update_doc_key(
926                            &target.space,
927                            &target.collection.path,
928                            &entry.path().display().to_string(),
929                        ));
930                        report.errors.push(file_error(
931                            Some(entry.path().to_path_buf()),
932                            err.to_string(),
933                        ));
934                        continue;
935                    }
936                };
937
938            if !extension_allowed(entry.path(), extension_filter.as_ref()) {
939                push_update_decision(
940                    report,
941                    options,
942                    target,
943                    &relative_path,
944                    UpdateDecisionKind::Unsupported,
945                    Some("extension not allowed".to_string()),
946                );
947                continue;
948            }
949
950            if let Some(matcher) = ignore_matcher.as_ref() {
951                if matcher
952                    .matched(Path::new(&relative_path), false)
953                    .is_ignore()
954                {
955                    push_update_decision(
956                        report,
957                        options,
958                        target,
959                        &relative_path,
960                        UpdateDecisionKind::Ignored,
961                        Some("matched ignore patterns".to_string()),
962                    );
963                    continue;
964                }
965            }
966
967            let Some(extractor) = extractor_registry.resolve_for_path(entry.path()) else {
968                push_update_decision(
969                    report,
970                    options,
971                    target,
972                    &relative_path,
973                    UpdateDecisionKind::Unsupported,
974                    Some("no extractor available".to_string()),
975                );
976                continue;
977            };
978            let policy = resolve_policy(&self.config.chunking, Some(extractor.profile_key()), None);
979            let generation_key =
980                ingestion_generation_key(extractor.profile_key(), extractor.version(), &policy);
981
982            report.scanned_docs += 1;
983            crate::profile::increment_update_count("docs_scanned", 1);
984            seen_paths.insert(relative_path.clone());
985
986            let scan_started = Instant::now();
987            let metadata = match entry.metadata() {
988                Ok(data) => data,
989                Err(err) => {
990                    let detail = format!("metadata error: {err}");
991                    failed_docs.insert(update_doc_key(
992                        &target.space,
993                        &target.collection.path,
994                        &relative_path,
995                    ));
996                    push_update_decision(
997                        report,
998                        options,
999                        target,
1000                        &relative_path,
1001                        UpdateDecisionKind::ReadFailed,
1002                        Some(detail.clone()),
1003                    );
1004                    report
1005                        .errors
1006                        .push(file_error(Some(entry.path().to_path_buf()), detail));
1007                    crate::profile::record_update_stage(
1008                        "scan_metadata_mtime",
1009                        scan_started.elapsed(),
1010                    );
1011                    continue;
1012                }
1013            };
1014
1015            let modified = match modified_token(&metadata) {
1016                Ok(value) => value,
1017                Err(err) => {
1018                    let detail = format!("modified timestamp error: {err}");
1019                    failed_docs.insert(update_doc_key(
1020                        &target.space,
1021                        &target.collection.path,
1022                        &relative_path,
1023                    ));
1024                    push_update_decision(
1025                        report,
1026                        options,
1027                        target,
1028                        &relative_path,
1029                        UpdateDecisionKind::ReadFailed,
1030                        Some(detail.clone()),
1031                    );
1032                    report
1033                        .errors
1034                        .push(file_error(Some(entry.path().to_path_buf()), detail));
1035                    crate::profile::record_update_stage(
1036                        "scan_metadata_mtime",
1037                        scan_started.elapsed(),
1038                    );
1039                    continue;
1040                }
1041            };
1042
1043            if let Some(existing) = docs_by_path.get(&relative_path) {
1044                let has_current_canonical_text = document_has_current_canonical_text(
1045                    &document_text_generations,
1046                    existing.id,
1047                    generation_key.as_str(),
1048                );
1049                if existing.active && existing.modified == modified && has_current_canonical_text {
1050                    report.skipped_mtime_docs += 1;
1051                    push_update_decision(
1052                        report,
1053                        options,
1054                        target,
1055                        &relative_path,
1056                        UpdateDecisionKind::SkippedMtime,
1057                        None,
1058                    );
1059                    crate::profile::record_update_stage(
1060                        "scan_metadata_mtime",
1061                        scan_started.elapsed(),
1062                    );
1063                    continue;
1064                }
1065            }
1066            crate::profile::record_update_stage("scan_metadata_mtime", scan_started.elapsed());
1067
1068            let read_hash_started = Instant::now();
1069            let bytes = match std::fs::read(entry.path()) {
1070                Ok(data) => data,
1071                Err(err) => {
1072                    let detail = err.to_string();
1073                    failed_docs.insert(update_doc_key(
1074                        &target.space,
1075                        &target.collection.path,
1076                        &relative_path,
1077                    ));
1078                    push_update_decision(
1079                        report,
1080                        options,
1081                        target,
1082                        &relative_path,
1083                        UpdateDecisionKind::ReadFailed,
1084                        Some(detail.clone()),
1085                    );
1086                    report
1087                        .errors
1088                        .push(file_error(Some(entry.path().to_path_buf()), detail));
1089                    crate::profile::record_update_stage("read_hash", read_hash_started.elapsed());
1090                    continue;
1091                }
1092            };
1093            let hash = sha256_hex(&bytes);
1094            crate::profile::record_update_stage("read_hash", read_hash_started.elapsed());
1095            let mut title = file_title(entry.path());
1096            let mut title_source = DocumentTitleSource::FilenameFallback;
1097
1098            let existing = docs_by_path.get(&relative_path).cloned();
1099            let pending_decision;
1100            let pending_indexing;
1101            if let Some(doc) = existing.as_ref() {
1102                let has_current_canonical_text = document_has_current_canonical_text(
1103                    &document_text_generations,
1104                    doc.id,
1105                    generation_key.as_str(),
1106                );
1107                if doc.hash == hash && has_current_canonical_text {
1108                    if doc.active {
1109                        report.skipped_hash_docs += 1;
1110                        push_update_decision(
1111                            report,
1112                            options,
1113                            target,
1114                            &relative_path,
1115                            UpdateDecisionKind::SkippedHash,
1116                            None,
1117                        );
1118                    } else {
1119                        report.reactivated_docs += 1;
1120                        push_update_decision(
1121                            report,
1122                            options,
1123                            target,
1124                            &relative_path,
1125                            UpdateDecisionKind::Reactivated,
1126                            None,
1127                        );
1128                    }
1129
1130                    if !options.dry_run {
1131                        crate::profile::timed_update_stage("sqlite_refresh_document", || {
1132                            self.storage.refresh_document_activity(doc.id, &modified)
1133                        })?;
1134                    }
1135                    continue;
1136                }
1137
1138                pending_indexing = PendingDocumentIndexing::Updated {
1139                    reactivated: !doc.active,
1140                };
1141                pending_decision = (
1142                    UpdateDecisionKind::Changed,
1143                    (!doc.active).then_some("reactivated".to_string()),
1144                );
1145            } else {
1146                pending_indexing = PendingDocumentIndexing::Added;
1147                pending_decision = (UpdateDecisionKind::New, None);
1148            }
1149
1150            if options.dry_run {
1151                pending_indexing.record(report);
1152                let (kind, detail) = pending_decision;
1153                push_update_decision(report, options, target, &relative_path, kind, detail);
1154                continue;
1155            }
1156
1157            let extracted = match crate::profile::timed_update_stage("extract", || {
1158                extractor.extract(entry.path(), &bytes)
1159            }) {
1160                Ok(document) => document,
1161                Err(err) => {
1162                    let detail = format!("extract failed: {err}");
1163                    failed_docs.insert(update_doc_key(
1164                        &target.space,
1165                        &target.collection.path,
1166                        &relative_path,
1167                    ));
1168                    push_update_decision(
1169                        report,
1170                        options,
1171                        target,
1172                        &relative_path,
1173                        UpdateDecisionKind::ExtractFailed,
1174                        Some(detail.clone()),
1175                    );
1176                    report
1177                        .errors
1178                        .push(file_error(Some(entry.path().to_path_buf()), detail));
1179                    continue;
1180                }
1181            };
1182            if let Some(extracted_title) = extracted
1183                .title
1184                .as_deref()
1185                .map(str::trim)
1186                .filter(|title| !title.is_empty())
1187            {
1188                title = extracted_title.to_string();
1189                title_source = DocumentTitleSource::Extracted;
1190            }
1191
1192            let canonical = crate::profile::timed_update_stage("canonical_text", || {
1193                build_canonical_document(&extracted)
1194            });
1195            let text_hash = sha256_hex(canonical.text.as_bytes());
1196            let max_document_tokens = effective_chunk_hard_max(&policy);
1197            let chunk_started = Instant::now();
1198            let final_chunks_result = match self.embedding_document_sizer.as_ref() {
1199                Some(sizer) => {
1200                    let sizer_counter = EmbeddingDocumentSizerCounter {
1201                        inner: sizer.as_ref(),
1202                    };
1203                    chunk_canonical_document_with_counter(
1204                        &canonical.document,
1205                        &policy,
1206                        &sizer_counter,
1207                    )
1208                }
1209                None => Ok(chunk_canonical_document(&canonical.document, &policy)),
1210            };
1211            crate::profile::record_update_stage("chunk", chunk_started.elapsed());
1212            let final_chunks = match final_chunks_result {
1213                Ok(chunks) => chunks,
1214                Err(err) => {
1215                    let detail = format!("chunking failed: {err}");
1216                    failed_docs.insert(update_doc_key(
1217                        &target.space,
1218                        &target.collection.path,
1219                        &relative_path,
1220                    ));
1221                    push_update_decision(
1222                        report,
1223                        options,
1224                        target,
1225                        &relative_path,
1226                        UpdateDecisionKind::ExtractFailed,
1227                        Some(detail.clone()),
1228                    );
1229                    report
1230                        .errors
1231                        .push(file_error(Some(entry.path().to_path_buf()), detail));
1232                    continue;
1233                }
1234            };
1235            crate::profile::increment_update_count("chunks_created", final_chunks.len() as u64);
1236
1237            let doc_key = update_doc_key(&target.space, &target.collection.path, &relative_path);
1238            let chunk_inserts = final_chunks
1239                .iter()
1240                .enumerate()
1241                .map(|(index, chunk)| ChunkInsert {
1242                    seq: index as i32,
1243                    offset: chunk.offset,
1244                    length: chunk.length,
1245                    heading: chunk.heading.clone(),
1246                    kind: chunk.kind,
1247                    retrieval_prefix: chunk.retrieval_prefix.clone(),
1248                })
1249                .collect::<Vec<_>>();
1250            let mut prepared_embeddings = Vec::new();
1251            if !chunk_inserts.is_empty() && !options.no_embed && self.embedder.is_some() {
1252                let prepared = crate::profile::timed_update_stage("embedding_prepare_text", || {
1253                    prepare_chunk_embeddings(
1254                        &final_chunks,
1255                        &doc_key,
1256                        target,
1257                        entry.path(),
1258                        max_document_tokens,
1259                    )
1260                });
1261                if existing.is_some() {
1262                    let preflight =
1263                        self.preflight_prepared_embeddings(prepared, report, failed_docs)?;
1264                    prepared_embeddings = preflight.accepted;
1265                    if !preflight.rejected_chunk_indexes.is_empty() {
1266                        push_update_decision(
1267                            report,
1268                            options,
1269                            target,
1270                            &relative_path,
1271                            UpdateDecisionKind::ExtractFailed,
1272                            Some("embed preflight failed".to_string()),
1273                        );
1274                        continue;
1275                    }
1276                } else {
1277                    prepared_embeddings = prepared;
1278                }
1279            }
1280
1281            let replacement =
1282                crate::profile::timed_update_stage("sqlite_replace_document_generation", || {
1283                    self.storage
1284                        .replace_document_generation(DocumentGenerationReplace {
1285                            collection_id: target.collection.id,
1286                            path: &relative_path,
1287                            title: &title,
1288                            title_source,
1289                            hash: &hash,
1290                            modified: &modified,
1291                            extractor_key: extractor.profile_key(),
1292                            source_hash: &hash,
1293                            text_hash: &text_hash,
1294                            generation_key: &generation_key,
1295                            text: canonical.text.as_str(),
1296                            chunks: &chunk_inserts,
1297                        })
1298                })?;
1299            let doc_id = replacement.doc_id;
1300            let chunk_ids = replacement.chunk_ids;
1301
1302            if !replacement.old_chunk_ids.is_empty() {
1303                crate::profile::timed_update_stage("tantivy_delete", || {
1304                    self.storage
1305                        .delete_tantivy(&target.space, &replacement.old_chunk_ids)
1306                })?;
1307                crate::profile::timed_update_stage("usearch_delete", || {
1308                    self.storage
1309                        .delete_usearch_deferred(&target.space, &replacement.old_chunk_ids)
1310                })?;
1311            }
1312
1313            fts_dirty_by_space
1314                .entry(target.space.clone())
1315                .or_default()
1316                .insert(doc_id);
1317
1318            if !chunk_ids.is_empty() {
1319                if !options.no_embed && self.embedder.is_some() {
1320                    for prepared in prepared_embeddings {
1321                        pending_embeddings.push(PendingChunkEmbedding {
1322                            chunk_id: chunk_ids[prepared.chunk_index],
1323                            doc_key: prepared.doc_key,
1324                            space_name: prepared.space_name,
1325                            path: prepared.path,
1326                            text: prepared.text,
1327                            max_document_tokens: prepared.max_document_tokens,
1328                        });
1329                    }
1330                    if pending_embeddings.len() >= 64 {
1331                        self.flush_buffered_embeddings(
1332                            pending_embeddings,
1333                            failed_chunk_ids,
1334                            report,
1335                            failed_docs,
1336                        )?;
1337                    }
1338                }
1339
1340                let entries = chunk_ids
1341                    .iter()
1342                    .zip(final_chunks.iter())
1343                    .map(|(chunk_id, chunk)| TantivyEntry {
1344                        chunk_id: *chunk_id,
1345                        doc_id,
1346                        filepath: relative_path.clone(),
1347                        semantic_title: title_source
1348                            .semantic_title(title.as_str())
1349                            .map(ToString::to_string),
1350                        heading: chunk.heading.clone(),
1351                        body: retrieval_text_with_prefix(
1352                            chunk.retrieval_text().as_str(),
1353                            title_source.semantic_title(title.as_str()),
1354                            chunk.heading.as_deref(),
1355                            policy.contextual_prefix,
1356                        ),
1357                    })
1358                    .collect::<Vec<_>>();
1359                crate::profile::increment_update_count("tantivy_entries", entries.len() as u64);
1360                crate::profile::timed_update_stage("tantivy_add", || {
1361                    self.storage.index_tantivy(&target.space, &entries)
1362                })?;
1363            }
1364
1365            docs_by_path.insert(
1366                relative_path.clone(),
1367                crate::profile::timed_update_stage("sqlite_get_document_after_write", || {
1368                    self.storage
1369                        .get_document_by_path(target.collection.id, &relative_path)
1370                })?
1371                .ok_or_else(|| {
1372                    KboltError::Internal(format!(
1373                        "document missing after upsert: collection={}, path={relative_path}",
1374                        target.collection.id
1375                    ))
1376                })?,
1377            );
1378            pending_indexing.record(report);
1379            let (kind, detail) = pending_decision;
1380            push_update_decision(report, options, target, &relative_path, kind, detail);
1381            touched_collection = true;
1382        }
1383
1384        let mut missing_docs = docs_by_path
1385            .values()
1386            .filter(|doc| {
1387                doc.active
1388                    && !seen_paths.contains(&doc.path)
1389                    && !path_is_under_failed_walk(
1390                        doc.path.as_str(),
1391                        &failed_walk_prefixes,
1392                        collection_walk_incomplete,
1393                    )
1394            })
1395            .cloned()
1396            .collect::<Vec<_>>();
1397        missing_docs.sort_by(|left, right| left.path.cmp(&right.path));
1398
1399        for doc in missing_docs {
1400            if doc.active && !seen_paths.contains(&doc.path) {
1401                report.deactivated_docs += 1;
1402                push_update_decision(
1403                    report,
1404                    options,
1405                    target,
1406                    &doc.path,
1407                    UpdateDecisionKind::Deactivated,
1408                    None,
1409                );
1410                if !options.dry_run {
1411                    crate::profile::timed_update_stage("sqlite_deactivate_document", || {
1412                        self.storage.deactivate_document(doc.id)
1413                    })?;
1414                    touched_collection = true;
1415                }
1416            }
1417        }
1418
1419        if touched_collection && !options.dry_run {
1420            crate::profile::timed_update_stage("sqlite_update_collection_timestamp", || {
1421                self.storage
1422                    .update_collection_timestamp(target.collection.id)
1423            })?;
1424        }
1425
1426        Ok(())
1427    }
1428}
1429
/// What a single directory-walk error is known to affect.
#[derive(Debug, Default)]
struct WalkErrorScope {
    /// Paths named by the error (from `WithPath` wrappers and the child of a `Loop`).
    paths: Vec<std::path::PathBuf>,
    /// Set when the error cannot be confined to `paths`, so the walk of the whole
    /// collection must be treated as incomplete.
    collection_incomplete: bool,
}
1435
1436fn collect_walk_error_scope(err: &ignore::Error) -> WalkErrorScope {
1437    let mut scope = WalkErrorScope::default();
1438    collect_walk_error_scope_into(err, &mut scope, false);
1439    scope
1440}
1441
1442fn collect_walk_error_scope_into(
1443    err: &ignore::Error,
1444    scope: &mut WalkErrorScope,
1445    has_path_context: bool,
1446) {
1447    match err {
1448        ignore::Error::Partial(errors) => {
1449            if errors.is_empty() {
1450                scope.collection_incomplete = true;
1451            }
1452            for err in errors {
1453                collect_walk_error_scope_into(err, scope, has_path_context);
1454            }
1455        }
1456        ignore::Error::WithLineNumber { err, .. } | ignore::Error::WithDepth { err, .. } => {
1457            collect_walk_error_scope_into(err, scope, has_path_context);
1458        }
1459        ignore::Error::WithPath { path, err } => {
1460            scope.paths.push(path.clone());
1461            collect_walk_error_scope_into(err, scope, true);
1462        }
1463        ignore::Error::Loop { child, .. } => {
1464            scope.paths.push(child.clone());
1465        }
1466        ignore::Error::Io(_) => {
1467            if !has_path_context {
1468                scope.collection_incomplete = true;
1469            }
1470        }
1471        ignore::Error::Glob { .. }
1472        | ignore::Error::UnrecognizedFileType(_)
1473        | ignore::Error::InvalidDefinition => {
1474            scope.collection_incomplete = true;
1475        }
1476    }
1477}
1478
/// Reports whether `doc_path` may have been hidden from the walk by a failure.
///
/// Returns `true` when the whole collection walk was incomplete, or when any failed prefix
/// is empty, is `"."`, equals `doc_path`, or is an ancestor directory of `doc_path` (the
/// remainder after the prefix must start with `/`, so `a/b` does not cover `a/bc`).
fn path_is_under_failed_walk(
    doc_path: &str,
    failed_walk_prefixes: &[String],
    collection_walk_incomplete: bool,
) -> bool {
    if collection_walk_incomplete {
        return true;
    }

    for failed in failed_walk_prefixes {
        let failed = failed.as_str();
        if matches!(failed, "" | ".") || failed == doc_path {
            return true;
        }
        if let Some(remainder) = doc_path.strip_prefix(failed) {
            if remainder.starts_with('/') {
                return true;
            }
        }
    }

    false
}
1497
#[cfg(test)]
mod tests {
    use super::*;
    use std::io;
    use std::path::PathBuf;

    /// Builds a bare I/O walk error with the given kind and message.
    fn io_failure(kind: io::ErrorKind, message: &'static str) -> ignore::Error {
        ignore::Error::Io(io::Error::new(kind, message))
    }

    /// Wraps `inner` in a `WithPath` error for `path`.
    fn at_path(path: &str, inner: ignore::Error) -> ignore::Error {
        ignore::Error::WithPath {
            path: PathBuf::from(path),
            err: Box::new(inner),
        }
    }

    /// Every path named inside a `Partial` error is collected, in order, and path-scoped
    /// failures do not mark the collection incomplete.
    #[test]
    fn walk_error_scope_collects_every_partial_path() {
        let symlink_loop = ignore::Error::WithDepth {
            depth: 2,
            err: Box::new(ignore::Error::Loop {
                ancestor: PathBuf::from("/collection"),
                child: PathBuf::from("/collection/blocked-b"),
            }),
        };
        let err = ignore::Error::Partial(vec![
            at_path(
                "/collection/blocked-a",
                io_failure(io::ErrorKind::PermissionDenied, "blocked"),
            ),
            symlink_loop,
        ]);

        let scope = collect_walk_error_scope(&err);

        let expected_paths = vec![
            PathBuf::from("/collection/blocked-a"),
            PathBuf::from("/collection/blocked-b"),
        ];
        assert_eq!(scope.paths, expected_paths);
        assert!(!scope.collection_incomplete);
    }

    /// An I/O error with no enclosing path marks the collection incomplete, while sibling
    /// path-scoped errors are still collected.
    #[test]
    fn walk_error_scope_marks_pathless_errors_incomplete() {
        let err = ignore::Error::Partial(vec![
            at_path(
                "/collection/blocked",
                io_failure(io::ErrorKind::PermissionDenied, "blocked"),
            ),
            io_failure(io::ErrorKind::Other, "unknown"),
        ]);

        let scope = collect_walk_error_scope(&err);

        assert_eq!(scope.paths, vec![PathBuf::from("/collection/blocked")]);
        assert!(scope.collection_incomplete);
    }

    /// A glob error inside an ignore file records the file's path and also marks the
    /// collection incomplete.
    #[test]
    fn walk_error_scope_treats_ignore_rule_errors_as_incomplete() {
        let bad_glob = ignore::Error::WithLineNumber {
            line: 4,
            err: Box::new(ignore::Error::Glob {
                glob: Some("[".to_string()),
                err: "invalid glob".to_string(),
            }),
        };
        let err = at_path("/collection/.gitignore", bad_glob);

        let scope = collect_walk_error_scope(&err);

        assert_eq!(scope.paths, vec![PathBuf::from("/collection/.gitignore")]);
        assert!(scope.collection_incomplete);
    }
}
1573
/// A chunk's text staged for embedding, identified by its position in the
/// chunk list it was prepared from (see `prepare_chunk_embeddings`).
#[derive(Debug)]
struct PreparedChunkEmbedding {
    /// Zero-based position of the chunk within the prepared chunk slice.
    chunk_index: usize,
    /// Key of the document the chunk belongs to.
    doc_key: UpdateDocKey,
    /// Name of the space the update target belongs to.
    space_name: String,
    /// Path associated with the document.
    /// NOTE(review): relative vs. absolute is not visible here — confirm at the call site.
    path: std::path::PathBuf,
    /// Text to embed; `prepare_chunk_embeddings` substitutes a single space
    /// when the chunk's retrieval text is blank.
    text: String,
    /// Token limit carried alongside the text.
    /// NOTE(review): presumably the embedding model's per-document limit — confirm.
    max_document_tokens: usize,
}
1583
/// Outcome of partitioning prepared chunk embeddings into accepted and
/// rejected sets; the default value has both lists empty.
/// NOTE(review): the rejection criterion is not visible in this part of the
/// file — presumably a token-limit check; confirm against the producer.
#[derive(Debug, Default)]
struct PreparedEmbeddingPreflight {
    /// Prepared chunks that were accepted.
    accepted: Vec<PreparedChunkEmbedding>,
    /// Chunk indexes (cf. `PreparedChunkEmbedding::chunk_index`) that were rejected.
    rejected_chunk_indexes: Vec<usize>,
}
1589
/// A chunk awaiting embedding, identified by an `i64` chunk id rather than by
/// its position in a chunk list.
#[derive(Debug)]
struct PendingChunkEmbedding {
    /// Identifier of the chunk; reported in embedder error details.
    /// NOTE(review): presumably the storage row id of the chunk — confirm.
    chunk_id: i64,
    /// Key of the document the chunk belongs to.
    doc_key: UpdateDocKey,
    /// Name of the space the chunk belongs to.
    space_name: String,
    /// Path associated with the document.
    path: std::path::PathBuf,
    /// Text to embed.
    text: String,
    /// Token limit carried alongside the text.
    /// NOTE(review): presumably the embedding model's per-document limit — confirm.
    max_document_tokens: usize,
}
1599
/// Result of embedding a batch of pending chunks; the default value has both
/// lists empty.
#[derive(Default)]
struct EmbeddedPendingBatch {
    /// One `(i64, String, Vec<f32>)` entry per embedded chunk.
    /// NOTE(review): presumably `(chunk_id, space_name, vector)` — confirm
    /// against the code that fills this in.
    embeddings: Vec<(i64, String, Vec<f32>)>,
    /// Ids of chunks that failed.
    /// NOTE(review): presumably chunks the embedder could not process — confirm.
    failed_chunk_ids: Vec<i64>,
}
1605
/// Identifies a document during an update by space, collection path, and
/// document path. Derives `Eq` and `Hash`, so it can be used as a key in hashed
/// collections. Built by `update_doc_key`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct UpdateDocKey {
    /// Space name.
    space: String,
    /// Filesystem path of the collection.
    collection_path: std::path::PathBuf,
    /// Document path.
    /// NOTE(review): presumably relative to `collection_path` — confirm.
    path: String,
}
1612
1613struct EmbeddingDocumentSizerCounter<'a> {
1614    inner: &'a dyn crate::models::EmbeddingDocumentSizer,
1615}
1616
1617impl crate::ingest::chunk::TokenCounter for EmbeddingDocumentSizerCounter<'_> {
1618    fn count(&self, text: &str) -> Result<usize> {
1619        count_document_tokens_profiled(
1620            self.inner,
1621            text,
1622            "tokenize_chunking",
1623            "tokenize_chunking_calls",
1624        )
1625    }
1626}
1627
1628fn count_document_tokens_profiled(
1629    sizer: &dyn crate::models::EmbeddingDocumentSizer,
1630    text: &str,
1631    stage: &'static str,
1632    count: &'static str,
1633) -> Result<usize> {
1634    let started = Instant::now();
1635    let result = sizer.count_document_tokens(text);
1636    crate::profile::record_update_stage(stage, started.elapsed());
1637    crate::profile::increment_update_count(count, 1);
1638    result
1639}
1640
1641fn prepare_chunk_embeddings(
1642    final_chunks: &[crate::ingest::chunk::FinalChunk],
1643    doc_key: &UpdateDocKey,
1644    target: &UpdateTarget,
1645    path: &Path,
1646    max_document_tokens: usize,
1647) -> Vec<PreparedChunkEmbedding> {
1648    final_chunks
1649        .iter()
1650        .enumerate()
1651        .map(|(chunk_index, chunk)| {
1652            let mut text = chunk.retrieval_text();
1653            if text.trim().is_empty() {
1654                text = " ".to_string();
1655            }
1656            PreparedChunkEmbedding {
1657                chunk_index,
1658                doc_key: doc_key.clone(),
1659                space_name: target.space.clone(),
1660                path: path.to_path_buf(),
1661                text,
1662                max_document_tokens,
1663            }
1664        })
1665        .collect()
1666}
1667
1668#[derive(Debug, Clone)]
1669enum UpdateRepairScope {
1670    Global,
1671    Space { space_id: i64 },
1672    Collections { collection_ids: Vec<i64> },
1673}
1674
1675impl UpdateRepairScope {
1676    fn from_options_and_targets(options: &UpdateOptions, targets: &[UpdateTarget]) -> Self {
1677        if options.collections.is_empty() {
1678            if let Some(target) = targets.first() {
1679                if options.space.is_some() {
1680                    return Self::Space {
1681                        space_id: target.collection.space_id,
1682                    };
1683                }
1684            }
1685            return Self::Global;
1686        }
1687
1688        let mut collection_ids = targets
1689            .iter()
1690            .map(|target| target.collection.id)
1691            .collect::<Vec<_>>();
1692        collection_ids.sort_unstable();
1693        collection_ids.dedup();
1694        Self::Collections { collection_ids }
1695    }
1696
1697    fn allows_space_dense_repair(&self) -> bool {
1698        !matches!(self, Self::Collections { .. })
1699    }
1700}
1701
1702#[derive(Debug, Clone, Copy)]
1703enum PendingDocumentIndexing {
1704    Added,
1705    Updated { reactivated: bool },
1706}
1707
1708impl PendingDocumentIndexing {
1709    fn record(self, report: &mut UpdateReport) {
1710        match self {
1711            Self::Added => {
1712                report.added_docs += 1;
1713            }
1714            Self::Updated { reactivated } => {
1715                report.updated_docs += 1;
1716                if reactivated {
1717                    report.reactivated_docs += 1;
1718                }
1719            }
1720        }
1721    }
1722}
1723
/// Returns `true` when `document_text_generations` has an entry for `doc_id`
/// whose stored generation string equals `generation_key`; a missing entry
/// yields `false`.
fn document_has_current_canonical_text(
    document_text_generations: &HashMap<i64, String>,
    doc_id: i64,
    generation_key: &str,
) -> bool {
    matches!(
        document_text_generations.get(&doc_id),
        Some(stored) if stored.as_str() == generation_key
    )
}
1733
1734fn finish_update_with_usearch_flush(
1735    update_result: Result<UpdateReport>,
1736    flush_result: Result<()>,
1737) -> Result<UpdateReport> {
1738    match (update_result, flush_result) {
1739        (Ok(report), Ok(())) => Ok(report),
1740        (Err(err), Ok(())) => Err(err),
1741        (Ok(_), Err(err)) => Err(err),
1742        (Err(update_err), Err(flush_err)) => Err(KboltError::Internal(format!(
1743            "update failed: {update_err}; additionally failed to save dirty vector indexes: {flush_err}"
1744        ))
1745        .into()),
1746    }
1747}
1748
1749fn invalid_pending_embedding_batch_detail(
1750    pending: &[PendingChunkEmbedding],
1751    vectors: &[Vec<f32>],
1752) -> Option<String> {
1753    if vectors.len() != pending.len() {
1754        return Some(format!(
1755            "embedder returned {} vectors for {} chunks",
1756            vectors.len(),
1757            pending.len()
1758        ));
1759    }
1760
1761    if vectors.iter().any(|vector| vector.is_empty()) {
1762        if pending.len() == 1 {
1763            return Some(format!(
1764                "embedder returned an empty vector for chunk {}",
1765                pending[0].chunk_id
1766            ));
1767        }
1768        return Some("embedder returned an empty vector".to_string());
1769    }
1770
1771    None
1772}
1773
1774fn push_update_decision(
1775    report: &mut UpdateReport,
1776    options: &UpdateOptions,
1777    target: &UpdateTarget,
1778    path: &str,
1779    kind: UpdateDecisionKind,
1780    detail: Option<String>,
1781) {
1782    if !options.verbose {
1783        return;
1784    }
1785
1786    report.decisions.push(UpdateDecision {
1787        space: target.space.clone(),
1788        collection: target.collection.name.clone(),
1789        path: path.to_string(),
1790        kind,
1791        detail,
1792    });
1793}
1794
1795fn update_doc_key(space: &str, collection_path: &Path, path: &str) -> UpdateDocKey {
1796    UpdateDocKey {
1797        space: space.to_string(),
1798        collection_path: collection_path.to_path_buf(),
1799        path: path.to_string(),
1800    }
1801}
1802
1803fn effective_chunk_hard_max(policy: &crate::config::ChunkPolicy) -> usize {
1804    policy
1805        .hard_max_tokens
1806        .max(policy.soft_max_tokens)
1807        .max(policy.target_tokens)
1808        .max(1)
1809}
1810
1811fn ingestion_generation_key(
1812    extractor_key: &str,
1813    extractor_version: u32,
1814    policy: &crate::config::ChunkPolicy,
1815) -> String {
1816    format!(
1817        "canonical=v{CANONICAL_TEXT_GENERATION};chunker=v{CHUNKER_GENERATION};extractor={extractor_key}:v{extractor_version};chunk=target:{}:soft:{}:hard:{}:overlap:{}:prefix:{}",
1818        policy.target_tokens,
1819        policy.soft_max_tokens,
1820        policy.hard_max_tokens,
1821        policy.boundary_overlap_tokens,
1822        policy.contextual_prefix
1823    )
1824}