Skip to main content

coding_agent_search/search/
semantic_manifest.rs

1//! Durable semantic asset manifest, backlog ledger, and resumable checkpoints.
2//!
3//! This module is the authoritative state model for semantic assets.  It tells
4//! cass exactly what semantic artifacts exist, how trustworthy they are, and
5//! what work remains to converge the corpus — enabling partial readiness,
6//! resumable builds, and truthful runtime degradation.
7//!
8//! # Storage
9//!
10//! The manifest is a single JSON file at:
11//! ```text
12//! {data_dir}/vector_index/semantic_manifest.json
13//! ```
14//!
15//! It is written atomically (write-to-temp then rename) and is the only file
16//! the backfill worker needs to consult to know what work remains.
17//!
18//! # Relationship to other modules
19//!
20//! - **[`policy`]**: Provides the contract (versions, budgets, tier names) that
21//!   this manifest fingerprints against.
22//! - **[`asset_state`]**: Evaluates coarse readiness from this manifest plus
23//!   live file probes.
24//! - **[`model_manager`]**: Detects model availability; this module records
25//!   which model was used to build each artifact.
26
27use std::fs::{self, OpenOptions};
28use std::io::Write as IoWrite;
29use std::path::{Component, Path, PathBuf};
30use std::time::{SystemTime, UNIX_EPOCH};
31
32use ring::rand::{SecureRandom, SystemRandom};
33use serde::{Deserialize, Serialize};
34
35use super::policy::{
36    CHUNKING_STRATEGY_VERSION, InvalidationAction, SEMANTIC_SCHEMA_VERSION,
37    SemanticAssetManifest as PolicyManifest, SemanticPolicy,
38};
39
40// ─── Constants ─────────────────────────────────────────────────────────────
41
/// Current manifest format version.  Bump when the JSON schema changes in a
/// backwards-incompatible way.
///
/// History:
/// - **v1** (pre-cass#257): `BuildCheckpoint` resume cursor is the
///   conversation offset only.
/// - **v2** (cass#257 sub-fix 2): `BuildCheckpoint` may additionally
///   carry `last_message_id`, an inclusive canonical message PK
///   advanced by every batch. Resume strictly skips messages ≤ this
///   cursor so an interrupted bounded run never re-embeds work it
///   already staged. The new field is `Option<i64>` with
///   `#[serde(default)]`, so the JSON-on-disk field layout only differs
///   when at least one batch persisted the cursor — a freshly-created
///   manifest has the same fields as a v1 manifest (only the
///   `manifest_version` number differs).
///
/// **Compatibility:**
/// - **Old binary reading v2 manifest:** clean `UnsupportedVersion`
///   error from `load()`; operator sees a clear "manifest version
///   $V is newer than max-supported $MAX" message and can upgrade.
/// - **New binary reading v1 manifest:** loads fine. `last_message_id`
///   defaults to `None`; resume falls back to the conversation offset,
///   and `load()` logs a warning that resume granularity is coarser than
///   ideal until the next checkpoint save bumps the on-disk shape.
pub const MANIFEST_FORMAT_VERSION: u32 = 2;

/// Highest manifest-version emitted by pre-cass#257 binaries; loading
/// this is fully supported, but resume granularity will be coarser
/// until a fresh checkpoint is saved. Kept as a named constant so the
/// fallback warning quotes a stable number.
pub const MANIFEST_FORMAT_VERSION_PRE_LAST_MESSAGE_CURSOR: u32 = 1;

/// Filename for the durable manifest (stored under `{data_dir}/vector_index/`).
pub const MANIFEST_FILENAME: &str = "semantic_manifest.json";

/// Filename for the prototype per-shard semantic artifact manifest
/// (stored under `{data_dir}/vector_index/`).
pub const SHARD_MANIFEST_FILENAME: &str = "semantic_shards.json";
79
/// Returns `true` when a path recorded in a shard manifest is safe to resolve
/// beneath the data directory.
///
/// A path is accepted only if it is non-empty, carries no leading or trailing
/// whitespace, is not absolute, and consists exclusively of normal components
/// (no `..`, no leading `.`, no root, no drive prefix).
pub(crate) fn semantic_shard_artifact_path_is_safe(recorded_path: &str) -> bool {
    // Reject empty/whitespace-only values and anything with padding around it.
    if recorded_path.is_empty() || recorded_path.trim() != recorded_path {
        return false;
    }

    let candidate = Path::new(recorded_path);
    if candidate.is_absolute() {
        return false;
    }

    // Every component must be a plain name; anything else could escape the
    // directory the path is later joined onto.
    for part in candidate.components() {
        match part {
            Component::Normal(_) => {}
            Component::Prefix(_)
            | Component::RootDir
            | Component::CurDir
            | Component::ParentDir => return false,
        }
    }
    true
}
91
92// ─── Tier kind ─────────────────────────────────────────────────────────────
93
/// Which semantic tier an artifact or checkpoint belongs to.
///
/// Variant names serialize in `snake_case` (`"fast"` / `"quality"`), the
/// same strings returned by [`TierKind::as_str`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TierKind {
    /// The fast tier (documented on `SemanticManifest` as the hash-embedder artifact).
    Fast,
    /// The quality tier (documented on `SemanticManifest` as the ML-embedder artifact).
    Quality,
}
101
102impl TierKind {
103    pub fn as_str(&self) -> &'static str {
104        match self {
105            Self::Fast => "fast",
106            Self::Quality => "quality",
107        }
108    }
109}
110
111// ─── Tier readiness ────────────────────────────────────────────────────────
112
/// Readiness of a single semantic tier.
///
/// Variant names serialize in `snake_case`. Only [`TierReadiness::Ready`]
/// counts as ready; `Ready` and `Stale` both count as usable (see
/// `is_ready` / `is_usable`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TierReadiness {
    /// Artifact exists, verified, and current with the DB.
    Ready,
    /// Build is in progress (checkpoint present).
    ///
    /// `progress_pct` is a whole percentage; `ArtifactRecord::readiness`
    /// reports `100` for an up-to-date artifact that is not yet marked ready.
    Building { progress_pct: u8 },
    /// Artifact exists but DB or model changed since it was built.
    Stale { reason: String },
    /// No artifact at all for this tier.
    Missing,
    /// Schema or chunking version mismatch — must discard and rebuild.
    Incompatible { reason: String },
}
128
129impl TierReadiness {
130    pub fn is_ready(&self) -> bool {
131        matches!(self, Self::Ready)
132    }
133
134    pub fn is_usable(&self) -> bool {
135        matches!(self, Self::Ready | Self::Stale { .. })
136    }
137}
138
139// ─── Artifact record ───────────────────────────────────────────────────────
140
/// Durable metadata for a single vector index artifact.
///
/// One record describes one tier's index file: which embedder/model built
/// it, under which schema and chunking versions, against which DB
/// fingerprint, and whether it has been verified and published.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ArtifactRecord {
    /// Which tier this artifact belongs to.
    pub tier: TierKind,
    /// Embedder ID that produced these vectors (e.g., "minilm-384", "fnv1a-384").
    pub embedder_id: String,
    /// Model revision hash (HuggingFace commit or "hash" for the hash embedder).
    pub model_revision: String,
    /// Semantic schema version at build time.
    pub schema_version: u32,
    /// Chunking strategy version at build time.
    pub chunking_version: u32,
    /// Output dimension of the embedder.
    pub dimension: usize,
    /// Number of documents (message chunks) embedded.
    pub doc_count: u64,
    /// Number of conversations processed to produce this artifact.
    pub conversation_count: u64,
    /// Storage fingerprint of the canonical DB when this artifact was built.
    /// `readiness` reports the artifact as stale when this differs from the
    /// current fingerprint.
    pub db_fingerprint: String,
    /// Relative path to the index file (from data_dir).
    pub index_path: String,
    /// File size in bytes.
    pub size_bytes: u64,
    /// Unix timestamp (ms) when the build started.
    pub started_at_ms: i64,
    /// Unix timestamp (ms) when the build completed.  Forwarded as
    /// `built_at_ms` by `to_policy_manifest`.
    pub completed_at_ms: i64,
    /// Whether this artifact has been verified and published.  An otherwise
    /// up-to-date record with `ready == false` is reported by `readiness` as
    /// `Building { progress_pct: 100 }`.
    pub ready: bool,
}
173
174impl ArtifactRecord {
175    /// Convert to the policy-level manifest for invalidation checks.
176    pub fn to_policy_manifest(&self) -> PolicyManifest {
177        PolicyManifest {
178            embedder_id: self.embedder_id.clone(),
179            model_revision: self.model_revision.clone(),
180            schema_version: self.schema_version,
181            chunking_version: self.chunking_version,
182            doc_count: self.doc_count,
183            built_at_ms: self.completed_at_ms,
184        }
185    }
186
187    /// Evaluate this artifact's readiness against the current policy and DB
188    /// fingerprint.
189    ///
190    /// **Note**: This checks schema/chunking versions, mode, model revision,
191    /// and DB fingerprint.  It does NOT detect embedder changes because the
192    /// expected embedder ID requires the embedder registry to resolve.
193    /// Callers needing embedder-change detection should call
194    /// [`SemanticAssetManifest::invalidation_action`] directly with the
195    /// correct `expected_embedder_id` from the registry.
196    pub fn readiness(
197        &self,
198        policy: &SemanticPolicy,
199        current_db_fingerprint: &str,
200        current_model_revision: &str,
201    ) -> TierReadiness {
202        let action = self.to_policy_manifest().invalidation_action(
203            policy,
204            current_model_revision,
205            &self.embedder_id,
206        );
207
208        match action {
209            InvalidationAction::UpToDate => {
210                if self.db_fingerprint != current_db_fingerprint {
211                    TierReadiness::Stale {
212                        reason: "DB content changed since artifact was built".to_owned(),
213                    }
214                } else if !self.ready {
215                    TierReadiness::Building { progress_pct: 100 }
216                } else {
217                    TierReadiness::Ready
218                }
219            }
220            InvalidationAction::RebuildInBackground => TierReadiness::Stale {
221                reason: "model revision changed; vectors usable until rebuild completes".to_owned(),
222            },
223            InvalidationAction::DiscardAndRebuild { reason } => {
224                TierReadiness::Incompatible { reason }
225            }
226            InvalidationAction::Evict => TierReadiness::Incompatible {
227                reason: "semantic mode set to lexical-only".to_owned(),
228            },
229        }
230    }
231}
232
233// ─── HNSW accelerator record ──────────────────────────────────────────────
234
/// Durable metadata for an HNSW accelerator index.
///
/// Stored in the `hnsw` slot of `SemanticManifest`; it describes an
/// accelerator built on top of one tier's base artifact.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct HnswRecord {
    /// Which base artifact this accelerates.
    pub base_tier: TierKind,
    /// Embedder ID of the base artifact.
    pub embedder_id: String,
    /// ef_search parameter used at build time.
    pub ef_search: usize,
    /// Relative path to the HNSW index file (from data_dir).
    pub index_path: String,
    /// File size in bytes.
    pub size_bytes: u64,
    /// Unix timestamp (ms) when built.
    pub built_at_ms: i64,
    /// Whether this index is ready for use.
    pub ready: bool,
}
253
254// ─── Sharded vector artifact sidecar ─────────────────────────────────────
255
/// Durable metadata for one mmap-friendly semantic vector shard.
///
/// Shards deliberately live in a sidecar manifest instead of
/// [`SemanticManifest`]. Runtime readiness continues to flow through the
/// existing tier artifact records, so partial shard generations cannot make a
/// semantic tier look ready before a promotion step has merged or selected a
/// complete generation.
///
/// A "generation" is identified by the triple `(tier, embedder_id,
/// db_fingerprint)`; see `matches_generation`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SemanticShardRecord {
    /// Which semantic tier this shard belongs to.
    pub tier: TierKind,
    /// Embedder ID that produced the vectors.
    pub embedder_id: String,
    /// Model revision hash or "hash" for the hash embedder.
    pub model_revision: String,
    /// Semantic schema version at build time.
    pub schema_version: u32,
    /// Chunking strategy version at build time.
    pub chunking_version: u32,
    /// Output dimension of the embedder.  A value of `0` makes the
    /// generation summary inconsistent.
    pub dimension: usize,
    /// Zero-based shard number within the generation.  Must be unique within
    /// the generation and strictly less than `shard_count`.
    pub shard_index: u32,
    /// Total shards expected for this generation.  All shards of a
    /// generation must agree on this value.
    pub shard_count: u32,
    /// Number of documents embedded in this shard.
    pub doc_count: u64,
    /// Total conversations represented by the full shard generation.
    pub total_conversations: u64,
    /// Storage fingerprint of the canonical DB when this shard was built.
    pub db_fingerprint: String,
    /// Relative path to the shard index file from data_dir.  The generation
    /// summary requires it to pass `semantic_shard_artifact_path_is_safe`
    /// and to be unique within the generation.
    pub index_path: String,
    /// Vector quantization used by the shard file.
    pub quantization: String,
    /// Whether readers may mmap this artifact directly.
    pub mmap_ready: bool,
    /// Relative path to the shard-local ANN accelerator, when built.
    pub ann_index_path: Option<String>,
    /// File size of the ANN accelerator in bytes.
    pub ann_size_bytes: u64,
    /// Whether the shard-local ANN accelerator is ready for use.
    pub ann_ready: bool,
    /// File size in bytes.
    pub size_bytes: u64,
    /// Unix timestamp (ms) when the shard build started.
    pub started_at_ms: i64,
    /// Unix timestamp (ms) when the shard build completed.
    pub completed_at_ms: i64,
    /// Whether this shard has been verified and published to the sidecar.
    pub ready: bool,
}
308
309impl SemanticShardRecord {
310    pub fn to_policy_manifest(&self) -> PolicyManifest {
311        PolicyManifest {
312            embedder_id: self.embedder_id.clone(),
313            model_revision: self.model_revision.clone(),
314            schema_version: self.schema_version,
315            chunking_version: self.chunking_version,
316            doc_count: self.doc_count,
317            built_at_ms: self.completed_at_ms,
318        }
319    }
320
321    pub fn matches_generation(
322        &self,
323        tier: TierKind,
324        embedder_id: &str,
325        db_fingerprint: &str,
326    ) -> bool {
327        self.tier == tier
328            && self.embedder_id == embedder_id
329            && self.db_fingerprint == db_fingerprint
330    }
331}
332
/// Aggregated readiness for a shard generation.
///
/// Produced by `SemanticShardManifest::summary` over every shard record that
/// matches one `(tier, embedder_id, db_fingerprint)` generation.  The
/// `Default` value (all zeros, `complete == false`) is what an empty
/// generation summarizes to.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct SemanticShardSummary {
    /// Largest `shard_count` declared by any matching shard.
    pub shard_count: u32,
    /// Number of distinct shard indices whose record is `ready` and `mmap_ready`.
    pub ready_shards: u32,
    /// Number of distinct shard indices that additionally have a ready,
    /// non-empty ANN accelerator at a safe, not-previously-seen path.
    pub ann_ready_shards: u32,
    /// Saturating sum of `doc_count` over matching shards.
    pub doc_count: u64,
    /// Largest `total_conversations` declared by any matching shard.
    pub total_conversations: u64,
    /// Saturating sum of `size_bytes` over matching shards.
    pub size_bytes: u64,
    /// Saturating sum of `ann_size_bytes` over matching shards.
    pub ann_size_bytes: u64,
    /// `true` only when the generation is internally consistent and every
    /// index in `0..shard_count` has a ready shard.
    pub complete: bool,
}
345
/// Sidecar manifest for prototype semantic shard generations.
///
/// Persisted as JSON at `{data_dir}/vector_index/semantic_shards.json`
/// (see `SemanticShardManifest::path`).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SemanticShardManifest {
    /// Format version; `load` rejects values above `MANIFEST_FORMAT_VERSION`.
    pub manifest_version: u32,
    /// Shard records across all generations.
    pub shards: Vec<SemanticShardRecord>,
    /// Unix timestamp (ms) refreshed by `save` each time it is called.
    pub updated_at_ms: i64,
}
353
354impl Default for SemanticShardManifest {
355    fn default() -> Self {
356        Self {
357            manifest_version: MANIFEST_FORMAT_VERSION,
358            shards: Vec::new(),
359            updated_at_ms: 0,
360        }
361    }
362}
363
364impl SemanticShardManifest {
365    pub fn path(data_dir: &Path) -> PathBuf {
366        data_dir.join("vector_index").join(SHARD_MANIFEST_FILENAME)
367    }
368
369    pub fn load(data_dir: &Path) -> Result<Option<Self>, ManifestError> {
370        let path = Self::path(data_dir);
371        let bytes = match fs::read(&path) {
372            Ok(bytes) => bytes,
373            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
374            Err(e) => {
375                return Err(ManifestError::Io {
376                    path,
377                    source: e.to_string(),
378                });
379            }
380        };
381
382        let manifest: Self = serde_json::from_slice(&bytes).map_err(|e| ManifestError::Parse {
383            path: path.clone(),
384            source: e.to_string(),
385        })?;
386
387        if manifest.manifest_version > MANIFEST_FORMAT_VERSION {
388            return Err(ManifestError::UnsupportedVersion {
389                found: manifest.manifest_version,
390                max_supported: MANIFEST_FORMAT_VERSION,
391            });
392        }
393
394        Ok(Some(manifest))
395    }
396
397    pub fn load_or_default(data_dir: &Path) -> Result<Self, ManifestError> {
398        match Self::load(data_dir) {
399            Ok(Some(manifest)) => Ok(manifest),
400            Ok(None) => Ok(Self::default()),
401            Err(e @ ManifestError::Io { .. }) => Err(e),
402            Err(ManifestError::Parse { .. } | ManifestError::UnsupportedVersion { .. }) => {
403                Ok(Self::default())
404            }
405            Err(e) => Err(e),
406        }
407    }
408
409    pub fn save(&mut self, data_dir: &Path) -> Result<(), ManifestError> {
410        let path = Self::path(data_dir);
411        if let Some(parent) = path.parent() {
412            fs::create_dir_all(parent).map_err(|e| ManifestError::Io {
413                path: parent.to_path_buf(),
414                source: e.to_string(),
415            })?;
416        }
417
418        self.updated_at_ms = now_ms();
419        let json = serde_json::to_string_pretty(self).map_err(|e| ManifestError::Serialize {
420            source: e.to_string(),
421        })?;
422        let (tmp_path, mut file) =
423            create_unique_manifest_temp_file(&path).map_err(|e| ManifestError::Io {
424                path: path.clone(),
425                source: e.to_string(),
426            })?;
427        file.write_all(json.as_bytes())
428            .map_err(|e| ManifestError::Io {
429                path: tmp_path.clone(),
430                source: e.to_string(),
431            })?;
432        file.sync_all().map_err(|e| ManifestError::Io {
433            path: tmp_path.clone(),
434            source: e.to_string(),
435        })?;
436        replace_file_from_temp(&tmp_path, &path).map_err(|e| ManifestError::Io {
437            path: path.clone(),
438            source: e.to_string(),
439        })?;
440        sync_parent_directory(&path).map_err(|e| ManifestError::Io {
441            path: path
442                .parent()
443                .map(Path::to_path_buf)
444                .unwrap_or_else(|| path.clone()),
445            source: e.to_string(),
446        })?;
447
448        Ok(())
449    }
450
451    pub fn replace_shards_for_generation(
452        &mut self,
453        tier: TierKind,
454        embedder_id: &str,
455        db_fingerprint: &str,
456        mut shards: Vec<SemanticShardRecord>,
457    ) {
458        self.shards
459            .retain(|shard| !shard.matches_generation(tier, embedder_id, db_fingerprint));
460        self.shards.append(&mut shards);
461        self.shards.sort_by(|a, b| {
462            (
463                a.tier.as_str(),
464                &a.embedder_id,
465                &a.db_fingerprint,
466                a.shard_index,
467            )
468                .cmp(&(
469                    b.tier.as_str(),
470                    &b.embedder_id,
471                    &b.db_fingerprint,
472                    b.shard_index,
473                ))
474        });
475    }
476
    /// Aggregate the shards of one generation into a [`SemanticShardSummary`].
    ///
    /// Only records matching `(tier, embedder_id, db_fingerprint)` are
    /// considered.  Counts and sizes are accumulated for every matching
    /// record, while `complete` is set only when the generation is internally
    /// consistent and every index in `0..shard_count` has a ready,
    /// mmap-ready shard.
    ///
    /// A generation is flagged inconsistent when any matching shard:
    /// - declares `shard_count == 0` or an index outside `0..shard_count`,
    /// - repeats a shard index or an `index_path`,
    /// - has an `index_path` rejected by `semantic_shard_artifact_path_is_safe`,
    /// - disagrees with earlier shards on `shard_count`, model revision,
    ///   schema/chunking version, dimension, total conversations, or
    ///   quantization,
    /// - was built under a schema or chunking version other than the current
    ///   one, or has `dimension == 0`.
    pub fn summary(
        &self,
        tier: TierKind,
        embedder_id: &str,
        db_fingerprint: &str,
    ) -> SemanticShardSummary {
        let mut summary = SemanticShardSummary::default();
        // Distinct shard indices that are usable (and ANN-usable) so far.
        let mut ready_indices = std::collections::BTreeSet::new();
        let mut ann_ready_indices = std::collections::BTreeSet::new();
        // Duplicate detection for indices and artifact paths.
        let mut seen_indices = std::collections::BTreeSet::new();
        let mut seen_index_paths = std::collections::BTreeSet::new();
        let mut seen_ann_index_paths = std::collections::BTreeSet::new();
        // Values taken from the first matching shard; later shards must agree.
        let mut expected_shard_count = None;
        let mut expected_generation_metadata: Option<(&str, u32, u32, usize, u64, &str)> = None;
        let mut generation_consistent = true;
        for shard in self
            .shards
            .iter()
            .filter(|shard| shard.matches_generation(tier, embedder_id, db_fingerprint))
        {
            // The index must address a slot inside the declared generation size.
            if shard.shard_count == 0 || shard.shard_index >= shard.shard_count {
                generation_consistent = false;
            }
            // `insert` returns false when the index was already present.
            if !seen_indices.insert(shard.shard_index) {
                generation_consistent = false;
            }
            // Unsafe paths short-circuit, so they are never recorded as seen.
            if !semantic_shard_artifact_path_is_safe(&shard.index_path)
                || !seen_index_paths.insert(&shard.index_path)
            {
                generation_consistent = false;
            }
            match expected_shard_count {
                Some(expected) if expected != shard.shard_count => {
                    generation_consistent = false;
                }
                None => expected_shard_count = Some(shard.shard_count),
                _ => {}
            }
            let generation_metadata = (
                shard.model_revision.as_str(),
                shard.schema_version,
                shard.chunking_version,
                shard.dimension,
                shard.total_conversations,
                shard.quantization.as_str(),
            );
            match expected_generation_metadata {
                Some(expected) if expected != generation_metadata => {
                    generation_consistent = false;
                }
                None => expected_generation_metadata = Some(generation_metadata),
                _ => {}
            }
            // Shards built under other schema/chunking versions, or with a
            // zero dimension, can never form a complete generation.
            if shard.schema_version != SEMANTIC_SCHEMA_VERSION
                || shard.chunking_version != CHUNKING_STRATEGY_VERSION
                || shard.dimension == 0
            {
                generation_consistent = false;
            }
            // Totals are accumulated even for inconsistent generations.
            summary.shard_count = summary.shard_count.max(shard.shard_count);
            summary.doc_count = summary.doc_count.saturating_add(shard.doc_count);
            summary.total_conversations =
                summary.total_conversations.max(shard.total_conversations);
            summary.size_bytes = summary.size_bytes.saturating_add(shard.size_bytes);
            summary.ann_size_bytes = summary.ann_size_bytes.saturating_add(shard.ann_size_bytes);
            if shard.ready && shard.mmap_ready {
                ready_indices.insert(shard.shard_index);
            }
            // ANN readiness additionally needs a non-empty accelerator at a
            // safe path that no earlier shard already claimed.  A bad ANN
            // path only withholds ANN readiness; it does not mark the
            // generation inconsistent.
            if shard.ready
                && shard.mmap_ready
                && shard.ann_ready
                && shard.ann_size_bytes > 0
                && let Some(ann_index_path) = shard.ann_index_path.as_deref()
                && semantic_shard_artifact_path_is_safe(ann_index_path)
                && seen_ann_index_paths.insert(ann_index_path)
            {
                ann_ready_indices.insert(shard.shard_index);
            }
        }
        // Set sizes cannot realistically exceed u32; clamp rather than panic.
        summary.ready_shards = u32::try_from(ready_indices.len()).unwrap_or(u32::MAX);
        summary.ann_ready_shards = u32::try_from(ann_ready_indices.len()).unwrap_or(u32::MAX);
        // The count comparison runs first, so the per-index scan only happens
        // when the number of ready shards already equals `shard_count`.
        summary.complete = generation_consistent
            && summary.shard_count > 0
            && summary.ready_shards == summary.shard_count
            && (0..summary.shard_count).all(|index| ready_indices.contains(&index));
        summary
    }
564
565    pub fn invalidate_incompatible(
566        &mut self,
567        policy: &SemanticPolicy,
568        current_model_revision: &str,
569    ) -> usize {
570        let before = self.shards.len();
571        self.shards.retain(|shard| {
572            !matches!(
573                shard.to_policy_manifest().invalidation_action(
574                    policy,
575                    current_model_revision,
576                    &shard.embedder_id,
577                ),
578                InvalidationAction::DiscardAndRebuild { .. } | InvalidationAction::Evict
579            )
580        });
581        before.saturating_sub(self.shards.len())
582    }
583
584    pub fn total_size_bytes(&self) -> u64 {
585        self.shards
586            .iter()
587            .map(|shard| shard.size_bytes)
588            .fold(0, u64::saturating_add)
589    }
590}
591
592// ─── Build checkpoint ──────────────────────────────────────────────────────
593
/// Resumable position for an interrupted semantic build.
///
/// Sub-fix 2 for cass#257 added the optional `last_message_id` cursor.
/// Resume strictly advances past this cursor when present, so that a
/// rerun of an interrupted bounded backfill never re-embeds messages
/// that already made it into the staged index. Pre-#257 binaries wrote
/// checkpoints without the field (it deserializes to `None` via
/// `#[serde(default)]`), and modern binaries fall back to the
/// conversation offset when `last_message_id` is absent.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct BuildCheckpoint {
    /// Which tier is being built.
    pub tier: TierKind,
    /// Embedder ID for this build.
    pub embedder_id: String,
    /// Last conversation offset processed (for pagination).
    pub last_offset: i64,
    /// Total documents embedded so far in this build.
    pub docs_embedded: u64,
    /// Total conversations processed so far.
    pub conversations_processed: u64,
    /// Total conversations expected (from DB at start of build).
    pub total_conversations: u64,
    /// DB fingerprint when this build started.  `is_valid` rejects the
    /// checkpoint once the current fingerprint differs.
    pub db_fingerprint: String,
    /// Schema version for this build.
    pub schema_version: u32,
    /// Chunking version for this build.
    pub chunking_version: u32,
    /// Unix timestamp (ms) when this checkpoint was saved.
    pub saved_at_ms: i64,
    /// Highest canonical message PK embedded in this run so far.
    ///
    /// Added in cass#257 (sub-fix 2). `None` for checkpoints written
    /// by pre-#257 binaries; new code falls back to `last_offset`
    /// (conversation-granularity) when this is absent. New code
    /// strictly resumes past this cursor when present.
    ///
    /// Omitted from the serialized JSON entirely while it is `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_message_id: Option<i64>,
}
634
635impl BuildCheckpoint {
636    /// Progress as a percentage (0–100).
637    pub fn progress_pct(&self) -> u8 {
638        if self.total_conversations == 0 {
639            return 0;
640        }
641        let pct = (self.conversations_processed as f64 / self.total_conversations as f64) * 100.0;
642        (pct as u8).min(100)
643    }
644
645    /// Whether the build is complete (all conversations processed).
646    pub fn is_complete(&self) -> bool {
647        self.conversations_processed >= self.total_conversations
648    }
649
650    /// Whether this checkpoint is still valid against the current DB and policy.
651    pub fn is_valid(&self, current_db_fingerprint: &str) -> bool {
652        self.db_fingerprint == current_db_fingerprint
653            && self.schema_version == SEMANTIC_SCHEMA_VERSION
654            && self.chunking_version == CHUNKING_STRATEGY_VERSION
655    }
656}
657
658// ─── Backlog ledger ────────────────────────────────────────────────────────
659
/// Tracks what semantic build work remains.
///
/// Remaining work per tier is `total_conversations` minus that tier's
/// processed count (saturating at zero).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct BacklogLedger {
    /// Total conversations in the canonical DB at last check.
    pub total_conversations: u64,
    /// Conversations embedded in the fast tier.
    pub fast_tier_processed: u64,
    /// Conversations embedded in the quality tier.
    pub quality_tier_processed: u64,
    /// DB fingerprint when this ledger was computed.  `is_current` compares
    /// it against the live fingerprint.
    pub db_fingerprint: String,
    /// Unix timestamp (ms) when this ledger was computed.
    pub computed_at_ms: i64,
}
674
675impl BacklogLedger {
676    /// Remaining conversations for the fast tier.
677    pub fn fast_tier_remaining(&self) -> u64 {
678        self.total_conversations
679            .saturating_sub(self.fast_tier_processed)
680    }
681
682    /// Remaining conversations for the quality tier.
683    pub fn quality_tier_remaining(&self) -> u64 {
684        self.total_conversations
685            .saturating_sub(self.quality_tier_processed)
686    }
687
688    /// Whether either tier has outstanding work.
689    pub fn has_pending_work(&self) -> bool {
690        self.fast_tier_remaining() > 0 || self.quality_tier_remaining() > 0
691    }
692
693    /// Whether the ledger is current with the given DB fingerprint.
694    pub fn is_current(&self, current_db_fingerprint: &str) -> bool {
695        self.db_fingerprint == current_db_fingerprint
696    }
697}
698
699// ─── The top-level manifest ────────────────────────────────────────────────
700
/// Durable, atomic semantic asset manifest.
///
/// This is the single source of truth for what semantic assets exist, their
/// provenance, and what work remains.  It is loaded/saved as JSON at
/// `{data_dir}/vector_index/semantic_manifest.json`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SemanticManifest {
    /// Format version — for future migrations.  `load` rejects values above
    /// `MANIFEST_FORMAT_VERSION`.
    pub manifest_version: u32,
    /// Fast-tier vector artifact (hash embedder).
    pub fast_tier: Option<ArtifactRecord>,
    /// Quality-tier vector artifact (ML embedder).
    pub quality_tier: Option<ArtifactRecord>,
    /// HNSW accelerator index.
    pub hnsw: Option<HnswRecord>,
    /// Backlog / progress tracker.
    pub backlog: BacklogLedger,
    /// Active build checkpoint (for resuming interrupted work).
    pub checkpoint: Option<BuildCheckpoint>,
    /// Unix timestamp (ms) when this manifest was last written.
    pub updated_at_ms: i64,
}
722
723impl Default for SemanticManifest {
724    fn default() -> Self {
725        Self {
726            manifest_version: MANIFEST_FORMAT_VERSION,
727            fast_tier: None,
728            quality_tier: None,
729            hnsw: None,
730            backlog: BacklogLedger {
731                total_conversations: 0,
732                fast_tier_processed: 0,
733                quality_tier_processed: 0,
734                db_fingerprint: String::new(),
735                computed_at_ms: 0,
736            },
737            checkpoint: None,
738            updated_at_ms: 0,
739        }
740    }
741}
742
743impl SemanticManifest {
744    // ── Path helpers ───────────────────────────────────────────────────
745
746    /// Path to the manifest file.
747    pub fn path(data_dir: &Path) -> PathBuf {
748        data_dir.join("vector_index").join(MANIFEST_FILENAME)
749    }
750
751    // ── Load / Save ───────────────────────────────────────────────────
752
753    /// Load the manifest from disk.  Returns `None` if the file doesn't
754    /// exist, `Err` if it exists but is corrupt.
755    ///
756    /// **Migration notes (cass#257 sub-fix 2):** a v1 manifest written
757    /// by a pre-#257 binary loads cleanly — `last_message_id` defaults
758    /// to `None` and resume falls back to the conversation offset.
759    /// A one-shot warning surfaces on first load so an operator sees
760    /// that resume granularity is coarser than ideal until the next
761    /// checkpoint save lands and the on-disk shape upgrades.
762    pub fn load(data_dir: &Path) -> Result<Option<Self>, ManifestError> {
763        let path = Self::path(data_dir);
764        let bytes = match fs::read(&path) {
765            Ok(b) => b,
766            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
767            Err(e) => {
768                return Err(ManifestError::Io {
769                    path,
770                    source: e.to_string(),
771                });
772            }
773        };
774
775        let manifest: Self = serde_json::from_slice(&bytes).map_err(|e| ManifestError::Parse {
776            path: path.clone(),
777            source: e.to_string(),
778        })?;
779
780        // Forward-compatible: reject future manifest versions we can't read.
781        if manifest.manifest_version > MANIFEST_FORMAT_VERSION {
782            return Err(ManifestError::UnsupportedVersion {
783                found: manifest.manifest_version,
784                max_supported: MANIFEST_FORMAT_VERSION,
785            });
786        }
787
788        // Backwards-compatible: a v1 manifest with an active checkpoint
789        // means a previous interrupted backfill saved without the
790        // `last_message_id` cursor. Warn so an operator monitoring an
791        // overnight run knows resume granularity is conversation-coarse
792        // until the next checkpoint save bumps the on-disk shape.
793        if manifest.manifest_version <= MANIFEST_FORMAT_VERSION_PRE_LAST_MESSAGE_CURSOR
794            && manifest
795                .checkpoint
796                .as_ref()
797                .is_some_and(|cp| cp.last_message_id.is_none())
798        {
799            tracing::warn!(
800                manifest_version = manifest.manifest_version,
801                supported_version = MANIFEST_FORMAT_VERSION,
802                path = %path.display(),
803                "semantic checkpoint manifest predates last_message_id cursor (cass#257 sub-fix 2); resume will fall back to conversation offset until the next checkpoint save"
804            );
805        }
806
807        Ok(Some(manifest))
808    }
809
810    /// Load the manifest, returning defaults if absent or corrupt.
811    ///
812    /// Unlike [`load`], this method treats parse errors and version mismatches
813    /// as "manifest absent" — the caller gets a clean default rather than an
814    /// error.  This is the right behaviour for runtime code that must always
815    /// make progress.
816    pub fn load_or_default(data_dir: &Path) -> Result<Self, ManifestError> {
817        match Self::load(data_dir) {
818            Ok(Some(manifest)) => Ok(manifest),
819            Ok(None) => Ok(Self::default()),
820            // I/O errors are real failures — propagate the original.
821            Err(e @ ManifestError::Io { .. }) => Err(e),
822            // Parse or version errors → treat as absent.
823            Err(ManifestError::Parse { .. } | ManifestError::UnsupportedVersion { .. }) => {
824                Ok(Self::default())
825            }
826            Err(e) => Err(e),
827        }
828    }
829
830    /// Atomically save the manifest to disk (write-to-temp then rename).
831    pub fn save(&mut self, data_dir: &Path) -> Result<(), ManifestError> {
832        let path = Self::path(data_dir);
833
834        // Ensure parent directory exists.
835        if let Some(parent) = path.parent() {
836            fs::create_dir_all(parent).map_err(|e| ManifestError::Io {
837                path: parent.to_path_buf(),
838                source: e.to_string(),
839            })?;
840        }
841
842        self.updated_at_ms = now_ms();
843
844        let json = serde_json::to_string_pretty(self).map_err(|e| ManifestError::Serialize {
845            source: e.to_string(),
846        })?;
847
848        // Atomic write: temp file → rename.
849        let (tmp_path, mut file) =
850            create_unique_manifest_temp_file(&path).map_err(|e| ManifestError::Io {
851                path: path.clone(),
852                source: e.to_string(),
853            })?;
854        file.write_all(json.as_bytes())
855            .map_err(|e| ManifestError::Io {
856                path: tmp_path.clone(),
857                source: e.to_string(),
858            })?;
859        file.sync_all().map_err(|e| ManifestError::Io {
860            path: tmp_path.clone(),
861            source: e.to_string(),
862        })?;
863        replace_file_from_temp(&tmp_path, &path).map_err(|e| ManifestError::Io {
864            path: path.clone(),
865            source: e.to_string(),
866        })?;
867        sync_parent_directory(&path).map_err(|e| ManifestError::Io {
868            path: path
869                .parent()
870                .map(Path::to_path_buf)
871                .unwrap_or_else(|| path.clone()),
872            source: e.to_string(),
873        })?;
874
875        Ok(())
876    }
877
878    // ── Readiness evaluation ──────────────────────────────────────────
879
880    /// Evaluate readiness of the fast tier.
881    pub fn fast_tier_readiness(
882        &self,
883        policy: &SemanticPolicy,
884        current_db_fingerprint: &str,
885        current_model_revision: &str,
886    ) -> TierReadiness {
887        match &self.fast_tier {
888            Some(artifact) => {
889                artifact.readiness(policy, current_db_fingerprint, current_model_revision)
890            }
891            None => {
892                // Check for an active build checkpoint for this tier.
893                if let Some(cp) = &self.checkpoint
894                    && cp.tier == TierKind::Fast
895                    && cp.is_valid(current_db_fingerprint)
896                {
897                    TierReadiness::Building {
898                        progress_pct: cp.progress_pct(),
899                    }
900                } else {
901                    TierReadiness::Missing
902                }
903            }
904        }
905    }
906
907    /// Evaluate readiness of the quality tier.
908    pub fn quality_tier_readiness(
909        &self,
910        policy: &SemanticPolicy,
911        current_db_fingerprint: &str,
912        current_model_revision: &str,
913    ) -> TierReadiness {
914        match &self.quality_tier {
915            Some(artifact) => {
916                artifact.readiness(policy, current_db_fingerprint, current_model_revision)
917            }
918            None => {
919                if let Some(cp) = &self.checkpoint
920                    && cp.tier == TierKind::Quality
921                    && cp.is_valid(current_db_fingerprint)
922                {
923                    TierReadiness::Building {
924                        progress_pct: cp.progress_pct(),
925                    }
926                } else {
927                    TierReadiness::Missing
928                }
929            }
930        }
931    }
932
933    /// Whether hybrid refinement can run right now (fast tier usable).
934    pub fn can_hybrid_search(
935        &self,
936        policy: &SemanticPolicy,
937        current_db_fingerprint: &str,
938        current_model_revision: &str,
939    ) -> bool {
940        self.fast_tier_readiness(policy, current_db_fingerprint, current_model_revision)
941            .is_usable()
942    }
943
944    // ── Backlog / checkpoint management ───────────────────────────────
945
946    /// Update the backlog from the canonical DB state.
947    pub fn refresh_backlog(&mut self, total_conversations: u64, current_db_fingerprint: &str) {
948        let fast_processed = self
949            .fast_tier
950            .as_ref()
951            .filter(|a| a.ready && a.db_fingerprint == current_db_fingerprint)
952            .map_or(0, |a| a.conversation_count);
953        let quality_processed = self
954            .quality_tier
955            .as_ref()
956            .filter(|a| a.ready && a.db_fingerprint == current_db_fingerprint)
957            .map_or(0, |a| a.conversation_count);
958
959        self.backlog = BacklogLedger {
960            total_conversations,
961            fast_tier_processed: fast_processed,
962            quality_tier_processed: quality_processed,
963            db_fingerprint: current_db_fingerprint.to_owned(),
964            computed_at_ms: now_ms(),
965        };
966    }
967
968    /// Save a build checkpoint (called periodically during backfill).
969    pub fn save_checkpoint(&mut self, checkpoint: BuildCheckpoint) {
970        self.checkpoint = Some(checkpoint);
971    }
972
973    /// Clear the build checkpoint (called when build finishes or is abandoned).
974    pub fn clear_checkpoint(&mut self) {
975        self.checkpoint = None;
976    }
977
978    /// Record a completed artifact and clear the matching checkpoint.
979    pub fn publish_artifact(&mut self, artifact: ArtifactRecord) {
980        // Clear checkpoint if it matches this tier.
981        if self
982            .checkpoint
983            .as_ref()
984            .is_some_and(|cp| cp.tier == artifact.tier)
985        {
986            self.checkpoint = None;
987        }
988
989        match artifact.tier {
990            TierKind::Fast => self.fast_tier = Some(artifact),
991            TierKind::Quality => self.quality_tier = Some(artifact),
992        }
993    }
994
995    /// Record a completed HNSW accelerator.
996    pub fn publish_hnsw(&mut self, hnsw: HnswRecord) {
997        self.hnsw = Some(hnsw);
998    }
999
1000    /// Adopt a legacy (pre-manifest) artifact if it is compatible with the
1001    /// current schema/chunking versions.  Returns `true` if adopted.
1002    #[allow(clippy::too_many_arguments)]
1003    pub fn adopt_legacy_artifact(
1004        &mut self,
1005        tier: TierKind,
1006        embedder_id: &str,
1007        model_revision: &str,
1008        dimension: usize,
1009        doc_count: u64,
1010        conversation_count: u64,
1011        db_fingerprint: &str,
1012        index_path: &str,
1013        size_bytes: u64,
1014    ) -> bool {
1015        let record = ArtifactRecord {
1016            tier,
1017            embedder_id: embedder_id.to_owned(),
1018            model_revision: model_revision.to_owned(),
1019            schema_version: SEMANTIC_SCHEMA_VERSION,
1020            chunking_version: CHUNKING_STRATEGY_VERSION,
1021            dimension,
1022            doc_count,
1023            conversation_count,
1024            db_fingerprint: db_fingerprint.to_owned(),
1025            index_path: index_path.to_owned(),
1026            size_bytes,
1027            started_at_ms: 0,
1028            completed_at_ms: now_ms(),
1029            ready: true,
1030        };
1031
1032        match tier {
1033            TierKind::Fast => self.fast_tier = Some(record),
1034            TierKind::Quality => self.quality_tier = Some(record),
1035        }
1036        true
1037    }
1038
1039    /// Invalidate artifacts that are incompatible with the current policy.
1040    /// Returns the number of artifacts invalidated.
1041    ///
1042    /// **Note**: This detects schema version, chunking version, and mode
1043    /// incompatibilities.  It does NOT detect embedder changes (e.g., minilm →
1044    /// snowflake) because the policy stores short names while artifacts store
1045    /// full registry IDs.  Callers who need embedder-change detection should
1046    /// compare `artifact.embedder_id` against the expected ID from the
1047    /// embedder registry.
1048    pub fn invalidate_incompatible(
1049        &mut self,
1050        policy: &SemanticPolicy,
1051        current_model_revision: &str,
1052    ) -> usize {
1053        let mut count = 0;
1054
1055        if let Some(ref artifact) = self.fast_tier {
1056            let pm = artifact.to_policy_manifest();
1057            if matches!(
1058                pm.invalidation_action(policy, current_model_revision, &artifact.embedder_id),
1059                InvalidationAction::DiscardAndRebuild { .. } | InvalidationAction::Evict
1060            ) {
1061                self.fast_tier = None;
1062                count += 1;
1063            }
1064        }
1065
1066        if let Some(ref artifact) = self.quality_tier {
1067            let pm = artifact.to_policy_manifest();
1068            if matches!(
1069                pm.invalidation_action(policy, current_model_revision, &artifact.embedder_id),
1070                InvalidationAction::DiscardAndRebuild { .. } | InvalidationAction::Evict
1071            ) {
1072                self.quality_tier = None;
1073                count += 1;
1074            }
1075        }
1076
1077        // HNSW depends on the base tier — invalidate if base is gone.
1078        if let Some(ref hnsw) = self.hnsw {
1079            let base_gone = match hnsw.base_tier {
1080                TierKind::Fast => self.fast_tier.is_none(),
1081                TierKind::Quality => self.quality_tier.is_none(),
1082            };
1083            if base_gone {
1084                self.hnsw = None;
1085                count += 1;
1086            }
1087        }
1088
1089        // Invalidate checkpoint if its schema/chunking is wrong.
1090        if let Some(ref cp) = self.checkpoint
1091            && (cp.schema_version != policy.semantic_schema_version
1092                || cp.chunking_version != policy.chunking_strategy_version)
1093        {
1094            self.checkpoint = None;
1095        }
1096
1097        count
1098    }
1099
1100    /// Total disk usage of all semantic artifacts (bytes).
1101    pub fn total_size_bytes(&self) -> u64 {
1102        let fast = self.fast_tier.as_ref().map_or(0, |a| a.size_bytes);
1103        let quality = self.quality_tier.as_ref().map_or(0, |a| a.size_bytes);
1104        let hnsw = self.hnsw.as_ref().map_or(0, |h| h.size_bytes);
1105        fast + quality + hnsw
1106    }
1107
1108    /// Total disk usage in megabytes (rounded up).
1109    pub fn total_size_mb(&self) -> u64 {
1110        self.total_size_bytes().div_ceil(1_048_576)
1111    }
1112}
1113
1114// ─── Errors ────────────────────────────────────────────────────────────────
1115
/// Failures that can occur while loading or saving the semantic manifest.
///
/// Underlying errors are carried as already-formatted strings in `source`.
#[derive(Debug)]
pub enum ManifestError {
    /// A filesystem operation on `path` failed.
    Io { path: PathBuf, source: String },
    /// The file at `path` could not be parsed as a manifest.
    Parse { path: PathBuf, source: String },
    /// The in-memory manifest could not be serialized.
    Serialize { source: String },
    /// The manifest declares version `found`, newer than `max_supported`.
    UnsupportedVersion { found: u32, max_supported: u32 },
}

impl std::fmt::Display for ManifestError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::UnsupportedVersion {
                found,
                max_supported,
            } => write!(
                f,
                "manifest version {found} is newer than supported version {max_supported}"
            ),
            Self::Serialize { source } => write!(f, "manifest serialization error: {source}"),
            Self::Parse { path, source } => {
                write!(f, "manifest parse error at {}: {source}", path.display())
            }
            Self::Io { path, source } => {
                write!(f, "manifest I/O error at {}: {source}", path.display())
            }
        }
    }
}

// No `source()` override: the underlying error is only kept as text.
impl std::error::Error for ManifestError {}
1146
1147// ─── Helpers ───────────────────────────────────────────────────────────────
1148
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch.  The
/// `u128` millisecond count is converted with a checked conversion and
/// saturates at `i64::MAX` rather than silently wrapping through an `as`
/// cast.
fn now_ms() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| {
            i64::try_from(elapsed.as_millis()).unwrap_or(i64::MAX)
        })
}
1155
1156fn unique_manifest_temp_path(path: &Path, attempt: u32, random: u64) -> PathBuf {
1157    static NEXT_NONCE: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
1158
1159    let file_name = path
1160        .file_name()
1161        .and_then(|name| name.to_str())
1162        .unwrap_or(MANIFEST_FILENAME);
1163    let nonce = NEXT_NONCE.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1164    path.with_file_name(format!(
1165        ".{file_name}.tmp.{attempt}.{}.{}.{random:016x}",
1166        now_ms(),
1167        nonce
1168    ))
1169}
1170
1171fn create_unique_manifest_temp_file(path: &Path) -> std::io::Result<(PathBuf, fs::File)> {
1172    for attempt in 0..100 {
1173        let random = random_manifest_path_nonce()?;
1174        let tmp_path = unique_manifest_temp_path(path, attempt, random);
1175        match OpenOptions::new()
1176            .write(true)
1177            .create_new(true)
1178            .open(&tmp_path)
1179        {
1180            Ok(file) => return Ok((tmp_path, file)),
1181            Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => continue,
1182            Err(err) => return Err(err),
1183        }
1184    }
1185
1186    Err(std::io::Error::new(
1187        std::io::ErrorKind::AlreadyExists,
1188        format!(
1189            "could not create a unique temporary manifest file for {} after 100 attempts",
1190            path.display()
1191        ),
1192    ))
1193}
1194
/// Build a backup path next to `path` for the Windows replace fallback.
///
/// The name is a dot-file derived from `path`'s file name (falling back to
/// `MANIFEST_FILENAME`) followed by `.bak.`, the current time in
/// milliseconds, a process-wide counter value, and a random nonce as 16 hex
/// digits.  Nothing is created on disk.
///
/// # Errors
///
/// Fails only if the random nonce cannot be generated.
#[cfg(windows)]
fn unique_manifest_backup_path(path: &Path) -> std::io::Result<PathBuf> {
    use std::sync::atomic::{AtomicU64, Ordering};

    static NEXT_NONCE: AtomicU64 = AtomicU64::new(0);

    // Drawn first so a failure here leaves the counter untouched.
    let random = random_manifest_path_nonce()?;

    let file_name = match path.file_name().and_then(|name| name.to_str()) {
        Some(name) => name,
        None => MANIFEST_FILENAME,
    };
    let nonce = NEXT_NONCE.fetch_add(1, Ordering::Relaxed);
    let backup_name = format!(
        ".{file_name}.bak.{}.{nonce}.{random:016x}",
        now_ms()
    );
    Ok(path.with_file_name(backup_name))
}
1210
1211fn random_manifest_path_nonce() -> std::io::Result<u64> {
1212    let mut random_bytes = [0u8; 8];
1213    SystemRandom::new()
1214        .fill(&mut random_bytes)
1215        .map_err(|_| std::io::Error::other("failed to generate manifest temp path nonce"))?;
1216    Ok(u64::from_le_bytes(random_bytes))
1217}
1218
/// Move `temp_path` over `final_path`, replacing any existing file.
///
/// # Non-Windows
///
/// A single `rename`.  If it fails, `final_path` is untouched, so the temp
/// file is removed (best effort) before the error is returned.  This matches
/// the Windows branch, which also removes the temp file whenever the original
/// is left intact.
///
/// # Windows
///
/// A direct `rename` is tried first.  If it fails with `AlreadyExists` or
/// `PermissionDenied` while something exists at `final_path`, the existing
/// file is moved to a uniquely named backup, the rename is retried, and then:
///
/// - on success the backup is deleted;
/// - on failure the backup is moved back; if that works the temp file is
///   removed and an error is returned;
/// - if even the restore fails, the temp file is deliberately retained (the
///   error message names it), since it holds the new contents.
///
/// # Errors
///
/// Returns the underlying I/O error, or a composite error describing every
/// step that failed on the Windows fallback path.
fn replace_file_from_temp(temp_path: &Path, final_path: &Path) -> std::io::Result<()> {
    #[cfg(windows)]
    {
        match fs::rename(temp_path, final_path) {
            Ok(()) => sync_parent_directory(final_path),
            // Only fall back when the target really exists and the failure
            // looks like "could not replace it".
            Err(first_err)
                if replacement_path_entry_exists(final_path)?
                    && matches!(
                        first_err.kind(),
                        std::io::ErrorKind::AlreadyExists | std::io::ErrorKind::PermissionDenied
                    ) =>
            {
                let backup_path = unique_manifest_backup_path(final_path)?;
                fs::rename(final_path, &backup_path).map_err(|backup_err| {
                    // Original is still in place; the temp file is just litter.
                    let _ = fs::remove_file(temp_path);
                    std::io::Error::other(format!(
                        "failed preparing backup {} before replacing {}: first error: {}; backup error: {}",
                        backup_path.display(),
                        final_path.display(),
                        first_err,
                        backup_err
                    ))
                })?;
                match fs::rename(temp_path, final_path) {
                    Ok(()) => {
                        let _ = fs::remove_file(&backup_path);
                        sync_parent_directory(final_path)
                    }
                    // Retry failed: try to put the original back.
                    Err(second_err) => match fs::rename(&backup_path, final_path) {
                        Ok(()) => {
                            let _ = fs::remove_file(temp_path);
                            sync_parent_directory(final_path)?;
                            Err(std::io::Error::other(format!(
                                "failed replacing {} with {}: first error: {}; second error: {}; restored original file",
                                final_path.display(),
                                temp_path.display(),
                                first_err,
                                second_err
                            )))
                        }
                        // Neither old nor new is in place: keep the temp file.
                        Err(restore_err) => Err(std::io::Error::other(format!(
                            "failed replacing {} with {}: first error: {}; second error: {}; restore error: {}; temp file retained at {}",
                            final_path.display(),
                            temp_path.display(),
                            first_err,
                            second_err,
                            restore_err,
                            temp_path.display()
                        ))),
                    },
                }
            }
            Err(err) => Err(err),
        }
    }

    #[cfg(not(windows))]
    {
        fs::rename(temp_path, final_path).map_err(|rename_err| {
            // The rename did not happen, so `final_path` is unchanged and the
            // temp file would only be leaked.
            let _ = fs::remove_file(temp_path);
            rename_err
        })
    }
}
1280
/// Whether any directory entry exists at `path`.
///
/// Uses `symlink_metadata`, so a symlink counts as existing even when its
/// target is missing.
///
/// # Errors
///
/// `NotFound` is reported as `Ok(false)`.  Any other inspection failure is
/// returned with the same error kind and a message naming `path`.
#[cfg(any(windows, test))]
fn replacement_path_entry_exists(path: &Path) -> std::io::Result<bool> {
    let err = match fs::symlink_metadata(path) {
        Ok(_) => return Ok(true),
        Err(err) => err,
    };

    if err.kind() == std::io::ErrorKind::NotFound {
        return Ok(false);
    }

    let message = format!(
        "failed inspecting semantic manifest replacement target {}: {err}",
        path.display()
    );
    Err(std::io::Error::new(err.kind(), message))
}
1295
/// Flush the directory containing `path` so that a preceding rename into it
/// is durable.
///
/// A path with no parent at all (for example a filesystem root) is a no-op.
/// A bare relative file name such as `manifest.json` has the empty path as
/// its parent; that is treated as the current directory (`.`) — opening the
/// empty path would otherwise fail.
///
/// # Errors
///
/// Returns any error from opening or syncing the parent directory.
#[cfg(not(windows))]
fn sync_parent_directory(path: &Path) -> std::io::Result<()> {
    let Some(parent) = path.parent() else {
        return Ok(());
    };
    let parent = if parent.as_os_str().is_empty() {
        Path::new(".")
    } else {
        parent
    };
    let directory = fs::File::open(parent)?;
    directory.sync_all()
}
1304
/// Windows counterpart of the parent-directory sync: a no-op that always
/// succeeds, present so callers can invoke it unconditionally on every
/// platform.
#[cfg(windows)]
fn sync_parent_directory(_path: &Path) -> std::io::Result<()> {
    // Intentionally nothing to do on this platform.
    Ok(())
}
1309
1310// ─── Tests ─────────────────────────────────────────────────────────────────
1311
1312#[cfg(test)]
1313mod tests {
1314    use super::*;
1315    use crate::search::policy::SemanticPolicy;
1316
1317    fn test_policy() -> SemanticPolicy {
1318        SemanticPolicy::compiled_defaults()
1319    }
1320
1321    fn test_artifact(tier: TierKind, ready: bool) -> ArtifactRecord {
1322        ArtifactRecord {
1323            tier,
1324            embedder_id: match tier {
1325                TierKind::Fast => "fnv1a-384".to_owned(),
1326                TierKind::Quality => "minilm-384".to_owned(),
1327            },
1328            model_revision: "abc123".to_owned(),
1329            schema_version: SEMANTIC_SCHEMA_VERSION,
1330            chunking_version: CHUNKING_STRATEGY_VERSION,
1331            dimension: 384,
1332            doc_count: 1000,
1333            conversation_count: 250,
1334            db_fingerprint: "fp-1234".to_owned(),
1335            index_path: format!(
1336                "vector_index/index-{}.fsvi",
1337                match tier {
1338                    TierKind::Fast => "fnv1a-384",
1339                    TierKind::Quality => "minilm-384",
1340                }
1341            ),
1342            size_bytes: 150_000,
1343            started_at_ms: 1_700_000_000_000,
1344            completed_at_ms: 1_700_000_060_000,
1345            ready,
1346        }
1347    }
1348
1349    fn test_hnsw() -> HnswRecord {
1350        HnswRecord {
1351            base_tier: TierKind::Quality,
1352            embedder_id: "minilm-384".to_owned(),
1353            ef_search: 128,
1354            index_path: "vector_index/hnsw-minilm-384.chsw".to_owned(),
1355            size_bytes: 50_000,
1356            built_at_ms: 1_700_000_070_000,
1357            ready: true,
1358        }
1359    }
1360
1361    fn test_shard(shard_index: u32, shard_count: u32, ready: bool) -> SemanticShardRecord {
1362        SemanticShardRecord {
1363            tier: TierKind::Fast,
1364            embedder_id: "fnv1a-384".to_owned(),
1365            model_revision: "hash".to_owned(),
1366            schema_version: SEMANTIC_SCHEMA_VERSION,
1367            chunking_version: CHUNKING_STRATEGY_VERSION,
1368            dimension: 384,
1369            shard_index,
1370            shard_count,
1371            doc_count: 25,
1372            total_conversations: 10,
1373            db_fingerprint: "fp-sharded".to_owned(),
1374            index_path: format!("vector_index/shards/fast-fnv1a-384/shard-{shard_index:05}.fsvi"),
1375            quantization: "f16".to_owned(),
1376            mmap_ready: true,
1377            ann_index_path: None,
1378            ann_size_bytes: 0,
1379            ann_ready: false,
1380            size_bytes: 4096,
1381            started_at_ms: 1_700_000_080_000,
1382            completed_at_ms: 1_700_000_081_000,
1383            ready,
1384        }
1385    }
1386
1387    fn test_checkpoint(tier: TierKind) -> BuildCheckpoint {
1388        BuildCheckpoint {
1389            tier,
1390            embedder_id: "minilm-384".to_owned(),
1391            last_offset: 500,
1392            docs_embedded: 3000,
1393            conversations_processed: 500,
1394            total_conversations: 1000,
1395            db_fingerprint: "fp-1234".to_owned(),
1396            schema_version: SEMANTIC_SCHEMA_VERSION,
1397            chunking_version: CHUNKING_STRATEGY_VERSION,
1398            saved_at_ms: 1_700_000_030_000,
1399            last_message_id: None,
1400        }
1401    }
1402
1403    #[derive(Debug, Clone, Copy)]
1404    enum ExpectedTierReadiness {
1405        Ready,
1406        Stale,
1407        Incompatible,
1408        Building(u8),
1409    }
1410
1411    fn no_artifact_mutation(_: &mut ArtifactRecord) {}
1412
1413    type TierReadinessCase = (
1414        &'static str,
1415        TierKind,
1416        bool,
1417        &'static str,
1418        &'static str,
1419        fn(&mut ArtifactRecord),
1420        ExpectedTierReadiness,
1421    );
1422
1423    fn set_schema_version_to_zero(artifact: &mut ArtifactRecord) {
1424        artifact.schema_version = 0;
1425    }
1426
1427    fn assert_tier_readiness(actual: TierReadiness, expected: ExpectedTierReadiness, label: &str) {
1428        match expected {
1429            ExpectedTierReadiness::Ready => {
1430                assert_eq!(actual, TierReadiness::Ready, "{label}");
1431            }
1432            ExpectedTierReadiness::Stale => {
1433                assert!(
1434                    matches!(actual, TierReadiness::Stale { .. }),
1435                    "{label}: {actual:?}"
1436                );
1437            }
1438            ExpectedTierReadiness::Incompatible => {
1439                assert!(
1440                    matches!(actual, TierReadiness::Incompatible { .. }),
1441                    "{label}: {actual:?}"
1442                );
1443            }
1444            ExpectedTierReadiness::Building(progress_pct) => {
1445                assert_eq!(actual, TierReadiness::Building { progress_pct }, "{label}");
1446            }
1447        }
1448    }
1449
1450    // ── Manifest load/save round-trip ──────────────────────────────────
1451
1452    #[test]
1453    fn manifest_round_trip_via_disk() {
1454        let temp = tempfile::tempdir().unwrap();
1455        let mut manifest = SemanticManifest {
1456            fast_tier: Some(test_artifact(TierKind::Fast, true)),
1457            quality_tier: Some(test_artifact(TierKind::Quality, true)),
1458            hnsw: Some(test_hnsw()),
1459            checkpoint: Some(test_checkpoint(TierKind::Quality)),
1460            backlog: BacklogLedger {
1461                total_conversations: 2000,
1462                fast_tier_processed: 1000,
1463                quality_tier_processed: 500,
1464                db_fingerprint: "fp-1234".to_owned(),
1465                computed_at_ms: 1_700_000_000_000,
1466            },
1467            ..Default::default()
1468        };
1469
1470        manifest.save(temp.path()).unwrap();
1471        let loaded = SemanticManifest::load(temp.path()).unwrap().unwrap();
1472
1473        assert_eq!(loaded.manifest_version, MANIFEST_FORMAT_VERSION);
1474        assert!(loaded.fast_tier.is_some());
1475        assert!(loaded.quality_tier.is_some());
1476        assert!(loaded.hnsw.is_some());
1477        assert!(loaded.checkpoint.is_some());
1478        assert_eq!(loaded.backlog.total_conversations, 2000);
1479        assert!(loaded.updated_at_ms > 0);
1480    }
1481
1482    #[test]
1483    fn manifest_save_overwrites_existing_file() {
1484        let temp = tempfile::tempdir().unwrap();
1485        let mut first = SemanticManifest {
1486            fast_tier: Some(test_artifact(TierKind::Fast, true)),
1487            ..Default::default()
1488        };
1489        first.save(temp.path()).unwrap();
1490
1491        let mut second = SemanticManifest {
1492            quality_tier: Some(test_artifact(TierKind::Quality, true)),
1493            backlog: BacklogLedger {
1494                total_conversations: 99,
1495                fast_tier_processed: 0,
1496                quality_tier_processed: 99,
1497                db_fingerprint: "fp-overwrite".to_owned(),
1498                computed_at_ms: 1_700_000_000_123,
1499            },
1500            ..Default::default()
1501        };
1502        second.save(temp.path()).unwrap();
1503
1504        let loaded = SemanticManifest::load(temp.path()).unwrap().unwrap();
1505        assert!(loaded.fast_tier.is_none());
1506        assert!(loaded.quality_tier.is_some());
1507        assert_eq!(loaded.backlog.total_conversations, 99);
1508    }
1509
1510    #[cfg(unix)]
1511    #[test]
1512    fn manifest_replacement_path_entry_exists_detects_dangling_symlink() -> Result<(), String> {
1513        use std::os::unix::fs::symlink;
1514
1515        let temp = tempfile::tempdir().map_err(|e| e.to_string())?;
1516        let link_path = SemanticManifest::path(temp.path());
1517        let manifest_dir = link_path
1518            .parent()
1519            .ok_or_else(|| "semantic manifest path should have a parent directory".to_string())?;
1520        fs::create_dir_all(manifest_dir).map_err(|e| e.to_string())?;
1521        let missing_target = manifest_dir.join("missing-semantic-manifest.json");
1522
1523        symlink(&missing_target, &link_path).map_err(|e| e.to_string())?;
1524
1525        if link_path.exists() {
1526            return Err("dangling manifest symlink unexpectedly resolved".to_string());
1527        }
1528        if !replacement_path_entry_exists(&link_path).map_err(|e| e.to_string())? {
1529            return Err(format!(
1530                "semantic manifest replacement entry check missed dangling symlink {}",
1531                link_path.display()
1532            ));
1533        }
1534
1535        Ok(())
1536    }
1537
1538    #[test]
1539    fn manifest_temp_file_creation_is_exclusive_and_unique() -> Result<(), String> {
1540        let temp = tempfile::tempdir().map_err(|e| e.to_string())?;
1541        let final_path = SemanticManifest::path(temp.path());
1542        let manifest_dir = final_path
1543            .parent()
1544            .ok_or_else(|| "semantic manifest path should have a parent directory".to_string())?;
1545        fs::create_dir_all(manifest_dir).map_err(|e| e.to_string())?;
1546
1547        let (first_path, mut first_file) =
1548            create_unique_manifest_temp_file(&final_path).map_err(|e| e.to_string())?;
1549        first_file.write_all(b"first").map_err(|e| e.to_string())?;
1550        let (second_path, mut second_file) =
1551            create_unique_manifest_temp_file(&final_path).map_err(|e| e.to_string())?;
1552        second_file
1553            .write_all(b"second")
1554            .map_err(|e| e.to_string())?;
1555
1556        if first_path == second_path {
1557            return Err("exclusive temp creation reused the same path".to_string());
1558        }
1559        if !first_path.exists() {
1560            return Err(format!(
1561                "first temp file is missing: {}",
1562                first_path.display()
1563            ));
1564        }
1565        if !second_path.exists() {
1566            return Err(format!(
1567                "second temp file is missing: {}",
1568                second_path.display()
1569            ));
1570        }
1571        if first_path.parent() != Some(manifest_dir) {
1572            return Err(format!(
1573                "first temp path escaped manifest directory: {}",
1574                first_path.display()
1575            ));
1576        }
1577        if second_path.parent() != Some(manifest_dir) {
1578            return Err(format!(
1579                "second temp path escaped manifest directory: {}",
1580                second_path.display()
1581            ));
1582        }
1583
1584        Ok(())
1585    }
1586
1587    #[test]
1588    fn manifest_load_missing_returns_none() {
1589        let temp = tempfile::tempdir().unwrap();
1590        let loaded = SemanticManifest::load(temp.path()).unwrap();
1591        assert!(loaded.is_none());
1592    }
1593
1594    #[test]
1595    fn manifest_load_or_default_returns_defaults() {
1596        let temp = tempfile::tempdir().unwrap();
1597        let manifest = SemanticManifest::load_or_default(temp.path()).unwrap();
1598        assert_eq!(manifest.manifest_version, MANIFEST_FORMAT_VERSION);
1599        assert!(manifest.fast_tier.is_none());
1600        assert!(manifest.quality_tier.is_none());
1601    }
1602
1603    #[test]
1604    fn manifest_load_corrupt_returns_parse_error() {
1605        let temp = tempfile::tempdir().unwrap();
1606        let path = SemanticManifest::path(temp.path());
1607        fs::create_dir_all(path.parent().unwrap()).unwrap();
1608        fs::write(&path, b"not json").unwrap();
1609
1610        let result = SemanticManifest::load(temp.path());
1611        assert!(matches!(result, Err(ManifestError::Parse { .. })));
1612    }
1613
1614    #[test]
1615    fn manifest_load_future_version_returns_error() {
1616        let temp = tempfile::tempdir().unwrap();
1617        let path = SemanticManifest::path(temp.path());
1618        fs::create_dir_all(path.parent().unwrap()).unwrap();
1619
1620        let manifest = SemanticManifest {
1621            manifest_version: MANIFEST_FORMAT_VERSION + 1,
1622            ..Default::default()
1623        };
1624        let json = serde_json::to_string(&manifest).unwrap();
1625        fs::write(&path, json).unwrap();
1626
1627        let result = SemanticManifest::load(temp.path());
1628        assert!(matches!(
1629            result,
1630            Err(ManifestError::UnsupportedVersion { .. })
1631        ));
1632    }
1633
1634    // ── Tier readiness (table-driven) ──────────────────────────────────
1635
1636    #[test]
1637    fn tier_readiness_cases() {
1638        let policy = test_policy();
1639        let db_fp = "fp-1234";
1640        let model_rev = "abc123";
1641        let cases: &[TierReadinessCase] = &[
1642            (
1643                "ready artifact with matching fingerprint",
1644                TierKind::Fast,
1645                true,
1646                db_fp,
1647                model_rev,
1648                no_artifact_mutation,
1649                ExpectedTierReadiness::Ready,
1650            ),
1651            (
1652                "ready artifact with changed DB fingerprint",
1653                TierKind::Fast,
1654                true,
1655                "different-fp",
1656                model_rev,
1657                no_artifact_mutation,
1658                ExpectedTierReadiness::Stale,
1659            ),
1660            (
1661                "ready artifact with changed model revision",
1662                TierKind::Quality,
1663                true,
1664                db_fp,
1665                "new-revision",
1666                no_artifact_mutation,
1667                ExpectedTierReadiness::Stale,
1668            ),
1669            (
1670                "schema version mismatch",
1671                TierKind::Quality,
1672                true,
1673                db_fp,
1674                model_rev,
1675                set_schema_version_to_zero,
1676                ExpectedTierReadiness::Incompatible,
1677            ),
1678            (
1679                "not yet published artifact",
1680                TierKind::Fast,
1681                false,
1682                db_fp,
1683                model_rev,
1684                no_artifact_mutation,
1685                ExpectedTierReadiness::Building(100),
1686            ),
1687        ];
1688
1689        for (label, tier, ready, current_db_fp, current_model_rev, mutate, expected) in cases {
1690            let mut artifact = test_artifact(*tier, *ready);
1691            mutate(&mut artifact);
1692            assert_tier_readiness(
1693                artifact.readiness(&policy, current_db_fp, current_model_rev),
1694                *expected,
1695                label,
1696            );
1697        }
1698    }
1699
1700    // ── Manifest-level readiness ───────────────────────────────────────
1701
1702    #[test]
1703    fn manifest_tier_readiness_missing() {
1704        let manifest = SemanticManifest::default();
1705        let policy = test_policy();
1706        assert_eq!(
1707            manifest.fast_tier_readiness(&policy, "fp", "rev"),
1708            TierReadiness::Missing,
1709        );
1710        assert_eq!(
1711            manifest.quality_tier_readiness(&policy, "fp", "rev"),
1712            TierReadiness::Missing,
1713        );
1714    }
1715
1716    #[test]
1717    fn manifest_tier_readiness_with_checkpoint() {
1718        let manifest = SemanticManifest {
1719            checkpoint: Some(test_checkpoint(TierKind::Quality)),
1720            ..Default::default()
1721        };
1722
1723        let policy = test_policy();
1724        // Fast tier has no checkpoint → Missing
1725        assert_eq!(
1726            manifest.fast_tier_readiness(&policy, "fp-1234", "rev"),
1727            TierReadiness::Missing,
1728        );
1729        // Quality tier has a valid checkpoint → Building
1730        assert!(matches!(
1731            manifest.quality_tier_readiness(&policy, "fp-1234", "rev"),
1732            TierReadiness::Building { progress_pct: 50 },
1733        ));
1734    }
1735
1736    #[test]
1737    fn manifest_tier_readiness_checkpoint_invalid_db() {
1738        let manifest = SemanticManifest {
1739            checkpoint: Some(test_checkpoint(TierKind::Quality)),
1740            ..Default::default()
1741        };
1742
1743        let policy = test_policy();
1744        // Checkpoint DB doesn't match → Missing (checkpoint invalid)
1745        assert_eq!(
1746            manifest.quality_tier_readiness(&policy, "other-fp", "rev"),
1747            TierReadiness::Missing,
1748        );
1749    }
1750
1751    // ── Hybrid search check ────────────────────────────────────────────
1752
1753    #[test]
1754    fn can_hybrid_search_requires_usable_fast_tier() {
1755        let policy = test_policy();
1756        let db_fp = "fp-1234";
1757        let rev = "abc123";
1758
1759        // No fast tier → can't hybrid
1760        let manifest = SemanticManifest::default();
1761        assert!(!manifest.can_hybrid_search(&policy, db_fp, rev));
1762
1763        // Fast tier present → can hybrid
1764        let manifest = SemanticManifest {
1765            fast_tier: Some(test_artifact(TierKind::Fast, true)),
1766            ..Default::default()
1767        };
1768        assert!(manifest.can_hybrid_search(&policy, db_fp, rev));
1769    }
1770
1771    // ── Backlog ledger ─────────────────────────────────────────────────
1772
1773    #[test]
1774    fn backlog_remaining_and_pending() {
1775        let ledger = BacklogLedger {
1776            total_conversations: 1000,
1777            fast_tier_processed: 800,
1778            quality_tier_processed: 300,
1779            db_fingerprint: "fp".to_owned(),
1780            computed_at_ms: 0,
1781        };
1782
1783        assert_eq!(ledger.fast_tier_remaining(), 200);
1784        assert_eq!(ledger.quality_tier_remaining(), 700);
1785        assert!(ledger.has_pending_work());
1786        assert!(ledger.is_current("fp"));
1787        assert!(!ledger.is_current("other"));
1788    }
1789
1790    #[test]
1791    fn backlog_no_pending_when_fully_processed() {
1792        let ledger = BacklogLedger {
1793            total_conversations: 500,
1794            fast_tier_processed: 500,
1795            quality_tier_processed: 500,
1796            db_fingerprint: "fp".to_owned(),
1797            computed_at_ms: 0,
1798        };
1799
1800        assert_eq!(ledger.fast_tier_remaining(), 0);
1801        assert_eq!(ledger.quality_tier_remaining(), 0);
1802        assert!(!ledger.has_pending_work());
1803    }
1804
1805    // ── Build checkpoint ───────────────────────────────────────────────
1806
1807    #[test]
1808    fn checkpoint_progress_and_completion() {
1809        let cp = test_checkpoint(TierKind::Quality);
1810        assert_eq!(cp.progress_pct(), 50);
1811        assert!(!cp.is_complete());
1812        assert!(cp.is_valid("fp-1234"));
1813        assert!(!cp.is_valid("other-fp"));
1814
1815        // Complete checkpoint
1816        let mut cp = test_checkpoint(TierKind::Quality);
1817        cp.conversations_processed = 1000;
1818        assert_eq!(cp.progress_pct(), 100);
1819        assert!(cp.is_complete());
1820    }
1821
1822    #[test]
1823    fn checkpoint_zero_total_gives_zero_pct() {
1824        let mut cp = test_checkpoint(TierKind::Fast);
1825        cp.total_conversations = 0;
1826        cp.conversations_processed = 0;
1827        assert_eq!(cp.progress_pct(), 0);
1828    }
1829
1830    // ── Publish and clear ──────────────────────────────────────────────
1831
1832    #[test]
1833    fn publish_artifact_clears_matching_checkpoint() {
1834        let mut manifest = SemanticManifest {
1835            checkpoint: Some(test_checkpoint(TierKind::Quality)),
1836            ..Default::default()
1837        };
1838
1839        manifest.publish_artifact(test_artifact(TierKind::Quality, true));
1840        assert!(manifest.checkpoint.is_none());
1841        assert!(manifest.quality_tier.is_some());
1842    }
1843
1844    #[test]
1845    fn publish_artifact_keeps_non_matching_checkpoint() {
1846        let mut manifest = SemanticManifest {
1847            checkpoint: Some(test_checkpoint(TierKind::Quality)),
1848            ..Default::default()
1849        };
1850
1851        manifest.publish_artifact(test_artifact(TierKind::Fast, true));
1852        assert!(manifest.checkpoint.is_some()); // Quality checkpoint survives
1853        assert!(manifest.fast_tier.is_some());
1854    }
1855
1856    // ── Refresh backlog ────────────────────────────────────────────────
1857
1858    #[test]
1859    fn refresh_backlog_computes_from_ready_artifacts() {
1860        let mut manifest = SemanticManifest {
1861            fast_tier: Some(test_artifact(TierKind::Fast, true)),
1862            quality_tier: Some(test_artifact(TierKind::Quality, true)),
1863            ..Default::default()
1864        };
1865
1866        manifest.refresh_backlog(2000, "fp-1234");
1867        assert_eq!(manifest.backlog.total_conversations, 2000);
1868        assert_eq!(manifest.backlog.fast_tier_processed, 250);
1869        assert_eq!(manifest.backlog.quality_tier_processed, 250);
1870    }
1871
1872    #[test]
1873    fn refresh_backlog_ignores_stale_artifacts() {
1874        let mut manifest = SemanticManifest {
1875            fast_tier: Some(test_artifact(TierKind::Fast, true)),
1876            ..Default::default()
1877        };
1878
1879        // DB fingerprint doesn't match → artifact not counted
1880        manifest.refresh_backlog(2000, "different-fp");
1881        assert_eq!(manifest.backlog.fast_tier_processed, 0);
1882    }
1883
1884    // ── Invalidation ───────────────────────────────────────────────────
1885
1886    #[test]
1887    fn invalidate_incompatible_removes_schema_mismatch() {
1888        let mut artifact = test_artifact(TierKind::Quality, true);
1889        artifact.schema_version = 0; // mismatch
1890        let mut manifest = SemanticManifest {
1891            quality_tier: Some(artifact),
1892            hnsw: Some(test_hnsw()), // depends on quality tier
1893            ..Default::default()
1894        };
1895
1896        let policy = test_policy();
1897        let count = manifest.invalidate_incompatible(&policy, "abc123");
1898
1899        assert_eq!(count, 2); // quality + hnsw
1900        assert!(manifest.quality_tier.is_none());
1901        assert!(manifest.hnsw.is_none());
1902    }
1903
1904    #[test]
1905    fn invalidate_incompatible_keeps_compatible() {
1906        let mut manifest = SemanticManifest {
1907            fast_tier: Some(test_artifact(TierKind::Fast, true)),
1908            quality_tier: Some(test_artifact(TierKind::Quality, true)),
1909            ..Default::default()
1910        };
1911
1912        let policy = test_policy();
1913        let count = manifest.invalidate_incompatible(&policy, "abc123");
1914
1915        assert_eq!(count, 0);
1916        assert!(manifest.fast_tier.is_some());
1917        assert!(manifest.quality_tier.is_some());
1918    }
1919
1920    // ── Legacy adoption ────────────────────────────────────────────────
1921
1922    #[test]
1923    fn adopt_legacy_artifact() {
1924        let mut manifest = SemanticManifest::default();
1925        let doc_count = 500;
1926        let conversation_count = 125;
1927        let index_path = "vector_index/index-fnv1a-384.fsvi";
1928        let db_fingerprint = "fp-old";
1929        let size_bytes = 75_000;
1930        let adopted = manifest.adopt_legacy_artifact(
1931            TierKind::Fast,
1932            "fnv1a-384",
1933            "hash",
1934            384,
1935            doc_count,
1936            conversation_count,
1937            db_fingerprint,
1938            index_path,
1939            size_bytes,
1940        );
1941
1942        assert!(adopted);
1943        let fast = manifest.fast_tier.as_ref().unwrap();
1944        assert_eq!(fast.embedder_id, "fnv1a-384");
1945        assert!(fast.ready);
1946        assert_eq!(fast.schema_version, SEMANTIC_SCHEMA_VERSION);
1947    }
1948
1949    // ── Size accounting ────────────────────────────────────────────────
1950
1951    #[test]
1952    fn total_size_accounts_for_all_artifacts() {
1953        let manifest = SemanticManifest {
1954            fast_tier: Some(test_artifact(TierKind::Fast, true)),
1955            quality_tier: Some(test_artifact(TierKind::Quality, true)),
1956            hnsw: Some(test_hnsw()),
1957            ..Default::default()
1958        };
1959
1960        assert_eq!(manifest.total_size_bytes(), 150_000 + 150_000 + 50_000);
1961        assert_eq!(manifest.total_size_mb(), 1); // 350KB rounds up to 1MB
1962    }
1963
1964    #[test]
1965    fn total_size_empty_is_zero() {
1966        let manifest = SemanticManifest::default();
1967        assert_eq!(manifest.total_size_bytes(), 0);
1968        assert_eq!(manifest.total_size_mb(), 0);
1969    }
1970
1971    // ── JSON round-trip ────────────────────────────────────────────────
1972
1973    #[test]
1974    fn manifest_json_round_trip() {
1975        let manifest = SemanticManifest {
1976            fast_tier: Some(test_artifact(TierKind::Fast, true)),
1977            quality_tier: Some(test_artifact(TierKind::Quality, true)),
1978            hnsw: Some(test_hnsw()),
1979            checkpoint: Some(test_checkpoint(TierKind::Quality)),
1980            ..Default::default()
1981        };
1982
1983        let json = serde_json::to_string_pretty(&manifest).unwrap();
1984        let deser: SemanticManifest = serde_json::from_str(&json).unwrap();
1985        assert_eq!(deser.fast_tier, manifest.fast_tier);
1986        assert_eq!(deser.quality_tier, manifest.quality_tier);
1987        assert_eq!(deser.hnsw, manifest.hnsw);
1988        assert_eq!(deser.checkpoint, manifest.checkpoint);
1989    }
1990
1991    // ── Shard sidecar manifest ─────────────────────────────────────────
1992
1993    #[test]
1994    fn shard_manifest_round_trip_via_sidecar() {
1995        let temp = tempfile::tempdir().unwrap();
1996        let mut shards = SemanticShardManifest::default();
1997        shards.replace_shards_for_generation(
1998            TierKind::Fast,
1999            "fnv1a-384",
2000            "fp-sharded",
2001            vec![test_shard(1, 2, true), test_shard(0, 2, true)],
2002        );
2003
2004        shards.save(temp.path()).unwrap();
2005        let loaded = SemanticShardManifest::load(temp.path()).unwrap().unwrap();
2006
2007        assert_eq!(loaded.manifest_version, MANIFEST_FORMAT_VERSION);
2008        assert_eq!(loaded.shards.len(), 2);
2009        assert_eq!(loaded.shards[0].shard_index, 0);
2010        assert_eq!(loaded.shards[1].shard_index, 1);
2011        assert!(loaded.updated_at_ms > 0);
2012    }
2013
2014    #[test]
2015    fn shard_summary_requires_every_ready_shard() {
2016        let mut shards = SemanticShardManifest::default();
2017        shards.replace_shards_for_generation(
2018            TierKind::Fast,
2019            "fnv1a-384",
2020            "fp-sharded",
2021            vec![test_shard(0, 3, true), test_shard(2, 3, true)],
2022        );
2023
2024        let partial = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
2025        assert_eq!(partial.shard_count, 3);
2026        assert_eq!(partial.ready_shards, 2);
2027        assert!(!partial.complete);
2028
2029        shards.replace_shards_for_generation(
2030            TierKind::Fast,
2031            "fnv1a-384",
2032            "fp-sharded",
2033            vec![
2034                test_shard(0, 3, true),
2035                test_shard(1, 3, true),
2036                test_shard(2, 3, true),
2037            ],
2038        );
2039
2040        let complete = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
2041        assert_eq!(complete.ready_shards, 3);
2042        assert!(complete.complete);
2043        assert_eq!(complete.doc_count, 75);
2044        assert_eq!(complete.total_conversations, 10);
2045    }
2046
2047    #[test]
2048    fn shard_summary_rejects_non_mmap_ready_or_inconsistent_shards() {
2049        let mut non_mmap = test_shard(0, 1, true);
2050        non_mmap.mmap_ready = false;
2051        let mut shards = SemanticShardManifest::default();
2052        shards.replace_shards_for_generation(
2053            TierKind::Fast,
2054            "fnv1a-384",
2055            "fp-sharded",
2056            vec![non_mmap],
2057        );
2058
2059        let non_mmap_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
2060        assert_eq!(non_mmap_summary.ready_shards, 0);
2061        assert!(!non_mmap_summary.complete);
2062
2063        let mut inconsistent = test_shard(1, 3, true);
2064        inconsistent.ann_ready = true;
2065        inconsistent.ann_index_path = None;
2066        inconsistent.ann_size_bytes = 4096;
2067        shards.replace_shards_for_generation(
2068            TierKind::Fast,
2069            "fnv1a-384",
2070            "fp-sharded",
2071            vec![test_shard(0, 2, true), inconsistent],
2072        );
2073
2074        let inconsistent_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
2075        assert_eq!(inconsistent_summary.shard_count, 3);
2076        assert_eq!(inconsistent_summary.ready_shards, 2);
2077        assert_eq!(inconsistent_summary.ann_ready_shards, 0);
2078        assert!(!inconsistent_summary.complete);
2079
2080        shards.replace_shards_for_generation(
2081            TierKind::Fast,
2082            "fnv1a-384",
2083            "fp-sharded",
2084            vec![
2085                test_shard(0, 2, true),
2086                test_shard(1, 2, true),
2087                test_shard(1, 2, false),
2088            ],
2089        );
2090        let duplicate_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
2091        assert_eq!(duplicate_summary.shard_count, 2);
2092        assert_eq!(duplicate_summary.ready_shards, 2);
2093        assert!(
2094            !duplicate_summary.complete,
2095            "duplicate shard indexes must not summarize as a complete generation"
2096        );
2097
2098        let mut duplicate_path = test_shard(1, 2, true);
2099        duplicate_path.index_path = test_shard(0, 2, true).index_path;
2100        shards.replace_shards_for_generation(
2101            TierKind::Fast,
2102            "fnv1a-384",
2103            "fp-sharded",
2104            vec![test_shard(0, 2, true), duplicate_path],
2105        );
2106        let duplicate_path_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
2107        assert_eq!(duplicate_path_summary.shard_count, 2);
2108        assert_eq!(duplicate_path_summary.ready_shards, 2);
2109        assert!(
2110            !duplicate_path_summary.complete,
2111            "duplicate shard index paths must not summarize as a complete generation"
2112        );
2113
2114        let mut blank_path = test_shard(0, 1, true);
2115        blank_path.index_path.clear();
2116        shards.replace_shards_for_generation(
2117            TierKind::Fast,
2118            "fnv1a-384",
2119            "fp-sharded",
2120            vec![blank_path],
2121        );
2122        let blank_path_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
2123        assert_eq!(blank_path_summary.shard_count, 1);
2124        assert_eq!(blank_path_summary.ready_shards, 1);
2125        assert!(
2126            !blank_path_summary.complete,
2127            "blank shard index paths must not summarize as complete"
2128        );
2129
2130        for unsafe_path in [
2131            tempfile::tempdir()
2132                .unwrap()
2133                .path()
2134                .join("outside.fsvi")
2135                .to_string_lossy()
2136                .to_string(),
2137            "vector_index/shards/../outside.fsvi".to_string(),
2138            "./vector_index/shards/fast/hash.fsvi".to_string(),
2139            " vector_index/shards/fast/hash.fsvi".to_string(),
2140        ] {
2141            let mut unsafe_shard = test_shard(0, 1, true);
2142            unsafe_shard.index_path = unsafe_path;
2143            shards.replace_shards_for_generation(
2144                TierKind::Fast,
2145                "fnv1a-384",
2146                "fp-sharded",
2147                vec![unsafe_shard],
2148            );
2149            let unsafe_path_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
2150            assert_eq!(unsafe_path_summary.shard_count, 1);
2151            assert_eq!(unsafe_path_summary.ready_shards, 1);
2152            assert!(
2153                !unsafe_path_summary.complete,
2154                "unsafe shard index paths must not summarize as complete"
2155            );
2156        }
2157
2158        let outside_ann_dir = tempfile::tempdir().unwrap();
2159        for unsafe_ann_path in [
2160            outside_ann_dir
2161                .path()
2162                .join("outside.chsw")
2163                .to_string_lossy()
2164                .to_string(),
2165            "vector_index/shards/../outside.chsw".to_string(),
2166            "./vector_index/shards/fast/hash.chsw".to_string(),
2167            " vector_index/shards/fast/hash.chsw".to_string(),
2168        ] {
2169            let mut unsafe_ann = test_shard(0, 1, true);
2170            unsafe_ann.ann_ready = true;
2171            unsafe_ann.ann_index_path = Some(unsafe_ann_path);
2172            unsafe_ann.ann_size_bytes = 4096;
2173            shards.replace_shards_for_generation(
2174                TierKind::Fast,
2175                "fnv1a-384",
2176                "fp-sharded",
2177                vec![unsafe_ann],
2178            );
2179            let unsafe_ann_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
2180            assert_eq!(unsafe_ann_summary.shard_count, 1);
2181            assert_eq!(unsafe_ann_summary.ready_shards, 1);
2182            assert_eq!(unsafe_ann_summary.ann_ready_shards, 0);
2183            assert!(
2184                unsafe_ann_summary.complete,
2185                "unsafe optional ANN paths must not invalidate the vector shard generation"
2186            );
2187        }
2188
2189        let mut duplicate_ann_path = test_shard(1, 2, true);
2190        duplicate_ann_path.ann_ready = true;
2191        duplicate_ann_path.ann_index_path =
2192            Some("vector_index/shards/fast-fnv1a-384/shared-ann.chsw".to_owned());
2193        duplicate_ann_path.ann_size_bytes = 4096;
2194        let mut first_ann_path = test_shard(0, 2, true);
2195        first_ann_path.ann_ready = true;
2196        first_ann_path.ann_index_path = duplicate_ann_path.ann_index_path.clone();
2197        first_ann_path.ann_size_bytes = 4096;
2198        shards.replace_shards_for_generation(
2199            TierKind::Fast,
2200            "fnv1a-384",
2201            "fp-sharded",
2202            vec![first_ann_path, duplicate_ann_path],
2203        );
2204        let duplicate_ann_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
2205        assert_eq!(duplicate_ann_summary.shard_count, 2);
2206        assert_eq!(duplicate_ann_summary.ready_shards, 2);
2207        assert_eq!(duplicate_ann_summary.ann_ready_shards, 1);
2208        assert!(
2209            duplicate_ann_summary.complete,
2210            "duplicate optional ANN paths must not invalidate the vector shard generation"
2211        );
2212
2213        shards.replace_shards_for_generation(
2214            TierKind::Fast,
2215            "fnv1a-384",
2216            "fp-sharded",
2217            vec![test_shard(2, 2, true)],
2218        );
2219        let out_of_range_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
2220        assert_eq!(out_of_range_summary.shard_count, 2);
2221        assert_eq!(out_of_range_summary.ready_shards, 1);
2222        assert!(
2223            !out_of_range_summary.complete,
2224            "shard indexes outside the declared shard count are malformed"
2225        );
2226
2227        let mut mismatched_metadata = test_shard(1, 2, true);
2228        mismatched_metadata.dimension = 768;
2229        shards.replace_shards_for_generation(
2230            TierKind::Fast,
2231            "fnv1a-384",
2232            "fp-sharded",
2233            vec![test_shard(0, 2, true), mismatched_metadata],
2234        );
2235        let metadata_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
2236        assert_eq!(metadata_summary.shard_count, 2);
2237        assert_eq!(metadata_summary.ready_shards, 2);
2238        assert!(
2239            !metadata_summary.complete,
2240            "complete shard generations require consistent shard metadata"
2241        );
2242
2243        let mut stale_schema = test_shard(0, 1, true);
2244        stale_schema.schema_version = SEMANTIC_SCHEMA_VERSION.saturating_sub(1);
2245        shards.replace_shards_for_generation(
2246            TierKind::Fast,
2247            "fnv1a-384",
2248            "fp-sharded",
2249            vec![stale_schema],
2250        );
2251        let stale_schema_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
2252        assert_eq!(stale_schema_summary.shard_count, 1);
2253        assert_eq!(stale_schema_summary.ready_shards, 1);
2254        assert!(
2255            !stale_schema_summary.complete,
2256            "stale schema shards must not summarize as complete"
2257        );
2258    }
2259
2260    #[test]
2261    fn shard_sidecar_does_not_make_main_manifest_ready() {
2262        let mut shards = SemanticShardManifest::default();
2263        shards.replace_shards_for_generation(
2264            TierKind::Fast,
2265            "fnv1a-384",
2266            "fp-sharded",
2267            vec![test_shard(0, 1, true)],
2268        );
2269        assert!(
2270            shards
2271                .summary(TierKind::Fast, "fnv1a-384", "fp-sharded")
2272                .complete
2273        );
2274
2275        let manifest = SemanticManifest::default();
2276        let policy = test_policy();
2277        assert_eq!(
2278            manifest.fast_tier_readiness(&policy, "fp-sharded", "hash"),
2279            TierReadiness::Missing,
2280            "sidecar shards must not publish runtime semantic readiness"
2281        );
2282    }
2283
2284    #[test]
2285    fn shard_manifest_invalidates_incompatible_shards() {
2286        let mut bad = test_shard(0, 1, true);
2287        bad.schema_version = 0;
2288        let mut shards = SemanticShardManifest {
2289            shards: vec![bad, test_shard(0, 1, true)],
2290            ..Default::default()
2291        };
2292
2293        let invalidated = shards.invalidate_incompatible(&test_policy(), "hash");
2294
2295        assert_eq!(invalidated, 1);
2296        assert_eq!(shards.shards.len(), 1);
2297        assert_eq!(shards.total_size_bytes(), 4096);
2298    }
2299}