1use std::fs::{self, OpenOptions};
28use std::io::Write as IoWrite;
29use std::path::{Component, Path, PathBuf};
30use std::time::{SystemTime, UNIX_EPOCH};
31
32use ring::rand::{SecureRandom, SystemRandom};
33use serde::{Deserialize, Serialize};
34
35use super::policy::{
36 CHUNKING_STRATEGY_VERSION, InvalidationAction, SEMANTIC_SCHEMA_VERSION,
37 SemanticAssetManifest as PolicyManifest, SemanticPolicy,
38};
39
/// Manifest format version stamped on freshly created manifests; it is also
/// the newest version the `load` functions accept.
pub const MANIFEST_FORMAT_VERSION: u32 = 2;

/// Newest manifest format version for which a checkpoint lacking
/// `last_message_id` triggers the legacy-cursor warning on load.
pub const MANIFEST_FORMAT_VERSION_PRE_LAST_MESSAGE_CURSOR: u32 = 1;

/// File name of the semantic manifest inside the `vector_index` directory.
pub const MANIFEST_FILENAME: &str = "semantic_manifest.json";

/// File name of the shard manifest inside the `vector_index` directory.
pub const SHARD_MANIFEST_FILENAME: &str = "semantic_shards.json";
79
/// Returns `true` when `recorded_path` is acceptable as a recorded shard
/// artifact path.
///
/// A path is accepted only when it is non-empty, has no leading or trailing
/// whitespace, is not absolute, and every component yielded by
/// [`Path::components`] is a plain name (no prefix, root, `.` or `..`).
pub(crate) fn semantic_shard_artifact_path_is_safe(recorded_path: &str) -> bool {
    // `trim` only ever shortens the string, so a length change means padding.
    if recorded_path.is_empty() || recorded_path.trim().len() != recorded_path.len() {
        return false;
    }

    let candidate = Path::new(recorded_path);
    if candidate.is_absolute() {
        return false;
    }

    candidate.components().all(|part| match part {
        Component::Normal(_) => true,
        Component::Prefix(_) | Component::RootDir | Component::CurDir | Component::ParentDir => {
            false
        }
    })
}
91
92#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
96#[serde(rename_all = "snake_case")]
97pub enum TierKind {
98 Fast,
99 Quality,
100}
101
102impl TierKind {
103 pub fn as_str(&self) -> &'static str {
104 match self {
105 Self::Fast => "fast",
106 Self::Quality => "quality",
107 }
108 }
109}
110
111#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
115#[serde(rename_all = "snake_case")]
116pub enum TierReadiness {
117 Ready,
119 Building { progress_pct: u8 },
121 Stale { reason: String },
123 Missing,
125 Incompatible { reason: String },
127}
128
129impl TierReadiness {
130 pub fn is_ready(&self) -> bool {
131 matches!(self, Self::Ready)
132 }
133
134 pub fn is_usable(&self) -> bool {
135 matches!(self, Self::Ready | Self::Stale { .. })
136 }
137}
138
139#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
143pub struct ArtifactRecord {
144 pub tier: TierKind,
146 pub embedder_id: String,
148 pub model_revision: String,
150 pub schema_version: u32,
152 pub chunking_version: u32,
154 pub dimension: usize,
156 pub doc_count: u64,
158 pub conversation_count: u64,
160 pub db_fingerprint: String,
162 pub index_path: String,
164 pub size_bytes: u64,
166 pub started_at_ms: i64,
168 pub completed_at_ms: i64,
170 pub ready: bool,
172}
173
174impl ArtifactRecord {
175 pub fn to_policy_manifest(&self) -> PolicyManifest {
177 PolicyManifest {
178 embedder_id: self.embedder_id.clone(),
179 model_revision: self.model_revision.clone(),
180 schema_version: self.schema_version,
181 chunking_version: self.chunking_version,
182 doc_count: self.doc_count,
183 built_at_ms: self.completed_at_ms,
184 }
185 }
186
187 pub fn readiness(
197 &self,
198 policy: &SemanticPolicy,
199 current_db_fingerprint: &str,
200 current_model_revision: &str,
201 ) -> TierReadiness {
202 let action = self.to_policy_manifest().invalidation_action(
203 policy,
204 current_model_revision,
205 &self.embedder_id,
206 );
207
208 match action {
209 InvalidationAction::UpToDate => {
210 if self.db_fingerprint != current_db_fingerprint {
211 TierReadiness::Stale {
212 reason: "DB content changed since artifact was built".to_owned(),
213 }
214 } else if !self.ready {
215 TierReadiness::Building { progress_pct: 100 }
216 } else {
217 TierReadiness::Ready
218 }
219 }
220 InvalidationAction::RebuildInBackground => TierReadiness::Stale {
221 reason: "model revision changed; vectors usable until rebuild completes".to_owned(),
222 },
223 InvalidationAction::DiscardAndRebuild { reason } => {
224 TierReadiness::Incompatible { reason }
225 }
226 InvalidationAction::Evict => TierReadiness::Incompatible {
227 reason: "semantic mode set to lexical-only".to_owned(),
228 },
229 }
230 }
231}
232
233#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
237pub struct HnswRecord {
238 pub base_tier: TierKind,
240 pub embedder_id: String,
242 pub ef_search: usize,
244 pub index_path: String,
246 pub size_bytes: u64,
248 pub built_at_ms: i64,
250 pub ready: bool,
252}
253
254#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
264pub struct SemanticShardRecord {
265 pub tier: TierKind,
267 pub embedder_id: String,
269 pub model_revision: String,
271 pub schema_version: u32,
273 pub chunking_version: u32,
275 pub dimension: usize,
277 pub shard_index: u32,
279 pub shard_count: u32,
281 pub doc_count: u64,
283 pub total_conversations: u64,
285 pub db_fingerprint: String,
287 pub index_path: String,
289 pub quantization: String,
291 pub mmap_ready: bool,
293 pub ann_index_path: Option<String>,
295 pub ann_size_bytes: u64,
297 pub ann_ready: bool,
299 pub size_bytes: u64,
301 pub started_at_ms: i64,
303 pub completed_at_ms: i64,
305 pub ready: bool,
307}
308
309impl SemanticShardRecord {
310 pub fn to_policy_manifest(&self) -> PolicyManifest {
311 PolicyManifest {
312 embedder_id: self.embedder_id.clone(),
313 model_revision: self.model_revision.clone(),
314 schema_version: self.schema_version,
315 chunking_version: self.chunking_version,
316 doc_count: self.doc_count,
317 built_at_ms: self.completed_at_ms,
318 }
319 }
320
321 pub fn matches_generation(
322 &self,
323 tier: TierKind,
324 embedder_id: &str,
325 db_fingerprint: &str,
326 ) -> bool {
327 self.tier == tier
328 && self.embedder_id == embedder_id
329 && self.db_fingerprint == db_fingerprint
330 }
331}
332
333#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
335pub struct SemanticShardSummary {
336 pub shard_count: u32,
337 pub ready_shards: u32,
338 pub ann_ready_shards: u32,
339 pub doc_count: u64,
340 pub total_conversations: u64,
341 pub size_bytes: u64,
342 pub ann_size_bytes: u64,
343 pub complete: bool,
344}
345
346#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
348pub struct SemanticShardManifest {
349 pub manifest_version: u32,
350 pub shards: Vec<SemanticShardRecord>,
351 pub updated_at_ms: i64,
352}
353
354impl Default for SemanticShardManifest {
355 fn default() -> Self {
356 Self {
357 manifest_version: MANIFEST_FORMAT_VERSION,
358 shards: Vec::new(),
359 updated_at_ms: 0,
360 }
361 }
362}
363
364impl SemanticShardManifest {
365 pub fn path(data_dir: &Path) -> PathBuf {
366 data_dir.join("vector_index").join(SHARD_MANIFEST_FILENAME)
367 }
368
369 pub fn load(data_dir: &Path) -> Result<Option<Self>, ManifestError> {
370 let path = Self::path(data_dir);
371 let bytes = match fs::read(&path) {
372 Ok(bytes) => bytes,
373 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
374 Err(e) => {
375 return Err(ManifestError::Io {
376 path,
377 source: e.to_string(),
378 });
379 }
380 };
381
382 let manifest: Self = serde_json::from_slice(&bytes).map_err(|e| ManifestError::Parse {
383 path: path.clone(),
384 source: e.to_string(),
385 })?;
386
387 if manifest.manifest_version > MANIFEST_FORMAT_VERSION {
388 return Err(ManifestError::UnsupportedVersion {
389 found: manifest.manifest_version,
390 max_supported: MANIFEST_FORMAT_VERSION,
391 });
392 }
393
394 Ok(Some(manifest))
395 }
396
397 pub fn load_or_default(data_dir: &Path) -> Result<Self, ManifestError> {
398 match Self::load(data_dir) {
399 Ok(Some(manifest)) => Ok(manifest),
400 Ok(None) => Ok(Self::default()),
401 Err(e @ ManifestError::Io { .. }) => Err(e),
402 Err(ManifestError::Parse { .. } | ManifestError::UnsupportedVersion { .. }) => {
403 Ok(Self::default())
404 }
405 Err(e) => Err(e),
406 }
407 }
408
409 pub fn save(&mut self, data_dir: &Path) -> Result<(), ManifestError> {
410 let path = Self::path(data_dir);
411 if let Some(parent) = path.parent() {
412 fs::create_dir_all(parent).map_err(|e| ManifestError::Io {
413 path: parent.to_path_buf(),
414 source: e.to_string(),
415 })?;
416 }
417
418 self.updated_at_ms = now_ms();
419 let json = serde_json::to_string_pretty(self).map_err(|e| ManifestError::Serialize {
420 source: e.to_string(),
421 })?;
422 let (tmp_path, mut file) =
423 create_unique_manifest_temp_file(&path).map_err(|e| ManifestError::Io {
424 path: path.clone(),
425 source: e.to_string(),
426 })?;
427 file.write_all(json.as_bytes())
428 .map_err(|e| ManifestError::Io {
429 path: tmp_path.clone(),
430 source: e.to_string(),
431 })?;
432 file.sync_all().map_err(|e| ManifestError::Io {
433 path: tmp_path.clone(),
434 source: e.to_string(),
435 })?;
436 replace_file_from_temp(&tmp_path, &path).map_err(|e| ManifestError::Io {
437 path: path.clone(),
438 source: e.to_string(),
439 })?;
440 sync_parent_directory(&path).map_err(|e| ManifestError::Io {
441 path: path
442 .parent()
443 .map(Path::to_path_buf)
444 .unwrap_or_else(|| path.clone()),
445 source: e.to_string(),
446 })?;
447
448 Ok(())
449 }
450
451 pub fn replace_shards_for_generation(
452 &mut self,
453 tier: TierKind,
454 embedder_id: &str,
455 db_fingerprint: &str,
456 mut shards: Vec<SemanticShardRecord>,
457 ) {
458 self.shards
459 .retain(|shard| !shard.matches_generation(tier, embedder_id, db_fingerprint));
460 self.shards.append(&mut shards);
461 self.shards.sort_by(|a, b| {
462 (
463 a.tier.as_str(),
464 &a.embedder_id,
465 &a.db_fingerprint,
466 a.shard_index,
467 )
468 .cmp(&(
469 b.tier.as_str(),
470 &b.embedder_id,
471 &b.db_fingerprint,
472 b.shard_index,
473 ))
474 });
475 }
476
477 pub fn summary(
478 &self,
479 tier: TierKind,
480 embedder_id: &str,
481 db_fingerprint: &str,
482 ) -> SemanticShardSummary {
483 let mut summary = SemanticShardSummary::default();
484 let mut ready_indices = std::collections::BTreeSet::new();
485 let mut ann_ready_indices = std::collections::BTreeSet::new();
486 let mut seen_indices = std::collections::BTreeSet::new();
487 let mut seen_index_paths = std::collections::BTreeSet::new();
488 let mut seen_ann_index_paths = std::collections::BTreeSet::new();
489 let mut expected_shard_count = None;
490 let mut expected_generation_metadata: Option<(&str, u32, u32, usize, u64, &str)> = None;
491 let mut generation_consistent = true;
492 for shard in self
493 .shards
494 .iter()
495 .filter(|shard| shard.matches_generation(tier, embedder_id, db_fingerprint))
496 {
497 if shard.shard_count == 0 || shard.shard_index >= shard.shard_count {
498 generation_consistent = false;
499 }
500 if !seen_indices.insert(shard.shard_index) {
501 generation_consistent = false;
502 }
503 if !semantic_shard_artifact_path_is_safe(&shard.index_path)
504 || !seen_index_paths.insert(&shard.index_path)
505 {
506 generation_consistent = false;
507 }
508 match expected_shard_count {
509 Some(expected) if expected != shard.shard_count => {
510 generation_consistent = false;
511 }
512 None => expected_shard_count = Some(shard.shard_count),
513 _ => {}
514 }
515 let generation_metadata = (
516 shard.model_revision.as_str(),
517 shard.schema_version,
518 shard.chunking_version,
519 shard.dimension,
520 shard.total_conversations,
521 shard.quantization.as_str(),
522 );
523 match expected_generation_metadata {
524 Some(expected) if expected != generation_metadata => {
525 generation_consistent = false;
526 }
527 None => expected_generation_metadata = Some(generation_metadata),
528 _ => {}
529 }
530 if shard.schema_version != SEMANTIC_SCHEMA_VERSION
531 || shard.chunking_version != CHUNKING_STRATEGY_VERSION
532 || shard.dimension == 0
533 {
534 generation_consistent = false;
535 }
536 summary.shard_count = summary.shard_count.max(shard.shard_count);
537 summary.doc_count = summary.doc_count.saturating_add(shard.doc_count);
538 summary.total_conversations =
539 summary.total_conversations.max(shard.total_conversations);
540 summary.size_bytes = summary.size_bytes.saturating_add(shard.size_bytes);
541 summary.ann_size_bytes = summary.ann_size_bytes.saturating_add(shard.ann_size_bytes);
542 if shard.ready && shard.mmap_ready {
543 ready_indices.insert(shard.shard_index);
544 }
545 if shard.ready
546 && shard.mmap_ready
547 && shard.ann_ready
548 && shard.ann_size_bytes > 0
549 && let Some(ann_index_path) = shard.ann_index_path.as_deref()
550 && semantic_shard_artifact_path_is_safe(ann_index_path)
551 && seen_ann_index_paths.insert(ann_index_path)
552 {
553 ann_ready_indices.insert(shard.shard_index);
554 }
555 }
556 summary.ready_shards = u32::try_from(ready_indices.len()).unwrap_or(u32::MAX);
557 summary.ann_ready_shards = u32::try_from(ann_ready_indices.len()).unwrap_or(u32::MAX);
558 summary.complete = generation_consistent
559 && summary.shard_count > 0
560 && summary.ready_shards == summary.shard_count
561 && (0..summary.shard_count).all(|index| ready_indices.contains(&index));
562 summary
563 }
564
565 pub fn invalidate_incompatible(
566 &mut self,
567 policy: &SemanticPolicy,
568 current_model_revision: &str,
569 ) -> usize {
570 let before = self.shards.len();
571 self.shards.retain(|shard| {
572 !matches!(
573 shard.to_policy_manifest().invalidation_action(
574 policy,
575 current_model_revision,
576 &shard.embedder_id,
577 ),
578 InvalidationAction::DiscardAndRebuild { .. } | InvalidationAction::Evict
579 )
580 });
581 before.saturating_sub(self.shards.len())
582 }
583
584 pub fn total_size_bytes(&self) -> u64 {
585 self.shards
586 .iter()
587 .map(|shard| shard.size_bytes)
588 .fold(0, u64::saturating_add)
589 }
590}
591
592#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
604pub struct BuildCheckpoint {
605 pub tier: TierKind,
607 pub embedder_id: String,
609 pub last_offset: i64,
611 pub docs_embedded: u64,
613 pub conversations_processed: u64,
615 pub total_conversations: u64,
617 pub db_fingerprint: String,
619 pub schema_version: u32,
621 pub chunking_version: u32,
623 pub saved_at_ms: i64,
625 #[serde(default, skip_serializing_if = "Option::is_none")]
632 pub last_message_id: Option<i64>,
633}
634
635impl BuildCheckpoint {
636 pub fn progress_pct(&self) -> u8 {
638 if self.total_conversations == 0 {
639 return 0;
640 }
641 let pct = (self.conversations_processed as f64 / self.total_conversations as f64) * 100.0;
642 (pct as u8).min(100)
643 }
644
645 pub fn is_complete(&self) -> bool {
647 self.conversations_processed >= self.total_conversations
648 }
649
650 pub fn is_valid(&self, current_db_fingerprint: &str) -> bool {
652 self.db_fingerprint == current_db_fingerprint
653 && self.schema_version == SEMANTIC_SCHEMA_VERSION
654 && self.chunking_version == CHUNKING_STRATEGY_VERSION
655 }
656}
657
658#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
662pub struct BacklogLedger {
663 pub total_conversations: u64,
665 pub fast_tier_processed: u64,
667 pub quality_tier_processed: u64,
669 pub db_fingerprint: String,
671 pub computed_at_ms: i64,
673}
674
675impl BacklogLedger {
676 pub fn fast_tier_remaining(&self) -> u64 {
678 self.total_conversations
679 .saturating_sub(self.fast_tier_processed)
680 }
681
682 pub fn quality_tier_remaining(&self) -> u64 {
684 self.total_conversations
685 .saturating_sub(self.quality_tier_processed)
686 }
687
688 pub fn has_pending_work(&self) -> bool {
690 self.fast_tier_remaining() > 0 || self.quality_tier_remaining() > 0
691 }
692
693 pub fn is_current(&self, current_db_fingerprint: &str) -> bool {
695 self.db_fingerprint == current_db_fingerprint
696 }
697}
698
699#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
706pub struct SemanticManifest {
707 pub manifest_version: u32,
709 pub fast_tier: Option<ArtifactRecord>,
711 pub quality_tier: Option<ArtifactRecord>,
713 pub hnsw: Option<HnswRecord>,
715 pub backlog: BacklogLedger,
717 pub checkpoint: Option<BuildCheckpoint>,
719 pub updated_at_ms: i64,
721}
722
723impl Default for SemanticManifest {
724 fn default() -> Self {
725 Self {
726 manifest_version: MANIFEST_FORMAT_VERSION,
727 fast_tier: None,
728 quality_tier: None,
729 hnsw: None,
730 backlog: BacklogLedger {
731 total_conversations: 0,
732 fast_tier_processed: 0,
733 quality_tier_processed: 0,
734 db_fingerprint: String::new(),
735 computed_at_ms: 0,
736 },
737 checkpoint: None,
738 updated_at_ms: 0,
739 }
740 }
741}
742
743impl SemanticManifest {
744 pub fn path(data_dir: &Path) -> PathBuf {
748 data_dir.join("vector_index").join(MANIFEST_FILENAME)
749 }
750
751 pub fn load(data_dir: &Path) -> Result<Option<Self>, ManifestError> {
763 let path = Self::path(data_dir);
764 let bytes = match fs::read(&path) {
765 Ok(b) => b,
766 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
767 Err(e) => {
768 return Err(ManifestError::Io {
769 path,
770 source: e.to_string(),
771 });
772 }
773 };
774
775 let manifest: Self = serde_json::from_slice(&bytes).map_err(|e| ManifestError::Parse {
776 path: path.clone(),
777 source: e.to_string(),
778 })?;
779
780 if manifest.manifest_version > MANIFEST_FORMAT_VERSION {
782 return Err(ManifestError::UnsupportedVersion {
783 found: manifest.manifest_version,
784 max_supported: MANIFEST_FORMAT_VERSION,
785 });
786 }
787
788 if manifest.manifest_version <= MANIFEST_FORMAT_VERSION_PRE_LAST_MESSAGE_CURSOR
794 && manifest
795 .checkpoint
796 .as_ref()
797 .is_some_and(|cp| cp.last_message_id.is_none())
798 {
799 tracing::warn!(
800 manifest_version = manifest.manifest_version,
801 supported_version = MANIFEST_FORMAT_VERSION,
802 path = %path.display(),
803 "semantic checkpoint manifest predates last_message_id cursor (cass#257 sub-fix 2); resume will fall back to conversation offset until the next checkpoint save"
804 );
805 }
806
807 Ok(Some(manifest))
808 }
809
810 pub fn load_or_default(data_dir: &Path) -> Result<Self, ManifestError> {
817 match Self::load(data_dir) {
818 Ok(Some(manifest)) => Ok(manifest),
819 Ok(None) => Ok(Self::default()),
820 Err(e @ ManifestError::Io { .. }) => Err(e),
822 Err(ManifestError::Parse { .. } | ManifestError::UnsupportedVersion { .. }) => {
824 Ok(Self::default())
825 }
826 Err(e) => Err(e),
827 }
828 }
829
830 pub fn save(&mut self, data_dir: &Path) -> Result<(), ManifestError> {
832 let path = Self::path(data_dir);
833
834 if let Some(parent) = path.parent() {
836 fs::create_dir_all(parent).map_err(|e| ManifestError::Io {
837 path: parent.to_path_buf(),
838 source: e.to_string(),
839 })?;
840 }
841
842 self.updated_at_ms = now_ms();
843
844 let json = serde_json::to_string_pretty(self).map_err(|e| ManifestError::Serialize {
845 source: e.to_string(),
846 })?;
847
848 let (tmp_path, mut file) =
850 create_unique_manifest_temp_file(&path).map_err(|e| ManifestError::Io {
851 path: path.clone(),
852 source: e.to_string(),
853 })?;
854 file.write_all(json.as_bytes())
855 .map_err(|e| ManifestError::Io {
856 path: tmp_path.clone(),
857 source: e.to_string(),
858 })?;
859 file.sync_all().map_err(|e| ManifestError::Io {
860 path: tmp_path.clone(),
861 source: e.to_string(),
862 })?;
863 replace_file_from_temp(&tmp_path, &path).map_err(|e| ManifestError::Io {
864 path: path.clone(),
865 source: e.to_string(),
866 })?;
867 sync_parent_directory(&path).map_err(|e| ManifestError::Io {
868 path: path
869 .parent()
870 .map(Path::to_path_buf)
871 .unwrap_or_else(|| path.clone()),
872 source: e.to_string(),
873 })?;
874
875 Ok(())
876 }
877
878 pub fn fast_tier_readiness(
882 &self,
883 policy: &SemanticPolicy,
884 current_db_fingerprint: &str,
885 current_model_revision: &str,
886 ) -> TierReadiness {
887 match &self.fast_tier {
888 Some(artifact) => {
889 artifact.readiness(policy, current_db_fingerprint, current_model_revision)
890 }
891 None => {
892 if let Some(cp) = &self.checkpoint
894 && cp.tier == TierKind::Fast
895 && cp.is_valid(current_db_fingerprint)
896 {
897 TierReadiness::Building {
898 progress_pct: cp.progress_pct(),
899 }
900 } else {
901 TierReadiness::Missing
902 }
903 }
904 }
905 }
906
907 pub fn quality_tier_readiness(
909 &self,
910 policy: &SemanticPolicy,
911 current_db_fingerprint: &str,
912 current_model_revision: &str,
913 ) -> TierReadiness {
914 match &self.quality_tier {
915 Some(artifact) => {
916 artifact.readiness(policy, current_db_fingerprint, current_model_revision)
917 }
918 None => {
919 if let Some(cp) = &self.checkpoint
920 && cp.tier == TierKind::Quality
921 && cp.is_valid(current_db_fingerprint)
922 {
923 TierReadiness::Building {
924 progress_pct: cp.progress_pct(),
925 }
926 } else {
927 TierReadiness::Missing
928 }
929 }
930 }
931 }
932
933 pub fn can_hybrid_search(
935 &self,
936 policy: &SemanticPolicy,
937 current_db_fingerprint: &str,
938 current_model_revision: &str,
939 ) -> bool {
940 self.fast_tier_readiness(policy, current_db_fingerprint, current_model_revision)
941 .is_usable()
942 }
943
944 pub fn refresh_backlog(&mut self, total_conversations: u64, current_db_fingerprint: &str) {
948 let fast_processed = self
949 .fast_tier
950 .as_ref()
951 .filter(|a| a.ready && a.db_fingerprint == current_db_fingerprint)
952 .map_or(0, |a| a.conversation_count);
953 let quality_processed = self
954 .quality_tier
955 .as_ref()
956 .filter(|a| a.ready && a.db_fingerprint == current_db_fingerprint)
957 .map_or(0, |a| a.conversation_count);
958
959 self.backlog = BacklogLedger {
960 total_conversations,
961 fast_tier_processed: fast_processed,
962 quality_tier_processed: quality_processed,
963 db_fingerprint: current_db_fingerprint.to_owned(),
964 computed_at_ms: now_ms(),
965 };
966 }
967
968 pub fn save_checkpoint(&mut self, checkpoint: BuildCheckpoint) {
970 self.checkpoint = Some(checkpoint);
971 }
972
973 pub fn clear_checkpoint(&mut self) {
975 self.checkpoint = None;
976 }
977
978 pub fn publish_artifact(&mut self, artifact: ArtifactRecord) {
980 if self
982 .checkpoint
983 .as_ref()
984 .is_some_and(|cp| cp.tier == artifact.tier)
985 {
986 self.checkpoint = None;
987 }
988
989 match artifact.tier {
990 TierKind::Fast => self.fast_tier = Some(artifact),
991 TierKind::Quality => self.quality_tier = Some(artifact),
992 }
993 }
994
995 pub fn publish_hnsw(&mut self, hnsw: HnswRecord) {
997 self.hnsw = Some(hnsw);
998 }
999
1000 #[allow(clippy::too_many_arguments)]
1003 pub fn adopt_legacy_artifact(
1004 &mut self,
1005 tier: TierKind,
1006 embedder_id: &str,
1007 model_revision: &str,
1008 dimension: usize,
1009 doc_count: u64,
1010 conversation_count: u64,
1011 db_fingerprint: &str,
1012 index_path: &str,
1013 size_bytes: u64,
1014 ) -> bool {
1015 let record = ArtifactRecord {
1016 tier,
1017 embedder_id: embedder_id.to_owned(),
1018 model_revision: model_revision.to_owned(),
1019 schema_version: SEMANTIC_SCHEMA_VERSION,
1020 chunking_version: CHUNKING_STRATEGY_VERSION,
1021 dimension,
1022 doc_count,
1023 conversation_count,
1024 db_fingerprint: db_fingerprint.to_owned(),
1025 index_path: index_path.to_owned(),
1026 size_bytes,
1027 started_at_ms: 0,
1028 completed_at_ms: now_ms(),
1029 ready: true,
1030 };
1031
1032 match tier {
1033 TierKind::Fast => self.fast_tier = Some(record),
1034 TierKind::Quality => self.quality_tier = Some(record),
1035 }
1036 true
1037 }
1038
1039 pub fn invalidate_incompatible(
1049 &mut self,
1050 policy: &SemanticPolicy,
1051 current_model_revision: &str,
1052 ) -> usize {
1053 let mut count = 0;
1054
1055 if let Some(ref artifact) = self.fast_tier {
1056 let pm = artifact.to_policy_manifest();
1057 if matches!(
1058 pm.invalidation_action(policy, current_model_revision, &artifact.embedder_id),
1059 InvalidationAction::DiscardAndRebuild { .. } | InvalidationAction::Evict
1060 ) {
1061 self.fast_tier = None;
1062 count += 1;
1063 }
1064 }
1065
1066 if let Some(ref artifact) = self.quality_tier {
1067 let pm = artifact.to_policy_manifest();
1068 if matches!(
1069 pm.invalidation_action(policy, current_model_revision, &artifact.embedder_id),
1070 InvalidationAction::DiscardAndRebuild { .. } | InvalidationAction::Evict
1071 ) {
1072 self.quality_tier = None;
1073 count += 1;
1074 }
1075 }
1076
1077 if let Some(ref hnsw) = self.hnsw {
1079 let base_gone = match hnsw.base_tier {
1080 TierKind::Fast => self.fast_tier.is_none(),
1081 TierKind::Quality => self.quality_tier.is_none(),
1082 };
1083 if base_gone {
1084 self.hnsw = None;
1085 count += 1;
1086 }
1087 }
1088
1089 if let Some(ref cp) = self.checkpoint
1091 && (cp.schema_version != policy.semantic_schema_version
1092 || cp.chunking_version != policy.chunking_strategy_version)
1093 {
1094 self.checkpoint = None;
1095 }
1096
1097 count
1098 }
1099
1100 pub fn total_size_bytes(&self) -> u64 {
1102 let fast = self.fast_tier.as_ref().map_or(0, |a| a.size_bytes);
1103 let quality = self.quality_tier.as_ref().map_or(0, |a| a.size_bytes);
1104 let hnsw = self.hnsw.as_ref().map_or(0, |h| h.size_bytes);
1105 fast + quality + hnsw
1106 }
1107
1108 pub fn total_size_mb(&self) -> u64 {
1110 self.total_size_bytes().div_ceil(1_048_576)
1111 }
1112}
1113
/// Errors produced while loading or saving a manifest.
///
/// Underlying causes are kept as strings rather than as error values.
#[derive(Debug)]
pub enum ManifestError {
    /// A filesystem operation on `path` failed.
    Io { path: PathBuf, source: String },
    /// The file at `path` could not be parsed.
    Parse { path: PathBuf, source: String },
    /// The manifest could not be serialized.
    Serialize { source: String },
    /// The stored manifest version is newer than this build supports.
    UnsupportedVersion { found: u32, max_supported: u32 },
}

impl std::fmt::Display for ManifestError {
    /// Renders a one-line, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ManifestError::Io { path, source } => {
                let shown = path.display();
                write!(f, "manifest I/O error at {shown}: {source}")
            }
            ManifestError::Parse { path, source } => {
                let shown = path.display();
                write!(f, "manifest parse error at {shown}: {source}")
            }
            ManifestError::Serialize { source } => {
                write!(f, "manifest serialization error: {source}")
            }
            ManifestError::UnsupportedVersion {
                found,
                max_supported,
            } => {
                write!(
                    f,
                    "manifest version {found} is newer than supported version {max_supported}"
                )
            }
        }
    }
}

impl std::error::Error for ManifestError {}
1146
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns `0` when the system clock reports a time before the epoch. The
/// `u128` millisecond count is converted with a checked conversion that
/// saturates at `i64::MAX`, rather than an `as` cast that would wrap.
fn now_ms() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| {
            i64::try_from(elapsed.as_millis()).unwrap_or(i64::MAX)
        })
}
1155
1156fn unique_manifest_temp_path(path: &Path, attempt: u32, random: u64) -> PathBuf {
1157 static NEXT_NONCE: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
1158
1159 let file_name = path
1160 .file_name()
1161 .and_then(|name| name.to_str())
1162 .unwrap_or(MANIFEST_FILENAME);
1163 let nonce = NEXT_NONCE.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1164 path.with_file_name(format!(
1165 ".{file_name}.tmp.{attempt}.{}.{}.{random:016x}",
1166 now_ms(),
1167 nonce
1168 ))
1169}
1170
1171fn create_unique_manifest_temp_file(path: &Path) -> std::io::Result<(PathBuf, fs::File)> {
1172 for attempt in 0..100 {
1173 let random = random_manifest_path_nonce()?;
1174 let tmp_path = unique_manifest_temp_path(path, attempt, random);
1175 match OpenOptions::new()
1176 .write(true)
1177 .create_new(true)
1178 .open(&tmp_path)
1179 {
1180 Ok(file) => return Ok((tmp_path, file)),
1181 Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => continue,
1182 Err(err) => return Err(err),
1183 }
1184 }
1185
1186 Err(std::io::Error::new(
1187 std::io::ErrorKind::AlreadyExists,
1188 format!(
1189 "could not create a unique temporary manifest file for {} after 100 attempts",
1190 path.display()
1191 ),
1192 ))
1193}
1194
/// Builds a hidden sibling backup path for `path` (Windows only).
///
/// The name combines the target's file name (or `MANIFEST_FILENAME` when it
/// is missing or not valid UTF-8), the current time in milliseconds, a
/// process-wide counter and a fresh random value.
///
/// # Errors
///
/// Fails when the random value cannot be generated.
#[cfg(windows)]
fn unique_manifest_backup_path(path: &Path) -> std::io::Result<PathBuf> {
    use std::sync::atomic::{AtomicU64, Ordering};

    static NEXT_NONCE: AtomicU64 = AtomicU64::new(0);

    // Drawn first so a failure here leaves the counter untouched.
    let random = random_manifest_path_nonce()?;

    let file_name = match path.file_name().and_then(|name| name.to_str()) {
        Some(name) => name,
        None => MANIFEST_FILENAME,
    };
    let nonce = NEXT_NONCE.fetch_add(1, Ordering::Relaxed);
    let backup_name = format!(
        ".{file_name}.bak.{}.{nonce}.{random:016x}",
        now_ms()
    );
    Ok(path.with_file_name(backup_name))
}
1210
1211fn random_manifest_path_nonce() -> std::io::Result<u64> {
1212 let mut random_bytes = [0u8; 8];
1213 SystemRandom::new()
1214 .fill(&mut random_bytes)
1215 .map_err(|_| std::io::Error::other("failed to generate manifest temp path nonce"))?;
1216 Ok(u64::from_le_bytes(random_bytes))
1217}
1218
/// Moves `temp_path` over `final_path`.
///
/// On non-Windows targets this is a single `rename`. If the rename fails,
/// the temp file is removed (best effort) so a failed save does not leave a
/// stray temp file behind.
///
/// On Windows the rename is tried first. If it fails with `AlreadyExists` or
/// `PermissionDenied` while the destination exists, the destination is moved
/// to a unique backup name, the rename is retried, and on failure the backup
/// is moved back. The temp file is kept only when that restore also fails,
/// since it is then the one file that still sits at a known path. Any other
/// first-rename failure removes the temp file and returns the error.
///
/// # Errors
///
/// Returns the underlying `io::Error`, or a combined message describing each
/// failed step on the Windows fallback path.
fn replace_file_from_temp(temp_path: &Path, final_path: &Path) -> std::io::Result<()> {
    #[cfg(windows)]
    {
        match fs::rename(temp_path, final_path) {
            Ok(()) => sync_parent_directory(final_path),
            Err(first_err)
                if final_path.exists()
                    && matches!(
                        first_err.kind(),
                        std::io::ErrorKind::AlreadyExists | std::io::ErrorKind::PermissionDenied
                    ) =>
            {
                let backup_path = unique_manifest_backup_path(final_path)?;
                fs::rename(final_path, &backup_path).map_err(|backup_err| {
                    let _ = fs::remove_file(temp_path);
                    std::io::Error::other(format!(
                        "failed preparing backup {} before replacing {}: first error: {}; backup error: {}",
                        backup_path.display(),
                        final_path.display(),
                        first_err,
                        backup_err
                    ))
                })?;
                match fs::rename(temp_path, final_path) {
                    Ok(()) => {
                        let _ = fs::remove_file(&backup_path);
                        sync_parent_directory(final_path)
                    }
                    Err(second_err) => match fs::rename(&backup_path, final_path) {
                        Ok(()) => {
                            // Original restored; the temp file is no longer needed.
                            let _ = fs::remove_file(temp_path);
                            sync_parent_directory(final_path)?;
                            Err(std::io::Error::other(format!(
                                "failed replacing {} with {}: first error: {}; second error: {}; restored original file",
                                final_path.display(),
                                temp_path.display(),
                                first_err,
                                second_err
                            )))
                        }
                        // Restore failed: keep the temp file so its data is not lost.
                        Err(restore_err) => Err(std::io::Error::other(format!(
                            "failed replacing {} with {}: first error: {}; second error: {}; restore error: {}; temp file retained at {}",
                            final_path.display(),
                            temp_path.display(),
                            first_err,
                            second_err,
                            restore_err,
                            temp_path.display()
                        ))),
                    },
                }
            }
            Err(err) => {
                // The destination was not touched; drop the orphaned temp file.
                let _ = fs::remove_file(temp_path);
                Err(err)
            }
        }
    }

    #[cfg(not(windows))]
    {
        fs::rename(temp_path, final_path).map_err(|err| {
            // The destination was not touched; drop the orphaned temp file.
            let _ = fs::remove_file(temp_path);
            err
        })
    }
}
1280
/// Calls `sync_all` on the directory containing `path`.
///
/// Returns `Ok(())` when `path` has no parent. A bare file name has an empty
/// parent; it is treated as the current directory (`.`) instead of failing
/// on an attempt to open the empty path.
///
/// # Errors
///
/// Returns the `io::Error` from opening or syncing the directory.
#[cfg(not(windows))]
fn sync_parent_directory(path: &Path) -> std::io::Result<()> {
    let Some(parent) = path.parent() else {
        return Ok(());
    };
    let directory = if parent.as_os_str().is_empty() {
        Path::new(".")
    } else {
        parent
    };
    fs::File::open(directory)?.sync_all()
}

/// No-op on Windows: always returns `Ok(())`.
#[cfg(windows)]
fn sync_parent_directory(_path: &Path) -> std::io::Result<()> {
    Ok(())
}
1294
1295#[cfg(test)]
1298mod tests {
1299 use super::*;
1300 use crate::search::policy::SemanticPolicy;
1301
1302 fn test_policy() -> SemanticPolicy {
1303 SemanticPolicy::compiled_defaults()
1304 }
1305
1306 fn test_artifact(tier: TierKind, ready: bool) -> ArtifactRecord {
1307 ArtifactRecord {
1308 tier,
1309 embedder_id: match tier {
1310 TierKind::Fast => "fnv1a-384".to_owned(),
1311 TierKind::Quality => "minilm-384".to_owned(),
1312 },
1313 model_revision: "abc123".to_owned(),
1314 schema_version: SEMANTIC_SCHEMA_VERSION,
1315 chunking_version: CHUNKING_STRATEGY_VERSION,
1316 dimension: 384,
1317 doc_count: 1000,
1318 conversation_count: 250,
1319 db_fingerprint: "fp-1234".to_owned(),
1320 index_path: format!(
1321 "vector_index/index-{}.fsvi",
1322 match tier {
1323 TierKind::Fast => "fnv1a-384",
1324 TierKind::Quality => "minilm-384",
1325 }
1326 ),
1327 size_bytes: 150_000,
1328 started_at_ms: 1_700_000_000_000,
1329 completed_at_ms: 1_700_000_060_000,
1330 ready,
1331 }
1332 }
1333
1334 fn test_hnsw() -> HnswRecord {
1335 HnswRecord {
1336 base_tier: TierKind::Quality,
1337 embedder_id: "minilm-384".to_owned(),
1338 ef_search: 128,
1339 index_path: "vector_index/hnsw-minilm-384.chsw".to_owned(),
1340 size_bytes: 50_000,
1341 built_at_ms: 1_700_000_070_000,
1342 ready: true,
1343 }
1344 }
1345
1346 fn test_shard(shard_index: u32, shard_count: u32, ready: bool) -> SemanticShardRecord {
1347 SemanticShardRecord {
1348 tier: TierKind::Fast,
1349 embedder_id: "fnv1a-384".to_owned(),
1350 model_revision: "hash".to_owned(),
1351 schema_version: SEMANTIC_SCHEMA_VERSION,
1352 chunking_version: CHUNKING_STRATEGY_VERSION,
1353 dimension: 384,
1354 shard_index,
1355 shard_count,
1356 doc_count: 25,
1357 total_conversations: 10,
1358 db_fingerprint: "fp-sharded".to_owned(),
1359 index_path: format!("vector_index/shards/fast-fnv1a-384/shard-{shard_index:05}.fsvi"),
1360 quantization: "f16".to_owned(),
1361 mmap_ready: true,
1362 ann_index_path: None,
1363 ann_size_bytes: 0,
1364 ann_ready: false,
1365 size_bytes: 4096,
1366 started_at_ms: 1_700_000_080_000,
1367 completed_at_ms: 1_700_000_081_000,
1368 ready,
1369 }
1370 }
1371
1372 fn test_checkpoint(tier: TierKind) -> BuildCheckpoint {
1373 BuildCheckpoint {
1374 tier,
1375 embedder_id: "minilm-384".to_owned(),
1376 last_offset: 500,
1377 docs_embedded: 3000,
1378 conversations_processed: 500,
1379 total_conversations: 1000,
1380 db_fingerprint: "fp-1234".to_owned(),
1381 schema_version: SEMANTIC_SCHEMA_VERSION,
1382 chunking_version: CHUNKING_STRATEGY_VERSION,
1383 saved_at_ms: 1_700_000_030_000,
1384 last_message_id: None,
1385 }
1386 }
1387
/// Coarse expectation for a [`TierReadiness`] value in table-driven tests.
///
/// `Stale` and `Incompatible` carry no payload here: the tests only check which
/// variant was produced, not the reason string attached to it.
#[derive(Clone, Copy, Debug)]
enum ExpectedTierReadiness {
    /// The tier must be exactly `TierReadiness::Ready`.
    Ready,
    /// The tier must be some `TierReadiness::Stale { .. }`.
    Stale,
    /// The tier must be some `TierReadiness::Incompatible { .. }`.
    Incompatible,
    /// The tier must be `TierReadiness::Building` with this exact progress percentage.
    Building(u8),
}
1395
1396 fn no_artifact_mutation(_: &mut ArtifactRecord) {}
1397
/// One row of the `tier_readiness_cases` table. Fields, in order:
///
/// 1. human-readable label used in assertion messages,
/// 2. tier of the fixture artifact,
/// 3. the fixture's `ready` flag,
/// 4. the "current" database fingerprint passed to `readiness`,
/// 5. the "current" model revision passed to `readiness`,
/// 6. a mutator applied to the fixture before the check,
/// 7. the expected readiness outcome.
type TierReadinessCase = (
    &'static str,
    TierKind,
    bool,
    &'static str,
    &'static str,
    fn(&mut ArtifactRecord),
    ExpectedTierReadiness,
);
1407
1408 fn set_schema_version_to_zero(artifact: &mut ArtifactRecord) {
1409 artifact.schema_version = 0;
1410 }
1411
1412 fn assert_tier_readiness(actual: TierReadiness, expected: ExpectedTierReadiness, label: &str) {
1413 match expected {
1414 ExpectedTierReadiness::Ready => {
1415 assert_eq!(actual, TierReadiness::Ready, "{label}");
1416 }
1417 ExpectedTierReadiness::Stale => {
1418 assert!(
1419 matches!(actual, TierReadiness::Stale { .. }),
1420 "{label}: {actual:?}"
1421 );
1422 }
1423 ExpectedTierReadiness::Incompatible => {
1424 assert!(
1425 matches!(actual, TierReadiness::Incompatible { .. }),
1426 "{label}: {actual:?}"
1427 );
1428 }
1429 ExpectedTierReadiness::Building(progress_pct) => {
1430 assert_eq!(actual, TierReadiness::Building { progress_pct }, "{label}");
1431 }
1432 }
1433 }
1434
/// A fully populated manifest saved to disk loads back with every populated
/// section present, the current format version, and a non-zero update timestamp.
#[test]
fn manifest_round_trip_via_disk() {
    let data_dir = tempfile::tempdir().unwrap();
    let backlog = BacklogLedger {
        total_conversations: 2000,
        fast_tier_processed: 1000,
        quality_tier_processed: 500,
        db_fingerprint: String::from("fp-1234"),
        computed_at_ms: 1_700_000_000_000,
    };
    let mut written = SemanticManifest {
        fast_tier: Some(test_artifact(TierKind::Fast, true)),
        quality_tier: Some(test_artifact(TierKind::Quality, true)),
        hnsw: Some(test_hnsw()),
        checkpoint: Some(test_checkpoint(TierKind::Quality)),
        backlog,
        ..Default::default()
    };
    written.save(data_dir.path()).unwrap();

    let reloaded = SemanticManifest::load(data_dir.path()).unwrap().unwrap();

    assert_eq!(reloaded.manifest_version, MANIFEST_FORMAT_VERSION);
    assert!(reloaded.fast_tier.is_some());
    assert!(reloaded.quality_tier.is_some());
    assert!(reloaded.hnsw.is_some());
    assert!(reloaded.checkpoint.is_some());
    assert_eq!(reloaded.backlog.total_conversations, 2000);
    assert!(reloaded.updated_at_ms > 0);
}
1466
/// Saving a second manifest into the same directory replaces the first one
/// entirely: nothing from the earlier save is visible after reload.
#[test]
fn manifest_save_overwrites_existing_file() {
    let data_dir = tempfile::tempdir().unwrap();

    // First save: fast tier only.
    let mut initial = SemanticManifest {
        fast_tier: Some(test_artifact(TierKind::Fast, true)),
        ..Default::default()
    };
    initial.save(data_dir.path()).unwrap();

    // Second save: quality tier only, with a distinct backlog.
    let replacement_backlog = BacklogLedger {
        total_conversations: 99,
        fast_tier_processed: 0,
        quality_tier_processed: 99,
        db_fingerprint: String::from("fp-overwrite"),
        computed_at_ms: 1_700_000_000_123,
    };
    let mut replacement = SemanticManifest {
        quality_tier: Some(test_artifact(TierKind::Quality, true)),
        backlog: replacement_backlog,
        ..Default::default()
    };
    replacement.save(data_dir.path()).unwrap();

    let reloaded = SemanticManifest::load(data_dir.path()).unwrap().unwrap();
    assert!(reloaded.fast_tier.is_none());
    assert!(reloaded.quality_tier.is_some());
    assert_eq!(reloaded.backlog.total_conversations, 99);
}
1494
/// Two temp files created for the same final manifest path must get distinct
/// paths, both exist on disk, and both live in the manifest's own directory.
///
/// Returns `Err` with a descriptive message instead of panicking on failure.
#[test]
fn manifest_temp_file_creation_is_exclusive_and_unique() -> Result<(), String> {
    let data_dir = tempfile::tempdir().map_err(|e| e.to_string())?;
    let final_path = SemanticManifest::path(data_dir.path());
    let manifest_dir = final_path
        .parent()
        .ok_or_else(|| "semantic manifest path should have a parent directory".to_string())?;
    fs::create_dir_all(manifest_dir).map_err(|e| e.to_string())?;

    // Both handles stay open until the end of the test so the second creation
    // happens while the first temp file is still in use.
    let (first_path, mut first_file) =
        create_unique_manifest_temp_file(&final_path).map_err(|e| e.to_string())?;
    first_file.write_all(b"first").map_err(|e| e.to_string())?;
    let (second_path, mut second_file) =
        create_unique_manifest_temp_file(&final_path).map_err(|e| e.to_string())?;
    second_file
        .write_all(b"second")
        .map_err(|e| e.to_string())?;

    if first_path == second_path {
        return Err("exclusive temp creation reused the same path".to_string());
    }

    let staged = [("first", &first_path), ("second", &second_path)];
    for (which, path) in staged {
        if !path.exists() {
            return Err(format!("{which} temp file is missing: {}", path.display()));
        }
    }
    for (which, path) in staged {
        if path.parent() != Some(manifest_dir) {
            return Err(format!(
                "{which} temp path escaped manifest directory: {}",
                path.display()
            ));
        }
    }

    Ok(())
}
1543
/// Loading from a directory with no manifest file succeeds and yields `None`.
#[test]
fn manifest_load_missing_returns_none() {
    let empty_dir = tempfile::tempdir().unwrap();
    let outcome = SemanticManifest::load(empty_dir.path()).unwrap();
    assert!(outcome.is_none());
}
1550
/// `load_or_default` on an empty directory returns a manifest at the current
/// format version with neither tier populated.
#[test]
fn manifest_load_or_default_returns_defaults() {
    let empty_dir = tempfile::tempdir().unwrap();
    let fallback = SemanticManifest::load_or_default(empty_dir.path()).unwrap();

    assert_eq!(fallback.manifest_version, MANIFEST_FORMAT_VERSION);
    assert!(fallback.fast_tier.is_none());
    assert!(fallback.quality_tier.is_none());
}
1559
/// A manifest file containing non-JSON bytes surfaces as `ManifestError::Parse`.
#[test]
fn manifest_load_corrupt_returns_parse_error() {
    let data_dir = tempfile::tempdir().unwrap();
    let manifest_path = SemanticManifest::path(data_dir.path());
    let manifest_dir = manifest_path.parent().unwrap();
    fs::create_dir_all(manifest_dir).unwrap();
    fs::write(&manifest_path, b"not json").unwrap();

    let outcome = SemanticManifest::load(data_dir.path());
    let is_parse_error = matches!(outcome, Err(ManifestError::Parse { .. }));
    assert!(is_parse_error);
}
1570
/// A manifest written with a format version one past the current one is
/// rejected with `ManifestError::UnsupportedVersion`.
#[test]
fn manifest_load_future_version_returns_error() {
    let data_dir = tempfile::tempdir().unwrap();
    let manifest_path = SemanticManifest::path(data_dir.path());
    fs::create_dir_all(manifest_path.parent().unwrap()).unwrap();

    // Written directly as JSON so the future version number reaches disk as-is.
    let from_the_future = SemanticManifest {
        manifest_version: MANIFEST_FORMAT_VERSION + 1,
        ..Default::default()
    };
    let serialized = serde_json::to_string(&from_the_future).unwrap();
    fs::write(&manifest_path, serialized).unwrap();

    let outcome = SemanticManifest::load(data_dir.path());
    let is_unsupported = matches!(outcome, Err(ManifestError::UnsupportedVersion { .. }));
    assert!(is_unsupported);
}
1590
/// Table-driven check of `ArtifactRecord::readiness` across fingerprint,
/// model-revision, schema-version and publication-state variations.
#[test]
fn tier_readiness_cases() {
    // Values that match what `test_artifact` records in the fixture.
    const DB_FP: &str = "fp-1234";
    const MODEL_REV: &str = "abc123";

    let policy = test_policy();
    let table: [TierReadinessCase; 5] = [
        (
            "ready artifact with matching fingerprint",
            TierKind::Fast,
            true,
            DB_FP,
            MODEL_REV,
            no_artifact_mutation,
            ExpectedTierReadiness::Ready,
        ),
        (
            "ready artifact with changed DB fingerprint",
            TierKind::Fast,
            true,
            "different-fp",
            MODEL_REV,
            no_artifact_mutation,
            ExpectedTierReadiness::Stale,
        ),
        (
            "ready artifact with changed model revision",
            TierKind::Quality,
            true,
            DB_FP,
            "new-revision",
            no_artifact_mutation,
            ExpectedTierReadiness::Stale,
        ),
        (
            "schema version mismatch",
            TierKind::Quality,
            true,
            DB_FP,
            MODEL_REV,
            set_schema_version_to_zero,
            ExpectedTierReadiness::Incompatible,
        ),
        (
            "not yet published artifact",
            TierKind::Fast,
            false,
            DB_FP,
            MODEL_REV,
            no_artifact_mutation,
            ExpectedTierReadiness::Building(100),
        ),
    ];

    for (label, tier, ready, db_fp_now, model_rev_now, mutate, expected) in table {
        let mut artifact = test_artifact(tier, ready);
        mutate(&mut artifact);
        let actual = artifact.readiness(&policy, db_fp_now, model_rev_now);
        assert_tier_readiness(actual, expected, label);
    }
}
1656
/// A default (empty) manifest reports both tiers as `Missing`.
#[test]
fn manifest_tier_readiness_missing() {
    let policy = test_policy();
    let empty = SemanticManifest::default();

    let fast = empty.fast_tier_readiness(&policy, "fp", "rev");
    assert_eq!(fast, TierReadiness::Missing);

    let quality = empty.quality_tier_readiness(&policy, "fp", "rev");
    assert_eq!(quality, TierReadiness::Missing);
}
1672
/// A quality-tier checkpoint whose fingerprint matches makes the quality tier
/// report `Building` at 50%, while the fast tier stays `Missing`.
#[test]
fn manifest_tier_readiness_with_checkpoint() {
    let policy = test_policy();
    let in_progress = SemanticManifest {
        checkpoint: Some(test_checkpoint(TierKind::Quality)),
        ..Default::default()
    };

    // The checkpoint belongs to the quality tier, so the fast tier is unaffected.
    let fast = in_progress.fast_tier_readiness(&policy, "fp-1234", "rev");
    assert_eq!(fast, TierReadiness::Missing);

    let quality = in_progress.quality_tier_readiness(&policy, "fp-1234", "rev");
    assert!(matches!(
        quality,
        TierReadiness::Building { progress_pct: 50 }
    ));
}
1692
/// A checkpoint recorded against a different database fingerprint is ignored:
/// the quality tier reports `Missing` rather than `Building`.
#[test]
fn manifest_tier_readiness_checkpoint_invalid_db() {
    let policy = test_policy();
    let in_progress = SemanticManifest {
        checkpoint: Some(test_checkpoint(TierKind::Quality)),
        ..Default::default()
    };

    // The fixture checkpoint was saved for "fp-1234", not "other-fp".
    let quality = in_progress.quality_tier_readiness(&policy, "other-fp", "rev");
    assert_eq!(quality, TierReadiness::Missing);
}
1707
/// Hybrid search is unavailable on an empty manifest and becomes available
/// once a ready fast-tier artifact with matching fingerprint/revision exists.
#[test]
fn can_hybrid_search_requires_usable_fast_tier() {
    let policy = test_policy();
    let (db_fp, rev) = ("fp-1234", "abc123");

    let empty = SemanticManifest::default();
    assert!(!empty.can_hybrid_search(&policy, db_fp, rev));

    let with_fast_tier = SemanticManifest {
        fast_tier: Some(test_artifact(TierKind::Fast, true)),
        ..Default::default()
    };
    assert!(with_fast_tier.can_hybrid_search(&policy, db_fp, rev));
}
1727
/// A partially processed ledger reports the per-tier remainder, flags pending
/// work, and is "current" only for its own fingerprint.
#[test]
fn backlog_remaining_and_pending() {
    let partial = BacklogLedger {
        total_conversations: 1000,
        fast_tier_processed: 800,
        quality_tier_processed: 300,
        db_fingerprint: String::from("fp"),
        computed_at_ms: 0,
    };

    // 1000 - 800 and 1000 - 300 respectively.
    assert_eq!(partial.fast_tier_remaining(), 200);
    assert_eq!(partial.quality_tier_remaining(), 700);
    assert!(partial.has_pending_work());

    assert!(partial.is_current("fp"));
    assert!(!partial.is_current("other"));
}
1746
/// When both tiers have processed every conversation, nothing remains and no
/// work is pending.
#[test]
fn backlog_no_pending_when_fully_processed() {
    let total = 500;
    let drained = BacklogLedger {
        total_conversations: total,
        fast_tier_processed: total,
        quality_tier_processed: total,
        db_fingerprint: String::from("fp"),
        computed_at_ms: 0,
    };

    assert_eq!(drained.fast_tier_remaining(), 0);
    assert_eq!(drained.quality_tier_remaining(), 0);
    assert!(!drained.has_pending_work());
}
1761
/// The fixture checkpoint (500/1000) is 50% done, incomplete, and valid only
/// for its own fingerprint; at 1000/1000 it is 100% and complete.
#[test]
fn checkpoint_progress_and_completion() {
    let halfway = test_checkpoint(TierKind::Quality);
    assert_eq!(halfway.progress_pct(), 50);
    assert!(!halfway.is_complete());
    assert!(halfway.is_valid("fp-1234"));
    assert!(!halfway.is_valid("other-fp"));

    // Same checkpoint with every conversation processed.
    let finished = BuildCheckpoint {
        conversations_processed: 1000,
        ..test_checkpoint(TierKind::Quality)
    };
    assert_eq!(finished.progress_pct(), 100);
    assert!(finished.is_complete());
}
1778
/// With zero total conversations the progress percentage is reported as 0.
#[test]
fn checkpoint_zero_total_gives_zero_pct() {
    let empty_build = BuildCheckpoint {
        total_conversations: 0,
        conversations_processed: 0,
        ..test_checkpoint(TierKind::Fast)
    };
    assert_eq!(empty_build.progress_pct(), 0);
}
1786
/// Publishing an artifact for the tier the checkpoint belongs to removes that
/// checkpoint and installs the artifact.
#[test]
fn publish_artifact_clears_matching_checkpoint() {
    let mut manifest = SemanticManifest {
        checkpoint: Some(test_checkpoint(TierKind::Quality)),
        ..Default::default()
    };

    let published = test_artifact(TierKind::Quality, true);
    manifest.publish_artifact(published);

    assert!(manifest.checkpoint.is_none());
    assert!(manifest.quality_tier.is_some());
}
1800
/// Publishing a fast-tier artifact leaves a quality-tier checkpoint in place.
#[test]
fn publish_artifact_keeps_non_matching_checkpoint() {
    let mut manifest = SemanticManifest {
        checkpoint: Some(test_checkpoint(TierKind::Quality)),
        ..Default::default()
    };

    let published = test_artifact(TierKind::Fast, true);
    manifest.publish_artifact(published);

    // The checkpoint is for the quality tier, so it must survive.
    assert!(manifest.checkpoint.is_some());
    assert!(manifest.fast_tier.is_some());
}
1812
/// Refreshing the backlog with a matching fingerprint takes each tier's
/// processed count from its artifact's conversation count (250 in the fixture).
#[test]
fn refresh_backlog_computes_from_ready_artifacts() {
    let mut manifest = SemanticManifest {
        fast_tier: Some(test_artifact(TierKind::Fast, true)),
        quality_tier: Some(test_artifact(TierKind::Quality, true)),
        ..Default::default()
    };

    manifest.refresh_backlog(2000, "fp-1234");

    let ledger = &manifest.backlog;
    assert_eq!(ledger.total_conversations, 2000);
    assert_eq!(ledger.fast_tier_processed, 250);
    assert_eq!(ledger.quality_tier_processed, 250);
}
1828
/// An artifact built against a different fingerprint contributes nothing to
/// the refreshed backlog.
#[test]
fn refresh_backlog_ignores_stale_artifacts() {
    let mut manifest = SemanticManifest {
        fast_tier: Some(test_artifact(TierKind::Fast, true)),
        ..Default::default()
    };

    // The fixture artifact was built for "fp-1234", not "different-fp".
    manifest.refresh_backlog(2000, "different-fp");

    assert_eq!(manifest.backlog.fast_tier_processed, 0);
}
1840
/// A quality-tier artifact with a mismatched schema version is removed, and
/// the HNSW record is removed along with it (two invalidations in total).
#[test]
fn invalidate_incompatible_removes_schema_mismatch() {
    let outdated_quality = ArtifactRecord {
        schema_version: 0,
        ..test_artifact(TierKind::Quality, true)
    };
    let mut manifest = SemanticManifest {
        quality_tier: Some(outdated_quality),
        hnsw: Some(test_hnsw()),
        ..Default::default()
    };
    let policy = test_policy();

    let removed = manifest.invalidate_incompatible(&policy, "abc123");

    assert_eq!(removed, 2);
    assert!(manifest.quality_tier.is_none());
    assert!(manifest.hnsw.is_none());
}
1860
/// Artifacts that match the current schema and model revision survive
/// invalidation untouched.
#[test]
fn invalidate_incompatible_keeps_compatible() {
    let policy = test_policy();
    let mut manifest = SemanticManifest {
        fast_tier: Some(test_artifact(TierKind::Fast, true)),
        quality_tier: Some(test_artifact(TierKind::Quality, true)),
        ..Default::default()
    };

    let removed = manifest.invalidate_incompatible(&policy, "abc123");

    assert_eq!(removed, 0);
    assert!(manifest.fast_tier.is_some());
    assert!(manifest.quality_tier.is_some());
}
1876
/// Adopting a legacy fast-tier index into an empty manifest succeeds and
/// records it as ready under the current schema version.
#[test]
fn adopt_legacy_artifact() {
    let mut manifest = SemanticManifest::default();

    let adopted = manifest.adopt_legacy_artifact(
        TierKind::Fast,
        "fnv1a-384",
        "hash",
        384,
        500,                                 // doc_count
        125,                                 // conversation_count
        "fp-old",                            // db_fingerprint
        "vector_index/index-fnv1a-384.fsvi", // index_path
        75_000,                              // size_bytes
    );
    assert!(adopted);

    let fast = manifest.fast_tier.as_ref().unwrap();
    assert_eq!(fast.embedder_id, "fnv1a-384");
    assert!(fast.ready);
    assert_eq!(fast.schema_version, SEMANTIC_SCHEMA_VERSION);
}
1905
/// The total size sums both tier artifacts (150,000 bytes each in the fixture)
/// and the HNSW index (50,000 bytes).
#[test]
fn total_size_accounts_for_all_artifacts() {
    let populated = SemanticManifest {
        fast_tier: Some(test_artifact(TierKind::Fast, true)),
        quality_tier: Some(test_artifact(TierKind::Quality, true)),
        hnsw: Some(test_hnsw()),
        ..Default::default()
    };

    let expected_bytes = 2 * 150_000 + 50_000;
    assert_eq!(populated.total_size_bytes(), expected_bytes);
    assert_eq!(populated.total_size_mb(), 1);
}
1920
/// An empty manifest occupies zero bytes and zero megabytes.
#[test]
fn total_size_empty_is_zero() {
    let empty = SemanticManifest::default();
    assert_eq!(empty.total_size_bytes(), 0);
    assert_eq!(empty.total_size_mb(), 0);
}
1927
/// Pretty-printed JSON serialization followed by deserialization reproduces
/// every populated section of the manifest.
#[test]
fn manifest_json_round_trip() {
    let original = SemanticManifest {
        fast_tier: Some(test_artifact(TierKind::Fast, true)),
        quality_tier: Some(test_artifact(TierKind::Quality, true)),
        hnsw: Some(test_hnsw()),
        checkpoint: Some(test_checkpoint(TierKind::Quality)),
        ..Default::default()
    };

    let encoded = serde_json::to_string_pretty(&original).unwrap();
    let decoded: SemanticManifest = serde_json::from_str(&encoded).unwrap();

    assert_eq!(decoded.fast_tier, original.fast_tier);
    assert_eq!(decoded.quality_tier, original.quality_tier);
    assert_eq!(decoded.hnsw, original.hnsw);
    assert_eq!(decoded.checkpoint, original.checkpoint);
}
1947
/// Shards inserted out of order come back from disk ordered by shard index,
/// at the current format version and with a non-zero update timestamp.
#[test]
fn shard_manifest_round_trip_via_sidecar() {
    let data_dir = tempfile::tempdir().unwrap();

    let mut sidecar = SemanticShardManifest::default();
    // Deliberately supplied as [1, 0] to exercise ordering.
    let out_of_order = vec![test_shard(1, 2, true), test_shard(0, 2, true)];
    sidecar.replace_shards_for_generation(TierKind::Fast, "fnv1a-384", "fp-sharded", out_of_order);
    sidecar.save(data_dir.path()).unwrap();

    let reloaded = SemanticShardManifest::load(data_dir.path())
        .unwrap()
        .unwrap();

    assert_eq!(reloaded.manifest_version, MANIFEST_FORMAT_VERSION);
    assert_eq!(reloaded.shards.len(), 2);
    let indexes: Vec<u32> = reloaded
        .shards
        .iter()
        .map(|shard| shard.shard_index)
        .collect();
    assert_eq!(indexes, [0, 1]);
    assert!(reloaded.updated_at_ms > 0);
}
1970
/// A generation is complete only when every declared shard is present and
/// ready; a gap (shard 1 of 3 missing) leaves it incomplete.
#[test]
fn shard_summary_requires_every_ready_shard() {
    const TIER: TierKind = TierKind::Fast;
    const EMBEDDER: &str = "fnv1a-384";
    const FINGERPRINT: &str = "fp-sharded";

    let mut shards = SemanticShardManifest::default();

    // Shards 0 and 2 of 3: shard 1 is absent.
    let with_gap = vec![test_shard(0, 3, true), test_shard(2, 3, true)];
    shards.replace_shards_for_generation(TIER, EMBEDDER, FINGERPRINT, with_gap);

    let partial = shards.summary(TIER, EMBEDDER, FINGERPRINT);
    assert_eq!(partial.shard_count, 3);
    assert_eq!(partial.ready_shards, 2);
    assert!(!partial.complete);

    // All three shards present.
    let all_three = (0..3)
        .map(|index| test_shard(index, 3, true))
        .collect::<Vec<_>>();
    shards.replace_shards_for_generation(TIER, EMBEDDER, FINGERPRINT, all_three);

    let complete = shards.summary(TIER, EMBEDDER, FINGERPRINT);
    assert_eq!(complete.ready_shards, 3);
    assert!(complete.complete);
    // 3 shards x 25 docs each; total_conversations is not multiplied per shard.
    assert_eq!(complete.doc_count, 75);
    assert_eq!(complete.total_conversations, 10);
}
2003
/// Walks one shard manifest through a series of malformed generations and
/// checks how `summary` reports each of them.
///
/// Every case replaces the shards of the same (tier, embedder, fingerprint)
/// generation, so the cases run strictly in sequence on a single manifest.
#[test]
fn shard_summary_rejects_non_mmap_ready_or_inconsistent_shards() {
    const TIER: TierKind = TierKind::Fast;
    const EMBEDDER: &str = "fnv1a-384";
    const FINGERPRINT: &str = "fp-sharded";

    let mut shards = SemanticShardManifest::default();

    // A shard that is not mmap-ready does not count as ready.
    let non_mmap = SemanticShardRecord {
        mmap_ready: false,
        ..test_shard(0, 1, true)
    };
    shards.replace_shards_for_generation(TIER, EMBEDDER, FINGERPRINT, vec![non_mmap]);
    let summary = shards.summary(TIER, EMBEDDER, FINGERPRINT);
    assert_eq!(summary.ready_shards, 0);
    assert!(!summary.complete);

    // Disagreeing shard counts, plus an ANN flag set without an ANN path.
    let inconsistent = SemanticShardRecord {
        ann_ready: true,
        ann_index_path: None,
        ann_size_bytes: 4096,
        ..test_shard(1, 3, true)
    };
    shards.replace_shards_for_generation(
        TIER,
        EMBEDDER,
        FINGERPRINT,
        vec![test_shard(0, 2, true), inconsistent],
    );
    let summary = shards.summary(TIER, EMBEDDER, FINGERPRINT);
    assert_eq!(summary.shard_count, 3);
    assert_eq!(summary.ready_shards, 2);
    assert_eq!(summary.ann_ready_shards, 0);
    assert!(!summary.complete);

    // The same shard index recorded twice.
    let with_duplicate_index = vec![
        test_shard(0, 2, true),
        test_shard(1, 2, true),
        test_shard(1, 2, false),
    ];
    shards.replace_shards_for_generation(TIER, EMBEDDER, FINGERPRINT, with_duplicate_index);
    let summary = shards.summary(TIER, EMBEDDER, FINGERPRINT);
    assert_eq!(summary.shard_count, 2);
    assert_eq!(summary.ready_shards, 2);
    assert!(
        !summary.complete,
        "duplicate shard indexes must not summarize as a complete generation"
    );

    // Two shards pointing at the same index file.
    let shares_index_path = SemanticShardRecord {
        index_path: test_shard(0, 2, true).index_path,
        ..test_shard(1, 2, true)
    };
    shards.replace_shards_for_generation(
        TIER,
        EMBEDDER,
        FINGERPRINT,
        vec![test_shard(0, 2, true), shares_index_path],
    );
    let summary = shards.summary(TIER, EMBEDDER, FINGERPRINT);
    assert_eq!(summary.shard_count, 2);
    assert_eq!(summary.ready_shards, 2);
    assert!(
        !summary.complete,
        "duplicate shard index paths must not summarize as a complete generation"
    );

    // An empty index path.
    let blank_path = SemanticShardRecord {
        index_path: String::new(),
        ..test_shard(0, 1, true)
    };
    shards.replace_shards_for_generation(TIER, EMBEDDER, FINGERPRINT, vec![blank_path]);
    let summary = shards.summary(TIER, EMBEDDER, FINGERPRINT);
    assert_eq!(summary.shard_count, 1);
    assert_eq!(summary.ready_shards, 1);
    assert!(
        !summary.complete,
        "blank shard index paths must not summarize as complete"
    );

    // Index paths that are absolute, contain `..`, start with `./`, or carry
    // leading whitespace.
    {
        let outside_dir = tempfile::tempdir().unwrap();
        let absolute_path = outside_dir
            .path()
            .join("outside.fsvi")
            .to_string_lossy()
            .into_owned();
        let unsafe_paths = [
            absolute_path,
            String::from("vector_index/shards/../outside.fsvi"),
            String::from("./vector_index/shards/fast/hash.fsvi"),
            String::from(" vector_index/shards/fast/hash.fsvi"),
        ];
        for unsafe_path in unsafe_paths {
            let unsafe_shard = SemanticShardRecord {
                index_path: unsafe_path,
                ..test_shard(0, 1, true)
            };
            shards.replace_shards_for_generation(
                TIER,
                EMBEDDER,
                FINGERPRINT,
                vec![unsafe_shard],
            );
            let summary = shards.summary(TIER, EMBEDDER, FINGERPRINT);
            assert_eq!(summary.shard_count, 1);
            assert_eq!(summary.ready_shards, 1);
            assert!(
                !summary.complete,
                "unsafe shard index paths must not summarize as complete"
            );
        }
    }

    // The same unsafe shapes on the optional ANN path: the ANN index is not
    // counted, but the vector shard generation still summarizes as complete.
    let outside_ann_dir = tempfile::tempdir().unwrap();
    let absolute_ann_path = outside_ann_dir
        .path()
        .join("outside.chsw")
        .to_string_lossy()
        .into_owned();
    let unsafe_ann_paths = [
        absolute_ann_path,
        String::from("vector_index/shards/../outside.chsw"),
        String::from("./vector_index/shards/fast/hash.chsw"),
        String::from(" vector_index/shards/fast/hash.chsw"),
    ];
    for unsafe_ann_path in unsafe_ann_paths {
        let unsafe_ann = SemanticShardRecord {
            ann_ready: true,
            ann_index_path: Some(unsafe_ann_path),
            ann_size_bytes: 4096,
            ..test_shard(0, 1, true)
        };
        shards.replace_shards_for_generation(TIER, EMBEDDER, FINGERPRINT, vec![unsafe_ann]);
        let summary = shards.summary(TIER, EMBEDDER, FINGERPRINT);
        assert_eq!(summary.shard_count, 1);
        assert_eq!(summary.ready_shards, 1);
        assert_eq!(summary.ann_ready_shards, 0);
        assert!(
            summary.complete,
            "unsafe optional ANN paths must not invalidate the vector shard generation"
        );
    }

    // Two shards sharing one ANN path: only one ANN index is counted.
    let shared_ann_path = Some(String::from(
        "vector_index/shards/fast-fnv1a-384/shared-ann.chsw",
    ));
    let second_with_ann = SemanticShardRecord {
        ann_ready: true,
        ann_index_path: shared_ann_path.clone(),
        ann_size_bytes: 4096,
        ..test_shard(1, 2, true)
    };
    let first_with_ann = SemanticShardRecord {
        ann_ready: true,
        ann_index_path: shared_ann_path,
        ann_size_bytes: 4096,
        ..test_shard(0, 2, true)
    };
    shards.replace_shards_for_generation(
        TIER,
        EMBEDDER,
        FINGERPRINT,
        vec![first_with_ann, second_with_ann],
    );
    let summary = shards.summary(TIER, EMBEDDER, FINGERPRINT);
    assert_eq!(summary.shard_count, 2);
    assert_eq!(summary.ready_shards, 2);
    assert_eq!(summary.ann_ready_shards, 1);
    assert!(
        summary.complete,
        "duplicate optional ANN paths must not invalidate the vector shard generation"
    );

    // Shard index 2 in a generation that declares only 2 shards.
    shards.replace_shards_for_generation(
        TIER,
        EMBEDDER,
        FINGERPRINT,
        vec![test_shard(2, 2, true)],
    );
    let summary = shards.summary(TIER, EMBEDDER, FINGERPRINT);
    assert_eq!(summary.shard_count, 2);
    assert_eq!(summary.ready_shards, 1);
    assert!(
        !summary.complete,
        "shard indexes outside the declared shard count are malformed"
    );

    // Shards that disagree on vector dimension.
    let mismatched_dimension = SemanticShardRecord {
        dimension: 768,
        ..test_shard(1, 2, true)
    };
    shards.replace_shards_for_generation(
        TIER,
        EMBEDDER,
        FINGERPRINT,
        vec![test_shard(0, 2, true), mismatched_dimension],
    );
    let summary = shards.summary(TIER, EMBEDDER, FINGERPRINT);
    assert_eq!(summary.shard_count, 2);
    assert_eq!(summary.ready_shards, 2);
    assert!(
        !summary.complete,
        "complete shard generations require consistent shard metadata"
    );

    // A shard written under the previous schema version.
    let stale_schema = SemanticShardRecord {
        schema_version: SEMANTIC_SCHEMA_VERSION.saturating_sub(1),
        ..test_shard(0, 1, true)
    };
    shards.replace_shards_for_generation(TIER, EMBEDDER, FINGERPRINT, vec![stale_schema]);
    let summary = shards.summary(TIER, EMBEDDER, FINGERPRINT);
    assert_eq!(summary.shard_count, 1);
    assert_eq!(summary.ready_shards, 1);
    assert!(
        !summary.complete,
        "stale schema shards must not summarize as complete"
    );
}
2216
/// A complete shard generation in the sidecar manifest has no effect on the
/// main manifest: its fast tier still reports `Missing`.
#[test]
fn shard_sidecar_does_not_make_main_manifest_ready() {
    let mut sidecar = SemanticShardManifest::default();
    let single_shard = vec![test_shard(0, 1, true)];
    sidecar.replace_shards_for_generation(TierKind::Fast, "fnv1a-384", "fp-sharded", single_shard);

    let sidecar_complete = sidecar
        .summary(TierKind::Fast, "fnv1a-384", "fp-sharded")
        .complete;
    assert!(sidecar_complete);

    let policy = test_policy();
    let main_manifest = SemanticManifest::default();
    assert_eq!(
        main_manifest.fast_tier_readiness(&policy, "fp-sharded", "hash"),
        TierReadiness::Missing,
        "sidecar shards must not publish runtime semantic readiness"
    );
}
2240
/// Invalidation drops the shard with a mismatched schema version and keeps the
/// compatible one, so the remaining total size is a single shard's 4096 bytes.
#[test]
fn shard_manifest_invalidates_incompatible_shards() {
    let incompatible = SemanticShardRecord {
        schema_version: 0,
        ..test_shard(0, 1, true)
    };
    let compatible = test_shard(0, 1, true);
    let mut sidecar = SemanticShardManifest {
        shards: vec![incompatible, compatible],
        ..Default::default()
    };

    let removed = sidecar.invalidate_incompatible(&test_policy(), "hash");

    assert_eq!(removed, 1);
    assert_eq!(sidecar.shards.len(), 1);
    assert_eq!(sidecar.total_size_bytes(), 4096);
}
2256}