Skip to main content

coding_agent_search/indexer/
semantic.rs

1use std::collections::{HashMap, HashSet};
2use std::fs;
3use std::io::IsTerminal;
4use std::path::{Path, PathBuf};
5use std::time::{SystemTime, UNIX_EPOCH};
6
7use anyhow::{Context, Result, bail};
8use frankensearch::index::{
9    HNSW_DEFAULT_EF_CONSTRUCTION as FS_HNSW_DEFAULT_EF_CONSTRUCTION,
10    HNSW_DEFAULT_M as FS_HNSW_DEFAULT_M, HnswConfig as FsHnswConfig, HnswIndex as FsHnswIndex,
11    Quantization as FsQuantization, VectorIndex as FsVectorIndex,
12    VectorIndexWriter as FsVectorIndexWriter,
13};
14use frankensqlite::compat::{ConnectionExt, ParamValue, RowExt};
15use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
16use rayon::prelude::*;
17
18use crate::indexer::memoization::{
19    ContentAddressedMemoCache, MemoCacheAuditRecord, MemoContentHash, MemoKey, MemoLookup,
20};
21use crate::indexer::responsiveness;
22use crate::indexer::semantic_progress::{
23    SemanticProgressEvent, SemanticProgressFields, SemanticProgressSink,
24};
25use crate::model::conversation_packet::{ConversationPacket, ConversationPacketProvenance};
26use crate::model::types::{Conversation, Message};
27use crate::search::canonicalize::{canonicalize_for_embedding, content_hash};
28use crate::search::embedder::Embedder;
29use crate::search::fastembed_embedder::FastEmbedder;
30use crate::search::hash_embedder::HashEmbedder;
31use crate::search::policy::{CHUNKING_STRATEGY_VERSION, SEMANTIC_SCHEMA_VERSION, SemanticPolicy};
32use crate::search::semantic_manifest::{
33    ArtifactRecord, BuildCheckpoint, SemanticManifest, SemanticShardManifest, SemanticShardRecord,
34    TierKind,
35};
36use crate::search::tantivy::{
37    normalized_index_origin_host, normalized_index_origin_kind, normalized_index_source_id,
38};
39use crate::search::vector_index::{
40    ROLE_USER, SemanticDocId, VECTOR_INDEX_DIR, role_code_from_str, vector_index_path,
41};
42use crate::storage::sqlite::FrankenStorage;
43
44/// Default embedder batch size. 128 is a sweet spot for ONNX MiniLM models on
45/// modern CPUs: big enough to amortize dispatch overhead and keep the tensor
46/// kernels saturated, small enough that one batch fits comfortably in L2 and
47/// memory reservation stays bounded for large corpora.
48const DEFAULT_SEMANTIC_BATCH_SIZE: usize = 128;
49const DEFAULT_SEMANTIC_PREP_MEMO_CAPACITY: usize = 4_096;
50const SEMANTIC_PREP_MEMO_ALGORITHM: &str = "semantic_prepare_window";
51const SEMANTIC_PREP_MEMO_VERSION: &str = "canonicalize_for_embedding:v2:stable-content-hash";
52
53fn resolved_default_batch_size() -> usize {
54    dotenvy::var("CASS_SEMANTIC_BATCH_SIZE")
55        .ok()
56        .and_then(|v| v.parse::<usize>().ok())
57        .filter(|v| *v > 0)
58        .unwrap_or(DEFAULT_SEMANTIC_BATCH_SIZE)
59}
60
61fn resolved_semantic_prep_memo_capacity() -> usize {
62    dotenvy::var("CASS_SEMANTIC_PREP_MEMO_CAPACITY")
63        .ok()
64        .and_then(|v| v.parse::<usize>().ok())
65        .filter(|v| *v > 0)
66        .unwrap_or(DEFAULT_SEMANTIC_PREP_MEMO_CAPACITY)
67}
68
69/// Opt in to the rayon-parallel canonicalize+hash prep step. **Default: OFF.**
70///
71/// The parallel path is kept because canonicalize+hash CAN dominate the
72/// embedding wall-clock on pathological inputs (very long messages, costly
73/// Unicode normalization). But criterion baselines captured under
74/// `tests/artifacts/perf/2026-04-21-profile-run/baselines.md` showed a
75/// 1.2×–2.3× **regression** on the hash embedder across every batch size
76/// tested (2 000 messages, mixed markdown/code/unicode): rayon's per-task
77/// scheduling overhead is larger than the per-message canonicalize+hash cost
78/// when the embedder itself is cheap. For the production ONNX (MiniLM)
79/// embedder, per-batch inference already dwarfs prep, so parallel prep never
80/// buys meaningful wall-clock — the prep step is ≤ 1% of total embed time.
81///
82/// Set `CASS_SEMANTIC_PREP_PARALLEL=1` / `true` / `yes` / `on` to opt in.
83fn parallel_prep_enabled() -> bool {
84    env_truthy("CASS_SEMANTIC_PREP_PARALLEL")
85}
86
87fn saturating_u64_from_usize(value: usize) -> u64 {
88    u64::try_from(value).unwrap_or(u64::MAX)
89}
90
91#[derive(Debug, Clone)]
92pub struct EmbeddingInput {
93    pub message_id: u64,
94    pub created_at_ms: i64,
95    pub agent_id: u32,
96    pub workspace_id: u32,
97    pub source_id: u32,
98    pub role: u8,
99    pub chunk_idx: u8,
100    pub content: String,
101}
102
103impl EmbeddingInput {
104    pub fn new(message_id: u64, content: impl Into<String>) -> Self {
105        Self {
106            message_id,
107            created_at_ms: 0,
108            agent_id: 0,
109            workspace_id: 0,
110            source_id: 0,
111            role: ROLE_USER,
112            chunk_idx: 0,
113            content: content.into(),
114        }
115    }
116}
117
118#[derive(Debug, Clone)]
119pub struct EmbeddedMessage {
120    pub message_id: u64,
121    pub created_at_ms: i64,
122    pub agent_id: u32,
123    pub workspace_id: u32,
124    pub source_id: u32,
125    pub role: u8,
126    pub chunk_idx: u8,
127    pub content_hash: [u8; 32],
128    pub embedding: Vec<f32>,
129}
130
131#[derive(Debug, Clone)]
132pub struct SemanticBackfillBatchPlan {
133    pub tier: TierKind,
134    pub db_fingerprint: String,
135    pub model_revision: String,
136    pub total_conversations: u64,
137    pub conversations_in_batch: u64,
138    pub last_offset: i64,
139}
140
141#[derive(Debug, Clone)]
142pub struct SemanticBackfillStoragePlan {
143    pub tier: TierKind,
144    pub db_fingerprint: String,
145    pub model_revision: String,
146    pub max_conversations: usize,
147}
148
149#[derive(Debug, Clone)]
150pub struct SemanticBackfillBatchOutcome {
151    pub tier: TierKind,
152    pub embedder_id: String,
153    pub embedded_docs: u64,
154    pub conversations_processed: u64,
155    pub total_conversations: u64,
156    pub last_offset: i64,
157    pub checkpoint_saved: bool,
158    pub published: bool,
159    pub index_path: PathBuf,
160    pub manifest_path: PathBuf,
161}
162
163#[derive(Debug, Clone)]
164pub struct SemanticShardBuildPlan {
165    pub tier: TierKind,
166    pub db_fingerprint: String,
167    pub model_revision: String,
168    pub total_conversations: u64,
169    pub max_records_per_shard: usize,
170    pub build_ann: bool,
171}
172
173#[derive(Debug, Clone)]
174pub struct SemanticShardBuildOutcome {
175    pub tier: TierKind,
176    pub embedder_id: String,
177    pub shard_count: u32,
178    pub doc_count: u64,
179    pub total_conversations: u64,
180    pub index_paths: Vec<PathBuf>,
181    pub ann_index_paths: Vec<PathBuf>,
182    pub shard_manifest_path: PathBuf,
183    pub complete: bool,
184}
185
186#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
187#[serde(rename_all = "snake_case")]
188pub(crate) enum SemanticBackfillSchedulerState {
189    Running,
190    Paused,
191    Disabled,
192}
193
194impl SemanticBackfillSchedulerState {
195    pub(crate) fn as_str(self) -> &'static str {
196        match self {
197            Self::Running => "running",
198            Self::Paused => "paused",
199            Self::Disabled => "disabled",
200        }
201    }
202}
203
204#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
205#[serde(rename_all = "snake_case")]
206pub(crate) enum SemanticBackfillSchedulerReason {
207    IdleBudgetAvailable,
208    OperatorDisabled,
209    PolicyDisabled,
210    ForegroundPressure,
211    LexicalRepairActive,
212    CapacityBelowFloor,
213    ThreadBudgetZero,
214    BatchBudgetZero,
215}
216
217impl SemanticBackfillSchedulerReason {
218    pub(crate) fn next_step(self) -> &'static str {
219        match self {
220            Self::IdleBudgetAvailable => "background semantic backfill is within idle budgets",
221            Self::OperatorDisabled => {
222                "background semantic backfill is disabled by CASS_SEMANTIC_BACKFILL_DISABLE"
223            }
224            Self::PolicyDisabled => "semantic policy disables background semantic backfill",
225            Self::ForegroundPressure => {
226                "foreground pressure is present; retry after the idle delay"
227            }
228            Self::LexicalRepairActive => "lexical repair is active; semantic backfill is yielding",
229            Self::CapacityBelowFloor => {
230                "machine responsiveness capacity is below the semantic backfill floor"
231            }
232            Self::ThreadBudgetZero => "semantic backfill thread budget is zero",
233            Self::BatchBudgetZero => "semantic backfill batch budget is zero",
234        }
235    }
236}
237
238#[derive(Debug, Clone, Copy, PartialEq, Eq)]
239pub(crate) struct SemanticBackfillSchedulerSignals {
240    pub foreground_pressure: bool,
241    pub lexical_repair_active: bool,
242    pub force: bool,
243    pub operator_disabled: bool,
244}
245
246impl SemanticBackfillSchedulerSignals {
247    pub(crate) fn from_env() -> Self {
248        Self {
249            foreground_pressure: env_truthy("CASS_SEMANTIC_BACKFILL_FOREGROUND_ACTIVE"),
250            lexical_repair_active: env_truthy("CASS_SEMANTIC_BACKFILL_LEXICAL_REPAIR_ACTIVE"),
251            force: env_truthy("CASS_SEMANTIC_BACKFILL_FORCE"),
252            operator_disabled: env_truthy("CASS_SEMANTIC_BACKFILL_DISABLE"),
253        }
254    }
255}
256
257#[derive(Debug, Clone, serde::Serialize)]
258pub(crate) struct SemanticBackfillSchedulerDecision {
259    pub state: SemanticBackfillSchedulerState,
260    pub reason: SemanticBackfillSchedulerReason,
261    pub requested_batch_conversations: usize,
262    pub scheduled_batch_conversations: usize,
263    pub current_capacity_pct: u32,
264    pub min_capacity_pct: u32,
265    pub max_backfill_threads: usize,
266    pub idle_delay_seconds: u64,
267    pub chunk_timeout_seconds: u64,
268    pub foreground_pressure: bool,
269    pub lexical_repair_active: bool,
270    pub forced: bool,
271    pub next_eligible_after_ms: u64,
272}
273
274impl SemanticBackfillSchedulerDecision {
275    pub(crate) fn should_run(&self) -> bool {
276        matches!(self.state, SemanticBackfillSchedulerState::Running)
277    }
278}
279
280fn env_truthy(key: &str) -> bool {
281    dotenvy::var(key)
282        .ok()
283        .map(|value| {
284            matches!(
285                value.trim().to_ascii_lowercase().as_str(),
286                "1" | "true" | "yes" | "on"
287            )
288        })
289        .unwrap_or(false)
290}
291
292fn env_backfill_min_capacity_pct() -> u32 {
293    dotenvy::var("CASS_SEMANTIC_BACKFILL_MIN_CAPACITY_PCT")
294        .ok()
295        .and_then(|value| value.trim().parse::<u32>().ok())
296        .map(|value| value.clamp(1, 100))
297        .unwrap_or(75)
298}
299
300pub(crate) fn semantic_backfill_scheduler_decision(
301    policy: &SemanticPolicy,
302    requested_batch_conversations: usize,
303    signals: &SemanticBackfillSchedulerSignals,
304) -> SemanticBackfillSchedulerDecision {
305    semantic_backfill_scheduler_decision_for_capacity(
306        policy,
307        requested_batch_conversations,
308        signals,
309        responsiveness::current_capacity_pct(),
310    )
311}
312
313pub(crate) fn semantic_backfill_scheduler_decision_for_capacity(
314    policy: &SemanticPolicy,
315    requested_batch_conversations: usize,
316    signals: &SemanticBackfillSchedulerSignals,
317    current_capacity_pct: u32,
318) -> SemanticBackfillSchedulerDecision {
319    let min_capacity_pct = env_backfill_min_capacity_pct();
320    let paused_delay_ms = policy.idle_delay_seconds.saturating_mul(1000);
321    let mut decision = SemanticBackfillSchedulerDecision {
322        state: SemanticBackfillSchedulerState::Running,
323        reason: SemanticBackfillSchedulerReason::IdleBudgetAvailable,
324        requested_batch_conversations,
325        scheduled_batch_conversations: requested_batch_conversations,
326        current_capacity_pct: current_capacity_pct.clamp(0, 100),
327        min_capacity_pct,
328        max_backfill_threads: policy.max_backfill_threads,
329        idle_delay_seconds: policy.idle_delay_seconds,
330        chunk_timeout_seconds: policy.chunk_timeout_seconds,
331        foreground_pressure: signals.foreground_pressure,
332        lexical_repair_active: signals.lexical_repair_active,
333        forced: signals.force,
334        next_eligible_after_ms: 0,
335    };
336
337    if requested_batch_conversations == 0 {
338        return stopped_scheduler_decision(
339            decision,
340            SemanticBackfillSchedulerState::Disabled,
341            SemanticBackfillSchedulerReason::BatchBudgetZero,
342            paused_delay_ms,
343        );
344    }
345    if policy.max_backfill_threads == 0 && !signals.force {
346        return stopped_scheduler_decision(
347            decision,
348            SemanticBackfillSchedulerState::Disabled,
349            SemanticBackfillSchedulerReason::ThreadBudgetZero,
350            paused_delay_ms,
351        );
352    }
353    if signals.operator_disabled && !signals.force {
354        return stopped_scheduler_decision(
355            decision,
356            SemanticBackfillSchedulerState::Disabled,
357            SemanticBackfillSchedulerReason::OperatorDisabled,
358            paused_delay_ms,
359        );
360    }
361    if !policy.mode.should_build_semantic() && !signals.force {
362        return stopped_scheduler_decision(
363            decision,
364            SemanticBackfillSchedulerState::Disabled,
365            SemanticBackfillSchedulerReason::PolicyDisabled,
366            paused_delay_ms,
367        );
368    }
369    if signals.lexical_repair_active && !signals.force {
370        return stopped_scheduler_decision(
371            decision,
372            SemanticBackfillSchedulerState::Paused,
373            SemanticBackfillSchedulerReason::LexicalRepairActive,
374            paused_delay_ms,
375        );
376    }
377    if signals.foreground_pressure && !signals.force {
378        return stopped_scheduler_decision(
379            decision,
380            SemanticBackfillSchedulerState::Paused,
381            SemanticBackfillSchedulerReason::ForegroundPressure,
382            paused_delay_ms,
383        );
384    }
385    if current_capacity_pct < min_capacity_pct && !signals.force {
386        return stopped_scheduler_decision(
387            decision,
388            SemanticBackfillSchedulerState::Paused,
389            SemanticBackfillSchedulerReason::CapacityBelowFloor,
390            paused_delay_ms,
391        );
392    }
393
394    let capacity = current_capacity_pct.clamp(1, 100) as usize;
395    let scaled = requested_batch_conversations.saturating_mul(capacity) / 100;
396    decision.scheduled_batch_conversations = scaled.max(1).min(requested_batch_conversations);
397    decision
398}
399
400fn stopped_scheduler_decision(
401    mut decision: SemanticBackfillSchedulerDecision,
402    state: SemanticBackfillSchedulerState,
403    reason: SemanticBackfillSchedulerReason,
404    next_eligible_after_ms: u64,
405) -> SemanticBackfillSchedulerDecision {
406    decision.state = state;
407    decision.reason = reason;
408    decision.scheduled_batch_conversations = 0;
409    decision.next_eligible_after_ms = next_eligible_after_ms;
410    decision
411}
412
413fn now_ms() -> i64 {
414    SystemTime::now()
415        .duration_since(UNIX_EPOCH)
416        .map(|duration| duration.as_millis() as i64)
417        .unwrap_or(0)
418}
419
420fn hnsw_index_path(data_dir: &Path, embedder_id: &str) -> PathBuf {
421    data_dir
422        .join(VECTOR_INDEX_DIR)
423        .join(format!("hnsw-{embedder_id}.chsw"))
424}
425
426fn safe_path_component(raw: &str) -> String {
427    raw.chars()
428        .map(|ch| {
429            if ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_') {
430                ch
431            } else {
432                '_'
433            }
434        })
435        .collect()
436}
437
438fn semantic_staging_index_path(
439    data_dir: &Path,
440    tier: TierKind,
441    embedder_id: &str,
442    db_fingerprint: &str,
443) -> PathBuf {
444    let fingerprint_hash = crc32fast::hash(db_fingerprint.as_bytes());
445    data_dir.join(VECTOR_INDEX_DIR).join(format!(
446        ".staging-{}-{}-{fingerprint_hash:08x}.fsvi",
447        tier.as_str(),
448        safe_path_component(embedder_id)
449    ))
450}
451
452fn semantic_generation_fingerprint_component(db_fingerprint: &str) -> String {
453    blake3::hash(db_fingerprint.as_bytes())
454        .to_hex()
455        .chars()
456        .take(16)
457        .collect()
458}
459
460fn semantic_shard_generation_dir(
461    data_dir: &Path,
462    tier: TierKind,
463    embedder_id: &str,
464    db_fingerprint: &str,
465) -> PathBuf {
466    let fingerprint_hash = semantic_generation_fingerprint_component(db_fingerprint);
467    data_dir.join(VECTOR_INDEX_DIR).join("shards").join(format!(
468        "{}-{}-{fingerprint_hash}",
469        tier.as_str(),
470        safe_path_component(embedder_id),
471    ))
472}
473
474fn semantic_shard_index_path(
475    data_dir: &Path,
476    tier: TierKind,
477    embedder_id: &str,
478    db_fingerprint: &str,
479    shard_index: u32,
480) -> PathBuf {
481    semantic_shard_generation_dir(data_dir, tier, embedder_id, db_fingerprint)
482        .join(format!("shard-{shard_index:05}.fsvi"))
483}
484
485fn semantic_shard_ann_index_path(
486    data_dir: &Path,
487    tier: TierKind,
488    embedder_id: &str,
489    db_fingerprint: &str,
490    shard_index: u32,
491) -> PathBuf {
492    semantic_shard_generation_dir(data_dir, tier, embedder_id, db_fingerprint)
493        .join(format!("shard-{shard_index:05}.chsw"))
494}
495
496#[cfg(not(windows))]
497fn sync_parent_directory(path: &Path) -> Result<()> {
498    let Some(parent) = path.parent() else {
499        return Ok(());
500    };
501    let directory = fs::File::open(parent)
502        .with_context(|| format!("opening parent directory {}", parent.display()))?;
503    directory
504        .sync_all()
505        .with_context(|| format!("syncing parent directory {}", parent.display()))
506}
507
508#[cfg(windows)]
509fn sync_parent_directory(_path: &Path) -> Result<()> {
510    Ok(())
511}
512
513fn semantic_doc_id_for_embedded(embedded: &EmbeddedMessage) -> String {
514    SemanticDocId {
515        message_id: embedded.message_id,
516        chunk_idx: embedded.chunk_idx,
517        agent_id: embedded.agent_id,
518        workspace_id: embedded.workspace_id,
519        source_id: embedded.source_id,
520        role: embedded.role,
521        created_at_ms: embedded.created_at_ms,
522        content_hash: Some(embedded.content_hash),
523    }
524    .to_doc_id_string()
525}
526
527struct CanonicalEmbeddingConversationRow {
528    conversation_id: i64,
529    agent_slug: String,
530    agent_id: i64,
531    workspace: Option<PathBuf>,
532    workspace_id: Option<i64>,
533    external_id: Option<String>,
534    title: Option<String>,
535    source_path: PathBuf,
536    started_at: Option<i64>,
537    ended_at: Option<i64>,
538    source_id: Option<String>,
539    origin_host: Option<String>,
540}
541
542struct CanonicalEmbeddingBatch {
543    inputs: Vec<EmbeddingInput>,
544    conversations_in_batch: u64,
545    last_conversation_id: i64,
546    total_conversations: u64,
547}
548
549pub(crate) struct CanonicalIncrementalEmbeddingBatch {
550    pub inputs: Vec<EmbeddingInput>,
551    pub conversations_in_batch: u64,
552    pub raw_max_message_id: Option<i64>,
553}
554
555fn total_semantic_conversations(storage: &FrankenStorage) -> Result<u64> {
556    let hinted_fallback_sql = "SELECT c.id
557             FROM conversations c
558             WHERE EXISTS (
559                 SELECT 1
560                 FROM messages INDEXED BY sqlite_autoindex_messages_1
561                 WHERE conversation_id = c.id
562                 LIMIT 1
563             )";
564    let fallback_sql = "SELECT c.id
565             FROM conversations c
566             WHERE EXISTS (
567                 SELECT 1
568                 FROM messages
569                 WHERE conversation_id = c.id
570                 LIMIT 1
571             )";
572    let count: i64 = storage
573        .raw()
574        .query_row_map(
575            "SELECT COUNT(*) FROM conversations WHERE message_count > 0",
576            &[] as &[ParamValue],
577            |row| row.get_typed(0),
578        )
579        .or_else(|err| {
580            if !err.to_string().contains("no such column: message_count") {
581                return Err(err);
582            }
583            let conversation_ids: Vec<i64> = storage
584                .raw()
585                .query_map_collect(hinted_fallback_sql, &[] as &[ParamValue], |row| {
586                    row.get_typed(0)
587                })
588                .or_else(|err| {
589                    if err
590                        .to_string()
591                        .contains("no such index: sqlite_autoindex_messages_1")
592                    {
593                        return storage.raw().query_map_collect(
594                            fallback_sql,
595                            &[] as &[ParamValue],
596                            |row| row.get_typed(0),
597                        );
598                    }
599                    Err(err)
600                })?;
601            Ok(i64::try_from(conversation_ids.len()).unwrap_or(i64::MAX))
602        })
603        .with_context(|| "counting canonical conversations with semantic messages")?;
604    Ok(u64::try_from(count.max(0)).unwrap_or(u64::MAX))
605}
606
607pub(crate) fn message_id_from_db(raw: i64) -> Option<u64> {
608    u64::try_from(raw).ok()
609}
610
611pub(crate) fn saturating_u32_from_i64(raw: i64) -> u32 {
612    match u32::try_from(raw) {
613        Ok(value) => value,
614        Err(_) if raw.is_negative() => 0,
615        Err(_) => u32::MAX,
616    }
617}
618
619fn canonical_embedding_created_at_ms(message_id: u64, created_at: Option<i64>) -> i64 {
620    // `created_at_ms` feeds time-range filters in the vector index
621    // (src/search/vector_index.rs range predicates) and contributes to
622    // `stable_hit_hash`. Defaulting a NULL created_at to 0 silently
623    // masquerades the message as Unix-epoch (1970), which is indistinguishable
624    // from a legitimately-ancient row in downstream filters. Emit a warn
625    // so operators see NULL-created_at rows in the logs instead of only
626    // finding them by puzzling over 1970 timestamps in semantic hits.
627    created_at.unwrap_or_else(|| {
628        tracing::warn!(
629            message_id,
630            "semantic backfill: row has NULL created_at; defaulting to 0 (Unix epoch). \
631             Downstream time-range filters will treat this message as 1970-01-01."
632        );
633        0
634    })
635}
636
637fn canonical_embedding_packet_provenance(
638    row: &CanonicalEmbeddingConversationRow,
639) -> ConversationPacketProvenance {
640    let source_id =
641        normalized_index_source_id(row.source_id.as_deref(), None, row.origin_host.as_deref());
642    ConversationPacketProvenance {
643        origin_kind: normalized_index_origin_kind(&source_id, None),
644        origin_host: normalized_index_origin_host(row.origin_host.as_deref()),
645        source_id,
646    }
647}
648
649fn canonical_embedding_conversation(
650    row: &CanonicalEmbeddingConversationRow,
651    provenance: &ConversationPacketProvenance,
652    messages: Vec<Message>,
653) -> Conversation {
654    Conversation {
655        id: Some(row.conversation_id),
656        agent_slug: row.agent_slug.clone(),
657        workspace: row.workspace.clone(),
658        external_id: row.external_id.clone(),
659        title: row.title.clone(),
660        source_path: row.source_path.clone(),
661        started_at: row.started_at,
662        ended_at: row.ended_at,
663        approx_tokens: None,
664        metadata_json: serde_json::Value::Null,
665        messages,
666        source_id: provenance.source_id.clone(),
667        origin_host: provenance.origin_host.clone(),
668    }
669}
670
671fn embedding_input_from_packet_message(
672    conversation_id: i64,
673    agent_id: u32,
674    workspace_id: u32,
675    source_id_hash: u32,
676    message: &crate::model::conversation_packet::ConversationPacketMessage,
677) -> Option<EmbeddingInput> {
678    let Some(raw_message_id) = message.message_id else {
679        tracing::warn!(
680            conversation_id,
681            message_idx = message.idx,
682            "skipping semantic backfill message without canonical id in ConversationPacket replay"
683        );
684        return None;
685    };
686    let Some(message_id) = message_id_from_db(raw_message_id) else {
687        tracing::warn!(
688            conversation_id,
689            raw_message_id,
690            "skipping out-of-range id during semantic backfill"
691        );
692        return None;
693    };
694    Some(EmbeddingInput {
695        message_id,
696        created_at_ms: canonical_embedding_created_at_ms(message_id, message.created_at),
697        agent_id,
698        workspace_id,
699        source_id: source_id_hash,
700        role: role_code_from_str(&message.role).unwrap_or(ROLE_USER),
701        chunk_idx: 0,
702        content: message.content.clone(),
703    })
704}
705
706fn embedding_inputs_from_conversation_packet(
707    row: &CanonicalEmbeddingConversationRow,
708    packet: &ConversationPacket,
709) -> Vec<EmbeddingInput> {
710    let agent_id = saturating_u32_from_i64(row.agent_id);
711    let workspace_id = saturating_u32_from_i64(row.workspace_id.unwrap_or(0));
712    let source_id_hash = crc32fast::hash(packet.payload.provenance.source_id.as_bytes());
713    packet
714        .projections
715        .semantic
716        .message_indices
717        .iter()
718        .filter_map(|message_index| {
719            packet
720                .payload
721                .messages
722                .get(*message_index)
723                .and_then(|message| {
724                    embedding_input_from_packet_message(
725                        row.conversation_id,
726                        agent_id,
727                        workspace_id,
728                        source_id_hash,
729                        message,
730                    )
731                })
732        })
733        .collect()
734}
735
736fn fetch_canonical_embedding_conversations(
737    storage: &FrankenStorage,
738    conversation_ids: &[i64],
739) -> Result<Vec<CanonicalEmbeddingConversationRow>> {
740    let mut envelope_sql = String::from(
741        "SELECT c.id,
742                COALESCE(a.slug, 'unknown'),
743                COALESCE(c.agent_id, 0),
744                c.workspace_id,
745                w.path,
746                c.external_id,
747                c.title,
748                c.source_path,
749                c.started_at,
750                c.ended_at,
751                c.source_id,
752                c.origin_host
753         FROM conversations c
754         LEFT JOIN agents a ON a.id = c.agent_id
755         LEFT JOIN workspaces w ON w.id = c.workspace_id
756         WHERE c.id IN (",
757    );
758    let mut params = Vec::with_capacity(conversation_ids.len());
759    for (idx, conversation_id) in conversation_ids.iter().enumerate() {
760        if idx > 0 {
761            envelope_sql.push_str(", ");
762        }
763        envelope_sql.push_str(&format!("?{}", idx + 1));
764        params.push(ParamValue::from(*conversation_id));
765    }
766    envelope_sql.push_str(") ORDER BY c.id ASC");
767
768    storage
769        .raw()
770        .query_map_collect(&envelope_sql, &params, |row| {
771            let workspace_path: Option<String> = row.get_typed(4)?;
772            Ok(CanonicalEmbeddingConversationRow {
773                conversation_id: row.get_typed(0)?,
774                agent_slug: row.get_typed(1)?,
775                agent_id: row.get_typed(2)?,
776                workspace_id: row.get_typed(3)?,
777                workspace: workspace_path.map(PathBuf::from),
778                external_id: row.get_typed(5)?,
779                title: row.get_typed(6)?,
780                source_path: PathBuf::from(row.get_typed::<String>(7)?),
781                started_at: row.get_typed(8)?,
782                ended_at: row.get_typed(9)?,
783                source_id: row.get_typed(10)?,
784                origin_host: row.get_typed(11)?,
785            })
786        })
787        .with_context(|| {
788            format!(
789                "fetching semantic backfill conversation envelopes for {} conversations",
790                conversation_ids.len()
791            )
792        })
793}
794
795/// Per-packet semantic context that supplies the database-internal
796/// agent / workspace ids the canonical embedding row carries but the
797/// `ConversationPacket` does not (those ids are storage-internal,
798/// not part of the packet contract).
799///
800/// `coding_agent_session_search-ibuuh.32` (sink #3): when a caller
801/// already holds packets (rebuild pipeline, salvage replay, repair
802/// flows, etc.) it can pair them with their canonical
803/// agent_id/workspace_id and drive the semantic preparation consumer
804/// without a second storage round-trip.
805#[allow(dead_code)]
806#[derive(Debug, Clone, Copy)]
807pub(crate) struct SemanticPacketContext {
808    pub conversation_id: i64,
809    pub agent_id: u32,
810    pub workspace_id: u32,
811}
812
813/// Packet-driven counterpart to
814/// [`packet_embedding_inputs_from_storage`]: derives the same
815/// `EmbeddingInput` list a fresh storage replay would produce, but
816/// without re-querying canonical conversation rows.
817///
818/// Invariants:
819/// - The `i`th element of `contexts` describes the `i`th packet.
820/// - Returns `Err` if the lengths disagree, so a callsite cannot
821///   silently mis-correlate packets and contexts.
822/// - `source_id_hash` is derived from `packet.payload.provenance.source_id`
823///   the same way `embedding_inputs_from_conversation_packet` derives
824///   it from the canonical row, so the produced `EmbeddingInput.source_id`
825///   matches both paths byte-for-byte.
826///
827/// The `semantic_inputs_from_packets_matches_storage_replay`
828/// equivalence test pins every produced `EmbeddingInput` field is
829/// identical to what the legacy storage-side replay returns for the
830/// same canonical corpus, so callers that already hold packets can
831/// switch to this helper without changing semantic-index output.
832#[allow(dead_code)]
833pub(crate) fn semantic_inputs_from_packets(
834    packets: &[ConversationPacket],
835    contexts: &[SemanticPacketContext],
836) -> Result<Vec<EmbeddingInput>> {
837    if packets.len() != contexts.len() {
838        anyhow::bail!(
839            "semantic_inputs_from_packets length mismatch: {} packets vs {} contexts",
840            packets.len(),
841            contexts.len()
842        );
843    }
844    let mut inputs = Vec::new();
845    for (packet, context) in packets.iter().zip(contexts.iter()) {
846        let source_id_hash = crc32fast::hash(packet.payload.provenance.source_id.as_bytes());
847        for &message_index in &packet.projections.semantic.message_indices {
848            let Some(message) = packet.payload.messages.get(message_index) else {
849                anyhow::bail!(
850                    "packet semantic projection references missing message index {} \
851                     (packet has {} messages)",
852                    message_index,
853                    packet.payload.messages.len()
854                );
855            };
856            if let Some(input) = embedding_input_from_packet_message(
857                context.conversation_id,
858                context.agent_id,
859                context.workspace_id,
860                source_id_hash,
861                message,
862            ) {
863                inputs.push(input);
864            }
865        }
866    }
867    tracing::debug!(
868        packets = packets.len(),
869        packet_driven = true,
870        semantic_inputs = inputs.len(),
871        "built semantic inputs from in-memory ConversationPacket batch"
872    );
873    Ok(inputs)
874}
875
876fn fetch_canonical_embedding_batch(
877    storage: &FrankenStorage,
878    after_conversation_id: i64,
879    max_conversations: usize,
880) -> Result<CanonicalEmbeddingBatch> {
881    fetch_canonical_embedding_batch_inner(storage, after_conversation_id, max_conversations, None)
882}
883
884/// Variant of [`fetch_canonical_embedding_batch`] that additionally
885/// filters out canonical messages with `message_id <= after_message_id`
886/// when set. This is how sub-fix 2 (`last_message_id` cursor) enforces
887/// the "resume MUST advance past `last_message_id`" rule on a partially
888/// embedded conversation.
889fn fetch_canonical_embedding_batch_inner(
890    storage: &FrankenStorage,
891    after_conversation_id: i64,
892    max_conversations: usize,
893    after_message_id: Option<i64>,
894) -> Result<CanonicalEmbeddingBatch> {
895    let total_conversations = total_semantic_conversations(storage)?;
896    let max_conversations_i64 = i64::try_from(max_conversations.max(1)).unwrap_or(i64::MAX);
897    let mut params = vec![
898        ParamValue::from(after_conversation_id),
899        ParamValue::from(max_conversations_i64),
900    ];
901    let message_cursor_predicate = if let Some(after_message_id) = after_message_id {
902        params.push(ParamValue::from(after_message_id));
903        " AND id > ?3"
904    } else {
905        ""
906    };
907    let hinted_sql = format!(
908        "SELECT c.id
909         FROM conversations c
910         WHERE c.id > ?1
911           AND EXISTS (
912               SELECT 1
913               FROM messages INDEXED BY sqlite_autoindex_messages_1
914               WHERE conversation_id = c.id{message_cursor_predicate}
915               LIMIT 1
916           )
917         ORDER BY c.id ASC
918         LIMIT ?2"
919    );
920    let fallback_sql = format!(
921        "SELECT c.id
922         FROM conversations c
923         WHERE c.id > ?1
924           AND EXISTS (
925               SELECT 1
926               FROM messages
927               WHERE conversation_id = c.id{message_cursor_predicate}
928               LIMIT 1
929           )
930         ORDER BY c.id ASC
931         LIMIT ?2"
932    );
933    let conversation_ids: Vec<i64> = storage
934        .raw()
935        .query_map_collect(&hinted_sql, &params, |row| row.get_typed(0))
936        .or_else(|err| {
937            if err
938                .to_string()
939                .contains("no such index: sqlite_autoindex_messages_1")
940            {
941                return storage
942                    .raw()
943                    .query_map_collect(&fallback_sql, &params, |row| row.get_typed(0));
944            }
945            Err(err)
946        })
947        .with_context(|| {
948            format!("fetching semantic backfill conversation ids after {after_conversation_id}")
949        })?;
950
951    if conversation_ids.is_empty() {
952        return Ok(CanonicalEmbeddingBatch {
953            inputs: Vec::new(),
954            conversations_in_batch: 0,
955            last_conversation_id: after_conversation_id,
956            total_conversations,
957        });
958    }
959
960    let conversations = fetch_canonical_embedding_conversations(storage, &conversation_ids)?;
961
962    let mut grouped_messages =
963        storage.fetch_messages_for_lexical_rebuild_batch(&conversation_ids, None, None)?;
964    let (inputs, _) = packet_embedding_inputs_from_materialized_canonical_messages(
965        &conversations,
966        &mut grouped_messages,
967        |message| match after_message_id {
968            Some(min_exclusive) => message.id.is_some_and(|id| id > min_exclusive),
969            None => true,
970        },
971    );
972
973    let conversations_in_batch = u64::try_from(conversation_ids.len()).unwrap_or(u64::MAX);
974    tracing::debug!(
975        conversations_in_batch,
976        packet_driven = true,
977        semantic_inputs = inputs.len(),
978        ?after_message_id,
979        "built semantic backfill batch from ConversationPacket canonical replay"
980    );
981
982    Ok(CanonicalEmbeddingBatch {
983        inputs,
984        conversations_in_batch,
985        last_conversation_id: *conversation_ids.last().unwrap_or(&after_conversation_id),
986        total_conversations,
987    })
988}
989
990pub(crate) fn packet_embedding_inputs_from_storage(
991    storage: &FrankenStorage,
992) -> Result<Vec<EmbeddingInput>> {
993    Ok(fetch_canonical_embedding_batch(storage, 0, usize::MAX)?.inputs)
994}
995
996fn packet_embedding_inputs_from_selected_canonical_messages<F>(
997    storage: &FrankenStorage,
998    conversation_ids: &[i64],
999    include_message: F,
1000) -> Result<(Vec<EmbeddingInput>, Option<i64>)>
1001where
1002    F: FnMut(&Message) -> bool,
1003{
1004    if conversation_ids.is_empty() {
1005        return Ok((Vec::new(), None));
1006    }
1007
1008    let conversations = fetch_canonical_embedding_conversations(storage, conversation_ids)?;
1009    let mut grouped_messages =
1010        storage.fetch_messages_for_lexical_rebuild_batch(conversation_ids, None, None)?;
1011    Ok(
1012        packet_embedding_inputs_from_materialized_canonical_messages(
1013            &conversations,
1014            &mut grouped_messages,
1015            include_message,
1016        ),
1017    )
1018}
1019
1020fn packet_embedding_inputs_from_materialized_canonical_messages<F>(
1021    conversations: &[CanonicalEmbeddingConversationRow],
1022    grouped_messages: &mut HashMap<i64, Vec<Message>>,
1023    mut include_message: F,
1024) -> (Vec<EmbeddingInput>, Option<i64>)
1025where
1026    F: FnMut(&Message) -> bool,
1027{
1028    let mut inputs = Vec::new();
1029    let mut raw_max_message_id: Option<i64> = None;
1030
1031    for conversation in conversations {
1032        let mut messages = grouped_messages
1033            .remove(&conversation.conversation_id)
1034            .unwrap_or_default();
1035        messages.retain(|message| {
1036            let keep = include_message(message);
1037            if keep && let Some(message_id) = message.id {
1038                raw_max_message_id =
1039                    Some(raw_max_message_id.map_or(message_id, |current| current.max(message_id)));
1040            }
1041            keep
1042        });
1043        if messages.is_empty() {
1044            continue;
1045        }
1046
1047        let provenance = canonical_embedding_packet_provenance(conversation);
1048        let canonical = canonical_embedding_conversation(conversation, &provenance, messages);
1049        let packet = ConversationPacket::from_canonical_replay(&canonical, provenance);
1050        inputs.extend(embedding_inputs_from_conversation_packet(
1051            conversation,
1052            &packet,
1053        ));
1054    }
1055
1056    (inputs, raw_max_message_id)
1057}
1058
1059pub(crate) fn packet_embedding_inputs_from_storage_since(
1060    storage: &FrankenStorage,
1061    since_message_id: i64,
1062) -> Result<CanonicalIncrementalEmbeddingBatch> {
1063    let conversation_ids: Vec<i64> = storage
1064        .raw()
1065        .query_map_collect(
1066            "SELECT DISTINCT m.conversation_id
1067             FROM messages m
1068             WHERE m.id > ?1
1069             ORDER BY m.conversation_id ASC",
1070            &[ParamValue::from(since_message_id)],
1071            |row| row.get_typed(0),
1072        )
1073        .with_context(|| {
1074            format!(
1075                "fetching canonical semantic catch-up conversation ids after message {since_message_id}"
1076            )
1077        })?;
1078
1079    if conversation_ids.is_empty() {
1080        return Ok(CanonicalIncrementalEmbeddingBatch {
1081            inputs: Vec::new(),
1082            conversations_in_batch: 0,
1083            raw_max_message_id: None,
1084        });
1085    }
1086
1087    let (inputs, raw_max_message_id) = packet_embedding_inputs_from_selected_canonical_messages(
1088        storage,
1089        &conversation_ids,
1090        |message| message.id.is_some_and(|id| id > since_message_id),
1091    )?;
1092
1093    let conversations_in_batch = u64::try_from(conversation_ids.len()).unwrap_or(u64::MAX);
1094    tracing::debug!(
1095        since_message_id,
1096        conversations_in_batch,
1097        packet_driven = true,
1098        semantic_inputs = inputs.len(),
1099        "built semantic catch-up batch from ConversationPacket canonical replay"
1100    );
1101
1102    Ok(CanonicalIncrementalEmbeddingBatch {
1103        inputs,
1104        conversations_in_batch,
1105        raw_max_message_id,
1106    })
1107}
1108
1109pub(crate) fn packet_embedding_inputs_from_storage_for_message_ids(
1110    storage: &FrankenStorage,
1111    conversation_ids: &[i64],
1112    message_ids: &HashSet<i64>,
1113) -> Result<Vec<EmbeddingInput>> {
1114    if conversation_ids.is_empty() || message_ids.is_empty() {
1115        return Ok(Vec::new());
1116    }
1117
1118    let (inputs, raw_max_message_id) = packet_embedding_inputs_from_selected_canonical_messages(
1119        storage,
1120        conversation_ids,
1121        |message| message.id.is_some_and(|id| message_ids.contains(&id)),
1122    )?;
1123    tracing::debug!(
1124        conversations_in_batch = conversation_ids.len(),
1125        selected_message_ids = message_ids.len(),
1126        semantic_inputs = inputs.len(),
1127        raw_max_message_id,
1128        packet_driven = true,
1129        "built selected semantic batch from ConversationPacket canonical replay"
1130    );
1131
1132    Ok(inputs)
1133}
1134
1135struct Prepared<'a> {
1136    msg: &'a EmbeddingInput,
1137    canonical: String,
1138    hash: [u8; 32],
1139}
1140
1141#[derive(Debug, Clone, PartialEq, Eq)]
1142struct MemoizedPreparedMessage {
1143    canonical: String,
1144    hash: [u8; 32],
1145}
1146
1147fn semantic_prep_memo_key(content: &str) -> MemoKey {
1148    MemoKey::new(
1149        MemoContentHash::from_bytes(content_hash(content).to_vec()),
1150        SEMANTIC_PREP_MEMO_ALGORITHM,
1151        SEMANTIC_PREP_MEMO_VERSION,
1152    )
1153}
1154
1155fn memo_counter_delta(after: u64, before: u64) -> u64 {
1156    after.saturating_sub(before)
1157}
1158
1159fn trace_semantic_prep_memo_window(
1160    window_index: usize,
1161    window_len: usize,
1162    prepared_len: usize,
1163    entry_capacity: usize,
1164    before: &crate::indexer::memoization::MemoCacheStats,
1165    after: &crate::indexer::memoization::MemoCacheStats,
1166) {
1167    tracing::trace!(
1168        algorithm = SEMANTIC_PREP_MEMO_ALGORITHM,
1169        algorithm_version = SEMANTIC_PREP_MEMO_VERSION,
1170        window_index,
1171        window_len,
1172        prepared_messages = prepared_len,
1173        skipped_messages = window_len.saturating_sub(prepared_len),
1174        hit_delta = memo_counter_delta(after.hits, before.hits),
1175        miss_delta = memo_counter_delta(after.misses, before.misses),
1176        insert_delta = memo_counter_delta(after.inserts, before.inserts),
1177        evictions_capacity_delta =
1178            memo_counter_delta(after.evictions_capacity, before.evictions_capacity),
1179        quarantined_delta = memo_counter_delta(after.quarantined, before.quarantined),
1180        live_entries = after.live_entries,
1181        entry_capacity,
1182        "semantic prep memo cache window"
1183    );
1184}
1185
1186fn trace_semantic_prep_memo_audit(audit: &MemoCacheAuditRecord) {
1187    tracing::trace!(?audit, "semantic prep memo cache audit");
1188}
1189
1190fn prepare_window_with_memo<'a>(
1191    window: &'a [EmbeddingInput],
1192    cache: &mut ContentAddressedMemoCache<MemoizedPreparedMessage>,
1193) -> Vec<Prepared<'a>> {
1194    window
1195        .iter()
1196        .filter_map(|msg| {
1197            let key = semantic_prep_memo_key(&msg.content);
1198            let (lookup, lookup_audit) = cache.get_with_audit(&key);
1199            trace_semantic_prep_memo_audit(&lookup_audit);
1200            match lookup {
1201                MemoLookup::Hit { value } => Some(Prepared {
1202                    msg,
1203                    canonical: value.canonical,
1204                    hash: value.hash,
1205                }),
1206                MemoLookup::Miss | MemoLookup::Quarantined { .. } => {
1207                    let canonical = canonicalize_for_embedding(&msg.content);
1208                    if canonical.is_empty() {
1209                        return None;
1210                    }
1211                    let hash = content_hash(&canonical);
1212                    let insert_audit = cache.insert_with_audit(
1213                        key,
1214                        MemoizedPreparedMessage {
1215                            canonical: canonical.clone(),
1216                            hash,
1217                        },
1218                    );
1219                    trace_semantic_prep_memo_audit(&insert_audit);
1220                    Some(Prepared {
1221                        msg,
1222                        canonical,
1223                        hash,
1224                    })
1225                }
1226            }
1227        })
1228        .collect()
1229}
1230
1231/// Canonicalize + hash a window of messages. Default is serial; opt in to
1232/// the rayon-parallel path via `CASS_SEMANTIC_PREP_PARALLEL=1` (see the
1233/// `parallel_prep_enabled` docstring for why it is not the default).
1234/// Parallel results preserve input order via `par_iter().filter_map().collect()`.
1235/// Messages whose canonical form is empty are filtered out so the embedder
1236/// batch is never polluted with useless inputs.
1237fn prepare_window<'a>(window: &'a [EmbeddingInput], serial: bool) -> Vec<Prepared<'a>> {
1238    let prep = |msg: &'a EmbeddingInput| -> Option<Prepared<'a>> {
1239        let canonical = canonicalize_for_embedding(&msg.content);
1240        if canonical.is_empty() {
1241            return None;
1242        }
1243        let hash = content_hash(&canonical);
1244        Some(Prepared {
1245            msg,
1246            canonical,
1247            hash,
1248        })
1249    };
1250
1251    if serial {
1252        window.iter().filter_map(prep).collect()
1253    } else {
1254        window.par_iter().filter_map(prep).collect()
1255    }
1256}
1257
1258fn flush_prepared_batch(
1259    batch: &[Prepared<'_>],
1260    embeddings: &mut Vec<EmbeddedMessage>,
1261    pb: &ProgressBar,
1262    embedder: &dyn Embedder,
1263) -> Result<()> {
1264    if batch.is_empty() {
1265        return Ok(());
1266    }
1267
1268    let texts: Vec<&str> = batch.iter().map(|p| p.canonical.as_str()).collect();
1269    let vectors = embedder
1270        .embed_batch_sync(&texts)
1271        .map_err(|e| anyhow::anyhow!("embedding failed: {e}"))?;
1272
1273    if vectors.len() != batch.len() {
1274        bail!(
1275            "embedder returned {} embeddings for {} inputs",
1276            vectors.len(),
1277            batch.len()
1278        );
1279    }
1280
1281    for (prepared, vector) in batch.iter().zip(vectors) {
1282        if vector.len() != embedder.dimension() {
1283            bail!(
1284                "embedding dimension mismatch: expected {}, got {}",
1285                embedder.dimension(),
1286                vector.len()
1287            );
1288        }
1289        embeddings.push(EmbeddedMessage {
1290            message_id: prepared.msg.message_id,
1291            created_at_ms: prepared.msg.created_at_ms,
1292            agent_id: prepared.msg.agent_id,
1293            workspace_id: prepared.msg.workspace_id,
1294            source_id: prepared.msg.source_id,
1295            role: prepared.msg.role,
1296            chunk_idx: prepared.msg.chunk_idx,
1297            content_hash: prepared.hash,
1298            embedding: vector,
1299        });
1300    }
1301
1302    pb.inc(saturating_u64_from_usize(batch.len()));
1303    Ok(())
1304}
1305
1306pub struct SemanticIndexer {
1307    embedder: Box<dyn Embedder>,
1308    batch_size: usize,
1309}
1310
1311impl SemanticIndexer {
1312    pub fn new(embedder_type: &str, data_dir: Option<&Path>) -> Result<Self> {
1313        let embedder: Box<dyn Embedder> = match embedder_type {
1314            "fastembed" | "minilm" | "snowflake-arctic-s" | "nomic-embed" => {
1315                let dir = data_dir
1316                    .ok_or_else(|| anyhow::anyhow!("data_dir required for fastembed embedder"))?;
1317                let embedder_name = if embedder_type == "fastembed" {
1318                    "minilm"
1319                } else {
1320                    embedder_type
1321                };
1322                Box::new(
1323                    FastEmbedder::load_by_name(dir, embedder_name)
1324                        .map_err(|e| anyhow::anyhow!("fastembed unavailable: {e}"))?,
1325                )
1326            }
1327            "hash" => Box::new(HashEmbedder::default()),
1328            other => bail!("unknown embedder: {other}"),
1329        };
1330
1331        Ok(Self {
1332            embedder,
1333            batch_size: resolved_default_batch_size(),
1334        })
1335    }
1336
1337    pub fn with_batch_size(mut self, batch_size: usize) -> Result<Self> {
1338        if batch_size == 0 {
1339            bail!("batch_size must be > 0");
1340        }
1341        self.batch_size = batch_size;
1342        Ok(self)
1343    }
1344
1345    pub fn batch_size(&self) -> usize {
1346        self.batch_size
1347    }
1348
1349    pub fn embedder_id(&self) -> &str {
1350        self.embedder.id()
1351    }
1352
1353    pub fn embedder_dimension(&self) -> usize {
1354        self.embedder.dimension()
1355    }
1356
1357    pub fn embed_messages(&self, messages: &[EmbeddingInput]) -> Result<Vec<EmbeddedMessage>> {
1358        self.embed_messages_with_sink(messages, &SemanticProgressSink::disabled())
1359    }
1360
1361    /// Variant of [`embed_messages`] that emits `embed_batch_*` events
1362    /// into the given JSONL sink. The sink is silent unless
1363    /// `CASS_SEMANTIC_PROGRESS_JSONL` is set, so this path is safe to
1364    /// take in production.
1365    pub fn embed_messages_with_sink(
1366        &self,
1367        messages: &[EmbeddingInput],
1368        sink: &SemanticProgressSink,
1369    ) -> Result<Vec<EmbeddedMessage>> {
1370        if messages.is_empty() {
1371            return Ok(Vec::new());
1372        }
1373
1374        let show_progress = std::io::stderr().is_terminal();
1375        let pb = ProgressBar::new(saturating_u64_from_usize(messages.len()));
1376        if show_progress {
1377            let style = ProgressStyle::default_bar()
1378                .template("{spinner:.green} [{bar:40.cyan/blue}] {pos}/{len} messages embedded")
1379                .unwrap_or_else(|_| ProgressStyle::default_bar());
1380            pb.set_style(style);
1381        } else {
1382            pb.set_draw_target(ProgressDrawTarget::hidden());
1383        }
1384
1385        let mut embeddings = Vec::with_capacity(messages.len());
1386
1387        // Process the corpus in windows of ~4 batches. Within each window,
1388        // rayon parallelizes the canonicalize + hash prep across cores; the
1389        // ONNX embedder is then fed serially in `batch_size` chunks so its
1390        // internal thread pool stays saturated without being starved by the
1391        // single-threaded prep loop we had before. `with_batch_size` and
1392        // `resolved_default_batch_size` both guarantee `batch_size >= 1`,
1393        // so saturating_mul(4) is always >= batch_size — no further clamp.
1394        let window = self.batch_size.saturating_mul(4);
1395        let serial_prep = !parallel_prep_enabled();
1396        let prep_memo_capacity = resolved_semantic_prep_memo_capacity();
1397        let mut prep_memo =
1398            serial_prep.then(|| ContentAddressedMemoCache::with_capacity(prep_memo_capacity));
1399        let mut batch_index: u64 = 0;
1400        let mut rows_processed: u64 = 0;
1401        let rows_total = u64::try_from(messages.len()).ok();
1402        for (window_index, window_slice) in messages.chunks(window).enumerate() {
1403            let prepared_window = match prep_memo.as_mut() {
1404                Some(cache) => {
1405                    let stats_before = cache.stats().clone();
1406                    let prepared_window = prepare_window_with_memo(window_slice, cache);
1407                    trace_semantic_prep_memo_window(
1408                        window_index,
1409                        window_slice.len(),
1410                        prepared_window.len(),
1411                        prep_memo_capacity,
1412                        &stats_before,
1413                        cache.stats(),
1414                    );
1415                    prepared_window
1416                }
1417                None => prepare_window(window_slice, false),
1418            };
1419            let skipped_in_window = window_slice.len() - prepared_window.len();
1420            if skipped_in_window > 0 {
1421                pb.inc(saturating_u64_from_usize(skipped_in_window));
1422            }
1423
1424            for batch in prepared_window.chunks(self.batch_size) {
1425                let batch_rows = u64::try_from(batch.len()).unwrap_or(u64::MAX);
1426                // Sum the canonicalized byte count so an operator can
1427                // distinguish a stalled inference from a stalled query —
1428                // a tiny `bytes` value paired with a long batch wall-time
1429                // points at the model; a huge `bytes` paired with a short
1430                // wall-time points at the storage side.
1431                let batch_bytes: u64 = batch.iter().map(|p| p.canonical.len() as u64).sum();
1432                if sink.is_active() {
1433                    sink.emit(
1434                        SemanticProgressEvent::EmbedBatchStart,
1435                        SemanticProgressFields {
1436                            batch_index: Some(batch_index),
1437                            batch_rows: Some(batch_rows),
1438                            rows_processed: Some(rows_processed),
1439                            rows_total,
1440                            bytes: Some(batch_bytes),
1441                            ..Default::default()
1442                        },
1443                    );
1444                }
1445                flush_prepared_batch(batch, &mut embeddings, &pb, self.embedder.as_ref())?;
1446                rows_processed = rows_processed.saturating_add(batch_rows);
1447                if sink.is_active() {
1448                    sink.emit(
1449                        SemanticProgressEvent::EmbedBatchDone,
1450                        SemanticProgressFields {
1451                            batch_index: Some(batch_index),
1452                            batch_rows: Some(batch_rows),
1453                            rows_processed: Some(rows_processed),
1454                            rows_total,
1455                            bytes: Some(batch_bytes),
1456                            ..Default::default()
1457                        },
1458                    );
1459                }
1460                batch_index = batch_index.saturating_add(1);
1461            }
1462        }
1463
1464        if let Some(cache) = prep_memo.as_ref() {
1465            let stats = cache.stats();
1466            tracing::debug!(
1467                algorithm = SEMANTIC_PREP_MEMO_ALGORITHM,
1468                algorithm_version = SEMANTIC_PREP_MEMO_VERSION,
1469                hits = stats.hits,
1470                misses = stats.misses,
1471                inserts = stats.inserts,
1472                quarantined = stats.quarantined,
1473                live_entries = stats.live_entries,
1474                entry_capacity = prep_memo_capacity,
1475                "semantic prep memo cache summary"
1476            );
1477        }
1478
1479        pb.finish_with_message("Embedding complete");
1480        Ok(embeddings)
1481    }
1482
1483    pub fn build_and_save_index<I>(
1484        &self,
1485        embedded_messages: I,
1486        data_dir: &Path,
1487    ) -> Result<FsVectorIndex>
1488    where
1489        I: IntoIterator<Item = EmbeddedMessage>,
1490    {
1491        let index_path = vector_index_path(data_dir, self.embedder_id());
1492        self.build_and_save_index_at_path(embedded_messages, &index_path)
1493    }
1494
1495    pub fn build_and_save_index_shards<I>(
1496        &self,
1497        embedded_messages: I,
1498        data_dir: &Path,
1499        plan: SemanticShardBuildPlan,
1500    ) -> Result<SemanticShardBuildOutcome>
1501    where
1502        I: IntoIterator<Item = EmbeddedMessage>,
1503    {
1504        if plan.db_fingerprint.trim().is_empty() {
1505            bail!("semantic shard build requires a non-empty DB fingerprint");
1506        }
1507        if plan.max_records_per_shard == 0 {
1508            bail!("semantic shard build requires max_records_per_shard > 0");
1509        }
1510
1511        let mut shard_records = Vec::new();
1512        let mut index_paths = Vec::new();
1513        let mut ann_index_paths = Vec::new();
1514        let mut current_records = Vec::with_capacity(plan.max_records_per_shard);
1515        let mut shard_index = 0u32;
1516        let mut total_docs = 0u64;
1517
1518        for embedded in embedded_messages {
1519            current_records.push(embedded);
1520            if current_records.len() >= plan.max_records_per_shard {
1521                let records = std::mem::take(&mut current_records);
1522                let (record, path, ann_path) =
1523                    self.write_semantic_shard(records, data_dir, &plan, shard_index)?;
1524                total_docs = total_docs.saturating_add(record.doc_count);
1525                shard_records.push(record);
1526                index_paths.push(path);
1527                if let Some(path) = ann_path {
1528                    ann_index_paths.push(path);
1529                }
1530                shard_index = shard_index
1531                    .checked_add(1)
1532                    .context("semantic shard index overflow")?;
1533            }
1534        }
1535
1536        if !current_records.is_empty() {
1537            let records = std::mem::take(&mut current_records);
1538            let (record, path, ann_path) =
1539                self.write_semantic_shard(records, data_dir, &plan, shard_index)?;
1540            total_docs = total_docs.saturating_add(record.doc_count);
1541            shard_records.push(record);
1542            index_paths.push(path);
1543            if let Some(path) = ann_path {
1544                ann_index_paths.push(path);
1545            }
1546        }
1547
1548        let shard_count = u32::try_from(shard_records.len())
1549            .context("semantic shard generation exceeded u32 shard count")?;
1550        for record in &mut shard_records {
1551            record.shard_count = shard_count;
1552        }
1553
1554        let mut shard_manifest = SemanticShardManifest::load_or_default(data_dir)
1555            .map_err(|err| anyhow::anyhow!("loading semantic shard manifest for publish: {err}"))?;
1556        shard_manifest.replace_shards_for_generation(
1557            plan.tier,
1558            self.embedder_id(),
1559            &plan.db_fingerprint,
1560            shard_records,
1561        );
1562        shard_manifest
1563            .save(data_dir)
1564            .map_err(|err| anyhow::anyhow!("saving semantic shard manifest: {err}"))?;
1565        let summary = shard_manifest.summary(plan.tier, self.embedder_id(), &plan.db_fingerprint);
1566
1567        tracing::info!(
1568            tier = plan.tier.as_str(),
1569            embedder = self.embedder_id(),
1570            shard_count,
1571            doc_count = total_docs,
1572            total_conversations = plan.total_conversations,
1573            "published semantic shard generation sidecar"
1574        );
1575
1576        Ok(SemanticShardBuildOutcome {
1577            tier: plan.tier,
1578            embedder_id: self.embedder_id().to_string(),
1579            shard_count,
1580            doc_count: total_docs,
1581            total_conversations: plan.total_conversations,
1582            index_paths,
1583            ann_index_paths,
1584            shard_manifest_path: SemanticShardManifest::path(data_dir),
1585            complete: summary.complete,
1586        })
1587    }
1588
1589    fn write_semantic_shard(
1590        &self,
1591        embedded_messages: Vec<EmbeddedMessage>,
1592        data_dir: &Path,
1593        plan: &SemanticShardBuildPlan,
1594        shard_index: u32,
1595    ) -> Result<(SemanticShardRecord, PathBuf, Option<PathBuf>)> {
1596        let started_at_ms = now_ms();
1597        let shard_path = semantic_shard_index_path(
1598            data_dir,
1599            plan.tier,
1600            self.embedder_id(),
1601            &plan.db_fingerprint,
1602            shard_index,
1603        );
1604        let shard_index_file = self.build_and_save_index_at_path(embedded_messages, &shard_path)?;
1605        let size_bytes = fs::metadata(&shard_path)
1606            .with_context(|| format!("stat semantic shard {}", shard_path.display()))?
1607            .len();
1608        let (ann_index_path, ann_size_bytes, ann_ready, ann_absolute_path) = if plan.build_ann {
1609            let ann_path = semantic_shard_ann_index_path(
1610                data_dir,
1611                plan.tier,
1612                self.embedder_id(),
1613                &plan.db_fingerprint,
1614                shard_index,
1615            );
1616            let config = FsHnswConfig {
1617                m: FS_HNSW_DEFAULT_M,
1618                ef_construction: FS_HNSW_DEFAULT_EF_CONSTRUCTION,
1619                ..FsHnswConfig::default()
1620            };
1621            let hnsw = FsHnswIndex::build_from_vector_index(&shard_index_file, config)
1622                .map_err(|err| anyhow::anyhow!("build shard HNSW index failed: {err}"))?;
1623            hnsw.save(&ann_path)
1624                .map_err(|err| anyhow::anyhow!("save shard HNSW index failed: {err}"))?;
1625            let ann_size_bytes = fs::metadata(&ann_path)
1626                .with_context(|| format!("stat semantic shard ANN {}", ann_path.display()))?
1627                .len();
1628            let relative_ann_path = ann_path
1629                .strip_prefix(data_dir)
1630                .unwrap_or(ann_path.as_path())
1631                .to_string_lossy()
1632                .to_string();
1633            (
1634                Some(relative_ann_path),
1635                ann_size_bytes,
1636                true,
1637                Some(ann_path),
1638            )
1639        } else {
1640            (None, 0, false, None)
1641        };
1642        let relative_index_path = shard_path
1643            .strip_prefix(data_dir)
1644            .unwrap_or(shard_path.as_path())
1645            .to_string_lossy()
1646            .to_string();
1647        let record = SemanticShardRecord {
1648            tier: plan.tier,
1649            embedder_id: self.embedder_id().to_string(),
1650            model_revision: plan.model_revision.clone(),
1651            schema_version: SEMANTIC_SCHEMA_VERSION,
1652            chunking_version: CHUNKING_STRATEGY_VERSION,
1653            dimension: self.embedder_dimension(),
1654            shard_index,
1655            shard_count: 0,
1656            doc_count: u64::try_from(shard_index_file.record_count()).unwrap_or(u64::MAX),
1657            total_conversations: plan.total_conversations,
1658            db_fingerprint: plan.db_fingerprint.clone(),
1659            index_path: relative_index_path,
1660            quantization: "f16".to_string(),
1661            mmap_ready: true,
1662            ann_index_path,
1663            ann_size_bytes,
1664            ann_ready,
1665            size_bytes,
1666            started_at_ms,
1667            completed_at_ms: now_ms(),
1668            ready: true,
1669        };
1670        Ok((record, shard_path, ann_absolute_path))
1671    }
1672
1673    fn build_and_save_index_at_path<I>(
1674        &self,
1675        embedded_messages: I,
1676        index_path: &Path,
1677    ) -> Result<FsVectorIndex>
1678    where
1679        I: IntoIterator<Item = EmbeddedMessage>,
1680    {
1681        if let Some(parent) = index_path.parent() {
1682            std::fs::create_dir_all(parent)?;
1683        }
1684
1685        // Store as f16 by default (smaller, faster I/O). Embeddings are validated by the writer.
1686        let mut writer: FsVectorIndexWriter = FsVectorIndex::create_with_revision(
1687            index_path,
1688            self.embedder_id(),
1689            "1.0",
1690            self.embedder_dimension(),
1691            FsQuantization::F16,
1692        )
1693        .map_err(|err| anyhow::anyhow!("create fsvi index failed: {err}"))?;
1694
1695        let write_result: Result<()> = (|| {
1696            for embedded in embedded_messages {
1697                if embedded.embedding.len() != self.embedder_dimension() {
1698                    bail!(
1699                        "embedding dimension mismatch: expected {}, got {}",
1700                        self.embedder_dimension(),
1701                        embedded.embedding.len()
1702                    );
1703                }
1704                let doc_id = semantic_doc_id_for_embedded(&embedded);
1705                writer
1706                    .write_record(&doc_id, &embedded.embedding)
1707                    .map_err(|err| anyhow::anyhow!("write fsvi record failed: {err}"))?;
1708            }
1709            Ok(())
1710        })();
1711
1712        if let Err(e) = &write_result {
1713            // Clean up partial index file to prevent corruption
1714            tracing::warn!("removing partial vector index after write failure: {e}");
1715            if let Err(rm_err) = std::fs::remove_file(index_path) {
1716                tracing::error!(
1717                    "failed to remove partial index file {}: {rm_err}",
1718                    index_path.display()
1719                );
1720            }
1721            return Err(anyhow::anyhow!("{e}"));
1722        }
1723
1724        writer
1725            .finish()
1726            .map_err(|err| anyhow::anyhow!("finish fsvi index failed: {err}"))?;
1727
1728        FsVectorIndex::open(index_path)
1729            .map_err(|err| anyhow::anyhow!("open fsvi index failed: {err}"))
1730    }
1731
1732    /// Append new embeddings to an existing FSVI index via the WAL.
1733    ///
1734    /// Used for incremental semantic indexing in watch mode. Opens the
1735    /// existing index, appends a batch of new embeddings, and compacts if
1736    /// the WAL has grown large enough.
1737    ///
1738    /// Returns the number of entries appended.
1739    pub fn append_to_index(
1740        &self,
1741        embedded_messages: impl IntoIterator<Item = EmbeddedMessage>,
1742        data_dir: &Path,
1743    ) -> Result<usize> {
1744        let index_path = vector_index_path(data_dir, self.embedder_id());
1745        self.append_to_index_path(embedded_messages, &index_path)
1746    }
1747
1748    fn append_to_index_path(
1749        &self,
1750        embedded_messages: impl IntoIterator<Item = EmbeddedMessage>,
1751        index_path: &Path,
1752    ) -> Result<usize> {
1753        let mut index = FsVectorIndex::open(index_path)
1754            .map_err(|err| anyhow::anyhow!("open fsvi index for append: {err}"))?;
1755
1756        let entries: Vec<(String, Vec<f32>)> = embedded_messages
1757            .into_iter()
1758            .map(|em| {
1759                let doc_id = semantic_doc_id_for_embedded(&em);
1760                (doc_id, em.embedding)
1761            })
1762            .collect();
1763
1764        let count = entries.len();
1765        if count == 0 {
1766            return Ok(0);
1767        }
1768
1769        index
1770            .append_batch(&entries)
1771            .map_err(|err| anyhow::anyhow!("append_batch: {err}"))?;
1772
1773        if index.needs_compaction() {
1774            index
1775                .compact()
1776                .map_err(|err| anyhow::anyhow!("compaction: {err}"))?;
1777        }
1778
1779        Ok(count)
1780    }
1781
1782    fn write_backfill_staging_index(
1783        &self,
1784        embedded_messages: Vec<EmbeddedMessage>,
1785        staging_path: &Path,
1786        resume_existing: bool,
1787    ) -> Result<FsVectorIndex> {
1788        if resume_existing && staging_path.exists() {
1789            self.append_to_index_path(embedded_messages, staging_path)?;
1790            FsVectorIndex::open(staging_path)
1791                .map_err(|err| anyhow::anyhow!("open staged semantic index failed: {err}"))
1792        } else {
1793            self.build_and_save_index_at_path(embedded_messages, staging_path)
1794        }
1795    }
1796
1797    pub fn run_backfill_batch(
1798        &self,
1799        messages: &[EmbeddingInput],
1800        data_dir: &Path,
1801        manifest: &mut SemanticManifest,
1802        plan: SemanticBackfillBatchPlan,
1803    ) -> Result<SemanticBackfillBatchOutcome> {
1804        self.run_backfill_batch_with_sink(
1805            messages,
1806            data_dir,
1807            manifest,
1808            plan,
1809            None,
1810            &SemanticProgressSink::disabled(),
1811        )
1812    }
1813
1814    /// Variant of [`run_backfill_batch`] that emits semantic progress
1815    /// events to the given JSONL sink and persists `last_message_id`
1816    /// into the resumable checkpoint when supplied. The sink is silent
1817    /// unless `CASS_SEMANTIC_PROGRESS_JSONL` is set, so this path is
1818    /// safe to take in production.
1819    pub fn run_backfill_batch_with_sink(
1820        &self,
1821        messages: &[EmbeddingInput],
1822        data_dir: &Path,
1823        manifest: &mut SemanticManifest,
1824        plan: SemanticBackfillBatchPlan,
1825        last_message_id: Option<i64>,
1826        sink: &SemanticProgressSink,
1827    ) -> Result<SemanticBackfillBatchOutcome> {
1828        if plan.db_fingerprint.trim().is_empty() {
1829            bail!("semantic backfill requires a non-empty DB fingerprint");
1830        }
1831        if plan.total_conversations == 0 && plan.conversations_in_batch > 0 {
1832            bail!("semantic backfill batch cannot process conversations when total is zero");
1833        }
1834
1835        let manifest_path = SemanticManifest::path(data_dir);
1836        let staging_path = semantic_staging_index_path(
1837            data_dir,
1838            plan.tier,
1839            self.embedder_id(),
1840            &plan.db_fingerprint,
1841        );
1842        let final_path = vector_index_path(data_dir, self.embedder_id());
1843
1844        let prior_checkpoint = manifest
1845            .checkpoint
1846            .as_ref()
1847            .filter(|checkpoint| {
1848                checkpoint.tier == plan.tier
1849                    && checkpoint.embedder_id == self.embedder_id()
1850                    && checkpoint.is_valid(&plan.db_fingerprint)
1851            })
1852            .cloned();
1853        let prior_conversations = prior_checkpoint
1854            .as_ref()
1855            .map_or(0, |checkpoint| checkpoint.conversations_processed);
1856        let prior_docs = prior_checkpoint
1857            .as_ref()
1858            .map_or(0, |checkpoint| checkpoint.docs_embedded);
1859
1860        let embeddings = self.embed_messages_with_sink(messages, sink)?;
1861        let embedded_docs = u64::try_from(embeddings.len()).unwrap_or(u64::MAX);
1862        if sink.is_active() {
1863            sink.emit(
1864                SemanticProgressEvent::StagingWriteStart,
1865                SemanticProgressFields {
1866                    batch_rows: Some(embedded_docs),
1867                    note: Some(staging_path.display().to_string()),
1868                    ..Default::default()
1869                },
1870            );
1871        }
1872        let mut staged_index = self.write_backfill_staging_index(
1873            embeddings,
1874            &staging_path,
1875            prior_checkpoint.is_some(),
1876        )?;
1877        if sink.is_active() {
1878            sink.emit(
1879                SemanticProgressEvent::StagingWriteDone,
1880                SemanticProgressFields {
1881                    batch_rows: Some(embedded_docs),
1882                    note: Some(staging_path.display().to_string()),
1883                    ..Default::default()
1884                },
1885            );
1886        }
1887        let conversations_processed = prior_conversations
1888            .saturating_add(plan.conversations_in_batch)
1889            .min(plan.total_conversations);
1890        let complete = conversations_processed >= plan.total_conversations;
1891
1892        manifest.refresh_backlog(plan.total_conversations, &plan.db_fingerprint);
1893
1894        if complete {
1895            let db_fingerprint = plan.db_fingerprint.clone();
1896            if staged_index.wal_record_count() > 0 {
1897                staged_index.compact().map_err(|err| {
1898                    anyhow::anyhow!("compact staged semantic index failed: {err}")
1899                })?;
1900            }
1901            drop(staged_index);
1902            if sink.is_active() {
1903                sink.emit(
1904                    SemanticProgressEvent::PublishStart,
1905                    SemanticProgressFields {
1906                        rows_processed: Some(conversations_processed),
1907                        rows_total: Some(plan.total_conversations),
1908                        last_conversation_id: Some(plan.last_offset),
1909                        last_message_id,
1910                        note: Some(final_path.display().to_string()),
1911                        ..Default::default()
1912                    },
1913                );
1914            }
1915            fs::rename(&staging_path, &final_path).with_context(|| {
1916                format!(
1917                    "publishing staged semantic index {} to {}",
1918                    staging_path.display(),
1919                    final_path.display()
1920                )
1921            })?;
1922            sync_parent_directory(&final_path)?;
1923            let published_index = FsVectorIndex::open(&final_path)
1924                .map_err(|err| anyhow::anyhow!("open published semantic index failed: {err}"))?;
1925            let size_bytes = fs::metadata(&final_path)
1926                .with_context(|| format!("stat published semantic index {}", final_path.display()))?
1927                .len();
1928            let relative_index_path = final_path
1929                .strip_prefix(data_dir)
1930                .unwrap_or(final_path.as_path())
1931                .to_string_lossy()
1932                .to_string();
1933            manifest.publish_artifact(ArtifactRecord {
1934                tier: plan.tier,
1935                embedder_id: self.embedder_id().to_string(),
1936                model_revision: plan.model_revision,
1937                schema_version: SEMANTIC_SCHEMA_VERSION,
1938                chunking_version: CHUNKING_STRATEGY_VERSION,
1939                dimension: self.embedder_dimension(),
1940                doc_count: u64::try_from(published_index.record_count()).unwrap_or(u64::MAX),
1941                conversation_count: conversations_processed,
1942                db_fingerprint: plan.db_fingerprint,
1943                index_path: relative_index_path,
1944                size_bytes,
1945                started_at_ms: prior_checkpoint
1946                    .as_ref()
1947                    .map_or_else(now_ms, |checkpoint| checkpoint.saved_at_ms),
1948                completed_at_ms: now_ms(),
1949                ready: true,
1950            });
1951            manifest.refresh_backlog(plan.total_conversations, &db_fingerprint);
1952            manifest.save(data_dir)?;
1953            if sink.is_active() {
1954                sink.emit(
1955                    SemanticProgressEvent::PublishDone,
1956                    SemanticProgressFields {
1957                        rows_processed: Some(conversations_processed),
1958                        rows_total: Some(plan.total_conversations),
1959                        last_conversation_id: Some(plan.last_offset),
1960                        last_message_id,
1961                        note: Some(final_path.display().to_string()),
1962                        ..Default::default()
1963                    },
1964                );
1965            }
1966        } else {
1967            let docs_embedded_on_disk =
1968                u64::try_from(staged_index.record_count()).unwrap_or(u64::MAX);
1969            let checkpoint_docs = prior_docs
1970                .saturating_add(embedded_docs)
1971                .max(docs_embedded_on_disk);
1972            if sink.is_active() {
1973                sink.emit(
1974                    SemanticProgressEvent::CheckpointSaveStart,
1975                    SemanticProgressFields {
1976                        rows_processed: Some(conversations_processed),
1977                        rows_total: Some(plan.total_conversations),
1978                        last_conversation_id: Some(plan.last_offset),
1979                        last_message_id,
1980                        ..Default::default()
1981                    },
1982                );
1983            }
1984            // Preserve any existing `last_message_id` cursor when the
1985            // caller did not supply a fresher one — see sub-fix 2 for
1986            // why durable message-PK resume matters.
1987            let prior_last_message_id = prior_checkpoint
1988                .as_ref()
1989                .and_then(|checkpoint| checkpoint.last_message_id);
1990            manifest.save_checkpoint(BuildCheckpoint {
1991                tier: plan.tier,
1992                embedder_id: self.embedder_id().to_string(),
1993                last_offset: plan.last_offset,
1994                docs_embedded: checkpoint_docs,
1995                conversations_processed,
1996                total_conversations: plan.total_conversations,
1997                db_fingerprint: plan.db_fingerprint,
1998                schema_version: SEMANTIC_SCHEMA_VERSION,
1999                chunking_version: CHUNKING_STRATEGY_VERSION,
2000                saved_at_ms: now_ms(),
2001                last_message_id: last_message_id.or(prior_last_message_id),
2002            });
2003            manifest.save(data_dir)?;
2004            if sink.is_active() {
2005                sink.emit(
2006                    SemanticProgressEvent::CheckpointSaveDone,
2007                    SemanticProgressFields {
2008                        rows_processed: Some(conversations_processed),
2009                        rows_total: Some(plan.total_conversations),
2010                        last_conversation_id: Some(plan.last_offset),
2011                        last_message_id: last_message_id.or(prior_last_message_id),
2012                        ..Default::default()
2013                    },
2014                );
2015            }
2016        }
2017
2018        Ok(SemanticBackfillBatchOutcome {
2019            tier: plan.tier,
2020            embedder_id: self.embedder_id().to_string(),
2021            embedded_docs,
2022            conversations_processed,
2023            total_conversations: plan.total_conversations,
2024            last_offset: plan.last_offset,
2025            checkpoint_saved: !complete,
2026            published: complete,
2027            index_path: if complete { final_path } else { staging_path },
2028            manifest_path,
2029        })
2030    }
2031
2032    pub fn run_backfill_from_storage(
2033        &self,
2034        storage: &FrankenStorage,
2035        data_dir: &Path,
2036        manifest: &mut SemanticManifest,
2037        plan: SemanticBackfillStoragePlan,
2038    ) -> Result<SemanticBackfillBatchOutcome> {
2039        self.run_backfill_from_storage_with_sink(
2040            storage,
2041            data_dir,
2042            manifest,
2043            plan,
2044            &SemanticProgressSink::disabled(),
2045        )
2046    }
2047
2048    /// Variant of [`run_backfill_from_storage`] that emits semantic
2049    /// progress events to a JSONL sink and persists `last_message_id`
2050    /// in the resumable checkpoint. The sink is silent unless
2051    /// `CASS_SEMANTIC_PROGRESS_JSONL` is set.
2052    pub fn run_backfill_from_storage_with_sink(
2053        &self,
2054        storage: &FrankenStorage,
2055        data_dir: &Path,
2056        manifest: &mut SemanticManifest,
2057        plan: SemanticBackfillStoragePlan,
2058        sink: &SemanticProgressSink,
2059    ) -> Result<SemanticBackfillBatchOutcome> {
2060        // Resume cursor: prefer `last_message_id` (sub-fix 2) when a
2061        // valid checkpoint matches this tier+embedder+db; fall back to
2062        // the conversation offset for old-shape manifests written by
2063        // pre-#257 binaries.
2064        let prior_checkpoint = manifest.checkpoint.as_ref().filter(|checkpoint| {
2065            checkpoint.tier == plan.tier
2066                && checkpoint.embedder_id == self.embedder_id()
2067                && checkpoint.is_valid(&plan.db_fingerprint)
2068        });
2069        let after_conversation_id = prior_checkpoint.map_or(0, |checkpoint| checkpoint.last_offset);
2070        let prior_last_message_id =
2071            prior_checkpoint.and_then(|checkpoint| checkpoint.last_message_id);
2072
2073        if sink.is_active() {
2074            sink.emit(
2075                SemanticProgressEvent::SelectionStart,
2076                SemanticProgressFields {
2077                    last_conversation_id: Some(after_conversation_id),
2078                    last_message_id: prior_last_message_id,
2079                    rows_total: Some(plan.max_conversations as u64),
2080                    note: Some(plan.tier.as_str().to_string()),
2081                    ..Default::default()
2082                },
2083            );
2084        }
2085
2086        let batch = match fetch_canonical_embedding_batch_inner(
2087            storage,
2088            after_conversation_id,
2089            plan.max_conversations,
2090            prior_last_message_id,
2091        ) {
2092            Ok(batch) => batch,
2093            Err(err) => {
2094                if sink.is_active() {
2095                    sink.emit(
2096                        SemanticProgressEvent::Error,
2097                        SemanticProgressFields {
2098                            error: Some(format!("selection: {err}")),
2099                            last_conversation_id: Some(after_conversation_id),
2100                            last_message_id: prior_last_message_id,
2101                            ..Default::default()
2102                        },
2103                    );
2104                }
2105                return Err(err);
2106            }
2107        };
2108
2109        if sink.is_active() {
2110            sink.emit(
2111                SemanticProgressEvent::SelectionDone,
2112                SemanticProgressFields {
2113                    last_conversation_id: Some(batch.last_conversation_id),
2114                    last_message_id: prior_last_message_id,
2115                    conversations_in_batch: Some(batch.conversations_in_batch),
2116                    rows_total: Some(batch.total_conversations),
2117                    ..Default::default()
2118                },
2119            );
2120            // PacketReplay {start,progress,done} bracket the
2121            // envelope/messages/packet build done by
2122            // `fetch_canonical_embedding_batch`. We can't easily plumb
2123            // a callback inside that helper without refactoring it, so
2124            // the start/done bracket here straddles the work that
2125            // already happened (replay always finishes before we see
2126            // the result). A future refactor — flagged in the
2127            // SQL-shape follow-up — can move the packet-replay work
2128            // into a streaming iterator and emit `progress` ticks
2129            // per conversation. For now, the bracket still gives
2130            // operators a clear "we got past replay" signal.
2131            sink.emit(
2132                SemanticProgressEvent::PacketReplayStart,
2133                SemanticProgressFields {
2134                    conversations_in_batch: Some(batch.conversations_in_batch),
2135                    ..Default::default()
2136                },
2137            );
2138            sink.emit(
2139                SemanticProgressEvent::PacketReplayProgress,
2140                SemanticProgressFields {
2141                    conversations_in_batch: Some(batch.conversations_in_batch),
2142                    rows_processed: Some(batch.inputs.len() as u64),
2143                    bytes: Some(batch.inputs.iter().map(|i| i.content.len() as u64).sum()),
2144                    ..Default::default()
2145                },
2146            );
2147            sink.emit(
2148                SemanticProgressEvent::PacketReplayDone,
2149                SemanticProgressFields {
2150                    conversations_in_batch: Some(batch.conversations_in_batch),
2151                    rows_processed: Some(batch.inputs.len() as u64),
2152                    ..Default::default()
2153                },
2154            );
2155        }
2156
2157        // Compute the freshest `last_message_id` from the inputs we are
2158        // about to embed. EmbeddingInput.message_id is u64 (canonical
2159        // message PK); we coerce to i64 since the manifest stores i64.
2160        let batch_last_message_id = batch
2161            .inputs
2162            .iter()
2163            .map(|input| i64::try_from(input.message_id).unwrap_or(i64::MAX))
2164            .max();
2165        let next_last_message_id = match (prior_last_message_id, batch_last_message_id) {
2166            (Some(prior), Some(batch_max)) => Some(prior.max(batch_max)),
2167            (Some(prior), None) => Some(prior),
2168            (None, Some(batch_max)) => Some(batch_max),
2169            (None, None) => None,
2170        };
2171
2172        let outcome = self.run_backfill_batch_with_sink(
2173            &batch.inputs,
2174            data_dir,
2175            manifest,
2176            SemanticBackfillBatchPlan {
2177                tier: plan.tier,
2178                db_fingerprint: plan.db_fingerprint,
2179                model_revision: plan.model_revision,
2180                total_conversations: batch.total_conversations,
2181                conversations_in_batch: batch.conversations_in_batch,
2182                last_offset: batch.last_conversation_id,
2183            },
2184            next_last_message_id,
2185            sink,
2186        );
2187
2188        match &outcome {
2189            Ok(o) => {
2190                if sink.is_active() {
2191                    sink.emit(
2192                        SemanticProgressEvent::Complete,
2193                        SemanticProgressFields {
2194                            rows_processed: Some(o.conversations_processed),
2195                            rows_total: Some(o.total_conversations),
2196                            last_conversation_id: Some(o.last_offset),
2197                            last_message_id: next_last_message_id,
2198                            ..Default::default()
2199                        },
2200                    );
2201                }
2202            }
2203            Err(err) => {
2204                if sink.is_active() {
2205                    sink.emit(
2206                        SemanticProgressEvent::Error,
2207                        SemanticProgressFields {
2208                            error: Some(err.to_string()),
2209                            last_conversation_id: Some(batch.last_conversation_id),
2210                            last_message_id: next_last_message_id,
2211                            ..Default::default()
2212                        },
2213                    );
2214                }
2215            }
2216        }
2217
2218        outcome
2219    }
2220
2221    /// Build and save an HNSW index for approximate nearest neighbor search.
2222    ///
2223    /// This creates an HNSW graph structure from the existing VectorIndex,
2224    /// enabling O(log n) approximate search with the `--approximate` flag.
2225    ///
2226    /// # Arguments
2227    /// * `vector_index` - The VectorIndex to build HNSW from
2228    /// * `data_dir` - Directory to save the HNSW index
2229    /// * `m` - Max connections per node (default: 16)
2230    /// * `ef_construction` - Search width during build (default: 200)
2231    ///
2232    /// # Returns
2233    /// Path to the saved HNSW index file
2234    pub fn build_hnsw_index(
2235        &self,
2236        vector_index: &FsVectorIndex,
2237        data_dir: &Path,
2238        m: Option<usize>,
2239        ef_construction: Option<usize>,
2240    ) -> Result<PathBuf> {
2241        let m = m.unwrap_or(FS_HNSW_DEFAULT_M);
2242        let ef_construction = ef_construction.unwrap_or(FS_HNSW_DEFAULT_EF_CONSTRUCTION);
2243
2244        tracing::info!(
2245            embedder = self.embedder_id(),
2246            count = vector_index.record_count(),
2247            m,
2248            ef_construction,
2249            "Building HNSW index for approximate nearest neighbor search"
2250        );
2251
2252        let config = FsHnswConfig {
2253            m,
2254            ef_construction,
2255            ..FsHnswConfig::default()
2256        };
2257        let hnsw = FsHnswIndex::build_from_vector_index(vector_index, config)
2258            .map_err(|err| anyhow::anyhow!("build HNSW index failed: {err}"))?;
2259
2260        let hnsw_path = hnsw_index_path(data_dir, self.embedder_id());
2261        if let Some(parent) = hnsw_path.parent() {
2262            std::fs::create_dir_all(parent)?;
2263        }
2264        hnsw.save(&hnsw_path)
2265            .map_err(|err| anyhow::anyhow!("save HNSW index failed: {err}"))?;
2266
2267        tracing::info!(?hnsw_path, "Saved HNSW index");
2268        Ok(hnsw_path)
2269    }
2270}
2271
2272#[cfg(test)]
2273mod tests {
2274    use super::*;
2275    use crate::model::types::{Agent, AgentKind, Conversation, Message, MessageRole};
2276    use crate::storage::sqlite::FrankenStorage;
2277    use serde_json::json;
2278    use std::path::Path;
2279    use tempfile::tempdir;
2280
2281    #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
2282    struct ComparableSemanticInput {
2283        message_id: u64,
2284        created_at_ms: i64,
2285        agent_id: u32,
2286        workspace_id: u32,
2287        source_id: u32,
2288        role: u8,
2289        content: String,
2290    }
2291
2292    fn comparable_semantic_inputs(mut inputs: Vec<EmbeddingInput>) -> Vec<ComparableSemanticInput> {
2293        let mut comparable: Vec<ComparableSemanticInput> = inputs
2294            .drain(..)
2295            .map(|input| ComparableSemanticInput {
2296                message_id: input.message_id,
2297                created_at_ms: input.created_at_ms,
2298                agent_id: input.agent_id,
2299                workspace_id: input.workspace_id,
2300                source_id: input.source_id,
2301                role: input.role,
2302                content: input.content,
2303            })
2304            .collect();
2305        comparable.sort();
2306        comparable
2307    }
2308
2309    fn test_conversation(external_id: &str, body: &str) -> Conversation {
2310        test_conversation_fixture(
2311            external_id,
2312            vec![Message {
2313                id: None,
2314                idx: 0,
2315                role: MessageRole::User,
2316                author: None,
2317                created_at: Some(1_700_000_000_500),
2318                content: body.to_string(),
2319                extra_json: json!({}),
2320                snippets: Vec::new(),
2321            }],
2322            "local",
2323            None,
2324        )
2325    }
2326
2327    fn test_conversation_with_messages(external_id: &str, messages: Vec<Message>) -> Conversation {
2328        test_conversation_fixture(external_id, messages, "remote-laptop", Some("builder-host"))
2329    }
2330
2331    fn test_conversation_fixture(
2332        external_id: &str,
2333        messages: Vec<Message>,
2334        source_id: &str,
2335        origin_host: Option<&str>,
2336    ) -> Conversation {
2337        Conversation {
2338            id: None,
2339            agent_slug: "codex".to_string(),
2340            workspace: None,
2341            external_id: Some(external_id.to_string()),
2342            title: Some(format!("semantic {external_id}")),
2343            source_path: PathBuf::from(format!("/tmp/{external_id}.jsonl")),
2344            started_at: Some(1_700_000_000_000),
2345            ended_at: Some(1_700_000_001_000),
2346            approx_tokens: None,
2347            metadata_json: json!({}),
2348            messages,
2349            source_id: source_id.to_string(),
2350            origin_host: origin_host.map(str::to_string),
2351        }
2352    }
2353
2354    fn default_scheduler_signals() -> SemanticBackfillSchedulerSignals {
2355        SemanticBackfillSchedulerSignals {
2356            foreground_pressure: false,
2357            lexical_repair_active: false,
2358            force: false,
2359            operator_disabled: false,
2360        }
2361    }
2362
2363    struct EnvVarGuard {
2364        key: &'static str,
2365        prior: Option<String>,
2366    }
2367
2368    impl EnvVarGuard {
2369        fn set(key: &'static str, value: &str) -> Self {
2370            let prior = std::env::var(key).ok();
2371            // SAFETY: focused tests temporarily mutate process env and restore on drop.
2372            unsafe {
2373                std::env::set_var(key, value);
2374            }
2375            Self { key, prior }
2376        }
2377
2378        fn remove(key: &'static str) -> Self {
2379            let prior = std::env::var(key).ok();
2380            // SAFETY: focused tests temporarily mutate process env and restore on drop.
2381            unsafe {
2382                std::env::remove_var(key);
2383            }
2384            Self { key, prior }
2385        }
2386    }
2387
2388    impl Drop for EnvVarGuard {
2389        fn drop(&mut self) {
2390            // SAFETY: restores the process env value captured by this test guard.
2391            unsafe {
2392                match self.prior.as_deref() {
2393                    Some(value) => std::env::set_var(self.key, value),
2394                    None => std::env::remove_var(self.key),
2395                }
2396            }
2397        }
2398    }
2399
2400    #[test]
2401    fn semantic_backfill_scheduler_runs_and_scales_batch_under_idle_budget() {
2402        let policy = SemanticPolicy::compiled_defaults();
2403        let decision = semantic_backfill_scheduler_decision_for_capacity(
2404            &policy,
2405            64,
2406            &default_scheduler_signals(),
2407            80,
2408        );
2409
2410        assert!(decision.should_run());
2411        assert_eq!(decision.state, SemanticBackfillSchedulerState::Running);
2412        assert_eq!(
2413            decision.reason,
2414            SemanticBackfillSchedulerReason::IdleBudgetAvailable
2415        );
2416        assert_eq!(decision.scheduled_batch_conversations, 51);
2417        assert_eq!(decision.current_capacity_pct, 80);
2418        assert_eq!(decision.next_eligible_after_ms, 0);
2419    }
2420
2421    #[test]
2422    fn semantic_backfill_scheduler_reason_next_steps_are_stable() {
2423        for (reason, expected) in [
2424            (
2425                SemanticBackfillSchedulerReason::IdleBudgetAvailable,
2426                "background semantic backfill is within idle budgets",
2427            ),
2428            (
2429                SemanticBackfillSchedulerReason::OperatorDisabled,
2430                "background semantic backfill is disabled by CASS_SEMANTIC_BACKFILL_DISABLE",
2431            ),
2432            (
2433                SemanticBackfillSchedulerReason::PolicyDisabled,
2434                "semantic policy disables background semantic backfill",
2435            ),
2436            (
2437                SemanticBackfillSchedulerReason::ForegroundPressure,
2438                "foreground pressure is present; retry after the idle delay",
2439            ),
2440            (
2441                SemanticBackfillSchedulerReason::LexicalRepairActive,
2442                "lexical repair is active; semantic backfill is yielding",
2443            ),
2444            (
2445                SemanticBackfillSchedulerReason::CapacityBelowFloor,
2446                "machine responsiveness capacity is below the semantic backfill floor",
2447            ),
2448            (
2449                SemanticBackfillSchedulerReason::ThreadBudgetZero,
2450                "semantic backfill thread budget is zero",
2451            ),
2452            (
2453                SemanticBackfillSchedulerReason::BatchBudgetZero,
2454                "semantic backfill batch budget is zero",
2455            ),
2456        ] {
2457            assert_eq!(reason.next_step(), expected, "{reason:?}");
2458        }
2459    }
2460
2461    #[test]
2462    fn semantic_backfill_scheduler_yields_to_foreground_and_lexical_pressure() {
2463        let policy = SemanticPolicy::compiled_defaults();
2464        let foreground = SemanticBackfillSchedulerSignals {
2465            foreground_pressure: true,
2466            ..default_scheduler_signals()
2467        };
2468        let foreground_decision =
2469            semantic_backfill_scheduler_decision_for_capacity(&policy, 64, &foreground, 100);
2470        assert!(!foreground_decision.should_run());
2471        assert_eq!(
2472            foreground_decision.state,
2473            SemanticBackfillSchedulerState::Paused
2474        );
2475        assert_eq!(
2476            foreground_decision.reason,
2477            SemanticBackfillSchedulerReason::ForegroundPressure
2478        );
2479        assert_eq!(
2480            foreground_decision.next_eligible_after_ms,
2481            policy.idle_delay_seconds * 1000
2482        );
2483
2484        let lexical_repair = SemanticBackfillSchedulerSignals {
2485            lexical_repair_active: true,
2486            ..default_scheduler_signals()
2487        };
2488        let lexical_decision =
2489            semantic_backfill_scheduler_decision_for_capacity(&policy, 64, &lexical_repair, 100);
2490        assert!(!lexical_decision.should_run());
2491        assert_eq!(
2492            lexical_decision.state,
2493            SemanticBackfillSchedulerState::Paused
2494        );
2495        assert_eq!(
2496            lexical_decision.reason,
2497            SemanticBackfillSchedulerReason::LexicalRepairActive
2498        );
2499    }
2500
2501    #[test]
2502    fn semantic_backfill_scheduler_honors_policy_disable_and_force_override() {
2503        let mut policy = SemanticPolicy::compiled_defaults();
2504        policy.mode = crate::search::policy::SemanticMode::LexicalOnly;
2505
2506        let disabled = semantic_backfill_scheduler_decision_for_capacity(
2507            &policy,
2508            64,
2509            &default_scheduler_signals(),
2510            100,
2511        );
2512        assert!(!disabled.should_run());
2513        assert_eq!(disabled.state, SemanticBackfillSchedulerState::Disabled);
2514        assert_eq!(
2515            disabled.reason,
2516            SemanticBackfillSchedulerReason::PolicyDisabled
2517        );
2518
2519        let forced = SemanticBackfillSchedulerSignals {
2520            force: true,
2521            ..default_scheduler_signals()
2522        };
2523        let forced_decision =
2524            semantic_backfill_scheduler_decision_for_capacity(&policy, 64, &forced, 100);
2525        assert!(forced_decision.should_run());
2526        assert_eq!(
2527            forced_decision.reason,
2528            SemanticBackfillSchedulerReason::IdleBudgetAvailable
2529        );
2530        assert!(forced_decision.forced);
2531    }
2532
2533    #[test]
2534    fn test_batch_embedding() {
2535        let indexer = SemanticIndexer::new("hash", None).unwrap();
2536        let messages = vec![
2537            EmbeddingInput::new(1, "Hello world"),
2538            EmbeddingInput::new(2, "Goodbye world"),
2539        ];
2540
2541        let embeddings = indexer.embed_messages(&messages).unwrap();
2542
2543        assert_eq!(embeddings.len(), 2);
2544        assert_eq!(embeddings[0].message_id, 1);
2545        assert_eq!(embeddings[1].message_id, 2);
2546        assert_eq!(embeddings[0].embedding.len(), indexer.embedder_dimension());
2547    }
2548
2549    #[test]
2550    fn test_progress_indicator() {
2551        let indexer = SemanticIndexer::new("hash", None).unwrap();
2552        let messages: Vec<_> = (0..1000)
2553            .map(|i| EmbeddingInput::new(i as u64, format!("Message {}", i)))
2554            .collect();
2555
2556        let embeddings = indexer.embed_messages(&messages).unwrap();
2557        assert_eq!(embeddings.len(), messages.len());
2558    }
2559
2560    #[test]
2561    fn test_build_and_save_index() {
2562        let indexer = SemanticIndexer::new("hash", None).unwrap();
2563        let messages = vec![
2564            EmbeddingInput::new(1, "Hello world"),
2565            EmbeddingInput::new(2, "Goodbye world"),
2566        ];
2567
2568        let embeddings = indexer.embed_messages(&messages).unwrap();
2569        let tmp = tempdir().unwrap();
2570        let index = indexer
2571            .build_and_save_index(embeddings, tmp.path())
2572            .unwrap();
2573        assert_eq!(index.embedder_id(), indexer.embedder_id());
2574        assert_eq!(index.dimension(), indexer.embedder_dimension());
2575        assert_eq!(index.record_count(), 2);
2576    }
2577
2578    #[test]
2579    fn sharded_index_build_writes_sidecar_without_runtime_publish() {
2580        let indexer = SemanticIndexer::new("hash", None).unwrap();
2581        let messages: Vec<_> = (0..5)
2582            .map(|idx| EmbeddingInput::new(idx, format!("semantic shard message {idx}")))
2583            .collect();
2584        let embeddings = indexer.embed_messages(&messages).unwrap();
2585        let tmp = tempdir().unwrap();
2586
2587        let outcome = indexer
2588            .build_and_save_index_shards(
2589                embeddings,
2590                tmp.path(),
2591                SemanticShardBuildPlan {
2592                    tier: TierKind::Fast,
2593                    db_fingerprint: "db-fp-sharded-build".to_string(),
2594                    model_revision: "hash".to_string(),
2595                    total_conversations: 5,
2596                    max_records_per_shard: 2,
2597                    build_ann: false,
2598                },
2599            )
2600            .unwrap();
2601
2602        assert_eq!(outcome.shard_count, 3);
2603        assert_eq!(outcome.doc_count, 5);
2604        assert_eq!(outcome.total_conversations, 5);
2605        assert!(outcome.complete);
2606        assert_eq!(outcome.index_paths.len(), 3);
2607        for path in &outcome.index_paths {
2608            let shard = FsVectorIndex::open(path).unwrap();
2609            assert_eq!(shard.embedder_id(), indexer.embedder_id());
2610            assert!(shard.record_count() > 0);
2611        }
2612
2613        let shards = SemanticShardManifest::load(tmp.path()).unwrap().unwrap();
2614        let summary = shards.summary(TierKind::Fast, indexer.embedder_id(), "db-fp-sharded-build");
2615        assert!(summary.complete);
2616        assert_eq!(summary.ready_shards, 3);
2617        assert_eq!(summary.ann_ready_shards, 0);
2618        assert_eq!(summary.doc_count, 5);
2619        assert_eq!(summary.total_conversations, 5);
2620
2621        assert!(
2622            SemanticManifest::load(tmp.path()).unwrap().is_none(),
2623            "sidecar shards must not publish the main runtime manifest"
2624        );
2625        assert!(!vector_index_path(tmp.path(), indexer.embedder_id()).exists());
2626    }
2627
2628    #[test]
2629    fn sharded_index_build_rejects_zero_sized_shards() {
2630        let indexer = SemanticIndexer::new("hash", None).unwrap();
2631        let err = indexer
2632            .build_and_save_index_shards(
2633                std::iter::empty(),
2634                tempdir().unwrap().path(),
2635                SemanticShardBuildPlan {
2636                    tier: TierKind::Fast,
2637                    db_fingerprint: "db-fp-sharded-build".to_string(),
2638                    model_revision: "hash".to_string(),
2639                    total_conversations: 0,
2640                    max_records_per_shard: 0,
2641                    build_ann: false,
2642                },
2643            )
2644            .unwrap_err();
2645
2646        assert!(err.to_string().contains("max_records_per_shard > 0"));
2647    }
2648
2649    #[test]
2650    fn sharded_ann_build_records_per_shard_accelerators() {
2651        let indexer = SemanticIndexer::new("hash", None).unwrap();
2652        let messages: Vec<_> = (0..8)
2653            .map(|idx| EmbeddingInput::new(idx, format!("semantic ann shard message {idx}")))
2654            .collect();
2655        let embeddings = indexer.embed_messages(&messages).unwrap();
2656        let tmp = tempdir().unwrap();
2657
2658        let outcome = indexer
2659            .build_and_save_index_shards(
2660                embeddings,
2661                tmp.path(),
2662                SemanticShardBuildPlan {
2663                    tier: TierKind::Fast,
2664                    db_fingerprint: "db-fp-sharded-ann-build".to_string(),
2665                    model_revision: "hash".to_string(),
2666                    total_conversations: 8,
2667                    max_records_per_shard: 4,
2668                    build_ann: true,
2669                },
2670            )
2671            .unwrap();
2672
2673        assert_eq!(outcome.shard_count, 2);
2674        assert_eq!(outcome.ann_index_paths.len(), 2);
2675        for path in &outcome.ann_index_paths {
2676            assert!(path.exists(), "ANN shard missing at {}", path.display());
2677        }
2678
2679        let shards = SemanticShardManifest::load(tmp.path()).unwrap().unwrap();
2680        let summary = shards.summary(
2681            TierKind::Fast,
2682            indexer.embedder_id(),
2683            "db-fp-sharded-ann-build",
2684        );
2685        assert!(summary.complete);
2686        assert_eq!(summary.ann_ready_shards, 2);
2687        assert!(summary.ann_size_bytes > 0);
2688        assert!(
2689            shards
2690                .shards
2691                .iter()
2692                .all(|record| record.ann_index_path.is_some() && record.ann_ready)
2693        );
2694    }
2695
2696    /// Golden-output regression: any change to the embedding prep pipeline,
2697    /// the canonicalizer, the hash embedder's deterministic projection, or
2698    /// the ordering semantics of `embed_messages` must not silently mutate
2699    /// the bytes we write to the vector index. This digest is derived from a
2700    /// frozen 64-message corpus processed through the hash embedder; a
2701    /// mismatch means one of those contracts moved.
2702    #[test]
2703    fn embed_messages_golden_digest_hash_embedder() {
2704        use ring::digest::{Context, SHA256};
2705
2706        let corpus: Vec<EmbeddingInput> = (0..64)
2707            .map(|i| {
2708                let body = match i % 5 {
2709                    0 => format!("plain text message number {i}"),
2710                    1 => format!("**bold** line {i} with _emphasis_"),
2711                    2 => format!("```rust\nfn f_{i}() {{ println!(\"{i}\"); }}\n```"),
2712                    3 => format!("   whitespace {i}   "),
2713                    _ => format!("unicode \u{00E9}\u{0301} + emoji \u{1F600} {i}"),
2714                };
2715                EmbeddingInput::new(i as u64, body)
2716            })
2717            .collect();
2718
2719        let indexer = SemanticIndexer::new("hash", None)
2720            .unwrap()
2721            .with_batch_size(16)
2722            .unwrap();
2723        let embeddings = indexer.embed_messages(&corpus).unwrap();
2724
2725        // Digest over (message_id, content_hash, embedding f32 bytes) for every
2726        // embedded message, in the order emitted. Preserves order + content +
2727        // numeric equality without having to compare raw floats directly.
2728        let mut ctx = Context::new(&SHA256);
2729        for em in &embeddings {
2730            ctx.update(&em.message_id.to_le_bytes());
2731            ctx.update(&em.content_hash);
2732            for v in &em.embedding {
2733                ctx.update(&v.to_le_bytes());
2734            }
2735        }
2736        let digest = hex::encode(ctx.finish().as_ref());
2737
2738        // Captured 2026-04-21 against a freshly built hash embedder, batch
2739        // size 16, the frozen 64-message corpus above. Stable so long as
2740        // the prep pipeline, canonicalizer, and HashEmbedder::embed
2741        // implementation are all byte-preserving. If you intentionally
2742        // changed any of those, update this value AND record the reason
2743        // in the commit message.
2744        const EXPECTED: &str = "22d9ae7076925a4b70a194b0f519dfb1d465cc757368c296ef24055a02038c2c";
2745        assert_eq!(
2746            digest, EXPECTED,
2747            "embed_messages golden digest drifted; if this was intentional, \
2748             update EXPECTED in this test and record the reason in the commit message"
2749        );
2750    }
2751
2752    #[test]
2753    fn parallel_prep_matches_serial_prep_bitwise() {
2754        // Mix of short, long, empty, markdown, code-block, and unicode inputs
2755        // to make sure the canonicalizer is exercised across all of its paths.
2756        let inputs: Vec<EmbeddingInput> = (0..500)
2757            .map(|i| {
2758                let text = match i % 7 {
2759                    0 => format!("Plain message number {i} with some ordinary words."),
2760                    1 => format!("**Bold** and _italic_ markdown line {i}"),
2761                    2 => format!(
2762                        "```rust\nfn example_{i}() {{\n    println!(\"code block {i}\");\n}}\n```\nfollow-up text"
2763                    ),
2764                    3 => String::new(), // empty — should be filtered
2765                    4 => format!("   whitespace   galore   {i}   "),
2766                    5 => format!("Unicode \u{00E9}\u{0301} (combining accent) and emoji \u{1F600} line {i}"),
2767                    _ => format!(
2768                        "Mixed line {i}: `inline_code`, [link](http://x), {{braces}}, and \u{201C}curly quotes\u{201D}."
2769                    ),
2770                };
2771                EmbeddingInput::new(i as u64, text)
2772            })
2773            .collect();
2774
2775        let serial = prepare_window(&inputs, true);
2776        let parallel = prepare_window(&inputs, false);
2777
2778        assert_eq!(
2779            serial.len(),
2780            parallel.len(),
2781            "serial and parallel prep should skip the same number of empty canonicals"
2782        );
2783
2784        for (s, p) in serial.iter().zip(parallel.iter()) {
2785            assert_eq!(
2786                s.msg.message_id, p.msg.message_id,
2787                "ordering must be preserved between serial and parallel prep"
2788            );
2789            assert_eq!(
2790                s.canonical, p.canonical,
2791                "canonical form diverged between serial and parallel prep"
2792            );
2793            assert_eq!(
2794                s.hash, p.hash,
2795                "content hash diverged between serial and parallel prep"
2796            );
2797        }
2798    }
2799
2800    #[test]
2801    fn parallel_prep_filters_empty_canonicals() {
2802        let inputs = vec![
2803            EmbeddingInput::new(1, "valid content"),
2804            EmbeddingInput::new(2, ""),
2805            EmbeddingInput::new(3, "   \n\n   \t  "),
2806            EmbeddingInput::new(4, "more valid content"),
2807        ];
2808
2809        let prepared = prepare_window(&inputs, false);
2810        let ids: Vec<u64> = prepared.iter().map(|p| p.msg.message_id).collect();
2811
2812        assert!(ids.contains(&1));
2813        assert!(ids.contains(&4));
2814        // ids 2 and 3 should be dropped because their canonicals are empty.
2815        assert!(!ids.contains(&2));
2816        assert!(!ids.contains(&3));
2817    }
2818
2819    #[test]
2820    fn memoized_serial_prep_matches_stateless_prepare_window() {
2821        let inputs = vec![
2822            EmbeddingInput::new(1, "repeat me exactly"),
2823            EmbeddingInput::new(2, "repeat me exactly"),
2824            EmbeddingInput::new(3, "unique payload"),
2825            EmbeddingInput::new(4, ""),
2826            EmbeddingInput::new(5, "repeat me exactly"),
2827        ];
2828
2829        let baseline = prepare_window(&inputs, true);
2830        let mut cache = ContentAddressedMemoCache::with_capacity(16);
2831        let memoized = prepare_window_with_memo(&inputs, &mut cache);
2832
2833        assert_eq!(baseline.len(), memoized.len());
2834        for (plain, cached) in baseline.iter().zip(memoized.iter()) {
2835            assert_eq!(plain.msg.message_id, cached.msg.message_id);
2836            assert_eq!(plain.canonical, cached.canonical);
2837            assert_eq!(plain.hash, cached.hash);
2838        }
2839    }
2840
2841    #[test]
2842    fn semantic_prep_memo_key_uses_stable_content_hash_bytes() {
2843        let key = semantic_prep_memo_key("repeat me exactly");
2844        let expected = content_hash("repeat me exactly");
2845
2846        assert_eq!(key.content_hash.as_bytes(), expected.as_slice());
2847        assert_eq!(key.content_hash.as_bytes().len(), expected.len());
2848        assert_eq!(key.algorithm, SEMANTIC_PREP_MEMO_ALGORITHM);
2849        assert_eq!(key.algorithm_version, SEMANTIC_PREP_MEMO_VERSION);
2850    }
2851
2852    #[test]
2853    fn memoized_serial_prep_reuses_duplicate_content_across_windows() {
2854        let inputs = vec![
2855            EmbeddingInput::new(1, "repeat me exactly"),
2856            EmbeddingInput::new(2, "repeat me exactly"),
2857            EmbeddingInput::new(3, "unique payload"),
2858            EmbeddingInput::new(4, ""),
2859            EmbeddingInput::new(5, "repeat me exactly"),
2860        ];
2861
2862        let mut cache = ContentAddressedMemoCache::with_capacity(16);
2863        let prepared = prepare_window_with_memo(&inputs, &mut cache);
2864        let stats = cache.stats().clone();
2865
2866        assert_eq!(prepared.len(), 4);
2867        assert_eq!(stats.hits, 2);
2868        assert_eq!(stats.misses, 3);
2869        assert_eq!(stats.inserts, 2);
2870        assert_eq!(stats.live_entries, 2);
2871    }
2872
2873    #[test]
2874    fn packet_embedding_inputs_reuse_memoized_prep_for_duplicate_content() -> Result<()> {
2875        let temp = tempdir().unwrap();
2876        let db_path = temp.path().join("agent_search.db");
2877        let storage = FrankenStorage::open(&db_path)?;
2878        let agent_id = storage.ensure_agent(&Agent {
2879            id: None,
2880            slug: "codex".to_string(),
2881            name: "Codex".to_string(),
2882            version: None,
2883            kind: AgentKind::Cli,
2884        })?;
2885
2886        storage.insert_conversation_tree(
2887            agent_id,
2888            None,
2889            &test_conversation_with_messages(
2890                "packet-memo-conv-one",
2891                vec![
2892                    Message {
2893                        id: None,
2894                        idx: 0,
2895                        role: MessageRole::User,
2896                        author: None,
2897                        created_at: Some(1_700_000_010_100),
2898                        content: "shared semantic payload".to_string(),
2899                        extra_json: json!({}),
2900                        snippets: Vec::new(),
2901                    },
2902                    Message {
2903                        id: None,
2904                        idx: 1,
2905                        role: MessageRole::Agent,
2906                        author: None,
2907                        created_at: Some(1_700_000_010_200),
2908                        content: "unique semantic payload one".to_string(),
2909                        extra_json: json!({}),
2910                        snippets: Vec::new(),
2911                    },
2912                ],
2913            ),
2914        )?;
2915        storage.insert_conversation_tree(
2916            agent_id,
2917            None,
2918            &test_conversation_with_messages(
2919                "packet-memo-conv-two",
2920                vec![
2921                    Message {
2922                        id: None,
2923                        idx: 0,
2924                        role: MessageRole::Tool,
2925                        author: None,
2926                        created_at: Some(1_700_000_010_300),
2927                        content: "shared semantic payload".to_string(),
2928                        extra_json: json!({}),
2929                        snippets: Vec::new(),
2930                    },
2931                    Message {
2932                        id: None,
2933                        idx: 1,
2934                        role: MessageRole::Agent,
2935                        author: None,
2936                        created_at: Some(1_700_000_010_400),
2937                        content: "unique semantic payload two".to_string(),
2938                        extra_json: json!({}),
2939                        snippets: Vec::new(),
2940                    },
2941                ],
2942            ),
2943        )?;
2944
2945        let packet_inputs = packet_embedding_inputs_from_storage(&storage)?;
2946        let mut cache = ContentAddressedMemoCache::with_capacity(16);
2947        let prepared = prepare_window_with_memo(&packet_inputs, &mut cache);
2948        let stats = cache.stats().clone();
2949
2950        assert_eq!(packet_inputs.len(), 4);
2951        assert_eq!(prepared.len(), 4);
2952        assert_eq!(
2953            semantic_prep_memo_key("shared semantic payload")
2954                .content_hash
2955                .as_bytes()
2956                .len(),
2957            32
2958        );
2959        assert_eq!(stats.hits, 1);
2960        assert_eq!(stats.misses, 3);
2961        assert_eq!(stats.inserts, 3);
2962        assert_eq!(stats.live_entries, 3);
2963        Ok(())
2964    }
2965
2966    #[test]
2967    fn backfill_batch_saves_checkpoint_and_staged_index_until_complete() {
2968        let temp = tempdir().unwrap();
2969        let mut manifest = SemanticManifest::default();
2970        let indexer = SemanticIndexer::new("hash", None).unwrap();
2971        let messages = vec![
2972            EmbeddingInput::new(10, "first staged semantic message"),
2973            EmbeddingInput::new(11, "second staged semantic message"),
2974        ];
2975
2976        let outcome = indexer
2977            .run_backfill_batch(
2978                &messages,
2979                temp.path(),
2980                &mut manifest,
2981                SemanticBackfillBatchPlan {
2982                    tier: TierKind::Fast,
2983                    db_fingerprint: "db-fp-backfill-partial".to_string(),
2984                    model_revision: "hash".to_string(),
2985                    total_conversations: 2,
2986                    conversations_in_batch: 1,
2987                    last_offset: 1,
2988                },
2989            )
2990            .unwrap();
2991
2992        assert!(!outcome.published);
2993        assert!(outcome.checkpoint_saved);
2994        assert!(outcome.index_path.exists());
2995        assert!(!vector_index_path(temp.path(), indexer.embedder_id()).exists());
2996        let checkpoint = manifest.checkpoint.as_ref().expect("checkpoint");
2997        assert_eq!(checkpoint.tier, TierKind::Fast);
2998        assert_eq!(checkpoint.conversations_processed, 1);
2999        assert_eq!(checkpoint.docs_embedded, 2);
3000        assert_eq!(manifest.backlog.total_conversations, 2);
3001        assert!(SemanticManifest::path(temp.path()).exists());
3002    }
3003
3004    #[test]
3005    fn backfill_batch_resumes_staged_index_and_publishes_manifest_atomically() {
3006        let temp = tempdir().unwrap();
3007        let mut manifest = SemanticManifest::default();
3008        let indexer = SemanticIndexer::new("hash", None).unwrap();
3009        let db_fingerprint = "db-fp-backfill-complete";
3010        let staging_path = semantic_staging_index_path(
3011            temp.path(),
3012            TierKind::Fast,
3013            indexer.embedder_id(),
3014            db_fingerprint,
3015        );
3016
3017        let first = vec![EmbeddingInput::new(20, "first resume batch")];
3018        let first_outcome = indexer
3019            .run_backfill_batch(
3020                &first,
3021                temp.path(),
3022                &mut manifest,
3023                SemanticBackfillBatchPlan {
3024                    tier: TierKind::Fast,
3025                    db_fingerprint: db_fingerprint.to_string(),
3026                    model_revision: "hash".to_string(),
3027                    total_conversations: 2,
3028                    conversations_in_batch: 1,
3029                    last_offset: 1,
3030                },
3031            )
3032            .unwrap();
3033        assert_eq!(first_outcome.index_path, staging_path);
3034        assert!(staging_path.exists());
3035
3036        let second = vec![EmbeddingInput::new(21, "second resume batch")];
3037        let second_outcome = indexer
3038            .run_backfill_batch(
3039                &second,
3040                temp.path(),
3041                &mut manifest,
3042                SemanticBackfillBatchPlan {
3043                    tier: TierKind::Fast,
3044                    db_fingerprint: db_fingerprint.to_string(),
3045                    model_revision: "hash".to_string(),
3046                    total_conversations: 2,
3047                    conversations_in_batch: 1,
3048                    last_offset: 2,
3049                },
3050            )
3051            .unwrap();
3052
3053        assert!(second_outcome.published);
3054        assert!(!second_outcome.checkpoint_saved);
3055        assert!(!staging_path.exists());
3056        let final_path = vector_index_path(temp.path(), indexer.embedder_id());
3057        assert_eq!(second_outcome.index_path, final_path);
3058        assert!(final_path.exists());
3059        assert!(manifest.checkpoint.is_none());
3060        let artifact = manifest.fast_tier.as_ref().expect("published fast tier");
3061        assert!(artifact.ready);
3062        assert_eq!(artifact.conversation_count, 2);
3063        assert_eq!(artifact.doc_count, 2);
3064        assert_eq!(manifest.backlog.fast_tier_processed, 2);
3065
3066        let loaded = SemanticManifest::load(temp.path()).unwrap().unwrap();
3067        assert!(loaded.checkpoint.is_none());
3068        assert!(loaded.fast_tier.as_ref().is_some_and(|record| record.ready));
3069    }
3070
3071    #[test]
3072    fn backfill_publish_compacts_resumed_wal_before_rename() {
3073        let temp = tempdir().unwrap();
3074        let mut manifest = SemanticManifest::default();
3075        let indexer = SemanticIndexer::new("hash", None).unwrap();
3076        let db_fingerprint = "db-fp-backfill-small-resume";
3077        let first: Vec<EmbeddingInput> = (0..20)
3078            .map(|idx| EmbeddingInput::new(100 + idx, format!("first batch message {idx}")))
3079            .collect();
3080
3081        let first_outcome = indexer
3082            .run_backfill_batch(
3083                &first,
3084                temp.path(),
3085                &mut manifest,
3086                SemanticBackfillBatchPlan {
3087                    tier: TierKind::Fast,
3088                    db_fingerprint: db_fingerprint.to_string(),
3089                    model_revision: "hash".to_string(),
3090                    total_conversations: 2,
3091                    conversations_in_batch: 1,
3092                    last_offset: 1,
3093                },
3094            )
3095            .unwrap();
3096        assert!(first_outcome.checkpoint_saved);
3097
3098        let second = vec![EmbeddingInput::new(200, "small final resume batch")];
3099        let second_outcome = indexer
3100            .run_backfill_batch(
3101                &second,
3102                temp.path(),
3103                &mut manifest,
3104                SemanticBackfillBatchPlan {
3105                    tier: TierKind::Fast,
3106                    db_fingerprint: db_fingerprint.to_string(),
3107                    model_revision: "hash".to_string(),
3108                    total_conversations: 2,
3109                    conversations_in_batch: 1,
3110                    last_offset: 2,
3111                },
3112            )
3113            .unwrap();
3114
3115        assert!(second_outcome.published);
3116        let final_path = vector_index_path(temp.path(), indexer.embedder_id());
3117        let mut final_wal_path = final_path.as_os_str().to_os_string();
3118        final_wal_path.push(".wal");
3119        assert!(!PathBuf::from(final_wal_path).exists());
3120
3121        let published_index = FsVectorIndex::open(&final_path).unwrap();
3122        assert_eq!(published_index.record_count(), 21);
3123        let artifact = manifest.fast_tier.as_ref().expect("published fast tier");
3124        assert_eq!(artifact.doc_count, 21);
3125        assert_eq!(artifact.conversation_count, 2);
3126    }
3127
3128    #[test]
3129    fn backfill_from_storage_fetches_canonical_batches_and_resumes() -> Result<()> {
3130        let temp = tempdir().unwrap();
3131        let db_path = temp.path().join("agent_search.db");
3132        let storage = FrankenStorage::open(&db_path)?;
3133        let agent_id = storage.ensure_agent(&Agent {
3134            id: None,
3135            slug: "codex".to_string(),
3136            name: "Codex".to_string(),
3137            version: None,
3138            kind: AgentKind::Cli,
3139        })?;
3140        storage.insert_conversation_tree(
3141            agent_id,
3142            None,
3143            &test_conversation("first", "first canonical semantic message"),
3144        )?;
3145        storage.insert_conversation_tree(
3146            agent_id,
3147            None,
3148            &test_conversation("second", "second canonical semantic message"),
3149        )?;
3150
3151        let mut manifest = SemanticManifest::default();
3152        let indexer = SemanticIndexer::new("hash", None)?;
3153
3154        let first = indexer.run_backfill_from_storage(
3155            &storage,
3156            temp.path(),
3157            &mut manifest,
3158            SemanticBackfillStoragePlan {
3159                tier: TierKind::Fast,
3160                db_fingerprint: "canonical-db-fp".to_string(),
3161                model_revision: "hash".to_string(),
3162                max_conversations: 1,
3163            },
3164        )?;
3165        assert!(!first.published);
3166        assert!(first.checkpoint_saved);
3167        assert_eq!(first.conversations_processed, 1);
3168        assert_eq!(first.total_conversations, 2);
3169        assert_eq!(first.embedded_docs, 1);
3170        assert!(first.last_offset > 0);
3171
3172        let second = indexer.run_backfill_from_storage(
3173            &storage,
3174            temp.path(),
3175            &mut manifest,
3176            SemanticBackfillStoragePlan {
3177                tier: TierKind::Fast,
3178                db_fingerprint: "canonical-db-fp".to_string(),
3179                model_revision: "hash".to_string(),
3180                max_conversations: 1,
3181            },
3182        )?;
3183        assert!(second.published);
3184        assert!(!second.checkpoint_saved);
3185        assert_eq!(second.conversations_processed, 2);
3186        assert_eq!(second.embedded_docs, 1);
3187        assert!(manifest.checkpoint.is_none());
3188        assert_eq!(
3189            manifest.fast_tier.as_ref().map(|record| record.doc_count),
3190            Some(2)
3191        );
3192        Ok(())
3193    }
3194
3195    #[test]
3196    fn canonical_embedding_batch_uses_conversation_packet_semantic_projection() -> Result<()> {
3197        let temp = tempdir().unwrap();
3198        let db_path = temp.path().join("agent_search.db");
3199        let storage = FrankenStorage::open(&db_path)?;
3200        let agent_id = storage.ensure_agent(&Agent {
3201            id: None,
3202            slug: "codex".to_string(),
3203            name: "Codex".to_string(),
3204            version: None,
3205            kind: AgentKind::Cli,
3206        })?;
3207        storage.insert_conversation_tree(
3208            agent_id,
3209            None,
3210            &test_conversation_with_messages(
3211                "packet-projection",
3212                vec![
3213                    Message {
3214                        id: None,
3215                        idx: 0,
3216                        role: MessageRole::User,
3217                        author: None,
3218                        created_at: Some(1_700_000_000_500),
3219                        content: "user semantic text".to_string(),
3220                        extra_json: json!({}),
3221                        snippets: Vec::new(),
3222                    },
3223                    Message {
3224                        id: None,
3225                        idx: 1,
3226                        role: MessageRole::Tool,
3227                        author: None,
3228                        created_at: Some(1_700_000_000_600),
3229                        content: "tool semantic text".to_string(),
3230                        extra_json: json!({}),
3231                        snippets: Vec::new(),
3232                    },
3233                    Message {
3234                        id: None,
3235                        idx: 2,
3236                        role: MessageRole::System,
3237                        author: None,
3238                        created_at: Some(1_700_000_000_700),
3239                        content: String::new(),
3240                        extra_json: json!({}),
3241                        snippets: Vec::new(),
3242                    },
3243                ],
3244            ),
3245        )?;
3246
3247        let batch = fetch_canonical_embedding_batch(&storage, 0, 1)?;
3248
3249        assert_eq!(batch.conversations_in_batch, 1);
3250        assert_eq!(batch.inputs.len(), 2);
3251        assert_eq!(batch.inputs[0].content, "user semantic text");
3252        assert_eq!(batch.inputs[1].content, "tool semantic text");
3253        assert_eq!(batch.inputs[0].role, role_code_from_str("user").unwrap());
3254        assert_eq!(batch.inputs[1].role, role_code_from_str("tool").unwrap());
3255        let normalized_source_id =
3256            normalized_index_source_id(Some("remote-laptop"), None, Some("builder-host"));
3257        let expected_hash = crc32fast::hash(normalized_source_id.as_bytes());
3258        assert_eq!(batch.inputs[0].source_id, expected_hash);
3259        assert_eq!(batch.inputs[1].source_id, expected_hash);
3260        Ok(())
3261    }
3262
3263    #[test]
3264    fn canonical_embedding_batch_pushes_last_message_id_filter_into_selection() -> Result<()> {
3265        let temp = tempdir().unwrap();
3266        let db_path = temp.path().join("agent_search.db");
3267        let storage = FrankenStorage::open(&db_path)?;
3268        let agent_id = storage.ensure_agent(&Agent {
3269            id: None,
3270            slug: "codex".to_string(),
3271            name: "Codex".to_string(),
3272            version: None,
3273            kind: AgentKind::Cli,
3274        })?;
3275        storage.insert_conversation_tree(
3276            agent_id,
3277            None,
3278            &test_conversation("before-watermark", "old semantic message"),
3279        )?;
3280        let watermark: i64 = storage.raw().query_row_map(
3281            "SELECT MAX(id) FROM messages",
3282            &[] as &[ParamValue],
3283            |row| row.get_typed(0),
3284        )?;
3285        storage.insert_conversation_tree(
3286            agent_id,
3287            None,
3288            &test_conversation("after-watermark", "new semantic message"),
3289        )?;
3290
3291        let batch = fetch_canonical_embedding_batch_inner(&storage, 0, 8, Some(watermark))?;
3292
3293        assert_eq!(
3294            batch.conversations_in_batch, 1,
3295            "last_message_id must be pushed into candidate selection so old conversations are not counted in the batch"
3296        );
3297        assert_eq!(batch.total_conversations, 2);
3298        assert_eq!(batch.inputs.len(), 1);
3299        assert_eq!(batch.inputs[0].content, "new semantic message");
3300        assert!(
3301            i64::try_from(batch.inputs[0].message_id).unwrap_or(i64::MAX) > watermark,
3302            "selected semantic input must be strictly newer than the checkpoint message id"
3303        );
3304        Ok(())
3305    }
3306
3307    #[test]
3308    fn packet_embedding_inputs_from_storage_since_only_emits_new_canonical_messages() -> Result<()>
3309    {
3310        let temp = tempdir().unwrap();
3311        let db_path = temp.path().join("agent_search.db");
3312        let storage = FrankenStorage::open(&db_path)?;
3313        let agent_id = storage.ensure_agent(&Agent {
3314            id: None,
3315            slug: "codex".to_string(),
3316            name: "Codex".to_string(),
3317            version: None,
3318            kind: AgentKind::Cli,
3319        })?;
3320        storage.insert_conversation_tree(
3321            agent_id,
3322            None,
3323            &test_conversation_with_messages(
3324                "packet-delta",
3325                vec![
3326                    Message {
3327                        id: None,
3328                        idx: 0,
3329                        role: MessageRole::User,
3330                        author: None,
3331                        created_at: Some(1_700_000_000_500),
3332                        content: "existing semantic text".to_string(),
3333                        extra_json: json!({}),
3334                        snippets: Vec::new(),
3335                    },
3336                    Message {
3337                        id: None,
3338                        idx: 1,
3339                        role: MessageRole::Agent,
3340                        author: None,
3341                        created_at: Some(1_700_000_000_600),
3342                        content: "existing assistant text".to_string(),
3343                        extra_json: json!({}),
3344                        snippets: Vec::new(),
3345                    },
3346                ],
3347            ),
3348        )?;
3349        let watermark: i64 = storage.raw().query_row_map(
3350            "SELECT MAX(id) FROM messages",
3351            &[] as &[ParamValue],
3352            |row| row.get_typed(0),
3353        )?;
3354
3355        storage.insert_conversation_tree(
3356            agent_id,
3357            None,
3358            &test_conversation_with_messages(
3359                "packet-delta",
3360                vec![
3361                    Message {
3362                        id: None,
3363                        idx: 0,
3364                        role: MessageRole::User,
3365                        author: None,
3366                        created_at: Some(1_700_000_000_500),
3367                        content: "existing semantic text".to_string(),
3368                        extra_json: json!({}),
3369                        snippets: Vec::new(),
3370                    },
3371                    Message {
3372                        id: None,
3373                        idx: 1,
3374                        role: MessageRole::Agent,
3375                        author: None,
3376                        created_at: Some(1_700_000_000_600),
3377                        content: "existing assistant text".to_string(),
3378                        extra_json: json!({}),
3379                        snippets: Vec::new(),
3380                    },
3381                    Message {
3382                        id: None,
3383                        idx: 2,
3384                        role: MessageRole::Agent,
3385                        author: None,
3386                        created_at: Some(1_700_000_000_700),
3387                        content: "new packet semantic text".to_string(),
3388                        extra_json: json!({}),
3389                        snippets: Vec::new(),
3390                    },
3391                    Message {
3392                        id: None,
3393                        idx: 3,
3394                        role: MessageRole::System,
3395                        author: None,
3396                        created_at: Some(1_700_000_000_800),
3397                        content: String::new(),
3398                        extra_json: json!({}),
3399                        snippets: Vec::new(),
3400                    },
3401                ],
3402            ),
3403        )?;
3404
3405        let batch = packet_embedding_inputs_from_storage_since(&storage, watermark)?;
3406
3407        assert_eq!(batch.conversations_in_batch, 1);
3408        assert_eq!(batch.inputs.len(), 1);
3409        assert_eq!(batch.inputs[0].content, "new packet semantic text");
3410        assert_eq!(
3411            batch.inputs[0].role,
3412            role_code_from_str("assistant").unwrap()
3413        );
3414        let normalized_source_id =
3415            normalized_index_source_id(Some("remote-laptop"), None, Some("builder-host"));
3416        assert_eq!(
3417            batch.inputs[0].source_id,
3418            crc32fast::hash(normalized_source_id.as_bytes())
3419        );
3420        let expected_raw_max_id: i64 = storage.raw().query_row_map(
3421            "SELECT MAX(id) FROM messages",
3422            &[] as &[ParamValue],
3423            |row| row.get_typed(0),
3424        )?;
3425        assert_eq!(batch.raw_max_message_id, Some(expected_raw_max_id));
3426        Ok(())
3427    }
3428
3429    #[test]
3430    fn packet_catch_up_emits_expected_semantic_docs_after_watermark() -> Result<()> {
3431        let temp = tempdir().unwrap();
3432        let db_path = temp.path().join("agent_search.db");
3433        let storage = FrankenStorage::open(&db_path)?;
3434        let agent_id = storage.ensure_agent(&Agent {
3435            id: None,
3436            slug: "codex".to_string(),
3437            name: "Codex".to_string(),
3438            version: None,
3439            kind: AgentKind::Cli,
3440        })?;
3441        let workspace_id = storage.ensure_workspace(Path::new("/tmp/workspace"), None)?;
3442
3443        storage.insert_conversation_tree(
3444            agent_id,
3445            Some(workspace_id),
3446            &test_conversation_with_messages(
3447                "legacy-packet-semantics",
3448                vec![
3449                    Message {
3450                        id: None,
3451                        idx: 0,
3452                        role: MessageRole::User,
3453                        author: None,
3454                        created_at: Some(1_700_000_000_500),
3455                        content: "before watermark".to_string(),
3456                        extra_json: json!({}),
3457                        snippets: Vec::new(),
3458                    },
3459                    Message {
3460                        id: None,
3461                        idx: 1,
3462                        role: MessageRole::Agent,
3463                        author: None,
3464                        created_at: Some(1_700_000_000_600),
3465                        content: "before watermark assistant".to_string(),
3466                        extra_json: json!({}),
3467                        snippets: Vec::new(),
3468                    },
3469                ],
3470            ),
3471        )?;
3472
3473        let watermark: i64 = storage.raw().query_row_map(
3474            "SELECT MAX(id) FROM messages",
3475            &[] as &[ParamValue],
3476            |row| row.get_typed(0),
3477        )?;
3478
3479        storage.insert_conversation_tree(
3480            agent_id,
3481            Some(workspace_id),
3482            &test_conversation_with_messages(
3483                "legacy-packet-semantics",
3484                vec![
3485                    Message {
3486                        id: None,
3487                        idx: 0,
3488                        role: MessageRole::User,
3489                        author: None,
3490                        created_at: Some(1_700_000_000_500),
3491                        content: "before watermark".to_string(),
3492                        extra_json: json!({}),
3493                        snippets: Vec::new(),
3494                    },
3495                    Message {
3496                        id: None,
3497                        idx: 1,
3498                        role: MessageRole::Agent,
3499                        author: None,
3500                        created_at: Some(1_700_000_000_600),
3501                        content: "before watermark assistant".to_string(),
3502                        extra_json: json!({}),
3503                        snippets: Vec::new(),
3504                    },
3505                    Message {
3506                        id: None,
3507                        idx: 2,
3508                        role: MessageRole::Agent,
3509                        author: None,
3510                        created_at: Some(1_700_000_000_700),
3511                        content: "after watermark assistant".to_string(),
3512                        extra_json: json!({}),
3513                        snippets: Vec::new(),
3514                    },
3515                    Message {
3516                        id: None,
3517                        idx: 3,
3518                        role: MessageRole::System,
3519                        author: None,
3520                        created_at: Some(1_700_000_000_800),
3521                        content: String::new(),
3522                        extra_json: json!({}),
3523                        snippets: Vec::new(),
3524                    },
3525                ],
3526            ),
3527        )?;
3528        storage.insert_conversation_tree(
3529            agent_id,
3530            Some(workspace_id),
3531            &test_conversation_with_messages(
3532                "legacy-packet-semantics-second-conv",
3533                vec![
3534                    Message {
3535                        id: None,
3536                        idx: 0,
3537                        role: MessageRole::Tool,
3538                        author: None,
3539                        created_at: Some(1_700_000_000_900),
3540                        content: "after watermark tool".to_string(),
3541                        extra_json: json!({}),
3542                        snippets: Vec::new(),
3543                    },
3544                    Message {
3545                        id: None,
3546                        idx: 1,
3547                        role: MessageRole::System,
3548                        author: None,
3549                        created_at: Some(1_700_000_001_000),
3550                        content: String::new(),
3551                        extra_json: json!({}),
3552                        snippets: Vec::new(),
3553                    },
3554                ],
3555            ),
3556        )?;
3557
3558        let packet_batch = packet_embedding_inputs_from_storage_since(&storage, watermark)?;
3559        let normalized_source_id =
3560            normalized_index_source_id(Some("remote-laptop"), None, Some("builder-host"));
3561        let source_id_hash = crc32fast::hash(normalized_source_id.as_bytes());
3562        let expected = vec![
3563            ComparableSemanticInput {
3564                message_id: u64::try_from(watermark + 1).unwrap(),
3565                created_at_ms: 1_700_000_000_700,
3566                agent_id: u32::try_from(agent_id).unwrap(),
3567                workspace_id: u32::try_from(workspace_id).unwrap(),
3568                source_id: source_id_hash,
3569                role: role_code_from_str("assistant").unwrap(),
3570                content: "after watermark assistant".to_string(),
3571            },
3572            ComparableSemanticInput {
3573                message_id: u64::try_from(watermark + 3).unwrap(),
3574                created_at_ms: 1_700_000_000_900,
3575                agent_id: u32::try_from(agent_id).unwrap(),
3576                workspace_id: u32::try_from(workspace_id).unwrap(),
3577                source_id: source_id_hash,
3578                role: role_code_from_str("tool").unwrap(),
3579                content: "after watermark tool".to_string(),
3580            },
3581        ];
3582
3583        assert_eq!(comparable_semantic_inputs(packet_batch.inputs), expected);
3584        assert_eq!(packet_batch.conversations_in_batch, 2);
3585        assert_eq!(packet_batch.raw_max_message_id, Some(watermark + 4));
3586        Ok(())
3587    }
3588
3589    #[test]
3590    fn packet_embedding_inputs_for_message_ids_matches_since_selection() -> Result<()> {
3591        let temp = tempdir().unwrap();
3592        let db_path = temp.path().join("agent_search.db");
3593        let storage = FrankenStorage::open(&db_path)?;
3594        let agent_id = storage.ensure_agent(&Agent {
3595            id: None,
3596            slug: "codex".to_string(),
3597            name: "Codex".to_string(),
3598            version: None,
3599            kind: AgentKind::Cli,
3600        })?;
3601        let workspace_id = storage.ensure_workspace(Path::new("/tmp/workspace"), None)?;
3602
3603        storage.insert_conversation_tree(
3604            agent_id,
3605            Some(workspace_id),
3606            &test_conversation_with_messages(
3607                "selected-vs-since",
3608                vec![
3609                    Message {
3610                        id: None,
3611                        idx: 0,
3612                        role: MessageRole::User,
3613                        author: None,
3614                        created_at: Some(1_700_000_100_100),
3615                        content: "before watermark".to_string(),
3616                        extra_json: json!({}),
3617                        snippets: Vec::new(),
3618                    },
3619                    Message {
3620                        id: None,
3621                        idx: 1,
3622                        role: MessageRole::Agent,
3623                        author: None,
3624                        created_at: Some(1_700_000_100_200),
3625                        content: "before watermark assistant".to_string(),
3626                        extra_json: json!({}),
3627                        snippets: Vec::new(),
3628                    },
3629                ],
3630            ),
3631        )?;
3632
3633        let watermark: i64 = storage.raw().query_row_map(
3634            "SELECT MAX(id) FROM messages",
3635            &[] as &[ParamValue],
3636            |row| row.get_typed(0),
3637        )?;
3638
3639        storage.insert_conversation_tree(
3640            agent_id,
3641            Some(workspace_id),
3642            &test_conversation_with_messages(
3643                "selected-vs-since",
3644                vec![
3645                    Message {
3646                        id: None,
3647                        idx: 0,
3648                        role: MessageRole::User,
3649                        author: None,
3650                        created_at: Some(1_700_000_100_100),
3651                        content: "before watermark".to_string(),
3652                        extra_json: json!({}),
3653                        snippets: Vec::new(),
3654                    },
3655                    Message {
3656                        id: None,
3657                        idx: 1,
3658                        role: MessageRole::Agent,
3659                        author: None,
3660                        created_at: Some(1_700_000_100_200),
3661                        content: "before watermark assistant".to_string(),
3662                        extra_json: json!({}),
3663                        snippets: Vec::new(),
3664                    },
3665                    Message {
3666                        id: None,
3667                        idx: 2,
3668                        role: MessageRole::Tool,
3669                        author: None,
3670                        created_at: Some(1_700_000_100_300),
3671                        content: "after watermark tool".to_string(),
3672                        extra_json: json!({}),
3673                        snippets: Vec::new(),
3674                    },
3675                    Message {
3676                        id: None,
3677                        idx: 3,
3678                        role: MessageRole::System,
3679                        author: None,
3680                        created_at: Some(1_700_000_100_400),
3681                        content: String::new(),
3682                        extra_json: json!({}),
3683                        snippets: Vec::new(),
3684                    },
3685                ],
3686            ),
3687        )?;
3688        storage.insert_conversation_tree(
3689            agent_id,
3690            Some(workspace_id),
3691            &test_conversation_with_messages(
3692                "selected-vs-since-second",
3693                vec![Message {
3694                    id: None,
3695                    idx: 0,
3696                    role: MessageRole::Agent,
3697                    author: None,
3698                    created_at: Some(1_700_000_100_500),
3699                    content: "after watermark assistant".to_string(),
3700                    extra_json: json!({}),
3701                    snippets: Vec::new(),
3702                }],
3703            ),
3704        )?;
3705
3706        let since_batch = packet_embedding_inputs_from_storage_since(&storage, watermark)?;
3707        let conversation_ids: Vec<i64> = storage.raw().query_map_collect(
3708            "SELECT DISTINCT conversation_id
3709             FROM messages
3710             WHERE id > ?1
3711             ORDER BY conversation_id ASC",
3712            &[ParamValue::from(watermark)],
3713            |row| row.get_typed(0),
3714        )?;
3715        let selected_message_ids: HashSet<i64> = storage
3716            .raw()
3717            .query_map_collect(
3718                "SELECT id
3719                 FROM messages
3720                 WHERE id > ?1
3721                 ORDER BY id ASC",
3722                &[ParamValue::from(watermark)],
3723                |row| row.get_typed(0),
3724            )?
3725            .into_iter()
3726            .collect();
3727        let selected_inputs = packet_embedding_inputs_from_storage_for_message_ids(
3728            &storage,
3729            &conversation_ids,
3730            &selected_message_ids,
3731        )?;
3732
3733        assert_eq!(
3734            comparable_semantic_inputs(selected_inputs),
3735            comparable_semantic_inputs(since_batch.inputs)
3736        );
3737        Ok(())
3738    }
3739
3740    #[test]
3741    fn default_batch_size_uses_new_value() {
3742        // The test setup must not leak a caller-provided CASS_SEMANTIC_BATCH_SIZE
3743        // override, which would mask the constant bump we're asserting on.
3744        let _guard = EnvVarGuard::remove("CASS_SEMANTIC_BATCH_SIZE");
3745        let indexer = SemanticIndexer::new("hash", None).unwrap();
3746        assert_eq!(indexer.batch_size(), DEFAULT_SEMANTIC_BATCH_SIZE);
3747    }
3748
3749    #[test]
3750    fn parallel_prep_enabled_reuses_truthy_env_parser() {
3751        for (value, expected) in [
3752            ("1", true),
3753            ("true", true),
3754            (" YeS ", true),
3755            ("on", true),
3756            ("0", false),
3757            ("false", false),
3758            ("off", false),
3759        ] {
3760            let _guard = EnvVarGuard::set("CASS_SEMANTIC_PREP_PARALLEL", value);
3761            assert_eq!(parallel_prep_enabled(), expected, "env value {value:?}");
3762        }
3763
3764        let _guard = EnvVarGuard::remove("CASS_SEMANTIC_PREP_PARALLEL");
3765        assert!(!parallel_prep_enabled());
3766    }
3767
3768    #[test]
3769    fn saturating_u64_from_usize_covers_bounds() {
3770        assert_eq!(saturating_u64_from_usize(0), 0);
3771        assert_eq!(saturating_u64_from_usize(42), 42);
3772        assert_eq!(
3773            saturating_u64_from_usize(usize::MAX),
3774            u64::try_from(usize::MAX).unwrap_or(u64::MAX)
3775        );
3776    }
3777
3778    /// `coding_agent_session_search-ibuuh.32` (sink #3 equivalence gate):
3779    /// the packet-driven `semantic_inputs_from_packets` helper must
3780    /// produce the same `EmbeddingInput` list a fresh storage replay
3781    /// returns for the same canonical corpus. Once this passes, callers
3782    /// that already hold packets (rebuild pipeline, salvage replay,
3783    /// repair flows) can drive the semantic preparation consumer
3784    /// without a second canonical-row round-trip.
3785    #[test]
3786    fn semantic_inputs_from_packets_matches_storage_replay() -> Result<()> {
3787        let temp = tempdir().unwrap();
3788        let db_path = temp.path().join("agent_search.db");
3789        let storage = FrankenStorage::open(&db_path)?;
3790
3791        let agent_id_codex = storage.ensure_agent(&Agent {
3792            id: None,
3793            slug: "codex".to_string(),
3794            name: "Codex".to_string(),
3795            version: None,
3796            kind: AgentKind::Cli,
3797        })?;
3798        let agent_id_claude = storage.ensure_agent(&Agent {
3799            id: None,
3800            slug: "claude_code".to_string(),
3801            name: "Claude Code".to_string(),
3802            version: None,
3803            kind: AgentKind::Cli,
3804        })?;
3805        let workspace_id =
3806            storage.ensure_workspace(Path::new("/tmp/semantic-equivalence-ws"), None)?;
3807
3808        // Two conversations on different agents, mixed roles, including
3809        // an empty-content system message that the semantic projection
3810        // must filter (matches the legacy storage replay).
3811        storage.insert_conversation_tree(
3812            agent_id_codex,
3813            Some(workspace_id),
3814            &test_conversation_with_messages(
3815                "packet-equiv-1",
3816                vec![
3817                    Message {
3818                        id: None,
3819                        idx: 0,
3820                        role: MessageRole::User,
3821                        author: None,
3822                        created_at: Some(1_700_000_000_500),
3823                        content: "first user prompt".to_string(),
3824                        extra_json: json!({}),
3825                        snippets: Vec::new(),
3826                    },
3827                    Message {
3828                        id: None,
3829                        idx: 1,
3830                        role: MessageRole::Agent,
3831                        author: None,
3832                        created_at: Some(1_700_000_000_600),
3833                        content: "first assistant reply".to_string(),
3834                        extra_json: json!({}),
3835                        snippets: Vec::new(),
3836                    },
3837                    Message {
3838                        id: None,
3839                        idx: 2,
3840                        role: MessageRole::System,
3841                        author: None,
3842                        created_at: Some(1_700_000_000_700),
3843                        // Empty content is filtered by both paths.
3844                        content: String::new(),
3845                        extra_json: json!({}),
3846                        snippets: Vec::new(),
3847                    },
3848                ],
3849            ),
3850        )?;
3851        storage.insert_conversation_tree(
3852            agent_id_claude,
3853            Some(workspace_id),
3854            &test_conversation_with_messages(
3855                "packet-equiv-2",
3856                vec![
3857                    Message {
3858                        id: None,
3859                        idx: 0,
3860                        role: MessageRole::Tool,
3861                        author: Some("ripgrep".to_string()),
3862                        created_at: Some(1_700_000_001_500),
3863                        content: "tool output line".to_string(),
3864                        extra_json: json!({}),
3865                        snippets: Vec::new(),
3866                    },
3867                    Message {
3868                        id: None,
3869                        idx: 1,
3870                        role: MessageRole::Agent,
3871                        author: None,
3872                        created_at: Some(1_700_000_001_600),
3873                        content: "second assistant reply".to_string(),
3874                        extra_json: json!({}),
3875                        snippets: Vec::new(),
3876                    },
3877                ],
3878            ),
3879        )?;
3880
3881        // Legacy path: the storage-driven replay that the rebuild
3882        // pipeline currently uses.
3883        let storage_inputs = packet_embedding_inputs_from_storage(&storage)?;
3884
3885        // Packet-driven path: re-fetch the canonical envelopes (so we
3886        // get the storage-internal agent/workspace ids the rebuild path
3887        // would normally pair with packets), then convert those rows
3888        // into ConversationPackets via canonical replay and feed them
3889        // through `semantic_inputs_from_packets`.
3890        let conversation_ids: Vec<i64> = storage.raw().query_map_collect(
3891            "SELECT DISTINCT m.conversation_id
3892             FROM messages m
3893             JOIN conversations c ON c.id = m.conversation_id
3894             ORDER BY m.conversation_id ASC",
3895            &[] as &[ParamValue],
3896            |row| row.get_typed(0),
3897        )?;
3898        let envelopes = fetch_canonical_embedding_conversations(&storage, &conversation_ids)?;
3899        let mut grouped_messages =
3900            storage.fetch_messages_for_lexical_rebuild_batch(&conversation_ids, None, None)?;
3901        let mut packets: Vec<ConversationPacket> = Vec::with_capacity(envelopes.len());
3902        let mut contexts: Vec<SemanticPacketContext> = Vec::with_capacity(envelopes.len());
3903        for envelope in &envelopes {
3904            let messages = grouped_messages
3905                .remove(&envelope.conversation_id)
3906                .unwrap_or_default();
3907            let provenance = canonical_embedding_packet_provenance(envelope);
3908            let canonical = canonical_embedding_conversation(envelope, &provenance, messages);
3909            packets.push(ConversationPacket::from_canonical_replay(
3910                &canonical, provenance,
3911            ));
3912            contexts.push(SemanticPacketContext {
3913                conversation_id: envelope.conversation_id,
3914                agent_id: saturating_u32_from_i64(envelope.agent_id),
3915                workspace_id: saturating_u32_from_i64(envelope.workspace_id.unwrap_or(0)),
3916            });
3917        }
3918        let packet_inputs = semantic_inputs_from_packets(&packets, &contexts)?;
3919
3920        // The two paths must produce the same EmbeddingInput list
3921        // (sortable comparison normalizes ordering across the two
3922        // helpers' iteration orders).
3923        assert!(
3924            !storage_inputs.is_empty(),
3925            "fixture should produce non-empty semantic inputs (sanity)"
3926        );
3927        assert_eq!(
3928            comparable_semantic_inputs(storage_inputs.clone()),
3929            comparable_semantic_inputs(packet_inputs.clone()),
3930            "packet-driven semantic preparation must match storage replay byte-for-byte"
3931        );
3932
3933        // Sanity-pin a couple of contract details so a regression in
3934        // either path (e.g. role normalization or empty-content
3935        // filtering) trips a clear assertion rather than a generic
3936        // length mismatch.
3937        let storage_count = storage_inputs.len();
3938        let packet_count = packet_inputs.len();
3939        assert_eq!(
3940            storage_count, packet_count,
3941            "storage and packet semantic input counts must agree exactly"
3942        );
3943        // Empty-content system message must NOT appear in the output.
3944        assert!(
3945            packet_inputs.iter().all(|input| !input.content.is_empty()),
3946            "empty content must be filtered by the packet semantic projection"
3947        );
3948        // The remote-host source_id pins the cross-path provenance hash.
3949        let normalized_source_id =
3950            normalized_index_source_id(Some("remote-laptop"), None, Some("builder-host"));
3951        let expected_hash = crc32fast::hash(normalized_source_id.as_bytes());
3952        assert!(
3953            packet_inputs
3954                .iter()
3955                .all(|input| input.source_id == expected_hash),
3956            "every emitted EmbeddingInput must hash provenance via the packet's normalized source_id"
3957        );
3958
3959        Ok(())
3960    }
3961
3962    /// Length-mismatch defense: if a caller hands `semantic_inputs_from_packets`
3963    /// a packet/context slice pair of different lengths, the helper must
3964    /// return an error rather than silently mis-correlating ids. Pinning
3965    /// this is part of the bead's "shadow / compare mode plus explicit
3966    /// kill-switch" acceptance language.
3967    #[test]
3968    fn semantic_inputs_from_packets_rejects_length_mismatch() {
3969        let provenance = ConversationPacketProvenance::local();
3970        let canonical = test_conversation("packet-mismatch", "hello");
3971        let packet = ConversationPacket::from_canonical_replay(&canonical, provenance);
3972        let result = semantic_inputs_from_packets(&[packet], &[]);
3973        assert!(
3974            result.is_err(),
3975            "expected error on packet/context length mismatch"
3976        );
3977        let err = result.unwrap_err().to_string();
3978        assert!(
3979            err.contains("length mismatch"),
3980            "error should mention length mismatch, got: {err}"
3981        );
3982    }
3983}