1use std::fs;
28use std::io::Write as IoWrite;
29use std::path::{Component, Path, PathBuf};
30use std::time::{SystemTime, UNIX_EPOCH};
31
32use serde::{Deserialize, Serialize};
33
34use super::policy::{
35 CHUNKING_STRATEGY_VERSION, InvalidationAction, SEMANTIC_SCHEMA_VERSION,
36 SemanticAssetManifest as PolicyManifest, SemanticPolicy,
37};
38
/// On-disk format version stamped into freshly created manifests.
///
/// The `load` functions in this file reject any manifest whose recorded
/// `manifest_version` is greater than this value.
pub const MANIFEST_FORMAT_VERSION: u32 = 1;

/// File name of the semantic manifest; it is joined under the `vector_index`
/// directory of the data dir.
pub const MANIFEST_FILENAME: &str = "semantic_manifest.json";

/// File name of the semantic shard manifest; it is joined under the
/// `vector_index` directory of the data dir.
pub const SHARD_MANIFEST_FILENAME: &str = "semantic_shards.json";
50
/// Decides whether a path string recorded in a shard manifest may be used.
///
/// A path is accepted only when it is non-blank, carries no leading or
/// trailing whitespace, is relative, and every component reported by
/// [`Path::components`] is a plain name (no root, prefix, `..`, or leading `.`).
pub(crate) fn semantic_shard_artifact_path_is_safe(recorded_path: &str) -> bool {
    // Blank strings and strings with surrounding whitespace are rejected outright.
    let has_padding = recorded_path.trim() != recorded_path;
    if has_padding || recorded_path.trim().is_empty() {
        return false;
    }

    let candidate = Path::new(recorded_path);
    if candidate.is_absolute() {
        return false;
    }

    // Any component other than an ordinary name disqualifies the path.
    for part in candidate.components() {
        if !matches!(part, Component::Normal(_)) {
            return false;
        }
    }
    true
}
62
/// Identifies which semantic tier an artifact, shard, or checkpoint belongs to.
///
/// Serialized by serde using `snake_case` variant names.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TierKind {
    /// The fast tier (rendered as `"fast"` by [`TierKind::as_str`]).
    Fast,
    /// The quality tier (rendered as `"quality"` by [`TierKind::as_str`]).
    Quality,
}
72
73impl TierKind {
74 pub fn as_str(&self) -> &'static str {
75 match self {
76 Self::Fast => "fast",
77 Self::Quality => "quality",
78 }
79 }
80}
81
/// Readiness state reported for a semantic tier.
///
/// Serialized by serde using `snake_case` variant names.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TierReadiness {
    /// The tier's artifact is current and marked ready.
    Ready,
    /// A build is in progress; `progress_pct` is a percentage value.
    Building { progress_pct: u8 },
    /// An artifact exists but is out of date; `reason` says why.
    /// Counted as usable by [`TierReadiness::is_usable`].
    Stale { reason: String },
    /// No artifact and no valid checkpoint are recorded for the tier.
    Missing,
    /// The recorded artifact must not be used; `reason` says why.
    Incompatible { reason: String },
}
99
100impl TierReadiness {
101 pub fn is_ready(&self) -> bool {
102 matches!(self, Self::Ready)
103 }
104
105 pub fn is_usable(&self) -> bool {
106 matches!(self, Self::Ready | Self::Stale { .. })
107 }
108}
109
/// Manifest entry describing one published tier artifact.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ArtifactRecord {
    /// Tier this artifact belongs to; selects the manifest slot on publish.
    pub tier: TierKind,
    /// Embedder identifier; passed to the policy invalidation check.
    pub embedder_id: String,
    /// Model revision recorded at build time; copied into the policy manifest.
    pub model_revision: String,
    /// Schema version recorded at build time; copied into the policy manifest.
    pub schema_version: u32,
    /// Chunking version recorded at build time; copied into the policy manifest.
    pub chunking_version: u32,
    /// Presumably the embedding vector dimension; not interpreted by the code visible here.
    pub dimension: usize,
    /// Document count; copied into the policy manifest.
    pub doc_count: u64,
    /// Conversation count; used as the "processed" figure when the backlog is refreshed.
    pub conversation_count: u64,
    /// DB fingerprint at build time; compared against the current fingerprint for staleness.
    pub db_fingerprint: String,
    /// Recorded path of the index file; not interpreted by the code visible here.
    pub index_path: String,
    /// Artifact size in bytes; included in the manifest's total size.
    pub size_bytes: u64,
    /// Build start timestamp (presumably milliseconds since the Unix epoch, as with `completed_at_ms`).
    pub started_at_ms: i64,
    /// Build completion timestamp in milliseconds; exposed to the policy manifest as `built_at_ms`.
    pub completed_at_ms: i64,
    /// Whether the artifact is marked ready; a non-ready, otherwise current record reports as building.
    pub ready: bool,
}
144
145impl ArtifactRecord {
146 pub fn to_policy_manifest(&self) -> PolicyManifest {
148 PolicyManifest {
149 embedder_id: self.embedder_id.clone(),
150 model_revision: self.model_revision.clone(),
151 schema_version: self.schema_version,
152 chunking_version: self.chunking_version,
153 doc_count: self.doc_count,
154 built_at_ms: self.completed_at_ms,
155 }
156 }
157
158 pub fn readiness(
168 &self,
169 policy: &SemanticPolicy,
170 current_db_fingerprint: &str,
171 current_model_revision: &str,
172 ) -> TierReadiness {
173 let action = self.to_policy_manifest().invalidation_action(
174 policy,
175 current_model_revision,
176 &self.embedder_id,
177 );
178
179 match action {
180 InvalidationAction::UpToDate => {
181 if self.db_fingerprint != current_db_fingerprint {
182 TierReadiness::Stale {
183 reason: "DB content changed since artifact was built".to_owned(),
184 }
185 } else if !self.ready {
186 TierReadiness::Building { progress_pct: 100 }
187 } else {
188 TierReadiness::Ready
189 }
190 }
191 InvalidationAction::RebuildInBackground => TierReadiness::Stale {
192 reason: "model revision changed; vectors usable until rebuild completes".to_owned(),
193 },
194 InvalidationAction::DiscardAndRebuild { reason } => {
195 TierReadiness::Incompatible { reason }
196 }
197 InvalidationAction::Evict => TierReadiness::Incompatible {
198 reason: "semantic mode set to lexical-only".to_owned(),
199 },
200 }
201 }
202}
203
/// Manifest entry describing an HNSW index built on top of one tier.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct HnswRecord {
    /// Tier the index was built from; the record is dropped during
    /// invalidation when that tier's artifact is absent.
    pub base_tier: TierKind,
    /// Embedder identifier recorded with the index.
    pub embedder_id: String,
    /// Presumably the HNSW `ef_search` parameter; not interpreted by the code visible here.
    pub ef_search: usize,
    /// Recorded path of the index file; not interpreted by the code visible here.
    pub index_path: String,
    /// Index size in bytes; included in the manifest's total size.
    pub size_bytes: u64,
    /// Build timestamp (presumably milliseconds since the Unix epoch; confirm against the writer).
    pub built_at_ms: i64,
    /// Whether the index is marked ready; not read by the code visible here.
    pub ready: bool,
}
224
/// Manifest entry describing a single shard of a sharded semantic index.
///
/// Shards sharing the same `tier`, `embedder_id`, and `db_fingerprint` form
/// one "generation" (see `matches_generation`).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SemanticShardRecord {
    /// Tier the shard belongs to (part of the generation key).
    pub tier: TierKind,
    /// Embedder identifier (part of the generation key).
    pub embedder_id: String,
    /// Model revision; must agree across all shards of a generation.
    pub model_revision: String,
    /// Schema version; the summary requires it to equal `SEMANTIC_SCHEMA_VERSION`.
    pub schema_version: u32,
    /// Chunking version; the summary requires it to equal `CHUNKING_STRATEGY_VERSION`.
    pub chunking_version: u32,
    /// Vector dimension; the summary treats zero as inconsistent.
    pub dimension: usize,
    /// Zero-based position of this shard; must be less than `shard_count`.
    pub shard_index: u32,
    /// Total number of shards in the generation; must be non-zero and agree across shards.
    pub shard_count: u32,
    /// Documents in this shard; summed across shards in the summary.
    pub doc_count: u64,
    /// Conversation total for the generation; must agree across shards.
    pub total_conversations: u64,
    /// DB fingerprint at build time (part of the generation key).
    pub db_fingerprint: String,
    /// Recorded path of the shard index; must pass the path-safety check and be unique within a generation.
    pub index_path: String,
    /// Quantization label; must agree across all shards of a generation.
    pub quantization: String,
    /// Whether the shard is marked mmap-ready; required, together with `ready`, to count as a ready shard.
    pub mmap_ready: bool,
    /// Optional recorded path of the shard's ANN index.
    pub ann_index_path: Option<String>,
    /// Size of the ANN index in bytes; must be non-zero for the shard to count as ANN-ready.
    pub ann_size_bytes: u64,
    /// Whether the ANN index is marked ready.
    pub ann_ready: bool,
    /// Shard size in bytes; summed across shards.
    pub size_bytes: u64,
    /// Build start timestamp (presumably milliseconds since the Unix epoch; confirm against the writer).
    pub started_at_ms: i64,
    /// Build completion timestamp; exposed to the policy manifest as `built_at_ms`.
    pub completed_at_ms: i64,
    /// Whether the shard is marked ready.
    pub ready: bool,
}
279
280impl SemanticShardRecord {
281 pub fn to_policy_manifest(&self) -> PolicyManifest {
282 PolicyManifest {
283 embedder_id: self.embedder_id.clone(),
284 model_revision: self.model_revision.clone(),
285 schema_version: self.schema_version,
286 chunking_version: self.chunking_version,
287 doc_count: self.doc_count,
288 built_at_ms: self.completed_at_ms,
289 }
290 }
291
292 pub fn matches_generation(
293 &self,
294 tier: TierKind,
295 embedder_id: &str,
296 db_fingerprint: &str,
297 ) -> bool {
298 self.tier == tier
299 && self.embedder_id == embedder_id
300 && self.db_fingerprint == db_fingerprint
301 }
302}
303
/// Aggregate view over the shards of one generation, as produced by
/// `SemanticShardManifest::summary`.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct SemanticShardSummary {
    /// Largest `shard_count` declared by any matching shard.
    pub shard_count: u32,
    /// Number of distinct shard indices that are both `ready` and `mmap_ready`.
    pub ready_shards: u32,
    /// Number of distinct shard indices whose ANN index is also usable.
    pub ann_ready_shards: u32,
    /// Saturating sum of `doc_count` over matching shards.
    pub doc_count: u64,
    /// Largest `total_conversations` declared by any matching shard.
    pub total_conversations: u64,
    /// Saturating sum of `size_bytes` over matching shards.
    pub size_bytes: u64,
    /// Saturating sum of `ann_size_bytes` over matching shards.
    pub ann_size_bytes: u64,
    /// True when the generation is internally consistent and every shard
    /// index in `0..shard_count` is ready.
    pub complete: bool,
}
316
/// Persisted list of semantic shard records, stored as JSON under
/// `vector_index/semantic_shards.json` in the data directory.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SemanticShardManifest {
    /// Format version of this manifest; loading rejects values above
    /// `MANIFEST_FORMAT_VERSION`.
    pub manifest_version: u32,
    /// All recorded shards, across every generation.
    pub shards: Vec<SemanticShardRecord>,
    /// Millisecond timestamp overwritten on every `save`.
    pub updated_at_ms: i64,
}
324
325impl Default for SemanticShardManifest {
326 fn default() -> Self {
327 Self {
328 manifest_version: MANIFEST_FORMAT_VERSION,
329 shards: Vec::new(),
330 updated_at_ms: 0,
331 }
332 }
333}
334
335impl SemanticShardManifest {
336 pub fn path(data_dir: &Path) -> PathBuf {
337 data_dir.join("vector_index").join(SHARD_MANIFEST_FILENAME)
338 }
339
340 pub fn load(data_dir: &Path) -> Result<Option<Self>, ManifestError> {
341 let path = Self::path(data_dir);
342 let bytes = match fs::read(&path) {
343 Ok(bytes) => bytes,
344 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
345 Err(e) => {
346 return Err(ManifestError::Io {
347 path,
348 source: e.to_string(),
349 });
350 }
351 };
352
353 let manifest: Self = serde_json::from_slice(&bytes).map_err(|e| ManifestError::Parse {
354 path: path.clone(),
355 source: e.to_string(),
356 })?;
357
358 if manifest.manifest_version > MANIFEST_FORMAT_VERSION {
359 return Err(ManifestError::UnsupportedVersion {
360 found: manifest.manifest_version,
361 max_supported: MANIFEST_FORMAT_VERSION,
362 });
363 }
364
365 Ok(Some(manifest))
366 }
367
368 pub fn load_or_default(data_dir: &Path) -> Result<Self, ManifestError> {
369 match Self::load(data_dir) {
370 Ok(Some(manifest)) => Ok(manifest),
371 Ok(None) => Ok(Self::default()),
372 Err(e @ ManifestError::Io { .. }) => Err(e),
373 Err(ManifestError::Parse { .. } | ManifestError::UnsupportedVersion { .. }) => {
374 Ok(Self::default())
375 }
376 Err(e) => Err(e),
377 }
378 }
379
380 pub fn save(&mut self, data_dir: &Path) -> Result<(), ManifestError> {
381 let path = Self::path(data_dir);
382 if let Some(parent) = path.parent() {
383 fs::create_dir_all(parent).map_err(|e| ManifestError::Io {
384 path: parent.to_path_buf(),
385 source: e.to_string(),
386 })?;
387 }
388
389 self.updated_at_ms = now_ms();
390 let json = serde_json::to_string_pretty(self).map_err(|e| ManifestError::Serialize {
391 source: e.to_string(),
392 })?;
393 let tmp_path = unique_manifest_temp_path(&path);
394 let mut file = fs::File::create(&tmp_path).map_err(|e| ManifestError::Io {
395 path: tmp_path.clone(),
396 source: e.to_string(),
397 })?;
398 file.write_all(json.as_bytes())
399 .map_err(|e| ManifestError::Io {
400 path: tmp_path.clone(),
401 source: e.to_string(),
402 })?;
403 file.sync_all().map_err(|e| ManifestError::Io {
404 path: tmp_path.clone(),
405 source: e.to_string(),
406 })?;
407 replace_file_from_temp(&tmp_path, &path).map_err(|e| ManifestError::Io {
408 path: path.clone(),
409 source: e.to_string(),
410 })?;
411 sync_parent_directory(&path).map_err(|e| ManifestError::Io {
412 path: path
413 .parent()
414 .map(Path::to_path_buf)
415 .unwrap_or_else(|| path.clone()),
416 source: e.to_string(),
417 })?;
418
419 Ok(())
420 }
421
422 pub fn replace_shards_for_generation(
423 &mut self,
424 tier: TierKind,
425 embedder_id: &str,
426 db_fingerprint: &str,
427 mut shards: Vec<SemanticShardRecord>,
428 ) {
429 self.shards
430 .retain(|shard| !shard.matches_generation(tier, embedder_id, db_fingerprint));
431 self.shards.append(&mut shards);
432 self.shards.sort_by(|a, b| {
433 (
434 a.tier.as_str(),
435 &a.embedder_id,
436 &a.db_fingerprint,
437 a.shard_index,
438 )
439 .cmp(&(
440 b.tier.as_str(),
441 &b.embedder_id,
442 &b.db_fingerprint,
443 b.shard_index,
444 ))
445 });
446 }
447
    /// Aggregates the shards of one generation into a [`SemanticShardSummary`].
    ///
    /// Only shards matching `(tier, embedder_id, db_fingerprint)` are
    /// considered. Counts and sizes are accumulated for every matching shard;
    /// `complete` is set only when the generation is internally consistent
    /// and every index in `0..shard_count` is ready and mmap-ready.
    ///
    /// The generation is flagged inconsistent when any matching shard:
    /// - declares a zero `shard_count` or an index outside `0..shard_count`;
    /// - repeats a shard index or an index path already seen;
    /// - has an index path that fails the path-safety check;
    /// - disagrees with earlier shards on `shard_count`, model revision,
    ///   schema/chunking version, dimension, total conversations, or
    ///   quantization;
    /// - carries a schema or chunking version other than the current
    ///   constants, or a zero dimension.
    pub fn summary(
        &self,
        tier: TierKind,
        embedder_id: &str,
        db_fingerprint: &str,
    ) -> SemanticShardSummary {
        let mut summary = SemanticShardSummary::default();
        let mut ready_indices = std::collections::BTreeSet::new();
        let mut ann_ready_indices = std::collections::BTreeSet::new();
        let mut seen_indices = std::collections::BTreeSet::new();
        let mut seen_index_paths = std::collections::BTreeSet::new();
        let mut seen_ann_index_paths = std::collections::BTreeSet::new();
        let mut expected_shard_count = None;
        let mut expected_generation_metadata: Option<(&str, u32, u32, usize, u64, &str)> = None;
        let mut generation_consistent = true;
        for shard in self
            .shards
            .iter()
            .filter(|shard| shard.matches_generation(tier, embedder_id, db_fingerprint))
        {
            // Structural sanity: the index must fall inside the declared shard count.
            if shard.shard_count == 0 || shard.shard_index >= shard.shard_count {
                generation_consistent = false;
            }
            // Each shard index may appear only once per generation.
            if !seen_indices.insert(shard.shard_index) {
                generation_consistent = false;
            }
            // Index paths must be safe and unique; unsafe paths are never
            // recorded in the seen set because `||` short-circuits.
            if !semantic_shard_artifact_path_is_safe(&shard.index_path)
                || !seen_index_paths.insert(&shard.index_path)
            {
                generation_consistent = false;
            }
            // The first matching shard fixes the expected shard count.
            match expected_shard_count {
                Some(expected) if expected != shard.shard_count => {
                    generation_consistent = false;
                }
                None => expected_shard_count = Some(shard.shard_count),
                _ => {}
            }
            let generation_metadata = (
                shard.model_revision.as_str(),
                shard.schema_version,
                shard.chunking_version,
                shard.dimension,
                shard.total_conversations,
                shard.quantization.as_str(),
            );
            // Likewise, the first matching shard fixes the expected metadata tuple.
            match expected_generation_metadata {
                Some(expected) if expected != generation_metadata => {
                    generation_consistent = false;
                }
                None => expected_generation_metadata = Some(generation_metadata),
                _ => {}
            }
            // Shards built under other schema/chunking versions, or with a
            // zero dimension, make the generation inconsistent.
            if shard.schema_version != SEMANTIC_SCHEMA_VERSION
                || shard.chunking_version != CHUNKING_STRATEGY_VERSION
                || shard.dimension == 0
            {
                generation_consistent = false;
            }
            // Totals are accumulated even for inconsistent generations.
            summary.shard_count = summary.shard_count.max(shard.shard_count);
            summary.doc_count = summary.doc_count.saturating_add(shard.doc_count);
            summary.total_conversations =
                summary.total_conversations.max(shard.total_conversations);
            summary.size_bytes = summary.size_bytes.saturating_add(shard.size_bytes);
            summary.ann_size_bytes = summary.ann_size_bytes.saturating_add(shard.ann_size_bytes);
            if shard.ready && shard.mmap_ready {
                ready_indices.insert(shard.shard_index);
            }
            // ANN readiness additionally needs a non-empty ANN index at a
            // safe path that no earlier shard has already claimed.
            if shard.ready
                && shard.mmap_ready
                && shard.ann_ready
                && shard.ann_size_bytes > 0
                && let Some(ann_index_path) = shard.ann_index_path.as_deref()
                && semantic_shard_artifact_path_is_safe(ann_index_path)
                && seen_ann_index_paths.insert(ann_index_path)
            {
                ann_ready_indices.insert(shard.shard_index);
            }
        }
        // Set sizes are clamped to `u32::MAX` if they cannot fit in a `u32`.
        summary.ready_shards = u32::try_from(ready_indices.len()).unwrap_or(u32::MAX);
        summary.ann_ready_shards = u32::try_from(ann_ready_indices.len()).unwrap_or(u32::MAX);
        // Complete means: consistent, non-empty, and every index 0..shard_count is ready.
        summary.complete = generation_consistent
            && summary.shard_count > 0
            && summary.ready_shards == summary.shard_count
            && (0..summary.shard_count).all(|index| ready_indices.contains(&index));
        summary
    }
535
536 pub fn invalidate_incompatible(
537 &mut self,
538 policy: &SemanticPolicy,
539 current_model_revision: &str,
540 ) -> usize {
541 let before = self.shards.len();
542 self.shards.retain(|shard| {
543 !matches!(
544 shard.to_policy_manifest().invalidation_action(
545 policy,
546 current_model_revision,
547 &shard.embedder_id,
548 ),
549 InvalidationAction::DiscardAndRebuild { .. } | InvalidationAction::Evict
550 )
551 });
552 before.saturating_sub(self.shards.len())
553 }
554
555 pub fn total_size_bytes(&self) -> u64 {
556 self.shards
557 .iter()
558 .map(|shard| shard.size_bytes)
559 .fold(0, u64::saturating_add)
560 }
561}
562
/// Snapshot of an in-progress tier build, stored in the semantic manifest.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct BuildCheckpoint {
    /// Tier being built; readiness only reports `Building` for the matching tier.
    pub tier: TierKind,
    /// Embedder identifier recorded with the checkpoint.
    pub embedder_id: String,
    /// Presumably the resume position in the source data; not interpreted by the code visible here.
    pub last_offset: i64,
    /// Number of documents embedded so far; not interpreted by the code visible here.
    pub docs_embedded: u64,
    /// Conversations processed so far; numerator of the progress percentage.
    pub conversations_processed: u64,
    /// Total conversations to process; denominator of the progress percentage.
    pub total_conversations: u64,
    /// DB fingerprint the build started against; must match for the checkpoint to be valid.
    pub db_fingerprint: String,
    /// Schema version the build uses; must equal `SEMANTIC_SCHEMA_VERSION` to be valid.
    pub schema_version: u32,
    /// Chunking version the build uses; must equal `CHUNKING_STRATEGY_VERSION` to be valid.
    pub chunking_version: u32,
    /// Timestamp of the checkpoint (presumably milliseconds since the Unix epoch; confirm against the writer).
    pub saved_at_ms: i64,
}
589
590impl BuildCheckpoint {
591 pub fn progress_pct(&self) -> u8 {
593 if self.total_conversations == 0 {
594 return 0;
595 }
596 let pct = (self.conversations_processed as f64 / self.total_conversations as f64) * 100.0;
597 (pct as u8).min(100)
598 }
599
600 pub fn is_complete(&self) -> bool {
602 self.conversations_processed >= self.total_conversations
603 }
604
605 pub fn is_valid(&self, current_db_fingerprint: &str) -> bool {
607 self.db_fingerprint == current_db_fingerprint
608 && self.schema_version == SEMANTIC_SCHEMA_VERSION
609 && self.chunking_version == CHUNKING_STRATEGY_VERSION
610 }
611}
612
/// Bookkeeping of how many conversations each tier has covered.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct BacklogLedger {
    /// Total number of conversations the tiers are expected to cover.
    pub total_conversations: u64,
    /// Conversations already covered by the fast tier.
    pub fast_tier_processed: u64,
    /// Conversations already covered by the quality tier.
    pub quality_tier_processed: u64,
    /// DB fingerprint the ledger was computed against.
    pub db_fingerprint: String,
    /// Millisecond timestamp of when the ledger was computed.
    pub computed_at_ms: i64,
}
629
630impl BacklogLedger {
631 pub fn fast_tier_remaining(&self) -> u64 {
633 self.total_conversations
634 .saturating_sub(self.fast_tier_processed)
635 }
636
637 pub fn quality_tier_remaining(&self) -> u64 {
639 self.total_conversations
640 .saturating_sub(self.quality_tier_processed)
641 }
642
643 pub fn has_pending_work(&self) -> bool {
645 self.fast_tier_remaining() > 0 || self.quality_tier_remaining() > 0
646 }
647
648 pub fn is_current(&self, current_db_fingerprint: &str) -> bool {
650 self.db_fingerprint == current_db_fingerprint
651 }
652}
653
/// Persisted description of the semantic search artifacts, stored as JSON
/// under `vector_index/semantic_manifest.json` in the data directory.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SemanticManifest {
    /// Format version of this manifest; loading rejects values above
    /// `MANIFEST_FORMAT_VERSION`.
    pub manifest_version: u32,
    /// Published fast-tier artifact, if any.
    pub fast_tier: Option<ArtifactRecord>,
    /// Published quality-tier artifact, if any.
    pub quality_tier: Option<ArtifactRecord>,
    /// Published HNSW index record, if any.
    pub hnsw: Option<HnswRecord>,
    /// Backlog bookkeeping, rebuilt by `refresh_backlog`.
    pub backlog: BacklogLedger,
    /// Checkpoint of an in-progress build, if any (at most one is stored).
    pub checkpoint: Option<BuildCheckpoint>,
    /// Millisecond timestamp overwritten on every `save`.
    pub updated_at_ms: i64,
}
677
678impl Default for SemanticManifest {
679 fn default() -> Self {
680 Self {
681 manifest_version: MANIFEST_FORMAT_VERSION,
682 fast_tier: None,
683 quality_tier: None,
684 hnsw: None,
685 backlog: BacklogLedger {
686 total_conversations: 0,
687 fast_tier_processed: 0,
688 quality_tier_processed: 0,
689 db_fingerprint: String::new(),
690 computed_at_ms: 0,
691 },
692 checkpoint: None,
693 updated_at_ms: 0,
694 }
695 }
696}
697
698impl SemanticManifest {
699 pub fn path(data_dir: &Path) -> PathBuf {
703 data_dir.join("vector_index").join(MANIFEST_FILENAME)
704 }
705
706 pub fn load(data_dir: &Path) -> Result<Option<Self>, ManifestError> {
711 let path = Self::path(data_dir);
712 let bytes = match fs::read(&path) {
713 Ok(b) => b,
714 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
715 Err(e) => {
716 return Err(ManifestError::Io {
717 path,
718 source: e.to_string(),
719 });
720 }
721 };
722
723 let manifest: Self = serde_json::from_slice(&bytes).map_err(|e| ManifestError::Parse {
724 path: path.clone(),
725 source: e.to_string(),
726 })?;
727
728 if manifest.manifest_version > MANIFEST_FORMAT_VERSION {
730 return Err(ManifestError::UnsupportedVersion {
731 found: manifest.manifest_version,
732 max_supported: MANIFEST_FORMAT_VERSION,
733 });
734 }
735
736 Ok(Some(manifest))
737 }
738
739 pub fn load_or_default(data_dir: &Path) -> Result<Self, ManifestError> {
746 match Self::load(data_dir) {
747 Ok(Some(manifest)) => Ok(manifest),
748 Ok(None) => Ok(Self::default()),
749 Err(e @ ManifestError::Io { .. }) => Err(e),
751 Err(ManifestError::Parse { .. } | ManifestError::UnsupportedVersion { .. }) => {
753 Ok(Self::default())
754 }
755 Err(e) => Err(e),
756 }
757 }
758
759 pub fn save(&mut self, data_dir: &Path) -> Result<(), ManifestError> {
761 let path = Self::path(data_dir);
762
763 if let Some(parent) = path.parent() {
765 fs::create_dir_all(parent).map_err(|e| ManifestError::Io {
766 path: parent.to_path_buf(),
767 source: e.to_string(),
768 })?;
769 }
770
771 self.updated_at_ms = now_ms();
772
773 let json = serde_json::to_string_pretty(self).map_err(|e| ManifestError::Serialize {
774 source: e.to_string(),
775 })?;
776
777 let tmp_path = unique_manifest_temp_path(&path);
779 let mut file = fs::File::create(&tmp_path).map_err(|e| ManifestError::Io {
780 path: tmp_path.clone(),
781 source: e.to_string(),
782 })?;
783 file.write_all(json.as_bytes())
784 .map_err(|e| ManifestError::Io {
785 path: tmp_path.clone(),
786 source: e.to_string(),
787 })?;
788 file.sync_all().map_err(|e| ManifestError::Io {
789 path: tmp_path.clone(),
790 source: e.to_string(),
791 })?;
792 replace_file_from_temp(&tmp_path, &path).map_err(|e| ManifestError::Io {
793 path: path.clone(),
794 source: e.to_string(),
795 })?;
796 sync_parent_directory(&path).map_err(|e| ManifestError::Io {
797 path: path
798 .parent()
799 .map(Path::to_path_buf)
800 .unwrap_or_else(|| path.clone()),
801 source: e.to_string(),
802 })?;
803
804 Ok(())
805 }
806
807 pub fn fast_tier_readiness(
811 &self,
812 policy: &SemanticPolicy,
813 current_db_fingerprint: &str,
814 current_model_revision: &str,
815 ) -> TierReadiness {
816 match &self.fast_tier {
817 Some(artifact) => {
818 artifact.readiness(policy, current_db_fingerprint, current_model_revision)
819 }
820 None => {
821 if let Some(cp) = &self.checkpoint
823 && cp.tier == TierKind::Fast
824 && cp.is_valid(current_db_fingerprint)
825 {
826 TierReadiness::Building {
827 progress_pct: cp.progress_pct(),
828 }
829 } else {
830 TierReadiness::Missing
831 }
832 }
833 }
834 }
835
836 pub fn quality_tier_readiness(
838 &self,
839 policy: &SemanticPolicy,
840 current_db_fingerprint: &str,
841 current_model_revision: &str,
842 ) -> TierReadiness {
843 match &self.quality_tier {
844 Some(artifact) => {
845 artifact.readiness(policy, current_db_fingerprint, current_model_revision)
846 }
847 None => {
848 if let Some(cp) = &self.checkpoint
849 && cp.tier == TierKind::Quality
850 && cp.is_valid(current_db_fingerprint)
851 {
852 TierReadiness::Building {
853 progress_pct: cp.progress_pct(),
854 }
855 } else {
856 TierReadiness::Missing
857 }
858 }
859 }
860 }
861
862 pub fn can_hybrid_search(
864 &self,
865 policy: &SemanticPolicy,
866 current_db_fingerprint: &str,
867 current_model_revision: &str,
868 ) -> bool {
869 self.fast_tier_readiness(policy, current_db_fingerprint, current_model_revision)
870 .is_usable()
871 }
872
873 pub fn refresh_backlog(&mut self, total_conversations: u64, current_db_fingerprint: &str) {
877 let fast_processed = self
878 .fast_tier
879 .as_ref()
880 .filter(|a| a.ready && a.db_fingerprint == current_db_fingerprint)
881 .map_or(0, |a| a.conversation_count);
882 let quality_processed = self
883 .quality_tier
884 .as_ref()
885 .filter(|a| a.ready && a.db_fingerprint == current_db_fingerprint)
886 .map_or(0, |a| a.conversation_count);
887
888 self.backlog = BacklogLedger {
889 total_conversations,
890 fast_tier_processed: fast_processed,
891 quality_tier_processed: quality_processed,
892 db_fingerprint: current_db_fingerprint.to_owned(),
893 computed_at_ms: now_ms(),
894 };
895 }
896
897 pub fn save_checkpoint(&mut self, checkpoint: BuildCheckpoint) {
899 self.checkpoint = Some(checkpoint);
900 }
901
902 pub fn clear_checkpoint(&mut self) {
904 self.checkpoint = None;
905 }
906
907 pub fn publish_artifact(&mut self, artifact: ArtifactRecord) {
909 if self
911 .checkpoint
912 .as_ref()
913 .is_some_and(|cp| cp.tier == artifact.tier)
914 {
915 self.checkpoint = None;
916 }
917
918 match artifact.tier {
919 TierKind::Fast => self.fast_tier = Some(artifact),
920 TierKind::Quality => self.quality_tier = Some(artifact),
921 }
922 }
923
924 pub fn publish_hnsw(&mut self, hnsw: HnswRecord) {
926 self.hnsw = Some(hnsw);
927 }
928
929 #[allow(clippy::too_many_arguments)]
932 pub fn adopt_legacy_artifact(
933 &mut self,
934 tier: TierKind,
935 embedder_id: &str,
936 model_revision: &str,
937 dimension: usize,
938 doc_count: u64,
939 conversation_count: u64,
940 db_fingerprint: &str,
941 index_path: &str,
942 size_bytes: u64,
943 ) -> bool {
944 let record = ArtifactRecord {
945 tier,
946 embedder_id: embedder_id.to_owned(),
947 model_revision: model_revision.to_owned(),
948 schema_version: SEMANTIC_SCHEMA_VERSION,
949 chunking_version: CHUNKING_STRATEGY_VERSION,
950 dimension,
951 doc_count,
952 conversation_count,
953 db_fingerprint: db_fingerprint.to_owned(),
954 index_path: index_path.to_owned(),
955 size_bytes,
956 started_at_ms: 0,
957 completed_at_ms: now_ms(),
958 ready: true,
959 };
960
961 match tier {
962 TierKind::Fast => self.fast_tier = Some(record),
963 TierKind::Quality => self.quality_tier = Some(record),
964 }
965 true
966 }
967
968 pub fn invalidate_incompatible(
978 &mut self,
979 policy: &SemanticPolicy,
980 current_model_revision: &str,
981 ) -> usize {
982 let mut count = 0;
983
984 if let Some(ref artifact) = self.fast_tier {
985 let pm = artifact.to_policy_manifest();
986 if matches!(
987 pm.invalidation_action(policy, current_model_revision, &artifact.embedder_id),
988 InvalidationAction::DiscardAndRebuild { .. } | InvalidationAction::Evict
989 ) {
990 self.fast_tier = None;
991 count += 1;
992 }
993 }
994
995 if let Some(ref artifact) = self.quality_tier {
996 let pm = artifact.to_policy_manifest();
997 if matches!(
998 pm.invalidation_action(policy, current_model_revision, &artifact.embedder_id),
999 InvalidationAction::DiscardAndRebuild { .. } | InvalidationAction::Evict
1000 ) {
1001 self.quality_tier = None;
1002 count += 1;
1003 }
1004 }
1005
1006 if let Some(ref hnsw) = self.hnsw {
1008 let base_gone = match hnsw.base_tier {
1009 TierKind::Fast => self.fast_tier.is_none(),
1010 TierKind::Quality => self.quality_tier.is_none(),
1011 };
1012 if base_gone {
1013 self.hnsw = None;
1014 count += 1;
1015 }
1016 }
1017
1018 if let Some(ref cp) = self.checkpoint
1020 && (cp.schema_version != policy.semantic_schema_version
1021 || cp.chunking_version != policy.chunking_strategy_version)
1022 {
1023 self.checkpoint = None;
1024 }
1025
1026 count
1027 }
1028
1029 pub fn total_size_bytes(&self) -> u64 {
1031 let fast = self.fast_tier.as_ref().map_or(0, |a| a.size_bytes);
1032 let quality = self.quality_tier.as_ref().map_or(0, |a| a.size_bytes);
1033 let hnsw = self.hnsw.as_ref().map_or(0, |h| h.size_bytes);
1034 fast + quality + hnsw
1035 }
1036
1037 pub fn total_size_mb(&self) -> u64 {
1039 self.total_size_bytes().div_ceil(1_048_576)
1040 }
1041}
1042
/// Errors raised while loading or saving a manifest.
///
/// Underlying error details are carried as already-formatted strings.
#[derive(Debug)]
pub enum ManifestError {
    /// A filesystem operation on `path` failed.
    Io { path: PathBuf, source: String },
    /// The file at `path` could not be parsed as a manifest.
    Parse { path: PathBuf, source: String },
    /// The manifest could not be serialized.
    Serialize { source: String },
    /// The manifest declares a format version newer than this build supports.
    UnsupportedVersion { found: u32, max_supported: u32 },
}

impl std::fmt::Display for ManifestError {
    /// Renders a one-line, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ManifestError::UnsupportedVersion {
                found,
                max_supported,
            } => write!(
                f,
                "manifest version {found} is newer than supported version {max_supported}"
            ),
            ManifestError::Serialize { source } => {
                write!(f, "manifest serialization error: {source}")
            }
            ManifestError::Parse { path, source } => {
                let location = path.display();
                write!(f, "manifest parse error at {}: {source}", location)
            }
            ManifestError::Io { path, source } => {
                let location = path.display();
                write!(f, "manifest I/O error at {}: {source}", location)
            }
        }
    }
}

// No `source()` override: the underlying cause is kept only as text.
impl std::error::Error for ManifestError {}
1075
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch.
fn now_ms() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as i64,
        Err(_) => 0,
    }
}
1084
1085fn unique_manifest_temp_path(path: &Path) -> PathBuf {
1086 static NEXT_NONCE: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
1087
1088 let file_name = path
1089 .file_name()
1090 .and_then(|name| name.to_str())
1091 .unwrap_or(MANIFEST_FILENAME);
1092 let nonce = NEXT_NONCE.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1093 path.with_file_name(format!(
1094 ".{file_name}.tmp.{}.{}.{}",
1095 std::process::id(),
1096 now_ms(),
1097 nonce
1098 ))
1099}
1100
/// Builds a sibling backup-file path for `path` (Windows only).
///
/// The name has the form `.<file_name>.bak.<pid>.<now_ms>.<nonce>`, where
/// the nonce is a process-wide counter. Falls back to `MANIFEST_FILENAME`
/// when `path` has no UTF-8 file name.
#[cfg(windows)]
fn unique_manifest_backup_path(path: &Path) -> PathBuf {
    use std::sync::atomic::{AtomicU64, Ordering};

    static NEXT_NONCE: AtomicU64 = AtomicU64::new(0);

    let file_name = match path.file_name().and_then(|name| name.to_str()) {
        Some(name) => name,
        None => MANIFEST_FILENAME,
    };
    let sequence = NEXT_NONCE.fetch_add(1, Ordering::Relaxed);
    let pid = std::process::id();
    let stamp = now_ms();

    let backup_name = format!(".{file_name}.bak.{}.{}.{}", pid, stamp, sequence);
    path.with_file_name(backup_name)
}
1117
/// Moves `temp_path` into place at `final_path`, replacing any existing file.
///
/// On non-Windows targets this is a single `fs::rename`.
///
/// On Windows a direct rename is attempted first. If it fails with
/// `AlreadyExists` or `PermissionDenied` while the destination exists, the
/// destination is renamed to a unique backup, the rename is retried, and:
/// - on success the backup is deleted (best effort);
/// - on failure the backup is renamed back; if that restore succeeds the
///   temp file is deleted and an error is returned, and if the restore also
///   fails the temp file is left on disk and named in the error message.
///
/// # Errors
/// Returns the underlying rename error, or (on the Windows fallback path) a
/// composite error describing each failed step.
fn replace_file_from_temp(temp_path: &Path, final_path: &Path) -> std::io::Result<()> {
    #[cfg(windows)]
    {
        match fs::rename(temp_path, final_path) {
            Ok(()) => sync_parent_directory(final_path),
            // Fallback only when the destination exists and the failure kind
            // suggests it blocked the rename.
            Err(first_err)
                if final_path.exists()
                    && matches!(
                        first_err.kind(),
                        std::io::ErrorKind::AlreadyExists | std::io::ErrorKind::PermissionDenied
                    ) =>
            {
                // Step 1: move the current file aside; on failure discard the temp file.
                let backup_path = unique_manifest_backup_path(final_path);
                fs::rename(final_path, &backup_path).map_err(|backup_err| {
                    let _ = fs::remove_file(temp_path);
                    std::io::Error::other(format!(
                        "failed preparing backup {} before replacing {}: first error: {}; backup error: {}",
                        backup_path.display(),
                        final_path.display(),
                        first_err,
                        backup_err
                    ))
                })?;
                // Step 2: retry the rename now that the destination is free.
                match fs::rename(temp_path, final_path) {
                    Ok(()) => {
                        let _ = fs::remove_file(&backup_path);
                        sync_parent_directory(final_path)
                    }
                    // Step 3: roll back by restoring the backup.
                    Err(second_err) => match fs::rename(&backup_path, final_path) {
                        Ok(()) => {
                            let _ = fs::remove_file(temp_path);
                            sync_parent_directory(final_path)?;
                            Err(std::io::Error::other(format!(
                                "failed replacing {} with {}: first error: {}; second error: {}; restored original file",
                                final_path.display(),
                                temp_path.display(),
                                first_err,
                                second_err
                            )))
                        }
                        // Restore failed too: the temp file is kept so the new content is not lost.
                        Err(restore_err) => Err(std::io::Error::other(format!(
                            "failed replacing {} with {}: first error: {}; second error: {}; restore error: {}; temp file retained at {}",
                            final_path.display(),
                            temp_path.display(),
                            first_err,
                            second_err,
                            restore_err,
                            temp_path.display()
                        ))),
                    },
                }
            }
            Err(err) => Err(err),
        }
    }

    #[cfg(not(windows))]
    {
        fs::rename(temp_path, final_path)
    }
}
1179
/// Flushes the directory containing `path` to disk (non-Windows targets).
///
/// Does nothing and returns `Ok(())` when `path` has no parent.
///
/// # Errors
/// Returns any error from opening the parent directory or syncing it.
#[cfg(not(windows))]
fn sync_parent_directory(path: &Path) -> std::io::Result<()> {
    match path.parent() {
        None => Ok(()),
        Some(parent) => fs::File::open(parent).and_then(|directory| directory.sync_all()),
    }
}
1188
/// Windows counterpart of the directory sync: a no-op that always returns
/// `Ok(())`, so callers can invoke it unconditionally.
#[cfg(windows)]
fn sync_parent_directory(_path: &Path) -> std::io::Result<()> {
    Ok(())
}
1193
1194#[cfg(test)]
1197mod tests {
1198 use super::*;
1199 use crate::search::policy::SemanticPolicy;
1200
    /// Policy fixture: the compiled-in default semantic policy.
    fn test_policy() -> SemanticPolicy {
        SemanticPolicy::compiled_defaults()
    }
1204
1205 fn test_artifact(tier: TierKind, ready: bool) -> ArtifactRecord {
1206 ArtifactRecord {
1207 tier,
1208 embedder_id: match tier {
1209 TierKind::Fast => "fnv1a-384".to_owned(),
1210 TierKind::Quality => "minilm-384".to_owned(),
1211 },
1212 model_revision: "abc123".to_owned(),
1213 schema_version: SEMANTIC_SCHEMA_VERSION,
1214 chunking_version: CHUNKING_STRATEGY_VERSION,
1215 dimension: 384,
1216 doc_count: 1000,
1217 conversation_count: 250,
1218 db_fingerprint: "fp-1234".to_owned(),
1219 index_path: format!(
1220 "vector_index/index-{}.fsvi",
1221 match tier {
1222 TierKind::Fast => "fnv1a-384",
1223 TierKind::Quality => "minilm-384",
1224 }
1225 ),
1226 size_bytes: 150_000,
1227 started_at_ms: 1_700_000_000_000,
1228 completed_at_ms: 1_700_000_060_000,
1229 ready,
1230 }
1231 }
1232
    /// HNSW fixture: a ready index built on the quality tier (`minilm-384`).
    fn test_hnsw() -> HnswRecord {
        HnswRecord {
            base_tier: TierKind::Quality,
            embedder_id: "minilm-384".to_owned(),
            ef_search: 128,
            index_path: "vector_index/hnsw-minilm-384.chsw".to_owned(),
            size_bytes: 50_000,
            built_at_ms: 1_700_000_070_000,
            ready: true,
        }
    }
1244
    /// Shard fixture: a fast-tier, mmap-ready shard without an ANN index.
    ///
    /// `shard_index`, `shard_count`, and `ready` come from the caller; the
    /// index path embeds the zero-padded shard index. All shards share the
    /// `fp-sharded` fingerprint and the current schema/chunking versions.
    fn test_shard(shard_index: u32, shard_count: u32, ready: bool) -> SemanticShardRecord {
        SemanticShardRecord {
            tier: TierKind::Fast,
            embedder_id: "fnv1a-384".to_owned(),
            model_revision: "hash".to_owned(),
            schema_version: SEMANTIC_SCHEMA_VERSION,
            chunking_version: CHUNKING_STRATEGY_VERSION,
            dimension: 384,
            shard_index,
            shard_count,
            doc_count: 25,
            total_conversations: 10,
            db_fingerprint: "fp-sharded".to_owned(),
            index_path: format!("vector_index/shards/fast-fnv1a-384/shard-{shard_index:05}.fsvi"),
            quantization: "f16".to_owned(),
            mmap_ready: true,
            ann_index_path: None,
            ann_size_bytes: 0,
            ann_ready: false,
            size_bytes: 4096,
            started_at_ms: 1_700_000_080_000,
            completed_at_ms: 1_700_000_081_000,
            ready,
        }
    }
1270
    /// Checkpoint fixture for `tier`: 500 of 1000 conversations processed
    /// against fingerprint `fp-1234` at the current schema/chunking versions.
    fn test_checkpoint(tier: TierKind) -> BuildCheckpoint {
        BuildCheckpoint {
            tier,
            embedder_id: "minilm-384".to_owned(),
            last_offset: 500,
            docs_embedded: 3000,
            conversations_processed: 500,
            total_conversations: 1000,
            db_fingerprint: "fp-1234".to_owned(),
            schema_version: SEMANTIC_SCHEMA_VERSION,
            chunking_version: CHUNKING_STRATEGY_VERSION,
            saved_at_ms: 1_700_000_030_000,
        }
    }
1285
    /// Test-side expectation for a [`TierReadiness`] value.
    ///
    /// `Stale` and `Incompatible` match on the variant only (the reason text
    /// is ignored); `Ready` and `Building` are compared exactly.
    #[derive(Debug, Clone, Copy)]
    enum ExpectedTierReadiness {
        /// Expect exactly `TierReadiness::Ready`.
        Ready,
        /// Expect any `TierReadiness::Stale { .. }`.
        Stale,
        /// Expect any `TierReadiness::Incompatible { .. }`.
        Incompatible,
        /// Expect `TierReadiness::Building` with this exact percentage.
        Building(u8),
    }
1293
    /// Mutation hook that leaves the artifact untouched.
    fn no_artifact_mutation(_: &mut ArtifactRecord) {}
1295
    /// One row of a table-driven readiness test.
    ///
    /// NOTE(review): the consumer of this alias is outside the visible code.
    /// The fields look like: case label, tier, artifact `ready` flag, two
    /// string inputs (presumably DB fingerprint and model revision), a
    /// mutation applied to the artifact, and the expected readiness. Confirm
    /// against the test that uses it.
    type TierReadinessCase = (
        &'static str,
        TierKind,
        bool,
        &'static str,
        &'static str,
        fn(&mut ArtifactRecord),
        ExpectedTierReadiness,
    );
1305
    /// Mutation hook that overwrites the artifact's schema version with 0.
    fn set_schema_version_to_zero(artifact: &mut ArtifactRecord) {
        artifact.schema_version = 0;
    }
1309
1310 fn assert_tier_readiness(actual: TierReadiness, expected: ExpectedTierReadiness, label: &str) {
1311 match expected {
1312 ExpectedTierReadiness::Ready => {
1313 assert_eq!(actual, TierReadiness::Ready, "{label}");
1314 }
1315 ExpectedTierReadiness::Stale => {
1316 assert!(
1317 matches!(actual, TierReadiness::Stale { .. }),
1318 "{label}: {actual:?}"
1319 );
1320 }
1321 ExpectedTierReadiness::Incompatible => {
1322 assert!(
1323 matches!(actual, TierReadiness::Incompatible { .. }),
1324 "{label}: {actual:?}"
1325 );
1326 }
1327 ExpectedTierReadiness::Building(progress_pct) => {
1328 assert_eq!(actual, TierReadiness::Building { progress_pct }, "{label}");
1329 }
1330 }
1331 }
1332
    /// Saves a fully populated manifest to a temp directory and loads it
    /// back, checking that every optional section survives, the backlog
    /// total is preserved, and `save` stamped a non-zero `updated_at_ms`.
    #[test]
    fn manifest_round_trip_via_disk() {
        let temp = tempfile::tempdir().unwrap();
        let mut manifest = SemanticManifest {
            fast_tier: Some(test_artifact(TierKind::Fast, true)),
            quality_tier: Some(test_artifact(TierKind::Quality, true)),
            hnsw: Some(test_hnsw()),
            checkpoint: Some(test_checkpoint(TierKind::Quality)),
            backlog: BacklogLedger {
                total_conversations: 2000,
                fast_tier_processed: 1000,
                quality_tier_processed: 500,
                db_fingerprint: "fp-1234".to_owned(),
                computed_at_ms: 1_700_000_000_000,
            },
            ..Default::default()
        };

        manifest.save(temp.path()).unwrap();
        let loaded = SemanticManifest::load(temp.path()).unwrap().unwrap();

        assert_eq!(loaded.manifest_version, MANIFEST_FORMAT_VERSION);
        assert!(loaded.fast_tier.is_some());
        assert!(loaded.quality_tier.is_some());
        assert!(loaded.hnsw.is_some());
        assert!(loaded.checkpoint.is_some());
        assert_eq!(loaded.backlog.total_conversations, 2000);
        // The default manifest has `updated_at_ms == 0`, so a positive value proves `save` set it.
        assert!(loaded.updated_at_ms > 0);
    }
1364
/// A second `save` into the same directory replaces the first manifest
/// entirely: sections present only in the first save are gone after reload.
#[test]
fn manifest_save_overwrites_existing_file() {
    let dir = tempfile::tempdir().unwrap();
    let root = dir.path();

    let mut initial = SemanticManifest {
        fast_tier: Some(test_artifact(TierKind::Fast, true)),
        ..Default::default()
    };
    initial.save(root).unwrap();

    let replacement_backlog = BacklogLedger {
        total_conversations: 99,
        fast_tier_processed: 0,
        quality_tier_processed: 99,
        db_fingerprint: "fp-overwrite".to_owned(),
        computed_at_ms: 1_700_000_000_123,
    };
    let mut replacement = SemanticManifest {
        quality_tier: Some(test_artifact(TierKind::Quality, true)),
        backlog: replacement_backlog,
        ..Default::default()
    };
    replacement.save(root).unwrap();

    let reloaded = SemanticManifest::load(root).unwrap();
    let reloaded = reloaded.unwrap();
    assert!(reloaded.fast_tier.is_none());
    assert!(reloaded.quality_tier.is_some());
    assert_eq!(reloaded.backlog.total_conversations, 99);
}
1392
/// Loading from a directory that holds no manifest succeeds with `None`.
#[test]
fn manifest_load_missing_returns_none() {
    let dir = tempfile::tempdir().unwrap();
    let outcome = SemanticManifest::load(dir.path());
    assert!(outcome.unwrap().is_none());
}
1399
/// `load_or_default` on an empty directory yields a manifest at the current
/// format version with neither tier populated.
#[test]
fn manifest_load_or_default_returns_defaults() {
    let dir = tempfile::tempdir().unwrap();
    let fallback = SemanticManifest::load_or_default(dir.path()).unwrap();

    assert_eq!(fallback.manifest_version, MANIFEST_FORMAT_VERSION);
    let (fast, quality) = (&fallback.fast_tier, &fallback.quality_tier);
    assert!(fast.is_none());
    assert!(quality.is_none());
}
1408
/// A manifest file whose bytes are not JSON surfaces as `ManifestError::Parse`.
#[test]
fn manifest_load_corrupt_returns_parse_error() {
    let dir = tempfile::tempdir().unwrap();
    let manifest_path = SemanticManifest::path(dir.path());
    let parent_dir = manifest_path.parent().unwrap();
    fs::create_dir_all(parent_dir).unwrap();
    fs::write(&manifest_path, b"not json").unwrap();

    let outcome = SemanticManifest::load(dir.path());
    let is_parse_error = matches!(outcome, Err(ManifestError::Parse { .. }));
    assert!(is_parse_error);
}
1419
/// A manifest written with a format version one past the supported one is
/// rejected with `ManifestError::UnsupportedVersion`.
#[test]
fn manifest_load_future_version_returns_error() {
    let dir = tempfile::tempdir().unwrap();
    let manifest_path = SemanticManifest::path(dir.path());
    let parent_dir = manifest_path.parent().unwrap();
    fs::create_dir_all(parent_dir).unwrap();

    // Write the file by hand so the version field is stored exactly as set here.
    let from_the_future = SemanticManifest {
        manifest_version: MANIFEST_FORMAT_VERSION + 1,
        ..Default::default()
    };
    let encoded = serde_json::to_string(&from_the_future).unwrap();
    fs::write(&manifest_path, encoded).unwrap();

    let outcome = SemanticManifest::load(dir.path());
    let is_unsupported = matches!(outcome, Err(ManifestError::UnsupportedVersion { .. }));
    assert!(is_unsupported);
}
1439
/// Table-driven check of `ArtifactRecord::readiness`: each row builds a fixture
/// artifact, applies its mutator, and compares the readiness computed against
/// the row's current fingerprint and model revision with the expected outcome.
#[test]
fn tier_readiness_cases() {
    let policy = test_policy();
    let matching_fp = "fp-1234";
    let matching_rev = "abc123";

    let table: &[TierReadinessCase] = &[
        (
            "ready artifact with matching fingerprint",
            TierKind::Fast,
            true,
            matching_fp,
            matching_rev,
            no_artifact_mutation,
            ExpectedTierReadiness::Ready,
        ),
        (
            "ready artifact with changed DB fingerprint",
            TierKind::Fast,
            true,
            "different-fp",
            matching_rev,
            no_artifact_mutation,
            ExpectedTierReadiness::Stale,
        ),
        (
            "ready artifact with changed model revision",
            TierKind::Quality,
            true,
            matching_fp,
            "new-revision",
            no_artifact_mutation,
            ExpectedTierReadiness::Stale,
        ),
        (
            "schema version mismatch",
            TierKind::Quality,
            true,
            matching_fp,
            matching_rev,
            set_schema_version_to_zero,
            ExpectedTierReadiness::Incompatible,
        ),
        (
            "not yet published artifact",
            TierKind::Fast,
            false,
            matching_fp,
            matching_rev,
            no_artifact_mutation,
            ExpectedTierReadiness::Building(100),
        ),
    ];

    // Every tuple element is `Copy`, so each row can be destructured by value.
    for &(label, tier, ready, db_fingerprint, model_revision, mutate, expected) in table {
        let mut artifact = test_artifact(tier, ready);
        mutate(&mut artifact);
        let observed = artifact.readiness(&policy, db_fingerprint, model_revision);
        assert_tier_readiness(observed, expected, label);
    }
}
1505
/// A default manifest reports both tiers as `Missing`.
#[test]
fn manifest_tier_readiness_missing() {
    let policy = test_policy();
    let empty = SemanticManifest::default();

    let fast = empty.fast_tier_readiness(&policy, "fp", "rev");
    assert_eq!(fast, TierReadiness::Missing);

    let quality = empty.quality_tier_readiness(&policy, "fp", "rev");
    assert_eq!(quality, TierReadiness::Missing);
}
1521
/// With only a quality-tier checkpoint recorded, the fast tier is `Missing`
/// while the quality tier reports `Building` at the checkpoint's 50% progress.
#[test]
fn manifest_tier_readiness_with_checkpoint() {
    let policy = test_policy();
    let in_progress = SemanticManifest {
        checkpoint: Some(test_checkpoint(TierKind::Quality)),
        ..Default::default()
    };

    let fast = in_progress.fast_tier_readiness(&policy, "fp-1234", "rev");
    assert_eq!(fast, TierReadiness::Missing);

    let quality = in_progress.quality_tier_readiness(&policy, "fp-1234", "rev");
    let is_half_built = matches!(quality, TierReadiness::Building { progress_pct: 50 });
    assert!(is_half_built);
}
1541
/// A checkpoint recorded against a different DB fingerprint does not count as
/// build progress: the tier reports `Missing`.
#[test]
fn manifest_tier_readiness_checkpoint_invalid_db() {
    let policy = test_policy();
    let in_progress = SemanticManifest {
        checkpoint: Some(test_checkpoint(TierKind::Quality)),
        ..Default::default()
    };

    let quality = in_progress.quality_tier_readiness(&policy, "other-fp", "rev");
    assert_eq!(quality, TierReadiness::Missing);
}
1556
/// Hybrid search is unavailable on an empty manifest and becomes available
/// once a ready fast-tier artifact is present.
#[test]
fn can_hybrid_search_requires_usable_fast_tier() {
    let policy = test_policy();
    let (db_fp, rev) = ("fp-1234", "abc123");

    let empty = SemanticManifest::default();
    assert!(!empty.can_hybrid_search(&policy, db_fp, rev));

    let with_fast_tier = SemanticManifest {
        fast_tier: Some(test_artifact(TierKind::Fast, true)),
        ..Default::default()
    };
    assert!(with_fast_tier.can_hybrid_search(&policy, db_fp, rev));
}
1576
/// A partially processed ledger reports per-tier remainders, pending work, and
/// currency only for its own fingerprint.
#[test]
fn backlog_remaining_and_pending() {
    let partial = BacklogLedger {
        computed_at_ms: 0,
        db_fingerprint: String::from("fp"),
        quality_tier_processed: 300,
        fast_tier_processed: 800,
        total_conversations: 1000,
    };

    let fast_left = partial.fast_tier_remaining();
    let quality_left = partial.quality_tier_remaining();
    assert_eq!(fast_left, 200);
    assert_eq!(quality_left, 700);
    assert!(partial.has_pending_work());

    assert!(partial.is_current("fp"));
    assert!(!partial.is_current("other"));
}
1595
/// When both tiers have processed every conversation, nothing remains and no
/// work is pending.
#[test]
fn backlog_no_pending_when_fully_processed() {
    let drained = BacklogLedger {
        computed_at_ms: 0,
        db_fingerprint: String::from("fp"),
        quality_tier_processed: 500,
        fast_tier_processed: 500,
        total_conversations: 500,
    };

    let fast_left = drained.fast_tier_remaining();
    let quality_left = drained.quality_tier_remaining();
    assert_eq!(fast_left, 0);
    assert_eq!(quality_left, 0);
    assert!(!drained.has_pending_work());
}
1610
/// The fixture checkpoint is 50% done, incomplete, and valid only for its own
/// fingerprint; once all conversations are processed it is 100% and complete.
#[test]
fn checkpoint_progress_and_completion() {
    let halfway = test_checkpoint(TierKind::Quality);
    assert_eq!(halfway.progress_pct(), 50);
    assert!(!halfway.is_complete());
    assert!(halfway.is_valid("fp-1234"));
    assert!(!halfway.is_valid("other-fp"));

    let finished = {
        let mut checkpoint = test_checkpoint(TierKind::Quality);
        checkpoint.conversations_processed = 1000;
        checkpoint
    };
    assert_eq!(finished.progress_pct(), 100);
    assert!(finished.is_complete());
}
1627
/// A checkpoint with zero total conversations reports 0% progress.
#[test]
fn checkpoint_zero_total_gives_zero_pct() {
    let empty_build = BuildCheckpoint {
        total_conversations: 0,
        conversations_processed: 0,
        ..test_checkpoint(TierKind::Fast)
    };
    assert_eq!(empty_build.progress_pct(), 0);
}
1635
/// Publishing an artifact for the tier the checkpoint belongs to removes the
/// checkpoint and records the artifact.
#[test]
fn publish_artifact_clears_matching_checkpoint() {
    let pending = Some(test_checkpoint(TierKind::Quality));
    let mut manifest = SemanticManifest {
        checkpoint: pending,
        ..Default::default()
    };

    let published = test_artifact(TierKind::Quality, true);
    manifest.publish_artifact(published);

    assert!(manifest.checkpoint.is_none());
    assert!(manifest.quality_tier.is_some());
}
1649
/// Publishing a fast-tier artifact leaves a quality-tier checkpoint in place.
#[test]
fn publish_artifact_keeps_non_matching_checkpoint() {
    let pending = Some(test_checkpoint(TierKind::Quality));
    let mut manifest = SemanticManifest {
        checkpoint: pending,
        ..Default::default()
    };

    let published = test_artifact(TierKind::Fast, true);
    manifest.publish_artifact(published);

    assert!(manifest.checkpoint.is_some());
    assert!(manifest.fast_tier.is_some());
}
1661
/// Refreshing the backlog against the fixtures' own fingerprint records the
/// supplied total and a processed count of 250 for each ready tier.
#[test]
fn refresh_backlog_computes_from_ready_artifacts() {
    let fast = test_artifact(TierKind::Fast, true);
    let quality = test_artifact(TierKind::Quality, true);
    let mut manifest = SemanticManifest {
        fast_tier: Some(fast),
        quality_tier: Some(quality),
        ..Default::default()
    };

    manifest.refresh_backlog(2000, "fp-1234");

    let ledger = &manifest.backlog;
    assert_eq!(ledger.total_conversations, 2000);
    assert_eq!(ledger.fast_tier_processed, 250);
    assert_eq!(ledger.quality_tier_processed, 250);
}
1677
/// Refreshing against a fingerprint other than the fixture's leaves the fast
/// tier's processed count at zero.
#[test]
fn refresh_backlog_ignores_stale_artifacts() {
    let fast = test_artifact(TierKind::Fast, true);
    let mut manifest = SemanticManifest {
        fast_tier: Some(fast),
        ..Default::default()
    };

    manifest.refresh_backlog(2000, "different-fp");

    let processed = manifest.backlog.fast_tier_processed;
    assert_eq!(processed, 0);
}
1689
/// A quality artifact with schema version 0 is invalidated together with the
/// HNSW record: two entries removed, both slots empty afterwards.
#[test]
fn invalidate_incompatible_removes_schema_mismatch() {
    let outdated = ArtifactRecord {
        schema_version: 0,
        ..test_artifact(TierKind::Quality, true)
    };
    let mut manifest = SemanticManifest {
        quality_tier: Some(outdated),
        hnsw: Some(test_hnsw()),
        ..Default::default()
    };

    let policy = test_policy();
    let removed = manifest.invalidate_incompatible(&policy, "abc123");

    assert_eq!(removed, 2);
    assert!(manifest.quality_tier.is_none());
    assert!(manifest.hnsw.is_none());
}
1709
/// Unmodified fixture artifacts survive invalidation: nothing is removed.
#[test]
fn invalidate_incompatible_keeps_compatible() {
    let policy = test_policy();
    let fast = test_artifact(TierKind::Fast, true);
    let quality = test_artifact(TierKind::Quality, true);
    let mut manifest = SemanticManifest {
        fast_tier: Some(fast),
        quality_tier: Some(quality),
        ..Default::default()
    };

    let removed = manifest.invalidate_incompatible(&policy, "abc123");

    assert_eq!(removed, 0);
    assert!(manifest.fast_tier.is_some());
    assert!(manifest.quality_tier.is_some());
}
1725
/// Adopting a legacy fast-tier index into an empty manifest succeeds and
/// records a ready artifact stamped with the current schema version.
#[test]
fn adopt_legacy_artifact() {
    let mut manifest = SemanticManifest::default();

    let was_adopted = manifest.adopt_legacy_artifact(
        TierKind::Fast,
        "fnv1a-384",
        "hash",
        384,
        500,                                  // doc count
        125,                                  // conversation count
        "fp-old",                             // DB fingerprint
        "vector_index/index-fnv1a-384.fsvi",  // index path
        75_000,                               // size in bytes
    );
    assert!(was_adopted);

    let adopted = manifest.fast_tier.as_ref().unwrap();
    assert_eq!(adopted.embedder_id, "fnv1a-384");
    assert!(adopted.ready);
    assert_eq!(adopted.schema_version, SEMANTIC_SCHEMA_VERSION);
}
1754
/// The total size sums both tier artifacts and the HNSW record; the fixture
/// total of 350,000 bytes reports as 1 MB.
#[test]
fn total_size_accounts_for_all_artifacts() {
    let populated = SemanticManifest {
        hnsw: Some(test_hnsw()),
        quality_tier: Some(test_artifact(TierKind::Quality, true)),
        fast_tier: Some(test_artifact(TierKind::Fast, true)),
        ..Default::default()
    };

    let bytes = populated.total_size_bytes();
    assert_eq!(bytes, 150_000 + 150_000 + 50_000);

    let megabytes = populated.total_size_mb();
    assert_eq!(megabytes, 1);
}
1769
/// A default manifest occupies zero bytes and zero megabytes.
#[test]
fn total_size_empty_is_zero() {
    let empty = SemanticManifest::default();
    let (bytes, megabytes) = (empty.total_size_bytes(), empty.total_size_mb());
    assert_eq!(bytes, 0);
    assert_eq!(megabytes, 0);
}
1776
/// Serializing a populated manifest to pretty JSON and parsing it back yields
/// equal tier, HNSW, and checkpoint sections.
#[test]
fn manifest_json_round_trip() {
    let original = SemanticManifest {
        checkpoint: Some(test_checkpoint(TierKind::Quality)),
        hnsw: Some(test_hnsw()),
        quality_tier: Some(test_artifact(TierKind::Quality, true)),
        fast_tier: Some(test_artifact(TierKind::Fast, true)),
        ..Default::default()
    };

    let encoded = serde_json::to_string_pretty(&original).unwrap();
    let decoded: SemanticManifest = serde_json::from_str(&encoded).unwrap();

    assert_eq!(decoded.fast_tier, original.fast_tier);
    assert_eq!(decoded.quality_tier, original.quality_tier);
    assert_eq!(decoded.hnsw, original.hnsw);
    assert_eq!(decoded.checkpoint, original.checkpoint);
}
1796
/// A shard manifest saved to disk loads back at the current format version
/// with both shards present and ordered by shard index, even though they were
/// supplied in reverse order.
#[test]
fn shard_manifest_round_trip_via_sidecar() {
    let dir = tempfile::tempdir().unwrap();
    let root = dir.path();

    let reversed = vec![test_shard(1, 2, true), test_shard(0, 2, true)];
    let mut sidecar = SemanticShardManifest::default();
    sidecar.replace_shards_for_generation(TierKind::Fast, "fnv1a-384", "fp-sharded", reversed);
    sidecar.save(root).unwrap();

    let reloaded = SemanticShardManifest::load(root).unwrap();
    let reloaded = reloaded.unwrap();

    assert_eq!(reloaded.manifest_version, MANIFEST_FORMAT_VERSION);
    assert_eq!(reloaded.shards.len(), 2);
    let first = &reloaded.shards[0];
    let second = &reloaded.shards[1];
    assert_eq!(first.shard_index, 0);
    assert_eq!(second.shard_index, 1);
    assert!(reloaded.updated_at_ms > 0);
}
1819
/// A generation declaring three shards is incomplete while only two are
/// recorded, and complete (with aggregated counts) once all three are ready.
#[test]
fn shard_summary_requires_every_ready_shard() {
    // Generation key shared by every call in this test.
    let tier = TierKind::Fast;
    let embedder = "fnv1a-384";
    let db_fp = "fp-sharded";

    let mut sidecar = SemanticShardManifest::default();

    let two_of_three = vec![test_shard(0, 3, true), test_shard(2, 3, true)];
    sidecar.replace_shards_for_generation(tier, embedder, db_fp, two_of_three);
    let gapped = sidecar.summary(tier, embedder, db_fp);
    assert_eq!(gapped.shard_count, 3);
    assert_eq!(gapped.ready_shards, 2);
    assert!(!gapped.complete);

    let all_three = vec![
        test_shard(0, 3, true),
        test_shard(1, 3, true),
        test_shard(2, 3, true),
    ];
    sidecar.replace_shards_for_generation(tier, embedder, db_fp, all_three);
    let full = sidecar.summary(tier, embedder, db_fp);
    assert_eq!(full.ready_shards, 3);
    assert!(full.complete);
    assert_eq!(full.doc_count, 75);
    assert_eq!(full.total_conversations, 10);
}
1852
/// Walks `SemanticShardManifest::summary` through a series of malformed
/// fast-tier generations and pins the reported counts and `complete` flag for
/// each. Defects in the vector shards themselves must prevent completion;
/// defects confined to the optional ANN side data must not.
#[test]
fn shard_summary_rejects_non_mmap_ready_or_inconsistent_shards() {
    // Generation key shared by every scenario below.
    let tier = TierKind::Fast;
    let embedder = "fnv1a-384";
    let db_fp = "fp-sharded";

    let mut sidecar = SemanticShardManifest::default();

    // A shard that is not mmap-ready.
    let mut not_mmap_ready = test_shard(0, 1, true);
    not_mmap_ready.mmap_ready = false;
    sidecar.replace_shards_for_generation(tier, embedder, db_fp, vec![not_mmap_ready]);
    let not_mmap_report = sidecar.summary(tier, embedder, db_fp);
    assert_eq!(not_mmap_report.ready_shards, 0);
    assert!(!not_mmap_report.complete);

    // Shards that disagree on shard count; one claims ANN readiness without a path.
    let mut ann_without_path = test_shard(1, 3, true);
    ann_without_path.ann_size_bytes = 4096;
    ann_without_path.ann_index_path = None;
    ann_without_path.ann_ready = true;
    sidecar.replace_shards_for_generation(
        tier,
        embedder,
        db_fp,
        vec![test_shard(0, 2, true), ann_without_path],
    );
    let mixed_report = sidecar.summary(tier, embedder, db_fp);
    assert_eq!(mixed_report.shard_count, 3);
    assert_eq!(mixed_report.ready_shards, 2);
    assert_eq!(mixed_report.ann_ready_shards, 0);
    assert!(!mixed_report.complete);

    // The same shard index recorded twice.
    let repeated_index = vec![
        test_shard(0, 2, true),
        test_shard(1, 2, true),
        test_shard(1, 2, false),
    ];
    sidecar.replace_shards_for_generation(tier, embedder, db_fp, repeated_index);
    let repeated_index_report = sidecar.summary(tier, embedder, db_fp);
    assert_eq!(repeated_index_report.shard_count, 2);
    assert_eq!(repeated_index_report.ready_shards, 2);
    assert!(
        !repeated_index_report.complete,
        "duplicate shard indexes must not summarize as a complete generation"
    );

    // Two shards pointing at the same index file.
    let mut same_file = test_shard(1, 2, true);
    same_file.index_path = test_shard(0, 2, true).index_path;
    sidecar.replace_shards_for_generation(
        tier,
        embedder,
        db_fp,
        vec![test_shard(0, 2, true), same_file],
    );
    let same_file_report = sidecar.summary(tier, embedder, db_fp);
    assert_eq!(same_file_report.shard_count, 2);
    assert_eq!(same_file_report.ready_shards, 2);
    assert!(
        !same_file_report.complete,
        "duplicate shard index paths must not summarize as a complete generation"
    );

    // An empty index path.
    let mut no_path = test_shard(0, 1, true);
    no_path.index_path = String::new();
    sidecar.replace_shards_for_generation(tier, embedder, db_fp, vec![no_path]);
    let no_path_report = sidecar.summary(tier, embedder, db_fp);
    assert_eq!(no_path_report.shard_count, 1);
    assert_eq!(no_path_report.ready_shards, 1);
    assert!(
        !no_path_report.complete,
        "blank shard index paths must not summarize as complete"
    );

    // Index paths that are absolute, contain `..` or `.`, or carry leading whitespace.
    for risky_path in [
        tempfile::tempdir()
            .unwrap()
            .path()
            .join("outside.fsvi")
            .to_string_lossy()
            .into_owned(),
        String::from("vector_index/shards/../outside.fsvi"),
        String::from("./vector_index/shards/fast/hash.fsvi"),
        String::from(" vector_index/shards/fast/hash.fsvi"),
    ] {
        let mut risky_shard = test_shard(0, 1, true);
        risky_shard.index_path = risky_path;
        sidecar.replace_shards_for_generation(tier, embedder, db_fp, vec![risky_shard]);
        let risky_report = sidecar.summary(tier, embedder, db_fp);
        assert_eq!(risky_report.shard_count, 1);
        assert_eq!(risky_report.ready_shards, 1);
        assert!(
            !risky_report.complete,
            "unsafe shard index paths must not summarize as complete"
        );
    }

    // The same kinds of path on the optional ANN index: ANN readiness is
    // withheld, yet the vector generation still counts as complete.
    let outside_ann_dir = tempfile::tempdir().unwrap();
    for risky_ann_path in [
        outside_ann_dir
            .path()
            .join("outside.chsw")
            .to_string_lossy()
            .into_owned(),
        String::from("vector_index/shards/../outside.chsw"),
        String::from("./vector_index/shards/fast/hash.chsw"),
        String::from(" vector_index/shards/fast/hash.chsw"),
    ] {
        let mut risky_ann = test_shard(0, 1, true);
        risky_ann.ann_size_bytes = 4096;
        risky_ann.ann_index_path = Some(risky_ann_path);
        risky_ann.ann_ready = true;
        sidecar.replace_shards_for_generation(tier, embedder, db_fp, vec![risky_ann]);
        let risky_ann_report = sidecar.summary(tier, embedder, db_fp);
        assert_eq!(risky_ann_report.shard_count, 1);
        assert_eq!(risky_ann_report.ready_shards, 1);
        assert_eq!(risky_ann_report.ann_ready_shards, 0);
        assert!(
            risky_ann_report.complete,
            "unsafe optional ANN paths must not invalidate the vector shard generation"
        );
    }

    // Two shards sharing one ANN index path.
    let shared_ann_path = Some("vector_index/shards/fast-fnv1a-384/shared-ann.chsw".to_owned());
    let mut ann_second = test_shard(1, 2, true);
    ann_second.ann_ready = true;
    ann_second.ann_index_path = shared_ann_path.clone();
    ann_second.ann_size_bytes = 4096;
    let mut ann_first = test_shard(0, 2, true);
    ann_first.ann_ready = true;
    ann_first.ann_index_path = shared_ann_path;
    ann_first.ann_size_bytes = 4096;
    sidecar.replace_shards_for_generation(tier, embedder, db_fp, vec![ann_first, ann_second]);
    let shared_ann_report = sidecar.summary(tier, embedder, db_fp);
    assert_eq!(shared_ann_report.shard_count, 2);
    assert_eq!(shared_ann_report.ready_shards, 2);
    assert_eq!(shared_ann_report.ann_ready_shards, 1);
    assert!(
        shared_ann_report.complete,
        "duplicate optional ANN paths must not invalidate the vector shard generation"
    );

    // A shard index equal to the declared shard count.
    let beyond_count = vec![test_shard(2, 2, true)];
    sidecar.replace_shards_for_generation(tier, embedder, db_fp, beyond_count);
    let beyond_count_report = sidecar.summary(tier, embedder, db_fp);
    assert_eq!(beyond_count_report.shard_count, 2);
    assert_eq!(beyond_count_report.ready_shards, 1);
    assert!(
        !beyond_count_report.complete,
        "shard indexes outside the declared shard count are malformed"
    );

    // Shards that disagree on embedding dimension.
    let mut other_dimension = test_shard(1, 2, true);
    other_dimension.dimension = 768;
    sidecar.replace_shards_for_generation(
        tier,
        embedder,
        db_fp,
        vec![test_shard(0, 2, true), other_dimension],
    );
    let dimension_report = sidecar.summary(tier, embedder, db_fp);
    assert_eq!(dimension_report.shard_count, 2);
    assert_eq!(dimension_report.ready_shards, 2);
    assert!(
        !dimension_report.complete,
        "complete shard generations require consistent shard metadata"
    );

    // A shard recorded one schema version behind the current one.
    let mut old_schema = test_shard(0, 1, true);
    old_schema.schema_version = SEMANTIC_SCHEMA_VERSION.saturating_sub(1);
    sidecar.replace_shards_for_generation(tier, embedder, db_fp, vec![old_schema]);
    let old_schema_report = sidecar.summary(tier, embedder, db_fp);
    assert_eq!(old_schema_report.shard_count, 1);
    assert_eq!(old_schema_report.ready_shards, 1);
    assert!(
        !old_schema_report.complete,
        "stale schema shards must not summarize as complete"
    );
}
2065
/// A complete shard generation in the sidecar has no effect on a default main
/// manifest, whose fast tier still reports `Missing`.
#[test]
fn shard_sidecar_does_not_make_main_manifest_ready() {
    let tier = TierKind::Fast;
    let embedder = "fnv1a-384";
    let db_fp = "fp-sharded";

    let mut sidecar = SemanticShardManifest::default();
    sidecar.replace_shards_for_generation(tier, embedder, db_fp, vec![test_shard(0, 1, true)]);
    let sidecar_complete = sidecar.summary(tier, embedder, db_fp).complete;
    assert!(sidecar_complete);

    let policy = test_policy();
    let main_manifest = SemanticManifest::default();
    assert_eq!(
        main_manifest.fast_tier_readiness(&policy, "fp-sharded", "hash"),
        TierReadiness::Missing,
        "sidecar shards must not publish runtime semantic readiness"
    );
}
2089
/// Of two recorded shards, only the one with schema version 0 is invalidated;
/// the surviving shard accounts for the remaining 4096 bytes.
#[test]
fn shard_manifest_invalidates_incompatible_shards() {
    let mut outdated = test_shard(0, 1, true);
    outdated.schema_version = 0;
    let current = test_shard(0, 1, true);

    let mut sidecar = SemanticShardManifest {
        shards: vec![outdated, current],
        ..Default::default()
    };

    let policy = test_policy();
    let removed = sidecar.invalidate_incompatible(&policy, "hash");

    assert_eq!(removed, 1);
    assert_eq!(sidecar.shards.len(), 1);
    assert_eq!(sidecar.total_size_bytes(), 4096);
}
2105}