1use super::*;
2use kbolt_types::{UpdateDecision, UpdateDecisionKind};
3
// Generation counter for the canonical-text format.
// NOTE(review): not referenced in the visible part of this file; presumably folded into
// the ingestion generation key so that bumping it forces re-ingestion — confirm.
const CANONICAL_TEXT_GENERATION: u32 = 1;
// Generation counter for the chunker.
// NOTE(review): same assumption as above — presumably part of the ingestion generation
// key; verify against `ingestion_generation_key`.
const CHUNKER_GENERATION: u32 = 2;
6
7impl Engine {
8 pub fn update(&self, options: UpdateOptions) -> Result<UpdateReport> {
9 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
10 self.update_unlocked(options)
11 }
12
13 pub(super) fn update_unlocked(&self, options: UpdateOptions) -> Result<UpdateReport> {
14 let _profile = crate::profile::UpdateProfileGuard::start();
15 let update_result = self.run_update_unlocked(options);
16 let flush_result = crate::profile::timed_update_stage("usearch_save_dirty", || {
17 self.storage.save_dirty_usearch_indexes()
18 });
19 finish_update_with_usearch_flush(update_result, flush_result)
20 }
21
22 fn run_update_unlocked(&self, options: UpdateOptions) -> Result<UpdateReport> {
23 let started = Instant::now();
24 let mut report = UpdateReport {
25 scanned_docs: 0,
26 skipped_mtime_docs: 0,
27 skipped_hash_docs: 0,
28 added_docs: 0,
29 updated_docs: 0,
30 failed_docs: 0,
31 deactivated_docs: 0,
32 reactivated_docs: 0,
33 reaped_docs: 0,
34 embedded_chunks: 0,
35 decisions: Vec::new(),
36 errors: Vec::new(),
37 elapsed_ms: 0,
38 };
39 let mut failed_docs = HashSet::new();
40
41 let targets = crate::profile::timed_update_stage("resolve_targets", || {
42 self.resolve_targets(TargetScope {
43 space: options.space.as_deref(),
44 collections: &options.collections,
45 })
46 })?;
47 if targets.is_empty() {
48 report.elapsed_ms = started.elapsed().as_millis() as u64;
49 return Ok(report);
50 }
51 let repair_scope = UpdateRepairScope::from_options_and_targets(&options, &targets);
52
53 crate::profile::timed_update_stage("dense_integrity", || {
54 self.reconcile_dense_integrity(&targets, &repair_scope, &options)
55 })?;
56
57 if !options.dry_run {
58 crate::profile::timed_update_stage("fts_dirty_replay", || {
59 self.replay_fts_dirty_documents(&repair_scope, &mut report, &mut failed_docs)
60 })?;
61 }
62
63 let mut fts_dirty_by_space: HashMap<String, HashSet<i64>> = HashMap::new();
64 let mut pending_embeddings = Vec::new();
65 let mut failed_embedding_chunk_ids = HashSet::new();
66 for target in &targets {
67 self.update_collection_target(
68 target,
69 &options,
70 &mut report,
71 &mut fts_dirty_by_space,
72 &mut pending_embeddings,
73 &mut failed_embedding_chunk_ids,
74 &mut failed_docs,
75 )?;
76 }
77
78 if !options.dry_run {
79 crate::profile::timed_update_stage("embedding_flush_buffered", || {
80 self.flush_buffered_embeddings(
81 &mut pending_embeddings,
82 &mut failed_embedding_chunk_ids,
83 &mut report,
84 &mut failed_docs,
85 )
86 })?;
87
88 for (space, doc_ids) in fts_dirty_by_space {
89 if doc_ids.is_empty() {
90 continue;
91 }
92
93 crate::profile::timed_update_stage("tantivy_commit", || {
94 self.storage.commit_tantivy(&space)
95 })?;
96 let mut ids = doc_ids.into_iter().collect::<Vec<_>>();
97 ids.sort_unstable();
98 crate::profile::timed_update_stage("sqlite_clear_fts_dirty", || {
99 self.storage.batch_clear_fts_dirty(&ids)
100 })?;
101 }
102
103 crate::profile::timed_update_stage("embedding_backlog", || {
104 self.embed_pending_chunks(
105 &repair_scope,
106 &options,
107 &mut failed_embedding_chunk_ids,
108 &mut report,
109 &mut failed_docs,
110 )
111 })?;
112
113 let reaped = crate::profile::timed_update_stage("sqlite_list_reapable", || {
114 self.list_reapable_documents_for_scope(self.config.reaping.days, &repair_scope)
115 })?;
116 let mut reaped_doc_ids = Vec::with_capacity(reaped.len());
117 let mut chunk_ids_by_space: HashMap<String, Vec<i64>> = HashMap::new();
118 for document in reaped {
119 reaped_doc_ids.push(document.doc_id);
120 if !document.chunk_ids.is_empty() {
121 chunk_ids_by_space
122 .entry(document.space_name)
123 .or_default()
124 .extend(document.chunk_ids);
125 }
126 }
127 for (space, chunk_ids) in chunk_ids_by_space {
128 crate::profile::timed_update_stage("purge_reaped_indexes", || {
129 self.purge_space_chunks(&space, &chunk_ids)
130 })?;
131 }
132 crate::profile::timed_update_stage("sqlite_delete_reaped_documents", || {
133 self.storage.delete_documents(&reaped_doc_ids)
134 })?;
135 report.reaped_docs = reaped_doc_ids.len();
136 }
137
138 report.failed_docs = failed_docs.len();
139 report.elapsed_ms = started.elapsed().as_millis() as u64;
140 Ok(report)
141 }
142
143 fn reconcile_dense_integrity(
144 &self,
145 targets: &[UpdateTarget],
146 repair_scope: &UpdateRepairScope,
147 options: &UpdateOptions,
148 ) -> Result<()> {
149 if options.no_embed || options.dry_run {
150 return Ok(());
151 }
152
153 let expected_model = self.embedding_model_key();
154 let mut visited_spaces = HashSet::new();
155 for target in targets {
156 if !visited_spaces.insert(target.collection.space_id) {
157 continue;
158 }
159
160 let models = self
161 .storage
162 .list_embedding_models_in_space(target.collection.space_id)?;
163 let mut reasons = Vec::new();
164 if models.iter().any(|model| model != expected_model) {
165 reasons.push(format!(
166 "stored embedding models {:?} do not match current model '{}'",
167 models, expected_model
168 ));
169 }
170
171 let sqlite_count = self
172 .storage
173 .count_embedded_chunks(Some(target.collection.space_id))?;
174 let usearch_count = self.storage.count_usearch(&target.space)?;
175 if sqlite_count != usearch_count {
176 reasons.push(format!(
177 "sqlite embedded chunk count {sqlite_count} does not match USearch vector count {usearch_count}"
178 ));
179 }
180
181 if reasons.is_empty() {
182 continue;
183 }
184
185 if !repair_scope.allows_space_dense_repair() {
186 return Err(KboltError::SpaceDenseRepairRequired {
187 space: target.space.clone(),
188 reason: reasons.join("; "),
189 }
190 .into());
191 }
192
193 self.storage
194 .delete_embeddings_for_space(target.collection.space_id)?;
195 self.storage.clear_usearch(&target.space)?;
196 }
197
198 Ok(())
199 }
200
201 fn embed_pending_chunks(
202 &self,
203 repair_scope: &UpdateRepairScope,
204 options: &UpdateOptions,
205 failed_chunk_ids: &mut HashSet<i64>,
206 report: &mut UpdateReport,
207 failed_docs: &mut HashSet<UpdateDocKey>,
208 ) -> Result<()> {
209 if options.no_embed || options.dry_run {
210 return Ok(());
211 }
212
213 let Some(embedder) = self.embedder.as_ref() else {
214 return Ok(());
215 };
216
217 let model = self.embedding_model_key();
218 let mut after_chunk_id = 0_i64;
219 loop {
220 let backlog =
221 crate::profile::timed_update_stage("sqlite_get_unembedded_chunks", || {
222 self.get_unembedded_chunks_for_scope(model, repair_scope, after_chunk_id, 64)
223 })?;
224 if backlog.is_empty() {
225 break;
226 }
227
228 after_chunk_id = backlog
229 .last()
230 .map(|record| record.chunk.id)
231 .expect("non-empty backlog should have a last chunk id");
232
233 let backlog_doc_ids = backlog
234 .iter()
235 .filter(|record| !failed_chunk_ids.contains(&record.chunk.id))
236 .map(|record| record.chunk.doc_id)
237 .collect::<Vec<_>>();
238 let document_text_cache =
239 crate::profile::timed_update_stage("embedding_backlog_read", || {
240 self.storage.get_existing_document_texts(&backlog_doc_ids)
241 })?;
242
243 let mut pending = Vec::new();
244 for record in backlog {
245 if failed_chunk_ids.contains(&record.chunk.id) {
246 continue;
247 }
248
249 let full_path = record.collection_path.join(&record.doc_path);
250 let Some(document_text) = document_text_cache.get(&record.chunk.doc_id) else {
251 report.errors.push(file_error(
252 Some(full_path.clone()),
253 format!(
254 "embed canonical text load failed: {}",
255 crate::storage::missing_document_text_error(record.chunk.doc_id)
256 ),
257 ));
258 failed_docs.insert(update_doc_key(
259 &record.space_name,
260 &record.collection_path,
261 &record.doc_path,
262 ));
263 continue;
264 };
265 let chunk_canonical_text = match crate::storage::chunk_text_from_canonical(
266 document_text.text.as_str(),
267 &record.chunk,
268 ) {
269 Ok(text) => text,
270 Err(err) => {
271 report.errors.push(file_error(
272 Some(full_path.clone()),
273 format!("embed canonical text load failed: {err}"),
274 ));
275 failed_docs.insert(update_doc_key(
276 &record.space_name,
277 &record.collection_path,
278 &record.doc_path,
279 ));
280 continue;
281 }
282 };
283 let mut text = crate::ingest::chunk::chunk_retrieval_body(
284 chunk_canonical_text.as_str(),
285 record.chunk.retrieval_prefix.as_deref(),
286 );
287 if text.trim().is_empty() {
288 text = " ".to_string();
289 }
290 let policy = resolve_policy(
291 &self.config.chunking,
292 Some(document_text.extractor_key.as_str()),
293 None,
294 );
295 let max_document_tokens = effective_chunk_hard_max(&policy);
296
297 pending.push(PendingChunkEmbedding {
298 chunk_id: record.chunk.id,
299 doc_key: update_doc_key(
300 &record.space_name,
301 &record.collection_path,
302 &record.doc_path,
303 ),
304 space_name: record.space_name,
305 path: full_path,
306 text,
307 max_document_tokens,
308 });
309 }
310
311 if pending.is_empty() {
312 continue;
313 }
314
315 let mut preflight_failed_chunk_ids = Vec::new();
316 let pending = self.preflight_pending_embeddings(
317 pending,
318 report,
319 failed_docs,
320 &mut preflight_failed_chunk_ids,
321 )?;
322 failed_chunk_ids.extend(preflight_failed_chunk_ids);
323 if pending.is_empty() {
324 continue;
325 }
326
327 let result = self.embed_preflighted_pending_batch_with_partial_failures(
328 embedder.as_ref(),
329 pending,
330 report,
331 failed_docs,
332 )?;
333 failed_chunk_ids.extend(result.failed_chunk_ids);
334 if !result.embeddings.is_empty() {
335 self.store_chunk_embeddings(model, result.embeddings, report)?;
336 }
337 }
338
339 Ok(())
340 }
341
342 fn flush_buffered_embeddings(
343 &self,
344 pending_embeddings: &mut Vec<PendingChunkEmbedding>,
345 failed_chunk_ids: &mut HashSet<i64>,
346 report: &mut UpdateReport,
347 failed_docs: &mut HashSet<UpdateDocKey>,
348 ) -> Result<()> {
349 if pending_embeddings.is_empty() {
350 return Ok(());
351 }
352
353 let Some(embedder) = self.embedder.as_ref() else {
354 pending_embeddings.clear();
355 return Ok(());
356 };
357
358 let pending = std::mem::take(pending_embeddings);
359 let result = self.embed_preflighted_pending_batch_with_partial_failures(
360 embedder.as_ref(),
361 pending,
362 report,
363 failed_docs,
364 )?;
365 failed_chunk_ids.extend(result.failed_chunk_ids);
366 if result.embeddings.is_empty() {
367 return Ok(());
368 }
369
370 self.store_chunk_embeddings(self.embedding_model_key(), result.embeddings, report)
371 }
372
373 fn embed_preflighted_pending_batch_with_partial_failures(
374 &self,
375 embedder: &dyn crate::models::Embedder,
376 pending: Vec<PendingChunkEmbedding>,
377 report: &mut UpdateReport,
378 failed_docs: &mut HashSet<UpdateDocKey>,
379 ) -> Result<EmbeddedPendingBatch> {
380 if pending.is_empty() {
381 return Ok(EmbeddedPendingBatch::default());
382 }
383
384 let texts = pending
385 .iter()
386 .map(|embedding| embedding.text.clone())
387 .collect::<Vec<_>>();
388 crate::profile::increment_update_count("embedding_batch_calls", 1);
389 crate::profile::increment_update_count("embedding_input_texts", texts.len() as u64);
390 match crate::profile::timed_update_stage("embedding_http", || {
391 embedder.embed_batch(crate::models::EmbeddingInputKind::Document, &texts)
392 }) {
393 Ok(vectors) => {
394 if let Some(detail) = invalid_pending_embedding_batch_detail(&pending, &vectors) {
395 return self.split_pending_embedding_batch(
396 embedder,
397 pending,
398 report,
399 failed_docs,
400 detail,
401 );
402 }
403
404 let embeddings = pending
405 .into_iter()
406 .zip(vectors)
407 .map(|(embedding, vector)| (embedding.chunk_id, embedding.space_name, vector))
408 .collect::<Vec<_>>();
409 Ok(EmbeddedPendingBatch {
410 embeddings,
411 failed_chunk_ids: Vec::new(),
412 })
413 }
414 Err(err) => self.split_pending_embedding_batch(
415 embedder,
416 pending,
417 report,
418 failed_docs,
419 err.to_string(),
420 ),
421 }
422 }
423
424 fn split_pending_embedding_batch(
425 &self,
426 embedder: &dyn crate::models::Embedder,
427 pending: Vec<PendingChunkEmbedding>,
428 report: &mut UpdateReport,
429 failed_docs: &mut HashSet<UpdateDocKey>,
430 detail: String,
431 ) -> Result<EmbeddedPendingBatch> {
432 if pending.len() == 1 {
433 let embedding = pending
434 .into_iter()
435 .next()
436 .expect("single pending embedding should exist");
437 report.errors.push(file_error(
438 Some(embedding.path),
439 format!("embed failed: {detail}"),
440 ));
441 failed_docs.insert(embedding.doc_key);
442 return Ok(EmbeddedPendingBatch {
443 embeddings: Vec::new(),
444 failed_chunk_ids: vec![embedding.chunk_id],
445 });
446 }
447
448 let mid = pending.len() / 2;
449 let mut right = pending;
450 let left = right.drain(..mid).collect::<Vec<_>>();
451
452 let mut left_result = self.embed_preflighted_pending_batch_with_partial_failures(
453 embedder,
454 left,
455 report,
456 failed_docs,
457 )?;
458 let right_result = self.embed_preflighted_pending_batch_with_partial_failures(
459 embedder,
460 right,
461 report,
462 failed_docs,
463 )?;
464 left_result.embeddings.extend(right_result.embeddings);
465 left_result
466 .failed_chunk_ids
467 .extend(right_result.failed_chunk_ids);
468 Ok(left_result)
469 }
470
471 fn store_chunk_embeddings(
472 &self,
473 model: &str,
474 embeddings: Vec<(i64, String, Vec<f32>)>,
475 report: &mut UpdateReport,
476 ) -> Result<()> {
477 let mut grouped_vectors: HashMap<String, Vec<(i64, Vec<f32>)>> = HashMap::new();
478 let mut embedding_rows = Vec::with_capacity(embeddings.len());
479 for (chunk_id, space_name, vector) in embeddings {
480 if vector.is_empty() {
481 return Err(KboltError::Inference(format!(
482 "embedder returned an empty vector for chunk {chunk_id}"
483 ))
484 .into());
485 }
486
487 grouped_vectors
488 .entry(space_name)
489 .or_default()
490 .push((chunk_id, vector));
491 embedding_rows.push((chunk_id, model));
492 }
493
494 for (space, vectors) in grouped_vectors {
495 let refs = vectors
496 .iter()
497 .map(|(chunk_id, vector)| (*chunk_id, vector.as_slice()))
498 .collect::<Vec<_>>();
499 crate::profile::increment_update_count("usearch_vectors", refs.len() as u64);
500 self.storage.batch_insert_usearch_deferred(&space, &refs)?;
501 }
502
503 crate::profile::timed_update_stage("sqlite_insert_embeddings", || {
504 self.storage.insert_embeddings(&embedding_rows)
505 })?;
506 report.embedded_chunks = report.embedded_chunks.saturating_add(embedding_rows.len());
507 Ok(())
508 }
509
510 fn preflight_pending_embeddings(
511 &self,
512 pending: Vec<PendingChunkEmbedding>,
513 report: &mut UpdateReport,
514 failed_docs: &mut HashSet<UpdateDocKey>,
515 failed_chunk_ids: &mut Vec<i64>,
516 ) -> Result<Vec<PendingChunkEmbedding>> {
517 let Some(sizer) = self.embedding_document_sizer.as_ref() else {
518 return Ok(pending);
519 };
520
521 let mut accepted = Vec::with_capacity(pending.len());
522 for embedding in pending {
523 match count_document_tokens_profiled(
524 sizer.as_ref(),
525 &embedding.text,
526 "tokenize_preflight",
527 "tokenize_preflight_calls",
528 ) {
529 Ok(token_count) if token_count <= embedding.max_document_tokens => {
530 accepted.push(embedding);
531 }
532 Ok(token_count) => {
533 report.errors.push(file_error(
534 Some(embedding.path.clone()),
535 format!(
536 "embed preflight failed: payload has {token_count} tokens, exceeding hard_max_tokens {}",
537 embedding.max_document_tokens
538 ),
539 ));
540 failed_docs.insert(embedding.doc_key);
541 failed_chunk_ids.push(embedding.chunk_id);
542 }
543 Err(err) => {
544 report.errors.push(file_error(
545 Some(embedding.path.clone()),
546 format!("embed preflight token count failed: {err}"),
547 ));
548 failed_docs.insert(embedding.doc_key);
549 failed_chunk_ids.push(embedding.chunk_id);
550 }
551 }
552 }
553
554 Ok(accepted)
555 }
556
557 fn preflight_prepared_embeddings(
558 &self,
559 prepared: Vec<PreparedChunkEmbedding>,
560 report: &mut UpdateReport,
561 failed_docs: &mut HashSet<UpdateDocKey>,
562 ) -> Result<PreparedEmbeddingPreflight> {
563 let Some(sizer) = self.embedding_document_sizer.as_ref() else {
564 return Ok(PreparedEmbeddingPreflight {
565 accepted: prepared,
566 rejected_chunk_indexes: Vec::new(),
567 });
568 };
569
570 let mut preflight = PreparedEmbeddingPreflight {
571 accepted: Vec::with_capacity(prepared.len()),
572 rejected_chunk_indexes: Vec::new(),
573 };
574 for embedding in prepared {
575 match count_document_tokens_profiled(
576 sizer.as_ref(),
577 &embedding.text,
578 "tokenize_preflight",
579 "tokenize_preflight_calls",
580 ) {
581 Ok(token_count) if token_count <= embedding.max_document_tokens => {
582 preflight.accepted.push(embedding);
583 }
584 Ok(token_count) => {
585 report.errors.push(file_error(
586 Some(embedding.path.clone()),
587 format!(
588 "embed preflight failed: payload has {token_count} tokens, exceeding hard_max_tokens {}",
589 embedding.max_document_tokens
590 ),
591 ));
592 failed_docs.insert(embedding.doc_key);
593 preflight.rejected_chunk_indexes.push(embedding.chunk_index);
594 }
595 Err(err) => {
596 report.errors.push(file_error(
597 Some(embedding.path.clone()),
598 format!("embed preflight token count failed: {err}"),
599 ));
600 failed_docs.insert(embedding.doc_key);
601 preflight.rejected_chunk_indexes.push(embedding.chunk_index);
602 }
603 }
604 }
605
606 Ok(preflight)
607 }
608
609 fn replay_fts_dirty_documents(
610 &self,
611 repair_scope: &UpdateRepairScope,
612 report: &mut UpdateReport,
613 failed_docs: &mut HashSet<UpdateDocKey>,
614 ) -> Result<()> {
615 let records = self.get_fts_dirty_documents_for_scope(repair_scope)?;
616 if records.is_empty() {
617 return Ok(());
618 }
619
620 let mut cleared_by_space: HashMap<String, Vec<i64>> = HashMap::new();
621 for record in records {
622 let space_name = record.space_name;
623 let doc_id = record.doc_id;
624
625 if record.chunks.is_empty() {
626 self.storage.delete_tantivy_by_doc(&space_name, doc_id)?;
627 cleared_by_space.entry(space_name).or_default().push(doc_id);
628 continue;
629 }
630
631 let document_text = match self.storage.get_document_text(doc_id) {
632 Ok(row) => row,
633 Err(err) => {
634 failed_docs.insert(update_doc_key(
635 &space_name,
636 &record.collection_path,
637 &record.doc_path,
638 ));
639 report.errors.push(file_error(
640 Some(record.collection_path.join(&record.doc_path)),
641 format!("fts replay canonical text load failed: {err}"),
642 ));
643 continue;
644 }
645 };
646 let policy = resolve_policy(
647 &self.config.chunking,
648 Some(document_text.extractor_key.as_str()),
649 None,
650 );
651
652 let entries = record
653 .chunks
654 .iter()
655 .map(|chunk| -> Result<TantivyEntry> {
656 let chunk_text = crate::storage::chunk_text_from_canonical(
657 document_text.text.as_str(),
658 chunk,
659 )?;
660 let chunk_body = crate::ingest::chunk::chunk_retrieval_body(
661 chunk_text.as_str(),
662 chunk.retrieval_prefix.as_deref(),
663 );
664 Ok(TantivyEntry {
665 chunk_id: chunk.id,
666 doc_id,
667 filepath: record.doc_path.clone(),
668 semantic_title: record
669 .doc_title_source
670 .semantic_title(record.doc_title.as_str())
671 .map(ToString::to_string),
672 heading: chunk.heading.clone(),
673 body: retrieval_text_with_prefix(
674 chunk_body.as_str(),
675 record
676 .doc_title_source
677 .semantic_title(record.doc_title.as_str()),
678 chunk.heading.as_deref(),
679 policy.contextual_prefix,
680 ),
681 })
682 })
683 .collect::<Result<Vec<_>>>()?;
684
685 self.storage.delete_tantivy_by_doc(&space_name, doc_id)?;
686 self.storage.index_tantivy(&space_name, &entries)?;
687 cleared_by_space.entry(space_name).or_default().push(doc_id);
688 }
689
690 for (space_name, mut doc_ids) in cleared_by_space {
691 if doc_ids.is_empty() {
692 continue;
693 }
694
695 doc_ids.sort_unstable();
696 doc_ids.dedup();
697 self.storage.commit_tantivy(&space_name)?;
698 self.storage.batch_clear_fts_dirty(&doc_ids)?;
699 }
700
701 Ok(())
702 }
703
704 fn get_fts_dirty_documents_for_scope(
705 &self,
706 repair_scope: &UpdateRepairScope,
707 ) -> Result<Vec<crate::storage::FtsDirtyRecord>> {
708 match repair_scope {
709 UpdateRepairScope::Global => self.storage.get_fts_dirty_documents(),
710 UpdateRepairScope::Space { space_id } => {
711 self.storage.get_fts_dirty_documents_in_space(*space_id)
712 }
713 UpdateRepairScope::Collections { collection_ids } => self
714 .storage
715 .get_fts_dirty_documents_in_collections(collection_ids),
716 }
717 }
718
719 fn get_unembedded_chunks_for_scope(
720 &self,
721 model: &str,
722 repair_scope: &UpdateRepairScope,
723 after_chunk_id: i64,
724 limit: usize,
725 ) -> Result<Vec<crate::storage::EmbedRecord>> {
726 match repair_scope {
727 UpdateRepairScope::Global => {
728 self.storage
729 .get_unembedded_chunks(model, after_chunk_id, limit)
730 }
731 UpdateRepairScope::Space { space_id } => {
732 self.storage
733 .get_unembedded_chunks_in_space(model, *space_id, after_chunk_id, limit)
734 }
735 UpdateRepairScope::Collections { collection_ids } => self
736 .storage
737 .get_unembedded_chunks_in_collections(model, collection_ids, after_chunk_id, limit),
738 }
739 }
740
741 fn list_reapable_documents_for_scope(
742 &self,
743 older_than_days: u32,
744 repair_scope: &UpdateRepairScope,
745 ) -> Result<Vec<crate::storage::ReapableDocument>> {
746 match repair_scope {
747 UpdateRepairScope::Global => self.storage.list_reapable_documents(older_than_days),
748 UpdateRepairScope::Space { space_id } => self
749 .storage
750 .list_reapable_documents_in_space(older_than_days, *space_id),
751 UpdateRepairScope::Collections { collection_ids } => self
752 .storage
753 .list_reapable_documents_in_collections(older_than_days, collection_ids),
754 }
755 }
756
757 pub fn resolve_update_targets(&self, options: &UpdateOptions) -> Result<Vec<UpdateTarget>> {
758 self.resolve_targets(TargetScope {
759 space: options.space.as_deref(),
760 collections: &options.collections,
761 })
762 }
763
764 pub(super) fn resolve_targets(&self, scope: TargetScope<'_>) -> Result<Vec<UpdateTarget>> {
765 let mut targets = Vec::new();
766
767 if scope.collections.is_empty() {
768 return self.resolve_update_targets_for_all_collections(scope.space);
769 }
770
771 let mut seen = std::collections::HashSet::new();
772 for raw_collection_name in scope.collections {
773 let collection_name = raw_collection_name.trim();
774 if collection_name.is_empty() {
775 return Err(KboltError::InvalidInput(
776 "collection names cannot be empty".to_string(),
777 )
778 .into());
779 }
780
781 let resolved_space = self.resolve_space_row(scope.space, Some(collection_name))?;
782 let collection = self
783 .storage
784 .get_collection(resolved_space.id, collection_name)?;
785
786 if seen.insert((collection.space_id, collection.name.clone())) {
787 targets.push(UpdateTarget {
788 space: resolved_space.name,
789 collection,
790 });
791 }
792 }
793
794 Ok(targets)
795 }
796
797 fn resolve_update_targets_for_all_collections(
798 &self,
799 space: Option<&str>,
800 ) -> Result<Vec<UpdateTarget>> {
801 let (space_id_filter, spaces_by_id) = if let Some(space_name) = space {
802 let resolved = self.resolve_space_row(Some(space_name), None)?;
803 let mut map = std::collections::HashMap::new();
804 map.insert(resolved.id, resolved.name.clone());
805 (Some(resolved.id), map)
806 } else {
807 let spaces = self.storage.list_spaces()?;
808 let map = spaces
809 .into_iter()
810 .map(|space| (space.id, space.name))
811 .collect::<std::collections::HashMap<_, _>>();
812 (None, map)
813 };
814
815 let collections = self.storage.list_collections(space_id_filter)?;
816 let mut targets = Vec::with_capacity(collections.len());
817 for collection in collections {
818 let space_name = spaces_by_id
819 .get(&collection.space_id)
820 .ok_or_else(|| {
821 KboltError::Internal(format!(
822 "missing space mapping for collection '{}'",
823 collection.name
824 ))
825 })?
826 .clone();
827 targets.push(UpdateTarget {
828 space: space_name,
829 collection,
830 });
831 }
832
833 Ok(targets)
834 }
835
836 fn update_collection_target(
837 &self,
838 target: &UpdateTarget,
839 options: &UpdateOptions,
840 report: &mut UpdateReport,
841 fts_dirty_by_space: &mut HashMap<String, HashSet<i64>>,
842 pending_embeddings: &mut Vec<PendingChunkEmbedding>,
843 failed_chunk_ids: &mut HashSet<i64>,
844 failed_docs: &mut HashSet<UpdateDocKey>,
845 ) -> Result<()> {
846 let all_documents = crate::profile::timed_update_stage("sqlite_list_documents", || {
847 self.storage.list_documents(target.collection.id, false)
848 })?;
849 let document_ids = all_documents.iter().map(|doc| doc.id).collect::<Vec<_>>();
850 let document_text_generations =
851 crate::profile::timed_update_stage("sqlite_list_document_text_generations", || {
852 self.storage
853 .get_document_text_generation_keys(&document_ids)
854 })?;
855 let mut docs_by_path: HashMap<String, DocumentRow> = all_documents
856 .into_iter()
857 .map(|doc| (doc.path.clone(), doc))
858 .collect();
859 let mut seen_paths = HashSet::new();
860 let extension_filter = normalized_extension_filter(target.collection.extensions.as_deref());
861 let ignore_matcher = load_collection_ignore_matcher(
862 &self.config.config_dir,
863 &target.collection.path,
864 &target.space,
865 &target.collection.name,
866 )?;
867 let extractor_registry = default_registry();
868 let mut touched_collection = false;
869 let mut failed_walk_prefixes = Vec::new();
870 let mut collection_walk_incomplete = false;
871
872 for entry in super::ignore_helpers::build_collection_walk(&target.collection.path) {
873 let entry = match entry {
874 Ok(item) => item,
875 Err(err) => {
876 let error_scope = collect_walk_error_scope(&err);
877 let error_path = error_scope.paths.first().cloned();
878 if error_scope.collection_incomplete || error_scope.paths.is_empty() {
879 collection_walk_incomplete = true;
880 }
881 for path in &error_scope.paths {
882 let failed_path =
883 match collection_relative_path(&target.collection.path, path) {
884 Ok(relative) => {
885 if relative.is_empty() || relative == "." {
886 collection_walk_incomplete = true;
887 } else {
888 failed_walk_prefixes.push(relative.clone());
889 }
890 relative
891 }
892 Err(_) => {
893 collection_walk_incomplete = true;
894 path.display().to_string()
895 }
896 };
897 failed_docs.insert(update_doc_key(
898 &target.space,
899 &target.collection.path,
900 &failed_path,
901 ));
902 }
903 report
904 .errors
905 .push(file_error(error_path, format!("walk error: {err}")));
906 continue;
907 }
908 };
909
910 if !entry
911 .file_type()
912 .is_some_and(|file_type| file_type.is_file())
913 {
914 continue;
915 }
916
917 if is_hard_ignored_file(entry.path()) {
918 continue;
919 }
920
921 let relative_path =
922 match collection_relative_path(&target.collection.path, entry.path()) {
923 Ok(path) => path,
924 Err(err) => {
925 failed_docs.insert(update_doc_key(
926 &target.space,
927 &target.collection.path,
928 &entry.path().display().to_string(),
929 ));
930 report.errors.push(file_error(
931 Some(entry.path().to_path_buf()),
932 err.to_string(),
933 ));
934 continue;
935 }
936 };
937
938 if !extension_allowed(entry.path(), extension_filter.as_ref()) {
939 push_update_decision(
940 report,
941 options,
942 target,
943 &relative_path,
944 UpdateDecisionKind::Unsupported,
945 Some("extension not allowed".to_string()),
946 );
947 continue;
948 }
949
950 if let Some(matcher) = ignore_matcher.as_ref() {
951 if matcher
952 .matched(Path::new(&relative_path), false)
953 .is_ignore()
954 {
955 push_update_decision(
956 report,
957 options,
958 target,
959 &relative_path,
960 UpdateDecisionKind::Ignored,
961 Some("matched ignore patterns".to_string()),
962 );
963 continue;
964 }
965 }
966
967 let Some(extractor) = extractor_registry.resolve_for_path(entry.path()) else {
968 push_update_decision(
969 report,
970 options,
971 target,
972 &relative_path,
973 UpdateDecisionKind::Unsupported,
974 Some("no extractor available".to_string()),
975 );
976 continue;
977 };
978 let policy = resolve_policy(&self.config.chunking, Some(extractor.profile_key()), None);
979 let generation_key =
980 ingestion_generation_key(extractor.profile_key(), extractor.version(), &policy);
981
982 report.scanned_docs += 1;
983 crate::profile::increment_update_count("docs_scanned", 1);
984 seen_paths.insert(relative_path.clone());
985
986 let scan_started = Instant::now();
987 let metadata = match entry.metadata() {
988 Ok(data) => data,
989 Err(err) => {
990 let detail = format!("metadata error: {err}");
991 failed_docs.insert(update_doc_key(
992 &target.space,
993 &target.collection.path,
994 &relative_path,
995 ));
996 push_update_decision(
997 report,
998 options,
999 target,
1000 &relative_path,
1001 UpdateDecisionKind::ReadFailed,
1002 Some(detail.clone()),
1003 );
1004 report
1005 .errors
1006 .push(file_error(Some(entry.path().to_path_buf()), detail));
1007 crate::profile::record_update_stage(
1008 "scan_metadata_mtime",
1009 scan_started.elapsed(),
1010 );
1011 continue;
1012 }
1013 };
1014
1015 let modified = match modified_token(&metadata) {
1016 Ok(value) => value,
1017 Err(err) => {
1018 let detail = format!("modified timestamp error: {err}");
1019 failed_docs.insert(update_doc_key(
1020 &target.space,
1021 &target.collection.path,
1022 &relative_path,
1023 ));
1024 push_update_decision(
1025 report,
1026 options,
1027 target,
1028 &relative_path,
1029 UpdateDecisionKind::ReadFailed,
1030 Some(detail.clone()),
1031 );
1032 report
1033 .errors
1034 .push(file_error(Some(entry.path().to_path_buf()), detail));
1035 crate::profile::record_update_stage(
1036 "scan_metadata_mtime",
1037 scan_started.elapsed(),
1038 );
1039 continue;
1040 }
1041 };
1042
1043 if let Some(existing) = docs_by_path.get(&relative_path) {
1044 let has_current_canonical_text = document_has_current_canonical_text(
1045 &document_text_generations,
1046 existing.id,
1047 generation_key.as_str(),
1048 );
1049 if existing.active && existing.modified == modified && has_current_canonical_text {
1050 report.skipped_mtime_docs += 1;
1051 push_update_decision(
1052 report,
1053 options,
1054 target,
1055 &relative_path,
1056 UpdateDecisionKind::SkippedMtime,
1057 None,
1058 );
1059 crate::profile::record_update_stage(
1060 "scan_metadata_mtime",
1061 scan_started.elapsed(),
1062 );
1063 continue;
1064 }
1065 }
1066 crate::profile::record_update_stage("scan_metadata_mtime", scan_started.elapsed());
1067
1068 let read_hash_started = Instant::now();
1069 let bytes = match std::fs::read(entry.path()) {
1070 Ok(data) => data,
1071 Err(err) => {
1072 let detail = err.to_string();
1073 failed_docs.insert(update_doc_key(
1074 &target.space,
1075 &target.collection.path,
1076 &relative_path,
1077 ));
1078 push_update_decision(
1079 report,
1080 options,
1081 target,
1082 &relative_path,
1083 UpdateDecisionKind::ReadFailed,
1084 Some(detail.clone()),
1085 );
1086 report
1087 .errors
1088 .push(file_error(Some(entry.path().to_path_buf()), detail));
1089 crate::profile::record_update_stage("read_hash", read_hash_started.elapsed());
1090 continue;
1091 }
1092 };
1093 let hash = sha256_hex(&bytes);
1094 crate::profile::record_update_stage("read_hash", read_hash_started.elapsed());
1095 let mut title = file_title(entry.path());
1096 let mut title_source = DocumentTitleSource::FilenameFallback;
1097
1098 let existing = docs_by_path.get(&relative_path).cloned();
1099 let pending_decision;
1100 let pending_indexing;
1101 if let Some(doc) = existing.as_ref() {
1102 let has_current_canonical_text = document_has_current_canonical_text(
1103 &document_text_generations,
1104 doc.id,
1105 generation_key.as_str(),
1106 );
1107 if doc.hash == hash && has_current_canonical_text {
1108 if doc.active {
1109 report.skipped_hash_docs += 1;
1110 push_update_decision(
1111 report,
1112 options,
1113 target,
1114 &relative_path,
1115 UpdateDecisionKind::SkippedHash,
1116 None,
1117 );
1118 } else {
1119 report.reactivated_docs += 1;
1120 push_update_decision(
1121 report,
1122 options,
1123 target,
1124 &relative_path,
1125 UpdateDecisionKind::Reactivated,
1126 None,
1127 );
1128 }
1129
1130 if !options.dry_run {
1131 crate::profile::timed_update_stage("sqlite_refresh_document", || {
1132 self.storage.refresh_document_activity(doc.id, &modified)
1133 })?;
1134 }
1135 continue;
1136 }
1137
1138 pending_indexing = PendingDocumentIndexing::Updated {
1139 reactivated: !doc.active,
1140 };
1141 pending_decision = (
1142 UpdateDecisionKind::Changed,
1143 (!doc.active).then_some("reactivated".to_string()),
1144 );
1145 } else {
1146 pending_indexing = PendingDocumentIndexing::Added;
1147 pending_decision = (UpdateDecisionKind::New, None);
1148 }
1149
1150 if options.dry_run {
1151 pending_indexing.record(report);
1152 let (kind, detail) = pending_decision;
1153 push_update_decision(report, options, target, &relative_path, kind, detail);
1154 continue;
1155 }
1156
1157 let extracted = match crate::profile::timed_update_stage("extract", || {
1158 extractor.extract(entry.path(), &bytes)
1159 }) {
1160 Ok(document) => document,
1161 Err(err) => {
1162 let detail = format!("extract failed: {err}");
1163 failed_docs.insert(update_doc_key(
1164 &target.space,
1165 &target.collection.path,
1166 &relative_path,
1167 ));
1168 push_update_decision(
1169 report,
1170 options,
1171 target,
1172 &relative_path,
1173 UpdateDecisionKind::ExtractFailed,
1174 Some(detail.clone()),
1175 );
1176 report
1177 .errors
1178 .push(file_error(Some(entry.path().to_path_buf()), detail));
1179 continue;
1180 }
1181 };
1182 if let Some(extracted_title) = extracted
1183 .title
1184 .as_deref()
1185 .map(str::trim)
1186 .filter(|title| !title.is_empty())
1187 {
1188 title = extracted_title.to_string();
1189 title_source = DocumentTitleSource::Extracted;
1190 }
1191
1192 let canonical = crate::profile::timed_update_stage("canonical_text", || {
1193 build_canonical_document(&extracted)
1194 });
1195 let text_hash = sha256_hex(canonical.text.as_bytes());
1196 let max_document_tokens = effective_chunk_hard_max(&policy);
1197 let chunk_started = Instant::now();
1198 let final_chunks_result = match self.embedding_document_sizer.as_ref() {
1199 Some(sizer) => {
1200 let sizer_counter = EmbeddingDocumentSizerCounter {
1201 inner: sizer.as_ref(),
1202 };
1203 chunk_canonical_document_with_counter(
1204 &canonical.document,
1205 &policy,
1206 &sizer_counter,
1207 )
1208 }
1209 None => Ok(chunk_canonical_document(&canonical.document, &policy)),
1210 };
1211 crate::profile::record_update_stage("chunk", chunk_started.elapsed());
1212 let final_chunks = match final_chunks_result {
1213 Ok(chunks) => chunks,
1214 Err(err) => {
1215 let detail = format!("chunking failed: {err}");
1216 failed_docs.insert(update_doc_key(
1217 &target.space,
1218 &target.collection.path,
1219 &relative_path,
1220 ));
1221 push_update_decision(
1222 report,
1223 options,
1224 target,
1225 &relative_path,
1226 UpdateDecisionKind::ExtractFailed,
1227 Some(detail.clone()),
1228 );
1229 report
1230 .errors
1231 .push(file_error(Some(entry.path().to_path_buf()), detail));
1232 continue;
1233 }
1234 };
1235 crate::profile::increment_update_count("chunks_created", final_chunks.len() as u64);
1236
1237 let doc_key = update_doc_key(&target.space, &target.collection.path, &relative_path);
1238 let chunk_inserts = final_chunks
1239 .iter()
1240 .enumerate()
1241 .map(|(index, chunk)| ChunkInsert {
1242 seq: index as i32,
1243 offset: chunk.offset,
1244 length: chunk.length,
1245 heading: chunk.heading.clone(),
1246 kind: chunk.kind,
1247 retrieval_prefix: chunk.retrieval_prefix.clone(),
1248 })
1249 .collect::<Vec<_>>();
1250 let mut prepared_embeddings = Vec::new();
1251 if !chunk_inserts.is_empty() && !options.no_embed && self.embedder.is_some() {
1252 let prepared = crate::profile::timed_update_stage("embedding_prepare_text", || {
1253 prepare_chunk_embeddings(
1254 &final_chunks,
1255 &doc_key,
1256 target,
1257 entry.path(),
1258 max_document_tokens,
1259 )
1260 });
1261 if existing.is_some() {
1262 let preflight =
1263 self.preflight_prepared_embeddings(prepared, report, failed_docs)?;
1264 prepared_embeddings = preflight.accepted;
1265 if !preflight.rejected_chunk_indexes.is_empty() {
1266 push_update_decision(
1267 report,
1268 options,
1269 target,
1270 &relative_path,
1271 UpdateDecisionKind::ExtractFailed,
1272 Some("embed preflight failed".to_string()),
1273 );
1274 continue;
1275 }
1276 } else {
1277 prepared_embeddings = prepared;
1278 }
1279 }
1280
1281 let replacement =
1282 crate::profile::timed_update_stage("sqlite_replace_document_generation", || {
1283 self.storage
1284 .replace_document_generation(DocumentGenerationReplace {
1285 collection_id: target.collection.id,
1286 path: &relative_path,
1287 title: &title,
1288 title_source,
1289 hash: &hash,
1290 modified: &modified,
1291 extractor_key: extractor.profile_key(),
1292 source_hash: &hash,
1293 text_hash: &text_hash,
1294 generation_key: &generation_key,
1295 text: canonical.text.as_str(),
1296 chunks: &chunk_inserts,
1297 })
1298 })?;
1299 let doc_id = replacement.doc_id;
1300 let chunk_ids = replacement.chunk_ids;
1301
1302 if !replacement.old_chunk_ids.is_empty() {
1303 crate::profile::timed_update_stage("tantivy_delete", || {
1304 self.storage
1305 .delete_tantivy(&target.space, &replacement.old_chunk_ids)
1306 })?;
1307 crate::profile::timed_update_stage("usearch_delete", || {
1308 self.storage
1309 .delete_usearch_deferred(&target.space, &replacement.old_chunk_ids)
1310 })?;
1311 }
1312
1313 fts_dirty_by_space
1314 .entry(target.space.clone())
1315 .or_default()
1316 .insert(doc_id);
1317
1318 if !chunk_ids.is_empty() {
1319 if !options.no_embed && self.embedder.is_some() {
1320 for prepared in prepared_embeddings {
1321 pending_embeddings.push(PendingChunkEmbedding {
1322 chunk_id: chunk_ids[prepared.chunk_index],
1323 doc_key: prepared.doc_key,
1324 space_name: prepared.space_name,
1325 path: prepared.path,
1326 text: prepared.text,
1327 max_document_tokens: prepared.max_document_tokens,
1328 });
1329 }
1330 if pending_embeddings.len() >= 64 {
1331 self.flush_buffered_embeddings(
1332 pending_embeddings,
1333 failed_chunk_ids,
1334 report,
1335 failed_docs,
1336 )?;
1337 }
1338 }
1339
1340 let entries = chunk_ids
1341 .iter()
1342 .zip(final_chunks.iter())
1343 .map(|(chunk_id, chunk)| TantivyEntry {
1344 chunk_id: *chunk_id,
1345 doc_id,
1346 filepath: relative_path.clone(),
1347 semantic_title: title_source
1348 .semantic_title(title.as_str())
1349 .map(ToString::to_string),
1350 heading: chunk.heading.clone(),
1351 body: retrieval_text_with_prefix(
1352 chunk.retrieval_text().as_str(),
1353 title_source.semantic_title(title.as_str()),
1354 chunk.heading.as_deref(),
1355 policy.contextual_prefix,
1356 ),
1357 })
1358 .collect::<Vec<_>>();
1359 crate::profile::increment_update_count("tantivy_entries", entries.len() as u64);
1360 crate::profile::timed_update_stage("tantivy_add", || {
1361 self.storage.index_tantivy(&target.space, &entries)
1362 })?;
1363 }
1364
1365 docs_by_path.insert(
1366 relative_path.clone(),
1367 crate::profile::timed_update_stage("sqlite_get_document_after_write", || {
1368 self.storage
1369 .get_document_by_path(target.collection.id, &relative_path)
1370 })?
1371 .ok_or_else(|| {
1372 KboltError::Internal(format!(
1373 "document missing after upsert: collection={}, path={relative_path}",
1374 target.collection.id
1375 ))
1376 })?,
1377 );
1378 pending_indexing.record(report);
1379 let (kind, detail) = pending_decision;
1380 push_update_decision(report, options, target, &relative_path, kind, detail);
1381 touched_collection = true;
1382 }
1383
1384 let mut missing_docs = docs_by_path
1385 .values()
1386 .filter(|doc| {
1387 doc.active
1388 && !seen_paths.contains(&doc.path)
1389 && !path_is_under_failed_walk(
1390 doc.path.as_str(),
1391 &failed_walk_prefixes,
1392 collection_walk_incomplete,
1393 )
1394 })
1395 .cloned()
1396 .collect::<Vec<_>>();
1397 missing_docs.sort_by(|left, right| left.path.cmp(&right.path));
1398
1399 for doc in missing_docs {
1400 if doc.active && !seen_paths.contains(&doc.path) {
1401 report.deactivated_docs += 1;
1402 push_update_decision(
1403 report,
1404 options,
1405 target,
1406 &doc.path,
1407 UpdateDecisionKind::Deactivated,
1408 None,
1409 );
1410 if !options.dry_run {
1411 crate::profile::timed_update_stage("sqlite_deactivate_document", || {
1412 self.storage.deactivate_document(doc.id)
1413 })?;
1414 touched_collection = true;
1415 }
1416 }
1417 }
1418
1419 if touched_collection && !options.dry_run {
1420 crate::profile::timed_update_stage("sqlite_update_collection_timestamp", || {
1421 self.storage
1422 .update_collection_timestamp(target.collection.id)
1423 })?;
1424 }
1425
1426 Ok(())
1427 }
1428}
1429
/// Summary of which parts of a collection a directory-walk error affected.
///
/// Filled in by `collect_walk_error_scope_into` while it descends through an
/// `ignore::Error` tree.
#[derive(Debug, Default)]
struct WalkErrorScope {
    /// Paths named by the error: the path of every `WithPath` wrapper and the
    /// `child` of every `Loop` error, in the order they were encountered.
    paths: Vec<std::path::PathBuf>,
    /// Set when the error cannot be pinned to specific paths: an empty
    /// `Partial`, an `Io` error with no enclosing `WithPath`, or any
    /// `Glob` / `UnrecognizedFileType` / `InvalidDefinition` error.
    collection_incomplete: bool,
}
1435
1436fn collect_walk_error_scope(err: &ignore::Error) -> WalkErrorScope {
1437 let mut scope = WalkErrorScope::default();
1438 collect_walk_error_scope_into(err, &mut scope, false);
1439 scope
1440}
1441
1442fn collect_walk_error_scope_into(
1443 err: &ignore::Error,
1444 scope: &mut WalkErrorScope,
1445 has_path_context: bool,
1446) {
1447 match err {
1448 ignore::Error::Partial(errors) => {
1449 if errors.is_empty() {
1450 scope.collection_incomplete = true;
1451 }
1452 for err in errors {
1453 collect_walk_error_scope_into(err, scope, has_path_context);
1454 }
1455 }
1456 ignore::Error::WithLineNumber { err, .. } | ignore::Error::WithDepth { err, .. } => {
1457 collect_walk_error_scope_into(err, scope, has_path_context);
1458 }
1459 ignore::Error::WithPath { path, err } => {
1460 scope.paths.push(path.clone());
1461 collect_walk_error_scope_into(err, scope, true);
1462 }
1463 ignore::Error::Loop { child, .. } => {
1464 scope.paths.push(child.clone());
1465 }
1466 ignore::Error::Io(_) => {
1467 if !has_path_context {
1468 scope.collection_incomplete = true;
1469 }
1470 }
1471 ignore::Error::Glob { .. }
1472 | ignore::Error::UnrecognizedFileType(_)
1473 | ignore::Error::InvalidDefinition => {
1474 scope.collection_incomplete = true;
1475 }
1476 }
1477}
1478
/// Reports whether `doc_path` lies inside a region the walk failed to cover.
///
/// Returns `true` when the whole collection walk is flagged incomplete, or
/// when any entry of `failed_walk_prefixes` is empty, is `"."`, equals
/// `doc_path`, or is a leading portion of `doc_path` that is immediately
/// followed by a `/` separator. Returns `false` otherwise.
fn path_is_under_failed_walk(
    doc_path: &str,
    failed_walk_prefixes: &[String],
    collection_walk_incomplete: bool,
) -> bool {
    if collection_walk_incomplete {
        return true;
    }

    for prefix in failed_walk_prefixes {
        // An empty or "." prefix matches every document path.
        if matches!(prefix.as_str(), "" | ".") {
            return true;
        }

        match doc_path.strip_prefix(prefix.as_str()) {
            // Nothing left over: the document path is the failed path itself.
            Some("") => return true,
            // Require a separator so "dir" does not match "dirx/file".
            Some(rest) if rest.starts_with('/') => return true,
            _ => {}
        }
    }

    false
}
1497
#[cfg(test)]
mod tests {
    use super::*;
    use std::io;
    use std::path::PathBuf;

    /// Builds a permission-denied I/O error wrapped with the given path.
    fn blocked_at(path: &str) -> ignore::Error {
        let denied = io::Error::new(io::ErrorKind::PermissionDenied, "blocked");
        ignore::Error::WithPath {
            path: PathBuf::from(path),
            err: Box::new(ignore::Error::Io(denied)),
        }
    }

    #[test]
    fn walk_error_scope_collects_every_partial_path() {
        // A symlink loop reported through a depth wrapper, with no `WithPath`.
        let looped = ignore::Error::WithDepth {
            depth: 2,
            err: Box::new(ignore::Error::Loop {
                ancestor: PathBuf::from("/collection"),
                child: PathBuf::from("/collection/blocked-b"),
            }),
        };
        let err = ignore::Error::Partial(vec![blocked_at("/collection/blocked-a"), looped]);

        let scope = collect_walk_error_scope(&err);

        let expected_paths = vec![
            PathBuf::from("/collection/blocked-a"),
            PathBuf::from("/collection/blocked-b"),
        ];
        assert_eq!(scope.paths, expected_paths);
        assert!(!scope.collection_incomplete);
    }

    #[test]
    fn walk_error_scope_marks_pathless_errors_incomplete() {
        // The second entry carries no path, so it cannot be scoped.
        let pathless = ignore::Error::Io(io::Error::new(io::ErrorKind::Other, "unknown"));
        let err = ignore::Error::Partial(vec![blocked_at("/collection/blocked"), pathless]);

        let scope = collect_walk_error_scope(&err);

        assert_eq!(scope.paths, vec![PathBuf::from("/collection/blocked")]);
        assert!(scope.collection_incomplete);
    }

    #[test]
    fn walk_error_scope_treats_ignore_rule_errors_as_incomplete() {
        let bad_glob = ignore::Error::Glob {
            glob: Some("[".to_string()),
            err: "invalid glob".to_string(),
        };
        let err = ignore::Error::WithPath {
            path: PathBuf::from("/collection/.gitignore"),
            err: Box::new(ignore::Error::WithLineNumber {
                line: 4,
                err: Box::new(bad_glob),
            }),
        };

        let scope = collect_walk_error_scope(&err);

        assert_eq!(scope.paths, vec![PathBuf::from("/collection/.gitignore")]);
        assert!(scope.collection_incomplete);
    }
}
1573
/// Embedding input for one chunk, prepared before the chunk has a database id.
///
/// Built by `prepare_chunk_embeddings`; once the document has been written it
/// is turned into a `PendingChunkEmbedding` by looking up the chunk id at
/// `chunk_index`.
#[derive(Debug)]
struct PreparedChunkEmbedding {
    /// Position of the chunk within the document's final chunk list.
    chunk_index: usize,
    /// Identifies the document this chunk belongs to.
    doc_key: UpdateDocKey,
    /// Name of the space the update target belongs to.
    space_name: String,
    /// Path of the source file the chunk was produced from.
    path: std::path::PathBuf,
    /// Text to embed: the chunk's retrieval text, or a single space when that
    /// text is blank.
    text: String,
    /// Token limit carried along with the chunk's text.
    max_document_tokens: usize,
}
1583
/// Outcome of running prepared embeddings through
/// `preflight_prepared_embeddings` before a document is written.
#[derive(Debug, Default)]
struct PreparedEmbeddingPreflight {
    /// Prepared embeddings that the caller goes on to queue for embedding.
    accepted: Vec<PreparedChunkEmbedding>,
    /// Chunk indexes that were rejected; when this is non-empty the caller
    /// skips replacing the existing document.
    /// NOTE(review): presumably these are `PreparedChunkEmbedding::chunk_index`
    /// values — confirm against `preflight_prepared_embeddings`.
    rejected_chunk_indexes: Vec<usize>,
}
1589
/// Embedding work item for a chunk that already has a stored chunk id.
///
/// Created from a `PreparedChunkEmbedding` after the document generation has
/// been written, and buffered until the pending batch is flushed.
#[derive(Debug)]
struct PendingChunkEmbedding {
    /// Stored id of the chunk to embed.
    chunk_id: i64,
    /// Identifies the document this chunk belongs to.
    doc_key: UpdateDocKey,
    /// Name of the space the chunk belongs to.
    space_name: String,
    /// Path of the source file the chunk was produced from.
    path: std::path::PathBuf,
    /// Text to embed.
    text: String,
    /// Token limit carried along with the chunk's text.
    max_document_tokens: usize,
}
1599
/// Result of embedding a batch of pending chunks.
#[derive(Default)]
struct EmbeddedPendingBatch {
    /// Successfully produced embeddings.
    /// NOTE(review): presumably `(chunk id, space name, vector)` — confirm
    /// against the code that fills this batch.
    embeddings: Vec<(i64, String, Vec<f32>)>,
    /// Ids of chunks for which no embedding was produced.
    /// NOTE(review): inferred from the field name — confirm against the producer.
    failed_chunk_ids: Vec<i64>,
}
1605
/// Hashable identity of a document within one update run.
///
/// Built by `update_doc_key` and used, among other things, as the element
/// type of the set of documents that failed during the run.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct UpdateDocKey {
    /// Name of the space containing the document.
    space: String,
    /// Filesystem path of the collection containing the document.
    collection_path: std::path::PathBuf,
    /// Path of the document relative to its collection.
    path: String,
}
1612
1613struct EmbeddingDocumentSizerCounter<'a> {
1614 inner: &'a dyn crate::models::EmbeddingDocumentSizer,
1615}
1616
1617impl crate::ingest::chunk::TokenCounter for EmbeddingDocumentSizerCounter<'_> {
1618 fn count(&self, text: &str) -> Result<usize> {
1619 count_document_tokens_profiled(
1620 self.inner,
1621 text,
1622 "tokenize_chunking",
1623 "tokenize_chunking_calls",
1624 )
1625 }
1626}
1627
1628fn count_document_tokens_profiled(
1629 sizer: &dyn crate::models::EmbeddingDocumentSizer,
1630 text: &str,
1631 stage: &'static str,
1632 count: &'static str,
1633) -> Result<usize> {
1634 let started = Instant::now();
1635 let result = sizer.count_document_tokens(text);
1636 crate::profile::record_update_stage(stage, started.elapsed());
1637 crate::profile::increment_update_count(count, 1);
1638 result
1639}
1640
1641fn prepare_chunk_embeddings(
1642 final_chunks: &[crate::ingest::chunk::FinalChunk],
1643 doc_key: &UpdateDocKey,
1644 target: &UpdateTarget,
1645 path: &Path,
1646 max_document_tokens: usize,
1647) -> Vec<PreparedChunkEmbedding> {
1648 final_chunks
1649 .iter()
1650 .enumerate()
1651 .map(|(chunk_index, chunk)| {
1652 let mut text = chunk.retrieval_text();
1653 if text.trim().is_empty() {
1654 text = " ".to_string();
1655 }
1656 PreparedChunkEmbedding {
1657 chunk_index,
1658 doc_key: doc_key.clone(),
1659 space_name: target.space.clone(),
1660 path: path.to_path_buf(),
1661 text,
1662 max_document_tokens,
1663 }
1664 })
1665 .collect()
1666}
1667
1668#[derive(Debug, Clone)]
1669enum UpdateRepairScope {
1670 Global,
1671 Space { space_id: i64 },
1672 Collections { collection_ids: Vec<i64> },
1673}
1674
1675impl UpdateRepairScope {
1676 fn from_options_and_targets(options: &UpdateOptions, targets: &[UpdateTarget]) -> Self {
1677 if options.collections.is_empty() {
1678 if let Some(target) = targets.first() {
1679 if options.space.is_some() {
1680 return Self::Space {
1681 space_id: target.collection.space_id,
1682 };
1683 }
1684 }
1685 return Self::Global;
1686 }
1687
1688 let mut collection_ids = targets
1689 .iter()
1690 .map(|target| target.collection.id)
1691 .collect::<Vec<_>>();
1692 collection_ids.sort_unstable();
1693 collection_ids.dedup();
1694 Self::Collections { collection_ids }
1695 }
1696
1697 fn allows_space_dense_repair(&self) -> bool {
1698 !matches!(self, Self::Collections { .. })
1699 }
1700}
1701
1702#[derive(Debug, Clone, Copy)]
1703enum PendingDocumentIndexing {
1704 Added,
1705 Updated { reactivated: bool },
1706}
1707
1708impl PendingDocumentIndexing {
1709 fn record(self, report: &mut UpdateReport) {
1710 match self {
1711 Self::Added => {
1712 report.added_docs += 1;
1713 }
1714 Self::Updated { reactivated } => {
1715 report.updated_docs += 1;
1716 if reactivated {
1717 report.reactivated_docs += 1;
1718 }
1719 }
1720 }
1721 }
1722}
1723
/// Checks whether the generation key stored for `doc_id` equals
/// `generation_key`.
///
/// Returns `false` when `document_text_generations` has no entry for the
/// document, or when the stored key differs.
fn document_has_current_canonical_text(
    document_text_generations: &HashMap<i64, String>,
    doc_id: i64,
    generation_key: &str,
) -> bool {
    match document_text_generations.get(&doc_id) {
        Some(stored) => stored.as_str() == generation_key,
        None => false,
    }
}
1733
1734fn finish_update_with_usearch_flush(
1735 update_result: Result<UpdateReport>,
1736 flush_result: Result<()>,
1737) -> Result<UpdateReport> {
1738 match (update_result, flush_result) {
1739 (Ok(report), Ok(())) => Ok(report),
1740 (Err(err), Ok(())) => Err(err),
1741 (Ok(_), Err(err)) => Err(err),
1742 (Err(update_err), Err(flush_err)) => Err(KboltError::Internal(format!(
1743 "update failed: {update_err}; additionally failed to save dirty vector indexes: {flush_err}"
1744 ))
1745 .into()),
1746 }
1747}
1748
1749fn invalid_pending_embedding_batch_detail(
1750 pending: &[PendingChunkEmbedding],
1751 vectors: &[Vec<f32>],
1752) -> Option<String> {
1753 if vectors.len() != pending.len() {
1754 return Some(format!(
1755 "embedder returned {} vectors for {} chunks",
1756 vectors.len(),
1757 pending.len()
1758 ));
1759 }
1760
1761 if vectors.iter().any(|vector| vector.is_empty()) {
1762 if pending.len() == 1 {
1763 return Some(format!(
1764 "embedder returned an empty vector for chunk {}",
1765 pending[0].chunk_id
1766 ));
1767 }
1768 return Some("embedder returned an empty vector".to_string());
1769 }
1770
1771 None
1772}
1773
1774fn push_update_decision(
1775 report: &mut UpdateReport,
1776 options: &UpdateOptions,
1777 target: &UpdateTarget,
1778 path: &str,
1779 kind: UpdateDecisionKind,
1780 detail: Option<String>,
1781) {
1782 if !options.verbose {
1783 return;
1784 }
1785
1786 report.decisions.push(UpdateDecision {
1787 space: target.space.clone(),
1788 collection: target.collection.name.clone(),
1789 path: path.to_string(),
1790 kind,
1791 detail,
1792 });
1793}
1794
1795fn update_doc_key(space: &str, collection_path: &Path, path: &str) -> UpdateDocKey {
1796 UpdateDocKey {
1797 space: space.to_string(),
1798 collection_path: collection_path.to_path_buf(),
1799 path: path.to_string(),
1800 }
1801}
1802
1803fn effective_chunk_hard_max(policy: &crate::config::ChunkPolicy) -> usize {
1804 policy
1805 .hard_max_tokens
1806 .max(policy.soft_max_tokens)
1807 .max(policy.target_tokens)
1808 .max(1)
1809}
1810
1811fn ingestion_generation_key(
1812 extractor_key: &str,
1813 extractor_version: u32,
1814 policy: &crate::config::ChunkPolicy,
1815) -> String {
1816 format!(
1817 "canonical=v{CANONICAL_TEXT_GENERATION};chunker=v{CHUNKER_GENERATION};extractor={extractor_key}:v{extractor_version};chunk=target:{}:soft:{}:hard:{}:overlap:{}:prefix:{}",
1818 policy.target_tokens,
1819 policy.soft_max_tokens,
1820 policy.hard_max_tokens,
1821 policy.boundary_overlap_tokens,
1822 policy.contextual_prefix
1823 )
1824}