1use std::fs::{self, OpenOptions};
28use std::io::Write as IoWrite;
29use std::path::{Component, Path, PathBuf};
30use std::time::{SystemTime, UNIX_EPOCH};
31
32use ring::rand::{SecureRandom, SystemRandom};
33use serde::{Deserialize, Serialize};
34
35use super::policy::{
36 CHUNKING_STRATEGY_VERSION, InvalidationAction, SEMANTIC_SCHEMA_VERSION,
37 SemanticAssetManifest as PolicyManifest, SemanticPolicy,
38};
39
/// Format version stamped into every manifest this module writes.
///
/// `load` on both manifest types rejects files whose `manifest_version` is
/// greater than this value.
pub const MANIFEST_FORMAT_VERSION: u32 = 1;

/// File name of the tier manifest, stored under `<data_dir>/vector_index/`.
pub const MANIFEST_FILENAME: &str = "semantic_manifest.json";

/// File name of the shard manifest, stored under `<data_dir>/vector_index/`.
pub const SHARD_MANIFEST_FILENAME: &str = "semantic_shards.json";
51
/// Decides whether a path recorded in a shard manifest may be trusted.
///
/// A path is accepted only when:
/// - it is non-empty and carries no leading or trailing whitespace;
/// - it is relative;
/// - every component is an ordinary name, so `..`, a leading `.`, a root, and a
///   drive prefix are all rejected.
///
/// This keeps manifest entries from pointing outside the directory they are
/// resolved against.
pub(crate) fn semantic_shard_artifact_path_is_safe(recorded_path: &str) -> bool {
    // Whitespace-padded or blank values are treated as corrupt.
    if recorded_path.is_empty() || recorded_path.trim() != recorded_path {
        return false;
    }

    let candidate = Path::new(recorded_path);
    if candidate.is_absolute() {
        return false;
    }

    candidate
        .components()
        .all(|part| matches!(part, Component::Normal(_)))
}
63
64#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
68#[serde(rename_all = "snake_case")]
69pub enum TierKind {
70 Fast,
71 Quality,
72}
73
74impl TierKind {
75 pub fn as_str(&self) -> &'static str {
76 match self {
77 Self::Fast => "fast",
78 Self::Quality => "quality",
79 }
80 }
81}
82
83#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
87#[serde(rename_all = "snake_case")]
88pub enum TierReadiness {
89 Ready,
91 Building { progress_pct: u8 },
93 Stale { reason: String },
95 Missing,
97 Incompatible { reason: String },
99}
100
101impl TierReadiness {
102 pub fn is_ready(&self) -> bool {
103 matches!(self, Self::Ready)
104 }
105
106 pub fn is_usable(&self) -> bool {
107 matches!(self, Self::Ready | Self::Stale { .. })
108 }
109}
110
111#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
115pub struct ArtifactRecord {
116 pub tier: TierKind,
118 pub embedder_id: String,
120 pub model_revision: String,
122 pub schema_version: u32,
124 pub chunking_version: u32,
126 pub dimension: usize,
128 pub doc_count: u64,
130 pub conversation_count: u64,
132 pub db_fingerprint: String,
134 pub index_path: String,
136 pub size_bytes: u64,
138 pub started_at_ms: i64,
140 pub completed_at_ms: i64,
142 pub ready: bool,
144}
145
146impl ArtifactRecord {
147 pub fn to_policy_manifest(&self) -> PolicyManifest {
149 PolicyManifest {
150 embedder_id: self.embedder_id.clone(),
151 model_revision: self.model_revision.clone(),
152 schema_version: self.schema_version,
153 chunking_version: self.chunking_version,
154 doc_count: self.doc_count,
155 built_at_ms: self.completed_at_ms,
156 }
157 }
158
159 pub fn readiness(
169 &self,
170 policy: &SemanticPolicy,
171 current_db_fingerprint: &str,
172 current_model_revision: &str,
173 ) -> TierReadiness {
174 let action = self.to_policy_manifest().invalidation_action(
175 policy,
176 current_model_revision,
177 &self.embedder_id,
178 );
179
180 match action {
181 InvalidationAction::UpToDate => {
182 if self.db_fingerprint != current_db_fingerprint {
183 TierReadiness::Stale {
184 reason: "DB content changed since artifact was built".to_owned(),
185 }
186 } else if !self.ready {
187 TierReadiness::Building { progress_pct: 100 }
188 } else {
189 TierReadiness::Ready
190 }
191 }
192 InvalidationAction::RebuildInBackground => TierReadiness::Stale {
193 reason: "model revision changed; vectors usable until rebuild completes".to_owned(),
194 },
195 InvalidationAction::DiscardAndRebuild { reason } => {
196 TierReadiness::Incompatible { reason }
197 }
198 InvalidationAction::Evict => TierReadiness::Incompatible {
199 reason: "semantic mode set to lexical-only".to_owned(),
200 },
201 }
202 }
203}
204
/// Record for an HNSW index file built from one tier's vectors.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct HnswRecord {
    /// Tier whose vectors the index was built from.
    ///
    /// `SemanticManifest::invalidate_incompatible` drops this record once that
    /// tier's artifact is gone.
    pub base_tier: TierKind,
    /// Embedder that produced the indexed vectors.
    pub embedder_id: String,
    /// Search-time `ef` parameter.
    ///
    /// Presumably HNSW's candidate-list size — confirm against the index builder.
    pub ef_search: usize,
    /// Location of the index file.
    ///
    /// Presumably relative to the data directory — confirm.
    pub index_path: String,
    /// Index size in bytes; counted by `SemanticManifest::total_size_bytes`.
    pub size_bytes: u64,
    /// Build completion time, ms since the Unix epoch.
    pub built_at_ms: i64,
    /// Whether the index has been marked ready.
    pub ready: bool,
}
225
226#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
236pub struct SemanticShardRecord {
237 pub tier: TierKind,
239 pub embedder_id: String,
241 pub model_revision: String,
243 pub schema_version: u32,
245 pub chunking_version: u32,
247 pub dimension: usize,
249 pub shard_index: u32,
251 pub shard_count: u32,
253 pub doc_count: u64,
255 pub total_conversations: u64,
257 pub db_fingerprint: String,
259 pub index_path: String,
261 pub quantization: String,
263 pub mmap_ready: bool,
265 pub ann_index_path: Option<String>,
267 pub ann_size_bytes: u64,
269 pub ann_ready: bool,
271 pub size_bytes: u64,
273 pub started_at_ms: i64,
275 pub completed_at_ms: i64,
277 pub ready: bool,
279}
280
281impl SemanticShardRecord {
282 pub fn to_policy_manifest(&self) -> PolicyManifest {
283 PolicyManifest {
284 embedder_id: self.embedder_id.clone(),
285 model_revision: self.model_revision.clone(),
286 schema_version: self.schema_version,
287 chunking_version: self.chunking_version,
288 doc_count: self.doc_count,
289 built_at_ms: self.completed_at_ms,
290 }
291 }
292
293 pub fn matches_generation(
294 &self,
295 tier: TierKind,
296 embedder_id: &str,
297 db_fingerprint: &str,
298 ) -> bool {
299 self.tier == tier
300 && self.embedder_id == embedder_id
301 && self.db_fingerprint == db_fingerprint
302 }
303}
304
/// Aggregate view of one shard generation, produced by `SemanticShardManifest::summary`.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct SemanticShardSummary {
    /// Largest `shard_count` declared by any shard in the generation.
    pub shard_count: u32,
    /// Number of distinct shard indices whose shard is both `ready` and `mmap_ready`.
    pub ready_shards: u32,
    /// Number of distinct shard indices that are also ANN-ready.
    ///
    /// ANN-ready means `ann_ready`, a non-zero `ann_size_bytes`, and a safe ANN
    /// path not already claimed by another counted shard.
    pub ann_ready_shards: u32,
    /// Saturating sum of `doc_count` over the generation's shards.
    pub doc_count: u64,
    /// Largest `total_conversations` reported by any shard.
    pub total_conversations: u64,
    /// Saturating sum of shard `size_bytes`.
    pub size_bytes: u64,
    /// Saturating sum of shard `ann_size_bytes`.
    pub ann_size_bytes: u64,
    /// `true` when the generation is internally consistent and every index in
    /// `0..shard_count` has a ready shard.
    pub complete: bool,
}
317
/// Persistent list of shard records, stored as `vector_index/semantic_shards.json`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SemanticShardManifest {
    /// Format version; `load` rejects values above `MANIFEST_FORMAT_VERSION`.
    pub manifest_version: u32,
    /// Shard records for every `(tier, embedder_id, db_fingerprint)` generation.
    pub shards: Vec<SemanticShardRecord>,
    /// Time of the last `save`, ms since the Unix epoch (`0` in a default manifest).
    pub updated_at_ms: i64,
}
325
326impl Default for SemanticShardManifest {
327 fn default() -> Self {
328 Self {
329 manifest_version: MANIFEST_FORMAT_VERSION,
330 shards: Vec::new(),
331 updated_at_ms: 0,
332 }
333 }
334}
335
336impl SemanticShardManifest {
337 pub fn path(data_dir: &Path) -> PathBuf {
338 data_dir.join("vector_index").join(SHARD_MANIFEST_FILENAME)
339 }
340
341 pub fn load(data_dir: &Path) -> Result<Option<Self>, ManifestError> {
342 let path = Self::path(data_dir);
343 let bytes = match fs::read(&path) {
344 Ok(bytes) => bytes,
345 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
346 Err(e) => {
347 return Err(ManifestError::Io {
348 path,
349 source: e.to_string(),
350 });
351 }
352 };
353
354 let manifest: Self = serde_json::from_slice(&bytes).map_err(|e| ManifestError::Parse {
355 path: path.clone(),
356 source: e.to_string(),
357 })?;
358
359 if manifest.manifest_version > MANIFEST_FORMAT_VERSION {
360 return Err(ManifestError::UnsupportedVersion {
361 found: manifest.manifest_version,
362 max_supported: MANIFEST_FORMAT_VERSION,
363 });
364 }
365
366 Ok(Some(manifest))
367 }
368
369 pub fn load_or_default(data_dir: &Path) -> Result<Self, ManifestError> {
370 match Self::load(data_dir) {
371 Ok(Some(manifest)) => Ok(manifest),
372 Ok(None) => Ok(Self::default()),
373 Err(e @ ManifestError::Io { .. }) => Err(e),
374 Err(ManifestError::Parse { .. } | ManifestError::UnsupportedVersion { .. }) => {
375 Ok(Self::default())
376 }
377 Err(e) => Err(e),
378 }
379 }
380
381 pub fn save(&mut self, data_dir: &Path) -> Result<(), ManifestError> {
382 let path = Self::path(data_dir);
383 if let Some(parent) = path.parent() {
384 fs::create_dir_all(parent).map_err(|e| ManifestError::Io {
385 path: parent.to_path_buf(),
386 source: e.to_string(),
387 })?;
388 }
389
390 self.updated_at_ms = now_ms();
391 let json = serde_json::to_string_pretty(self).map_err(|e| ManifestError::Serialize {
392 source: e.to_string(),
393 })?;
394 let (tmp_path, mut file) =
395 create_unique_manifest_temp_file(&path).map_err(|e| ManifestError::Io {
396 path: path.clone(),
397 source: e.to_string(),
398 })?;
399 file.write_all(json.as_bytes())
400 .map_err(|e| ManifestError::Io {
401 path: tmp_path.clone(),
402 source: e.to_string(),
403 })?;
404 file.sync_all().map_err(|e| ManifestError::Io {
405 path: tmp_path.clone(),
406 source: e.to_string(),
407 })?;
408 replace_file_from_temp(&tmp_path, &path).map_err(|e| ManifestError::Io {
409 path: path.clone(),
410 source: e.to_string(),
411 })?;
412 sync_parent_directory(&path).map_err(|e| ManifestError::Io {
413 path: path
414 .parent()
415 .map(Path::to_path_buf)
416 .unwrap_or_else(|| path.clone()),
417 source: e.to_string(),
418 })?;
419
420 Ok(())
421 }
422
423 pub fn replace_shards_for_generation(
424 &mut self,
425 tier: TierKind,
426 embedder_id: &str,
427 db_fingerprint: &str,
428 mut shards: Vec<SemanticShardRecord>,
429 ) {
430 self.shards
431 .retain(|shard| !shard.matches_generation(tier, embedder_id, db_fingerprint));
432 self.shards.append(&mut shards);
433 self.shards.sort_by(|a, b| {
434 (
435 a.tier.as_str(),
436 &a.embedder_id,
437 &a.db_fingerprint,
438 a.shard_index,
439 )
440 .cmp(&(
441 b.tier.as_str(),
442 &b.embedder_id,
443 &b.db_fingerprint,
444 b.shard_index,
445 ))
446 });
447 }
448
449 pub fn summary(
450 &self,
451 tier: TierKind,
452 embedder_id: &str,
453 db_fingerprint: &str,
454 ) -> SemanticShardSummary {
455 let mut summary = SemanticShardSummary::default();
456 let mut ready_indices = std::collections::BTreeSet::new();
457 let mut ann_ready_indices = std::collections::BTreeSet::new();
458 let mut seen_indices = std::collections::BTreeSet::new();
459 let mut seen_index_paths = std::collections::BTreeSet::new();
460 let mut seen_ann_index_paths = std::collections::BTreeSet::new();
461 let mut expected_shard_count = None;
462 let mut expected_generation_metadata: Option<(&str, u32, u32, usize, u64, &str)> = None;
463 let mut generation_consistent = true;
464 for shard in self
465 .shards
466 .iter()
467 .filter(|shard| shard.matches_generation(tier, embedder_id, db_fingerprint))
468 {
469 if shard.shard_count == 0 || shard.shard_index >= shard.shard_count {
470 generation_consistent = false;
471 }
472 if !seen_indices.insert(shard.shard_index) {
473 generation_consistent = false;
474 }
475 if !semantic_shard_artifact_path_is_safe(&shard.index_path)
476 || !seen_index_paths.insert(&shard.index_path)
477 {
478 generation_consistent = false;
479 }
480 match expected_shard_count {
481 Some(expected) if expected != shard.shard_count => {
482 generation_consistent = false;
483 }
484 None => expected_shard_count = Some(shard.shard_count),
485 _ => {}
486 }
487 let generation_metadata = (
488 shard.model_revision.as_str(),
489 shard.schema_version,
490 shard.chunking_version,
491 shard.dimension,
492 shard.total_conversations,
493 shard.quantization.as_str(),
494 );
495 match expected_generation_metadata {
496 Some(expected) if expected != generation_metadata => {
497 generation_consistent = false;
498 }
499 None => expected_generation_metadata = Some(generation_metadata),
500 _ => {}
501 }
502 if shard.schema_version != SEMANTIC_SCHEMA_VERSION
503 || shard.chunking_version != CHUNKING_STRATEGY_VERSION
504 || shard.dimension == 0
505 {
506 generation_consistent = false;
507 }
508 summary.shard_count = summary.shard_count.max(shard.shard_count);
509 summary.doc_count = summary.doc_count.saturating_add(shard.doc_count);
510 summary.total_conversations =
511 summary.total_conversations.max(shard.total_conversations);
512 summary.size_bytes = summary.size_bytes.saturating_add(shard.size_bytes);
513 summary.ann_size_bytes = summary.ann_size_bytes.saturating_add(shard.ann_size_bytes);
514 if shard.ready && shard.mmap_ready {
515 ready_indices.insert(shard.shard_index);
516 }
517 if shard.ready
518 && shard.mmap_ready
519 && shard.ann_ready
520 && shard.ann_size_bytes > 0
521 && let Some(ann_index_path) = shard.ann_index_path.as_deref()
522 && semantic_shard_artifact_path_is_safe(ann_index_path)
523 && seen_ann_index_paths.insert(ann_index_path)
524 {
525 ann_ready_indices.insert(shard.shard_index);
526 }
527 }
528 summary.ready_shards = u32::try_from(ready_indices.len()).unwrap_or(u32::MAX);
529 summary.ann_ready_shards = u32::try_from(ann_ready_indices.len()).unwrap_or(u32::MAX);
530 summary.complete = generation_consistent
531 && summary.shard_count > 0
532 && summary.ready_shards == summary.shard_count
533 && (0..summary.shard_count).all(|index| ready_indices.contains(&index));
534 summary
535 }
536
537 pub fn invalidate_incompatible(
538 &mut self,
539 policy: &SemanticPolicy,
540 current_model_revision: &str,
541 ) -> usize {
542 let before = self.shards.len();
543 self.shards.retain(|shard| {
544 !matches!(
545 shard.to_policy_manifest().invalidation_action(
546 policy,
547 current_model_revision,
548 &shard.embedder_id,
549 ),
550 InvalidationAction::DiscardAndRebuild { .. } | InvalidationAction::Evict
551 )
552 });
553 before.saturating_sub(self.shards.len())
554 }
555
556 pub fn total_size_bytes(&self) -> u64 {
557 self.shards
558 .iter()
559 .map(|shard| shard.size_bytes)
560 .fold(0, u64::saturating_add)
561 }
562}
563
564#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
568pub struct BuildCheckpoint {
569 pub tier: TierKind,
571 pub embedder_id: String,
573 pub last_offset: i64,
575 pub docs_embedded: u64,
577 pub conversations_processed: u64,
579 pub total_conversations: u64,
581 pub db_fingerprint: String,
583 pub schema_version: u32,
585 pub chunking_version: u32,
587 pub saved_at_ms: i64,
589}
590
591impl BuildCheckpoint {
592 pub fn progress_pct(&self) -> u8 {
594 if self.total_conversations == 0 {
595 return 0;
596 }
597 let pct = (self.conversations_processed as f64 / self.total_conversations as f64) * 100.0;
598 (pct as u8).min(100)
599 }
600
601 pub fn is_complete(&self) -> bool {
603 self.conversations_processed >= self.total_conversations
604 }
605
606 pub fn is_valid(&self, current_db_fingerprint: &str) -> bool {
608 self.db_fingerprint == current_db_fingerprint
609 && self.schema_version == SEMANTIC_SCHEMA_VERSION
610 && self.chunking_version == CHUNKING_STRATEGY_VERSION
611 }
612}
613
614#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
618pub struct BacklogLedger {
619 pub total_conversations: u64,
621 pub fast_tier_processed: u64,
623 pub quality_tier_processed: u64,
625 pub db_fingerprint: String,
627 pub computed_at_ms: i64,
629}
630
631impl BacklogLedger {
632 pub fn fast_tier_remaining(&self) -> u64 {
634 self.total_conversations
635 .saturating_sub(self.fast_tier_processed)
636 }
637
638 pub fn quality_tier_remaining(&self) -> u64 {
640 self.total_conversations
641 .saturating_sub(self.quality_tier_processed)
642 }
643
644 pub fn has_pending_work(&self) -> bool {
646 self.fast_tier_remaining() > 0 || self.quality_tier_remaining() > 0
647 }
648
649 pub fn is_current(&self, current_db_fingerprint: &str) -> bool {
651 self.db_fingerprint == current_db_fingerprint
652 }
653}
654
/// Top-level semantic index manifest, stored as `vector_index/semantic_manifest.json`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SemanticManifest {
    /// Format version; `load` rejects values above `MANIFEST_FORMAT_VERSION`.
    pub manifest_version: u32,
    /// Published fast-tier artifact, if any.
    pub fast_tier: Option<ArtifactRecord>,
    /// Published quality-tier artifact, if any.
    pub quality_tier: Option<ArtifactRecord>,
    /// HNSW index built on one of the tiers, if any.
    pub hnsw: Option<HnswRecord>,
    /// Per-tier backlog, recomputed by `refresh_backlog`.
    pub backlog: BacklogLedger,
    /// In-progress build checkpoint (one slot, tagged with its tier).
    pub checkpoint: Option<BuildCheckpoint>,
    /// Time of the last `save`, ms since the Unix epoch (`0` in a default manifest).
    pub updated_at_ms: i64,
}
678
679impl Default for SemanticManifest {
680 fn default() -> Self {
681 Self {
682 manifest_version: MANIFEST_FORMAT_VERSION,
683 fast_tier: None,
684 quality_tier: None,
685 hnsw: None,
686 backlog: BacklogLedger {
687 total_conversations: 0,
688 fast_tier_processed: 0,
689 quality_tier_processed: 0,
690 db_fingerprint: String::new(),
691 computed_at_ms: 0,
692 },
693 checkpoint: None,
694 updated_at_ms: 0,
695 }
696 }
697}
698
699impl SemanticManifest {
700 pub fn path(data_dir: &Path) -> PathBuf {
704 data_dir.join("vector_index").join(MANIFEST_FILENAME)
705 }
706
707 pub fn load(data_dir: &Path) -> Result<Option<Self>, ManifestError> {
712 let path = Self::path(data_dir);
713 let bytes = match fs::read(&path) {
714 Ok(b) => b,
715 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
716 Err(e) => {
717 return Err(ManifestError::Io {
718 path,
719 source: e.to_string(),
720 });
721 }
722 };
723
724 let manifest: Self = serde_json::from_slice(&bytes).map_err(|e| ManifestError::Parse {
725 path: path.clone(),
726 source: e.to_string(),
727 })?;
728
729 if manifest.manifest_version > MANIFEST_FORMAT_VERSION {
731 return Err(ManifestError::UnsupportedVersion {
732 found: manifest.manifest_version,
733 max_supported: MANIFEST_FORMAT_VERSION,
734 });
735 }
736
737 Ok(Some(manifest))
738 }
739
740 pub fn load_or_default(data_dir: &Path) -> Result<Self, ManifestError> {
747 match Self::load(data_dir) {
748 Ok(Some(manifest)) => Ok(manifest),
749 Ok(None) => Ok(Self::default()),
750 Err(e @ ManifestError::Io { .. }) => Err(e),
752 Err(ManifestError::Parse { .. } | ManifestError::UnsupportedVersion { .. }) => {
754 Ok(Self::default())
755 }
756 Err(e) => Err(e),
757 }
758 }
759
760 pub fn save(&mut self, data_dir: &Path) -> Result<(), ManifestError> {
762 let path = Self::path(data_dir);
763
764 if let Some(parent) = path.parent() {
766 fs::create_dir_all(parent).map_err(|e| ManifestError::Io {
767 path: parent.to_path_buf(),
768 source: e.to_string(),
769 })?;
770 }
771
772 self.updated_at_ms = now_ms();
773
774 let json = serde_json::to_string_pretty(self).map_err(|e| ManifestError::Serialize {
775 source: e.to_string(),
776 })?;
777
778 let (tmp_path, mut file) =
780 create_unique_manifest_temp_file(&path).map_err(|e| ManifestError::Io {
781 path: path.clone(),
782 source: e.to_string(),
783 })?;
784 file.write_all(json.as_bytes())
785 .map_err(|e| ManifestError::Io {
786 path: tmp_path.clone(),
787 source: e.to_string(),
788 })?;
789 file.sync_all().map_err(|e| ManifestError::Io {
790 path: tmp_path.clone(),
791 source: e.to_string(),
792 })?;
793 replace_file_from_temp(&tmp_path, &path).map_err(|e| ManifestError::Io {
794 path: path.clone(),
795 source: e.to_string(),
796 })?;
797 sync_parent_directory(&path).map_err(|e| ManifestError::Io {
798 path: path
799 .parent()
800 .map(Path::to_path_buf)
801 .unwrap_or_else(|| path.clone()),
802 source: e.to_string(),
803 })?;
804
805 Ok(())
806 }
807
808 pub fn fast_tier_readiness(
812 &self,
813 policy: &SemanticPolicy,
814 current_db_fingerprint: &str,
815 current_model_revision: &str,
816 ) -> TierReadiness {
817 match &self.fast_tier {
818 Some(artifact) => {
819 artifact.readiness(policy, current_db_fingerprint, current_model_revision)
820 }
821 None => {
822 if let Some(cp) = &self.checkpoint
824 && cp.tier == TierKind::Fast
825 && cp.is_valid(current_db_fingerprint)
826 {
827 TierReadiness::Building {
828 progress_pct: cp.progress_pct(),
829 }
830 } else {
831 TierReadiness::Missing
832 }
833 }
834 }
835 }
836
837 pub fn quality_tier_readiness(
839 &self,
840 policy: &SemanticPolicy,
841 current_db_fingerprint: &str,
842 current_model_revision: &str,
843 ) -> TierReadiness {
844 match &self.quality_tier {
845 Some(artifact) => {
846 artifact.readiness(policy, current_db_fingerprint, current_model_revision)
847 }
848 None => {
849 if let Some(cp) = &self.checkpoint
850 && cp.tier == TierKind::Quality
851 && cp.is_valid(current_db_fingerprint)
852 {
853 TierReadiness::Building {
854 progress_pct: cp.progress_pct(),
855 }
856 } else {
857 TierReadiness::Missing
858 }
859 }
860 }
861 }
862
863 pub fn can_hybrid_search(
865 &self,
866 policy: &SemanticPolicy,
867 current_db_fingerprint: &str,
868 current_model_revision: &str,
869 ) -> bool {
870 self.fast_tier_readiness(policy, current_db_fingerprint, current_model_revision)
871 .is_usable()
872 }
873
874 pub fn refresh_backlog(&mut self, total_conversations: u64, current_db_fingerprint: &str) {
878 let fast_processed = self
879 .fast_tier
880 .as_ref()
881 .filter(|a| a.ready && a.db_fingerprint == current_db_fingerprint)
882 .map_or(0, |a| a.conversation_count);
883 let quality_processed = self
884 .quality_tier
885 .as_ref()
886 .filter(|a| a.ready && a.db_fingerprint == current_db_fingerprint)
887 .map_or(0, |a| a.conversation_count);
888
889 self.backlog = BacklogLedger {
890 total_conversations,
891 fast_tier_processed: fast_processed,
892 quality_tier_processed: quality_processed,
893 db_fingerprint: current_db_fingerprint.to_owned(),
894 computed_at_ms: now_ms(),
895 };
896 }
897
898 pub fn save_checkpoint(&mut self, checkpoint: BuildCheckpoint) {
900 self.checkpoint = Some(checkpoint);
901 }
902
903 pub fn clear_checkpoint(&mut self) {
905 self.checkpoint = None;
906 }
907
908 pub fn publish_artifact(&mut self, artifact: ArtifactRecord) {
910 if self
912 .checkpoint
913 .as_ref()
914 .is_some_and(|cp| cp.tier == artifact.tier)
915 {
916 self.checkpoint = None;
917 }
918
919 match artifact.tier {
920 TierKind::Fast => self.fast_tier = Some(artifact),
921 TierKind::Quality => self.quality_tier = Some(artifact),
922 }
923 }
924
925 pub fn publish_hnsw(&mut self, hnsw: HnswRecord) {
927 self.hnsw = Some(hnsw);
928 }
929
930 #[allow(clippy::too_many_arguments)]
933 pub fn adopt_legacy_artifact(
934 &mut self,
935 tier: TierKind,
936 embedder_id: &str,
937 model_revision: &str,
938 dimension: usize,
939 doc_count: u64,
940 conversation_count: u64,
941 db_fingerprint: &str,
942 index_path: &str,
943 size_bytes: u64,
944 ) -> bool {
945 let record = ArtifactRecord {
946 tier,
947 embedder_id: embedder_id.to_owned(),
948 model_revision: model_revision.to_owned(),
949 schema_version: SEMANTIC_SCHEMA_VERSION,
950 chunking_version: CHUNKING_STRATEGY_VERSION,
951 dimension,
952 doc_count,
953 conversation_count,
954 db_fingerprint: db_fingerprint.to_owned(),
955 index_path: index_path.to_owned(),
956 size_bytes,
957 started_at_ms: 0,
958 completed_at_ms: now_ms(),
959 ready: true,
960 };
961
962 match tier {
963 TierKind::Fast => self.fast_tier = Some(record),
964 TierKind::Quality => self.quality_tier = Some(record),
965 }
966 true
967 }
968
969 pub fn invalidate_incompatible(
979 &mut self,
980 policy: &SemanticPolicy,
981 current_model_revision: &str,
982 ) -> usize {
983 let mut count = 0;
984
985 if let Some(ref artifact) = self.fast_tier {
986 let pm = artifact.to_policy_manifest();
987 if matches!(
988 pm.invalidation_action(policy, current_model_revision, &artifact.embedder_id),
989 InvalidationAction::DiscardAndRebuild { .. } | InvalidationAction::Evict
990 ) {
991 self.fast_tier = None;
992 count += 1;
993 }
994 }
995
996 if let Some(ref artifact) = self.quality_tier {
997 let pm = artifact.to_policy_manifest();
998 if matches!(
999 pm.invalidation_action(policy, current_model_revision, &artifact.embedder_id),
1000 InvalidationAction::DiscardAndRebuild { .. } | InvalidationAction::Evict
1001 ) {
1002 self.quality_tier = None;
1003 count += 1;
1004 }
1005 }
1006
1007 if let Some(ref hnsw) = self.hnsw {
1009 let base_gone = match hnsw.base_tier {
1010 TierKind::Fast => self.fast_tier.is_none(),
1011 TierKind::Quality => self.quality_tier.is_none(),
1012 };
1013 if base_gone {
1014 self.hnsw = None;
1015 count += 1;
1016 }
1017 }
1018
1019 if let Some(ref cp) = self.checkpoint
1021 && (cp.schema_version != policy.semantic_schema_version
1022 || cp.chunking_version != policy.chunking_strategy_version)
1023 {
1024 self.checkpoint = None;
1025 }
1026
1027 count
1028 }
1029
1030 pub fn total_size_bytes(&self) -> u64 {
1032 let fast = self.fast_tier.as_ref().map_or(0, |a| a.size_bytes);
1033 let quality = self.quality_tier.as_ref().map_or(0, |a| a.size_bytes);
1034 let hnsw = self.hnsw.as_ref().map_or(0, |h| h.size_bytes);
1035 fast + quality + hnsw
1036 }
1037
1038 pub fn total_size_mb(&self) -> u64 {
1040 self.total_size_bytes().div_ceil(1_048_576)
1041 }
1042}
1043
/// Errors raised while loading or persisting semantic manifests.
#[derive(Debug)]
pub enum ManifestError {
    /// A filesystem operation failed at `path`.
    Io { path: PathBuf, source: String },
    /// The file at `path` could not be decoded as a manifest.
    Parse { path: PathBuf, source: String },
    /// The in-memory manifest could not be encoded.
    Serialize { source: String },
    /// The stored manifest declares a format newer than this build supports.
    UnsupportedVersion { found: u32, max_supported: u32 },
}

impl std::fmt::Display for ManifestError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Io { path, source } => {
                let location = path.display();
                write!(f, "manifest I/O error at {location}: {source}")
            }
            Self::Parse { path, source } => {
                let location = path.display();
                write!(f, "manifest parse error at {location}: {source}")
            }
            Self::Serialize { source } => {
                f.write_fmt(format_args!("manifest serialization error: {source}"))
            }
            Self::UnsupportedVersion {
                found,
                max_supported,
            } => write!(
                f,
                "manifest version {} is newer than supported version {}",
                found, max_supported
            ),
        }
    }
}

impl std::error::Error for ManifestError {}
1076
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch. Saturates
/// at `i64::MAX` instead of silently truncating the `u128` millisecond count.
fn now_ms() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| {
            i64::try_from(elapsed.as_millis()).unwrap_or(i64::MAX)
        })
}
1085
1086fn unique_manifest_temp_path(path: &Path, attempt: u32, random: u64) -> PathBuf {
1087 static NEXT_NONCE: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
1088
1089 let file_name = path
1090 .file_name()
1091 .and_then(|name| name.to_str())
1092 .unwrap_or(MANIFEST_FILENAME);
1093 let nonce = NEXT_NONCE.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1094 path.with_file_name(format!(
1095 ".{file_name}.tmp.{attempt}.{}.{}.{random:016x}",
1096 now_ms(),
1097 nonce
1098 ))
1099}
1100
1101fn create_unique_manifest_temp_file(path: &Path) -> std::io::Result<(PathBuf, fs::File)> {
1102 for attempt in 0..100 {
1103 let random = random_manifest_path_nonce()?;
1104 let tmp_path = unique_manifest_temp_path(path, attempt, random);
1105 match OpenOptions::new()
1106 .write(true)
1107 .create_new(true)
1108 .open(&tmp_path)
1109 {
1110 Ok(file) => return Ok((tmp_path, file)),
1111 Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => continue,
1112 Err(err) => return Err(err),
1113 }
1114 }
1115
1116 Err(std::io::Error::new(
1117 std::io::ErrorKind::AlreadyExists,
1118 format!(
1119 "could not create a unique temporary manifest file for {} after 100 attempts",
1120 path.display()
1121 ),
1122 ))
1123}
1124
/// Builds a hidden backup-file path next to `path` (Windows only).
///
/// The name has the form `.<name>.bak.<ms>.<counter>.<random hex>`. It is used
/// to move the current manifest aside before replacing it.
///
/// # Errors
///
/// Propagates a failure to generate the random nonce.
#[cfg(windows)]
fn unique_manifest_backup_path(path: &Path) -> std::io::Result<PathBuf> {
    use std::sync::atomic::{AtomicU64, Ordering};
    static NEXT_NONCE: AtomicU64 = AtomicU64::new(0);

    let random = random_manifest_path_nonce()?;
    let file_name = path
        .file_name()
        .and_then(|name| name.to_str())
        .unwrap_or(MANIFEST_FILENAME);
    let nonce = NEXT_NONCE.fetch_add(1, Ordering::Relaxed);
    let stamp = now_ms();
    Ok(path.with_file_name(format!(
        ".{file_name}.bak.{stamp}.{nonce}.{random:016x}"
    )))
}
1140
1141fn random_manifest_path_nonce() -> std::io::Result<u64> {
1142 let mut random_bytes = [0u8; 8];
1143 SystemRandom::new()
1144 .fill(&mut random_bytes)
1145 .map_err(|_| std::io::Error::other("failed to generate manifest temp path nonce"))?;
1146 Ok(u64::from_le_bytes(random_bytes))
1147}
1148
/// Moves the fully written temp file at `temp_path` over `final_path`.
///
/// On non-Windows targets this is a single `fs::rename`, which replaces an
/// existing destination. On Windows, if the first rename fails with
/// `AlreadyExists`/`PermissionDenied` while the destination exists:
/// 1. the current destination is moved to a unique backup path;
/// 2. the rename is retried;
/// 3. on success the backup is deleted, and on failure the backup is moved back.
///
/// The temp file is removed on every failure path except one. On Windows, if
/// both the retry and the restore fail, the destination no longer exists and
/// the temp file is kept (its path is reported in the error).
///
/// # Errors
///
/// Returns the underlying I/O error, or an `io::Error::other` describing each
/// failed step of the Windows backup/restore sequence.
fn replace_file_from_temp(temp_path: &Path, final_path: &Path) -> std::io::Result<()> {
    #[cfg(windows)]
    {
        match fs::rename(temp_path, final_path) {
            Ok(()) => sync_parent_directory(final_path),
            Err(first_err)
                if final_path.exists()
                    && matches!(
                        first_err.kind(),
                        std::io::ErrorKind::AlreadyExists | std::io::ErrorKind::PermissionDenied
                    ) =>
            {
                let backup_path = match unique_manifest_backup_path(final_path) {
                    Ok(backup_path) => backup_path,
                    Err(err) => {
                        // Destination untouched; discard the orphaned temp file.
                        let _ = fs::remove_file(temp_path);
                        return Err(err);
                    }
                };
                fs::rename(final_path, &backup_path).map_err(|backup_err| {
                    let _ = fs::remove_file(temp_path);
                    std::io::Error::other(format!(
                        "failed preparing backup {} before replacing {}: first error: {}; backup error: {}",
                        backup_path.display(),
                        final_path.display(),
                        first_err,
                        backup_err
                    ))
                })?;
                match fs::rename(temp_path, final_path) {
                    Ok(()) => {
                        let _ = fs::remove_file(&backup_path);
                        sync_parent_directory(final_path)
                    }
                    Err(second_err) => match fs::rename(&backup_path, final_path) {
                        Ok(()) => {
                            let _ = fs::remove_file(temp_path);
                            sync_parent_directory(final_path)?;
                            Err(std::io::Error::other(format!(
                                "failed replacing {} with {}: first error: {}; second error: {}; restored original file",
                                final_path.display(),
                                temp_path.display(),
                                first_err,
                                second_err
                            )))
                        }
                        // Only copy of the new content: deliberately keep the temp file.
                        Err(restore_err) => Err(std::io::Error::other(format!(
                            "failed replacing {} with {}: first error: {}; second error: {}; restore error: {}; temp file retained at {}",
                            final_path.display(),
                            temp_path.display(),
                            first_err,
                            second_err,
                            restore_err,
                            temp_path.display()
                        ))),
                    },
                }
            }
            Err(err) => {
                // Destination untouched; discard the orphaned temp file.
                let _ = fs::remove_file(temp_path);
                Err(err)
            }
        }
    }

    #[cfg(not(windows))]
    {
        fs::rename(temp_path, final_path).map_err(|err| {
            // A failed rename leaves the destination untouched; remove the
            // orphaned temp file instead of leaking it into the directory.
            let _ = fs::remove_file(temp_path);
            err
        })
    }
}
1210
/// Syncs the directory containing `path` (via `File::sync_all`) so that a
/// preceding rename into that directory is durable.
///
/// A bare file name (e.g. `"manifest.json"`) has an empty parent component.
/// That is treated as the current directory rather than failing on
/// `File::open("")`. A path with no parent at all (e.g. `/`) is a no-op.
///
/// # Errors
///
/// Returns any error from opening or syncing the parent directory.
#[cfg(not(windows))]
fn sync_parent_directory(path: &Path) -> std::io::Result<()> {
    let Some(parent) = path.parent() else {
        return Ok(());
    };
    let parent = if parent.as_os_str().is_empty() {
        Path::new(".")
    } else {
        parent
    };
    let directory = fs::File::open(parent)?;
    directory.sync_all()
}

/// Windows variant: a no-op that always succeeds.
///
/// NOTE(review): presumably because a directory cannot be opened and synced
/// through `std::fs::File` there — confirm.
#[cfg(windows)]
fn sync_parent_directory(_path: &Path) -> std::io::Result<()> {
    Ok(())
}
1224
1225#[cfg(test)]
1228mod tests {
1229 use super::*;
1230 use crate::search::policy::SemanticPolicy;
1231
1232 fn test_policy() -> SemanticPolicy {
1233 SemanticPolicy::compiled_defaults()
1234 }
1235
1236 fn test_artifact(tier: TierKind, ready: bool) -> ArtifactRecord {
1237 ArtifactRecord {
1238 tier,
1239 embedder_id: match tier {
1240 TierKind::Fast => "fnv1a-384".to_owned(),
1241 TierKind::Quality => "minilm-384".to_owned(),
1242 },
1243 model_revision: "abc123".to_owned(),
1244 schema_version: SEMANTIC_SCHEMA_VERSION,
1245 chunking_version: CHUNKING_STRATEGY_VERSION,
1246 dimension: 384,
1247 doc_count: 1000,
1248 conversation_count: 250,
1249 db_fingerprint: "fp-1234".to_owned(),
1250 index_path: format!(
1251 "vector_index/index-{}.fsvi",
1252 match tier {
1253 TierKind::Fast => "fnv1a-384",
1254 TierKind::Quality => "minilm-384",
1255 }
1256 ),
1257 size_bytes: 150_000,
1258 started_at_ms: 1_700_000_000_000,
1259 completed_at_ms: 1_700_000_060_000,
1260 ready,
1261 }
1262 }
1263
1264 fn test_hnsw() -> HnswRecord {
1265 HnswRecord {
1266 base_tier: TierKind::Quality,
1267 embedder_id: "minilm-384".to_owned(),
1268 ef_search: 128,
1269 index_path: "vector_index/hnsw-minilm-384.chsw".to_owned(),
1270 size_bytes: 50_000,
1271 built_at_ms: 1_700_000_070_000,
1272 ready: true,
1273 }
1274 }
1275
1276 fn test_shard(shard_index: u32, shard_count: u32, ready: bool) -> SemanticShardRecord {
1277 SemanticShardRecord {
1278 tier: TierKind::Fast,
1279 embedder_id: "fnv1a-384".to_owned(),
1280 model_revision: "hash".to_owned(),
1281 schema_version: SEMANTIC_SCHEMA_VERSION,
1282 chunking_version: CHUNKING_STRATEGY_VERSION,
1283 dimension: 384,
1284 shard_index,
1285 shard_count,
1286 doc_count: 25,
1287 total_conversations: 10,
1288 db_fingerprint: "fp-sharded".to_owned(),
1289 index_path: format!("vector_index/shards/fast-fnv1a-384/shard-{shard_index:05}.fsvi"),
1290 quantization: "f16".to_owned(),
1291 mmap_ready: true,
1292 ann_index_path: None,
1293 ann_size_bytes: 0,
1294 ann_ready: false,
1295 size_bytes: 4096,
1296 started_at_ms: 1_700_000_080_000,
1297 completed_at_ms: 1_700_000_081_000,
1298 ready,
1299 }
1300 }
1301
1302 fn test_checkpoint(tier: TierKind) -> BuildCheckpoint {
1303 BuildCheckpoint {
1304 tier,
1305 embedder_id: "minilm-384".to_owned(),
1306 last_offset: 500,
1307 docs_embedded: 3000,
1308 conversations_processed: 500,
1309 total_conversations: 1000,
1310 db_fingerprint: "fp-1234".to_owned(),
1311 schema_version: SEMANTIC_SCHEMA_VERSION,
1312 chunking_version: CHUNKING_STRATEGY_VERSION,
1313 saved_at_ms: 1_700_000_030_000,
1314 }
1315 }
1316
1317 #[derive(Debug, Clone, Copy)]
1318 enum ExpectedTierReadiness {
1319 Ready,
1320 Stale,
1321 Incompatible,
1322 Building(u8),
1323 }
1324
1325 fn no_artifact_mutation(_: &mut ArtifactRecord) {}
1326
1327 type TierReadinessCase = (
1328 &'static str,
1329 TierKind,
1330 bool,
1331 &'static str,
1332 &'static str,
1333 fn(&mut ArtifactRecord),
1334 ExpectedTierReadiness,
1335 );
1336
1337 fn set_schema_version_to_zero(artifact: &mut ArtifactRecord) {
1338 artifact.schema_version = 0;
1339 }
1340
1341 fn assert_tier_readiness(actual: TierReadiness, expected: ExpectedTierReadiness, label: &str) {
1342 match expected {
1343 ExpectedTierReadiness::Ready => {
1344 assert_eq!(actual, TierReadiness::Ready, "{label}");
1345 }
1346 ExpectedTierReadiness::Stale => {
1347 assert!(
1348 matches!(actual, TierReadiness::Stale { .. }),
1349 "{label}: {actual:?}"
1350 );
1351 }
1352 ExpectedTierReadiness::Incompatible => {
1353 assert!(
1354 matches!(actual, TierReadiness::Incompatible { .. }),
1355 "{label}: {actual:?}"
1356 );
1357 }
1358 ExpectedTierReadiness::Building(progress_pct) => {
1359 assert_eq!(actual, TierReadiness::Building { progress_pct }, "{label}");
1360 }
1361 }
1362 }
1363
#[test]
/// Saving a fully populated manifest and loading it back must reproduce
/// every populated field, not merely their presence.
fn manifest_round_trip_via_disk() {
    let temp = tempfile::tempdir().unwrap();
    let mut manifest = SemanticManifest {
        fast_tier: Some(test_artifact(TierKind::Fast, true)),
        quality_tier: Some(test_artifact(TierKind::Quality, true)),
        hnsw: Some(test_hnsw()),
        checkpoint: Some(test_checkpoint(TierKind::Quality)),
        backlog: BacklogLedger {
            total_conversations: 2000,
            fast_tier_processed: 1000,
            quality_tier_processed: 500,
            db_fingerprint: "fp-1234".to_owned(),
            computed_at_ms: 1_700_000_000_000,
        },
        ..Default::default()
    };

    manifest.save(temp.path()).unwrap();
    let loaded = SemanticManifest::load(temp.path()).unwrap().unwrap();

    assert_eq!(loaded.manifest_version, MANIFEST_FORMAT_VERSION);

    // Presence checks guard against both sides silently becoming `None`.
    assert!(loaded.fast_tier.is_some());
    assert!(loaded.quality_tier.is_some());
    assert!(loaded.hnsw.is_some());
    assert!(loaded.checkpoint.is_some());

    // Payload equality catches fields dropped or mangled by the on-disk
    // encoding, which presence checks alone would miss. Compare against the
    // post-save value so only the disk round trip is under test.
    assert_eq!(loaded.fast_tier, manifest.fast_tier);
    assert_eq!(loaded.quality_tier, manifest.quality_tier);
    assert_eq!(loaded.hnsw, manifest.hnsw);
    assert_eq!(loaded.checkpoint, manifest.checkpoint);

    assert_eq!(loaded.backlog.total_conversations, 2000);
    assert_eq!(loaded.backlog.fast_tier_processed, 1000);
    assert_eq!(loaded.backlog.quality_tier_processed, 500);
    assert_eq!(loaded.backlog.db_fingerprint, "fp-1234");
    assert!(loaded.updated_at_ms > 0);
}
1395
#[test]
/// A second `save` into the same directory fully replaces the first manifest
/// rather than merging with it.
fn manifest_save_overwrites_existing_file() {
    let dir = tempfile::tempdir().unwrap();
    let root = dir.path();

    // Initial manifest: only the fast tier is populated.
    let mut original = SemanticManifest {
        fast_tier: Some(test_artifact(TierKind::Fast, true)),
        ..Default::default()
    };
    original.save(root).unwrap();

    // Replacement manifest: quality tier plus a custom backlog, no fast tier.
    let replacement_backlog = BacklogLedger {
        total_conversations: 99,
        fast_tier_processed: 0,
        quality_tier_processed: 99,
        db_fingerprint: String::from("fp-overwrite"),
        computed_at_ms: 1_700_000_000_123,
    };
    let mut replacement = SemanticManifest {
        quality_tier: Some(test_artifact(TierKind::Quality, true)),
        backlog: replacement_backlog,
        ..Default::default()
    };
    replacement.save(root).unwrap();

    let reloaded = SemanticManifest::load(root).unwrap().unwrap();
    assert!(reloaded.fast_tier.is_none());
    assert!(reloaded.quality_tier.is_some());
    assert_eq!(reloaded.backlog.total_conversations, 99);
}
1423
#[test]
/// Two temp files requested for the same final manifest path must get
/// distinct names, both exist on disk, and both sit in the manifest directory.
fn manifest_temp_file_creation_is_exclusive_and_unique() -> Result<(), String> {
    let temp = tempfile::tempdir().map_err(|e| e.to_string())?;
    let final_path = SemanticManifest::path(temp.path());
    let manifest_dir = match final_path.parent() {
        Some(dir) => dir,
        None => {
            return Err("semantic manifest path should have a parent directory".to_string());
        }
    };
    fs::create_dir_all(manifest_dir).map_err(|e| e.to_string())?;

    // Both files stay open until the end of the test.
    let create = || create_unique_manifest_temp_file(&final_path).map_err(|e| e.to_string());
    let (first_path, mut first_file) = create()?;
    first_file.write_all(b"first").map_err(|e| e.to_string())?;
    let (second_path, mut second_file) = create()?;
    second_file.write_all(b"second").map_err(|e| e.to_string())?;

    if first_path == second_path {
        return Err("exclusive temp creation reused the same path".to_string());
    }

    let created = [("first", &first_path), ("second", &second_path)];
    for (which, path) in created {
        if !path.exists() {
            return Err(format!("{which} temp file is missing: {}", path.display()));
        }
    }
    for (which, path) in created {
        if path.parent() != Some(manifest_dir) {
            return Err(format!(
                "{which} temp path escaped manifest directory: {}",
                path.display()
            ));
        }
    }

    Ok(())
}
1472
#[test]
/// Loading from a directory without a manifest yields `Ok(None)`, not an error.
fn manifest_load_missing_returns_none() {
    let empty_dir = tempfile::tempdir().unwrap();
    assert!(SemanticManifest::load(empty_dir.path()).unwrap().is_none());
}
1479
#[test]
/// With no manifest on disk, `load_or_default` returns an empty manifest
/// stamped with the current format version.
fn manifest_load_or_default_returns_defaults() {
    let empty_dir = tempfile::tempdir().unwrap();
    let fallback = SemanticManifest::load_or_default(empty_dir.path()).unwrap();

    assert_eq!(fallback.manifest_version, MANIFEST_FORMAT_VERSION);
    assert!(fallback.fast_tier.is_none());
    assert!(fallback.quality_tier.is_none());
}
1488
#[test]
/// A manifest file containing non-JSON bytes surfaces as `ManifestError::Parse`.
fn manifest_load_corrupt_returns_parse_error() {
    let dir = tempfile::tempdir().unwrap();
    let manifest_path = SemanticManifest::path(dir.path());
    let parent = manifest_path.parent().unwrap();
    fs::create_dir_all(parent).unwrap();
    fs::write(&manifest_path, b"not json").unwrap();

    let outcome = SemanticManifest::load(dir.path());
    assert!(matches!(outcome, Err(ManifestError::Parse { .. })));
}
1499
#[test]
/// A manifest written by a newer format version is rejected with
/// `ManifestError::UnsupportedVersion` instead of being misread.
fn manifest_load_future_version_returns_error() {
    let dir = tempfile::tempdir().unwrap();
    let manifest_path = SemanticManifest::path(dir.path());
    fs::create_dir_all(manifest_path.parent().unwrap()).unwrap();

    let from_the_future = SemanticManifest {
        manifest_version: MANIFEST_FORMAT_VERSION + 1,
        ..Default::default()
    };
    let encoded = serde_json::to_string(&from_the_future).unwrap();
    fs::write(&manifest_path, encoded).unwrap();

    let outcome = SemanticManifest::load(dir.path());
    assert!(matches!(
        outcome,
        Err(ManifestError::UnsupportedVersion { .. })
    ));
}
1519
#[test]
/// Table-driven check of `ArtifactRecord::readiness` across DB-fingerprint,
/// model-revision, schema-version and publication changes.
fn tier_readiness_cases() {
    let policy = test_policy();
    let matching_fp = "fp-1234";
    let matching_rev = "abc123";

    let cases: &[TierReadinessCase] = &[
        (
            "ready artifact with matching fingerprint",
            TierKind::Fast,
            true,
            matching_fp,
            matching_rev,
            no_artifact_mutation,
            ExpectedTierReadiness::Ready,
        ),
        (
            "ready artifact with changed DB fingerprint",
            TierKind::Fast,
            true,
            "different-fp",
            matching_rev,
            no_artifact_mutation,
            ExpectedTierReadiness::Stale,
        ),
        (
            "ready artifact with changed model revision",
            TierKind::Quality,
            true,
            matching_fp,
            "new-revision",
            no_artifact_mutation,
            ExpectedTierReadiness::Stale,
        ),
        (
            "schema version mismatch",
            TierKind::Quality,
            true,
            matching_fp,
            matching_rev,
            set_schema_version_to_zero,
            ExpectedTierReadiness::Incompatible,
        ),
        (
            "not yet published artifact",
            TierKind::Fast,
            false,
            matching_fp,
            matching_rev,
            no_artifact_mutation,
            ExpectedTierReadiness::Building(100),
        ),
    ];

    // Every tuple field is `Copy`, so each row can be destructured by value.
    for &(label, tier, ready, db_fp, model_rev, mutate, expected) in cases {
        let mut artifact = test_artifact(tier, ready);
        mutate(&mut artifact);
        let actual = artifact.readiness(&policy, db_fp, model_rev);
        assert_tier_readiness(actual, expected, label);
    }
}
1585
#[test]
/// An empty manifest reports both tiers as `Missing`.
fn manifest_tier_readiness_missing() {
    let empty = SemanticManifest::default();
    let policy = test_policy();

    let fast = empty.fast_tier_readiness(&policy, "fp", "rev");
    let quality = empty.quality_tier_readiness(&policy, "fp", "rev");

    assert_eq!(fast, TierReadiness::Missing);
    assert_eq!(quality, TierReadiness::Missing);
}
1601
#[test]
/// An in-progress quality checkpoint shows up as `Building` for the quality
/// tier only. The fast tier, which has no checkpoint, stays `Missing`.
fn manifest_tier_readiness_with_checkpoint() {
    let manifest = SemanticManifest {
        checkpoint: Some(test_checkpoint(TierKind::Quality)),
        ..Default::default()
    };
    let policy = test_policy();

    assert_eq!(
        manifest.fast_tier_readiness(&policy, "fp-1234", "rev"),
        TierReadiness::Missing,
    );
    // The fixture checkpoint has processed 500 of 1000 conversations.
    assert_eq!(
        manifest.quality_tier_readiness(&policy, "fp-1234", "rev"),
        TierReadiness::Building { progress_pct: 50 },
    );
}
1621
#[test]
/// A checkpoint saved against `fp-1234` does not count toward readiness once
/// the current DB fingerprint differs.
fn manifest_tier_readiness_checkpoint_invalid_db() {
    let manifest = SemanticManifest {
        checkpoint: Some(test_checkpoint(TierKind::Quality)),
        ..Default::default()
    };
    let policy = test_policy();

    let readiness = manifest.quality_tier_readiness(&policy, "other-fp", "rev");
    assert_eq!(readiness, TierReadiness::Missing);
}
1636
#[test]
/// Hybrid search is refused without a fast tier and allowed once a ready
/// fast-tier artifact is present.
fn can_hybrid_search_requires_usable_fast_tier() {
    let policy = test_policy();
    let (db_fp, rev) = ("fp-1234", "abc123");

    let without_fast = SemanticManifest::default();
    assert!(!without_fast.can_hybrid_search(&policy, db_fp, rev));

    let with_fast = SemanticManifest {
        fast_tier: Some(test_artifact(TierKind::Fast, true)),
        ..Default::default()
    };
    assert!(with_fast.can_hybrid_search(&policy, db_fp, rev));
}
1656
#[test]
/// Per-tier remaining counts, pending-work detection and fingerprint currency
/// for a partially processed backlog.
fn backlog_remaining_and_pending() {
    let ledger = BacklogLedger {
        db_fingerprint: String::from("fp"),
        computed_at_ms: 0,
        total_conversations: 1000,
        fast_tier_processed: 800,
        quality_tier_processed: 300,
    };

    // 1000 total: the fast tier has 200 left and the quality tier has 700 left.
    assert_eq!(ledger.fast_tier_remaining(), 200);
    assert_eq!(ledger.quality_tier_remaining(), 700);
    assert!(ledger.has_pending_work());

    // The ledger is current only for the fingerprint it was computed against.
    assert!(ledger.is_current("fp"));
    assert!(!ledger.is_current("other"));
}
1675
#[test]
/// When both tiers have processed every conversation, nothing remains pending.
fn backlog_no_pending_when_fully_processed() {
    let everything = 500;
    let ledger = BacklogLedger {
        total_conversations: everything,
        fast_tier_processed: everything,
        quality_tier_processed: everything,
        db_fingerprint: String::from("fp"),
        computed_at_ms: 0,
    };

    assert_eq!(ledger.fast_tier_remaining(), 0);
    assert_eq!(ledger.quality_tier_remaining(), 0);
    assert!(!ledger.has_pending_work());
}
1690
#[test]
/// Progress percentage, completion and fingerprint validity of a checkpoint,
/// at the halfway point and when finished.
fn checkpoint_progress_and_completion() {
    let halfway = test_checkpoint(TierKind::Quality);
    assert_eq!(halfway.progress_pct(), 50);
    assert!(!halfway.is_complete());
    assert!(halfway.is_valid("fp-1234"));
    assert!(!halfway.is_valid("other-fp"));

    // Same checkpoint with every one of the 1000 conversations processed.
    let finished = BuildCheckpoint {
        conversations_processed: 1000,
        ..test_checkpoint(TierKind::Quality)
    };
    assert_eq!(finished.progress_pct(), 100);
    assert!(finished.is_complete());
}
1707
#[test]
/// A checkpoint with zero total conversations reports 0% rather than dividing by zero.
fn checkpoint_zero_total_gives_zero_pct() {
    let empty = BuildCheckpoint {
        total_conversations: 0,
        conversations_processed: 0,
        ..test_checkpoint(TierKind::Fast)
    };
    assert_eq!(empty.progress_pct(), 0);
}
1715
#[test]
/// Publishing the tier a checkpoint was tracking retires that checkpoint.
fn publish_artifact_clears_matching_checkpoint() {
    let pending = test_checkpoint(TierKind::Quality);
    let mut manifest = SemanticManifest {
        checkpoint: Some(pending),
        ..Default::default()
    };

    manifest.publish_artifact(test_artifact(TierKind::Quality, true));

    assert!(manifest.checkpoint.is_none());
    assert!(manifest.quality_tier.is_some());
}
1729
#[test]
/// Publishing a different tier leaves an unrelated checkpoint in place.
fn publish_artifact_keeps_non_matching_checkpoint() {
    let quality_checkpoint = test_checkpoint(TierKind::Quality);
    let mut manifest = SemanticManifest {
        checkpoint: Some(quality_checkpoint),
        ..Default::default()
    };

    manifest.publish_artifact(test_artifact(TierKind::Fast, true));

    assert!(manifest.checkpoint.is_some());
    assert!(manifest.fast_tier.is_some());
}
1741
#[test]
/// Refreshing the backlog with a matching fingerprint credits both ready tiers.
fn refresh_backlog_computes_from_ready_artifacts() {
    let mut manifest = SemanticManifest {
        fast_tier: Some(test_artifact(TierKind::Fast, true)),
        quality_tier: Some(test_artifact(TierKind::Quality, true)),
        ..Default::default()
    };

    manifest.refresh_backlog(2000, "fp-1234");

    let backlog = &manifest.backlog;
    assert_eq!(backlog.total_conversations, 2000);
    // Presumably the per-artifact conversation count recorded by the
    // `test_artifact` fixture; confirm if that fixture changes.
    assert_eq!(backlog.fast_tier_processed, 250);
    assert_eq!(backlog.quality_tier_processed, 250);
}
1757
#[test]
/// An artifact whose fingerprint does not match the refresh fingerprint
/// contributes nothing to the processed count.
fn refresh_backlog_ignores_stale_artifacts() {
    let mut manifest = SemanticManifest {
        fast_tier: Some(test_artifact(TierKind::Fast, true)),
        ..Default::default()
    };

    manifest.refresh_backlog(2000, "different-fp");

    assert_eq!(manifest.backlog.fast_tier_processed, 0);
}
1769
#[test]
/// A quality artifact with a mismatched schema version is invalidated together
/// with the HNSW record, whose fixture base tier is `Quality`.
fn invalidate_incompatible_removes_schema_mismatch() {
    let incompatible = ArtifactRecord {
        schema_version: 0,
        ..test_artifact(TierKind::Quality, true)
    };
    let mut manifest = SemanticManifest {
        quality_tier: Some(incompatible),
        hnsw: Some(test_hnsw()),
        ..Default::default()
    };
    let policy = test_policy();

    let removed = manifest.invalidate_incompatible(&policy, "abc123");

    // Two removals: the quality tier and the HNSW record.
    assert_eq!(removed, 2);
    assert!(manifest.quality_tier.is_none());
    assert!(manifest.hnsw.is_none());
}
1789
#[test]
/// Compatible artifacts survive `invalidate_incompatible` untouched.
fn invalidate_incompatible_keeps_compatible() {
    let policy = test_policy();
    let mut manifest = SemanticManifest {
        fast_tier: Some(test_artifact(TierKind::Fast, true)),
        quality_tier: Some(test_artifact(TierKind::Quality, true)),
        ..Default::default()
    };

    let removed = manifest.invalidate_incompatible(&policy, "abc123");

    assert_eq!(removed, 0);
    assert!(manifest.fast_tier.is_some());
    assert!(manifest.quality_tier.is_some());
}
1805
#[test]
/// Adopting a pre-manifest fast-tier index records it as a ready fast-tier
/// artifact carrying the supplied identity and the current schema version.
fn adopt_legacy_artifact() {
    let mut manifest = SemanticManifest::default();
    let doc_count = 500;
    let conversation_count = 125;
    let index_path = "vector_index/index-fnv1a-384.fsvi";
    let db_fingerprint = "fp-old";
    let size_bytes = 75_000;
    let adopted = manifest.adopt_legacy_artifact(
        TierKind::Fast,
        "fnv1a-384",
        "hash",
        384,
        doc_count,
        conversation_count,
        db_fingerprint,
        index_path,
        size_bytes,
    );

    assert!(adopted);
    // Adopting a fast-tier index must not populate the quality slot.
    assert!(manifest.quality_tier.is_none());

    let fast = manifest.fast_tier.as_ref().unwrap();
    // Identity fields passed in must be recorded verbatim.
    assert_eq!(fast.tier, TierKind::Fast);
    assert_eq!(fast.embedder_id, "fnv1a-384");
    assert_eq!(fast.model_revision, "hash");
    assert_eq!(fast.dimension, 384);
    assert!(fast.ready);
    // Legacy indexes are stamped with the current schema version on adoption.
    assert_eq!(fast.schema_version, SEMANTIC_SCHEMA_VERSION);
}
1834
#[test]
/// `total_size_bytes` sums both tier artifacts and the HNSW index.
/// `total_size_mb` reports 1 for that total.
fn total_size_accounts_for_all_artifacts() {
    let manifest = SemanticManifest {
        fast_tier: Some(test_artifact(TierKind::Fast, true)),
        quality_tier: Some(test_artifact(TierKind::Quality, true)),
        hnsw: Some(test_hnsw()),
        ..Default::default()
    };

    // Two tier fixtures plus the 50_000-byte HNSW fixture.
    let expected_bytes = 150_000 + 150_000 + 50_000;
    assert_eq!(manifest.total_size_bytes(), expected_bytes);
    assert_eq!(manifest.total_size_mb(), 1);
}
1849
#[test]
/// An empty manifest occupies zero bytes and zero megabytes.
fn total_size_empty_is_zero() {
    let empty = SemanticManifest::default();
    assert_eq!(empty.total_size_bytes(), 0);
    assert_eq!(empty.total_size_mb(), 0);
}
1856
#[test]
/// Pretty-printed JSON serialization round-trips every artifact section intact.
fn manifest_json_round_trip() {
    let original = SemanticManifest {
        fast_tier: Some(test_artifact(TierKind::Fast, true)),
        quality_tier: Some(test_artifact(TierKind::Quality, true)),
        hnsw: Some(test_hnsw()),
        checkpoint: Some(test_checkpoint(TierKind::Quality)),
        ..Default::default()
    };

    let pretty = serde_json::to_string_pretty(&original).unwrap();
    let decoded: SemanticManifest = serde_json::from_str(&pretty).unwrap();

    assert_eq!(decoded.fast_tier, original.fast_tier);
    assert_eq!(decoded.quality_tier, original.quality_tier);
    assert_eq!(decoded.hnsw, original.hnsw);
    assert_eq!(decoded.checkpoint, original.checkpoint);
}
1876
#[test]
/// The shard sidecar survives a save/load cycle. Shards supplied out of order
/// come back ordered by shard index.
fn shard_manifest_round_trip_via_sidecar() {
    let dir = tempfile::tempdir().unwrap();
    let out_of_order = vec![test_shard(1, 2, true), test_shard(0, 2, true)];

    let mut sidecar = SemanticShardManifest::default();
    sidecar.replace_shards_for_generation(TierKind::Fast, "fnv1a-384", "fp-sharded", out_of_order);
    sidecar.save(dir.path()).unwrap();

    let reloaded = SemanticShardManifest::load(dir.path()).unwrap().unwrap();

    assert_eq!(reloaded.manifest_version, MANIFEST_FORMAT_VERSION);
    assert_eq!(reloaded.shards.len(), 2);
    let indexes: Vec<u32> = reloaded.shards.iter().map(|shard| shard.shard_index).collect();
    assert_eq!(indexes, [0, 1]);
    assert!(reloaded.updated_at_ms > 0);
}
1899
#[test]
/// A shard generation is complete only when every declared shard index is
/// present and ready.
fn shard_summary_requires_every_ready_shard() {
    let (tier, embedder, fingerprint) = (TierKind::Fast, "fnv1a-384", "fp-sharded");
    let mut shards = SemanticShardManifest::default();

    // Shard 1 of 3 is missing, so the generation is only partially ready.
    shards.replace_shards_for_generation(
        tier,
        embedder,
        fingerprint,
        vec![test_shard(0, 3, true), test_shard(2, 3, true)],
    );
    let partial = shards.summary(tier, embedder, fingerprint);
    assert_eq!(partial.shard_count, 3);
    assert_eq!(partial.ready_shards, 2);
    assert!(!partial.complete);

    // Supplying shards 0, 1 and 2 closes the gap.
    shards.replace_shards_for_generation(
        tier,
        embedder,
        fingerprint,
        (0..3).map(|index| test_shard(index, 3, true)).collect::<Vec<_>>(),
    );
    let complete = shards.summary(tier, embedder, fingerprint);
    assert_eq!(complete.ready_shards, 3);
    assert!(complete.complete);
    // Doc counts add up across shards (3 x 25). `total_conversations` stays at
    // the per-shard value of 10.
    assert_eq!(complete.doc_count, 75);
    assert_eq!(complete.total_conversations, 10);
}
1932
#[test]
/// Feeds `summary` a sequence of malformed shard generations, one
/// `replace_shards_for_generation` call per case, and checks which ones may
/// still be reported as `complete`.
///
/// Defects in the vector shards block completion: not mmap-ready, duplicate
/// index, duplicate path, blank or unsafe path, out-of-range index, mismatched
/// metadata, stale schema. Defects confined to the optional ANN sidecar only
/// lower `ann_ready_shards` and leave the generation complete.
fn shard_summary_rejects_non_mmap_ready_or_inconsistent_shards() {
    // Case: the only shard is `ready` but not mmap-ready, so it is not counted as ready.
    let mut non_mmap = test_shard(0, 1, true);
    non_mmap.mmap_ready = false;
    let mut shards = SemanticShardManifest::default();
    shards.replace_shards_for_generation(
        TierKind::Fast,
        "fnv1a-384",
        "fp-sharded",
        vec![non_mmap],
    );

    let non_mmap_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
    assert_eq!(non_mmap_summary.ready_shards, 0);
    assert!(!non_mmap_summary.complete);

    // Case: ANN is flagged ready but has no ANN path, so the ANN is not counted.
    // NOTE(review): `inconsistent` declares shard_count 3 while its sibling
    // declares 2, and shard index 2 is absent. The `!complete` assertion may
    // therefore be driven by the shard-count mismatch rather than the ANN
    // inconsistency (later cases show ANN defects alone keep the generation
    // complete). Confirm which condition this case is meant to cover.
    let mut inconsistent = test_shard(1, 3, true);
    inconsistent.ann_ready = true;
    inconsistent.ann_index_path = None;
    inconsistent.ann_size_bytes = 4096;
    shards.replace_shards_for_generation(
        TierKind::Fast,
        "fnv1a-384",
        "fp-sharded",
        vec![test_shard(0, 2, true), inconsistent],
    );

    let inconsistent_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
    assert_eq!(inconsistent_summary.shard_count, 3);
    assert_eq!(inconsistent_summary.ready_shards, 2);
    assert_eq!(inconsistent_summary.ann_ready_shards, 0);
    assert!(!inconsistent_summary.complete);

    // Case: shard index 1 appears twice (one ready copy, one not ready).
    shards.replace_shards_for_generation(
        TierKind::Fast,
        "fnv1a-384",
        "fp-sharded",
        vec![
            test_shard(0, 2, true),
            test_shard(1, 2, true),
            test_shard(1, 2, false),
        ],
    );
    let duplicate_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
    assert_eq!(duplicate_summary.shard_count, 2);
    assert_eq!(duplicate_summary.ready_shards, 2);
    assert!(
        !duplicate_summary.complete,
        "duplicate shard indexes must not summarize as a complete generation"
    );

    // Case: shards 0 and 1 point at the same vector file.
    let mut duplicate_path = test_shard(1, 2, true);
    duplicate_path.index_path = test_shard(0, 2, true).index_path;
    shards.replace_shards_for_generation(
        TierKind::Fast,
        "fnv1a-384",
        "fp-sharded",
        vec![test_shard(0, 2, true), duplicate_path],
    );
    let duplicate_path_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
    assert_eq!(duplicate_path_summary.shard_count, 2);
    assert_eq!(duplicate_path_summary.ready_shards, 2);
    assert!(
        !duplicate_path_summary.complete,
        "duplicate shard index paths must not summarize as a complete generation"
    );

    // Case: empty vector index path.
    let mut blank_path = test_shard(0, 1, true);
    blank_path.index_path.clear();
    shards.replace_shards_for_generation(
        TierKind::Fast,
        "fnv1a-384",
        "fp-sharded",
        vec![blank_path],
    );
    let blank_path_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
    assert_eq!(blank_path_summary.shard_count, 1);
    assert_eq!(blank_path_summary.ready_shards, 1);
    assert!(
        !blank_path_summary.complete,
        "blank shard index paths must not summarize as complete"
    );

    // Case: vector paths that `semantic_shard_artifact_path_is_safe` rejects:
    // absolute, `..` traversal, a leading `./` (a `CurDir` component), and
    // leading whitespace. Presumably `summary` applies that check; confirm.
    // Only the absolute path string from the temp dir matters here.
    for unsafe_path in [
        tempfile::tempdir()
            .unwrap()
            .path()
            .join("outside.fsvi")
            .to_string_lossy()
            .to_string(),
        "vector_index/shards/../outside.fsvi".to_string(),
        "./vector_index/shards/fast/hash.fsvi".to_string(),
        " vector_index/shards/fast/hash.fsvi".to_string(),
    ] {
        let mut unsafe_shard = test_shard(0, 1, true);
        unsafe_shard.index_path = unsafe_path;
        shards.replace_shards_for_generation(
            TierKind::Fast,
            "fnv1a-384",
            "fp-sharded",
            vec![unsafe_shard],
        );
        let unsafe_path_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
        assert_eq!(unsafe_path_summary.shard_count, 1);
        assert_eq!(unsafe_path_summary.ready_shards, 1);
        assert!(
            !unsafe_path_summary.complete,
            "unsafe shard index paths must not summarize as complete"
        );
    }

    // Case: the same unsafe path shapes, but on the optional ANN sidecar.
    // The ANN is not counted, yet the vector generation stays complete.
    let outside_ann_dir = tempfile::tempdir().unwrap();
    for unsafe_ann_path in [
        outside_ann_dir
            .path()
            .join("outside.chsw")
            .to_string_lossy()
            .to_string(),
        "vector_index/shards/../outside.chsw".to_string(),
        "./vector_index/shards/fast/hash.chsw".to_string(),
        " vector_index/shards/fast/hash.chsw".to_string(),
    ] {
        let mut unsafe_ann = test_shard(0, 1, true);
        unsafe_ann.ann_ready = true;
        unsafe_ann.ann_index_path = Some(unsafe_ann_path);
        unsafe_ann.ann_size_bytes = 4096;
        shards.replace_shards_for_generation(
            TierKind::Fast,
            "fnv1a-384",
            "fp-sharded",
            vec![unsafe_ann],
        );
        let unsafe_ann_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
        assert_eq!(unsafe_ann_summary.shard_count, 1);
        assert_eq!(unsafe_ann_summary.ready_shards, 1);
        assert_eq!(unsafe_ann_summary.ann_ready_shards, 0);
        assert!(
            unsafe_ann_summary.complete,
            "unsafe optional ANN paths must not invalidate the vector shard generation"
        );
    }

    // Case: two shards share one ANN path. Only one ANN is counted as ready,
    // and the vector generation stays complete.
    let mut duplicate_ann_path = test_shard(1, 2, true);
    duplicate_ann_path.ann_ready = true;
    duplicate_ann_path.ann_index_path =
        Some("vector_index/shards/fast-fnv1a-384/shared-ann.chsw".to_owned());
    duplicate_ann_path.ann_size_bytes = 4096;
    let mut first_ann_path = test_shard(0, 2, true);
    first_ann_path.ann_ready = true;
    first_ann_path.ann_index_path = duplicate_ann_path.ann_index_path.clone();
    first_ann_path.ann_size_bytes = 4096;
    shards.replace_shards_for_generation(
        TierKind::Fast,
        "fnv1a-384",
        "fp-sharded",
        vec![first_ann_path, duplicate_ann_path],
    );
    let duplicate_ann_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
    assert_eq!(duplicate_ann_summary.shard_count, 2);
    assert_eq!(duplicate_ann_summary.ready_shards, 2);
    assert_eq!(duplicate_ann_summary.ann_ready_shards, 1);
    assert!(
        duplicate_ann_summary.complete,
        "duplicate optional ANN paths must not invalidate the vector shard generation"
    );

    // Case: shard index 2 is outside the declared shard_count of 2.
    shards.replace_shards_for_generation(
        TierKind::Fast,
        "fnv1a-384",
        "fp-sharded",
        vec![test_shard(2, 2, true)],
    );
    let out_of_range_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
    assert_eq!(out_of_range_summary.shard_count, 2);
    assert_eq!(out_of_range_summary.ready_shards, 1);
    assert!(
        !out_of_range_summary.complete,
        "shard indexes outside the declared shard count are malformed"
    );

    // Case: one shard disagrees on embedding dimension (768 vs 384).
    let mut mismatched_metadata = test_shard(1, 2, true);
    mismatched_metadata.dimension = 768;
    shards.replace_shards_for_generation(
        TierKind::Fast,
        "fnv1a-384",
        "fp-sharded",
        vec![test_shard(0, 2, true), mismatched_metadata],
    );
    let metadata_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
    assert_eq!(metadata_summary.shard_count, 2);
    assert_eq!(metadata_summary.ready_shards, 2);
    assert!(
        !metadata_summary.complete,
        "complete shard generations require consistent shard metadata"
    );

    // Case: the shard was written with the previous schema version.
    let mut stale_schema = test_shard(0, 1, true);
    stale_schema.schema_version = SEMANTIC_SCHEMA_VERSION.saturating_sub(1);
    shards.replace_shards_for_generation(
        TierKind::Fast,
        "fnv1a-384",
        "fp-sharded",
        vec![stale_schema],
    );
    let stale_schema_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
    assert_eq!(stale_schema_summary.shard_count, 1);
    assert_eq!(stale_schema_summary.ready_shards, 1);
    assert!(
        !stale_schema_summary.complete,
        "stale schema shards must not summarize as complete"
    );
}
2145
#[test]
/// A complete shard generation in the sidecar does not change the main
/// manifest's readiness: the fast tier there is still `Missing`.
fn shard_sidecar_does_not_make_main_manifest_ready() {
    let mut sidecar = SemanticShardManifest::default();
    sidecar.replace_shards_for_generation(
        TierKind::Fast,
        "fnv1a-384",
        "fp-sharded",
        vec![test_shard(0, 1, true)],
    );
    let sidecar_complete = sidecar
        .summary(TierKind::Fast, "fnv1a-384", "fp-sharded")
        .complete;
    assert!(sidecar_complete);

    let policy = test_policy();
    let readiness = SemanticManifest::default().fast_tier_readiness(&policy, "fp-sharded", "hash");
    assert_eq!(
        readiness,
        TierReadiness::Missing,
        "sidecar shards must not publish runtime semantic readiness"
    );
}
2169
#[test]
/// `invalidate_incompatible` drops only the schema-mismatched shard. The size
/// total then reflects just the surviving 4096-byte fixture shard.
fn shard_manifest_invalidates_incompatible_shards() {
    let stale = SemanticShardRecord {
        schema_version: 0,
        ..test_shard(0, 1, true)
    };
    let mut shards = SemanticShardManifest {
        shards: vec![stale, test_shard(0, 1, true)],
        ..Default::default()
    };
    let policy = test_policy();

    let removed = shards.invalidate_incompatible(&policy, "hash");

    assert_eq!(removed, 1);
    assert_eq!(shards.shards.len(), 1);
    assert_eq!(shards.total_size_bytes(), 4096);
}
2184 }
2185}