Skip to main content

kbolt_core/engine/
update_ops.rs

1use super::*;
2use kbolt_types::{UpdateDecision, UpdateDecisionKind};
3
4impl Engine {
5    pub fn update(&self, options: UpdateOptions) -> Result<UpdateReport> {
6        let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
7        self.update_unlocked(options)
8    }
9
10    pub(super) fn update_unlocked(&self, options: UpdateOptions) -> Result<UpdateReport> {
11        let started = Instant::now();
12        let mut report = UpdateReport {
13            scanned_docs: 0,
14            skipped_mtime_docs: 0,
15            skipped_hash_docs: 0,
16            added_docs: 0,
17            updated_docs: 0,
18            failed_docs: 0,
19            deactivated_docs: 0,
20            reactivated_docs: 0,
21            reaped_docs: 0,
22            embedded_chunks: 0,
23            decisions: Vec::new(),
24            errors: Vec::new(),
25            elapsed_ms: 0,
26        };
27        let mut failed_docs = HashSet::new();
28
29        let targets = self.resolve_targets(TargetScope {
30            space: options.space.as_deref(),
31            collections: &options.collections,
32        })?;
33        if targets.is_empty() {
34            report.elapsed_ms = started.elapsed().as_millis() as u64;
35            return Ok(report);
36        }
37        let repair_scope = UpdateRepairScope::from_options_and_targets(&options, &targets);
38
39        self.reconcile_dense_integrity(&targets, &repair_scope, &options)?;
40
41        if !options.dry_run {
42            self.replay_fts_dirty_documents(&repair_scope, &mut report, &mut failed_docs)?;
43        }
44
45        let mut fts_dirty_by_space: HashMap<String, HashSet<i64>> = HashMap::new();
46        let mut pending_embeddings = Vec::new();
47        let mut failed_embedding_chunk_ids = HashSet::new();
48        for target in &targets {
49            self.update_collection_target(
50                target,
51                &options,
52                &mut report,
53                &mut fts_dirty_by_space,
54                &mut pending_embeddings,
55                &mut failed_embedding_chunk_ids,
56                &mut failed_docs,
57            )?;
58        }
59
60        if !options.dry_run {
61            self.flush_buffered_embeddings(
62                &mut pending_embeddings,
63                &mut failed_embedding_chunk_ids,
64                &mut report,
65                &mut failed_docs,
66            )?;
67
68            for (space, doc_ids) in fts_dirty_by_space {
69                if doc_ids.is_empty() {
70                    continue;
71                }
72
73                self.storage.commit_tantivy(&space)?;
74                let mut ids = doc_ids.into_iter().collect::<Vec<_>>();
75                ids.sort_unstable();
76                self.storage.batch_clear_fts_dirty(&ids)?;
77            }
78
79            self.embed_pending_chunks(
80                &repair_scope,
81                &options,
82                &mut failed_embedding_chunk_ids,
83                &mut report,
84                &mut failed_docs,
85            )?;
86
87            let reaped =
88                self.list_reapable_documents_for_scope(self.config.reaping.days, &repair_scope)?;
89            let mut reaped_doc_ids = Vec::with_capacity(reaped.len());
90            let mut chunk_ids_by_space: HashMap<String, Vec<i64>> = HashMap::new();
91            for document in reaped {
92                reaped_doc_ids.push(document.doc_id);
93                if !document.chunk_ids.is_empty() {
94                    chunk_ids_by_space
95                        .entry(document.space_name)
96                        .or_default()
97                        .extend(document.chunk_ids);
98                }
99            }
100            for (space, chunk_ids) in chunk_ids_by_space {
101                self.purge_space_chunks(&space, &chunk_ids)?;
102            }
103            self.storage.delete_documents(&reaped_doc_ids)?;
104            report.reaped_docs = reaped_doc_ids.len();
105        }
106
107        report.failed_docs = failed_docs.len();
108        report.elapsed_ms = started.elapsed().as_millis() as u64;
109        Ok(report)
110    }
111
112    fn reconcile_dense_integrity(
113        &self,
114        targets: &[UpdateTarget],
115        repair_scope: &UpdateRepairScope,
116        options: &UpdateOptions,
117    ) -> Result<()> {
118        if options.no_embed || options.dry_run {
119            return Ok(());
120        }
121
122        let expected_model = self.embedding_model_key();
123        let mut visited_spaces = HashSet::new();
124        for target in targets {
125            if !visited_spaces.insert(target.collection.space_id) {
126                continue;
127            }
128
129            let models = self
130                .storage
131                .list_embedding_models_in_space(target.collection.space_id)?;
132            let mut reasons = Vec::new();
133            if models.iter().any(|model| model != expected_model) {
134                reasons.push(format!(
135                    "stored embedding models {:?} do not match current model '{}'",
136                    models, expected_model
137                ));
138            }
139
140            let sqlite_count = self
141                .storage
142                .count_embedded_chunks(Some(target.collection.space_id))?;
143            let usearch_count = self.storage.count_usearch(&target.space)?;
144            if sqlite_count != usearch_count {
145                reasons.push(format!(
146                    "sqlite embedded chunk count {sqlite_count} does not match USearch vector count {usearch_count}"
147                ));
148            }
149
150            if reasons.is_empty() {
151                continue;
152            }
153
154            if !repair_scope.allows_space_dense_repair() {
155                return Err(KboltError::SpaceDenseRepairRequired {
156                    space: target.space.clone(),
157                    reason: reasons.join("; "),
158                }
159                .into());
160            }
161
162            self.storage
163                .delete_embeddings_for_space(target.collection.space_id)?;
164            self.storage.clear_usearch(&target.space)?;
165        }
166
167        Ok(())
168    }
169
170    fn embed_pending_chunks(
171        &self,
172        repair_scope: &UpdateRepairScope,
173        options: &UpdateOptions,
174        failed_chunk_ids: &mut HashSet<i64>,
175        report: &mut UpdateReport,
176        failed_docs: &mut HashSet<UpdateDocKey>,
177    ) -> Result<()> {
178        if options.no_embed || options.dry_run {
179            return Ok(());
180        }
181
182        let Some(embedder) = self.embedder.as_ref() else {
183            return Ok(());
184        };
185
186        let model = self.embedding_model_key();
187        let mut after_chunk_id = 0_i64;
188        loop {
189            let backlog =
190                self.get_unembedded_chunks_for_scope(model, repair_scope, after_chunk_id, 64)?;
191            if backlog.is_empty() {
192                break;
193            }
194
195            after_chunk_id = backlog
196                .last()
197                .map(|record| record.chunk_id)
198                .expect("non-empty backlog should have a last chunk id");
199
200            let mut pending = Vec::new();
201            let mut bytes_by_path: HashMap<std::path::PathBuf, Option<Vec<u8>>> = HashMap::new();
202            for record in backlog {
203                if failed_chunk_ids.contains(&record.chunk_id) {
204                    continue;
205                }
206
207                let full_path = record.collection_path.join(&record.doc_path);
208                if !bytes_by_path.contains_key(&full_path) {
209                    let bytes = match std::fs::read(&full_path) {
210                        Ok(bytes) => Some(bytes),
211                        Err(err) if err.kind() == std::io::ErrorKind::NotFound => None,
212                        Err(err) => {
213                            report.errors.push(file_error(
214                                Some(full_path.clone()),
215                                format!("embed read failed: {err}"),
216                            ));
217                            failed_docs.insert(update_doc_key(
218                                &record.space_name,
219                                &record.collection_path,
220                                &record.doc_path,
221                            ));
222                            None
223                        }
224                    };
225                    bytes_by_path.insert(full_path.clone(), bytes);
226                }
227
228                let Some(bytes) = bytes_by_path
229                    .get(&full_path)
230                    .and_then(|bytes| bytes.as_deref())
231                else {
232                    continue;
233                };
234
235                let mut text = chunk_text_from_bytes(&bytes, record.offset, record.length);
236                if text.trim().is_empty() {
237                    text = " ".to_string();
238                }
239                let max_document_tokens = match self.chunk_policy_for_path(&full_path) {
240                    Ok(policy) => effective_chunk_hard_max(&policy),
241                    Err(err) => {
242                        report.errors.push(file_error(
243                            Some(full_path.clone()),
244                            format!("embed preflight policy resolution failed: {err}"),
245                        ));
246                        failed_docs.insert(update_doc_key(
247                            &record.space_name,
248                            &record.collection_path,
249                            &record.doc_path,
250                        ));
251                        continue;
252                    }
253                };
254
255                pending.push(PendingChunkEmbedding {
256                    chunk_id: record.chunk_id,
257                    doc_key: update_doc_key(
258                        &record.space_name,
259                        &record.collection_path,
260                        &record.doc_path,
261                    ),
262                    space_name: record.space_name,
263                    path: full_path,
264                    text,
265                    max_document_tokens,
266                });
267            }
268
269            if pending.is_empty() {
270                continue;
271            }
272
273            let result = self.embed_pending_batch_with_partial_failures(
274                embedder.as_ref(),
275                pending,
276                report,
277                failed_docs,
278            )?;
279            failed_chunk_ids.extend(result.failed_chunk_ids);
280            if !result.embeddings.is_empty() {
281                self.store_chunk_embeddings(model, result.embeddings, report)?;
282            }
283        }
284
285        Ok(())
286    }
287
288    fn flush_buffered_embeddings(
289        &self,
290        pending_embeddings: &mut Vec<PendingChunkEmbedding>,
291        failed_chunk_ids: &mut HashSet<i64>,
292        report: &mut UpdateReport,
293        failed_docs: &mut HashSet<UpdateDocKey>,
294    ) -> Result<()> {
295        if pending_embeddings.is_empty() {
296            return Ok(());
297        }
298
299        let Some(embedder) = self.embedder.as_ref() else {
300            pending_embeddings.clear();
301            return Ok(());
302        };
303
304        let pending = std::mem::take(pending_embeddings);
305        let result = self.embed_pending_batch_with_partial_failures(
306            embedder.as_ref(),
307            pending,
308            report,
309            failed_docs,
310        )?;
311        failed_chunk_ids.extend(result.failed_chunk_ids);
312        if result.embeddings.is_empty() {
313            return Ok(());
314        }
315
316        self.store_chunk_embeddings(self.embedding_model_key(), result.embeddings, report)
317    }
318
319    fn embed_pending_batch_with_partial_failures(
320        &self,
321        embedder: &dyn crate::models::Embedder,
322        pending: Vec<PendingChunkEmbedding>,
323        report: &mut UpdateReport,
324        failed_docs: &mut HashSet<UpdateDocKey>,
325    ) -> Result<EmbeddedPendingBatch> {
326        if pending.is_empty() {
327            return Ok(EmbeddedPendingBatch::default());
328        }
329
330        let mut failed_chunk_ids = Vec::new();
331        let pending =
332            self.preflight_pending_embeddings(pending, report, failed_docs, &mut failed_chunk_ids)?;
333        if pending.is_empty() {
334            return Ok(EmbeddedPendingBatch {
335                embeddings: Vec::new(),
336                failed_chunk_ids,
337            });
338        }
339
340        let texts = pending
341            .iter()
342            .map(|embedding| embedding.text.clone())
343            .collect::<Vec<_>>();
344        match embedder.embed_batch(crate::models::EmbeddingInputKind::Document, &texts) {
345            Ok(vectors) => {
346                if let Some(detail) = invalid_pending_embedding_batch_detail(&pending, &vectors) {
347                    return self.split_pending_embedding_batch(
348                        embedder,
349                        pending,
350                        report,
351                        failed_docs,
352                        detail,
353                    );
354                }
355
356                let embeddings = pending
357                    .into_iter()
358                    .zip(vectors)
359                    .map(|(embedding, vector)| (embedding.chunk_id, embedding.space_name, vector))
360                    .collect::<Vec<_>>();
361                Ok(EmbeddedPendingBatch {
362                    embeddings,
363                    failed_chunk_ids,
364                })
365            }
366            Err(err) => self
367                .split_pending_embedding_batch(
368                    embedder,
369                    pending,
370                    report,
371                    failed_docs,
372                    err.to_string(),
373                )
374                .map(|mut result| {
375                    result.failed_chunk_ids.extend(failed_chunk_ids);
376                    result
377                }),
378        }
379    }
380
381    fn split_pending_embedding_batch(
382        &self,
383        embedder: &dyn crate::models::Embedder,
384        pending: Vec<PendingChunkEmbedding>,
385        report: &mut UpdateReport,
386        failed_docs: &mut HashSet<UpdateDocKey>,
387        detail: String,
388    ) -> Result<EmbeddedPendingBatch> {
389        if pending.len() == 1 {
390            let embedding = pending
391                .into_iter()
392                .next()
393                .expect("single pending embedding should exist");
394            report.errors.push(file_error(
395                Some(embedding.path),
396                format!("embed failed: {detail}"),
397            ));
398            failed_docs.insert(embedding.doc_key);
399            return Ok(EmbeddedPendingBatch {
400                embeddings: Vec::new(),
401                failed_chunk_ids: vec![embedding.chunk_id],
402            });
403        }
404
405        let mid = pending.len() / 2;
406        let mut right = pending;
407        let left = right.drain(..mid).collect::<Vec<_>>();
408
409        let mut left_result =
410            self.embed_pending_batch_with_partial_failures(embedder, left, report, failed_docs)?;
411        let right_result =
412            self.embed_pending_batch_with_partial_failures(embedder, right, report, failed_docs)?;
413        left_result.embeddings.extend(right_result.embeddings);
414        left_result
415            .failed_chunk_ids
416            .extend(right_result.failed_chunk_ids);
417        Ok(left_result)
418    }
419
420    fn store_chunk_embeddings(
421        &self,
422        model: &str,
423        embeddings: Vec<(i64, String, Vec<f32>)>,
424        report: &mut UpdateReport,
425    ) -> Result<()> {
426        let mut grouped_vectors: HashMap<String, Vec<(i64, Vec<f32>)>> = HashMap::new();
427        let mut embedding_rows = Vec::with_capacity(embeddings.len());
428        for (chunk_id, space_name, vector) in embeddings {
429            if vector.is_empty() {
430                return Err(KboltError::Inference(format!(
431                    "embedder returned an empty vector for chunk {chunk_id}"
432                ))
433                .into());
434            }
435
436            grouped_vectors
437                .entry(space_name)
438                .or_default()
439                .push((chunk_id, vector));
440            embedding_rows.push((chunk_id, model));
441        }
442
443        for (space, vectors) in grouped_vectors {
444            let refs = vectors
445                .iter()
446                .map(|(chunk_id, vector)| (*chunk_id, vector.as_slice()))
447                .collect::<Vec<_>>();
448            self.storage.batch_insert_usearch(&space, &refs)?;
449        }
450
451        self.storage.insert_embeddings(&embedding_rows)?;
452        report.embedded_chunks = report.embedded_chunks.saturating_add(embedding_rows.len());
453        Ok(())
454    }
455
456    fn preflight_pending_embeddings(
457        &self,
458        pending: Vec<PendingChunkEmbedding>,
459        report: &mut UpdateReport,
460        failed_docs: &mut HashSet<UpdateDocKey>,
461        failed_chunk_ids: &mut Vec<i64>,
462    ) -> Result<Vec<PendingChunkEmbedding>> {
463        let Some(sizer) = self.embedding_document_sizer.as_ref() else {
464            return Ok(pending);
465        };
466
467        let mut accepted = Vec::with_capacity(pending.len());
468        for embedding in pending {
469            match sizer.count_document_tokens(&embedding.text) {
470                Ok(token_count) if token_count <= embedding.max_document_tokens => {
471                    accepted.push(embedding);
472                }
473                Ok(token_count) => {
474                    report.errors.push(file_error(
475                        Some(embedding.path.clone()),
476                        format!(
477                            "embed preflight failed: payload has {token_count} tokens, exceeding hard_max_tokens {}",
478                            embedding.max_document_tokens
479                        ),
480                    ));
481                    failed_docs.insert(embedding.doc_key);
482                    failed_chunk_ids.push(embedding.chunk_id);
483                }
484                Err(err) => {
485                    report.errors.push(file_error(
486                        Some(embedding.path.clone()),
487                        format!("embed preflight token count failed: {err}"),
488                    ));
489                    failed_docs.insert(embedding.doc_key);
490                    failed_chunk_ids.push(embedding.chunk_id);
491                }
492            }
493        }
494
495        Ok(accepted)
496    }
497
498    fn preflight_prepared_embeddings(
499        &self,
500        prepared: Vec<PreparedChunkEmbedding>,
501        report: &mut UpdateReport,
502        failed_docs: &mut HashSet<UpdateDocKey>,
503    ) -> Result<PreparedEmbeddingPreflight> {
504        let Some(sizer) = self.embedding_document_sizer.as_ref() else {
505            return Ok(PreparedEmbeddingPreflight {
506                accepted: prepared,
507                rejected_chunk_indexes: Vec::new(),
508            });
509        };
510
511        let mut preflight = PreparedEmbeddingPreflight {
512            accepted: Vec::with_capacity(prepared.len()),
513            rejected_chunk_indexes: Vec::new(),
514        };
515        for embedding in prepared {
516            match sizer.count_document_tokens(&embedding.text) {
517                Ok(token_count) if token_count <= embedding.max_document_tokens => {
518                    preflight.accepted.push(embedding);
519                }
520                Ok(token_count) => {
521                    report.errors.push(file_error(
522                        Some(embedding.path.clone()),
523                        format!(
524                            "embed preflight failed: payload has {token_count} tokens, exceeding hard_max_tokens {}",
525                            embedding.max_document_tokens
526                        ),
527                    ));
528                    failed_docs.insert(embedding.doc_key);
529                    preflight.rejected_chunk_indexes.push(embedding.chunk_index);
530                }
531                Err(err) => {
532                    report.errors.push(file_error(
533                        Some(embedding.path.clone()),
534                        format!("embed preflight token count failed: {err}"),
535                    ));
536                    failed_docs.insert(embedding.doc_key);
537                    preflight.rejected_chunk_indexes.push(embedding.chunk_index);
538                }
539            }
540        }
541
542        Ok(preflight)
543    }
544
545    fn chunk_policy_for_path(&self, path: &Path) -> Result<crate::config::ChunkPolicy> {
546        let extractor = default_registry().resolve_for_path(path).ok_or_else(|| {
547            KboltError::InvalidInput(format!(
548                "no extractor available for embedding preflight path {}",
549                path.display()
550            ))
551        })?;
552        Ok(resolve_policy(
553            &self.config.chunking,
554            Some(extractor.profile_key()),
555            None,
556        ))
557    }
558
559    fn replay_fts_dirty_documents(
560        &self,
561        repair_scope: &UpdateRepairScope,
562        report: &mut UpdateReport,
563        failed_docs: &mut HashSet<UpdateDocKey>,
564    ) -> Result<()> {
565        let records = self.get_fts_dirty_documents_for_scope(repair_scope)?;
566        if records.is_empty() {
567            return Ok(());
568        }
569
570        let mut cleared_by_space: HashMap<String, Vec<i64>> = HashMap::new();
571        for record in records {
572            let space_name = record.space_name;
573            let doc_id = record.doc_id;
574
575            if record.chunks.is_empty() {
576                cleared_by_space.entry(space_name).or_default().push(doc_id);
577                continue;
578            }
579
580            let full_path = record.collection_path.join(&record.doc_path);
581            let bytes = match std::fs::read(&full_path) {
582                Ok(bytes) => bytes,
583                Err(err) if err.kind() == std::io::ErrorKind::NotFound => continue,
584                Err(err) => {
585                    failed_docs.insert(update_doc_key(
586                        &space_name,
587                        &record.collection_path,
588                        &record.doc_path,
589                    ));
590                    report.errors.push(file_error(
591                        Some(full_path),
592                        format!("fts replay read failed: {err}"),
593                    ));
594                    continue;
595                }
596            };
597            if sha256_hex(&bytes) != record.doc_hash {
598                continue;
599            }
600
601            self.storage.delete_tantivy_by_doc(&space_name, doc_id)?;
602
603            let file_body = String::from_utf8_lossy(&bytes).into_owned();
604            let entries = record
605                .chunks
606                .iter()
607                .map(|chunk| {
608                    let chunk_body = chunk_text_from_bytes(&bytes, chunk.offset, chunk.length);
609                    let source_body = if chunk_body.is_empty() {
610                        file_body.as_str()
611                    } else {
612                        chunk_body.as_str()
613                    };
614                    TantivyEntry {
615                        chunk_id: chunk.id,
616                        doc_id,
617                        filepath: record.doc_path.clone(),
618                        semantic_title: record
619                            .doc_title_source
620                            .semantic_title(record.doc_title.as_str())
621                            .map(ToString::to_string),
622                        heading: chunk.heading.clone(),
623                        body: retrieval_text_with_prefix(
624                            source_body,
625                            record
626                                .doc_title_source
627                                .semantic_title(record.doc_title.as_str()),
628                            chunk.heading.as_deref(),
629                            self.config.chunking.defaults.contextual_prefix,
630                        ),
631                    }
632                })
633                .collect::<Vec<_>>();
634
635            self.storage.index_tantivy(&space_name, &entries)?;
636            cleared_by_space.entry(space_name).or_default().push(doc_id);
637        }
638
639        for (space_name, mut doc_ids) in cleared_by_space {
640            if doc_ids.is_empty() {
641                continue;
642            }
643
644            doc_ids.sort_unstable();
645            doc_ids.dedup();
646            self.storage.commit_tantivy(&space_name)?;
647            self.storage.batch_clear_fts_dirty(&doc_ids)?;
648        }
649
650        Ok(())
651    }
652
653    fn get_fts_dirty_documents_for_scope(
654        &self,
655        repair_scope: &UpdateRepairScope,
656    ) -> Result<Vec<crate::storage::FtsDirtyRecord>> {
657        match repair_scope {
658            UpdateRepairScope::Global => self.storage.get_fts_dirty_documents(),
659            UpdateRepairScope::Space { space_id } => {
660                self.storage.get_fts_dirty_documents_in_space(*space_id)
661            }
662            UpdateRepairScope::Collections { collection_ids } => self
663                .storage
664                .get_fts_dirty_documents_in_collections(collection_ids),
665        }
666    }
667
668    fn get_unembedded_chunks_for_scope(
669        &self,
670        model: &str,
671        repair_scope: &UpdateRepairScope,
672        after_chunk_id: i64,
673        limit: usize,
674    ) -> Result<Vec<crate::storage::EmbedRecord>> {
675        match repair_scope {
676            UpdateRepairScope::Global => {
677                self.storage
678                    .get_unembedded_chunks(model, after_chunk_id, limit)
679            }
680            UpdateRepairScope::Space { space_id } => {
681                self.storage
682                    .get_unembedded_chunks_in_space(model, *space_id, after_chunk_id, limit)
683            }
684            UpdateRepairScope::Collections { collection_ids } => self
685                .storage
686                .get_unembedded_chunks_in_collections(model, collection_ids, after_chunk_id, limit),
687        }
688    }
689
690    fn list_reapable_documents_for_scope(
691        &self,
692        older_than_days: u32,
693        repair_scope: &UpdateRepairScope,
694    ) -> Result<Vec<crate::storage::ReapableDocument>> {
695        match repair_scope {
696            UpdateRepairScope::Global => self.storage.list_reapable_documents(older_than_days),
697            UpdateRepairScope::Space { space_id } => self
698                .storage
699                .list_reapable_documents_in_space(older_than_days, *space_id),
700            UpdateRepairScope::Collections { collection_ids } => self
701                .storage
702                .list_reapable_documents_in_collections(older_than_days, collection_ids),
703        }
704    }
705
706    pub fn resolve_update_targets(&self, options: &UpdateOptions) -> Result<Vec<UpdateTarget>> {
707        self.resolve_targets(TargetScope {
708            space: options.space.as_deref(),
709            collections: &options.collections,
710        })
711    }
712
713    pub(super) fn resolve_targets(&self, scope: TargetScope<'_>) -> Result<Vec<UpdateTarget>> {
714        let mut targets = Vec::new();
715
716        if scope.collections.is_empty() {
717            return self.resolve_update_targets_for_all_collections(scope.space);
718        }
719
720        let mut seen = std::collections::HashSet::new();
721        for raw_collection_name in scope.collections {
722            let collection_name = raw_collection_name.trim();
723            if collection_name.is_empty() {
724                return Err(KboltError::InvalidInput(
725                    "collection names cannot be empty".to_string(),
726                )
727                .into());
728            }
729
730            let resolved_space = self.resolve_space_row(scope.space, Some(collection_name))?;
731            let collection = self
732                .storage
733                .get_collection(resolved_space.id, collection_name)?;
734
735            if seen.insert((collection.space_id, collection.name.clone())) {
736                targets.push(UpdateTarget {
737                    space: resolved_space.name,
738                    collection,
739                });
740            }
741        }
742
743        Ok(targets)
744    }
745
746    fn resolve_update_targets_for_all_collections(
747        &self,
748        space: Option<&str>,
749    ) -> Result<Vec<UpdateTarget>> {
750        let (space_id_filter, spaces_by_id) = if let Some(space_name) = space {
751            let resolved = self.resolve_space_row(Some(space_name), None)?;
752            let mut map = std::collections::HashMap::new();
753            map.insert(resolved.id, resolved.name.clone());
754            (Some(resolved.id), map)
755        } else {
756            let spaces = self.storage.list_spaces()?;
757            let map = spaces
758                .into_iter()
759                .map(|space| (space.id, space.name))
760                .collect::<std::collections::HashMap<_, _>>();
761            (None, map)
762        };
763
764        let collections = self.storage.list_collections(space_id_filter)?;
765        let mut targets = Vec::with_capacity(collections.len());
766        for collection in collections {
767            let space_name = spaces_by_id
768                .get(&collection.space_id)
769                .ok_or_else(|| {
770                    KboltError::Internal(format!(
771                        "missing space mapping for collection '{}'",
772                        collection.name
773                    ))
774                })?
775                .clone();
776            targets.push(UpdateTarget {
777                space: space_name,
778                collection,
779            });
780        }
781
782        Ok(targets)
783    }
784
785    fn update_collection_target(
786        &self,
787        target: &UpdateTarget,
788        options: &UpdateOptions,
789        report: &mut UpdateReport,
790        fts_dirty_by_space: &mut HashMap<String, HashSet<i64>>,
791        pending_embeddings: &mut Vec<PendingChunkEmbedding>,
792        failed_chunk_ids: &mut HashSet<i64>,
793        failed_docs: &mut HashSet<UpdateDocKey>,
794    ) -> Result<()> {
795        let all_documents = self.storage.list_documents(target.collection.id, false)?;
796        let mut docs_by_path: HashMap<String, DocumentRow> = all_documents
797            .into_iter()
798            .map(|doc| (doc.path.clone(), doc))
799            .collect();
800        let mut seen_paths = HashSet::new();
801        let extension_filter = normalized_extension_filter(target.collection.extensions.as_deref());
802        let ignore_matcher = load_collection_ignore_matcher(
803            &self.config.config_dir,
804            &target.collection.path,
805            &target.space,
806            &target.collection.name,
807        )?;
808        let extractor_registry = default_registry();
809        let mut touched_collection = false;
810
811        for entry in WalkDir::new(&target.collection.path)
812            .follow_links(false)
813            .into_iter()
814            .filter_entry(|entry| {
815                !entry.file_type().is_dir() || !is_hard_ignored_dir_name(entry.file_name())
816            })
817        {
818            let entry = match entry {
819                Ok(item) => item,
820                Err(err) => {
821                    if let Some(path) = err.path() {
822                        if let Ok(relative_path) =
823                            collection_relative_path(&target.collection.path, path)
824                        {
825                            failed_docs.insert(update_doc_key(
826                                &target.space,
827                                &target.collection.path,
828                                &relative_path,
829                            ));
830                        }
831                    }
832                    report.errors.push(file_error(
833                        err.path().map(Path::to_path_buf),
834                        format!("walkdir error: {err}"),
835                    ));
836                    continue;
837                }
838            };
839
840            if !entry.file_type().is_file() {
841                continue;
842            }
843
844            if is_hard_ignored_file(entry.path()) {
845                continue;
846            }
847
848            let relative_path =
849                match collection_relative_path(&target.collection.path, entry.path()) {
850                    Ok(path) => path,
851                    Err(err) => {
852                        failed_docs.insert(update_doc_key(
853                            &target.space,
854                            &target.collection.path,
855                            &entry.path().display().to_string(),
856                        ));
857                        report.errors.push(file_error(
858                            Some(entry.path().to_path_buf()),
859                            err.to_string(),
860                        ));
861                        continue;
862                    }
863                };
864
865            if !extension_allowed(entry.path(), extension_filter.as_ref()) {
866                push_update_decision(
867                    report,
868                    options,
869                    target,
870                    &relative_path,
871                    UpdateDecisionKind::Unsupported,
872                    Some("extension not allowed".to_string()),
873                );
874                continue;
875            }
876
877            if let Some(matcher) = ignore_matcher.as_ref() {
878                if matcher
879                    .matched(Path::new(&relative_path), false)
880                    .is_ignore()
881                {
882                    push_update_decision(
883                        report,
884                        options,
885                        target,
886                        &relative_path,
887                        UpdateDecisionKind::Ignored,
888                        Some("matched ignore patterns".to_string()),
889                    );
890                    continue;
891                }
892            }
893
894            let Some(extractor) = extractor_registry.resolve_for_path(entry.path()) else {
895                push_update_decision(
896                    report,
897                    options,
898                    target,
899                    &relative_path,
900                    UpdateDecisionKind::Unsupported,
901                    Some("no extractor available".to_string()),
902                );
903                continue;
904            };
905
906            report.scanned_docs += 1;
907            seen_paths.insert(relative_path.clone());
908
909            let metadata = match entry.metadata() {
910                Ok(data) => data,
911                Err(err) => {
912                    let detail = format!("metadata error: {err}");
913                    failed_docs.insert(update_doc_key(
914                        &target.space,
915                        &target.collection.path,
916                        &relative_path,
917                    ));
918                    push_update_decision(
919                        report,
920                        options,
921                        target,
922                        &relative_path,
923                        UpdateDecisionKind::ReadFailed,
924                        Some(detail.clone()),
925                    );
926                    report
927                        .errors
928                        .push(file_error(Some(entry.path().to_path_buf()), detail));
929                    continue;
930                }
931            };
932
933            let modified = match modified_token(&metadata) {
934                Ok(value) => value,
935                Err(err) => {
936                    let detail = format!("modified timestamp error: {err}");
937                    failed_docs.insert(update_doc_key(
938                        &target.space,
939                        &target.collection.path,
940                        &relative_path,
941                    ));
942                    push_update_decision(
943                        report,
944                        options,
945                        target,
946                        &relative_path,
947                        UpdateDecisionKind::ReadFailed,
948                        Some(detail.clone()),
949                    );
950                    report
951                        .errors
952                        .push(file_error(Some(entry.path().to_path_buf()), detail));
953                    continue;
954                }
955            };
956
957            if let Some(existing) = docs_by_path.get(&relative_path) {
958                if existing.active && existing.modified == modified {
959                    report.skipped_mtime_docs += 1;
960                    push_update_decision(
961                        report,
962                        options,
963                        target,
964                        &relative_path,
965                        UpdateDecisionKind::SkippedMtime,
966                        None,
967                    );
968                    continue;
969                }
970            }
971
972            let bytes = match std::fs::read(entry.path()) {
973                Ok(data) => data,
974                Err(err) => {
975                    let detail = err.to_string();
976                    failed_docs.insert(update_doc_key(
977                        &target.space,
978                        &target.collection.path,
979                        &relative_path,
980                    ));
981                    push_update_decision(
982                        report,
983                        options,
984                        target,
985                        &relative_path,
986                        UpdateDecisionKind::ReadFailed,
987                        Some(detail.clone()),
988                    );
989                    report
990                        .errors
991                        .push(file_error(Some(entry.path().to_path_buf()), detail));
992                    continue;
993                }
994            };
995            let hash = sha256_hex(&bytes);
996            let mut title = file_title(entry.path());
997            let mut title_source = DocumentTitleSource::FilenameFallback;
998
999            let existing = docs_by_path.get(&relative_path).cloned();
1000            let pending_decision;
1001            let pending_indexing;
1002            if let Some(doc) = existing.as_ref() {
1003                if doc.hash == hash {
1004                    if doc.active {
1005                        report.skipped_hash_docs += 1;
1006                        push_update_decision(
1007                            report,
1008                            options,
1009                            target,
1010                            &relative_path,
1011                            UpdateDecisionKind::SkippedHash,
1012                            None,
1013                        );
1014                    } else {
1015                        report.reactivated_docs += 1;
1016                        push_update_decision(
1017                            report,
1018                            options,
1019                            target,
1020                            &relative_path,
1021                            UpdateDecisionKind::Reactivated,
1022                            None,
1023                        );
1024                    }
1025
1026                    if !options.dry_run {
1027                        self.storage.refresh_document_activity(doc.id, &modified)?;
1028                    }
1029                    continue;
1030                }
1031
1032                pending_indexing = PendingDocumentIndexing::Updated {
1033                    reactivated: !doc.active,
1034                };
1035                pending_decision = (
1036                    UpdateDecisionKind::Changed,
1037                    (!doc.active).then_some("reactivated".to_string()),
1038                );
1039            } else {
1040                pending_indexing = PendingDocumentIndexing::Added;
1041                pending_decision = (UpdateDecisionKind::New, None);
1042            }
1043
1044            if options.dry_run {
1045                pending_indexing.record(report);
1046                let (kind, detail) = pending_decision;
1047                push_update_decision(report, options, target, &relative_path, kind, detail);
1048                continue;
1049            }
1050
1051            let extracted = match extractor.extract(entry.path(), &bytes) {
1052                Ok(document) => document,
1053                Err(err) => {
1054                    let detail = format!("extract failed: {err}");
1055                    failed_docs.insert(update_doc_key(
1056                        &target.space,
1057                        &target.collection.path,
1058                        &relative_path,
1059                    ));
1060                    push_update_decision(
1061                        report,
1062                        options,
1063                        target,
1064                        &relative_path,
1065                        UpdateDecisionKind::ExtractFailed,
1066                        Some(detail.clone()),
1067                    );
1068                    report
1069                        .errors
1070                        .push(file_error(Some(entry.path().to_path_buf()), detail));
1071                    continue;
1072                }
1073            };
1074            if let Some(extracted_title) = extracted
1075                .title
1076                .as_deref()
1077                .map(str::trim)
1078                .filter(|title| !title.is_empty())
1079            {
1080                title = extracted_title.to_string();
1081                title_source = DocumentTitleSource::Extracted;
1082            }
1083
1084            let policy = resolve_policy(&self.config.chunking, Some(extractor.profile_key()), None);
1085            let max_document_tokens = effective_chunk_hard_max(&policy);
1086            let final_chunks = match self.embedding_document_sizer.as_ref() {
1087                Some(sizer) => {
1088                    let sizer_counter = EmbeddingDocumentSizerCounter(sizer.as_ref());
1089                    match chunk_document_with_counter(&extracted, &policy, &sizer_counter) {
1090                        Ok(chunks) => chunks,
1091                        Err(err) => {
1092                            let detail = format!("chunking failed: {err}");
1093                            failed_docs.insert(update_doc_key(
1094                                &target.space,
1095                                &target.collection.path,
1096                                &relative_path,
1097                            ));
1098                            push_update_decision(
1099                                report,
1100                                options,
1101                                target,
1102                                &relative_path,
1103                                UpdateDecisionKind::ExtractFailed,
1104                                Some(detail.clone()),
1105                            );
1106                            report
1107                                .errors
1108                                .push(file_error(Some(entry.path().to_path_buf()), detail));
1109                            continue;
1110                        }
1111                    }
1112                }
1113                None => chunk_document(&extracted, &policy),
1114            };
1115
1116            let doc_key = update_doc_key(&target.space, &target.collection.path, &relative_path);
1117            let chunk_inserts = final_chunks
1118                .iter()
1119                .enumerate()
1120                .map(|(index, chunk)| ChunkInsert {
1121                    seq: index as i32,
1122                    offset: chunk.offset,
1123                    length: chunk.length,
1124                    heading: chunk.heading.clone(),
1125                    kind: chunk.kind,
1126                })
1127                .collect::<Vec<_>>();
1128            let body = String::from_utf8_lossy(&bytes).into_owned();
1129            let mut prepared_embeddings = Vec::new();
1130            let mut rejected_chunk_indexes = Vec::new();
1131            if !chunk_inserts.is_empty() && !options.no_embed && self.embedder.is_some() {
1132                let preflight = self.preflight_prepared_embeddings(
1133                    prepare_chunk_embeddings(
1134                        &final_chunks,
1135                        &bytes,
1136                        &doc_key,
1137                        target,
1138                        entry.path(),
1139                        max_document_tokens,
1140                    ),
1141                    report,
1142                    failed_docs,
1143                )?;
1144                prepared_embeddings = preflight.accepted;
1145                rejected_chunk_indexes = preflight.rejected_chunk_indexes;
1146                if existing.is_some() && !rejected_chunk_indexes.is_empty() {
1147                    push_update_decision(
1148                        report,
1149                        options,
1150                        target,
1151                        &relative_path,
1152                        UpdateDecisionKind::ExtractFailed,
1153                        Some("embed preflight failed".to_string()),
1154                    );
1155                    continue;
1156                }
1157            }
1158
1159            let doc_id = self.storage.upsert_document(
1160                target.collection.id,
1161                &relative_path,
1162                &title,
1163                title_source,
1164                &hash,
1165                &modified,
1166            )?;
1167
1168            if let Some(doc) = existing.as_ref() {
1169                let old_chunk_ids = self.storage.delete_chunks_for_document(doc.id)?;
1170                if !old_chunk_ids.is_empty() {
1171                    self.storage.delete_tantivy(&target.space, &old_chunk_ids)?;
1172                    self.storage.delete_usearch(&target.space, &old_chunk_ids)?;
1173                }
1174            }
1175
1176            let chunk_ids = self.storage.insert_chunks(doc_id, &chunk_inserts)?;
1177            for chunk_index in rejected_chunk_indexes {
1178                if let Some(chunk_id) = chunk_ids.get(chunk_index) {
1179                    failed_chunk_ids.insert(*chunk_id);
1180                }
1181            }
1182
1183            if !chunk_ids.is_empty() {
1184                if !options.no_embed && self.embedder.is_some() {
1185                    for prepared in prepared_embeddings {
1186                        pending_embeddings.push(PendingChunkEmbedding {
1187                            chunk_id: chunk_ids[prepared.chunk_index],
1188                            doc_key: prepared.doc_key,
1189                            space_name: prepared.space_name,
1190                            path: prepared.path,
1191                            text: prepared.text,
1192                            max_document_tokens: prepared.max_document_tokens,
1193                        });
1194                    }
1195                    if pending_embeddings.len() >= 64 {
1196                        self.flush_buffered_embeddings(
1197                            pending_embeddings,
1198                            failed_chunk_ids,
1199                            report,
1200                            failed_docs,
1201                        )?;
1202                    }
1203                }
1204
1205                let entries = chunk_ids
1206                    .iter()
1207                    .zip(final_chunks.iter())
1208                    .map(|(chunk_id, chunk)| TantivyEntry {
1209                        chunk_id: *chunk_id,
1210                        doc_id,
1211                        filepath: relative_path.clone(),
1212                        semantic_title: title_source
1213                            .semantic_title(title.as_str())
1214                            .map(ToString::to_string),
1215                        heading: chunk.heading.clone(),
1216                        body: retrieval_text_with_prefix(
1217                            if chunk.text.is_empty() {
1218                                body.as_str()
1219                            } else {
1220                                chunk.text.as_str()
1221                            },
1222                            title_source.semantic_title(title.as_str()),
1223                            chunk.heading.as_deref(),
1224                            policy.contextual_prefix,
1225                        ),
1226                    })
1227                    .collect::<Vec<_>>();
1228                self.storage.index_tantivy(&target.space, &entries)?;
1229                fts_dirty_by_space
1230                    .entry(target.space.clone())
1231                    .or_default()
1232                    .insert(doc_id);
1233            }
1234
1235            docs_by_path.insert(
1236                relative_path.clone(),
1237                self.storage
1238                    .get_document_by_path(target.collection.id, &relative_path)?
1239                    .ok_or_else(|| {
1240                        KboltError::Internal(format!(
1241                            "document missing after upsert: collection={}, path={relative_path}",
1242                            target.collection.id
1243                        ))
1244                    })?,
1245            );
1246            pending_indexing.record(report);
1247            let (kind, detail) = pending_decision;
1248            push_update_decision(report, options, target, &relative_path, kind, detail);
1249            touched_collection = true;
1250        }
1251
1252        let mut missing_docs = docs_by_path
1253            .values()
1254            .filter(|doc| doc.active && !seen_paths.contains(&doc.path))
1255            .cloned()
1256            .collect::<Vec<_>>();
1257        missing_docs.sort_by(|left, right| left.path.cmp(&right.path));
1258
1259        for doc in missing_docs {
1260            if doc.active && !seen_paths.contains(&doc.path) {
1261                report.deactivated_docs += 1;
1262                push_update_decision(
1263                    report,
1264                    options,
1265                    target,
1266                    &doc.path,
1267                    UpdateDecisionKind::Deactivated,
1268                    None,
1269                );
1270                if !options.dry_run {
1271                    self.storage.deactivate_document(doc.id)?;
1272                    touched_collection = true;
1273                }
1274            }
1275        }
1276
1277        if touched_collection && !options.dry_run {
1278            self.storage
1279                .update_collection_timestamp(target.collection.id)?;
1280        }
1281
1282        Ok(())
1283    }
1284}
1285
/// Embedding input for a single chunk.
///
/// Built by `prepare_chunk_embeddings` before the chunk rows exist in storage, so the
/// chunk is identified by its position rather than by a storage id.
#[derive(Debug)]
struct PreparedChunkEmbedding {
    /// Position of the chunk in the document's chunk list; later used to index the ids
    /// returned by `insert_chunks`.
    chunk_index: usize,
    /// Key of the document the chunk belongs to.
    doc_key: UpdateDocKey,
    /// Name of the space the document's collection lives in.
    space_name: String,
    /// Filesystem path of the source file.
    path: std::path::PathBuf,
    /// Text to embed; a single space when the chunk's text is blank.
    text: String,
    /// Token budget computed by `effective_chunk_hard_max` for the chunking policy in use.
    max_document_tokens: usize,
}
1295
/// Outcome of `preflight_prepared_embeddings` for one document's chunks.
#[derive(Debug, Default)]
struct PreparedEmbeddingPreflight {
    /// Embedding inputs that passed the preflight and may be queued for embedding.
    accepted: Vec<PreparedChunkEmbedding>,
    /// Positions (in the document's chunk list) of chunks that were rejected.
    rejected_chunk_indexes: Vec<usize>,
}
1301
/// A chunk embedding waiting in the update's buffer.
///
/// Created from a `PreparedChunkEmbedding` once the chunk's storage id is known. Entries are
/// pushed onto `pending_embeddings` until `flush_buffered_embeddings` is called.
#[derive(Debug)]
struct PendingChunkEmbedding {
    /// Storage id of the chunk row.
    chunk_id: i64,
    /// Key of the document the chunk belongs to.
    doc_key: UpdateDocKey,
    /// Name of the space the document's collection lives in.
    space_name: String,
    /// Filesystem path of the source file.
    path: std::path::PathBuf,
    /// Text to embed.
    text: String,
    /// Token budget carried over from the prepared embedding.
    max_document_tokens: usize,
}
1311
/// Result of embedding a batch of pending chunks.
#[derive(Default)]
struct EmbeddedPendingBatch {
    /// NOTE(review): presumably `(chunk_id, space_name, vector)` for each successfully
    /// embedded chunk — confirm against the code that fills this.
    embeddings: Vec<(i64, String, Vec<f32>)>,
    /// NOTE(review): presumably ids of chunks whose embedding failed — confirm.
    failed_chunk_ids: Vec<i64>,
}
1317
/// Identifies one document file within an update run; used as the element type of the
/// `failed_docs` set. Built by `update_doc_key`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct UpdateDocKey {
    /// Space name.
    space: String,
    /// Root path of the collection.
    collection_path: std::path::PathBuf,
    /// Collection-relative path. When the entry could not be made relative, this is the
    /// entry's displayed full path instead.
    path: String,
}
1324
1325struct EmbeddingDocumentSizerCounter<'a>(&'a dyn crate::models::EmbeddingDocumentSizer);
1326
1327impl crate::ingest::chunk::TokenCounter for EmbeddingDocumentSizerCounter<'_> {
1328    fn count(&self, text: &str) -> Result<usize> {
1329        self.0.count_document_tokens(text)
1330    }
1331}
1332
1333fn prepare_chunk_embeddings(
1334    final_chunks: &[crate::ingest::chunk::FinalChunk],
1335    bytes: &[u8],
1336    doc_key: &UpdateDocKey,
1337    target: &UpdateTarget,
1338    path: &Path,
1339    max_document_tokens: usize,
1340) -> Vec<PreparedChunkEmbedding> {
1341    final_chunks
1342        .iter()
1343        .enumerate()
1344        .map(|(chunk_index, chunk)| {
1345            let mut text = chunk_text_from_bytes(bytes, chunk.offset, chunk.length);
1346            if text.trim().is_empty() {
1347                text = " ".to_string();
1348            }
1349            PreparedChunkEmbedding {
1350                chunk_index,
1351                doc_key: doc_key.clone(),
1352                space_name: target.space.clone(),
1353                path: path.to_path_buf(),
1354                text,
1355                max_document_tokens,
1356            }
1357        })
1358        .collect()
1359}
1360
1361#[derive(Debug, Clone)]
1362enum UpdateRepairScope {
1363    Global,
1364    Space { space_id: i64 },
1365    Collections { collection_ids: Vec<i64> },
1366}
1367
1368impl UpdateRepairScope {
1369    fn from_options_and_targets(options: &UpdateOptions, targets: &[UpdateTarget]) -> Self {
1370        if options.collections.is_empty() {
1371            if let Some(target) = targets.first() {
1372                if options.space.is_some() {
1373                    return Self::Space {
1374                        space_id: target.collection.space_id,
1375                    };
1376                }
1377            }
1378            return Self::Global;
1379        }
1380
1381        let mut collection_ids = targets
1382            .iter()
1383            .map(|target| target.collection.id)
1384            .collect::<Vec<_>>();
1385        collection_ids.sort_unstable();
1386        collection_ids.dedup();
1387        Self::Collections { collection_ids }
1388    }
1389
1390    fn allows_space_dense_repair(&self) -> bool {
1391        !matches!(self, Self::Collections { .. })
1392    }
1393}
1394
1395#[derive(Debug, Clone, Copy)]
1396enum PendingDocumentIndexing {
1397    Added,
1398    Updated { reactivated: bool },
1399}
1400
1401impl PendingDocumentIndexing {
1402    fn record(self, report: &mut UpdateReport) {
1403        match self {
1404            Self::Added => {
1405                report.added_docs += 1;
1406            }
1407            Self::Updated { reactivated } => {
1408                report.updated_docs += 1;
1409                if reactivated {
1410                    report.reactivated_docs += 1;
1411                }
1412            }
1413        }
1414    }
1415}
1416
1417fn invalid_pending_embedding_batch_detail(
1418    pending: &[PendingChunkEmbedding],
1419    vectors: &[Vec<f32>],
1420) -> Option<String> {
1421    if vectors.len() != pending.len() {
1422        return Some(format!(
1423            "embedder returned {} vectors for {} chunks",
1424            vectors.len(),
1425            pending.len()
1426        ));
1427    }
1428
1429    if vectors.iter().any(|vector| vector.is_empty()) {
1430        if pending.len() == 1 {
1431            return Some(format!(
1432                "embedder returned an empty vector for chunk {}",
1433                pending[0].chunk_id
1434            ));
1435        }
1436        return Some("embedder returned an empty vector".to_string());
1437    }
1438
1439    None
1440}
1441
1442fn push_update_decision(
1443    report: &mut UpdateReport,
1444    options: &UpdateOptions,
1445    target: &UpdateTarget,
1446    path: &str,
1447    kind: UpdateDecisionKind,
1448    detail: Option<String>,
1449) {
1450    if !options.verbose {
1451        return;
1452    }
1453
1454    report.decisions.push(UpdateDecision {
1455        space: target.space.clone(),
1456        collection: target.collection.name.clone(),
1457        path: path.to_string(),
1458        kind,
1459        detail,
1460    });
1461}
1462
1463fn update_doc_key(space: &str, collection_path: &Path, path: &str) -> UpdateDocKey {
1464    UpdateDocKey {
1465        space: space.to_string(),
1466        collection_path: collection_path.to_path_buf(),
1467        path: path.to_string(),
1468    }
1469}
1470
1471fn effective_chunk_hard_max(policy: &crate::config::ChunkPolicy) -> usize {
1472    policy
1473        .hard_max_tokens
1474        .max(policy.soft_max_tokens)
1475        .max(policy.target_tokens)
1476        .max(1)
1477}