1use super::*;
2use kbolt_types::{UpdateDecision, UpdateDecisionKind};
3
4impl Engine {
5 pub fn update(&self, options: UpdateOptions) -> Result<UpdateReport> {
6 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
7 self.update_unlocked(options)
8 }
9
10 pub(super) fn update_unlocked(&self, options: UpdateOptions) -> Result<UpdateReport> {
11 let started = Instant::now();
12 let mut report = UpdateReport {
13 scanned_docs: 0,
14 skipped_mtime_docs: 0,
15 skipped_hash_docs: 0,
16 added_docs: 0,
17 updated_docs: 0,
18 failed_docs: 0,
19 deactivated_docs: 0,
20 reactivated_docs: 0,
21 reaped_docs: 0,
22 embedded_chunks: 0,
23 decisions: Vec::new(),
24 errors: Vec::new(),
25 elapsed_ms: 0,
26 };
27 let mut failed_docs = HashSet::new();
28
29 let targets = self.resolve_targets(TargetScope {
30 space: options.space.as_deref(),
31 collections: &options.collections,
32 })?;
33 if targets.is_empty() {
34 report.elapsed_ms = started.elapsed().as_millis() as u64;
35 return Ok(report);
36 }
37 let repair_scope = UpdateRepairScope::from_options_and_targets(&options, &targets);
38
39 self.reconcile_dense_integrity(&targets, &repair_scope, &options)?;
40
41 if !options.dry_run {
42 self.replay_fts_dirty_documents(&repair_scope, &mut report, &mut failed_docs)?;
43 }
44
45 let mut fts_dirty_by_space: HashMap<String, HashSet<i64>> = HashMap::new();
46 let mut pending_embeddings = Vec::new();
47 let mut failed_embedding_chunk_ids = HashSet::new();
48 for target in &targets {
49 self.update_collection_target(
50 target,
51 &options,
52 &mut report,
53 &mut fts_dirty_by_space,
54 &mut pending_embeddings,
55 &mut failed_embedding_chunk_ids,
56 &mut failed_docs,
57 )?;
58 }
59
60 if !options.dry_run {
61 self.flush_buffered_embeddings(
62 &mut pending_embeddings,
63 &mut failed_embedding_chunk_ids,
64 &mut report,
65 &mut failed_docs,
66 )?;
67
68 for (space, doc_ids) in fts_dirty_by_space {
69 if doc_ids.is_empty() {
70 continue;
71 }
72
73 self.storage.commit_tantivy(&space)?;
74 let mut ids = doc_ids.into_iter().collect::<Vec<_>>();
75 ids.sort_unstable();
76 self.storage.batch_clear_fts_dirty(&ids)?;
77 }
78
79 self.embed_pending_chunks(
80 &repair_scope,
81 &options,
82 &mut failed_embedding_chunk_ids,
83 &mut report,
84 &mut failed_docs,
85 )?;
86
87 let reaped =
88 self.list_reapable_documents_for_scope(self.config.reaping.days, &repair_scope)?;
89 let mut reaped_doc_ids = Vec::with_capacity(reaped.len());
90 let mut chunk_ids_by_space: HashMap<String, Vec<i64>> = HashMap::new();
91 for document in reaped {
92 reaped_doc_ids.push(document.doc_id);
93 if !document.chunk_ids.is_empty() {
94 chunk_ids_by_space
95 .entry(document.space_name)
96 .or_default()
97 .extend(document.chunk_ids);
98 }
99 }
100 for (space, chunk_ids) in chunk_ids_by_space {
101 self.purge_space_chunks(&space, &chunk_ids)?;
102 }
103 self.storage.delete_documents(&reaped_doc_ids)?;
104 report.reaped_docs = reaped_doc_ids.len();
105 }
106
107 report.failed_docs = failed_docs.len();
108 report.elapsed_ms = started.elapsed().as_millis() as u64;
109 Ok(report)
110 }
111
112 fn reconcile_dense_integrity(
113 &self,
114 targets: &[UpdateTarget],
115 repair_scope: &UpdateRepairScope,
116 options: &UpdateOptions,
117 ) -> Result<()> {
118 if options.no_embed || options.dry_run {
119 return Ok(());
120 }
121
122 let expected_model = self.embedding_model_key();
123 let mut visited_spaces = HashSet::new();
124 for target in targets {
125 if !visited_spaces.insert(target.collection.space_id) {
126 continue;
127 }
128
129 let models = self
130 .storage
131 .list_embedding_models_in_space(target.collection.space_id)?;
132 let mut reasons = Vec::new();
133 if models.iter().any(|model| model != expected_model) {
134 reasons.push(format!(
135 "stored embedding models {:?} do not match current model '{}'",
136 models, expected_model
137 ));
138 }
139
140 let sqlite_count = self
141 .storage
142 .count_embedded_chunks(Some(target.collection.space_id))?;
143 let usearch_count = self.storage.count_usearch(&target.space)?;
144 if sqlite_count != usearch_count {
145 reasons.push(format!(
146 "sqlite embedded chunk count {sqlite_count} does not match USearch vector count {usearch_count}"
147 ));
148 }
149
150 if reasons.is_empty() {
151 continue;
152 }
153
154 if !repair_scope.allows_space_dense_repair() {
155 return Err(KboltError::SpaceDenseRepairRequired {
156 space: target.space.clone(),
157 reason: reasons.join("; "),
158 }
159 .into());
160 }
161
162 self.storage
163 .delete_embeddings_for_space(target.collection.space_id)?;
164 self.storage.clear_usearch(&target.space)?;
165 }
166
167 Ok(())
168 }
169
170 fn embed_pending_chunks(
171 &self,
172 repair_scope: &UpdateRepairScope,
173 options: &UpdateOptions,
174 failed_chunk_ids: &mut HashSet<i64>,
175 report: &mut UpdateReport,
176 failed_docs: &mut HashSet<UpdateDocKey>,
177 ) -> Result<()> {
178 if options.no_embed || options.dry_run {
179 return Ok(());
180 }
181
182 let Some(embedder) = self.embedder.as_ref() else {
183 return Ok(());
184 };
185
186 let model = self.embedding_model_key();
187 let mut after_chunk_id = 0_i64;
188 loop {
189 let backlog =
190 self.get_unembedded_chunks_for_scope(model, repair_scope, after_chunk_id, 64)?;
191 if backlog.is_empty() {
192 break;
193 }
194
195 after_chunk_id = backlog
196 .last()
197 .map(|record| record.chunk_id)
198 .expect("non-empty backlog should have a last chunk id");
199
200 let mut pending = Vec::new();
201 let mut bytes_by_path: HashMap<std::path::PathBuf, Option<Vec<u8>>> = HashMap::new();
202 for record in backlog {
203 if failed_chunk_ids.contains(&record.chunk_id) {
204 continue;
205 }
206
207 let full_path = record.collection_path.join(&record.doc_path);
208 if !bytes_by_path.contains_key(&full_path) {
209 let bytes = match std::fs::read(&full_path) {
210 Ok(bytes) => Some(bytes),
211 Err(err) if err.kind() == std::io::ErrorKind::NotFound => None,
212 Err(err) => {
213 report.errors.push(file_error(
214 Some(full_path.clone()),
215 format!("embed read failed: {err}"),
216 ));
217 failed_docs.insert(update_doc_key(
218 &record.space_name,
219 &record.collection_path,
220 &record.doc_path,
221 ));
222 None
223 }
224 };
225 bytes_by_path.insert(full_path.clone(), bytes);
226 }
227
228 let Some(bytes) = bytes_by_path
229 .get(&full_path)
230 .and_then(|bytes| bytes.as_deref())
231 else {
232 continue;
233 };
234
235 let mut text = chunk_text_from_bytes(&bytes, record.offset, record.length);
236 if text.trim().is_empty() {
237 text = " ".to_string();
238 }
239 let max_document_tokens = match self.chunk_policy_for_path(&full_path) {
240 Ok(policy) => effective_chunk_hard_max(&policy),
241 Err(err) => {
242 report.errors.push(file_error(
243 Some(full_path.clone()),
244 format!("embed preflight policy resolution failed: {err}"),
245 ));
246 failed_docs.insert(update_doc_key(
247 &record.space_name,
248 &record.collection_path,
249 &record.doc_path,
250 ));
251 continue;
252 }
253 };
254
255 pending.push(PendingChunkEmbedding {
256 chunk_id: record.chunk_id,
257 doc_key: update_doc_key(
258 &record.space_name,
259 &record.collection_path,
260 &record.doc_path,
261 ),
262 space_name: record.space_name,
263 path: full_path,
264 text,
265 max_document_tokens,
266 });
267 }
268
269 if pending.is_empty() {
270 continue;
271 }
272
273 let result = self.embed_pending_batch_with_partial_failures(
274 embedder.as_ref(),
275 pending,
276 report,
277 failed_docs,
278 )?;
279 failed_chunk_ids.extend(result.failed_chunk_ids);
280 if !result.embeddings.is_empty() {
281 self.store_chunk_embeddings(model, result.embeddings, report)?;
282 }
283 }
284
285 Ok(())
286 }
287
288 fn flush_buffered_embeddings(
289 &self,
290 pending_embeddings: &mut Vec<PendingChunkEmbedding>,
291 failed_chunk_ids: &mut HashSet<i64>,
292 report: &mut UpdateReport,
293 failed_docs: &mut HashSet<UpdateDocKey>,
294 ) -> Result<()> {
295 if pending_embeddings.is_empty() {
296 return Ok(());
297 }
298
299 let Some(embedder) = self.embedder.as_ref() else {
300 pending_embeddings.clear();
301 return Ok(());
302 };
303
304 let pending = std::mem::take(pending_embeddings);
305 let result = self.embed_pending_batch_with_partial_failures(
306 embedder.as_ref(),
307 pending,
308 report,
309 failed_docs,
310 )?;
311 failed_chunk_ids.extend(result.failed_chunk_ids);
312 if result.embeddings.is_empty() {
313 return Ok(());
314 }
315
316 self.store_chunk_embeddings(self.embedding_model_key(), result.embeddings, report)
317 }
318
319 fn embed_pending_batch_with_partial_failures(
320 &self,
321 embedder: &dyn crate::models::Embedder,
322 pending: Vec<PendingChunkEmbedding>,
323 report: &mut UpdateReport,
324 failed_docs: &mut HashSet<UpdateDocKey>,
325 ) -> Result<EmbeddedPendingBatch> {
326 if pending.is_empty() {
327 return Ok(EmbeddedPendingBatch::default());
328 }
329
330 let mut failed_chunk_ids = Vec::new();
331 let pending =
332 self.preflight_pending_embeddings(pending, report, failed_docs, &mut failed_chunk_ids)?;
333 if pending.is_empty() {
334 return Ok(EmbeddedPendingBatch {
335 embeddings: Vec::new(),
336 failed_chunk_ids,
337 });
338 }
339
340 let texts = pending
341 .iter()
342 .map(|embedding| embedding.text.clone())
343 .collect::<Vec<_>>();
344 match embedder.embed_batch(crate::models::EmbeddingInputKind::Document, &texts) {
345 Ok(vectors) => {
346 if let Some(detail) = invalid_pending_embedding_batch_detail(&pending, &vectors) {
347 return self.split_pending_embedding_batch(
348 embedder,
349 pending,
350 report,
351 failed_docs,
352 detail,
353 );
354 }
355
356 let embeddings = pending
357 .into_iter()
358 .zip(vectors)
359 .map(|(embedding, vector)| (embedding.chunk_id, embedding.space_name, vector))
360 .collect::<Vec<_>>();
361 Ok(EmbeddedPendingBatch {
362 embeddings,
363 failed_chunk_ids,
364 })
365 }
366 Err(err) => self
367 .split_pending_embedding_batch(
368 embedder,
369 pending,
370 report,
371 failed_docs,
372 err.to_string(),
373 )
374 .map(|mut result| {
375 result.failed_chunk_ids.extend(failed_chunk_ids);
376 result
377 }),
378 }
379 }
380
381 fn split_pending_embedding_batch(
382 &self,
383 embedder: &dyn crate::models::Embedder,
384 pending: Vec<PendingChunkEmbedding>,
385 report: &mut UpdateReport,
386 failed_docs: &mut HashSet<UpdateDocKey>,
387 detail: String,
388 ) -> Result<EmbeddedPendingBatch> {
389 if pending.len() == 1 {
390 let embedding = pending
391 .into_iter()
392 .next()
393 .expect("single pending embedding should exist");
394 report.errors.push(file_error(
395 Some(embedding.path),
396 format!("embed failed: {detail}"),
397 ));
398 failed_docs.insert(embedding.doc_key);
399 return Ok(EmbeddedPendingBatch {
400 embeddings: Vec::new(),
401 failed_chunk_ids: vec![embedding.chunk_id],
402 });
403 }
404
405 let mid = pending.len() / 2;
406 let mut right = pending;
407 let left = right.drain(..mid).collect::<Vec<_>>();
408
409 let mut left_result =
410 self.embed_pending_batch_with_partial_failures(embedder, left, report, failed_docs)?;
411 let right_result =
412 self.embed_pending_batch_with_partial_failures(embedder, right, report, failed_docs)?;
413 left_result.embeddings.extend(right_result.embeddings);
414 left_result
415 .failed_chunk_ids
416 .extend(right_result.failed_chunk_ids);
417 Ok(left_result)
418 }
419
420 fn store_chunk_embeddings(
421 &self,
422 model: &str,
423 embeddings: Vec<(i64, String, Vec<f32>)>,
424 report: &mut UpdateReport,
425 ) -> Result<()> {
426 let mut grouped_vectors: HashMap<String, Vec<(i64, Vec<f32>)>> = HashMap::new();
427 let mut embedding_rows = Vec::with_capacity(embeddings.len());
428 for (chunk_id, space_name, vector) in embeddings {
429 if vector.is_empty() {
430 return Err(KboltError::Inference(format!(
431 "embedder returned an empty vector for chunk {chunk_id}"
432 ))
433 .into());
434 }
435
436 grouped_vectors
437 .entry(space_name)
438 .or_default()
439 .push((chunk_id, vector));
440 embedding_rows.push((chunk_id, model));
441 }
442
443 for (space, vectors) in grouped_vectors {
444 let refs = vectors
445 .iter()
446 .map(|(chunk_id, vector)| (*chunk_id, vector.as_slice()))
447 .collect::<Vec<_>>();
448 self.storage.batch_insert_usearch(&space, &refs)?;
449 }
450
451 self.storage.insert_embeddings(&embedding_rows)?;
452 report.embedded_chunks = report.embedded_chunks.saturating_add(embedding_rows.len());
453 Ok(())
454 }
455
456 fn preflight_pending_embeddings(
457 &self,
458 pending: Vec<PendingChunkEmbedding>,
459 report: &mut UpdateReport,
460 failed_docs: &mut HashSet<UpdateDocKey>,
461 failed_chunk_ids: &mut Vec<i64>,
462 ) -> Result<Vec<PendingChunkEmbedding>> {
463 let Some(sizer) = self.embedding_document_sizer.as_ref() else {
464 return Ok(pending);
465 };
466
467 let mut accepted = Vec::with_capacity(pending.len());
468 for embedding in pending {
469 match sizer.count_document_tokens(&embedding.text) {
470 Ok(token_count) if token_count <= embedding.max_document_tokens => {
471 accepted.push(embedding);
472 }
473 Ok(token_count) => {
474 report.errors.push(file_error(
475 Some(embedding.path.clone()),
476 format!(
477 "embed preflight failed: payload has {token_count} tokens, exceeding hard_max_tokens {}",
478 embedding.max_document_tokens
479 ),
480 ));
481 failed_docs.insert(embedding.doc_key);
482 failed_chunk_ids.push(embedding.chunk_id);
483 }
484 Err(err) => {
485 report.errors.push(file_error(
486 Some(embedding.path.clone()),
487 format!("embed preflight token count failed: {err}"),
488 ));
489 failed_docs.insert(embedding.doc_key);
490 failed_chunk_ids.push(embedding.chunk_id);
491 }
492 }
493 }
494
495 Ok(accepted)
496 }
497
498 fn preflight_prepared_embeddings(
499 &self,
500 prepared: Vec<PreparedChunkEmbedding>,
501 report: &mut UpdateReport,
502 failed_docs: &mut HashSet<UpdateDocKey>,
503 ) -> Result<PreparedEmbeddingPreflight> {
504 let Some(sizer) = self.embedding_document_sizer.as_ref() else {
505 return Ok(PreparedEmbeddingPreflight {
506 accepted: prepared,
507 rejected_chunk_indexes: Vec::new(),
508 });
509 };
510
511 let mut preflight = PreparedEmbeddingPreflight {
512 accepted: Vec::with_capacity(prepared.len()),
513 rejected_chunk_indexes: Vec::new(),
514 };
515 for embedding in prepared {
516 match sizer.count_document_tokens(&embedding.text) {
517 Ok(token_count) if token_count <= embedding.max_document_tokens => {
518 preflight.accepted.push(embedding);
519 }
520 Ok(token_count) => {
521 report.errors.push(file_error(
522 Some(embedding.path.clone()),
523 format!(
524 "embed preflight failed: payload has {token_count} tokens, exceeding hard_max_tokens {}",
525 embedding.max_document_tokens
526 ),
527 ));
528 failed_docs.insert(embedding.doc_key);
529 preflight.rejected_chunk_indexes.push(embedding.chunk_index);
530 }
531 Err(err) => {
532 report.errors.push(file_error(
533 Some(embedding.path.clone()),
534 format!("embed preflight token count failed: {err}"),
535 ));
536 failed_docs.insert(embedding.doc_key);
537 preflight.rejected_chunk_indexes.push(embedding.chunk_index);
538 }
539 }
540 }
541
542 Ok(preflight)
543 }
544
545 fn chunk_policy_for_path(&self, path: &Path) -> Result<crate::config::ChunkPolicy> {
546 let extractor = default_registry().resolve_for_path(path).ok_or_else(|| {
547 KboltError::InvalidInput(format!(
548 "no extractor available for embedding preflight path {}",
549 path.display()
550 ))
551 })?;
552 Ok(resolve_policy(
553 &self.config.chunking,
554 Some(extractor.profile_key()),
555 None,
556 ))
557 }
558
559 fn replay_fts_dirty_documents(
560 &self,
561 repair_scope: &UpdateRepairScope,
562 report: &mut UpdateReport,
563 failed_docs: &mut HashSet<UpdateDocKey>,
564 ) -> Result<()> {
565 let records = self.get_fts_dirty_documents_for_scope(repair_scope)?;
566 if records.is_empty() {
567 return Ok(());
568 }
569
570 let mut cleared_by_space: HashMap<String, Vec<i64>> = HashMap::new();
571 for record in records {
572 let space_name = record.space_name;
573 let doc_id = record.doc_id;
574
575 if record.chunks.is_empty() {
576 cleared_by_space.entry(space_name).or_default().push(doc_id);
577 continue;
578 }
579
580 let full_path = record.collection_path.join(&record.doc_path);
581 let bytes = match std::fs::read(&full_path) {
582 Ok(bytes) => bytes,
583 Err(err) if err.kind() == std::io::ErrorKind::NotFound => continue,
584 Err(err) => {
585 failed_docs.insert(update_doc_key(
586 &space_name,
587 &record.collection_path,
588 &record.doc_path,
589 ));
590 report.errors.push(file_error(
591 Some(full_path),
592 format!("fts replay read failed: {err}"),
593 ));
594 continue;
595 }
596 };
597 if sha256_hex(&bytes) != record.doc_hash {
598 continue;
599 }
600
601 self.storage.delete_tantivy_by_doc(&space_name, doc_id)?;
602
603 let file_body = String::from_utf8_lossy(&bytes).into_owned();
604 let entries = record
605 .chunks
606 .iter()
607 .map(|chunk| {
608 let chunk_body = chunk_text_from_bytes(&bytes, chunk.offset, chunk.length);
609 let source_body = if chunk_body.is_empty() {
610 file_body.as_str()
611 } else {
612 chunk_body.as_str()
613 };
614 TantivyEntry {
615 chunk_id: chunk.id,
616 doc_id,
617 filepath: record.doc_path.clone(),
618 semantic_title: record
619 .doc_title_source
620 .semantic_title(record.doc_title.as_str())
621 .map(ToString::to_string),
622 heading: chunk.heading.clone(),
623 body: retrieval_text_with_prefix(
624 source_body,
625 record
626 .doc_title_source
627 .semantic_title(record.doc_title.as_str()),
628 chunk.heading.as_deref(),
629 self.config.chunking.defaults.contextual_prefix,
630 ),
631 }
632 })
633 .collect::<Vec<_>>();
634
635 self.storage.index_tantivy(&space_name, &entries)?;
636 cleared_by_space.entry(space_name).or_default().push(doc_id);
637 }
638
639 for (space_name, mut doc_ids) in cleared_by_space {
640 if doc_ids.is_empty() {
641 continue;
642 }
643
644 doc_ids.sort_unstable();
645 doc_ids.dedup();
646 self.storage.commit_tantivy(&space_name)?;
647 self.storage.batch_clear_fts_dirty(&doc_ids)?;
648 }
649
650 Ok(())
651 }
652
653 fn get_fts_dirty_documents_for_scope(
654 &self,
655 repair_scope: &UpdateRepairScope,
656 ) -> Result<Vec<crate::storage::FtsDirtyRecord>> {
657 match repair_scope {
658 UpdateRepairScope::Global => self.storage.get_fts_dirty_documents(),
659 UpdateRepairScope::Space { space_id } => {
660 self.storage.get_fts_dirty_documents_in_space(*space_id)
661 }
662 UpdateRepairScope::Collections { collection_ids } => self
663 .storage
664 .get_fts_dirty_documents_in_collections(collection_ids),
665 }
666 }
667
668 fn get_unembedded_chunks_for_scope(
669 &self,
670 model: &str,
671 repair_scope: &UpdateRepairScope,
672 after_chunk_id: i64,
673 limit: usize,
674 ) -> Result<Vec<crate::storage::EmbedRecord>> {
675 match repair_scope {
676 UpdateRepairScope::Global => {
677 self.storage
678 .get_unembedded_chunks(model, after_chunk_id, limit)
679 }
680 UpdateRepairScope::Space { space_id } => {
681 self.storage
682 .get_unembedded_chunks_in_space(model, *space_id, after_chunk_id, limit)
683 }
684 UpdateRepairScope::Collections { collection_ids } => self
685 .storage
686 .get_unembedded_chunks_in_collections(model, collection_ids, after_chunk_id, limit),
687 }
688 }
689
690 fn list_reapable_documents_for_scope(
691 &self,
692 older_than_days: u32,
693 repair_scope: &UpdateRepairScope,
694 ) -> Result<Vec<crate::storage::ReapableDocument>> {
695 match repair_scope {
696 UpdateRepairScope::Global => self.storage.list_reapable_documents(older_than_days),
697 UpdateRepairScope::Space { space_id } => self
698 .storage
699 .list_reapable_documents_in_space(older_than_days, *space_id),
700 UpdateRepairScope::Collections { collection_ids } => self
701 .storage
702 .list_reapable_documents_in_collections(older_than_days, collection_ids),
703 }
704 }
705
706 pub fn resolve_update_targets(&self, options: &UpdateOptions) -> Result<Vec<UpdateTarget>> {
707 self.resolve_targets(TargetScope {
708 space: options.space.as_deref(),
709 collections: &options.collections,
710 })
711 }
712
713 pub(super) fn resolve_targets(&self, scope: TargetScope<'_>) -> Result<Vec<UpdateTarget>> {
714 let mut targets = Vec::new();
715
716 if scope.collections.is_empty() {
717 return self.resolve_update_targets_for_all_collections(scope.space);
718 }
719
720 let mut seen = std::collections::HashSet::new();
721 for raw_collection_name in scope.collections {
722 let collection_name = raw_collection_name.trim();
723 if collection_name.is_empty() {
724 return Err(KboltError::InvalidInput(
725 "collection names cannot be empty".to_string(),
726 )
727 .into());
728 }
729
730 let resolved_space = self.resolve_space_row(scope.space, Some(collection_name))?;
731 let collection = self
732 .storage
733 .get_collection(resolved_space.id, collection_name)?;
734
735 if seen.insert((collection.space_id, collection.name.clone())) {
736 targets.push(UpdateTarget {
737 space: resolved_space.name,
738 collection,
739 });
740 }
741 }
742
743 Ok(targets)
744 }
745
746 fn resolve_update_targets_for_all_collections(
747 &self,
748 space: Option<&str>,
749 ) -> Result<Vec<UpdateTarget>> {
750 let (space_id_filter, spaces_by_id) = if let Some(space_name) = space {
751 let resolved = self.resolve_space_row(Some(space_name), None)?;
752 let mut map = std::collections::HashMap::new();
753 map.insert(resolved.id, resolved.name.clone());
754 (Some(resolved.id), map)
755 } else {
756 let spaces = self.storage.list_spaces()?;
757 let map = spaces
758 .into_iter()
759 .map(|space| (space.id, space.name))
760 .collect::<std::collections::HashMap<_, _>>();
761 (None, map)
762 };
763
764 let collections = self.storage.list_collections(space_id_filter)?;
765 let mut targets = Vec::with_capacity(collections.len());
766 for collection in collections {
767 let space_name = spaces_by_id
768 .get(&collection.space_id)
769 .ok_or_else(|| {
770 KboltError::Internal(format!(
771 "missing space mapping for collection '{}'",
772 collection.name
773 ))
774 })?
775 .clone();
776 targets.push(UpdateTarget {
777 space: space_name,
778 collection,
779 });
780 }
781
782 Ok(targets)
783 }
784
785 fn update_collection_target(
786 &self,
787 target: &UpdateTarget,
788 options: &UpdateOptions,
789 report: &mut UpdateReport,
790 fts_dirty_by_space: &mut HashMap<String, HashSet<i64>>,
791 pending_embeddings: &mut Vec<PendingChunkEmbedding>,
792 failed_chunk_ids: &mut HashSet<i64>,
793 failed_docs: &mut HashSet<UpdateDocKey>,
794 ) -> Result<()> {
795 let all_documents = self.storage.list_documents(target.collection.id, false)?;
796 let mut docs_by_path: HashMap<String, DocumentRow> = all_documents
797 .into_iter()
798 .map(|doc| (doc.path.clone(), doc))
799 .collect();
800 let mut seen_paths = HashSet::new();
801 let extension_filter = normalized_extension_filter(target.collection.extensions.as_deref());
802 let ignore_matcher = load_collection_ignore_matcher(
803 &self.config.config_dir,
804 &target.collection.path,
805 &target.space,
806 &target.collection.name,
807 )?;
808 let extractor_registry = default_registry();
809 let mut touched_collection = false;
810
811 for entry in WalkDir::new(&target.collection.path)
812 .follow_links(false)
813 .into_iter()
814 .filter_entry(|entry| {
815 !entry.file_type().is_dir() || !is_hard_ignored_dir_name(entry.file_name())
816 })
817 {
818 let entry = match entry {
819 Ok(item) => item,
820 Err(err) => {
821 if let Some(path) = err.path() {
822 if let Ok(relative_path) =
823 collection_relative_path(&target.collection.path, path)
824 {
825 failed_docs.insert(update_doc_key(
826 &target.space,
827 &target.collection.path,
828 &relative_path,
829 ));
830 }
831 }
832 report.errors.push(file_error(
833 err.path().map(Path::to_path_buf),
834 format!("walkdir error: {err}"),
835 ));
836 continue;
837 }
838 };
839
840 if !entry.file_type().is_file() {
841 continue;
842 }
843
844 if is_hard_ignored_file(entry.path()) {
845 continue;
846 }
847
848 let relative_path =
849 match collection_relative_path(&target.collection.path, entry.path()) {
850 Ok(path) => path,
851 Err(err) => {
852 failed_docs.insert(update_doc_key(
853 &target.space,
854 &target.collection.path,
855 &entry.path().display().to_string(),
856 ));
857 report.errors.push(file_error(
858 Some(entry.path().to_path_buf()),
859 err.to_string(),
860 ));
861 continue;
862 }
863 };
864
865 if !extension_allowed(entry.path(), extension_filter.as_ref()) {
866 push_update_decision(
867 report,
868 options,
869 target,
870 &relative_path,
871 UpdateDecisionKind::Unsupported,
872 Some("extension not allowed".to_string()),
873 );
874 continue;
875 }
876
877 if let Some(matcher) = ignore_matcher.as_ref() {
878 if matcher
879 .matched(Path::new(&relative_path), false)
880 .is_ignore()
881 {
882 push_update_decision(
883 report,
884 options,
885 target,
886 &relative_path,
887 UpdateDecisionKind::Ignored,
888 Some("matched ignore patterns".to_string()),
889 );
890 continue;
891 }
892 }
893
894 let Some(extractor) = extractor_registry.resolve_for_path(entry.path()) else {
895 push_update_decision(
896 report,
897 options,
898 target,
899 &relative_path,
900 UpdateDecisionKind::Unsupported,
901 Some("no extractor available".to_string()),
902 );
903 continue;
904 };
905
906 report.scanned_docs += 1;
907 seen_paths.insert(relative_path.clone());
908
909 let metadata = match entry.metadata() {
910 Ok(data) => data,
911 Err(err) => {
912 let detail = format!("metadata error: {err}");
913 failed_docs.insert(update_doc_key(
914 &target.space,
915 &target.collection.path,
916 &relative_path,
917 ));
918 push_update_decision(
919 report,
920 options,
921 target,
922 &relative_path,
923 UpdateDecisionKind::ReadFailed,
924 Some(detail.clone()),
925 );
926 report
927 .errors
928 .push(file_error(Some(entry.path().to_path_buf()), detail));
929 continue;
930 }
931 };
932
933 let modified = match modified_token(&metadata) {
934 Ok(value) => value,
935 Err(err) => {
936 let detail = format!("modified timestamp error: {err}");
937 failed_docs.insert(update_doc_key(
938 &target.space,
939 &target.collection.path,
940 &relative_path,
941 ));
942 push_update_decision(
943 report,
944 options,
945 target,
946 &relative_path,
947 UpdateDecisionKind::ReadFailed,
948 Some(detail.clone()),
949 );
950 report
951 .errors
952 .push(file_error(Some(entry.path().to_path_buf()), detail));
953 continue;
954 }
955 };
956
957 if let Some(existing) = docs_by_path.get(&relative_path) {
958 if existing.active && existing.modified == modified {
959 report.skipped_mtime_docs += 1;
960 push_update_decision(
961 report,
962 options,
963 target,
964 &relative_path,
965 UpdateDecisionKind::SkippedMtime,
966 None,
967 );
968 continue;
969 }
970 }
971
972 let bytes = match std::fs::read(entry.path()) {
973 Ok(data) => data,
974 Err(err) => {
975 let detail = err.to_string();
976 failed_docs.insert(update_doc_key(
977 &target.space,
978 &target.collection.path,
979 &relative_path,
980 ));
981 push_update_decision(
982 report,
983 options,
984 target,
985 &relative_path,
986 UpdateDecisionKind::ReadFailed,
987 Some(detail.clone()),
988 );
989 report
990 .errors
991 .push(file_error(Some(entry.path().to_path_buf()), detail));
992 continue;
993 }
994 };
995 let hash = sha256_hex(&bytes);
996 let mut title = file_title(entry.path());
997 let mut title_source = DocumentTitleSource::FilenameFallback;
998
999 let existing = docs_by_path.get(&relative_path).cloned();
1000 let pending_decision;
1001 let pending_indexing;
1002 if let Some(doc) = existing.as_ref() {
1003 if doc.hash == hash {
1004 if doc.active {
1005 report.skipped_hash_docs += 1;
1006 push_update_decision(
1007 report,
1008 options,
1009 target,
1010 &relative_path,
1011 UpdateDecisionKind::SkippedHash,
1012 None,
1013 );
1014 } else {
1015 report.reactivated_docs += 1;
1016 push_update_decision(
1017 report,
1018 options,
1019 target,
1020 &relative_path,
1021 UpdateDecisionKind::Reactivated,
1022 None,
1023 );
1024 }
1025
1026 if !options.dry_run {
1027 self.storage.refresh_document_activity(doc.id, &modified)?;
1028 }
1029 continue;
1030 }
1031
1032 pending_indexing = PendingDocumentIndexing::Updated {
1033 reactivated: !doc.active,
1034 };
1035 pending_decision = (
1036 UpdateDecisionKind::Changed,
1037 (!doc.active).then_some("reactivated".to_string()),
1038 );
1039 } else {
1040 pending_indexing = PendingDocumentIndexing::Added;
1041 pending_decision = (UpdateDecisionKind::New, None);
1042 }
1043
1044 if options.dry_run {
1045 pending_indexing.record(report);
1046 let (kind, detail) = pending_decision;
1047 push_update_decision(report, options, target, &relative_path, kind, detail);
1048 continue;
1049 }
1050
1051 let extracted = match extractor.extract(entry.path(), &bytes) {
1052 Ok(document) => document,
1053 Err(err) => {
1054 let detail = format!("extract failed: {err}");
1055 failed_docs.insert(update_doc_key(
1056 &target.space,
1057 &target.collection.path,
1058 &relative_path,
1059 ));
1060 push_update_decision(
1061 report,
1062 options,
1063 target,
1064 &relative_path,
1065 UpdateDecisionKind::ExtractFailed,
1066 Some(detail.clone()),
1067 );
1068 report
1069 .errors
1070 .push(file_error(Some(entry.path().to_path_buf()), detail));
1071 continue;
1072 }
1073 };
1074 if let Some(extracted_title) = extracted
1075 .title
1076 .as_deref()
1077 .map(str::trim)
1078 .filter(|title| !title.is_empty())
1079 {
1080 title = extracted_title.to_string();
1081 title_source = DocumentTitleSource::Extracted;
1082 }
1083
1084 let policy = resolve_policy(&self.config.chunking, Some(extractor.profile_key()), None);
1085 let max_document_tokens = effective_chunk_hard_max(&policy);
1086 let final_chunks = match self.embedding_document_sizer.as_ref() {
1087 Some(sizer) => {
1088 let sizer_counter = EmbeddingDocumentSizerCounter(sizer.as_ref());
1089 match chunk_document_with_counter(&extracted, &policy, &sizer_counter) {
1090 Ok(chunks) => chunks,
1091 Err(err) => {
1092 let detail = format!("chunking failed: {err}");
1093 failed_docs.insert(update_doc_key(
1094 &target.space,
1095 &target.collection.path,
1096 &relative_path,
1097 ));
1098 push_update_decision(
1099 report,
1100 options,
1101 target,
1102 &relative_path,
1103 UpdateDecisionKind::ExtractFailed,
1104 Some(detail.clone()),
1105 );
1106 report
1107 .errors
1108 .push(file_error(Some(entry.path().to_path_buf()), detail));
1109 continue;
1110 }
1111 }
1112 }
1113 None => chunk_document(&extracted, &policy),
1114 };
1115
1116 let doc_key = update_doc_key(&target.space, &target.collection.path, &relative_path);
1117 let chunk_inserts = final_chunks
1118 .iter()
1119 .enumerate()
1120 .map(|(index, chunk)| ChunkInsert {
1121 seq: index as i32,
1122 offset: chunk.offset,
1123 length: chunk.length,
1124 heading: chunk.heading.clone(),
1125 kind: chunk.kind,
1126 })
1127 .collect::<Vec<_>>();
1128 let body = String::from_utf8_lossy(&bytes).into_owned();
1129 let mut prepared_embeddings = Vec::new();
1130 let mut rejected_chunk_indexes = Vec::new();
1131 if !chunk_inserts.is_empty() && !options.no_embed && self.embedder.is_some() {
1132 let preflight = self.preflight_prepared_embeddings(
1133 prepare_chunk_embeddings(
1134 &final_chunks,
1135 &bytes,
1136 &doc_key,
1137 target,
1138 entry.path(),
1139 max_document_tokens,
1140 ),
1141 report,
1142 failed_docs,
1143 )?;
1144 prepared_embeddings = preflight.accepted;
1145 rejected_chunk_indexes = preflight.rejected_chunk_indexes;
1146 if existing.is_some() && !rejected_chunk_indexes.is_empty() {
1147 push_update_decision(
1148 report,
1149 options,
1150 target,
1151 &relative_path,
1152 UpdateDecisionKind::ExtractFailed,
1153 Some("embed preflight failed".to_string()),
1154 );
1155 continue;
1156 }
1157 }
1158
1159 let doc_id = self.storage.upsert_document(
1160 target.collection.id,
1161 &relative_path,
1162 &title,
1163 title_source,
1164 &hash,
1165 &modified,
1166 )?;
1167
1168 if let Some(doc) = existing.as_ref() {
1169 let old_chunk_ids = self.storage.delete_chunks_for_document(doc.id)?;
1170 if !old_chunk_ids.is_empty() {
1171 self.storage.delete_tantivy(&target.space, &old_chunk_ids)?;
1172 self.storage.delete_usearch(&target.space, &old_chunk_ids)?;
1173 }
1174 }
1175
1176 let chunk_ids = self.storage.insert_chunks(doc_id, &chunk_inserts)?;
1177 for chunk_index in rejected_chunk_indexes {
1178 if let Some(chunk_id) = chunk_ids.get(chunk_index) {
1179 failed_chunk_ids.insert(*chunk_id);
1180 }
1181 }
1182
1183 if !chunk_ids.is_empty() {
1184 if !options.no_embed && self.embedder.is_some() {
1185 for prepared in prepared_embeddings {
1186 pending_embeddings.push(PendingChunkEmbedding {
1187 chunk_id: chunk_ids[prepared.chunk_index],
1188 doc_key: prepared.doc_key,
1189 space_name: prepared.space_name,
1190 path: prepared.path,
1191 text: prepared.text,
1192 max_document_tokens: prepared.max_document_tokens,
1193 });
1194 }
1195 if pending_embeddings.len() >= 64 {
1196 self.flush_buffered_embeddings(
1197 pending_embeddings,
1198 failed_chunk_ids,
1199 report,
1200 failed_docs,
1201 )?;
1202 }
1203 }
1204
1205 let entries = chunk_ids
1206 .iter()
1207 .zip(final_chunks.iter())
1208 .map(|(chunk_id, chunk)| TantivyEntry {
1209 chunk_id: *chunk_id,
1210 doc_id,
1211 filepath: relative_path.clone(),
1212 semantic_title: title_source
1213 .semantic_title(title.as_str())
1214 .map(ToString::to_string),
1215 heading: chunk.heading.clone(),
1216 body: retrieval_text_with_prefix(
1217 if chunk.text.is_empty() {
1218 body.as_str()
1219 } else {
1220 chunk.text.as_str()
1221 },
1222 title_source.semantic_title(title.as_str()),
1223 chunk.heading.as_deref(),
1224 policy.contextual_prefix,
1225 ),
1226 })
1227 .collect::<Vec<_>>();
1228 self.storage.index_tantivy(&target.space, &entries)?;
1229 fts_dirty_by_space
1230 .entry(target.space.clone())
1231 .or_default()
1232 .insert(doc_id);
1233 }
1234
1235 docs_by_path.insert(
1236 relative_path.clone(),
1237 self.storage
1238 .get_document_by_path(target.collection.id, &relative_path)?
1239 .ok_or_else(|| {
1240 KboltError::Internal(format!(
1241 "document missing after upsert: collection={}, path={relative_path}",
1242 target.collection.id
1243 ))
1244 })?,
1245 );
1246 pending_indexing.record(report);
1247 let (kind, detail) = pending_decision;
1248 push_update_decision(report, options, target, &relative_path, kind, detail);
1249 touched_collection = true;
1250 }
1251
1252 let mut missing_docs = docs_by_path
1253 .values()
1254 .filter(|doc| doc.active && !seen_paths.contains(&doc.path))
1255 .cloned()
1256 .collect::<Vec<_>>();
1257 missing_docs.sort_by(|left, right| left.path.cmp(&right.path));
1258
1259 for doc in missing_docs {
1260 if doc.active && !seen_paths.contains(&doc.path) {
1261 report.deactivated_docs += 1;
1262 push_update_decision(
1263 report,
1264 options,
1265 target,
1266 &doc.path,
1267 UpdateDecisionKind::Deactivated,
1268 None,
1269 );
1270 if !options.dry_run {
1271 self.storage.deactivate_document(doc.id)?;
1272 touched_collection = true;
1273 }
1274 }
1275 }
1276
1277 if touched_collection && !options.dry_run {
1278 self.storage
1279 .update_collection_timestamp(target.collection.id)?;
1280 }
1281
1282 Ok(())
1283 }
1284}
1285
1286#[derive(Debug)]
1287struct PreparedChunkEmbedding {
1288 chunk_index: usize,
1289 doc_key: UpdateDocKey,
1290 space_name: String,
1291 path: std::path::PathBuf,
1292 text: String,
1293 max_document_tokens: usize,
1294}
1295
1296#[derive(Debug, Default)]
1297struct PreparedEmbeddingPreflight {
1298 accepted: Vec<PreparedChunkEmbedding>,
1299 rejected_chunk_indexes: Vec<usize>,
1300}
1301
1302#[derive(Debug)]
1303struct PendingChunkEmbedding {
1304 chunk_id: i64,
1305 doc_key: UpdateDocKey,
1306 space_name: String,
1307 path: std::path::PathBuf,
1308 text: String,
1309 max_document_tokens: usize,
1310}
1311
/// Result of embedding one batch of pending chunks.
///
/// The default value is an empty batch: no embeddings and no failures.
#[derive(Default)]
struct EmbeddedPendingBatch {
    /// Successfully produced embeddings.
    /// NOTE(review): tuple looks like (chunk id, space name, vector) — confirm
    /// against the code that fills it.
    embeddings: Vec<(i64, String, Vec<f32>)>,
    /// Ids of the chunks whose embedding failed.
    failed_chunk_ids: Vec<i64>,
}
1317
/// Identity of a document within an update run.
///
/// Two keys are equal (and hash the same) only when the space, the collection
/// root and the document path all match, so the key can be stored in hash
/// sets and maps.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
struct UpdateDocKey {
    /// Name of the space that owns the document.
    space: String,
    /// Filesystem path of the collection the document belongs to.
    collection_path: std::path::PathBuf,
    /// Path of the document relative to the collection.
    path: String,
}
1324
1325struct EmbeddingDocumentSizerCounter<'a>(&'a dyn crate::models::EmbeddingDocumentSizer);
1326
1327impl crate::ingest::chunk::TokenCounter for EmbeddingDocumentSizerCounter<'_> {
1328 fn count(&self, text: &str) -> Result<usize> {
1329 self.0.count_document_tokens(text)
1330 }
1331}
1332
1333fn prepare_chunk_embeddings(
1334 final_chunks: &[crate::ingest::chunk::FinalChunk],
1335 bytes: &[u8],
1336 doc_key: &UpdateDocKey,
1337 target: &UpdateTarget,
1338 path: &Path,
1339 max_document_tokens: usize,
1340) -> Vec<PreparedChunkEmbedding> {
1341 final_chunks
1342 .iter()
1343 .enumerate()
1344 .map(|(chunk_index, chunk)| {
1345 let mut text = chunk_text_from_bytes(bytes, chunk.offset, chunk.length);
1346 if text.trim().is_empty() {
1347 text = " ".to_string();
1348 }
1349 PreparedChunkEmbedding {
1350 chunk_index,
1351 doc_key: doc_key.clone(),
1352 space_name: target.space.clone(),
1353 path: path.to_path_buf(),
1354 text,
1355 max_document_tokens,
1356 }
1357 })
1358 .collect()
1359}
1360
1361#[derive(Debug, Clone)]
1362enum UpdateRepairScope {
1363 Global,
1364 Space { space_id: i64 },
1365 Collections { collection_ids: Vec<i64> },
1366}
1367
1368impl UpdateRepairScope {
1369 fn from_options_and_targets(options: &UpdateOptions, targets: &[UpdateTarget]) -> Self {
1370 if options.collections.is_empty() {
1371 if let Some(target) = targets.first() {
1372 if options.space.is_some() {
1373 return Self::Space {
1374 space_id: target.collection.space_id,
1375 };
1376 }
1377 }
1378 return Self::Global;
1379 }
1380
1381 let mut collection_ids = targets
1382 .iter()
1383 .map(|target| target.collection.id)
1384 .collect::<Vec<_>>();
1385 collection_ids.sort_unstable();
1386 collection_ids.dedup();
1387 Self::Collections { collection_ids }
1388 }
1389
1390 fn allows_space_dense_repair(&self) -> bool {
1391 !matches!(self, Self::Collections { .. })
1392 }
1393}
1394
1395#[derive(Debug, Clone, Copy)]
1396enum PendingDocumentIndexing {
1397 Added,
1398 Updated { reactivated: bool },
1399}
1400
1401impl PendingDocumentIndexing {
1402 fn record(self, report: &mut UpdateReport) {
1403 match self {
1404 Self::Added => {
1405 report.added_docs += 1;
1406 }
1407 Self::Updated { reactivated } => {
1408 report.updated_docs += 1;
1409 if reactivated {
1410 report.reactivated_docs += 1;
1411 }
1412 }
1413 }
1414 }
1415}
1416
1417fn invalid_pending_embedding_batch_detail(
1418 pending: &[PendingChunkEmbedding],
1419 vectors: &[Vec<f32>],
1420) -> Option<String> {
1421 if vectors.len() != pending.len() {
1422 return Some(format!(
1423 "embedder returned {} vectors for {} chunks",
1424 vectors.len(),
1425 pending.len()
1426 ));
1427 }
1428
1429 if vectors.iter().any(|vector| vector.is_empty()) {
1430 if pending.len() == 1 {
1431 return Some(format!(
1432 "embedder returned an empty vector for chunk {}",
1433 pending[0].chunk_id
1434 ));
1435 }
1436 return Some("embedder returned an empty vector".to_string());
1437 }
1438
1439 None
1440}
1441
1442fn push_update_decision(
1443 report: &mut UpdateReport,
1444 options: &UpdateOptions,
1445 target: &UpdateTarget,
1446 path: &str,
1447 kind: UpdateDecisionKind,
1448 detail: Option<String>,
1449) {
1450 if !options.verbose {
1451 return;
1452 }
1453
1454 report.decisions.push(UpdateDecision {
1455 space: target.space.clone(),
1456 collection: target.collection.name.clone(),
1457 path: path.to_string(),
1458 kind,
1459 detail,
1460 });
1461}
1462
1463fn update_doc_key(space: &str, collection_path: &Path, path: &str) -> UpdateDocKey {
1464 UpdateDocKey {
1465 space: space.to_string(),
1466 collection_path: collection_path.to_path_buf(),
1467 path: path.to_string(),
1468 }
1469}
1470
1471fn effective_chunk_hard_max(policy: &crate::config::ChunkPolicy) -> usize {
1472 policy
1473 .hard_max_tokens
1474 .max(policy.soft_max_tokens)
1475 .max(policy.target_tokens)
1476 .max(1)
1477}