Skip to main content

coding_agent_search/indexer/
semantic.rs

1use std::collections::HashSet;
2use std::fs;
3use std::io::IsTerminal;
4use std::path::{Path, PathBuf};
5use std::time::{SystemTime, UNIX_EPOCH};
6
7use anyhow::{Context, Result, bail};
8use frankensearch::index::{
9    HNSW_DEFAULT_EF_CONSTRUCTION as FS_HNSW_DEFAULT_EF_CONSTRUCTION,
10    HNSW_DEFAULT_M as FS_HNSW_DEFAULT_M, HnswConfig as FsHnswConfig, HnswIndex as FsHnswIndex,
11    Quantization as FsQuantization, VectorIndex as FsVectorIndex,
12    VectorIndexWriter as FsVectorIndexWriter,
13};
14use frankensqlite::compat::{ConnectionExt, ParamValue, RowExt};
15use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
16use rayon::prelude::*;
17
18use crate::indexer::memoization::{
19    ContentAddressedMemoCache, MemoCacheAuditRecord, MemoContentHash, MemoKey, MemoLookup,
20};
21use crate::indexer::responsiveness;
22use crate::model::conversation_packet::{ConversationPacket, ConversationPacketProvenance};
23use crate::model::types::{Conversation, Message};
24use crate::search::canonicalize::{canonicalize_for_embedding, content_hash};
25use crate::search::embedder::Embedder;
26use crate::search::fastembed_embedder::FastEmbedder;
27use crate::search::hash_embedder::HashEmbedder;
28use crate::search::policy::{CHUNKING_STRATEGY_VERSION, SEMANTIC_SCHEMA_VERSION, SemanticPolicy};
29use crate::search::semantic_manifest::{
30    ArtifactRecord, BuildCheckpoint, SemanticManifest, SemanticShardManifest, SemanticShardRecord,
31    TierKind,
32};
33use crate::search::tantivy::{
34    normalized_index_origin_host, normalized_index_origin_kind, normalized_index_source_id,
35};
36use crate::search::vector_index::{
37    ROLE_USER, SemanticDocId, VECTOR_INDEX_DIR, role_code_from_str, vector_index_path,
38};
39use crate::storage::sqlite::FrankenStorage;
40
41/// Default embedder batch size. 128 is a sweet spot for ONNX MiniLM models on
42/// modern CPUs: big enough to amortize dispatch overhead and keep the tensor
43/// kernels saturated, small enough that one batch fits comfortably in L2 and
44/// memory reservation stays bounded for large corpora.
45const DEFAULT_SEMANTIC_BATCH_SIZE: usize = 128;
46const DEFAULT_SEMANTIC_PREP_MEMO_CAPACITY: usize = 4_096;
47const SEMANTIC_PREP_MEMO_ALGORITHM: &str = "semantic_prepare_window";
48const SEMANTIC_PREP_MEMO_VERSION: &str = "canonicalize_for_embedding:v2:stable-content-hash";
49
50fn resolved_default_batch_size() -> usize {
51    dotenvy::var("CASS_SEMANTIC_BATCH_SIZE")
52        .ok()
53        .and_then(|v| v.parse::<usize>().ok())
54        .filter(|v| *v > 0)
55        .unwrap_or(DEFAULT_SEMANTIC_BATCH_SIZE)
56}
57
58fn resolved_semantic_prep_memo_capacity() -> usize {
59    dotenvy::var("CASS_SEMANTIC_PREP_MEMO_CAPACITY")
60        .ok()
61        .and_then(|v| v.parse::<usize>().ok())
62        .filter(|v| *v > 0)
63        .unwrap_or(DEFAULT_SEMANTIC_PREP_MEMO_CAPACITY)
64}
65
66/// Opt in to the rayon-parallel canonicalize+hash prep step. **Default: OFF.**
67///
68/// The parallel path is kept because canonicalize+hash CAN dominate the
69/// embedding wall-clock on pathological inputs (very long messages, costly
70/// Unicode normalization). But criterion baselines captured under
71/// `tests/artifacts/perf/2026-04-21-profile-run/baselines.md` showed a
72/// 1.2×–2.3× **regression** on the hash embedder across every batch size
73/// tested (2 000 messages, mixed markdown/code/unicode): rayon's per-task
74/// scheduling overhead is larger than the per-message canonicalize+hash cost
75/// when the embedder itself is cheap. For the production ONNX (MiniLM)
76/// embedder, per-batch inference already dwarfs prep, so parallel prep never
77/// buys meaningful wall-clock — the prep step is ≤ 1% of total embed time.
78///
79/// Set `CASS_SEMANTIC_PREP_PARALLEL=1` / `true` / `yes` / `on` to opt in.
80fn parallel_prep_enabled() -> bool {
81    env_truthy("CASS_SEMANTIC_PREP_PARALLEL")
82}
83
84fn saturating_u64_from_usize(value: usize) -> u64 {
85    u64::try_from(value).unwrap_or(u64::MAX)
86}
87
88#[derive(Debug, Clone)]
89pub struct EmbeddingInput {
90    pub message_id: u64,
91    pub created_at_ms: i64,
92    pub agent_id: u32,
93    pub workspace_id: u32,
94    pub source_id: u32,
95    pub role: u8,
96    pub chunk_idx: u8,
97    pub content: String,
98}
99
100impl EmbeddingInput {
101    pub fn new(message_id: u64, content: impl Into<String>) -> Self {
102        Self {
103            message_id,
104            created_at_ms: 0,
105            agent_id: 0,
106            workspace_id: 0,
107            source_id: 0,
108            role: ROLE_USER,
109            chunk_idx: 0,
110            content: content.into(),
111        }
112    }
113}
114
115#[derive(Debug, Clone)]
116pub struct EmbeddedMessage {
117    pub message_id: u64,
118    pub created_at_ms: i64,
119    pub agent_id: u32,
120    pub workspace_id: u32,
121    pub source_id: u32,
122    pub role: u8,
123    pub chunk_idx: u8,
124    pub content_hash: [u8; 32],
125    pub embedding: Vec<f32>,
126}
127
128#[derive(Debug, Clone)]
129pub struct SemanticBackfillBatchPlan {
130    pub tier: TierKind,
131    pub db_fingerprint: String,
132    pub model_revision: String,
133    pub total_conversations: u64,
134    pub conversations_in_batch: u64,
135    pub last_offset: i64,
136}
137
138#[derive(Debug, Clone)]
139pub struct SemanticBackfillStoragePlan {
140    pub tier: TierKind,
141    pub db_fingerprint: String,
142    pub model_revision: String,
143    pub max_conversations: usize,
144}
145
146#[derive(Debug, Clone)]
147pub struct SemanticBackfillBatchOutcome {
148    pub tier: TierKind,
149    pub embedder_id: String,
150    pub embedded_docs: u64,
151    pub conversations_processed: u64,
152    pub total_conversations: u64,
153    pub last_offset: i64,
154    pub checkpoint_saved: bool,
155    pub published: bool,
156    pub index_path: PathBuf,
157    pub manifest_path: PathBuf,
158}
159
160#[derive(Debug, Clone)]
161pub struct SemanticShardBuildPlan {
162    pub tier: TierKind,
163    pub db_fingerprint: String,
164    pub model_revision: String,
165    pub total_conversations: u64,
166    pub max_records_per_shard: usize,
167    pub build_ann: bool,
168}
169
170#[derive(Debug, Clone)]
171pub struct SemanticShardBuildOutcome {
172    pub tier: TierKind,
173    pub embedder_id: String,
174    pub shard_count: u32,
175    pub doc_count: u64,
176    pub total_conversations: u64,
177    pub index_paths: Vec<PathBuf>,
178    pub ann_index_paths: Vec<PathBuf>,
179    pub shard_manifest_path: PathBuf,
180    pub complete: bool,
181}
182
183#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
184#[serde(rename_all = "snake_case")]
185pub(crate) enum SemanticBackfillSchedulerState {
186    Running,
187    Paused,
188    Disabled,
189}
190
191impl SemanticBackfillSchedulerState {
192    pub(crate) fn as_str(self) -> &'static str {
193        match self {
194            Self::Running => "running",
195            Self::Paused => "paused",
196            Self::Disabled => "disabled",
197        }
198    }
199}
200
201#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
202#[serde(rename_all = "snake_case")]
203pub(crate) enum SemanticBackfillSchedulerReason {
204    IdleBudgetAvailable,
205    OperatorDisabled,
206    PolicyDisabled,
207    ForegroundPressure,
208    LexicalRepairActive,
209    CapacityBelowFloor,
210    ThreadBudgetZero,
211    BatchBudgetZero,
212}
213
214impl SemanticBackfillSchedulerReason {
215    pub(crate) fn next_step(self) -> &'static str {
216        match self {
217            Self::IdleBudgetAvailable => "background semantic backfill is within idle budgets",
218            Self::OperatorDisabled => {
219                "background semantic backfill is disabled by CASS_SEMANTIC_BACKFILL_DISABLE"
220            }
221            Self::PolicyDisabled => "semantic policy disables background semantic backfill",
222            Self::ForegroundPressure => {
223                "foreground pressure is present; retry after the idle delay"
224            }
225            Self::LexicalRepairActive => "lexical repair is active; semantic backfill is yielding",
226            Self::CapacityBelowFloor => {
227                "machine responsiveness capacity is below the semantic backfill floor"
228            }
229            Self::ThreadBudgetZero => "semantic backfill thread budget is zero",
230            Self::BatchBudgetZero => "semantic backfill batch budget is zero",
231        }
232    }
233}
234
235#[derive(Debug, Clone, Copy, PartialEq, Eq)]
236pub(crate) struct SemanticBackfillSchedulerSignals {
237    pub foreground_pressure: bool,
238    pub lexical_repair_active: bool,
239    pub force: bool,
240    pub operator_disabled: bool,
241}
242
243impl SemanticBackfillSchedulerSignals {
244    pub(crate) fn from_env() -> Self {
245        Self {
246            foreground_pressure: env_truthy("CASS_SEMANTIC_BACKFILL_FOREGROUND_ACTIVE"),
247            lexical_repair_active: env_truthy("CASS_SEMANTIC_BACKFILL_LEXICAL_REPAIR_ACTIVE"),
248            force: env_truthy("CASS_SEMANTIC_BACKFILL_FORCE"),
249            operator_disabled: env_truthy("CASS_SEMANTIC_BACKFILL_DISABLE"),
250        }
251    }
252}
253
254#[derive(Debug, Clone, serde::Serialize)]
255pub(crate) struct SemanticBackfillSchedulerDecision {
256    pub state: SemanticBackfillSchedulerState,
257    pub reason: SemanticBackfillSchedulerReason,
258    pub requested_batch_conversations: usize,
259    pub scheduled_batch_conversations: usize,
260    pub current_capacity_pct: u32,
261    pub min_capacity_pct: u32,
262    pub max_backfill_threads: usize,
263    pub idle_delay_seconds: u64,
264    pub chunk_timeout_seconds: u64,
265    pub foreground_pressure: bool,
266    pub lexical_repair_active: bool,
267    pub forced: bool,
268    pub next_eligible_after_ms: u64,
269}
270
271impl SemanticBackfillSchedulerDecision {
272    pub(crate) fn should_run(&self) -> bool {
273        matches!(self.state, SemanticBackfillSchedulerState::Running)
274    }
275}
276
277fn env_truthy(key: &str) -> bool {
278    dotenvy::var(key)
279        .ok()
280        .map(|value| {
281            matches!(
282                value.trim().to_ascii_lowercase().as_str(),
283                "1" | "true" | "yes" | "on"
284            )
285        })
286        .unwrap_or(false)
287}
288
289fn env_backfill_min_capacity_pct() -> u32 {
290    dotenvy::var("CASS_SEMANTIC_BACKFILL_MIN_CAPACITY_PCT")
291        .ok()
292        .and_then(|value| value.trim().parse::<u32>().ok())
293        .map(|value| value.clamp(1, 100))
294        .unwrap_or(75)
295}
296
297pub(crate) fn semantic_backfill_scheduler_decision(
298    policy: &SemanticPolicy,
299    requested_batch_conversations: usize,
300    signals: &SemanticBackfillSchedulerSignals,
301) -> SemanticBackfillSchedulerDecision {
302    semantic_backfill_scheduler_decision_for_capacity(
303        policy,
304        requested_batch_conversations,
305        signals,
306        responsiveness::current_capacity_pct(),
307    )
308}
309
310pub(crate) fn semantic_backfill_scheduler_decision_for_capacity(
311    policy: &SemanticPolicy,
312    requested_batch_conversations: usize,
313    signals: &SemanticBackfillSchedulerSignals,
314    current_capacity_pct: u32,
315) -> SemanticBackfillSchedulerDecision {
316    let min_capacity_pct = env_backfill_min_capacity_pct();
317    let paused_delay_ms = policy.idle_delay_seconds.saturating_mul(1000);
318    let mut decision = SemanticBackfillSchedulerDecision {
319        state: SemanticBackfillSchedulerState::Running,
320        reason: SemanticBackfillSchedulerReason::IdleBudgetAvailable,
321        requested_batch_conversations,
322        scheduled_batch_conversations: requested_batch_conversations,
323        current_capacity_pct: current_capacity_pct.clamp(0, 100),
324        min_capacity_pct,
325        max_backfill_threads: policy.max_backfill_threads,
326        idle_delay_seconds: policy.idle_delay_seconds,
327        chunk_timeout_seconds: policy.chunk_timeout_seconds,
328        foreground_pressure: signals.foreground_pressure,
329        lexical_repair_active: signals.lexical_repair_active,
330        forced: signals.force,
331        next_eligible_after_ms: 0,
332    };
333
334    if requested_batch_conversations == 0 {
335        return stopped_scheduler_decision(
336            decision,
337            SemanticBackfillSchedulerState::Disabled,
338            SemanticBackfillSchedulerReason::BatchBudgetZero,
339            paused_delay_ms,
340        );
341    }
342    if policy.max_backfill_threads == 0 && !signals.force {
343        return stopped_scheduler_decision(
344            decision,
345            SemanticBackfillSchedulerState::Disabled,
346            SemanticBackfillSchedulerReason::ThreadBudgetZero,
347            paused_delay_ms,
348        );
349    }
350    if signals.operator_disabled && !signals.force {
351        return stopped_scheduler_decision(
352            decision,
353            SemanticBackfillSchedulerState::Disabled,
354            SemanticBackfillSchedulerReason::OperatorDisabled,
355            paused_delay_ms,
356        );
357    }
358    if !policy.mode.should_build_semantic() && !signals.force {
359        return stopped_scheduler_decision(
360            decision,
361            SemanticBackfillSchedulerState::Disabled,
362            SemanticBackfillSchedulerReason::PolicyDisabled,
363            paused_delay_ms,
364        );
365    }
366    if signals.lexical_repair_active && !signals.force {
367        return stopped_scheduler_decision(
368            decision,
369            SemanticBackfillSchedulerState::Paused,
370            SemanticBackfillSchedulerReason::LexicalRepairActive,
371            paused_delay_ms,
372        );
373    }
374    if signals.foreground_pressure && !signals.force {
375        return stopped_scheduler_decision(
376            decision,
377            SemanticBackfillSchedulerState::Paused,
378            SemanticBackfillSchedulerReason::ForegroundPressure,
379            paused_delay_ms,
380        );
381    }
382    if current_capacity_pct < min_capacity_pct && !signals.force {
383        return stopped_scheduler_decision(
384            decision,
385            SemanticBackfillSchedulerState::Paused,
386            SemanticBackfillSchedulerReason::CapacityBelowFloor,
387            paused_delay_ms,
388        );
389    }
390
391    let capacity = current_capacity_pct.clamp(1, 100) as usize;
392    let scaled = requested_batch_conversations.saturating_mul(capacity) / 100;
393    decision.scheduled_batch_conversations = scaled.max(1).min(requested_batch_conversations);
394    decision
395}
396
397fn stopped_scheduler_decision(
398    mut decision: SemanticBackfillSchedulerDecision,
399    state: SemanticBackfillSchedulerState,
400    reason: SemanticBackfillSchedulerReason,
401    next_eligible_after_ms: u64,
402) -> SemanticBackfillSchedulerDecision {
403    decision.state = state;
404    decision.reason = reason;
405    decision.scheduled_batch_conversations = 0;
406    decision.next_eligible_after_ms = next_eligible_after_ms;
407    decision
408}
409
410fn now_ms() -> i64 {
411    SystemTime::now()
412        .duration_since(UNIX_EPOCH)
413        .map(|duration| duration.as_millis() as i64)
414        .unwrap_or(0)
415}
416
417fn hnsw_index_path(data_dir: &Path, embedder_id: &str) -> PathBuf {
418    data_dir
419        .join(VECTOR_INDEX_DIR)
420        .join(format!("hnsw-{embedder_id}.chsw"))
421}
422
423fn safe_path_component(raw: &str) -> String {
424    raw.chars()
425        .map(|ch| {
426            if ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_') {
427                ch
428            } else {
429                '_'
430            }
431        })
432        .collect()
433}
434
435fn semantic_staging_index_path(
436    data_dir: &Path,
437    tier: TierKind,
438    embedder_id: &str,
439    db_fingerprint: &str,
440) -> PathBuf {
441    let fingerprint_hash = crc32fast::hash(db_fingerprint.as_bytes());
442    data_dir.join(VECTOR_INDEX_DIR).join(format!(
443        ".staging-{}-{}-{fingerprint_hash:08x}.fsvi",
444        tier.as_str(),
445        safe_path_component(embedder_id)
446    ))
447}
448
449fn semantic_generation_fingerprint_component(db_fingerprint: &str) -> String {
450    blake3::hash(db_fingerprint.as_bytes())
451        .to_hex()
452        .chars()
453        .take(16)
454        .collect()
455}
456
457fn semantic_shard_generation_dir(
458    data_dir: &Path,
459    tier: TierKind,
460    embedder_id: &str,
461    db_fingerprint: &str,
462) -> PathBuf {
463    let fingerprint_hash = semantic_generation_fingerprint_component(db_fingerprint);
464    data_dir.join(VECTOR_INDEX_DIR).join("shards").join(format!(
465        "{}-{}-{fingerprint_hash}",
466        tier.as_str(),
467        safe_path_component(embedder_id),
468    ))
469}
470
471fn semantic_shard_index_path(
472    data_dir: &Path,
473    tier: TierKind,
474    embedder_id: &str,
475    db_fingerprint: &str,
476    shard_index: u32,
477) -> PathBuf {
478    semantic_shard_generation_dir(data_dir, tier, embedder_id, db_fingerprint)
479        .join(format!("shard-{shard_index:05}.fsvi"))
480}
481
482fn semantic_shard_ann_index_path(
483    data_dir: &Path,
484    tier: TierKind,
485    embedder_id: &str,
486    db_fingerprint: &str,
487    shard_index: u32,
488) -> PathBuf {
489    semantic_shard_generation_dir(data_dir, tier, embedder_id, db_fingerprint)
490        .join(format!("shard-{shard_index:05}.chsw"))
491}
492
493#[cfg(not(windows))]
494fn sync_parent_directory(path: &Path) -> Result<()> {
495    let Some(parent) = path.parent() else {
496        return Ok(());
497    };
498    let directory = fs::File::open(parent)
499        .with_context(|| format!("opening parent directory {}", parent.display()))?;
500    directory
501        .sync_all()
502        .with_context(|| format!("syncing parent directory {}", parent.display()))
503}
504
505#[cfg(windows)]
506fn sync_parent_directory(_path: &Path) -> Result<()> {
507    Ok(())
508}
509
510fn semantic_doc_id_for_embedded(embedded: &EmbeddedMessage) -> String {
511    SemanticDocId {
512        message_id: embedded.message_id,
513        chunk_idx: embedded.chunk_idx,
514        agent_id: embedded.agent_id,
515        workspace_id: embedded.workspace_id,
516        source_id: embedded.source_id,
517        role: embedded.role,
518        created_at_ms: embedded.created_at_ms,
519        content_hash: Some(embedded.content_hash),
520    }
521    .to_doc_id_string()
522}
523
524struct CanonicalEmbeddingConversationRow {
525    conversation_id: i64,
526    agent_slug: String,
527    agent_id: i64,
528    workspace: Option<PathBuf>,
529    workspace_id: Option<i64>,
530    external_id: Option<String>,
531    title: Option<String>,
532    source_path: PathBuf,
533    started_at: Option<i64>,
534    ended_at: Option<i64>,
535    source_id: Option<String>,
536    origin_host: Option<String>,
537}
538
539struct CanonicalEmbeddingBatch {
540    inputs: Vec<EmbeddingInput>,
541    conversations_in_batch: u64,
542    last_conversation_id: i64,
543    total_conversations: u64,
544}
545
546pub(crate) struct CanonicalIncrementalEmbeddingBatch {
547    pub inputs: Vec<EmbeddingInput>,
548    pub conversations_in_batch: u64,
549    pub raw_max_message_id: Option<i64>,
550}
551
552fn matching_semantic_checkpoint_offset(
553    manifest: &SemanticManifest,
554    tier: TierKind,
555    embedder_id: &str,
556    db_fingerprint: &str,
557) -> i64 {
558    manifest
559        .checkpoint
560        .as_ref()
561        .filter(|checkpoint| {
562            checkpoint.tier == tier
563                && checkpoint.embedder_id == embedder_id
564                && checkpoint.is_valid(db_fingerprint)
565        })
566        .map_or(0, |checkpoint| checkpoint.last_offset)
567}
568
569fn total_semantic_conversations(storage: &FrankenStorage) -> Result<u64> {
570    let count: i64 = storage
571        .raw()
572        .query_row_map(
573            "SELECT COUNT(DISTINCT m.conversation_id)
574             FROM messages m
575             JOIN conversations c ON c.id = m.conversation_id",
576            &[] as &[ParamValue],
577            |row| row.get_typed(0),
578        )
579        .with_context(|| "counting canonical conversations with semantic messages")?;
580    Ok(u64::try_from(count.max(0)).unwrap_or(u64::MAX))
581}
582
583pub(crate) fn message_id_from_db(raw: i64) -> Option<u64> {
584    u64::try_from(raw).ok()
585}
586
587pub(crate) fn saturating_u32_from_i64(raw: i64) -> u32 {
588    match u32::try_from(raw) {
589        Ok(value) => value,
590        Err(_) if raw.is_negative() => 0,
591        Err(_) => u32::MAX,
592    }
593}
594
595fn canonical_embedding_created_at_ms(message_id: u64, created_at: Option<i64>) -> i64 {
596    // `created_at_ms` feeds time-range filters in the vector index
597    // (src/search/vector_index.rs range predicates) and contributes to
598    // `stable_hit_hash`. Defaulting a NULL created_at to 0 silently
599    // masquerades the message as Unix-epoch (1970), which is indistinguishable
600    // from a legitimately-ancient row in downstream filters. Emit a warn
601    // so operators see NULL-created_at rows in the logs instead of only
602    // finding them by puzzling over 1970 timestamps in semantic hits.
603    created_at.unwrap_or_else(|| {
604        tracing::warn!(
605            message_id,
606            "semantic backfill: row has NULL created_at; defaulting to 0 (Unix epoch). \
607             Downstream time-range filters will treat this message as 1970-01-01."
608        );
609        0
610    })
611}
612
613fn canonical_embedding_packet_provenance(
614    row: &CanonicalEmbeddingConversationRow,
615) -> ConversationPacketProvenance {
616    let source_id =
617        normalized_index_source_id(row.source_id.as_deref(), None, row.origin_host.as_deref());
618    ConversationPacketProvenance {
619        origin_kind: normalized_index_origin_kind(&source_id, None),
620        origin_host: normalized_index_origin_host(row.origin_host.as_deref()),
621        source_id,
622    }
623}
624
625fn canonical_embedding_conversation(
626    row: &CanonicalEmbeddingConversationRow,
627    provenance: &ConversationPacketProvenance,
628    messages: Vec<Message>,
629) -> Conversation {
630    Conversation {
631        id: Some(row.conversation_id),
632        agent_slug: row.agent_slug.clone(),
633        workspace: row.workspace.clone(),
634        external_id: row.external_id.clone(),
635        title: row.title.clone(),
636        source_path: row.source_path.clone(),
637        started_at: row.started_at,
638        ended_at: row.ended_at,
639        approx_tokens: None,
640        metadata_json: serde_json::Value::Null,
641        messages,
642        source_id: provenance.source_id.clone(),
643        origin_host: provenance.origin_host.clone(),
644    }
645}
646
647fn embedding_input_from_packet_message(
648    conversation_id: i64,
649    agent_id: u32,
650    workspace_id: u32,
651    source_id_hash: u32,
652    message: &crate::model::conversation_packet::ConversationPacketMessage,
653) -> Option<EmbeddingInput> {
654    let Some(raw_message_id) = message.message_id else {
655        tracing::warn!(
656            conversation_id,
657            message_idx = message.idx,
658            "skipping semantic backfill message without canonical id in ConversationPacket replay"
659        );
660        return None;
661    };
662    let Some(message_id) = message_id_from_db(raw_message_id) else {
663        tracing::warn!(
664            conversation_id,
665            raw_message_id,
666            "skipping out-of-range id during semantic backfill"
667        );
668        return None;
669    };
670    Some(EmbeddingInput {
671        message_id,
672        created_at_ms: canonical_embedding_created_at_ms(message_id, message.created_at),
673        agent_id,
674        workspace_id,
675        source_id: source_id_hash,
676        role: role_code_from_str(&message.role).unwrap_or(ROLE_USER),
677        chunk_idx: 0,
678        content: message.content.clone(),
679    })
680}
681
682fn embedding_inputs_from_conversation_packet(
683    row: &CanonicalEmbeddingConversationRow,
684    packet: &ConversationPacket,
685) -> Vec<EmbeddingInput> {
686    let agent_id = saturating_u32_from_i64(row.agent_id);
687    let workspace_id = saturating_u32_from_i64(row.workspace_id.unwrap_or(0));
688    let source_id_hash = crc32fast::hash(packet.payload.provenance.source_id.as_bytes());
689    packet
690        .projections
691        .semantic
692        .message_indices
693        .iter()
694        .filter_map(|message_index| {
695            packet
696                .payload
697                .messages
698                .get(*message_index)
699                .and_then(|message| {
700                    embedding_input_from_packet_message(
701                        row.conversation_id,
702                        agent_id,
703                        workspace_id,
704                        source_id_hash,
705                        message,
706                    )
707                })
708        })
709        .collect()
710}
711
712fn fetch_canonical_embedding_conversations(
713    storage: &FrankenStorage,
714    conversation_ids: &[i64],
715) -> Result<Vec<CanonicalEmbeddingConversationRow>> {
716    let mut envelope_sql = String::from(
717        "SELECT c.id,
718                COALESCE(a.slug, 'unknown'),
719                COALESCE(c.agent_id, 0),
720                c.workspace_id,
721                w.path,
722                c.external_id,
723                c.title,
724                c.source_path,
725                c.started_at,
726                c.ended_at,
727                c.source_id,
728                c.origin_host
729         FROM conversations c
730         LEFT JOIN agents a ON a.id = c.agent_id
731         LEFT JOIN workspaces w ON w.id = c.workspace_id
732         WHERE c.id IN (",
733    );
734    let mut params = Vec::with_capacity(conversation_ids.len());
735    for (idx, conversation_id) in conversation_ids.iter().enumerate() {
736        if idx > 0 {
737            envelope_sql.push_str(", ");
738        }
739        envelope_sql.push_str(&format!("?{}", idx + 1));
740        params.push(ParamValue::from(*conversation_id));
741    }
742    envelope_sql.push_str(") ORDER BY c.id ASC");
743
744    storage
745        .raw()
746        .query_map_collect(&envelope_sql, &params, |row| {
747            let workspace_path: Option<String> = row.get_typed(4)?;
748            Ok(CanonicalEmbeddingConversationRow {
749                conversation_id: row.get_typed(0)?,
750                agent_slug: row.get_typed(1)?,
751                agent_id: row.get_typed(2)?,
752                workspace_id: row.get_typed(3)?,
753                workspace: workspace_path.map(PathBuf::from),
754                external_id: row.get_typed(5)?,
755                title: row.get_typed(6)?,
756                source_path: PathBuf::from(row.get_typed::<String>(7)?),
757                started_at: row.get_typed(8)?,
758                ended_at: row.get_typed(9)?,
759                source_id: row.get_typed(10)?,
760                origin_host: row.get_typed(11)?,
761            })
762        })
763        .with_context(|| {
764            format!(
765                "fetching semantic backfill conversation envelopes for {} conversations",
766                conversation_ids.len()
767            )
768        })
769}
770
771/// Per-packet semantic context that supplies the database-internal
772/// agent / workspace ids the canonical embedding row carries but the
773/// `ConversationPacket` does not (those ids are storage-internal,
774/// not part of the packet contract).
775///
776/// `coding_agent_session_search-ibuuh.32` (sink #3): when a caller
777/// already holds packets (rebuild pipeline, salvage replay, repair
778/// flows, etc.) it can pair them with their canonical
779/// agent_id/workspace_id and drive the semantic preparation consumer
780/// without a second storage round-trip.
781#[allow(dead_code)]
782#[derive(Debug, Clone, Copy)]
783pub(crate) struct SemanticPacketContext {
784    pub conversation_id: i64,
785    pub agent_id: u32,
786    pub workspace_id: u32,
787}
788
789/// Packet-driven counterpart to
790/// [`packet_embedding_inputs_from_storage`]: derives the same
791/// `EmbeddingInput` list a fresh storage replay would produce, but
792/// without re-querying canonical conversation rows.
793///
794/// Invariants:
795/// - The `i`th element of `contexts` describes the `i`th packet.
796/// - Returns `Err` if the lengths disagree, so a callsite cannot
797///   silently mis-correlate packets and contexts.
798/// - `source_id_hash` is derived from `packet.payload.provenance.source_id`
799///   the same way `embedding_inputs_from_conversation_packet` derives
800///   it from the canonical row, so the produced `EmbeddingInput.source_id`
801///   matches both paths byte-for-byte.
802///
803/// The `semantic_inputs_from_packets_matches_storage_replay`
804/// equivalence test pins every produced `EmbeddingInput` field is
805/// identical to what the legacy storage-side replay returns for the
806/// same canonical corpus, so callers that already hold packets can
807/// switch to this helper without changing semantic-index output.
808#[allow(dead_code)]
809pub(crate) fn semantic_inputs_from_packets(
810    packets: &[ConversationPacket],
811    contexts: &[SemanticPacketContext],
812) -> Result<Vec<EmbeddingInput>> {
813    if packets.len() != contexts.len() {
814        anyhow::bail!(
815            "semantic_inputs_from_packets length mismatch: {} packets vs {} contexts",
816            packets.len(),
817            contexts.len()
818        );
819    }
820    let mut inputs = Vec::new();
821    for (packet, context) in packets.iter().zip(contexts.iter()) {
822        let source_id_hash = crc32fast::hash(packet.payload.provenance.source_id.as_bytes());
823        for &message_index in &packet.projections.semantic.message_indices {
824            let Some(message) = packet.payload.messages.get(message_index) else {
825                anyhow::bail!(
826                    "packet semantic projection references missing message index {} \
827                     (packet has {} messages)",
828                    message_index,
829                    packet.payload.messages.len()
830                );
831            };
832            if let Some(input) = embedding_input_from_packet_message(
833                context.conversation_id,
834                context.agent_id,
835                context.workspace_id,
836                source_id_hash,
837                message,
838            ) {
839                inputs.push(input);
840            }
841        }
842    }
843    tracing::debug!(
844        packets = packets.len(),
845        packet_driven = true,
846        semantic_inputs = inputs.len(),
847        "built semantic inputs from in-memory ConversationPacket batch"
848    );
849    Ok(inputs)
850}
851
852fn fetch_canonical_embedding_batch(
853    storage: &FrankenStorage,
854    after_conversation_id: i64,
855    max_conversations: usize,
856) -> Result<CanonicalEmbeddingBatch> {
857    let total_conversations = total_semantic_conversations(storage)?;
858    let max_conversations_i64 = i64::try_from(max_conversations.max(1)).unwrap_or(i64::MAX);
859    let conversation_ids: Vec<i64> = storage
860        .raw()
861        .query_map_collect(
862            "SELECT DISTINCT m.conversation_id
863             FROM messages m
864             JOIN conversations c ON c.id = m.conversation_id
865             WHERE m.conversation_id > ?1
866             ORDER BY m.conversation_id ASC
867             LIMIT ?2",
868            &[
869                ParamValue::from(after_conversation_id),
870                ParamValue::from(max_conversations_i64),
871            ],
872            |row| row.get_typed(0),
873        )
874        .with_context(|| {
875            format!("fetching semantic backfill conversation ids after {after_conversation_id}")
876        })?;
877
878    if conversation_ids.is_empty() {
879        return Ok(CanonicalEmbeddingBatch {
880            inputs: Vec::new(),
881            conversations_in_batch: 0,
882            last_conversation_id: after_conversation_id,
883            total_conversations,
884        });
885    }
886
887    let conversations = fetch_canonical_embedding_conversations(storage, &conversation_ids)?;
888
889    let mut grouped_messages =
890        storage.fetch_messages_for_lexical_rebuild_batch(&conversation_ids, None, None)?;
891    let mut inputs = Vec::new();
892    for conversation in &conversations {
893        let messages = grouped_messages
894            .remove(&conversation.conversation_id)
895            .unwrap_or_default();
896        let provenance = canonical_embedding_packet_provenance(conversation);
897        let canonical = canonical_embedding_conversation(conversation, &provenance, messages);
898        let packet = ConversationPacket::from_canonical_replay(&canonical, provenance);
899        inputs.extend(embedding_inputs_from_conversation_packet(
900            conversation,
901            &packet,
902        ));
903    }
904
905    let conversations_in_batch = u64::try_from(conversation_ids.len()).unwrap_or(u64::MAX);
906    tracing::debug!(
907        conversations_in_batch,
908        packet_driven = true,
909        semantic_inputs = inputs.len(),
910        "built semantic backfill batch from ConversationPacket canonical replay"
911    );
912
913    Ok(CanonicalEmbeddingBatch {
914        inputs,
915        conversations_in_batch,
916        last_conversation_id: *conversation_ids.last().unwrap_or(&after_conversation_id),
917        total_conversations,
918    })
919}
920
921pub(crate) fn packet_embedding_inputs_from_storage(
922    storage: &FrankenStorage,
923) -> Result<Vec<EmbeddingInput>> {
924    Ok(fetch_canonical_embedding_batch(storage, 0, usize::MAX)?.inputs)
925}
926
927fn packet_embedding_inputs_from_selected_canonical_messages<F>(
928    storage: &FrankenStorage,
929    conversation_ids: &[i64],
930    mut include_message: F,
931) -> Result<(Vec<EmbeddingInput>, Option<i64>)>
932where
933    F: FnMut(&Message) -> bool,
934{
935    if conversation_ids.is_empty() {
936        return Ok((Vec::new(), None));
937    }
938
939    let conversations = fetch_canonical_embedding_conversations(storage, conversation_ids)?;
940    let mut grouped_messages =
941        storage.fetch_messages_for_lexical_rebuild_batch(conversation_ids, None, None)?;
942    let mut inputs = Vec::new();
943    let mut raw_max_message_id: Option<i64> = None;
944
945    for conversation in &conversations {
946        let mut messages = grouped_messages
947            .remove(&conversation.conversation_id)
948            .unwrap_or_default();
949        messages.retain(|message| {
950            let keep = include_message(message);
951            if keep && let Some(message_id) = message.id {
952                raw_max_message_id =
953                    Some(raw_max_message_id.map_or(message_id, |current| current.max(message_id)));
954            }
955            keep
956        });
957        if messages.is_empty() {
958            continue;
959        }
960
961        let provenance = canonical_embedding_packet_provenance(conversation);
962        let canonical = canonical_embedding_conversation(conversation, &provenance, messages);
963        let packet = ConversationPacket::from_canonical_replay(&canonical, provenance);
964        inputs.extend(embedding_inputs_from_conversation_packet(
965            conversation,
966            &packet,
967        ));
968    }
969
970    Ok((inputs, raw_max_message_id))
971}
972
973pub(crate) fn packet_embedding_inputs_from_storage_since(
974    storage: &FrankenStorage,
975    since_message_id: i64,
976) -> Result<CanonicalIncrementalEmbeddingBatch> {
977    let conversation_ids: Vec<i64> = storage
978        .raw()
979        .query_map_collect(
980            "SELECT DISTINCT m.conversation_id
981             FROM messages m
982             WHERE m.id > ?1
983             ORDER BY m.conversation_id ASC",
984            &[ParamValue::from(since_message_id)],
985            |row| row.get_typed(0),
986        )
987        .with_context(|| {
988            format!(
989                "fetching canonical semantic catch-up conversation ids after message {since_message_id}"
990            )
991        })?;
992
993    if conversation_ids.is_empty() {
994        return Ok(CanonicalIncrementalEmbeddingBatch {
995            inputs: Vec::new(),
996            conversations_in_batch: 0,
997            raw_max_message_id: None,
998        });
999    }
1000
1001    let (inputs, raw_max_message_id) = packet_embedding_inputs_from_selected_canonical_messages(
1002        storage,
1003        &conversation_ids,
1004        |message| message.id.is_some_and(|id| id > since_message_id),
1005    )?;
1006
1007    let conversations_in_batch = u64::try_from(conversation_ids.len()).unwrap_or(u64::MAX);
1008    tracing::debug!(
1009        since_message_id,
1010        conversations_in_batch,
1011        packet_driven = true,
1012        semantic_inputs = inputs.len(),
1013        "built semantic catch-up batch from ConversationPacket canonical replay"
1014    );
1015
1016    Ok(CanonicalIncrementalEmbeddingBatch {
1017        inputs,
1018        conversations_in_batch,
1019        raw_max_message_id,
1020    })
1021}
1022
1023pub(crate) fn packet_embedding_inputs_from_storage_for_message_ids(
1024    storage: &FrankenStorage,
1025    conversation_ids: &[i64],
1026    message_ids: &HashSet<i64>,
1027) -> Result<Vec<EmbeddingInput>> {
1028    if conversation_ids.is_empty() || message_ids.is_empty() {
1029        return Ok(Vec::new());
1030    }
1031
1032    let (inputs, raw_max_message_id) = packet_embedding_inputs_from_selected_canonical_messages(
1033        storage,
1034        conversation_ids,
1035        |message| message.id.is_some_and(|id| message_ids.contains(&id)),
1036    )?;
1037    tracing::debug!(
1038        conversations_in_batch = conversation_ids.len(),
1039        selected_message_ids = message_ids.len(),
1040        semantic_inputs = inputs.len(),
1041        raw_max_message_id,
1042        packet_driven = true,
1043        "built selected semantic batch from ConversationPacket canonical replay"
1044    );
1045
1046    Ok(inputs)
1047}
1048
1049struct Prepared<'a> {
1050    msg: &'a EmbeddingInput,
1051    canonical: String,
1052    hash: [u8; 32],
1053}
1054
1055#[derive(Debug, Clone, PartialEq, Eq)]
1056struct MemoizedPreparedMessage {
1057    canonical: String,
1058    hash: [u8; 32],
1059}
1060
1061fn semantic_prep_memo_key(content: &str) -> MemoKey {
1062    MemoKey::new(
1063        MemoContentHash::from_bytes(content_hash(content).to_vec()),
1064        SEMANTIC_PREP_MEMO_ALGORITHM,
1065        SEMANTIC_PREP_MEMO_VERSION,
1066    )
1067}
1068
1069fn memo_counter_delta(after: u64, before: u64) -> u64 {
1070    after.saturating_sub(before)
1071}
1072
1073fn trace_semantic_prep_memo_window(
1074    window_index: usize,
1075    window_len: usize,
1076    prepared_len: usize,
1077    entry_capacity: usize,
1078    before: &crate::indexer::memoization::MemoCacheStats,
1079    after: &crate::indexer::memoization::MemoCacheStats,
1080) {
1081    tracing::trace!(
1082        algorithm = SEMANTIC_PREP_MEMO_ALGORITHM,
1083        algorithm_version = SEMANTIC_PREP_MEMO_VERSION,
1084        window_index,
1085        window_len,
1086        prepared_messages = prepared_len,
1087        skipped_messages = window_len.saturating_sub(prepared_len),
1088        hit_delta = memo_counter_delta(after.hits, before.hits),
1089        miss_delta = memo_counter_delta(after.misses, before.misses),
1090        insert_delta = memo_counter_delta(after.inserts, before.inserts),
1091        evictions_capacity_delta =
1092            memo_counter_delta(after.evictions_capacity, before.evictions_capacity),
1093        quarantined_delta = memo_counter_delta(after.quarantined, before.quarantined),
1094        live_entries = after.live_entries,
1095        entry_capacity,
1096        "semantic prep memo cache window"
1097    );
1098}
1099
1100fn trace_semantic_prep_memo_audit(audit: &MemoCacheAuditRecord) {
1101    tracing::trace!(?audit, "semantic prep memo cache audit");
1102}
1103
1104fn prepare_window_with_memo<'a>(
1105    window: &'a [EmbeddingInput],
1106    cache: &mut ContentAddressedMemoCache<MemoizedPreparedMessage>,
1107) -> Vec<Prepared<'a>> {
1108    window
1109        .iter()
1110        .filter_map(|msg| {
1111            let key = semantic_prep_memo_key(&msg.content);
1112            let (lookup, lookup_audit) = cache.get_with_audit(&key);
1113            trace_semantic_prep_memo_audit(&lookup_audit);
1114            match lookup {
1115                MemoLookup::Hit { value } => Some(Prepared {
1116                    msg,
1117                    canonical: value.canonical,
1118                    hash: value.hash,
1119                }),
1120                MemoLookup::Miss | MemoLookup::Quarantined { .. } => {
1121                    let canonical = canonicalize_for_embedding(&msg.content);
1122                    if canonical.is_empty() {
1123                        return None;
1124                    }
1125                    let hash = content_hash(&canonical);
1126                    let insert_audit = cache.insert_with_audit(
1127                        key,
1128                        MemoizedPreparedMessage {
1129                            canonical: canonical.clone(),
1130                            hash,
1131                        },
1132                    );
1133                    trace_semantic_prep_memo_audit(&insert_audit);
1134                    Some(Prepared {
1135                        msg,
1136                        canonical,
1137                        hash,
1138                    })
1139                }
1140            }
1141        })
1142        .collect()
1143}
1144
1145/// Canonicalize + hash a window of messages. Default is serial; opt in to
1146/// the rayon-parallel path via `CASS_SEMANTIC_PREP_PARALLEL=1` (see the
1147/// `parallel_prep_enabled` docstring for why it is not the default).
1148/// Parallel results preserve input order via `par_iter().filter_map().collect()`.
1149/// Messages whose canonical form is empty are filtered out so the embedder
1150/// batch is never polluted with useless inputs.
1151fn prepare_window<'a>(window: &'a [EmbeddingInput], serial: bool) -> Vec<Prepared<'a>> {
1152    let prep = |msg: &'a EmbeddingInput| -> Option<Prepared<'a>> {
1153        let canonical = canonicalize_for_embedding(&msg.content);
1154        if canonical.is_empty() {
1155            return None;
1156        }
1157        let hash = content_hash(&canonical);
1158        Some(Prepared {
1159            msg,
1160            canonical,
1161            hash,
1162        })
1163    };
1164
1165    if serial {
1166        window.iter().filter_map(prep).collect()
1167    } else {
1168        window.par_iter().filter_map(prep).collect()
1169    }
1170}
1171
1172fn flush_prepared_batch(
1173    batch: &[Prepared<'_>],
1174    embeddings: &mut Vec<EmbeddedMessage>,
1175    pb: &ProgressBar,
1176    embedder: &dyn Embedder,
1177) -> Result<()> {
1178    if batch.is_empty() {
1179        return Ok(());
1180    }
1181
1182    let texts: Vec<&str> = batch.iter().map(|p| p.canonical.as_str()).collect();
1183    let vectors = embedder
1184        .embed_batch_sync(&texts)
1185        .map_err(|e| anyhow::anyhow!("embedding failed: {e}"))?;
1186
1187    if vectors.len() != batch.len() {
1188        bail!(
1189            "embedder returned {} embeddings for {} inputs",
1190            vectors.len(),
1191            batch.len()
1192        );
1193    }
1194
1195    for (prepared, vector) in batch.iter().zip(vectors) {
1196        if vector.len() != embedder.dimension() {
1197            bail!(
1198                "embedding dimension mismatch: expected {}, got {}",
1199                embedder.dimension(),
1200                vector.len()
1201            );
1202        }
1203        embeddings.push(EmbeddedMessage {
1204            message_id: prepared.msg.message_id,
1205            created_at_ms: prepared.msg.created_at_ms,
1206            agent_id: prepared.msg.agent_id,
1207            workspace_id: prepared.msg.workspace_id,
1208            source_id: prepared.msg.source_id,
1209            role: prepared.msg.role,
1210            chunk_idx: prepared.msg.chunk_idx,
1211            content_hash: prepared.hash,
1212            embedding: vector,
1213        });
1214    }
1215
1216    pb.inc(saturating_u64_from_usize(batch.len()));
1217    Ok(())
1218}
1219
1220pub struct SemanticIndexer {
1221    embedder: Box<dyn Embedder>,
1222    batch_size: usize,
1223}
1224
1225impl SemanticIndexer {
1226    pub fn new(embedder_type: &str, data_dir: Option<&Path>) -> Result<Self> {
1227        let embedder: Box<dyn Embedder> = match embedder_type {
1228            "fastembed" | "minilm" | "snowflake-arctic-s" | "nomic-embed" => {
1229                let dir = data_dir
1230                    .ok_or_else(|| anyhow::anyhow!("data_dir required for fastembed embedder"))?;
1231                let embedder_name = if embedder_type == "fastembed" {
1232                    "minilm"
1233                } else {
1234                    embedder_type
1235                };
1236                Box::new(
1237                    FastEmbedder::load_by_name(dir, embedder_name)
1238                        .map_err(|e| anyhow::anyhow!("fastembed unavailable: {e}"))?,
1239                )
1240            }
1241            "hash" => Box::new(HashEmbedder::default()),
1242            other => bail!("unknown embedder: {other}"),
1243        };
1244
1245        Ok(Self {
1246            embedder,
1247            batch_size: resolved_default_batch_size(),
1248        })
1249    }
1250
1251    pub fn with_batch_size(mut self, batch_size: usize) -> Result<Self> {
1252        if batch_size == 0 {
1253            bail!("batch_size must be > 0");
1254        }
1255        self.batch_size = batch_size;
1256        Ok(self)
1257    }
1258
1259    pub fn batch_size(&self) -> usize {
1260        self.batch_size
1261    }
1262
1263    pub fn embedder_id(&self) -> &str {
1264        self.embedder.id()
1265    }
1266
1267    pub fn embedder_dimension(&self) -> usize {
1268        self.embedder.dimension()
1269    }
1270
1271    pub fn embed_messages(&self, messages: &[EmbeddingInput]) -> Result<Vec<EmbeddedMessage>> {
1272        if messages.is_empty() {
1273            return Ok(Vec::new());
1274        }
1275
1276        let show_progress = std::io::stderr().is_terminal();
1277        let pb = ProgressBar::new(saturating_u64_from_usize(messages.len()));
1278        if show_progress {
1279            let style = ProgressStyle::default_bar()
1280                .template("{spinner:.green} [{bar:40.cyan/blue}] {pos}/{len} messages embedded")
1281                .unwrap_or_else(|_| ProgressStyle::default_bar());
1282            pb.set_style(style);
1283        } else {
1284            pb.set_draw_target(ProgressDrawTarget::hidden());
1285        }
1286
1287        let mut embeddings = Vec::with_capacity(messages.len());
1288
1289        // Process the corpus in windows of ~4 batches. Within each window,
1290        // rayon parallelizes the canonicalize + hash prep across cores; the
1291        // ONNX embedder is then fed serially in `batch_size` chunks so its
1292        // internal thread pool stays saturated without being starved by the
1293        // single-threaded prep loop we had before. `with_batch_size` and
1294        // `resolved_default_batch_size` both guarantee `batch_size >= 1`,
1295        // so saturating_mul(4) is always >= batch_size — no further clamp.
1296        let window = self.batch_size.saturating_mul(4);
1297        let serial_prep = !parallel_prep_enabled();
1298        let prep_memo_capacity = resolved_semantic_prep_memo_capacity();
1299        let mut prep_memo =
1300            serial_prep.then(|| ContentAddressedMemoCache::with_capacity(prep_memo_capacity));
1301        for (window_index, window_slice) in messages.chunks(window).enumerate() {
1302            let prepared_window = match prep_memo.as_mut() {
1303                Some(cache) => {
1304                    let stats_before = cache.stats().clone();
1305                    let prepared_window = prepare_window_with_memo(window_slice, cache);
1306                    trace_semantic_prep_memo_window(
1307                        window_index,
1308                        window_slice.len(),
1309                        prepared_window.len(),
1310                        prep_memo_capacity,
1311                        &stats_before,
1312                        cache.stats(),
1313                    );
1314                    prepared_window
1315                }
1316                None => prepare_window(window_slice, false),
1317            };
1318            let skipped_in_window = window_slice.len() - prepared_window.len();
1319            if skipped_in_window > 0 {
1320                pb.inc(saturating_u64_from_usize(skipped_in_window));
1321            }
1322
1323            for batch in prepared_window.chunks(self.batch_size) {
1324                flush_prepared_batch(batch, &mut embeddings, &pb, self.embedder.as_ref())?;
1325            }
1326        }
1327
1328        if let Some(cache) = prep_memo.as_ref() {
1329            let stats = cache.stats();
1330            tracing::debug!(
1331                algorithm = SEMANTIC_PREP_MEMO_ALGORITHM,
1332                algorithm_version = SEMANTIC_PREP_MEMO_VERSION,
1333                hits = stats.hits,
1334                misses = stats.misses,
1335                inserts = stats.inserts,
1336                quarantined = stats.quarantined,
1337                live_entries = stats.live_entries,
1338                entry_capacity = prep_memo_capacity,
1339                "semantic prep memo cache summary"
1340            );
1341        }
1342
1343        pb.finish_with_message("Embedding complete");
1344        Ok(embeddings)
1345    }
1346
1347    pub fn build_and_save_index<I>(
1348        &self,
1349        embedded_messages: I,
1350        data_dir: &Path,
1351    ) -> Result<FsVectorIndex>
1352    where
1353        I: IntoIterator<Item = EmbeddedMessage>,
1354    {
1355        let index_path = vector_index_path(data_dir, self.embedder_id());
1356        self.build_and_save_index_at_path(embedded_messages, &index_path)
1357    }
1358
1359    pub fn build_and_save_index_shards<I>(
1360        &self,
1361        embedded_messages: I,
1362        data_dir: &Path,
1363        plan: SemanticShardBuildPlan,
1364    ) -> Result<SemanticShardBuildOutcome>
1365    where
1366        I: IntoIterator<Item = EmbeddedMessage>,
1367    {
1368        if plan.db_fingerprint.trim().is_empty() {
1369            bail!("semantic shard build requires a non-empty DB fingerprint");
1370        }
1371        if plan.max_records_per_shard == 0 {
1372            bail!("semantic shard build requires max_records_per_shard > 0");
1373        }
1374
1375        let mut shard_records = Vec::new();
1376        let mut index_paths = Vec::new();
1377        let mut ann_index_paths = Vec::new();
1378        let mut current_records = Vec::with_capacity(plan.max_records_per_shard);
1379        let mut shard_index = 0u32;
1380        let mut total_docs = 0u64;
1381
1382        for embedded in embedded_messages {
1383            current_records.push(embedded);
1384            if current_records.len() >= plan.max_records_per_shard {
1385                let records = std::mem::take(&mut current_records);
1386                let (record, path, ann_path) =
1387                    self.write_semantic_shard(records, data_dir, &plan, shard_index)?;
1388                total_docs = total_docs.saturating_add(record.doc_count);
1389                shard_records.push(record);
1390                index_paths.push(path);
1391                if let Some(path) = ann_path {
1392                    ann_index_paths.push(path);
1393                }
1394                shard_index = shard_index
1395                    .checked_add(1)
1396                    .context("semantic shard index overflow")?;
1397            }
1398        }
1399
1400        if !current_records.is_empty() {
1401            let records = std::mem::take(&mut current_records);
1402            let (record, path, ann_path) =
1403                self.write_semantic_shard(records, data_dir, &plan, shard_index)?;
1404            total_docs = total_docs.saturating_add(record.doc_count);
1405            shard_records.push(record);
1406            index_paths.push(path);
1407            if let Some(path) = ann_path {
1408                ann_index_paths.push(path);
1409            }
1410        }
1411
1412        let shard_count = u32::try_from(shard_records.len())
1413            .context("semantic shard generation exceeded u32 shard count")?;
1414        for record in &mut shard_records {
1415            record.shard_count = shard_count;
1416        }
1417
1418        let mut shard_manifest = SemanticShardManifest::load_or_default(data_dir)
1419            .map_err(|err| anyhow::anyhow!("loading semantic shard manifest for publish: {err}"))?;
1420        shard_manifest.replace_shards_for_generation(
1421            plan.tier,
1422            self.embedder_id(),
1423            &plan.db_fingerprint,
1424            shard_records,
1425        );
1426        shard_manifest
1427            .save(data_dir)
1428            .map_err(|err| anyhow::anyhow!("saving semantic shard manifest: {err}"))?;
1429        let summary = shard_manifest.summary(plan.tier, self.embedder_id(), &plan.db_fingerprint);
1430
1431        tracing::info!(
1432            tier = plan.tier.as_str(),
1433            embedder = self.embedder_id(),
1434            shard_count,
1435            doc_count = total_docs,
1436            total_conversations = plan.total_conversations,
1437            "published semantic shard generation sidecar"
1438        );
1439
1440        Ok(SemanticShardBuildOutcome {
1441            tier: plan.tier,
1442            embedder_id: self.embedder_id().to_string(),
1443            shard_count,
1444            doc_count: total_docs,
1445            total_conversations: plan.total_conversations,
1446            index_paths,
1447            ann_index_paths,
1448            shard_manifest_path: SemanticShardManifest::path(data_dir),
1449            complete: summary.complete,
1450        })
1451    }
1452
1453    fn write_semantic_shard(
1454        &self,
1455        embedded_messages: Vec<EmbeddedMessage>,
1456        data_dir: &Path,
1457        plan: &SemanticShardBuildPlan,
1458        shard_index: u32,
1459    ) -> Result<(SemanticShardRecord, PathBuf, Option<PathBuf>)> {
1460        let started_at_ms = now_ms();
1461        let shard_path = semantic_shard_index_path(
1462            data_dir,
1463            plan.tier,
1464            self.embedder_id(),
1465            &plan.db_fingerprint,
1466            shard_index,
1467        );
1468        let shard_index_file = self.build_and_save_index_at_path(embedded_messages, &shard_path)?;
1469        let size_bytes = fs::metadata(&shard_path)
1470            .with_context(|| format!("stat semantic shard {}", shard_path.display()))?
1471            .len();
1472        let (ann_index_path, ann_size_bytes, ann_ready, ann_absolute_path) = if plan.build_ann {
1473            let ann_path = semantic_shard_ann_index_path(
1474                data_dir,
1475                plan.tier,
1476                self.embedder_id(),
1477                &plan.db_fingerprint,
1478                shard_index,
1479            );
1480            let config = FsHnswConfig {
1481                m: FS_HNSW_DEFAULT_M,
1482                ef_construction: FS_HNSW_DEFAULT_EF_CONSTRUCTION,
1483                ..FsHnswConfig::default()
1484            };
1485            let hnsw = FsHnswIndex::build_from_vector_index(&shard_index_file, config)
1486                .map_err(|err| anyhow::anyhow!("build shard HNSW index failed: {err}"))?;
1487            hnsw.save(&ann_path)
1488                .map_err(|err| anyhow::anyhow!("save shard HNSW index failed: {err}"))?;
1489            let ann_size_bytes = fs::metadata(&ann_path)
1490                .with_context(|| format!("stat semantic shard ANN {}", ann_path.display()))?
1491                .len();
1492            let relative_ann_path = ann_path
1493                .strip_prefix(data_dir)
1494                .unwrap_or(ann_path.as_path())
1495                .to_string_lossy()
1496                .to_string();
1497            (
1498                Some(relative_ann_path),
1499                ann_size_bytes,
1500                true,
1501                Some(ann_path),
1502            )
1503        } else {
1504            (None, 0, false, None)
1505        };
1506        let relative_index_path = shard_path
1507            .strip_prefix(data_dir)
1508            .unwrap_or(shard_path.as_path())
1509            .to_string_lossy()
1510            .to_string();
1511        let record = SemanticShardRecord {
1512            tier: plan.tier,
1513            embedder_id: self.embedder_id().to_string(),
1514            model_revision: plan.model_revision.clone(),
1515            schema_version: SEMANTIC_SCHEMA_VERSION,
1516            chunking_version: CHUNKING_STRATEGY_VERSION,
1517            dimension: self.embedder_dimension(),
1518            shard_index,
1519            shard_count: 0,
1520            doc_count: u64::try_from(shard_index_file.record_count()).unwrap_or(u64::MAX),
1521            total_conversations: plan.total_conversations,
1522            db_fingerprint: plan.db_fingerprint.clone(),
1523            index_path: relative_index_path,
1524            quantization: "f16".to_string(),
1525            mmap_ready: true,
1526            ann_index_path,
1527            ann_size_bytes,
1528            ann_ready,
1529            size_bytes,
1530            started_at_ms,
1531            completed_at_ms: now_ms(),
1532            ready: true,
1533        };
1534        Ok((record, shard_path, ann_absolute_path))
1535    }
1536
1537    fn build_and_save_index_at_path<I>(
1538        &self,
1539        embedded_messages: I,
1540        index_path: &Path,
1541    ) -> Result<FsVectorIndex>
1542    where
1543        I: IntoIterator<Item = EmbeddedMessage>,
1544    {
1545        if let Some(parent) = index_path.parent() {
1546            std::fs::create_dir_all(parent)?;
1547        }
1548
1549        // Store as f16 by default (smaller, faster I/O). Embeddings are validated by the writer.
1550        let mut writer: FsVectorIndexWriter = FsVectorIndex::create_with_revision(
1551            index_path,
1552            self.embedder_id(),
1553            "1.0",
1554            self.embedder_dimension(),
1555            FsQuantization::F16,
1556        )
1557        .map_err(|err| anyhow::anyhow!("create fsvi index failed: {err}"))?;
1558
1559        let write_result: Result<()> = (|| {
1560            for embedded in embedded_messages {
1561                if embedded.embedding.len() != self.embedder_dimension() {
1562                    bail!(
1563                        "embedding dimension mismatch: expected {}, got {}",
1564                        self.embedder_dimension(),
1565                        embedded.embedding.len()
1566                    );
1567                }
1568                let doc_id = semantic_doc_id_for_embedded(&embedded);
1569                writer
1570                    .write_record(&doc_id, &embedded.embedding)
1571                    .map_err(|err| anyhow::anyhow!("write fsvi record failed: {err}"))?;
1572            }
1573            Ok(())
1574        })();
1575
1576        if let Err(e) = &write_result {
1577            // Clean up partial index file to prevent corruption
1578            tracing::warn!("removing partial vector index after write failure: {e}");
1579            if let Err(rm_err) = std::fs::remove_file(index_path) {
1580                tracing::error!(
1581                    "failed to remove partial index file {}: {rm_err}",
1582                    index_path.display()
1583                );
1584            }
1585            return Err(anyhow::anyhow!("{e}"));
1586        }
1587
1588        writer
1589            .finish()
1590            .map_err(|err| anyhow::anyhow!("finish fsvi index failed: {err}"))?;
1591
1592        FsVectorIndex::open(index_path)
1593            .map_err(|err| anyhow::anyhow!("open fsvi index failed: {err}"))
1594    }
1595
1596    /// Append new embeddings to an existing FSVI index via the WAL.
1597    ///
1598    /// Used for incremental semantic indexing in watch mode. Opens the
1599    /// existing index, appends a batch of new embeddings, and compacts if
1600    /// the WAL has grown large enough.
1601    ///
1602    /// Returns the number of entries appended.
1603    pub fn append_to_index(
1604        &self,
1605        embedded_messages: impl IntoIterator<Item = EmbeddedMessage>,
1606        data_dir: &Path,
1607    ) -> Result<usize> {
1608        let index_path = vector_index_path(data_dir, self.embedder_id());
1609        self.append_to_index_path(embedded_messages, &index_path)
1610    }
1611
1612    fn append_to_index_path(
1613        &self,
1614        embedded_messages: impl IntoIterator<Item = EmbeddedMessage>,
1615        index_path: &Path,
1616    ) -> Result<usize> {
1617        let mut index = FsVectorIndex::open(index_path)
1618            .map_err(|err| anyhow::anyhow!("open fsvi index for append: {err}"))?;
1619
1620        let entries: Vec<(String, Vec<f32>)> = embedded_messages
1621            .into_iter()
1622            .map(|em| {
1623                let doc_id = semantic_doc_id_for_embedded(&em);
1624                (doc_id, em.embedding)
1625            })
1626            .collect();
1627
1628        let count = entries.len();
1629        if count == 0 {
1630            return Ok(0);
1631        }
1632
1633        index
1634            .append_batch(&entries)
1635            .map_err(|err| anyhow::anyhow!("append_batch: {err}"))?;
1636
1637        if index.needs_compaction() {
1638            index
1639                .compact()
1640                .map_err(|err| anyhow::anyhow!("compaction: {err}"))?;
1641        }
1642
1643        Ok(count)
1644    }
1645
1646    fn write_backfill_staging_index(
1647        &self,
1648        embedded_messages: Vec<EmbeddedMessage>,
1649        staging_path: &Path,
1650        resume_existing: bool,
1651    ) -> Result<FsVectorIndex> {
1652        if resume_existing && staging_path.exists() {
1653            self.append_to_index_path(embedded_messages, staging_path)?;
1654            FsVectorIndex::open(staging_path)
1655                .map_err(|err| anyhow::anyhow!("open staged semantic index failed: {err}"))
1656        } else {
1657            self.build_and_save_index_at_path(embedded_messages, staging_path)
1658        }
1659    }
1660
1661    pub fn run_backfill_batch(
1662        &self,
1663        messages: &[EmbeddingInput],
1664        data_dir: &Path,
1665        manifest: &mut SemanticManifest,
1666        plan: SemanticBackfillBatchPlan,
1667    ) -> Result<SemanticBackfillBatchOutcome> {
1668        if plan.db_fingerprint.trim().is_empty() {
1669            bail!("semantic backfill requires a non-empty DB fingerprint");
1670        }
1671        if plan.total_conversations == 0 && plan.conversations_in_batch > 0 {
1672            bail!("semantic backfill batch cannot process conversations when total is zero");
1673        }
1674
1675        let manifest_path = SemanticManifest::path(data_dir);
1676        let staging_path = semantic_staging_index_path(
1677            data_dir,
1678            plan.tier,
1679            self.embedder_id(),
1680            &plan.db_fingerprint,
1681        );
1682        let final_path = vector_index_path(data_dir, self.embedder_id());
1683
1684        let prior_checkpoint = manifest
1685            .checkpoint
1686            .as_ref()
1687            .filter(|checkpoint| {
1688                checkpoint.tier == plan.tier
1689                    && checkpoint.embedder_id == self.embedder_id()
1690                    && checkpoint.is_valid(&plan.db_fingerprint)
1691            })
1692            .cloned();
1693        let prior_conversations = prior_checkpoint
1694            .as_ref()
1695            .map_or(0, |checkpoint| checkpoint.conversations_processed);
1696        let prior_docs = prior_checkpoint
1697            .as_ref()
1698            .map_or(0, |checkpoint| checkpoint.docs_embedded);
1699
1700        let embeddings = self.embed_messages(messages)?;
1701        let embedded_docs = u64::try_from(embeddings.len()).unwrap_or(u64::MAX);
1702        let mut staged_index = self.write_backfill_staging_index(
1703            embeddings,
1704            &staging_path,
1705            prior_checkpoint.is_some(),
1706        )?;
1707        let conversations_processed = prior_conversations
1708            .saturating_add(plan.conversations_in_batch)
1709            .min(plan.total_conversations);
1710        let complete = conversations_processed >= plan.total_conversations;
1711
1712        manifest.refresh_backlog(plan.total_conversations, &plan.db_fingerprint);
1713
1714        if complete {
1715            let db_fingerprint = plan.db_fingerprint.clone();
1716            if staged_index.wal_record_count() > 0 {
1717                staged_index.compact().map_err(|err| {
1718                    anyhow::anyhow!("compact staged semantic index failed: {err}")
1719                })?;
1720            }
1721            drop(staged_index);
1722            fs::rename(&staging_path, &final_path).with_context(|| {
1723                format!(
1724                    "publishing staged semantic index {} to {}",
1725                    staging_path.display(),
1726                    final_path.display()
1727                )
1728            })?;
1729            sync_parent_directory(&final_path)?;
1730            let published_index = FsVectorIndex::open(&final_path)
1731                .map_err(|err| anyhow::anyhow!("open published semantic index failed: {err}"))?;
1732            let size_bytes = fs::metadata(&final_path)
1733                .with_context(|| format!("stat published semantic index {}", final_path.display()))?
1734                .len();
1735            let relative_index_path = final_path
1736                .strip_prefix(data_dir)
1737                .unwrap_or(final_path.as_path())
1738                .to_string_lossy()
1739                .to_string();
1740            manifest.publish_artifact(ArtifactRecord {
1741                tier: plan.tier,
1742                embedder_id: self.embedder_id().to_string(),
1743                model_revision: plan.model_revision,
1744                schema_version: SEMANTIC_SCHEMA_VERSION,
1745                chunking_version: CHUNKING_STRATEGY_VERSION,
1746                dimension: self.embedder_dimension(),
1747                doc_count: u64::try_from(published_index.record_count()).unwrap_or(u64::MAX),
1748                conversation_count: conversations_processed,
1749                db_fingerprint: plan.db_fingerprint,
1750                index_path: relative_index_path,
1751                size_bytes,
1752                started_at_ms: prior_checkpoint
1753                    .as_ref()
1754                    .map_or_else(now_ms, |checkpoint| checkpoint.saved_at_ms),
1755                completed_at_ms: now_ms(),
1756                ready: true,
1757            });
1758            manifest.refresh_backlog(plan.total_conversations, &db_fingerprint);
1759            manifest.save(data_dir)?;
1760        } else {
1761            let docs_embedded_on_disk =
1762                u64::try_from(staged_index.record_count()).unwrap_or(u64::MAX);
1763            let checkpoint_docs = prior_docs
1764                .saturating_add(embedded_docs)
1765                .max(docs_embedded_on_disk);
1766            manifest.save_checkpoint(BuildCheckpoint {
1767                tier: plan.tier,
1768                embedder_id: self.embedder_id().to_string(),
1769                last_offset: plan.last_offset,
1770                docs_embedded: checkpoint_docs,
1771                conversations_processed,
1772                total_conversations: plan.total_conversations,
1773                db_fingerprint: plan.db_fingerprint,
1774                schema_version: SEMANTIC_SCHEMA_VERSION,
1775                chunking_version: CHUNKING_STRATEGY_VERSION,
1776                saved_at_ms: now_ms(),
1777            });
1778            manifest.save(data_dir)?;
1779        }
1780
1781        Ok(SemanticBackfillBatchOutcome {
1782            tier: plan.tier,
1783            embedder_id: self.embedder_id().to_string(),
1784            embedded_docs,
1785            conversations_processed,
1786            total_conversations: plan.total_conversations,
1787            last_offset: plan.last_offset,
1788            checkpoint_saved: !complete,
1789            published: complete,
1790            index_path: if complete { final_path } else { staging_path },
1791            manifest_path,
1792        })
1793    }
1794
1795    pub fn run_backfill_from_storage(
1796        &self,
1797        storage: &FrankenStorage,
1798        data_dir: &Path,
1799        manifest: &mut SemanticManifest,
1800        plan: SemanticBackfillStoragePlan,
1801    ) -> Result<SemanticBackfillBatchOutcome> {
1802        let after_conversation_id = matching_semantic_checkpoint_offset(
1803            manifest,
1804            plan.tier,
1805            self.embedder_id(),
1806            &plan.db_fingerprint,
1807        );
1808        let batch = fetch_canonical_embedding_batch(
1809            storage,
1810            after_conversation_id,
1811            plan.max_conversations,
1812        )?;
1813        self.run_backfill_batch(
1814            &batch.inputs,
1815            data_dir,
1816            manifest,
1817            SemanticBackfillBatchPlan {
1818                tier: plan.tier,
1819                db_fingerprint: plan.db_fingerprint,
1820                model_revision: plan.model_revision,
1821                total_conversations: batch.total_conversations,
1822                conversations_in_batch: batch.conversations_in_batch,
1823                last_offset: batch.last_conversation_id,
1824            },
1825        )
1826    }
1827
1828    /// Build and save an HNSW index for approximate nearest neighbor search.
1829    ///
1830    /// This creates an HNSW graph structure from the existing VectorIndex,
1831    /// enabling O(log n) approximate search with the `--approximate` flag.
1832    ///
1833    /// # Arguments
1834    /// * `vector_index` - The VectorIndex to build HNSW from
1835    /// * `data_dir` - Directory to save the HNSW index
1836    /// * `m` - Max connections per node (default: 16)
1837    /// * `ef_construction` - Search width during build (default: 200)
1838    ///
1839    /// # Returns
1840    /// Path to the saved HNSW index file
1841    pub fn build_hnsw_index(
1842        &self,
1843        vector_index: &FsVectorIndex,
1844        data_dir: &Path,
1845        m: Option<usize>,
1846        ef_construction: Option<usize>,
1847    ) -> Result<PathBuf> {
1848        let m = m.unwrap_or(FS_HNSW_DEFAULT_M);
1849        let ef_construction = ef_construction.unwrap_or(FS_HNSW_DEFAULT_EF_CONSTRUCTION);
1850
1851        tracing::info!(
1852            embedder = self.embedder_id(),
1853            count = vector_index.record_count(),
1854            m,
1855            ef_construction,
1856            "Building HNSW index for approximate nearest neighbor search"
1857        );
1858
1859        let config = FsHnswConfig {
1860            m,
1861            ef_construction,
1862            ..FsHnswConfig::default()
1863        };
1864        let hnsw = FsHnswIndex::build_from_vector_index(vector_index, config)
1865            .map_err(|err| anyhow::anyhow!("build HNSW index failed: {err}"))?;
1866
1867        let hnsw_path = hnsw_index_path(data_dir, self.embedder_id());
1868        if let Some(parent) = hnsw_path.parent() {
1869            std::fs::create_dir_all(parent)?;
1870        }
1871        hnsw.save(&hnsw_path)
1872            .map_err(|err| anyhow::anyhow!("save HNSW index failed: {err}"))?;
1873
1874        tracing::info!(?hnsw_path, "Saved HNSW index");
1875        Ok(hnsw_path)
1876    }
1877}
1878
1879#[cfg(test)]
1880mod tests {
1881    use super::*;
1882    use crate::model::types::{Agent, AgentKind, Conversation, Message, MessageRole};
1883    use crate::storage::sqlite::FrankenStorage;
1884    use serde_json::json;
1885    use std::path::Path;
1886    use tempfile::tempdir;
1887
1888    #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
1889    struct ComparableSemanticInput {
1890        message_id: u64,
1891        created_at_ms: i64,
1892        agent_id: u32,
1893        workspace_id: u32,
1894        source_id: u32,
1895        role: u8,
1896        content: String,
1897    }
1898
1899    fn comparable_semantic_inputs(mut inputs: Vec<EmbeddingInput>) -> Vec<ComparableSemanticInput> {
1900        let mut comparable: Vec<ComparableSemanticInput> = inputs
1901            .drain(..)
1902            .map(|input| ComparableSemanticInput {
1903                message_id: input.message_id,
1904                created_at_ms: input.created_at_ms,
1905                agent_id: input.agent_id,
1906                workspace_id: input.workspace_id,
1907                source_id: input.source_id,
1908                role: input.role,
1909                content: input.content,
1910            })
1911            .collect();
1912        comparable.sort();
1913        comparable
1914    }
1915
1916    fn test_conversation(external_id: &str, body: &str) -> Conversation {
1917        test_conversation_fixture(
1918            external_id,
1919            vec![Message {
1920                id: None,
1921                idx: 0,
1922                role: MessageRole::User,
1923                author: None,
1924                created_at: Some(1_700_000_000_500),
1925                content: body.to_string(),
1926                extra_json: json!({}),
1927                snippets: Vec::new(),
1928            }],
1929            "local",
1930            None,
1931        )
1932    }
1933
1934    fn test_conversation_with_messages(external_id: &str, messages: Vec<Message>) -> Conversation {
1935        test_conversation_fixture(external_id, messages, "remote-laptop", Some("builder-host"))
1936    }
1937
1938    fn test_conversation_fixture(
1939        external_id: &str,
1940        messages: Vec<Message>,
1941        source_id: &str,
1942        origin_host: Option<&str>,
1943    ) -> Conversation {
1944        Conversation {
1945            id: None,
1946            agent_slug: "codex".to_string(),
1947            workspace: None,
1948            external_id: Some(external_id.to_string()),
1949            title: Some(format!("semantic {external_id}")),
1950            source_path: PathBuf::from(format!("/tmp/{external_id}.jsonl")),
1951            started_at: Some(1_700_000_000_000),
1952            ended_at: Some(1_700_000_001_000),
1953            approx_tokens: None,
1954            metadata_json: json!({}),
1955            messages,
1956            source_id: source_id.to_string(),
1957            origin_host: origin_host.map(str::to_string),
1958        }
1959    }
1960
1961    fn default_scheduler_signals() -> SemanticBackfillSchedulerSignals {
1962        SemanticBackfillSchedulerSignals {
1963            foreground_pressure: false,
1964            lexical_repair_active: false,
1965            force: false,
1966            operator_disabled: false,
1967        }
1968    }
1969
1970    struct EnvVarGuard {
1971        key: &'static str,
1972        prior: Option<String>,
1973    }
1974
1975    impl EnvVarGuard {
1976        fn set(key: &'static str, value: &str) -> Self {
1977            let prior = std::env::var(key).ok();
1978            // SAFETY: focused tests temporarily mutate process env and restore on drop.
1979            unsafe {
1980                std::env::set_var(key, value);
1981            }
1982            Self { key, prior }
1983        }
1984
1985        fn remove(key: &'static str) -> Self {
1986            let prior = std::env::var(key).ok();
1987            // SAFETY: focused tests temporarily mutate process env and restore on drop.
1988            unsafe {
1989                std::env::remove_var(key);
1990            }
1991            Self { key, prior }
1992        }
1993    }
1994
1995    impl Drop for EnvVarGuard {
1996        fn drop(&mut self) {
1997            // SAFETY: restores the process env value captured by this test guard.
1998            unsafe {
1999                match self.prior.as_deref() {
2000                    Some(value) => std::env::set_var(self.key, value),
2001                    None => std::env::remove_var(self.key),
2002                }
2003            }
2004        }
2005    }
2006
2007    #[test]
2008    fn semantic_backfill_scheduler_runs_and_scales_batch_under_idle_budget() {
2009        let policy = SemanticPolicy::compiled_defaults();
2010        let decision = semantic_backfill_scheduler_decision_for_capacity(
2011            &policy,
2012            64,
2013            &default_scheduler_signals(),
2014            80,
2015        );
2016
2017        assert!(decision.should_run());
2018        assert_eq!(decision.state, SemanticBackfillSchedulerState::Running);
2019        assert_eq!(
2020            decision.reason,
2021            SemanticBackfillSchedulerReason::IdleBudgetAvailable
2022        );
2023        assert_eq!(decision.scheduled_batch_conversations, 51);
2024        assert_eq!(decision.current_capacity_pct, 80);
2025        assert_eq!(decision.next_eligible_after_ms, 0);
2026    }
2027
2028    #[test]
2029    fn semantic_backfill_scheduler_reason_next_steps_are_stable() {
2030        for (reason, expected) in [
2031            (
2032                SemanticBackfillSchedulerReason::IdleBudgetAvailable,
2033                "background semantic backfill is within idle budgets",
2034            ),
2035            (
2036                SemanticBackfillSchedulerReason::OperatorDisabled,
2037                "background semantic backfill is disabled by CASS_SEMANTIC_BACKFILL_DISABLE",
2038            ),
2039            (
2040                SemanticBackfillSchedulerReason::PolicyDisabled,
2041                "semantic policy disables background semantic backfill",
2042            ),
2043            (
2044                SemanticBackfillSchedulerReason::ForegroundPressure,
2045                "foreground pressure is present; retry after the idle delay",
2046            ),
2047            (
2048                SemanticBackfillSchedulerReason::LexicalRepairActive,
2049                "lexical repair is active; semantic backfill is yielding",
2050            ),
2051            (
2052                SemanticBackfillSchedulerReason::CapacityBelowFloor,
2053                "machine responsiveness capacity is below the semantic backfill floor",
2054            ),
2055            (
2056                SemanticBackfillSchedulerReason::ThreadBudgetZero,
2057                "semantic backfill thread budget is zero",
2058            ),
2059            (
2060                SemanticBackfillSchedulerReason::BatchBudgetZero,
2061                "semantic backfill batch budget is zero",
2062            ),
2063        ] {
2064            assert_eq!(reason.next_step(), expected, "{reason:?}");
2065        }
2066    }
2067
2068    #[test]
2069    fn semantic_backfill_scheduler_yields_to_foreground_and_lexical_pressure() {
2070        let policy = SemanticPolicy::compiled_defaults();
2071        let foreground = SemanticBackfillSchedulerSignals {
2072            foreground_pressure: true,
2073            ..default_scheduler_signals()
2074        };
2075        let foreground_decision =
2076            semantic_backfill_scheduler_decision_for_capacity(&policy, 64, &foreground, 100);
2077        assert!(!foreground_decision.should_run());
2078        assert_eq!(
2079            foreground_decision.state,
2080            SemanticBackfillSchedulerState::Paused
2081        );
2082        assert_eq!(
2083            foreground_decision.reason,
2084            SemanticBackfillSchedulerReason::ForegroundPressure
2085        );
2086        assert_eq!(
2087            foreground_decision.next_eligible_after_ms,
2088            policy.idle_delay_seconds * 1000
2089        );
2090
2091        let lexical_repair = SemanticBackfillSchedulerSignals {
2092            lexical_repair_active: true,
2093            ..default_scheduler_signals()
2094        };
2095        let lexical_decision =
2096            semantic_backfill_scheduler_decision_for_capacity(&policy, 64, &lexical_repair, 100);
2097        assert!(!lexical_decision.should_run());
2098        assert_eq!(
2099            lexical_decision.state,
2100            SemanticBackfillSchedulerState::Paused
2101        );
2102        assert_eq!(
2103            lexical_decision.reason,
2104            SemanticBackfillSchedulerReason::LexicalRepairActive
2105        );
2106    }
2107
2108    #[test]
2109    fn semantic_backfill_scheduler_honors_policy_disable_and_force_override() {
2110        let mut policy = SemanticPolicy::compiled_defaults();
2111        policy.mode = crate::search::policy::SemanticMode::LexicalOnly;
2112
2113        let disabled = semantic_backfill_scheduler_decision_for_capacity(
2114            &policy,
2115            64,
2116            &default_scheduler_signals(),
2117            100,
2118        );
2119        assert!(!disabled.should_run());
2120        assert_eq!(disabled.state, SemanticBackfillSchedulerState::Disabled);
2121        assert_eq!(
2122            disabled.reason,
2123            SemanticBackfillSchedulerReason::PolicyDisabled
2124        );
2125
2126        let forced = SemanticBackfillSchedulerSignals {
2127            force: true,
2128            ..default_scheduler_signals()
2129        };
2130        let forced_decision =
2131            semantic_backfill_scheduler_decision_for_capacity(&policy, 64, &forced, 100);
2132        assert!(forced_decision.should_run());
2133        assert_eq!(
2134            forced_decision.reason,
2135            SemanticBackfillSchedulerReason::IdleBudgetAvailable
2136        );
2137        assert!(forced_decision.forced);
2138    }
2139
2140    #[test]
2141    fn test_batch_embedding() {
2142        let indexer = SemanticIndexer::new("hash", None).unwrap();
2143        let messages = vec![
2144            EmbeddingInput::new(1, "Hello world"),
2145            EmbeddingInput::new(2, "Goodbye world"),
2146        ];
2147
2148        let embeddings = indexer.embed_messages(&messages).unwrap();
2149
2150        assert_eq!(embeddings.len(), 2);
2151        assert_eq!(embeddings[0].message_id, 1);
2152        assert_eq!(embeddings[1].message_id, 2);
2153        assert_eq!(embeddings[0].embedding.len(), indexer.embedder_dimension());
2154    }
2155
2156    #[test]
2157    fn test_progress_indicator() {
2158        let indexer = SemanticIndexer::new("hash", None).unwrap();
2159        let messages: Vec<_> = (0..1000)
2160            .map(|i| EmbeddingInput::new(i as u64, format!("Message {}", i)))
2161            .collect();
2162
2163        let embeddings = indexer.embed_messages(&messages).unwrap();
2164        assert_eq!(embeddings.len(), messages.len());
2165    }
2166
2167    #[test]
2168    fn test_build_and_save_index() {
2169        let indexer = SemanticIndexer::new("hash", None).unwrap();
2170        let messages = vec![
2171            EmbeddingInput::new(1, "Hello world"),
2172            EmbeddingInput::new(2, "Goodbye world"),
2173        ];
2174
2175        let embeddings = indexer.embed_messages(&messages).unwrap();
2176        let tmp = tempdir().unwrap();
2177        let index = indexer
2178            .build_and_save_index(embeddings, tmp.path())
2179            .unwrap();
2180        assert_eq!(index.embedder_id(), indexer.embedder_id());
2181        assert_eq!(index.dimension(), indexer.embedder_dimension());
2182        assert_eq!(index.record_count(), 2);
2183    }
2184
2185    #[test]
2186    fn sharded_index_build_writes_sidecar_without_runtime_publish() {
2187        let indexer = SemanticIndexer::new("hash", None).unwrap();
2188        let messages: Vec<_> = (0..5)
2189            .map(|idx| EmbeddingInput::new(idx, format!("semantic shard message {idx}")))
2190            .collect();
2191        let embeddings = indexer.embed_messages(&messages).unwrap();
2192        let tmp = tempdir().unwrap();
2193
2194        let outcome = indexer
2195            .build_and_save_index_shards(
2196                embeddings,
2197                tmp.path(),
2198                SemanticShardBuildPlan {
2199                    tier: TierKind::Fast,
2200                    db_fingerprint: "db-fp-sharded-build".to_string(),
2201                    model_revision: "hash".to_string(),
2202                    total_conversations: 5,
2203                    max_records_per_shard: 2,
2204                    build_ann: false,
2205                },
2206            )
2207            .unwrap();
2208
2209        assert_eq!(outcome.shard_count, 3);
2210        assert_eq!(outcome.doc_count, 5);
2211        assert_eq!(outcome.total_conversations, 5);
2212        assert!(outcome.complete);
2213        assert_eq!(outcome.index_paths.len(), 3);
2214        for path in &outcome.index_paths {
2215            let shard = FsVectorIndex::open(path).unwrap();
2216            assert_eq!(shard.embedder_id(), indexer.embedder_id());
2217            assert!(shard.record_count() > 0);
2218        }
2219
2220        let shards = SemanticShardManifest::load(tmp.path()).unwrap().unwrap();
2221        let summary = shards.summary(TierKind::Fast, indexer.embedder_id(), "db-fp-sharded-build");
2222        assert!(summary.complete);
2223        assert_eq!(summary.ready_shards, 3);
2224        assert_eq!(summary.ann_ready_shards, 0);
2225        assert_eq!(summary.doc_count, 5);
2226        assert_eq!(summary.total_conversations, 5);
2227
2228        assert!(
2229            SemanticManifest::load(tmp.path()).unwrap().is_none(),
2230            "sidecar shards must not publish the main runtime manifest"
2231        );
2232        assert!(!vector_index_path(tmp.path(), indexer.embedder_id()).exists());
2233    }
2234
2235    #[test]
2236    fn sharded_index_build_rejects_zero_sized_shards() {
2237        let indexer = SemanticIndexer::new("hash", None).unwrap();
2238        let err = indexer
2239            .build_and_save_index_shards(
2240                std::iter::empty(),
2241                tempdir().unwrap().path(),
2242                SemanticShardBuildPlan {
2243                    tier: TierKind::Fast,
2244                    db_fingerprint: "db-fp-sharded-build".to_string(),
2245                    model_revision: "hash".to_string(),
2246                    total_conversations: 0,
2247                    max_records_per_shard: 0,
2248                    build_ann: false,
2249                },
2250            )
2251            .unwrap_err();
2252
2253        assert!(err.to_string().contains("max_records_per_shard > 0"));
2254    }
2255
2256    #[test]
2257    fn sharded_ann_build_records_per_shard_accelerators() {
2258        let indexer = SemanticIndexer::new("hash", None).unwrap();
2259        let messages: Vec<_> = (0..8)
2260            .map(|idx| EmbeddingInput::new(idx, format!("semantic ann shard message {idx}")))
2261            .collect();
2262        let embeddings = indexer.embed_messages(&messages).unwrap();
2263        let tmp = tempdir().unwrap();
2264
2265        let outcome = indexer
2266            .build_and_save_index_shards(
2267                embeddings,
2268                tmp.path(),
2269                SemanticShardBuildPlan {
2270                    tier: TierKind::Fast,
2271                    db_fingerprint: "db-fp-sharded-ann-build".to_string(),
2272                    model_revision: "hash".to_string(),
2273                    total_conversations: 8,
2274                    max_records_per_shard: 4,
2275                    build_ann: true,
2276                },
2277            )
2278            .unwrap();
2279
2280        assert_eq!(outcome.shard_count, 2);
2281        assert_eq!(outcome.ann_index_paths.len(), 2);
2282        for path in &outcome.ann_index_paths {
2283            assert!(path.exists(), "ANN shard missing at {}", path.display());
2284        }
2285
2286        let shards = SemanticShardManifest::load(tmp.path()).unwrap().unwrap();
2287        let summary = shards.summary(
2288            TierKind::Fast,
2289            indexer.embedder_id(),
2290            "db-fp-sharded-ann-build",
2291        );
2292        assert!(summary.complete);
2293        assert_eq!(summary.ann_ready_shards, 2);
2294        assert!(summary.ann_size_bytes > 0);
2295        assert!(
2296            shards
2297                .shards
2298                .iter()
2299                .all(|record| record.ann_index_path.is_some() && record.ann_ready)
2300        );
2301    }
2302
2303    /// Golden-output regression: any change to the embedding prep pipeline,
2304    /// the canonicalizer, the hash embedder's deterministic projection, or
2305    /// the ordering semantics of `embed_messages` must not silently mutate
2306    /// the bytes we write to the vector index. This digest is derived from a
2307    /// frozen 64-message corpus processed through the hash embedder; a
2308    /// mismatch means one of those contracts moved.
2309    #[test]
2310    fn embed_messages_golden_digest_hash_embedder() {
2311        use ring::digest::{Context, SHA256};
2312
2313        let corpus: Vec<EmbeddingInput> = (0..64)
2314            .map(|i| {
2315                let body = match i % 5 {
2316                    0 => format!("plain text message number {i}"),
2317                    1 => format!("**bold** line {i} with _emphasis_"),
2318                    2 => format!("```rust\nfn f_{i}() {{ println!(\"{i}\"); }}\n```"),
2319                    3 => format!("   whitespace {i}   "),
2320                    _ => format!("unicode \u{00E9}\u{0301} + emoji \u{1F600} {i}"),
2321                };
2322                EmbeddingInput::new(i as u64, body)
2323            })
2324            .collect();
2325
2326        let indexer = SemanticIndexer::new("hash", None)
2327            .unwrap()
2328            .with_batch_size(16)
2329            .unwrap();
2330        let embeddings = indexer.embed_messages(&corpus).unwrap();
2331
2332        // Digest over (message_id, content_hash, embedding f32 bytes) for every
2333        // embedded message, in the order emitted. Preserves order + content +
2334        // numeric equality without having to compare raw floats directly.
2335        let mut ctx = Context::new(&SHA256);
2336        for em in &embeddings {
2337            ctx.update(&em.message_id.to_le_bytes());
2338            ctx.update(&em.content_hash);
2339            for v in &em.embedding {
2340                ctx.update(&v.to_le_bytes());
2341            }
2342        }
2343        let digest = hex::encode(ctx.finish().as_ref());
2344
2345        // Captured 2026-04-21 against a freshly built hash embedder, batch
2346        // size 16, the frozen 64-message corpus above. Stable so long as
2347        // the prep pipeline, canonicalizer, and HashEmbedder::embed
2348        // implementation are all byte-preserving. If you intentionally
2349        // changed any of those, update this value AND record the reason
2350        // in the commit message.
2351        const EXPECTED: &str = "22d9ae7076925a4b70a194b0f519dfb1d465cc757368c296ef24055a02038c2c";
2352        assert_eq!(
2353            digest, EXPECTED,
2354            "embed_messages golden digest drifted; if this was intentional, \
2355             update EXPECTED in this test and record the reason in the commit message"
2356        );
2357    }
2358
2359    #[test]
2360    fn parallel_prep_matches_serial_prep_bitwise() {
2361        // Mix of short, long, empty, markdown, code-block, and unicode inputs
2362        // to make sure the canonicalizer is exercised across all of its paths.
2363        let inputs: Vec<EmbeddingInput> = (0..500)
2364            .map(|i| {
2365                let text = match i % 7 {
2366                    0 => format!("Plain message number {i} with some ordinary words."),
2367                    1 => format!("**Bold** and _italic_ markdown line {i}"),
2368                    2 => format!(
2369                        "```rust\nfn example_{i}() {{\n    println!(\"code block {i}\");\n}}\n```\nfollow-up text"
2370                    ),
2371                    3 => String::new(), // empty — should be filtered
2372                    4 => format!("   whitespace   galore   {i}   "),
2373                    5 => format!("Unicode \u{00E9}\u{0301} (combining accent) and emoji \u{1F600} line {i}"),
2374                    _ => format!(
2375                        "Mixed line {i}: `inline_code`, [link](http://x), {{braces}}, and \u{201C}curly quotes\u{201D}."
2376                    ),
2377                };
2378                EmbeddingInput::new(i as u64, text)
2379            })
2380            .collect();
2381
2382        let serial = prepare_window(&inputs, true);
2383        let parallel = prepare_window(&inputs, false);
2384
2385        assert_eq!(
2386            serial.len(),
2387            parallel.len(),
2388            "serial and parallel prep should skip the same number of empty canonicals"
2389        );
2390
2391        for (s, p) in serial.iter().zip(parallel.iter()) {
2392            assert_eq!(
2393                s.msg.message_id, p.msg.message_id,
2394                "ordering must be preserved between serial and parallel prep"
2395            );
2396            assert_eq!(
2397                s.canonical, p.canonical,
2398                "canonical form diverged between serial and parallel prep"
2399            );
2400            assert_eq!(
2401                s.hash, p.hash,
2402                "content hash diverged between serial and parallel prep"
2403            );
2404        }
2405    }
2406
2407    #[test]
2408    fn parallel_prep_filters_empty_canonicals() {
2409        let inputs = vec![
2410            EmbeddingInput::new(1, "valid content"),
2411            EmbeddingInput::new(2, ""),
2412            EmbeddingInput::new(3, "   \n\n   \t  "),
2413            EmbeddingInput::new(4, "more valid content"),
2414        ];
2415
2416        let prepared = prepare_window(&inputs, false);
2417        let ids: Vec<u64> = prepared.iter().map(|p| p.msg.message_id).collect();
2418
2419        assert!(ids.contains(&1));
2420        assert!(ids.contains(&4));
2421        // ids 2 and 3 should be dropped because their canonicals are empty.
2422        assert!(!ids.contains(&2));
2423        assert!(!ids.contains(&3));
2424    }
2425
2426    #[test]
2427    fn memoized_serial_prep_matches_stateless_prepare_window() {
2428        let inputs = vec![
2429            EmbeddingInput::new(1, "repeat me exactly"),
2430            EmbeddingInput::new(2, "repeat me exactly"),
2431            EmbeddingInput::new(3, "unique payload"),
2432            EmbeddingInput::new(4, ""),
2433            EmbeddingInput::new(5, "repeat me exactly"),
2434        ];
2435
2436        let baseline = prepare_window(&inputs, true);
2437        let mut cache = ContentAddressedMemoCache::with_capacity(16);
2438        let memoized = prepare_window_with_memo(&inputs, &mut cache);
2439
2440        assert_eq!(baseline.len(), memoized.len());
2441        for (plain, cached) in baseline.iter().zip(memoized.iter()) {
2442            assert_eq!(plain.msg.message_id, cached.msg.message_id);
2443            assert_eq!(plain.canonical, cached.canonical);
2444            assert_eq!(plain.hash, cached.hash);
2445        }
2446    }
2447
2448    #[test]
2449    fn semantic_prep_memo_key_uses_stable_content_hash_bytes() {
2450        let key = semantic_prep_memo_key("repeat me exactly");
2451        let expected = content_hash("repeat me exactly");
2452
2453        assert_eq!(key.content_hash.as_bytes(), expected.as_slice());
2454        assert_eq!(key.content_hash.as_bytes().len(), expected.len());
2455        assert_eq!(key.algorithm, SEMANTIC_PREP_MEMO_ALGORITHM);
2456        assert_eq!(key.algorithm_version, SEMANTIC_PREP_MEMO_VERSION);
2457    }
2458
2459    #[test]
2460    fn memoized_serial_prep_reuses_duplicate_content_across_windows() {
2461        let inputs = vec![
2462            EmbeddingInput::new(1, "repeat me exactly"),
2463            EmbeddingInput::new(2, "repeat me exactly"),
2464            EmbeddingInput::new(3, "unique payload"),
2465            EmbeddingInput::new(4, ""),
2466            EmbeddingInput::new(5, "repeat me exactly"),
2467        ];
2468
2469        let mut cache = ContentAddressedMemoCache::with_capacity(16);
2470        let prepared = prepare_window_with_memo(&inputs, &mut cache);
2471        let stats = cache.stats().clone();
2472
2473        assert_eq!(prepared.len(), 4);
2474        assert_eq!(stats.hits, 2);
2475        assert_eq!(stats.misses, 3);
2476        assert_eq!(stats.inserts, 2);
2477        assert_eq!(stats.live_entries, 2);
2478    }
2479
2480    #[test]
2481    fn packet_embedding_inputs_reuse_memoized_prep_for_duplicate_content() -> Result<()> {
2482        let temp = tempdir().unwrap();
2483        let db_path = temp.path().join("agent_search.db");
2484        let storage = FrankenStorage::open(&db_path)?;
2485        let agent_id = storage.ensure_agent(&Agent {
2486            id: None,
2487            slug: "codex".to_string(),
2488            name: "Codex".to_string(),
2489            version: None,
2490            kind: AgentKind::Cli,
2491        })?;
2492
2493        storage.insert_conversation_tree(
2494            agent_id,
2495            None,
2496            &test_conversation_with_messages(
2497                "packet-memo-conv-one",
2498                vec![
2499                    Message {
2500                        id: None,
2501                        idx: 0,
2502                        role: MessageRole::User,
2503                        author: None,
2504                        created_at: Some(1_700_000_010_100),
2505                        content: "shared semantic payload".to_string(),
2506                        extra_json: json!({}),
2507                        snippets: Vec::new(),
2508                    },
2509                    Message {
2510                        id: None,
2511                        idx: 1,
2512                        role: MessageRole::Agent,
2513                        author: None,
2514                        created_at: Some(1_700_000_010_200),
2515                        content: "unique semantic payload one".to_string(),
2516                        extra_json: json!({}),
2517                        snippets: Vec::new(),
2518                    },
2519                ],
2520            ),
2521        )?;
2522        storage.insert_conversation_tree(
2523            agent_id,
2524            None,
2525            &test_conversation_with_messages(
2526                "packet-memo-conv-two",
2527                vec![
2528                    Message {
2529                        id: None,
2530                        idx: 0,
2531                        role: MessageRole::Tool,
2532                        author: None,
2533                        created_at: Some(1_700_000_010_300),
2534                        content: "shared semantic payload".to_string(),
2535                        extra_json: json!({}),
2536                        snippets: Vec::new(),
2537                    },
2538                    Message {
2539                        id: None,
2540                        idx: 1,
2541                        role: MessageRole::Agent,
2542                        author: None,
2543                        created_at: Some(1_700_000_010_400),
2544                        content: "unique semantic payload two".to_string(),
2545                        extra_json: json!({}),
2546                        snippets: Vec::new(),
2547                    },
2548                ],
2549            ),
2550        )?;
2551
2552        let packet_inputs = packet_embedding_inputs_from_storage(&storage)?;
2553        let mut cache = ContentAddressedMemoCache::with_capacity(16);
2554        let prepared = prepare_window_with_memo(&packet_inputs, &mut cache);
2555        let stats = cache.stats().clone();
2556
2557        assert_eq!(packet_inputs.len(), 4);
2558        assert_eq!(prepared.len(), 4);
2559        assert_eq!(
2560            semantic_prep_memo_key("shared semantic payload")
2561                .content_hash
2562                .as_bytes()
2563                .len(),
2564            32
2565        );
2566        assert_eq!(stats.hits, 1);
2567        assert_eq!(stats.misses, 3);
2568        assert_eq!(stats.inserts, 3);
2569        assert_eq!(stats.live_entries, 3);
2570        Ok(())
2571    }
2572
2573    #[test]
2574    fn backfill_batch_saves_checkpoint_and_staged_index_until_complete() {
2575        let temp = tempdir().unwrap();
2576        let mut manifest = SemanticManifest::default();
2577        let indexer = SemanticIndexer::new("hash", None).unwrap();
2578        let messages = vec![
2579            EmbeddingInput::new(10, "first staged semantic message"),
2580            EmbeddingInput::new(11, "second staged semantic message"),
2581        ];
2582
2583        let outcome = indexer
2584            .run_backfill_batch(
2585                &messages,
2586                temp.path(),
2587                &mut manifest,
2588                SemanticBackfillBatchPlan {
2589                    tier: TierKind::Fast,
2590                    db_fingerprint: "db-fp-backfill-partial".to_string(),
2591                    model_revision: "hash".to_string(),
2592                    total_conversations: 2,
2593                    conversations_in_batch: 1,
2594                    last_offset: 1,
2595                },
2596            )
2597            .unwrap();
2598
2599        assert!(!outcome.published);
2600        assert!(outcome.checkpoint_saved);
2601        assert!(outcome.index_path.exists());
2602        assert!(!vector_index_path(temp.path(), indexer.embedder_id()).exists());
2603        let checkpoint = manifest.checkpoint.as_ref().expect("checkpoint");
2604        assert_eq!(checkpoint.tier, TierKind::Fast);
2605        assert_eq!(checkpoint.conversations_processed, 1);
2606        assert_eq!(checkpoint.docs_embedded, 2);
2607        assert_eq!(manifest.backlog.total_conversations, 2);
2608        assert!(SemanticManifest::path(temp.path()).exists());
2609    }
2610
2611    #[test]
2612    fn backfill_batch_resumes_staged_index_and_publishes_manifest_atomically() {
2613        let temp = tempdir().unwrap();
2614        let mut manifest = SemanticManifest::default();
2615        let indexer = SemanticIndexer::new("hash", None).unwrap();
2616        let db_fingerprint = "db-fp-backfill-complete";
2617        let staging_path = semantic_staging_index_path(
2618            temp.path(),
2619            TierKind::Fast,
2620            indexer.embedder_id(),
2621            db_fingerprint,
2622        );
2623
2624        let first = vec![EmbeddingInput::new(20, "first resume batch")];
2625        let first_outcome = indexer
2626            .run_backfill_batch(
2627                &first,
2628                temp.path(),
2629                &mut manifest,
2630                SemanticBackfillBatchPlan {
2631                    tier: TierKind::Fast,
2632                    db_fingerprint: db_fingerprint.to_string(),
2633                    model_revision: "hash".to_string(),
2634                    total_conversations: 2,
2635                    conversations_in_batch: 1,
2636                    last_offset: 1,
2637                },
2638            )
2639            .unwrap();
2640        assert_eq!(first_outcome.index_path, staging_path);
2641        assert!(staging_path.exists());
2642
2643        let second = vec![EmbeddingInput::new(21, "second resume batch")];
2644        let second_outcome = indexer
2645            .run_backfill_batch(
2646                &second,
2647                temp.path(),
2648                &mut manifest,
2649                SemanticBackfillBatchPlan {
2650                    tier: TierKind::Fast,
2651                    db_fingerprint: db_fingerprint.to_string(),
2652                    model_revision: "hash".to_string(),
2653                    total_conversations: 2,
2654                    conversations_in_batch: 1,
2655                    last_offset: 2,
2656                },
2657            )
2658            .unwrap();
2659
2660        assert!(second_outcome.published);
2661        assert!(!second_outcome.checkpoint_saved);
2662        assert!(!staging_path.exists());
2663        let final_path = vector_index_path(temp.path(), indexer.embedder_id());
2664        assert_eq!(second_outcome.index_path, final_path);
2665        assert!(final_path.exists());
2666        assert!(manifest.checkpoint.is_none());
2667        let artifact = manifest.fast_tier.as_ref().expect("published fast tier");
2668        assert!(artifact.ready);
2669        assert_eq!(artifact.conversation_count, 2);
2670        assert_eq!(artifact.doc_count, 2);
2671        assert_eq!(manifest.backlog.fast_tier_processed, 2);
2672
2673        let loaded = SemanticManifest::load(temp.path()).unwrap().unwrap();
2674        assert!(loaded.checkpoint.is_none());
2675        assert!(loaded.fast_tier.as_ref().is_some_and(|record| record.ready));
2676    }
2677
2678    #[test]
2679    fn backfill_publish_compacts_resumed_wal_before_rename() {
2680        let temp = tempdir().unwrap();
2681        let mut manifest = SemanticManifest::default();
2682        let indexer = SemanticIndexer::new("hash", None).unwrap();
2683        let db_fingerprint = "db-fp-backfill-small-resume";
2684        let first: Vec<EmbeddingInput> = (0..20)
2685            .map(|idx| EmbeddingInput::new(100 + idx, format!("first batch message {idx}")))
2686            .collect();
2687
2688        let first_outcome = indexer
2689            .run_backfill_batch(
2690                &first,
2691                temp.path(),
2692                &mut manifest,
2693                SemanticBackfillBatchPlan {
2694                    tier: TierKind::Fast,
2695                    db_fingerprint: db_fingerprint.to_string(),
2696                    model_revision: "hash".to_string(),
2697                    total_conversations: 2,
2698                    conversations_in_batch: 1,
2699                    last_offset: 1,
2700                },
2701            )
2702            .unwrap();
2703        assert!(first_outcome.checkpoint_saved);
2704
2705        let second = vec![EmbeddingInput::new(200, "small final resume batch")];
2706        let second_outcome = indexer
2707            .run_backfill_batch(
2708                &second,
2709                temp.path(),
2710                &mut manifest,
2711                SemanticBackfillBatchPlan {
2712                    tier: TierKind::Fast,
2713                    db_fingerprint: db_fingerprint.to_string(),
2714                    model_revision: "hash".to_string(),
2715                    total_conversations: 2,
2716                    conversations_in_batch: 1,
2717                    last_offset: 2,
2718                },
2719            )
2720            .unwrap();
2721
2722        assert!(second_outcome.published);
2723        let final_path = vector_index_path(temp.path(), indexer.embedder_id());
2724        let mut final_wal_path = final_path.as_os_str().to_os_string();
2725        final_wal_path.push(".wal");
2726        assert!(!PathBuf::from(final_wal_path).exists());
2727
2728        let published_index = FsVectorIndex::open(&final_path).unwrap();
2729        assert_eq!(published_index.record_count(), 21);
2730        let artifact = manifest.fast_tier.as_ref().expect("published fast tier");
2731        assert_eq!(artifact.doc_count, 21);
2732        assert_eq!(artifact.conversation_count, 2);
2733    }
2734
2735    #[test]
2736    fn backfill_from_storage_fetches_canonical_batches_and_resumes() -> Result<()> {
2737        let temp = tempdir().unwrap();
2738        let db_path = temp.path().join("agent_search.db");
2739        let storage = FrankenStorage::open(&db_path)?;
2740        let agent_id = storage.ensure_agent(&Agent {
2741            id: None,
2742            slug: "codex".to_string(),
2743            name: "Codex".to_string(),
2744            version: None,
2745            kind: AgentKind::Cli,
2746        })?;
2747        storage.insert_conversation_tree(
2748            agent_id,
2749            None,
2750            &test_conversation("first", "first canonical semantic message"),
2751        )?;
2752        storage.insert_conversation_tree(
2753            agent_id,
2754            None,
2755            &test_conversation("second", "second canonical semantic message"),
2756        )?;
2757
2758        let mut manifest = SemanticManifest::default();
2759        let indexer = SemanticIndexer::new("hash", None)?;
2760
2761        let first = indexer.run_backfill_from_storage(
2762            &storage,
2763            temp.path(),
2764            &mut manifest,
2765            SemanticBackfillStoragePlan {
2766                tier: TierKind::Fast,
2767                db_fingerprint: "canonical-db-fp".to_string(),
2768                model_revision: "hash".to_string(),
2769                max_conversations: 1,
2770            },
2771        )?;
2772        assert!(!first.published);
2773        assert!(first.checkpoint_saved);
2774        assert_eq!(first.conversations_processed, 1);
2775        assert_eq!(first.total_conversations, 2);
2776        assert_eq!(first.embedded_docs, 1);
2777        assert!(first.last_offset > 0);
2778
2779        let second = indexer.run_backfill_from_storage(
2780            &storage,
2781            temp.path(),
2782            &mut manifest,
2783            SemanticBackfillStoragePlan {
2784                tier: TierKind::Fast,
2785                db_fingerprint: "canonical-db-fp".to_string(),
2786                model_revision: "hash".to_string(),
2787                max_conversations: 1,
2788            },
2789        )?;
2790        assert!(second.published);
2791        assert!(!second.checkpoint_saved);
2792        assert_eq!(second.conversations_processed, 2);
2793        assert_eq!(second.embedded_docs, 1);
2794        assert!(manifest.checkpoint.is_none());
2795        assert_eq!(
2796            manifest.fast_tier.as_ref().map(|record| record.doc_count),
2797            Some(2)
2798        );
2799        Ok(())
2800    }
2801
2802    #[test]
2803    fn canonical_embedding_batch_uses_conversation_packet_semantic_projection() -> Result<()> {
2804        let temp = tempdir().unwrap();
2805        let db_path = temp.path().join("agent_search.db");
2806        let storage = FrankenStorage::open(&db_path)?;
2807        let agent_id = storage.ensure_agent(&Agent {
2808            id: None,
2809            slug: "codex".to_string(),
2810            name: "Codex".to_string(),
2811            version: None,
2812            kind: AgentKind::Cli,
2813        })?;
2814        storage.insert_conversation_tree(
2815            agent_id,
2816            None,
2817            &test_conversation_with_messages(
2818                "packet-projection",
2819                vec![
2820                    Message {
2821                        id: None,
2822                        idx: 0,
2823                        role: MessageRole::User,
2824                        author: None,
2825                        created_at: Some(1_700_000_000_500),
2826                        content: "user semantic text".to_string(),
2827                        extra_json: json!({}),
2828                        snippets: Vec::new(),
2829                    },
2830                    Message {
2831                        id: None,
2832                        idx: 1,
2833                        role: MessageRole::Tool,
2834                        author: None,
2835                        created_at: Some(1_700_000_000_600),
2836                        content: "tool semantic text".to_string(),
2837                        extra_json: json!({}),
2838                        snippets: Vec::new(),
2839                    },
2840                    Message {
2841                        id: None,
2842                        idx: 2,
2843                        role: MessageRole::System,
2844                        author: None,
2845                        created_at: Some(1_700_000_000_700),
2846                        content: String::new(),
2847                        extra_json: json!({}),
2848                        snippets: Vec::new(),
2849                    },
2850                ],
2851            ),
2852        )?;
2853
2854        let batch = fetch_canonical_embedding_batch(&storage, 0, 1)?;
2855
2856        assert_eq!(batch.conversations_in_batch, 1);
2857        assert_eq!(batch.inputs.len(), 2);
2858        assert_eq!(batch.inputs[0].content, "user semantic text");
2859        assert_eq!(batch.inputs[1].content, "tool semantic text");
2860        assert_eq!(batch.inputs[0].role, role_code_from_str("user").unwrap());
2861        assert_eq!(batch.inputs[1].role, role_code_from_str("tool").unwrap());
2862        let normalized_source_id =
2863            normalized_index_source_id(Some("remote-laptop"), None, Some("builder-host"));
2864        let expected_hash = crc32fast::hash(normalized_source_id.as_bytes());
2865        assert_eq!(batch.inputs[0].source_id, expected_hash);
2866        assert_eq!(batch.inputs[1].source_id, expected_hash);
2867        Ok(())
2868    }
2869
2870    #[test]
2871    fn packet_embedding_inputs_from_storage_since_only_emits_new_canonical_messages() -> Result<()>
2872    {
2873        let temp = tempdir().unwrap();
2874        let db_path = temp.path().join("agent_search.db");
2875        let storage = FrankenStorage::open(&db_path)?;
2876        let agent_id = storage.ensure_agent(&Agent {
2877            id: None,
2878            slug: "codex".to_string(),
2879            name: "Codex".to_string(),
2880            version: None,
2881            kind: AgentKind::Cli,
2882        })?;
2883        storage.insert_conversation_tree(
2884            agent_id,
2885            None,
2886            &test_conversation_with_messages(
2887                "packet-delta",
2888                vec![
2889                    Message {
2890                        id: None,
2891                        idx: 0,
2892                        role: MessageRole::User,
2893                        author: None,
2894                        created_at: Some(1_700_000_000_500),
2895                        content: "existing semantic text".to_string(),
2896                        extra_json: json!({}),
2897                        snippets: Vec::new(),
2898                    },
2899                    Message {
2900                        id: None,
2901                        idx: 1,
2902                        role: MessageRole::Agent,
2903                        author: None,
2904                        created_at: Some(1_700_000_000_600),
2905                        content: "existing assistant text".to_string(),
2906                        extra_json: json!({}),
2907                        snippets: Vec::new(),
2908                    },
2909                ],
2910            ),
2911        )?;
2912        let watermark: i64 = storage.raw().query_row_map(
2913            "SELECT MAX(id) FROM messages",
2914            &[] as &[ParamValue],
2915            |row| row.get_typed(0),
2916        )?;
2917
2918        storage.insert_conversation_tree(
2919            agent_id,
2920            None,
2921            &test_conversation_with_messages(
2922                "packet-delta",
2923                vec![
2924                    Message {
2925                        id: None,
2926                        idx: 0,
2927                        role: MessageRole::User,
2928                        author: None,
2929                        created_at: Some(1_700_000_000_500),
2930                        content: "existing semantic text".to_string(),
2931                        extra_json: json!({}),
2932                        snippets: Vec::new(),
2933                    },
2934                    Message {
2935                        id: None,
2936                        idx: 1,
2937                        role: MessageRole::Agent,
2938                        author: None,
2939                        created_at: Some(1_700_000_000_600),
2940                        content: "existing assistant text".to_string(),
2941                        extra_json: json!({}),
2942                        snippets: Vec::new(),
2943                    },
2944                    Message {
2945                        id: None,
2946                        idx: 2,
2947                        role: MessageRole::Agent,
2948                        author: None,
2949                        created_at: Some(1_700_000_000_700),
2950                        content: "new packet semantic text".to_string(),
2951                        extra_json: json!({}),
2952                        snippets: Vec::new(),
2953                    },
2954                    Message {
2955                        id: None,
2956                        idx: 3,
2957                        role: MessageRole::System,
2958                        author: None,
2959                        created_at: Some(1_700_000_000_800),
2960                        content: String::new(),
2961                        extra_json: json!({}),
2962                        snippets: Vec::new(),
2963                    },
2964                ],
2965            ),
2966        )?;
2967
2968        let batch = packet_embedding_inputs_from_storage_since(&storage, watermark)?;
2969
2970        assert_eq!(batch.conversations_in_batch, 1);
2971        assert_eq!(batch.inputs.len(), 1);
2972        assert_eq!(batch.inputs[0].content, "new packet semantic text");
2973        assert_eq!(
2974            batch.inputs[0].role,
2975            role_code_from_str("assistant").unwrap()
2976        );
2977        let normalized_source_id =
2978            normalized_index_source_id(Some("remote-laptop"), None, Some("builder-host"));
2979        assert_eq!(
2980            batch.inputs[0].source_id,
2981            crc32fast::hash(normalized_source_id.as_bytes())
2982        );
2983        let expected_raw_max_id: i64 = storage.raw().query_row_map(
2984            "SELECT MAX(id) FROM messages",
2985            &[] as &[ParamValue],
2986            |row| row.get_typed(0),
2987        )?;
2988        assert_eq!(batch.raw_max_message_id, Some(expected_raw_max_id));
2989        Ok(())
2990    }
2991
2992    #[test]
2993    fn packet_catch_up_emits_expected_semantic_docs_after_watermark() -> Result<()> {
2994        let temp = tempdir().unwrap();
2995        let db_path = temp.path().join("agent_search.db");
2996        let storage = FrankenStorage::open(&db_path)?;
2997        let agent_id = storage.ensure_agent(&Agent {
2998            id: None,
2999            slug: "codex".to_string(),
3000            name: "Codex".to_string(),
3001            version: None,
3002            kind: AgentKind::Cli,
3003        })?;
3004        let workspace_id = storage.ensure_workspace(Path::new("/tmp/workspace"), None)?;
3005
3006        storage.insert_conversation_tree(
3007            agent_id,
3008            Some(workspace_id),
3009            &test_conversation_with_messages(
3010                "legacy-packet-semantics",
3011                vec![
3012                    Message {
3013                        id: None,
3014                        idx: 0,
3015                        role: MessageRole::User,
3016                        author: None,
3017                        created_at: Some(1_700_000_000_500),
3018                        content: "before watermark".to_string(),
3019                        extra_json: json!({}),
3020                        snippets: Vec::new(),
3021                    },
3022                    Message {
3023                        id: None,
3024                        idx: 1,
3025                        role: MessageRole::Agent,
3026                        author: None,
3027                        created_at: Some(1_700_000_000_600),
3028                        content: "before watermark assistant".to_string(),
3029                        extra_json: json!({}),
3030                        snippets: Vec::new(),
3031                    },
3032                ],
3033            ),
3034        )?;
3035
3036        let watermark: i64 = storage.raw().query_row_map(
3037            "SELECT MAX(id) FROM messages",
3038            &[] as &[ParamValue],
3039            |row| row.get_typed(0),
3040        )?;
3041
3042        storage.insert_conversation_tree(
3043            agent_id,
3044            Some(workspace_id),
3045            &test_conversation_with_messages(
3046                "legacy-packet-semantics",
3047                vec![
3048                    Message {
3049                        id: None,
3050                        idx: 0,
3051                        role: MessageRole::User,
3052                        author: None,
3053                        created_at: Some(1_700_000_000_500),
3054                        content: "before watermark".to_string(),
3055                        extra_json: json!({}),
3056                        snippets: Vec::new(),
3057                    },
3058                    Message {
3059                        id: None,
3060                        idx: 1,
3061                        role: MessageRole::Agent,
3062                        author: None,
3063                        created_at: Some(1_700_000_000_600),
3064                        content: "before watermark assistant".to_string(),
3065                        extra_json: json!({}),
3066                        snippets: Vec::new(),
3067                    },
3068                    Message {
3069                        id: None,
3070                        idx: 2,
3071                        role: MessageRole::Agent,
3072                        author: None,
3073                        created_at: Some(1_700_000_000_700),
3074                        content: "after watermark assistant".to_string(),
3075                        extra_json: json!({}),
3076                        snippets: Vec::new(),
3077                    },
3078                    Message {
3079                        id: None,
3080                        idx: 3,
3081                        role: MessageRole::System,
3082                        author: None,
3083                        created_at: Some(1_700_000_000_800),
3084                        content: String::new(),
3085                        extra_json: json!({}),
3086                        snippets: Vec::new(),
3087                    },
3088                ],
3089            ),
3090        )?;
3091        storage.insert_conversation_tree(
3092            agent_id,
3093            Some(workspace_id),
3094            &test_conversation_with_messages(
3095                "legacy-packet-semantics-second-conv",
3096                vec![
3097                    Message {
3098                        id: None,
3099                        idx: 0,
3100                        role: MessageRole::Tool,
3101                        author: None,
3102                        created_at: Some(1_700_000_000_900),
3103                        content: "after watermark tool".to_string(),
3104                        extra_json: json!({}),
3105                        snippets: Vec::new(),
3106                    },
3107                    Message {
3108                        id: None,
3109                        idx: 1,
3110                        role: MessageRole::System,
3111                        author: None,
3112                        created_at: Some(1_700_000_001_000),
3113                        content: String::new(),
3114                        extra_json: json!({}),
3115                        snippets: Vec::new(),
3116                    },
3117                ],
3118            ),
3119        )?;
3120
3121        let packet_batch = packet_embedding_inputs_from_storage_since(&storage, watermark)?;
3122        let normalized_source_id =
3123            normalized_index_source_id(Some("remote-laptop"), None, Some("builder-host"));
3124        let source_id_hash = crc32fast::hash(normalized_source_id.as_bytes());
3125        let expected = vec![
3126            ComparableSemanticInput {
3127                message_id: u64::try_from(watermark + 1).unwrap(),
3128                created_at_ms: 1_700_000_000_700,
3129                agent_id: u32::try_from(agent_id).unwrap(),
3130                workspace_id: u32::try_from(workspace_id).unwrap(),
3131                source_id: source_id_hash,
3132                role: role_code_from_str("assistant").unwrap(),
3133                content: "after watermark assistant".to_string(),
3134            },
3135            ComparableSemanticInput {
3136                message_id: u64::try_from(watermark + 3).unwrap(),
3137                created_at_ms: 1_700_000_000_900,
3138                agent_id: u32::try_from(agent_id).unwrap(),
3139                workspace_id: u32::try_from(workspace_id).unwrap(),
3140                source_id: source_id_hash,
3141                role: role_code_from_str("tool").unwrap(),
3142                content: "after watermark tool".to_string(),
3143            },
3144        ];
3145
3146        assert_eq!(comparable_semantic_inputs(packet_batch.inputs), expected);
3147        assert_eq!(packet_batch.conversations_in_batch, 2);
3148        assert_eq!(packet_batch.raw_max_message_id, Some(watermark + 4));
3149        Ok(())
3150    }
3151
3152    #[test]
3153    fn packet_embedding_inputs_for_message_ids_matches_since_selection() -> Result<()> {
3154        let temp = tempdir().unwrap();
3155        let db_path = temp.path().join("agent_search.db");
3156        let storage = FrankenStorage::open(&db_path)?;
3157        let agent_id = storage.ensure_agent(&Agent {
3158            id: None,
3159            slug: "codex".to_string(),
3160            name: "Codex".to_string(),
3161            version: None,
3162            kind: AgentKind::Cli,
3163        })?;
3164        let workspace_id = storage.ensure_workspace(Path::new("/tmp/workspace"), None)?;
3165
3166        storage.insert_conversation_tree(
3167            agent_id,
3168            Some(workspace_id),
3169            &test_conversation_with_messages(
3170                "selected-vs-since",
3171                vec![
3172                    Message {
3173                        id: None,
3174                        idx: 0,
3175                        role: MessageRole::User,
3176                        author: None,
3177                        created_at: Some(1_700_000_100_100),
3178                        content: "before watermark".to_string(),
3179                        extra_json: json!({}),
3180                        snippets: Vec::new(),
3181                    },
3182                    Message {
3183                        id: None,
3184                        idx: 1,
3185                        role: MessageRole::Agent,
3186                        author: None,
3187                        created_at: Some(1_700_000_100_200),
3188                        content: "before watermark assistant".to_string(),
3189                        extra_json: json!({}),
3190                        snippets: Vec::new(),
3191                    },
3192                ],
3193            ),
3194        )?;
3195
3196        let watermark: i64 = storage.raw().query_row_map(
3197            "SELECT MAX(id) FROM messages",
3198            &[] as &[ParamValue],
3199            |row| row.get_typed(0),
3200        )?;
3201
3202        storage.insert_conversation_tree(
3203            agent_id,
3204            Some(workspace_id),
3205            &test_conversation_with_messages(
3206                "selected-vs-since",
3207                vec![
3208                    Message {
3209                        id: None,
3210                        idx: 0,
3211                        role: MessageRole::User,
3212                        author: None,
3213                        created_at: Some(1_700_000_100_100),
3214                        content: "before watermark".to_string(),
3215                        extra_json: json!({}),
3216                        snippets: Vec::new(),
3217                    },
3218                    Message {
3219                        id: None,
3220                        idx: 1,
3221                        role: MessageRole::Agent,
3222                        author: None,
3223                        created_at: Some(1_700_000_100_200),
3224                        content: "before watermark assistant".to_string(),
3225                        extra_json: json!({}),
3226                        snippets: Vec::new(),
3227                    },
3228                    Message {
3229                        id: None,
3230                        idx: 2,
3231                        role: MessageRole::Tool,
3232                        author: None,
3233                        created_at: Some(1_700_000_100_300),
3234                        content: "after watermark tool".to_string(),
3235                        extra_json: json!({}),
3236                        snippets: Vec::new(),
3237                    },
3238                    Message {
3239                        id: None,
3240                        idx: 3,
3241                        role: MessageRole::System,
3242                        author: None,
3243                        created_at: Some(1_700_000_100_400),
3244                        content: String::new(),
3245                        extra_json: json!({}),
3246                        snippets: Vec::new(),
3247                    },
3248                ],
3249            ),
3250        )?;
3251        storage.insert_conversation_tree(
3252            agent_id,
3253            Some(workspace_id),
3254            &test_conversation_with_messages(
3255                "selected-vs-since-second",
3256                vec![Message {
3257                    id: None,
3258                    idx: 0,
3259                    role: MessageRole::Agent,
3260                    author: None,
3261                    created_at: Some(1_700_000_100_500),
3262                    content: "after watermark assistant".to_string(),
3263                    extra_json: json!({}),
3264                    snippets: Vec::new(),
3265                }],
3266            ),
3267        )?;
3268
3269        let since_batch = packet_embedding_inputs_from_storage_since(&storage, watermark)?;
3270        let conversation_ids: Vec<i64> = storage.raw().query_map_collect(
3271            "SELECT DISTINCT conversation_id
3272             FROM messages
3273             WHERE id > ?1
3274             ORDER BY conversation_id ASC",
3275            &[ParamValue::from(watermark)],
3276            |row| row.get_typed(0),
3277        )?;
3278        let selected_message_ids: HashSet<i64> = storage
3279            .raw()
3280            .query_map_collect(
3281                "SELECT id
3282                 FROM messages
3283                 WHERE id > ?1
3284                 ORDER BY id ASC",
3285                &[ParamValue::from(watermark)],
3286                |row| row.get_typed(0),
3287            )?
3288            .into_iter()
3289            .collect();
3290        let selected_inputs = packet_embedding_inputs_from_storage_for_message_ids(
3291            &storage,
3292            &conversation_ids,
3293            &selected_message_ids,
3294        )?;
3295
3296        assert_eq!(
3297            comparable_semantic_inputs(selected_inputs),
3298            comparable_semantic_inputs(since_batch.inputs)
3299        );
3300        Ok(())
3301    }
3302
3303    #[test]
3304    fn default_batch_size_uses_new_value() {
3305        // The test setup must not leak a caller-provided CASS_SEMANTIC_BATCH_SIZE
3306        // override, which would mask the constant bump we're asserting on.
3307        let _guard = EnvVarGuard::remove("CASS_SEMANTIC_BATCH_SIZE");
3308        let indexer = SemanticIndexer::new("hash", None).unwrap();
3309        assert_eq!(indexer.batch_size(), DEFAULT_SEMANTIC_BATCH_SIZE);
3310    }
3311
3312    #[test]
3313    fn parallel_prep_enabled_reuses_truthy_env_parser() {
3314        for (value, expected) in [
3315            ("1", true),
3316            ("true", true),
3317            (" YeS ", true),
3318            ("on", true),
3319            ("0", false),
3320            ("false", false),
3321            ("off", false),
3322        ] {
3323            let _guard = EnvVarGuard::set("CASS_SEMANTIC_PREP_PARALLEL", value);
3324            assert_eq!(parallel_prep_enabled(), expected, "env value {value:?}");
3325        }
3326
3327        let _guard = EnvVarGuard::remove("CASS_SEMANTIC_PREP_PARALLEL");
3328        assert!(!parallel_prep_enabled());
3329    }
3330
3331    #[test]
3332    fn saturating_u64_from_usize_covers_bounds() {
3333        assert_eq!(saturating_u64_from_usize(0), 0);
3334        assert_eq!(saturating_u64_from_usize(42), 42);
3335        assert_eq!(
3336            saturating_u64_from_usize(usize::MAX),
3337            u64::try_from(usize::MAX).unwrap_or(u64::MAX)
3338        );
3339    }
3340
3341    /// `coding_agent_session_search-ibuuh.32` (sink #3 equivalence gate):
3342    /// the packet-driven `semantic_inputs_from_packets` helper must
3343    /// produce the same `EmbeddingInput` list a fresh storage replay
3344    /// returns for the same canonical corpus. Once this passes, callers
3345    /// that already hold packets (rebuild pipeline, salvage replay,
3346    /// repair flows) can drive the semantic preparation consumer
3347    /// without a second canonical-row round-trip.
3348    #[test]
3349    fn semantic_inputs_from_packets_matches_storage_replay() -> Result<()> {
3350        let temp = tempdir().unwrap();
3351        let db_path = temp.path().join("agent_search.db");
3352        let storage = FrankenStorage::open(&db_path)?;
3353
3354        let agent_id_codex = storage.ensure_agent(&Agent {
3355            id: None,
3356            slug: "codex".to_string(),
3357            name: "Codex".to_string(),
3358            version: None,
3359            kind: AgentKind::Cli,
3360        })?;
3361        let agent_id_claude = storage.ensure_agent(&Agent {
3362            id: None,
3363            slug: "claude_code".to_string(),
3364            name: "Claude Code".to_string(),
3365            version: None,
3366            kind: AgentKind::Cli,
3367        })?;
3368        let workspace_id =
3369            storage.ensure_workspace(Path::new("/tmp/semantic-equivalence-ws"), None)?;
3370
3371        // Two conversations on different agents, mixed roles, including
3372        // an empty-content system message that the semantic projection
3373        // must filter (matches the legacy storage replay).
3374        storage.insert_conversation_tree(
3375            agent_id_codex,
3376            Some(workspace_id),
3377            &test_conversation_with_messages(
3378                "packet-equiv-1",
3379                vec![
3380                    Message {
3381                        id: None,
3382                        idx: 0,
3383                        role: MessageRole::User,
3384                        author: None,
3385                        created_at: Some(1_700_000_000_500),
3386                        content: "first user prompt".to_string(),
3387                        extra_json: json!({}),
3388                        snippets: Vec::new(),
3389                    },
3390                    Message {
3391                        id: None,
3392                        idx: 1,
3393                        role: MessageRole::Agent,
3394                        author: None,
3395                        created_at: Some(1_700_000_000_600),
3396                        content: "first assistant reply".to_string(),
3397                        extra_json: json!({}),
3398                        snippets: Vec::new(),
3399                    },
3400                    Message {
3401                        id: None,
3402                        idx: 2,
3403                        role: MessageRole::System,
3404                        author: None,
3405                        created_at: Some(1_700_000_000_700),
3406                        // Empty content is filtered by both paths.
3407                        content: String::new(),
3408                        extra_json: json!({}),
3409                        snippets: Vec::new(),
3410                    },
3411                ],
3412            ),
3413        )?;
3414        storage.insert_conversation_tree(
3415            agent_id_claude,
3416            Some(workspace_id),
3417            &test_conversation_with_messages(
3418                "packet-equiv-2",
3419                vec![
3420                    Message {
3421                        id: None,
3422                        idx: 0,
3423                        role: MessageRole::Tool,
3424                        author: Some("ripgrep".to_string()),
3425                        created_at: Some(1_700_000_001_500),
3426                        content: "tool output line".to_string(),
3427                        extra_json: json!({}),
3428                        snippets: Vec::new(),
3429                    },
3430                    Message {
3431                        id: None,
3432                        idx: 1,
3433                        role: MessageRole::Agent,
3434                        author: None,
3435                        created_at: Some(1_700_000_001_600),
3436                        content: "second assistant reply".to_string(),
3437                        extra_json: json!({}),
3438                        snippets: Vec::new(),
3439                    },
3440                ],
3441            ),
3442        )?;
3443
3444        // Legacy path: the storage-driven replay that the rebuild
3445        // pipeline currently uses.
3446        let storage_inputs = packet_embedding_inputs_from_storage(&storage)?;
3447
3448        // Packet-driven path: re-fetch the canonical envelopes (so we
3449        // get the storage-internal agent/workspace ids the rebuild path
3450        // would normally pair with packets), then convert those rows
3451        // into ConversationPackets via canonical replay and feed them
3452        // through `semantic_inputs_from_packets`.
3453        let conversation_ids: Vec<i64> = storage.raw().query_map_collect(
3454            "SELECT DISTINCT m.conversation_id
3455             FROM messages m
3456             JOIN conversations c ON c.id = m.conversation_id
3457             ORDER BY m.conversation_id ASC",
3458            &[] as &[ParamValue],
3459            |row| row.get_typed(0),
3460        )?;
3461        let envelopes = fetch_canonical_embedding_conversations(&storage, &conversation_ids)?;
3462        let mut grouped_messages =
3463            storage.fetch_messages_for_lexical_rebuild_batch(&conversation_ids, None, None)?;
3464        let mut packets: Vec<ConversationPacket> = Vec::with_capacity(envelopes.len());
3465        let mut contexts: Vec<SemanticPacketContext> = Vec::with_capacity(envelopes.len());
3466        for envelope in &envelopes {
3467            let messages = grouped_messages
3468                .remove(&envelope.conversation_id)
3469                .unwrap_or_default();
3470            let provenance = canonical_embedding_packet_provenance(envelope);
3471            let canonical = canonical_embedding_conversation(envelope, &provenance, messages);
3472            packets.push(ConversationPacket::from_canonical_replay(
3473                &canonical, provenance,
3474            ));
3475            contexts.push(SemanticPacketContext {
3476                conversation_id: envelope.conversation_id,
3477                agent_id: saturating_u32_from_i64(envelope.agent_id),
3478                workspace_id: saturating_u32_from_i64(envelope.workspace_id.unwrap_or(0)),
3479            });
3480        }
3481        let packet_inputs = semantic_inputs_from_packets(&packets, &contexts)?;
3482
3483        // The two paths must produce the same EmbeddingInput list
3484        // (sortable comparison normalizes ordering across the two
3485        // helpers' iteration orders).
3486        assert!(
3487            !storage_inputs.is_empty(),
3488            "fixture should produce non-empty semantic inputs (sanity)"
3489        );
3490        assert_eq!(
3491            comparable_semantic_inputs(storage_inputs.clone()),
3492            comparable_semantic_inputs(packet_inputs.clone()),
3493            "packet-driven semantic preparation must match storage replay byte-for-byte"
3494        );
3495
3496        // Sanity-pin a couple of contract details so a regression in
3497        // either path (e.g. role normalization or empty-content
3498        // filtering) trips a clear assertion rather than a generic
3499        // length mismatch.
3500        let storage_count = storage_inputs.len();
3501        let packet_count = packet_inputs.len();
3502        assert_eq!(
3503            storage_count, packet_count,
3504            "storage and packet semantic input counts must agree exactly"
3505        );
3506        // Empty-content system message must NOT appear in the output.
3507        assert!(
3508            packet_inputs.iter().all(|input| !input.content.is_empty()),
3509            "empty content must be filtered by the packet semantic projection"
3510        );
3511        // The remote-host source_id pins the cross-path provenance hash.
3512        let normalized_source_id =
3513            normalized_index_source_id(Some("remote-laptop"), None, Some("builder-host"));
3514        let expected_hash = crc32fast::hash(normalized_source_id.as_bytes());
3515        assert!(
3516            packet_inputs
3517                .iter()
3518                .all(|input| input.source_id == expected_hash),
3519            "every emitted EmbeddingInput must hash provenance via the packet's normalized source_id"
3520        );
3521
3522        Ok(())
3523    }
3524
3525    /// Length-mismatch defense: if a caller hands `semantic_inputs_from_packets`
3526    /// a packet/context slice pair of different lengths, the helper must
3527    /// return an error rather than silently mis-correlating ids. Pinning
3528    /// this is part of the bead's "shadow / compare mode plus explicit
3529    /// kill-switch" acceptance language.
3530    #[test]
3531    fn semantic_inputs_from_packets_rejects_length_mismatch() {
3532        let provenance = ConversationPacketProvenance::local();
3533        let canonical = test_conversation("packet-mismatch", "hello");
3534        let packet = ConversationPacket::from_canonical_replay(&canonical, provenance);
3535        let result = semantic_inputs_from_packets(&[packet], &[]);
3536        assert!(
3537            result.is_err(),
3538            "expected error on packet/context length mismatch"
3539        );
3540        let err = result.unwrap_err().to_string();
3541        assert!(
3542            err.contains("length mismatch"),
3543            "error should mention length mismatch, got: {err}"
3544        );
3545    }
3546}