1use super::*;
2use kbolt_types::{UpdateDecision, UpdateDecisionKind};
3
// Generation markers for derived ingestion state. Neither constant is
// referenced in the visible part of this file; presumably they feed
// `ingestion_generation_key`, so that bumping a value invalidates previously
// stored canonical text / chunk layouts and forces re-ingestion.
// NOTE(review): confirm against the generation-key helper before relying on this.
const CANONICAL_TEXT_GENERATION: u32 = 1;
const CHUNKER_GENERATION: u32 = 2;
6
7impl Engine {
8 pub fn update(&self, options: UpdateOptions) -> Result<UpdateReport> {
9 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
10 self.update_unlocked(options)
11 }
12
13 pub(super) fn update_unlocked(&self, options: UpdateOptions) -> Result<UpdateReport> {
14 let _profile = crate::profile::UpdateProfileGuard::start();
15 let update_result = self.run_update_unlocked(options);
16 let flush_result = crate::profile::timed_update_stage("usearch_save_dirty", || {
17 self.storage.save_dirty_usearch_indexes()
18 });
19 finish_update_with_usearch_flush(update_result, flush_result)
20 }
21
22 fn run_update_unlocked(&self, options: UpdateOptions) -> Result<UpdateReport> {
23 let started = Instant::now();
24 let mut report = UpdateReport {
25 scanned_docs: 0,
26 skipped_mtime_docs: 0,
27 skipped_hash_docs: 0,
28 added_docs: 0,
29 updated_docs: 0,
30 failed_docs: 0,
31 deactivated_docs: 0,
32 reactivated_docs: 0,
33 reaped_docs: 0,
34 embedded_chunks: 0,
35 decisions: Vec::new(),
36 errors: Vec::new(),
37 elapsed_ms: 0,
38 };
39 let mut failed_docs = HashSet::new();
40
41 let targets = crate::profile::timed_update_stage("resolve_targets", || {
42 self.resolve_targets(TargetScope {
43 space: options.space.as_deref(),
44 collections: &options.collections,
45 })
46 })?;
47 if targets.is_empty() {
48 report.elapsed_ms = started.elapsed().as_millis() as u64;
49 return Ok(report);
50 }
51 let repair_scope = UpdateRepairScope::from_options_and_targets(&options, &targets);
52
53 crate::profile::timed_update_stage("dense_integrity", || {
54 self.reconcile_dense_integrity(&targets, &repair_scope, &options)
55 })?;
56
57 if !options.dry_run {
58 crate::profile::timed_update_stage("fts_dirty_replay", || {
59 self.replay_fts_dirty_documents(&repair_scope, &mut report, &mut failed_docs)
60 })?;
61 }
62
63 let mut fts_dirty_by_space: HashMap<String, HashSet<i64>> = HashMap::new();
64 let mut pending_embeddings = Vec::new();
65 let mut failed_embedding_chunk_ids = HashSet::new();
66 for target in &targets {
67 self.update_collection_target(
68 target,
69 &options,
70 &mut report,
71 &mut fts_dirty_by_space,
72 &mut pending_embeddings,
73 &mut failed_embedding_chunk_ids,
74 &mut failed_docs,
75 )?;
76 }
77
78 if !options.dry_run {
79 crate::profile::timed_update_stage("embedding_flush_buffered", || {
80 self.flush_buffered_embeddings(
81 &mut pending_embeddings,
82 &mut failed_embedding_chunk_ids,
83 &mut report,
84 &mut failed_docs,
85 )
86 })?;
87
88 for (space, doc_ids) in fts_dirty_by_space {
89 if doc_ids.is_empty() {
90 continue;
91 }
92
93 crate::profile::timed_update_stage("tantivy_commit", || {
94 self.storage.commit_tantivy(&space)
95 })?;
96 let mut ids = doc_ids.into_iter().collect::<Vec<_>>();
97 ids.sort_unstable();
98 crate::profile::timed_update_stage("sqlite_clear_fts_dirty", || {
99 self.storage.batch_clear_fts_dirty(&ids)
100 })?;
101 }
102
103 crate::profile::timed_update_stage("embedding_backlog", || {
104 self.embed_pending_chunks(
105 &repair_scope,
106 &options,
107 &mut failed_embedding_chunk_ids,
108 &mut report,
109 &mut failed_docs,
110 )
111 })?;
112
113 let reaped = crate::profile::timed_update_stage("sqlite_list_reapable", || {
114 self.list_reapable_documents_for_scope(self.config.reaping.days, &repair_scope)
115 })?;
116 let mut reaped_doc_ids = Vec::with_capacity(reaped.len());
117 let mut chunk_ids_by_space: HashMap<String, Vec<i64>> = HashMap::new();
118 for document in reaped {
119 reaped_doc_ids.push(document.doc_id);
120 if !document.chunk_ids.is_empty() {
121 chunk_ids_by_space
122 .entry(document.space_name)
123 .or_default()
124 .extend(document.chunk_ids);
125 }
126 }
127 for (space, chunk_ids) in chunk_ids_by_space {
128 crate::profile::timed_update_stage("purge_reaped_indexes", || {
129 self.purge_space_chunks(&space, &chunk_ids)
130 })?;
131 }
132 crate::profile::timed_update_stage("sqlite_delete_reaped_documents", || {
133 self.storage.delete_documents(&reaped_doc_ids)
134 })?;
135 report.reaped_docs = reaped_doc_ids.len();
136 }
137
138 report.failed_docs = failed_docs.len();
139 report.elapsed_ms = started.elapsed().as_millis() as u64;
140 Ok(report)
141 }
142
143 fn reconcile_dense_integrity(
144 &self,
145 targets: &[UpdateTarget],
146 repair_scope: &UpdateRepairScope,
147 options: &UpdateOptions,
148 ) -> Result<()> {
149 if options.no_embed || options.dry_run {
150 return Ok(());
151 }
152
153 let expected_model = self.embedding_model_key();
154 let mut visited_spaces = HashSet::new();
155 for target in targets {
156 if !visited_spaces.insert(target.collection.space_id) {
157 continue;
158 }
159
160 let models = self
161 .storage
162 .list_embedding_models_in_space(target.collection.space_id)?;
163 let mut reasons = Vec::new();
164 if models.iter().any(|model| model != expected_model) {
165 reasons.push(format!(
166 "stored embedding models {:?} do not match current model '{}'",
167 models, expected_model
168 ));
169 }
170
171 let sqlite_count = self
172 .storage
173 .count_embedded_chunks(Some(target.collection.space_id))?;
174 let usearch_count = self.storage.count_usearch(&target.space)?;
175 if sqlite_count != usearch_count {
176 reasons.push(format!(
177 "sqlite embedded chunk count {sqlite_count} does not match USearch vector count {usearch_count}"
178 ));
179 }
180
181 if reasons.is_empty() {
182 continue;
183 }
184
185 if !repair_scope.allows_space_dense_repair() {
186 return Err(KboltError::SpaceDenseRepairRequired {
187 space: target.space.clone(),
188 reason: reasons.join("; "),
189 }
190 .into());
191 }
192
193 self.storage
194 .delete_embeddings_for_space(target.collection.space_id)?;
195 self.storage.clear_usearch(&target.space)?;
196 }
197
198 Ok(())
199 }
200
201 fn embed_pending_chunks(
202 &self,
203 repair_scope: &UpdateRepairScope,
204 options: &UpdateOptions,
205 failed_chunk_ids: &mut HashSet<i64>,
206 report: &mut UpdateReport,
207 failed_docs: &mut HashSet<UpdateDocKey>,
208 ) -> Result<()> {
209 if options.no_embed || options.dry_run {
210 return Ok(());
211 }
212
213 let Some(embedder) = self.embedder.as_ref() else {
214 return Ok(());
215 };
216
217 let model = self.embedding_model_key();
218 let batch_window = embedder.preferred_document_batch_window().max(1);
219 let mut after_chunk_id = 0_i64;
220 loop {
221 let backlog =
222 crate::profile::timed_update_stage("sqlite_get_unembedded_chunks", || {
223 self.get_unembedded_chunks_for_scope(
224 model,
225 repair_scope,
226 after_chunk_id,
227 batch_window,
228 )
229 })?;
230 if backlog.is_empty() {
231 break;
232 }
233
234 after_chunk_id = backlog
235 .last()
236 .map(|record| record.chunk.id)
237 .expect("non-empty backlog should have a last chunk id");
238
239 let backlog_doc_ids = backlog
240 .iter()
241 .filter(|record| !failed_chunk_ids.contains(&record.chunk.id))
242 .map(|record| record.chunk.doc_id)
243 .collect::<Vec<_>>();
244 let document_text_cache =
245 crate::profile::timed_update_stage("embedding_backlog_read", || {
246 self.storage.get_existing_document_texts(&backlog_doc_ids)
247 })?;
248
249 let mut pending = Vec::new();
250 for record in backlog {
251 if failed_chunk_ids.contains(&record.chunk.id) {
252 continue;
253 }
254
255 let full_path = record.collection_path.join(&record.doc_path);
256 let Some(document_text) = document_text_cache.get(&record.chunk.doc_id) else {
257 report.errors.push(file_error(
258 Some(full_path.clone()),
259 format!(
260 "embed canonical text load failed: {}",
261 crate::storage::missing_document_text_error(record.chunk.doc_id)
262 ),
263 ));
264 failed_docs.insert(update_doc_key(
265 &record.space_name,
266 &record.collection_path,
267 &record.doc_path,
268 ));
269 continue;
270 };
271 let chunk_canonical_text = match crate::storage::chunk_text_from_canonical(
272 document_text.text.as_str(),
273 &record.chunk,
274 ) {
275 Ok(text) => text,
276 Err(err) => {
277 report.errors.push(file_error(
278 Some(full_path.clone()),
279 format!("embed canonical text load failed: {err}"),
280 ));
281 failed_docs.insert(update_doc_key(
282 &record.space_name,
283 &record.collection_path,
284 &record.doc_path,
285 ));
286 continue;
287 }
288 };
289 let mut text = crate::ingest::chunk::chunk_retrieval_body(
290 chunk_canonical_text.as_str(),
291 record.chunk.retrieval_prefix.as_deref(),
292 );
293 if text.trim().is_empty() {
294 text = " ".to_string();
295 }
296 let policy = resolve_policy(
297 &self.config.chunking,
298 Some(document_text.extractor_key.as_str()),
299 None,
300 );
301 let max_document_tokens = effective_chunk_hard_max(&policy);
302
303 pending.push(PendingChunkEmbedding {
304 chunk_id: record.chunk.id,
305 doc_key: update_doc_key(
306 &record.space_name,
307 &record.collection_path,
308 &record.doc_path,
309 ),
310 space_name: record.space_name,
311 path: full_path,
312 text,
313 max_document_tokens,
314 });
315 }
316
317 if pending.is_empty() {
318 continue;
319 }
320
321 let mut preflight_failed_chunk_ids = Vec::new();
322 let pending = self.preflight_pending_embeddings(
323 pending,
324 report,
325 failed_docs,
326 &mut preflight_failed_chunk_ids,
327 )?;
328 failed_chunk_ids.extend(preflight_failed_chunk_ids);
329 if pending.is_empty() {
330 continue;
331 }
332
333 let result = self.embed_preflighted_pending_batch_with_partial_failures(
334 embedder.as_ref(),
335 pending,
336 report,
337 failed_docs,
338 )?;
339 failed_chunk_ids.extend(result.failed_chunk_ids);
340 if !result.embeddings.is_empty() {
341 self.store_chunk_embeddings(model, result.embeddings, report)?;
342 }
343 }
344
345 Ok(())
346 }
347
348 fn flush_buffered_embeddings(
349 &self,
350 pending_embeddings: &mut Vec<PendingChunkEmbedding>,
351 failed_chunk_ids: &mut HashSet<i64>,
352 report: &mut UpdateReport,
353 failed_docs: &mut HashSet<UpdateDocKey>,
354 ) -> Result<()> {
355 if pending_embeddings.is_empty() {
356 return Ok(());
357 }
358
359 let Some(embedder) = self.embedder.as_ref() else {
360 pending_embeddings.clear();
361 return Ok(());
362 };
363
364 let pending = std::mem::take(pending_embeddings);
365 let result = self.embed_preflighted_pending_batch_with_partial_failures(
366 embedder.as_ref(),
367 pending,
368 report,
369 failed_docs,
370 )?;
371 failed_chunk_ids.extend(result.failed_chunk_ids);
372 if result.embeddings.is_empty() {
373 return Ok(());
374 }
375
376 self.store_chunk_embeddings(self.embedding_model_key(), result.embeddings, report)
377 }
378
379 fn document_embedding_batch_window(&self) -> usize {
380 self.embedder
381 .as_ref()
382 .map(|embedder| embedder.preferred_document_batch_window())
383 .unwrap_or(crate::models::DEFAULT_DOCUMENT_BATCH_WINDOW)
384 .max(1)
385 }
386
387 fn embed_preflighted_pending_batch_with_partial_failures(
388 &self,
389 embedder: &dyn crate::models::Embedder,
390 pending: Vec<PendingChunkEmbedding>,
391 report: &mut UpdateReport,
392 failed_docs: &mut HashSet<UpdateDocKey>,
393 ) -> Result<EmbeddedPendingBatch> {
394 if pending.is_empty() {
395 return Ok(EmbeddedPendingBatch::default());
396 }
397
398 let texts = pending
399 .iter()
400 .map(|embedding| embedding.text.clone())
401 .collect::<Vec<_>>();
402 crate::profile::increment_update_count("embedding_batch_calls", 1);
403 crate::profile::increment_update_count("embedding_input_texts", texts.len() as u64);
404 match crate::profile::timed_update_stage("embedding_http", || {
405 embedder.embed_batch(crate::models::EmbeddingInputKind::Document, &texts)
406 }) {
407 Ok(vectors) => {
408 if let Some(detail) = invalid_pending_embedding_batch_detail(&pending, &vectors) {
409 return self.split_pending_embedding_batch(
410 embedder,
411 pending,
412 report,
413 failed_docs,
414 detail,
415 );
416 }
417
418 let embeddings = pending
419 .into_iter()
420 .zip(vectors)
421 .map(|(embedding, vector)| (embedding.chunk_id, embedding.space_name, vector))
422 .collect::<Vec<_>>();
423 Ok(EmbeddedPendingBatch {
424 embeddings,
425 failed_chunk_ids: Vec::new(),
426 })
427 }
428 Err(err) => self.split_pending_embedding_batch(
429 embedder,
430 pending,
431 report,
432 failed_docs,
433 err.to_string(),
434 ),
435 }
436 }
437
438 fn split_pending_embedding_batch(
439 &self,
440 embedder: &dyn crate::models::Embedder,
441 pending: Vec<PendingChunkEmbedding>,
442 report: &mut UpdateReport,
443 failed_docs: &mut HashSet<UpdateDocKey>,
444 detail: String,
445 ) -> Result<EmbeddedPendingBatch> {
446 if pending.len() == 1 {
447 let embedding = pending
448 .into_iter()
449 .next()
450 .expect("single pending embedding should exist");
451 report.errors.push(file_error(
452 Some(embedding.path),
453 format!("embed failed: {detail}"),
454 ));
455 failed_docs.insert(embedding.doc_key);
456 return Ok(EmbeddedPendingBatch {
457 embeddings: Vec::new(),
458 failed_chunk_ids: vec![embedding.chunk_id],
459 });
460 }
461
462 let mid = pending.len() / 2;
463 let mut right = pending;
464 let left = right.drain(..mid).collect::<Vec<_>>();
465
466 let mut left_result = self.embed_preflighted_pending_batch_with_partial_failures(
467 embedder,
468 left,
469 report,
470 failed_docs,
471 )?;
472 let right_result = self.embed_preflighted_pending_batch_with_partial_failures(
473 embedder,
474 right,
475 report,
476 failed_docs,
477 )?;
478 left_result.embeddings.extend(right_result.embeddings);
479 left_result
480 .failed_chunk_ids
481 .extend(right_result.failed_chunk_ids);
482 Ok(left_result)
483 }
484
485 fn store_chunk_embeddings(
486 &self,
487 model: &str,
488 embeddings: Vec<(i64, String, Vec<f32>)>,
489 report: &mut UpdateReport,
490 ) -> Result<()> {
491 let mut grouped_vectors: HashMap<String, Vec<(i64, Vec<f32>)>> = HashMap::new();
492 let mut embedding_rows = Vec::with_capacity(embeddings.len());
493 for (chunk_id, space_name, vector) in embeddings {
494 if vector.is_empty() {
495 return Err(KboltError::Inference(format!(
496 "embedder returned an empty vector for chunk {chunk_id}"
497 ))
498 .into());
499 }
500
501 grouped_vectors
502 .entry(space_name)
503 .or_default()
504 .push((chunk_id, vector));
505 embedding_rows.push((chunk_id, model));
506 }
507
508 for (space, vectors) in grouped_vectors {
509 let refs = vectors
510 .iter()
511 .map(|(chunk_id, vector)| (*chunk_id, vector.as_slice()))
512 .collect::<Vec<_>>();
513 crate::profile::increment_update_count("usearch_vectors", refs.len() as u64);
514 self.storage.batch_insert_usearch_deferred(&space, &refs)?;
515 }
516
517 crate::profile::timed_update_stage("sqlite_insert_embeddings", || {
518 self.storage.insert_embeddings(&embedding_rows)
519 })?;
520 report.embedded_chunks = report.embedded_chunks.saturating_add(embedding_rows.len());
521 Ok(())
522 }
523
524 fn preflight_pending_embeddings(
525 &self,
526 pending: Vec<PendingChunkEmbedding>,
527 report: &mut UpdateReport,
528 failed_docs: &mut HashSet<UpdateDocKey>,
529 failed_chunk_ids: &mut Vec<i64>,
530 ) -> Result<Vec<PendingChunkEmbedding>> {
531 let Some(sizer) = self.embedding_document_sizer.as_ref() else {
532 return Ok(pending);
533 };
534
535 let mut accepted = Vec::with_capacity(pending.len());
536 for embedding in pending {
537 if sizer.fits_within_token_limit_by_byte_len(
538 embedding.text.len(),
539 embedding.max_document_tokens,
540 ) {
541 crate::profile::increment_update_count("tokenize_preflight_byte_skips", 1);
542 accepted.push(embedding);
543 continue;
544 }
545
546 match count_document_tokens_profiled(
547 sizer.as_ref(),
548 &embedding.text,
549 "tokenize_preflight",
550 "tokenize_preflight_calls",
551 ) {
552 Ok(token_count) if token_count <= embedding.max_document_tokens => {
553 accepted.push(embedding);
554 }
555 Ok(token_count) => {
556 report.errors.push(file_error(
557 Some(embedding.path.clone()),
558 format!(
559 "embed preflight failed: payload has {token_count} tokens, exceeding hard_max_tokens {}",
560 embedding.max_document_tokens
561 ),
562 ));
563 failed_docs.insert(embedding.doc_key);
564 failed_chunk_ids.push(embedding.chunk_id);
565 }
566 Err(err) => {
567 report.errors.push(file_error(
568 Some(embedding.path.clone()),
569 format!("embed preflight token count failed: {err}"),
570 ));
571 failed_docs.insert(embedding.doc_key);
572 failed_chunk_ids.push(embedding.chunk_id);
573 }
574 }
575 }
576
577 Ok(accepted)
578 }
579
580 fn preflight_prepared_embeddings(
581 &self,
582 prepared: Vec<PreparedChunkEmbedding>,
583 report: &mut UpdateReport,
584 failed_docs: &mut HashSet<UpdateDocKey>,
585 ) -> Result<PreparedEmbeddingPreflight> {
586 let Some(sizer) = self.embedding_document_sizer.as_ref() else {
587 return Ok(PreparedEmbeddingPreflight {
588 accepted: prepared,
589 rejected_chunk_indexes: Vec::new(),
590 });
591 };
592
593 let mut preflight = PreparedEmbeddingPreflight {
594 accepted: Vec::with_capacity(prepared.len()),
595 rejected_chunk_indexes: Vec::new(),
596 };
597 for embedding in prepared {
598 if sizer.fits_within_token_limit_by_byte_len(
599 embedding.text.len(),
600 embedding.max_document_tokens,
601 ) {
602 crate::profile::increment_update_count("tokenize_preflight_byte_skips", 1);
603 preflight.accepted.push(embedding);
604 continue;
605 }
606
607 match count_document_tokens_profiled(
608 sizer.as_ref(),
609 &embedding.text,
610 "tokenize_preflight",
611 "tokenize_preflight_calls",
612 ) {
613 Ok(token_count) if token_count <= embedding.max_document_tokens => {
614 preflight.accepted.push(embedding);
615 }
616 Ok(token_count) => {
617 report.errors.push(file_error(
618 Some(embedding.path.clone()),
619 format!(
620 "embed preflight failed: payload has {token_count} tokens, exceeding hard_max_tokens {}",
621 embedding.max_document_tokens
622 ),
623 ));
624 failed_docs.insert(embedding.doc_key);
625 preflight.rejected_chunk_indexes.push(embedding.chunk_index);
626 }
627 Err(err) => {
628 report.errors.push(file_error(
629 Some(embedding.path.clone()),
630 format!("embed preflight token count failed: {err}"),
631 ));
632 failed_docs.insert(embedding.doc_key);
633 preflight.rejected_chunk_indexes.push(embedding.chunk_index);
634 }
635 }
636 }
637
638 Ok(preflight)
639 }
640
641 fn replay_fts_dirty_documents(
642 &self,
643 repair_scope: &UpdateRepairScope,
644 report: &mut UpdateReport,
645 failed_docs: &mut HashSet<UpdateDocKey>,
646 ) -> Result<()> {
647 let records = self.get_fts_dirty_documents_for_scope(repair_scope)?;
648 if records.is_empty() {
649 return Ok(());
650 }
651
652 let mut cleared_by_space: HashMap<String, Vec<i64>> = HashMap::new();
653 for record in records {
654 let space_name = record.space_name;
655 let doc_id = record.doc_id;
656
657 if record.chunks.is_empty() {
658 self.storage.delete_tantivy_by_doc(&space_name, doc_id)?;
659 cleared_by_space.entry(space_name).or_default().push(doc_id);
660 continue;
661 }
662
663 let document_text = match self.storage.get_document_text(doc_id) {
664 Ok(row) => row,
665 Err(err) => {
666 failed_docs.insert(update_doc_key(
667 &space_name,
668 &record.collection_path,
669 &record.doc_path,
670 ));
671 report.errors.push(file_error(
672 Some(record.collection_path.join(&record.doc_path)),
673 format!("fts replay canonical text load failed: {err}"),
674 ));
675 continue;
676 }
677 };
678 let policy = resolve_policy(
679 &self.config.chunking,
680 Some(document_text.extractor_key.as_str()),
681 None,
682 );
683
684 let entries = record
685 .chunks
686 .iter()
687 .map(|chunk| -> Result<TantivyEntry> {
688 let chunk_text = crate::storage::chunk_text_from_canonical(
689 document_text.text.as_str(),
690 chunk,
691 )?;
692 let chunk_body = crate::ingest::chunk::chunk_retrieval_body(
693 chunk_text.as_str(),
694 chunk.retrieval_prefix.as_deref(),
695 );
696 Ok(TantivyEntry {
697 chunk_id: chunk.id,
698 doc_id,
699 filepath: record.doc_path.clone(),
700 semantic_title: record
701 .doc_title_source
702 .semantic_title(record.doc_title.as_str())
703 .map(ToString::to_string),
704 heading: chunk.heading.clone(),
705 body: retrieval_text_with_prefix(
706 chunk_body.as_str(),
707 record
708 .doc_title_source
709 .semantic_title(record.doc_title.as_str()),
710 chunk.heading.as_deref(),
711 policy.contextual_prefix,
712 ),
713 })
714 })
715 .collect::<Result<Vec<_>>>()?;
716
717 self.storage.delete_tantivy_by_doc(&space_name, doc_id)?;
718 self.storage.index_tantivy(&space_name, &entries)?;
719 cleared_by_space.entry(space_name).or_default().push(doc_id);
720 }
721
722 for (space_name, mut doc_ids) in cleared_by_space {
723 if doc_ids.is_empty() {
724 continue;
725 }
726
727 doc_ids.sort_unstable();
728 doc_ids.dedup();
729 self.storage.commit_tantivy(&space_name)?;
730 self.storage.batch_clear_fts_dirty(&doc_ids)?;
731 }
732
733 Ok(())
734 }
735
736 fn get_fts_dirty_documents_for_scope(
737 &self,
738 repair_scope: &UpdateRepairScope,
739 ) -> Result<Vec<crate::storage::FtsDirtyRecord>> {
740 match repair_scope {
741 UpdateRepairScope::Global => self.storage.get_fts_dirty_documents(),
742 UpdateRepairScope::Space { space_id } => {
743 self.storage.get_fts_dirty_documents_in_space(*space_id)
744 }
745 UpdateRepairScope::Collections { collection_ids } => self
746 .storage
747 .get_fts_dirty_documents_in_collections(collection_ids),
748 }
749 }
750
751 fn get_unembedded_chunks_for_scope(
752 &self,
753 model: &str,
754 repair_scope: &UpdateRepairScope,
755 after_chunk_id: i64,
756 limit: usize,
757 ) -> Result<Vec<crate::storage::EmbedRecord>> {
758 match repair_scope {
759 UpdateRepairScope::Global => {
760 self.storage
761 .get_unembedded_chunks(model, after_chunk_id, limit)
762 }
763 UpdateRepairScope::Space { space_id } => {
764 self.storage
765 .get_unembedded_chunks_in_space(model, *space_id, after_chunk_id, limit)
766 }
767 UpdateRepairScope::Collections { collection_ids } => self
768 .storage
769 .get_unembedded_chunks_in_collections(model, collection_ids, after_chunk_id, limit),
770 }
771 }
772
773 fn list_reapable_documents_for_scope(
774 &self,
775 older_than_days: u32,
776 repair_scope: &UpdateRepairScope,
777 ) -> Result<Vec<crate::storage::ReapableDocument>> {
778 match repair_scope {
779 UpdateRepairScope::Global => self.storage.list_reapable_documents(older_than_days),
780 UpdateRepairScope::Space { space_id } => self
781 .storage
782 .list_reapable_documents_in_space(older_than_days, *space_id),
783 UpdateRepairScope::Collections { collection_ids } => self
784 .storage
785 .list_reapable_documents_in_collections(older_than_days, collection_ids),
786 }
787 }
788
789 pub fn resolve_update_targets(&self, options: &UpdateOptions) -> Result<Vec<UpdateTarget>> {
790 self.resolve_targets(TargetScope {
791 space: options.space.as_deref(),
792 collections: &options.collections,
793 })
794 }
795
796 pub(super) fn resolve_targets(&self, scope: TargetScope<'_>) -> Result<Vec<UpdateTarget>> {
797 let mut targets = Vec::new();
798
799 if scope.collections.is_empty() {
800 return self.resolve_update_targets_for_all_collections(scope.space);
801 }
802
803 let mut seen = std::collections::HashSet::new();
804 for raw_collection_name in scope.collections {
805 let collection_name = raw_collection_name.trim();
806 if collection_name.is_empty() {
807 return Err(KboltError::InvalidInput(
808 "collection names cannot be empty".to_string(),
809 )
810 .into());
811 }
812
813 let resolved_space = self.resolve_space_row(scope.space, Some(collection_name))?;
814 let collection = self
815 .storage
816 .get_collection(resolved_space.id, collection_name)?;
817
818 if seen.insert((collection.space_id, collection.name.clone())) {
819 targets.push(UpdateTarget {
820 space: resolved_space.name,
821 collection,
822 });
823 }
824 }
825
826 Ok(targets)
827 }
828
829 fn resolve_update_targets_for_all_collections(
830 &self,
831 space: Option<&str>,
832 ) -> Result<Vec<UpdateTarget>> {
833 let (space_id_filter, spaces_by_id) = if let Some(space_name) = space {
834 let resolved = self.resolve_space_row(Some(space_name), None)?;
835 let mut map = std::collections::HashMap::new();
836 map.insert(resolved.id, resolved.name.clone());
837 (Some(resolved.id), map)
838 } else {
839 let spaces = self.storage.list_spaces()?;
840 let map = spaces
841 .into_iter()
842 .map(|space| (space.id, space.name))
843 .collect::<std::collections::HashMap<_, _>>();
844 (None, map)
845 };
846
847 let collections = self.storage.list_collections(space_id_filter)?;
848 let mut targets = Vec::with_capacity(collections.len());
849 for collection in collections {
850 let space_name = spaces_by_id
851 .get(&collection.space_id)
852 .ok_or_else(|| {
853 KboltError::Internal(format!(
854 "missing space mapping for collection '{}'",
855 collection.name
856 ))
857 })?
858 .clone();
859 targets.push(UpdateTarget {
860 space: space_name,
861 collection,
862 });
863 }
864
865 Ok(targets)
866 }
867
868 fn update_collection_target(
869 &self,
870 target: &UpdateTarget,
871 options: &UpdateOptions,
872 report: &mut UpdateReport,
873 fts_dirty_by_space: &mut HashMap<String, HashSet<i64>>,
874 pending_embeddings: &mut Vec<PendingChunkEmbedding>,
875 failed_chunk_ids: &mut HashSet<i64>,
876 failed_docs: &mut HashSet<UpdateDocKey>,
877 ) -> Result<()> {
878 let all_documents = crate::profile::timed_update_stage("sqlite_list_documents", || {
879 self.storage.list_documents(target.collection.id, false)
880 })?;
881 let document_ids = all_documents.iter().map(|doc| doc.id).collect::<Vec<_>>();
882 let document_text_generations =
883 crate::profile::timed_update_stage("sqlite_list_document_text_generations", || {
884 self.storage
885 .get_document_text_generation_keys(&document_ids)
886 })?;
887 let mut docs_by_path: HashMap<String, DocumentRow> = all_documents
888 .into_iter()
889 .map(|doc| (doc.path.clone(), doc))
890 .collect();
891 let mut seen_paths = HashSet::new();
892 let extension_filter = normalized_extension_filter(target.collection.extensions.as_deref());
893 let ignore_matcher = load_collection_ignore_matcher(
894 &self.config.config_dir,
895 &target.collection.path,
896 &target.space,
897 &target.collection.name,
898 )?;
899 let extractor_registry = default_registry();
900 let embedding_batch_window = self.document_embedding_batch_window();
901 let mut touched_collection = false;
902 let mut failed_walk_prefixes = Vec::new();
903 let mut collection_walk_incomplete = false;
904
905 for entry in super::ignore_helpers::build_collection_walk(&target.collection.path) {
906 let entry = match entry {
907 Ok(item) => item,
908 Err(err) => {
909 let error_scope = collect_walk_error_scope(&err);
910 let error_path = error_scope.paths.first().cloned();
911 if error_scope.collection_incomplete || error_scope.paths.is_empty() {
912 collection_walk_incomplete = true;
913 }
914 for path in &error_scope.paths {
915 let failed_path =
916 match collection_relative_path(&target.collection.path, path) {
917 Ok(relative) => {
918 if relative.is_empty() || relative == "." {
919 collection_walk_incomplete = true;
920 } else {
921 failed_walk_prefixes.push(relative.clone());
922 }
923 relative
924 }
925 Err(_) => {
926 collection_walk_incomplete = true;
927 path.display().to_string()
928 }
929 };
930 failed_docs.insert(update_doc_key(
931 &target.space,
932 &target.collection.path,
933 &failed_path,
934 ));
935 }
936 report
937 .errors
938 .push(file_error(error_path, format!("walk error: {err}")));
939 continue;
940 }
941 };
942
943 if !entry
944 .file_type()
945 .is_some_and(|file_type| file_type.is_file())
946 {
947 continue;
948 }
949
950 if is_hard_ignored_file(entry.path()) {
951 continue;
952 }
953
954 let relative_path =
955 match collection_relative_path(&target.collection.path, entry.path()) {
956 Ok(path) => path,
957 Err(err) => {
958 failed_docs.insert(update_doc_key(
959 &target.space,
960 &target.collection.path,
961 &entry.path().display().to_string(),
962 ));
963 report.errors.push(file_error(
964 Some(entry.path().to_path_buf()),
965 err.to_string(),
966 ));
967 continue;
968 }
969 };
970
971 if !extension_allowed(entry.path(), extension_filter.as_ref()) {
972 push_update_decision(
973 report,
974 options,
975 target,
976 &relative_path,
977 UpdateDecisionKind::Unsupported,
978 Some("extension not allowed".to_string()),
979 );
980 continue;
981 }
982
983 if let Some(matcher) = ignore_matcher.as_ref() {
984 if matcher
985 .matched(Path::new(&relative_path), false)
986 .is_ignore()
987 {
988 push_update_decision(
989 report,
990 options,
991 target,
992 &relative_path,
993 UpdateDecisionKind::Ignored,
994 Some("matched ignore patterns".to_string()),
995 );
996 continue;
997 }
998 }
999
1000 let Some(extractor) = extractor_registry.resolve_for_path(entry.path()) else {
1001 push_update_decision(
1002 report,
1003 options,
1004 target,
1005 &relative_path,
1006 UpdateDecisionKind::Unsupported,
1007 Some("no extractor available".to_string()),
1008 );
1009 continue;
1010 };
1011 let policy = resolve_policy(&self.config.chunking, Some(extractor.profile_key()), None);
1012 let generation_key =
1013 ingestion_generation_key(extractor.profile_key(), extractor.version(), &policy);
1014
1015 report.scanned_docs += 1;
1016 crate::profile::increment_update_count("docs_scanned", 1);
1017 seen_paths.insert(relative_path.clone());
1018
1019 let scan_started = Instant::now();
1020 let metadata = match entry.metadata() {
1021 Ok(data) => data,
1022 Err(err) => {
1023 let detail = format!("metadata error: {err}");
1024 failed_docs.insert(update_doc_key(
1025 &target.space,
1026 &target.collection.path,
1027 &relative_path,
1028 ));
1029 push_update_decision(
1030 report,
1031 options,
1032 target,
1033 &relative_path,
1034 UpdateDecisionKind::ReadFailed,
1035 Some(detail.clone()),
1036 );
1037 report
1038 .errors
1039 .push(file_error(Some(entry.path().to_path_buf()), detail));
1040 crate::profile::record_update_stage(
1041 "scan_metadata_mtime",
1042 scan_started.elapsed(),
1043 );
1044 continue;
1045 }
1046 };
1047
1048 let modified = match modified_token(&metadata) {
1049 Ok(value) => value,
1050 Err(err) => {
1051 let detail = format!("modified timestamp error: {err}");
1052 failed_docs.insert(update_doc_key(
1053 &target.space,
1054 &target.collection.path,
1055 &relative_path,
1056 ));
1057 push_update_decision(
1058 report,
1059 options,
1060 target,
1061 &relative_path,
1062 UpdateDecisionKind::ReadFailed,
1063 Some(detail.clone()),
1064 );
1065 report
1066 .errors
1067 .push(file_error(Some(entry.path().to_path_buf()), detail));
1068 crate::profile::record_update_stage(
1069 "scan_metadata_mtime",
1070 scan_started.elapsed(),
1071 );
1072 continue;
1073 }
1074 };
1075
1076 if let Some(existing) = docs_by_path.get(&relative_path) {
1077 let has_current_canonical_text = document_has_current_canonical_text(
1078 &document_text_generations,
1079 existing.id,
1080 generation_key.as_str(),
1081 );
1082 if existing.active && existing.modified == modified && has_current_canonical_text {
1083 report.skipped_mtime_docs += 1;
1084 push_update_decision(
1085 report,
1086 options,
1087 target,
1088 &relative_path,
1089 UpdateDecisionKind::SkippedMtime,
1090 None,
1091 );
1092 crate::profile::record_update_stage(
1093 "scan_metadata_mtime",
1094 scan_started.elapsed(),
1095 );
1096 continue;
1097 }
1098 }
1099 crate::profile::record_update_stage("scan_metadata_mtime", scan_started.elapsed());
1100
1101 let read_hash_started = Instant::now();
1102 let bytes = match std::fs::read(entry.path()) {
1103 Ok(data) => data,
1104 Err(err) => {
1105 let detail = err.to_string();
1106 failed_docs.insert(update_doc_key(
1107 &target.space,
1108 &target.collection.path,
1109 &relative_path,
1110 ));
1111 push_update_decision(
1112 report,
1113 options,
1114 target,
1115 &relative_path,
1116 UpdateDecisionKind::ReadFailed,
1117 Some(detail.clone()),
1118 );
1119 report
1120 .errors
1121 .push(file_error(Some(entry.path().to_path_buf()), detail));
1122 crate::profile::record_update_stage("read_hash", read_hash_started.elapsed());
1123 continue;
1124 }
1125 };
1126 let hash = sha256_hex(&bytes);
1127 crate::profile::record_update_stage("read_hash", read_hash_started.elapsed());
1128 let mut title = file_title(entry.path());
1129 let mut title_source = DocumentTitleSource::FilenameFallback;
1130
1131 let existing = docs_by_path.get(&relative_path).cloned();
1132 let pending_decision;
1133 let pending_indexing;
1134 if let Some(doc) = existing.as_ref() {
1135 let has_current_canonical_text = document_has_current_canonical_text(
1136 &document_text_generations,
1137 doc.id,
1138 generation_key.as_str(),
1139 );
1140 if doc.hash == hash && has_current_canonical_text {
1141 if doc.active {
1142 report.skipped_hash_docs += 1;
1143 push_update_decision(
1144 report,
1145 options,
1146 target,
1147 &relative_path,
1148 UpdateDecisionKind::SkippedHash,
1149 None,
1150 );
1151 } else {
1152 report.reactivated_docs += 1;
1153 push_update_decision(
1154 report,
1155 options,
1156 target,
1157 &relative_path,
1158 UpdateDecisionKind::Reactivated,
1159 None,
1160 );
1161 }
1162
1163 if !options.dry_run {
1164 crate::profile::timed_update_stage("sqlite_refresh_document", || {
1165 self.storage.refresh_document_activity(doc.id, &modified)
1166 })?;
1167 }
1168 continue;
1169 }
1170
1171 pending_indexing = PendingDocumentIndexing::Updated {
1172 reactivated: !doc.active,
1173 };
1174 pending_decision = (
1175 UpdateDecisionKind::Changed,
1176 (!doc.active).then_some("reactivated".to_string()),
1177 );
1178 } else {
1179 pending_indexing = PendingDocumentIndexing::Added;
1180 pending_decision = (UpdateDecisionKind::New, None);
1181 }
1182
1183 if options.dry_run {
1184 pending_indexing.record(report);
1185 let (kind, detail) = pending_decision;
1186 push_update_decision(report, options, target, &relative_path, kind, detail);
1187 continue;
1188 }
1189
1190 let extracted = match crate::profile::timed_update_stage("extract", || {
1191 extractor.extract(entry.path(), &bytes)
1192 }) {
1193 Ok(document) => document,
1194 Err(err) => {
1195 let detail = format!("extract failed: {err}");
1196 failed_docs.insert(update_doc_key(
1197 &target.space,
1198 &target.collection.path,
1199 &relative_path,
1200 ));
1201 push_update_decision(
1202 report,
1203 options,
1204 target,
1205 &relative_path,
1206 UpdateDecisionKind::ExtractFailed,
1207 Some(detail.clone()),
1208 );
1209 report
1210 .errors
1211 .push(file_error(Some(entry.path().to_path_buf()), detail));
1212 continue;
1213 }
1214 };
1215 if let Some(extracted_title) = extracted
1216 .title
1217 .as_deref()
1218 .map(str::trim)
1219 .filter(|title| !title.is_empty())
1220 {
1221 title = extracted_title.to_string();
1222 title_source = DocumentTitleSource::Extracted;
1223 }
1224
1225 let canonical = crate::profile::timed_update_stage("canonical_text", || {
1226 build_canonical_document(&extracted)
1227 });
1228 let text_hash = sha256_hex(canonical.text.as_bytes());
1229 let max_document_tokens = effective_chunk_hard_max(&policy);
1230 let chunk_started = Instant::now();
1231 let final_chunks_result = match self.embedding_document_sizer.as_ref() {
1232 Some(sizer) => {
1233 let sizer_counter = EmbeddingDocumentSizerCounter {
1234 inner: sizer.as_ref(),
1235 };
1236 chunk_canonical_document_with_counter(
1237 &canonical.document,
1238 &policy,
1239 &sizer_counter,
1240 )
1241 }
1242 None => Ok(chunk_canonical_document(&canonical.document, &policy)),
1243 };
1244 crate::profile::record_update_stage("chunk", chunk_started.elapsed());
1245 let final_chunks = match final_chunks_result {
1246 Ok(chunks) => chunks,
1247 Err(err) => {
1248 let detail = format!("chunking failed: {err}");
1249 failed_docs.insert(update_doc_key(
1250 &target.space,
1251 &target.collection.path,
1252 &relative_path,
1253 ));
1254 push_update_decision(
1255 report,
1256 options,
1257 target,
1258 &relative_path,
1259 UpdateDecisionKind::ExtractFailed,
1260 Some(detail.clone()),
1261 );
1262 report
1263 .errors
1264 .push(file_error(Some(entry.path().to_path_buf()), detail));
1265 continue;
1266 }
1267 };
1268 crate::profile::increment_update_count("chunks_created", final_chunks.len() as u64);
1269
1270 let doc_key = update_doc_key(&target.space, &target.collection.path, &relative_path);
1271 let chunk_inserts = final_chunks
1272 .iter()
1273 .enumerate()
1274 .map(|(index, chunk)| ChunkInsert {
1275 seq: index as i32,
1276 offset: chunk.offset,
1277 length: chunk.length,
1278 heading: chunk.heading.clone(),
1279 kind: chunk.kind,
1280 retrieval_prefix: chunk.retrieval_prefix.clone(),
1281 })
1282 .collect::<Vec<_>>();
1283 let mut prepared_embeddings = Vec::new();
1284 if !chunk_inserts.is_empty() && !options.no_embed && self.embedder.is_some() {
1285 let prepared = crate::profile::timed_update_stage("embedding_prepare_text", || {
1286 prepare_chunk_embeddings(
1287 &final_chunks,
1288 &doc_key,
1289 target,
1290 entry.path(),
1291 max_document_tokens,
1292 )
1293 });
1294 if existing.is_some() {
1295 let preflight =
1296 self.preflight_prepared_embeddings(prepared, report, failed_docs)?;
1297 prepared_embeddings = preflight.accepted;
1298 if !preflight.rejected_chunk_indexes.is_empty() {
1299 push_update_decision(
1300 report,
1301 options,
1302 target,
1303 &relative_path,
1304 UpdateDecisionKind::ExtractFailed,
1305 Some("embed preflight failed".to_string()),
1306 );
1307 continue;
1308 }
1309 } else {
1310 prepared_embeddings = prepared;
1311 }
1312 }
1313
1314 let replacement =
1315 crate::profile::timed_update_stage("sqlite_replace_document_generation", || {
1316 self.storage
1317 .replace_document_generation(DocumentGenerationReplace {
1318 collection_id: target.collection.id,
1319 path: &relative_path,
1320 title: &title,
1321 title_source,
1322 hash: &hash,
1323 modified: &modified,
1324 extractor_key: extractor.profile_key(),
1325 source_hash: &hash,
1326 text_hash: &text_hash,
1327 generation_key: &generation_key,
1328 text: canonical.text.as_str(),
1329 chunks: &chunk_inserts,
1330 })
1331 })?;
1332 let doc_id = replacement.doc_id;
1333 let chunk_ids = replacement.chunk_ids;
1334
1335 if !replacement.old_chunk_ids.is_empty() {
1336 crate::profile::timed_update_stage("tantivy_delete", || {
1337 self.storage
1338 .delete_tantivy(&target.space, &replacement.old_chunk_ids)
1339 })?;
1340 crate::profile::timed_update_stage("usearch_delete", || {
1341 self.storage
1342 .delete_usearch_deferred(&target.space, &replacement.old_chunk_ids)
1343 })?;
1344 }
1345
1346 fts_dirty_by_space
1347 .entry(target.space.clone())
1348 .or_default()
1349 .insert(doc_id);
1350
1351 if !chunk_ids.is_empty() {
1352 if !options.no_embed && self.embedder.is_some() {
1353 for prepared in prepared_embeddings {
1354 pending_embeddings.push(PendingChunkEmbedding {
1355 chunk_id: chunk_ids[prepared.chunk_index],
1356 doc_key: prepared.doc_key,
1357 space_name: prepared.space_name,
1358 path: prepared.path,
1359 text: prepared.text,
1360 max_document_tokens: prepared.max_document_tokens,
1361 });
1362 }
1363 if pending_embeddings.len() >= embedding_batch_window {
1364 self.flush_buffered_embeddings(
1365 pending_embeddings,
1366 failed_chunk_ids,
1367 report,
1368 failed_docs,
1369 )?;
1370 }
1371 }
1372
1373 let entries = chunk_ids
1374 .iter()
1375 .zip(final_chunks.iter())
1376 .map(|(chunk_id, chunk)| TantivyEntry {
1377 chunk_id: *chunk_id,
1378 doc_id,
1379 filepath: relative_path.clone(),
1380 semantic_title: title_source
1381 .semantic_title(title.as_str())
1382 .map(ToString::to_string),
1383 heading: chunk.heading.clone(),
1384 body: retrieval_text_with_prefix(
1385 chunk.retrieval_text().as_str(),
1386 title_source.semantic_title(title.as_str()),
1387 chunk.heading.as_deref(),
1388 policy.contextual_prefix,
1389 ),
1390 })
1391 .collect::<Vec<_>>();
1392 crate::profile::increment_update_count("tantivy_entries", entries.len() as u64);
1393 crate::profile::timed_update_stage("tantivy_add", || {
1394 self.storage.index_tantivy(&target.space, &entries)
1395 })?;
1396 }
1397
1398 docs_by_path.insert(
1399 relative_path.clone(),
1400 crate::profile::timed_update_stage("sqlite_get_document_after_write", || {
1401 self.storage
1402 .get_document_by_path(target.collection.id, &relative_path)
1403 })?
1404 .ok_or_else(|| {
1405 KboltError::Internal(format!(
1406 "document missing after upsert: collection={}, path={relative_path}",
1407 target.collection.id
1408 ))
1409 })?,
1410 );
1411 pending_indexing.record(report);
1412 let (kind, detail) = pending_decision;
1413 push_update_decision(report, options, target, &relative_path, kind, detail);
1414 touched_collection = true;
1415 }
1416
1417 let mut missing_docs = docs_by_path
1418 .values()
1419 .filter(|doc| {
1420 doc.active
1421 && !seen_paths.contains(&doc.path)
1422 && !path_is_under_failed_walk(
1423 doc.path.as_str(),
1424 &failed_walk_prefixes,
1425 collection_walk_incomplete,
1426 )
1427 })
1428 .cloned()
1429 .collect::<Vec<_>>();
1430 missing_docs.sort_by(|left, right| left.path.cmp(&right.path));
1431
1432 for doc in missing_docs {
1433 if doc.active && !seen_paths.contains(&doc.path) {
1434 report.deactivated_docs += 1;
1435 push_update_decision(
1436 report,
1437 options,
1438 target,
1439 &doc.path,
1440 UpdateDecisionKind::Deactivated,
1441 None,
1442 );
1443 if !options.dry_run {
1444 crate::profile::timed_update_stage("sqlite_deactivate_document", || {
1445 self.storage.deactivate_document(doc.id)
1446 })?;
1447 touched_collection = true;
1448 }
1449 }
1450 }
1451
1452 if touched_collection && !options.dry_run {
1453 crate::profile::timed_update_stage("sqlite_update_collection_timestamp", || {
1454 self.storage
1455 .update_collection_timestamp(target.collection.id)
1456 })?;
1457 }
1458
1459 Ok(())
1460 }
1461}
1462
/// The portions of a collection walk that were affected by `ignore` walk errors.
///
/// NOTE(review): presumably converted by the caller into the relative
/// `failed_walk_prefixes` / `collection_walk_incomplete` inputs of
/// `path_is_under_failed_walk`, so documents under a failed part of the walk are
/// not deactivated just because the walk could not see them — confirm at the call site.
#[derive(Debug, Default)]
struct WalkErrorScope {
    /// Filesystem paths attached to walk errors (`WithPath` paths and `Loop` children).
    paths: Vec<std::path::PathBuf>,
    /// Set when an error could not be attributed to a specific path (or is an
    /// ignore-rule error), so the whole collection walk must be treated as incomplete.
    collection_incomplete: bool,
}
1468
1469fn collect_walk_error_scope(err: &ignore::Error) -> WalkErrorScope {
1470 let mut scope = WalkErrorScope::default();
1471 collect_walk_error_scope_into(err, &mut scope, false);
1472 scope
1473}
1474
1475fn collect_walk_error_scope_into(
1476 err: &ignore::Error,
1477 scope: &mut WalkErrorScope,
1478 has_path_context: bool,
1479) {
1480 match err {
1481 ignore::Error::Partial(errors) => {
1482 if errors.is_empty() {
1483 scope.collection_incomplete = true;
1484 }
1485 for err in errors {
1486 collect_walk_error_scope_into(err, scope, has_path_context);
1487 }
1488 }
1489 ignore::Error::WithLineNumber { err, .. } | ignore::Error::WithDepth { err, .. } => {
1490 collect_walk_error_scope_into(err, scope, has_path_context);
1491 }
1492 ignore::Error::WithPath { path, err } => {
1493 scope.paths.push(path.clone());
1494 collect_walk_error_scope_into(err, scope, true);
1495 }
1496 ignore::Error::Loop { child, .. } => {
1497 scope.paths.push(child.clone());
1498 }
1499 ignore::Error::Io(_) => {
1500 if !has_path_context {
1501 scope.collection_incomplete = true;
1502 }
1503 }
1504 ignore::Error::Glob { .. }
1505 | ignore::Error::UnrecognizedFileType(_)
1506 | ignore::Error::InvalidDefinition => {
1507 scope.collection_incomplete = true;
1508 }
1509 }
1510}
1511
/// Returns `true` when `doc_path` may have been missed because of a failed walk.
///
/// A document is covered when the walk as a whole was incomplete, or when some
/// failed prefix is empty or `"."`, equals `doc_path`, or is a whole-component
/// ancestor of it (`prefix` immediately followed by `/`). A plain string prefix
/// such as `dir` therefore does NOT cover `dirty/file`.
fn path_is_under_failed_walk(
    doc_path: &str,
    failed_walk_prefixes: &[String],
    collection_walk_incomplete: bool,
) -> bool {
    if collection_walk_incomplete {
        return true;
    }
    failed_walk_prefixes
        .iter()
        .map(String::as_str)
        .any(|prefix| match prefix {
            // Presumably the collection root itself: covers every document.
            "" | "." => true,
            _ => doc_path
                .strip_prefix(prefix)
                .is_some_and(|rest| rest.is_empty() || rest.starts_with('/')),
        })
}
1530
#[cfg(test)]
mod tests {
    use super::*;
    use std::io;
    use std::path::PathBuf;

    #[test]
    fn walk_error_scope_collects_every_partial_path() {
        let err = ignore::Error::Partial(vec![
            ignore::Error::WithPath {
                path: PathBuf::from("/collection/blocked-a"),
                err: Box::new(ignore::Error::Io(io::Error::new(
                    io::ErrorKind::PermissionDenied,
                    "blocked",
                ))),
            },
            ignore::Error::WithDepth {
                depth: 2,
                err: Box::new(ignore::Error::Loop {
                    ancestor: PathBuf::from("/collection"),
                    child: PathBuf::from("/collection/blocked-b"),
                }),
            },
        ]);

        let scope = collect_walk_error_scope(&err);

        assert_eq!(
            scope.paths,
            vec![
                PathBuf::from("/collection/blocked-a"),
                PathBuf::from("/collection/blocked-b")
            ]
        );
        assert!(!scope.collection_incomplete);
    }

    #[test]
    fn walk_error_scope_marks_pathless_errors_incomplete() {
        let err = ignore::Error::Partial(vec![
            ignore::Error::WithPath {
                path: PathBuf::from("/collection/blocked"),
                err: Box::new(ignore::Error::Io(io::Error::new(
                    io::ErrorKind::PermissionDenied,
                    "blocked",
                ))),
            },
            ignore::Error::Io(io::Error::new(io::ErrorKind::Other, "unknown")),
        ]);

        let scope = collect_walk_error_scope(&err);

        assert_eq!(scope.paths, vec![PathBuf::from("/collection/blocked")]);
        assert!(scope.collection_incomplete);
    }

    #[test]
    fn walk_error_scope_treats_ignore_rule_errors_as_incomplete() {
        let err = ignore::Error::WithPath {
            path: PathBuf::from("/collection/.gitignore"),
            err: Box::new(ignore::Error::WithLineNumber {
                line: 4,
                err: Box::new(ignore::Error::Glob {
                    glob: Some("[".to_string()),
                    err: "invalid glob".to_string(),
                }),
            }),
        };

        let scope = collect_walk_error_scope(&err);

        assert_eq!(scope.paths, vec![PathBuf::from("/collection/.gitignore")]);
        assert!(scope.collection_incomplete);
    }

    // An empty `Partial` carries no location, so nothing can be scoped to a path.
    #[test]
    fn walk_error_scope_treats_empty_partial_as_incomplete() {
        let scope = collect_walk_error_scope(&ignore::Error::Partial(Vec::new()));

        assert!(scope.paths.is_empty());
        assert!(scope.collection_incomplete);
    }

    // `WithDepth` is transparent: it must not supply path context to a bare I/O error.
    #[test]
    fn walk_error_scope_keeps_pathless_io_under_depth_incomplete() {
        let err = ignore::Error::WithDepth {
            depth: 1,
            err: Box::new(ignore::Error::Io(io::Error::new(
                io::ErrorKind::Other,
                "unknown",
            ))),
        };

        let scope = collect_walk_error_scope(&err);

        assert!(scope.paths.is_empty());
        assert!(scope.collection_incomplete);
    }

    #[test]
    fn failed_walk_prefix_matches_only_whole_path_components() {
        let prefixes = ["notes".to_string()];

        assert!(path_is_under_failed_walk("notes", &prefixes, false));
        assert!(path_is_under_failed_walk("notes/a.md", &prefixes, false));
        assert!(!path_is_under_failed_walk("notesx/a.md", &prefixes, false));
        assert!(!path_is_under_failed_walk("other/a.md", &prefixes, false));
    }

    #[test]
    fn failed_walk_root_prefix_or_incomplete_walk_covers_everything() {
        assert!(path_is_under_failed_walk("a.md", &[String::new()], false));
        assert!(path_is_under_failed_walk("a.md", &[".".to_string()], false));
        assert!(path_is_under_failed_walk("a.md", &[], true));
        assert!(!path_is_under_failed_walk("a.md", &[], false));
    }
}
1606
/// Embedding input for one chunk, prepared before the chunk has a database id.
///
/// Built by `prepare_chunk_embeddings`. After the document's chunk rows are
/// written, each entry becomes a `PendingChunkEmbedding`, using `chunk_index`
/// to look up the assigned chunk id.
#[derive(Debug)]
struct PreparedChunkEmbedding {
    /// Position of the chunk in the document's final chunk list.
    chunk_index: usize,
    /// Key of the owning document (space, collection path, relative path).
    doc_key: UpdateDocKey,
    /// Name of the space the document belongs to.
    space_name: String,
    /// Path of the walked source file.
    path: std::path::PathBuf,
    /// Text to embed; never empty (whitespace-only chunk text is replaced by `" "`).
    text: String,
    /// Token bound derived from the chunk policy (`effective_chunk_hard_max`).
    max_document_tokens: usize,
}
1616
/// Outcome of checking prepared chunk embeddings before a document is rewritten.
///
/// Produced by `preflight_prepared_embeddings`. The update loop only runs a
/// preflight for documents that already exist, and it skips replacing such a
/// document when any chunk is rejected.
#[derive(Debug, Default)]
struct PreparedEmbeddingPreflight {
    /// Prepared embeddings that passed the preflight and may be queued.
    accepted: Vec<PreparedChunkEmbedding>,
    /// Indexes of chunks that failed the preflight (presumably their `chunk_index`
    /// values — confirm in `preflight_prepared_embeddings`).
    rejected_chunk_indexes: Vec<usize>,
}
1622
/// A chunk queued for embedding after its database row has been written.
///
/// Entries accumulate in a pending buffer. The buffer is flushed through
/// `flush_buffered_embeddings` once it reaches the embedding batch window.
#[derive(Debug)]
struct PendingChunkEmbedding {
    /// Database id assigned to the chunk by the document generation replace.
    chunk_id: i64,
    /// Key of the owning document (space, collection path, relative path).
    doc_key: UpdateDocKey,
    /// Name of the space the document belongs to.
    space_name: String,
    /// Path of the walked source file.
    path: std::path::PathBuf,
    /// Text to embed, copied from the prepared embedding.
    text: String,
    /// Token bound copied from the prepared embedding.
    max_document_tokens: usize,
}
1632
/// Output of embedding one batch of pending chunks.
///
/// NOTE(review): the code that fills this is not visible here; the field
/// descriptions below are inferred from names and types — confirm.
#[derive(Default)]
struct EmbeddedPendingBatch {
    /// Presumably `(chunk_id, space_name, vector)` per successfully embedded chunk.
    embeddings: Vec<(i64, String, Vec<f32>)>,
    /// Presumably the ids of chunks whose embedding failed.
    failed_chunk_ids: Vec<i64>,
}
1638
/// Identifies one document within an update run.
///
/// For example, it is used as the element type of the `failed_docs` set.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct UpdateDocKey {
    /// Name of the space owning the collection.
    space: String,
    /// The collection's configured path (`target.collection.path`).
    collection_path: std::path::PathBuf,
    /// Document path relative to the collection.
    path: String,
}
1645
/// Adapter exposing an `EmbeddingDocumentSizer` as the chunker's `TokenCounter`.
///
/// With this adapter, chunk sizes are measured by the sizer's
/// `count_document_tokens` and byte-length shortcut.
struct EmbeddingDocumentSizerCounter<'a> {
    /// The engine's configured document sizer, borrowed for one chunking call.
    inner: &'a dyn crate::models::EmbeddingDocumentSizer,
}
1649
1650impl crate::ingest::chunk::TokenCounter for EmbeddingDocumentSizerCounter<'_> {
1651 fn count(&self, text: &str) -> Result<usize> {
1652 count_document_tokens_profiled(
1653 self.inner,
1654 text,
1655 "tokenize_chunking",
1656 "tokenize_chunking_calls",
1657 )
1658 }
1659
1660 fn fits_within_token_limit_by_byte_len(&self, byte_len: usize, max_tokens: usize) -> bool {
1661 let fits = self
1662 .inner
1663 .fits_within_token_limit_by_byte_len(byte_len, max_tokens);
1664 if fits {
1665 crate::profile::increment_update_count("tokenize_chunking_byte_skips", 1);
1666 }
1667 fits
1668 }
1669}
1670
1671fn count_document_tokens_profiled(
1672 sizer: &dyn crate::models::EmbeddingDocumentSizer,
1673 text: &str,
1674 stage: &'static str,
1675 count: &'static str,
1676) -> Result<usize> {
1677 let started = Instant::now();
1678 let result = sizer.count_document_tokens(text);
1679 crate::profile::record_update_stage(stage, started.elapsed());
1680 crate::profile::increment_update_count(count, 1);
1681 result
1682}
1683
1684fn prepare_chunk_embeddings(
1685 final_chunks: &[crate::ingest::chunk::FinalChunk],
1686 doc_key: &UpdateDocKey,
1687 target: &UpdateTarget,
1688 path: &Path,
1689 max_document_tokens: usize,
1690) -> Vec<PreparedChunkEmbedding> {
1691 final_chunks
1692 .iter()
1693 .enumerate()
1694 .map(|(chunk_index, chunk)| {
1695 let mut text = chunk.retrieval_text();
1696 if text.trim().is_empty() {
1697 text = " ".to_string();
1698 }
1699 PreparedChunkEmbedding {
1700 chunk_index,
1701 doc_key: doc_key.clone(),
1702 space_name: target.space.clone(),
1703 path: path.to_path_buf(),
1704 text,
1705 max_document_tokens,
1706 }
1707 })
1708 .collect()
1709}
1710
/// The part of the index that an update run is allowed to repair.
#[derive(Debug, Clone)]
enum UpdateRepairScope {
    /// Unrestricted: no collections were requested, and no space filter narrowed the run.
    Global,
    /// A space filter (without collections) was given; repairs stay within this space.
    Space { space_id: i64 },
    /// Explicit collections were requested; holds their sorted, de-duplicated ids.
    Collections { collection_ids: Vec<i64> },
}
1717
1718impl UpdateRepairScope {
1719 fn from_options_and_targets(options: &UpdateOptions, targets: &[UpdateTarget]) -> Self {
1720 if options.collections.is_empty() {
1721 if let Some(target) = targets.first() {
1722 if options.space.is_some() {
1723 return Self::Space {
1724 space_id: target.collection.space_id,
1725 };
1726 }
1727 }
1728 return Self::Global;
1729 }
1730
1731 let mut collection_ids = targets
1732 .iter()
1733 .map(|target| target.collection.id)
1734 .collect::<Vec<_>>();
1735 collection_ids.sort_unstable();
1736 collection_ids.dedup();
1737 Self::Collections { collection_ids }
1738 }
1739
1740 fn allows_space_dense_repair(&self) -> bool {
1741 !matches!(self, Self::Collections { .. })
1742 }
1743}
1744
/// Report counters for a document, deferred until the document is actually (re)indexed.
///
/// The update loop applies these via `record` only after the document's writes
/// succeed, or immediately in a dry run. Documents that fail extraction or
/// chunking are therefore never counted as added or updated.
#[derive(Debug, Clone, Copy)]
enum PendingDocumentIndexing {
    /// The path had no existing document row.
    Added,
    /// An existing document changed; `reactivated` is set when it was inactive.
    Updated { reactivated: bool },
}
1750
1751impl PendingDocumentIndexing {
1752 fn record(self, report: &mut UpdateReport) {
1753 match self {
1754 Self::Added => {
1755 report.added_docs += 1;
1756 }
1757 Self::Updated { reactivated } => {
1758 report.updated_docs += 1;
1759 if reactivated {
1760 report.reactivated_docs += 1;
1761 }
1762 }
1763 }
1764 }
1765}
1766
/// Returns `true` when the stored canonical text of `doc_id` was produced under
/// exactly `generation_key`.
///
/// Documents with no recorded generation return `false`.
fn document_has_current_canonical_text(
    document_text_generations: &HashMap<i64, String>,
    doc_id: i64,
    generation_key: &str,
) -> bool {
    matches!(
        document_text_generations.get(&doc_id),
        Some(stored) if stored.as_str() == generation_key
    )
}
1776
1777fn finish_update_with_usearch_flush(
1778 update_result: Result<UpdateReport>,
1779 flush_result: Result<()>,
1780) -> Result<UpdateReport> {
1781 match (update_result, flush_result) {
1782 (Ok(report), Ok(())) => Ok(report),
1783 (Err(err), Ok(())) => Err(err),
1784 (Ok(_), Err(err)) => Err(err),
1785 (Err(update_err), Err(flush_err)) => Err(KboltError::Internal(format!(
1786 "update failed: {update_err}; additionally failed to save dirty vector indexes: {flush_err}"
1787 ))
1788 .into()),
1789 }
1790}
1791
1792fn invalid_pending_embedding_batch_detail(
1793 pending: &[PendingChunkEmbedding],
1794 vectors: &[Vec<f32>],
1795) -> Option<String> {
1796 if vectors.len() != pending.len() {
1797 return Some(format!(
1798 "embedder returned {} vectors for {} chunks",
1799 vectors.len(),
1800 pending.len()
1801 ));
1802 }
1803
1804 if vectors.iter().any(|vector| vector.is_empty()) {
1805 if pending.len() == 1 {
1806 return Some(format!(
1807 "embedder returned an empty vector for chunk {}",
1808 pending[0].chunk_id
1809 ));
1810 }
1811 return Some("embedder returned an empty vector".to_string());
1812 }
1813
1814 None
1815}
1816
1817fn push_update_decision(
1818 report: &mut UpdateReport,
1819 options: &UpdateOptions,
1820 target: &UpdateTarget,
1821 path: &str,
1822 kind: UpdateDecisionKind,
1823 detail: Option<String>,
1824) {
1825 if !options.verbose {
1826 return;
1827 }
1828
1829 report.decisions.push(UpdateDecision {
1830 space: target.space.clone(),
1831 collection: target.collection.name.clone(),
1832 path: path.to_string(),
1833 kind,
1834 detail,
1835 });
1836}
1837
1838fn update_doc_key(space: &str, collection_path: &Path, path: &str) -> UpdateDocKey {
1839 UpdateDocKey {
1840 space: space.to_string(),
1841 collection_path: collection_path.to_path_buf(),
1842 path: path.to_string(),
1843 }
1844}
1845
1846fn effective_chunk_hard_max(policy: &crate::config::ChunkPolicy) -> usize {
1847 policy
1848 .hard_max_tokens
1849 .max(policy.soft_max_tokens)
1850 .max(policy.target_tokens)
1851 .max(1)
1852}
1853
1854fn ingestion_generation_key(
1855 extractor_key: &str,
1856 extractor_version: u32,
1857 policy: &crate::config::ChunkPolicy,
1858) -> String {
1859 format!(
1860 "canonical=v{CANONICAL_TEXT_GENERATION};chunker=v{CHUNKER_GENERATION};extractor={extractor_key}:v{extractor_version};chunk=target:{}:soft:{}:hard:{}:overlap:{}:prefix:{}",
1861 policy.target_tokens,
1862 policy.soft_max_tokens,
1863 policy.hard_max_tokens,
1864 policy.boundary_overlap_tokens,
1865 policy.contextual_prefix
1866 )
1867}