Skip to main content

coding_agent_search/search/
semantic_manifest.rs

1//! Durable semantic asset manifest, backlog ledger, and resumable checkpoints.
2//!
3//! This module is the authoritative state model for semantic assets.  It tells
4//! cass exactly what semantic artifacts exist, how trustworthy they are, and
5//! what work remains to converge the corpus — enabling partial readiness,
6//! resumable builds, and truthful runtime degradation.
7//!
8//! # Storage
9//!
10//! The manifest is a single JSON file at:
11//! ```text
12//! {data_dir}/vector_index/semantic_manifest.json
13//! ```
14//!
15//! It is written atomically (write-to-temp then rename) and is the only file
16//! the backfill worker needs to consult to know what work remains.
17//!
18//! # Relationship to other modules
19//!
20//! - **[`policy`]**: Provides the contract (versions, budgets, tier names) that
21//!   this manifest fingerprints against.
22//! - **[`asset_state`]**: Evaluates coarse readiness from this manifest plus
23//!   live file probes.
24//! - **[`model_manager`]**: Detects model availability; this module records
25//!   which model was used to build each artifact.
26
27use std::fs::{self, OpenOptions};
28use std::io::Write as IoWrite;
29use std::path::{Component, Path, PathBuf};
30use std::time::{SystemTime, UNIX_EPOCH};
31
32use ring::rand::{SecureRandom, SystemRandom};
33use serde::{Deserialize, Serialize};
34
35use super::policy::{
36    CHUNKING_STRATEGY_VERSION, InvalidationAction, SEMANTIC_SCHEMA_VERSION,
37    SemanticAssetManifest as PolicyManifest, SemanticPolicy,
38};
39
40// ─── Constants ─────────────────────────────────────────────────────────────
41
42/// Current manifest format version.  Bump when the JSON schema changes in a
43/// backwards-incompatible way.
44pub const MANIFEST_FORMAT_VERSION: u32 = 1;
45
46/// Filename for the durable manifest.
47pub const MANIFEST_FILENAME: &str = "semantic_manifest.json";
48
49/// Filename for the prototype per-shard semantic artifact manifest.
50pub const SHARD_MANIFEST_FILENAME: &str = "semantic_shards.json";
51
/// Decide whether a shard artifact path recorded in a manifest is acceptable.
///
/// A path is accepted only when all of the following hold:
/// - it is non-empty and carries no leading or trailing whitespace,
/// - it is not absolute,
/// - every component reported by [`Path::components`] is a
///   [`Component::Normal`] (so prefix, root, `.` and `..` components are
///   rejected).
pub(crate) fn semantic_shard_artifact_path_is_safe(recorded_path: &str) -> bool {
    // Whitespace-only strings trim to "" and therefore also differ from the
    // input, so this single comparison covers padded and blank values alike.
    if recorded_path.is_empty() || recorded_path.trim() != recorded_path {
        return false;
    }

    let candidate = Path::new(recorded_path);
    if candidate.is_absolute() {
        return false;
    }

    !candidate
        .components()
        .any(|part| !matches!(part, Component::Normal(_)))
}
63
64// ─── Tier kind ─────────────────────────────────────────────────────────────
65
66/// Which semantic tier an artifact or checkpoint belongs to.
67#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
68#[serde(rename_all = "snake_case")]
69pub enum TierKind {
70    Fast,
71    Quality,
72}
73
74impl TierKind {
75    pub fn as_str(&self) -> &'static str {
76        match self {
77            Self::Fast => "fast",
78            Self::Quality => "quality",
79        }
80    }
81}
82
83// ─── Tier readiness ────────────────────────────────────────────────────────
84
85/// Readiness of a single semantic tier.
86#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
87#[serde(rename_all = "snake_case")]
88pub enum TierReadiness {
89    /// Artifact exists, verified, and current with the DB.
90    Ready,
91    /// Build is in progress (checkpoint present).
92    Building { progress_pct: u8 },
93    /// Artifact exists but DB or model changed since it was built.
94    Stale { reason: String },
95    /// No artifact at all for this tier.
96    Missing,
97    /// Schema or chunking version mismatch — must discard and rebuild.
98    Incompatible { reason: String },
99}
100
101impl TierReadiness {
102    pub fn is_ready(&self) -> bool {
103        matches!(self, Self::Ready)
104    }
105
106    pub fn is_usable(&self) -> bool {
107        matches!(self, Self::Ready | Self::Stale { .. })
108    }
109}
110
111// ─── Artifact record ───────────────────────────────────────────────────────
112
113/// Durable metadata for a single vector index artifact.
114#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
115pub struct ArtifactRecord {
116    /// Which tier this artifact belongs to.
117    pub tier: TierKind,
118    /// Embedder ID that produced these vectors (e.g., "minilm-384", "fnv1a-384").
119    pub embedder_id: String,
120    /// Model revision hash (HuggingFace commit or "hash" for the hash embedder).
121    pub model_revision: String,
122    /// Semantic schema version at build time.
123    pub schema_version: u32,
124    /// Chunking strategy version at build time.
125    pub chunking_version: u32,
126    /// Output dimension of the embedder.
127    pub dimension: usize,
128    /// Number of documents (message chunks) embedded.
129    pub doc_count: u64,
130    /// Number of conversations processed to produce this artifact.
131    pub conversation_count: u64,
132    /// Storage fingerprint of the canonical DB when this artifact was built.
133    pub db_fingerprint: String,
134    /// Relative path to the index file (from data_dir).
135    pub index_path: String,
136    /// File size in bytes.
137    pub size_bytes: u64,
138    /// Unix timestamp (ms) when the build started.
139    pub started_at_ms: i64,
140    /// Unix timestamp (ms) when the build completed.
141    pub completed_at_ms: i64,
142    /// Whether this artifact has been verified and published.
143    pub ready: bool,
144}
145
146impl ArtifactRecord {
147    /// Convert to the policy-level manifest for invalidation checks.
148    pub fn to_policy_manifest(&self) -> PolicyManifest {
149        PolicyManifest {
150            embedder_id: self.embedder_id.clone(),
151            model_revision: self.model_revision.clone(),
152            schema_version: self.schema_version,
153            chunking_version: self.chunking_version,
154            doc_count: self.doc_count,
155            built_at_ms: self.completed_at_ms,
156        }
157    }
158
159    /// Evaluate this artifact's readiness against the current policy and DB
160    /// fingerprint.
161    ///
162    /// **Note**: This checks schema/chunking versions, mode, model revision,
163    /// and DB fingerprint.  It does NOT detect embedder changes because the
164    /// expected embedder ID requires the embedder registry to resolve.
165    /// Callers needing embedder-change detection should call
166    /// [`SemanticAssetManifest::invalidation_action`] directly with the
167    /// correct `expected_embedder_id` from the registry.
168    pub fn readiness(
169        &self,
170        policy: &SemanticPolicy,
171        current_db_fingerprint: &str,
172        current_model_revision: &str,
173    ) -> TierReadiness {
174        let action = self.to_policy_manifest().invalidation_action(
175            policy,
176            current_model_revision,
177            &self.embedder_id,
178        );
179
180        match action {
181            InvalidationAction::UpToDate => {
182                if self.db_fingerprint != current_db_fingerprint {
183                    TierReadiness::Stale {
184                        reason: "DB content changed since artifact was built".to_owned(),
185                    }
186                } else if !self.ready {
187                    TierReadiness::Building { progress_pct: 100 }
188                } else {
189                    TierReadiness::Ready
190                }
191            }
192            InvalidationAction::RebuildInBackground => TierReadiness::Stale {
193                reason: "model revision changed; vectors usable until rebuild completes".to_owned(),
194            },
195            InvalidationAction::DiscardAndRebuild { reason } => {
196                TierReadiness::Incompatible { reason }
197            }
198            InvalidationAction::Evict => TierReadiness::Incompatible {
199                reason: "semantic mode set to lexical-only".to_owned(),
200            },
201        }
202    }
203}
204
205// ─── HNSW accelerator record ──────────────────────────────────────────────
206
207/// Durable metadata for an HNSW accelerator index.
208#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
209pub struct HnswRecord {
210    /// Which base artifact this accelerates.
211    pub base_tier: TierKind,
212    /// Embedder ID of the base artifact.
213    pub embedder_id: String,
214    /// ef_search parameter used at build time.
215    pub ef_search: usize,
216    /// Relative path to the HNSW index file (from data_dir).
217    pub index_path: String,
218    /// File size in bytes.
219    pub size_bytes: u64,
220    /// Unix timestamp (ms) when built.
221    pub built_at_ms: i64,
222    /// Whether this index is ready for use.
223    pub ready: bool,
224}
225
226// ─── Sharded vector artifact sidecar ─────────────────────────────────────
227
228/// Durable metadata for one mmap-friendly semantic vector shard.
229///
230/// Shards deliberately live in a sidecar manifest instead of
231/// [`SemanticManifest`]. Runtime readiness continues to flow through the
232/// existing tier artifact records, so partial shard generations cannot make a
233/// semantic tier look ready before a promotion step has merged or selected a
234/// complete generation.
235#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
236pub struct SemanticShardRecord {
237    /// Which semantic tier this shard belongs to.
238    pub tier: TierKind,
239    /// Embedder ID that produced the vectors.
240    pub embedder_id: String,
241    /// Model revision hash or "hash" for the hash embedder.
242    pub model_revision: String,
243    /// Semantic schema version at build time.
244    pub schema_version: u32,
245    /// Chunking strategy version at build time.
246    pub chunking_version: u32,
247    /// Output dimension of the embedder.
248    pub dimension: usize,
249    /// Zero-based shard number within the generation.
250    pub shard_index: u32,
251    /// Total shards expected for this generation.
252    pub shard_count: u32,
253    /// Number of documents embedded in this shard.
254    pub doc_count: u64,
255    /// Total conversations represented by the full shard generation.
256    pub total_conversations: u64,
257    /// Storage fingerprint of the canonical DB when this shard was built.
258    pub db_fingerprint: String,
259    /// Relative path to the shard index file from data_dir.
260    pub index_path: String,
261    /// Vector quantization used by the shard file.
262    pub quantization: String,
263    /// Whether readers may mmap this artifact directly.
264    pub mmap_ready: bool,
265    /// Relative path to the shard-local ANN accelerator, when built.
266    pub ann_index_path: Option<String>,
267    /// File size of the ANN accelerator in bytes.
268    pub ann_size_bytes: u64,
269    /// Whether the shard-local ANN accelerator is ready for use.
270    pub ann_ready: bool,
271    /// File size in bytes.
272    pub size_bytes: u64,
273    /// Unix timestamp (ms) when the shard build started.
274    pub started_at_ms: i64,
275    /// Unix timestamp (ms) when the shard build completed.
276    pub completed_at_ms: i64,
277    /// Whether this shard has been verified and published to the sidecar.
278    pub ready: bool,
279}
280
281impl SemanticShardRecord {
282    pub fn to_policy_manifest(&self) -> PolicyManifest {
283        PolicyManifest {
284            embedder_id: self.embedder_id.clone(),
285            model_revision: self.model_revision.clone(),
286            schema_version: self.schema_version,
287            chunking_version: self.chunking_version,
288            doc_count: self.doc_count,
289            built_at_ms: self.completed_at_ms,
290        }
291    }
292
293    pub fn matches_generation(
294        &self,
295        tier: TierKind,
296        embedder_id: &str,
297        db_fingerprint: &str,
298    ) -> bool {
299        self.tier == tier
300            && self.embedder_id == embedder_id
301            && self.db_fingerprint == db_fingerprint
302    }
303}
304
305/// Aggregated readiness for a shard generation.
306#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
307pub struct SemanticShardSummary {
308    pub shard_count: u32,
309    pub ready_shards: u32,
310    pub ann_ready_shards: u32,
311    pub doc_count: u64,
312    pub total_conversations: u64,
313    pub size_bytes: u64,
314    pub ann_size_bytes: u64,
315    pub complete: bool,
316}
317
318/// Sidecar manifest for prototype semantic shard generations.
319#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
320pub struct SemanticShardManifest {
321    pub manifest_version: u32,
322    pub shards: Vec<SemanticShardRecord>,
323    pub updated_at_ms: i64,
324}
325
326impl Default for SemanticShardManifest {
327    fn default() -> Self {
328        Self {
329            manifest_version: MANIFEST_FORMAT_VERSION,
330            shards: Vec::new(),
331            updated_at_ms: 0,
332        }
333    }
334}
335
336impl SemanticShardManifest {
337    pub fn path(data_dir: &Path) -> PathBuf {
338        data_dir.join("vector_index").join(SHARD_MANIFEST_FILENAME)
339    }
340
341    pub fn load(data_dir: &Path) -> Result<Option<Self>, ManifestError> {
342        let path = Self::path(data_dir);
343        let bytes = match fs::read(&path) {
344            Ok(bytes) => bytes,
345            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
346            Err(e) => {
347                return Err(ManifestError::Io {
348                    path,
349                    source: e.to_string(),
350                });
351            }
352        };
353
354        let manifest: Self = serde_json::from_slice(&bytes).map_err(|e| ManifestError::Parse {
355            path: path.clone(),
356            source: e.to_string(),
357        })?;
358
359        if manifest.manifest_version > MANIFEST_FORMAT_VERSION {
360            return Err(ManifestError::UnsupportedVersion {
361                found: manifest.manifest_version,
362                max_supported: MANIFEST_FORMAT_VERSION,
363            });
364        }
365
366        Ok(Some(manifest))
367    }
368
369    pub fn load_or_default(data_dir: &Path) -> Result<Self, ManifestError> {
370        match Self::load(data_dir) {
371            Ok(Some(manifest)) => Ok(manifest),
372            Ok(None) => Ok(Self::default()),
373            Err(e @ ManifestError::Io { .. }) => Err(e),
374            Err(ManifestError::Parse { .. } | ManifestError::UnsupportedVersion { .. }) => {
375                Ok(Self::default())
376            }
377            Err(e) => Err(e),
378        }
379    }
380
381    pub fn save(&mut self, data_dir: &Path) -> Result<(), ManifestError> {
382        let path = Self::path(data_dir);
383        if let Some(parent) = path.parent() {
384            fs::create_dir_all(parent).map_err(|e| ManifestError::Io {
385                path: parent.to_path_buf(),
386                source: e.to_string(),
387            })?;
388        }
389
390        self.updated_at_ms = now_ms();
391        let json = serde_json::to_string_pretty(self).map_err(|e| ManifestError::Serialize {
392            source: e.to_string(),
393        })?;
394        let (tmp_path, mut file) =
395            create_unique_manifest_temp_file(&path).map_err(|e| ManifestError::Io {
396                path: path.clone(),
397                source: e.to_string(),
398            })?;
399        file.write_all(json.as_bytes())
400            .map_err(|e| ManifestError::Io {
401                path: tmp_path.clone(),
402                source: e.to_string(),
403            })?;
404        file.sync_all().map_err(|e| ManifestError::Io {
405            path: tmp_path.clone(),
406            source: e.to_string(),
407        })?;
408        replace_file_from_temp(&tmp_path, &path).map_err(|e| ManifestError::Io {
409            path: path.clone(),
410            source: e.to_string(),
411        })?;
412        sync_parent_directory(&path).map_err(|e| ManifestError::Io {
413            path: path
414                .parent()
415                .map(Path::to_path_buf)
416                .unwrap_or_else(|| path.clone()),
417            source: e.to_string(),
418        })?;
419
420        Ok(())
421    }
422
423    pub fn replace_shards_for_generation(
424        &mut self,
425        tier: TierKind,
426        embedder_id: &str,
427        db_fingerprint: &str,
428        mut shards: Vec<SemanticShardRecord>,
429    ) {
430        self.shards
431            .retain(|shard| !shard.matches_generation(tier, embedder_id, db_fingerprint));
432        self.shards.append(&mut shards);
433        self.shards.sort_by(|a, b| {
434            (
435                a.tier.as_str(),
436                &a.embedder_id,
437                &a.db_fingerprint,
438                a.shard_index,
439            )
440                .cmp(&(
441                    b.tier.as_str(),
442                    &b.embedder_id,
443                    &b.db_fingerprint,
444                    b.shard_index,
445                ))
446        });
447    }
448
449    pub fn summary(
450        &self,
451        tier: TierKind,
452        embedder_id: &str,
453        db_fingerprint: &str,
454    ) -> SemanticShardSummary {
455        let mut summary = SemanticShardSummary::default();
456        let mut ready_indices = std::collections::BTreeSet::new();
457        let mut ann_ready_indices = std::collections::BTreeSet::new();
458        let mut seen_indices = std::collections::BTreeSet::new();
459        let mut seen_index_paths = std::collections::BTreeSet::new();
460        let mut seen_ann_index_paths = std::collections::BTreeSet::new();
461        let mut expected_shard_count = None;
462        let mut expected_generation_metadata: Option<(&str, u32, u32, usize, u64, &str)> = None;
463        let mut generation_consistent = true;
464        for shard in self
465            .shards
466            .iter()
467            .filter(|shard| shard.matches_generation(tier, embedder_id, db_fingerprint))
468        {
469            if shard.shard_count == 0 || shard.shard_index >= shard.shard_count {
470                generation_consistent = false;
471            }
472            if !seen_indices.insert(shard.shard_index) {
473                generation_consistent = false;
474            }
475            if !semantic_shard_artifact_path_is_safe(&shard.index_path)
476                || !seen_index_paths.insert(&shard.index_path)
477            {
478                generation_consistent = false;
479            }
480            match expected_shard_count {
481                Some(expected) if expected != shard.shard_count => {
482                    generation_consistent = false;
483                }
484                None => expected_shard_count = Some(shard.shard_count),
485                _ => {}
486            }
487            let generation_metadata = (
488                shard.model_revision.as_str(),
489                shard.schema_version,
490                shard.chunking_version,
491                shard.dimension,
492                shard.total_conversations,
493                shard.quantization.as_str(),
494            );
495            match expected_generation_metadata {
496                Some(expected) if expected != generation_metadata => {
497                    generation_consistent = false;
498                }
499                None => expected_generation_metadata = Some(generation_metadata),
500                _ => {}
501            }
502            if shard.schema_version != SEMANTIC_SCHEMA_VERSION
503                || shard.chunking_version != CHUNKING_STRATEGY_VERSION
504                || shard.dimension == 0
505            {
506                generation_consistent = false;
507            }
508            summary.shard_count = summary.shard_count.max(shard.shard_count);
509            summary.doc_count = summary.doc_count.saturating_add(shard.doc_count);
510            summary.total_conversations =
511                summary.total_conversations.max(shard.total_conversations);
512            summary.size_bytes = summary.size_bytes.saturating_add(shard.size_bytes);
513            summary.ann_size_bytes = summary.ann_size_bytes.saturating_add(shard.ann_size_bytes);
514            if shard.ready && shard.mmap_ready {
515                ready_indices.insert(shard.shard_index);
516            }
517            if shard.ready
518                && shard.mmap_ready
519                && shard.ann_ready
520                && shard.ann_size_bytes > 0
521                && let Some(ann_index_path) = shard.ann_index_path.as_deref()
522                && semantic_shard_artifact_path_is_safe(ann_index_path)
523                && seen_ann_index_paths.insert(ann_index_path)
524            {
525                ann_ready_indices.insert(shard.shard_index);
526            }
527        }
528        summary.ready_shards = u32::try_from(ready_indices.len()).unwrap_or(u32::MAX);
529        summary.ann_ready_shards = u32::try_from(ann_ready_indices.len()).unwrap_or(u32::MAX);
530        summary.complete = generation_consistent
531            && summary.shard_count > 0
532            && summary.ready_shards == summary.shard_count
533            && (0..summary.shard_count).all(|index| ready_indices.contains(&index));
534        summary
535    }
536
537    pub fn invalidate_incompatible(
538        &mut self,
539        policy: &SemanticPolicy,
540        current_model_revision: &str,
541    ) -> usize {
542        let before = self.shards.len();
543        self.shards.retain(|shard| {
544            !matches!(
545                shard.to_policy_manifest().invalidation_action(
546                    policy,
547                    current_model_revision,
548                    &shard.embedder_id,
549                ),
550                InvalidationAction::DiscardAndRebuild { .. } | InvalidationAction::Evict
551            )
552        });
553        before.saturating_sub(self.shards.len())
554    }
555
556    pub fn total_size_bytes(&self) -> u64 {
557        self.shards
558            .iter()
559            .map(|shard| shard.size_bytes)
560            .fold(0, u64::saturating_add)
561    }
562}
563
564// ─── Build checkpoint ──────────────────────────────────────────────────────
565
566/// Resumable position for an interrupted semantic build.
567#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
568pub struct BuildCheckpoint {
569    /// Which tier is being built.
570    pub tier: TierKind,
571    /// Embedder ID for this build.
572    pub embedder_id: String,
573    /// Last conversation offset processed (for pagination).
574    pub last_offset: i64,
575    /// Total documents embedded so far in this build.
576    pub docs_embedded: u64,
577    /// Total conversations processed so far.
578    pub conversations_processed: u64,
579    /// Total conversations expected (from DB at start of build).
580    pub total_conversations: u64,
581    /// DB fingerprint when this build started.
582    pub db_fingerprint: String,
583    /// Schema version for this build.
584    pub schema_version: u32,
585    /// Chunking version for this build.
586    pub chunking_version: u32,
587    /// Unix timestamp (ms) when this checkpoint was saved.
588    pub saved_at_ms: i64,
589}
590
591impl BuildCheckpoint {
592    /// Progress as a percentage (0–100).
593    pub fn progress_pct(&self) -> u8 {
594        if self.total_conversations == 0 {
595            return 0;
596        }
597        let pct = (self.conversations_processed as f64 / self.total_conversations as f64) * 100.0;
598        (pct as u8).min(100)
599    }
600
601    /// Whether the build is complete (all conversations processed).
602    pub fn is_complete(&self) -> bool {
603        self.conversations_processed >= self.total_conversations
604    }
605
606    /// Whether this checkpoint is still valid against the current DB and policy.
607    pub fn is_valid(&self, current_db_fingerprint: &str) -> bool {
608        self.db_fingerprint == current_db_fingerprint
609            && self.schema_version == SEMANTIC_SCHEMA_VERSION
610            && self.chunking_version == CHUNKING_STRATEGY_VERSION
611    }
612}
613
614// ─── Backlog ledger ────────────────────────────────────────────────────────
615
616/// Tracks what semantic build work remains.
617#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
618pub struct BacklogLedger {
619    /// Total conversations in the canonical DB at last check.
620    pub total_conversations: u64,
621    /// Conversations embedded in the fast tier.
622    pub fast_tier_processed: u64,
623    /// Conversations embedded in the quality tier.
624    pub quality_tier_processed: u64,
625    /// DB fingerprint when this ledger was computed.
626    pub db_fingerprint: String,
627    /// Unix timestamp (ms) when this ledger was computed.
628    pub computed_at_ms: i64,
629}
630
631impl BacklogLedger {
632    /// Remaining conversations for the fast tier.
633    pub fn fast_tier_remaining(&self) -> u64 {
634        self.total_conversations
635            .saturating_sub(self.fast_tier_processed)
636    }
637
638    /// Remaining conversations for the quality tier.
639    pub fn quality_tier_remaining(&self) -> u64 {
640        self.total_conversations
641            .saturating_sub(self.quality_tier_processed)
642    }
643
644    /// Whether either tier has outstanding work.
645    pub fn has_pending_work(&self) -> bool {
646        self.fast_tier_remaining() > 0 || self.quality_tier_remaining() > 0
647    }
648
649    /// Whether the ledger is current with the given DB fingerprint.
650    pub fn is_current(&self, current_db_fingerprint: &str) -> bool {
651        self.db_fingerprint == current_db_fingerprint
652    }
653}
654
655// ─── The top-level manifest ────────────────────────────────────────────────
656
657/// Durable, atomic semantic asset manifest.
658///
659/// This is the single source of truth for what semantic assets exist, their
660/// provenance, and what work remains.  It is loaded/saved as JSON.
661#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
662pub struct SemanticManifest {
663    /// Format version — for future migrations.
664    pub manifest_version: u32,
665    /// Fast-tier vector artifact (hash embedder).
666    pub fast_tier: Option<ArtifactRecord>,
667    /// Quality-tier vector artifact (ML embedder).
668    pub quality_tier: Option<ArtifactRecord>,
669    /// HNSW accelerator index.
670    pub hnsw: Option<HnswRecord>,
671    /// Backlog / progress tracker.
672    pub backlog: BacklogLedger,
673    /// Active build checkpoint (for resuming interrupted work).
674    pub checkpoint: Option<BuildCheckpoint>,
675    /// Unix timestamp (ms) when this manifest was last written.
676    pub updated_at_ms: i64,
677}
678
679impl Default for SemanticManifest {
680    fn default() -> Self {
681        Self {
682            manifest_version: MANIFEST_FORMAT_VERSION,
683            fast_tier: None,
684            quality_tier: None,
685            hnsw: None,
686            backlog: BacklogLedger {
687                total_conversations: 0,
688                fast_tier_processed: 0,
689                quality_tier_processed: 0,
690                db_fingerprint: String::new(),
691                computed_at_ms: 0,
692            },
693            checkpoint: None,
694            updated_at_ms: 0,
695        }
696    }
697}
698
699impl SemanticManifest {
700    // ── Path helpers ───────────────────────────────────────────────────
701
702    /// Path to the manifest file.
703    pub fn path(data_dir: &Path) -> PathBuf {
704        data_dir.join("vector_index").join(MANIFEST_FILENAME)
705    }
706
707    // ── Load / Save ───────────────────────────────────────────────────
708
709    /// Load the manifest from disk.  Returns `None` if the file doesn't
710    /// exist, `Err` if it exists but is corrupt.
711    pub fn load(data_dir: &Path) -> Result<Option<Self>, ManifestError> {
712        let path = Self::path(data_dir);
713        let bytes = match fs::read(&path) {
714            Ok(b) => b,
715            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
716            Err(e) => {
717                return Err(ManifestError::Io {
718                    path,
719                    source: e.to_string(),
720                });
721            }
722        };
723
724        let manifest: Self = serde_json::from_slice(&bytes).map_err(|e| ManifestError::Parse {
725            path: path.clone(),
726            source: e.to_string(),
727        })?;
728
729        // Forward-compatible: reject future manifest versions we can't read.
730        if manifest.manifest_version > MANIFEST_FORMAT_VERSION {
731            return Err(ManifestError::UnsupportedVersion {
732                found: manifest.manifest_version,
733                max_supported: MANIFEST_FORMAT_VERSION,
734            });
735        }
736
737        Ok(Some(manifest))
738    }
739
740    /// Load the manifest, returning defaults if absent or corrupt.
741    ///
742    /// Unlike [`load`], this method treats parse errors and version mismatches
743    /// as "manifest absent" — the caller gets a clean default rather than an
744    /// error.  This is the right behaviour for runtime code that must always
745    /// make progress.
746    pub fn load_or_default(data_dir: &Path) -> Result<Self, ManifestError> {
747        match Self::load(data_dir) {
748            Ok(Some(manifest)) => Ok(manifest),
749            Ok(None) => Ok(Self::default()),
750            // I/O errors are real failures — propagate the original.
751            Err(e @ ManifestError::Io { .. }) => Err(e),
752            // Parse or version errors → treat as absent.
753            Err(ManifestError::Parse { .. } | ManifestError::UnsupportedVersion { .. }) => {
754                Ok(Self::default())
755            }
756            Err(e) => Err(e),
757        }
758    }
759
760    /// Atomically save the manifest to disk (write-to-temp then rename).
761    pub fn save(&mut self, data_dir: &Path) -> Result<(), ManifestError> {
762        let path = Self::path(data_dir);
763
764        // Ensure parent directory exists.
765        if let Some(parent) = path.parent() {
766            fs::create_dir_all(parent).map_err(|e| ManifestError::Io {
767                path: parent.to_path_buf(),
768                source: e.to_string(),
769            })?;
770        }
771
772        self.updated_at_ms = now_ms();
773
774        let json = serde_json::to_string_pretty(self).map_err(|e| ManifestError::Serialize {
775            source: e.to_string(),
776        })?;
777
778        // Atomic write: temp file → rename.
779        let (tmp_path, mut file) =
780            create_unique_manifest_temp_file(&path).map_err(|e| ManifestError::Io {
781                path: path.clone(),
782                source: e.to_string(),
783            })?;
784        file.write_all(json.as_bytes())
785            .map_err(|e| ManifestError::Io {
786                path: tmp_path.clone(),
787                source: e.to_string(),
788            })?;
789        file.sync_all().map_err(|e| ManifestError::Io {
790            path: tmp_path.clone(),
791            source: e.to_string(),
792        })?;
793        replace_file_from_temp(&tmp_path, &path).map_err(|e| ManifestError::Io {
794            path: path.clone(),
795            source: e.to_string(),
796        })?;
797        sync_parent_directory(&path).map_err(|e| ManifestError::Io {
798            path: path
799                .parent()
800                .map(Path::to_path_buf)
801                .unwrap_or_else(|| path.clone()),
802            source: e.to_string(),
803        })?;
804
805        Ok(())
806    }
807
808    // ── Readiness evaluation ──────────────────────────────────────────
809
810    /// Evaluate readiness of the fast tier.
811    pub fn fast_tier_readiness(
812        &self,
813        policy: &SemanticPolicy,
814        current_db_fingerprint: &str,
815        current_model_revision: &str,
816    ) -> TierReadiness {
817        match &self.fast_tier {
818            Some(artifact) => {
819                artifact.readiness(policy, current_db_fingerprint, current_model_revision)
820            }
821            None => {
822                // Check for an active build checkpoint for this tier.
823                if let Some(cp) = &self.checkpoint
824                    && cp.tier == TierKind::Fast
825                    && cp.is_valid(current_db_fingerprint)
826                {
827                    TierReadiness::Building {
828                        progress_pct: cp.progress_pct(),
829                    }
830                } else {
831                    TierReadiness::Missing
832                }
833            }
834        }
835    }
836
837    /// Evaluate readiness of the quality tier.
838    pub fn quality_tier_readiness(
839        &self,
840        policy: &SemanticPolicy,
841        current_db_fingerprint: &str,
842        current_model_revision: &str,
843    ) -> TierReadiness {
844        match &self.quality_tier {
845            Some(artifact) => {
846                artifact.readiness(policy, current_db_fingerprint, current_model_revision)
847            }
848            None => {
849                if let Some(cp) = &self.checkpoint
850                    && cp.tier == TierKind::Quality
851                    && cp.is_valid(current_db_fingerprint)
852                {
853                    TierReadiness::Building {
854                        progress_pct: cp.progress_pct(),
855                    }
856                } else {
857                    TierReadiness::Missing
858                }
859            }
860        }
861    }
862
863    /// Whether hybrid refinement can run right now (fast tier usable).
864    pub fn can_hybrid_search(
865        &self,
866        policy: &SemanticPolicy,
867        current_db_fingerprint: &str,
868        current_model_revision: &str,
869    ) -> bool {
870        self.fast_tier_readiness(policy, current_db_fingerprint, current_model_revision)
871            .is_usable()
872    }
873
874    // ── Backlog / checkpoint management ───────────────────────────────
875
876    /// Update the backlog from the canonical DB state.
877    pub fn refresh_backlog(&mut self, total_conversations: u64, current_db_fingerprint: &str) {
878        let fast_processed = self
879            .fast_tier
880            .as_ref()
881            .filter(|a| a.ready && a.db_fingerprint == current_db_fingerprint)
882            .map_or(0, |a| a.conversation_count);
883        let quality_processed = self
884            .quality_tier
885            .as_ref()
886            .filter(|a| a.ready && a.db_fingerprint == current_db_fingerprint)
887            .map_or(0, |a| a.conversation_count);
888
889        self.backlog = BacklogLedger {
890            total_conversations,
891            fast_tier_processed: fast_processed,
892            quality_tier_processed: quality_processed,
893            db_fingerprint: current_db_fingerprint.to_owned(),
894            computed_at_ms: now_ms(),
895        };
896    }
897
898    /// Save a build checkpoint (called periodically during backfill).
899    pub fn save_checkpoint(&mut self, checkpoint: BuildCheckpoint) {
900        self.checkpoint = Some(checkpoint);
901    }
902
903    /// Clear the build checkpoint (called when build finishes or is abandoned).
904    pub fn clear_checkpoint(&mut self) {
905        self.checkpoint = None;
906    }
907
908    /// Record a completed artifact and clear the matching checkpoint.
909    pub fn publish_artifact(&mut self, artifact: ArtifactRecord) {
910        // Clear checkpoint if it matches this tier.
911        if self
912            .checkpoint
913            .as_ref()
914            .is_some_and(|cp| cp.tier == artifact.tier)
915        {
916            self.checkpoint = None;
917        }
918
919        match artifact.tier {
920            TierKind::Fast => self.fast_tier = Some(artifact),
921            TierKind::Quality => self.quality_tier = Some(artifact),
922        }
923    }
924
925    /// Record a completed HNSW accelerator.
926    pub fn publish_hnsw(&mut self, hnsw: HnswRecord) {
927        self.hnsw = Some(hnsw);
928    }
929
930    /// Adopt a legacy (pre-manifest) artifact if it is compatible with the
931    /// current schema/chunking versions.  Returns `true` if adopted.
932    #[allow(clippy::too_many_arguments)]
933    pub fn adopt_legacy_artifact(
934        &mut self,
935        tier: TierKind,
936        embedder_id: &str,
937        model_revision: &str,
938        dimension: usize,
939        doc_count: u64,
940        conversation_count: u64,
941        db_fingerprint: &str,
942        index_path: &str,
943        size_bytes: u64,
944    ) -> bool {
945        let record = ArtifactRecord {
946            tier,
947            embedder_id: embedder_id.to_owned(),
948            model_revision: model_revision.to_owned(),
949            schema_version: SEMANTIC_SCHEMA_VERSION,
950            chunking_version: CHUNKING_STRATEGY_VERSION,
951            dimension,
952            doc_count,
953            conversation_count,
954            db_fingerprint: db_fingerprint.to_owned(),
955            index_path: index_path.to_owned(),
956            size_bytes,
957            started_at_ms: 0,
958            completed_at_ms: now_ms(),
959            ready: true,
960        };
961
962        match tier {
963            TierKind::Fast => self.fast_tier = Some(record),
964            TierKind::Quality => self.quality_tier = Some(record),
965        }
966        true
967    }
968
969    /// Invalidate artifacts that are incompatible with the current policy.
970    /// Returns the number of artifacts invalidated.
971    ///
972    /// **Note**: This detects schema version, chunking version, and mode
973    /// incompatibilities.  It does NOT detect embedder changes (e.g., minilm →
974    /// snowflake) because the policy stores short names while artifacts store
975    /// full registry IDs.  Callers who need embedder-change detection should
976    /// compare `artifact.embedder_id` against the expected ID from the
977    /// embedder registry.
978    pub fn invalidate_incompatible(
979        &mut self,
980        policy: &SemanticPolicy,
981        current_model_revision: &str,
982    ) -> usize {
983        let mut count = 0;
984
985        if let Some(ref artifact) = self.fast_tier {
986            let pm = artifact.to_policy_manifest();
987            if matches!(
988                pm.invalidation_action(policy, current_model_revision, &artifact.embedder_id),
989                InvalidationAction::DiscardAndRebuild { .. } | InvalidationAction::Evict
990            ) {
991                self.fast_tier = None;
992                count += 1;
993            }
994        }
995
996        if let Some(ref artifact) = self.quality_tier {
997            let pm = artifact.to_policy_manifest();
998            if matches!(
999                pm.invalidation_action(policy, current_model_revision, &artifact.embedder_id),
1000                InvalidationAction::DiscardAndRebuild { .. } | InvalidationAction::Evict
1001            ) {
1002                self.quality_tier = None;
1003                count += 1;
1004            }
1005        }
1006
1007        // HNSW depends on the base tier — invalidate if base is gone.
1008        if let Some(ref hnsw) = self.hnsw {
1009            let base_gone = match hnsw.base_tier {
1010                TierKind::Fast => self.fast_tier.is_none(),
1011                TierKind::Quality => self.quality_tier.is_none(),
1012            };
1013            if base_gone {
1014                self.hnsw = None;
1015                count += 1;
1016            }
1017        }
1018
1019        // Invalidate checkpoint if its schema/chunking is wrong.
1020        if let Some(ref cp) = self.checkpoint
1021            && (cp.schema_version != policy.semantic_schema_version
1022                || cp.chunking_version != policy.chunking_strategy_version)
1023        {
1024            self.checkpoint = None;
1025        }
1026
1027        count
1028    }
1029
1030    /// Total disk usage of all semantic artifacts (bytes).
1031    pub fn total_size_bytes(&self) -> u64 {
1032        let fast = self.fast_tier.as_ref().map_or(0, |a| a.size_bytes);
1033        let quality = self.quality_tier.as_ref().map_or(0, |a| a.size_bytes);
1034        let hnsw = self.hnsw.as_ref().map_or(0, |h| h.size_bytes);
1035        fast + quality + hnsw
1036    }
1037
1038    /// Total disk usage in megabytes (rounded up).
1039    pub fn total_size_mb(&self) -> u64 {
1040        self.total_size_bytes().div_ceil(1_048_576)
1041    }
1042}
1043
1044// ─── Errors ────────────────────────────────────────────────────────────────
1045
/// Failures reported by manifest load/save operations.
///
/// Underlying causes are carried as pre-rendered strings rather than as the
/// original error values.
#[derive(Debug)]
pub enum ManifestError {
    /// A filesystem operation on `path` failed.
    Io { path: PathBuf, source: String },
    /// The file at `path` could not be parsed as a manifest.
    Parse { path: PathBuf, source: String },
    /// The manifest could not be serialized.
    Serialize { source: String },
    /// The manifest declares version `found`, which is newer than
    /// `max_supported`.
    UnsupportedVersion { found: u32, max_supported: u32 },
}

impl std::fmt::Display for ManifestError {
    /// Render a one-line, human-readable description of the failure.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Io { path, source } => f.write_fmt(format_args!(
                "manifest I/O error at {}: {source}",
                path.display()
            )),
            Self::Parse { path, source } => f.write_fmt(format_args!(
                "manifest parse error at {}: {source}",
                path.display()
            )),
            Self::Serialize { source } => {
                f.write_fmt(format_args!("manifest serialization error: {source}"))
            }
            Self::UnsupportedVersion {
                found,
                max_supported,
            } => f.write_fmt(format_args!(
                "manifest version {found} is newer than supported version {max_supported}"
            )),
        }
    }
}

// No `source()` override: causes are stored as strings, not error values.
impl std::error::Error for ManifestError {}
1076
1077// ─── Helpers ───────────────────────────────────────────────────────────────
1078
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns `0` when the system clock reports a time before the epoch.
fn now_ms() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as i64,
        Err(_) => 0,
    }
}
1085
1086fn unique_manifest_temp_path(path: &Path, attempt: u32, random: u64) -> PathBuf {
1087    static NEXT_NONCE: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
1088
1089    let file_name = path
1090        .file_name()
1091        .and_then(|name| name.to_str())
1092        .unwrap_or(MANIFEST_FILENAME);
1093    let nonce = NEXT_NONCE.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1094    path.with_file_name(format!(
1095        ".{file_name}.tmp.{attempt}.{}.{}.{random:016x}",
1096        now_ms(),
1097        nonce
1098    ))
1099}
1100
1101fn create_unique_manifest_temp_file(path: &Path) -> std::io::Result<(PathBuf, fs::File)> {
1102    for attempt in 0..100 {
1103        let random = random_manifest_path_nonce()?;
1104        let tmp_path = unique_manifest_temp_path(path, attempt, random);
1105        match OpenOptions::new()
1106            .write(true)
1107            .create_new(true)
1108            .open(&tmp_path)
1109        {
1110            Ok(file) => return Ok((tmp_path, file)),
1111            Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => continue,
1112            Err(err) => return Err(err),
1113        }
1114    }
1115
1116    Err(std::io::Error::new(
1117        std::io::ErrorKind::AlreadyExists,
1118        format!(
1119            "could not create a unique temporary manifest file for {} after 100 attempts",
1120            path.display()
1121        ),
1122    ))
1123}
1124
/// Build a hidden sibling path of `path` to hold a temporary backup
/// (Windows only).
///
/// The name combines the target's file name (or `MANIFEST_FILENAME` when the
/// name is missing or not valid UTF-8), the current time in milliseconds, a
/// process-wide counter, and a random value rendered as 16 hex digits.
///
/// # Errors
///
/// Fails if the random value cannot be generated.
#[cfg(windows)]
fn unique_manifest_backup_path(path: &Path) -> std::io::Result<PathBuf> {
    use std::sync::atomic::{AtomicU64, Ordering};

    static NEXT_NONCE: AtomicU64 = AtomicU64::new(0);

    // Draw the random part first so a failure leaves the counter untouched.
    let random = random_manifest_path_nonce()?;

    let file_name = match path.file_name().and_then(|name| name.to_str()) {
        Some(utf8_name) => utf8_name,
        None => MANIFEST_FILENAME,
    };
    let nonce = NEXT_NONCE.fetch_add(1, Ordering::Relaxed);
    let backup_name = format!(
        ".{file_name}.bak.{}.{nonce}.{random:016x}",
        now_ms()
    );
    Ok(path.with_file_name(backup_name))
}
1140
1141fn random_manifest_path_nonce() -> std::io::Result<u64> {
1142    let mut random_bytes = [0u8; 8];
1143    SystemRandom::new()
1144        .fill(&mut random_bytes)
1145        .map_err(|_| std::io::Error::other("failed to generate manifest temp path nonce"))?;
1146    Ok(u64::from_le_bytes(random_bytes))
1147}
1148
/// Move `temp_path` onto `final_path`, replacing any existing file.
///
/// On non-Windows targets this is a single `fs::rename`.
///
/// On Windows a direct rename is tried first.  If it fails with
/// `AlreadyExists` or `PermissionDenied` while `final_path` exists, the
/// existing file is moved to a unique backup name, the rename is retried, and
/// on a second failure the backup is moved back into place.
///
/// # Errors
///
/// Returns the underlying I/O error (or, on the Windows fallback path, a
/// combined message describing each failed step).  When a plain rename fails
/// the temp file is removed (best effort) so failed replacements do not leak
/// temp files; the only case that keeps the temp file is the Windows path
/// where restoring the original also failed, as that error message states.
fn replace_file_from_temp(temp_path: &Path, final_path: &Path) -> std::io::Result<()> {
    #[cfg(windows)]
    {
        match fs::rename(temp_path, final_path) {
            Ok(()) => sync_parent_directory(final_path),
            Err(first_err)
                if final_path.exists()
                    && matches!(
                        first_err.kind(),
                        std::io::ErrorKind::AlreadyExists | std::io::ErrorKind::PermissionDenied
                    ) =>
            {
                // Move the existing file aside so the rename can be retried.
                let backup_path = unique_manifest_backup_path(final_path)?;
                fs::rename(final_path, &backup_path).map_err(|backup_err| {
                    let _ = fs::remove_file(temp_path);
                    std::io::Error::other(format!(
                        "failed preparing backup {} before replacing {}: first error: {}; backup error: {}",
                        backup_path.display(),
                        final_path.display(),
                        first_err,
                        backup_err
                    ))
                })?;
                match fs::rename(temp_path, final_path) {
                    Ok(()) => {
                        let _ = fs::remove_file(&backup_path);
                        sync_parent_directory(final_path)
                    }
                    // Retry failed: put the original back before reporting.
                    Err(second_err) => match fs::rename(&backup_path, final_path) {
                        Ok(()) => {
                            let _ = fs::remove_file(temp_path);
                            sync_parent_directory(final_path)?;
                            Err(std::io::Error::other(format!(
                                "failed replacing {} with {}: first error: {}; second error: {}; restored original file",
                                final_path.display(),
                                temp_path.display(),
                                first_err,
                                second_err
                            )))
                        }
                        Err(restore_err) => Err(std::io::Error::other(format!(
                            "failed replacing {} with {}: first error: {}; second error: {}; restore error: {}; temp file retained at {}",
                            final_path.display(),
                            temp_path.display(),
                            first_err,
                            second_err,
                            restore_err,
                            temp_path.display()
                        ))),
                    },
                }
            }
            Err(err) => {
                // Nothing was replaced; do not leave the temp file behind.
                let _ = fs::remove_file(temp_path);
                Err(err)
            }
        }
    }

    #[cfg(not(windows))]
    {
        fs::rename(temp_path, final_path).inspect_err(|_| {
            // Nothing was replaced; do not leave the temp file behind.
            let _ = fs::remove_file(temp_path);
        })
    }
}
1210
/// Flush the directory containing `path` to stable storage so a preceding
/// rename into that directory is durable (non-Windows targets).
///
/// Returns `Ok(())` without doing anything when `path` has no parent.  A bare
/// file name such as `manifest.json` has an empty parent, which refers to the
/// current directory; it is opened as `.` because opening an empty path fails.
///
/// # Errors
///
/// Returns the I/O error from opening or syncing the directory.
#[cfg(not(windows))]
fn sync_parent_directory(path: &Path) -> std::io::Result<()> {
    let Some(parent) = path.parent() else {
        return Ok(());
    };
    let parent = if parent.as_os_str().is_empty() {
        Path::new(".")
    } else {
        parent
    };
    let directory = fs::File::open(parent)?;
    directory.sync_all()
}
1219
/// Windows counterpart of the parent-directory sync: a no-op that always
/// returns `Ok(())` and never touches the filesystem.
#[cfg(windows)]
fn sync_parent_directory(_path: &Path) -> std::io::Result<()> {
    Ok(())
}
1224
1225// ─── Tests ─────────────────────────────────────────────────────────────────
1226
1227#[cfg(test)]
1228mod tests {
1229    use super::*;
1230    use crate::search::policy::SemanticPolicy;
1231
    /// Policy used throughout these tests: the compiled-in defaults.
    fn test_policy() -> SemanticPolicy {
        SemanticPolicy::compiled_defaults()
    }
1235
1236    fn test_artifact(tier: TierKind, ready: bool) -> ArtifactRecord {
1237        ArtifactRecord {
1238            tier,
1239            embedder_id: match tier {
1240                TierKind::Fast => "fnv1a-384".to_owned(),
1241                TierKind::Quality => "minilm-384".to_owned(),
1242            },
1243            model_revision: "abc123".to_owned(),
1244            schema_version: SEMANTIC_SCHEMA_VERSION,
1245            chunking_version: CHUNKING_STRATEGY_VERSION,
1246            dimension: 384,
1247            doc_count: 1000,
1248            conversation_count: 250,
1249            db_fingerprint: "fp-1234".to_owned(),
1250            index_path: format!(
1251                "vector_index/index-{}.fsvi",
1252                match tier {
1253                    TierKind::Fast => "fnv1a-384",
1254                    TierKind::Quality => "minilm-384",
1255                }
1256            ),
1257            size_bytes: 150_000,
1258            started_at_ms: 1_700_000_000_000,
1259            completed_at_ms: 1_700_000_060_000,
1260            ready,
1261        }
1262    }
1263
1264    fn test_hnsw() -> HnswRecord {
1265        HnswRecord {
1266            base_tier: TierKind::Quality,
1267            embedder_id: "minilm-384".to_owned(),
1268            ef_search: 128,
1269            index_path: "vector_index/hnsw-minilm-384.chsw".to_owned(),
1270            size_bytes: 50_000,
1271            built_at_ms: 1_700_000_070_000,
1272            ready: true,
1273        }
1274    }
1275
1276    fn test_shard(shard_index: u32, shard_count: u32, ready: bool) -> SemanticShardRecord {
1277        SemanticShardRecord {
1278            tier: TierKind::Fast,
1279            embedder_id: "fnv1a-384".to_owned(),
1280            model_revision: "hash".to_owned(),
1281            schema_version: SEMANTIC_SCHEMA_VERSION,
1282            chunking_version: CHUNKING_STRATEGY_VERSION,
1283            dimension: 384,
1284            shard_index,
1285            shard_count,
1286            doc_count: 25,
1287            total_conversations: 10,
1288            db_fingerprint: "fp-sharded".to_owned(),
1289            index_path: format!("vector_index/shards/fast-fnv1a-384/shard-{shard_index:05}.fsvi"),
1290            quantization: "f16".to_owned(),
1291            mmap_ready: true,
1292            ann_index_path: None,
1293            ann_size_bytes: 0,
1294            ann_ready: false,
1295            size_bytes: 4096,
1296            started_at_ms: 1_700_000_080_000,
1297            completed_at_ms: 1_700_000_081_000,
1298            ready,
1299        }
1300    }
1301
1302    fn test_checkpoint(tier: TierKind) -> BuildCheckpoint {
1303        BuildCheckpoint {
1304            tier,
1305            embedder_id: "minilm-384".to_owned(),
1306            last_offset: 500,
1307            docs_embedded: 3000,
1308            conversations_processed: 500,
1309            total_conversations: 1000,
1310            db_fingerprint: "fp-1234".to_owned(),
1311            schema_version: SEMANTIC_SCHEMA_VERSION,
1312            chunking_version: CHUNKING_STRATEGY_VERSION,
1313            saved_at_ms: 1_700_000_030_000,
1314        }
1315    }
1316
    /// Expected outcome of a readiness check in the table-driven tests.
    ///
    /// `Ready` and `Building` are compared by exact value; `Stale` and
    /// `Incompatible` are matched by variant only, ignoring their payloads.
    #[derive(Debug, Clone, Copy)]
    enum ExpectedTierReadiness {
        /// Must equal `TierReadiness::Ready`.
        Ready,
        /// Must be some `TierReadiness::Stale { .. }`.
        Stale,
        /// Must be some `TierReadiness::Incompatible { .. }`.
        Incompatible,
        /// Must equal `TierReadiness::Building` with this `progress_pct`.
        Building(u8),
    }
1324
    /// Mutation hook for table cases that use the fixture artifact unchanged.
    fn no_artifact_mutation(_: &mut ArtifactRecord) {}
1326
    /// One row of the tier-readiness table.  In order: case label, tier of the
    /// fixture artifact, its `ready` flag, the current DB fingerprint, the
    /// current model revision, a mutation applied to the fixture before the
    /// check, and the expected readiness.
    type TierReadinessCase = (
        &'static str,
        TierKind,
        bool,
        &'static str,
        &'static str,
        fn(&mut ArtifactRecord),
        ExpectedTierReadiness,
    );
1336
    /// Mutation hook that overwrites the artifact's schema version with 0; the
    /// table uses it for the "schema version mismatch" case.
    fn set_schema_version_to_zero(artifact: &mut ArtifactRecord) {
        artifact.schema_version = 0;
    }
1340
1341    fn assert_tier_readiness(actual: TierReadiness, expected: ExpectedTierReadiness, label: &str) {
1342        match expected {
1343            ExpectedTierReadiness::Ready => {
1344                assert_eq!(actual, TierReadiness::Ready, "{label}");
1345            }
1346            ExpectedTierReadiness::Stale => {
1347                assert!(
1348                    matches!(actual, TierReadiness::Stale { .. }),
1349                    "{label}: {actual:?}"
1350                );
1351            }
1352            ExpectedTierReadiness::Incompatible => {
1353                assert!(
1354                    matches!(actual, TierReadiness::Incompatible { .. }),
1355                    "{label}: {actual:?}"
1356                );
1357            }
1358            ExpectedTierReadiness::Building(progress_pct) => {
1359                assert_eq!(actual, TierReadiness::Building { progress_pct }, "{label}");
1360            }
1361        }
1362    }
1363
1364    // ── Manifest load/save round-trip ──────────────────────────────────
1365
1366    #[test]
1367    fn manifest_round_trip_via_disk() {
1368        let temp = tempfile::tempdir().unwrap();
1369        let mut manifest = SemanticManifest {
1370            fast_tier: Some(test_artifact(TierKind::Fast, true)),
1371            quality_tier: Some(test_artifact(TierKind::Quality, true)),
1372            hnsw: Some(test_hnsw()),
1373            checkpoint: Some(test_checkpoint(TierKind::Quality)),
1374            backlog: BacklogLedger {
1375                total_conversations: 2000,
1376                fast_tier_processed: 1000,
1377                quality_tier_processed: 500,
1378                db_fingerprint: "fp-1234".to_owned(),
1379                computed_at_ms: 1_700_000_000_000,
1380            },
1381            ..Default::default()
1382        };
1383
1384        manifest.save(temp.path()).unwrap();
1385        let loaded = SemanticManifest::load(temp.path()).unwrap().unwrap();
1386
1387        assert_eq!(loaded.manifest_version, MANIFEST_FORMAT_VERSION);
1388        assert!(loaded.fast_tier.is_some());
1389        assert!(loaded.quality_tier.is_some());
1390        assert!(loaded.hnsw.is_some());
1391        assert!(loaded.checkpoint.is_some());
1392        assert_eq!(loaded.backlog.total_conversations, 2000);
1393        assert!(loaded.updated_at_ms > 0);
1394    }
1395
1396    #[test]
1397    fn manifest_save_overwrites_existing_file() {
1398        let temp = tempfile::tempdir().unwrap();
1399        let mut first = SemanticManifest {
1400            fast_tier: Some(test_artifact(TierKind::Fast, true)),
1401            ..Default::default()
1402        };
1403        first.save(temp.path()).unwrap();
1404
1405        let mut second = SemanticManifest {
1406            quality_tier: Some(test_artifact(TierKind::Quality, true)),
1407            backlog: BacklogLedger {
1408                total_conversations: 99,
1409                fast_tier_processed: 0,
1410                quality_tier_processed: 99,
1411                db_fingerprint: "fp-overwrite".to_owned(),
1412                computed_at_ms: 1_700_000_000_123,
1413            },
1414            ..Default::default()
1415        };
1416        second.save(temp.path()).unwrap();
1417
1418        let loaded = SemanticManifest::load(temp.path()).unwrap().unwrap();
1419        assert!(loaded.fast_tier.is_none());
1420        assert!(loaded.quality_tier.is_some());
1421        assert_eq!(loaded.backlog.total_conversations, 99);
1422    }
1423
1424    #[test]
1425    fn manifest_temp_file_creation_is_exclusive_and_unique() -> Result<(), String> {
1426        let temp = tempfile::tempdir().map_err(|e| e.to_string())?;
1427        let final_path = SemanticManifest::path(temp.path());
1428        let manifest_dir = final_path
1429            .parent()
1430            .ok_or_else(|| "semantic manifest path should have a parent directory".to_string())?;
1431        fs::create_dir_all(manifest_dir).map_err(|e| e.to_string())?;
1432
1433        let (first_path, mut first_file) =
1434            create_unique_manifest_temp_file(&final_path).map_err(|e| e.to_string())?;
1435        first_file.write_all(b"first").map_err(|e| e.to_string())?;
1436        let (second_path, mut second_file) =
1437            create_unique_manifest_temp_file(&final_path).map_err(|e| e.to_string())?;
1438        second_file
1439            .write_all(b"second")
1440            .map_err(|e| e.to_string())?;
1441
1442        if first_path == second_path {
1443            return Err("exclusive temp creation reused the same path".to_string());
1444        }
1445        if !first_path.exists() {
1446            return Err(format!(
1447                "first temp file is missing: {}",
1448                first_path.display()
1449            ));
1450        }
1451        if !second_path.exists() {
1452            return Err(format!(
1453                "second temp file is missing: {}",
1454                second_path.display()
1455            ));
1456        }
1457        if first_path.parent() != Some(manifest_dir) {
1458            return Err(format!(
1459                "first temp path escaped manifest directory: {}",
1460                first_path.display()
1461            ));
1462        }
1463        if second_path.parent() != Some(manifest_dir) {
1464            return Err(format!(
1465                "second temp path escaped manifest directory: {}",
1466                second_path.display()
1467            ));
1468        }
1469
1470        Ok(())
1471    }
1472
1473    #[test]
1474    fn manifest_load_missing_returns_none() {
1475        let temp = tempfile::tempdir().unwrap();
1476        let loaded = SemanticManifest::load(temp.path()).unwrap();
1477        assert!(loaded.is_none());
1478    }
1479
1480    #[test]
1481    fn manifest_load_or_default_returns_defaults() {
1482        let temp = tempfile::tempdir().unwrap();
1483        let manifest = SemanticManifest::load_or_default(temp.path()).unwrap();
1484        assert_eq!(manifest.manifest_version, MANIFEST_FORMAT_VERSION);
1485        assert!(manifest.fast_tier.is_none());
1486        assert!(manifest.quality_tier.is_none());
1487    }
1488
1489    #[test]
1490    fn manifest_load_corrupt_returns_parse_error() {
1491        let temp = tempfile::tempdir().unwrap();
1492        let path = SemanticManifest::path(temp.path());
1493        fs::create_dir_all(path.parent().unwrap()).unwrap();
1494        fs::write(&path, b"not json").unwrap();
1495
1496        let result = SemanticManifest::load(temp.path());
1497        assert!(matches!(result, Err(ManifestError::Parse { .. })));
1498    }
1499
1500    #[test]
1501    fn manifest_load_future_version_returns_error() {
1502        let temp = tempfile::tempdir().unwrap();
1503        let path = SemanticManifest::path(temp.path());
1504        fs::create_dir_all(path.parent().unwrap()).unwrap();
1505
1506        let manifest = SemanticManifest {
1507            manifest_version: MANIFEST_FORMAT_VERSION + 1,
1508            ..Default::default()
1509        };
1510        let json = serde_json::to_string(&manifest).unwrap();
1511        fs::write(&path, json).unwrap();
1512
1513        let result = SemanticManifest::load(temp.path());
1514        assert!(matches!(
1515            result,
1516            Err(ManifestError::UnsupportedVersion { .. })
1517        ));
1518    }
1519
1520    // ── Tier readiness (table-driven) ──────────────────────────────────
1521
1522    #[test]
1523    fn tier_readiness_cases() {
1524        let policy = test_policy();
1525        let db_fp = "fp-1234";
1526        let model_rev = "abc123";
1527        let cases: &[TierReadinessCase] = &[
1528            (
1529                "ready artifact with matching fingerprint",
1530                TierKind::Fast,
1531                true,
1532                db_fp,
1533                model_rev,
1534                no_artifact_mutation,
1535                ExpectedTierReadiness::Ready,
1536            ),
1537            (
1538                "ready artifact with changed DB fingerprint",
1539                TierKind::Fast,
1540                true,
1541                "different-fp",
1542                model_rev,
1543                no_artifact_mutation,
1544                ExpectedTierReadiness::Stale,
1545            ),
1546            (
1547                "ready artifact with changed model revision",
1548                TierKind::Quality,
1549                true,
1550                db_fp,
1551                "new-revision",
1552                no_artifact_mutation,
1553                ExpectedTierReadiness::Stale,
1554            ),
1555            (
1556                "schema version mismatch",
1557                TierKind::Quality,
1558                true,
1559                db_fp,
1560                model_rev,
1561                set_schema_version_to_zero,
1562                ExpectedTierReadiness::Incompatible,
1563            ),
1564            (
1565                "not yet published artifact",
1566                TierKind::Fast,
1567                false,
1568                db_fp,
1569                model_rev,
1570                no_artifact_mutation,
1571                ExpectedTierReadiness::Building(100),
1572            ),
1573        ];
1574
1575        for (label, tier, ready, current_db_fp, current_model_rev, mutate, expected) in cases {
1576            let mut artifact = test_artifact(*tier, *ready);
1577            mutate(&mut artifact);
1578            assert_tier_readiness(
1579                artifact.readiness(&policy, current_db_fp, current_model_rev),
1580                *expected,
1581                label,
1582            );
1583        }
1584    }
1585
1586    // ── Manifest-level readiness ───────────────────────────────────────
1587
1588    #[test]
1589    fn manifest_tier_readiness_missing() {
1590        let manifest = SemanticManifest::default();
1591        let policy = test_policy();
1592        assert_eq!(
1593            manifest.fast_tier_readiness(&policy, "fp", "rev"),
1594            TierReadiness::Missing,
1595        );
1596        assert_eq!(
1597            manifest.quality_tier_readiness(&policy, "fp", "rev"),
1598            TierReadiness::Missing,
1599        );
1600    }
1601
1602    #[test]
1603    fn manifest_tier_readiness_with_checkpoint() {
1604        let manifest = SemanticManifest {
1605            checkpoint: Some(test_checkpoint(TierKind::Quality)),
1606            ..Default::default()
1607        };
1608
1609        let policy = test_policy();
1610        // Fast tier has no checkpoint → Missing
1611        assert_eq!(
1612            manifest.fast_tier_readiness(&policy, "fp-1234", "rev"),
1613            TierReadiness::Missing,
1614        );
1615        // Quality tier has a valid checkpoint → Building
1616        assert!(matches!(
1617            manifest.quality_tier_readiness(&policy, "fp-1234", "rev"),
1618            TierReadiness::Building { progress_pct: 50 },
1619        ));
1620    }
1621
1622    #[test]
1623    fn manifest_tier_readiness_checkpoint_invalid_db() {
1624        let manifest = SemanticManifest {
1625            checkpoint: Some(test_checkpoint(TierKind::Quality)),
1626            ..Default::default()
1627        };
1628
1629        let policy = test_policy();
1630        // Checkpoint DB doesn't match → Missing (checkpoint invalid)
1631        assert_eq!(
1632            manifest.quality_tier_readiness(&policy, "other-fp", "rev"),
1633            TierReadiness::Missing,
1634        );
1635    }
1636
1637    // ── Hybrid search check ────────────────────────────────────────────
1638
1639    #[test]
1640    fn can_hybrid_search_requires_usable_fast_tier() {
1641        let policy = test_policy();
1642        let db_fp = "fp-1234";
1643        let rev = "abc123";
1644
1645        // No fast tier → can't hybrid
1646        let manifest = SemanticManifest::default();
1647        assert!(!manifest.can_hybrid_search(&policy, db_fp, rev));
1648
1649        // Fast tier present → can hybrid
1650        let manifest = SemanticManifest {
1651            fast_tier: Some(test_artifact(TierKind::Fast, true)),
1652            ..Default::default()
1653        };
1654        assert!(manifest.can_hybrid_search(&policy, db_fp, rev));
1655    }
1656
1657    // ── Backlog ledger ─────────────────────────────────────────────────
1658
1659    #[test]
1660    fn backlog_remaining_and_pending() {
1661        let ledger = BacklogLedger {
1662            total_conversations: 1000,
1663            fast_tier_processed: 800,
1664            quality_tier_processed: 300,
1665            db_fingerprint: "fp".to_owned(),
1666            computed_at_ms: 0,
1667        };
1668
1669        assert_eq!(ledger.fast_tier_remaining(), 200);
1670        assert_eq!(ledger.quality_tier_remaining(), 700);
1671        assert!(ledger.has_pending_work());
1672        assert!(ledger.is_current("fp"));
1673        assert!(!ledger.is_current("other"));
1674    }
1675
1676    #[test]
1677    fn backlog_no_pending_when_fully_processed() {
1678        let ledger = BacklogLedger {
1679            total_conversations: 500,
1680            fast_tier_processed: 500,
1681            quality_tier_processed: 500,
1682            db_fingerprint: "fp".to_owned(),
1683            computed_at_ms: 0,
1684        };
1685
1686        assert_eq!(ledger.fast_tier_remaining(), 0);
1687        assert_eq!(ledger.quality_tier_remaining(), 0);
1688        assert!(!ledger.has_pending_work());
1689    }
1690
1691    // ── Build checkpoint ───────────────────────────────────────────────
1692
1693    #[test]
1694    fn checkpoint_progress_and_completion() {
1695        let cp = test_checkpoint(TierKind::Quality);
1696        assert_eq!(cp.progress_pct(), 50);
1697        assert!(!cp.is_complete());
1698        assert!(cp.is_valid("fp-1234"));
1699        assert!(!cp.is_valid("other-fp"));
1700
1701        // Complete checkpoint
1702        let mut cp = test_checkpoint(TierKind::Quality);
1703        cp.conversations_processed = 1000;
1704        assert_eq!(cp.progress_pct(), 100);
1705        assert!(cp.is_complete());
1706    }
1707
1708    #[test]
1709    fn checkpoint_zero_total_gives_zero_pct() {
1710        let mut cp = test_checkpoint(TierKind::Fast);
1711        cp.total_conversations = 0;
1712        cp.conversations_processed = 0;
1713        assert_eq!(cp.progress_pct(), 0);
1714    }
1715
1716    // ── Publish and clear ──────────────────────────────────────────────
1717
1718    #[test]
1719    fn publish_artifact_clears_matching_checkpoint() {
1720        let mut manifest = SemanticManifest {
1721            checkpoint: Some(test_checkpoint(TierKind::Quality)),
1722            ..Default::default()
1723        };
1724
1725        manifest.publish_artifact(test_artifact(TierKind::Quality, true));
1726        assert!(manifest.checkpoint.is_none());
1727        assert!(manifest.quality_tier.is_some());
1728    }
1729
1730    #[test]
1731    fn publish_artifact_keeps_non_matching_checkpoint() {
1732        let mut manifest = SemanticManifest {
1733            checkpoint: Some(test_checkpoint(TierKind::Quality)),
1734            ..Default::default()
1735        };
1736
1737        manifest.publish_artifact(test_artifact(TierKind::Fast, true));
1738        assert!(manifest.checkpoint.is_some()); // Quality checkpoint survives
1739        assert!(manifest.fast_tier.is_some());
1740    }
1741
1742    // ── Refresh backlog ────────────────────────────────────────────────
1743
1744    #[test]
1745    fn refresh_backlog_computes_from_ready_artifacts() {
1746        let mut manifest = SemanticManifest {
1747            fast_tier: Some(test_artifact(TierKind::Fast, true)),
1748            quality_tier: Some(test_artifact(TierKind::Quality, true)),
1749            ..Default::default()
1750        };
1751
1752        manifest.refresh_backlog(2000, "fp-1234");
1753        assert_eq!(manifest.backlog.total_conversations, 2000);
1754        assert_eq!(manifest.backlog.fast_tier_processed, 250);
1755        assert_eq!(manifest.backlog.quality_tier_processed, 250);
1756    }
1757
1758    #[test]
1759    fn refresh_backlog_ignores_stale_artifacts() {
1760        let mut manifest = SemanticManifest {
1761            fast_tier: Some(test_artifact(TierKind::Fast, true)),
1762            ..Default::default()
1763        };
1764
1765        // DB fingerprint doesn't match → artifact not counted
1766        manifest.refresh_backlog(2000, "different-fp");
1767        assert_eq!(manifest.backlog.fast_tier_processed, 0);
1768    }
1769
1770    // ── Invalidation ───────────────────────────────────────────────────
1771
1772    #[test]
1773    fn invalidate_incompatible_removes_schema_mismatch() {
1774        let mut artifact = test_artifact(TierKind::Quality, true);
1775        artifact.schema_version = 0; // mismatch
1776        let mut manifest = SemanticManifest {
1777            quality_tier: Some(artifact),
1778            hnsw: Some(test_hnsw()), // depends on quality tier
1779            ..Default::default()
1780        };
1781
1782        let policy = test_policy();
1783        let count = manifest.invalidate_incompatible(&policy, "abc123");
1784
1785        assert_eq!(count, 2); // quality + hnsw
1786        assert!(manifest.quality_tier.is_none());
1787        assert!(manifest.hnsw.is_none());
1788    }
1789
1790    #[test]
1791    fn invalidate_incompatible_keeps_compatible() {
1792        let mut manifest = SemanticManifest {
1793            fast_tier: Some(test_artifact(TierKind::Fast, true)),
1794            quality_tier: Some(test_artifact(TierKind::Quality, true)),
1795            ..Default::default()
1796        };
1797
1798        let policy = test_policy();
1799        let count = manifest.invalidate_incompatible(&policy, "abc123");
1800
1801        assert_eq!(count, 0);
1802        assert!(manifest.fast_tier.is_some());
1803        assert!(manifest.quality_tier.is_some());
1804    }
1805
1806    // ── Legacy adoption ────────────────────────────────────────────────
1807
1808    #[test]
1809    fn adopt_legacy_artifact() {
1810        let mut manifest = SemanticManifest::default();
1811        let doc_count = 500;
1812        let conversation_count = 125;
1813        let index_path = "vector_index/index-fnv1a-384.fsvi";
1814        let db_fingerprint = "fp-old";
1815        let size_bytes = 75_000;
1816        let adopted = manifest.adopt_legacy_artifact(
1817            TierKind::Fast,
1818            "fnv1a-384",
1819            "hash",
1820            384,
1821            doc_count,
1822            conversation_count,
1823            db_fingerprint,
1824            index_path,
1825            size_bytes,
1826        );
1827
1828        assert!(adopted);
1829        let fast = manifest.fast_tier.as_ref().unwrap();
1830        assert_eq!(fast.embedder_id, "fnv1a-384");
1831        assert!(fast.ready);
1832        assert_eq!(fast.schema_version, SEMANTIC_SCHEMA_VERSION);
1833    }
1834
1835    // ── Size accounting ────────────────────────────────────────────────
1836
1837    #[test]
1838    fn total_size_accounts_for_all_artifacts() {
1839        let manifest = SemanticManifest {
1840            fast_tier: Some(test_artifact(TierKind::Fast, true)),
1841            quality_tier: Some(test_artifact(TierKind::Quality, true)),
1842            hnsw: Some(test_hnsw()),
1843            ..Default::default()
1844        };
1845
1846        assert_eq!(manifest.total_size_bytes(), 150_000 + 150_000 + 50_000);
1847        assert_eq!(manifest.total_size_mb(), 1); // 350KB rounds up to 1MB
1848    }
1849
1850    #[test]
1851    fn total_size_empty_is_zero() {
1852        let manifest = SemanticManifest::default();
1853        assert_eq!(manifest.total_size_bytes(), 0);
1854        assert_eq!(manifest.total_size_mb(), 0);
1855    }
1856
1857    // ── JSON round-trip ────────────────────────────────────────────────
1858
1859    #[test]
1860    fn manifest_json_round_trip() {
1861        let manifest = SemanticManifest {
1862            fast_tier: Some(test_artifact(TierKind::Fast, true)),
1863            quality_tier: Some(test_artifact(TierKind::Quality, true)),
1864            hnsw: Some(test_hnsw()),
1865            checkpoint: Some(test_checkpoint(TierKind::Quality)),
1866            ..Default::default()
1867        };
1868
1869        let json = serde_json::to_string_pretty(&manifest).unwrap();
1870        let deser: SemanticManifest = serde_json::from_str(&json).unwrap();
1871        assert_eq!(deser.fast_tier, manifest.fast_tier);
1872        assert_eq!(deser.quality_tier, manifest.quality_tier);
1873        assert_eq!(deser.hnsw, manifest.hnsw);
1874        assert_eq!(deser.checkpoint, manifest.checkpoint);
1875    }
1876
1877    // ── Shard sidecar manifest ─────────────────────────────────────────
1878
1879    #[test]
1880    fn shard_manifest_round_trip_via_sidecar() {
1881        let temp = tempfile::tempdir().unwrap();
1882        let mut shards = SemanticShardManifest::default();
1883        shards.replace_shards_for_generation(
1884            TierKind::Fast,
1885            "fnv1a-384",
1886            "fp-sharded",
1887            vec![test_shard(1, 2, true), test_shard(0, 2, true)],
1888        );
1889
1890        shards.save(temp.path()).unwrap();
1891        let loaded = SemanticShardManifest::load(temp.path()).unwrap().unwrap();
1892
1893        assert_eq!(loaded.manifest_version, MANIFEST_FORMAT_VERSION);
1894        assert_eq!(loaded.shards.len(), 2);
1895        assert_eq!(loaded.shards[0].shard_index, 0);
1896        assert_eq!(loaded.shards[1].shard_index, 1);
1897        assert!(loaded.updated_at_ms > 0);
1898    }
1899
1900    #[test]
1901    fn shard_summary_requires_every_ready_shard() {
1902        let mut shards = SemanticShardManifest::default();
1903        shards.replace_shards_for_generation(
1904            TierKind::Fast,
1905            "fnv1a-384",
1906            "fp-sharded",
1907            vec![test_shard(0, 3, true), test_shard(2, 3, true)],
1908        );
1909
1910        let partial = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
1911        assert_eq!(partial.shard_count, 3);
1912        assert_eq!(partial.ready_shards, 2);
1913        assert!(!partial.complete);
1914
1915        shards.replace_shards_for_generation(
1916            TierKind::Fast,
1917            "fnv1a-384",
1918            "fp-sharded",
1919            vec![
1920                test_shard(0, 3, true),
1921                test_shard(1, 3, true),
1922                test_shard(2, 3, true),
1923            ],
1924        );
1925
1926        let complete = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
1927        assert_eq!(complete.ready_shards, 3);
1928        assert!(complete.complete);
1929        assert_eq!(complete.doc_count, 75);
1930        assert_eq!(complete.total_conversations, 10);
1931    }
1932
1933    #[test]
1934    fn shard_summary_rejects_non_mmap_ready_or_inconsistent_shards() {
1935        let mut non_mmap = test_shard(0, 1, true);
1936        non_mmap.mmap_ready = false;
1937        let mut shards = SemanticShardManifest::default();
1938        shards.replace_shards_for_generation(
1939            TierKind::Fast,
1940            "fnv1a-384",
1941            "fp-sharded",
1942            vec![non_mmap],
1943        );
1944
1945        let non_mmap_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
1946        assert_eq!(non_mmap_summary.ready_shards, 0);
1947        assert!(!non_mmap_summary.complete);
1948
1949        let mut inconsistent = test_shard(1, 3, true);
1950        inconsistent.ann_ready = true;
1951        inconsistent.ann_index_path = None;
1952        inconsistent.ann_size_bytes = 4096;
1953        shards.replace_shards_for_generation(
1954            TierKind::Fast,
1955            "fnv1a-384",
1956            "fp-sharded",
1957            vec![test_shard(0, 2, true), inconsistent],
1958        );
1959
1960        let inconsistent_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
1961        assert_eq!(inconsistent_summary.shard_count, 3);
1962        assert_eq!(inconsistent_summary.ready_shards, 2);
1963        assert_eq!(inconsistent_summary.ann_ready_shards, 0);
1964        assert!(!inconsistent_summary.complete);
1965
1966        shards.replace_shards_for_generation(
1967            TierKind::Fast,
1968            "fnv1a-384",
1969            "fp-sharded",
1970            vec![
1971                test_shard(0, 2, true),
1972                test_shard(1, 2, true),
1973                test_shard(1, 2, false),
1974            ],
1975        );
1976        let duplicate_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
1977        assert_eq!(duplicate_summary.shard_count, 2);
1978        assert_eq!(duplicate_summary.ready_shards, 2);
1979        assert!(
1980            !duplicate_summary.complete,
1981            "duplicate shard indexes must not summarize as a complete generation"
1982        );
1983
1984        let mut duplicate_path = test_shard(1, 2, true);
1985        duplicate_path.index_path = test_shard(0, 2, true).index_path;
1986        shards.replace_shards_for_generation(
1987            TierKind::Fast,
1988            "fnv1a-384",
1989            "fp-sharded",
1990            vec![test_shard(0, 2, true), duplicate_path],
1991        );
1992        let duplicate_path_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
1993        assert_eq!(duplicate_path_summary.shard_count, 2);
1994        assert_eq!(duplicate_path_summary.ready_shards, 2);
1995        assert!(
1996            !duplicate_path_summary.complete,
1997            "duplicate shard index paths must not summarize as a complete generation"
1998        );
1999
2000        let mut blank_path = test_shard(0, 1, true);
2001        blank_path.index_path.clear();
2002        shards.replace_shards_for_generation(
2003            TierKind::Fast,
2004            "fnv1a-384",
2005            "fp-sharded",
2006            vec![blank_path],
2007        );
2008        let blank_path_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
2009        assert_eq!(blank_path_summary.shard_count, 1);
2010        assert_eq!(blank_path_summary.ready_shards, 1);
2011        assert!(
2012            !blank_path_summary.complete,
2013            "blank shard index paths must not summarize as complete"
2014        );
2015
2016        for unsafe_path in [
2017            tempfile::tempdir()
2018                .unwrap()
2019                .path()
2020                .join("outside.fsvi")
2021                .to_string_lossy()
2022                .to_string(),
2023            "vector_index/shards/../outside.fsvi".to_string(),
2024            "./vector_index/shards/fast/hash.fsvi".to_string(),
2025            " vector_index/shards/fast/hash.fsvi".to_string(),
2026        ] {
2027            let mut unsafe_shard = test_shard(0, 1, true);
2028            unsafe_shard.index_path = unsafe_path;
2029            shards.replace_shards_for_generation(
2030                TierKind::Fast,
2031                "fnv1a-384",
2032                "fp-sharded",
2033                vec![unsafe_shard],
2034            );
2035            let unsafe_path_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
2036            assert_eq!(unsafe_path_summary.shard_count, 1);
2037            assert_eq!(unsafe_path_summary.ready_shards, 1);
2038            assert!(
2039                !unsafe_path_summary.complete,
2040                "unsafe shard index paths must not summarize as complete"
2041            );
2042        }
2043
2044        let outside_ann_dir = tempfile::tempdir().unwrap();
2045        for unsafe_ann_path in [
2046            outside_ann_dir
2047                .path()
2048                .join("outside.chsw")
2049                .to_string_lossy()
2050                .to_string(),
2051            "vector_index/shards/../outside.chsw".to_string(),
2052            "./vector_index/shards/fast/hash.chsw".to_string(),
2053            " vector_index/shards/fast/hash.chsw".to_string(),
2054        ] {
2055            let mut unsafe_ann = test_shard(0, 1, true);
2056            unsafe_ann.ann_ready = true;
2057            unsafe_ann.ann_index_path = Some(unsafe_ann_path);
2058            unsafe_ann.ann_size_bytes = 4096;
2059            shards.replace_shards_for_generation(
2060                TierKind::Fast,
2061                "fnv1a-384",
2062                "fp-sharded",
2063                vec![unsafe_ann],
2064            );
2065            let unsafe_ann_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
2066            assert_eq!(unsafe_ann_summary.shard_count, 1);
2067            assert_eq!(unsafe_ann_summary.ready_shards, 1);
2068            assert_eq!(unsafe_ann_summary.ann_ready_shards, 0);
2069            assert!(
2070                unsafe_ann_summary.complete,
2071                "unsafe optional ANN paths must not invalidate the vector shard generation"
2072            );
2073        }
2074
2075        let mut duplicate_ann_path = test_shard(1, 2, true);
2076        duplicate_ann_path.ann_ready = true;
2077        duplicate_ann_path.ann_index_path =
2078            Some("vector_index/shards/fast-fnv1a-384/shared-ann.chsw".to_owned());
2079        duplicate_ann_path.ann_size_bytes = 4096;
2080        let mut first_ann_path = test_shard(0, 2, true);
2081        first_ann_path.ann_ready = true;
2082        first_ann_path.ann_index_path = duplicate_ann_path.ann_index_path.clone();
2083        first_ann_path.ann_size_bytes = 4096;
2084        shards.replace_shards_for_generation(
2085            TierKind::Fast,
2086            "fnv1a-384",
2087            "fp-sharded",
2088            vec![first_ann_path, duplicate_ann_path],
2089        );
2090        let duplicate_ann_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
2091        assert_eq!(duplicate_ann_summary.shard_count, 2);
2092        assert_eq!(duplicate_ann_summary.ready_shards, 2);
2093        assert_eq!(duplicate_ann_summary.ann_ready_shards, 1);
2094        assert!(
2095            duplicate_ann_summary.complete,
2096            "duplicate optional ANN paths must not invalidate the vector shard generation"
2097        );
2098
2099        shards.replace_shards_for_generation(
2100            TierKind::Fast,
2101            "fnv1a-384",
2102            "fp-sharded",
2103            vec![test_shard(2, 2, true)],
2104        );
2105        let out_of_range_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
2106        assert_eq!(out_of_range_summary.shard_count, 2);
2107        assert_eq!(out_of_range_summary.ready_shards, 1);
2108        assert!(
2109            !out_of_range_summary.complete,
2110            "shard indexes outside the declared shard count are malformed"
2111        );
2112
2113        let mut mismatched_metadata = test_shard(1, 2, true);
2114        mismatched_metadata.dimension = 768;
2115        shards.replace_shards_for_generation(
2116            TierKind::Fast,
2117            "fnv1a-384",
2118            "fp-sharded",
2119            vec![test_shard(0, 2, true), mismatched_metadata],
2120        );
2121        let metadata_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
2122        assert_eq!(metadata_summary.shard_count, 2);
2123        assert_eq!(metadata_summary.ready_shards, 2);
2124        assert!(
2125            !metadata_summary.complete,
2126            "complete shard generations require consistent shard metadata"
2127        );
2128
2129        let mut stale_schema = test_shard(0, 1, true);
2130        stale_schema.schema_version = SEMANTIC_SCHEMA_VERSION.saturating_sub(1);
2131        shards.replace_shards_for_generation(
2132            TierKind::Fast,
2133            "fnv1a-384",
2134            "fp-sharded",
2135            vec![stale_schema],
2136        );
2137        let stale_schema_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
2138        assert_eq!(stale_schema_summary.shard_count, 1);
2139        assert_eq!(stale_schema_summary.ready_shards, 1);
2140        assert!(
2141            !stale_schema_summary.complete,
2142            "stale schema shards must not summarize as complete"
2143        );
2144    }
2145
2146    #[test]
2147    fn shard_sidecar_does_not_make_main_manifest_ready() {
2148        let mut shards = SemanticShardManifest::default();
2149        shards.replace_shards_for_generation(
2150            TierKind::Fast,
2151            "fnv1a-384",
2152            "fp-sharded",
2153            vec![test_shard(0, 1, true)],
2154        );
2155        assert!(
2156            shards
2157                .summary(TierKind::Fast, "fnv1a-384", "fp-sharded")
2158                .complete
2159        );
2160
2161        let manifest = SemanticManifest::default();
2162        let policy = test_policy();
2163        assert_eq!(
2164            manifest.fast_tier_readiness(&policy, "fp-sharded", "hash"),
2165            TierReadiness::Missing,
2166            "sidecar shards must not publish runtime semantic readiness"
2167        );
2168    }
2169
2170    #[test]
2171    fn shard_manifest_invalidates_incompatible_shards() {
2172        let mut bad = test_shard(0, 1, true);
2173        bad.schema_version = 0;
2174        let mut shards = SemanticShardManifest {
2175            shards: vec![bad, test_shard(0, 1, true)],
2176            ..Default::default()
2177        };
2178
2179        let invalidated = shards.invalidate_incompatible(&test_policy(), "hash");
2180
2181        assert_eq!(invalidated, 1);
2182        assert_eq!(shards.shards.len(), 1);
2183        assert_eq!(shards.total_size_bytes(), 4096);
2184    }
2185}