1use std::fs::{self, OpenOptions};
28use std::io::Write as IoWrite;
29use std::path::{Component, Path, PathBuf};
30use std::time::{SystemTime, UNIX_EPOCH};
31
32use ring::rand::{SecureRandom, SystemRandom};
33use serde::{Deserialize, Serialize};
34
35use super::policy::{
36 CHUNKING_STRATEGY_VERSION, InvalidationAction, SEMANTIC_SCHEMA_VERSION,
37 SemanticAssetManifest as PolicyManifest, SemanticPolicy,
38};
39
/// Current format version for the semantic manifest files. Loads reject any manifest
/// whose `manifest_version` is greater than this.
pub const MANIFEST_FORMAT_VERSION: u32 = 2;

/// Last format version written before `BuildCheckpoint::last_message_id` existed.
/// `SemanticManifest::load` logs a warning when a manifest at or below this version holds a
/// checkpoint without that cursor.
pub const MANIFEST_FORMAT_VERSION_PRE_LAST_MESSAGE_CURSOR: u32 = 1;

/// File name of the semantic manifest, stored under `<data_dir>/vector_index/`.
pub const MANIFEST_FILENAME: &str = "semantic_manifest.json";

/// File name of the semantic shard manifest, stored under `<data_dir>/vector_index/`.
pub const SHARD_MANIFEST_FILENAME: &str = "semantic_shards.json";
79
/// Reports whether a shard artifact path recorded in a manifest is acceptable.
///
/// A path is accepted only when all of the following hold:
/// - it is non-empty;
/// - it has no leading or trailing whitespace;
/// - it is relative;
/// - every component yielded by `Path::components` is `Component::Normal`.
///
/// The last rule rejects `..`, a leading `.`, root components, and Windows prefixes.
pub(crate) fn semantic_shard_artifact_path_is_safe(recorded_path: &str) -> bool {
    // Whitespace padding (which also covers whitespace-only input) is never accepted.
    let is_padded = recorded_path.trim() != recorded_path;
    if recorded_path.is_empty() || is_padded {
        return false;
    }

    let candidate = Path::new(recorded_path);
    if candidate.is_absolute() {
        return false;
    }

    candidate
        .components()
        .all(|part| matches!(part, Component::Normal(_)))
}
91
/// Which embedding tier an artifact, shard, HNSW index, or checkpoint belongs to.
///
/// Serialized in snake_case (`"fast"` / `"quality"`), the same labels
/// returned by [`TierKind::as_str`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TierKind {
    /// The fast tier (stored in `SemanticManifest::fast_tier`).
    Fast,
    /// The quality tier (stored in `SemanticManifest::quality_tier`).
    Quality,
}
101
102impl TierKind {
103 pub fn as_str(&self) -> &'static str {
104 match self {
105 Self::Fast => "fast",
106 Self::Quality => "quality",
107 }
108 }
109}
110
/// Readiness of a semantic tier, as reported to callers.
///
/// Serialized with snake_case variant names.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TierReadiness {
    /// The artifact is current and its build finished.
    Ready,
    /// A build is in progress; `progress_pct` is a percentage in `0..=100`.
    Building { progress_pct: u8 },
    /// The artifact is out of date but still usable (see [`TierReadiness::is_usable`]);
    /// `reason` is a human-readable explanation.
    Stale { reason: String },
    /// There is neither an artifact nor a valid in-progress checkpoint for the tier.
    Missing,
    /// The artifact is not usable; `reason` is a human-readable explanation.
    Incompatible { reason: String },
}
128
129impl TierReadiness {
130 pub fn is_ready(&self) -> bool {
131 matches!(self, Self::Ready)
132 }
133
134 pub fn is_usable(&self) -> bool {
135 matches!(self, Self::Ready | Self::Stale { .. })
136 }
137}
138
/// Metadata describing one published embedding artifact for a single tier.
///
/// Stored in [`SemanticManifest`] as `fast_tier` or `quality_tier`, depending on `tier`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ArtifactRecord {
    /// Tier this artifact serves; selects the manifest slot it is published into.
    pub tier: TierKind,
    /// Identifier of the embedder that produced the vectors.
    pub embedder_id: String,
    /// Model revision at build time; the policy compares it with the current revision.
    pub model_revision: String,
    /// Semantic schema version the artifact was built with.
    pub schema_version: u32,
    /// Chunking strategy version the artifact was built with.
    pub chunking_version: u32,
    /// Embedding dimension recorded for the artifact.
    pub dimension: usize,
    /// Document count recorded at build time (forwarded to the policy manifest).
    pub doc_count: u64,
    /// Conversations covered by the artifact; feeds the backlog's processed counts.
    pub conversation_count: u64,
    /// Fingerprint of the DB content the artifact was built from. A mismatch with the
    /// current fingerprint reports the artifact as stale.
    pub db_fingerprint: String,
    /// Path of the index file (test fixtures use `vector_index/index-<embedder>.fsvi`).
    pub index_path: String,
    /// Size in bytes; included in `SemanticManifest::total_size_bytes`.
    pub size_bytes: u64,
    /// Build start, ms since the Unix epoch (0 for adopted legacy artifacts).
    pub started_at_ms: i64,
    /// Build completion, ms since the Unix epoch; reported to the policy as `built_at_ms`.
    pub completed_at_ms: i64,
    /// Whether the build finished. `false` reports as `Building` when otherwise current.
    pub ready: bool,
}
173
174impl ArtifactRecord {
175 pub fn to_policy_manifest(&self) -> PolicyManifest {
177 PolicyManifest {
178 embedder_id: self.embedder_id.clone(),
179 model_revision: self.model_revision.clone(),
180 schema_version: self.schema_version,
181 chunking_version: self.chunking_version,
182 doc_count: self.doc_count,
183 built_at_ms: self.completed_at_ms,
184 }
185 }
186
187 pub fn readiness(
197 &self,
198 policy: &SemanticPolicy,
199 current_db_fingerprint: &str,
200 current_model_revision: &str,
201 ) -> TierReadiness {
202 let action = self.to_policy_manifest().invalidation_action(
203 policy,
204 current_model_revision,
205 &self.embedder_id,
206 );
207
208 match action {
209 InvalidationAction::UpToDate => {
210 if self.db_fingerprint != current_db_fingerprint {
211 TierReadiness::Stale {
212 reason: "DB content changed since artifact was built".to_owned(),
213 }
214 } else if !self.ready {
215 TierReadiness::Building { progress_pct: 100 }
216 } else {
217 TierReadiness::Ready
218 }
219 }
220 InvalidationAction::RebuildInBackground => TierReadiness::Stale {
221 reason: "model revision changed; vectors usable until rebuild completes".to_owned(),
222 },
223 InvalidationAction::DiscardAndRebuild { reason } => {
224 TierReadiness::Incompatible { reason }
225 }
226 InvalidationAction::Evict => TierReadiness::Incompatible {
227 reason: "semantic mode set to lexical-only".to_owned(),
228 },
229 }
230 }
231}
232
/// Metadata for an HNSW index built over one tier's vectors.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct HnswRecord {
    /// Tier whose artifact this index is built from. If
    /// `SemanticManifest::invalidate_incompatible` finds that tier's artifact gone, this
    /// record is dropped as well.
    pub base_tier: TierKind,
    /// Embedder whose vectors were indexed.
    pub embedder_id: String,
    /// `ef_search` setting recorded with the index (test fixtures use 128).
    pub ef_search: usize,
    /// Path of the index file.
    pub index_path: String,
    /// Size in bytes; included in `SemanticManifest::total_size_bytes`.
    pub size_bytes: u64,
    /// Build completion time, ms since the Unix epoch.
    pub built_at_ms: i64,
    /// Whether the index build finished.
    pub ready: bool,
}
253
/// Metadata for one shard of a sharded semantic index generation.
///
/// A generation is identified by `(tier, embedder_id, db_fingerprint)`; see
/// [`SemanticShardRecord::matches_generation`]. The consistency rules noted on the fields
/// are the ones enforced by `SemanticShardManifest::summary`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SemanticShardRecord {
    /// Tier this shard belongs to.
    pub tier: TierKind,
    /// Embedder that produced the shard's vectors.
    pub embedder_id: String,
    /// Model revision at build time; must agree across the generation.
    pub model_revision: String,
    /// Semantic schema version; must equal the current `SEMANTIC_SCHEMA_VERSION`.
    pub schema_version: u32,
    /// Chunking strategy version; must equal the current `CHUNKING_STRATEGY_VERSION`.
    pub chunking_version: u32,
    /// Embedding dimension; must be non-zero and agree across the generation.
    pub dimension: usize,
    /// Zero-based position of this shard; must be `< shard_count` and unique.
    pub shard_index: u32,
    /// Number of shards in the generation; must be non-zero and agree across shards.
    pub shard_count: u32,
    /// Document count held by this shard (summed across shards in the summary).
    pub doc_count: u64,
    /// Generation-wide conversation total; must agree across shards.
    pub total_conversations: u64,
    /// Fingerprint of the DB content the generation was built from.
    pub db_fingerprint: String,
    /// Relative path of the shard's vector file. Must pass
    /// `semantic_shard_artifact_path_is_safe` and be unique within the generation.
    pub index_path: String,
    /// Quantization label (test fixtures use `"f16"`); must agree across the generation.
    pub quantization: String,
    /// Must be `true`, together with `ready`, for the shard to count as ready.
    pub mmap_ready: bool,
    /// Optional path of this shard's ANN index; must be safe and unique to count.
    pub ann_index_path: Option<String>,
    /// ANN index size in bytes; must be non-zero for the shard to count as ANN-ready.
    pub ann_size_bytes: u64,
    /// Whether the ANN index is ready.
    pub ann_ready: bool,
    /// Size of the shard's vector file in bytes.
    pub size_bytes: u64,
    /// Build start, ms since the Unix epoch.
    pub started_at_ms: i64,
    /// Build completion, ms since the Unix epoch; reported to the policy as `built_at_ms`.
    pub completed_at_ms: i64,
    /// Whether the shard build finished.
    pub ready: bool,
}
308
309impl SemanticShardRecord {
310 pub fn to_policy_manifest(&self) -> PolicyManifest {
311 PolicyManifest {
312 embedder_id: self.embedder_id.clone(),
313 model_revision: self.model_revision.clone(),
314 schema_version: self.schema_version,
315 chunking_version: self.chunking_version,
316 doc_count: self.doc_count,
317 built_at_ms: self.completed_at_ms,
318 }
319 }
320
321 pub fn matches_generation(
322 &self,
323 tier: TierKind,
324 embedder_id: &str,
325 db_fingerprint: &str,
326 ) -> bool {
327 self.tier == tier
328 && self.embedder_id == embedder_id
329 && self.db_fingerprint == db_fingerprint
330 }
331}
332
/// Aggregate view of one shard generation, produced by `SemanticShardManifest::summary`.
///
/// The `Default` value (all zeros, `complete == false`) is what a generation with no
/// matching shards summarizes to.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct SemanticShardSummary {
    /// Largest `shard_count` recorded by any shard in the generation.
    pub shard_count: u32,
    /// Number of distinct shard indices whose shard is both `ready` and `mmap_ready`.
    pub ready_shards: u32,
    /// Number of distinct shard indices whose ANN index qualifies as ready.
    pub ann_ready_shards: u32,
    /// Sum of `doc_count` over the generation's records (saturating).
    pub doc_count: u64,
    /// Largest `total_conversations` recorded by any shard.
    pub total_conversations: u64,
    /// Sum of `size_bytes` (saturating).
    pub size_bytes: u64,
    /// Sum of `ann_size_bytes` (saturating).
    pub ann_size_bytes: u64,
    /// Whether the generation is internally consistent and every shard index is ready.
    pub complete: bool,
}
345
/// Persisted list of semantic shard records, stored at `vector_index/semantic_shards.json`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SemanticShardManifest {
    /// Format version; loads reject values above `MANIFEST_FORMAT_VERSION`.
    pub manifest_version: u32,
    /// Shard records, possibly spanning several generations.
    pub shards: Vec<SemanticShardRecord>,
    /// Time of the last save, ms since the Unix epoch (0 for a default manifest).
    pub updated_at_ms: i64,
}
353
354impl Default for SemanticShardManifest {
355 fn default() -> Self {
356 Self {
357 manifest_version: MANIFEST_FORMAT_VERSION,
358 shards: Vec::new(),
359 updated_at_ms: 0,
360 }
361 }
362}
363
364impl SemanticShardManifest {
365 pub fn path(data_dir: &Path) -> PathBuf {
366 data_dir.join("vector_index").join(SHARD_MANIFEST_FILENAME)
367 }
368
369 pub fn load(data_dir: &Path) -> Result<Option<Self>, ManifestError> {
370 let path = Self::path(data_dir);
371 let bytes = match fs::read(&path) {
372 Ok(bytes) => bytes,
373 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
374 Err(e) => {
375 return Err(ManifestError::Io {
376 path,
377 source: e.to_string(),
378 });
379 }
380 };
381
382 let manifest: Self = serde_json::from_slice(&bytes).map_err(|e| ManifestError::Parse {
383 path: path.clone(),
384 source: e.to_string(),
385 })?;
386
387 if manifest.manifest_version > MANIFEST_FORMAT_VERSION {
388 return Err(ManifestError::UnsupportedVersion {
389 found: manifest.manifest_version,
390 max_supported: MANIFEST_FORMAT_VERSION,
391 });
392 }
393
394 Ok(Some(manifest))
395 }
396
397 pub fn load_or_default(data_dir: &Path) -> Result<Self, ManifestError> {
398 match Self::load(data_dir) {
399 Ok(Some(manifest)) => Ok(manifest),
400 Ok(None) => Ok(Self::default()),
401 Err(e @ ManifestError::Io { .. }) => Err(e),
402 Err(ManifestError::Parse { .. } | ManifestError::UnsupportedVersion { .. }) => {
403 Ok(Self::default())
404 }
405 Err(e) => Err(e),
406 }
407 }
408
409 pub fn save(&mut self, data_dir: &Path) -> Result<(), ManifestError> {
410 let path = Self::path(data_dir);
411 if let Some(parent) = path.parent() {
412 fs::create_dir_all(parent).map_err(|e| ManifestError::Io {
413 path: parent.to_path_buf(),
414 source: e.to_string(),
415 })?;
416 }
417
418 self.updated_at_ms = now_ms();
419 let json = serde_json::to_string_pretty(self).map_err(|e| ManifestError::Serialize {
420 source: e.to_string(),
421 })?;
422 let (tmp_path, mut file) =
423 create_unique_manifest_temp_file(&path).map_err(|e| ManifestError::Io {
424 path: path.clone(),
425 source: e.to_string(),
426 })?;
427 file.write_all(json.as_bytes())
428 .map_err(|e| ManifestError::Io {
429 path: tmp_path.clone(),
430 source: e.to_string(),
431 })?;
432 file.sync_all().map_err(|e| ManifestError::Io {
433 path: tmp_path.clone(),
434 source: e.to_string(),
435 })?;
436 replace_file_from_temp(&tmp_path, &path).map_err(|e| ManifestError::Io {
437 path: path.clone(),
438 source: e.to_string(),
439 })?;
440 sync_parent_directory(&path).map_err(|e| ManifestError::Io {
441 path: path
442 .parent()
443 .map(Path::to_path_buf)
444 .unwrap_or_else(|| path.clone()),
445 source: e.to_string(),
446 })?;
447
448 Ok(())
449 }
450
451 pub fn replace_shards_for_generation(
452 &mut self,
453 tier: TierKind,
454 embedder_id: &str,
455 db_fingerprint: &str,
456 mut shards: Vec<SemanticShardRecord>,
457 ) {
458 self.shards
459 .retain(|shard| !shard.matches_generation(tier, embedder_id, db_fingerprint));
460 self.shards.append(&mut shards);
461 self.shards.sort_by(|a, b| {
462 (
463 a.tier.as_str(),
464 &a.embedder_id,
465 &a.db_fingerprint,
466 a.shard_index,
467 )
468 .cmp(&(
469 b.tier.as_str(),
470 &b.embedder_id,
471 &b.db_fingerprint,
472 b.shard_index,
473 ))
474 });
475 }
476
477 pub fn summary(
478 &self,
479 tier: TierKind,
480 embedder_id: &str,
481 db_fingerprint: &str,
482 ) -> SemanticShardSummary {
483 let mut summary = SemanticShardSummary::default();
484 let mut ready_indices = std::collections::BTreeSet::new();
485 let mut ann_ready_indices = std::collections::BTreeSet::new();
486 let mut seen_indices = std::collections::BTreeSet::new();
487 let mut seen_index_paths = std::collections::BTreeSet::new();
488 let mut seen_ann_index_paths = std::collections::BTreeSet::new();
489 let mut expected_shard_count = None;
490 let mut expected_generation_metadata: Option<(&str, u32, u32, usize, u64, &str)> = None;
491 let mut generation_consistent = true;
492 for shard in self
493 .shards
494 .iter()
495 .filter(|shard| shard.matches_generation(tier, embedder_id, db_fingerprint))
496 {
497 if shard.shard_count == 0 || shard.shard_index >= shard.shard_count {
498 generation_consistent = false;
499 }
500 if !seen_indices.insert(shard.shard_index) {
501 generation_consistent = false;
502 }
503 if !semantic_shard_artifact_path_is_safe(&shard.index_path)
504 || !seen_index_paths.insert(&shard.index_path)
505 {
506 generation_consistent = false;
507 }
508 match expected_shard_count {
509 Some(expected) if expected != shard.shard_count => {
510 generation_consistent = false;
511 }
512 None => expected_shard_count = Some(shard.shard_count),
513 _ => {}
514 }
515 let generation_metadata = (
516 shard.model_revision.as_str(),
517 shard.schema_version,
518 shard.chunking_version,
519 shard.dimension,
520 shard.total_conversations,
521 shard.quantization.as_str(),
522 );
523 match expected_generation_metadata {
524 Some(expected) if expected != generation_metadata => {
525 generation_consistent = false;
526 }
527 None => expected_generation_metadata = Some(generation_metadata),
528 _ => {}
529 }
530 if shard.schema_version != SEMANTIC_SCHEMA_VERSION
531 || shard.chunking_version != CHUNKING_STRATEGY_VERSION
532 || shard.dimension == 0
533 {
534 generation_consistent = false;
535 }
536 summary.shard_count = summary.shard_count.max(shard.shard_count);
537 summary.doc_count = summary.doc_count.saturating_add(shard.doc_count);
538 summary.total_conversations =
539 summary.total_conversations.max(shard.total_conversations);
540 summary.size_bytes = summary.size_bytes.saturating_add(shard.size_bytes);
541 summary.ann_size_bytes = summary.ann_size_bytes.saturating_add(shard.ann_size_bytes);
542 if shard.ready && shard.mmap_ready {
543 ready_indices.insert(shard.shard_index);
544 }
545 if shard.ready
546 && shard.mmap_ready
547 && shard.ann_ready
548 && shard.ann_size_bytes > 0
549 && let Some(ann_index_path) = shard.ann_index_path.as_deref()
550 && semantic_shard_artifact_path_is_safe(ann_index_path)
551 && seen_ann_index_paths.insert(ann_index_path)
552 {
553 ann_ready_indices.insert(shard.shard_index);
554 }
555 }
556 summary.ready_shards = u32::try_from(ready_indices.len()).unwrap_or(u32::MAX);
557 summary.ann_ready_shards = u32::try_from(ann_ready_indices.len()).unwrap_or(u32::MAX);
558 summary.complete = generation_consistent
559 && summary.shard_count > 0
560 && summary.ready_shards == summary.shard_count
561 && (0..summary.shard_count).all(|index| ready_indices.contains(&index));
562 summary
563 }
564
565 pub fn invalidate_incompatible(
566 &mut self,
567 policy: &SemanticPolicy,
568 current_model_revision: &str,
569 ) -> usize {
570 let before = self.shards.len();
571 self.shards.retain(|shard| {
572 !matches!(
573 shard.to_policy_manifest().invalidation_action(
574 policy,
575 current_model_revision,
576 &shard.embedder_id,
577 ),
578 InvalidationAction::DiscardAndRebuild { .. } | InvalidationAction::Evict
579 )
580 });
581 before.saturating_sub(self.shards.len())
582 }
583
584 pub fn total_size_bytes(&self) -> u64 {
585 self.shards
586 .iter()
587 .map(|shard| shard.size_bytes)
588 .fold(0, u64::saturating_add)
589 }
590}
591
/// Resumable progress marker for an in-progress tier build.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct BuildCheckpoint {
    /// Tier being built.
    pub tier: TierKind,
    /// Embedder used for the build.
    pub embedder_id: String,
    /// Resume offset. NOTE(review): presumably the conversation offset that resume falls
    /// back to when `last_message_id` is absent — confirm against the builder.
    pub last_offset: i64,
    /// Documents embedded so far.
    pub docs_embedded: u64,
    /// Conversations processed so far; numerator of `progress_pct`.
    pub conversations_processed: u64,
    /// Conversations to process in total; denominator of `progress_pct`.
    pub total_conversations: u64,
    /// DB fingerprint the build started from; must match for the checkpoint to be valid.
    pub db_fingerprint: String,
    /// Schema version of the build; must equal the current version for validity.
    pub schema_version: u32,
    /// Chunking version of the build; must equal the current version for validity.
    pub chunking_version: u32,
    /// Time the checkpoint was saved, ms since the Unix epoch.
    pub saved_at_ms: i64,
    /// Message-id resume cursor. Manifests at or below
    /// `MANIFEST_FORMAT_VERSION_PRE_LAST_MESSAGE_CURSOR` predate it, so it defaults to
    /// `None` when missing and is omitted from the JSON when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_message_id: Option<i64>,
}
634
635impl BuildCheckpoint {
636 pub fn progress_pct(&self) -> u8 {
638 if self.total_conversations == 0 {
639 return 0;
640 }
641 let pct = (self.conversations_processed as f64 / self.total_conversations as f64) * 100.0;
642 (pct as u8).min(100)
643 }
644
645 pub fn is_complete(&self) -> bool {
647 self.conversations_processed >= self.total_conversations
648 }
649
650 pub fn is_valid(&self, current_db_fingerprint: &str) -> bool {
652 self.db_fingerprint == current_db_fingerprint
653 && self.schema_version == SEMANTIC_SCHEMA_VERSION
654 && self.chunking_version == CHUNKING_STRATEGY_VERSION
655 }
656}
657
/// Remaining-work ledger for both tiers, recomputed by `SemanticManifest::refresh_backlog`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct BacklogLedger {
    /// Total conversations to embed, as supplied to `refresh_backlog`.
    pub total_conversations: u64,
    /// Conversations covered by a ready fast-tier artifact built from `db_fingerprint`
    /// (0 otherwise).
    pub fast_tier_processed: u64,
    /// Conversations covered by a ready quality-tier artifact built from `db_fingerprint`
    /// (0 otherwise).
    pub quality_tier_processed: u64,
    /// DB fingerprint the ledger was computed against.
    pub db_fingerprint: String,
    /// Time the ledger was computed, ms since the Unix epoch.
    pub computed_at_ms: i64,
}
674
675impl BacklogLedger {
676 pub fn fast_tier_remaining(&self) -> u64 {
678 self.total_conversations
679 .saturating_sub(self.fast_tier_processed)
680 }
681
682 pub fn quality_tier_remaining(&self) -> u64 {
684 self.total_conversations
685 .saturating_sub(self.quality_tier_processed)
686 }
687
688 pub fn has_pending_work(&self) -> bool {
690 self.fast_tier_remaining() > 0 || self.quality_tier_remaining() > 0
691 }
692
693 pub fn is_current(&self, current_db_fingerprint: &str) -> bool {
695 self.db_fingerprint == current_db_fingerprint
696 }
697}
698
/// Top-level semantic index manifest, persisted at `vector_index/semantic_manifest.json`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SemanticManifest {
    /// Format version; loads reject values above `MANIFEST_FORMAT_VERSION`.
    pub manifest_version: u32,
    /// Published fast-tier artifact, if any.
    pub fast_tier: Option<ArtifactRecord>,
    /// Published quality-tier artifact, if any.
    pub quality_tier: Option<ArtifactRecord>,
    /// HNSW index over one tier, if any. Dropped when its base tier's artifact is gone
    /// after `invalidate_incompatible`.
    pub hnsw: Option<HnswRecord>,
    /// Remaining-work ledger, recomputed by `refresh_backlog`.
    pub backlog: BacklogLedger,
    /// In-progress build checkpoint, if any. Cleared when an artifact for its tier is
    /// published.
    pub checkpoint: Option<BuildCheckpoint>,
    /// Time of the last save, ms since the Unix epoch (0 for a default manifest).
    pub updated_at_ms: i64,
}
722
723impl Default for SemanticManifest {
724 fn default() -> Self {
725 Self {
726 manifest_version: MANIFEST_FORMAT_VERSION,
727 fast_tier: None,
728 quality_tier: None,
729 hnsw: None,
730 backlog: BacklogLedger {
731 total_conversations: 0,
732 fast_tier_processed: 0,
733 quality_tier_processed: 0,
734 db_fingerprint: String::new(),
735 computed_at_ms: 0,
736 },
737 checkpoint: None,
738 updated_at_ms: 0,
739 }
740 }
741}
742
743impl SemanticManifest {
744 pub fn path(data_dir: &Path) -> PathBuf {
748 data_dir.join("vector_index").join(MANIFEST_FILENAME)
749 }
750
751 pub fn load(data_dir: &Path) -> Result<Option<Self>, ManifestError> {
763 let path = Self::path(data_dir);
764 let bytes = match fs::read(&path) {
765 Ok(b) => b,
766 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
767 Err(e) => {
768 return Err(ManifestError::Io {
769 path,
770 source: e.to_string(),
771 });
772 }
773 };
774
775 let manifest: Self = serde_json::from_slice(&bytes).map_err(|e| ManifestError::Parse {
776 path: path.clone(),
777 source: e.to_string(),
778 })?;
779
780 if manifest.manifest_version > MANIFEST_FORMAT_VERSION {
782 return Err(ManifestError::UnsupportedVersion {
783 found: manifest.manifest_version,
784 max_supported: MANIFEST_FORMAT_VERSION,
785 });
786 }
787
788 if manifest.manifest_version <= MANIFEST_FORMAT_VERSION_PRE_LAST_MESSAGE_CURSOR
794 && manifest
795 .checkpoint
796 .as_ref()
797 .is_some_and(|cp| cp.last_message_id.is_none())
798 {
799 tracing::warn!(
800 manifest_version = manifest.manifest_version,
801 supported_version = MANIFEST_FORMAT_VERSION,
802 path = %path.display(),
803 "semantic checkpoint manifest predates last_message_id cursor (cass#257 sub-fix 2); resume will fall back to conversation offset until the next checkpoint save"
804 );
805 }
806
807 Ok(Some(manifest))
808 }
809
810 pub fn load_or_default(data_dir: &Path) -> Result<Self, ManifestError> {
817 match Self::load(data_dir) {
818 Ok(Some(manifest)) => Ok(manifest),
819 Ok(None) => Ok(Self::default()),
820 Err(e @ ManifestError::Io { .. }) => Err(e),
822 Err(ManifestError::Parse { .. } | ManifestError::UnsupportedVersion { .. }) => {
824 Ok(Self::default())
825 }
826 Err(e) => Err(e),
827 }
828 }
829
830 pub fn save(&mut self, data_dir: &Path) -> Result<(), ManifestError> {
832 let path = Self::path(data_dir);
833
834 if let Some(parent) = path.parent() {
836 fs::create_dir_all(parent).map_err(|e| ManifestError::Io {
837 path: parent.to_path_buf(),
838 source: e.to_string(),
839 })?;
840 }
841
842 self.updated_at_ms = now_ms();
843
844 let json = serde_json::to_string_pretty(self).map_err(|e| ManifestError::Serialize {
845 source: e.to_string(),
846 })?;
847
848 let (tmp_path, mut file) =
850 create_unique_manifest_temp_file(&path).map_err(|e| ManifestError::Io {
851 path: path.clone(),
852 source: e.to_string(),
853 })?;
854 file.write_all(json.as_bytes())
855 .map_err(|e| ManifestError::Io {
856 path: tmp_path.clone(),
857 source: e.to_string(),
858 })?;
859 file.sync_all().map_err(|e| ManifestError::Io {
860 path: tmp_path.clone(),
861 source: e.to_string(),
862 })?;
863 replace_file_from_temp(&tmp_path, &path).map_err(|e| ManifestError::Io {
864 path: path.clone(),
865 source: e.to_string(),
866 })?;
867 sync_parent_directory(&path).map_err(|e| ManifestError::Io {
868 path: path
869 .parent()
870 .map(Path::to_path_buf)
871 .unwrap_or_else(|| path.clone()),
872 source: e.to_string(),
873 })?;
874
875 Ok(())
876 }
877
878 pub fn fast_tier_readiness(
882 &self,
883 policy: &SemanticPolicy,
884 current_db_fingerprint: &str,
885 current_model_revision: &str,
886 ) -> TierReadiness {
887 match &self.fast_tier {
888 Some(artifact) => {
889 artifact.readiness(policy, current_db_fingerprint, current_model_revision)
890 }
891 None => {
892 if let Some(cp) = &self.checkpoint
894 && cp.tier == TierKind::Fast
895 && cp.is_valid(current_db_fingerprint)
896 {
897 TierReadiness::Building {
898 progress_pct: cp.progress_pct(),
899 }
900 } else {
901 TierReadiness::Missing
902 }
903 }
904 }
905 }
906
907 pub fn quality_tier_readiness(
909 &self,
910 policy: &SemanticPolicy,
911 current_db_fingerprint: &str,
912 current_model_revision: &str,
913 ) -> TierReadiness {
914 match &self.quality_tier {
915 Some(artifact) => {
916 artifact.readiness(policy, current_db_fingerprint, current_model_revision)
917 }
918 None => {
919 if let Some(cp) = &self.checkpoint
920 && cp.tier == TierKind::Quality
921 && cp.is_valid(current_db_fingerprint)
922 {
923 TierReadiness::Building {
924 progress_pct: cp.progress_pct(),
925 }
926 } else {
927 TierReadiness::Missing
928 }
929 }
930 }
931 }
932
933 pub fn can_hybrid_search(
935 &self,
936 policy: &SemanticPolicy,
937 current_db_fingerprint: &str,
938 current_model_revision: &str,
939 ) -> bool {
940 self.fast_tier_readiness(policy, current_db_fingerprint, current_model_revision)
941 .is_usable()
942 }
943
944 pub fn refresh_backlog(&mut self, total_conversations: u64, current_db_fingerprint: &str) {
948 let fast_processed = self
949 .fast_tier
950 .as_ref()
951 .filter(|a| a.ready && a.db_fingerprint == current_db_fingerprint)
952 .map_or(0, |a| a.conversation_count);
953 let quality_processed = self
954 .quality_tier
955 .as_ref()
956 .filter(|a| a.ready && a.db_fingerprint == current_db_fingerprint)
957 .map_or(0, |a| a.conversation_count);
958
959 self.backlog = BacklogLedger {
960 total_conversations,
961 fast_tier_processed: fast_processed,
962 quality_tier_processed: quality_processed,
963 db_fingerprint: current_db_fingerprint.to_owned(),
964 computed_at_ms: now_ms(),
965 };
966 }
967
968 pub fn save_checkpoint(&mut self, checkpoint: BuildCheckpoint) {
970 self.checkpoint = Some(checkpoint);
971 }
972
973 pub fn clear_checkpoint(&mut self) {
975 self.checkpoint = None;
976 }
977
978 pub fn publish_artifact(&mut self, artifact: ArtifactRecord) {
980 if self
982 .checkpoint
983 .as_ref()
984 .is_some_and(|cp| cp.tier == artifact.tier)
985 {
986 self.checkpoint = None;
987 }
988
989 match artifact.tier {
990 TierKind::Fast => self.fast_tier = Some(artifact),
991 TierKind::Quality => self.quality_tier = Some(artifact),
992 }
993 }
994
995 pub fn publish_hnsw(&mut self, hnsw: HnswRecord) {
997 self.hnsw = Some(hnsw);
998 }
999
1000 #[allow(clippy::too_many_arguments)]
1003 pub fn adopt_legacy_artifact(
1004 &mut self,
1005 tier: TierKind,
1006 embedder_id: &str,
1007 model_revision: &str,
1008 dimension: usize,
1009 doc_count: u64,
1010 conversation_count: u64,
1011 db_fingerprint: &str,
1012 index_path: &str,
1013 size_bytes: u64,
1014 ) -> bool {
1015 let record = ArtifactRecord {
1016 tier,
1017 embedder_id: embedder_id.to_owned(),
1018 model_revision: model_revision.to_owned(),
1019 schema_version: SEMANTIC_SCHEMA_VERSION,
1020 chunking_version: CHUNKING_STRATEGY_VERSION,
1021 dimension,
1022 doc_count,
1023 conversation_count,
1024 db_fingerprint: db_fingerprint.to_owned(),
1025 index_path: index_path.to_owned(),
1026 size_bytes,
1027 started_at_ms: 0,
1028 completed_at_ms: now_ms(),
1029 ready: true,
1030 };
1031
1032 match tier {
1033 TierKind::Fast => self.fast_tier = Some(record),
1034 TierKind::Quality => self.quality_tier = Some(record),
1035 }
1036 true
1037 }
1038
1039 pub fn invalidate_incompatible(
1049 &mut self,
1050 policy: &SemanticPolicy,
1051 current_model_revision: &str,
1052 ) -> usize {
1053 let mut count = 0;
1054
1055 if let Some(ref artifact) = self.fast_tier {
1056 let pm = artifact.to_policy_manifest();
1057 if matches!(
1058 pm.invalidation_action(policy, current_model_revision, &artifact.embedder_id),
1059 InvalidationAction::DiscardAndRebuild { .. } | InvalidationAction::Evict
1060 ) {
1061 self.fast_tier = None;
1062 count += 1;
1063 }
1064 }
1065
1066 if let Some(ref artifact) = self.quality_tier {
1067 let pm = artifact.to_policy_manifest();
1068 if matches!(
1069 pm.invalidation_action(policy, current_model_revision, &artifact.embedder_id),
1070 InvalidationAction::DiscardAndRebuild { .. } | InvalidationAction::Evict
1071 ) {
1072 self.quality_tier = None;
1073 count += 1;
1074 }
1075 }
1076
1077 if let Some(ref hnsw) = self.hnsw {
1079 let base_gone = match hnsw.base_tier {
1080 TierKind::Fast => self.fast_tier.is_none(),
1081 TierKind::Quality => self.quality_tier.is_none(),
1082 };
1083 if base_gone {
1084 self.hnsw = None;
1085 count += 1;
1086 }
1087 }
1088
1089 if let Some(ref cp) = self.checkpoint
1091 && (cp.schema_version != policy.semantic_schema_version
1092 || cp.chunking_version != policy.chunking_strategy_version)
1093 {
1094 self.checkpoint = None;
1095 }
1096
1097 count
1098 }
1099
1100 pub fn total_size_bytes(&self) -> u64 {
1102 let fast = self.fast_tier.as_ref().map_or(0, |a| a.size_bytes);
1103 let quality = self.quality_tier.as_ref().map_or(0, |a| a.size_bytes);
1104 let hnsw = self.hnsw.as_ref().map_or(0, |h| h.size_bytes);
1105 fast + quality + hnsw
1106 }
1107
1108 pub fn total_size_mb(&self) -> u64 {
1110 self.total_size_bytes().div_ceil(1_048_576)
1111 }
1112}
1113
/// Errors produced while loading or saving semantic manifests.
///
/// Underlying errors are captured as rendered strings in `source`.
#[derive(Debug)]
pub enum ManifestError {
    /// A filesystem operation failed; `path` names the file or directory involved.
    Io { path: PathBuf, source: String },
    /// The manifest at `path` could not be deserialized.
    Parse { path: PathBuf, source: String },
    /// The manifest could not be encoded as JSON.
    Serialize { source: String },
    /// The manifest's format version is newer than this build supports.
    UnsupportedVersion { found: u32, max_supported: u32 },
}

impl std::fmt::Display for ManifestError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The two path-carrying variants share one message shape; the others return early.
        let (stage, path, source) = match self {
            Self::Io { path, source } => ("I/O", path, source),
            Self::Parse { path, source } => ("parse", path, source),
            Self::Serialize { source } => {
                return write!(f, "manifest serialization error: {source}");
            }
            Self::UnsupportedVersion {
                found,
                max_supported,
            } => {
                return write!(
                    f,
                    "manifest version {found} is newer than supported version {max_supported}"
                );
            }
        };
        write!(f, "manifest {stage} error at {}: {source}", path.display())
    }
}

impl std::error::Error for ManifestError {}
1146
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Falls back to `0` if the system clock reports a time before the epoch.
fn now_ms() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        // u128 -> i64 narrowing is fine for any realistic clock value.
        Ok(elapsed) => elapsed.as_millis() as i64,
        Err(_) => 0,
    }
}
1155
1156fn unique_manifest_temp_path(path: &Path, attempt: u32, random: u64) -> PathBuf {
1157 static NEXT_NONCE: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
1158
1159 let file_name = path
1160 .file_name()
1161 .and_then(|name| name.to_str())
1162 .unwrap_or(MANIFEST_FILENAME);
1163 let nonce = NEXT_NONCE.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1164 path.with_file_name(format!(
1165 ".{file_name}.tmp.{attempt}.{}.{}.{random:016x}",
1166 now_ms(),
1167 nonce
1168 ))
1169}
1170
1171fn create_unique_manifest_temp_file(path: &Path) -> std::io::Result<(PathBuf, fs::File)> {
1172 for attempt in 0..100 {
1173 let random = random_manifest_path_nonce()?;
1174 let tmp_path = unique_manifest_temp_path(path, attempt, random);
1175 match OpenOptions::new()
1176 .write(true)
1177 .create_new(true)
1178 .open(&tmp_path)
1179 {
1180 Ok(file) => return Ok((tmp_path, file)),
1181 Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => continue,
1182 Err(err) => return Err(err),
1183 }
1184 }
1185
1186 Err(std::io::Error::new(
1187 std::io::ErrorKind::AlreadyExists,
1188 format!(
1189 "could not create a unique temporary manifest file for {} after 100 attempts",
1190 path.display()
1191 ),
1192 ))
1193}
1194
/// Builds a hidden sibling backup path for `path` named
/// `.<file_name>.bak.<now_ms>.<nonce>.<random as 16 hex digits>`.
///
/// Used on Windows to move an existing manifest aside while replacing it. Fails only if
/// random bytes cannot be obtained; in that case the nonce counter is not advanced.
#[cfg(windows)]
fn unique_manifest_backup_path(path: &Path) -> std::io::Result<PathBuf> {
    use std::sync::atomic::{AtomicU64, Ordering};
    static NEXT_NONCE: AtomicU64 = AtomicU64::new(0);

    let random = random_manifest_path_nonce()?;
    let base_name = path
        .file_name()
        .and_then(std::ffi::OsStr::to_str)
        .unwrap_or(MANIFEST_FILENAME);
    let nonce = NEXT_NONCE.fetch_add(1, Ordering::Relaxed);
    let stamp = now_ms();
    let backup_name = format!(".{base_name}.bak.{stamp}.{nonce}.{random:016x}");
    Ok(path.with_file_name(backup_name))
}
1210
1211fn random_manifest_path_nonce() -> std::io::Result<u64> {
1212 let mut random_bytes = [0u8; 8];
1213 SystemRandom::new()
1214 .fill(&mut random_bytes)
1215 .map_err(|_| std::io::Error::other("failed to generate manifest temp path nonce"))?;
1216 Ok(u64::from_le_bytes(random_bytes))
1217}
1218
/// Moves `temp_path` over `final_path`, replacing any existing file.
///
/// On non-Windows targets this is a single `fs::rename`. The caller is responsible for
/// syncing the parent directory afterwards.
///
/// On Windows, the fallback applies when the direct rename fails with `AlreadyExists` or
/// `PermissionDenied` and an entry exists at `final_path`. The existing file is moved
/// aside to a unique backup path and the rename is retried:
/// - If moving the existing file aside fails, the temp file is deleted (best effort) and
///   an error naming both failures is returned.
/// - If the retry succeeds, the backup is deleted (best effort).
/// - If the retry fails, the backup is renamed back into place, the temp file is deleted
///   (best effort), and an error naming both failures is returned.
/// - If restoring the backup also fails, an error is returned and the temp file is left
///   on disk; the message names it.
///
/// Any other rename error is returned unchanged.
fn replace_file_from_temp(temp_path: &Path, final_path: &Path) -> std::io::Result<()> {
    #[cfg(windows)]
    {
        match fs::rename(temp_path, final_path) {
            Ok(()) => sync_parent_directory(final_path),
            // Use the backup fallback only when something actually occupies the target.
            // NOTE(review): if inspecting the target fails, `?` returns that error and the
            // temp file is not cleaned up.
            Err(first_err)
                if replacement_path_entry_exists(final_path)?
                    && matches!(
                        first_err.kind(),
                        std::io::ErrorKind::AlreadyExists | std::io::ErrorKind::PermissionDenied
                    ) =>
            {
                let backup_path = unique_manifest_backup_path(final_path)?;
                // Move the current file aside; on failure the new data is discarded.
                fs::rename(final_path, &backup_path).map_err(|backup_err| {
                    let _ = fs::remove_file(temp_path);
                    std::io::Error::other(format!(
                        "failed preparing backup {} before replacing {}: first error: {}; backup error: {}",
                        backup_path.display(),
                        final_path.display(),
                        first_err,
                        backup_err
                    ))
                })?;
                match fs::rename(temp_path, final_path) {
                    Ok(()) => {
                        let _ = fs::remove_file(&backup_path);
                        sync_parent_directory(final_path)
                    }
                    // Retry failed: put the original back so the target is not left missing.
                    Err(second_err) => match fs::rename(&backup_path, final_path) {
                        Ok(()) => {
                            let _ = fs::remove_file(temp_path);
                            sync_parent_directory(final_path)?;
                            Err(std::io::Error::other(format!(
                                "failed replacing {} with {}: first error: {}; second error: {}; restored original file",
                                final_path.display(),
                                temp_path.display(),
                                first_err,
                                second_err
                            )))
                        }
                        // Restore failed too: keep the temp file so the new data survives.
                        // NOTE(review): the backup file is presumably also still on disk
                        // here, but its path is not included in the message.
                        Err(restore_err) => Err(std::io::Error::other(format!(
                            "failed replacing {} with {}: first error: {}; second error: {}; restore error: {}; temp file retained at {}",
                            final_path.display(),
                            temp_path.display(),
                            first_err,
                            second_err,
                            restore_err,
                            temp_path.display()
                        ))),
                    },
                }
            }
            Err(err) => Err(err),
        }
    }

    #[cfg(not(windows))]
    {
        fs::rename(temp_path, final_path)
    }
}
1280
/// Reports whether any directory entry (file, directory, or symlink, without following
/// links) exists at `path`.
///
/// `NotFound` maps to `Ok(false)`. Other inspection failures are returned with the same
/// error kind and a message naming the path.
#[cfg(any(windows, test))]
fn replacement_path_entry_exists(path: &Path) -> std::io::Result<bool> {
    let err = match fs::symlink_metadata(path) {
        Ok(_) => return Ok(true),
        Err(err) => err,
    };
    if err.kind() == std::io::ErrorKind::NotFound {
        return Ok(false);
    }
    let kind = err.kind();
    let message = format!(
        "failed inspecting semantic manifest replacement target {}: {err}",
        path.display()
    );
    Err(std::io::Error::new(kind, message))
}
1295
/// Fsyncs the directory containing `path`, so a preceding rename inside it is durable.
///
/// A path with no parent (e.g. `/`) is a no-op. A bare relative file name has an empty
/// parent (`""`), which cannot be opened, so the current directory (`.`) is synced
/// instead.
///
/// # Errors
///
/// Propagates failures to open or sync the directory.
#[cfg(not(windows))]
fn sync_parent_directory(path: &Path) -> std::io::Result<()> {
    let Some(parent) = path.parent() else {
        return Ok(());
    };
    // `Path::parent("file.json")` is `Some("")`; `File::open("")` fails with NotFound.
    let parent = if parent.as_os_str().is_empty() {
        Path::new(".")
    } else {
        parent
    };
    let directory = fs::File::open(parent)?;
    directory.sync_all()
}
1304
/// Windows counterpart of the directory sync: intentionally a no-op that always succeeds,
/// so call sites can stay platform-agnostic.
#[cfg(windows)]
fn sync_parent_directory(_path: &Path) -> std::io::Result<()> {
    Ok(())
}
1309
1310#[cfg(test)]
1313mod tests {
1314 use super::*;
1315 use crate::search::policy::SemanticPolicy;
1316
    /// Policy used throughout these tests: the compiled-in defaults.
    fn test_policy() -> SemanticPolicy {
        SemanticPolicy::compiled_defaults()
    }
1320
1321 fn test_artifact(tier: TierKind, ready: bool) -> ArtifactRecord {
1322 ArtifactRecord {
1323 tier,
1324 embedder_id: match tier {
1325 TierKind::Fast => "fnv1a-384".to_owned(),
1326 TierKind::Quality => "minilm-384".to_owned(),
1327 },
1328 model_revision: "abc123".to_owned(),
1329 schema_version: SEMANTIC_SCHEMA_VERSION,
1330 chunking_version: CHUNKING_STRATEGY_VERSION,
1331 dimension: 384,
1332 doc_count: 1000,
1333 conversation_count: 250,
1334 db_fingerprint: "fp-1234".to_owned(),
1335 index_path: format!(
1336 "vector_index/index-{}.fsvi",
1337 match tier {
1338 TierKind::Fast => "fnv1a-384",
1339 TierKind::Quality => "minilm-384",
1340 }
1341 ),
1342 size_bytes: 150_000,
1343 started_at_ms: 1_700_000_000_000,
1344 completed_at_ms: 1_700_000_060_000,
1345 ready,
1346 }
1347 }
1348
1349 fn test_hnsw() -> HnswRecord {
1350 HnswRecord {
1351 base_tier: TierKind::Quality,
1352 embedder_id: "minilm-384".to_owned(),
1353 ef_search: 128,
1354 index_path: "vector_index/hnsw-minilm-384.chsw".to_owned(),
1355 size_bytes: 50_000,
1356 built_at_ms: 1_700_000_070_000,
1357 ready: true,
1358 }
1359 }
1360
1361 fn test_shard(shard_index: u32, shard_count: u32, ready: bool) -> SemanticShardRecord {
1362 SemanticShardRecord {
1363 tier: TierKind::Fast,
1364 embedder_id: "fnv1a-384".to_owned(),
1365 model_revision: "hash".to_owned(),
1366 schema_version: SEMANTIC_SCHEMA_VERSION,
1367 chunking_version: CHUNKING_STRATEGY_VERSION,
1368 dimension: 384,
1369 shard_index,
1370 shard_count,
1371 doc_count: 25,
1372 total_conversations: 10,
1373 db_fingerprint: "fp-sharded".to_owned(),
1374 index_path: format!("vector_index/shards/fast-fnv1a-384/shard-{shard_index:05}.fsvi"),
1375 quantization: "f16".to_owned(),
1376 mmap_ready: true,
1377 ann_index_path: None,
1378 ann_size_bytes: 0,
1379 ann_ready: false,
1380 size_bytes: 4096,
1381 started_at_ms: 1_700_000_080_000,
1382 completed_at_ms: 1_700_000_081_000,
1383 ready,
1384 }
1385 }
1386
1387 fn test_checkpoint(tier: TierKind) -> BuildCheckpoint {
1388 BuildCheckpoint {
1389 tier,
1390 embedder_id: "minilm-384".to_owned(),
1391 last_offset: 500,
1392 docs_embedded: 3000,
1393 conversations_processed: 500,
1394 total_conversations: 1000,
1395 db_fingerprint: "fp-1234".to_owned(),
1396 schema_version: SEMANTIC_SCHEMA_VERSION,
1397 chunking_version: CHUNKING_STRATEGY_VERSION,
1398 saved_at_ms: 1_700_000_030_000,
1399 last_message_id: None,
1400 }
1401 }
1402
1403 #[derive(Debug, Clone, Copy)]
1404 enum ExpectedTierReadiness {
1405 Ready,
1406 Stale,
1407 Incompatible,
1408 Building(u8),
1409 }
1410
1411 fn no_artifact_mutation(_: &mut ArtifactRecord) {}
1412
1413 type TierReadinessCase = (
1414 &'static str,
1415 TierKind,
1416 bool,
1417 &'static str,
1418 &'static str,
1419 fn(&mut ArtifactRecord),
1420 ExpectedTierReadiness,
1421 );
1422
1423 fn set_schema_version_to_zero(artifact: &mut ArtifactRecord) {
1424 artifact.schema_version = 0;
1425 }
1426
1427 fn assert_tier_readiness(actual: TierReadiness, expected: ExpectedTierReadiness, label: &str) {
1428 match expected {
1429 ExpectedTierReadiness::Ready => {
1430 assert_eq!(actual, TierReadiness::Ready, "{label}");
1431 }
1432 ExpectedTierReadiness::Stale => {
1433 assert!(
1434 matches!(actual, TierReadiness::Stale { .. }),
1435 "{label}: {actual:?}"
1436 );
1437 }
1438 ExpectedTierReadiness::Incompatible => {
1439 assert!(
1440 matches!(actual, TierReadiness::Incompatible { .. }),
1441 "{label}: {actual:?}"
1442 );
1443 }
1444 ExpectedTierReadiness::Building(progress_pct) => {
1445 assert_eq!(actual, TierReadiness::Building { progress_pct }, "{label}");
1446 }
1447 }
1448 }
1449
1450 #[test]
1453 fn manifest_round_trip_via_disk() {
1454 let temp = tempfile::tempdir().unwrap();
1455 let mut manifest = SemanticManifest {
1456 fast_tier: Some(test_artifact(TierKind::Fast, true)),
1457 quality_tier: Some(test_artifact(TierKind::Quality, true)),
1458 hnsw: Some(test_hnsw()),
1459 checkpoint: Some(test_checkpoint(TierKind::Quality)),
1460 backlog: BacklogLedger {
1461 total_conversations: 2000,
1462 fast_tier_processed: 1000,
1463 quality_tier_processed: 500,
1464 db_fingerprint: "fp-1234".to_owned(),
1465 computed_at_ms: 1_700_000_000_000,
1466 },
1467 ..Default::default()
1468 };
1469
1470 manifest.save(temp.path()).unwrap();
1471 let loaded = SemanticManifest::load(temp.path()).unwrap().unwrap();
1472
1473 assert_eq!(loaded.manifest_version, MANIFEST_FORMAT_VERSION);
1474 assert!(loaded.fast_tier.is_some());
1475 assert!(loaded.quality_tier.is_some());
1476 assert!(loaded.hnsw.is_some());
1477 assert!(loaded.checkpoint.is_some());
1478 assert_eq!(loaded.backlog.total_conversations, 2000);
1479 assert!(loaded.updated_at_ms > 0);
1480 }
1481
1482 #[test]
1483 fn manifest_save_overwrites_existing_file() {
1484 let temp = tempfile::tempdir().unwrap();
1485 let mut first = SemanticManifest {
1486 fast_tier: Some(test_artifact(TierKind::Fast, true)),
1487 ..Default::default()
1488 };
1489 first.save(temp.path()).unwrap();
1490
1491 let mut second = SemanticManifest {
1492 quality_tier: Some(test_artifact(TierKind::Quality, true)),
1493 backlog: BacklogLedger {
1494 total_conversations: 99,
1495 fast_tier_processed: 0,
1496 quality_tier_processed: 99,
1497 db_fingerprint: "fp-overwrite".to_owned(),
1498 computed_at_ms: 1_700_000_000_123,
1499 },
1500 ..Default::default()
1501 };
1502 second.save(temp.path()).unwrap();
1503
1504 let loaded = SemanticManifest::load(temp.path()).unwrap().unwrap();
1505 assert!(loaded.fast_tier.is_none());
1506 assert!(loaded.quality_tier.is_some());
1507 assert_eq!(loaded.backlog.total_conversations, 99);
1508 }
1509
1510 #[cfg(unix)]
1511 #[test]
1512 fn manifest_replacement_path_entry_exists_detects_dangling_symlink() -> Result<(), String> {
1513 use std::os::unix::fs::symlink;
1514
1515 let temp = tempfile::tempdir().map_err(|e| e.to_string())?;
1516 let link_path = SemanticManifest::path(temp.path());
1517 let manifest_dir = link_path
1518 .parent()
1519 .ok_or_else(|| "semantic manifest path should have a parent directory".to_string())?;
1520 fs::create_dir_all(manifest_dir).map_err(|e| e.to_string())?;
1521 let missing_target = manifest_dir.join("missing-semantic-manifest.json");
1522
1523 symlink(&missing_target, &link_path).map_err(|e| e.to_string())?;
1524
1525 if link_path.exists() {
1526 return Err("dangling manifest symlink unexpectedly resolved".to_string());
1527 }
1528 if !replacement_path_entry_exists(&link_path).map_err(|e| e.to_string())? {
1529 return Err(format!(
1530 "semantic manifest replacement entry check missed dangling symlink {}",
1531 link_path.display()
1532 ));
1533 }
1534
1535 Ok(())
1536 }
1537
1538 #[test]
1539 fn manifest_temp_file_creation_is_exclusive_and_unique() -> Result<(), String> {
1540 let temp = tempfile::tempdir().map_err(|e| e.to_string())?;
1541 let final_path = SemanticManifest::path(temp.path());
1542 let manifest_dir = final_path
1543 .parent()
1544 .ok_or_else(|| "semantic manifest path should have a parent directory".to_string())?;
1545 fs::create_dir_all(manifest_dir).map_err(|e| e.to_string())?;
1546
1547 let (first_path, mut first_file) =
1548 create_unique_manifest_temp_file(&final_path).map_err(|e| e.to_string())?;
1549 first_file.write_all(b"first").map_err(|e| e.to_string())?;
1550 let (second_path, mut second_file) =
1551 create_unique_manifest_temp_file(&final_path).map_err(|e| e.to_string())?;
1552 second_file
1553 .write_all(b"second")
1554 .map_err(|e| e.to_string())?;
1555
1556 if first_path == second_path {
1557 return Err("exclusive temp creation reused the same path".to_string());
1558 }
1559 if !first_path.exists() {
1560 return Err(format!(
1561 "first temp file is missing: {}",
1562 first_path.display()
1563 ));
1564 }
1565 if !second_path.exists() {
1566 return Err(format!(
1567 "second temp file is missing: {}",
1568 second_path.display()
1569 ));
1570 }
1571 if first_path.parent() != Some(manifest_dir) {
1572 return Err(format!(
1573 "first temp path escaped manifest directory: {}",
1574 first_path.display()
1575 ));
1576 }
1577 if second_path.parent() != Some(manifest_dir) {
1578 return Err(format!(
1579 "second temp path escaped manifest directory: {}",
1580 second_path.display()
1581 ));
1582 }
1583
1584 Ok(())
1585 }
1586
1587 #[test]
1588 fn manifest_load_missing_returns_none() {
1589 let temp = tempfile::tempdir().unwrap();
1590 let loaded = SemanticManifest::load(temp.path()).unwrap();
1591 assert!(loaded.is_none());
1592 }
1593
1594 #[test]
1595 fn manifest_load_or_default_returns_defaults() {
1596 let temp = tempfile::tempdir().unwrap();
1597 let manifest = SemanticManifest::load_or_default(temp.path()).unwrap();
1598 assert_eq!(manifest.manifest_version, MANIFEST_FORMAT_VERSION);
1599 assert!(manifest.fast_tier.is_none());
1600 assert!(manifest.quality_tier.is_none());
1601 }
1602
1603 #[test]
1604 fn manifest_load_corrupt_returns_parse_error() {
1605 let temp = tempfile::tempdir().unwrap();
1606 let path = SemanticManifest::path(temp.path());
1607 fs::create_dir_all(path.parent().unwrap()).unwrap();
1608 fs::write(&path, b"not json").unwrap();
1609
1610 let result = SemanticManifest::load(temp.path());
1611 assert!(matches!(result, Err(ManifestError::Parse { .. })));
1612 }
1613
1614 #[test]
1615 fn manifest_load_future_version_returns_error() {
1616 let temp = tempfile::tempdir().unwrap();
1617 let path = SemanticManifest::path(temp.path());
1618 fs::create_dir_all(path.parent().unwrap()).unwrap();
1619
1620 let manifest = SemanticManifest {
1621 manifest_version: MANIFEST_FORMAT_VERSION + 1,
1622 ..Default::default()
1623 };
1624 let json = serde_json::to_string(&manifest).unwrap();
1625 fs::write(&path, json).unwrap();
1626
1627 let result = SemanticManifest::load(temp.path());
1628 assert!(matches!(
1629 result,
1630 Err(ManifestError::UnsupportedVersion { .. })
1631 ));
1632 }
1633
1634 #[test]
1637 fn tier_readiness_cases() {
1638 let policy = test_policy();
1639 let db_fp = "fp-1234";
1640 let model_rev = "abc123";
1641 let cases: &[TierReadinessCase] = &[
1642 (
1643 "ready artifact with matching fingerprint",
1644 TierKind::Fast,
1645 true,
1646 db_fp,
1647 model_rev,
1648 no_artifact_mutation,
1649 ExpectedTierReadiness::Ready,
1650 ),
1651 (
1652 "ready artifact with changed DB fingerprint",
1653 TierKind::Fast,
1654 true,
1655 "different-fp",
1656 model_rev,
1657 no_artifact_mutation,
1658 ExpectedTierReadiness::Stale,
1659 ),
1660 (
1661 "ready artifact with changed model revision",
1662 TierKind::Quality,
1663 true,
1664 db_fp,
1665 "new-revision",
1666 no_artifact_mutation,
1667 ExpectedTierReadiness::Stale,
1668 ),
1669 (
1670 "schema version mismatch",
1671 TierKind::Quality,
1672 true,
1673 db_fp,
1674 model_rev,
1675 set_schema_version_to_zero,
1676 ExpectedTierReadiness::Incompatible,
1677 ),
1678 (
1679 "not yet published artifact",
1680 TierKind::Fast,
1681 false,
1682 db_fp,
1683 model_rev,
1684 no_artifact_mutation,
1685 ExpectedTierReadiness::Building(100),
1686 ),
1687 ];
1688
1689 for (label, tier, ready, current_db_fp, current_model_rev, mutate, expected) in cases {
1690 let mut artifact = test_artifact(*tier, *ready);
1691 mutate(&mut artifact);
1692 assert_tier_readiness(
1693 artifact.readiness(&policy, current_db_fp, current_model_rev),
1694 *expected,
1695 label,
1696 );
1697 }
1698 }
1699
1700 #[test]
1703 fn manifest_tier_readiness_missing() {
1704 let manifest = SemanticManifest::default();
1705 let policy = test_policy();
1706 assert_eq!(
1707 manifest.fast_tier_readiness(&policy, "fp", "rev"),
1708 TierReadiness::Missing,
1709 );
1710 assert_eq!(
1711 manifest.quality_tier_readiness(&policy, "fp", "rev"),
1712 TierReadiness::Missing,
1713 );
1714 }
1715
1716 #[test]
1717 fn manifest_tier_readiness_with_checkpoint() {
1718 let manifest = SemanticManifest {
1719 checkpoint: Some(test_checkpoint(TierKind::Quality)),
1720 ..Default::default()
1721 };
1722
1723 let policy = test_policy();
1724 assert_eq!(
1726 manifest.fast_tier_readiness(&policy, "fp-1234", "rev"),
1727 TierReadiness::Missing,
1728 );
1729 assert!(matches!(
1731 manifest.quality_tier_readiness(&policy, "fp-1234", "rev"),
1732 TierReadiness::Building { progress_pct: 50 },
1733 ));
1734 }
1735
1736 #[test]
1737 fn manifest_tier_readiness_checkpoint_invalid_db() {
1738 let manifest = SemanticManifest {
1739 checkpoint: Some(test_checkpoint(TierKind::Quality)),
1740 ..Default::default()
1741 };
1742
1743 let policy = test_policy();
1744 assert_eq!(
1746 manifest.quality_tier_readiness(&policy, "other-fp", "rev"),
1747 TierReadiness::Missing,
1748 );
1749 }
1750
1751 #[test]
1754 fn can_hybrid_search_requires_usable_fast_tier() {
1755 let policy = test_policy();
1756 let db_fp = "fp-1234";
1757 let rev = "abc123";
1758
1759 let manifest = SemanticManifest::default();
1761 assert!(!manifest.can_hybrid_search(&policy, db_fp, rev));
1762
1763 let manifest = SemanticManifest {
1765 fast_tier: Some(test_artifact(TierKind::Fast, true)),
1766 ..Default::default()
1767 };
1768 assert!(manifest.can_hybrid_search(&policy, db_fp, rev));
1769 }
1770
1771 #[test]
1774 fn backlog_remaining_and_pending() {
1775 let ledger = BacklogLedger {
1776 total_conversations: 1000,
1777 fast_tier_processed: 800,
1778 quality_tier_processed: 300,
1779 db_fingerprint: "fp".to_owned(),
1780 computed_at_ms: 0,
1781 };
1782
1783 assert_eq!(ledger.fast_tier_remaining(), 200);
1784 assert_eq!(ledger.quality_tier_remaining(), 700);
1785 assert!(ledger.has_pending_work());
1786 assert!(ledger.is_current("fp"));
1787 assert!(!ledger.is_current("other"));
1788 }
1789
1790 #[test]
1791 fn backlog_no_pending_when_fully_processed() {
1792 let ledger = BacklogLedger {
1793 total_conversations: 500,
1794 fast_tier_processed: 500,
1795 quality_tier_processed: 500,
1796 db_fingerprint: "fp".to_owned(),
1797 computed_at_ms: 0,
1798 };
1799
1800 assert_eq!(ledger.fast_tier_remaining(), 0);
1801 assert_eq!(ledger.quality_tier_remaining(), 0);
1802 assert!(!ledger.has_pending_work());
1803 }
1804
1805 #[test]
1808 fn checkpoint_progress_and_completion() {
1809 let cp = test_checkpoint(TierKind::Quality);
1810 assert_eq!(cp.progress_pct(), 50);
1811 assert!(!cp.is_complete());
1812 assert!(cp.is_valid("fp-1234"));
1813 assert!(!cp.is_valid("other-fp"));
1814
1815 let mut cp = test_checkpoint(TierKind::Quality);
1817 cp.conversations_processed = 1000;
1818 assert_eq!(cp.progress_pct(), 100);
1819 assert!(cp.is_complete());
1820 }
1821
1822 #[test]
1823 fn checkpoint_zero_total_gives_zero_pct() {
1824 let mut cp = test_checkpoint(TierKind::Fast);
1825 cp.total_conversations = 0;
1826 cp.conversations_processed = 0;
1827 assert_eq!(cp.progress_pct(), 0);
1828 }
1829
1830 #[test]
1833 fn publish_artifact_clears_matching_checkpoint() {
1834 let mut manifest = SemanticManifest {
1835 checkpoint: Some(test_checkpoint(TierKind::Quality)),
1836 ..Default::default()
1837 };
1838
1839 manifest.publish_artifact(test_artifact(TierKind::Quality, true));
1840 assert!(manifest.checkpoint.is_none());
1841 assert!(manifest.quality_tier.is_some());
1842 }
1843
1844 #[test]
1845 fn publish_artifact_keeps_non_matching_checkpoint() {
1846 let mut manifest = SemanticManifest {
1847 checkpoint: Some(test_checkpoint(TierKind::Quality)),
1848 ..Default::default()
1849 };
1850
1851 manifest.publish_artifact(test_artifact(TierKind::Fast, true));
1852 assert!(manifest.checkpoint.is_some()); assert!(manifest.fast_tier.is_some());
1854 }
1855
1856 #[test]
1859 fn refresh_backlog_computes_from_ready_artifacts() {
1860 let mut manifest = SemanticManifest {
1861 fast_tier: Some(test_artifact(TierKind::Fast, true)),
1862 quality_tier: Some(test_artifact(TierKind::Quality, true)),
1863 ..Default::default()
1864 };
1865
1866 manifest.refresh_backlog(2000, "fp-1234");
1867 assert_eq!(manifest.backlog.total_conversations, 2000);
1868 assert_eq!(manifest.backlog.fast_tier_processed, 250);
1869 assert_eq!(manifest.backlog.quality_tier_processed, 250);
1870 }
1871
1872 #[test]
1873 fn refresh_backlog_ignores_stale_artifacts() {
1874 let mut manifest = SemanticManifest {
1875 fast_tier: Some(test_artifact(TierKind::Fast, true)),
1876 ..Default::default()
1877 };
1878
1879 manifest.refresh_backlog(2000, "different-fp");
1881 assert_eq!(manifest.backlog.fast_tier_processed, 0);
1882 }
1883
1884 #[test]
1887 fn invalidate_incompatible_removes_schema_mismatch() {
1888 let mut artifact = test_artifact(TierKind::Quality, true);
1889 artifact.schema_version = 0; let mut manifest = SemanticManifest {
1891 quality_tier: Some(artifact),
1892 hnsw: Some(test_hnsw()), ..Default::default()
1894 };
1895
1896 let policy = test_policy();
1897 let count = manifest.invalidate_incompatible(&policy, "abc123");
1898
1899 assert_eq!(count, 2); assert!(manifest.quality_tier.is_none());
1901 assert!(manifest.hnsw.is_none());
1902 }
1903
1904 #[test]
1905 fn invalidate_incompatible_keeps_compatible() {
1906 let mut manifest = SemanticManifest {
1907 fast_tier: Some(test_artifact(TierKind::Fast, true)),
1908 quality_tier: Some(test_artifact(TierKind::Quality, true)),
1909 ..Default::default()
1910 };
1911
1912 let policy = test_policy();
1913 let count = manifest.invalidate_incompatible(&policy, "abc123");
1914
1915 assert_eq!(count, 0);
1916 assert!(manifest.fast_tier.is_some());
1917 assert!(manifest.quality_tier.is_some());
1918 }
1919
1920 #[test]
1923 fn adopt_legacy_artifact() {
1924 let mut manifest = SemanticManifest::default();
1925 let doc_count = 500;
1926 let conversation_count = 125;
1927 let index_path = "vector_index/index-fnv1a-384.fsvi";
1928 let db_fingerprint = "fp-old";
1929 let size_bytes = 75_000;
1930 let adopted = manifest.adopt_legacy_artifact(
1931 TierKind::Fast,
1932 "fnv1a-384",
1933 "hash",
1934 384,
1935 doc_count,
1936 conversation_count,
1937 db_fingerprint,
1938 index_path,
1939 size_bytes,
1940 );
1941
1942 assert!(adopted);
1943 let fast = manifest.fast_tier.as_ref().unwrap();
1944 assert_eq!(fast.embedder_id, "fnv1a-384");
1945 assert!(fast.ready);
1946 assert_eq!(fast.schema_version, SEMANTIC_SCHEMA_VERSION);
1947 }
1948
1949 #[test]
1952 fn total_size_accounts_for_all_artifacts() {
1953 let manifest = SemanticManifest {
1954 fast_tier: Some(test_artifact(TierKind::Fast, true)),
1955 quality_tier: Some(test_artifact(TierKind::Quality, true)),
1956 hnsw: Some(test_hnsw()),
1957 ..Default::default()
1958 };
1959
1960 assert_eq!(manifest.total_size_bytes(), 150_000 + 150_000 + 50_000);
1961 assert_eq!(manifest.total_size_mb(), 1); }
1963
1964 #[test]
1965 fn total_size_empty_is_zero() {
1966 let manifest = SemanticManifest::default();
1967 assert_eq!(manifest.total_size_bytes(), 0);
1968 assert_eq!(manifest.total_size_mb(), 0);
1969 }
1970
1971 #[test]
1974 fn manifest_json_round_trip() {
1975 let manifest = SemanticManifest {
1976 fast_tier: Some(test_artifact(TierKind::Fast, true)),
1977 quality_tier: Some(test_artifact(TierKind::Quality, true)),
1978 hnsw: Some(test_hnsw()),
1979 checkpoint: Some(test_checkpoint(TierKind::Quality)),
1980 ..Default::default()
1981 };
1982
1983 let json = serde_json::to_string_pretty(&manifest).unwrap();
1984 let deser: SemanticManifest = serde_json::from_str(&json).unwrap();
1985 assert_eq!(deser.fast_tier, manifest.fast_tier);
1986 assert_eq!(deser.quality_tier, manifest.quality_tier);
1987 assert_eq!(deser.hnsw, manifest.hnsw);
1988 assert_eq!(deser.checkpoint, manifest.checkpoint);
1989 }
1990
1991 #[test]
1994 fn shard_manifest_round_trip_via_sidecar() {
1995 let temp = tempfile::tempdir().unwrap();
1996 let mut shards = SemanticShardManifest::default();
1997 shards.replace_shards_for_generation(
1998 TierKind::Fast,
1999 "fnv1a-384",
2000 "fp-sharded",
2001 vec![test_shard(1, 2, true), test_shard(0, 2, true)],
2002 );
2003
2004 shards.save(temp.path()).unwrap();
2005 let loaded = SemanticShardManifest::load(temp.path()).unwrap().unwrap();
2006
2007 assert_eq!(loaded.manifest_version, MANIFEST_FORMAT_VERSION);
2008 assert_eq!(loaded.shards.len(), 2);
2009 assert_eq!(loaded.shards[0].shard_index, 0);
2010 assert_eq!(loaded.shards[1].shard_index, 1);
2011 assert!(loaded.updated_at_ms > 0);
2012 }
2013
2014 #[test]
2015 fn shard_summary_requires_every_ready_shard() {
2016 let mut shards = SemanticShardManifest::default();
2017 shards.replace_shards_for_generation(
2018 TierKind::Fast,
2019 "fnv1a-384",
2020 "fp-sharded",
2021 vec![test_shard(0, 3, true), test_shard(2, 3, true)],
2022 );
2023
2024 let partial = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
2025 assert_eq!(partial.shard_count, 3);
2026 assert_eq!(partial.ready_shards, 2);
2027 assert!(!partial.complete);
2028
2029 shards.replace_shards_for_generation(
2030 TierKind::Fast,
2031 "fnv1a-384",
2032 "fp-sharded",
2033 vec![
2034 test_shard(0, 3, true),
2035 test_shard(1, 3, true),
2036 test_shard(2, 3, true),
2037 ],
2038 );
2039
2040 let complete = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
2041 assert_eq!(complete.ready_shards, 3);
2042 assert!(complete.complete);
2043 assert_eq!(complete.doc_count, 75);
2044 assert_eq!(complete.total_conversations, 10);
2045 }
2046
2047 #[test]
2048 fn shard_summary_rejects_non_mmap_ready_or_inconsistent_shards() {
2049 let mut non_mmap = test_shard(0, 1, true);
2050 non_mmap.mmap_ready = false;
2051 let mut shards = SemanticShardManifest::default();
2052 shards.replace_shards_for_generation(
2053 TierKind::Fast,
2054 "fnv1a-384",
2055 "fp-sharded",
2056 vec![non_mmap],
2057 );
2058
2059 let non_mmap_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
2060 assert_eq!(non_mmap_summary.ready_shards, 0);
2061 assert!(!non_mmap_summary.complete);
2062
2063 let mut inconsistent = test_shard(1, 3, true);
2064 inconsistent.ann_ready = true;
2065 inconsistent.ann_index_path = None;
2066 inconsistent.ann_size_bytes = 4096;
2067 shards.replace_shards_for_generation(
2068 TierKind::Fast,
2069 "fnv1a-384",
2070 "fp-sharded",
2071 vec![test_shard(0, 2, true), inconsistent],
2072 );
2073
2074 let inconsistent_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
2075 assert_eq!(inconsistent_summary.shard_count, 3);
2076 assert_eq!(inconsistent_summary.ready_shards, 2);
2077 assert_eq!(inconsistent_summary.ann_ready_shards, 0);
2078 assert!(!inconsistent_summary.complete);
2079
2080 shards.replace_shards_for_generation(
2081 TierKind::Fast,
2082 "fnv1a-384",
2083 "fp-sharded",
2084 vec![
2085 test_shard(0, 2, true),
2086 test_shard(1, 2, true),
2087 test_shard(1, 2, false),
2088 ],
2089 );
2090 let duplicate_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
2091 assert_eq!(duplicate_summary.shard_count, 2);
2092 assert_eq!(duplicate_summary.ready_shards, 2);
2093 assert!(
2094 !duplicate_summary.complete,
2095 "duplicate shard indexes must not summarize as a complete generation"
2096 );
2097
2098 let mut duplicate_path = test_shard(1, 2, true);
2099 duplicate_path.index_path = test_shard(0, 2, true).index_path;
2100 shards.replace_shards_for_generation(
2101 TierKind::Fast,
2102 "fnv1a-384",
2103 "fp-sharded",
2104 vec![test_shard(0, 2, true), duplicate_path],
2105 );
2106 let duplicate_path_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
2107 assert_eq!(duplicate_path_summary.shard_count, 2);
2108 assert_eq!(duplicate_path_summary.ready_shards, 2);
2109 assert!(
2110 !duplicate_path_summary.complete,
2111 "duplicate shard index paths must not summarize as a complete generation"
2112 );
2113
2114 let mut blank_path = test_shard(0, 1, true);
2115 blank_path.index_path.clear();
2116 shards.replace_shards_for_generation(
2117 TierKind::Fast,
2118 "fnv1a-384",
2119 "fp-sharded",
2120 vec![blank_path],
2121 );
2122 let blank_path_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
2123 assert_eq!(blank_path_summary.shard_count, 1);
2124 assert_eq!(blank_path_summary.ready_shards, 1);
2125 assert!(
2126 !blank_path_summary.complete,
2127 "blank shard index paths must not summarize as complete"
2128 );
2129
2130 for unsafe_path in [
2131 tempfile::tempdir()
2132 .unwrap()
2133 .path()
2134 .join("outside.fsvi")
2135 .to_string_lossy()
2136 .to_string(),
2137 "vector_index/shards/../outside.fsvi".to_string(),
2138 "./vector_index/shards/fast/hash.fsvi".to_string(),
2139 " vector_index/shards/fast/hash.fsvi".to_string(),
2140 ] {
2141 let mut unsafe_shard = test_shard(0, 1, true);
2142 unsafe_shard.index_path = unsafe_path;
2143 shards.replace_shards_for_generation(
2144 TierKind::Fast,
2145 "fnv1a-384",
2146 "fp-sharded",
2147 vec![unsafe_shard],
2148 );
2149 let unsafe_path_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
2150 assert_eq!(unsafe_path_summary.shard_count, 1);
2151 assert_eq!(unsafe_path_summary.ready_shards, 1);
2152 assert!(
2153 !unsafe_path_summary.complete,
2154 "unsafe shard index paths must not summarize as complete"
2155 );
2156 }
2157
2158 let outside_ann_dir = tempfile::tempdir().unwrap();
2159 for unsafe_ann_path in [
2160 outside_ann_dir
2161 .path()
2162 .join("outside.chsw")
2163 .to_string_lossy()
2164 .to_string(),
2165 "vector_index/shards/../outside.chsw".to_string(),
2166 "./vector_index/shards/fast/hash.chsw".to_string(),
2167 " vector_index/shards/fast/hash.chsw".to_string(),
2168 ] {
2169 let mut unsafe_ann = test_shard(0, 1, true);
2170 unsafe_ann.ann_ready = true;
2171 unsafe_ann.ann_index_path = Some(unsafe_ann_path);
2172 unsafe_ann.ann_size_bytes = 4096;
2173 shards.replace_shards_for_generation(
2174 TierKind::Fast,
2175 "fnv1a-384",
2176 "fp-sharded",
2177 vec![unsafe_ann],
2178 );
2179 let unsafe_ann_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
2180 assert_eq!(unsafe_ann_summary.shard_count, 1);
2181 assert_eq!(unsafe_ann_summary.ready_shards, 1);
2182 assert_eq!(unsafe_ann_summary.ann_ready_shards, 0);
2183 assert!(
2184 unsafe_ann_summary.complete,
2185 "unsafe optional ANN paths must not invalidate the vector shard generation"
2186 );
2187 }
2188
2189 let mut duplicate_ann_path = test_shard(1, 2, true);
2190 duplicate_ann_path.ann_ready = true;
2191 duplicate_ann_path.ann_index_path =
2192 Some("vector_index/shards/fast-fnv1a-384/shared-ann.chsw".to_owned());
2193 duplicate_ann_path.ann_size_bytes = 4096;
2194 let mut first_ann_path = test_shard(0, 2, true);
2195 first_ann_path.ann_ready = true;
2196 first_ann_path.ann_index_path = duplicate_ann_path.ann_index_path.clone();
2197 first_ann_path.ann_size_bytes = 4096;
2198 shards.replace_shards_for_generation(
2199 TierKind::Fast,
2200 "fnv1a-384",
2201 "fp-sharded",
2202 vec![first_ann_path, duplicate_ann_path],
2203 );
2204 let duplicate_ann_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
2205 assert_eq!(duplicate_ann_summary.shard_count, 2);
2206 assert_eq!(duplicate_ann_summary.ready_shards, 2);
2207 assert_eq!(duplicate_ann_summary.ann_ready_shards, 1);
2208 assert!(
2209 duplicate_ann_summary.complete,
2210 "duplicate optional ANN paths must not invalidate the vector shard generation"
2211 );
2212
2213 shards.replace_shards_for_generation(
2214 TierKind::Fast,
2215 "fnv1a-384",
2216 "fp-sharded",
2217 vec![test_shard(2, 2, true)],
2218 );
2219 let out_of_range_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
2220 assert_eq!(out_of_range_summary.shard_count, 2);
2221 assert_eq!(out_of_range_summary.ready_shards, 1);
2222 assert!(
2223 !out_of_range_summary.complete,
2224 "shard indexes outside the declared shard count are malformed"
2225 );
2226
2227 let mut mismatched_metadata = test_shard(1, 2, true);
2228 mismatched_metadata.dimension = 768;
2229 shards.replace_shards_for_generation(
2230 TierKind::Fast,
2231 "fnv1a-384",
2232 "fp-sharded",
2233 vec![test_shard(0, 2, true), mismatched_metadata],
2234 );
2235 let metadata_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
2236 assert_eq!(metadata_summary.shard_count, 2);
2237 assert_eq!(metadata_summary.ready_shards, 2);
2238 assert!(
2239 !metadata_summary.complete,
2240 "complete shard generations require consistent shard metadata"
2241 );
2242
2243 let mut stale_schema = test_shard(0, 1, true);
2244 stale_schema.schema_version = SEMANTIC_SCHEMA_VERSION.saturating_sub(1);
2245 shards.replace_shards_for_generation(
2246 TierKind::Fast,
2247 "fnv1a-384",
2248 "fp-sharded",
2249 vec![stale_schema],
2250 );
2251 let stale_schema_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
2252 assert_eq!(stale_schema_summary.shard_count, 1);
2253 assert_eq!(stale_schema_summary.ready_shards, 1);
2254 assert!(
2255 !stale_schema_summary.complete,
2256 "stale schema shards must not summarize as complete"
2257 );
2258 }
2259
2260 #[test]
2261 fn shard_sidecar_does_not_make_main_manifest_ready() {
2262 let mut shards = SemanticShardManifest::default();
2263 shards.replace_shards_for_generation(
2264 TierKind::Fast,
2265 "fnv1a-384",
2266 "fp-sharded",
2267 vec![test_shard(0, 1, true)],
2268 );
2269 assert!(
2270 shards
2271 .summary(TierKind::Fast, "fnv1a-384", "fp-sharded")
2272 .complete
2273 );
2274
2275 let manifest = SemanticManifest::default();
2276 let policy = test_policy();
2277 assert_eq!(
2278 manifest.fast_tier_readiness(&policy, "fp-sharded", "hash"),
2279 TierReadiness::Missing,
2280 "sidecar shards must not publish runtime semantic readiness"
2281 );
2282 }
2283
2284 #[test]
2285 fn shard_manifest_invalidates_incompatible_shards() {
2286 let mut bad = test_shard(0, 1, true);
2287 bad.schema_version = 0;
2288 let mut shards = SemanticShardManifest {
2289 shards: vec![bad, test_shard(0, 1, true)],
2290 ..Default::default()
2291 };
2292
2293 let invalidated = shards.invalidate_incompatible(&test_policy(), "hash");
2294
2295 assert_eq!(invalidated, 1);
2296 assert_eq!(shards.shards.len(), 1);
2297 assert_eq!(shards.total_size_bytes(), 4096);
2298 }
2299}