1use super::*;
2use kbolt_types::{UpdateDecision, UpdateDecisionKind};
3
4impl Engine {
5 pub fn update(&self, options: UpdateOptions) -> Result<UpdateReport> {
6 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
7 self.update_unlocked(options)
8 }
9
10 pub(super) fn update_unlocked(&self, options: UpdateOptions) -> Result<UpdateReport> {
11 let started = Instant::now();
12 let mut report = UpdateReport {
13 scanned_docs: 0,
14 skipped_mtime_docs: 0,
15 skipped_hash_docs: 0,
16 added_docs: 0,
17 updated_docs: 0,
18 failed_docs: 0,
19 deactivated_docs: 0,
20 reactivated_docs: 0,
21 reaped_docs: 0,
22 embedded_chunks: 0,
23 decisions: Vec::new(),
24 errors: Vec::new(),
25 elapsed_ms: 0,
26 };
27 let mut failed_docs = HashSet::new();
28
29 let targets = self.resolve_targets(TargetScope {
30 space: options.space.as_deref(),
31 collections: &options.collections,
32 })?;
33 if targets.is_empty() {
34 report.elapsed_ms = started.elapsed().as_millis() as u64;
35 return Ok(report);
36 }
37 let repair_scope = UpdateRepairScope::from_options_and_targets(&options, &targets);
38
39 self.reconcile_dense_integrity(&targets, &repair_scope, &options)?;
40
41 if !options.dry_run {
42 self.replay_fts_dirty_documents(&repair_scope, &mut report, &mut failed_docs)?;
43 }
44
45 let mut fts_dirty_by_space: HashMap<String, HashSet<i64>> = HashMap::new();
46 let mut pending_embeddings = Vec::new();
47 let mut failed_embedding_chunk_ids = HashSet::new();
48 for target in &targets {
49 self.update_collection_target(
50 target,
51 &options,
52 &mut report,
53 &mut fts_dirty_by_space,
54 &mut pending_embeddings,
55 &mut failed_embedding_chunk_ids,
56 &mut failed_docs,
57 )?;
58 }
59
60 if !options.dry_run {
61 self.flush_buffered_embeddings(
62 &mut pending_embeddings,
63 &mut failed_embedding_chunk_ids,
64 &mut report,
65 &mut failed_docs,
66 )?;
67
68 for (space, doc_ids) in fts_dirty_by_space {
69 if doc_ids.is_empty() {
70 continue;
71 }
72
73 self.storage.commit_tantivy(&space)?;
74 let mut ids = doc_ids.into_iter().collect::<Vec<_>>();
75 ids.sort_unstable();
76 self.storage.batch_clear_fts_dirty(&ids)?;
77 }
78
79 self.embed_pending_chunks(
80 &repair_scope,
81 &options,
82 &mut failed_embedding_chunk_ids,
83 &mut report,
84 &mut failed_docs,
85 )?;
86
87 let reaped =
88 self.list_reapable_documents_for_scope(self.config.reaping.days, &repair_scope)?;
89 let mut reaped_doc_ids = Vec::with_capacity(reaped.len());
90 let mut chunk_ids_by_space: HashMap<String, Vec<i64>> = HashMap::new();
91 for document in reaped {
92 reaped_doc_ids.push(document.doc_id);
93 if !document.chunk_ids.is_empty() {
94 chunk_ids_by_space
95 .entry(document.space_name)
96 .or_default()
97 .extend(document.chunk_ids);
98 }
99 }
100 for (space, chunk_ids) in chunk_ids_by_space {
101 self.purge_space_chunks(&space, &chunk_ids)?;
102 }
103 self.storage.delete_documents(&reaped_doc_ids)?;
104 report.reaped_docs = reaped_doc_ids.len();
105 }
106
107 report.failed_docs = failed_docs.len();
108 report.elapsed_ms = started.elapsed().as_millis() as u64;
109 Ok(report)
110 }
111
112 fn reconcile_dense_integrity(
113 &self,
114 targets: &[UpdateTarget],
115 repair_scope: &UpdateRepairScope,
116 options: &UpdateOptions,
117 ) -> Result<()> {
118 if options.no_embed || options.dry_run {
119 return Ok(());
120 }
121
122 let expected_model = self.embedding_model_key();
123 let mut visited_spaces = HashSet::new();
124 for target in targets {
125 if !visited_spaces.insert(target.collection.space_id) {
126 continue;
127 }
128
129 let models = self
130 .storage
131 .list_embedding_models_in_space(target.collection.space_id)?;
132 let mut reasons = Vec::new();
133 if models.iter().any(|model| model != expected_model) {
134 reasons.push(format!(
135 "stored embedding models {:?} do not match current model '{}'",
136 models, expected_model
137 ));
138 }
139
140 let sqlite_count = self
141 .storage
142 .count_embedded_chunks(Some(target.collection.space_id))?;
143 let usearch_count = self.storage.count_usearch(&target.space)?;
144 if sqlite_count != usearch_count {
145 reasons.push(format!(
146 "sqlite embedded chunk count {sqlite_count} does not match USearch vector count {usearch_count}"
147 ));
148 }
149
150 if reasons.is_empty() {
151 continue;
152 }
153
154 if !repair_scope.allows_space_dense_repair() {
155 return Err(KboltError::SpaceDenseRepairRequired {
156 space: target.space.clone(),
157 reason: reasons.join("; "),
158 }
159 .into());
160 }
161
162 self.storage
163 .delete_embeddings_for_space(target.collection.space_id)?;
164 self.storage.clear_usearch(&target.space)?;
165 }
166
167 Ok(())
168 }
169
170 fn embed_pending_chunks(
171 &self,
172 repair_scope: &UpdateRepairScope,
173 options: &UpdateOptions,
174 failed_chunk_ids: &mut HashSet<i64>,
175 report: &mut UpdateReport,
176 failed_docs: &mut HashSet<UpdateDocKey>,
177 ) -> Result<()> {
178 if options.no_embed || options.dry_run {
179 return Ok(());
180 }
181
182 let Some(embedder) = self.embedder.as_ref() else {
183 return Ok(());
184 };
185
186 let model = self.embedding_model_key();
187 let mut after_chunk_id = 0_i64;
188 loop {
189 let backlog =
190 self.get_unembedded_chunks_for_scope(model, repair_scope, after_chunk_id, 64)?;
191 if backlog.is_empty() {
192 break;
193 }
194
195 after_chunk_id = backlog
196 .last()
197 .map(|record| record.chunk_id)
198 .expect("non-empty backlog should have a last chunk id");
199
200 let mut pending = Vec::new();
201 let mut bytes_by_path: HashMap<std::path::PathBuf, Option<Vec<u8>>> = HashMap::new();
202 for record in backlog {
203 if failed_chunk_ids.contains(&record.chunk_id) {
204 continue;
205 }
206
207 let full_path = record.collection_path.join(&record.doc_path);
208 if !bytes_by_path.contains_key(&full_path) {
209 let bytes = match std::fs::read(&full_path) {
210 Ok(bytes) => Some(bytes),
211 Err(err) if err.kind() == std::io::ErrorKind::NotFound => None,
212 Err(err) => {
213 report.errors.push(file_error(
214 Some(full_path.clone()),
215 format!("embed read failed: {err}"),
216 ));
217 failed_docs.insert(update_doc_key(
218 &record.space_name,
219 &record.collection_path,
220 &record.doc_path,
221 ));
222 None
223 }
224 };
225 bytes_by_path.insert(full_path.clone(), bytes);
226 }
227
228 let Some(bytes) = bytes_by_path
229 .get(&full_path)
230 .and_then(|bytes| bytes.as_deref())
231 else {
232 continue;
233 };
234
235 let mut text = chunk_text_from_bytes(&bytes, record.offset, record.length);
236 if text.trim().is_empty() {
237 text = " ".to_string();
238 }
239 let max_document_tokens = match self.chunk_policy_for_path(&full_path) {
240 Ok(policy) => effective_chunk_hard_max(&policy),
241 Err(err) => {
242 report.errors.push(file_error(
243 Some(full_path.clone()),
244 format!("embed preflight policy resolution failed: {err}"),
245 ));
246 failed_docs.insert(update_doc_key(
247 &record.space_name,
248 &record.collection_path,
249 &record.doc_path,
250 ));
251 continue;
252 }
253 };
254
255 pending.push(PendingChunkEmbedding {
256 chunk_id: record.chunk_id,
257 doc_key: update_doc_key(
258 &record.space_name,
259 &record.collection_path,
260 &record.doc_path,
261 ),
262 space_name: record.space_name,
263 path: full_path,
264 text,
265 max_document_tokens,
266 });
267 }
268
269 if pending.is_empty() {
270 continue;
271 }
272
273 let result = self.embed_pending_batch_with_partial_failures(
274 embedder.as_ref(),
275 pending,
276 report,
277 failed_docs,
278 )?;
279 failed_chunk_ids.extend(result.failed_chunk_ids);
280 if !result.embeddings.is_empty() {
281 self.store_chunk_embeddings(model, result.embeddings, report)?;
282 }
283 }
284
285 Ok(())
286 }
287
288 fn flush_buffered_embeddings(
289 &self,
290 pending_embeddings: &mut Vec<PendingChunkEmbedding>,
291 failed_chunk_ids: &mut HashSet<i64>,
292 report: &mut UpdateReport,
293 failed_docs: &mut HashSet<UpdateDocKey>,
294 ) -> Result<()> {
295 if pending_embeddings.is_empty() {
296 return Ok(());
297 }
298
299 let Some(embedder) = self.embedder.as_ref() else {
300 pending_embeddings.clear();
301 return Ok(());
302 };
303
304 let pending = std::mem::take(pending_embeddings);
305 let result = self.embed_pending_batch_with_partial_failures(
306 embedder.as_ref(),
307 pending,
308 report,
309 failed_docs,
310 )?;
311 failed_chunk_ids.extend(result.failed_chunk_ids);
312 if result.embeddings.is_empty() {
313 return Ok(());
314 }
315
316 self.store_chunk_embeddings(self.embedding_model_key(), result.embeddings, report)
317 }
318
319 fn embed_pending_batch_with_partial_failures(
320 &self,
321 embedder: &dyn crate::models::Embedder,
322 pending: Vec<PendingChunkEmbedding>,
323 report: &mut UpdateReport,
324 failed_docs: &mut HashSet<UpdateDocKey>,
325 ) -> Result<EmbeddedPendingBatch> {
326 if pending.is_empty() {
327 return Ok(EmbeddedPendingBatch::default());
328 }
329
330 let mut failed_chunk_ids = Vec::new();
331 let pending =
332 self.preflight_pending_embeddings(pending, report, failed_docs, &mut failed_chunk_ids)?;
333 if pending.is_empty() {
334 return Ok(EmbeddedPendingBatch {
335 embeddings: Vec::new(),
336 failed_chunk_ids,
337 });
338 }
339
340 let texts = pending
341 .iter()
342 .map(|embedding| embedding.text.clone())
343 .collect::<Vec<_>>();
344 match embedder.embed_batch(crate::models::EmbeddingInputKind::Document, &texts) {
345 Ok(vectors) => {
346 if let Some(detail) = invalid_pending_embedding_batch_detail(&pending, &vectors) {
347 return self.split_pending_embedding_batch(
348 embedder,
349 pending,
350 report,
351 failed_docs,
352 detail,
353 );
354 }
355
356 let embeddings = pending
357 .into_iter()
358 .zip(vectors)
359 .map(|(embedding, vector)| (embedding.chunk_id, embedding.space_name, vector))
360 .collect::<Vec<_>>();
361 Ok(EmbeddedPendingBatch {
362 embeddings,
363 failed_chunk_ids,
364 })
365 }
366 Err(err) => self
367 .split_pending_embedding_batch(
368 embedder,
369 pending,
370 report,
371 failed_docs,
372 err.to_string(),
373 )
374 .map(|mut result| {
375 result.failed_chunk_ids.extend(failed_chunk_ids);
376 result
377 }),
378 }
379 }
380
381 fn split_pending_embedding_batch(
382 &self,
383 embedder: &dyn crate::models::Embedder,
384 pending: Vec<PendingChunkEmbedding>,
385 report: &mut UpdateReport,
386 failed_docs: &mut HashSet<UpdateDocKey>,
387 detail: String,
388 ) -> Result<EmbeddedPendingBatch> {
389 if pending.len() == 1 {
390 let embedding = pending
391 .into_iter()
392 .next()
393 .expect("single pending embedding should exist");
394 report.errors.push(file_error(
395 Some(embedding.path),
396 format!("embed failed: {detail}"),
397 ));
398 failed_docs.insert(embedding.doc_key);
399 return Ok(EmbeddedPendingBatch {
400 embeddings: Vec::new(),
401 failed_chunk_ids: vec![embedding.chunk_id],
402 });
403 }
404
405 let mid = pending.len() / 2;
406 let mut right = pending;
407 let left = right.drain(..mid).collect::<Vec<_>>();
408
409 let mut left_result =
410 self.embed_pending_batch_with_partial_failures(embedder, left, report, failed_docs)?;
411 let right_result =
412 self.embed_pending_batch_with_partial_failures(embedder, right, report, failed_docs)?;
413 left_result.embeddings.extend(right_result.embeddings);
414 left_result
415 .failed_chunk_ids
416 .extend(right_result.failed_chunk_ids);
417 Ok(left_result)
418 }
419
420 fn store_chunk_embeddings(
421 &self,
422 model: &str,
423 embeddings: Vec<(i64, String, Vec<f32>)>,
424 report: &mut UpdateReport,
425 ) -> Result<()> {
426 let mut grouped_vectors: HashMap<String, Vec<(i64, Vec<f32>)>> = HashMap::new();
427 let mut embedding_rows = Vec::with_capacity(embeddings.len());
428 for (chunk_id, space_name, vector) in embeddings {
429 if vector.is_empty() {
430 return Err(KboltError::Inference(format!(
431 "embedder returned an empty vector for chunk {chunk_id}"
432 ))
433 .into());
434 }
435
436 grouped_vectors
437 .entry(space_name)
438 .or_default()
439 .push((chunk_id, vector));
440 embedding_rows.push((chunk_id, model));
441 }
442
443 for (space, vectors) in grouped_vectors {
444 let refs = vectors
445 .iter()
446 .map(|(chunk_id, vector)| (*chunk_id, vector.as_slice()))
447 .collect::<Vec<_>>();
448 self.storage.batch_insert_usearch(&space, &refs)?;
449 }
450
451 self.storage.insert_embeddings(&embedding_rows)?;
452 report.embedded_chunks = report.embedded_chunks.saturating_add(embedding_rows.len());
453 Ok(())
454 }
455
456 fn preflight_pending_embeddings(
457 &self,
458 pending: Vec<PendingChunkEmbedding>,
459 report: &mut UpdateReport,
460 failed_docs: &mut HashSet<UpdateDocKey>,
461 failed_chunk_ids: &mut Vec<i64>,
462 ) -> Result<Vec<PendingChunkEmbedding>> {
463 let Some(sizer) = self.embedding_document_sizer.as_ref() else {
464 return Ok(pending);
465 };
466
467 let mut accepted = Vec::with_capacity(pending.len());
468 for embedding in pending {
469 match sizer.count_document_tokens(&embedding.text) {
470 Ok(token_count) if token_count <= embedding.max_document_tokens => {
471 accepted.push(embedding);
472 }
473 Ok(token_count) => {
474 report.errors.push(file_error(
475 Some(embedding.path.clone()),
476 format!(
477 "embed preflight failed: payload has {token_count} tokens, exceeding hard_max_tokens {}",
478 embedding.max_document_tokens
479 ),
480 ));
481 failed_docs.insert(embedding.doc_key);
482 failed_chunk_ids.push(embedding.chunk_id);
483 }
484 Err(err) => {
485 report.errors.push(file_error(
486 Some(embedding.path.clone()),
487 format!("embed preflight token count failed: {err}"),
488 ));
489 failed_docs.insert(embedding.doc_key);
490 failed_chunk_ids.push(embedding.chunk_id);
491 }
492 }
493 }
494
495 Ok(accepted)
496 }
497
498 fn preflight_prepared_embeddings(
499 &self,
500 prepared: Vec<PreparedChunkEmbedding>,
501 report: &mut UpdateReport,
502 failed_docs: &mut HashSet<UpdateDocKey>,
503 ) -> Result<PreparedEmbeddingPreflight> {
504 let Some(sizer) = self.embedding_document_sizer.as_ref() else {
505 return Ok(PreparedEmbeddingPreflight {
506 accepted: prepared,
507 rejected_chunk_indexes: Vec::new(),
508 });
509 };
510
511 let mut preflight = PreparedEmbeddingPreflight {
512 accepted: Vec::with_capacity(prepared.len()),
513 rejected_chunk_indexes: Vec::new(),
514 };
515 for embedding in prepared {
516 match sizer.count_document_tokens(&embedding.text) {
517 Ok(token_count) if token_count <= embedding.max_document_tokens => {
518 preflight.accepted.push(embedding);
519 }
520 Ok(token_count) => {
521 report.errors.push(file_error(
522 Some(embedding.path.clone()),
523 format!(
524 "embed preflight failed: payload has {token_count} tokens, exceeding hard_max_tokens {}",
525 embedding.max_document_tokens
526 ),
527 ));
528 failed_docs.insert(embedding.doc_key);
529 preflight.rejected_chunk_indexes.push(embedding.chunk_index);
530 }
531 Err(err) => {
532 report.errors.push(file_error(
533 Some(embedding.path.clone()),
534 format!("embed preflight token count failed: {err}"),
535 ));
536 failed_docs.insert(embedding.doc_key);
537 preflight.rejected_chunk_indexes.push(embedding.chunk_index);
538 }
539 }
540 }
541
542 Ok(preflight)
543 }
544
545 fn chunk_policy_for_path(&self, path: &Path) -> Result<crate::config::ChunkPolicy> {
546 let extractor = default_registry().resolve_for_path(path).ok_or_else(|| {
547 KboltError::InvalidInput(format!(
548 "no extractor available for embedding preflight path {}",
549 path.display()
550 ))
551 })?;
552 Ok(resolve_policy(
553 &self.config.chunking,
554 Some(extractor.profile_key()),
555 None,
556 ))
557 }
558
559 fn replay_fts_dirty_documents(
560 &self,
561 repair_scope: &UpdateRepairScope,
562 report: &mut UpdateReport,
563 failed_docs: &mut HashSet<UpdateDocKey>,
564 ) -> Result<()> {
565 let records = self.get_fts_dirty_documents_for_scope(repair_scope)?;
566 if records.is_empty() {
567 return Ok(());
568 }
569
570 let mut cleared_by_space: HashMap<String, Vec<i64>> = HashMap::new();
571 for record in records {
572 let space_name = record.space_name;
573 let doc_id = record.doc_id;
574
575 if record.chunks.is_empty() {
576 cleared_by_space.entry(space_name).or_default().push(doc_id);
577 continue;
578 }
579
580 let full_path = record.collection_path.join(&record.doc_path);
581 let bytes = match std::fs::read(&full_path) {
582 Ok(bytes) => bytes,
583 Err(err) if err.kind() == std::io::ErrorKind::NotFound => continue,
584 Err(err) => {
585 failed_docs.insert(update_doc_key(
586 &space_name,
587 &record.collection_path,
588 &record.doc_path,
589 ));
590 report.errors.push(file_error(
591 Some(full_path),
592 format!("fts replay read failed: {err}"),
593 ));
594 continue;
595 }
596 };
597 if sha256_hex(&bytes) != record.doc_hash {
598 continue;
599 }
600
601 self.storage.delete_tantivy_by_doc(&space_name, doc_id)?;
602
603 let file_body = String::from_utf8_lossy(&bytes).into_owned();
604 let entries = record
605 .chunks
606 .iter()
607 .map(|chunk| {
608 let chunk_body = chunk_text_from_bytes(&bytes, chunk.offset, chunk.length);
609 let source_body = if chunk_body.is_empty() {
610 file_body.as_str()
611 } else {
612 chunk_body.as_str()
613 };
614 TantivyEntry {
615 chunk_id: chunk.id,
616 doc_id,
617 filepath: record.doc_path.clone(),
618 semantic_title: record
619 .doc_title_source
620 .semantic_title(record.doc_title.as_str())
621 .map(ToString::to_string),
622 heading: chunk.heading.clone(),
623 body: retrieval_text_with_prefix(
624 source_body,
625 record
626 .doc_title_source
627 .semantic_title(record.doc_title.as_str()),
628 chunk.heading.as_deref(),
629 self.config.chunking.defaults.contextual_prefix,
630 ),
631 }
632 })
633 .collect::<Vec<_>>();
634
635 self.storage.index_tantivy(&space_name, &entries)?;
636 cleared_by_space.entry(space_name).or_default().push(doc_id);
637 }
638
639 for (space_name, mut doc_ids) in cleared_by_space {
640 if doc_ids.is_empty() {
641 continue;
642 }
643
644 doc_ids.sort_unstable();
645 doc_ids.dedup();
646 self.storage.commit_tantivy(&space_name)?;
647 self.storage.batch_clear_fts_dirty(&doc_ids)?;
648 }
649
650 Ok(())
651 }
652
653 fn get_fts_dirty_documents_for_scope(
654 &self,
655 repair_scope: &UpdateRepairScope,
656 ) -> Result<Vec<crate::storage::FtsDirtyRecord>> {
657 match repair_scope {
658 UpdateRepairScope::Global => self.storage.get_fts_dirty_documents(),
659 UpdateRepairScope::Space { space_id } => {
660 self.storage.get_fts_dirty_documents_in_space(*space_id)
661 }
662 UpdateRepairScope::Collections { collection_ids } => self
663 .storage
664 .get_fts_dirty_documents_in_collections(collection_ids),
665 }
666 }
667
668 fn get_unembedded_chunks_for_scope(
669 &self,
670 model: &str,
671 repair_scope: &UpdateRepairScope,
672 after_chunk_id: i64,
673 limit: usize,
674 ) -> Result<Vec<crate::storage::EmbedRecord>> {
675 match repair_scope {
676 UpdateRepairScope::Global => {
677 self.storage
678 .get_unembedded_chunks(model, after_chunk_id, limit)
679 }
680 UpdateRepairScope::Space { space_id } => {
681 self.storage
682 .get_unembedded_chunks_in_space(model, *space_id, after_chunk_id, limit)
683 }
684 UpdateRepairScope::Collections { collection_ids } => self
685 .storage
686 .get_unembedded_chunks_in_collections(model, collection_ids, after_chunk_id, limit),
687 }
688 }
689
690 fn list_reapable_documents_for_scope(
691 &self,
692 older_than_days: u32,
693 repair_scope: &UpdateRepairScope,
694 ) -> Result<Vec<crate::storage::ReapableDocument>> {
695 match repair_scope {
696 UpdateRepairScope::Global => self.storage.list_reapable_documents(older_than_days),
697 UpdateRepairScope::Space { space_id } => self
698 .storage
699 .list_reapable_documents_in_space(older_than_days, *space_id),
700 UpdateRepairScope::Collections { collection_ids } => self
701 .storage
702 .list_reapable_documents_in_collections(older_than_days, collection_ids),
703 }
704 }
705
706 pub fn resolve_update_targets(&self, options: &UpdateOptions) -> Result<Vec<UpdateTarget>> {
707 self.resolve_targets(TargetScope {
708 space: options.space.as_deref(),
709 collections: &options.collections,
710 })
711 }
712
713 pub(super) fn resolve_targets(&self, scope: TargetScope<'_>) -> Result<Vec<UpdateTarget>> {
714 let mut targets = Vec::new();
715
716 if scope.collections.is_empty() {
717 return self.resolve_update_targets_for_all_collections(scope.space);
718 }
719
720 let mut seen = std::collections::HashSet::new();
721 for raw_collection_name in scope.collections {
722 let collection_name = raw_collection_name.trim();
723 if collection_name.is_empty() {
724 return Err(KboltError::InvalidInput(
725 "collection names cannot be empty".to_string(),
726 )
727 .into());
728 }
729
730 let resolved_space = self.resolve_space_row(scope.space, Some(collection_name))?;
731 let collection = self
732 .storage
733 .get_collection(resolved_space.id, collection_name)?;
734
735 if seen.insert((collection.space_id, collection.name.clone())) {
736 targets.push(UpdateTarget {
737 space: resolved_space.name,
738 collection,
739 });
740 }
741 }
742
743 Ok(targets)
744 }
745
746 fn resolve_update_targets_for_all_collections(
747 &self,
748 space: Option<&str>,
749 ) -> Result<Vec<UpdateTarget>> {
750 let (space_id_filter, spaces_by_id) = if let Some(space_name) = space {
751 let resolved = self.resolve_space_row(Some(space_name), None)?;
752 let mut map = std::collections::HashMap::new();
753 map.insert(resolved.id, resolved.name.clone());
754 (Some(resolved.id), map)
755 } else {
756 let spaces = self.storage.list_spaces()?;
757 let map = spaces
758 .into_iter()
759 .map(|space| (space.id, space.name))
760 .collect::<std::collections::HashMap<_, _>>();
761 (None, map)
762 };
763
764 let collections = self.storage.list_collections(space_id_filter)?;
765 let mut targets = Vec::with_capacity(collections.len());
766 for collection in collections {
767 let space_name = spaces_by_id
768 .get(&collection.space_id)
769 .ok_or_else(|| {
770 KboltError::Internal(format!(
771 "missing space mapping for collection '{}'",
772 collection.name
773 ))
774 })?
775 .clone();
776 targets.push(UpdateTarget {
777 space: space_name,
778 collection,
779 });
780 }
781
782 Ok(targets)
783 }
784
785 fn update_collection_target(
786 &self,
787 target: &UpdateTarget,
788 options: &UpdateOptions,
789 report: &mut UpdateReport,
790 fts_dirty_by_space: &mut HashMap<String, HashSet<i64>>,
791 pending_embeddings: &mut Vec<PendingChunkEmbedding>,
792 failed_chunk_ids: &mut HashSet<i64>,
793 failed_docs: &mut HashSet<UpdateDocKey>,
794 ) -> Result<()> {
795 let all_documents = self.storage.list_documents(target.collection.id, false)?;
796 let mut docs_by_path: HashMap<String, DocumentRow> = all_documents
797 .into_iter()
798 .map(|doc| (doc.path.clone(), doc))
799 .collect();
800 let mut seen_paths = HashSet::new();
801 let extension_filter = normalized_extension_filter(target.collection.extensions.as_deref());
802 let ignore_matcher = load_collection_ignore_matcher(
803 &self.config.config_dir,
804 &target.collection.path,
805 &target.space,
806 &target.collection.name,
807 )?;
808 let extractor_registry = default_registry();
809 let mut touched_collection = false;
810
811 for entry in super::ignore_helpers::build_collection_walk(&target.collection.path) {
812 let entry = match entry {
813 Ok(item) => item,
814 Err(err) => {
815 let error_path = walk_error_path(&err).map(Path::to_path_buf);
816 if let Some(path) = error_path.as_deref() {
817 let failed_path = collection_relative_path(&target.collection.path, path)
818 .unwrap_or_else(|_| path.display().to_string());
819 failed_docs.insert(update_doc_key(
820 &target.space,
821 &target.collection.path,
822 &failed_path,
823 ));
824 }
825 report
826 .errors
827 .push(file_error(error_path, format!("walk error: {err}")));
828 continue;
829 }
830 };
831
832 if !entry
833 .file_type()
834 .is_some_and(|file_type| file_type.is_file())
835 {
836 continue;
837 }
838
839 if is_hard_ignored_file(entry.path()) {
840 continue;
841 }
842
843 let relative_path =
844 match collection_relative_path(&target.collection.path, entry.path()) {
845 Ok(path) => path,
846 Err(err) => {
847 failed_docs.insert(update_doc_key(
848 &target.space,
849 &target.collection.path,
850 &entry.path().display().to_string(),
851 ));
852 report.errors.push(file_error(
853 Some(entry.path().to_path_buf()),
854 err.to_string(),
855 ));
856 continue;
857 }
858 };
859
860 if !extension_allowed(entry.path(), extension_filter.as_ref()) {
861 push_update_decision(
862 report,
863 options,
864 target,
865 &relative_path,
866 UpdateDecisionKind::Unsupported,
867 Some("extension not allowed".to_string()),
868 );
869 continue;
870 }
871
872 if let Some(matcher) = ignore_matcher.as_ref() {
873 if matcher
874 .matched(Path::new(&relative_path), false)
875 .is_ignore()
876 {
877 push_update_decision(
878 report,
879 options,
880 target,
881 &relative_path,
882 UpdateDecisionKind::Ignored,
883 Some("matched ignore patterns".to_string()),
884 );
885 continue;
886 }
887 }
888
889 let Some(extractor) = extractor_registry.resolve_for_path(entry.path()) else {
890 push_update_decision(
891 report,
892 options,
893 target,
894 &relative_path,
895 UpdateDecisionKind::Unsupported,
896 Some("no extractor available".to_string()),
897 );
898 continue;
899 };
900
901 report.scanned_docs += 1;
902 seen_paths.insert(relative_path.clone());
903
904 let metadata = match entry.metadata() {
905 Ok(data) => data,
906 Err(err) => {
907 let detail = format!("metadata error: {err}");
908 failed_docs.insert(update_doc_key(
909 &target.space,
910 &target.collection.path,
911 &relative_path,
912 ));
913 push_update_decision(
914 report,
915 options,
916 target,
917 &relative_path,
918 UpdateDecisionKind::ReadFailed,
919 Some(detail.clone()),
920 );
921 report
922 .errors
923 .push(file_error(Some(entry.path().to_path_buf()), detail));
924 continue;
925 }
926 };
927
928 let modified = match modified_token(&metadata) {
929 Ok(value) => value,
930 Err(err) => {
931 let detail = format!("modified timestamp error: {err}");
932 failed_docs.insert(update_doc_key(
933 &target.space,
934 &target.collection.path,
935 &relative_path,
936 ));
937 push_update_decision(
938 report,
939 options,
940 target,
941 &relative_path,
942 UpdateDecisionKind::ReadFailed,
943 Some(detail.clone()),
944 );
945 report
946 .errors
947 .push(file_error(Some(entry.path().to_path_buf()), detail));
948 continue;
949 }
950 };
951
952 if let Some(existing) = docs_by_path.get(&relative_path) {
953 if existing.active && existing.modified == modified {
954 report.skipped_mtime_docs += 1;
955 push_update_decision(
956 report,
957 options,
958 target,
959 &relative_path,
960 UpdateDecisionKind::SkippedMtime,
961 None,
962 );
963 continue;
964 }
965 }
966
967 let bytes = match std::fs::read(entry.path()) {
968 Ok(data) => data,
969 Err(err) => {
970 let detail = err.to_string();
971 failed_docs.insert(update_doc_key(
972 &target.space,
973 &target.collection.path,
974 &relative_path,
975 ));
976 push_update_decision(
977 report,
978 options,
979 target,
980 &relative_path,
981 UpdateDecisionKind::ReadFailed,
982 Some(detail.clone()),
983 );
984 report
985 .errors
986 .push(file_error(Some(entry.path().to_path_buf()), detail));
987 continue;
988 }
989 };
990 let hash = sha256_hex(&bytes);
991 let mut title = file_title(entry.path());
992 let mut title_source = DocumentTitleSource::FilenameFallback;
993
994 let existing = docs_by_path.get(&relative_path).cloned();
995 let pending_decision;
996 let pending_indexing;
997 if let Some(doc) = existing.as_ref() {
998 if doc.hash == hash {
999 if doc.active {
1000 report.skipped_hash_docs += 1;
1001 push_update_decision(
1002 report,
1003 options,
1004 target,
1005 &relative_path,
1006 UpdateDecisionKind::SkippedHash,
1007 None,
1008 );
1009 } else {
1010 report.reactivated_docs += 1;
1011 push_update_decision(
1012 report,
1013 options,
1014 target,
1015 &relative_path,
1016 UpdateDecisionKind::Reactivated,
1017 None,
1018 );
1019 }
1020
1021 if !options.dry_run {
1022 self.storage.refresh_document_activity(doc.id, &modified)?;
1023 }
1024 continue;
1025 }
1026
1027 pending_indexing = PendingDocumentIndexing::Updated {
1028 reactivated: !doc.active,
1029 };
1030 pending_decision = (
1031 UpdateDecisionKind::Changed,
1032 (!doc.active).then_some("reactivated".to_string()),
1033 );
1034 } else {
1035 pending_indexing = PendingDocumentIndexing::Added;
1036 pending_decision = (UpdateDecisionKind::New, None);
1037 }
1038
1039 if options.dry_run {
1040 pending_indexing.record(report);
1041 let (kind, detail) = pending_decision;
1042 push_update_decision(report, options, target, &relative_path, kind, detail);
1043 continue;
1044 }
1045
1046 let extracted = match extractor.extract(entry.path(), &bytes) {
1047 Ok(document) => document,
1048 Err(err) => {
1049 let detail = format!("extract failed: {err}");
1050 failed_docs.insert(update_doc_key(
1051 &target.space,
1052 &target.collection.path,
1053 &relative_path,
1054 ));
1055 push_update_decision(
1056 report,
1057 options,
1058 target,
1059 &relative_path,
1060 UpdateDecisionKind::ExtractFailed,
1061 Some(detail.clone()),
1062 );
1063 report
1064 .errors
1065 .push(file_error(Some(entry.path().to_path_buf()), detail));
1066 continue;
1067 }
1068 };
1069 if let Some(extracted_title) = extracted
1070 .title
1071 .as_deref()
1072 .map(str::trim)
1073 .filter(|title| !title.is_empty())
1074 {
1075 title = extracted_title.to_string();
1076 title_source = DocumentTitleSource::Extracted;
1077 }
1078
1079 let policy = resolve_policy(&self.config.chunking, Some(extractor.profile_key()), None);
1080 let max_document_tokens = effective_chunk_hard_max(&policy);
1081 let final_chunks = match self.embedding_document_sizer.as_ref() {
1082 Some(sizer) => {
1083 let sizer_counter = EmbeddingDocumentSizerCounter(sizer.as_ref());
1084 match chunk_document_with_counter(&extracted, &policy, &sizer_counter) {
1085 Ok(chunks) => chunks,
1086 Err(err) => {
1087 let detail = format!("chunking failed: {err}");
1088 failed_docs.insert(update_doc_key(
1089 &target.space,
1090 &target.collection.path,
1091 &relative_path,
1092 ));
1093 push_update_decision(
1094 report,
1095 options,
1096 target,
1097 &relative_path,
1098 UpdateDecisionKind::ExtractFailed,
1099 Some(detail.clone()),
1100 );
1101 report
1102 .errors
1103 .push(file_error(Some(entry.path().to_path_buf()), detail));
1104 continue;
1105 }
1106 }
1107 }
1108 None => chunk_document(&extracted, &policy),
1109 };
1110
1111 let doc_key = update_doc_key(&target.space, &target.collection.path, &relative_path);
1112 let chunk_inserts = final_chunks
1113 .iter()
1114 .enumerate()
1115 .map(|(index, chunk)| ChunkInsert {
1116 seq: index as i32,
1117 offset: chunk.offset,
1118 length: chunk.length,
1119 heading: chunk.heading.clone(),
1120 kind: chunk.kind,
1121 })
1122 .collect::<Vec<_>>();
1123 let body = String::from_utf8_lossy(&bytes).into_owned();
1124 let mut prepared_embeddings = Vec::new();
1125 let mut rejected_chunk_indexes = Vec::new();
1126 if !chunk_inserts.is_empty() && !options.no_embed && self.embedder.is_some() {
1127 let preflight = self.preflight_prepared_embeddings(
1128 prepare_chunk_embeddings(
1129 &final_chunks,
1130 &bytes,
1131 &doc_key,
1132 target,
1133 entry.path(),
1134 max_document_tokens,
1135 ),
1136 report,
1137 failed_docs,
1138 )?;
1139 prepared_embeddings = preflight.accepted;
1140 rejected_chunk_indexes = preflight.rejected_chunk_indexes;
1141 if existing.is_some() && !rejected_chunk_indexes.is_empty() {
1142 push_update_decision(
1143 report,
1144 options,
1145 target,
1146 &relative_path,
1147 UpdateDecisionKind::ExtractFailed,
1148 Some("embed preflight failed".to_string()),
1149 );
1150 continue;
1151 }
1152 }
1153
1154 let doc_id = self.storage.upsert_document(
1155 target.collection.id,
1156 &relative_path,
1157 &title,
1158 title_source,
1159 &hash,
1160 &modified,
1161 )?;
1162
1163 if let Some(doc) = existing.as_ref() {
1164 let old_chunk_ids = self.storage.delete_chunks_for_document(doc.id)?;
1165 if !old_chunk_ids.is_empty() {
1166 self.storage.delete_tantivy(&target.space, &old_chunk_ids)?;
1167 self.storage.delete_usearch(&target.space, &old_chunk_ids)?;
1168 }
1169 }
1170
1171 let chunk_ids = self.storage.insert_chunks(doc_id, &chunk_inserts)?;
1172 for chunk_index in rejected_chunk_indexes {
1173 if let Some(chunk_id) = chunk_ids.get(chunk_index) {
1174 failed_chunk_ids.insert(*chunk_id);
1175 }
1176 }
1177
1178 if !chunk_ids.is_empty() {
1179 if !options.no_embed && self.embedder.is_some() {
1180 for prepared in prepared_embeddings {
1181 pending_embeddings.push(PendingChunkEmbedding {
1182 chunk_id: chunk_ids[prepared.chunk_index],
1183 doc_key: prepared.doc_key,
1184 space_name: prepared.space_name,
1185 path: prepared.path,
1186 text: prepared.text,
1187 max_document_tokens: prepared.max_document_tokens,
1188 });
1189 }
1190 if pending_embeddings.len() >= 64 {
1191 self.flush_buffered_embeddings(
1192 pending_embeddings,
1193 failed_chunk_ids,
1194 report,
1195 failed_docs,
1196 )?;
1197 }
1198 }
1199
1200 let entries = chunk_ids
1201 .iter()
1202 .zip(final_chunks.iter())
1203 .map(|(chunk_id, chunk)| TantivyEntry {
1204 chunk_id: *chunk_id,
1205 doc_id,
1206 filepath: relative_path.clone(),
1207 semantic_title: title_source
1208 .semantic_title(title.as_str())
1209 .map(ToString::to_string),
1210 heading: chunk.heading.clone(),
1211 body: retrieval_text_with_prefix(
1212 if chunk.text.is_empty() {
1213 body.as_str()
1214 } else {
1215 chunk.text.as_str()
1216 },
1217 title_source.semantic_title(title.as_str()),
1218 chunk.heading.as_deref(),
1219 policy.contextual_prefix,
1220 ),
1221 })
1222 .collect::<Vec<_>>();
1223 self.storage.index_tantivy(&target.space, &entries)?;
1224 fts_dirty_by_space
1225 .entry(target.space.clone())
1226 .or_default()
1227 .insert(doc_id);
1228 }
1229
1230 docs_by_path.insert(
1231 relative_path.clone(),
1232 self.storage
1233 .get_document_by_path(target.collection.id, &relative_path)?
1234 .ok_or_else(|| {
1235 KboltError::Internal(format!(
1236 "document missing after upsert: collection={}, path={relative_path}",
1237 target.collection.id
1238 ))
1239 })?,
1240 );
1241 pending_indexing.record(report);
1242 let (kind, detail) = pending_decision;
1243 push_update_decision(report, options, target, &relative_path, kind, detail);
1244 touched_collection = true;
1245 }
1246
1247 let mut missing_docs = docs_by_path
1248 .values()
1249 .filter(|doc| doc.active && !seen_paths.contains(&doc.path))
1250 .cloned()
1251 .collect::<Vec<_>>();
1252 missing_docs.sort_by(|left, right| left.path.cmp(&right.path));
1253
1254 for doc in missing_docs {
1255 if doc.active && !seen_paths.contains(&doc.path) {
1256 report.deactivated_docs += 1;
1257 push_update_decision(
1258 report,
1259 options,
1260 target,
1261 &doc.path,
1262 UpdateDecisionKind::Deactivated,
1263 None,
1264 );
1265 if !options.dry_run {
1266 self.storage.deactivate_document(doc.id)?;
1267 touched_collection = true;
1268 }
1269 }
1270 }
1271
1272 if touched_collection && !options.dry_run {
1273 self.storage
1274 .update_collection_timestamp(target.collection.id)?;
1275 }
1276
1277 Ok(())
1278 }
1279}
1280
1281fn walk_error_path(err: &ignore::Error) -> Option<&Path> {
1282 match err {
1283 ignore::Error::Partial(errors) => errors.iter().find_map(walk_error_path),
1284 ignore::Error::WithLineNumber { err, .. } | ignore::Error::WithDepth { err, .. } => {
1285 walk_error_path(err)
1286 }
1287 ignore::Error::WithPath { path, .. } => Some(path.as_path()),
1288 ignore::Error::Loop { child, .. } => Some(child.as_path()),
1289 ignore::Error::Io(_)
1290 | ignore::Error::Glob { .. }
1291 | ignore::Error::UnrecognizedFileType(_)
1292 | ignore::Error::InvalidDefinition => None,
1293 }
1294}
1295
/// Embedding input prepared for one chunk of a document before its chunk rows are inserted.
///
/// Built by `prepare_chunk_embeddings`. Once the document's chunks are stored,
/// `chunk_index` is used to look up the stored chunk id, and the value is turned into a
/// `PendingChunkEmbedding`.
#[derive(Debug)]
struct PreparedChunkEmbedding {
    /// Position of the chunk in the document's `final_chunks` list.
    chunk_index: usize,
    /// Identity of the owning document (space, collection path, relative path).
    doc_key: UpdateDocKey,
    /// Name of the space the document belongs to.
    space_name: String,
    /// Path of the walked file the chunk came from.
    path: std::path::PathBuf,
    /// Chunk text to embed; whitespace-only chunks are replaced by a single space.
    text: String,
    /// Document token limit, forwarded unchanged into `PendingChunkEmbedding`.
    max_document_tokens: usize,
}
1305
/// Result of preflighting a document's prepared chunk embeddings.
///
/// Rejected chunks are not queued for embedding. Their chunk ids are recorded as failed
/// after the chunks are stored. If an already-indexed document has any rejection, the
/// caller skips updating it.
#[derive(Debug, Default)]
struct PreparedEmbeddingPreflight {
    /// Embeddings that passed preflight; queued once their chunk ids are known.
    accepted: Vec<PreparedChunkEmbedding>,
    /// Indexes into the document's `final_chunks` of chunks that failed preflight.
    rejected_chunk_indexes: Vec<usize>,
}
1311
/// A chunk embedding queued for a batched embedder call.
///
/// Same payload as `PreparedChunkEmbedding`, but keyed by the stored `chunk_id` instead of
/// the chunk's position in the document. The update loop flushes the queue whenever it
/// holds 64 or more entries, and once more after all targets are processed.
#[derive(Debug)]
struct PendingChunkEmbedding {
    /// Id of the stored chunk row this embedding belongs to.
    chunk_id: i64,
    /// Identity of the owning document (space, collection path, relative path).
    doc_key: UpdateDocKey,
    /// Name of the space the document belongs to.
    space_name: String,
    /// Path of the walked file the chunk came from.
    path: std::path::PathBuf,
    /// Chunk text to embed.
    text: String,
    /// Document token limit carried from `PreparedChunkEmbedding`.
    max_document_tokens: usize,
}
1321
/// Outcome of embedding one batch of `PendingChunkEmbedding`s.
///
/// Derives `Debug` like the other embedding bookkeeping structs in this module, so a batch
/// can be formatted in diagnostics.
#[derive(Debug, Default)]
struct EmbeddedPendingBatch {
    /// Successfully embedded chunks. Presumably `(chunk_id, space_name, vector)`; confirm
    /// against the code that fills this batch.
    embeddings: Vec<(i64, String, Vec<f32>)>,
    /// Ids of chunks whose embedding failed in this batch.
    failed_chunk_ids: Vec<i64>,
}
1327
/// Identity of a document within an update run.
///
/// The key combines the space name, the collection's root path, and the document's path
/// relative to that collection. Built by `update_doc_key`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct UpdateDocKey {
    /// Name of the space that owns the collection.
    space: String,
    /// Root path of the collection.
    collection_path: std::path::PathBuf,
    /// Document path relative to the collection root.
    path: String,
}
1334
1335struct EmbeddingDocumentSizerCounter<'a>(&'a dyn crate::models::EmbeddingDocumentSizer);
1336
1337impl crate::ingest::chunk::TokenCounter for EmbeddingDocumentSizerCounter<'_> {
1338 fn count(&self, text: &str) -> Result<usize> {
1339 self.0.count_document_tokens(text)
1340 }
1341}
1342
1343fn prepare_chunk_embeddings(
1344 final_chunks: &[crate::ingest::chunk::FinalChunk],
1345 bytes: &[u8],
1346 doc_key: &UpdateDocKey,
1347 target: &UpdateTarget,
1348 path: &Path,
1349 max_document_tokens: usize,
1350) -> Vec<PreparedChunkEmbedding> {
1351 final_chunks
1352 .iter()
1353 .enumerate()
1354 .map(|(chunk_index, chunk)| {
1355 let mut text = chunk_text_from_bytes(bytes, chunk.offset, chunk.length);
1356 if text.trim().is_empty() {
1357 text = " ".to_string();
1358 }
1359 PreparedChunkEmbedding {
1360 chunk_index,
1361 doc_key: doc_key.clone(),
1362 space_name: target.space.clone(),
1363 path: path.to_path_buf(),
1364 text,
1365 max_document_tokens,
1366 }
1367 })
1368 .collect()
1369}
1370
1371#[derive(Debug, Clone)]
1372enum UpdateRepairScope {
1373 Global,
1374 Space { space_id: i64 },
1375 Collections { collection_ids: Vec<i64> },
1376}
1377
1378impl UpdateRepairScope {
1379 fn from_options_and_targets(options: &UpdateOptions, targets: &[UpdateTarget]) -> Self {
1380 if options.collections.is_empty() {
1381 if let Some(target) = targets.first() {
1382 if options.space.is_some() {
1383 return Self::Space {
1384 space_id: target.collection.space_id,
1385 };
1386 }
1387 }
1388 return Self::Global;
1389 }
1390
1391 let mut collection_ids = targets
1392 .iter()
1393 .map(|target| target.collection.id)
1394 .collect::<Vec<_>>();
1395 collection_ids.sort_unstable();
1396 collection_ids.dedup();
1397 Self::Collections { collection_ids }
1398 }
1399
1400 fn allows_space_dense_repair(&self) -> bool {
1401 !matches!(self, Self::Collections { .. })
1402 }
1403}
1404
1405#[derive(Debug, Clone, Copy)]
1406enum PendingDocumentIndexing {
1407 Added,
1408 Updated { reactivated: bool },
1409}
1410
1411impl PendingDocumentIndexing {
1412 fn record(self, report: &mut UpdateReport) {
1413 match self {
1414 Self::Added => {
1415 report.added_docs += 1;
1416 }
1417 Self::Updated { reactivated } => {
1418 report.updated_docs += 1;
1419 if reactivated {
1420 report.reactivated_docs += 1;
1421 }
1422 }
1423 }
1424 }
1425}
1426
1427fn invalid_pending_embedding_batch_detail(
1428 pending: &[PendingChunkEmbedding],
1429 vectors: &[Vec<f32>],
1430) -> Option<String> {
1431 if vectors.len() != pending.len() {
1432 return Some(format!(
1433 "embedder returned {} vectors for {} chunks",
1434 vectors.len(),
1435 pending.len()
1436 ));
1437 }
1438
1439 if vectors.iter().any(|vector| vector.is_empty()) {
1440 if pending.len() == 1 {
1441 return Some(format!(
1442 "embedder returned an empty vector for chunk {}",
1443 pending[0].chunk_id
1444 ));
1445 }
1446 return Some("embedder returned an empty vector".to_string());
1447 }
1448
1449 None
1450}
1451
1452fn push_update_decision(
1453 report: &mut UpdateReport,
1454 options: &UpdateOptions,
1455 target: &UpdateTarget,
1456 path: &str,
1457 kind: UpdateDecisionKind,
1458 detail: Option<String>,
1459) {
1460 if !options.verbose {
1461 return;
1462 }
1463
1464 report.decisions.push(UpdateDecision {
1465 space: target.space.clone(),
1466 collection: target.collection.name.clone(),
1467 path: path.to_string(),
1468 kind,
1469 detail,
1470 });
1471}
1472
1473fn update_doc_key(space: &str, collection_path: &Path, path: &str) -> UpdateDocKey {
1474 UpdateDocKey {
1475 space: space.to_string(),
1476 collection_path: collection_path.to_path_buf(),
1477 path: path.to_string(),
1478 }
1479}
1480
1481fn effective_chunk_hard_max(policy: &crate::config::ChunkPolicy) -> usize {
1482 policy
1483 .hard_max_tokens
1484 .max(policy.soft_max_tokens)
1485 .max(policy.target_tokens)
1486 .max(1)
1487}