Skip to main content

coding_agent_search/search/
semantic_manifest.rs

1//! Durable semantic asset manifest, backlog ledger, and resumable checkpoints.
2//!
3//! This module is the authoritative state model for semantic assets.  It tells
4//! cass exactly what semantic artifacts exist, how trustworthy they are, and
5//! what work remains to converge the corpus — enabling partial readiness,
6//! resumable builds, and truthful runtime degradation.
7//!
8//! # Storage
9//!
10//! The manifest is a single JSON file at:
11//! ```text
12//! {data_dir}/vector_index/semantic_manifest.json
13//! ```
14//!
15//! It is written atomically (write-to-temp then rename) and is the only file
16//! the backfill worker needs to consult to know what work remains.
17//!
18//! # Relationship to other modules
19//!
//! - **[`policy`](super::policy)**: Provides the contract (versions, budgets, tier names) that
//!   this manifest fingerprints against.
//! - **[`asset_state`](super::asset_state)**: Evaluates coarse readiness from this manifest plus
//!   live file probes.
//! - **[`model_manager`](super::model_manager)**: Detects model availability; this module records
//!   which model was used to build each artifact.
26
27use std::fs::{self, OpenOptions};
28use std::io::Write as IoWrite;
29use std::path::{Component, Path, PathBuf};
30use std::time::{SystemTime, UNIX_EPOCH};
31
32use ring::rand::{SecureRandom, SystemRandom};
33use serde::{Deserialize, Serialize};
34
35use super::policy::{
36    CHUNKING_STRATEGY_VERSION, InvalidationAction, SEMANTIC_SCHEMA_VERSION,
37    SemanticAssetManifest as PolicyManifest, SemanticPolicy,
38};
39
40// ─── Constants ─────────────────────────────────────────────────────────────
41
/// Manifest format version written by this binary.
///
/// Bump this when the on-disk JSON schema changes in a way older binaries
/// cannot safely interpret. `load()` rejects any manifest whose
/// `manifest_version` is greater than this value. The shard sidecar manifest
/// is stamped with and checked against the same constant.
///
/// History:
/// - **v1** (pre-cass#257): `BuildCheckpoint` resume cursor is the
///   conversation offset only.
/// - **v2** (cass#257 sub-fix 2): `BuildCheckpoint` may additionally
///   carry `last_message_id`, an inclusive canonical message PK
///   advanced by every batch. Resume strictly skips messages ≤ this
///   cursor so an interrupted bounded run never re-embeds work it
///   already staged. The field is `Option<i64>` with
///   `#[serde(default, skip_serializing_if = "Option::is_none")]`, so the
///   checkpoint JSON only gains the key once a batch has persisted the
///   cursor. Freshly created manifests are nevertheless stamped with this
///   version, so v1 binaries reject them at the version check even though
///   the JSON body itself would deserialize.
///
/// **Compatibility:**
/// - **Old binary reading v2 manifest:** `load()` returns an
///   `UnsupportedVersion { found, max_supported }` error so the operator
///   can see that an upgrade is needed (`load_or_default()` instead falls
///   back to a fresh default manifest).
/// - **New binary reading v1 manifest:** loads fine. `last_message_id`
///   defaults to `None`; resume falls back to the conversation offset, and
///   `load()` logs a warning on every load where a v1 checkpoint lacks the
///   cursor, until a checkpoint carrying `last_message_id` is saved.
pub const MANIFEST_FORMAT_VERSION: u32 = 2;
67
/// Highest manifest format version written by pre-cass#257 binaries.
///
/// Manifests at or below this version are still fully loadable. Their
/// checkpoints carry no `last_message_id`, so resume stays conversation-
/// granular until a checkpoint with the cursor is saved. `load()` compares
/// `manifest_version` against this threshold to decide whether to log the
/// coarse-resume warning.
pub const MANIFEST_FORMAT_VERSION_PRE_LAST_MESSAGE_CURSOR: u32 = 1;
73
/// File name of the primary durable semantic manifest, stored under
/// `{data_dir}/vector_index/`.
pub const MANIFEST_FILENAME: &str = "semantic_manifest.json";

/// File name of the prototype per-shard sidecar manifest, stored under
/// `{data_dir}/vector_index/`.
pub const SHARD_MANIFEST_FILENAME: &str = "semantic_shards.json";
79
/// Returns `true` when a shard artifact path recorded in the sidecar manifest
/// is a plain, non-empty relative path.
///
/// The path is rejected when it:
/// - is empty or has leading/trailing whitespace;
/// - is absolute;
/// - contains any component other than a normal name. This excludes `.`,
///   `..`, root and prefix components, so the path cannot escape the
///   directory it is resolved against.
pub(crate) fn semantic_shard_artifact_path_is_safe(recorded_path: &str) -> bool {
    // Whitespace padding is treated as corruption rather than silently trimmed.
    if recorded_path.is_empty() || recorded_path.trim() != recorded_path {
        return false;
    }

    let candidate = Path::new(recorded_path);
    if candidate.is_absolute() {
        return false;
    }

    candidate
        .components()
        .all(|part| matches!(part, Component::Normal(_)))
}
91
92// ─── Tier kind ─────────────────────────────────────────────────────────────
93
94/// Which semantic tier an artifact or checkpoint belongs to.
95#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
96#[serde(rename_all = "snake_case")]
97pub enum TierKind {
98    Fast,
99    Quality,
100}
101
102impl TierKind {
103    pub fn as_str(&self) -> &'static str {
104        match self {
105            Self::Fast => "fast",
106            Self::Quality => "quality",
107        }
108    }
109}
110
111// ─── Tier readiness ────────────────────────────────────────────────────────
112
113/// Readiness of a single semantic tier.
114#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
115#[serde(rename_all = "snake_case")]
116pub enum TierReadiness {
117    /// Artifact exists, verified, and current with the DB.
118    Ready,
119    /// Build is in progress (checkpoint present).
120    Building { progress_pct: u8 },
121    /// Artifact exists but DB or model changed since it was built.
122    Stale { reason: String },
123    /// No artifact at all for this tier.
124    Missing,
125    /// Schema or chunking version mismatch — must discard and rebuild.
126    Incompatible { reason: String },
127}
128
129impl TierReadiness {
130    pub fn is_ready(&self) -> bool {
131        matches!(self, Self::Ready)
132    }
133
134    pub fn is_usable(&self) -> bool {
135        matches!(self, Self::Ready | Self::Stale { .. })
136    }
137}
138
139// ─── Artifact record ───────────────────────────────────────────────────────
140
141/// Durable metadata for a single vector index artifact.
142#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
143pub struct ArtifactRecord {
144    /// Which tier this artifact belongs to.
145    pub tier: TierKind,
146    /// Embedder ID that produced these vectors (e.g., "minilm-384", "fnv1a-384").
147    pub embedder_id: String,
148    /// Model revision hash (HuggingFace commit or "hash" for the hash embedder).
149    pub model_revision: String,
150    /// Semantic schema version at build time.
151    pub schema_version: u32,
152    /// Chunking strategy version at build time.
153    pub chunking_version: u32,
154    /// Output dimension of the embedder.
155    pub dimension: usize,
156    /// Number of documents (message chunks) embedded.
157    pub doc_count: u64,
158    /// Number of conversations processed to produce this artifact.
159    pub conversation_count: u64,
160    /// Storage fingerprint of the canonical DB when this artifact was built.
161    pub db_fingerprint: String,
162    /// Relative path to the index file (from data_dir).
163    pub index_path: String,
164    /// File size in bytes.
165    pub size_bytes: u64,
166    /// Unix timestamp (ms) when the build started.
167    pub started_at_ms: i64,
168    /// Unix timestamp (ms) when the build completed.
169    pub completed_at_ms: i64,
170    /// Whether this artifact has been verified and published.
171    pub ready: bool,
172}
173
174impl ArtifactRecord {
175    /// Convert to the policy-level manifest for invalidation checks.
176    pub fn to_policy_manifest(&self) -> PolicyManifest {
177        PolicyManifest {
178            embedder_id: self.embedder_id.clone(),
179            model_revision: self.model_revision.clone(),
180            schema_version: self.schema_version,
181            chunking_version: self.chunking_version,
182            doc_count: self.doc_count,
183            built_at_ms: self.completed_at_ms,
184        }
185    }
186
187    /// Evaluate this artifact's readiness against the current policy and DB
188    /// fingerprint.
189    ///
190    /// **Note**: This checks schema/chunking versions, mode, model revision,
191    /// and DB fingerprint.  It does NOT detect embedder changes because the
192    /// expected embedder ID requires the embedder registry to resolve.
193    /// Callers needing embedder-change detection should call
194    /// [`SemanticAssetManifest::invalidation_action`] directly with the
195    /// correct `expected_embedder_id` from the registry.
196    pub fn readiness(
197        &self,
198        policy: &SemanticPolicy,
199        current_db_fingerprint: &str,
200        current_model_revision: &str,
201    ) -> TierReadiness {
202        let action = self.to_policy_manifest().invalidation_action(
203            policy,
204            current_model_revision,
205            &self.embedder_id,
206        );
207
208        match action {
209            InvalidationAction::UpToDate => {
210                if self.db_fingerprint != current_db_fingerprint {
211                    TierReadiness::Stale {
212                        reason: "DB content changed since artifact was built".to_owned(),
213                    }
214                } else if !self.ready {
215                    TierReadiness::Building { progress_pct: 100 }
216                } else {
217                    TierReadiness::Ready
218                }
219            }
220            InvalidationAction::RebuildInBackground => TierReadiness::Stale {
221                reason: "model revision changed; vectors usable until rebuild completes".to_owned(),
222            },
223            InvalidationAction::DiscardAndRebuild { reason } => {
224                TierReadiness::Incompatible { reason }
225            }
226            InvalidationAction::Evict => TierReadiness::Incompatible {
227                reason: "semantic mode set to lexical-only".to_owned(),
228            },
229        }
230    }
231}
232
233// ─── HNSW accelerator record ──────────────────────────────────────────────
234
235/// Durable metadata for an HNSW accelerator index.
236#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
237pub struct HnswRecord {
238    /// Which base artifact this accelerates.
239    pub base_tier: TierKind,
240    /// Embedder ID of the base artifact.
241    pub embedder_id: String,
242    /// ef_search parameter used at build time.
243    pub ef_search: usize,
244    /// Relative path to the HNSW index file (from data_dir).
245    pub index_path: String,
246    /// File size in bytes.
247    pub size_bytes: u64,
248    /// Unix timestamp (ms) when built.
249    pub built_at_ms: i64,
250    /// Whether this index is ready for use.
251    pub ready: bool,
252}
253
254// ─── Sharded vector artifact sidecar ─────────────────────────────────────
255
256/// Durable metadata for one mmap-friendly semantic vector shard.
257///
258/// Shards deliberately live in a sidecar manifest instead of
259/// [`SemanticManifest`]. Runtime readiness continues to flow through the
260/// existing tier artifact records, so partial shard generations cannot make a
261/// semantic tier look ready before a promotion step has merged or selected a
262/// complete generation.
263#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
264pub struct SemanticShardRecord {
265    /// Which semantic tier this shard belongs to.
266    pub tier: TierKind,
267    /// Embedder ID that produced the vectors.
268    pub embedder_id: String,
269    /// Model revision hash or "hash" for the hash embedder.
270    pub model_revision: String,
271    /// Semantic schema version at build time.
272    pub schema_version: u32,
273    /// Chunking strategy version at build time.
274    pub chunking_version: u32,
275    /// Output dimension of the embedder.
276    pub dimension: usize,
277    /// Zero-based shard number within the generation.
278    pub shard_index: u32,
279    /// Total shards expected for this generation.
280    pub shard_count: u32,
281    /// Number of documents embedded in this shard.
282    pub doc_count: u64,
283    /// Total conversations represented by the full shard generation.
284    pub total_conversations: u64,
285    /// Storage fingerprint of the canonical DB when this shard was built.
286    pub db_fingerprint: String,
287    /// Relative path to the shard index file from data_dir.
288    pub index_path: String,
289    /// Vector quantization used by the shard file.
290    pub quantization: String,
291    /// Whether readers may mmap this artifact directly.
292    pub mmap_ready: bool,
293    /// Relative path to the shard-local ANN accelerator, when built.
294    pub ann_index_path: Option<String>,
295    /// File size of the ANN accelerator in bytes.
296    pub ann_size_bytes: u64,
297    /// Whether the shard-local ANN accelerator is ready for use.
298    pub ann_ready: bool,
299    /// File size in bytes.
300    pub size_bytes: u64,
301    /// Unix timestamp (ms) when the shard build started.
302    pub started_at_ms: i64,
303    /// Unix timestamp (ms) when the shard build completed.
304    pub completed_at_ms: i64,
305    /// Whether this shard has been verified and published to the sidecar.
306    pub ready: bool,
307}
308
309impl SemanticShardRecord {
310    pub fn to_policy_manifest(&self) -> PolicyManifest {
311        PolicyManifest {
312            embedder_id: self.embedder_id.clone(),
313            model_revision: self.model_revision.clone(),
314            schema_version: self.schema_version,
315            chunking_version: self.chunking_version,
316            doc_count: self.doc_count,
317            built_at_ms: self.completed_at_ms,
318        }
319    }
320
321    pub fn matches_generation(
322        &self,
323        tier: TierKind,
324        embedder_id: &str,
325        db_fingerprint: &str,
326    ) -> bool {
327        self.tier == tier
328            && self.embedder_id == embedder_id
329            && self.db_fingerprint == db_fingerprint
330    }
331}
332
333/// Aggregated readiness for a shard generation.
334#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
335pub struct SemanticShardSummary {
336    pub shard_count: u32,
337    pub ready_shards: u32,
338    pub ann_ready_shards: u32,
339    pub doc_count: u64,
340    pub total_conversations: u64,
341    pub size_bytes: u64,
342    pub ann_size_bytes: u64,
343    pub complete: bool,
344}
345
346/// Sidecar manifest for prototype semantic shard generations.
347#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
348pub struct SemanticShardManifest {
349    pub manifest_version: u32,
350    pub shards: Vec<SemanticShardRecord>,
351    pub updated_at_ms: i64,
352}
353
354impl Default for SemanticShardManifest {
355    fn default() -> Self {
356        Self {
357            manifest_version: MANIFEST_FORMAT_VERSION,
358            shards: Vec::new(),
359            updated_at_ms: 0,
360        }
361    }
362}
363
364impl SemanticShardManifest {
365    pub fn path(data_dir: &Path) -> PathBuf {
366        data_dir.join("vector_index").join(SHARD_MANIFEST_FILENAME)
367    }
368
369    pub fn load(data_dir: &Path) -> Result<Option<Self>, ManifestError> {
370        let path = Self::path(data_dir);
371        let bytes = match fs::read(&path) {
372            Ok(bytes) => bytes,
373            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
374            Err(e) => {
375                return Err(ManifestError::Io {
376                    path,
377                    source: e.to_string(),
378                });
379            }
380        };
381
382        let manifest: Self = serde_json::from_slice(&bytes).map_err(|e| ManifestError::Parse {
383            path: path.clone(),
384            source: e.to_string(),
385        })?;
386
387        if manifest.manifest_version > MANIFEST_FORMAT_VERSION {
388            return Err(ManifestError::UnsupportedVersion {
389                found: manifest.manifest_version,
390                max_supported: MANIFEST_FORMAT_VERSION,
391            });
392        }
393
394        Ok(Some(manifest))
395    }
396
397    pub fn load_or_default(data_dir: &Path) -> Result<Self, ManifestError> {
398        match Self::load(data_dir) {
399            Ok(Some(manifest)) => Ok(manifest),
400            Ok(None) => Ok(Self::default()),
401            Err(e @ ManifestError::Io { .. }) => Err(e),
402            Err(ManifestError::Parse { .. } | ManifestError::UnsupportedVersion { .. }) => {
403                Ok(Self::default())
404            }
405            Err(e) => Err(e),
406        }
407    }
408
409    pub fn save(&mut self, data_dir: &Path) -> Result<(), ManifestError> {
410        let path = Self::path(data_dir);
411        if let Some(parent) = path.parent() {
412            fs::create_dir_all(parent).map_err(|e| ManifestError::Io {
413                path: parent.to_path_buf(),
414                source: e.to_string(),
415            })?;
416        }
417
418        self.updated_at_ms = now_ms();
419        let json = serde_json::to_string_pretty(self).map_err(|e| ManifestError::Serialize {
420            source: e.to_string(),
421        })?;
422        let (tmp_path, mut file) =
423            create_unique_manifest_temp_file(&path).map_err(|e| ManifestError::Io {
424                path: path.clone(),
425                source: e.to_string(),
426            })?;
427        file.write_all(json.as_bytes())
428            .map_err(|e| ManifestError::Io {
429                path: tmp_path.clone(),
430                source: e.to_string(),
431            })?;
432        file.sync_all().map_err(|e| ManifestError::Io {
433            path: tmp_path.clone(),
434            source: e.to_string(),
435        })?;
436        replace_file_from_temp(&tmp_path, &path).map_err(|e| ManifestError::Io {
437            path: path.clone(),
438            source: e.to_string(),
439        })?;
440        sync_parent_directory(&path).map_err(|e| ManifestError::Io {
441            path: path
442                .parent()
443                .map(Path::to_path_buf)
444                .unwrap_or_else(|| path.clone()),
445            source: e.to_string(),
446        })?;
447
448        Ok(())
449    }
450
451    pub fn replace_shards_for_generation(
452        &mut self,
453        tier: TierKind,
454        embedder_id: &str,
455        db_fingerprint: &str,
456        mut shards: Vec<SemanticShardRecord>,
457    ) {
458        self.shards
459            .retain(|shard| !shard.matches_generation(tier, embedder_id, db_fingerprint));
460        self.shards.append(&mut shards);
461        self.shards.sort_by(|a, b| {
462            (
463                a.tier.as_str(),
464                &a.embedder_id,
465                &a.db_fingerprint,
466                a.shard_index,
467            )
468                .cmp(&(
469                    b.tier.as_str(),
470                    &b.embedder_id,
471                    &b.db_fingerprint,
472                    b.shard_index,
473                ))
474        });
475    }
476
477    pub fn summary(
478        &self,
479        tier: TierKind,
480        embedder_id: &str,
481        db_fingerprint: &str,
482    ) -> SemanticShardSummary {
483        let mut summary = SemanticShardSummary::default();
484        let mut ready_indices = std::collections::BTreeSet::new();
485        let mut ann_ready_indices = std::collections::BTreeSet::new();
486        let mut seen_indices = std::collections::BTreeSet::new();
487        let mut seen_index_paths = std::collections::BTreeSet::new();
488        let mut seen_ann_index_paths = std::collections::BTreeSet::new();
489        let mut expected_shard_count = None;
490        let mut expected_generation_metadata: Option<(&str, u32, u32, usize, u64, &str)> = None;
491        let mut generation_consistent = true;
492        for shard in self
493            .shards
494            .iter()
495            .filter(|shard| shard.matches_generation(tier, embedder_id, db_fingerprint))
496        {
497            if shard.shard_count == 0 || shard.shard_index >= shard.shard_count {
498                generation_consistent = false;
499            }
500            if !seen_indices.insert(shard.shard_index) {
501                generation_consistent = false;
502            }
503            if !semantic_shard_artifact_path_is_safe(&shard.index_path)
504                || !seen_index_paths.insert(&shard.index_path)
505            {
506                generation_consistent = false;
507            }
508            match expected_shard_count {
509                Some(expected) if expected != shard.shard_count => {
510                    generation_consistent = false;
511                }
512                None => expected_shard_count = Some(shard.shard_count),
513                _ => {}
514            }
515            let generation_metadata = (
516                shard.model_revision.as_str(),
517                shard.schema_version,
518                shard.chunking_version,
519                shard.dimension,
520                shard.total_conversations,
521                shard.quantization.as_str(),
522            );
523            match expected_generation_metadata {
524                Some(expected) if expected != generation_metadata => {
525                    generation_consistent = false;
526                }
527                None => expected_generation_metadata = Some(generation_metadata),
528                _ => {}
529            }
530            if shard.schema_version != SEMANTIC_SCHEMA_VERSION
531                || shard.chunking_version != CHUNKING_STRATEGY_VERSION
532                || shard.dimension == 0
533            {
534                generation_consistent = false;
535            }
536            summary.shard_count = summary.shard_count.max(shard.shard_count);
537            summary.doc_count = summary.doc_count.saturating_add(shard.doc_count);
538            summary.total_conversations =
539                summary.total_conversations.max(shard.total_conversations);
540            summary.size_bytes = summary.size_bytes.saturating_add(shard.size_bytes);
541            summary.ann_size_bytes = summary.ann_size_bytes.saturating_add(shard.ann_size_bytes);
542            if shard.ready && shard.mmap_ready {
543                ready_indices.insert(shard.shard_index);
544            }
545            if shard.ready
546                && shard.mmap_ready
547                && shard.ann_ready
548                && shard.ann_size_bytes > 0
549                && let Some(ann_index_path) = shard.ann_index_path.as_deref()
550                && semantic_shard_artifact_path_is_safe(ann_index_path)
551                && seen_ann_index_paths.insert(ann_index_path)
552            {
553                ann_ready_indices.insert(shard.shard_index);
554            }
555        }
556        summary.ready_shards = u32::try_from(ready_indices.len()).unwrap_or(u32::MAX);
557        summary.ann_ready_shards = u32::try_from(ann_ready_indices.len()).unwrap_or(u32::MAX);
558        summary.complete = generation_consistent
559            && summary.shard_count > 0
560            && summary.ready_shards == summary.shard_count
561            && (0..summary.shard_count).all(|index| ready_indices.contains(&index));
562        summary
563    }
564
565    pub fn invalidate_incompatible(
566        &mut self,
567        policy: &SemanticPolicy,
568        current_model_revision: &str,
569    ) -> usize {
570        let before = self.shards.len();
571        self.shards.retain(|shard| {
572            !matches!(
573                shard.to_policy_manifest().invalidation_action(
574                    policy,
575                    current_model_revision,
576                    &shard.embedder_id,
577                ),
578                InvalidationAction::DiscardAndRebuild { .. } | InvalidationAction::Evict
579            )
580        });
581        before.saturating_sub(self.shards.len())
582    }
583
584    pub fn total_size_bytes(&self) -> u64 {
585        self.shards
586            .iter()
587            .map(|shard| shard.size_bytes)
588            .fold(0, u64::saturating_add)
589    }
590}
591
592// ─── Build checkpoint ──────────────────────────────────────────────────────
593
594/// Resumable position for an interrupted semantic build.
595///
596/// Sub-fix 2 for cass#257 added the optional `last_message_id` cursor.
597/// Resume strictly advances past this cursor when present, so that a
598/// rerun of an interrupted bounded backfill never re-embeds messages
599/// that already made it into the staged index. Pre-#257 binaries wrote
600/// checkpoints without the field (it deserializes to `None` via
601/// `#[serde(default)]`), and modern binaries fall back to the
602/// conversation offset when `last_message_id` is absent.
603#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
604pub struct BuildCheckpoint {
605    /// Which tier is being built.
606    pub tier: TierKind,
607    /// Embedder ID for this build.
608    pub embedder_id: String,
609    /// Last conversation offset processed (for pagination).
610    pub last_offset: i64,
611    /// Total documents embedded so far in this build.
612    pub docs_embedded: u64,
613    /// Total conversations processed so far.
614    pub conversations_processed: u64,
615    /// Total conversations expected (from DB at start of build).
616    pub total_conversations: u64,
617    /// DB fingerprint when this build started.
618    pub db_fingerprint: String,
619    /// Schema version for this build.
620    pub schema_version: u32,
621    /// Chunking version for this build.
622    pub chunking_version: u32,
623    /// Unix timestamp (ms) when this checkpoint was saved.
624    pub saved_at_ms: i64,
625    /// Highest canonical message PK embedded in this run so far.
626    ///
627    /// Added in cass#257 (sub-fix 2). `None` for checkpoints written
628    /// by pre-#257 binaries; new code falls back to `last_offset`
629    /// (conversation-granularity) when this is absent. New code
630    /// strictly resumes past this cursor when present.
631    #[serde(default, skip_serializing_if = "Option::is_none")]
632    pub last_message_id: Option<i64>,
633}
634
635impl BuildCheckpoint {
636    /// Progress as a percentage (0–100).
637    pub fn progress_pct(&self) -> u8 {
638        if self.total_conversations == 0 {
639            return 0;
640        }
641        let pct = (self.conversations_processed as f64 / self.total_conversations as f64) * 100.0;
642        (pct as u8).min(100)
643    }
644
645    /// Whether the build is complete (all conversations processed).
646    pub fn is_complete(&self) -> bool {
647        self.conversations_processed >= self.total_conversations
648    }
649
650    /// Whether this checkpoint is still valid against the current DB and policy.
651    pub fn is_valid(&self, current_db_fingerprint: &str) -> bool {
652        self.db_fingerprint == current_db_fingerprint
653            && self.schema_version == SEMANTIC_SCHEMA_VERSION
654            && self.chunking_version == CHUNKING_STRATEGY_VERSION
655    }
656}
657
658// ─── Backlog ledger ────────────────────────────────────────────────────────
659
660/// Tracks what semantic build work remains.
661#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
662pub struct BacklogLedger {
663    /// Total conversations in the canonical DB at last check.
664    pub total_conversations: u64,
665    /// Conversations embedded in the fast tier.
666    pub fast_tier_processed: u64,
667    /// Conversations embedded in the quality tier.
668    pub quality_tier_processed: u64,
669    /// DB fingerprint when this ledger was computed.
670    pub db_fingerprint: String,
671    /// Unix timestamp (ms) when this ledger was computed.
672    pub computed_at_ms: i64,
673}
674
675impl BacklogLedger {
676    /// Remaining conversations for the fast tier.
677    pub fn fast_tier_remaining(&self) -> u64 {
678        self.total_conversations
679            .saturating_sub(self.fast_tier_processed)
680    }
681
682    /// Remaining conversations for the quality tier.
683    pub fn quality_tier_remaining(&self) -> u64 {
684        self.total_conversations
685            .saturating_sub(self.quality_tier_processed)
686    }
687
688    /// Whether either tier has outstanding work.
689    pub fn has_pending_work(&self) -> bool {
690        self.fast_tier_remaining() > 0 || self.quality_tier_remaining() > 0
691    }
692
693    /// Whether the ledger is current with the given DB fingerprint.
694    pub fn is_current(&self, current_db_fingerprint: &str) -> bool {
695        self.db_fingerprint == current_db_fingerprint
696    }
697}
698
699// ─── The top-level manifest ────────────────────────────────────────────────
700
701/// Durable, atomic semantic asset manifest.
702///
703/// This is the single source of truth for what semantic assets exist, their
704/// provenance, and what work remains.  It is loaded/saved as JSON.
705#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
706pub struct SemanticManifest {
707    /// Format version — for future migrations.
708    pub manifest_version: u32,
709    /// Fast-tier vector artifact (hash embedder).
710    pub fast_tier: Option<ArtifactRecord>,
711    /// Quality-tier vector artifact (ML embedder).
712    pub quality_tier: Option<ArtifactRecord>,
713    /// HNSW accelerator index.
714    pub hnsw: Option<HnswRecord>,
715    /// Backlog / progress tracker.
716    pub backlog: BacklogLedger,
717    /// Active build checkpoint (for resuming interrupted work).
718    pub checkpoint: Option<BuildCheckpoint>,
719    /// Unix timestamp (ms) when this manifest was last written.
720    pub updated_at_ms: i64,
721}
722
723impl Default for SemanticManifest {
724    fn default() -> Self {
725        Self {
726            manifest_version: MANIFEST_FORMAT_VERSION,
727            fast_tier: None,
728            quality_tier: None,
729            hnsw: None,
730            backlog: BacklogLedger {
731                total_conversations: 0,
732                fast_tier_processed: 0,
733                quality_tier_processed: 0,
734                db_fingerprint: String::new(),
735                computed_at_ms: 0,
736            },
737            checkpoint: None,
738            updated_at_ms: 0,
739        }
740    }
741}
742
743impl SemanticManifest {
744    // ── Path helpers ───────────────────────────────────────────────────
745
746    /// Path to the manifest file.
747    pub fn path(data_dir: &Path) -> PathBuf {
748        data_dir.join("vector_index").join(MANIFEST_FILENAME)
749    }
750
751    // ── Load / Save ───────────────────────────────────────────────────
752
753    /// Load the manifest from disk.  Returns `None` if the file doesn't
754    /// exist, `Err` if it exists but is corrupt.
755    ///
756    /// **Migration notes (cass#257 sub-fix 2):** a v1 manifest written
757    /// by a pre-#257 binary loads cleanly — `last_message_id` defaults
758    /// to `None` and resume falls back to the conversation offset.
759    /// A one-shot warning surfaces on first load so an operator sees
760    /// that resume granularity is coarser than ideal until the next
761    /// checkpoint save lands and the on-disk shape upgrades.
762    pub fn load(data_dir: &Path) -> Result<Option<Self>, ManifestError> {
763        let path = Self::path(data_dir);
764        let bytes = match fs::read(&path) {
765            Ok(b) => b,
766            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
767            Err(e) => {
768                return Err(ManifestError::Io {
769                    path,
770                    source: e.to_string(),
771                });
772            }
773        };
774
775        let manifest: Self = serde_json::from_slice(&bytes).map_err(|e| ManifestError::Parse {
776            path: path.clone(),
777            source: e.to_string(),
778        })?;
779
780        // Forward-compatible: reject future manifest versions we can't read.
781        if manifest.manifest_version > MANIFEST_FORMAT_VERSION {
782            return Err(ManifestError::UnsupportedVersion {
783                found: manifest.manifest_version,
784                max_supported: MANIFEST_FORMAT_VERSION,
785            });
786        }
787
788        // Backwards-compatible: a v1 manifest with an active checkpoint
789        // means a previous interrupted backfill saved without the
790        // `last_message_id` cursor. Warn so an operator monitoring an
791        // overnight run knows resume granularity is conversation-coarse
792        // until the next checkpoint save bumps the on-disk shape.
793        if manifest.manifest_version <= MANIFEST_FORMAT_VERSION_PRE_LAST_MESSAGE_CURSOR
794            && manifest
795                .checkpoint
796                .as_ref()
797                .is_some_and(|cp| cp.last_message_id.is_none())
798        {
799            tracing::warn!(
800                manifest_version = manifest.manifest_version,
801                supported_version = MANIFEST_FORMAT_VERSION,
802                path = %path.display(),
803                "semantic checkpoint manifest predates last_message_id cursor (cass#257 sub-fix 2); resume will fall back to conversation offset until the next checkpoint save"
804            );
805        }
806
807        Ok(Some(manifest))
808    }
809
810    /// Load the manifest, returning defaults if absent or corrupt.
811    ///
812    /// Unlike [`load`], this method treats parse errors and version mismatches
813    /// as "manifest absent" — the caller gets a clean default rather than an
814    /// error.  This is the right behaviour for runtime code that must always
815    /// make progress.
816    pub fn load_or_default(data_dir: &Path) -> Result<Self, ManifestError> {
817        match Self::load(data_dir) {
818            Ok(Some(manifest)) => Ok(manifest),
819            Ok(None) => Ok(Self::default()),
820            // I/O errors are real failures — propagate the original.
821            Err(e @ ManifestError::Io { .. }) => Err(e),
822            // Parse or version errors → treat as absent.
823            Err(ManifestError::Parse { .. } | ManifestError::UnsupportedVersion { .. }) => {
824                Ok(Self::default())
825            }
826            Err(e) => Err(e),
827        }
828    }
829
830    /// Atomically save the manifest to disk (write-to-temp then rename).
831    pub fn save(&mut self, data_dir: &Path) -> Result<(), ManifestError> {
832        let path = Self::path(data_dir);
833
834        // Ensure parent directory exists.
835        if let Some(parent) = path.parent() {
836            fs::create_dir_all(parent).map_err(|e| ManifestError::Io {
837                path: parent.to_path_buf(),
838                source: e.to_string(),
839            })?;
840        }
841
842        self.updated_at_ms = now_ms();
843
844        let json = serde_json::to_string_pretty(self).map_err(|e| ManifestError::Serialize {
845            source: e.to_string(),
846        })?;
847
848        // Atomic write: temp file → rename.
849        let (tmp_path, mut file) =
850            create_unique_manifest_temp_file(&path).map_err(|e| ManifestError::Io {
851                path: path.clone(),
852                source: e.to_string(),
853            })?;
854        file.write_all(json.as_bytes())
855            .map_err(|e| ManifestError::Io {
856                path: tmp_path.clone(),
857                source: e.to_string(),
858            })?;
859        file.sync_all().map_err(|e| ManifestError::Io {
860            path: tmp_path.clone(),
861            source: e.to_string(),
862        })?;
863        replace_file_from_temp(&tmp_path, &path).map_err(|e| ManifestError::Io {
864            path: path.clone(),
865            source: e.to_string(),
866        })?;
867        sync_parent_directory(&path).map_err(|e| ManifestError::Io {
868            path: path
869                .parent()
870                .map(Path::to_path_buf)
871                .unwrap_or_else(|| path.clone()),
872            source: e.to_string(),
873        })?;
874
875        Ok(())
876    }
877
878    // ── Readiness evaluation ──────────────────────────────────────────
879
880    /// Evaluate readiness of the fast tier.
881    pub fn fast_tier_readiness(
882        &self,
883        policy: &SemanticPolicy,
884        current_db_fingerprint: &str,
885        current_model_revision: &str,
886    ) -> TierReadiness {
887        match &self.fast_tier {
888            Some(artifact) => {
889                artifact.readiness(policy, current_db_fingerprint, current_model_revision)
890            }
891            None => {
892                // Check for an active build checkpoint for this tier.
893                if let Some(cp) = &self.checkpoint
894                    && cp.tier == TierKind::Fast
895                    && cp.is_valid(current_db_fingerprint)
896                {
897                    TierReadiness::Building {
898                        progress_pct: cp.progress_pct(),
899                    }
900                } else {
901                    TierReadiness::Missing
902                }
903            }
904        }
905    }
906
907    /// Evaluate readiness of the quality tier.
908    pub fn quality_tier_readiness(
909        &self,
910        policy: &SemanticPolicy,
911        current_db_fingerprint: &str,
912        current_model_revision: &str,
913    ) -> TierReadiness {
914        match &self.quality_tier {
915            Some(artifact) => {
916                artifact.readiness(policy, current_db_fingerprint, current_model_revision)
917            }
918            None => {
919                if let Some(cp) = &self.checkpoint
920                    && cp.tier == TierKind::Quality
921                    && cp.is_valid(current_db_fingerprint)
922                {
923                    TierReadiness::Building {
924                        progress_pct: cp.progress_pct(),
925                    }
926                } else {
927                    TierReadiness::Missing
928                }
929            }
930        }
931    }
932
933    /// Whether hybrid refinement can run right now (fast tier usable).
934    pub fn can_hybrid_search(
935        &self,
936        policy: &SemanticPolicy,
937        current_db_fingerprint: &str,
938        current_model_revision: &str,
939    ) -> bool {
940        self.fast_tier_readiness(policy, current_db_fingerprint, current_model_revision)
941            .is_usable()
942    }
943
944    // ── Backlog / checkpoint management ───────────────────────────────
945
946    /// Update the backlog from the canonical DB state.
947    pub fn refresh_backlog(&mut self, total_conversations: u64, current_db_fingerprint: &str) {
948        let fast_processed = self
949            .fast_tier
950            .as_ref()
951            .filter(|a| a.ready && a.db_fingerprint == current_db_fingerprint)
952            .map_or(0, |a| a.conversation_count);
953        let quality_processed = self
954            .quality_tier
955            .as_ref()
956            .filter(|a| a.ready && a.db_fingerprint == current_db_fingerprint)
957            .map_or(0, |a| a.conversation_count);
958
959        self.backlog = BacklogLedger {
960            total_conversations,
961            fast_tier_processed: fast_processed,
962            quality_tier_processed: quality_processed,
963            db_fingerprint: current_db_fingerprint.to_owned(),
964            computed_at_ms: now_ms(),
965        };
966    }
967
968    /// Save a build checkpoint (called periodically during backfill).
969    pub fn save_checkpoint(&mut self, checkpoint: BuildCheckpoint) {
970        self.checkpoint = Some(checkpoint);
971    }
972
973    /// Clear the build checkpoint (called when build finishes or is abandoned).
974    pub fn clear_checkpoint(&mut self) {
975        self.checkpoint = None;
976    }
977
978    /// Record a completed artifact and clear the matching checkpoint.
979    pub fn publish_artifact(&mut self, artifact: ArtifactRecord) {
980        // Clear checkpoint if it matches this tier.
981        if self
982            .checkpoint
983            .as_ref()
984            .is_some_and(|cp| cp.tier == artifact.tier)
985        {
986            self.checkpoint = None;
987        }
988
989        match artifact.tier {
990            TierKind::Fast => self.fast_tier = Some(artifact),
991            TierKind::Quality => self.quality_tier = Some(artifact),
992        }
993    }
994
995    /// Record a completed HNSW accelerator.
996    pub fn publish_hnsw(&mut self, hnsw: HnswRecord) {
997        self.hnsw = Some(hnsw);
998    }
999
1000    /// Adopt a legacy (pre-manifest) artifact if it is compatible with the
1001    /// current schema/chunking versions.  Returns `true` if adopted.
1002    #[allow(clippy::too_many_arguments)]
1003    pub fn adopt_legacy_artifact(
1004        &mut self,
1005        tier: TierKind,
1006        embedder_id: &str,
1007        model_revision: &str,
1008        dimension: usize,
1009        doc_count: u64,
1010        conversation_count: u64,
1011        db_fingerprint: &str,
1012        index_path: &str,
1013        size_bytes: u64,
1014    ) -> bool {
1015        let record = ArtifactRecord {
1016            tier,
1017            embedder_id: embedder_id.to_owned(),
1018            model_revision: model_revision.to_owned(),
1019            schema_version: SEMANTIC_SCHEMA_VERSION,
1020            chunking_version: CHUNKING_STRATEGY_VERSION,
1021            dimension,
1022            doc_count,
1023            conversation_count,
1024            db_fingerprint: db_fingerprint.to_owned(),
1025            index_path: index_path.to_owned(),
1026            size_bytes,
1027            started_at_ms: 0,
1028            completed_at_ms: now_ms(),
1029            ready: true,
1030        };
1031
1032        match tier {
1033            TierKind::Fast => self.fast_tier = Some(record),
1034            TierKind::Quality => self.quality_tier = Some(record),
1035        }
1036        true
1037    }
1038
1039    /// Invalidate artifacts that are incompatible with the current policy.
1040    /// Returns the number of artifacts invalidated.
1041    ///
1042    /// **Note**: This detects schema version, chunking version, and mode
1043    /// incompatibilities.  It does NOT detect embedder changes (e.g., minilm →
1044    /// snowflake) because the policy stores short names while artifacts store
1045    /// full registry IDs.  Callers who need embedder-change detection should
1046    /// compare `artifact.embedder_id` against the expected ID from the
1047    /// embedder registry.
1048    pub fn invalidate_incompatible(
1049        &mut self,
1050        policy: &SemanticPolicy,
1051        current_model_revision: &str,
1052    ) -> usize {
1053        let mut count = 0;
1054
1055        if let Some(ref artifact) = self.fast_tier {
1056            let pm = artifact.to_policy_manifest();
1057            if matches!(
1058                pm.invalidation_action(policy, current_model_revision, &artifact.embedder_id),
1059                InvalidationAction::DiscardAndRebuild { .. } | InvalidationAction::Evict
1060            ) {
1061                self.fast_tier = None;
1062                count += 1;
1063            }
1064        }
1065
1066        if let Some(ref artifact) = self.quality_tier {
1067            let pm = artifact.to_policy_manifest();
1068            if matches!(
1069                pm.invalidation_action(policy, current_model_revision, &artifact.embedder_id),
1070                InvalidationAction::DiscardAndRebuild { .. } | InvalidationAction::Evict
1071            ) {
1072                self.quality_tier = None;
1073                count += 1;
1074            }
1075        }
1076
1077        // HNSW depends on the base tier — invalidate if base is gone.
1078        if let Some(ref hnsw) = self.hnsw {
1079            let base_gone = match hnsw.base_tier {
1080                TierKind::Fast => self.fast_tier.is_none(),
1081                TierKind::Quality => self.quality_tier.is_none(),
1082            };
1083            if base_gone {
1084                self.hnsw = None;
1085                count += 1;
1086            }
1087        }
1088
1089        // Invalidate checkpoint if its schema/chunking is wrong.
1090        if let Some(ref cp) = self.checkpoint
1091            && (cp.schema_version != policy.semantic_schema_version
1092                || cp.chunking_version != policy.chunking_strategy_version)
1093        {
1094            self.checkpoint = None;
1095        }
1096
1097        count
1098    }
1099
1100    /// Total disk usage of all semantic artifacts (bytes).
1101    pub fn total_size_bytes(&self) -> u64 {
1102        let fast = self.fast_tier.as_ref().map_or(0, |a| a.size_bytes);
1103        let quality = self.quality_tier.as_ref().map_or(0, |a| a.size_bytes);
1104        let hnsw = self.hnsw.as_ref().map_or(0, |h| h.size_bytes);
1105        fast + quality + hnsw
1106    }
1107
1108    /// Total disk usage in megabytes (rounded up).
1109    pub fn total_size_mb(&self) -> u64 {
1110        self.total_size_bytes().div_ceil(1_048_576)
1111    }
1112}
1113
1114// ─── Errors ────────────────────────────────────────────────────────────────
1115
/// Failures that can occur while loading or saving the semantic manifest.
#[derive(Debug)]
pub enum ManifestError {
    /// A filesystem operation involving `path` failed; `source` holds the
    /// rendered I/O error message.
    Io { path: PathBuf, source: String },
    /// The file at `path` exists but could not be decoded as a manifest;
    /// `source` holds the rendered decoder error message.
    Parse { path: PathBuf, source: String },
    /// The in-memory manifest could not be encoded as JSON.
    Serialize { source: String },
    /// The on-disk manifest declares a format version (`found`) above the
    /// highest one this build can read (`max_supported`).
    UnsupportedVersion { found: u32, max_supported: u32 },
}

impl std::fmt::Display for ManifestError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // Both path-carrying variants share one message shape and differ
            // only in the error label.
            Self::Io { path, source } | Self::Parse { path, source } => {
                let label = if matches!(self, Self::Io { .. }) {
                    "I/O"
                } else {
                    "parse"
                };
                write!(f, "manifest {} error at {}: {}", label, path.display(), source)
            }
            Self::Serialize { source } => {
                f.write_str("manifest serialization error: ")?;
                f.write_str(source)
            }
            Self::UnsupportedVersion {
                found,
                max_supported,
            } => write!(
                f,
                "manifest version {} is newer than supported version {}",
                found, max_supported
            ),
        }
    }
}

/// `ManifestError` carries rendered messages rather than wrapped errors, so
/// the default (`None`) `source()` is used.
impl std::error::Error for ManifestError {}
1146
1147// ─── Helpers ───────────────────────────────────────────────────────────────
1148
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reads earlier than the epoch.  If the
/// millisecond count ever exceeds the `i64` range, it saturates at
/// `i64::MAX` rather than silently wrapping as an `as` cast would.
fn now_ms() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| i64::try_from(d.as_millis()).unwrap_or(i64::MAX))
        .unwrap_or(0)
}
1155
1156fn unique_manifest_temp_path(path: &Path, attempt: u32, random: u64) -> PathBuf {
1157    static NEXT_NONCE: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
1158
1159    let file_name = path
1160        .file_name()
1161        .and_then(|name| name.to_str())
1162        .unwrap_or(MANIFEST_FILENAME);
1163    let nonce = NEXT_NONCE.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1164    path.with_file_name(format!(
1165        ".{file_name}.tmp.{attempt}.{}.{}.{random:016x}",
1166        now_ms(),
1167        nonce
1168    ))
1169}
1170
1171fn create_unique_manifest_temp_file(path: &Path) -> std::io::Result<(PathBuf, fs::File)> {
1172    for attempt in 0..100 {
1173        let random = random_manifest_path_nonce()?;
1174        let tmp_path = unique_manifest_temp_path(path, attempt, random);
1175        match OpenOptions::new()
1176            .write(true)
1177            .create_new(true)
1178            .open(&tmp_path)
1179        {
1180            Ok(file) => return Ok((tmp_path, file)),
1181            Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => continue,
1182            Err(err) => return Err(err),
1183        }
1184    }
1185
1186    Err(std::io::Error::new(
1187        std::io::ErrorKind::AlreadyExists,
1188        format!(
1189            "could not create a unique temporary manifest file for {} after 100 attempts",
1190            path.display()
1191        ),
1192    ))
1193}
1194
/// Build a hidden, unique backup path beside `path` (Windows only).
///
/// The file name has the form `.{name}.bak.{millis}.{counter}.{random:016x}`.
/// It is used to move an existing manifest aside before replacing it.
///
/// # Errors
///
/// Fails only if the random nonce cannot be generated.
#[cfg(windows)]
fn unique_manifest_backup_path(path: &Path) -> std::io::Result<PathBuf> {
    use std::sync::atomic::{AtomicU64, Ordering};

    static NEXT_NONCE: AtomicU64 = AtomicU64::new(0);

    let random = random_manifest_path_nonce()?;
    let base = match path.file_name().and_then(|name| name.to_str()) {
        Some(name) => name,
        None => MANIFEST_FILENAME,
    };
    let counter = NEXT_NONCE.fetch_add(1, Ordering::Relaxed);
    let stamp = now_ms();
    let backup_name = format!(".{base}.bak.{stamp}.{counter}.{random:016x}");
    Ok(path.with_file_name(backup_name))
}
1210
1211fn random_manifest_path_nonce() -> std::io::Result<u64> {
1212    let mut random_bytes = [0u8; 8];
1213    SystemRandom::new()
1214        .fill(&mut random_bytes)
1215        .map_err(|_| std::io::Error::other("failed to generate manifest temp path nonce"))?;
1216    Ok(u64::from_le_bytes(random_bytes))
1217}
1218
/// Move `temp_path` over `final_path`, replacing any existing file.
///
/// On non-Windows targets this is a single `fs::rename`; the caller is
/// responsible for syncing the parent directory afterwards.
///
/// On Windows, the direct rename may fail with `AlreadyExists` or
/// `PermissionDenied` while the destination exists.  In that case:
///
/// 1. The existing file is moved aside to a unique backup path.
/// 2. The temp file is renamed into place.
/// 3. The backup is deleted.
///
/// If step 2 fails, the backup is renamed back to restore the original
/// manifest.  If that restore also fails, the error names both the temp
/// path (new contents) and the backup path (original contents) so an
/// operator can recover by hand.
///
/// # Errors
///
/// Returns the underlying rename error, or a composite error describing
/// every failed step of the Windows fallback.
fn replace_file_from_temp(temp_path: &Path, final_path: &Path) -> std::io::Result<()> {
    #[cfg(windows)]
    {
        match fs::rename(temp_path, final_path) {
            Ok(()) => sync_parent_directory(final_path),
            Err(first_err)
                if final_path.exists()
                    && matches!(
                        first_err.kind(),
                        std::io::ErrorKind::AlreadyExists | std::io::ErrorKind::PermissionDenied
                    ) =>
            {
                let backup_path = match unique_manifest_backup_path(final_path) {
                    Ok(backup_path) => backup_path,
                    Err(err) => {
                        // Nothing has been moved yet: the original is still at
                        // `final_path`, so the temp file is just garbage.
                        let _ = fs::remove_file(temp_path);
                        return Err(err);
                    }
                };
                fs::rename(final_path, &backup_path).map_err(|backup_err| {
                    let _ = fs::remove_file(temp_path);
                    std::io::Error::other(format!(
                        "failed preparing backup {} before replacing {}: first error: {}; backup error: {}",
                        backup_path.display(),
                        final_path.display(),
                        first_err,
                        backup_err
                    ))
                })?;
                match fs::rename(temp_path, final_path) {
                    Ok(()) => {
                        let _ = fs::remove_file(&backup_path);
                        sync_parent_directory(final_path)
                    }
                    Err(second_err) => match fs::rename(&backup_path, final_path) {
                        Ok(()) => {
                            let _ = fs::remove_file(temp_path);
                            sync_parent_directory(final_path)?;
                            Err(std::io::Error::other(format!(
                                "failed replacing {} with {}: first error: {}; second error: {}; restored original file",
                                final_path.display(),
                                temp_path.display(),
                                first_err,
                                second_err
                            )))
                        }
                        // Worst case: the original now lives only at the
                        // backup path, so report it alongside the temp path.
                        Err(restore_err) => Err(std::io::Error::other(format!(
                            "failed replacing {} with {}: first error: {}; second error: {}; restore error: {}; temp file retained at {}; original file retained at {}",
                            final_path.display(),
                            temp_path.display(),
                            first_err,
                            second_err,
                            restore_err,
                            temp_path.display(),
                            backup_path.display()
                        ))),
                    },
                }
            }
            Err(err) => Err(err),
        }
    }

    #[cfg(not(windows))]
    {
        fs::rename(temp_path, final_path)
    }
}
1280
/// Fsync the directory containing `path`, making a preceding rename durable.
///
/// A bare file name (whose `parent()` is the empty path) refers to the
/// current directory, so `"."` is synced in that case; opening `""` would
/// fail with `NotFound`.  A path with no parent at all (e.g. `/`) has
/// nothing above it to sync and returns `Ok(())`.
///
/// # Errors
///
/// Propagates failures from opening or syncing the directory.
#[cfg(not(windows))]
fn sync_parent_directory(path: &Path) -> std::io::Result<()> {
    let parent = match path.parent() {
        Some(dir) if dir.as_os_str().is_empty() => Path::new("."),
        Some(dir) => dir,
        None => return Ok(()),
    };
    fs::File::open(parent)?.sync_all()
}
1289
/// Windows counterpart of the directory sync: a deliberate no-op.
///
/// Directory metadata is not flushed here; the function always returns
/// `Ok(())`.  NOTE(review): presumably because std offers no straightforward
/// way to fsync a directory handle on Windows — confirm.
#[cfg(windows)]
fn sync_parent_directory(_path: &Path) -> std::io::Result<()> {
    Ok(())
}
1294
1295// ─── Tests ─────────────────────────────────────────────────────────────────
1296
1297#[cfg(test)]
1298mod tests {
1299    use super::*;
1300    use crate::search::policy::SemanticPolicy;
1301
1302    fn test_policy() -> SemanticPolicy {
1303        SemanticPolicy::compiled_defaults()
1304    }
1305
1306    fn test_artifact(tier: TierKind, ready: bool) -> ArtifactRecord {
1307        ArtifactRecord {
1308            tier,
1309            embedder_id: match tier {
1310                TierKind::Fast => "fnv1a-384".to_owned(),
1311                TierKind::Quality => "minilm-384".to_owned(),
1312            },
1313            model_revision: "abc123".to_owned(),
1314            schema_version: SEMANTIC_SCHEMA_VERSION,
1315            chunking_version: CHUNKING_STRATEGY_VERSION,
1316            dimension: 384,
1317            doc_count: 1000,
1318            conversation_count: 250,
1319            db_fingerprint: "fp-1234".to_owned(),
1320            index_path: format!(
1321                "vector_index/index-{}.fsvi",
1322                match tier {
1323                    TierKind::Fast => "fnv1a-384",
1324                    TierKind::Quality => "minilm-384",
1325                }
1326            ),
1327            size_bytes: 150_000,
1328            started_at_ms: 1_700_000_000_000,
1329            completed_at_ms: 1_700_000_060_000,
1330            ready,
1331        }
1332    }
1333
1334    fn test_hnsw() -> HnswRecord {
1335        HnswRecord {
1336            base_tier: TierKind::Quality,
1337            embedder_id: "minilm-384".to_owned(),
1338            ef_search: 128,
1339            index_path: "vector_index/hnsw-minilm-384.chsw".to_owned(),
1340            size_bytes: 50_000,
1341            built_at_ms: 1_700_000_070_000,
1342            ready: true,
1343        }
1344    }
1345
1346    fn test_shard(shard_index: u32, shard_count: u32, ready: bool) -> SemanticShardRecord {
1347        SemanticShardRecord {
1348            tier: TierKind::Fast,
1349            embedder_id: "fnv1a-384".to_owned(),
1350            model_revision: "hash".to_owned(),
1351            schema_version: SEMANTIC_SCHEMA_VERSION,
1352            chunking_version: CHUNKING_STRATEGY_VERSION,
1353            dimension: 384,
1354            shard_index,
1355            shard_count,
1356            doc_count: 25,
1357            total_conversations: 10,
1358            db_fingerprint: "fp-sharded".to_owned(),
1359            index_path: format!("vector_index/shards/fast-fnv1a-384/shard-{shard_index:05}.fsvi"),
1360            quantization: "f16".to_owned(),
1361            mmap_ready: true,
1362            ann_index_path: None,
1363            ann_size_bytes: 0,
1364            ann_ready: false,
1365            size_bytes: 4096,
1366            started_at_ms: 1_700_000_080_000,
1367            completed_at_ms: 1_700_000_081_000,
1368            ready,
1369        }
1370    }
1371
1372    fn test_checkpoint(tier: TierKind) -> BuildCheckpoint {
1373        BuildCheckpoint {
1374            tier,
1375            embedder_id: "minilm-384".to_owned(),
1376            last_offset: 500,
1377            docs_embedded: 3000,
1378            conversations_processed: 500,
1379            total_conversations: 1000,
1380            db_fingerprint: "fp-1234".to_owned(),
1381            schema_version: SEMANTIC_SCHEMA_VERSION,
1382            chunking_version: CHUNKING_STRATEGY_VERSION,
1383            saved_at_ms: 1_700_000_030_000,
1384            last_message_id: None,
1385        }
1386    }
1387
1388    #[derive(Debug, Clone, Copy)]
1389    enum ExpectedTierReadiness {
1390        Ready,
1391        Stale,
1392        Incompatible,
1393        Building(u8),
1394    }
1395
1396    fn no_artifact_mutation(_: &mut ArtifactRecord) {}
1397
1398    type TierReadinessCase = (
1399        &'static str,
1400        TierKind,
1401        bool,
1402        &'static str,
1403        &'static str,
1404        fn(&mut ArtifactRecord),
1405        ExpectedTierReadiness,
1406    );
1407
1408    fn set_schema_version_to_zero(artifact: &mut ArtifactRecord) {
1409        artifact.schema_version = 0;
1410    }
1411
1412    fn assert_tier_readiness(actual: TierReadiness, expected: ExpectedTierReadiness, label: &str) {
1413        match expected {
1414            ExpectedTierReadiness::Ready => {
1415                assert_eq!(actual, TierReadiness::Ready, "{label}");
1416            }
1417            ExpectedTierReadiness::Stale => {
1418                assert!(
1419                    matches!(actual, TierReadiness::Stale { .. }),
1420                    "{label}: {actual:?}"
1421                );
1422            }
1423            ExpectedTierReadiness::Incompatible => {
1424                assert!(
1425                    matches!(actual, TierReadiness::Incompatible { .. }),
1426                    "{label}: {actual:?}"
1427                );
1428            }
1429            ExpectedTierReadiness::Building(progress_pct) => {
1430                assert_eq!(actual, TierReadiness::Building { progress_pct }, "{label}");
1431            }
1432        }
1433    }
1434
1435    // ── Manifest load/save round-trip ──────────────────────────────────
1436
1437    #[test]
1438    fn manifest_round_trip_via_disk() {
1439        let temp = tempfile::tempdir().unwrap();
1440        let mut manifest = SemanticManifest {
1441            fast_tier: Some(test_artifact(TierKind::Fast, true)),
1442            quality_tier: Some(test_artifact(TierKind::Quality, true)),
1443            hnsw: Some(test_hnsw()),
1444            checkpoint: Some(test_checkpoint(TierKind::Quality)),
1445            backlog: BacklogLedger {
1446                total_conversations: 2000,
1447                fast_tier_processed: 1000,
1448                quality_tier_processed: 500,
1449                db_fingerprint: "fp-1234".to_owned(),
1450                computed_at_ms: 1_700_000_000_000,
1451            },
1452            ..Default::default()
1453        };
1454
1455        manifest.save(temp.path()).unwrap();
1456        let loaded = SemanticManifest::load(temp.path()).unwrap().unwrap();
1457
1458        assert_eq!(loaded.manifest_version, MANIFEST_FORMAT_VERSION);
1459        assert!(loaded.fast_tier.is_some());
1460        assert!(loaded.quality_tier.is_some());
1461        assert!(loaded.hnsw.is_some());
1462        assert!(loaded.checkpoint.is_some());
1463        assert_eq!(loaded.backlog.total_conversations, 2000);
1464        assert!(loaded.updated_at_ms > 0);
1465    }
1466
1467    #[test]
1468    fn manifest_save_overwrites_existing_file() {
1469        let temp = tempfile::tempdir().unwrap();
1470        let mut first = SemanticManifest {
1471            fast_tier: Some(test_artifact(TierKind::Fast, true)),
1472            ..Default::default()
1473        };
1474        first.save(temp.path()).unwrap();
1475
1476        let mut second = SemanticManifest {
1477            quality_tier: Some(test_artifact(TierKind::Quality, true)),
1478            backlog: BacklogLedger {
1479                total_conversations: 99,
1480                fast_tier_processed: 0,
1481                quality_tier_processed: 99,
1482                db_fingerprint: "fp-overwrite".to_owned(),
1483                computed_at_ms: 1_700_000_000_123,
1484            },
1485            ..Default::default()
1486        };
1487        second.save(temp.path()).unwrap();
1488
1489        let loaded = SemanticManifest::load(temp.path()).unwrap().unwrap();
1490        assert!(loaded.fast_tier.is_none());
1491        assert!(loaded.quality_tier.is_some());
1492        assert_eq!(loaded.backlog.total_conversations, 99);
1493    }
1494
1495    #[test]
1496    fn manifest_temp_file_creation_is_exclusive_and_unique() -> Result<(), String> {
1497        let temp = tempfile::tempdir().map_err(|e| e.to_string())?;
1498        let final_path = SemanticManifest::path(temp.path());
1499        let manifest_dir = final_path
1500            .parent()
1501            .ok_or_else(|| "semantic manifest path should have a parent directory".to_string())?;
1502        fs::create_dir_all(manifest_dir).map_err(|e| e.to_string())?;
1503
1504        let (first_path, mut first_file) =
1505            create_unique_manifest_temp_file(&final_path).map_err(|e| e.to_string())?;
1506        first_file.write_all(b"first").map_err(|e| e.to_string())?;
1507        let (second_path, mut second_file) =
1508            create_unique_manifest_temp_file(&final_path).map_err(|e| e.to_string())?;
1509        second_file
1510            .write_all(b"second")
1511            .map_err(|e| e.to_string())?;
1512
1513        if first_path == second_path {
1514            return Err("exclusive temp creation reused the same path".to_string());
1515        }
1516        if !first_path.exists() {
1517            return Err(format!(
1518                "first temp file is missing: {}",
1519                first_path.display()
1520            ));
1521        }
1522        if !second_path.exists() {
1523            return Err(format!(
1524                "second temp file is missing: {}",
1525                second_path.display()
1526            ));
1527        }
1528        if first_path.parent() != Some(manifest_dir) {
1529            return Err(format!(
1530                "first temp path escaped manifest directory: {}",
1531                first_path.display()
1532            ));
1533        }
1534        if second_path.parent() != Some(manifest_dir) {
1535            return Err(format!(
1536                "second temp path escaped manifest directory: {}",
1537                second_path.display()
1538            ));
1539        }
1540
1541        Ok(())
1542    }
1543
1544    #[test]
1545    fn manifest_load_missing_returns_none() {
1546        let temp = tempfile::tempdir().unwrap();
1547        let loaded = SemanticManifest::load(temp.path()).unwrap();
1548        assert!(loaded.is_none());
1549    }
1550
1551    #[test]
1552    fn manifest_load_or_default_returns_defaults() {
1553        let temp = tempfile::tempdir().unwrap();
1554        let manifest = SemanticManifest::load_or_default(temp.path()).unwrap();
1555        assert_eq!(manifest.manifest_version, MANIFEST_FORMAT_VERSION);
1556        assert!(manifest.fast_tier.is_none());
1557        assert!(manifest.quality_tier.is_none());
1558    }
1559
1560    #[test]
1561    fn manifest_load_corrupt_returns_parse_error() {
1562        let temp = tempfile::tempdir().unwrap();
1563        let path = SemanticManifest::path(temp.path());
1564        fs::create_dir_all(path.parent().unwrap()).unwrap();
1565        fs::write(&path, b"not json").unwrap();
1566
1567        let result = SemanticManifest::load(temp.path());
1568        assert!(matches!(result, Err(ManifestError::Parse { .. })));
1569    }
1570
1571    #[test]
1572    fn manifest_load_future_version_returns_error() {
1573        let temp = tempfile::tempdir().unwrap();
1574        let path = SemanticManifest::path(temp.path());
1575        fs::create_dir_all(path.parent().unwrap()).unwrap();
1576
1577        let manifest = SemanticManifest {
1578            manifest_version: MANIFEST_FORMAT_VERSION + 1,
1579            ..Default::default()
1580        };
1581        let json = serde_json::to_string(&manifest).unwrap();
1582        fs::write(&path, json).unwrap();
1583
1584        let result = SemanticManifest::load(temp.path());
1585        assert!(matches!(
1586            result,
1587            Err(ManifestError::UnsupportedVersion { .. })
1588        ));
1589    }
1590
1591    // ── Tier readiness (table-driven) ──────────────────────────────────
1592
1593    #[test]
1594    fn tier_readiness_cases() {
1595        let policy = test_policy();
1596        let db_fp = "fp-1234";
1597        let model_rev = "abc123";
1598        let cases: &[TierReadinessCase] = &[
1599            (
1600                "ready artifact with matching fingerprint",
1601                TierKind::Fast,
1602                true,
1603                db_fp,
1604                model_rev,
1605                no_artifact_mutation,
1606                ExpectedTierReadiness::Ready,
1607            ),
1608            (
1609                "ready artifact with changed DB fingerprint",
1610                TierKind::Fast,
1611                true,
1612                "different-fp",
1613                model_rev,
1614                no_artifact_mutation,
1615                ExpectedTierReadiness::Stale,
1616            ),
1617            (
1618                "ready artifact with changed model revision",
1619                TierKind::Quality,
1620                true,
1621                db_fp,
1622                "new-revision",
1623                no_artifact_mutation,
1624                ExpectedTierReadiness::Stale,
1625            ),
1626            (
1627                "schema version mismatch",
1628                TierKind::Quality,
1629                true,
1630                db_fp,
1631                model_rev,
1632                set_schema_version_to_zero,
1633                ExpectedTierReadiness::Incompatible,
1634            ),
1635            (
1636                "not yet published artifact",
1637                TierKind::Fast,
1638                false,
1639                db_fp,
1640                model_rev,
1641                no_artifact_mutation,
1642                ExpectedTierReadiness::Building(100),
1643            ),
1644        ];
1645
1646        for (label, tier, ready, current_db_fp, current_model_rev, mutate, expected) in cases {
1647            let mut artifact = test_artifact(*tier, *ready);
1648            mutate(&mut artifact);
1649            assert_tier_readiness(
1650                artifact.readiness(&policy, current_db_fp, current_model_rev),
1651                *expected,
1652                label,
1653            );
1654        }
1655    }
1656
1657    // ── Manifest-level readiness ───────────────────────────────────────
1658
1659    #[test]
1660    fn manifest_tier_readiness_missing() {
1661        let manifest = SemanticManifest::default();
1662        let policy = test_policy();
1663        assert_eq!(
1664            manifest.fast_tier_readiness(&policy, "fp", "rev"),
1665            TierReadiness::Missing,
1666        );
1667        assert_eq!(
1668            manifest.quality_tier_readiness(&policy, "fp", "rev"),
1669            TierReadiness::Missing,
1670        );
1671    }
1672
1673    #[test]
1674    fn manifest_tier_readiness_with_checkpoint() {
1675        let manifest = SemanticManifest {
1676            checkpoint: Some(test_checkpoint(TierKind::Quality)),
1677            ..Default::default()
1678        };
1679
1680        let policy = test_policy();
1681        // Fast tier has no checkpoint → Missing
1682        assert_eq!(
1683            manifest.fast_tier_readiness(&policy, "fp-1234", "rev"),
1684            TierReadiness::Missing,
1685        );
1686        // Quality tier has a valid checkpoint → Building
1687        assert!(matches!(
1688            manifest.quality_tier_readiness(&policy, "fp-1234", "rev"),
1689            TierReadiness::Building { progress_pct: 50 },
1690        ));
1691    }
1692
1693    #[test]
1694    fn manifest_tier_readiness_checkpoint_invalid_db() {
1695        let manifest = SemanticManifest {
1696            checkpoint: Some(test_checkpoint(TierKind::Quality)),
1697            ..Default::default()
1698        };
1699
1700        let policy = test_policy();
1701        // Checkpoint DB doesn't match → Missing (checkpoint invalid)
1702        assert_eq!(
1703            manifest.quality_tier_readiness(&policy, "other-fp", "rev"),
1704            TierReadiness::Missing,
1705        );
1706    }
1707
1708    // ── Hybrid search check ────────────────────────────────────────────
1709
1710    #[test]
1711    fn can_hybrid_search_requires_usable_fast_tier() {
1712        let policy = test_policy();
1713        let db_fp = "fp-1234";
1714        let rev = "abc123";
1715
1716        // No fast tier → can't hybrid
1717        let manifest = SemanticManifest::default();
1718        assert!(!manifest.can_hybrid_search(&policy, db_fp, rev));
1719
1720        // Fast tier present → can hybrid
1721        let manifest = SemanticManifest {
1722            fast_tier: Some(test_artifact(TierKind::Fast, true)),
1723            ..Default::default()
1724        };
1725        assert!(manifest.can_hybrid_search(&policy, db_fp, rev));
1726    }
1727
1728    // ── Backlog ledger ─────────────────────────────────────────────────
1729
1730    #[test]
1731    fn backlog_remaining_and_pending() {
1732        let ledger = BacklogLedger {
1733            total_conversations: 1000,
1734            fast_tier_processed: 800,
1735            quality_tier_processed: 300,
1736            db_fingerprint: "fp".to_owned(),
1737            computed_at_ms: 0,
1738        };
1739
1740        assert_eq!(ledger.fast_tier_remaining(), 200);
1741        assert_eq!(ledger.quality_tier_remaining(), 700);
1742        assert!(ledger.has_pending_work());
1743        assert!(ledger.is_current("fp"));
1744        assert!(!ledger.is_current("other"));
1745    }
1746
1747    #[test]
1748    fn backlog_no_pending_when_fully_processed() {
1749        let ledger = BacklogLedger {
1750            total_conversations: 500,
1751            fast_tier_processed: 500,
1752            quality_tier_processed: 500,
1753            db_fingerprint: "fp".to_owned(),
1754            computed_at_ms: 0,
1755        };
1756
1757        assert_eq!(ledger.fast_tier_remaining(), 0);
1758        assert_eq!(ledger.quality_tier_remaining(), 0);
1759        assert!(!ledger.has_pending_work());
1760    }
1761
1762    // ── Build checkpoint ───────────────────────────────────────────────
1763
1764    #[test]
1765    fn checkpoint_progress_and_completion() {
1766        let cp = test_checkpoint(TierKind::Quality);
1767        assert_eq!(cp.progress_pct(), 50);
1768        assert!(!cp.is_complete());
1769        assert!(cp.is_valid("fp-1234"));
1770        assert!(!cp.is_valid("other-fp"));
1771
1772        // Complete checkpoint
1773        let mut cp = test_checkpoint(TierKind::Quality);
1774        cp.conversations_processed = 1000;
1775        assert_eq!(cp.progress_pct(), 100);
1776        assert!(cp.is_complete());
1777    }
1778
1779    #[test]
1780    fn checkpoint_zero_total_gives_zero_pct() {
1781        let mut cp = test_checkpoint(TierKind::Fast);
1782        cp.total_conversations = 0;
1783        cp.conversations_processed = 0;
1784        assert_eq!(cp.progress_pct(), 0);
1785    }
1786
1787    // ── Publish and clear ──────────────────────────────────────────────
1788
1789    #[test]
1790    fn publish_artifact_clears_matching_checkpoint() {
1791        let mut manifest = SemanticManifest {
1792            checkpoint: Some(test_checkpoint(TierKind::Quality)),
1793            ..Default::default()
1794        };
1795
1796        manifest.publish_artifact(test_artifact(TierKind::Quality, true));
1797        assert!(manifest.checkpoint.is_none());
1798        assert!(manifest.quality_tier.is_some());
1799    }
1800
1801    #[test]
1802    fn publish_artifact_keeps_non_matching_checkpoint() {
1803        let mut manifest = SemanticManifest {
1804            checkpoint: Some(test_checkpoint(TierKind::Quality)),
1805            ..Default::default()
1806        };
1807
1808        manifest.publish_artifact(test_artifact(TierKind::Fast, true));
1809        assert!(manifest.checkpoint.is_some()); // Quality checkpoint survives
1810        assert!(manifest.fast_tier.is_some());
1811    }
1812
1813    // ── Refresh backlog ────────────────────────────────────────────────
1814
1815    #[test]
1816    fn refresh_backlog_computes_from_ready_artifacts() {
1817        let mut manifest = SemanticManifest {
1818            fast_tier: Some(test_artifact(TierKind::Fast, true)),
1819            quality_tier: Some(test_artifact(TierKind::Quality, true)),
1820            ..Default::default()
1821        };
1822
1823        manifest.refresh_backlog(2000, "fp-1234");
1824        assert_eq!(manifest.backlog.total_conversations, 2000);
1825        assert_eq!(manifest.backlog.fast_tier_processed, 250);
1826        assert_eq!(manifest.backlog.quality_tier_processed, 250);
1827    }
1828
1829    #[test]
1830    fn refresh_backlog_ignores_stale_artifacts() {
1831        let mut manifest = SemanticManifest {
1832            fast_tier: Some(test_artifact(TierKind::Fast, true)),
1833            ..Default::default()
1834        };
1835
1836        // DB fingerprint doesn't match → artifact not counted
1837        manifest.refresh_backlog(2000, "different-fp");
1838        assert_eq!(manifest.backlog.fast_tier_processed, 0);
1839    }
1840
1841    // ── Invalidation ───────────────────────────────────────────────────
1842
1843    #[test]
1844    fn invalidate_incompatible_removes_schema_mismatch() {
1845        let mut artifact = test_artifact(TierKind::Quality, true);
1846        artifact.schema_version = 0; // mismatch
1847        let mut manifest = SemanticManifest {
1848            quality_tier: Some(artifact),
1849            hnsw: Some(test_hnsw()), // depends on quality tier
1850            ..Default::default()
1851        };
1852
1853        let policy = test_policy();
1854        let count = manifest.invalidate_incompatible(&policy, "abc123");
1855
1856        assert_eq!(count, 2); // quality + hnsw
1857        assert!(manifest.quality_tier.is_none());
1858        assert!(manifest.hnsw.is_none());
1859    }
1860
1861    #[test]
1862    fn invalidate_incompatible_keeps_compatible() {
1863        let mut manifest = SemanticManifest {
1864            fast_tier: Some(test_artifact(TierKind::Fast, true)),
1865            quality_tier: Some(test_artifact(TierKind::Quality, true)),
1866            ..Default::default()
1867        };
1868
1869        let policy = test_policy();
1870        let count = manifest.invalidate_incompatible(&policy, "abc123");
1871
1872        assert_eq!(count, 0);
1873        assert!(manifest.fast_tier.is_some());
1874        assert!(manifest.quality_tier.is_some());
1875    }
1876
1877    // ── Legacy adoption ────────────────────────────────────────────────
1878
1879    #[test]
1880    fn adopt_legacy_artifact() {
1881        let mut manifest = SemanticManifest::default();
1882        let doc_count = 500;
1883        let conversation_count = 125;
1884        let index_path = "vector_index/index-fnv1a-384.fsvi";
1885        let db_fingerprint = "fp-old";
1886        let size_bytes = 75_000;
1887        let adopted = manifest.adopt_legacy_artifact(
1888            TierKind::Fast,
1889            "fnv1a-384",
1890            "hash",
1891            384,
1892            doc_count,
1893            conversation_count,
1894            db_fingerprint,
1895            index_path,
1896            size_bytes,
1897        );
1898
1899        assert!(adopted);
1900        let fast = manifest.fast_tier.as_ref().unwrap();
1901        assert_eq!(fast.embedder_id, "fnv1a-384");
1902        assert!(fast.ready);
1903        assert_eq!(fast.schema_version, SEMANTIC_SCHEMA_VERSION);
1904    }
1905
1906    // ── Size accounting ────────────────────────────────────────────────
1907
1908    #[test]
1909    fn total_size_accounts_for_all_artifacts() {
1910        let manifest = SemanticManifest {
1911            fast_tier: Some(test_artifact(TierKind::Fast, true)),
1912            quality_tier: Some(test_artifact(TierKind::Quality, true)),
1913            hnsw: Some(test_hnsw()),
1914            ..Default::default()
1915        };
1916
1917        assert_eq!(manifest.total_size_bytes(), 150_000 + 150_000 + 50_000);
1918        assert_eq!(manifest.total_size_mb(), 1); // 350KB rounds up to 1MB
1919    }
1920
1921    #[test]
1922    fn total_size_empty_is_zero() {
1923        let manifest = SemanticManifest::default();
1924        assert_eq!(manifest.total_size_bytes(), 0);
1925        assert_eq!(manifest.total_size_mb(), 0);
1926    }
1927
1928    // ── JSON round-trip ────────────────────────────────────────────────
1929
1930    #[test]
1931    fn manifest_json_round_trip() {
1932        let manifest = SemanticManifest {
1933            fast_tier: Some(test_artifact(TierKind::Fast, true)),
1934            quality_tier: Some(test_artifact(TierKind::Quality, true)),
1935            hnsw: Some(test_hnsw()),
1936            checkpoint: Some(test_checkpoint(TierKind::Quality)),
1937            ..Default::default()
1938        };
1939
1940        let json = serde_json::to_string_pretty(&manifest).unwrap();
1941        let deser: SemanticManifest = serde_json::from_str(&json).unwrap();
1942        assert_eq!(deser.fast_tier, manifest.fast_tier);
1943        assert_eq!(deser.quality_tier, manifest.quality_tier);
1944        assert_eq!(deser.hnsw, manifest.hnsw);
1945        assert_eq!(deser.checkpoint, manifest.checkpoint);
1946    }
1947
1948    // ── Shard sidecar manifest ─────────────────────────────────────────
1949
1950    #[test]
1951    fn shard_manifest_round_trip_via_sidecar() {
1952        let temp = tempfile::tempdir().unwrap();
1953        let mut shards = SemanticShardManifest::default();
1954        shards.replace_shards_for_generation(
1955            TierKind::Fast,
1956            "fnv1a-384",
1957            "fp-sharded",
1958            vec![test_shard(1, 2, true), test_shard(0, 2, true)],
1959        );
1960
1961        shards.save(temp.path()).unwrap();
1962        let loaded = SemanticShardManifest::load(temp.path()).unwrap().unwrap();
1963
1964        assert_eq!(loaded.manifest_version, MANIFEST_FORMAT_VERSION);
1965        assert_eq!(loaded.shards.len(), 2);
1966        assert_eq!(loaded.shards[0].shard_index, 0);
1967        assert_eq!(loaded.shards[1].shard_index, 1);
1968        assert!(loaded.updated_at_ms > 0);
1969    }
1970
1971    #[test]
1972    fn shard_summary_requires_every_ready_shard() {
1973        let mut shards = SemanticShardManifest::default();
1974        shards.replace_shards_for_generation(
1975            TierKind::Fast,
1976            "fnv1a-384",
1977            "fp-sharded",
1978            vec![test_shard(0, 3, true), test_shard(2, 3, true)],
1979        );
1980
1981        let partial = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
1982        assert_eq!(partial.shard_count, 3);
1983        assert_eq!(partial.ready_shards, 2);
1984        assert!(!partial.complete);
1985
1986        shards.replace_shards_for_generation(
1987            TierKind::Fast,
1988            "fnv1a-384",
1989            "fp-sharded",
1990            vec![
1991                test_shard(0, 3, true),
1992                test_shard(1, 3, true),
1993                test_shard(2, 3, true),
1994            ],
1995        );
1996
1997        let complete = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
1998        assert_eq!(complete.ready_shards, 3);
1999        assert!(complete.complete);
2000        assert_eq!(complete.doc_count, 75);
2001        assert_eq!(complete.total_conversations, 10);
2002    }
2003
    /// Sequence of malformed shard generations that must not summarize as
    /// `complete`. Two optional-ANN scenarios check the opposite: that ANN
    /// problems do *not* block completion.
    ///
    /// Every scenario calls `replace_shards_for_generation` on the same `shards`
    /// sidecar with the same (tier, embedder, DB fingerprint) key. Each summary
    /// therefore reflects only the shards that scenario just supplied.
    #[test]
    fn shard_summary_rejects_non_mmap_ready_or_inconsistent_shards() {
        // Scenario: a lone shard that is not mmap-ready is not counted as ready.
        let mut non_mmap = test_shard(0, 1, true);
        non_mmap.mmap_ready = false;
        let mut shards = SemanticShardManifest::default();
        shards.replace_shards_for_generation(
            TierKind::Fast,
            "fnv1a-384",
            "fp-sharded",
            vec![non_mmap],
        );

        let non_mmap_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
        assert_eq!(non_mmap_summary.ready_shards, 0);
        assert!(!non_mmap_summary.complete);

        // Scenario: shard 1 claims `ann_ready` but has no ANN index path, so the
        // ANN claim is not honored (`ann_ready_shards == 0`). The two shards also
        // declare different shard counts (2 vs 3). The summary reports 3, and the
        // generation is not complete.
        let mut inconsistent = test_shard(1, 3, true);
        inconsistent.ann_ready = true;
        inconsistent.ann_index_path = None;
        inconsistent.ann_size_bytes = 4096;
        shards.replace_shards_for_generation(
            TierKind::Fast,
            "fnv1a-384",
            "fp-sharded",
            vec![test_shard(0, 2, true), inconsistent],
        );

        let inconsistent_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
        assert_eq!(inconsistent_summary.shard_count, 3);
        assert_eq!(inconsistent_summary.ready_shards, 2);
        assert_eq!(inconsistent_summary.ann_ready_shards, 0);
        assert!(!inconsistent_summary.complete);

        // Scenario: shard index 1 appears twice (once ready, once not). Both
        // declared indexes are covered and `ready_shards` is 2, but the duplicate
        // must still block completion.
        shards.replace_shards_for_generation(
            TierKind::Fast,
            "fnv1a-384",
            "fp-sharded",
            vec![
                test_shard(0, 2, true),
                test_shard(1, 2, true),
                test_shard(1, 2, false),
            ],
        );
        let duplicate_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
        assert_eq!(duplicate_summary.shard_count, 2);
        assert_eq!(duplicate_summary.ready_shards, 2);
        assert!(
            !duplicate_summary.complete,
            "duplicate shard indexes must not summarize as a complete generation"
        );

        // Scenario: two distinct shard indexes point at the same vector index file.
        let mut duplicate_path = test_shard(1, 2, true);
        duplicate_path.index_path = test_shard(0, 2, true).index_path;
        shards.replace_shards_for_generation(
            TierKind::Fast,
            "fnv1a-384",
            "fp-sharded",
            vec![test_shard(0, 2, true), duplicate_path],
        );
        let duplicate_path_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
        assert_eq!(duplicate_path_summary.shard_count, 2);
        assert_eq!(duplicate_path_summary.ready_shards, 2);
        assert!(
            !duplicate_path_summary.complete,
            "duplicate shard index paths must not summarize as a complete generation"
        );

        // Scenario: an empty vector index path.
        let mut blank_path = test_shard(0, 1, true);
        blank_path.index_path.clear();
        shards.replace_shards_for_generation(
            TierKind::Fast,
            "fnv1a-384",
            "fp-sharded",
            vec![blank_path],
        );
        let blank_path_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
        assert_eq!(blank_path_summary.shard_count, 1);
        assert_eq!(blank_path_summary.ready_shards, 1);
        assert!(
            !blank_path_summary.complete,
            "blank shard index paths must not summarize as complete"
        );

        // Scenario: vector index paths that are not clean relative paths. These
        // are an absolute path in an unrelated temp dir (only its path string is
        // used), a `..` traversal, a leading `./`, and a leading space. Each shard
        // still counts as ready, but the generation must not be complete.
        for unsafe_path in [
            tempfile::tempdir()
                .unwrap()
                .path()
                .join("outside.fsvi")
                .to_string_lossy()
                .to_string(),
            "vector_index/shards/../outside.fsvi".to_string(),
            "./vector_index/shards/fast/hash.fsvi".to_string(),
            " vector_index/shards/fast/hash.fsvi".to_string(),
        ] {
            let mut unsafe_shard = test_shard(0, 1, true);
            unsafe_shard.index_path = unsafe_path;
            shards.replace_shards_for_generation(
                TierKind::Fast,
                "fnv1a-384",
                "fp-sharded",
                vec![unsafe_shard],
            );
            let unsafe_path_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
            assert_eq!(unsafe_path_summary.shard_count, 1);
            assert_eq!(unsafe_path_summary.ready_shards, 1);
            assert!(
                !unsafe_path_summary.complete,
                "unsafe shard index paths must not summarize as complete"
            );
        }

        // Scenario: the same unsafe path shapes, used for the optional ANN index.
        // The ANN claim is rejected (`ann_ready_shards == 0`). ANN is optional, so
        // the vector generation itself still summarizes as complete.
        let outside_ann_dir = tempfile::tempdir().unwrap();
        for unsafe_ann_path in [
            outside_ann_dir
                .path()
                .join("outside.chsw")
                .to_string_lossy()
                .to_string(),
            "vector_index/shards/../outside.chsw".to_string(),
            "./vector_index/shards/fast/hash.chsw".to_string(),
            " vector_index/shards/fast/hash.chsw".to_string(),
        ] {
            let mut unsafe_ann = test_shard(0, 1, true);
            unsafe_ann.ann_ready = true;
            unsafe_ann.ann_index_path = Some(unsafe_ann_path);
            unsafe_ann.ann_size_bytes = 4096;
            shards.replace_shards_for_generation(
                TierKind::Fast,
                "fnv1a-384",
                "fp-sharded",
                vec![unsafe_ann],
            );
            let unsafe_ann_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
            assert_eq!(unsafe_ann_summary.shard_count, 1);
            assert_eq!(unsafe_ann_summary.ready_shards, 1);
            assert_eq!(unsafe_ann_summary.ann_ready_shards, 0);
            assert!(
                unsafe_ann_summary.complete,
                "unsafe optional ANN paths must not invalidate the vector shard generation"
            );
        }

        // Scenario: two shards share one ANN path. Only one is credited as
        // ANN-ready, and the vector generation remains complete.
        let mut duplicate_ann_path = test_shard(1, 2, true);
        duplicate_ann_path.ann_ready = true;
        duplicate_ann_path.ann_index_path =
            Some("vector_index/shards/fast-fnv1a-384/shared-ann.chsw".to_owned());
        duplicate_ann_path.ann_size_bytes = 4096;
        let mut first_ann_path = test_shard(0, 2, true);
        first_ann_path.ann_ready = true;
        first_ann_path.ann_index_path = duplicate_ann_path.ann_index_path.clone();
        first_ann_path.ann_size_bytes = 4096;
        shards.replace_shards_for_generation(
            TierKind::Fast,
            "fnv1a-384",
            "fp-sharded",
            vec![first_ann_path, duplicate_ann_path],
        );
        let duplicate_ann_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
        assert_eq!(duplicate_ann_summary.shard_count, 2);
        assert_eq!(duplicate_ann_summary.ready_shards, 2);
        assert_eq!(duplicate_ann_summary.ann_ready_shards, 1);
        assert!(
            duplicate_ann_summary.complete,
            "duplicate optional ANN paths must not invalidate the vector shard generation"
        );

        // Scenario: the only shard has index 2 in a generation that declares 2
        // shards, so valid indexes are 0 and 1.
        shards.replace_shards_for_generation(
            TierKind::Fast,
            "fnv1a-384",
            "fp-sharded",
            vec![test_shard(2, 2, true)],
        );
        let out_of_range_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
        assert_eq!(out_of_range_summary.shard_count, 2);
        assert_eq!(out_of_range_summary.ready_shards, 1);
        assert!(
            !out_of_range_summary.complete,
            "shard indexes outside the declared shard count are malformed"
        );

        // Scenario: shards disagree on embedding dimension (fixture value vs 768).
        let mut mismatched_metadata = test_shard(1, 2, true);
        mismatched_metadata.dimension = 768;
        shards.replace_shards_for_generation(
            TierKind::Fast,
            "fnv1a-384",
            "fp-sharded",
            vec![test_shard(0, 2, true), mismatched_metadata],
        );
        let metadata_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
        assert_eq!(metadata_summary.shard_count, 2);
        assert_eq!(metadata_summary.ready_shards, 2);
        assert!(
            !metadata_summary.complete,
            "complete shard generations require consistent shard metadata"
        );

        // Scenario: a shard stamped with the previous schema version.
        let mut stale_schema = test_shard(0, 1, true);
        stale_schema.schema_version = SEMANTIC_SCHEMA_VERSION.saturating_sub(1);
        shards.replace_shards_for_generation(
            TierKind::Fast,
            "fnv1a-384",
            "fp-sharded",
            vec![stale_schema],
        );
        let stale_schema_summary = shards.summary(TierKind::Fast, "fnv1a-384", "fp-sharded");
        assert_eq!(stale_schema_summary.shard_count, 1);
        assert_eq!(stale_schema_summary.ready_shards, 1);
        assert!(
            !stale_schema_summary.complete,
            "stale schema shards must not summarize as complete"
        );
    }
2216
2217    #[test]
2218    fn shard_sidecar_does_not_make_main_manifest_ready() {
2219        let mut shards = SemanticShardManifest::default();
2220        shards.replace_shards_for_generation(
2221            TierKind::Fast,
2222            "fnv1a-384",
2223            "fp-sharded",
2224            vec![test_shard(0, 1, true)],
2225        );
2226        assert!(
2227            shards
2228                .summary(TierKind::Fast, "fnv1a-384", "fp-sharded")
2229                .complete
2230        );
2231
2232        let manifest = SemanticManifest::default();
2233        let policy = test_policy();
2234        assert_eq!(
2235            manifest.fast_tier_readiness(&policy, "fp-sharded", "hash"),
2236            TierReadiness::Missing,
2237            "sidecar shards must not publish runtime semantic readiness"
2238        );
2239    }
2240
2241    #[test]
2242    fn shard_manifest_invalidates_incompatible_shards() {
2243        let mut bad = test_shard(0, 1, true);
2244        bad.schema_version = 0;
2245        let mut shards = SemanticShardManifest {
2246            shards: vec![bad, test_shard(0, 1, true)],
2247            ..Default::default()
2248        };
2249
2250        let invalidated = shards.invalidate_incompatible(&test_policy(), "hash");
2251
2252        assert_eq!(invalidated, 1);
2253        assert_eq!(shards.shards.len(), 1);
2254        assert_eq!(shards.total_size_bytes(), 4096);
2255    }
2256}