1use std::fs::{self, OpenOptions};
28use std::io::Write as IoWrite;
29use std::path::{Component, Path, PathBuf};
30use std::time::{SystemTime, UNIX_EPOCH};
31
32use ring::rand::{SecureRandom, SystemRandom};
33use serde::{Deserialize, Serialize};
34
35use super::policy::{
36 CHUNKING_STRATEGY_VERSION, InvalidationAction, SEMANTIC_SCHEMA_VERSION,
37 SemanticAssetManifest as PolicyManifest, SemanticPolicy,
38};
39
/// Newest manifest format version this build understands.
///
/// `load` rejects manifests whose `manifest_version` is greater than this, and freshly
/// defaulted manifests start at this version.
pub const MANIFEST_FORMAT_VERSION: u32 = 2;

/// Last manifest format version that predates the `last_message_id` resume cursor.
///
/// `SemanticManifest::load` logs a warning when a manifest at or below this version carries a
/// checkpoint without `last_message_id`.
pub const MANIFEST_FORMAT_VERSION_PRE_LAST_MESSAGE_CURSOR: u32 = 1;

/// File name of the tier manifest, stored under `<data_dir>/vector_index/`.
pub const MANIFEST_FILENAME: &str = "semantic_manifest.json";

/// File name of the shard manifest, stored under `<data_dir>/vector_index/`.
pub const SHARD_MANIFEST_FILENAME: &str = "semantic_shards.json";
80
/// Returns `true` when a shard artifact path recorded in a manifest is a plain relative path.
///
/// A path is accepted only if all of the following hold:
/// - it is non-empty;
/// - it has no leading or trailing whitespace;
/// - it is not absolute;
/// - every component is a normal name, so there is no root, no `..`, no leading `.`, and
///   (on Windows) no drive or UNC prefix.
pub(crate) fn semantic_shard_artifact_path_is_safe(recorded_path: &str) -> bool {
    // Reject empty strings and anything that trimming would change (this also covers
    // whitespace-only input).
    if recorded_path.is_empty() || recorded_path.trim() != recorded_path {
        return false;
    }

    let candidate = Path::new(recorded_path);
    if candidate.is_absolute() {
        return false;
    }

    candidate
        .components()
        .all(|part| matches!(part, Component::Normal(_)))
}
92
93#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
97#[serde(rename_all = "snake_case")]
98pub enum TierKind {
99 Fast,
100 Quality,
101}
102
103impl TierKind {
104 pub fn as_str(&self) -> &'static str {
105 match self {
106 Self::Fast => "fast",
107 Self::Quality => "quality",
108 }
109 }
110}
111
112#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
116#[serde(rename_all = "snake_case")]
117pub enum TierReadiness {
118 Ready,
120 Building { progress_pct: u8 },
122 Stale { reason: String },
124 Missing,
126 Incompatible { reason: String },
128}
129
130impl TierReadiness {
131 pub fn is_ready(&self) -> bool {
132 matches!(self, Self::Ready)
133 }
134
135 pub fn is_usable(&self) -> bool {
136 matches!(self, Self::Ready | Self::Stale { .. })
137 }
138}
139
140#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
144pub struct ArtifactRecord {
145 pub tier: TierKind,
147 pub embedder_id: String,
149 pub model_revision: String,
151 pub schema_version: u32,
153 pub chunking_version: u32,
155 pub dimension: usize,
157 pub doc_count: u64,
159 pub conversation_count: u64,
161 pub db_fingerprint: String,
163 pub index_path: String,
165 pub size_bytes: u64,
167 pub started_at_ms: i64,
169 pub completed_at_ms: i64,
171 pub ready: bool,
173}
174
175impl ArtifactRecord {
176 pub fn to_policy_manifest(&self) -> PolicyManifest {
178 PolicyManifest {
179 embedder_id: self.embedder_id.clone(),
180 model_revision: self.model_revision.clone(),
181 schema_version: self.schema_version,
182 chunking_version: self.chunking_version,
183 doc_count: self.doc_count,
184 built_at_ms: self.completed_at_ms,
185 }
186 }
187
188 pub fn readiness(
198 &self,
199 policy: &SemanticPolicy,
200 current_db_fingerprint: &str,
201 current_model_revision: &str,
202 ) -> TierReadiness {
203 let action = self.to_policy_manifest().invalidation_action(
204 policy,
205 current_model_revision,
206 &self.embedder_id,
207 );
208
209 match action {
210 InvalidationAction::UpToDate => {
211 if self.db_fingerprint != current_db_fingerprint {
212 TierReadiness::Stale {
213 reason: "DB content changed since artifact was built".to_owned(),
214 }
215 } else if !self.ready {
216 TierReadiness::Building { progress_pct: 100 }
217 } else {
218 TierReadiness::Ready
219 }
220 }
221 InvalidationAction::RebuildInBackground => TierReadiness::Stale {
222 reason: "model revision changed; vectors usable until rebuild completes".to_owned(),
223 },
224 InvalidationAction::DiscardAndRebuild { reason } => {
225 TierReadiness::Incompatible { reason }
226 }
227 InvalidationAction::Evict => TierReadiness::Incompatible {
228 reason: "semantic mode set to lexical-only".to_owned(),
229 },
230 }
231 }
232}
233
234#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
238pub struct HnswRecord {
239 pub base_tier: TierKind,
241 pub embedder_id: String,
243 pub ef_search: usize,
245 pub index_path: String,
247 pub size_bytes: u64,
249 pub built_at_ms: i64,
251 pub ready: bool,
253}
254
255#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
265pub struct SemanticShardRecord {
266 pub tier: TierKind,
268 pub embedder_id: String,
270 pub model_revision: String,
272 pub schema_version: u32,
274 pub chunking_version: u32,
276 pub dimension: usize,
278 pub shard_index: u32,
280 pub shard_count: u32,
282 pub doc_count: u64,
284 pub total_conversations: u64,
286 pub db_fingerprint: String,
288 pub index_path: String,
290 pub quantization: String,
292 pub mmap_ready: bool,
294 pub ann_index_path: Option<String>,
296 pub ann_size_bytes: u64,
298 pub ann_ready: bool,
300 pub size_bytes: u64,
302 pub started_at_ms: i64,
304 pub completed_at_ms: i64,
306 pub ready: bool,
308}
309
310impl SemanticShardRecord {
311 pub fn to_policy_manifest(&self) -> PolicyManifest {
312 PolicyManifest {
313 embedder_id: self.embedder_id.clone(),
314 model_revision: self.model_revision.clone(),
315 schema_version: self.schema_version,
316 chunking_version: self.chunking_version,
317 doc_count: self.doc_count,
318 built_at_ms: self.completed_at_ms,
319 }
320 }
321
322 pub fn matches_generation(
323 &self,
324 tier: TierKind,
325 embedder_id: &str,
326 db_fingerprint: &str,
327 ) -> bool {
328 self.tier == tier
329 && self.embedder_id == embedder_id
330 && self.db_fingerprint == db_fingerprint
331 }
332}
333
334#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
336pub struct SemanticShardSummary {
337 pub shard_count: u32,
338 pub ready_shards: u32,
339 pub ann_ready_shards: u32,
340 pub doc_count: u64,
341 pub total_conversations: u64,
342 pub size_bytes: u64,
343 pub ann_size_bytes: u64,
344 pub complete: bool,
345}
346
347#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
349pub struct SemanticShardManifest {
350 pub manifest_version: u32,
351 pub shards: Vec<SemanticShardRecord>,
352 pub updated_at_ms: i64,
353}
354
355impl Default for SemanticShardManifest {
356 fn default() -> Self {
357 Self {
358 manifest_version: MANIFEST_FORMAT_VERSION,
359 shards: Vec::new(),
360 updated_at_ms: 0,
361 }
362 }
363}
364
365impl SemanticShardManifest {
366 pub fn path(data_dir: &Path) -> PathBuf {
367 data_dir.join("vector_index").join(SHARD_MANIFEST_FILENAME)
368 }
369
370 pub fn load(data_dir: &Path) -> Result<Option<Self>, ManifestError> {
371 let path = Self::path(data_dir);
372 let bytes = match fs::read(&path) {
373 Ok(bytes) => bytes,
374 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
375 Err(e) => {
376 return Err(ManifestError::Io {
377 path,
378 source: e.to_string(),
379 });
380 }
381 };
382
383 let manifest: Self = serde_json::from_slice(&bytes).map_err(|e| ManifestError::Parse {
384 path: path.clone(),
385 source: e.to_string(),
386 })?;
387
388 if manifest.manifest_version > MANIFEST_FORMAT_VERSION {
389 return Err(ManifestError::UnsupportedVersion {
390 found: manifest.manifest_version,
391 max_supported: MANIFEST_FORMAT_VERSION,
392 });
393 }
394
395 Ok(Some(manifest))
396 }
397
398 pub fn load_or_default(data_dir: &Path) -> Result<Self, ManifestError> {
399 match Self::load(data_dir) {
400 Ok(Some(manifest)) => Ok(manifest),
401 Ok(None) => Ok(Self::default()),
402 Err(e @ ManifestError::Io { .. }) => Err(e),
403 Err(ManifestError::Parse { .. } | ManifestError::UnsupportedVersion { .. }) => {
404 Ok(Self::default())
405 }
406 Err(e) => Err(e),
407 }
408 }
409
410 pub fn save(&mut self, data_dir: &Path) -> Result<(), ManifestError> {
411 let path = Self::path(data_dir);
412 if let Some(parent) = path.parent() {
413 fs::create_dir_all(parent).map_err(|e| ManifestError::Io {
414 path: parent.to_path_buf(),
415 source: e.to_string(),
416 })?;
417 }
418
419 self.updated_at_ms = now_ms();
420 let json = serde_json::to_string_pretty(self).map_err(|e| ManifestError::Serialize {
421 source: e.to_string(),
422 })?;
423 let (tmp_path, mut file) =
424 create_unique_manifest_temp_file(&path).map_err(|e| ManifestError::Io {
425 path: path.clone(),
426 source: e.to_string(),
427 })?;
428 file.write_all(json.as_bytes())
429 .map_err(|e| ManifestError::Io {
430 path: tmp_path.clone(),
431 source: e.to_string(),
432 })?;
433 file.sync_all().map_err(|e| ManifestError::Io {
434 path: tmp_path.clone(),
435 source: e.to_string(),
436 })?;
437 replace_file_from_temp(&tmp_path, &path).map_err(|e| ManifestError::Io {
438 path: path.clone(),
439 source: e.to_string(),
440 })?;
441 sync_parent_directory(&path).map_err(|e| ManifestError::Io {
442 path: path
443 .parent()
444 .map(Path::to_path_buf)
445 .unwrap_or_else(|| path.clone()),
446 source: e.to_string(),
447 })?;
448
449 Ok(())
450 }
451
452 pub fn replace_shards_for_generation(
453 &mut self,
454 tier: TierKind,
455 embedder_id: &str,
456 db_fingerprint: &str,
457 mut shards: Vec<SemanticShardRecord>,
458 ) {
459 self.shards
460 .retain(|shard| !shard.matches_generation(tier, embedder_id, db_fingerprint));
461 self.shards.append(&mut shards);
462 self.shards.sort_by(|a, b| {
463 (
464 a.tier.as_str(),
465 &a.embedder_id,
466 &a.db_fingerprint,
467 a.shard_index,
468 )
469 .cmp(&(
470 b.tier.as_str(),
471 &b.embedder_id,
472 &b.db_fingerprint,
473 b.shard_index,
474 ))
475 });
476 }
477
478 pub fn summary(
479 &self,
480 tier: TierKind,
481 embedder_id: &str,
482 db_fingerprint: &str,
483 ) -> SemanticShardSummary {
484 let mut summary = SemanticShardSummary::default();
485 let mut ready_indices = std::collections::BTreeSet::new();
486 let mut ann_ready_indices = std::collections::BTreeSet::new();
487 let mut seen_indices = std::collections::BTreeSet::new();
488 let mut seen_index_paths = std::collections::BTreeSet::new();
489 let mut seen_ann_index_paths = std::collections::BTreeSet::new();
490 let mut expected_shard_count = None;
491 let mut expected_generation_metadata: Option<(&str, u32, u32, usize, u64, &str)> = None;
492 let mut generation_consistent = true;
493 for shard in self
494 .shards
495 .iter()
496 .filter(|shard| shard.matches_generation(tier, embedder_id, db_fingerprint))
497 {
498 if shard.shard_count == 0 || shard.shard_index >= shard.shard_count {
499 generation_consistent = false;
500 }
501 if !seen_indices.insert(shard.shard_index) {
502 generation_consistent = false;
503 }
504 if !semantic_shard_artifact_path_is_safe(&shard.index_path)
505 || !seen_index_paths.insert(&shard.index_path)
506 {
507 generation_consistent = false;
508 }
509 match expected_shard_count {
510 Some(expected) if expected != shard.shard_count => {
511 generation_consistent = false;
512 }
513 None => expected_shard_count = Some(shard.shard_count),
514 _ => {}
515 }
516 let generation_metadata = (
517 shard.model_revision.as_str(),
518 shard.schema_version,
519 shard.chunking_version,
520 shard.dimension,
521 shard.total_conversations,
522 shard.quantization.as_str(),
523 );
524 match expected_generation_metadata {
525 Some(expected) if expected != generation_metadata => {
526 generation_consistent = false;
527 }
528 None => expected_generation_metadata = Some(generation_metadata),
529 _ => {}
530 }
531 if shard.schema_version != SEMANTIC_SCHEMA_VERSION
532 || shard.chunking_version != CHUNKING_STRATEGY_VERSION
533 || shard.dimension == 0
534 {
535 generation_consistent = false;
536 }
537 summary.shard_count = summary.shard_count.max(shard.shard_count);
538 summary.doc_count = summary.doc_count.saturating_add(shard.doc_count);
539 summary.total_conversations =
540 summary.total_conversations.max(shard.total_conversations);
541 summary.size_bytes = summary.size_bytes.saturating_add(shard.size_bytes);
542 summary.ann_size_bytes = summary.ann_size_bytes.saturating_add(shard.ann_size_bytes);
543 if shard.ready && shard.mmap_ready {
544 ready_indices.insert(shard.shard_index);
545 }
546 if shard.ready
547 && shard.mmap_ready
548 && shard.ann_ready
549 && shard.ann_size_bytes > 0
550 && let Some(ann_index_path) = shard.ann_index_path.as_deref()
551 && semantic_shard_artifact_path_is_safe(ann_index_path)
552 && seen_ann_index_paths.insert(ann_index_path)
553 {
554 ann_ready_indices.insert(shard.shard_index);
555 }
556 }
557 summary.ready_shards = u32::try_from(ready_indices.len()).unwrap_or(u32::MAX);
558 summary.ann_ready_shards = u32::try_from(ann_ready_indices.len()).unwrap_or(u32::MAX);
559 summary.complete = generation_consistent
560 && summary.shard_count > 0
561 && summary.ready_shards == summary.shard_count
562 && (0..summary.shard_count).all(|index| ready_indices.contains(&index));
563 summary
564 }
565
566 pub fn invalidate_incompatible(
567 &mut self,
568 policy: &SemanticPolicy,
569 current_model_revision: &str,
570 ) -> usize {
571 let before = self.shards.len();
572 self.shards.retain(|shard| {
573 !matches!(
574 shard.to_policy_manifest().invalidation_action(
575 policy,
576 current_model_revision,
577 &shard.embedder_id,
578 ),
579 InvalidationAction::DiscardAndRebuild { .. } | InvalidationAction::Evict
580 )
581 });
582 before.saturating_sub(self.shards.len())
583 }
584
585 pub fn total_size_bytes(&self) -> u64 {
586 self.shards
587 .iter()
588 .map(|shard| shard.size_bytes)
589 .fold(0, u64::saturating_add)
590 }
591}
592
593#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
605pub struct BuildCheckpoint {
606 pub tier: TierKind,
608 pub embedder_id: String,
610 pub last_offset: i64,
612 pub docs_embedded: u64,
614 pub conversations_processed: u64,
616 pub total_conversations: u64,
618 pub db_fingerprint: String,
620 pub schema_version: u32,
622 pub chunking_version: u32,
624 pub saved_at_ms: i64,
626 #[serde(default, skip_serializing_if = "Option::is_none")]
633 pub last_message_id: Option<i64>,
634 #[serde(default, skip_serializing_if = "is_false")]
641 pub cursor_exhausted: bool,
642}
643
/// Serde `skip_serializing_if` predicate: omit a `bool` field when it is `false`.
fn is_false(value: &bool) -> bool {
    matches!(*value, false)
}
647
648impl BuildCheckpoint {
649 pub fn progress_pct(&self) -> u8 {
651 if self.total_conversations == 0 {
652 return 0;
653 }
654 let pct = (self.conversations_processed as f64 / self.total_conversations as f64) * 100.0;
655 let pct = (pct as u8).min(100);
656 if pct == 100 && !self.cursor_exhausted {
657 99
658 } else {
659 pct
660 }
661 }
662
663 pub fn is_complete(&self) -> bool {
665 self.cursor_exhausted && self.conversations_processed >= self.total_conversations
666 }
667
668 pub fn is_valid(&self, current_db_fingerprint: &str) -> bool {
670 self.db_fingerprint == current_db_fingerprint
671 && self.schema_version == SEMANTIC_SCHEMA_VERSION
672 && self.chunking_version == CHUNKING_STRATEGY_VERSION
673 }
674}
675
676#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
680pub struct BacklogLedger {
681 pub total_conversations: u64,
683 pub fast_tier_processed: u64,
685 pub quality_tier_processed: u64,
687 pub db_fingerprint: String,
689 pub computed_at_ms: i64,
691}
692
693impl BacklogLedger {
694 pub fn fast_tier_remaining(&self) -> u64 {
696 self.total_conversations
697 .saturating_sub(self.fast_tier_processed)
698 }
699
700 pub fn quality_tier_remaining(&self) -> u64 {
702 self.total_conversations
703 .saturating_sub(self.quality_tier_processed)
704 }
705
706 pub fn has_pending_work(&self) -> bool {
708 self.fast_tier_remaining() > 0 || self.quality_tier_remaining() > 0
709 }
710
711 pub fn is_current(&self, current_db_fingerprint: &str) -> bool {
713 self.db_fingerprint == current_db_fingerprint
714 }
715}
716
717#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
724pub struct SemanticManifest {
725 pub manifest_version: u32,
727 pub fast_tier: Option<ArtifactRecord>,
729 pub quality_tier: Option<ArtifactRecord>,
731 pub hnsw: Option<HnswRecord>,
733 pub backlog: BacklogLedger,
735 pub checkpoint: Option<BuildCheckpoint>,
737 pub updated_at_ms: i64,
739}
740
741impl Default for SemanticManifest {
742 fn default() -> Self {
743 Self {
744 manifest_version: MANIFEST_FORMAT_VERSION,
745 fast_tier: None,
746 quality_tier: None,
747 hnsw: None,
748 backlog: BacklogLedger {
749 total_conversations: 0,
750 fast_tier_processed: 0,
751 quality_tier_processed: 0,
752 db_fingerprint: String::new(),
753 computed_at_ms: 0,
754 },
755 checkpoint: None,
756 updated_at_ms: 0,
757 }
758 }
759}
760
761impl SemanticManifest {
762 pub fn path(data_dir: &Path) -> PathBuf {
766 data_dir.join("vector_index").join(MANIFEST_FILENAME)
767 }
768
769 pub fn load(data_dir: &Path) -> Result<Option<Self>, ManifestError> {
781 let path = Self::path(data_dir);
782 let bytes = match fs::read(&path) {
783 Ok(b) => b,
784 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
785 Err(e) => {
786 return Err(ManifestError::Io {
787 path,
788 source: e.to_string(),
789 });
790 }
791 };
792
793 let manifest: Self = serde_json::from_slice(&bytes).map_err(|e| ManifestError::Parse {
794 path: path.clone(),
795 source: e.to_string(),
796 })?;
797
798 if manifest.manifest_version > MANIFEST_FORMAT_VERSION {
800 return Err(ManifestError::UnsupportedVersion {
801 found: manifest.manifest_version,
802 max_supported: MANIFEST_FORMAT_VERSION,
803 });
804 }
805
806 if manifest.manifest_version <= MANIFEST_FORMAT_VERSION_PRE_LAST_MESSAGE_CURSOR
812 && manifest
813 .checkpoint
814 .as_ref()
815 .is_some_and(|cp| cp.last_message_id.is_none())
816 {
817 tracing::warn!(
818 manifest_version = manifest.manifest_version,
819 supported_version = MANIFEST_FORMAT_VERSION,
820 path = %path.display(),
821 "semantic checkpoint manifest predates last_message_id cursor (cass#257 sub-fix 2); resume will fall back to conversation offset until the next checkpoint save"
822 );
823 }
824
825 Ok(Some(manifest))
826 }
827
828 pub fn load_or_default(data_dir: &Path) -> Result<Self, ManifestError> {
835 match Self::load(data_dir) {
836 Ok(Some(manifest)) => Ok(manifest),
837 Ok(None) => Ok(Self::default()),
838 Err(e @ ManifestError::Io { .. }) => Err(e),
840 Err(ManifestError::Parse { .. } | ManifestError::UnsupportedVersion { .. }) => {
842 Ok(Self::default())
843 }
844 Err(e) => Err(e),
845 }
846 }
847
848 pub fn save(&mut self, data_dir: &Path) -> Result<(), ManifestError> {
850 let path = Self::path(data_dir);
851
852 if let Some(parent) = path.parent() {
854 fs::create_dir_all(parent).map_err(|e| ManifestError::Io {
855 path: parent.to_path_buf(),
856 source: e.to_string(),
857 })?;
858 }
859
860 self.updated_at_ms = now_ms();
861
862 let json = serde_json::to_string_pretty(self).map_err(|e| ManifestError::Serialize {
863 source: e.to_string(),
864 })?;
865
866 let (tmp_path, mut file) =
868 create_unique_manifest_temp_file(&path).map_err(|e| ManifestError::Io {
869 path: path.clone(),
870 source: e.to_string(),
871 })?;
872 file.write_all(json.as_bytes())
873 .map_err(|e| ManifestError::Io {
874 path: tmp_path.clone(),
875 source: e.to_string(),
876 })?;
877 file.sync_all().map_err(|e| ManifestError::Io {
878 path: tmp_path.clone(),
879 source: e.to_string(),
880 })?;
881 replace_file_from_temp(&tmp_path, &path).map_err(|e| ManifestError::Io {
882 path: path.clone(),
883 source: e.to_string(),
884 })?;
885 sync_parent_directory(&path).map_err(|e| ManifestError::Io {
886 path: path
887 .parent()
888 .map(Path::to_path_buf)
889 .unwrap_or_else(|| path.clone()),
890 source: e.to_string(),
891 })?;
892
893 Ok(())
894 }
895
896 pub fn fast_tier_readiness(
900 &self,
901 policy: &SemanticPolicy,
902 current_db_fingerprint: &str,
903 current_model_revision: &str,
904 ) -> TierReadiness {
905 match &self.fast_tier {
906 Some(artifact) => {
907 artifact.readiness(policy, current_db_fingerprint, current_model_revision)
908 }
909 None => {
910 if let Some(cp) = &self.checkpoint
912 && cp.tier == TierKind::Fast
913 && cp.is_valid(current_db_fingerprint)
914 {
915 TierReadiness::Building {
916 progress_pct: cp.progress_pct(),
917 }
918 } else {
919 TierReadiness::Missing
920 }
921 }
922 }
923 }
924
925 pub fn quality_tier_readiness(
927 &self,
928 policy: &SemanticPolicy,
929 current_db_fingerprint: &str,
930 current_model_revision: &str,
931 ) -> TierReadiness {
932 match &self.quality_tier {
933 Some(artifact) => {
934 artifact.readiness(policy, current_db_fingerprint, current_model_revision)
935 }
936 None => {
937 if let Some(cp) = &self.checkpoint
938 && cp.tier == TierKind::Quality
939 && cp.is_valid(current_db_fingerprint)
940 {
941 TierReadiness::Building {
942 progress_pct: cp.progress_pct(),
943 }
944 } else {
945 TierReadiness::Missing
946 }
947 }
948 }
949 }
950
951 pub fn can_hybrid_search(
953 &self,
954 policy: &SemanticPolicy,
955 current_db_fingerprint: &str,
956 current_model_revision: &str,
957 ) -> bool {
958 self.fast_tier_readiness(policy, current_db_fingerprint, current_model_revision)
959 .is_usable()
960 }
961
962 pub fn refresh_backlog(&mut self, total_conversations: u64, current_db_fingerprint: &str) {
966 let fast_processed = self
967 .fast_tier
968 .as_ref()
969 .filter(|a| a.ready && a.db_fingerprint == current_db_fingerprint)
970 .map_or(0, |a| a.conversation_count);
971 let quality_processed = self
972 .quality_tier
973 .as_ref()
974 .filter(|a| a.ready && a.db_fingerprint == current_db_fingerprint)
975 .map_or(0, |a| a.conversation_count);
976
977 self.backlog = BacklogLedger {
978 total_conversations,
979 fast_tier_processed: fast_processed,
980 quality_tier_processed: quality_processed,
981 db_fingerprint: current_db_fingerprint.to_owned(),
982 computed_at_ms: now_ms(),
983 };
984 }
985
986 pub fn save_checkpoint(&mut self, checkpoint: BuildCheckpoint) {
988 self.checkpoint = Some(checkpoint);
989 }
990
991 pub fn clear_checkpoint(&mut self) {
993 self.checkpoint = None;
994 }
995
996 pub fn publish_artifact(&mut self, artifact: ArtifactRecord) {
998 if self
1000 .checkpoint
1001 .as_ref()
1002 .is_some_and(|cp| cp.tier == artifact.tier)
1003 {
1004 self.checkpoint = None;
1005 }
1006
1007 match artifact.tier {
1008 TierKind::Fast => self.fast_tier = Some(artifact),
1009 TierKind::Quality => self.quality_tier = Some(artifact),
1010 }
1011 }
1012
1013 pub fn publish_hnsw(&mut self, hnsw: HnswRecord) {
1015 self.hnsw = Some(hnsw);
1016 }
1017
1018 #[allow(clippy::too_many_arguments)]
1021 pub fn adopt_legacy_artifact(
1022 &mut self,
1023 tier: TierKind,
1024 embedder_id: &str,
1025 model_revision: &str,
1026 dimension: usize,
1027 doc_count: u64,
1028 conversation_count: u64,
1029 db_fingerprint: &str,
1030 index_path: &str,
1031 size_bytes: u64,
1032 ) -> bool {
1033 let record = ArtifactRecord {
1034 tier,
1035 embedder_id: embedder_id.to_owned(),
1036 model_revision: model_revision.to_owned(),
1037 schema_version: SEMANTIC_SCHEMA_VERSION,
1038 chunking_version: CHUNKING_STRATEGY_VERSION,
1039 dimension,
1040 doc_count,
1041 conversation_count,
1042 db_fingerprint: db_fingerprint.to_owned(),
1043 index_path: index_path.to_owned(),
1044 size_bytes,
1045 started_at_ms: 0,
1046 completed_at_ms: now_ms(),
1047 ready: true,
1048 };
1049
1050 match tier {
1051 TierKind::Fast => self.fast_tier = Some(record),
1052 TierKind::Quality => self.quality_tier = Some(record),
1053 }
1054 true
1055 }
1056
1057 pub fn invalidate_incompatible(
1067 &mut self,
1068 policy: &SemanticPolicy,
1069 current_model_revision: &str,
1070 ) -> usize {
1071 let mut count = 0;
1072
1073 if let Some(ref artifact) = self.fast_tier {
1074 let pm = artifact.to_policy_manifest();
1075 if matches!(
1076 pm.invalidation_action(policy, current_model_revision, &artifact.embedder_id),
1077 InvalidationAction::DiscardAndRebuild { .. } | InvalidationAction::Evict
1078 ) {
1079 self.fast_tier = None;
1080 count += 1;
1081 }
1082 }
1083
1084 if let Some(ref artifact) = self.quality_tier {
1085 let pm = artifact.to_policy_manifest();
1086 if matches!(
1087 pm.invalidation_action(policy, current_model_revision, &artifact.embedder_id),
1088 InvalidationAction::DiscardAndRebuild { .. } | InvalidationAction::Evict
1089 ) {
1090 self.quality_tier = None;
1091 count += 1;
1092 }
1093 }
1094
1095 if let Some(ref hnsw) = self.hnsw {
1097 let base_gone = match hnsw.base_tier {
1098 TierKind::Fast => self.fast_tier.is_none(),
1099 TierKind::Quality => self.quality_tier.is_none(),
1100 };
1101 if base_gone {
1102 self.hnsw = None;
1103 count += 1;
1104 }
1105 }
1106
1107 if let Some(ref cp) = self.checkpoint
1109 && (cp.schema_version != policy.semantic_schema_version
1110 || cp.chunking_version != policy.chunking_strategy_version)
1111 {
1112 self.checkpoint = None;
1113 }
1114
1115 count
1116 }
1117
1118 pub fn total_size_bytes(&self) -> u64 {
1120 let fast = self.fast_tier.as_ref().map_or(0, |a| a.size_bytes);
1121 let quality = self.quality_tier.as_ref().map_or(0, |a| a.size_bytes);
1122 let hnsw = self.hnsw.as_ref().map_or(0, |h| h.size_bytes);
1123 fast + quality + hnsw
1124 }
1125
1126 pub fn total_size_mb(&self) -> u64 {
1128 self.total_size_bytes().div_ceil(1_048_576)
1129 }
1130}
1131
/// Failure modes for loading or saving semantic manifests.
#[derive(Debug)]
pub enum ManifestError {
    /// A filesystem operation failed; `path` names the file or directory involved.
    Io { path: PathBuf, source: String },
    /// The file at `path` could not be deserialized.
    Parse { path: PathBuf, source: String },
    /// The in-memory manifest could not be serialized.
    Serialize { source: String },
    /// The on-disk `manifest_version` is newer than this build understands.
    UnsupportedVersion { found: u32, max_supported: u32 },
}

impl std::fmt::Display for ManifestError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Io { path, source } => write!(f, "manifest I/O error at {}: {source}", path.display()),
            Self::Parse { path, source } => write!(f, "manifest parse error at {}: {source}", path.display()),
            Self::Serialize { source } => write!(f, "manifest serialization error: {source}"),
            Self::UnsupportedVersion { found, max_supported } => write!(
                f,
                "manifest version {found} is newer than supported version {max_supported}"
            ),
        }
    }
}

impl std::error::Error for ManifestError {}
1164
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch. Saturates at `i64::MAX`
/// instead of silently wrapping (as the previous `as i64` cast would) if the millisecond
/// count does not fit.
fn now_ms() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| {
            i64::try_from(elapsed.as_millis()).unwrap_or(i64::MAX)
        })
}
1173
1174fn unique_manifest_temp_path(path: &Path, attempt: u32, random: u64) -> PathBuf {
1175 static NEXT_NONCE: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
1176
1177 let file_name = path
1178 .file_name()
1179 .and_then(|name| name.to_str())
1180 .unwrap_or(MANIFEST_FILENAME);
1181 let nonce = NEXT_NONCE.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1182 path.with_file_name(format!(
1183 ".{file_name}.tmp.{attempt}.{}.{}.{random:016x}",
1184 now_ms(),
1185 nonce
1186 ))
1187}
1188
1189fn create_unique_manifest_temp_file(path: &Path) -> std::io::Result<(PathBuf, fs::File)> {
1190 for attempt in 0..100 {
1191 let random = random_manifest_path_nonce()?;
1192 let tmp_path = unique_manifest_temp_path(path, attempt, random);
1193 match OpenOptions::new()
1194 .write(true)
1195 .create_new(true)
1196 .open(&tmp_path)
1197 {
1198 Ok(file) => return Ok((tmp_path, file)),
1199 Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => continue,
1200 Err(err) => return Err(err),
1201 }
1202 }
1203
1204 Err(std::io::Error::new(
1205 std::io::ErrorKind::AlreadyExists,
1206 format!(
1207 "could not create a unique temporary manifest file for {} after 100 attempts",
1208 path.display()
1209 ),
1210 ))
1211}
1212
/// Windows only: builds a hidden backup path next to `path`.
///
/// The file name has the form `.<name>.bak.<now_ms>.<nonce>.<random>`. It is used to park
/// the existing file while a replacement is renamed into place.
///
/// # Errors
/// Propagates a failure to generate the random component.
#[cfg(windows)]
fn unique_manifest_backup_path(path: &Path) -> std::io::Result<PathBuf> {
    use std::sync::atomic::{AtomicU64, Ordering};
    static NEXT_NONCE: AtomicU64 = AtomicU64::new(0);

    // Draw randomness first so a failure does not consume a nonce.
    let random = random_manifest_path_nonce()?;
    let base_name = path
        .file_name()
        .and_then(|name| name.to_str())
        .unwrap_or(MANIFEST_FILENAME);
    let nonce = NEXT_NONCE.fetch_add(1, Ordering::Relaxed);
    let backup_name = format!(".{base_name}.bak.{}.{nonce}.{random:016x}", now_ms());
    Ok(path.with_file_name(backup_name))
}
1228
1229fn random_manifest_path_nonce() -> std::io::Result<u64> {
1230 let mut random_bytes = [0u8; 8];
1231 SystemRandom::new()
1232 .fill(&mut random_bytes)
1233 .map_err(|_| std::io::Error::other("failed to generate manifest temp path nonce"))?;
1234 Ok(u64::from_le_bytes(random_bytes))
1235}
1236
/// Moves `temp_path` over `final_path`, replacing any existing file.
///
/// On non-Windows platforms this is a single `fs::rename`, which replaces the destination
/// in one step.
///
/// On Windows, the direct rename is tried first. If it fails with `AlreadyExists` or
/// `PermissionDenied` while the destination exists, the fallback is:
/// 1. Move the current file to a unique backup path.
/// 2. Rename the temp file into place and delete the backup.
/// 3. If that second rename fails, restore the original from the backup.
///
/// # Errors
/// Returns the underlying rename error, or (Windows) a combined error describing each
/// failed step. If the original cannot be restored, the message names both the retained
/// temp file and the backup holding the previous contents.
fn replace_file_from_temp(temp_path: &Path, final_path: &Path) -> std::io::Result<()> {
    #[cfg(windows)]
    {
        match fs::rename(temp_path, final_path) {
            Ok(()) => sync_parent_directory(final_path),
            // Test the error kind first so an unrelated rename failure is reported as-is
            // instead of being masked by a metadata-inspection error.
            Err(first_err)
                if matches!(
                    first_err.kind(),
                    std::io::ErrorKind::AlreadyExists | std::io::ErrorKind::PermissionDenied
                ) && replacement_path_entry_exists(final_path)? =>
            {
                let backup_path = unique_manifest_backup_path(final_path)?;
                fs::rename(final_path, &backup_path).map_err(|backup_err| {
                    let _ = fs::remove_file(temp_path);
                    std::io::Error::other(format!(
                        "failed preparing backup {} before replacing {}: first error: {}; backup error: {}",
                        backup_path.display(),
                        final_path.display(),
                        first_err,
                        backup_err
                    ))
                })?;
                match fs::rename(temp_path, final_path) {
                    Ok(()) => {
                        let _ = fs::remove_file(&backup_path);
                        sync_parent_directory(final_path)
                    }
                    Err(second_err) => match fs::rename(&backup_path, final_path) {
                        Ok(()) => {
                            let _ = fs::remove_file(temp_path);
                            sync_parent_directory(final_path)?;
                            Err(std::io::Error::other(format!(
                                "failed replacing {} with {}: first error: {}; second error: {}; restored original file",
                                final_path.display(),
                                temp_path.display(),
                                first_err,
                                second_err
                            )))
                        }
                        // Neither file is at `final_path` now; tell the operator where both live.
                        Err(restore_err) => Err(std::io::Error::other(format!(
                            "failed replacing {} with {}: first error: {}; second error: {}; restore error: {}; temp file retained at {}; original file retained at {}",
                            final_path.display(),
                            temp_path.display(),
                            first_err,
                            second_err,
                            restore_err,
                            temp_path.display(),
                            backup_path.display()
                        ))),
                    },
                }
            }
            Err(err) => Err(err),
        }
    }

    #[cfg(not(windows))]
    {
        fs::rename(temp_path, final_path)
    }
}
1298
/// Reports whether a directory entry (including a dangling symlink) exists at `path`.
///
/// Uses `symlink_metadata`, so symlinks are not followed.
///
/// # Errors
/// Any lookup failure other than `NotFound` is returned with the same kind and a message
/// naming the path.
#[cfg(any(windows, test))]
fn replacement_path_entry_exists(path: &Path) -> std::io::Result<bool> {
    let err = match fs::symlink_metadata(path) {
        Ok(_) => return Ok(true),
        Err(err) => err,
    };
    if err.kind() == std::io::ErrorKind::NotFound {
        return Ok(false);
    }
    let kind = err.kind();
    Err(std::io::Error::new(
        kind,
        format!(
            "failed inspecting semantic manifest replacement target {}: {err}",
            path.display()
        ),
    ))
}
1313
/// Fsyncs the directory containing `path`, so a preceding rename within it is durable.
///
/// A path with no parent (e.g. `/`) is a no-op. A bare relative file name such as
/// `"manifest.json"` has `Some("")` as its parent, which cannot be opened, so the current
/// directory (`.`) is synced instead.
///
/// # Errors
/// Propagates failures to open or sync the directory.
#[cfg(not(windows))]
fn sync_parent_directory(path: &Path) -> std::io::Result<()> {
    let Some(parent) = path.parent() else {
        return Ok(());
    };
    let parent = if parent.as_os_str().is_empty() {
        Path::new(".")
    } else {
        parent
    };
    fs::File::open(parent)?.sync_all()
}
1322
/// Windows counterpart of the directory fsync: intentionally a no-op that always succeeds.
#[cfg(windows)]
fn sync_parent_directory(_path: &Path) -> std::io::Result<()> {
    std::io::Result::Ok(())
}
1327
// Unit tests for the semantic manifest. They cover:
// - on-disk persistence of the manifest;
// - tier readiness evaluation;
// - backlog and checkpoint accounting;
// - invalidation and legacy adoption;
// - the `semantic_shards.json` sidecar.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::search::policy::SemanticPolicy;

    /// Policy shared by the readiness and invalidation tests: the compiled-in defaults.
    fn test_policy() -> SemanticPolicy {
        SemanticPolicy::compiled_defaults()
    }

    /// Builds a tier artifact stamped with the current schema and chunking versions.
    ///
    /// The embedder id and index file name are derived from `tier`. The
    /// fingerprint `"fp-1234"` and model revision `"abc123"` are the values most
    /// tests treat as "current".
    fn test_artifact(tier: TierKind, ready: bool) -> ArtifactRecord {
        let embedder = match tier {
            TierKind::Fast => "fnv1a-384",
            TierKind::Quality => "minilm-384",
        };
        ArtifactRecord {
            tier,
            embedder_id: embedder.to_owned(),
            model_revision: "abc123".to_owned(),
            schema_version: SEMANTIC_SCHEMA_VERSION,
            chunking_version: CHUNKING_STRATEGY_VERSION,
            dimension: 384,
            doc_count: 1000,
            conversation_count: 250,
            db_fingerprint: "fp-1234".to_owned(),
            index_path: format!("vector_index/index-{embedder}.fsvi"),
            size_bytes: 150_000,
            started_at_ms: 1_700_000_000_000,
            completed_at_ms: 1_700_000_060_000,
            ready,
        }
    }

    /// Builds a ready HNSW record layered on the quality tier.
    fn test_hnsw() -> HnswRecord {
        HnswRecord {
            base_tier: TierKind::Quality,
            embedder_id: "minilm-384".to_owned(),
            ef_search: 128,
            index_path: "vector_index/hnsw-minilm-384.chsw".to_owned(),
            size_bytes: 50_000,
            built_at_ms: 1_700_000_070_000,
            ready: true,
        }
    }

    /// Builds a fast-tier shard record for generation `"fp-sharded"`.
    ///
    /// The shard is mmap-ready with no ANN sidecar, and its index path is unique
    /// per `shard_index`.
    fn test_shard(shard_index: u32, shard_count: u32, ready: bool) -> SemanticShardRecord {
        SemanticShardRecord {
            tier: TierKind::Fast,
            embedder_id: "fnv1a-384".to_owned(),
            model_revision: "hash".to_owned(),
            schema_version: SEMANTIC_SCHEMA_VERSION,
            chunking_version: CHUNKING_STRATEGY_VERSION,
            dimension: 384,
            shard_index,
            shard_count,
            doc_count: 25,
            total_conversations: 10,
            db_fingerprint: "fp-sharded".to_owned(),
            index_path: format!("vector_index/shards/fast-fnv1a-384/shard-{shard_index:05}.fsvi"),
            quantization: "f16".to_owned(),
            mmap_ready: true,
            ann_index_path: None,
            ann_size_bytes: 0,
            ann_ready: false,
            size_bytes: 4096,
            started_at_ms: 1_700_000_080_000,
            completed_at_ms: 1_700_000_081_000,
            ready,
        }
    }

    /// Builds a half-finished checkpoint (500 of 1000 conversations) whose
    /// cursor has not been exhausted.
    fn test_checkpoint(tier: TierKind) -> BuildCheckpoint {
        BuildCheckpoint {
            tier,
            embedder_id: "minilm-384".to_owned(),
            last_offset: 500,
            docs_embedded: 3000,
            conversations_processed: 500,
            total_conversations: 1000,
            db_fingerprint: "fp-1234".to_owned(),
            schema_version: SEMANTIC_SCHEMA_VERSION,
            chunking_version: CHUNKING_STRATEGY_VERSION,
            saved_at_ms: 1_700_000_030_000,
            last_message_id: None,
            cursor_exhausted: false,
        }
    }

    /// Expected readiness for a table-driven case.
    ///
    /// Only the variant is checked for `Stale` and `Incompatible`; their
    /// reason strings are not compared.
    #[derive(Debug, Clone, Copy)]
    enum ExpectedTierReadiness {
        Ready,
        Stale,
        Incompatible,
        Building(u8),
    }

    /// Case mutation that leaves the artifact untouched.
    fn no_artifact_mutation(_: &mut ArtifactRecord) {}

    // Tuple fields, in order:
    //   1. case label
    //   2. tier of the artifact
    //   3. artifact `ready` flag
    //   4. current DB fingerprint
    //   5. current model revision
    //   6. mutation applied to the artifact before evaluation
    //   7. expected readiness
    type TierReadinessCase = (
        &'static str,
        TierKind,
        bool,
        &'static str,
        &'static str,
        fn(&mut ArtifactRecord),
        ExpectedTierReadiness,
    );

    /// Case mutation that forces a schema-version mismatch.
    fn set_schema_version_to_zero(artifact: &mut ArtifactRecord) {
        artifact.schema_version = 0;
    }

    /// Asserts that `actual` matches `expected`, using `label` in failure messages.
    fn assert_tier_readiness(actual: TierReadiness, expected: ExpectedTierReadiness, label: &str) {
        match expected {
            ExpectedTierReadiness::Ready => {
                assert_eq!(actual, TierReadiness::Ready, "{label}");
            }
            ExpectedTierReadiness::Stale => {
                assert!(
                    matches!(actual, TierReadiness::Stale { .. }),
                    "{label}: {actual:?}"
                );
            }
            ExpectedTierReadiness::Incompatible => {
                assert!(
                    matches!(actual, TierReadiness::Incompatible { .. }),
                    "{label}: {actual:?}"
                );
            }
            ExpectedTierReadiness::Building(progress_pct) => {
                assert_eq!(actual, TierReadiness::Building { progress_pct }, "{label}");
            }
        }
    }

    /// A fully populated manifest survives save + load.
    ///
    /// `save` also stamps the current format version and a non-zero `updated_at_ms`.
    #[test]
    fn manifest_round_trip_via_disk() {
        let temp = tempfile::tempdir().unwrap();
        let mut manifest = SemanticManifest {
            fast_tier: Some(test_artifact(TierKind::Fast, true)),
            quality_tier: Some(test_artifact(TierKind::Quality, true)),
            hnsw: Some(test_hnsw()),
            checkpoint: Some(test_checkpoint(TierKind::Quality)),
            backlog: BacklogLedger {
                total_conversations: 2000,
                fast_tier_processed: 1000,
                quality_tier_processed: 500,
                db_fingerprint: "fp-1234".to_owned(),
                computed_at_ms: 1_700_000_000_000,
            },
            ..Default::default()
        };

        manifest.save(temp.path()).unwrap();
        let loaded = SemanticManifest::load(temp.path()).unwrap().unwrap();

        assert_eq!(loaded.manifest_version, MANIFEST_FORMAT_VERSION);
        assert!(loaded.fast_tier.is_some());
        assert!(loaded.quality_tier.is_some());
        assert!(loaded.hnsw.is_some());
        assert!(loaded.checkpoint.is_some());
        assert_eq!(loaded.backlog.total_conversations, 2000);
        assert!(loaded.updated_at_ms > 0);
    }

    /// A second save fully replaces the first rather than merging with it.
    #[test]
    fn manifest_save_overwrites_existing_file() {
        let temp = tempfile::tempdir().unwrap();
        let mut first = SemanticManifest {
            fast_tier: Some(test_artifact(TierKind::Fast, true)),
            ..Default::default()
        };
        first.save(temp.path()).unwrap();

        let mut second = SemanticManifest {
            quality_tier: Some(test_artifact(TierKind::Quality, true)),
            backlog: BacklogLedger {
                total_conversations: 99,
                fast_tier_processed: 0,
                quality_tier_processed: 99,
                db_fingerprint: "fp-overwrite".to_owned(),
                computed_at_ms: 1_700_000_000_123,
            },
            ..Default::default()
        };
        second.save(temp.path()).unwrap();

        let loaded = SemanticManifest::load(temp.path()).unwrap().unwrap();
        assert!(loaded.fast_tier.is_none());
        assert!(loaded.quality_tier.is_some());
        assert_eq!(loaded.backlog.total_conversations, 99);
    }

    /// `replacement_path_entry_exists` must see a dangling symlink.
    ///
    /// `Path::exists` follows the link and reports `false`, but the entry itself
    /// is present on disk.
    #[cfg(unix)]
    #[test]
    fn manifest_replacement_path_entry_exists_detects_dangling_symlink() -> Result<(), String> {
        use std::os::unix::fs::symlink;

        let temp = tempfile::tempdir().map_err(|e| e.to_string())?;
        let link_path = SemanticManifest::path(temp.path());
        let manifest_dir = link_path
            .parent()
            .ok_or_else(|| "semantic manifest path should have a parent directory".to_string())?;
        fs::create_dir_all(manifest_dir).map_err(|e| e.to_string())?;
        let missing_target = manifest_dir.join("missing-semantic-manifest.json");

        symlink(&missing_target, &link_path).map_err(|e| e.to_string())?;

        // Sanity check: the link really is dangling from `Path::exists`' point of view.
        if link_path.exists() {
            return Err("dangling manifest symlink unexpectedly resolved".to_string());
        }
        if !replacement_path_entry_exists(&link_path).map_err(|e| e.to_string())? {
            return Err(format!(
                "semantic manifest replacement entry check missed dangling symlink {}",
                link_path.display()
            ));
        }

        Ok(())
    }

    /// Two temp-file creations for the same target yield distinct paths.
    ///
    /// Both files must coexist and both must live in the manifest's own
    /// directory, next to the final path.
    #[test]
    fn manifest_temp_file_creation_is_exclusive_and_unique() -> Result<(), String> {
        let temp = tempfile::tempdir().map_err(|e| e.to_string())?;
        let final_path = SemanticManifest::path(temp.path());
        let manifest_dir = final_path
            .parent()
            .ok_or_else(|| "semantic manifest path should have a parent directory".to_string())?;
        fs::create_dir_all(manifest_dir).map_err(|e| e.to_string())?;

        let (first_path, mut first_file) =
            create_unique_manifest_temp_file(&final_path).map_err(|e| e.to_string())?;
        first_file.write_all(b"first").map_err(|e| e.to_string())?;
        let (second_path, mut second_file) =
            create_unique_manifest_temp_file(&final_path).map_err(|e| e.to_string())?;
        second_file
            .write_all(b"second")
            .map_err(|e| e.to_string())?;

        if first_path == second_path {
            return Err("exclusive temp creation reused the same path".to_string());
        }
        if !first_path.exists() {
            return Err(format!(
                "first temp file is missing: {}",
                first_path.display()
            ));
        }
        if !second_path.exists() {
            return Err(format!(
                "second temp file is missing: {}",
                second_path.display()
            ));
        }
        if first_path.parent() != Some(manifest_dir) {
            return Err(format!(
                "first temp path escaped manifest directory: {}",
                first_path.display()
            ));
        }
        if second_path.parent() != Some(manifest_dir) {
            return Err(format!(
                "second temp path escaped manifest directory: {}",
                second_path.display()
            ));
        }

        Ok(())
    }

    /// Loading from a directory without a manifest is `Ok(None)`, not an error.
    #[test]
    fn manifest_load_missing_returns_none() {
        let temp = tempfile::tempdir().unwrap();
        let loaded = SemanticManifest::load(temp.path()).unwrap();
        assert!(loaded.is_none());
    }

    /// `load_or_default` falls back to an empty manifest at the current format version.
    #[test]
    fn manifest_load_or_default_returns_defaults() {
        let temp = tempfile::tempdir().unwrap();
        let manifest = SemanticManifest::load_or_default(temp.path()).unwrap();
        assert_eq!(manifest.manifest_version, MANIFEST_FORMAT_VERSION);
        assert!(manifest.fast_tier.is_none());
        assert!(manifest.quality_tier.is_none());
    }

    /// Non-JSON content surfaces as `ManifestError::Parse`.
    #[test]
    fn manifest_load_corrupt_returns_parse_error() {
        let temp = tempfile::tempdir().unwrap();
        let path = SemanticManifest::path(temp.path());
        fs::create_dir_all(path.parent().unwrap()).unwrap();
        fs::write(&path, b"not json").unwrap();

        let result = SemanticManifest::load(temp.path());
        assert!(matches!(result, Err(ManifestError::Parse { .. })));
    }

    /// A manifest written by a newer format version is rejected rather than misread.
    #[test]
    fn manifest_load_future_version_returns_error() {
        let temp = tempfile::tempdir().unwrap();
        let path = SemanticManifest::path(temp.path());
        fs::create_dir_all(path.parent().unwrap()).unwrap();

        let manifest = SemanticManifest {
            manifest_version: MANIFEST_FORMAT_VERSION + 1,
            ..Default::default()
        };
        let json = serde_json::to_string(&manifest).unwrap();
        fs::write(&path, json).unwrap();

        let result = SemanticManifest::load(temp.path());
        assert!(matches!(
            result,
            Err(ManifestError::UnsupportedVersion { .. })
        ));
    }

    /// Table-driven checks of `ArtifactRecord::readiness`.
    ///
    /// The cases vary the DB fingerprint, the model revision, the schema
    /// version and the `ready` flag.
    #[test]
    fn tier_readiness_cases() {
        let policy = test_policy();
        let db_fp = "fp-1234";
        let model_rev = "abc123";
        let cases: &[TierReadinessCase] = &[
            (
                "ready artifact with matching fingerprint",
                TierKind::Fast,
                true,
                db_fp,
                model_rev,
                no_artifact_mutation,
                ExpectedTierReadiness::Ready,
            ),
            (
                "ready artifact with changed DB fingerprint",
                TierKind::Fast,
                true,
                "different-fp",
                model_rev,
                no_artifact_mutation,
                ExpectedTierReadiness::Stale,
            ),
            (
                "ready artifact with changed model revision",
                TierKind::Quality,
                true,
                db_fp,
                "new-revision",
                no_artifact_mutation,
                ExpectedTierReadiness::Stale,
            ),
            (
                "schema version mismatch",
                TierKind::Quality,
                true,
                db_fp,
                model_rev,
                set_schema_version_to_zero,
                ExpectedTierReadiness::Incompatible,
            ),
            (
                // Pins the progress value `readiness` reports for an unpublished artifact.
                "not yet published artifact",
                TierKind::Fast,
                false,
                db_fp,
                model_rev,
                no_artifact_mutation,
                ExpectedTierReadiness::Building(100),
            ),
        ];

        for (label, tier, ready, current_db_fp, current_model_rev, mutate, expected) in cases {
            let mut artifact = test_artifact(*tier, *ready);
            mutate(&mut artifact);
            assert_tier_readiness(
                artifact.readiness(&policy, current_db_fp, current_model_rev),
                *expected,
                label,
            );
        }
    }

    /// An empty manifest reports both tiers as `Missing`.
    #[test]
    fn manifest_tier_readiness_missing() {
        let manifest = SemanticManifest::default();
        let policy = test_policy();
        assert_eq!(
            manifest.fast_tier_readiness(&policy, "fp", "rev"),
            TierReadiness::Missing,
        );
        assert_eq!(
            manifest.quality_tier_readiness(&policy, "fp", "rev"),
            TierReadiness::Missing,
        );
    }

    /// A valid checkpoint makes its own tier report `Building` with the
    /// checkpoint's progress. Other tiers are unaffected.
    #[test]
    fn manifest_tier_readiness_with_checkpoint() {
        let manifest = SemanticManifest {
            checkpoint: Some(test_checkpoint(TierKind::Quality)),
            ..Default::default()
        };

        let policy = test_policy();
        // The checkpoint belongs to the quality tier, so the fast tier stays missing.
        assert_eq!(
            manifest.fast_tier_readiness(&policy, "fp-1234", "rev"),
            TierReadiness::Missing,
        );
        // 500 of 1000 conversations processed.
        assert_eq!(
            manifest.quality_tier_readiness(&policy, "fp-1234", "rev"),
            TierReadiness::Building { progress_pct: 50 },
        );
    }

    /// A checkpoint for a different DB fingerprint is ignored, leaving the tier `Missing`.
    #[test]
    fn manifest_tier_readiness_checkpoint_invalid_db() {
        let manifest = SemanticManifest {
            checkpoint: Some(test_checkpoint(TierKind::Quality)),
            ..Default::default()
        };

        let policy = test_policy();
        assert_eq!(
            manifest.quality_tier_readiness(&policy, "other-fp", "rev"),
            TierReadiness::Missing,
        );
    }

    /// Hybrid search is gated on a usable fast tier.
    #[test]
    fn can_hybrid_search_requires_usable_fast_tier() {
        let policy = test_policy();
        let db_fp = "fp-1234";
        let rev = "abc123";

        // No artifacts at all: hybrid search is unavailable.
        let manifest = SemanticManifest::default();
        assert!(!manifest.can_hybrid_search(&policy, db_fp, rev));

        // A ready, current fast tier alone is sufficient.
        let manifest = SemanticManifest {
            fast_tier: Some(test_artifact(TierKind::Fast, true)),
            ..Default::default()
        };
        assert!(manifest.can_hybrid_search(&policy, db_fp, rev));
    }

    /// Remaining counts are `total - processed` per tier; `is_current` compares fingerprints.
    #[test]
    fn backlog_remaining_and_pending() {
        let ledger = BacklogLedger {
            total_conversations: 1000,
            fast_tier_processed: 800,
            quality_tier_processed: 300,
            db_fingerprint: "fp".to_owned(),
            computed_at_ms: 0,
        };

        assert_eq!(ledger.fast_tier_remaining(), 200);
        assert_eq!(ledger.quality_tier_remaining(), 700);
        assert!(ledger.has_pending_work());
        assert!(ledger.is_current("fp"));
        assert!(!ledger.is_current("other"));
    }

    /// With both tiers fully processed there is no pending work.
    #[test]
    fn backlog_no_pending_when_fully_processed() {
        let ledger = BacklogLedger {
            total_conversations: 500,
            fast_tier_processed: 500,
            quality_tier_processed: 500,
            db_fingerprint: "fp".to_owned(),
            computed_at_ms: 0,
        };

        assert_eq!(ledger.fast_tier_remaining(), 0);
        assert_eq!(ledger.quality_tier_remaining(), 0);
        assert!(!ledger.has_pending_work());
    }

    /// Progress is capped at 99% until the cursor is exhausted.
    ///
    /// Only after that does the checkpoint report 100% and count as complete.
    #[test]
    fn checkpoint_progress_and_completion() -> Result<(), String> {
        let cp = test_checkpoint(TierKind::Quality);
        assert_eq!(cp.progress_pct(), 50);
        assert!(!cp.is_complete());
        assert!(cp.is_valid("fp-1234"));
        assert!(!cp.is_valid("other-fp"));

        // Processed count equals the total, but the cursor still has rows.
        let mut cp = test_checkpoint(TierKind::Quality);
        cp.conversations_processed = 1000;
        assert_eq!(cp.progress_pct(), 99);
        if cp.is_complete() {
            return Err("count parity alone should not complete a checkpoint".to_string());
        }
        cp.cursor_exhausted = true;
        assert_eq!(cp.progress_pct(), 100);
        assert!(cp.is_complete());
        Ok(())
    }

    /// A zero-conversation checkpoint reports 0% instead of dividing by zero.
    #[test]
    fn checkpoint_zero_total_gives_zero_pct() {
        let mut cp = test_checkpoint(TierKind::Fast);
        cp.total_conversations = 0;
        cp.conversations_processed = 0;
        assert_eq!(cp.progress_pct(), 0);
    }

    /// Publishing an artifact for the checkpoint's tier clears that checkpoint.
    #[test]
    fn publish_artifact_clears_matching_checkpoint() {
        let mut manifest = SemanticManifest {
            checkpoint: Some(test_checkpoint(TierKind::Quality)),
            ..Default::default()
        };

        manifest.publish_artifact(test_artifact(TierKind::Quality, true));
        assert!(manifest.checkpoint.is_none());
        assert!(manifest.quality_tier.is_some());
    }

    /// Publishing an artifact for a different tier leaves the checkpoint in place.
    #[test]
    fn publish_artifact_keeps_non_matching_checkpoint() {
        let mut manifest = SemanticManifest {
            checkpoint: Some(test_checkpoint(TierKind::Quality)),
            ..Default::default()
        };

        manifest.publish_artifact(test_artifact(TierKind::Fast, true));
        assert!(manifest.checkpoint.is_some());
        assert!(manifest.fast_tier.is_some());
    }

    /// Processed counts come from each ready, current artifact's `conversation_count` (250).
    #[test]
    fn refresh_backlog_computes_from_ready_artifacts() {
        let mut manifest = SemanticManifest {
            fast_tier: Some(test_artifact(TierKind::Fast, true)),
            quality_tier: Some(test_artifact(TierKind::Quality, true)),
            ..Default::default()
        };

        manifest.refresh_backlog(2000, "fp-1234");
        assert_eq!(manifest.backlog.total_conversations, 2000);
        assert_eq!(manifest.backlog.fast_tier_processed, 250);
        assert_eq!(manifest.backlog.quality_tier_processed, 250);
    }

    /// An artifact built against another fingerprint contributes nothing to the backlog.
    #[test]
    fn refresh_backlog_ignores_stale_artifacts() {
        let mut manifest = SemanticManifest {
            fast_tier: Some(test_artifact(TierKind::Fast, true)),
            ..Default::default()
        };

        // The artifact's fingerprint is "fp-1234", so it is stale here.
        manifest.refresh_backlog(2000, "different-fp");
        assert_eq!(manifest.backlog.fast_tier_processed, 0);
    }

    /// A schema-mismatched quality tier is removed, and the HNSW index built on
    /// it goes too. Both removals are counted.
    #[test]
    fn invalidate_incompatible_removes_schema_mismatch() {
        let mut artifact = test_artifact(TierKind::Quality, true);
        artifact.schema_version = 0;
        let mut manifest = SemanticManifest {
            quality_tier: Some(artifact),
            hnsw: Some(test_hnsw()),
            ..Default::default()
        };

        let policy = test_policy();
        let count = manifest.invalidate_incompatible(&policy, "abc123");

        // Quality tier plus the dependent HNSW record.
        assert_eq!(count, 2);
        assert!(manifest.quality_tier.is_none());
        assert!(manifest.hnsw.is_none());
    }

    /// Compatible artifacts survive invalidation untouched.
    #[test]
    fn invalidate_incompatible_keeps_compatible() {
        let mut manifest = SemanticManifest {
            fast_tier: Some(test_artifact(TierKind::Fast, true)),
            quality_tier: Some(test_artifact(TierKind::Quality, true)),
            ..Default::default()
        };

        let policy = test_policy();
        let count = manifest.invalidate_incompatible(&policy, "abc123");

        assert_eq!(count, 0);
        assert!(manifest.fast_tier.is_some());
        assert!(manifest.quality_tier.is_some());
    }

    /// Adopting a pre-manifest index records it as a ready artifact at the current schema version.
    #[test]
    fn adopt_legacy_artifact() {
        let mut manifest = SemanticManifest::default();
        let doc_count = 500;
        let conversation_count = 125;
        let index_path = "vector_index/index-fnv1a-384.fsvi";
        let db_fingerprint = "fp-old";
        let size_bytes = 75_000;
        let adopted = manifest.adopt_legacy_artifact(
            TierKind::Fast,
            "fnv1a-384",
            "hash",
            384,
            doc_count,
            conversation_count,
            db_fingerprint,
            index_path,
            size_bytes,
        );

        assert!(adopted);
        let fast = manifest.fast_tier.as_ref().unwrap();
        assert_eq!(fast.embedder_id, "fnv1a-384");
        assert!(fast.ready);
        assert_eq!(fast.schema_version, SEMANTIC_SCHEMA_VERSION);
    }

    /// Total size sums both tiers and the HNSW index.
    #[test]
    fn total_size_accounts_for_all_artifacts() {
        let manifest = SemanticManifest {
            fast_tier: Some(test_artifact(TierKind::Fast, true)),
            quality_tier: Some(test_artifact(TierKind::Quality, true)),
            hnsw: Some(test_hnsw()),
            ..Default::default()
        };

        assert_eq!(manifest.total_size_bytes(), 150_000 + 150_000 + 50_000);
        // 350_000 bytes is a fraction of a megabyte, yet reports as 1, not 0.
        assert_eq!(manifest.total_size_mb(), 1);
    }

    /// An empty manifest has zero size in both units.
    #[test]
    fn total_size_empty_is_zero() {
        let manifest = SemanticManifest::default();
        assert_eq!(manifest.total_size_bytes(), 0);
        assert_eq!(manifest.total_size_mb(), 0);
    }

    /// Serde round trip through pretty-printed JSON preserves every record.
    #[test]
    fn manifest_json_round_trip() {
        let manifest = SemanticManifest {
            fast_tier: Some(test_artifact(TierKind::Fast, true)),
            quality_tier: Some(test_artifact(TierKind::Quality, true)),
            hnsw: Some(test_hnsw()),
            checkpoint: Some(test_checkpoint(TierKind::Quality)),
            ..Default::default()
        };

        let json = serde_json::to_string_pretty(&manifest).unwrap();
        let deser: SemanticManifest = serde_json::from_str(&json).unwrap();
        assert_eq!(deser.fast_tier, manifest.fast_tier);
        assert_eq!(deser.quality_tier, manifest.quality_tier);
        assert_eq!(deser.hnsw, manifest.hnsw);
        assert_eq!(deser.checkpoint, manifest.checkpoint);
    }

    /// The shard sidecar round-trips through disk, with shards ordered by index
    /// regardless of insertion order.
    #[test]
    fn shard_manifest_round_trip_via_sidecar() {
        let temp = tempfile::tempdir().unwrap();
        let mut shards = SemanticShardManifest::default();
        shards.replace_shards_for_generation(
            TierKind::Fast,
            "fnv1a-384",
            "fp-sharded",
            vec![test_shard(1, 2, true), test_shard(0, 2, true)],
        );

        shards.save(temp.path()).unwrap();
        let loaded = SemanticShardManifest::load(temp.path()).unwrap().unwrap();

        assert_eq!(loaded.manifest_version, MANIFEST_FORMAT_VERSION);
        assert_eq!(loaded.shards.len(), 2);
        assert_eq!(loaded.shards[0].shard_index, 0);
        assert_eq!(loaded.shards[1].shard_index, 1);
        assert!(loaded.updated_at_ms > 0);
    }

    /// A generation is complete only once every declared shard index is present and ready.
    #[test]
    fn shard_summary_requires_every_ready_shard() {
        let mut shards = SemanticShardManifest::default();
        shards.replace_shards_for_generation(
            TierKind::Fast,
            "fnv1a-384",
            "fp-sharded",
            vec![test_shard(0, 3, true), test_shard(2, 3, true)],
        );

        // Shard 1 of 3 is missing.
        let partial = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
        assert_eq!(partial.shard_count, 3);
        assert_eq!(partial.ready_shards, 2);
        assert!(!partial.complete);

        shards.replace_shards_for_generation(
            TierKind::Fast,
            "fnv1a-384",
            "fp-sharded",
            vec![
                test_shard(0, 3, true),
                test_shard(1, 3, true),
                test_shard(2, 3, true),
            ],
        );

        // `doc_count` sums across shards (3 * 25).
        // `total_conversations` is reported once (10), not summed.
        let complete = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
        assert_eq!(complete.ready_shards, 3);
        assert!(complete.complete);
        assert_eq!(complete.doc_count, 75);
        assert_eq!(complete.total_conversations, 10);
    }

    /// Malformed shard generations must never summarize as complete.
    ///
    /// Optional ANN sidecar problems only reduce `ann_ready_shards`; they never
    /// invalidate the vector shards themselves.
    #[test]
    fn shard_summary_rejects_non_mmap_ready_or_inconsistent_shards() {
        // A shard that is not mmap-ready does not count as ready.
        let mut non_mmap = test_shard(0, 1, true);
        non_mmap.mmap_ready = false;
        let mut shards = SemanticShardManifest::default();
        shards.replace_shards_for_generation(
            TierKind::Fast,
            "fnv1a-384",
            "fp-sharded",
            vec![non_mmap],
        );

        let non_mmap_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
        assert_eq!(non_mmap_summary.ready_shards, 0);
        assert!(!non_mmap_summary.complete);

        // The ANN is flagged ready but has no path.
        // Its shard count also disagrees with its sibling.
        let mut inconsistent = test_shard(1, 3, true);
        inconsistent.ann_ready = true;
        inconsistent.ann_index_path = None;
        inconsistent.ann_size_bytes = 4096;
        shards.replace_shards_for_generation(
            TierKind::Fast,
            "fnv1a-384",
            "fp-sharded",
            vec![test_shard(0, 2, true), inconsistent],
        );

        let inconsistent_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
        assert_eq!(inconsistent_summary.shard_count, 3);
        assert_eq!(inconsistent_summary.ready_shards, 2);
        assert_eq!(inconsistent_summary.ann_ready_shards, 0);
        assert!(!inconsistent_summary.complete);

        // Duplicate shard index 1.
        shards.replace_shards_for_generation(
            TierKind::Fast,
            "fnv1a-384",
            "fp-sharded",
            vec![
                test_shard(0, 2, true),
                test_shard(1, 2, true),
                test_shard(1, 2, false),
            ],
        );
        let duplicate_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
        assert_eq!(duplicate_summary.shard_count, 2);
        assert_eq!(duplicate_summary.ready_shards, 2);
        assert!(
            !duplicate_summary.complete,
            "duplicate shard indexes must not summarize as a complete generation"
        );

        // Two distinct shard indexes pointing at the same index file.
        let mut duplicate_path = test_shard(1, 2, true);
        duplicate_path.index_path = test_shard(0, 2, true).index_path;
        shards.replace_shards_for_generation(
            TierKind::Fast,
            "fnv1a-384",
            "fp-sharded",
            vec![test_shard(0, 2, true), duplicate_path],
        );
        let duplicate_path_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
        assert_eq!(duplicate_path_summary.shard_count, 2);
        assert_eq!(duplicate_path_summary.ready_shards, 2);
        assert!(
            !duplicate_path_summary.complete,
            "duplicate shard index paths must not summarize as a complete generation"
        );

        let mut blank_path = test_shard(0, 1, true);
        blank_path.index_path.clear();
        shards.replace_shards_for_generation(
            TierKind::Fast,
            "fnv1a-384",
            "fp-sharded",
            vec![blank_path],
        );
        let blank_path_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
        assert_eq!(blank_path_summary.shard_count, 1);
        assert_eq!(blank_path_summary.ready_shards, 1);
        assert!(
            !blank_path_summary.complete,
            "blank shard index paths must not summarize as complete"
        );

        // Unsafe vector index paths, each rejected:
        // - an absolute path
        // - a `..` component
        // - a leading `./`
        // - leading whitespace
        let outside_index_dir = tempfile::tempdir().unwrap();
        for unsafe_path in [
            outside_index_dir
                .path()
                .join("outside.fsvi")
                .to_string_lossy()
                .into_owned(),
            "vector_index/shards/../outside.fsvi".to_string(),
            "./vector_index/shards/fast/hash.fsvi".to_string(),
            " vector_index/shards/fast/hash.fsvi".to_string(),
        ] {
            let mut unsafe_shard = test_shard(0, 1, true);
            unsafe_shard.index_path = unsafe_path;
            shards.replace_shards_for_generation(
                TierKind::Fast,
                "fnv1a-384",
                "fp-sharded",
                vec![unsafe_shard],
            );
            let unsafe_path_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
            assert_eq!(unsafe_path_summary.shard_count, 1);
            assert_eq!(unsafe_path_summary.ready_shards, 1);
            assert!(
                !unsafe_path_summary.complete,
                "unsafe shard index paths must not summarize as complete"
            );
        }

        // The same unsafe shapes on the optional ANN path.
        // These only drop ANN readiness.
        let outside_ann_dir = tempfile::tempdir().unwrap();
        for unsafe_ann_path in [
            outside_ann_dir
                .path()
                .join("outside.chsw")
                .to_string_lossy()
                .into_owned(),
            "vector_index/shards/../outside.chsw".to_string(),
            "./vector_index/shards/fast/hash.chsw".to_string(),
            " vector_index/shards/fast/hash.chsw".to_string(),
        ] {
            let mut unsafe_ann = test_shard(0, 1, true);
            unsafe_ann.ann_ready = true;
            unsafe_ann.ann_index_path = Some(unsafe_ann_path);
            unsafe_ann.ann_size_bytes = 4096;
            shards.replace_shards_for_generation(
                TierKind::Fast,
                "fnv1a-384",
                "fp-sharded",
                vec![unsafe_ann],
            );
            let unsafe_ann_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
            assert_eq!(unsafe_ann_summary.shard_count, 1);
            assert_eq!(unsafe_ann_summary.ready_shards, 1);
            assert_eq!(unsafe_ann_summary.ann_ready_shards, 0);
            assert!(
                unsafe_ann_summary.complete,
                "unsafe optional ANN paths must not invalidate the vector shard generation"
            );
        }

        // Two shards sharing one ANN file: only one counts as ANN-ready.
        let mut duplicate_ann_path = test_shard(1, 2, true);
        duplicate_ann_path.ann_ready = true;
        duplicate_ann_path.ann_index_path =
            Some("vector_index/shards/fast-fnv1a-384/shared-ann.chsw".to_owned());
        duplicate_ann_path.ann_size_bytes = 4096;
        let mut first_ann_path = test_shard(0, 2, true);
        first_ann_path.ann_ready = true;
        first_ann_path.ann_index_path = duplicate_ann_path.ann_index_path.clone();
        first_ann_path.ann_size_bytes = 4096;
        shards.replace_shards_for_generation(
            TierKind::Fast,
            "fnv1a-384",
            "fp-sharded",
            vec![first_ann_path, duplicate_ann_path],
        );
        let duplicate_ann_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
        assert_eq!(duplicate_ann_summary.shard_count, 2);
        assert_eq!(duplicate_ann_summary.ready_shards, 2);
        assert_eq!(duplicate_ann_summary.ann_ready_shards, 1);
        assert!(
            duplicate_ann_summary.complete,
            "duplicate optional ANN paths must not invalidate the vector shard generation"
        );

        // Index 2 is out of range for a 2-shard generation (valid indexes are 0 and 1).
        shards.replace_shards_for_generation(
            TierKind::Fast,
            "fnv1a-384",
            "fp-sharded",
            vec![test_shard(2, 2, true)],
        );
        let out_of_range_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
        assert_eq!(out_of_range_summary.shard_count, 2);
        assert_eq!(out_of_range_summary.ready_shards, 1);
        assert!(
            !out_of_range_summary.complete,
            "shard indexes outside the declared shard count are malformed"
        );

        // Shards of one generation disagree on the embedding dimension.
        let mut mismatched_metadata = test_shard(1, 2, true);
        mismatched_metadata.dimension = 768;
        shards.replace_shards_for_generation(
            TierKind::Fast,
            "fnv1a-384",
            "fp-sharded",
            vec![test_shard(0, 2, true), mismatched_metadata],
        );
        let metadata_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
        assert_eq!(metadata_summary.shard_count, 2);
        assert_eq!(metadata_summary.ready_shards, 2);
        assert!(
            !metadata_summary.complete,
            "complete shard generations require consistent shard metadata"
        );

        let mut stale_schema = test_shard(0, 1, true);
        stale_schema.schema_version = SEMANTIC_SCHEMA_VERSION.saturating_sub(1);
        shards.replace_shards_for_generation(
            TierKind::Fast,
            "fnv1a-384",
            "fp-sharded",
            vec![stale_schema],
        );
        let stale_schema_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
        assert_eq!(stale_schema_summary.shard_count, 1);
        assert_eq!(stale_schema_summary.ready_shards, 1);
        assert!(
            !stale_schema_summary.complete,
            "stale schema shards must not summarize as complete"
        );
    }

    /// A complete shard sidecar does not, by itself, make the main manifest's fast tier ready.
    #[test]
    fn shard_sidecar_does_not_make_main_manifest_ready() {
        let mut shards = SemanticShardManifest::default();
        shards.replace_shards_for_generation(
            TierKind::Fast,
            "fnv1a-384",
            "fp-sharded",
            vec![test_shard(0, 1, true)],
        );
        assert!(
            shards
                .summary(TierKind::Fast, "fnv1a-384", "fp-sharded")
                .complete
        );

        let manifest = SemanticManifest::default();
        let policy = test_policy();
        assert_eq!(
            manifest.fast_tier_readiness(&policy, "fp-sharded", "hash"),
            TierReadiness::Missing,
            "sidecar shards must not publish runtime semantic readiness"
        );
    }

    /// Incompatible shards are dropped from the sidecar and excluded from its size total.
    #[test]
    fn shard_manifest_invalidates_incompatible_shards() {
        let mut bad = test_shard(0, 1, true);
        bad.schema_version = 0;
        let mut shards = SemanticShardManifest {
            shards: vec![bad, test_shard(0, 1, true)],
            ..Default::default()
        };

        let invalidated = shards.invalidate_incompatible(&test_policy(), "hash");

        assert_eq!(invalidated, 1);
        assert_eq!(shards.shards.len(), 1);
        assert_eq!(shards.total_size_bytes(), 4096);
    }
}