1use super::*;
2use crate::retrieval_context::{
3 render_bm25_document, render_dense_document, ChunkRetrievalContext,
4};
5use kbolt_types::{UpdateDecision, UpdateDecisionKind};
6
/// Generation number of the canonical-text format.
///
/// NOTE(review): presumably folded into the ingestion generation key so that bumping it
/// invalidates previously stored canonical text — confirm against the key builder.
const CANONICAL_TEXT_GENERATION: u32 = 1;

/// Generation number of the chunker.
///
/// NOTE(review): presumably bumped whenever chunk boundaries can change for unchanged
/// input — confirm against the key builder.
const CHUNKER_GENERATION: u32 = 2;
9
10impl Engine {
11 pub fn update(&self, options: UpdateOptions) -> Result<UpdateReport> {
12 let _lock = self.acquire_operation_lock(LockMode::Exclusive)?;
13 self.update_unlocked(options)
14 }
15
16 pub(super) fn update_unlocked(&self, options: UpdateOptions) -> Result<UpdateReport> {
17 let _profile = crate::profile::UpdateProfileGuard::start();
18 let update_result = self.run_update_unlocked(options);
19 let flush_result = crate::profile::timed_update_stage("usearch_save_dirty", || {
20 self.storage.save_dirty_usearch_indexes()
21 });
22 finish_update_with_usearch_flush(update_result, flush_result)
23 }
24
25 fn run_update_unlocked(&self, options: UpdateOptions) -> Result<UpdateReport> {
26 let started = Instant::now();
27 let mut report = UpdateReport {
28 scanned_docs: 0,
29 skipped_mtime_docs: 0,
30 skipped_hash_docs: 0,
31 added_docs: 0,
32 updated_docs: 0,
33 failed_docs: 0,
34 deactivated_docs: 0,
35 reactivated_docs: 0,
36 reaped_docs: 0,
37 embedded_chunks: 0,
38 decisions: Vec::new(),
39 errors: Vec::new(),
40 elapsed_ms: 0,
41 };
42 let mut failed_docs = HashSet::new();
43
44 let targets = crate::profile::timed_update_stage("resolve_targets", || {
45 self.resolve_targets(TargetScope {
46 space: options.space.as_deref(),
47 collections: &options.collections,
48 })
49 })?;
50 if targets.is_empty() {
51 report.elapsed_ms = started.elapsed().as_millis() as u64;
52 return Ok(report);
53 }
54 let repair_scope = UpdateRepairScope::from_options_and_targets(&options, &targets);
55
56 crate::profile::timed_update_stage("dense_integrity", || {
57 self.reconcile_dense_integrity(&targets, &repair_scope, &options)
58 })?;
59
60 if !options.dry_run {
61 crate::profile::timed_update_stage("fts_dirty_replay", || {
62 self.replay_fts_dirty_documents(&repair_scope, &mut report, &mut failed_docs)
63 })?;
64 }
65
66 let mut fts_dirty_by_space: HashMap<String, HashSet<i64>> = HashMap::new();
67 let mut pending_embeddings = Vec::new();
68 let mut failed_embedding_chunk_ids = HashSet::new();
69 for target in &targets {
70 self.update_collection_target(
71 target,
72 &options,
73 &mut report,
74 &mut fts_dirty_by_space,
75 &mut pending_embeddings,
76 &mut failed_embedding_chunk_ids,
77 &mut failed_docs,
78 )?;
79 }
80
81 if !options.dry_run {
82 crate::profile::timed_update_stage("embedding_flush_buffered", || {
83 self.flush_buffered_embeddings(
84 &mut pending_embeddings,
85 &mut failed_embedding_chunk_ids,
86 &mut report,
87 &mut failed_docs,
88 )
89 })?;
90
91 for (space, doc_ids) in fts_dirty_by_space {
92 if doc_ids.is_empty() {
93 continue;
94 }
95
96 crate::profile::timed_update_stage("tantivy_commit", || {
97 self.storage.commit_tantivy(&space)
98 })?;
99 let mut ids = doc_ids.into_iter().collect::<Vec<_>>();
100 ids.sort_unstable();
101 crate::profile::timed_update_stage("sqlite_clear_fts_dirty", || {
102 self.storage.batch_clear_fts_dirty(&ids)
103 })?;
104 }
105
106 crate::profile::timed_update_stage("embedding_backlog", || {
107 self.embed_pending_chunks(
108 &repair_scope,
109 &options,
110 &mut failed_embedding_chunk_ids,
111 &mut report,
112 &mut failed_docs,
113 )
114 })?;
115
116 let reaped = crate::profile::timed_update_stage("sqlite_list_reapable", || {
117 self.list_reapable_documents_for_scope(self.config.reaping.days, &repair_scope)
118 })?;
119 let mut reaped_doc_ids = Vec::with_capacity(reaped.len());
120 let mut chunk_ids_by_space: HashMap<String, Vec<i64>> = HashMap::new();
121 for document in reaped {
122 reaped_doc_ids.push(document.doc_id);
123 if !document.chunk_ids.is_empty() {
124 chunk_ids_by_space
125 .entry(document.space_name)
126 .or_default()
127 .extend(document.chunk_ids);
128 }
129 }
130 for (space, chunk_ids) in chunk_ids_by_space {
131 crate::profile::timed_update_stage("purge_reaped_indexes", || {
132 self.purge_space_chunks(&space, &chunk_ids)
133 })?;
134 }
135 crate::profile::timed_update_stage("sqlite_delete_reaped_documents", || {
136 self.storage.delete_documents(&reaped_doc_ids)
137 })?;
138 report.reaped_docs = reaped_doc_ids.len();
139 }
140
141 report.failed_docs = failed_docs.len();
142 report.elapsed_ms = started.elapsed().as_millis() as u64;
143 Ok(report)
144 }
145
146 fn reconcile_dense_integrity(
147 &self,
148 targets: &[UpdateTarget],
149 repair_scope: &UpdateRepairScope,
150 options: &UpdateOptions,
151 ) -> Result<()> {
152 if options.no_embed || options.dry_run {
153 return Ok(());
154 }
155
156 let expected_model = self.embedding_model_key();
157 let mut visited_spaces = HashSet::new();
158 for target in targets {
159 if !visited_spaces.insert(target.collection.space_id) {
160 continue;
161 }
162
163 let models = self
164 .storage
165 .list_embedding_models_in_space(target.collection.space_id)?;
166 let mut reasons = Vec::new();
167 if models
168 .iter()
169 .any(|model| model.as_str() != expected_model.as_str())
170 {
171 reasons.push(format!(
172 "stored embedding models {:?} do not match current model '{}'",
173 models, expected_model
174 ));
175 }
176
177 let sqlite_count = self
178 .storage
179 .count_embedded_chunks(Some(target.collection.space_id))?;
180 let usearch_count = self.storage.count_usearch(&target.space)?;
181 if sqlite_count != usearch_count {
182 reasons.push(format!(
183 "sqlite embedded chunk count {sqlite_count} does not match USearch vector count {usearch_count}"
184 ));
185 }
186
187 if reasons.is_empty() {
188 continue;
189 }
190
191 if !repair_scope.allows_space_dense_repair() {
192 return Err(KboltError::SpaceDenseRepairRequired {
193 space: target.space.clone(),
194 reason: reasons.join("; "),
195 }
196 .into());
197 }
198
199 self.storage
200 .delete_embeddings_for_space(target.collection.space_id)?;
201 self.storage.clear_usearch(&target.space)?;
202 }
203
204 Ok(())
205 }
206
207 fn embed_pending_chunks(
208 &self,
209 repair_scope: &UpdateRepairScope,
210 options: &UpdateOptions,
211 failed_chunk_ids: &mut HashSet<i64>,
212 report: &mut UpdateReport,
213 failed_docs: &mut HashSet<UpdateDocKey>,
214 ) -> Result<()> {
215 if options.no_embed || options.dry_run {
216 return Ok(());
217 }
218
219 let Some(embedder) = self.embedder.as_ref() else {
220 return Ok(());
221 };
222
223 let model = self.embedding_model_key();
224 let batch_window = embedder.preferred_document_batch_window().max(1);
225 let mut after_chunk_id = 0_i64;
226 loop {
227 let backlog =
228 crate::profile::timed_update_stage("sqlite_get_unembedded_chunks", || {
229 self.get_unembedded_chunks_for_scope(
230 model.as_str(),
231 repair_scope,
232 after_chunk_id,
233 batch_window,
234 )
235 })?;
236 if backlog.is_empty() {
237 break;
238 }
239
240 after_chunk_id = backlog
241 .last()
242 .map(|record| record.chunk.id)
243 .expect("non-empty backlog should have a last chunk id");
244
245 let backlog_doc_ids = backlog
246 .iter()
247 .filter(|record| !failed_chunk_ids.contains(&record.chunk.id))
248 .map(|record| record.chunk.doc_id)
249 .collect::<Vec<_>>();
250 let document_text_cache =
251 crate::profile::timed_update_stage("embedding_backlog_read", || {
252 self.storage.get_existing_document_texts(&backlog_doc_ids)
253 })?;
254
255 let mut pending = Vec::new();
256 for record in backlog {
257 if failed_chunk_ids.contains(&record.chunk.id) {
258 continue;
259 }
260
261 let full_path = record.collection_path.join(&record.doc_path);
262 let Some(document_text) = document_text_cache.get(&record.chunk.doc_id) else {
263 report.errors.push(file_error(
264 Some(full_path.clone()),
265 format!(
266 "embed canonical text load failed: {}",
267 crate::storage::missing_document_text_error(record.chunk.doc_id)
268 ),
269 ));
270 failed_docs.insert(update_doc_key(
271 &record.space_name,
272 &record.collection_path,
273 &record.doc_path,
274 ));
275 continue;
276 };
277 let chunk_canonical_text = match crate::storage::chunk_text_from_canonical(
278 document_text.text.as_str(),
279 &record.chunk,
280 ) {
281 Ok(text) => text,
282 Err(err) => {
283 report.errors.push(file_error(
284 Some(full_path.clone()),
285 format!("embed canonical text load failed: {err}"),
286 ));
287 failed_docs.insert(update_doc_key(
288 &record.space_name,
289 &record.collection_path,
290 &record.doc_path,
291 ));
292 continue;
293 }
294 };
295 let dense_input = render_dense_document(ChunkRetrievalContext {
296 body: chunk_canonical_text.as_str(),
297 retrieval_prefix: record.chunk.retrieval_prefix.as_deref(),
298 title: record.doc_title.as_deref(),
299 heading: record.chunk.heading.as_deref(),
300 });
301 let embedding_text =
302 normalize_embedding_payload(self.format_embedding_document(&dense_input));
303 let policy = resolve_policy(
304 &self.config.chunking,
305 Some(document_text.extractor_key.as_str()),
306 None,
307 );
308 let max_document_tokens = effective_chunk_hard_max(&policy);
309
310 pending.push(PendingChunkEmbedding {
311 chunk_id: record.chunk.id,
312 doc_key: update_doc_key(
313 &record.space_name,
314 &record.collection_path,
315 &record.doc_path,
316 ),
317 space_name: record.space_name,
318 path: full_path,
319 embedding_text,
320 max_document_tokens,
321 });
322 }
323
324 if pending.is_empty() {
325 continue;
326 }
327
328 let mut preflight_failed_chunk_ids = Vec::new();
329 let pending = self.preflight_pending_embeddings(
330 pending,
331 report,
332 failed_docs,
333 &mut preflight_failed_chunk_ids,
334 )?;
335 failed_chunk_ids.extend(preflight_failed_chunk_ids);
336 if pending.is_empty() {
337 continue;
338 }
339
340 let result = self.embed_preflighted_pending_batch_with_partial_failures(
341 embedder.as_ref(),
342 pending,
343 report,
344 failed_docs,
345 )?;
346 failed_chunk_ids.extend(result.failed_chunk_ids);
347 if !result.embeddings.is_empty() {
348 self.store_chunk_embeddings(model.as_str(), result.embeddings, report)?;
349 }
350 }
351
352 Ok(())
353 }
354
355 fn flush_buffered_embeddings(
356 &self,
357 pending_embeddings: &mut Vec<PendingChunkEmbedding>,
358 failed_chunk_ids: &mut HashSet<i64>,
359 report: &mut UpdateReport,
360 failed_docs: &mut HashSet<UpdateDocKey>,
361 ) -> Result<()> {
362 if pending_embeddings.is_empty() {
363 return Ok(());
364 }
365
366 let Some(embedder) = self.embedder.as_ref() else {
367 pending_embeddings.clear();
368 return Ok(());
369 };
370
371 let pending = std::mem::take(pending_embeddings);
372 let result = self.embed_preflighted_pending_batch_with_partial_failures(
373 embedder.as_ref(),
374 pending,
375 report,
376 failed_docs,
377 )?;
378 failed_chunk_ids.extend(result.failed_chunk_ids);
379 if result.embeddings.is_empty() {
380 return Ok(());
381 }
382
383 let model = self.embedding_model_key();
384 self.store_chunk_embeddings(model.as_str(), result.embeddings, report)
385 }
386
387 fn document_embedding_batch_window(&self) -> usize {
388 self.embedder
389 .as_ref()
390 .map(|embedder| embedder.preferred_document_batch_window())
391 .unwrap_or(crate::models::DEFAULT_DOCUMENT_BATCH_WINDOW)
392 .max(1)
393 }
394
395 fn embed_preflighted_pending_batch_with_partial_failures(
396 &self,
397 embedder: &dyn crate::models::Embedder,
398 pending: Vec<PendingChunkEmbedding>,
399 report: &mut UpdateReport,
400 failed_docs: &mut HashSet<UpdateDocKey>,
401 ) -> Result<EmbeddedPendingBatch> {
402 if pending.is_empty() {
403 return Ok(EmbeddedPendingBatch::default());
404 }
405
406 let texts = pending
407 .iter()
408 .map(|embedding| embedding.embedding_text.clone())
409 .collect::<Vec<_>>();
410 crate::profile::increment_update_count("embedding_batch_calls", 1);
411 crate::profile::increment_update_count("embedding_input_texts", texts.len() as u64);
412 match crate::profile::timed_update_stage("embedding_http", || {
413 embedder.embed_batch(crate::models::EmbeddingInputKind::Document, &texts)
414 }) {
415 Ok(vectors) => {
416 if let Some(detail) = invalid_pending_embedding_batch_detail(&pending, &vectors) {
417 return self.split_pending_embedding_batch(
418 embedder,
419 pending,
420 report,
421 failed_docs,
422 detail,
423 );
424 }
425
426 let embeddings = pending
427 .into_iter()
428 .zip(vectors)
429 .map(|(embedding, vector)| (embedding.chunk_id, embedding.space_name, vector))
430 .collect::<Vec<_>>();
431 Ok(EmbeddedPendingBatch {
432 embeddings,
433 failed_chunk_ids: Vec::new(),
434 })
435 }
436 Err(err) => self.split_pending_embedding_batch(
437 embedder,
438 pending,
439 report,
440 failed_docs,
441 err.to_string(),
442 ),
443 }
444 }
445
446 fn split_pending_embedding_batch(
447 &self,
448 embedder: &dyn crate::models::Embedder,
449 pending: Vec<PendingChunkEmbedding>,
450 report: &mut UpdateReport,
451 failed_docs: &mut HashSet<UpdateDocKey>,
452 detail: String,
453 ) -> Result<EmbeddedPendingBatch> {
454 if pending.len() == 1 {
455 let embedding = pending
456 .into_iter()
457 .next()
458 .expect("single pending embedding should exist");
459 report.errors.push(file_error(
460 Some(embedding.path),
461 format!("embed failed: {detail}"),
462 ));
463 failed_docs.insert(embedding.doc_key);
464 return Ok(EmbeddedPendingBatch {
465 embeddings: Vec::new(),
466 failed_chunk_ids: vec![embedding.chunk_id],
467 });
468 }
469
470 let mid = pending.len() / 2;
471 let mut right = pending;
472 let left = right.drain(..mid).collect::<Vec<_>>();
473
474 let mut left_result = self.embed_preflighted_pending_batch_with_partial_failures(
475 embedder,
476 left,
477 report,
478 failed_docs,
479 )?;
480 let right_result = self.embed_preflighted_pending_batch_with_partial_failures(
481 embedder,
482 right,
483 report,
484 failed_docs,
485 )?;
486 left_result.embeddings.extend(right_result.embeddings);
487 left_result
488 .failed_chunk_ids
489 .extend(right_result.failed_chunk_ids);
490 Ok(left_result)
491 }
492
493 fn store_chunk_embeddings(
494 &self,
495 model: &str,
496 embeddings: Vec<(i64, String, Vec<f32>)>,
497 report: &mut UpdateReport,
498 ) -> Result<()> {
499 let mut grouped_vectors: HashMap<String, Vec<(i64, Vec<f32>)>> = HashMap::new();
500 let mut embedding_rows = Vec::with_capacity(embeddings.len());
501 for (chunk_id, space_name, vector) in embeddings {
502 if vector.is_empty() {
503 return Err(KboltError::Inference(format!(
504 "embedder returned an empty vector for chunk {chunk_id}"
505 ))
506 .into());
507 }
508
509 grouped_vectors
510 .entry(space_name)
511 .or_default()
512 .push((chunk_id, vector));
513 embedding_rows.push((chunk_id, model));
514 }
515
516 for (space, vectors) in grouped_vectors {
517 let refs = vectors
518 .iter()
519 .map(|(chunk_id, vector)| (*chunk_id, vector.as_slice()))
520 .collect::<Vec<_>>();
521 crate::profile::increment_update_count("usearch_vectors", refs.len() as u64);
522 self.storage.batch_insert_usearch_deferred(&space, &refs)?;
523 }
524
525 crate::profile::timed_update_stage("sqlite_insert_embeddings", || {
526 self.storage.insert_embeddings(&embedding_rows)
527 })?;
528 report.embedded_chunks = report.embedded_chunks.saturating_add(embedding_rows.len());
529 Ok(())
530 }
531
532 fn preflight_pending_embeddings(
533 &self,
534 pending: Vec<PendingChunkEmbedding>,
535 report: &mut UpdateReport,
536 failed_docs: &mut HashSet<UpdateDocKey>,
537 failed_chunk_ids: &mut Vec<i64>,
538 ) -> Result<Vec<PendingChunkEmbedding>> {
539 let Some(sizer) = self.embedding_document_sizer.as_ref() else {
540 return Ok(pending);
541 };
542
543 let mut accepted = Vec::with_capacity(pending.len());
544 for embedding in pending {
545 match check_embedding_preflight(
546 sizer.as_ref(),
547 &embedding.embedding_text,
548 embedding.max_document_tokens,
549 ) {
550 EmbeddingPreflightDecision::Accept => {
551 accepted.push(embedding);
552 }
553 EmbeddingPreflightDecision::Reject(detail) => {
554 report
555 .errors
556 .push(file_error(Some(embedding.path.clone()), detail));
557 failed_docs.insert(embedding.doc_key);
558 failed_chunk_ids.push(embedding.chunk_id);
559 }
560 }
561 }
562
563 Ok(accepted)
564 }
565
566 fn preflight_prepared_embeddings(
567 &self,
568 prepared: Vec<PreparedChunkEmbedding>,
569 report: &mut UpdateReport,
570 failed_docs: &mut HashSet<UpdateDocKey>,
571 ) -> Result<PreparedEmbeddingPreflight> {
572 let Some(sizer) = self.embedding_document_sizer.as_ref() else {
573 return Ok(PreparedEmbeddingPreflight {
574 accepted: prepared,
575 rejected_chunk_indexes: Vec::new(),
576 });
577 };
578
579 let mut preflight = PreparedEmbeddingPreflight {
580 accepted: Vec::with_capacity(prepared.len()),
581 rejected_chunk_indexes: Vec::new(),
582 };
583 for embedding in prepared {
584 match check_embedding_preflight(
585 sizer.as_ref(),
586 &embedding.embedding_text,
587 embedding.max_document_tokens,
588 ) {
589 EmbeddingPreflightDecision::Accept => {
590 preflight.accepted.push(embedding);
591 }
592 EmbeddingPreflightDecision::Reject(detail) => {
593 report
594 .errors
595 .push(file_error(Some(embedding.path.clone()), detail));
596 failed_docs.insert(embedding.doc_key);
597 preflight.rejected_chunk_indexes.push(embedding.chunk_index);
598 }
599 }
600 }
601
602 Ok(preflight)
603 }
604
605 fn replay_fts_dirty_documents(
606 &self,
607 repair_scope: &UpdateRepairScope,
608 report: &mut UpdateReport,
609 failed_docs: &mut HashSet<UpdateDocKey>,
610 ) -> Result<()> {
611 let records = self.get_fts_dirty_documents_for_scope(repair_scope)?;
612 if records.is_empty() {
613 return Ok(());
614 }
615
616 let mut cleared_by_space: HashMap<String, Vec<i64>> = HashMap::new();
617 for record in records {
618 let space_name = record.space_name;
619 let doc_id = record.doc_id;
620
621 if record.chunks.is_empty() {
622 self.storage.delete_tantivy_by_doc(&space_name, doc_id)?;
623 cleared_by_space.entry(space_name).or_default().push(doc_id);
624 continue;
625 }
626
627 let document_text = match self.storage.get_document_text(doc_id) {
628 Ok(row) => row,
629 Err(err) => {
630 failed_docs.insert(update_doc_key(
631 &space_name,
632 &record.collection_path,
633 &record.doc_path,
634 ));
635 report.errors.push(file_error(
636 Some(record.collection_path.join(&record.doc_path)),
637 format!("fts replay canonical text load failed: {err}"),
638 ));
639 continue;
640 }
641 };
642 let policy = resolve_policy(
643 &self.config.chunking,
644 Some(document_text.extractor_key.as_str()),
645 None,
646 );
647
648 let entries = record
649 .chunks
650 .iter()
651 .map(|chunk| -> Result<TantivyEntry> {
652 let chunk_text = crate::storage::chunk_text_from_canonical(
653 document_text.text.as_str(),
654 chunk,
655 )?;
656 Ok(TantivyEntry {
657 chunk_id: chunk.id,
658 doc_id,
659 filepath: record.doc_path.clone(),
660 title: record.doc_title.clone(),
661 heading: chunk.heading.clone(),
662 body: render_bm25_document(
663 ChunkRetrievalContext {
664 body: chunk_text.as_str(),
665 retrieval_prefix: chunk.retrieval_prefix.as_deref(),
666 title: record.doc_title.as_deref(),
667 heading: chunk.heading.as_deref(),
668 },
669 policy.contextual_prefix,
670 ),
671 })
672 })
673 .collect::<Result<Vec<_>>>()?;
674
675 self.storage.delete_tantivy_by_doc(&space_name, doc_id)?;
676 self.storage.index_tantivy(&space_name, &entries)?;
677 cleared_by_space.entry(space_name).or_default().push(doc_id);
678 }
679
680 for (space_name, mut doc_ids) in cleared_by_space {
681 if doc_ids.is_empty() {
682 continue;
683 }
684
685 doc_ids.sort_unstable();
686 doc_ids.dedup();
687 self.storage.commit_tantivy(&space_name)?;
688 self.storage.batch_clear_fts_dirty(&doc_ids)?;
689 }
690
691 Ok(())
692 }
693
694 fn get_fts_dirty_documents_for_scope(
695 &self,
696 repair_scope: &UpdateRepairScope,
697 ) -> Result<Vec<crate::storage::FtsDirtyRecord>> {
698 match repair_scope {
699 UpdateRepairScope::Global => self.storage.get_fts_dirty_documents(),
700 UpdateRepairScope::Space { space_id } => {
701 self.storage.get_fts_dirty_documents_in_space(*space_id)
702 }
703 UpdateRepairScope::Collections { collection_ids } => self
704 .storage
705 .get_fts_dirty_documents_in_collections(collection_ids),
706 }
707 }
708
709 fn get_unembedded_chunks_for_scope(
710 &self,
711 model: &str,
712 repair_scope: &UpdateRepairScope,
713 after_chunk_id: i64,
714 limit: usize,
715 ) -> Result<Vec<crate::storage::EmbedRecord>> {
716 match repair_scope {
717 UpdateRepairScope::Global => {
718 self.storage
719 .get_unembedded_chunks(model, after_chunk_id, limit)
720 }
721 UpdateRepairScope::Space { space_id } => {
722 self.storage
723 .get_unembedded_chunks_in_space(model, *space_id, after_chunk_id, limit)
724 }
725 UpdateRepairScope::Collections { collection_ids } => self
726 .storage
727 .get_unembedded_chunks_in_collections(model, collection_ids, after_chunk_id, limit),
728 }
729 }
730
731 fn list_reapable_documents_for_scope(
732 &self,
733 older_than_days: u32,
734 repair_scope: &UpdateRepairScope,
735 ) -> Result<Vec<crate::storage::ReapableDocument>> {
736 match repair_scope {
737 UpdateRepairScope::Global => self.storage.list_reapable_documents(older_than_days),
738 UpdateRepairScope::Space { space_id } => self
739 .storage
740 .list_reapable_documents_in_space(older_than_days, *space_id),
741 UpdateRepairScope::Collections { collection_ids } => self
742 .storage
743 .list_reapable_documents_in_collections(older_than_days, collection_ids),
744 }
745 }
746
747 pub fn resolve_update_targets(&self, options: &UpdateOptions) -> Result<Vec<UpdateTarget>> {
748 self.resolve_targets(TargetScope {
749 space: options.space.as_deref(),
750 collections: &options.collections,
751 })
752 }
753
754 pub(super) fn resolve_targets(&self, scope: TargetScope<'_>) -> Result<Vec<UpdateTarget>> {
755 let mut targets = Vec::new();
756
757 if scope.collections.is_empty() {
758 return self.resolve_update_targets_for_all_collections(scope.space);
759 }
760
761 let mut seen = std::collections::HashSet::new();
762 for raw_collection_name in scope.collections {
763 let collection_name = raw_collection_name.trim();
764 if collection_name.is_empty() {
765 return Err(KboltError::InvalidInput(
766 "collection names cannot be empty".to_string(),
767 )
768 .into());
769 }
770
771 let resolved_space = self.resolve_space_row(scope.space, Some(collection_name))?;
772 let collection = self
773 .storage
774 .get_collection(resolved_space.id, collection_name)?;
775
776 if seen.insert((collection.space_id, collection.name.clone())) {
777 targets.push(UpdateTarget {
778 space: resolved_space.name,
779 collection,
780 });
781 }
782 }
783
784 Ok(targets)
785 }
786
787 fn resolve_update_targets_for_all_collections(
788 &self,
789 space: Option<&str>,
790 ) -> Result<Vec<UpdateTarget>> {
791 let (space_id_filter, spaces_by_id) = if let Some(space_name) = space {
792 let resolved = self.resolve_space_row(Some(space_name), None)?;
793 let mut map = std::collections::HashMap::new();
794 map.insert(resolved.id, resolved.name.clone());
795 (Some(resolved.id), map)
796 } else {
797 let spaces = self.storage.list_spaces()?;
798 let map = spaces
799 .into_iter()
800 .map(|space| (space.id, space.name))
801 .collect::<std::collections::HashMap<_, _>>();
802 (None, map)
803 };
804
805 let collections = self.storage.list_collections(space_id_filter)?;
806 let mut targets = Vec::with_capacity(collections.len());
807 for collection in collections {
808 let space_name = spaces_by_id
809 .get(&collection.space_id)
810 .ok_or_else(|| {
811 KboltError::Internal(format!(
812 "missing space mapping for collection '{}'",
813 collection.name
814 ))
815 })?
816 .clone();
817 targets.push(UpdateTarget {
818 space: space_name,
819 collection,
820 });
821 }
822
823 Ok(targets)
824 }
825
826 fn update_collection_target(
827 &self,
828 target: &UpdateTarget,
829 options: &UpdateOptions,
830 report: &mut UpdateReport,
831 fts_dirty_by_space: &mut HashMap<String, HashSet<i64>>,
832 pending_embeddings: &mut Vec<PendingChunkEmbedding>,
833 failed_chunk_ids: &mut HashSet<i64>,
834 failed_docs: &mut HashSet<UpdateDocKey>,
835 ) -> Result<()> {
836 let all_documents = crate::profile::timed_update_stage("sqlite_list_documents", || {
837 self.storage.list_documents(target.collection.id, false)
838 })?;
839 let document_ids = all_documents.iter().map(|doc| doc.id).collect::<Vec<_>>();
840 let document_text_generations =
841 crate::profile::timed_update_stage("sqlite_list_document_text_generations", || {
842 self.storage
843 .get_document_text_generation_keys(&document_ids)
844 })?;
845 let mut docs_by_path: HashMap<String, DocumentRow> = all_documents
846 .into_iter()
847 .map(|doc| (doc.path.clone(), doc))
848 .collect();
849 let mut seen_paths = HashSet::new();
850 let extension_filter = normalized_extension_filter(target.collection.extensions.as_deref());
851 let ignore_matcher = load_collection_ignore_matcher(
852 &self.config.config_dir,
853 &target.collection.path,
854 &target.space,
855 &target.collection.name,
856 )?;
857 let extractor_registry = default_registry();
858 let embedding_batch_window = self.document_embedding_batch_window();
859 let mut touched_collection = false;
860 let mut failed_walk_prefixes = Vec::new();
861 let mut collection_walk_incomplete = false;
862
863 for entry in super::ignore_helpers::build_collection_walk(&target.collection.path) {
864 let entry = match entry {
865 Ok(item) => item,
866 Err(err) => {
867 let error_scope = collect_walk_error_scope(&err);
868 let error_path = error_scope.paths.first().cloned();
869 if error_scope.collection_incomplete || error_scope.paths.is_empty() {
870 collection_walk_incomplete = true;
871 }
872 for path in &error_scope.paths {
873 let failed_path =
874 match collection_relative_path(&target.collection.path, path) {
875 Ok(relative) => {
876 if relative.is_empty() || relative == "." {
877 collection_walk_incomplete = true;
878 } else {
879 failed_walk_prefixes.push(relative.clone());
880 }
881 relative
882 }
883 Err(_) => {
884 collection_walk_incomplete = true;
885 path.display().to_string()
886 }
887 };
888 failed_docs.insert(update_doc_key(
889 &target.space,
890 &target.collection.path,
891 &failed_path,
892 ));
893 }
894 report
895 .errors
896 .push(file_error(error_path, format!("walk error: {err}")));
897 continue;
898 }
899 };
900
901 if !entry
902 .file_type()
903 .is_some_and(|file_type| file_type.is_file())
904 {
905 continue;
906 }
907
908 if is_hard_ignored_file(entry.path()) {
909 continue;
910 }
911
912 let relative_path =
913 match collection_relative_path(&target.collection.path, entry.path()) {
914 Ok(path) => path,
915 Err(err) => {
916 failed_docs.insert(update_doc_key(
917 &target.space,
918 &target.collection.path,
919 &entry.path().display().to_string(),
920 ));
921 report.errors.push(file_error(
922 Some(entry.path().to_path_buf()),
923 err.to_string(),
924 ));
925 continue;
926 }
927 };
928
929 if !extension_allowed(entry.path(), extension_filter.as_ref()) {
930 push_update_decision(
931 report,
932 options,
933 target,
934 &relative_path,
935 UpdateDecisionKind::Unsupported,
936 Some("extension not allowed".to_string()),
937 );
938 continue;
939 }
940
941 if let Some(matcher) = ignore_matcher.as_ref() {
942 if matcher
943 .matched(Path::new(&relative_path), false)
944 .is_ignore()
945 {
946 push_update_decision(
947 report,
948 options,
949 target,
950 &relative_path,
951 UpdateDecisionKind::Ignored,
952 Some("matched ignore patterns".to_string()),
953 );
954 continue;
955 }
956 }
957
958 let Some(extractor) = extractor_registry.resolve_for_path(entry.path()) else {
959 push_update_decision(
960 report,
961 options,
962 target,
963 &relative_path,
964 UpdateDecisionKind::Unsupported,
965 Some("no extractor available".to_string()),
966 );
967 continue;
968 };
969 let policy = resolve_policy(&self.config.chunking, Some(extractor.profile_key()), None);
970 let generation_key =
971 ingestion_generation_key(extractor.profile_key(), extractor.version(), &policy);
972
973 report.scanned_docs += 1;
974 crate::profile::increment_update_count("docs_scanned", 1);
975 seen_paths.insert(relative_path.clone());
976
977 let scan_started = Instant::now();
978 let metadata = match entry.metadata() {
979 Ok(data) => data,
980 Err(err) => {
981 let detail = format!("metadata error: {err}");
982 failed_docs.insert(update_doc_key(
983 &target.space,
984 &target.collection.path,
985 &relative_path,
986 ));
987 push_update_decision(
988 report,
989 options,
990 target,
991 &relative_path,
992 UpdateDecisionKind::ReadFailed,
993 Some(detail.clone()),
994 );
995 report
996 .errors
997 .push(file_error(Some(entry.path().to_path_buf()), detail));
998 crate::profile::record_update_stage(
999 "scan_metadata_mtime",
1000 scan_started.elapsed(),
1001 );
1002 continue;
1003 }
1004 };
1005
1006 let modified = match modified_token(&metadata) {
1007 Ok(value) => value,
1008 Err(err) => {
1009 let detail = format!("modified timestamp error: {err}");
1010 failed_docs.insert(update_doc_key(
1011 &target.space,
1012 &target.collection.path,
1013 &relative_path,
1014 ));
1015 push_update_decision(
1016 report,
1017 options,
1018 target,
1019 &relative_path,
1020 UpdateDecisionKind::ReadFailed,
1021 Some(detail.clone()),
1022 );
1023 report
1024 .errors
1025 .push(file_error(Some(entry.path().to_path_buf()), detail));
1026 crate::profile::record_update_stage(
1027 "scan_metadata_mtime",
1028 scan_started.elapsed(),
1029 );
1030 continue;
1031 }
1032 };
1033
1034 if let Some(existing) = docs_by_path.get(&relative_path) {
1035 let has_current_canonical_text = document_has_current_canonical_text(
1036 &document_text_generations,
1037 existing.id,
1038 generation_key.as_str(),
1039 );
1040 if existing.active && existing.modified == modified && has_current_canonical_text {
1041 report.skipped_mtime_docs += 1;
1042 push_update_decision(
1043 report,
1044 options,
1045 target,
1046 &relative_path,
1047 UpdateDecisionKind::SkippedMtime,
1048 None,
1049 );
1050 crate::profile::record_update_stage(
1051 "scan_metadata_mtime",
1052 scan_started.elapsed(),
1053 );
1054 continue;
1055 }
1056 }
1057 crate::profile::record_update_stage("scan_metadata_mtime", scan_started.elapsed());
1058
1059 let read_hash_started = Instant::now();
1060 let bytes = match std::fs::read(entry.path()) {
1061 Ok(data) => data,
1062 Err(err) => {
1063 let detail = err.to_string();
1064 failed_docs.insert(update_doc_key(
1065 &target.space,
1066 &target.collection.path,
1067 &relative_path,
1068 ));
1069 push_update_decision(
1070 report,
1071 options,
1072 target,
1073 &relative_path,
1074 UpdateDecisionKind::ReadFailed,
1075 Some(detail.clone()),
1076 );
1077 report
1078 .errors
1079 .push(file_error(Some(entry.path().to_path_buf()), detail));
1080 crate::profile::record_update_stage("read_hash", read_hash_started.elapsed());
1081 continue;
1082 }
1083 };
1084 let hash = sha256_hex(&bytes);
1085 crate::profile::record_update_stage("read_hash", read_hash_started.elapsed());
1086 let mut title = None::<String>;
1087
1088 let existing = docs_by_path.get(&relative_path).cloned();
1089 let pending_decision;
1090 let pending_indexing;
1091 if let Some(doc) = existing.as_ref() {
1092 let has_current_canonical_text = document_has_current_canonical_text(
1093 &document_text_generations,
1094 doc.id,
1095 generation_key.as_str(),
1096 );
1097 if doc.hash == hash && has_current_canonical_text {
1098 if doc.active {
1099 report.skipped_hash_docs += 1;
1100 push_update_decision(
1101 report,
1102 options,
1103 target,
1104 &relative_path,
1105 UpdateDecisionKind::SkippedHash,
1106 None,
1107 );
1108 } else {
1109 report.reactivated_docs += 1;
1110 push_update_decision(
1111 report,
1112 options,
1113 target,
1114 &relative_path,
1115 UpdateDecisionKind::Reactivated,
1116 None,
1117 );
1118 }
1119
1120 if !options.dry_run {
1121 crate::profile::timed_update_stage("sqlite_refresh_document", || {
1122 self.storage.refresh_document_activity(doc.id, &modified)
1123 })?;
1124 }
1125 continue;
1126 }
1127
1128 pending_indexing = PendingDocumentIndexing::Updated {
1129 reactivated: !doc.active,
1130 };
1131 pending_decision = (
1132 UpdateDecisionKind::Changed,
1133 (!doc.active).then_some("reactivated".to_string()),
1134 );
1135 } else {
1136 pending_indexing = PendingDocumentIndexing::Added;
1137 pending_decision = (UpdateDecisionKind::New, None);
1138 }
1139
1140 if options.dry_run {
1141 pending_indexing.record(report);
1142 let (kind, detail) = pending_decision;
1143 push_update_decision(report, options, target, &relative_path, kind, detail);
1144 continue;
1145 }
1146
1147 let extracted = match crate::profile::timed_update_stage("extract", || {
1148 extractor.extract(entry.path(), &bytes)
1149 }) {
1150 Ok(document) => document,
1151 Err(err) => {
1152 let detail = format!("extract failed: {err}");
1153 failed_docs.insert(update_doc_key(
1154 &target.space,
1155 &target.collection.path,
1156 &relative_path,
1157 ));
1158 push_update_decision(
1159 report,
1160 options,
1161 target,
1162 &relative_path,
1163 UpdateDecisionKind::ExtractFailed,
1164 Some(detail.clone()),
1165 );
1166 report
1167 .errors
1168 .push(file_error(Some(entry.path().to_path_buf()), detail));
1169 continue;
1170 }
1171 };
1172 if let Some(extracted_title) = extracted
1173 .title
1174 .as_deref()
1175 .map(str::trim)
1176 .filter(|title| !title.is_empty())
1177 {
1178 title = Some(extracted_title.to_string());
1179 }
1180
1181 let canonical = crate::profile::timed_update_stage("canonical_text", || {
1182 build_canonical_document(&extracted)
1183 });
1184 let text_hash = sha256_hex(canonical.text.as_bytes());
1185 let max_document_tokens = effective_chunk_hard_max(&policy);
1186 let chunk_started = Instant::now();
1187 let final_chunks_result = match self.embedding_document_sizer.as_ref() {
1188 Some(sizer) => {
1189 let sizer_counter = EmbeddingDocumentSizerCounter {
1190 inner: sizer.as_ref(),
1191 };
1192 chunk_canonical_document_with_counter(
1193 &canonical.document,
1194 &policy,
1195 &sizer_counter,
1196 )
1197 }
1198 None => Ok(chunk_canonical_document(&canonical.document, &policy)),
1199 };
1200 crate::profile::record_update_stage("chunk", chunk_started.elapsed());
1201 let final_chunks = match final_chunks_result {
1202 Ok(chunks) => chunks,
1203 Err(err) => {
1204 let detail = format!("chunking failed: {err}");
1205 failed_docs.insert(update_doc_key(
1206 &target.space,
1207 &target.collection.path,
1208 &relative_path,
1209 ));
1210 push_update_decision(
1211 report,
1212 options,
1213 target,
1214 &relative_path,
1215 UpdateDecisionKind::ExtractFailed,
1216 Some(detail.clone()),
1217 );
1218 report
1219 .errors
1220 .push(file_error(Some(entry.path().to_path_buf()), detail));
1221 continue;
1222 }
1223 };
1224 crate::profile::increment_update_count("chunks_created", final_chunks.len() as u64);
1225
1226 let doc_key = update_doc_key(&target.space, &target.collection.path, &relative_path);
1227 let chunk_inserts = final_chunks
1228 .iter()
1229 .enumerate()
1230 .map(|(index, chunk)| ChunkInsert {
1231 seq: index as i32,
1232 offset: chunk.offset,
1233 length: chunk.length,
1234 heading: chunk.heading.clone(),
1235 kind: chunk.kind,
1236 retrieval_prefix: chunk.retrieval_prefix.clone(),
1237 })
1238 .collect::<Vec<_>>();
1239 let mut prepared_embeddings = Vec::new();
1240 if !chunk_inserts.is_empty() && !options.no_embed && self.embedder.is_some() {
1241 let prepared = crate::profile::timed_update_stage("embedding_prepare_text", || {
1242 prepare_chunk_embeddings(
1243 &final_chunks,
1244 &doc_key,
1245 target,
1246 entry.path(),
1247 max_document_tokens,
1248 title.as_deref(),
1249 self.embedding_input_formatter(),
1250 )
1251 });
1252 let preflight =
1253 self.preflight_prepared_embeddings(prepared, report, failed_docs)?;
1254 prepared_embeddings = preflight.accepted;
1255 if !preflight.rejected_chunk_indexes.is_empty() {
1256 push_update_decision(
1257 report,
1258 options,
1259 target,
1260 &relative_path,
1261 UpdateDecisionKind::ExtractFailed,
1262 Some("embed preflight failed".to_string()),
1263 );
1264 continue;
1265 }
1266 }
1267
1268 let replacement =
1269 crate::profile::timed_update_stage("sqlite_replace_document_generation", || {
1270 self.storage
1271 .replace_document_generation(DocumentGenerationReplace {
1272 collection_id: target.collection.id,
1273 path: &relative_path,
1274 title: title.as_deref(),
1275 hash: &hash,
1276 modified: &modified,
1277 extractor_key: extractor.profile_key(),
1278 source_hash: &hash,
1279 text_hash: &text_hash,
1280 generation_key: &generation_key,
1281 text: canonical.text.as_str(),
1282 chunks: &chunk_inserts,
1283 })
1284 })?;
1285 let doc_id = replacement.doc_id;
1286 let chunk_ids = replacement.chunk_ids;
1287
1288 if !replacement.old_chunk_ids.is_empty() {
1289 crate::profile::timed_update_stage("tantivy_delete", || {
1290 self.storage
1291 .delete_tantivy(&target.space, &replacement.old_chunk_ids)
1292 })?;
1293 crate::profile::timed_update_stage("usearch_delete", || {
1294 self.storage
1295 .delete_usearch_deferred(&target.space, &replacement.old_chunk_ids)
1296 })?;
1297 }
1298
1299 fts_dirty_by_space
1300 .entry(target.space.clone())
1301 .or_default()
1302 .insert(doc_id);
1303
1304 if !chunk_ids.is_empty() {
1305 if !options.no_embed && self.embedder.is_some() {
1306 for prepared in prepared_embeddings {
1307 pending_embeddings.push(PendingChunkEmbedding {
1308 chunk_id: chunk_ids[prepared.chunk_index],
1309 doc_key: prepared.doc_key,
1310 space_name: prepared.space_name,
1311 path: prepared.path,
1312 embedding_text: prepared.embedding_text,
1313 max_document_tokens: prepared.max_document_tokens,
1314 });
1315 }
1316 if pending_embeddings.len() >= embedding_batch_window {
1317 self.flush_buffered_embeddings(
1318 pending_embeddings,
1319 failed_chunk_ids,
1320 report,
1321 failed_docs,
1322 )?;
1323 }
1324 }
1325
1326 let entries = chunk_ids
1327 .iter()
1328 .zip(final_chunks.iter())
1329 .map(|(chunk_id, chunk)| TantivyEntry {
1330 chunk_id: *chunk_id,
1331 doc_id,
1332 filepath: relative_path.clone(),
1333 title: title.clone(),
1334 heading: chunk.heading.clone(),
1335 body: render_bm25_document(
1336 ChunkRetrievalContext {
1337 body: chunk.text.as_str(),
1338 retrieval_prefix: chunk.retrieval_prefix.as_deref(),
1339 title: title.as_deref(),
1340 heading: chunk.heading.as_deref(),
1341 },
1342 policy.contextual_prefix,
1343 ),
1344 })
1345 .collect::<Vec<_>>();
1346 crate::profile::increment_update_count("tantivy_entries", entries.len() as u64);
1347 crate::profile::timed_update_stage("tantivy_add", || {
1348 self.storage.index_tantivy(&target.space, &entries)
1349 })?;
1350 }
1351
1352 docs_by_path.insert(
1353 relative_path.clone(),
1354 crate::profile::timed_update_stage("sqlite_get_document_after_write", || {
1355 self.storage
1356 .get_document_by_path(target.collection.id, &relative_path)
1357 })?
1358 .ok_or_else(|| {
1359 KboltError::Internal(format!(
1360 "document missing after upsert: collection={}, path={relative_path}",
1361 target.collection.id
1362 ))
1363 })?,
1364 );
1365 pending_indexing.record(report);
1366 let (kind, detail) = pending_decision;
1367 push_update_decision(report, options, target, &relative_path, kind, detail);
1368 touched_collection = true;
1369 }
1370
1371 let mut missing_docs = docs_by_path
1372 .values()
1373 .filter(|doc| {
1374 doc.active
1375 && !seen_paths.contains(&doc.path)
1376 && !path_is_under_failed_walk(
1377 doc.path.as_str(),
1378 &failed_walk_prefixes,
1379 collection_walk_incomplete,
1380 )
1381 })
1382 .cloned()
1383 .collect::<Vec<_>>();
1384 missing_docs.sort_by(|left, right| left.path.cmp(&right.path));
1385
1386 for doc in missing_docs {
1387 if doc.active && !seen_paths.contains(&doc.path) {
1388 report.deactivated_docs += 1;
1389 push_update_decision(
1390 report,
1391 options,
1392 target,
1393 &doc.path,
1394 UpdateDecisionKind::Deactivated,
1395 None,
1396 );
1397 if !options.dry_run {
1398 crate::profile::timed_update_stage("sqlite_deactivate_document", || {
1399 self.storage.deactivate_document(doc.id)
1400 })?;
1401 touched_collection = true;
1402 }
1403 }
1404 }
1405
1406 if touched_collection && !options.dry_run {
1407 crate::profile::timed_update_stage("sqlite_update_collection_timestamp", || {
1408 self.storage
1409 .update_collection_timestamp(target.collection.id)
1410 })?;
1411 }
1412
1413 Ok(())
1414 }
1415}
1416
/// What a directory-walk error tells us about which parts of a collection
/// were not walked reliably.
#[derive(Debug, Default)]
struct WalkErrorScope {
    /// Paths named anywhere in the error tree, in the order they were found.
    paths: Vec<std::path::PathBuf>,
    /// `true` when some failure is not accounted for by `paths` alone (for
    /// example an I/O error with no path attached, or an ignore-rule error).
    collection_incomplete: bool,
}
1422
1423fn collect_walk_error_scope(err: &ignore::Error) -> WalkErrorScope {
1424 let mut scope = WalkErrorScope::default();
1425 collect_walk_error_scope_into(err, &mut scope, false);
1426 scope
1427}
1428
1429fn collect_walk_error_scope_into(
1430 err: &ignore::Error,
1431 scope: &mut WalkErrorScope,
1432 has_path_context: bool,
1433) {
1434 match err {
1435 ignore::Error::Partial(errors) => {
1436 if errors.is_empty() {
1437 scope.collection_incomplete = true;
1438 }
1439 for err in errors {
1440 collect_walk_error_scope_into(err, scope, has_path_context);
1441 }
1442 }
1443 ignore::Error::WithLineNumber { err, .. } | ignore::Error::WithDepth { err, .. } => {
1444 collect_walk_error_scope_into(err, scope, has_path_context);
1445 }
1446 ignore::Error::WithPath { path, err } => {
1447 scope.paths.push(path.clone());
1448 collect_walk_error_scope_into(err, scope, true);
1449 }
1450 ignore::Error::Loop { child, .. } => {
1451 scope.paths.push(child.clone());
1452 }
1453 ignore::Error::Io(_) => {
1454 if !has_path_context {
1455 scope.collection_incomplete = true;
1456 }
1457 }
1458 ignore::Error::Glob { .. }
1459 | ignore::Error::UnrecognizedFileType(_)
1460 | ignore::Error::InvalidDefinition => {
1461 scope.collection_incomplete = true;
1462 }
1463 }
1464}
1465
/// Reports whether `doc_path` lies inside a part of the collection whose walk
/// failed.
///
/// Returns `true` immediately when `collection_walk_incomplete` is set.
/// Otherwise a prefix matches when it is empty, is `"."`, equals `doc_path`,
/// or is followed in `doc_path` by a `/` (so `a/b` covers `a/b/c` but not
/// `a/bc`).
fn path_is_under_failed_walk(
    doc_path: &str,
    failed_walk_prefixes: &[String],
    collection_walk_incomplete: bool,
) -> bool {
    if collection_walk_incomplete {
        return true;
    }

    for prefix in failed_walk_prefixes {
        // An empty or "." prefix covers every document.
        let covers_everything = prefix.is_empty() || prefix == ".";
        if covers_everything || doc_path == prefix {
            return true;
        }
        // Require a separator right after the prefix so only whole path
        // components match.
        if let Some(rest) = doc_path.strip_prefix(prefix.as_str()) {
            if rest.starts_with('/') {
                return true;
            }
        }
    }

    false
}
1484
#[cfg(test)]
mod tests {
    use super::*;
    use std::io;
    use std::path::PathBuf;

    /// Builds a "permission denied" I/O error wrapped in `WithPath` for `path`.
    fn blocked_at(path: &str) -> ignore::Error {
        ignore::Error::WithPath {
            path: PathBuf::from(path),
            err: Box::new(ignore::Error::Io(io::Error::new(
                io::ErrorKind::PermissionDenied,
                "blocked",
            ))),
        }
    }

    #[test]
    fn walk_error_scope_collects_every_partial_path() {
        // A loop error hidden behind a depth wrapper must still yield its child path.
        let loop_under_depth = ignore::Error::WithDepth {
            depth: 2,
            err: Box::new(ignore::Error::Loop {
                ancestor: PathBuf::from("/collection"),
                child: PathBuf::from("/collection/blocked-b"),
            }),
        };
        let err = ignore::Error::Partial(vec![
            blocked_at("/collection/blocked-a"),
            loop_under_depth,
        ]);

        let WalkErrorScope {
            paths,
            collection_incomplete,
        } = collect_walk_error_scope(&err);

        let expected_paths = vec![
            PathBuf::from("/collection/blocked-a"),
            PathBuf::from("/collection/blocked-b"),
        ];
        assert_eq!(paths, expected_paths);
        assert!(!collection_incomplete);
    }

    #[test]
    fn walk_error_scope_marks_pathless_errors_incomplete() {
        // The second error has no path, so the whole collection is suspect.
        let pathless = ignore::Error::Io(io::Error::new(io::ErrorKind::Other, "unknown"));
        let err = ignore::Error::Partial(vec![blocked_at("/collection/blocked"), pathless]);

        let WalkErrorScope {
            paths,
            collection_incomplete,
        } = collect_walk_error_scope(&err);

        assert_eq!(paths, vec![PathBuf::from("/collection/blocked")]);
        assert!(collection_incomplete);
    }

    #[test]
    fn walk_error_scope_treats_ignore_rule_errors_as_incomplete() {
        let bad_glob = ignore::Error::Glob {
            glob: Some("[".to_string()),
            err: "invalid glob".to_string(),
        };
        let err = ignore::Error::WithPath {
            path: PathBuf::from("/collection/.gitignore"),
            err: Box::new(ignore::Error::WithLineNumber {
                line: 4,
                err: Box::new(bad_glob),
            }),
        };

        let WalkErrorScope {
            paths,
            collection_incomplete,
        } = collect_walk_error_scope(&err);

        assert_eq!(paths, vec![PathBuf::from("/collection/.gitignore")]);
        assert!(collection_incomplete);
    }
}
1560
1561#[derive(Debug)]
1562struct PreparedChunkEmbedding {
1563 chunk_index: usize,
1564 doc_key: UpdateDocKey,
1565 space_name: String,
1566 path: std::path::PathBuf,
1567 embedding_text: String,
1568 max_document_tokens: usize,
1569}
1570
1571#[derive(Debug, Default)]
1572struct PreparedEmbeddingPreflight {
1573 accepted: Vec<PreparedChunkEmbedding>,
1574 rejected_chunk_indexes: Vec<usize>,
1575}
1576
1577#[derive(Debug)]
1578struct PendingChunkEmbedding {
1579 chunk_id: i64,
1580 doc_key: UpdateDocKey,
1581 space_name: String,
1582 path: std::path::PathBuf,
1583 embedding_text: String,
1584 max_document_tokens: usize,
1585}
1586
/// Result of embedding one batch of pending chunks.
#[derive(Default)]
struct EmbeddedPendingBatch {
    /// Successfully produced embeddings.
    // NOTE(review): tuple looks like (chunk id, space name, vector) — confirm
    // against the code that fills this batch.
    embeddings: Vec<(i64, String, Vec<f32>)>,
    /// Ids of chunks that could not be embedded.
    failed_chunk_ids: Vec<i64>,
}
1592
/// Hashable identity of a document within an update run.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct UpdateDocKey {
    /// Name of the space that owns the collection.
    space: String,
    /// Path of the collection the document belongs to.
    collection_path: std::path::PathBuf,
    /// Document path as tracked for the collection.
    path: String,
}
1599
1600struct EmbeddingDocumentSizerCounter<'a> {
1601 inner: &'a dyn crate::models::EmbeddingDocumentSizer,
1602}
1603
1604impl crate::ingest::chunk::TokenCounter for EmbeddingDocumentSizerCounter<'_> {
1605 fn count(&self, text: &str) -> Result<usize> {
1606 count_document_tokens_profiled(
1607 self.inner,
1608 text,
1609 "tokenize_chunking",
1610 "tokenize_chunking_calls",
1611 )
1612 }
1613
1614 fn fits_within_token_limit_by_byte_len(&self, byte_len: usize, max_tokens: usize) -> bool {
1615 let fits = self
1616 .inner
1617 .fits_within_token_limit_by_byte_len(byte_len, max_tokens);
1618 if fits {
1619 crate::profile::increment_update_count("tokenize_chunking_byte_skips", 1);
1620 }
1621 fits
1622 }
1623}
1624
1625fn count_document_tokens_profiled(
1626 sizer: &dyn crate::models::EmbeddingDocumentSizer,
1627 text: &str,
1628 stage: &'static str,
1629 count: &'static str,
1630) -> Result<usize> {
1631 let started = Instant::now();
1632 let result = sizer.count_document_tokens(text);
1633 crate::profile::record_update_stage(stage, started.elapsed());
1634 crate::profile::increment_update_count(count, 1);
1635 result
1636}
1637
/// Verdict of the embedding preflight check for a single payload.
enum EmbeddingPreflightDecision {
    /// The payload may be sent to the embedder.
    Accept,
    /// The payload must not be embedded; the string explains why.
    Reject(String),
}
1642
1643fn check_embedding_preflight(
1644 sizer: &dyn crate::models::EmbeddingDocumentSizer,
1645 embedding_text: &str,
1646 max_document_tokens: usize,
1647) -> EmbeddingPreflightDecision {
1648 if sizer.fits_within_token_limit_by_byte_len(embedding_text.len(), max_document_tokens) {
1649 crate::profile::increment_update_count("tokenize_preflight_byte_skips", 1);
1650 return EmbeddingPreflightDecision::Accept;
1651 }
1652
1653 match count_document_tokens_profiled(
1654 sizer,
1655 embedding_text,
1656 "tokenize_preflight",
1657 "tokenize_preflight_calls",
1658 ) {
1659 Ok(token_count) if token_count <= max_document_tokens => EmbeddingPreflightDecision::Accept,
1660 Ok(token_count) => EmbeddingPreflightDecision::Reject(format!(
1661 "embed preflight failed: payload has {token_count} tokens, exceeding hard_max_tokens {max_document_tokens}"
1662 )),
1663 Err(err) => EmbeddingPreflightDecision::Reject(format!(
1664 "embed preflight token count failed: {err}"
1665 )),
1666 }
1667}
1668
1669fn prepare_chunk_embeddings(
1670 final_chunks: &[crate::ingest::chunk::FinalChunk],
1671 doc_key: &UpdateDocKey,
1672 target: &UpdateTarget,
1673 path: &Path,
1674 max_document_tokens: usize,
1675 title: Option<&str>,
1676 formatter: crate::models::EmbeddingInputFormatter,
1677) -> Vec<PreparedChunkEmbedding> {
1678 final_chunks
1679 .iter()
1680 .enumerate()
1681 .map(|(chunk_index, chunk)| {
1682 let dense_input = render_dense_document(ChunkRetrievalContext {
1683 body: chunk.text.as_str(),
1684 retrieval_prefix: chunk.retrieval_prefix.as_deref(),
1685 title,
1686 heading: chunk.heading.as_deref(),
1687 });
1688 let embedding_text =
1689 normalize_embedding_payload(formatter.format_document(&dense_input));
1690 PreparedChunkEmbedding {
1691 chunk_index,
1692 doc_key: doc_key.clone(),
1693 space_name: target.space.clone(),
1694 path: path.to_path_buf(),
1695 embedding_text,
1696 max_document_tokens,
1697 }
1698 })
1699 .collect()
1700}
1701
/// Replaces an empty or whitespace-only payload with a single space; any
/// other payload is returned untouched (surrounding whitespace included).
fn normalize_embedding_payload(text: String) -> String {
    if text.chars().all(char::is_whitespace) {
        return String::from(" ");
    }
    text
}
1709
/// How widely an update run was scoped, as derived from the update options
/// and the resolved targets.
#[derive(Debug, Clone)]
enum UpdateRepairScope {
    /// The update was not narrowed to a space or to collections.
    Global,
    /// The update was narrowed to one space.
    Space {
        /// Id of that space.
        space_id: i64,
    },
    /// The update was narrowed to specific collections.
    Collections {
        /// Ids of the targeted collections.
        collection_ids: Vec<i64>,
    },
}
1716
1717impl UpdateRepairScope {
1718 fn from_options_and_targets(options: &UpdateOptions, targets: &[UpdateTarget]) -> Self {
1719 if options.collections.is_empty() {
1720 if let Some(target) = targets.first() {
1721 if options.space.is_some() {
1722 return Self::Space {
1723 space_id: target.collection.space_id,
1724 };
1725 }
1726 }
1727 return Self::Global;
1728 }
1729
1730 let mut collection_ids = targets
1731 .iter()
1732 .map(|target| target.collection.id)
1733 .collect::<Vec<_>>();
1734 collection_ids.sort_unstable();
1735 collection_ids.dedup();
1736 Self::Collections { collection_ids }
1737 }
1738
1739 fn allows_space_dense_repair(&self) -> bool {
1740 !matches!(self, Self::Collections { .. })
1741 }
1742}
1743
/// Kind of indexing outcome that will be counted for a document once it has
/// been processed.
#[derive(Debug, Clone, Copy)]
enum PendingDocumentIndexing {
    /// The document had no existing record at its path.
    Added,
    /// The document replaces an existing record.
    Updated {
        /// `true` when the existing record was inactive.
        reactivated: bool,
    },
}
1749
1750impl PendingDocumentIndexing {
1751 fn record(self, report: &mut UpdateReport) {
1752 match self {
1753 Self::Added => {
1754 report.added_docs += 1;
1755 }
1756 Self::Updated { reactivated } => {
1757 report.updated_docs += 1;
1758 if reactivated {
1759 report.reactivated_docs += 1;
1760 }
1761 }
1762 }
1763 }
1764}
1765
/// Returns `true` when `document_text_generations` holds an entry for
/// `doc_id` whose stored generation key equals `generation_key`.
///
/// A document with no entry at all is reported as not current.
fn document_has_current_canonical_text(
    document_text_generations: &HashMap<i64, String>,
    doc_id: i64,
    generation_key: &str,
) -> bool {
    matches!(
        document_text_generations.get(&doc_id),
        Some(stored) if stored == generation_key
    )
}
1775
1776fn finish_update_with_usearch_flush(
1777 update_result: Result<UpdateReport>,
1778 flush_result: Result<()>,
1779) -> Result<UpdateReport> {
1780 match (update_result, flush_result) {
1781 (Ok(report), Ok(())) => Ok(report),
1782 (Err(err), Ok(())) => Err(err),
1783 (Ok(_), Err(err)) => Err(err),
1784 (Err(update_err), Err(flush_err)) => Err(KboltError::Internal(format!(
1785 "update failed: {update_err}; additionally failed to save dirty vector indexes: {flush_err}"
1786 ))
1787 .into()),
1788 }
1789}
1790
1791fn invalid_pending_embedding_batch_detail(
1792 pending: &[PendingChunkEmbedding],
1793 vectors: &[Vec<f32>],
1794) -> Option<String> {
1795 if vectors.len() != pending.len() {
1796 return Some(format!(
1797 "embedder returned {} vectors for {} chunks",
1798 vectors.len(),
1799 pending.len()
1800 ));
1801 }
1802
1803 if vectors.iter().any(|vector| vector.is_empty()) {
1804 if pending.len() == 1 {
1805 return Some(format!(
1806 "embedder returned an empty vector for chunk {}",
1807 pending[0].chunk_id
1808 ));
1809 }
1810 return Some("embedder returned an empty vector".to_string());
1811 }
1812
1813 None
1814}
1815
1816fn push_update_decision(
1817 report: &mut UpdateReport,
1818 options: &UpdateOptions,
1819 target: &UpdateTarget,
1820 path: &str,
1821 kind: UpdateDecisionKind,
1822 detail: Option<String>,
1823) {
1824 if !options.verbose {
1825 return;
1826 }
1827
1828 report.decisions.push(UpdateDecision {
1829 space: target.space.clone(),
1830 collection: target.collection.name.clone(),
1831 path: path.to_string(),
1832 kind,
1833 detail,
1834 });
1835}
1836
1837fn update_doc_key(space: &str, collection_path: &Path, path: &str) -> UpdateDocKey {
1838 UpdateDocKey {
1839 space: space.to_string(),
1840 collection_path: collection_path.to_path_buf(),
1841 path: path.to_string(),
1842 }
1843}
1844
1845fn effective_chunk_hard_max(policy: &crate::config::ChunkPolicy) -> usize {
1846 policy
1847 .hard_max_tokens
1848 .max(policy.soft_max_tokens)
1849 .max(policy.target_tokens)
1850 .max(1)
1851}
1852
1853fn ingestion_generation_key(
1854 extractor_key: &str,
1855 extractor_version: u32,
1856 policy: &crate::config::ChunkPolicy,
1857) -> String {
1858 format!(
1859 "canonical=v{CANONICAL_TEXT_GENERATION};chunker=v{CHUNKER_GENERATION};extractor={extractor_key}:v{extractor_version};chunk=target:{}:soft:{}:hard:{}:overlap:{}:prefix:{}",
1860 policy.target_tokens,
1861 policy.soft_max_tokens,
1862 policy.hard_max_tokens,
1863 policy.boundary_overlap_tokens,
1864 policy.contextual_prefix
1865 )
1866}