Skip to main content

coding_agent_search/search/
semantic_manifest.rs

1//! Durable semantic asset manifest, backlog ledger, and resumable checkpoints.
2//!
3//! This module is the authoritative state model for semantic assets.  It tells
4//! cass exactly what semantic artifacts exist, how trustworthy they are, and
5//! what work remains to converge the corpus — enabling partial readiness,
6//! resumable builds, and truthful runtime degradation.
7//!
8//! # Storage
9//!
10//! The manifest is a single JSON file at:
11//! ```text
12//! {data_dir}/vector_index/semantic_manifest.json
13//! ```
14//!
15//! It is written atomically (write-to-temp then rename) and is the only file
16//! the backfill worker needs to consult to know what work remains.
17//!
18//! # Relationship to other modules
19//!
20//! - **[`policy`]**: Provides the contract (versions, budgets, tier names) that
21//!   this manifest fingerprints against.
22//! - **[`asset_state`]**: Evaluates coarse readiness from this manifest plus
23//!   live file probes.
24//! - **[`model_manager`]**: Detects model availability; this module records
25//!   which model was used to build each artifact.
26
27use std::fs::{self, OpenOptions};
28use std::io::Write as IoWrite;
29use std::path::{Component, Path, PathBuf};
30use std::time::{SystemTime, UNIX_EPOCH};
31
32use ring::rand::{SecureRandom, SystemRandom};
33use serde::{Deserialize, Serialize};
34
35use super::policy::{
36    CHUNKING_STRATEGY_VERSION, InvalidationAction, SEMANTIC_SCHEMA_VERSION,
37    SemanticAssetManifest as PolicyManifest, SemanticPolicy,
38};
39
40// ─── Constants ─────────────────────────────────────────────────────────────
41
/// Current manifest format version.  Bump when the JSON schema changes in a
/// backwards-incompatible way.
45///
46/// History:
47/// - **v1** (pre-cass#257): `BuildCheckpoint` resume cursor is the
48///   conversation offset only.
49/// - **v2** (cass#257 sub-fix 2): `BuildCheckpoint` may additionally
50///   carry `last_message_id`, an inclusive canonical message PK
51///   advanced by every batch. Resume strictly skips messages ≤ this
52///   cursor so an interrupted bounded run never re-embeds work it
53///   already staged. Later v2 checkpoints may also carry
54///   `cursor_exhausted`, which separates durable completion from
55///   count-based progress. Both fields use `#[serde(default)]`; the
56///   JSON-on-disk shape only differs when a batch persists the cursor
57///   metadata, and older v2 readers ignore the extra field.
58///
59/// **Compatibility:**
60/// - **Old binary reading v2 manifest:** clean `UnsupportedVersion`
61///   error from `load()`; operator sees a clear "manifest version
62///   $V is newer than max-supported $MAX" message and can upgrade.
63/// - **New binary reading v1 manifest:** loads fine. `last_message_id`
64///   defaults to `None`; resume falls back to the conversation offset
65///   with a one-shot warning that resume granularity is coarser than
66///   ideal until the next checkpoint save bumps the on-disk shape.
67pub const MANIFEST_FORMAT_VERSION: u32 = 2;
68
69/// Highest manifest-version emitted by pre-cass#257 binaries; loading
70/// this is fully supported, but resume granularity will be coarser
71/// until a fresh checkpoint is saved. Kept as a named constant so the
72/// fallback warning quotes a stable number.
73pub const MANIFEST_FORMAT_VERSION_PRE_LAST_MESSAGE_CURSOR: u32 = 1;
74
75/// Filename for the durable manifest.
76pub const MANIFEST_FILENAME: &str = "semantic_manifest.json";
77
78/// Filename for the prototype per-shard semantic artifact manifest.
79pub const SHARD_MANIFEST_FILENAME: &str = "semantic_shards.json";
80
/// Checks whether a recorded shard artifact path is a plain relative path.
///
/// Returns `true` only when `recorded_path` is non-empty, has no leading or
/// trailing whitespace, is not absolute, and every component yielded by
/// [`Path::components`] is a [`Component::Normal`] (so root, prefix, `..`
/// and leading `.` components are all rejected).
pub(crate) fn semantic_shard_artifact_path_is_safe(recorded_path: &str) -> bool {
    // Blank strings and whitespace-padded strings are never accepted.
    if recorded_path.is_empty() || recorded_path.trim() != recorded_path {
        return false;
    }

    let candidate = Path::new(recorded_path);
    if candidate.is_absolute() {
        return false;
    }

    for part in candidate.components() {
        if !matches!(part, Component::Normal(_)) {
            return false;
        }
    }
    true
}
92
93// ─── Tier kind ─────────────────────────────────────────────────────────────
94
95/// Which semantic tier an artifact or checkpoint belongs to.
96#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
97#[serde(rename_all = "snake_case")]
98pub enum TierKind {
99    Fast,
100    Quality,
101}
102
103impl TierKind {
104    pub fn as_str(&self) -> &'static str {
105        match self {
106            Self::Fast => "fast",
107            Self::Quality => "quality",
108        }
109    }
110}
111
112// ─── Tier readiness ────────────────────────────────────────────────────────
113
114/// Readiness of a single semantic tier.
115#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
116#[serde(rename_all = "snake_case")]
117pub enum TierReadiness {
118    /// Artifact exists, verified, and current with the DB.
119    Ready,
120    /// Build is in progress (checkpoint present).
121    Building { progress_pct: u8 },
122    /// Artifact exists but DB or model changed since it was built.
123    Stale { reason: String },
124    /// No artifact at all for this tier.
125    Missing,
126    /// Schema or chunking version mismatch — must discard and rebuild.
127    Incompatible { reason: String },
128}
129
130impl TierReadiness {
131    pub fn is_ready(&self) -> bool {
132        matches!(self, Self::Ready)
133    }
134
135    pub fn is_usable(&self) -> bool {
136        matches!(self, Self::Ready | Self::Stale { .. })
137    }
138}
139
140// ─── Artifact record ───────────────────────────────────────────────────────
141
142/// Durable metadata for a single vector index artifact.
143#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
144pub struct ArtifactRecord {
145    /// Which tier this artifact belongs to.
146    pub tier: TierKind,
147    /// Embedder ID that produced these vectors (e.g., "minilm-384", "fnv1a-384").
148    pub embedder_id: String,
149    /// Model revision hash (HuggingFace commit or "hash" for the hash embedder).
150    pub model_revision: String,
151    /// Semantic schema version at build time.
152    pub schema_version: u32,
153    /// Chunking strategy version at build time.
154    pub chunking_version: u32,
155    /// Output dimension of the embedder.
156    pub dimension: usize,
157    /// Number of documents (message chunks) embedded.
158    pub doc_count: u64,
159    /// Number of conversations processed to produce this artifact.
160    pub conversation_count: u64,
161    /// Storage fingerprint of the canonical DB when this artifact was built.
162    pub db_fingerprint: String,
163    /// Relative path to the index file (from data_dir).
164    pub index_path: String,
165    /// File size in bytes.
166    pub size_bytes: u64,
167    /// Unix timestamp (ms) when the build started.
168    pub started_at_ms: i64,
169    /// Unix timestamp (ms) when the build completed.
170    pub completed_at_ms: i64,
171    /// Whether this artifact has been verified and published.
172    pub ready: bool,
173}
174
175impl ArtifactRecord {
176    /// Convert to the policy-level manifest for invalidation checks.
177    pub fn to_policy_manifest(&self) -> PolicyManifest {
178        PolicyManifest {
179            embedder_id: self.embedder_id.clone(),
180            model_revision: self.model_revision.clone(),
181            schema_version: self.schema_version,
182            chunking_version: self.chunking_version,
183            doc_count: self.doc_count,
184            built_at_ms: self.completed_at_ms,
185        }
186    }
187
188    /// Evaluate this artifact's readiness against the current policy and DB
189    /// fingerprint.
190    ///
191    /// **Note**: This checks schema/chunking versions, mode, model revision,
192    /// and DB fingerprint.  It does NOT detect embedder changes because the
193    /// expected embedder ID requires the embedder registry to resolve.
194    /// Callers needing embedder-change detection should call
195    /// [`SemanticAssetManifest::invalidation_action`] directly with the
196    /// correct `expected_embedder_id` from the registry.
197    pub fn readiness(
198        &self,
199        policy: &SemanticPolicy,
200        current_db_fingerprint: &str,
201        current_model_revision: &str,
202    ) -> TierReadiness {
203        let action = self.to_policy_manifest().invalidation_action(
204            policy,
205            current_model_revision,
206            &self.embedder_id,
207        );
208
209        match action {
210            InvalidationAction::UpToDate => {
211                if self.db_fingerprint != current_db_fingerprint {
212                    TierReadiness::Stale {
213                        reason: "DB content changed since artifact was built".to_owned(),
214                    }
215                } else if !self.ready {
216                    TierReadiness::Building { progress_pct: 100 }
217                } else {
218                    TierReadiness::Ready
219                }
220            }
221            InvalidationAction::RebuildInBackground => TierReadiness::Stale {
222                reason: "model revision changed; vectors usable until rebuild completes".to_owned(),
223            },
224            InvalidationAction::DiscardAndRebuild { reason } => {
225                TierReadiness::Incompatible { reason }
226            }
227            InvalidationAction::Evict => TierReadiness::Incompatible {
228                reason: "semantic mode set to lexical-only".to_owned(),
229            },
230        }
231    }
232}
233
234// ─── HNSW accelerator record ──────────────────────────────────────────────
235
236/// Durable metadata for an HNSW accelerator index.
237#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
238pub struct HnswRecord {
239    /// Which base artifact this accelerates.
240    pub base_tier: TierKind,
241    /// Embedder ID of the base artifact.
242    pub embedder_id: String,
243    /// ef_search parameter used at build time.
244    pub ef_search: usize,
245    /// Relative path to the HNSW index file (from data_dir).
246    pub index_path: String,
247    /// File size in bytes.
248    pub size_bytes: u64,
249    /// Unix timestamp (ms) when built.
250    pub built_at_ms: i64,
251    /// Whether this index is ready for use.
252    pub ready: bool,
253}
254
255// ─── Sharded vector artifact sidecar ─────────────────────────────────────
256
257/// Durable metadata for one mmap-friendly semantic vector shard.
258///
259/// Shards deliberately live in a sidecar manifest instead of
260/// [`SemanticManifest`]. Runtime readiness continues to flow through the
261/// existing tier artifact records, so partial shard generations cannot make a
262/// semantic tier look ready before a promotion step has merged or selected a
263/// complete generation.
264#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
265pub struct SemanticShardRecord {
266    /// Which semantic tier this shard belongs to.
267    pub tier: TierKind,
268    /// Embedder ID that produced the vectors.
269    pub embedder_id: String,
270    /// Model revision hash or "hash" for the hash embedder.
271    pub model_revision: String,
272    /// Semantic schema version at build time.
273    pub schema_version: u32,
274    /// Chunking strategy version at build time.
275    pub chunking_version: u32,
276    /// Output dimension of the embedder.
277    pub dimension: usize,
278    /// Zero-based shard number within the generation.
279    pub shard_index: u32,
280    /// Total shards expected for this generation.
281    pub shard_count: u32,
282    /// Number of documents embedded in this shard.
283    pub doc_count: u64,
284    /// Total conversations represented by the full shard generation.
285    pub total_conversations: u64,
286    /// Storage fingerprint of the canonical DB when this shard was built.
287    pub db_fingerprint: String,
288    /// Relative path to the shard index file from data_dir.
289    pub index_path: String,
290    /// Vector quantization used by the shard file.
291    pub quantization: String,
292    /// Whether readers may mmap this artifact directly.
293    pub mmap_ready: bool,
294    /// Relative path to the shard-local ANN accelerator, when built.
295    pub ann_index_path: Option<String>,
296    /// File size of the ANN accelerator in bytes.
297    pub ann_size_bytes: u64,
298    /// Whether the shard-local ANN accelerator is ready for use.
299    pub ann_ready: bool,
300    /// File size in bytes.
301    pub size_bytes: u64,
302    /// Unix timestamp (ms) when the shard build started.
303    pub started_at_ms: i64,
304    /// Unix timestamp (ms) when the shard build completed.
305    pub completed_at_ms: i64,
306    /// Whether this shard has been verified and published to the sidecar.
307    pub ready: bool,
308}
309
310impl SemanticShardRecord {
311    pub fn to_policy_manifest(&self) -> PolicyManifest {
312        PolicyManifest {
313            embedder_id: self.embedder_id.clone(),
314            model_revision: self.model_revision.clone(),
315            schema_version: self.schema_version,
316            chunking_version: self.chunking_version,
317            doc_count: self.doc_count,
318            built_at_ms: self.completed_at_ms,
319        }
320    }
321
322    pub fn matches_generation(
323        &self,
324        tier: TierKind,
325        embedder_id: &str,
326        db_fingerprint: &str,
327    ) -> bool {
328        self.tier == tier
329            && self.embedder_id == embedder_id
330            && self.db_fingerprint == db_fingerprint
331    }
332}
333
334/// Aggregated readiness for a shard generation.
335#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
336pub struct SemanticShardSummary {
337    pub shard_count: u32,
338    pub ready_shards: u32,
339    pub ann_ready_shards: u32,
340    pub doc_count: u64,
341    pub total_conversations: u64,
342    pub size_bytes: u64,
343    pub ann_size_bytes: u64,
344    pub complete: bool,
345}
346
347/// Sidecar manifest for prototype semantic shard generations.
348#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
349pub struct SemanticShardManifest {
350    pub manifest_version: u32,
351    pub shards: Vec<SemanticShardRecord>,
352    pub updated_at_ms: i64,
353}
354
355impl Default for SemanticShardManifest {
356    fn default() -> Self {
357        Self {
358            manifest_version: MANIFEST_FORMAT_VERSION,
359            shards: Vec::new(),
360            updated_at_ms: 0,
361        }
362    }
363}
364
365impl SemanticShardManifest {
366    pub fn path(data_dir: &Path) -> PathBuf {
367        data_dir.join("vector_index").join(SHARD_MANIFEST_FILENAME)
368    }
369
370    pub fn load(data_dir: &Path) -> Result<Option<Self>, ManifestError> {
371        let path = Self::path(data_dir);
372        let bytes = match fs::read(&path) {
373            Ok(bytes) => bytes,
374            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
375            Err(e) => {
376                return Err(ManifestError::Io {
377                    path,
378                    source: e.to_string(),
379                });
380            }
381        };
382
383        let manifest: Self = serde_json::from_slice(&bytes).map_err(|e| ManifestError::Parse {
384            path: path.clone(),
385            source: e.to_string(),
386        })?;
387
388        if manifest.manifest_version > MANIFEST_FORMAT_VERSION {
389            return Err(ManifestError::UnsupportedVersion {
390                found: manifest.manifest_version,
391                max_supported: MANIFEST_FORMAT_VERSION,
392            });
393        }
394
395        Ok(Some(manifest))
396    }
397
398    pub fn load_or_default(data_dir: &Path) -> Result<Self, ManifestError> {
399        match Self::load(data_dir) {
400            Ok(Some(manifest)) => Ok(manifest),
401            Ok(None) => Ok(Self::default()),
402            Err(e @ ManifestError::Io { .. }) => Err(e),
403            Err(ManifestError::Parse { .. } | ManifestError::UnsupportedVersion { .. }) => {
404                Ok(Self::default())
405            }
406            Err(e) => Err(e),
407        }
408    }
409
410    pub fn save(&mut self, data_dir: &Path) -> Result<(), ManifestError> {
411        let path = Self::path(data_dir);
412        if let Some(parent) = path.parent() {
413            fs::create_dir_all(parent).map_err(|e| ManifestError::Io {
414                path: parent.to_path_buf(),
415                source: e.to_string(),
416            })?;
417        }
418
419        self.updated_at_ms = now_ms();
420        let json = serde_json::to_string_pretty(self).map_err(|e| ManifestError::Serialize {
421            source: e.to_string(),
422        })?;
423        let (tmp_path, mut file) =
424            create_unique_manifest_temp_file(&path).map_err(|e| ManifestError::Io {
425                path: path.clone(),
426                source: e.to_string(),
427            })?;
428        file.write_all(json.as_bytes())
429            .map_err(|e| ManifestError::Io {
430                path: tmp_path.clone(),
431                source: e.to_string(),
432            })?;
433        file.sync_all().map_err(|e| ManifestError::Io {
434            path: tmp_path.clone(),
435            source: e.to_string(),
436        })?;
437        replace_file_from_temp(&tmp_path, &path).map_err(|e| ManifestError::Io {
438            path: path.clone(),
439            source: e.to_string(),
440        })?;
441        sync_parent_directory(&path).map_err(|e| ManifestError::Io {
442            path: path
443                .parent()
444                .map(Path::to_path_buf)
445                .unwrap_or_else(|| path.clone()),
446            source: e.to_string(),
447        })?;
448
449        Ok(())
450    }
451
452    pub fn replace_shards_for_generation(
453        &mut self,
454        tier: TierKind,
455        embedder_id: &str,
456        db_fingerprint: &str,
457        mut shards: Vec<SemanticShardRecord>,
458    ) {
459        self.shards
460            .retain(|shard| !shard.matches_generation(tier, embedder_id, db_fingerprint));
461        self.shards.append(&mut shards);
462        self.shards.sort_by(|a, b| {
463            (
464                a.tier.as_str(),
465                &a.embedder_id,
466                &a.db_fingerprint,
467                a.shard_index,
468            )
469                .cmp(&(
470                    b.tier.as_str(),
471                    &b.embedder_id,
472                    &b.db_fingerprint,
473                    b.shard_index,
474                ))
475        });
476    }
477
478    pub fn summary(
479        &self,
480        tier: TierKind,
481        embedder_id: &str,
482        db_fingerprint: &str,
483    ) -> SemanticShardSummary {
484        let mut summary = SemanticShardSummary::default();
485        let mut ready_indices = std::collections::BTreeSet::new();
486        let mut ann_ready_indices = std::collections::BTreeSet::new();
487        let mut seen_indices = std::collections::BTreeSet::new();
488        let mut seen_index_paths = std::collections::BTreeSet::new();
489        let mut seen_ann_index_paths = std::collections::BTreeSet::new();
490        let mut expected_shard_count = None;
491        let mut expected_generation_metadata: Option<(&str, u32, u32, usize, u64, &str)> = None;
492        let mut generation_consistent = true;
493        for shard in self
494            .shards
495            .iter()
496            .filter(|shard| shard.matches_generation(tier, embedder_id, db_fingerprint))
497        {
498            if shard.shard_count == 0 || shard.shard_index >= shard.shard_count {
499                generation_consistent = false;
500            }
501            if !seen_indices.insert(shard.shard_index) {
502                generation_consistent = false;
503            }
504            if !semantic_shard_artifact_path_is_safe(&shard.index_path)
505                || !seen_index_paths.insert(&shard.index_path)
506            {
507                generation_consistent = false;
508            }
509            match expected_shard_count {
510                Some(expected) if expected != shard.shard_count => {
511                    generation_consistent = false;
512                }
513                None => expected_shard_count = Some(shard.shard_count),
514                _ => {}
515            }
516            let generation_metadata = (
517                shard.model_revision.as_str(),
518                shard.schema_version,
519                shard.chunking_version,
520                shard.dimension,
521                shard.total_conversations,
522                shard.quantization.as_str(),
523            );
524            match expected_generation_metadata {
525                Some(expected) if expected != generation_metadata => {
526                    generation_consistent = false;
527                }
528                None => expected_generation_metadata = Some(generation_metadata),
529                _ => {}
530            }
531            if shard.schema_version != SEMANTIC_SCHEMA_VERSION
532                || shard.chunking_version != CHUNKING_STRATEGY_VERSION
533                || shard.dimension == 0
534            {
535                generation_consistent = false;
536            }
537            summary.shard_count = summary.shard_count.max(shard.shard_count);
538            summary.doc_count = summary.doc_count.saturating_add(shard.doc_count);
539            summary.total_conversations =
540                summary.total_conversations.max(shard.total_conversations);
541            summary.size_bytes = summary.size_bytes.saturating_add(shard.size_bytes);
542            summary.ann_size_bytes = summary.ann_size_bytes.saturating_add(shard.ann_size_bytes);
543            if shard.ready && shard.mmap_ready {
544                ready_indices.insert(shard.shard_index);
545            }
546            if shard.ready
547                && shard.mmap_ready
548                && shard.ann_ready
549                && shard.ann_size_bytes > 0
550                && let Some(ann_index_path) = shard.ann_index_path.as_deref()
551                && semantic_shard_artifact_path_is_safe(ann_index_path)
552                && seen_ann_index_paths.insert(ann_index_path)
553            {
554                ann_ready_indices.insert(shard.shard_index);
555            }
556        }
557        summary.ready_shards = u32::try_from(ready_indices.len()).unwrap_or(u32::MAX);
558        summary.ann_ready_shards = u32::try_from(ann_ready_indices.len()).unwrap_or(u32::MAX);
559        summary.complete = generation_consistent
560            && summary.shard_count > 0
561            && summary.ready_shards == summary.shard_count
562            && (0..summary.shard_count).all(|index| ready_indices.contains(&index));
563        summary
564    }
565
566    pub fn invalidate_incompatible(
567        &mut self,
568        policy: &SemanticPolicy,
569        current_model_revision: &str,
570    ) -> usize {
571        let before = self.shards.len();
572        self.shards.retain(|shard| {
573            !matches!(
574                shard.to_policy_manifest().invalidation_action(
575                    policy,
576                    current_model_revision,
577                    &shard.embedder_id,
578                ),
579                InvalidationAction::DiscardAndRebuild { .. } | InvalidationAction::Evict
580            )
581        });
582        before.saturating_sub(self.shards.len())
583    }
584
585    pub fn total_size_bytes(&self) -> u64 {
586        self.shards
587            .iter()
588            .map(|shard| shard.size_bytes)
589            .fold(0, u64::saturating_add)
590    }
591}
592
593// ─── Build checkpoint ──────────────────────────────────────────────────────
594
595/// Resumable position for an interrupted semantic build.
596///
597/// Sub-fix 2 for cass#257 added the optional `last_message_id` cursor.
598/// Resume strictly advances past this cursor when present, so that a
599/// rerun of an interrupted bounded backfill never re-embeds messages
600/// that already made it into the staged index. Pre-#257 binaries wrote
601/// checkpoints without the field (it deserializes to `None` via
602/// `#[serde(default)]`), and modern binaries fall back to the
603/// conversation offset when `last_message_id` is absent.
604#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
605pub struct BuildCheckpoint {
606    /// Which tier is being built.
607    pub tier: TierKind,
608    /// Embedder ID for this build.
609    pub embedder_id: String,
610    /// Last conversation offset processed (for pagination).
611    pub last_offset: i64,
612    /// Total documents embedded so far in this build.
613    pub docs_embedded: u64,
614    /// Total conversations processed so far.
615    pub conversations_processed: u64,
616    /// Total conversations expected (from DB at start of build).
617    pub total_conversations: u64,
618    /// DB fingerprint when this build started.
619    pub db_fingerprint: String,
620    /// Schema version for this build.
621    pub schema_version: u32,
622    /// Chunking version for this build.
623    pub chunking_version: u32,
624    /// Unix timestamp (ms) when this checkpoint was saved.
625    pub saved_at_ms: i64,
626    /// Highest canonical message PK embedded in this run so far.
627    ///
628    /// Added in cass#257 (sub-fix 2). `None` for checkpoints written
629    /// by pre-#257 binaries; new code falls back to `last_offset`
630    /// (conversation-granularity) when this is absent. New code
631    /// strictly resumes past this cursor when present.
632    #[serde(default, skip_serializing_if = "Option::is_none")]
633    pub last_message_id: Option<i64>,
634    /// Whether the canonical selection cursor proved there are no more
635    /// eligible conversations after this checkpoint.
636    ///
637    /// This is distinct from count-based progress: checkpoint caps and
638    /// cursor predicates can leave more work even when processed counts
639    /// appear to have reached the DB total snapshot.
640    #[serde(default, skip_serializing_if = "is_false")]
641    pub cursor_exhausted: bool,
642}
643
/// Serde `skip_serializing_if` predicate: `true` exactly when `value` is
/// `false`.
fn is_false(value: &bool) -> bool {
    matches!(*value, false)
}
647
648impl BuildCheckpoint {
649    /// Progress as a percentage (0–100).
650    pub fn progress_pct(&self) -> u8 {
651        if self.total_conversations == 0 {
652            return 0;
653        }
654        let pct = (self.conversations_processed as f64 / self.total_conversations as f64) * 100.0;
655        let pct = (pct as u8).min(100);
656        if pct == 100 && !self.cursor_exhausted {
657            99
658        } else {
659            pct
660        }
661    }
662
663    /// Whether the build is complete (all conversations processed).
664    pub fn is_complete(&self) -> bool {
665        self.cursor_exhausted && self.conversations_processed >= self.total_conversations
666    }
667
668    /// Whether this checkpoint is still valid against the current DB and policy.
669    pub fn is_valid(&self, current_db_fingerprint: &str) -> bool {
670        self.db_fingerprint == current_db_fingerprint
671            && self.schema_version == SEMANTIC_SCHEMA_VERSION
672            && self.chunking_version == CHUNKING_STRATEGY_VERSION
673    }
674}
675
676// ─── Backlog ledger ────────────────────────────────────────────────────────
677
678/// Tracks what semantic build work remains.
679#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
680pub struct BacklogLedger {
681    /// Total conversations in the canonical DB at last check.
682    pub total_conversations: u64,
683    /// Conversations embedded in the fast tier.
684    pub fast_tier_processed: u64,
685    /// Conversations embedded in the quality tier.
686    pub quality_tier_processed: u64,
687    /// DB fingerprint when this ledger was computed.
688    pub db_fingerprint: String,
689    /// Unix timestamp (ms) when this ledger was computed.
690    pub computed_at_ms: i64,
691}
692
693impl BacklogLedger {
694    /// Remaining conversations for the fast tier.
695    pub fn fast_tier_remaining(&self) -> u64 {
696        self.total_conversations
697            .saturating_sub(self.fast_tier_processed)
698    }
699
700    /// Remaining conversations for the quality tier.
701    pub fn quality_tier_remaining(&self) -> u64 {
702        self.total_conversations
703            .saturating_sub(self.quality_tier_processed)
704    }
705
706    /// Whether either tier has outstanding work.
707    pub fn has_pending_work(&self) -> bool {
708        self.fast_tier_remaining() > 0 || self.quality_tier_remaining() > 0
709    }
710
711    /// Whether the ledger is current with the given DB fingerprint.
712    pub fn is_current(&self, current_db_fingerprint: &str) -> bool {
713        self.db_fingerprint == current_db_fingerprint
714    }
715}
716
717// ─── The top-level manifest ────────────────────────────────────────────────
718
719/// Durable, atomic semantic asset manifest.
720///
721/// This is the single source of truth for what semantic assets exist, their
722/// provenance, and what work remains.  It is loaded/saved as JSON.
723#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
724pub struct SemanticManifest {
725    /// Format version — for future migrations.
726    pub manifest_version: u32,
727    /// Fast-tier vector artifact (hash embedder).
728    pub fast_tier: Option<ArtifactRecord>,
729    /// Quality-tier vector artifact (ML embedder).
730    pub quality_tier: Option<ArtifactRecord>,
731    /// HNSW accelerator index.
732    pub hnsw: Option<HnswRecord>,
733    /// Backlog / progress tracker.
734    pub backlog: BacklogLedger,
735    /// Active build checkpoint (for resuming interrupted work).
736    pub checkpoint: Option<BuildCheckpoint>,
737    /// Unix timestamp (ms) when this manifest was last written.
738    pub updated_at_ms: i64,
739}
740
741impl Default for SemanticManifest {
742    fn default() -> Self {
743        Self {
744            manifest_version: MANIFEST_FORMAT_VERSION,
745            fast_tier: None,
746            quality_tier: None,
747            hnsw: None,
748            backlog: BacklogLedger {
749                total_conversations: 0,
750                fast_tier_processed: 0,
751                quality_tier_processed: 0,
752                db_fingerprint: String::new(),
753                computed_at_ms: 0,
754            },
755            checkpoint: None,
756            updated_at_ms: 0,
757        }
758    }
759}
760
761impl SemanticManifest {
762    // ── Path helpers ───────────────────────────────────────────────────
763
764    /// Path to the manifest file.
765    pub fn path(data_dir: &Path) -> PathBuf {
766        data_dir.join("vector_index").join(MANIFEST_FILENAME)
767    }
768
769    // ── Load / Save ───────────────────────────────────────────────────
770
771    /// Load the manifest from disk.  Returns `None` if the file doesn't
772    /// exist, `Err` if it exists but is corrupt.
773    ///
774    /// **Migration notes (cass#257 sub-fix 2):** a v1 manifest written
775    /// by a pre-#257 binary loads cleanly — `last_message_id` defaults
776    /// to `None` and resume falls back to the conversation offset.
777    /// A one-shot warning surfaces on first load so an operator sees
778    /// that resume granularity is coarser than ideal until the next
779    /// checkpoint save lands and the on-disk shape upgrades.
780    pub fn load(data_dir: &Path) -> Result<Option<Self>, ManifestError> {
781        let path = Self::path(data_dir);
782        let bytes = match fs::read(&path) {
783            Ok(b) => b,
784            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
785            Err(e) => {
786                return Err(ManifestError::Io {
787                    path,
788                    source: e.to_string(),
789                });
790            }
791        };
792
793        let manifest: Self = serde_json::from_slice(&bytes).map_err(|e| ManifestError::Parse {
794            path: path.clone(),
795            source: e.to_string(),
796        })?;
797
798        // Forward-compatible: reject future manifest versions we can't read.
799        if manifest.manifest_version > MANIFEST_FORMAT_VERSION {
800            return Err(ManifestError::UnsupportedVersion {
801                found: manifest.manifest_version,
802                max_supported: MANIFEST_FORMAT_VERSION,
803            });
804        }
805
806        // Backwards-compatible: a v1 manifest with an active checkpoint
807        // means a previous interrupted backfill saved without the
808        // `last_message_id` cursor. Warn so an operator monitoring an
809        // overnight run knows resume granularity is conversation-coarse
810        // until the next checkpoint save bumps the on-disk shape.
811        if manifest.manifest_version <= MANIFEST_FORMAT_VERSION_PRE_LAST_MESSAGE_CURSOR
812            && manifest
813                .checkpoint
814                .as_ref()
815                .is_some_and(|cp| cp.last_message_id.is_none())
816        {
817            tracing::warn!(
818                manifest_version = manifest.manifest_version,
819                supported_version = MANIFEST_FORMAT_VERSION,
820                path = %path.display(),
821                "semantic checkpoint manifest predates last_message_id cursor (cass#257 sub-fix 2); resume will fall back to conversation offset until the next checkpoint save"
822            );
823        }
824
825        Ok(Some(manifest))
826    }
827
828    /// Load the manifest, returning defaults if absent or corrupt.
829    ///
830    /// Unlike [`load`], this method treats parse errors and version mismatches
831    /// as "manifest absent" — the caller gets a clean default rather than an
832    /// error.  This is the right behaviour for runtime code that must always
833    /// make progress.
834    pub fn load_or_default(data_dir: &Path) -> Result<Self, ManifestError> {
835        match Self::load(data_dir) {
836            Ok(Some(manifest)) => Ok(manifest),
837            Ok(None) => Ok(Self::default()),
838            // I/O errors are real failures — propagate the original.
839            Err(e @ ManifestError::Io { .. }) => Err(e),
840            // Parse or version errors → treat as absent.
841            Err(ManifestError::Parse { .. } | ManifestError::UnsupportedVersion { .. }) => {
842                Ok(Self::default())
843            }
844            Err(e) => Err(e),
845        }
846    }
847
848    /// Atomically save the manifest to disk (write-to-temp then rename).
849    pub fn save(&mut self, data_dir: &Path) -> Result<(), ManifestError> {
850        let path = Self::path(data_dir);
851
852        // Ensure parent directory exists.
853        if let Some(parent) = path.parent() {
854            fs::create_dir_all(parent).map_err(|e| ManifestError::Io {
855                path: parent.to_path_buf(),
856                source: e.to_string(),
857            })?;
858        }
859
860        self.updated_at_ms = now_ms();
861
862        let json = serde_json::to_string_pretty(self).map_err(|e| ManifestError::Serialize {
863            source: e.to_string(),
864        })?;
865
866        // Atomic write: temp file → rename.
867        let (tmp_path, mut file) =
868            create_unique_manifest_temp_file(&path).map_err(|e| ManifestError::Io {
869                path: path.clone(),
870                source: e.to_string(),
871            })?;
872        file.write_all(json.as_bytes())
873            .map_err(|e| ManifestError::Io {
874                path: tmp_path.clone(),
875                source: e.to_string(),
876            })?;
877        file.sync_all().map_err(|e| ManifestError::Io {
878            path: tmp_path.clone(),
879            source: e.to_string(),
880        })?;
881        replace_file_from_temp(&tmp_path, &path).map_err(|e| ManifestError::Io {
882            path: path.clone(),
883            source: e.to_string(),
884        })?;
885        sync_parent_directory(&path).map_err(|e| ManifestError::Io {
886            path: path
887                .parent()
888                .map(Path::to_path_buf)
889                .unwrap_or_else(|| path.clone()),
890            source: e.to_string(),
891        })?;
892
893        Ok(())
894    }
895
896    // ── Readiness evaluation ──────────────────────────────────────────
897
898    /// Evaluate readiness of the fast tier.
899    pub fn fast_tier_readiness(
900        &self,
901        policy: &SemanticPolicy,
902        current_db_fingerprint: &str,
903        current_model_revision: &str,
904    ) -> TierReadiness {
905        match &self.fast_tier {
906            Some(artifact) => {
907                artifact.readiness(policy, current_db_fingerprint, current_model_revision)
908            }
909            None => {
910                // Check for an active build checkpoint for this tier.
911                if let Some(cp) = &self.checkpoint
912                    && cp.tier == TierKind::Fast
913                    && cp.is_valid(current_db_fingerprint)
914                {
915                    TierReadiness::Building {
916                        progress_pct: cp.progress_pct(),
917                    }
918                } else {
919                    TierReadiness::Missing
920                }
921            }
922        }
923    }
924
925    /// Evaluate readiness of the quality tier.
926    pub fn quality_tier_readiness(
927        &self,
928        policy: &SemanticPolicy,
929        current_db_fingerprint: &str,
930        current_model_revision: &str,
931    ) -> TierReadiness {
932        match &self.quality_tier {
933            Some(artifact) => {
934                artifact.readiness(policy, current_db_fingerprint, current_model_revision)
935            }
936            None => {
937                if let Some(cp) = &self.checkpoint
938                    && cp.tier == TierKind::Quality
939                    && cp.is_valid(current_db_fingerprint)
940                {
941                    TierReadiness::Building {
942                        progress_pct: cp.progress_pct(),
943                    }
944                } else {
945                    TierReadiness::Missing
946                }
947            }
948        }
949    }
950
951    /// Whether hybrid refinement can run right now (fast tier usable).
952    pub fn can_hybrid_search(
953        &self,
954        policy: &SemanticPolicy,
955        current_db_fingerprint: &str,
956        current_model_revision: &str,
957    ) -> bool {
958        self.fast_tier_readiness(policy, current_db_fingerprint, current_model_revision)
959            .is_usable()
960    }
961
962    // ── Backlog / checkpoint management ───────────────────────────────
963
964    /// Update the backlog from the canonical DB state.
965    pub fn refresh_backlog(&mut self, total_conversations: u64, current_db_fingerprint: &str) {
966        let fast_processed = self
967            .fast_tier
968            .as_ref()
969            .filter(|a| a.ready && a.db_fingerprint == current_db_fingerprint)
970            .map_or(0, |a| a.conversation_count);
971        let quality_processed = self
972            .quality_tier
973            .as_ref()
974            .filter(|a| a.ready && a.db_fingerprint == current_db_fingerprint)
975            .map_or(0, |a| a.conversation_count);
976
977        self.backlog = BacklogLedger {
978            total_conversations,
979            fast_tier_processed: fast_processed,
980            quality_tier_processed: quality_processed,
981            db_fingerprint: current_db_fingerprint.to_owned(),
982            computed_at_ms: now_ms(),
983        };
984    }
985
986    /// Save a build checkpoint (called periodically during backfill).
987    pub fn save_checkpoint(&mut self, checkpoint: BuildCheckpoint) {
988        self.checkpoint = Some(checkpoint);
989    }
990
991    /// Clear the build checkpoint (called when build finishes or is abandoned).
992    pub fn clear_checkpoint(&mut self) {
993        self.checkpoint = None;
994    }
995
996    /// Record a completed artifact and clear the matching checkpoint.
997    pub fn publish_artifact(&mut self, artifact: ArtifactRecord) {
998        // Clear checkpoint if it matches this tier.
999        if self
1000            .checkpoint
1001            .as_ref()
1002            .is_some_and(|cp| cp.tier == artifact.tier)
1003        {
1004            self.checkpoint = None;
1005        }
1006
1007        match artifact.tier {
1008            TierKind::Fast => self.fast_tier = Some(artifact),
1009            TierKind::Quality => self.quality_tier = Some(artifact),
1010        }
1011    }
1012
1013    /// Record a completed HNSW accelerator.
1014    pub fn publish_hnsw(&mut self, hnsw: HnswRecord) {
1015        self.hnsw = Some(hnsw);
1016    }
1017
1018    /// Adopt a legacy (pre-manifest) artifact if it is compatible with the
1019    /// current schema/chunking versions.  Returns `true` if adopted.
1020    #[allow(clippy::too_many_arguments)]
1021    pub fn adopt_legacy_artifact(
1022        &mut self,
1023        tier: TierKind,
1024        embedder_id: &str,
1025        model_revision: &str,
1026        dimension: usize,
1027        doc_count: u64,
1028        conversation_count: u64,
1029        db_fingerprint: &str,
1030        index_path: &str,
1031        size_bytes: u64,
1032    ) -> bool {
1033        let record = ArtifactRecord {
1034            tier,
1035            embedder_id: embedder_id.to_owned(),
1036            model_revision: model_revision.to_owned(),
1037            schema_version: SEMANTIC_SCHEMA_VERSION,
1038            chunking_version: CHUNKING_STRATEGY_VERSION,
1039            dimension,
1040            doc_count,
1041            conversation_count,
1042            db_fingerprint: db_fingerprint.to_owned(),
1043            index_path: index_path.to_owned(),
1044            size_bytes,
1045            started_at_ms: 0,
1046            completed_at_ms: now_ms(),
1047            ready: true,
1048        };
1049
1050        match tier {
1051            TierKind::Fast => self.fast_tier = Some(record),
1052            TierKind::Quality => self.quality_tier = Some(record),
1053        }
1054        true
1055    }
1056
1057    /// Invalidate artifacts that are incompatible with the current policy.
1058    /// Returns the number of artifacts invalidated.
1059    ///
1060    /// **Note**: This detects schema version, chunking version, and mode
1061    /// incompatibilities.  It does NOT detect embedder changes (e.g., minilm →
1062    /// snowflake) because the policy stores short names while artifacts store
1063    /// full registry IDs.  Callers who need embedder-change detection should
1064    /// compare `artifact.embedder_id` against the expected ID from the
1065    /// embedder registry.
1066    pub fn invalidate_incompatible(
1067        &mut self,
1068        policy: &SemanticPolicy,
1069        current_model_revision: &str,
1070    ) -> usize {
1071        let mut count = 0;
1072
1073        if let Some(ref artifact) = self.fast_tier {
1074            let pm = artifact.to_policy_manifest();
1075            if matches!(
1076                pm.invalidation_action(policy, current_model_revision, &artifact.embedder_id),
1077                InvalidationAction::DiscardAndRebuild { .. } | InvalidationAction::Evict
1078            ) {
1079                self.fast_tier = None;
1080                count += 1;
1081            }
1082        }
1083
1084        if let Some(ref artifact) = self.quality_tier {
1085            let pm = artifact.to_policy_manifest();
1086            if matches!(
1087                pm.invalidation_action(policy, current_model_revision, &artifact.embedder_id),
1088                InvalidationAction::DiscardAndRebuild { .. } | InvalidationAction::Evict
1089            ) {
1090                self.quality_tier = None;
1091                count += 1;
1092            }
1093        }
1094
1095        // HNSW depends on the base tier — invalidate if base is gone.
1096        if let Some(ref hnsw) = self.hnsw {
1097            let base_gone = match hnsw.base_tier {
1098                TierKind::Fast => self.fast_tier.is_none(),
1099                TierKind::Quality => self.quality_tier.is_none(),
1100            };
1101            if base_gone {
1102                self.hnsw = None;
1103                count += 1;
1104            }
1105        }
1106
1107        // Invalidate checkpoint if its schema/chunking is wrong.
1108        if let Some(ref cp) = self.checkpoint
1109            && (cp.schema_version != policy.semantic_schema_version
1110                || cp.chunking_version != policy.chunking_strategy_version)
1111        {
1112            self.checkpoint = None;
1113        }
1114
1115        count
1116    }
1117
1118    /// Total disk usage of all semantic artifacts (bytes).
1119    pub fn total_size_bytes(&self) -> u64 {
1120        let fast = self.fast_tier.as_ref().map_or(0, |a| a.size_bytes);
1121        let quality = self.quality_tier.as_ref().map_or(0, |a| a.size_bytes);
1122        let hnsw = self.hnsw.as_ref().map_or(0, |h| h.size_bytes);
1123        fast + quality + hnsw
1124    }
1125
1126    /// Total disk usage in megabytes (rounded up).
1127    pub fn total_size_mb(&self) -> u64 {
1128        self.total_size_bytes().div_ceil(1_048_576)
1129    }
1130}
1131
1132// ─── Errors ────────────────────────────────────────────────────────────────
1133
/// Failures reported by manifest load/save operations.
///
/// Underlying causes are carried as already-formatted strings.
#[derive(Debug)]
pub enum ManifestError {
    /// A filesystem operation on `path` failed.
    Io { path: PathBuf, source: String },
    /// The file at `path` could not be parsed as a manifest.
    Parse { path: PathBuf, source: String },
    /// The manifest could not be serialized.
    Serialize { source: String },
    /// The manifest declares format version `found`, which is newer than
    /// `max_supported`.
    UnsupportedVersion { found: u32, max_supported: u32 },
}

impl std::fmt::Display for ManifestError {
    /// Renders a single-line, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::UnsupportedVersion {
                found,
                max_supported,
            } => write!(
                f,
                "manifest version {found} is newer than supported version {max_supported}"
            ),
            Self::Serialize { source } => write!(f, "manifest serialization error: {source}"),
            Self::Parse { path, source } => {
                write!(f, "manifest parse error at {}: {source}", path.display())
            }
            Self::Io { path, source } => {
                write!(f, "manifest I/O error at {}: {source}", path.display())
            }
        }
    }
}

impl std::error::Error for ManifestError {}
1164
1165// ─── Helpers ───────────────────────────────────────────────────────────────
1166
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reads earlier than the epoch.
fn now_ms() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as i64,
        Err(_) => 0,
    }
}
1173
1174fn unique_manifest_temp_path(path: &Path, attempt: u32, random: u64) -> PathBuf {
1175    static NEXT_NONCE: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
1176
1177    let file_name = path
1178        .file_name()
1179        .and_then(|name| name.to_str())
1180        .unwrap_or(MANIFEST_FILENAME);
1181    let nonce = NEXT_NONCE.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1182    path.with_file_name(format!(
1183        ".{file_name}.tmp.{attempt}.{}.{}.{random:016x}",
1184        now_ms(),
1185        nonce
1186    ))
1187}
1188
1189fn create_unique_manifest_temp_file(path: &Path) -> std::io::Result<(PathBuf, fs::File)> {
1190    for attempt in 0..100 {
1191        let random = random_manifest_path_nonce()?;
1192        let tmp_path = unique_manifest_temp_path(path, attempt, random);
1193        match OpenOptions::new()
1194            .write(true)
1195            .create_new(true)
1196            .open(&tmp_path)
1197        {
1198            Ok(file) => return Ok((tmp_path, file)),
1199            Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => continue,
1200            Err(err) => return Err(err),
1201        }
1202    }
1203
1204    Err(std::io::Error::new(
1205        std::io::ErrorKind::AlreadyExists,
1206        format!(
1207            "could not create a unique temporary manifest file for {} after 100 attempts",
1208            path.display()
1209        ),
1210    ))
1211}
1212
/// Build a backup path that is a sibling of `path` (Windows builds only).
///
/// The name has the form
/// `.{file_name}.bak.{now_ms}.{nonce}.{random as 16 hex digits}`, where
/// `nonce` is a process-wide counter local to this function.  When `path`
/// has no UTF-8 file name, `MANIFEST_FILENAME` is used instead.
///
/// # Errors
///
/// Fails only if the random nonce cannot be generated.
#[cfg(windows)]
fn unique_manifest_backup_path(path: &Path) -> std::io::Result<PathBuf> {
    use std::sync::atomic::{AtomicU64, Ordering};

    static NEXT_NONCE: AtomicU64 = AtomicU64::new(0);

    // Drawn first, so a failure here returns before the counter advances.
    let random = random_manifest_path_nonce()?;

    let file_name = match path.file_name().and_then(|name| name.to_str()) {
        Some(name) => name,
        None => MANIFEST_FILENAME,
    };
    let nonce = NEXT_NONCE.fetch_add(1, Ordering::Relaxed);

    let backup_name = format!(
        ".{file_name}.bak.{}.{nonce}.{random:016x}",
        now_ms()
    );
    Ok(path.with_file_name(backup_name))
}
1228
1229fn random_manifest_path_nonce() -> std::io::Result<u64> {
1230    let mut random_bytes = [0u8; 8];
1231    SystemRandom::new()
1232        .fill(&mut random_bytes)
1233        .map_err(|_| std::io::Error::other("failed to generate manifest temp path nonce"))?;
1234    Ok(u64::from_le_bytes(random_bytes))
1235}
1236
/// Move `temp_path` over `final_path`.
///
/// **Non-Windows:** a single `fs::rename`.  If the rename fails, the temp
/// file is removed (best effort) before the error is returned, so a failed
/// publish does not leave a stray temp file behind.
///
/// **Windows:** a plain rename is tried first.  If it fails with
/// `AlreadyExists` or `PermissionDenied` while an entry exists at
/// `final_path`, the existing file is renamed to a unique backup, the rename
/// is retried, and on success the backup is deleted.  If the retry fails the
/// backup is renamed back; when that restore succeeds the temp file is
/// removed, and when it fails the temp file is kept and its location is
/// included in the error message.
///
/// # Errors
///
/// Returns the underlying `io::Error`, or a composed error describing each
/// failed step of the Windows backup/restore sequence.
fn replace_file_from_temp(temp_path: &Path, final_path: &Path) -> std::io::Result<()> {
    #[cfg(windows)]
    {
        match fs::rename(temp_path, final_path) {
            Ok(()) => sync_parent_directory(final_path),
            // Only take the backup route when something really is in the way.
            Err(first_err)
                if replacement_path_entry_exists(final_path)?
                    && matches!(
                        first_err.kind(),
                        std::io::ErrorKind::AlreadyExists | std::io::ErrorKind::PermissionDenied
                    ) =>
            {
                let backup_path = unique_manifest_backup_path(final_path)?;
                fs::rename(final_path, &backup_path).map_err(|backup_err| {
                    // The original is still in place; the new data is discarded.
                    let _ = fs::remove_file(temp_path);
                    std::io::Error::other(format!(
                        "failed preparing backup {} before replacing {}: first error: {}; backup error: {}",
                        backup_path.display(),
                        final_path.display(),
                        first_err,
                        backup_err
                    ))
                })?;
                match fs::rename(temp_path, final_path) {
                    Ok(()) => {
                        let _ = fs::remove_file(&backup_path);
                        sync_parent_directory(final_path)
                    }
                    // Retry failed: try to put the original back.
                    Err(second_err) => match fs::rename(&backup_path, final_path) {
                        Ok(()) => {
                            let _ = fs::remove_file(temp_path);
                            sync_parent_directory(final_path)?;
                            Err(std::io::Error::other(format!(
                                "failed replacing {} with {}: first error: {}; second error: {}; restored original file",
                                final_path.display(),
                                temp_path.display(),
                                first_err,
                                second_err
                            )))
                        }
                        // Restore failed too: keep the temp file and say where it is.
                        Err(restore_err) => Err(std::io::Error::other(format!(
                            "failed replacing {} with {}: first error: {}; second error: {}; restore error: {}; temp file retained at {}",
                            final_path.display(),
                            temp_path.display(),
                            first_err,
                            second_err,
                            restore_err,
                            temp_path.display()
                        ))),
                    },
                }
            }
            Err(err) => Err(err),
        }
    }

    #[cfg(not(windows))]
    {
        fs::rename(temp_path, final_path).map_err(|rename_err| {
            // Best effort: the target is untouched, so the temp file is garbage.
            let _ = fs::remove_file(temp_path);
            rename_err
        })
    }
}
1298
/// Report whether any directory entry exists at `path`.
///
/// Uses `fs::symlink_metadata`, so a symlink counts as existing even when
/// its target does not.
///
/// # Errors
///
/// Any inspection failure other than `NotFound` is returned with the same
/// error kind and a message naming `path`.
#[cfg(any(windows, test))]
fn replacement_path_entry_exists(path: &Path) -> std::io::Result<bool> {
    let err = match fs::symlink_metadata(path) {
        Ok(_) => return Ok(true),
        Err(err) => err,
    };

    if err.kind() == std::io::ErrorKind::NotFound {
        return Ok(false);
    }

    let message = format!(
        "failed inspecting semantic manifest replacement target {}: {err}",
        path.display()
    );
    Err(std::io::Error::new(err.kind(), message))
}
1313
/// Open the parent directory of `path` and call `sync_all` on it.
///
/// Returns `Ok(())` without doing anything when `path` has no parent.
#[cfg(not(windows))]
fn sync_parent_directory(path: &Path) -> std::io::Result<()> {
    match path.parent() {
        Some(parent) => {
            let directory = fs::File::open(parent)?;
            directory.sync_all()
        }
        None => Ok(()),
    }
}

/// Windows variant: performs no directory sync and always returns `Ok(())`.
#[cfg(windows)]
fn sync_parent_directory(_path: &Path) -> std::io::Result<()> {
    std::io::Result::Ok(())
}
1327
1328// ─── Tests ─────────────────────────────────────────────────────────────────
1329
1330#[cfg(test)]
1331mod tests {
1332    use super::*;
1333    use crate::search::policy::SemanticPolicy;
1334
    /// Policy fixture: returns `SemanticPolicy::compiled_defaults()`.
    fn test_policy() -> SemanticPolicy {
        SemanticPolicy::compiled_defaults()
    }
1338
1339    fn test_artifact(tier: TierKind, ready: bool) -> ArtifactRecord {
1340        ArtifactRecord {
1341            tier,
1342            embedder_id: match tier {
1343                TierKind::Fast => "fnv1a-384".to_owned(),
1344                TierKind::Quality => "minilm-384".to_owned(),
1345            },
1346            model_revision: "abc123".to_owned(),
1347            schema_version: SEMANTIC_SCHEMA_VERSION,
1348            chunking_version: CHUNKING_STRATEGY_VERSION,
1349            dimension: 384,
1350            doc_count: 1000,
1351            conversation_count: 250,
1352            db_fingerprint: "fp-1234".to_owned(),
1353            index_path: format!(
1354                "vector_index/index-{}.fsvi",
1355                match tier {
1356                    TierKind::Fast => "fnv1a-384",
1357                    TierKind::Quality => "minilm-384",
1358                }
1359            ),
1360            size_bytes: 150_000,
1361            started_at_ms: 1_700_000_000_000,
1362            completed_at_ms: 1_700_000_060_000,
1363            ready,
1364        }
1365    }
1366
1367    fn test_hnsw() -> HnswRecord {
1368        HnswRecord {
1369            base_tier: TierKind::Quality,
1370            embedder_id: "minilm-384".to_owned(),
1371            ef_search: 128,
1372            index_path: "vector_index/hnsw-minilm-384.chsw".to_owned(),
1373            size_bytes: 50_000,
1374            built_at_ms: 1_700_000_070_000,
1375            ready: true,
1376        }
1377    }
1378
1379    fn test_shard(shard_index: u32, shard_count: u32, ready: bool) -> SemanticShardRecord {
1380        SemanticShardRecord {
1381            tier: TierKind::Fast,
1382            embedder_id: "fnv1a-384".to_owned(),
1383            model_revision: "hash".to_owned(),
1384            schema_version: SEMANTIC_SCHEMA_VERSION,
1385            chunking_version: CHUNKING_STRATEGY_VERSION,
1386            dimension: 384,
1387            shard_index,
1388            shard_count,
1389            doc_count: 25,
1390            total_conversations: 10,
1391            db_fingerprint: "fp-sharded".to_owned(),
1392            index_path: format!("vector_index/shards/fast-fnv1a-384/shard-{shard_index:05}.fsvi"),
1393            quantization: "f16".to_owned(),
1394            mmap_ready: true,
1395            ann_index_path: None,
1396            ann_size_bytes: 0,
1397            ann_ready: false,
1398            size_bytes: 4096,
1399            started_at_ms: 1_700_000_080_000,
1400            completed_at_ms: 1_700_000_081_000,
1401            ready,
1402        }
1403    }
1404
1405    fn test_checkpoint(tier: TierKind) -> BuildCheckpoint {
1406        BuildCheckpoint {
1407            tier,
1408            embedder_id: "minilm-384".to_owned(),
1409            last_offset: 500,
1410            docs_embedded: 3000,
1411            conversations_processed: 500,
1412            total_conversations: 1000,
1413            db_fingerprint: "fp-1234".to_owned(),
1414            schema_version: SEMANTIC_SCHEMA_VERSION,
1415            chunking_version: CHUNKING_STRATEGY_VERSION,
1416            saved_at_ms: 1_700_000_030_000,
1417            last_message_id: None,
1418            cursor_exhausted: false,
1419        }
1420    }
1421
    /// Expected outcome of a tier-readiness check, as consumed by
    /// `assert_tier_readiness`.
    #[derive(Debug, Clone, Copy)]
    enum ExpectedTierReadiness {
        /// Must equal `TierReadiness::Ready`.
        Ready,
        /// Must match `TierReadiness::Stale { .. }` (payload not compared).
        Stale,
        /// Must match `TierReadiness::Incompatible { .. }` (payload not compared).
        Incompatible,
        /// Must equal `TierReadiness::Building` with this progress percentage.
        Building(u8),
    }
1429
1430    fn no_artifact_mutation(_: &mut ArtifactRecord) {}
1431
    /// One row of a table-driven tier-readiness test: a 7-tuple ending in
    /// an artifact-mutation function pointer and the expected readiness.
    // NOTE(review): the table that uses this alias is outside this view. The
    // leading fields are presumably (label, tier, artifact `ready` flag,
    // current DB fingerprint, current model revision) — confirm against the
    // test that consumes it.
    type TierReadinessCase = (
        &'static str,
        TierKind,
        bool,
        &'static str,
        &'static str,
        fn(&mut ArtifactRecord),
        ExpectedTierReadiness,
    );
1441
1442    fn set_schema_version_to_zero(artifact: &mut ArtifactRecord) {
1443        artifact.schema_version = 0;
1444    }
1445
1446    fn assert_tier_readiness(actual: TierReadiness, expected: ExpectedTierReadiness, label: &str) {
1447        match expected {
1448            ExpectedTierReadiness::Ready => {
1449                assert_eq!(actual, TierReadiness::Ready, "{label}");
1450            }
1451            ExpectedTierReadiness::Stale => {
1452                assert!(
1453                    matches!(actual, TierReadiness::Stale { .. }),
1454                    "{label}: {actual:?}"
1455                );
1456            }
1457            ExpectedTierReadiness::Incompatible => {
1458                assert!(
1459                    matches!(actual, TierReadiness::Incompatible { .. }),
1460                    "{label}: {actual:?}"
1461                );
1462            }
1463            ExpectedTierReadiness::Building(progress_pct) => {
1464                assert_eq!(actual, TierReadiness::Building { progress_pct }, "{label}");
1465            }
1466        }
1467    }
1468
1469    // ── Manifest load/save round-trip ──────────────────────────────────
1470
1471    #[test]
1472    fn manifest_round_trip_via_disk() {
1473        let temp = tempfile::tempdir().unwrap();
1474        let mut manifest = SemanticManifest {
1475            fast_tier: Some(test_artifact(TierKind::Fast, true)),
1476            quality_tier: Some(test_artifact(TierKind::Quality, true)),
1477            hnsw: Some(test_hnsw()),
1478            checkpoint: Some(test_checkpoint(TierKind::Quality)),
1479            backlog: BacklogLedger {
1480                total_conversations: 2000,
1481                fast_tier_processed: 1000,
1482                quality_tier_processed: 500,
1483                db_fingerprint: "fp-1234".to_owned(),
1484                computed_at_ms: 1_700_000_000_000,
1485            },
1486            ..Default::default()
1487        };
1488
1489        manifest.save(temp.path()).unwrap();
1490        let loaded = SemanticManifest::load(temp.path()).unwrap().unwrap();
1491
1492        assert_eq!(loaded.manifest_version, MANIFEST_FORMAT_VERSION);
1493        assert!(loaded.fast_tier.is_some());
1494        assert!(loaded.quality_tier.is_some());
1495        assert!(loaded.hnsw.is_some());
1496        assert!(loaded.checkpoint.is_some());
1497        assert_eq!(loaded.backlog.total_conversations, 2000);
1498        assert!(loaded.updated_at_ms > 0);
1499    }
1500
1501    #[test]
1502    fn manifest_save_overwrites_existing_file() {
1503        let temp = tempfile::tempdir().unwrap();
1504        let mut first = SemanticManifest {
1505            fast_tier: Some(test_artifact(TierKind::Fast, true)),
1506            ..Default::default()
1507        };
1508        first.save(temp.path()).unwrap();
1509
1510        let mut second = SemanticManifest {
1511            quality_tier: Some(test_artifact(TierKind::Quality, true)),
1512            backlog: BacklogLedger {
1513                total_conversations: 99,
1514                fast_tier_processed: 0,
1515                quality_tier_processed: 99,
1516                db_fingerprint: "fp-overwrite".to_owned(),
1517                computed_at_ms: 1_700_000_000_123,
1518            },
1519            ..Default::default()
1520        };
1521        second.save(temp.path()).unwrap();
1522
1523        let loaded = SemanticManifest::load(temp.path()).unwrap().unwrap();
1524        assert!(loaded.fast_tier.is_none());
1525        assert!(loaded.quality_tier.is_some());
1526        assert_eq!(loaded.backlog.total_conversations, 99);
1527    }
1528
1529    #[cfg(unix)]
1530    #[test]
1531    fn manifest_replacement_path_entry_exists_detects_dangling_symlink() -> Result<(), String> {
1532        use std::os::unix::fs::symlink;
1533
1534        let temp = tempfile::tempdir().map_err(|e| e.to_string())?;
1535        let link_path = SemanticManifest::path(temp.path());
1536        let manifest_dir = link_path
1537            .parent()
1538            .ok_or_else(|| "semantic manifest path should have a parent directory".to_string())?;
1539        fs::create_dir_all(manifest_dir).map_err(|e| e.to_string())?;
1540        let missing_target = manifest_dir.join("missing-semantic-manifest.json");
1541
1542        symlink(&missing_target, &link_path).map_err(|e| e.to_string())?;
1543
1544        if link_path.exists() {
1545            return Err("dangling manifest symlink unexpectedly resolved".to_string());
1546        }
1547        if !replacement_path_entry_exists(&link_path).map_err(|e| e.to_string())? {
1548            return Err(format!(
1549                "semantic manifest replacement entry check missed dangling symlink {}",
1550                link_path.display()
1551            ));
1552        }
1553
1554        Ok(())
1555    }
1556
1557    #[test]
        // Requests two temp files for the same final manifest path through
        // `create_unique_manifest_temp_file` and checks that the returned paths
        // differ, that both files exist on disk, and that both sit directly in
        // the manifest's parent directory. Failures surface as `Err(String)`.
1558    fn manifest_temp_file_creation_is_exclusive_and_unique() -> Result<(), String> {
1559        let temp = tempfile::tempdir().map_err(|e| e.to_string())?;
1560        let final_path = SemanticManifest::path(temp.path());
1561        let manifest_dir = final_path
1562            .parent()
1563            .ok_or_else(|| "semantic manifest path should have a parent directory".to_string())?;
            // Create the manifest directory before asking for temp files.
1564        fs::create_dir_all(manifest_dir).map_err(|e| e.to_string())?;
1565
            // Both handles stay open while the second file is created, so the
            // two temp files coexist for the checks below.
1566        let (first_path, mut first_file) =
1567            create_unique_manifest_temp_file(&final_path).map_err(|e| e.to_string())?;
1568        first_file.write_all(b"first").map_err(|e| e.to_string())?;
1569        let (second_path, mut second_file) =
1570            create_unique_manifest_temp_file(&final_path).map_err(|e| e.to_string())?;
1571        second_file
1572            .write_all(b"second")
1573            .map_err(|e| e.to_string())?;
1574
            // Uniqueness: the two calls must not hand back the same path.
1575        if first_path == second_path {
1576            return Err("exclusive temp creation reused the same path".to_string());
1577        }
            // Existence: each temp file must be present on disk.
1578        if !first_path.exists() {
1579            return Err(format!(
1580                "first temp file is missing: {}",
1581                first_path.display()
1582            ));
1583        }
1584        if !second_path.exists() {
1585            return Err(format!(
1586                "second temp file is missing: {}",
1587                second_path.display()
1588            ));
1589        }
            // Containment: each temp file's parent must be the manifest directory.
1590        if first_path.parent() != Some(manifest_dir) {
1591            return Err(format!(
1592                "first temp path escaped manifest directory: {}",
1593                first_path.display()
1594            ));
1595        }
1596        if second_path.parent() != Some(manifest_dir) {
1597            return Err(format!(
1598                "second temp path escaped manifest directory: {}",
1599                second_path.display()
1600            ));
1601        }
1602
1603        Ok(())
1604    }
1605
1606    #[test]
        // `SemanticManifest::load` on a fresh temp directory (no manifest file
        // written) must succeed and yield `None` rather than an error.
1607    fn manifest_load_missing_returns_none() {
1608        let temp = tempfile::tempdir().unwrap();
1609        let loaded = SemanticManifest::load(temp.path()).unwrap();
1610        assert!(loaded.is_none());
1611    }
1612
1613    #[test]
        // `SemanticManifest::load_or_default` on a fresh temp directory must
        // return a manifest stamped with `MANIFEST_FORMAT_VERSION` and with
        // neither the fast nor the quality tier populated.
1614    fn manifest_load_or_default_returns_defaults() {
1615        let temp = tempfile::tempdir().unwrap();
1616        let manifest = SemanticManifest::load_or_default(temp.path()).unwrap();
1617        assert_eq!(manifest.manifest_version, MANIFEST_FORMAT_VERSION);
1618        assert!(manifest.fast_tier.is_none());
1619        assert!(manifest.quality_tier.is_none());
1620    }
1621
1622    #[test]
        // Writes bytes that are not valid JSON at the manifest path and expects
        // `SemanticManifest::load` to report `ManifestError::Parse`.
1623    fn manifest_load_corrupt_returns_parse_error() {
1624        let temp = tempfile::tempdir().unwrap();
1625        let path = SemanticManifest::path(temp.path());
1626        fs::create_dir_all(path.parent().unwrap()).unwrap();
1627        fs::write(&path, b"not json").unwrap();
1628
1629        let result = SemanticManifest::load(temp.path());
1630        assert!(matches!(result, Err(ManifestError::Parse { .. })));
1631    }
1632
1633    #[test]
        // Serializes a manifest whose `manifest_version` is one higher than
        // `MANIFEST_FORMAT_VERSION`, writes it to the manifest path, and expects
        // `SemanticManifest::load` to report `ManifestError::UnsupportedVersion`.
1634    fn manifest_load_future_version_returns_error() {
1635        let temp = tempfile::tempdir().unwrap();
1636        let path = SemanticManifest::path(temp.path());
1637        fs::create_dir_all(path.parent().unwrap()).unwrap();
1638
            // Otherwise-default manifest; only the version field is bumped.
1639        let manifest = SemanticManifest {
1640            manifest_version: MANIFEST_FORMAT_VERSION + 1,
1641            ..Default::default()
1642        };
1643        let json = serde_json::to_string(&manifest).unwrap();
1644        fs::write(&path, json).unwrap();
1645
1646        let result = SemanticManifest::load(temp.path());
1647        assert!(matches!(
1648            result,
1649            Err(ManifestError::UnsupportedVersion { .. })
1650        ));
1651    }
1652
1653    // ── Tier readiness (table-driven) ──────────────────────────────────
1654
1655    #[test]
        // Table-driven check of artifact-level `readiness`. Each case tuple is
        // (label, tier, ready flag, current DB fingerprint, current model
        // revision, artifact mutation, expected readiness). For every case the
        // test builds `test_artifact(tier, ready)`, applies the mutation, and
        // compares `artifact.readiness(..)` with the expectation through
        // `assert_tier_readiness`, passing the label along.
1656    fn tier_readiness_cases() {
1657        let policy = test_policy();
            // NOTE(review): presumably these match the values baked into the
            // `test_artifact` fixture (the "matching" case relies on it) — confirm.
1658        let db_fp = "fp-1234";
1659        let model_rev = "abc123";
1660        let cases: &[TierReadinessCase] = &[
1661            (
1662                "ready artifact with matching fingerprint",
1663                TierKind::Fast,
1664                true,
1665                db_fp,
1666                model_rev,
1667                no_artifact_mutation,
1668                ExpectedTierReadiness::Ready,
1669            ),
1670            (
1671                "ready artifact with changed DB fingerprint",
1672                TierKind::Fast,
1673                true,
1674                "different-fp",
1675                model_rev,
1676                no_artifact_mutation,
1677                ExpectedTierReadiness::Stale,
1678            ),
1679            (
1680                "ready artifact with changed model revision",
1681                TierKind::Quality,
1682                true,
1683                db_fp,
1684                "new-revision",
1685                no_artifact_mutation,
1686                ExpectedTierReadiness::Stale,
1687            ),
1688            (
1689                "schema version mismatch",
1690                TierKind::Quality,
1691                true,
1692                db_fp,
1693                model_rev,
1694                set_schema_version_to_zero,
1695                ExpectedTierReadiness::Incompatible,
1696            ),
1697            (
1698                "not yet published artifact",
1699                TierKind::Fast,
1700                false,
1701                db_fp,
1702                model_rev,
1703                no_artifact_mutation,
1704                ExpectedTierReadiness::Building(100),
1705            ),
1706        ];
1707
            // Run every row: build the fixture, mutate it, compare readiness.
1708        for (label, tier, ready, current_db_fp, current_model_rev, mutate, expected) in cases {
1709            let mut artifact = test_artifact(*tier, *ready);
1710            mutate(&mut artifact);
1711            assert_tier_readiness(
1712                artifact.readiness(&policy, current_db_fp, current_model_rev),
1713                *expected,
1714                label,
1715            );
1716        }
1717    }
1718
1719    // ── Manifest-level readiness ───────────────────────────────────────
1720
1721    #[test]
        // A default manifest must report `TierReadiness::Missing` for both the
        // fast tier and the quality tier.
1722    fn manifest_tier_readiness_missing() {
1723        let manifest = SemanticManifest::default();
1724        let policy = test_policy();
1725        assert_eq!(
1726            manifest.fast_tier_readiness(&policy, "fp", "rev"),
1727            TierReadiness::Missing,
1728        );
1729        assert_eq!(
1730            manifest.quality_tier_readiness(&policy, "fp", "rev"),
1731            TierReadiness::Missing,
1732        );
1733    }
1734
1735    #[test]
        // With only a quality-tier checkpoint recorded (no artifacts), the fast
        // tier must read as `Missing` while the quality tier reads as
        // `Building { progress_pct: 50 }`.
        // NOTE(review): the 50% figure and the "fp-1234" fingerprint presumably
        // come from the `test_checkpoint` fixture — confirm there.
1736    fn manifest_tier_readiness_with_checkpoint() {
1737        let manifest = SemanticManifest {
1738            checkpoint: Some(test_checkpoint(TierKind::Quality)),
1739            ..Default::default()
1740        };
1741
1742        let policy = test_policy();
1743        // Fast tier has no checkpoint → Missing
1744        assert_eq!(
1745            manifest.fast_tier_readiness(&policy, "fp-1234", "rev"),
1746            TierReadiness::Missing,
1747        );
1748        // Quality tier has a valid checkpoint → Building
1749        assert!(matches!(
1750            manifest.quality_tier_readiness(&policy, "fp-1234", "rev"),
1751            TierReadiness::Building { progress_pct: 50 },
1752        ));
1753    }
1754
1755    #[test]
        // A quality-tier checkpoint queried with a different DB fingerprint
        // ("other-fp") must not count as in-progress work: the quality tier is
        // expected to read as `Missing`.
1756    fn manifest_tier_readiness_checkpoint_invalid_db() {
1757        let manifest = SemanticManifest {
1758            checkpoint: Some(test_checkpoint(TierKind::Quality)),
1759            ..Default::default()
1760        };
1761
1762        let policy = test_policy();
1763        // Checkpoint DB doesn't match → Missing (checkpoint invalid)
1764        assert_eq!(
1765            manifest.quality_tier_readiness(&policy, "other-fp", "rev"),
1766            TierReadiness::Missing,
1767        );
1768    }
1769
1770    // ── Hybrid search check ────────────────────────────────────────────
1771
1772    #[test]
        // `can_hybrid_search` must be false for a default manifest and true once
        // a ready fast-tier artifact is present and queried with the fingerprint
        // and revision used below.
        // NOTE(review): "fp-1234"/"abc123" presumably match the `test_artifact`
        // fixture — confirm.
1773    fn can_hybrid_search_requires_usable_fast_tier() {
1774        let policy = test_policy();
1775        let db_fp = "fp-1234";
1776        let rev = "abc123";
1777
1778        // No fast tier → can't hybrid
1779        let manifest = SemanticManifest::default();
1780        assert!(!manifest.can_hybrid_search(&policy, db_fp, rev));
1781
1782        // Fast tier present → can hybrid
1783        let manifest = SemanticManifest {
1784            fast_tier: Some(test_artifact(TierKind::Fast, true)),
1785            ..Default::default()
1786        };
1787        assert!(manifest.can_hybrid_search(&policy, db_fp, rev));
1788    }
1789
1790    // ── Backlog ledger ─────────────────────────────────────────────────
1791
1792    #[test]
        // For a ledger of 1000 conversations with 800 fast-tier and 300
        // quality-tier processed, the remaining counts must be 200 and 700,
        // pending work must be reported, and `is_current` must accept only the
        // ledger's own fingerprint.
1793    fn backlog_remaining_and_pending() {
1794        let ledger = BacklogLedger {
1795            total_conversations: 1000,
1796            fast_tier_processed: 800,
1797            quality_tier_processed: 300,
1798            db_fingerprint: "fp".to_owned(),
1799            computed_at_ms: 0,
1800        };
1801
1802        assert_eq!(ledger.fast_tier_remaining(), 200);
1803        assert_eq!(ledger.quality_tier_remaining(), 700);
1804        assert!(ledger.has_pending_work());
1805        assert!(ledger.is_current("fp"));
1806        assert!(!ledger.is_current("other"));
1807    }
1808
1809    #[test]
        // When both tiers have processed every conversation (500 of 500), the
        // remaining counts must be zero and no pending work may be reported.
1810    fn backlog_no_pending_when_fully_processed() {
1811        let ledger = BacklogLedger {
1812            total_conversations: 500,
1813            fast_tier_processed: 500,
1814            quality_tier_processed: 500,
1815            db_fingerprint: "fp".to_owned(),
1816            computed_at_ms: 0,
1817        };
1818
1819        assert_eq!(ledger.fast_tier_remaining(), 0);
1820        assert_eq!(ledger.quality_tier_remaining(), 0);
1821        assert!(!ledger.has_pending_work());
1822    }
1823
1824    // ── Build checkpoint ───────────────────────────────────────────────
1825
1826    #[test]
        // Exercises `progress_pct`, `is_complete`, and `is_valid` on a
        // checkpoint. The fixture starts at 50% and incomplete, and is valid
        // only for "fp-1234". Setting `conversations_processed` to 1000 alone is
        // expected to give 99% and stay incomplete; only after
        // `cursor_exhausted` is set does it reach 100% and complete.
        // NOTE(review): 1000 presumably equals the fixture's total conversation
        // count (the error message speaks of "count parity") — confirm.
1827    fn checkpoint_progress_and_completion() -> Result<(), String> {
1828        let cp = test_checkpoint(TierKind::Quality);
1829        assert_eq!(cp.progress_pct(), 50);
1830        assert!(!cp.is_complete());
1831        assert!(cp.is_valid("fp-1234"));
1832        assert!(!cp.is_valid("other-fp"));
1833
1834        // Complete checkpoint
1835        let mut cp = test_checkpoint(TierKind::Quality);
1836        cp.conversations_processed = 1000;
            // Processed count alone must not be reported as 100% / complete.
1837        assert_eq!(cp.progress_pct(), 99);
1838        if cp.is_complete() {
1839            return Err("count parity alone should not complete a checkpoint".to_string());
1840        }
            // Marking the cursor exhausted is what flips the checkpoint to done.
1841        cp.cursor_exhausted = true;
1842        assert_eq!(cp.progress_pct(), 100);
1843        assert!(cp.is_complete());
1844        Ok(())
1845    }
1846
1847    #[test]
        // With both `total_conversations` and `conversations_processed` set to
        // zero, `progress_pct` must return 0.
1848    fn checkpoint_zero_total_gives_zero_pct() {
1849        let mut cp = test_checkpoint(TierKind::Fast);
1850        cp.total_conversations = 0;
1851        cp.conversations_processed = 0;
1852        assert_eq!(cp.progress_pct(), 0);
1853    }
1854
1855    // ── Publish and clear ──────────────────────────────────────────────
1856
1857    #[test]
        // Publishing a quality-tier artifact while a quality-tier checkpoint is
        // recorded must clear the checkpoint and populate `quality_tier`.
1858    fn publish_artifact_clears_matching_checkpoint() {
1859        let mut manifest = SemanticManifest {
1860            checkpoint: Some(test_checkpoint(TierKind::Quality)),
1861            ..Default::default()
1862        };
1863
1864        manifest.publish_artifact(test_artifact(TierKind::Quality, true));
1865        assert!(manifest.checkpoint.is_none());
1866        assert!(manifest.quality_tier.is_some());
1867    }
1868
1869    #[test]
        // Publishing a fast-tier artifact while a quality-tier checkpoint is
        // recorded must leave the checkpoint in place and populate `fast_tier`.
1870    fn publish_artifact_keeps_non_matching_checkpoint() {
1871        let mut manifest = SemanticManifest {
1872            checkpoint: Some(test_checkpoint(TierKind::Quality)),
1873            ..Default::default()
1874        };
1875
1876        manifest.publish_artifact(test_artifact(TierKind::Fast, true));
1877        assert!(manifest.checkpoint.is_some()); // Quality checkpoint survives
1878        assert!(manifest.fast_tier.is_some());
1879    }
1880
1881    // ── Refresh backlog ────────────────────────────────────────────────
1882
1883    #[test]
        // After `refresh_backlog(2000, "fp-1234")` on a manifest holding ready
        // fast and quality artifacts, the backlog must record 2000 total
        // conversations and 250 processed for each tier.
        // NOTE(review): 250 is presumably the conversation count carried by the
        // `test_artifact` fixture — confirm.
1884    fn refresh_backlog_computes_from_ready_artifacts() {
1885        let mut manifest = SemanticManifest {
1886            fast_tier: Some(test_artifact(TierKind::Fast, true)),
1887            quality_tier: Some(test_artifact(TierKind::Quality, true)),
1888            ..Default::default()
1889        };
1890
1891        manifest.refresh_backlog(2000, "fp-1234");
1892        assert_eq!(manifest.backlog.total_conversations, 2000);
1893        assert_eq!(manifest.backlog.fast_tier_processed, 250);
1894        assert_eq!(manifest.backlog.quality_tier_processed, 250);
1895    }
1896
1897    #[test]
        // Refreshing the backlog with a fingerprint other than the one the
        // fast-tier fixture was built with must leave `fast_tier_processed` at 0.
1898    fn refresh_backlog_ignores_stale_artifacts() {
1899        let mut manifest = SemanticManifest {
1900            fast_tier: Some(test_artifact(TierKind::Fast, true)),
1901            ..Default::default()
1902        };
1903
1904        // DB fingerprint doesn't match → artifact not counted
1905        manifest.refresh_backlog(2000, "different-fp");
1906        assert_eq!(manifest.backlog.fast_tier_processed, 0);
1907    }
1908
1909    // ── Invalidation ───────────────────────────────────────────────────
1910
1911    #[test]
        // A quality-tier artifact with `schema_version` forced to 0, alongside
        // an HNSW entry, must both be dropped by `invalidate_incompatible`; the
        // returned count is expected to be 2.
1912    fn invalidate_incompatible_removes_schema_mismatch() {
1913        let mut artifact = test_artifact(TierKind::Quality, true);
1914        artifact.schema_version = 0; // mismatch
1915        let mut manifest = SemanticManifest {
1916            quality_tier: Some(artifact),
1917            hnsw: Some(test_hnsw()), // depends on quality tier
1918            ..Default::default()
1919        };
1920
1921        let policy = test_policy();
1922        let count = manifest.invalidate_incompatible(&policy, "abc123");
1923
1924        assert_eq!(count, 2); // quality + hnsw
1925        assert!(manifest.quality_tier.is_none());
1926        assert!(manifest.hnsw.is_none());
1927    }
1928
1929    #[test]
        // With unmodified fast and quality fixtures, `invalidate_incompatible`
        // must report zero invalidations and leave both tiers in place.
1930    fn invalidate_incompatible_keeps_compatible() {
1931        let mut manifest = SemanticManifest {
1932            fast_tier: Some(test_artifact(TierKind::Fast, true)),
1933            quality_tier: Some(test_artifact(TierKind::Quality, true)),
1934            ..Default::default()
1935        };
1936
1937        let policy = test_policy();
1938        let count = manifest.invalidate_incompatible(&policy, "abc123");
1939
1940        assert_eq!(count, 0);
1941        assert!(manifest.fast_tier.is_some());
1942        assert!(manifest.quality_tier.is_some());
1943    }
1944
1945    // ── Legacy adoption ────────────────────────────────────────────────
1946
1947    #[test]
        // Calls `adopt_legacy_artifact` on a default manifest for the fast tier
        // and expects it to return true and to store a fast-tier entry that
        // carries the given embedder id, is marked ready, and is stamped with
        // `SEMANTIC_SCHEMA_VERSION`.
1948    fn adopt_legacy_artifact() {
1949        let mut manifest = SemanticManifest::default();
            // Named locals make the long positional argument list readable.
1950        let doc_count = 500;
1951        let conversation_count = 125;
1952        let index_path = "vector_index/index-fnv1a-384.fsvi";
1953        let db_fingerprint = "fp-old";
1954        let size_bytes = 75_000;
1955        let adopted = manifest.adopt_legacy_artifact(
1956            TierKind::Fast,
1957            "fnv1a-384",
1958            "hash",
1959            384,
1960            doc_count,
1961            conversation_count,
1962            db_fingerprint,
1963            index_path,
1964            size_bytes,
1965        );
1966
1967        assert!(adopted);
1968        let fast = manifest.fast_tier.as_ref().unwrap();
1969        assert_eq!(fast.embedder_id, "fnv1a-384");
1970        assert!(fast.ready);
1971        assert_eq!(fast.schema_version, SEMANTIC_SCHEMA_VERSION);
1972    }
1973
1974    // ── Size accounting ────────────────────────────────────────────────
1975
1976    #[test]
        // With fast, quality, and HNSW fixtures all present, `total_size_bytes`
        // must equal the sum 150_000 + 150_000 + 50_000 and `total_size_mb`
        // must report 1.
        // NOTE(review): the individual byte sizes presumably come from the
        // `test_artifact` / `test_hnsw` fixtures — confirm.
1977    fn total_size_accounts_for_all_artifacts() {
1978        let manifest = SemanticManifest {
1979            fast_tier: Some(test_artifact(TierKind::Fast, true)),
1980            quality_tier: Some(test_artifact(TierKind::Quality, true)),
1981            hnsw: Some(test_hnsw()),
1982            ..Default::default()
1983        };
1984
1985        assert_eq!(manifest.total_size_bytes(), 150_000 + 150_000 + 50_000);
1986        assert_eq!(manifest.total_size_mb(), 1); // 350KB rounds up to 1MB
1987    }
1988
1989    #[test]
        // A default manifest must report zero bytes and zero megabytes.
1990    fn total_size_empty_is_zero() {
1991        let manifest = SemanticManifest::default();
1992        assert_eq!(manifest.total_size_bytes(), 0);
1993        assert_eq!(manifest.total_size_mb(), 0);
1994    }
1995
1996    // ── JSON round-trip ────────────────────────────────────────────────
1997
1998    #[test]
        // Serializes a manifest with fast tier, quality tier, HNSW, and
        // checkpoint populated to pretty JSON, parses it back, and checks that
        // each of those four fields compares equal to the original.
1999    fn manifest_json_round_trip() {
2000        let manifest = SemanticManifest {
2001            fast_tier: Some(test_artifact(TierKind::Fast, true)),
2002            quality_tier: Some(test_artifact(TierKind::Quality, true)),
2003            hnsw: Some(test_hnsw()),
2004            checkpoint: Some(test_checkpoint(TierKind::Quality)),
2005            ..Default::default()
2006        };
2007
2008        let json = serde_json::to_string_pretty(&manifest).unwrap();
2009        let deser: SemanticManifest = serde_json::from_str(&json).unwrap();
2010        assert_eq!(deser.fast_tier, manifest.fast_tier);
2011        assert_eq!(deser.quality_tier, manifest.quality_tier);
2012        assert_eq!(deser.hnsw, manifest.hnsw);
2013        assert_eq!(deser.checkpoint, manifest.checkpoint);
2014    }
2015
2016    // ── Shard sidecar manifest ─────────────────────────────────────────
2017
2018    #[test]
        // Saves a `SemanticShardManifest` holding two shards to a temp directory
        // and loads it back. The loaded manifest must carry
        // `MANIFEST_FORMAT_VERSION`, contain both shards with `shard_index` 0
        // then 1, and have a non-zero `updated_at_ms`.
2019    fn shard_manifest_round_trip_via_sidecar() {
2020        let temp = tempfile::tempdir().unwrap();
2021        let mut shards = SemanticShardManifest::default();
            // Shards are supplied as index 1 then 0 (assuming the first
            // `test_shard` argument is the shard index — TODO confirm), while
            // the assertions below expect index order 0, 1.
2022        shards.replace_shards_for_generation(
2023            TierKind::Fast,
2024            "fnv1a-384",
2025            "fp-sharded",
2026            vec![test_shard(1, 2, true), test_shard(0, 2, true)],
2027        );
2028
2029        shards.save(temp.path()).unwrap();
2030        let loaded = SemanticShardManifest::load(temp.path()).unwrap().unwrap();
2031
2032        assert_eq!(loaded.manifest_version, MANIFEST_FORMAT_VERSION);
2033        assert_eq!(loaded.shards.len(), 2);
2034        assert_eq!(loaded.shards[0].shard_index, 0);
2035        assert_eq!(loaded.shards[1].shard_index, 1);
2036        assert!(loaded.updated_at_ms > 0);
2037    }
2038
2039    #[test]
        // A generation is summarized as complete only when every shard is
        // present. First only two of three shards are registered and the
        // summary must be incomplete; then all three are registered and the
        // summary must be complete with the expected aggregate counts.
2040    fn shard_summary_requires_every_ready_shard() {
2041        let mut shards = SemanticShardManifest::default();
            // Two of three shards: the middle one is left out (assuming
            // `test_shard(index, count, ready)` — TODO confirm argument order).
2042        shards.replace_shards_for_generation(
2043            TierKind::Fast,
2044            "fnv1a-384",
2045            "fp-sharded",
2046            vec![test_shard(0, 3, true), test_shard(2, 3, true)],
2047        );
2048
2049        let partial = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
2050        assert_eq!(partial.shard_count, 3);
2051        assert_eq!(partial.ready_shards, 2);
2052        assert!(!partial.complete);
2053
            // Replace the generation with all three shards.
2054        shards.replace_shards_for_generation(
2055            TierKind::Fast,
2056            "fnv1a-384",
2057            "fp-sharded",
2058            vec![
2059                test_shard(0, 3, true),
2060                test_shard(1, 3, true),
2061                test_shard(2, 3, true),
2062            ],
2063        );
2064
2065        let complete = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
2066        assert_eq!(complete.ready_shards, 3);
2067        assert!(complete.complete);
            // NOTE(review): 75 docs / 10 conversations presumably follow from
            // the per-shard values in the `test_shard` fixture — confirm.
2068        assert_eq!(complete.doc_count, 75);
2069        assert_eq!(complete.total_conversations, 10);
2070    }
2071
2072    #[test]
        // Walks one `SemanticShardManifest` through a series of malformed shard
        // sets. Each scenario replaces the same generation
        // (Fast / "fnv1a-384" / "fp-sharded") and then inspects `summary(..)`.
        // Vector-shard problems (not mmap-ready, duplicate index, duplicate,
        // blank, or unsafe index path, out-of-range index, metadata mismatch,
        // stale schema) must keep `complete` false. Problems limited to the
        // ANN sidecar path (unsafe or duplicated) must only lower
        // `ann_ready_shards` and leave `complete` true. The scenario with ANN
        // flagged ready but no path is also incomplete, but it mixes shard
        // counts 2 and 3 as well.
        // NOTE(review): assumes `test_shard(index, count, ready)` — confirm
        // argument order against the fixture.
2073    fn shard_summary_rejects_non_mmap_ready_or_inconsistent_shards() {
            // Scenario: the only shard is not mmap-ready.
2074        let mut non_mmap = test_shard(0, 1, true);
2075        non_mmap.mmap_ready = false;
2076        let mut shards = SemanticShardManifest::default();
2077        shards.replace_shards_for_generation(
2078            TierKind::Fast,
2079            "fnv1a-384",
2080            "fp-sharded",
2081            vec![non_mmap],
2082        );
2083
2084        let non_mmap_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
2085        assert_eq!(non_mmap_summary.ready_shards, 0);
2086        assert!(!non_mmap_summary.complete);
2087
            // Scenario: ANN flagged ready with a size but no ANN path, combined
            // with a shard that declares a different shard count.
2088        let mut inconsistent = test_shard(1, 3, true);
2089        inconsistent.ann_ready = true;
2090        inconsistent.ann_index_path = None;
2091        inconsistent.ann_size_bytes = 4096;
2092        shards.replace_shards_for_generation(
2093            TierKind::Fast,
2094            "fnv1a-384",
2095            "fp-sharded",
2096            vec![test_shard(0, 2, true), inconsistent],
2097        );
2098
2099        let inconsistent_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
2100        assert_eq!(inconsistent_summary.shard_count, 3);
2101        assert_eq!(inconsistent_summary.ready_shards, 2);
2102        assert_eq!(inconsistent_summary.ann_ready_shards, 0);
2103        assert!(!inconsistent_summary.complete);
2104
            // Scenario: two entries share shard index 1.
2105        shards.replace_shards_for_generation(
2106            TierKind::Fast,
2107            "fnv1a-384",
2108            "fp-sharded",
2109            vec![
2110                test_shard(0, 2, true),
2111                test_shard(1, 2, true),
2112                test_shard(1, 2, false),
2113            ],
2114        );
2115        let duplicate_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
2116        assert_eq!(duplicate_summary.shard_count, 2);
2117        assert_eq!(duplicate_summary.ready_shards, 2);
2118        assert!(
2119            !duplicate_summary.complete,
2120            "duplicate shard indexes must not summarize as a complete generation"
2121        );
2122
            // Scenario: two distinct shards point at the same index path.
2123        let mut duplicate_path = test_shard(1, 2, true);
2124        duplicate_path.index_path = test_shard(0, 2, true).index_path;
2125        shards.replace_shards_for_generation(
2126            TierKind::Fast,
2127            "fnv1a-384",
2128            "fp-sharded",
2129            vec![test_shard(0, 2, true), duplicate_path],
2130        );
2131        let duplicate_path_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
2132        assert_eq!(duplicate_path_summary.shard_count, 2);
2133        assert_eq!(duplicate_path_summary.ready_shards, 2);
2134        assert!(
2135            !duplicate_path_summary.complete,
2136            "duplicate shard index paths must not summarize as a complete generation"
2137        );
2138
            // Scenario: the shard's index path is the empty string.
2139        let mut blank_path = test_shard(0, 1, true);
2140        blank_path.index_path.clear();
2141        shards.replace_shards_for_generation(
2142            TierKind::Fast,
2143            "fnv1a-384",
2144            "fp-sharded",
2145            vec![blank_path],
2146        );
2147        let blank_path_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
2148        assert_eq!(blank_path_summary.shard_count, 1);
2149        assert_eq!(blank_path_summary.ready_shards, 1);
2150        assert!(
2151            !blank_path_summary.complete,
2152            "blank shard index paths must not summarize as complete"
2153        );
2154
            // Scenario: unsafe index paths — a path under a separate temp
            // directory, a `..` component, a leading `./`, and a leading space.
            // Only the path string is used, so the temp dir is not kept alive.
2155        for unsafe_path in [
2156            tempfile::tempdir()
2157                .unwrap()
2158                .path()
2159                .join("outside.fsvi")
2160                .to_string_lossy()
2161                .to_string(),
2162            "vector_index/shards/../outside.fsvi".to_string(),
2163            "./vector_index/shards/fast/hash.fsvi".to_string(),
2164            " vector_index/shards/fast/hash.fsvi".to_string(),
2165        ] {
2166            let mut unsafe_shard = test_shard(0, 1, true);
2167            unsafe_shard.index_path = unsafe_path;
2168            shards.replace_shards_for_generation(
2169                TierKind::Fast,
2170                "fnv1a-384",
2171                "fp-sharded",
2172                vec![unsafe_shard],
2173            );
2174            let unsafe_path_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
2175            assert_eq!(unsafe_path_summary.shard_count, 1);
2176            assert_eq!(unsafe_path_summary.ready_shards, 1);
2177            assert!(
2178                !unsafe_path_summary.complete,
2179                "unsafe shard index paths must not summarize as complete"
2180            );
2181        }
2182
            // Scenario: the same four unsafe shapes, applied to the optional
            // ANN path. The ANN shard is not counted as ready, but the vector
            // generation itself is still expected to be complete.
2183        let outside_ann_dir = tempfile::tempdir().unwrap();
2184        for unsafe_ann_path in [
2185            outside_ann_dir
2186                .path()
2187                .join("outside.chsw")
2188                .to_string_lossy()
2189                .to_string(),
2190            "vector_index/shards/../outside.chsw".to_string(),
2191            "./vector_index/shards/fast/hash.chsw".to_string(),
2192            " vector_index/shards/fast/hash.chsw".to_string(),
2193        ] {
2194            let mut unsafe_ann = test_shard(0, 1, true);
2195            unsafe_ann.ann_ready = true;
2196            unsafe_ann.ann_index_path = Some(unsafe_ann_path);
2197            unsafe_ann.ann_size_bytes = 4096;
2198            shards.replace_shards_for_generation(
2199                TierKind::Fast,
2200                "fnv1a-384",
2201                "fp-sharded",
2202                vec![unsafe_ann],
2203            );
2204            let unsafe_ann_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
2205            assert_eq!(unsafe_ann_summary.shard_count, 1);
2206            assert_eq!(unsafe_ann_summary.ready_shards, 1);
2207            assert_eq!(unsafe_ann_summary.ann_ready_shards, 0);
2208            assert!(
2209                unsafe_ann_summary.complete,
2210                "unsafe optional ANN paths must not invalidate the vector shard generation"
2211            );
2212        }
2213
            // Scenario: two shards share one ANN path. Exactly one ANN shard is
            // expected to count as ready; the generation stays complete.
2214        let mut duplicate_ann_path = test_shard(1, 2, true);
2215        duplicate_ann_path.ann_ready = true;
2216        duplicate_ann_path.ann_index_path =
2217            Some("vector_index/shards/fast-fnv1a-384/shared-ann.chsw".to_owned());
2218        duplicate_ann_path.ann_size_bytes = 4096;
2219        let mut first_ann_path = test_shard(0, 2, true);
2220        first_ann_path.ann_ready = true;
2221        first_ann_path.ann_index_path = duplicate_ann_path.ann_index_path.clone();
2222        first_ann_path.ann_size_bytes = 4096;
2223        shards.replace_shards_for_generation(
2224            TierKind::Fast,
2225            "fnv1a-384",
2226            "fp-sharded",
2227            vec![first_ann_path, duplicate_ann_path],
2228        );
2229        let duplicate_ann_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
2230        assert_eq!(duplicate_ann_summary.shard_count, 2);
2231        assert_eq!(duplicate_ann_summary.ready_shards, 2);
2232        assert_eq!(duplicate_ann_summary.ann_ready_shards, 1);
2233        assert!(
2234            duplicate_ann_summary.complete,
2235            "duplicate optional ANN paths must not invalidate the vector shard generation"
2236        );
2237
            // Scenario: shard index 2 with a declared shard count of 2.
2238        shards.replace_shards_for_generation(
2239            TierKind::Fast,
2240            "fnv1a-384",
2241            "fp-sharded",
2242            vec![test_shard(2, 2, true)],
2243        );
2244        let out_of_range_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
2245        assert_eq!(out_of_range_summary.shard_count, 2);
2246        assert_eq!(out_of_range_summary.ready_shards, 1);
2247        assert!(
2248            !out_of_range_summary.complete,
2249            "shard indexes outside the declared shard count are malformed"
2250        );
2251
            // Scenario: one shard reports a different vector dimension.
2252        let mut mismatched_metadata = test_shard(1, 2, true);
2253        mismatched_metadata.dimension = 768;
2254        shards.replace_shards_for_generation(
2255            TierKind::Fast,
2256            "fnv1a-384",
2257            "fp-sharded",
2258            vec![test_shard(0, 2, true), mismatched_metadata],
2259        );
2260        let metadata_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
2261        assert_eq!(metadata_summary.shard_count, 2);
2262        assert_eq!(metadata_summary.ready_shards, 2);
2263        assert!(
2264            !metadata_summary.complete,
2265            "complete shard generations require consistent shard metadata"
2266        );
2267
            // Scenario: the shard's schema version is one below the current one.
2268        let mut stale_schema = test_shard(0, 1, true);
2269        stale_schema.schema_version = SEMANTIC_SCHEMA_VERSION.saturating_sub(1);
2270        shards.replace_shards_for_generation(
2271            TierKind::Fast,
2272            "fnv1a-384",
2273            "fp-sharded",
2274            vec![stale_schema],
2275        );
2276        let stale_schema_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
2277        assert_eq!(stale_schema_summary.shard_count, 1);
2278        assert_eq!(stale_schema_summary.ready_shards, 1);
2279        assert!(
2280            !stale_schema_summary.complete,
2281            "stale schema shards must not summarize as complete"
2282        );
2283    }
2284
2285    #[test]
        // A complete shard generation in `SemanticShardManifest` must not
        // affect the main manifest: a default `SemanticManifest` queried with
        // the same fingerprint still reports the fast tier as `Missing`.
2286    fn shard_sidecar_does_not_make_main_manifest_ready() {
2287        let mut shards = SemanticShardManifest::default();
2288        shards.replace_shards_for_generation(
2289            TierKind::Fast,
2290            "fnv1a-384",
2291            "fp-sharded",
2292            vec![test_shard(0, 1, true)],
2293        );
            // Precondition: the sidecar itself summarizes as complete.
2294        assert!(
2295            shards
2296                .summary(TierKind::Fast, "fnv1a-384", "fp-sharded")
2297                .complete
2298        );
2299
            // The main manifest is built independently of the sidecar above.
2300        let manifest = SemanticManifest::default();
2301        let policy = test_policy();
2302        assert_eq!(
2303            manifest.fast_tier_readiness(&policy, "fp-sharded", "hash"),
2304            TierReadiness::Missing,
2305            "sidecar shards must not publish runtime semantic readiness"
2306        );
2307    }
2308
2309    #[test]
        // Of two shards, one has `schema_version` forced to 0.
        // `invalidate_incompatible` must report one invalidation, leave a single
        // shard behind, and the remaining total size must be 4096 bytes.
        // NOTE(review): 4096 is presumably the per-shard size in the
        // `test_shard` fixture — confirm.
2310    fn shard_manifest_invalidates_incompatible_shards() {
2311        let mut bad = test_shard(0, 1, true);
2312        bad.schema_version = 0;
            // The shard list is set directly on the struct here, rather than
            // through `replace_shards_for_generation` as the other shard tests do.
2313        let mut shards = SemanticShardManifest {
2314            shards: vec![bad, test_shard(0, 1, true)],
2315            ..Default::default()
2316        };
2317
2318        let invalidated = shards.invalidate_incompatible(&test_policy(), "hash");
2319
2320        assert_eq!(invalidated, 1);
2321        assert_eq!(shards.shards.len(), 1);
2322        assert_eq!(shards.total_size_bytes(), 4096);
2323    }
2324}