Skip to main content

coding_agent_search/indexer/
semantic.rs

1use std::collections::{HashMap, HashSet};
2use std::fs;
3use std::io::IsTerminal;
4use std::path::{Path, PathBuf};
5use std::time::{Instant, SystemTime, UNIX_EPOCH};
6
7use anyhow::{Context, Result, bail};
8use frankensearch::index::{
9    HNSW_DEFAULT_EF_CONSTRUCTION as FS_HNSW_DEFAULT_EF_CONSTRUCTION,
10    HNSW_DEFAULT_M as FS_HNSW_DEFAULT_M, HnswConfig as FsHnswConfig, HnswIndex as FsHnswIndex,
11    Quantization as FsQuantization, VectorIndex as FsVectorIndex,
12    VectorIndexWriter as FsVectorIndexWriter,
13};
14use frankensqlite::compat::{ConnectionExt, ParamValue, RowExt};
15use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
16use rayon::prelude::*;
17
18use crate::indexer::memoization::{
19    ContentAddressedMemoCache, MemoCacheAuditRecord, MemoContentHash, MemoKey, MemoLookup,
20};
21use crate::indexer::responsiveness;
22use crate::indexer::semantic_progress::{
23    SemanticProgressEvent, SemanticProgressFields, SemanticProgressSink,
24};
25use crate::model::conversation_packet::{ConversationPacket, ConversationPacketProvenance};
26use crate::model::types::{Conversation, Message};
27use crate::search::canonicalize::{canonicalize_for_embedding, content_hash};
28use crate::search::embedder::Embedder;
29use crate::search::fastembed_embedder::FastEmbedder;
30use crate::search::hash_embedder::HashEmbedder;
31use crate::search::policy::{CHUNKING_STRATEGY_VERSION, SEMANTIC_SCHEMA_VERSION, SemanticPolicy};
32use crate::search::semantic_manifest::{
33    ArtifactRecord, BuildCheckpoint, SemanticManifest, SemanticShardManifest, SemanticShardRecord,
34    TierKind,
35};
36use crate::search::tantivy::{
37    normalized_index_origin_host, normalized_index_origin_kind, normalized_index_source_id,
38};
39use crate::search::vector_index::{
40    ROLE_USER, SemanticDocId, VECTOR_INDEX_DIR, role_code_from_str, vector_index_path,
41};
42use crate::storage::sqlite::FrankenStorage;
43
44/// Default embedder batch size. 128 is a sweet spot for ONNX MiniLM models on
45/// modern CPUs: big enough to amortize dispatch overhead and keep the tensor
46/// kernels saturated, small enough that one batch fits comfortably in L2 and
47/// memory reservation stays bounded for large corpora.
48const DEFAULT_SEMANTIC_BATCH_SIZE: usize = 128;
49const DEFAULT_SEMANTIC_PREP_MEMO_CAPACITY: usize = 4_096;
50const DEFAULT_SEMANTIC_EMBED_BATCH_WARN_AFTER_MS: u64 = 30_000;
51const DEFAULT_SEMANTIC_EMBED_BATCH_FAIL_AFTER_MS: u64 = 300_000;
52const DEFAULT_SEMANTIC_MAX_MESSAGES_PER_CHECKPOINT: usize = 10_000;
53const DEFAULT_SEMANTIC_MAX_BYTES_PER_CHECKPOINT: u64 = 8 * 1024 * 1024;
54const SEMANTIC_PREP_MEMO_ALGORITHM: &str = "semantic_prepare_window";
55const SEMANTIC_PREP_MEMO_VERSION: &str = "canonicalize_for_embedding:v2:stable-content-hash";
56
57fn resolved_env_usize(key: &str, default: usize) -> usize {
58    dotenvy::var(key)
59        .ok()
60        .and_then(|v| v.parse::<usize>().ok())
61        .unwrap_or(default)
62}
63
64fn resolved_env_u64(key: &str, default: u64) -> u64 {
65    dotenvy::var(key)
66        .ok()
67        .and_then(|v| v.parse::<u64>().ok())
68        .unwrap_or(default)
69}
70
71fn resolved_default_batch_size() -> usize {
72    dotenvy::var("CASS_SEMANTIC_BATCH_SIZE")
73        .ok()
74        .and_then(|v| v.parse::<usize>().ok())
75        .filter(|v| *v > 0)
76        .unwrap_or(DEFAULT_SEMANTIC_BATCH_SIZE)
77}
78
79fn resolved_semantic_prep_memo_capacity() -> usize {
80    dotenvy::var("CASS_SEMANTIC_PREP_MEMO_CAPACITY")
81        .ok()
82        .and_then(|v| v.parse::<usize>().ok())
83        .filter(|v| *v > 0)
84        .unwrap_or(DEFAULT_SEMANTIC_PREP_MEMO_CAPACITY)
85}
86
87fn resolved_semantic_embed_batch_warn_after_ms() -> u64 {
88    resolved_env_u64(
89        "CASS_SEMANTIC_EMBED_BATCH_WARN_AFTER_MS",
90        DEFAULT_SEMANTIC_EMBED_BATCH_WARN_AFTER_MS,
91    )
92}
93
94fn resolved_semantic_embed_batch_fail_after_ms() -> u64 {
95    resolved_env_u64(
96        "CASS_SEMANTIC_EMBED_BATCH_FAIL_AFTER_MS",
97        DEFAULT_SEMANTIC_EMBED_BATCH_FAIL_AFTER_MS,
98    )
99}
100
101/// Opt in to the rayon-parallel canonicalize+hash prep step. **Default: OFF.**
102///
103/// The parallel path is kept because canonicalize+hash CAN dominate the
104/// embedding wall-clock on pathological inputs (very long messages, costly
105/// Unicode normalization). But criterion baselines captured under
106/// `tests/artifacts/perf/2026-04-21-profile-run/baselines.md` showed a
107/// 1.2×–2.3× **regression** on the hash embedder across every batch size
108/// tested (2 000 messages, mixed markdown/code/unicode): rayon's per-task
109/// scheduling overhead is larger than the per-message canonicalize+hash cost
110/// when the embedder itself is cheap. For the production ONNX (MiniLM)
111/// embedder, per-batch inference already dwarfs prep, so parallel prep never
112/// buys meaningful wall-clock — the prep step is ≤ 1% of total embed time.
113///
114/// Set `CASS_SEMANTIC_PREP_PARALLEL=1` / `true` / `yes` / `on` to opt in.
115fn parallel_prep_enabled() -> bool {
116    env_truthy("CASS_SEMANTIC_PREP_PARALLEL")
117}
118
119fn saturating_u64_from_usize(value: usize) -> u64 {
120    u64::try_from(value).unwrap_or(u64::MAX)
121}
122
123fn saturating_u64_from_millis(value: u128) -> u64 {
124    u64::try_from(value).unwrap_or(u64::MAX)
125}
126
127#[derive(Debug, Clone)]
128pub struct EmbeddingInput {
129    pub message_id: u64,
130    pub created_at_ms: i64,
131    pub agent_id: u32,
132    pub workspace_id: u32,
133    pub source_id: u32,
134    pub role: u8,
135    pub chunk_idx: u8,
136    pub content: String,
137}
138
139impl EmbeddingInput {
140    pub fn new(message_id: u64, content: impl Into<String>) -> Self {
141        Self {
142            message_id,
143            created_at_ms: 0,
144            agent_id: 0,
145            workspace_id: 0,
146            source_id: 0,
147            role: ROLE_USER,
148            chunk_idx: 0,
149            content: content.into(),
150        }
151    }
152}
153
154#[derive(Debug, Clone)]
155pub struct EmbeddedMessage {
156    pub message_id: u64,
157    pub created_at_ms: i64,
158    pub agent_id: u32,
159    pub workspace_id: u32,
160    pub source_id: u32,
161    pub role: u8,
162    pub chunk_idx: u8,
163    pub content_hash: [u8; 32],
164    pub embedding: Vec<f32>,
165}
166
167#[derive(Debug, Clone)]
168pub struct SemanticBackfillBatchPlan {
169    pub tier: TierKind,
170    pub db_fingerprint: String,
171    pub model_revision: String,
172    pub total_conversations: u64,
173    pub conversations_in_batch: u64,
174    pub last_offset: i64,
175    pub cursor_exhausted: bool,
176}
177
178#[derive(Debug, Clone)]
179pub struct SemanticBackfillStoragePlan {
180    pub tier: TierKind,
181    pub db_fingerprint: String,
182    pub model_revision: String,
183    pub max_conversations: usize,
184}
185
186#[derive(Debug, Clone)]
187pub struct SemanticBackfillBatchOutcome {
188    pub tier: TierKind,
189    pub embedder_id: String,
190    pub embedded_docs: u64,
191    pub conversations_processed: u64,
192    pub total_conversations: u64,
193    pub last_offset: i64,
194    pub checkpoint_saved: bool,
195    pub published: bool,
196    pub index_path: PathBuf,
197    pub manifest_path: PathBuf,
198}
199
200impl SemanticBackfillBatchOutcome {
201    pub fn progress_pct(&self) -> f64 {
202        if self.total_conversations == 0 {
203            return if self.published { 100.0 } else { 0.0 };
204        }
205
206        let pct = (self.conversations_processed as f64 / self.total_conversations as f64) * 100.0;
207        let pct = pct.min(100.0);
208        if pct >= 100.0 && !self.published {
209            99.0
210        } else {
211            pct
212        }
213    }
214}
215
216#[derive(Debug, Clone)]
217pub struct SemanticShardBuildPlan {
218    pub tier: TierKind,
219    pub db_fingerprint: String,
220    pub model_revision: String,
221    pub total_conversations: u64,
222    pub max_records_per_shard: usize,
223    pub build_ann: bool,
224}
225
226#[derive(Debug, Clone)]
227pub struct SemanticShardBuildOutcome {
228    pub tier: TierKind,
229    pub embedder_id: String,
230    pub shard_count: u32,
231    pub doc_count: u64,
232    pub total_conversations: u64,
233    pub index_paths: Vec<PathBuf>,
234    pub ann_index_paths: Vec<PathBuf>,
235    pub shard_manifest_path: PathBuf,
236    pub complete: bool,
237}
238
239#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
240#[serde(rename_all = "snake_case")]
241pub(crate) enum SemanticBackfillSchedulerState {
242    Running,
243    Paused,
244    Disabled,
245}
246
247impl SemanticBackfillSchedulerState {
248    pub(crate) fn as_str(self) -> &'static str {
249        match self {
250            Self::Running => "running",
251            Self::Paused => "paused",
252            Self::Disabled => "disabled",
253        }
254    }
255}
256
257#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
258#[serde(rename_all = "snake_case")]
259pub(crate) enum SemanticBackfillSchedulerReason {
260    IdleBudgetAvailable,
261    OperatorDisabled,
262    PolicyDisabled,
263    ForegroundPressure,
264    LexicalRepairActive,
265    CapacityBelowFloor,
266    ThreadBudgetZero,
267    BatchBudgetZero,
268}
269
270impl SemanticBackfillSchedulerReason {
271    pub(crate) fn next_step(self) -> &'static str {
272        match self {
273            Self::IdleBudgetAvailable => "background semantic backfill is within idle budgets",
274            Self::OperatorDisabled => {
275                "background semantic backfill is disabled by CASS_SEMANTIC_BACKFILL_DISABLE"
276            }
277            Self::PolicyDisabled => "semantic policy disables background semantic backfill",
278            Self::ForegroundPressure => {
279                "foreground pressure is present; retry after the idle delay"
280            }
281            Self::LexicalRepairActive => "lexical repair is active; semantic backfill is yielding",
282            Self::CapacityBelowFloor => {
283                "machine responsiveness capacity is below the semantic backfill floor"
284            }
285            Self::ThreadBudgetZero => "semantic backfill thread budget is zero",
286            Self::BatchBudgetZero => "semantic backfill batch budget is zero",
287        }
288    }
289}
290
291#[derive(Debug, Clone, Copy, PartialEq, Eq)]
292pub(crate) struct SemanticBackfillSchedulerSignals {
293    pub foreground_pressure: bool,
294    pub lexical_repair_active: bool,
295    pub force: bool,
296    pub operator_disabled: bool,
297}
298
299impl SemanticBackfillSchedulerSignals {
300    pub(crate) fn from_env() -> Self {
301        Self {
302            foreground_pressure: env_truthy("CASS_SEMANTIC_BACKFILL_FOREGROUND_ACTIVE"),
303            lexical_repair_active: env_truthy("CASS_SEMANTIC_BACKFILL_LEXICAL_REPAIR_ACTIVE"),
304            force: env_truthy("CASS_SEMANTIC_BACKFILL_FORCE"),
305            operator_disabled: env_truthy("CASS_SEMANTIC_BACKFILL_DISABLE"),
306        }
307    }
308}
309
310#[derive(Debug, Clone, serde::Serialize)]
311pub(crate) struct SemanticBackfillSchedulerDecision {
312    pub state: SemanticBackfillSchedulerState,
313    pub reason: SemanticBackfillSchedulerReason,
314    pub requested_batch_conversations: usize,
315    pub scheduled_batch_conversations: usize,
316    pub current_capacity_pct: u32,
317    pub min_capacity_pct: u32,
318    pub max_backfill_threads: usize,
319    pub idle_delay_seconds: u64,
320    pub chunk_timeout_seconds: u64,
321    pub foreground_pressure: bool,
322    pub lexical_repair_active: bool,
323    pub forced: bool,
324    pub next_eligible_after_ms: u64,
325}
326
327impl SemanticBackfillSchedulerDecision {
328    pub(crate) fn should_run(&self) -> bool {
329        matches!(self.state, SemanticBackfillSchedulerState::Running)
330    }
331}
332
333fn env_truthy(key: &str) -> bool {
334    dotenvy::var(key)
335        .ok()
336        .map(|value| {
337            matches!(
338                value.trim().to_ascii_lowercase().as_str(),
339                "1" | "true" | "yes" | "on"
340            )
341        })
342        .unwrap_or(false)
343}
344
345fn env_backfill_min_capacity_pct() -> u32 {
346    dotenvy::var("CASS_SEMANTIC_BACKFILL_MIN_CAPACITY_PCT")
347        .ok()
348        .and_then(|value| value.trim().parse::<u32>().ok())
349        .map(|value| value.clamp(1, 100))
350        .unwrap_or(75)
351}
352
353pub(crate) fn semantic_backfill_scheduler_decision(
354    policy: &SemanticPolicy,
355    requested_batch_conversations: usize,
356    signals: &SemanticBackfillSchedulerSignals,
357) -> SemanticBackfillSchedulerDecision {
358    semantic_backfill_scheduler_decision_for_capacity(
359        policy,
360        requested_batch_conversations,
361        signals,
362        responsiveness::current_capacity_pct(),
363    )
364}
365
366pub(crate) fn semantic_backfill_scheduler_decision_for_capacity(
367    policy: &SemanticPolicy,
368    requested_batch_conversations: usize,
369    signals: &SemanticBackfillSchedulerSignals,
370    current_capacity_pct: u32,
371) -> SemanticBackfillSchedulerDecision {
372    let min_capacity_pct = env_backfill_min_capacity_pct();
373    let paused_delay_ms = policy.idle_delay_seconds.saturating_mul(1000);
374    let mut decision = SemanticBackfillSchedulerDecision {
375        state: SemanticBackfillSchedulerState::Running,
376        reason: SemanticBackfillSchedulerReason::IdleBudgetAvailable,
377        requested_batch_conversations,
378        scheduled_batch_conversations: requested_batch_conversations,
379        current_capacity_pct: current_capacity_pct.clamp(0, 100),
380        min_capacity_pct,
381        max_backfill_threads: policy.max_backfill_threads,
382        idle_delay_seconds: policy.idle_delay_seconds,
383        chunk_timeout_seconds: policy.chunk_timeout_seconds,
384        foreground_pressure: signals.foreground_pressure,
385        lexical_repair_active: signals.lexical_repair_active,
386        forced: signals.force,
387        next_eligible_after_ms: 0,
388    };
389
390    if requested_batch_conversations == 0 {
391        return stopped_scheduler_decision(
392            decision,
393            SemanticBackfillSchedulerState::Disabled,
394            SemanticBackfillSchedulerReason::BatchBudgetZero,
395            paused_delay_ms,
396        );
397    }
398    if policy.max_backfill_threads == 0 && !signals.force {
399        return stopped_scheduler_decision(
400            decision,
401            SemanticBackfillSchedulerState::Disabled,
402            SemanticBackfillSchedulerReason::ThreadBudgetZero,
403            paused_delay_ms,
404        );
405    }
406    if signals.operator_disabled && !signals.force {
407        return stopped_scheduler_decision(
408            decision,
409            SemanticBackfillSchedulerState::Disabled,
410            SemanticBackfillSchedulerReason::OperatorDisabled,
411            paused_delay_ms,
412        );
413    }
414    if !policy.mode.should_build_semantic() && !signals.force {
415        return stopped_scheduler_decision(
416            decision,
417            SemanticBackfillSchedulerState::Disabled,
418            SemanticBackfillSchedulerReason::PolicyDisabled,
419            paused_delay_ms,
420        );
421    }
422    if signals.lexical_repair_active && !signals.force {
423        return stopped_scheduler_decision(
424            decision,
425            SemanticBackfillSchedulerState::Paused,
426            SemanticBackfillSchedulerReason::LexicalRepairActive,
427            paused_delay_ms,
428        );
429    }
430    if signals.foreground_pressure && !signals.force {
431        return stopped_scheduler_decision(
432            decision,
433            SemanticBackfillSchedulerState::Paused,
434            SemanticBackfillSchedulerReason::ForegroundPressure,
435            paused_delay_ms,
436        );
437    }
438    if current_capacity_pct < min_capacity_pct && !signals.force {
439        return stopped_scheduler_decision(
440            decision,
441            SemanticBackfillSchedulerState::Paused,
442            SemanticBackfillSchedulerReason::CapacityBelowFloor,
443            paused_delay_ms,
444        );
445    }
446
447    let capacity = current_capacity_pct.clamp(1, 100) as usize;
448    let scaled = requested_batch_conversations.saturating_mul(capacity) / 100;
449    decision.scheduled_batch_conversations = scaled.max(1).min(requested_batch_conversations);
450    decision
451}
452
453fn stopped_scheduler_decision(
454    mut decision: SemanticBackfillSchedulerDecision,
455    state: SemanticBackfillSchedulerState,
456    reason: SemanticBackfillSchedulerReason,
457    next_eligible_after_ms: u64,
458) -> SemanticBackfillSchedulerDecision {
459    decision.state = state;
460    decision.reason = reason;
461    decision.scheduled_batch_conversations = 0;
462    decision.next_eligible_after_ms = next_eligible_after_ms;
463    decision
464}
465
466fn now_ms() -> i64 {
467    SystemTime::now()
468        .duration_since(UNIX_EPOCH)
469        .map(|duration| duration.as_millis() as i64)
470        .unwrap_or(0)
471}
472
473fn hnsw_index_path(data_dir: &Path, embedder_id: &str) -> PathBuf {
474    data_dir
475        .join(VECTOR_INDEX_DIR)
476        .join(format!("hnsw-{embedder_id}.chsw"))
477}
478
479fn safe_path_component(raw: &str) -> String {
480    raw.chars()
481        .map(|ch| {
482            if ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_') {
483                ch
484            } else {
485                '_'
486            }
487        })
488        .collect()
489}
490
491fn semantic_staging_index_path(
492    data_dir: &Path,
493    tier: TierKind,
494    embedder_id: &str,
495    db_fingerprint: &str,
496) -> PathBuf {
497    let fingerprint_hash = crc32fast::hash(db_fingerprint.as_bytes());
498    data_dir.join(VECTOR_INDEX_DIR).join(format!(
499        ".staging-{}-{}-{fingerprint_hash:08x}.fsvi",
500        tier.as_str(),
501        safe_path_component(embedder_id)
502    ))
503}
504
505fn semantic_generation_fingerprint_component(db_fingerprint: &str) -> String {
506    blake3::hash(db_fingerprint.as_bytes())
507        .to_hex()
508        .chars()
509        .take(16)
510        .collect()
511}
512
513fn semantic_shard_generation_dir(
514    data_dir: &Path,
515    tier: TierKind,
516    embedder_id: &str,
517    db_fingerprint: &str,
518) -> PathBuf {
519    let fingerprint_hash = semantic_generation_fingerprint_component(db_fingerprint);
520    data_dir.join(VECTOR_INDEX_DIR).join("shards").join(format!(
521        "{}-{}-{fingerprint_hash}",
522        tier.as_str(),
523        safe_path_component(embedder_id),
524    ))
525}
526
527fn semantic_shard_index_path(
528    data_dir: &Path,
529    tier: TierKind,
530    embedder_id: &str,
531    db_fingerprint: &str,
532    shard_index: u32,
533) -> PathBuf {
534    semantic_shard_generation_dir(data_dir, tier, embedder_id, db_fingerprint)
535        .join(format!("shard-{shard_index:05}.fsvi"))
536}
537
538fn semantic_shard_ann_index_path(
539    data_dir: &Path,
540    tier: TierKind,
541    embedder_id: &str,
542    db_fingerprint: &str,
543    shard_index: u32,
544) -> PathBuf {
545    semantic_shard_generation_dir(data_dir, tier, embedder_id, db_fingerprint)
546        .join(format!("shard-{shard_index:05}.chsw"))
547}
548
549#[cfg(not(windows))]
550fn sync_parent_directory(path: &Path) -> Result<()> {
551    let Some(parent) = path.parent() else {
552        return Ok(());
553    };
554    let directory = fs::File::open(parent)
555        .with_context(|| format!("opening parent directory {}", parent.display()))?;
556    directory
557        .sync_all()
558        .with_context(|| format!("syncing parent directory {}", parent.display()))
559}
560
561#[cfg(windows)]
562fn sync_parent_directory(_path: &Path) -> Result<()> {
563    Ok(())
564}
565
566fn semantic_doc_id_for_embedded(embedded: &EmbeddedMessage) -> String {
567    SemanticDocId {
568        message_id: embedded.message_id,
569        chunk_idx: embedded.chunk_idx,
570        agent_id: embedded.agent_id,
571        workspace_id: embedded.workspace_id,
572        source_id: embedded.source_id,
573        role: embedded.role,
574        created_at_ms: embedded.created_at_ms,
575        content_hash: Some(embedded.content_hash),
576    }
577    .to_doc_id_string()
578}
579
580struct CanonicalEmbeddingConversationRow {
581    conversation_id: i64,
582    agent_slug: String,
583    agent_id: i64,
584    workspace: Option<PathBuf>,
585    workspace_id: Option<i64>,
586    external_id: Option<String>,
587    title: Option<String>,
588    source_path: PathBuf,
589    started_at: Option<i64>,
590    ended_at: Option<i64>,
591    source_id: Option<String>,
592    origin_host: Option<String>,
593}
594
595struct CanonicalEmbeddingBatch {
596    inputs: Vec<EmbeddingInput>,
597    conversations_in_batch: u64,
598    last_conversation_id: i64,
599    total_conversations: u64,
600    cursor_exhausted: bool,
601}
602
603#[derive(Debug, Clone, Copy, PartialEq, Eq)]
604struct SemanticCheckpointCaps {
605    max_messages: usize,
606    max_bytes: u64,
607}
608
609impl SemanticCheckpointCaps {
610    fn unlimited() -> Self {
611        Self {
612            max_messages: 0,
613            max_bytes: 0,
614        }
615    }
616
617    fn from_env() -> Self {
618        Self {
619            max_messages: resolved_env_usize(
620                "CASS_SEMANTIC_MAX_MESSAGES_PER_CHECKPOINT",
621                DEFAULT_SEMANTIC_MAX_MESSAGES_PER_CHECKPOINT,
622            ),
623            max_bytes: resolved_env_u64(
624                "CASS_SEMANTIC_MAX_BYTES_PER_CHECKPOINT",
625                DEFAULT_SEMANTIC_MAX_BYTES_PER_CHECKPOINT,
626            ),
627        }
628    }
629
630    fn message_limited(self) -> bool {
631        self.max_messages > 0
632    }
633
634    fn byte_limited(self) -> bool {
635        self.max_bytes > 0
636    }
637}
638
639pub(crate) struct CanonicalIncrementalEmbeddingBatch {
640    pub inputs: Vec<EmbeddingInput>,
641    pub conversations_in_batch: u64,
642    pub raw_max_message_id: Option<i64>,
643}
644
645fn total_semantic_conversations(storage: &FrankenStorage) -> Result<u64> {
646    let hinted_fallback_sql = "SELECT c.id
647             FROM conversations c
648             WHERE EXISTS (
649                 SELECT 1
650                 FROM messages INDEXED BY sqlite_autoindex_messages_1
651                 WHERE conversation_id = c.id
652                 LIMIT 1
653             )";
654    let fallback_sql = "SELECT c.id
655             FROM conversations c
656             WHERE EXISTS (
657                 SELECT 1
658                 FROM messages
659                 WHERE conversation_id = c.id
660                 LIMIT 1
661             )";
662    let count: i64 = storage
663        .raw()
664        .query_row_map(
665            "SELECT COUNT(*) FROM conversations WHERE message_count > 0",
666            &[] as &[ParamValue],
667            |row| row.get_typed(0),
668        )
669        .or_else(|err| {
670            if !err.to_string().contains("no such column: message_count") {
671                return Err(err);
672            }
673            let conversation_ids: Vec<i64> = storage
674                .raw()
675                .query_map_collect(hinted_fallback_sql, &[] as &[ParamValue], |row| {
676                    row.get_typed(0)
677                })
678                .or_else(|err| {
679                    if err
680                        .to_string()
681                        .contains("no such index: sqlite_autoindex_messages_1")
682                    {
683                        return storage.raw().query_map_collect(
684                            fallback_sql,
685                            &[] as &[ParamValue],
686                            |row| row.get_typed(0),
687                        );
688                    }
689                    Err(err)
690                })?;
691            Ok(i64::try_from(conversation_ids.len()).unwrap_or(i64::MAX))
692        })
693        .with_context(|| "counting canonical conversations with semantic messages")?;
694    Ok(u64::try_from(count.max(0)).unwrap_or(u64::MAX))
695}
696
697pub(crate) fn message_id_from_db(raw: i64) -> Option<u64> {
698    u64::try_from(raw).ok()
699}
700
701pub(crate) fn saturating_u32_from_i64(raw: i64) -> u32 {
702    match u32::try_from(raw) {
703        Ok(value) => value,
704        Err(_) if raw.is_negative() => 0,
705        Err(_) => u32::MAX,
706    }
707}
708
709fn canonical_embedding_created_at_ms(message_id: u64, created_at: Option<i64>) -> i64 {
710    // `created_at_ms` feeds time-range filters in the vector index
711    // (src/search/vector_index.rs range predicates) and contributes to
712    // `stable_hit_hash`. Defaulting a NULL created_at to 0 silently
713    // masquerades the message as Unix-epoch (1970), which is indistinguishable
714    // from a legitimately-ancient row in downstream filters. Emit a warn
715    // so operators see NULL-created_at rows in the logs instead of only
716    // finding them by puzzling over 1970 timestamps in semantic hits.
717    created_at.unwrap_or_else(|| {
718        tracing::warn!(
719            message_id,
720            "semantic backfill: row has NULL created_at; defaulting to 0 (Unix epoch). \
721             Downstream time-range filters will treat this message as 1970-01-01."
722        );
723        0
724    })
725}
726
727fn canonical_embedding_packet_provenance(
728    row: &CanonicalEmbeddingConversationRow,
729) -> ConversationPacketProvenance {
730    let source_id =
731        normalized_index_source_id(row.source_id.as_deref(), None, row.origin_host.as_deref());
732    ConversationPacketProvenance {
733        origin_kind: normalized_index_origin_kind(&source_id, None),
734        origin_host: normalized_index_origin_host(row.origin_host.as_deref()),
735        source_id,
736    }
737}
738
739fn canonical_embedding_conversation(
740    row: &CanonicalEmbeddingConversationRow,
741    provenance: &ConversationPacketProvenance,
742    messages: Vec<Message>,
743) -> Conversation {
744    Conversation {
745        id: Some(row.conversation_id),
746        agent_slug: row.agent_slug.clone(),
747        workspace: row.workspace.clone(),
748        external_id: row.external_id.clone(),
749        title: row.title.clone(),
750        source_path: row.source_path.clone(),
751        started_at: row.started_at,
752        ended_at: row.ended_at,
753        approx_tokens: None,
754        metadata_json: serde_json::Value::Null,
755        messages,
756        source_id: provenance.source_id.clone(),
757        origin_host: provenance.origin_host.clone(),
758    }
759}
760
761fn embedding_input_from_packet_message(
762    conversation_id: i64,
763    agent_id: u32,
764    workspace_id: u32,
765    source_id_hash: u32,
766    message: &crate::model::conversation_packet::ConversationPacketMessage,
767) -> Option<EmbeddingInput> {
768    let Some(raw_message_id) = message.message_id else {
769        tracing::warn!(
770            conversation_id,
771            message_idx = message.idx,
772            "skipping semantic backfill message without canonical id in ConversationPacket replay"
773        );
774        return None;
775    };
776    let Some(message_id) = message_id_from_db(raw_message_id) else {
777        tracing::warn!(
778            conversation_id,
779            raw_message_id,
780            "skipping out-of-range id during semantic backfill"
781        );
782        return None;
783    };
784    Some(EmbeddingInput {
785        message_id,
786        created_at_ms: canonical_embedding_created_at_ms(message_id, message.created_at),
787        agent_id,
788        workspace_id,
789        source_id: source_id_hash,
790        role: role_code_from_str(&message.role).unwrap_or(ROLE_USER),
791        chunk_idx: 0,
792        content: message.content.clone(),
793    })
794}
795
796fn embedding_inputs_from_conversation_packet(
797    row: &CanonicalEmbeddingConversationRow,
798    packet: &ConversationPacket,
799) -> Vec<EmbeddingInput> {
800    let agent_id = saturating_u32_from_i64(row.agent_id);
801    let workspace_id = saturating_u32_from_i64(row.workspace_id.unwrap_or(0));
802    let source_id_hash = crc32fast::hash(packet.payload.provenance.source_id.as_bytes());
803    packet
804        .projections
805        .semantic
806        .message_indices
807        .iter()
808        .filter_map(|message_index| {
809            packet
810                .payload
811                .messages
812                .get(*message_index)
813                .and_then(|message| {
814                    embedding_input_from_packet_message(
815                        row.conversation_id,
816                        agent_id,
817                        workspace_id,
818                        source_id_hash,
819                        message,
820                    )
821                })
822        })
823        .collect()
824}
825
826fn fetch_canonical_embedding_conversations(
827    storage: &FrankenStorage,
828    conversation_ids: &[i64],
829) -> Result<Vec<CanonicalEmbeddingConversationRow>> {
830    let mut envelope_sql = String::from(
831        "SELECT c.id,
832                COALESCE(a.slug, 'unknown'),
833                COALESCE(c.agent_id, 0),
834                c.workspace_id,
835                w.path,
836                c.external_id,
837                c.title,
838                c.source_path,
839                c.started_at,
840                c.ended_at,
841                c.source_id,
842                c.origin_host
843         FROM conversations c
844         LEFT JOIN agents a ON a.id = c.agent_id
845         LEFT JOIN workspaces w ON w.id = c.workspace_id
846         WHERE c.id IN (",
847    );
848    let mut params = Vec::with_capacity(conversation_ids.len());
849    for (idx, conversation_id) in conversation_ids.iter().enumerate() {
850        if idx > 0 {
851            envelope_sql.push_str(", ");
852        }
853        envelope_sql.push_str(&format!("?{}", idx + 1));
854        params.push(ParamValue::from(*conversation_id));
855    }
856    envelope_sql.push_str(") ORDER BY c.id ASC");
857
858    storage
859        .raw()
860        .query_map_collect(&envelope_sql, &params, |row| {
861            let workspace_path: Option<String> = row.get_typed(4)?;
862            Ok(CanonicalEmbeddingConversationRow {
863                conversation_id: row.get_typed(0)?,
864                agent_slug: row.get_typed(1)?,
865                agent_id: row.get_typed(2)?,
866                workspace_id: row.get_typed(3)?,
867                workspace: workspace_path.map(PathBuf::from),
868                external_id: row.get_typed(5)?,
869                title: row.get_typed(6)?,
870                source_path: PathBuf::from(row.get_typed::<String>(7)?),
871                started_at: row.get_typed(8)?,
872                ended_at: row.get_typed(9)?,
873                source_id: row.get_typed(10)?,
874                origin_host: row.get_typed(11)?,
875            })
876        })
877        .with_context(|| {
878            format!(
879                "fetching semantic backfill conversation envelopes for {} conversations",
880                conversation_ids.len()
881            )
882        })
883}
884
885/// Per-packet semantic context that supplies the database-internal
886/// agent / workspace ids the canonical embedding row carries but the
887/// `ConversationPacket` does not (those ids are storage-internal,
888/// not part of the packet contract).
889///
890/// `coding_agent_session_search-ibuuh.32` (sink #3): when a caller
891/// already holds packets (rebuild pipeline, salvage replay, repair
892/// flows, etc.) it can pair them with their canonical
893/// agent_id/workspace_id and drive the semantic preparation consumer
894/// without a second storage round-trip.
895#[allow(dead_code)]
896#[derive(Debug, Clone, Copy)]
897pub(crate) struct SemanticPacketContext {
898    pub conversation_id: i64,
899    pub agent_id: u32,
900    pub workspace_id: u32,
901}
902
903/// Packet-driven counterpart to
904/// [`packet_embedding_inputs_from_storage`]: derives the same
905/// `EmbeddingInput` list a fresh storage replay would produce, but
906/// without re-querying canonical conversation rows.
907///
908/// Invariants:
909/// - The `i`th element of `contexts` describes the `i`th packet.
910/// - Returns `Err` if the lengths disagree, so a callsite cannot
911///   silently mis-correlate packets and contexts.
912/// - `source_id_hash` is derived from `packet.payload.provenance.source_id`
913///   the same way `embedding_inputs_from_conversation_packet` derives
914///   it from the canonical row, so the produced `EmbeddingInput.source_id`
915///   matches both paths byte-for-byte.
916///
917/// The `semantic_inputs_from_packets_matches_storage_replay`
918/// equivalence test pins every produced `EmbeddingInput` field is
919/// identical to what the legacy storage-side replay returns for the
920/// same canonical corpus, so callers that already hold packets can
921/// switch to this helper without changing semantic-index output.
922#[allow(dead_code)]
923pub(crate) fn semantic_inputs_from_packets(
924    packets: &[ConversationPacket],
925    contexts: &[SemanticPacketContext],
926) -> Result<Vec<EmbeddingInput>> {
927    if packets.len() != contexts.len() {
928        anyhow::bail!(
929            "semantic_inputs_from_packets length mismatch: {} packets vs {} contexts",
930            packets.len(),
931            contexts.len()
932        );
933    }
934    let mut inputs = Vec::new();
935    for (packet, context) in packets.iter().zip(contexts.iter()) {
936        let source_id_hash = crc32fast::hash(packet.payload.provenance.source_id.as_bytes());
937        for &message_index in &packet.projections.semantic.message_indices {
938            let Some(message) = packet.payload.messages.get(message_index) else {
939                anyhow::bail!(
940                    "packet semantic projection references missing message index {} \
941                     (packet has {} messages)",
942                    message_index,
943                    packet.payload.messages.len()
944                );
945            };
946            if let Some(input) = embedding_input_from_packet_message(
947                context.conversation_id,
948                context.agent_id,
949                context.workspace_id,
950                source_id_hash,
951                message,
952            ) {
953                inputs.push(input);
954            }
955        }
956    }
957    tracing::debug!(
958        packets = packets.len(),
959        packet_driven = true,
960        semantic_inputs = inputs.len(),
961        "built semantic inputs from in-memory ConversationPacket batch"
962    );
963    Ok(inputs)
964}
965
966fn fetch_canonical_embedding_batch(
967    storage: &FrankenStorage,
968    after_conversation_id: i64,
969    max_conversations: usize,
970) -> Result<CanonicalEmbeddingBatch> {
971    fetch_canonical_embedding_batch_inner(storage, after_conversation_id, max_conversations, None)
972}
973
974/// Variant of [`fetch_canonical_embedding_batch`] that additionally
975/// filters out canonical messages with `message_id <= after_message_id`
976/// when set. This is how sub-fix 2 (`last_message_id` cursor) enforces
977/// the "resume MUST advance past `last_message_id`" rule on a partially
978/// embedded conversation.
979fn fetch_canonical_embedding_batch_inner(
980    storage: &FrankenStorage,
981    after_conversation_id: i64,
982    max_conversations: usize,
983    after_message_id: Option<i64>,
984) -> Result<CanonicalEmbeddingBatch> {
985    fetch_canonical_embedding_batch_inner_with_caps(
986        storage,
987        after_conversation_id,
988        max_conversations,
989        after_message_id,
990        SemanticCheckpointCaps::unlimited(),
991    )
992}
993
994fn fetch_canonical_embedding_batch_inner_with_caps(
995    storage: &FrankenStorage,
996    after_conversation_id: i64,
997    max_conversations: usize,
998    after_message_id: Option<i64>,
999    caps: SemanticCheckpointCaps,
1000) -> Result<CanonicalEmbeddingBatch> {
1001    let total_conversations = total_semantic_conversations(storage)?;
1002    let max_conversations = max_conversations.max(1);
1003    let query_limit = max_conversations.saturating_add(1);
1004    let query_limit_i64 = i64::try_from(query_limit).unwrap_or(i64::MAX);
1005    let mut params = vec![
1006        ParamValue::from(after_conversation_id),
1007        ParamValue::from(query_limit_i64),
1008    ];
1009    let message_cursor_predicate = if let Some(after_message_id) = after_message_id {
1010        params.push(ParamValue::from(after_message_id));
1011        " AND id > ?3"
1012    } else {
1013        ""
1014    };
1015    let hinted_sql = format!(
1016        "SELECT c.id
1017         FROM conversations c
1018         WHERE c.id > ?1
1019           AND EXISTS (
1020               SELECT 1
1021               FROM messages INDEXED BY sqlite_autoindex_messages_1
1022               WHERE conversation_id = c.id{message_cursor_predicate}
1023               LIMIT 1
1024           )
1025         ORDER BY c.id ASC
1026         LIMIT ?2"
1027    );
1028    let fallback_sql = format!(
1029        "SELECT c.id
1030         FROM conversations c
1031         WHERE c.id > ?1
1032           AND EXISTS (
1033               SELECT 1
1034               FROM messages
1035               WHERE conversation_id = c.id{message_cursor_predicate}
1036               LIMIT 1
1037           )
1038         ORDER BY c.id ASC
1039         LIMIT ?2"
1040    );
1041    let mut conversation_ids: Vec<i64> = storage
1042        .raw()
1043        .query_map_collect(&hinted_sql, &params, |row| row.get_typed(0))
1044        .or_else(|err| {
1045            if err
1046                .to_string()
1047                .contains("no such index: sqlite_autoindex_messages_1")
1048            {
1049                return storage
1050                    .raw()
1051                    .query_map_collect(&fallback_sql, &params, |row| row.get_typed(0));
1052            }
1053            Err(err)
1054        })
1055        .with_context(|| {
1056            format!("fetching semantic backfill conversation ids after {after_conversation_id}")
1057        })?;
1058
1059    if conversation_ids.is_empty() {
1060        return Ok(CanonicalEmbeddingBatch {
1061            inputs: Vec::new(),
1062            conversations_in_batch: 0,
1063            last_conversation_id: after_conversation_id,
1064            total_conversations,
1065            cursor_exhausted: true,
1066        });
1067    }
1068
1069    let has_more_from_sql_limit = conversation_ids.len() > max_conversations;
1070    if has_more_from_sql_limit {
1071        conversation_ids.truncate(max_conversations);
1072    }
1073
1074    let conversations = fetch_canonical_embedding_conversations(storage, &conversation_ids)?;
1075
1076    let mut grouped_messages =
1077        storage.fetch_messages_for_lexical_rebuild_batch(&conversation_ids, None, None)?;
1078    let CheckpointCappedSelection {
1079        conversations,
1080        last_conversation_id,
1081        stopped_before_candidate,
1082    } = select_checkpoint_capped_conversations(
1083        conversations,
1084        &mut grouped_messages,
1085        after_message_id,
1086        caps,
1087    );
1088    let (inputs, _) = packet_embedding_inputs_from_materialized_canonical_messages(
1089        &conversations,
1090        &mut grouped_messages,
1091        |_| true,
1092    );
1093
1094    let conversations_in_batch = u64::try_from(conversations.len()).unwrap_or(u64::MAX);
1095    let cursor_exhausted = !has_more_from_sql_limit && !stopped_before_candidate;
1096    tracing::debug!(
1097        conversations_in_batch,
1098        cursor_exhausted,
1099        packet_driven = true,
1100        semantic_inputs = inputs.len(),
1101        max_messages_per_checkpoint = caps.max_messages,
1102        max_bytes_per_checkpoint = caps.max_bytes,
1103        ?after_message_id,
1104        "built semantic backfill batch from ConversationPacket canonical replay"
1105    );
1106
1107    Ok(CanonicalEmbeddingBatch {
1108        inputs,
1109        conversations_in_batch,
1110        last_conversation_id: last_conversation_id.unwrap_or(after_conversation_id),
1111        total_conversations,
1112        cursor_exhausted,
1113    })
1114}
1115
1116struct CheckpointCappedSelection {
1117    conversations: Vec<CanonicalEmbeddingConversationRow>,
1118    last_conversation_id: Option<i64>,
1119    stopped_before_candidate: bool,
1120}
1121
1122fn select_checkpoint_capped_conversations(
1123    conversations: Vec<CanonicalEmbeddingConversationRow>,
1124    grouped_messages: &mut HashMap<i64, Vec<Message>>,
1125    after_message_id: Option<i64>,
1126    caps: SemanticCheckpointCaps,
1127) -> CheckpointCappedSelection {
1128    let mut selected = Vec::new();
1129    let mut selected_messages = 0usize;
1130    let mut selected_bytes = 0u64;
1131    let mut last_conversation_id = None;
1132    let mut stopped_before_candidate = false;
1133
1134    for conversation in conversations {
1135        let mut messages = grouped_messages
1136            .remove(&conversation.conversation_id)
1137            .unwrap_or_default();
1138        if let Some(min_exclusive) = after_message_id {
1139            messages.retain(|message| message.id.is_some_and(|id| id > min_exclusive));
1140        }
1141        if messages.is_empty() {
1142            continue;
1143        }
1144
1145        let message_count = messages.len();
1146        let byte_count = messages
1147            .iter()
1148            .map(|message| saturating_u64_from_usize(message.content.len()))
1149            .fold(0u64, u64::saturating_add);
1150        let would_exceed_messages = caps.message_limited()
1151            && !selected.is_empty()
1152            && selected_messages.saturating_add(message_count) > caps.max_messages;
1153        let would_exceed_bytes = caps.byte_limited()
1154            && !selected.is_empty()
1155            && selected_bytes.saturating_add(byte_count) > caps.max_bytes;
1156        if would_exceed_messages || would_exceed_bytes {
1157            stopped_before_candidate = true;
1158            tracing::debug!(
1159                conversation_id = conversation.conversation_id,
1160                selected_conversations = selected.len(),
1161                selected_messages,
1162                selected_bytes,
1163                candidate_messages = message_count,
1164                candidate_bytes = byte_count,
1165                max_messages_per_checkpoint = caps.max_messages,
1166                max_bytes_per_checkpoint = caps.max_bytes,
1167                "semantic checkpoint cap stopped this batch before the next full conversation"
1168            );
1169            break;
1170        }
1171
1172        selected_messages = selected_messages.saturating_add(message_count);
1173        selected_bytes = selected_bytes.saturating_add(byte_count);
1174        last_conversation_id = Some(conversation.conversation_id);
1175        grouped_messages.insert(conversation.conversation_id, messages);
1176        selected.push(conversation);
1177    }
1178
1179    CheckpointCappedSelection {
1180        conversations: selected,
1181        last_conversation_id,
1182        stopped_before_candidate,
1183    }
1184}
1185
1186pub(crate) fn packet_embedding_inputs_from_storage(
1187    storage: &FrankenStorage,
1188) -> Result<Vec<EmbeddingInput>> {
1189    Ok(fetch_canonical_embedding_batch(storage, 0, usize::MAX)?.inputs)
1190}
1191
1192fn packet_embedding_inputs_from_selected_canonical_messages<F>(
1193    storage: &FrankenStorage,
1194    conversation_ids: &[i64],
1195    include_message: F,
1196) -> Result<(Vec<EmbeddingInput>, Option<i64>)>
1197where
1198    F: FnMut(&Message) -> bool,
1199{
1200    if conversation_ids.is_empty() {
1201        return Ok((Vec::new(), None));
1202    }
1203
1204    let conversations = fetch_canonical_embedding_conversations(storage, conversation_ids)?;
1205    let mut grouped_messages =
1206        storage.fetch_messages_for_lexical_rebuild_batch(conversation_ids, None, None)?;
1207    Ok(
1208        packet_embedding_inputs_from_materialized_canonical_messages(
1209            &conversations,
1210            &mut grouped_messages,
1211            include_message,
1212        ),
1213    )
1214}
1215
1216fn packet_embedding_inputs_from_materialized_canonical_messages<F>(
1217    conversations: &[CanonicalEmbeddingConversationRow],
1218    grouped_messages: &mut HashMap<i64, Vec<Message>>,
1219    mut include_message: F,
1220) -> (Vec<EmbeddingInput>, Option<i64>)
1221where
1222    F: FnMut(&Message) -> bool,
1223{
1224    let mut inputs = Vec::new();
1225    let mut raw_max_message_id: Option<i64> = None;
1226
1227    for conversation in conversations {
1228        let mut messages = grouped_messages
1229            .remove(&conversation.conversation_id)
1230            .unwrap_or_default();
1231        messages.retain(|message| {
1232            let keep = include_message(message);
1233            if keep && let Some(message_id) = message.id {
1234                raw_max_message_id =
1235                    Some(raw_max_message_id.map_or(message_id, |current| current.max(message_id)));
1236            }
1237            keep
1238        });
1239        if messages.is_empty() {
1240            continue;
1241        }
1242
1243        let provenance = canonical_embedding_packet_provenance(conversation);
1244        let canonical = canonical_embedding_conversation(conversation, &provenance, messages);
1245        let packet = ConversationPacket::from_canonical_replay(&canonical, provenance);
1246        inputs.extend(embedding_inputs_from_conversation_packet(
1247            conversation,
1248            &packet,
1249        ));
1250    }
1251
1252    (inputs, raw_max_message_id)
1253}
1254
1255pub(crate) fn packet_embedding_inputs_from_storage_since(
1256    storage: &FrankenStorage,
1257    since_message_id: i64,
1258) -> Result<CanonicalIncrementalEmbeddingBatch> {
1259    let conversation_ids: Vec<i64> = storage
1260        .raw()
1261        .query_map_collect(
1262            "SELECT DISTINCT m.conversation_id
1263             FROM messages m
1264             WHERE m.id > ?1
1265             ORDER BY m.conversation_id ASC",
1266            &[ParamValue::from(since_message_id)],
1267            |row| row.get_typed(0),
1268        )
1269        .with_context(|| {
1270            format!(
1271                "fetching canonical semantic catch-up conversation ids after message {since_message_id}"
1272            )
1273        })?;
1274
1275    if conversation_ids.is_empty() {
1276        return Ok(CanonicalIncrementalEmbeddingBatch {
1277            inputs: Vec::new(),
1278            conversations_in_batch: 0,
1279            raw_max_message_id: None,
1280        });
1281    }
1282
1283    let (inputs, raw_max_message_id) = packet_embedding_inputs_from_selected_canonical_messages(
1284        storage,
1285        &conversation_ids,
1286        |message| message.id.is_some_and(|id| id > since_message_id),
1287    )?;
1288
1289    let conversations_in_batch = u64::try_from(conversation_ids.len()).unwrap_or(u64::MAX);
1290    tracing::debug!(
1291        since_message_id,
1292        conversations_in_batch,
1293        packet_driven = true,
1294        semantic_inputs = inputs.len(),
1295        "built semantic catch-up batch from ConversationPacket canonical replay"
1296    );
1297
1298    Ok(CanonicalIncrementalEmbeddingBatch {
1299        inputs,
1300        conversations_in_batch,
1301        raw_max_message_id,
1302    })
1303}
1304
1305pub(crate) fn packet_embedding_inputs_from_storage_for_message_ids(
1306    storage: &FrankenStorage,
1307    conversation_ids: &[i64],
1308    message_ids: &HashSet<i64>,
1309) -> Result<Vec<EmbeddingInput>> {
1310    if conversation_ids.is_empty() || message_ids.is_empty() {
1311        return Ok(Vec::new());
1312    }
1313
1314    let (inputs, raw_max_message_id) = packet_embedding_inputs_from_selected_canonical_messages(
1315        storage,
1316        conversation_ids,
1317        |message| message.id.is_some_and(|id| message_ids.contains(&id)),
1318    )?;
1319    tracing::debug!(
1320        conversations_in_batch = conversation_ids.len(),
1321        selected_message_ids = message_ids.len(),
1322        semantic_inputs = inputs.len(),
1323        raw_max_message_id,
1324        packet_driven = true,
1325        "built selected semantic batch from ConversationPacket canonical replay"
1326    );
1327
1328    Ok(inputs)
1329}
1330
1331struct Prepared<'a> {
1332    msg: &'a EmbeddingInput,
1333    canonical: String,
1334    hash: [u8; 32],
1335}
1336
1337#[derive(Debug, Clone, PartialEq, Eq)]
1338struct MemoizedPreparedMessage {
1339    canonical: String,
1340    hash: [u8; 32],
1341}
1342
1343fn semantic_prep_memo_key(content: &str) -> MemoKey {
1344    MemoKey::new(
1345        MemoContentHash::from_bytes(content_hash(content).to_vec()),
1346        SEMANTIC_PREP_MEMO_ALGORITHM,
1347        SEMANTIC_PREP_MEMO_VERSION,
1348    )
1349}
1350
1351fn memo_counter_delta(after: u64, before: u64) -> u64 {
1352    after.saturating_sub(before)
1353}
1354
1355fn trace_semantic_prep_memo_window(
1356    window_index: usize,
1357    window_len: usize,
1358    prepared_len: usize,
1359    entry_capacity: usize,
1360    before: &crate::indexer::memoization::MemoCacheStats,
1361    after: &crate::indexer::memoization::MemoCacheStats,
1362) {
1363    tracing::trace!(
1364        algorithm = SEMANTIC_PREP_MEMO_ALGORITHM,
1365        algorithm_version = SEMANTIC_PREP_MEMO_VERSION,
1366        window_index,
1367        window_len,
1368        prepared_messages = prepared_len,
1369        skipped_messages = window_len.saturating_sub(prepared_len),
1370        hit_delta = memo_counter_delta(after.hits, before.hits),
1371        miss_delta = memo_counter_delta(after.misses, before.misses),
1372        insert_delta = memo_counter_delta(after.inserts, before.inserts),
1373        evictions_capacity_delta =
1374            memo_counter_delta(after.evictions_capacity, before.evictions_capacity),
1375        quarantined_delta = memo_counter_delta(after.quarantined, before.quarantined),
1376        live_entries = after.live_entries,
1377        entry_capacity,
1378        "semantic prep memo cache window"
1379    );
1380}
1381
1382fn trace_semantic_prep_memo_audit(audit: &MemoCacheAuditRecord) {
1383    tracing::trace!(?audit, "semantic prep memo cache audit");
1384}
1385
1386fn prepare_window_with_memo<'a>(
1387    window: &'a [EmbeddingInput],
1388    cache: &mut ContentAddressedMemoCache<MemoizedPreparedMessage>,
1389) -> Vec<Prepared<'a>> {
1390    window
1391        .iter()
1392        .filter_map(|msg| {
1393            let key = semantic_prep_memo_key(&msg.content);
1394            let (lookup, lookup_audit) = cache.get_with_audit(&key);
1395            trace_semantic_prep_memo_audit(&lookup_audit);
1396            match lookup {
1397                MemoLookup::Hit { value } => Some(Prepared {
1398                    msg,
1399                    canonical: value.canonical,
1400                    hash: value.hash,
1401                }),
1402                MemoLookup::Miss | MemoLookup::Quarantined { .. } => {
1403                    let canonical = canonicalize_for_embedding(&msg.content);
1404                    if canonical.is_empty() {
1405                        return None;
1406                    }
1407                    let hash = content_hash(&canonical);
1408                    let insert_audit = cache.insert_with_audit(
1409                        key,
1410                        MemoizedPreparedMessage {
1411                            canonical: canonical.clone(),
1412                            hash,
1413                        },
1414                    );
1415                    trace_semantic_prep_memo_audit(&insert_audit);
1416                    Some(Prepared {
1417                        msg,
1418                        canonical,
1419                        hash,
1420                    })
1421                }
1422            }
1423        })
1424        .collect()
1425}
1426
1427/// Canonicalize + hash a window of messages. Default is serial; opt in to
1428/// the rayon-parallel path via `CASS_SEMANTIC_PREP_PARALLEL=1` (see the
1429/// `parallel_prep_enabled` docstring for why it is not the default).
1430/// Parallel results preserve input order via `par_iter().filter_map().collect()`.
1431/// Messages whose canonical form is empty are filtered out so the embedder
1432/// batch is never polluted with useless inputs.
1433fn prepare_window<'a>(window: &'a [EmbeddingInput], serial: bool) -> Vec<Prepared<'a>> {
1434    let prep = |msg: &'a EmbeddingInput| -> Option<Prepared<'a>> {
1435        let canonical = canonicalize_for_embedding(&msg.content);
1436        if canonical.is_empty() {
1437            return None;
1438        }
1439        let hash = content_hash(&canonical);
1440        Some(Prepared {
1441            msg,
1442            canonical,
1443            hash,
1444        })
1445    };
1446
1447    if serial {
1448        window.iter().filter_map(prep).collect()
1449    } else {
1450        window.par_iter().filter_map(prep).collect()
1451    }
1452}
1453
1454fn flush_prepared_batch(
1455    batch: &[Prepared<'_>],
1456    embeddings: &mut Vec<EmbeddedMessage>,
1457    pb: &ProgressBar,
1458    embedder: &dyn Embedder,
1459) -> Result<()> {
1460    if batch.is_empty() {
1461        return Ok(());
1462    }
1463
1464    let texts: Vec<&str> = batch.iter().map(|p| p.canonical.as_str()).collect();
1465    let vectors = embedder
1466        .embed_batch_sync(&texts)
1467        .map_err(|e| anyhow::anyhow!("embedding failed: {e}"))?;
1468
1469    if vectors.len() != batch.len() {
1470        bail!(
1471            "embedder returned {} embeddings for {} inputs",
1472            vectors.len(),
1473            batch.len()
1474        );
1475    }
1476
1477    for (prepared, vector) in batch.iter().zip(vectors) {
1478        if vector.len() != embedder.dimension() {
1479            bail!(
1480                "embedding dimension mismatch: expected {}, got {}",
1481                embedder.dimension(),
1482                vector.len()
1483            );
1484        }
1485        embeddings.push(EmbeddedMessage {
1486            message_id: prepared.msg.message_id,
1487            created_at_ms: prepared.msg.created_at_ms,
1488            agent_id: prepared.msg.agent_id,
1489            workspace_id: prepared.msg.workspace_id,
1490            source_id: prepared.msg.source_id,
1491            role: prepared.msg.role,
1492            chunk_idx: prepared.msg.chunk_idx,
1493            content_hash: prepared.hash,
1494            embedding: vector,
1495        });
1496    }
1497
1498    pb.inc(saturating_u64_from_usize(batch.len()));
1499    Ok(())
1500}
1501
1502pub struct SemanticIndexer {
1503    embedder: Box<dyn Embedder>,
1504    batch_size: usize,
1505}
1506
1507impl SemanticIndexer {
1508    pub fn new(embedder_type: &str, data_dir: Option<&Path>) -> Result<Self> {
1509        let embedder: Box<dyn Embedder> = match embedder_type {
1510            "fastembed" | "minilm" | "snowflake-arctic-s" | "nomic-embed" => {
1511                let dir = data_dir
1512                    .ok_or_else(|| anyhow::anyhow!("data_dir required for fastembed embedder"))?;
1513                let embedder_name = if embedder_type == "fastembed" {
1514                    "minilm"
1515                } else {
1516                    embedder_type
1517                };
1518                Box::new(
1519                    FastEmbedder::load_by_name(dir, embedder_name)
1520                        .map_err(|e| anyhow::anyhow!("fastembed unavailable: {e}"))?,
1521                )
1522            }
1523            "hash" => Box::new(HashEmbedder::default()),
1524            other => bail!("unknown embedder: {other}"),
1525        };
1526
1527        Ok(Self {
1528            embedder,
1529            batch_size: resolved_default_batch_size(),
1530        })
1531    }
1532
1533    pub fn with_batch_size(mut self, batch_size: usize) -> Result<Self> {
1534        if batch_size == 0 {
1535            bail!("batch_size must be > 0");
1536        }
1537        self.batch_size = batch_size;
1538        Ok(self)
1539    }
1540
1541    pub fn batch_size(&self) -> usize {
1542        self.batch_size
1543    }
1544
1545    pub fn embedder_id(&self) -> &str {
1546        self.embedder.id()
1547    }
1548
1549    pub fn embedder_dimension(&self) -> usize {
1550        self.embedder.dimension()
1551    }
1552
1553    pub fn embed_messages(&self, messages: &[EmbeddingInput]) -> Result<Vec<EmbeddedMessage>> {
1554        self.embed_messages_with_sink(messages, &SemanticProgressSink::disabled())
1555    }
1556
1557    /// Variant of [`embed_messages`] that emits `embed_batch_*` events
1558    /// into the given JSONL sink. The sink is silent unless
1559    /// `CASS_SEMANTIC_PROGRESS_JSONL` is set, so this path is safe to
1560    /// take in production.
1561    pub fn embed_messages_with_sink(
1562        &self,
1563        messages: &[EmbeddingInput],
1564        sink: &SemanticProgressSink,
1565    ) -> Result<Vec<EmbeddedMessage>> {
1566        if messages.is_empty() {
1567            return Ok(Vec::new());
1568        }
1569
1570        let show_progress = std::io::stderr().is_terminal();
1571        let pb = ProgressBar::new(saturating_u64_from_usize(messages.len()));
1572        if show_progress {
1573            let style = ProgressStyle::default_bar()
1574                .template("{spinner:.green} [{bar:40.cyan/blue}] {pos}/{len} messages embedded")
1575                .unwrap_or_else(|_| ProgressStyle::default_bar());
1576            pb.set_style(style);
1577        } else {
1578            pb.set_draw_target(ProgressDrawTarget::hidden());
1579        }
1580
1581        let mut embeddings = Vec::with_capacity(messages.len());
1582
1583        // Process the corpus in windows of ~4 batches. Within each window,
1584        // rayon parallelizes the canonicalize + hash prep across cores; the
1585        // ONNX embedder is then fed serially in `batch_size` chunks so its
1586        // internal thread pool stays saturated without being starved by the
1587        // single-threaded prep loop we had before. `with_batch_size` and
1588        // `resolved_default_batch_size` both guarantee `batch_size >= 1`,
1589        // so saturating_mul(4) is always >= batch_size — no further clamp.
1590        let window = self.batch_size.saturating_mul(4);
1591        let serial_prep = !parallel_prep_enabled();
1592        let prep_memo_capacity = resolved_semantic_prep_memo_capacity();
1593        let mut prep_memo =
1594            serial_prep.then(|| ContentAddressedMemoCache::with_capacity(prep_memo_capacity));
1595        let mut batch_index: u64 = 0;
1596        let mut rows_processed: u64 = 0;
1597        let rows_total = u64::try_from(messages.len()).ok();
1598        let warn_after_ms = resolved_semantic_embed_batch_warn_after_ms();
1599        let fail_after_ms = resolved_semantic_embed_batch_fail_after_ms();
1600        for (window_index, window_slice) in messages.chunks(window).enumerate() {
1601            let prepared_window = match prep_memo.as_mut() {
1602                Some(cache) => {
1603                    let stats_before = cache.stats().clone();
1604                    let prepared_window = prepare_window_with_memo(window_slice, cache);
1605                    trace_semantic_prep_memo_window(
1606                        window_index,
1607                        window_slice.len(),
1608                        prepared_window.len(),
1609                        prep_memo_capacity,
1610                        &stats_before,
1611                        cache.stats(),
1612                    );
1613                    prepared_window
1614                }
1615                None => prepare_window(window_slice, false),
1616            };
1617            let skipped_in_window = window_slice.len() - prepared_window.len();
1618            if skipped_in_window > 0 {
1619                pb.inc(saturating_u64_from_usize(skipped_in_window));
1620            }
1621
1622            for batch in prepared_window.chunks(self.batch_size) {
1623                let batch_rows = u64::try_from(batch.len()).unwrap_or(u64::MAX);
1624                // Sum the canonicalized byte count so an operator can
1625                // distinguish a stalled inference from a stalled query —
1626                // a tiny `bytes` value paired with a long batch wall-time
1627                // points at the model; a huge `bytes` paired with a short
1628                // wall-time points at the storage side.
1629                let batch_bytes: u64 = batch
1630                    .iter()
1631                    .map(|p| saturating_u64_from_usize(p.canonical.len()))
1632                    .sum();
1633                if sink.is_active() {
1634                    sink.emit(
1635                        SemanticProgressEvent::EmbedBatchStart,
1636                        SemanticProgressFields {
1637                            batch_index: Some(batch_index),
1638                            batch_rows: Some(batch_rows),
1639                            rows_processed: Some(rows_processed),
1640                            rows_total,
1641                            bytes: Some(batch_bytes),
1642                            ..Default::default()
1643                        },
1644                    );
1645                }
1646                let batch_started = Instant::now();
1647                flush_prepared_batch(batch, &mut embeddings, &pb, self.embedder.as_ref())?;
1648                let elapsed_ms = saturating_u64_from_millis(batch_started.elapsed().as_millis());
1649                rows_processed = rows_processed.saturating_add(batch_rows);
1650                if warn_after_ms > 0 && elapsed_ms > warn_after_ms {
1651                    tracing::warn!(
1652                        batch_index,
1653                        elapsed_ms,
1654                        warn_after_ms,
1655                        batch_rows,
1656                        batch_bytes,
1657                        embedder = self.embedder_id(),
1658                        "semantic embed batch exceeded watchdog warning threshold"
1659                    );
1660                }
1661                if fail_after_ms > 0 && elapsed_ms > fail_after_ms {
1662                    bail!(
1663                        "semantic embed batch {batch_index} took {elapsed_ms}ms, exceeding \
1664                         CASS_SEMANTIC_EMBED_BATCH_FAIL_AFTER_MS={fail_after_ms}"
1665                    );
1666                }
1667                if sink.is_active() {
1668                    sink.emit(
1669                        SemanticProgressEvent::EmbedBatchDone,
1670                        SemanticProgressFields {
1671                            batch_index: Some(batch_index),
1672                            batch_rows: Some(batch_rows),
1673                            rows_processed: Some(rows_processed),
1674                            rows_total,
1675                            bytes: Some(batch_bytes),
1676                            ..Default::default()
1677                        },
1678                    );
1679                }
1680                batch_index = batch_index.saturating_add(1);
1681            }
1682        }
1683
1684        if let Some(cache) = prep_memo.as_ref() {
1685            let stats = cache.stats();
1686            tracing::debug!(
1687                algorithm = SEMANTIC_PREP_MEMO_ALGORITHM,
1688                algorithm_version = SEMANTIC_PREP_MEMO_VERSION,
1689                hits = stats.hits,
1690                misses = stats.misses,
1691                inserts = stats.inserts,
1692                quarantined = stats.quarantined,
1693                live_entries = stats.live_entries,
1694                entry_capacity = prep_memo_capacity,
1695                "semantic prep memo cache summary"
1696            );
1697        }
1698
1699        pb.finish_with_message("Embedding complete");
1700        Ok(embeddings)
1701    }
1702
1703    pub fn build_and_save_index<I>(
1704        &self,
1705        embedded_messages: I,
1706        data_dir: &Path,
1707    ) -> Result<FsVectorIndex>
1708    where
1709        I: IntoIterator<Item = EmbeddedMessage>,
1710    {
1711        let index_path = vector_index_path(data_dir, self.embedder_id());
1712        self.build_and_save_index_at_path(embedded_messages, &index_path)
1713    }
1714
1715    pub fn build_and_save_index_shards<I>(
1716        &self,
1717        embedded_messages: I,
1718        data_dir: &Path,
1719        plan: SemanticShardBuildPlan,
1720    ) -> Result<SemanticShardBuildOutcome>
1721    where
1722        I: IntoIterator<Item = EmbeddedMessage>,
1723    {
1724        if plan.db_fingerprint.trim().is_empty() {
1725            bail!("semantic shard build requires a non-empty DB fingerprint");
1726        }
1727        if plan.max_records_per_shard == 0 {
1728            bail!("semantic shard build requires max_records_per_shard > 0");
1729        }
1730
1731        let mut shard_records = Vec::new();
1732        let mut index_paths = Vec::new();
1733        let mut ann_index_paths = Vec::new();
1734        let mut current_records = Vec::with_capacity(plan.max_records_per_shard);
1735        let mut shard_index = 0u32;
1736        let mut total_docs = 0u64;
1737
1738        for embedded in embedded_messages {
1739            current_records.push(embedded);
1740            if current_records.len() >= plan.max_records_per_shard {
1741                let records = std::mem::take(&mut current_records);
1742                let (record, path, ann_path) =
1743                    self.write_semantic_shard(records, data_dir, &plan, shard_index)?;
1744                total_docs = total_docs.saturating_add(record.doc_count);
1745                shard_records.push(record);
1746                index_paths.push(path);
1747                if let Some(path) = ann_path {
1748                    ann_index_paths.push(path);
1749                }
1750                shard_index = shard_index
1751                    .checked_add(1)
1752                    .context("semantic shard index overflow")?;
1753            }
1754        }
1755
1756        if !current_records.is_empty() {
1757            let records = std::mem::take(&mut current_records);
1758            let (record, path, ann_path) =
1759                self.write_semantic_shard(records, data_dir, &plan, shard_index)?;
1760            total_docs = total_docs.saturating_add(record.doc_count);
1761            shard_records.push(record);
1762            index_paths.push(path);
1763            if let Some(path) = ann_path {
1764                ann_index_paths.push(path);
1765            }
1766        }
1767
1768        let shard_count = u32::try_from(shard_records.len())
1769            .context("semantic shard generation exceeded u32 shard count")?;
1770        for record in &mut shard_records {
1771            record.shard_count = shard_count;
1772        }
1773
1774        let mut shard_manifest = SemanticShardManifest::load_or_default(data_dir)
1775            .map_err(|err| anyhow::anyhow!("loading semantic shard manifest for publish: {err}"))?;
1776        shard_manifest.replace_shards_for_generation(
1777            plan.tier,
1778            self.embedder_id(),
1779            &plan.db_fingerprint,
1780            shard_records,
1781        );
1782        shard_manifest
1783            .save(data_dir)
1784            .map_err(|err| anyhow::anyhow!("saving semantic shard manifest: {err}"))?;
1785        let summary = shard_manifest.summary(plan.tier, self.embedder_id(), &plan.db_fingerprint);
1786
1787        tracing::info!(
1788            tier = plan.tier.as_str(),
1789            embedder = self.embedder_id(),
1790            shard_count,
1791            doc_count = total_docs,
1792            total_conversations = plan.total_conversations,
1793            "published semantic shard generation sidecar"
1794        );
1795
1796        Ok(SemanticShardBuildOutcome {
1797            tier: plan.tier,
1798            embedder_id: self.embedder_id().to_string(),
1799            shard_count,
1800            doc_count: total_docs,
1801            total_conversations: plan.total_conversations,
1802            index_paths,
1803            ann_index_paths,
1804            shard_manifest_path: SemanticShardManifest::path(data_dir),
1805            complete: summary.complete,
1806        })
1807    }
1808
1809    fn write_semantic_shard(
1810        &self,
1811        embedded_messages: Vec<EmbeddedMessage>,
1812        data_dir: &Path,
1813        plan: &SemanticShardBuildPlan,
1814        shard_index: u32,
1815    ) -> Result<(SemanticShardRecord, PathBuf, Option<PathBuf>)> {
1816        let started_at_ms = now_ms();
1817        let shard_path = semantic_shard_index_path(
1818            data_dir,
1819            plan.tier,
1820            self.embedder_id(),
1821            &plan.db_fingerprint,
1822            shard_index,
1823        );
1824        let shard_index_file = self.build_and_save_index_at_path(embedded_messages, &shard_path)?;
1825        let size_bytes = fs::metadata(&shard_path)
1826            .with_context(|| format!("stat semantic shard {}", shard_path.display()))?
1827            .len();
1828        let (ann_index_path, ann_size_bytes, ann_ready, ann_absolute_path) = if plan.build_ann {
1829            let ann_path = semantic_shard_ann_index_path(
1830                data_dir,
1831                plan.tier,
1832                self.embedder_id(),
1833                &plan.db_fingerprint,
1834                shard_index,
1835            );
1836            let config = FsHnswConfig {
1837                m: FS_HNSW_DEFAULT_M,
1838                ef_construction: FS_HNSW_DEFAULT_EF_CONSTRUCTION,
1839                ..FsHnswConfig::default()
1840            };
1841            let hnsw = FsHnswIndex::build_from_vector_index(&shard_index_file, config)
1842                .map_err(|err| anyhow::anyhow!("build shard HNSW index failed: {err}"))?;
1843            hnsw.save(&ann_path)
1844                .map_err(|err| anyhow::anyhow!("save shard HNSW index failed: {err}"))?;
1845            let ann_size_bytes = fs::metadata(&ann_path)
1846                .with_context(|| format!("stat semantic shard ANN {}", ann_path.display()))?
1847                .len();
1848            let relative_ann_path = ann_path
1849                .strip_prefix(data_dir)
1850                .unwrap_or(ann_path.as_path())
1851                .to_string_lossy()
1852                .to_string();
1853            (
1854                Some(relative_ann_path),
1855                ann_size_bytes,
1856                true,
1857                Some(ann_path),
1858            )
1859        } else {
1860            (None, 0, false, None)
1861        };
1862        let relative_index_path = shard_path
1863            .strip_prefix(data_dir)
1864            .unwrap_or(shard_path.as_path())
1865            .to_string_lossy()
1866            .to_string();
1867        let record = SemanticShardRecord {
1868            tier: plan.tier,
1869            embedder_id: self.embedder_id().to_string(),
1870            model_revision: plan.model_revision.clone(),
1871            schema_version: SEMANTIC_SCHEMA_VERSION,
1872            chunking_version: CHUNKING_STRATEGY_VERSION,
1873            dimension: self.embedder_dimension(),
1874            shard_index,
1875            shard_count: 0,
1876            doc_count: u64::try_from(shard_index_file.record_count()).unwrap_or(u64::MAX),
1877            total_conversations: plan.total_conversations,
1878            db_fingerprint: plan.db_fingerprint.clone(),
1879            index_path: relative_index_path,
1880            quantization: "f16".to_string(),
1881            mmap_ready: true,
1882            ann_index_path,
1883            ann_size_bytes,
1884            ann_ready,
1885            size_bytes,
1886            started_at_ms,
1887            completed_at_ms: now_ms(),
1888            ready: true,
1889        };
1890        Ok((record, shard_path, ann_absolute_path))
1891    }
1892
1893    fn build_and_save_index_at_path<I>(
1894        &self,
1895        embedded_messages: I,
1896        index_path: &Path,
1897    ) -> Result<FsVectorIndex>
1898    where
1899        I: IntoIterator<Item = EmbeddedMessage>,
1900    {
1901        if let Some(parent) = index_path.parent() {
1902            std::fs::create_dir_all(parent)?;
1903        }
1904
1905        // Store as f16 by default (smaller, faster I/O). Embeddings are validated by the writer.
1906        let mut writer: FsVectorIndexWriter = FsVectorIndex::create_with_revision(
1907            index_path,
1908            self.embedder_id(),
1909            "1.0",
1910            self.embedder_dimension(),
1911            FsQuantization::F16,
1912        )
1913        .map_err(|err| anyhow::anyhow!("create fsvi index failed: {err}"))?;
1914
1915        let write_result: Result<()> = (|| {
1916            for embedded in embedded_messages {
1917                if embedded.embedding.len() != self.embedder_dimension() {
1918                    bail!(
1919                        "embedding dimension mismatch: expected {}, got {}",
1920                        self.embedder_dimension(),
1921                        embedded.embedding.len()
1922                    );
1923                }
1924                let doc_id = semantic_doc_id_for_embedded(&embedded);
1925                writer
1926                    .write_record(&doc_id, &embedded.embedding)
1927                    .map_err(|err| anyhow::anyhow!("write fsvi record failed: {err}"))?;
1928            }
1929            Ok(())
1930        })();
1931
1932        if let Err(e) = &write_result {
1933            // Clean up partial index file to prevent corruption
1934            tracing::warn!("removing partial vector index after write failure: {e}");
1935            if let Err(rm_err) = std::fs::remove_file(index_path) {
1936                tracing::error!(
1937                    "failed to remove partial index file {}: {rm_err}",
1938                    index_path.display()
1939                );
1940            }
1941            return Err(anyhow::anyhow!("{e}"));
1942        }
1943
1944        writer
1945            .finish()
1946            .map_err(|err| anyhow::anyhow!("finish fsvi index failed: {err}"))?;
1947
1948        FsVectorIndex::open(index_path)
1949            .map_err(|err| anyhow::anyhow!("open fsvi index failed: {err}"))
1950    }
1951
1952    /// Append new embeddings to an existing FSVI index via the WAL.
1953    ///
1954    /// Used for incremental semantic indexing in watch mode. Opens the
1955    /// existing index, appends a batch of new embeddings, and compacts if
1956    /// the WAL has grown large enough.
1957    ///
1958    /// Returns the number of entries appended.
1959    pub fn append_to_index(
1960        &self,
1961        embedded_messages: impl IntoIterator<Item = EmbeddedMessage>,
1962        data_dir: &Path,
1963    ) -> Result<usize> {
1964        let index_path = vector_index_path(data_dir, self.embedder_id());
1965        self.append_to_index_path(embedded_messages, &index_path)
1966    }
1967
1968    fn append_to_index_path(
1969        &self,
1970        embedded_messages: impl IntoIterator<Item = EmbeddedMessage>,
1971        index_path: &Path,
1972    ) -> Result<usize> {
1973        let mut index = FsVectorIndex::open(index_path)
1974            .map_err(|err| anyhow::anyhow!("open fsvi index for append: {err}"))?;
1975
1976        let entries: Vec<(String, Vec<f32>)> = embedded_messages
1977            .into_iter()
1978            .map(|em| {
1979                let doc_id = semantic_doc_id_for_embedded(&em);
1980                (doc_id, em.embedding)
1981            })
1982            .collect();
1983
1984        let count = entries.len();
1985        if count == 0 {
1986            return Ok(0);
1987        }
1988
1989        index
1990            .append_batch(&entries)
1991            .map_err(|err| anyhow::anyhow!("append_batch: {err}"))?;
1992
1993        if index.needs_compaction() {
1994            index
1995                .compact()
1996                .map_err(|err| anyhow::anyhow!("compaction: {err}"))?;
1997        }
1998
1999        Ok(count)
2000    }
2001
2002    fn write_backfill_staging_index(
2003        &self,
2004        embedded_messages: Vec<EmbeddedMessage>,
2005        staging_path: &Path,
2006        resume_existing: bool,
2007    ) -> Result<FsVectorIndex> {
2008        if resume_existing && staging_path.exists() {
2009            self.append_to_index_path(embedded_messages, staging_path)?;
2010            FsVectorIndex::open(staging_path)
2011                .map_err(|err| anyhow::anyhow!("open staged semantic index failed: {err}"))
2012        } else {
2013            self.build_and_save_index_at_path(embedded_messages, staging_path)
2014        }
2015    }
2016
2017    pub fn run_backfill_batch(
2018        &self,
2019        messages: &[EmbeddingInput],
2020        data_dir: &Path,
2021        manifest: &mut SemanticManifest,
2022        plan: SemanticBackfillBatchPlan,
2023    ) -> Result<SemanticBackfillBatchOutcome> {
2024        self.run_backfill_batch_with_sink(
2025            messages,
2026            data_dir,
2027            manifest,
2028            plan,
2029            None,
2030            &SemanticProgressSink::disabled(),
2031        )
2032    }
2033
2034    /// Variant of [`run_backfill_batch`] that emits semantic progress
2035    /// events to the given JSONL sink and persists `last_message_id`
2036    /// into the resumable checkpoint when supplied. The sink is silent
2037    /// unless `CASS_SEMANTIC_PROGRESS_JSONL` is set, so this path is
2038    /// safe to take in production.
2039    pub fn run_backfill_batch_with_sink(
2040        &self,
2041        messages: &[EmbeddingInput],
2042        data_dir: &Path,
2043        manifest: &mut SemanticManifest,
2044        plan: SemanticBackfillBatchPlan,
2045        last_message_id: Option<i64>,
2046        sink: &SemanticProgressSink,
2047    ) -> Result<SemanticBackfillBatchOutcome> {
2048        if plan.db_fingerprint.trim().is_empty() {
2049            bail!("semantic backfill requires a non-empty DB fingerprint");
2050        }
2051        if plan.total_conversations == 0 && plan.conversations_in_batch > 0 {
2052            bail!("semantic backfill batch cannot process conversations when total is zero");
2053        }
2054
2055        let manifest_path = SemanticManifest::path(data_dir);
2056        let staging_path = semantic_staging_index_path(
2057            data_dir,
2058            plan.tier,
2059            self.embedder_id(),
2060            &plan.db_fingerprint,
2061        );
2062        let final_path = vector_index_path(data_dir, self.embedder_id());
2063
2064        let prior_checkpoint = manifest
2065            .checkpoint
2066            .as_ref()
2067            .filter(|checkpoint| {
2068                checkpoint.tier == plan.tier
2069                    && checkpoint.embedder_id == self.embedder_id()
2070                    && checkpoint.is_valid(&plan.db_fingerprint)
2071            })
2072            .cloned();
2073        let prior_conversations = prior_checkpoint
2074            .as_ref()
2075            .map_or(0, |checkpoint| checkpoint.conversations_processed);
2076        let prior_docs = prior_checkpoint
2077            .as_ref()
2078            .map_or(0, |checkpoint| checkpoint.docs_embedded);
2079
2080        let embeddings = self.embed_messages_with_sink(messages, sink)?;
2081        let embedded_docs = u64::try_from(embeddings.len()).unwrap_or(u64::MAX);
2082        if sink.is_active() {
2083            sink.emit(
2084                SemanticProgressEvent::StagingWriteStart,
2085                SemanticProgressFields {
2086                    batch_rows: Some(embedded_docs),
2087                    note: Some(staging_path.display().to_string()),
2088                    ..Default::default()
2089                },
2090            );
2091        }
2092        let mut staged_index = self.write_backfill_staging_index(
2093            embeddings,
2094            &staging_path,
2095            prior_checkpoint.is_some(),
2096        )?;
2097        if sink.is_active() {
2098            sink.emit(
2099                SemanticProgressEvent::StagingWriteDone,
2100                SemanticProgressFields {
2101                    batch_rows: Some(embedded_docs),
2102                    note: Some(staging_path.display().to_string()),
2103                    ..Default::default()
2104                },
2105            );
2106        }
2107        let counted_conversations_processed =
2108            prior_conversations.saturating_add(plan.conversations_in_batch);
2109        let conversations_processed = if plan.cursor_exhausted {
2110            plan.total_conversations
2111        } else {
2112            counted_conversations_processed.min(plan.total_conversations)
2113        };
2114        let complete = plan.cursor_exhausted;
2115
2116        manifest.refresh_backlog(plan.total_conversations, &plan.db_fingerprint);
2117
2118        if complete {
2119            let db_fingerprint = plan.db_fingerprint.clone();
2120            if staged_index.wal_record_count() > 0 {
2121                staged_index.compact().map_err(|err| {
2122                    anyhow::anyhow!("compact staged semantic index failed: {err}")
2123                })?;
2124            }
2125            drop(staged_index);
2126            if sink.is_active() {
2127                sink.emit(
2128                    SemanticProgressEvent::PublishStart,
2129                    SemanticProgressFields {
2130                        rows_processed: Some(conversations_processed),
2131                        rows_total: Some(plan.total_conversations),
2132                        last_conversation_id: Some(plan.last_offset),
2133                        last_message_id,
2134                        note: Some(final_path.display().to_string()),
2135                        ..Default::default()
2136                    },
2137                );
2138            }
2139            fs::rename(&staging_path, &final_path).with_context(|| {
2140                format!(
2141                    "publishing staged semantic index {} to {}",
2142                    staging_path.display(),
2143                    final_path.display()
2144                )
2145            })?;
2146            sync_parent_directory(&final_path)?;
2147            let published_index = FsVectorIndex::open(&final_path)
2148                .map_err(|err| anyhow::anyhow!("open published semantic index failed: {err}"))?;
2149            let size_bytes = fs::metadata(&final_path)
2150                .with_context(|| format!("stat published semantic index {}", final_path.display()))?
2151                .len();
2152            let relative_index_path = final_path
2153                .strip_prefix(data_dir)
2154                .unwrap_or(final_path.as_path())
2155                .to_string_lossy()
2156                .to_string();
2157            manifest.publish_artifact(ArtifactRecord {
2158                tier: plan.tier,
2159                embedder_id: self.embedder_id().to_string(),
2160                model_revision: plan.model_revision,
2161                schema_version: SEMANTIC_SCHEMA_VERSION,
2162                chunking_version: CHUNKING_STRATEGY_VERSION,
2163                dimension: self.embedder_dimension(),
2164                doc_count: u64::try_from(published_index.record_count()).unwrap_or(u64::MAX),
2165                conversation_count: conversations_processed,
2166                db_fingerprint: plan.db_fingerprint,
2167                index_path: relative_index_path,
2168                size_bytes,
2169                started_at_ms: prior_checkpoint
2170                    .as_ref()
2171                    .map_or_else(now_ms, |checkpoint| checkpoint.saved_at_ms),
2172                completed_at_ms: now_ms(),
2173                ready: true,
2174            });
2175            manifest.refresh_backlog(plan.total_conversations, &db_fingerprint);
2176            manifest.save(data_dir)?;
2177            if sink.is_active() {
2178                sink.emit(
2179                    SemanticProgressEvent::PublishDone,
2180                    SemanticProgressFields {
2181                        rows_processed: Some(conversations_processed),
2182                        rows_total: Some(plan.total_conversations),
2183                        last_conversation_id: Some(plan.last_offset),
2184                        last_message_id,
2185                        note: Some(final_path.display().to_string()),
2186                        ..Default::default()
2187                    },
2188                );
2189            }
2190        } else {
2191            let docs_embedded_on_disk =
2192                u64::try_from(staged_index.record_count()).unwrap_or(u64::MAX);
2193            let checkpoint_docs = prior_docs
2194                .saturating_add(embedded_docs)
2195                .max(docs_embedded_on_disk);
2196            if sink.is_active() {
2197                sink.emit(
2198                    SemanticProgressEvent::CheckpointSaveStart,
2199                    SemanticProgressFields {
2200                        rows_processed: Some(conversations_processed),
2201                        rows_total: Some(plan.total_conversations),
2202                        last_conversation_id: Some(plan.last_offset),
2203                        last_message_id,
2204                        ..Default::default()
2205                    },
2206                );
2207            }
2208            // Preserve any existing `last_message_id` cursor when the
2209            // caller did not supply a fresher one — see sub-fix 2 for
2210            // why durable message-PK resume matters.
2211            let prior_last_message_id = prior_checkpoint
2212                .as_ref()
2213                .and_then(|checkpoint| checkpoint.last_message_id);
2214            manifest.save_checkpoint(BuildCheckpoint {
2215                tier: plan.tier,
2216                embedder_id: self.embedder_id().to_string(),
2217                last_offset: plan.last_offset,
2218                docs_embedded: checkpoint_docs,
2219                conversations_processed,
2220                total_conversations: plan.total_conversations,
2221                db_fingerprint: plan.db_fingerprint,
2222                schema_version: SEMANTIC_SCHEMA_VERSION,
2223                chunking_version: CHUNKING_STRATEGY_VERSION,
2224                saved_at_ms: now_ms(),
2225                last_message_id: last_message_id.or(prior_last_message_id),
2226                cursor_exhausted: plan.cursor_exhausted,
2227            });
2228            manifest.save(data_dir)?;
2229            if sink.is_active() {
2230                sink.emit(
2231                    SemanticProgressEvent::CheckpointSaveDone,
2232                    SemanticProgressFields {
2233                        rows_processed: Some(conversations_processed),
2234                        rows_total: Some(plan.total_conversations),
2235                        last_conversation_id: Some(plan.last_offset),
2236                        last_message_id: last_message_id.or(prior_last_message_id),
2237                        ..Default::default()
2238                    },
2239                );
2240            }
2241        }
2242
2243        Ok(SemanticBackfillBatchOutcome {
2244            tier: plan.tier,
2245            embedder_id: self.embedder_id().to_string(),
2246            embedded_docs,
2247            conversations_processed,
2248            total_conversations: plan.total_conversations,
2249            last_offset: plan.last_offset,
2250            checkpoint_saved: !complete,
2251            published: complete,
2252            index_path: if complete { final_path } else { staging_path },
2253            manifest_path,
2254        })
2255    }
2256
2257    pub fn run_backfill_from_storage(
2258        &self,
2259        storage: &FrankenStorage,
2260        data_dir: &Path,
2261        manifest: &mut SemanticManifest,
2262        plan: SemanticBackfillStoragePlan,
2263    ) -> Result<SemanticBackfillBatchOutcome> {
2264        self.run_backfill_from_storage_with_sink(
2265            storage,
2266            data_dir,
2267            manifest,
2268            plan,
2269            &SemanticProgressSink::disabled(),
2270        )
2271    }
2272
2273    /// Variant of [`run_backfill_from_storage`] that emits semantic
2274    /// progress events to a JSONL sink and persists `last_message_id`
2275    /// in the resumable checkpoint. The sink is silent unless
2276    /// `CASS_SEMANTIC_PROGRESS_JSONL` is set.
2277    pub fn run_backfill_from_storage_with_sink(
2278        &self,
2279        storage: &FrankenStorage,
2280        data_dir: &Path,
2281        manifest: &mut SemanticManifest,
2282        plan: SemanticBackfillStoragePlan,
2283        sink: &SemanticProgressSink,
2284    ) -> Result<SemanticBackfillBatchOutcome> {
2285        self.run_backfill_from_storage_with_caps_and_sink(
2286            storage,
2287            data_dir,
2288            manifest,
2289            plan,
2290            SemanticCheckpointCaps::unlimited(),
2291            sink,
2292        )
2293    }
2294
2295    /// Variant of [`run_backfill_from_storage_with_sink`] for CLI backfill
2296    /// runs. It applies operator checkpoint caps from
2297    /// `CASS_SEMANTIC_MAX_MESSAGES_PER_CHECKPOINT` and
2298    /// `CASS_SEMANTIC_MAX_BYTES_PER_CHECKPOINT` while keeping each selected
2299    /// conversation whole, so message-cursor resume cannot strand the tail of
2300    /// a partially selected conversation.
2301    pub fn run_capped_backfill_from_storage_with_sink(
2302        &self,
2303        storage: &FrankenStorage,
2304        data_dir: &Path,
2305        manifest: &mut SemanticManifest,
2306        plan: SemanticBackfillStoragePlan,
2307        sink: &SemanticProgressSink,
2308    ) -> Result<SemanticBackfillBatchOutcome> {
2309        self.run_backfill_from_storage_with_caps_and_sink(
2310            storage,
2311            data_dir,
2312            manifest,
2313            plan,
2314            SemanticCheckpointCaps::from_env(),
2315            sink,
2316        )
2317    }
2318
2319    fn run_backfill_from_storage_with_caps_and_sink(
2320        &self,
2321        storage: &FrankenStorage,
2322        data_dir: &Path,
2323        manifest: &mut SemanticManifest,
2324        plan: SemanticBackfillStoragePlan,
2325        caps: SemanticCheckpointCaps,
2326        sink: &SemanticProgressSink,
2327    ) -> Result<SemanticBackfillBatchOutcome> {
2328        let prior_checkpoint = manifest.checkpoint.as_ref().filter(|checkpoint| {
2329            checkpoint.tier == plan.tier
2330                && checkpoint.embedder_id == self.embedder_id()
2331                && checkpoint.is_valid(&plan.db_fingerprint)
2332        });
2333        let after_conversation_id = prior_checkpoint.map_or(0, |checkpoint| checkpoint.last_offset);
2334        let prior_last_message_id =
2335            prior_checkpoint.and_then(|checkpoint| checkpoint.last_message_id);
2336
2337        if sink.is_active() {
2338            sink.emit(
2339                SemanticProgressEvent::SelectionStart,
2340                SemanticProgressFields {
2341                    last_conversation_id: Some(after_conversation_id),
2342                    last_message_id: prior_last_message_id,
2343                    rows_total: Some(saturating_u64_from_usize(plan.max_conversations)),
2344                    note: Some(plan.tier.as_str().to_string()),
2345                    ..Default::default()
2346                },
2347            );
2348        }
2349
2350        let batch = match fetch_canonical_embedding_batch_inner_with_caps(
2351            storage,
2352            after_conversation_id,
2353            plan.max_conversations,
2354            prior_last_message_id,
2355            caps,
2356        ) {
2357            Ok(batch) => batch,
2358            Err(err) => {
2359                if sink.is_active() {
2360                    sink.emit(
2361                        SemanticProgressEvent::Error,
2362                        SemanticProgressFields {
2363                            error: Some(format!("selection: {err}")),
2364                            last_conversation_id: Some(after_conversation_id),
2365                            last_message_id: prior_last_message_id,
2366                            ..Default::default()
2367                        },
2368                    );
2369                }
2370                return Err(err);
2371            }
2372        };
2373
2374        if sink.is_active() {
2375            sink.emit(
2376                SemanticProgressEvent::SelectionDone,
2377                SemanticProgressFields {
2378                    last_conversation_id: Some(batch.last_conversation_id),
2379                    last_message_id: prior_last_message_id,
2380                    conversations_in_batch: Some(batch.conversations_in_batch),
2381                    rows_total: Some(batch.total_conversations),
2382                    ..Default::default()
2383                },
2384            );
2385            // PacketReplay {start,progress,done} bracket the
2386            // envelope/messages/packet build done by
2387            // `fetch_canonical_embedding_batch`. We can't easily plumb
2388            // a callback inside that helper without refactoring it, so
2389            // the start/done bracket here straddles the work that
2390            // already happened (replay always finishes before we see
2391            // the result). A future refactor — flagged in the
2392            // SQL-shape follow-up — can move the packet-replay work
2393            // into a streaming iterator and emit `progress` ticks
2394            // per conversation. For now, the bracket still gives
2395            // operators a clear "we got past replay" signal.
2396            sink.emit(
2397                SemanticProgressEvent::PacketReplayStart,
2398                SemanticProgressFields {
2399                    conversations_in_batch: Some(batch.conversations_in_batch),
2400                    ..Default::default()
2401                },
2402            );
2403            sink.emit(
2404                SemanticProgressEvent::PacketReplayProgress,
2405                SemanticProgressFields {
2406                    conversations_in_batch: Some(batch.conversations_in_batch),
2407                    rows_processed: Some(saturating_u64_from_usize(batch.inputs.len())),
2408                    bytes: Some(
2409                        batch
2410                            .inputs
2411                            .iter()
2412                            .map(|i| saturating_u64_from_usize(i.content.len()))
2413                            .sum(),
2414                    ),
2415                    ..Default::default()
2416                },
2417            );
2418            sink.emit(
2419                SemanticProgressEvent::PacketReplayDone,
2420                SemanticProgressFields {
2421                    conversations_in_batch: Some(batch.conversations_in_batch),
2422                    rows_processed: Some(saturating_u64_from_usize(batch.inputs.len())),
2423                    ..Default::default()
2424                },
2425            );
2426        }
2427
2428        // Compute the freshest `last_message_id` from the inputs we are
2429        // about to embed. EmbeddingInput.message_id is u64 (canonical
2430        // message PK); we coerce to i64 since the manifest stores i64.
2431        let batch_last_message_id = batch
2432            .inputs
2433            .iter()
2434            .map(|input| i64::try_from(input.message_id).unwrap_or(i64::MAX))
2435            .max();
2436        let next_last_message_id = match (prior_last_message_id, batch_last_message_id) {
2437            (Some(prior), Some(batch_max)) => Some(prior.max(batch_max)),
2438            (Some(prior), None) => Some(prior),
2439            (None, Some(batch_max)) => Some(batch_max),
2440            (None, None) => None,
2441        };
2442
2443        let outcome = self.run_backfill_batch_with_sink(
2444            &batch.inputs,
2445            data_dir,
2446            manifest,
2447            SemanticBackfillBatchPlan {
2448                tier: plan.tier,
2449                db_fingerprint: plan.db_fingerprint,
2450                model_revision: plan.model_revision,
2451                total_conversations: batch.total_conversations,
2452                conversations_in_batch: batch.conversations_in_batch,
2453                last_offset: batch.last_conversation_id,
2454                cursor_exhausted: batch.cursor_exhausted,
2455            },
2456            next_last_message_id,
2457            sink,
2458        );
2459
2460        match &outcome {
2461            Ok(o) => {
2462                if sink.is_active() {
2463                    sink.emit(
2464                        SemanticProgressEvent::Complete,
2465                        SemanticProgressFields {
2466                            rows_processed: Some(o.conversations_processed),
2467                            rows_total: Some(o.total_conversations),
2468                            last_conversation_id: Some(o.last_offset),
2469                            last_message_id: next_last_message_id,
2470                            ..Default::default()
2471                        },
2472                    );
2473                }
2474            }
2475            Err(err) => {
2476                if sink.is_active() {
2477                    sink.emit(
2478                        SemanticProgressEvent::Error,
2479                        SemanticProgressFields {
2480                            error: Some(err.to_string()),
2481                            last_conversation_id: Some(batch.last_conversation_id),
2482                            last_message_id: next_last_message_id,
2483                            ..Default::default()
2484                        },
2485                    );
2486                }
2487            }
2488        }
2489
2490        outcome
2491    }
2492
2493    /// Build and save an HNSW index for approximate nearest neighbor search.
2494    ///
2495    /// This creates an HNSW graph structure from the existing VectorIndex,
2496    /// enabling O(log n) approximate search with the `--approximate` flag.
2497    ///
2498    /// # Arguments
2499    /// * `vector_index` - The VectorIndex to build HNSW from
2500    /// * `data_dir` - Directory to save the HNSW index
2501    /// * `m` - Max connections per node (default: 16)
2502    /// * `ef_construction` - Search width during build (default: 200)
2503    ///
2504    /// # Returns
2505    /// Path to the saved HNSW index file
2506    pub fn build_hnsw_index(
2507        &self,
2508        vector_index: &FsVectorIndex,
2509        data_dir: &Path,
2510        m: Option<usize>,
2511        ef_construction: Option<usize>,
2512    ) -> Result<PathBuf> {
2513        let m = m.unwrap_or(FS_HNSW_DEFAULT_M);
2514        let ef_construction = ef_construction.unwrap_or(FS_HNSW_DEFAULT_EF_CONSTRUCTION);
2515
2516        tracing::info!(
2517            embedder = self.embedder_id(),
2518            count = vector_index.record_count(),
2519            m,
2520            ef_construction,
2521            "Building HNSW index for approximate nearest neighbor search"
2522        );
2523
2524        let config = FsHnswConfig {
2525            m,
2526            ef_construction,
2527            ..FsHnswConfig::default()
2528        };
2529        let hnsw = FsHnswIndex::build_from_vector_index(vector_index, config)
2530            .map_err(|err| anyhow::anyhow!("build HNSW index failed: {err}"))?;
2531
2532        let hnsw_path = hnsw_index_path(data_dir, self.embedder_id());
2533        if let Some(parent) = hnsw_path.parent() {
2534            std::fs::create_dir_all(parent)?;
2535        }
2536        hnsw.save(&hnsw_path)
2537            .map_err(|err| anyhow::anyhow!("save HNSW index failed: {err}"))?;
2538
2539        tracing::info!(?hnsw_path, "Saved HNSW index");
2540        Ok(hnsw_path)
2541    }
2542}
2543
2544#[cfg(test)]
2545mod tests {
2546    use super::*;
2547    use crate::model::types::{Agent, AgentKind, Conversation, Message, MessageRole};
2548    use crate::storage::sqlite::FrankenStorage;
2549    use serde_json::json;
2550    use std::path::Path;
2551    use tempfile::tempdir;
2552
2553    #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
2554    struct ComparableSemanticInput {
2555        message_id: u64,
2556        created_at_ms: i64,
2557        agent_id: u32,
2558        workspace_id: u32,
2559        source_id: u32,
2560        role: u8,
2561        content: String,
2562    }
2563
2564    fn comparable_semantic_inputs(mut inputs: Vec<EmbeddingInput>) -> Vec<ComparableSemanticInput> {
2565        let mut comparable: Vec<ComparableSemanticInput> = inputs
2566            .drain(..)
2567            .map(|input| ComparableSemanticInput {
2568                message_id: input.message_id,
2569                created_at_ms: input.created_at_ms,
2570                agent_id: input.agent_id,
2571                workspace_id: input.workspace_id,
2572                source_id: input.source_id,
2573                role: input.role,
2574                content: input.content,
2575            })
2576            .collect();
2577        comparable.sort();
2578        comparable
2579    }
2580
2581    fn test_conversation(external_id: &str, body: &str) -> Conversation {
2582        test_conversation_fixture(
2583            external_id,
2584            vec![Message {
2585                id: None,
2586                idx: 0,
2587                role: MessageRole::User,
2588                author: None,
2589                created_at: Some(1_700_000_000_500),
2590                content: body.to_string(),
2591                extra_json: json!({}),
2592                snippets: Vec::new(),
2593            }],
2594            "local",
2595            None,
2596        )
2597    }
2598
2599    fn test_conversation_with_messages(external_id: &str, messages: Vec<Message>) -> Conversation {
2600        test_conversation_fixture(external_id, messages, "remote-laptop", Some("builder-host"))
2601    }
2602
2603    fn test_conversation_fixture(
2604        external_id: &str,
2605        messages: Vec<Message>,
2606        source_id: &str,
2607        origin_host: Option<&str>,
2608    ) -> Conversation {
2609        Conversation {
2610            id: None,
2611            agent_slug: "codex".to_string(),
2612            workspace: None,
2613            external_id: Some(external_id.to_string()),
2614            title: Some(format!("semantic {external_id}")),
2615            source_path: PathBuf::from(format!("/tmp/{external_id}.jsonl")),
2616            started_at: Some(1_700_000_000_000),
2617            ended_at: Some(1_700_000_001_000),
2618            approx_tokens: None,
2619            metadata_json: json!({}),
2620            messages,
2621            source_id: source_id.to_string(),
2622            origin_host: origin_host.map(str::to_string),
2623        }
2624    }
2625
2626    fn default_scheduler_signals() -> SemanticBackfillSchedulerSignals {
2627        SemanticBackfillSchedulerSignals {
2628            foreground_pressure: false,
2629            lexical_repair_active: false,
2630            force: false,
2631            operator_disabled: false,
2632        }
2633    }
2634
2635    struct EnvVarGuard {
2636        key: &'static str,
2637        prior: Option<String>,
2638    }
2639
2640    impl EnvVarGuard {
2641        fn set(key: &'static str, value: &str) -> Self {
2642            let prior = std::env::var(key).ok();
2643            // SAFETY: focused tests temporarily mutate process env and restore on drop.
2644            unsafe {
2645                std::env::set_var(key, value);
2646            }
2647            Self { key, prior }
2648        }
2649
2650        fn remove(key: &'static str) -> Self {
2651            let prior = std::env::var(key).ok();
2652            // SAFETY: focused tests temporarily mutate process env and restore on drop.
2653            unsafe {
2654                std::env::remove_var(key);
2655            }
2656            Self { key, prior }
2657        }
2658    }
2659
2660    impl Drop for EnvVarGuard {
2661        fn drop(&mut self) {
2662            // SAFETY: restores the process env value captured by this test guard.
2663            unsafe {
2664                match self.prior.as_deref() {
2665                    Some(value) => std::env::set_var(self.key, value),
2666                    None => std::env::remove_var(self.key),
2667                }
2668            }
2669        }
2670    }
2671
2672    #[test]
2673    fn semantic_backfill_scheduler_runs_and_scales_batch_under_idle_budget() {
2674        let policy = SemanticPolicy::compiled_defaults();
2675        let decision = semantic_backfill_scheduler_decision_for_capacity(
2676            &policy,
2677            64,
2678            &default_scheduler_signals(),
2679            80,
2680        );
2681
2682        assert!(decision.should_run());
2683        assert_eq!(decision.state, SemanticBackfillSchedulerState::Running);
2684        assert_eq!(
2685            decision.reason,
2686            SemanticBackfillSchedulerReason::IdleBudgetAvailable
2687        );
2688        assert_eq!(decision.scheduled_batch_conversations, 51);
2689        assert_eq!(decision.current_capacity_pct, 80);
2690        assert_eq!(decision.next_eligible_after_ms, 0);
2691    }
2692
2693    #[test]
2694    fn semantic_backfill_scheduler_reason_next_steps_are_stable() {
2695        for (reason, expected) in [
2696            (
2697                SemanticBackfillSchedulerReason::IdleBudgetAvailable,
2698                "background semantic backfill is within idle budgets",
2699            ),
2700            (
2701                SemanticBackfillSchedulerReason::OperatorDisabled,
2702                "background semantic backfill is disabled by CASS_SEMANTIC_BACKFILL_DISABLE",
2703            ),
2704            (
2705                SemanticBackfillSchedulerReason::PolicyDisabled,
2706                "semantic policy disables background semantic backfill",
2707            ),
2708            (
2709                SemanticBackfillSchedulerReason::ForegroundPressure,
2710                "foreground pressure is present; retry after the idle delay",
2711            ),
2712            (
2713                SemanticBackfillSchedulerReason::LexicalRepairActive,
2714                "lexical repair is active; semantic backfill is yielding",
2715            ),
2716            (
2717                SemanticBackfillSchedulerReason::CapacityBelowFloor,
2718                "machine responsiveness capacity is below the semantic backfill floor",
2719            ),
2720            (
2721                SemanticBackfillSchedulerReason::ThreadBudgetZero,
2722                "semantic backfill thread budget is zero",
2723            ),
2724            (
2725                SemanticBackfillSchedulerReason::BatchBudgetZero,
2726                "semantic backfill batch budget is zero",
2727            ),
2728        ] {
2729            assert_eq!(reason.next_step(), expected, "{reason:?}");
2730        }
2731    }
2732
2733    #[test]
2734    fn semantic_backfill_scheduler_yields_to_foreground_and_lexical_pressure() {
2735        let policy = SemanticPolicy::compiled_defaults();
2736        let foreground = SemanticBackfillSchedulerSignals {
2737            foreground_pressure: true,
2738            ..default_scheduler_signals()
2739        };
2740        let foreground_decision =
2741            semantic_backfill_scheduler_decision_for_capacity(&policy, 64, &foreground, 100);
2742        assert!(!foreground_decision.should_run());
2743        assert_eq!(
2744            foreground_decision.state,
2745            SemanticBackfillSchedulerState::Paused
2746        );
2747        assert_eq!(
2748            foreground_decision.reason,
2749            SemanticBackfillSchedulerReason::ForegroundPressure
2750        );
2751        assert_eq!(
2752            foreground_decision.next_eligible_after_ms,
2753            policy.idle_delay_seconds * 1000
2754        );
2755
2756        let lexical_repair = SemanticBackfillSchedulerSignals {
2757            lexical_repair_active: true,
2758            ..default_scheduler_signals()
2759        };
2760        let lexical_decision =
2761            semantic_backfill_scheduler_decision_for_capacity(&policy, 64, &lexical_repair, 100);
2762        assert!(!lexical_decision.should_run());
2763        assert_eq!(
2764            lexical_decision.state,
2765            SemanticBackfillSchedulerState::Paused
2766        );
2767        assert_eq!(
2768            lexical_decision.reason,
2769            SemanticBackfillSchedulerReason::LexicalRepairActive
2770        );
2771    }
2772
2773    #[test]
2774    fn semantic_backfill_scheduler_honors_policy_disable_and_force_override() {
2775        let mut policy = SemanticPolicy::compiled_defaults();
2776        policy.mode = crate::search::policy::SemanticMode::LexicalOnly;
2777
2778        let disabled = semantic_backfill_scheduler_decision_for_capacity(
2779            &policy,
2780            64,
2781            &default_scheduler_signals(),
2782            100,
2783        );
2784        assert!(!disabled.should_run());
2785        assert_eq!(disabled.state, SemanticBackfillSchedulerState::Disabled);
2786        assert_eq!(
2787            disabled.reason,
2788            SemanticBackfillSchedulerReason::PolicyDisabled
2789        );
2790
2791        let forced = SemanticBackfillSchedulerSignals {
2792            force: true,
2793            ..default_scheduler_signals()
2794        };
2795        let forced_decision =
2796            semantic_backfill_scheduler_decision_for_capacity(&policy, 64, &forced, 100);
2797        assert!(forced_decision.should_run());
2798        assert_eq!(
2799            forced_decision.reason,
2800            SemanticBackfillSchedulerReason::IdleBudgetAvailable
2801        );
2802        assert!(forced_decision.forced);
2803    }
2804
2805    #[test]
2806    fn test_batch_embedding() {
2807        let indexer = SemanticIndexer::new("hash", None).unwrap();
2808        let messages = vec![
2809            EmbeddingInput::new(1, "Hello world"),
2810            EmbeddingInput::new(2, "Goodbye world"),
2811        ];
2812
2813        let embeddings = indexer.embed_messages(&messages).unwrap();
2814
2815        assert_eq!(embeddings.len(), 2);
2816        assert_eq!(embeddings[0].message_id, 1);
2817        assert_eq!(embeddings[1].message_id, 2);
2818        assert_eq!(embeddings[0].embedding.len(), indexer.embedder_dimension());
2819    }
2820
2821    #[test]
2822    fn test_progress_indicator() {
2823        let indexer = SemanticIndexer::new("hash", None).unwrap();
2824        let messages: Vec<_> = (0..1000)
2825            .map(|i| EmbeddingInput::new(i as u64, format!("Message {}", i)))
2826            .collect();
2827
2828        let embeddings = indexer.embed_messages(&messages).unwrap();
2829        assert_eq!(embeddings.len(), messages.len());
2830    }
2831
2832    #[test]
2833    fn test_build_and_save_index() {
2834        let indexer = SemanticIndexer::new("hash", None).unwrap();
2835        let messages = vec![
2836            EmbeddingInput::new(1, "Hello world"),
2837            EmbeddingInput::new(2, "Goodbye world"),
2838        ];
2839
2840        let embeddings = indexer.embed_messages(&messages).unwrap();
2841        let tmp = tempdir().unwrap();
2842        let index = indexer
2843            .build_and_save_index(embeddings, tmp.path())
2844            .unwrap();
2845        assert_eq!(index.embedder_id(), indexer.embedder_id());
2846        assert_eq!(index.dimension(), indexer.embedder_dimension());
2847        assert_eq!(index.record_count(), 2);
2848    }
2849
2850    #[test]
2851    fn sharded_index_build_writes_sidecar_without_runtime_publish() {
2852        let indexer = SemanticIndexer::new("hash", None).unwrap();
2853        let messages: Vec<_> = (0..5)
2854            .map(|idx| EmbeddingInput::new(idx, format!("semantic shard message {idx}")))
2855            .collect();
2856        let embeddings = indexer.embed_messages(&messages).unwrap();
2857        let tmp = tempdir().unwrap();
2858
2859        let outcome = indexer
2860            .build_and_save_index_shards(
2861                embeddings,
2862                tmp.path(),
2863                SemanticShardBuildPlan {
2864                    tier: TierKind::Fast,
2865                    db_fingerprint: "db-fp-sharded-build".to_string(),
2866                    model_revision: "hash".to_string(),
2867                    total_conversations: 5,
2868                    max_records_per_shard: 2,
2869                    build_ann: false,
2870                },
2871            )
2872            .unwrap();
2873
2874        assert_eq!(outcome.shard_count, 3);
2875        assert_eq!(outcome.doc_count, 5);
2876        assert_eq!(outcome.total_conversations, 5);
2877        assert!(outcome.complete);
2878        assert_eq!(outcome.index_paths.len(), 3);
2879        for path in &outcome.index_paths {
2880            let shard = FsVectorIndex::open(path).unwrap();
2881            assert_eq!(shard.embedder_id(), indexer.embedder_id());
2882            assert!(shard.record_count() > 0);
2883        }
2884
2885        let shards = SemanticShardManifest::load(tmp.path()).unwrap().unwrap();
2886        let summary = shards.summary(TierKind::Fast, indexer.embedder_id(), "db-fp-sharded-build");
2887        assert!(summary.complete);
2888        assert_eq!(summary.ready_shards, 3);
2889        assert_eq!(summary.ann_ready_shards, 0);
2890        assert_eq!(summary.doc_count, 5);
2891        assert_eq!(summary.total_conversations, 5);
2892
2893        assert!(
2894            SemanticManifest::load(tmp.path()).unwrap().is_none(),
2895            "sidecar shards must not publish the main runtime manifest"
2896        );
2897        assert!(!vector_index_path(tmp.path(), indexer.embedder_id()).exists());
2898    }
2899
2900    #[test]
2901    fn sharded_index_build_rejects_zero_sized_shards() {
2902        let indexer = SemanticIndexer::new("hash", None).unwrap();
2903        let err = indexer
2904            .build_and_save_index_shards(
2905                std::iter::empty(),
2906                tempdir().unwrap().path(),
2907                SemanticShardBuildPlan {
2908                    tier: TierKind::Fast,
2909                    db_fingerprint: "db-fp-sharded-build".to_string(),
2910                    model_revision: "hash".to_string(),
2911                    total_conversations: 0,
2912                    max_records_per_shard: 0,
2913                    build_ann: false,
2914                },
2915            )
2916            .unwrap_err();
2917
2918        assert!(err.to_string().contains("max_records_per_shard > 0"));
2919    }
2920
2921    #[test]
2922    fn sharded_ann_build_records_per_shard_accelerators() {
2923        let indexer = SemanticIndexer::new("hash", None).unwrap();
2924        let messages: Vec<_> = (0..8)
2925            .map(|idx| EmbeddingInput::new(idx, format!("semantic ann shard message {idx}")))
2926            .collect();
2927        let embeddings = indexer.embed_messages(&messages).unwrap();
2928        let tmp = tempdir().unwrap();
2929
2930        let outcome = indexer
2931            .build_and_save_index_shards(
2932                embeddings,
2933                tmp.path(),
2934                SemanticShardBuildPlan {
2935                    tier: TierKind::Fast,
2936                    db_fingerprint: "db-fp-sharded-ann-build".to_string(),
2937                    model_revision: "hash".to_string(),
2938                    total_conversations: 8,
2939                    max_records_per_shard: 4,
2940                    build_ann: true,
2941                },
2942            )
2943            .unwrap();
2944
2945        assert_eq!(outcome.shard_count, 2);
2946        assert_eq!(outcome.ann_index_paths.len(), 2);
2947        for path in &outcome.ann_index_paths {
2948            assert!(path.exists(), "ANN shard missing at {}", path.display());
2949        }
2950
2951        let shards = SemanticShardManifest::load(tmp.path()).unwrap().unwrap();
2952        let summary = shards.summary(
2953            TierKind::Fast,
2954            indexer.embedder_id(),
2955            "db-fp-sharded-ann-build",
2956        );
2957        assert!(summary.complete);
2958        assert_eq!(summary.ann_ready_shards, 2);
2959        assert!(summary.ann_size_bytes > 0);
2960        assert!(
2961            shards
2962                .shards
2963                .iter()
2964                .all(|record| record.ann_index_path.is_some() && record.ann_ready)
2965        );
2966    }
2967
2968    /// Golden-output regression: any change to the embedding prep pipeline,
2969    /// the canonicalizer, the hash embedder's deterministic projection, or
2970    /// the ordering semantics of `embed_messages` must not silently mutate
2971    /// the bytes we write to the vector index. This digest is derived from a
2972    /// frozen 64-message corpus processed through the hash embedder; a
2973    /// mismatch means one of those contracts moved.
2974    #[test]
2975    fn embed_messages_golden_digest_hash_embedder() {
2976        use ring::digest::{Context, SHA256};
2977
2978        let corpus: Vec<EmbeddingInput> = (0..64)
2979            .map(|i| {
2980                let body = match i % 5 {
2981                    0 => format!("plain text message number {i}"),
2982                    1 => format!("**bold** line {i} with _emphasis_"),
2983                    2 => format!("```rust\nfn f_{i}() {{ println!(\"{i}\"); }}\n```"),
2984                    3 => format!("   whitespace {i}   "),
2985                    _ => format!("unicode \u{00E9}\u{0301} + emoji \u{1F600} {i}"),
2986                };
2987                EmbeddingInput::new(i as u64, body)
2988            })
2989            .collect();
2990
2991        let indexer = SemanticIndexer::new("hash", None)
2992            .unwrap()
2993            .with_batch_size(16)
2994            .unwrap();
2995        let embeddings = indexer.embed_messages(&corpus).unwrap();
2996
2997        // Digest over (message_id, content_hash, embedding f32 bytes) for every
2998        // embedded message, in the order emitted. Preserves order + content +
2999        // numeric equality without having to compare raw floats directly.
3000        let mut ctx = Context::new(&SHA256);
3001        for em in &embeddings {
3002            ctx.update(&em.message_id.to_le_bytes());
3003            ctx.update(&em.content_hash);
3004            for v in &em.embedding {
3005                ctx.update(&v.to_le_bytes());
3006            }
3007        }
3008        let digest = hex::encode(ctx.finish().as_ref());
3009
3010        // Captured 2026-04-21 against a freshly built hash embedder, batch
3011        // size 16, the frozen 64-message corpus above. Stable so long as
3012        // the prep pipeline, canonicalizer, and HashEmbedder::embed
3013        // implementation are all byte-preserving. If you intentionally
3014        // changed any of those, update this value AND record the reason
3015        // in the commit message.
3016        const EXPECTED: &str = "22d9ae7076925a4b70a194b0f519dfb1d465cc757368c296ef24055a02038c2c";
3017        assert_eq!(
3018            digest, EXPECTED,
3019            "embed_messages golden digest drifted; if this was intentional, \
3020             update EXPECTED in this test and record the reason in the commit message"
3021        );
3022    }
3023
3024    #[test]
3025    fn parallel_prep_matches_serial_prep_bitwise() {
3026        // Mix of short, long, empty, markdown, code-block, and unicode inputs
3027        // to make sure the canonicalizer is exercised across all of its paths.
3028        let inputs: Vec<EmbeddingInput> = (0..500)
3029            .map(|i| {
3030                let text = match i % 7 {
3031                    0 => format!("Plain message number {i} with some ordinary words."),
3032                    1 => format!("**Bold** and _italic_ markdown line {i}"),
3033                    2 => format!(
3034                        "```rust\nfn example_{i}() {{\n    println!(\"code block {i}\");\n}}\n```\nfollow-up text"
3035                    ),
3036                    3 => String::new(), // empty — should be filtered
3037                    4 => format!("   whitespace   galore   {i}   "),
3038                    5 => format!("Unicode \u{00E9}\u{0301} (combining accent) and emoji \u{1F600} line {i}"),
3039                    _ => format!(
3040                        "Mixed line {i}: `inline_code`, [link](http://x), {{braces}}, and \u{201C}curly quotes\u{201D}."
3041                    ),
3042                };
3043                EmbeddingInput::new(i as u64, text)
3044            })
3045            .collect();
3046
3047        let serial = prepare_window(&inputs, true);
3048        let parallel = prepare_window(&inputs, false);
3049
3050        assert_eq!(
3051            serial.len(),
3052            parallel.len(),
3053            "serial and parallel prep should skip the same number of empty canonicals"
3054        );
3055
3056        for (s, p) in serial.iter().zip(parallel.iter()) {
3057            assert_eq!(
3058                s.msg.message_id, p.msg.message_id,
3059                "ordering must be preserved between serial and parallel prep"
3060            );
3061            assert_eq!(
3062                s.canonical, p.canonical,
3063                "canonical form diverged between serial and parallel prep"
3064            );
3065            assert_eq!(
3066                s.hash, p.hash,
3067                "content hash diverged between serial and parallel prep"
3068            );
3069        }
3070    }
3071
3072    #[test]
3073    fn parallel_prep_filters_empty_canonicals() {
3074        let inputs = vec![
3075            EmbeddingInput::new(1, "valid content"),
3076            EmbeddingInput::new(2, ""),
3077            EmbeddingInput::new(3, "   \n\n   \t  "),
3078            EmbeddingInput::new(4, "more valid content"),
3079        ];
3080
3081        let prepared = prepare_window(&inputs, false);
3082        let ids: Vec<u64> = prepared.iter().map(|p| p.msg.message_id).collect();
3083
3084        assert!(ids.contains(&1));
3085        assert!(ids.contains(&4));
3086        // ids 2 and 3 should be dropped because their canonicals are empty.
3087        assert!(!ids.contains(&2));
3088        assert!(!ids.contains(&3));
3089    }
3090
3091    #[test]
3092    fn memoized_serial_prep_matches_stateless_prepare_window() {
3093        let inputs = vec![
3094            EmbeddingInput::new(1, "repeat me exactly"),
3095            EmbeddingInput::new(2, "repeat me exactly"),
3096            EmbeddingInput::new(3, "unique payload"),
3097            EmbeddingInput::new(4, ""),
3098            EmbeddingInput::new(5, "repeat me exactly"),
3099        ];
3100
3101        let baseline = prepare_window(&inputs, true);
3102        let mut cache = ContentAddressedMemoCache::with_capacity(16);
3103        let memoized = prepare_window_with_memo(&inputs, &mut cache);
3104
3105        assert_eq!(baseline.len(), memoized.len());
3106        for (plain, cached) in baseline.iter().zip(memoized.iter()) {
3107            assert_eq!(plain.msg.message_id, cached.msg.message_id);
3108            assert_eq!(plain.canonical, cached.canonical);
3109            assert_eq!(plain.hash, cached.hash);
3110        }
3111    }
3112
3113    #[test]
3114    fn semantic_prep_memo_key_uses_stable_content_hash_bytes() {
3115        let key = semantic_prep_memo_key("repeat me exactly");
3116        let expected = content_hash("repeat me exactly");
3117
3118        assert_eq!(key.content_hash.as_bytes(), expected.as_slice());
3119        assert_eq!(key.content_hash.as_bytes().len(), expected.len());
3120        assert_eq!(key.algorithm, SEMANTIC_PREP_MEMO_ALGORITHM);
3121        assert_eq!(key.algorithm_version, SEMANTIC_PREP_MEMO_VERSION);
3122    }
3123
3124    #[test]
3125    fn memoized_serial_prep_reuses_duplicate_content_across_windows() {
3126        let inputs = vec![
3127            EmbeddingInput::new(1, "repeat me exactly"),
3128            EmbeddingInput::new(2, "repeat me exactly"),
3129            EmbeddingInput::new(3, "unique payload"),
3130            EmbeddingInput::new(4, ""),
3131            EmbeddingInput::new(5, "repeat me exactly"),
3132        ];
3133
3134        let mut cache = ContentAddressedMemoCache::with_capacity(16);
3135        let prepared = prepare_window_with_memo(&inputs, &mut cache);
3136        let stats = cache.stats().clone();
3137
3138        assert_eq!(prepared.len(), 4);
3139        assert_eq!(stats.hits, 2);
3140        assert_eq!(stats.misses, 3);
3141        assert_eq!(stats.inserts, 2);
3142        assert_eq!(stats.live_entries, 2);
3143    }
3144
3145    #[test]
3146    fn packet_embedding_inputs_reuse_memoized_prep_for_duplicate_content() -> Result<()> {
3147        let temp = tempdir().unwrap();
3148        let db_path = temp.path().join("agent_search.db");
3149        let storage = FrankenStorage::open(&db_path)?;
3150        let agent_id = storage.ensure_agent(&Agent {
3151            id: None,
3152            slug: "codex".to_string(),
3153            name: "Codex".to_string(),
3154            version: None,
3155            kind: AgentKind::Cli,
3156        })?;
3157
3158        storage.insert_conversation_tree(
3159            agent_id,
3160            None,
3161            &test_conversation_with_messages(
3162                "packet-memo-conv-one",
3163                vec![
3164                    Message {
3165                        id: None,
3166                        idx: 0,
3167                        role: MessageRole::User,
3168                        author: None,
3169                        created_at: Some(1_700_000_010_100),
3170                        content: "shared semantic payload".to_string(),
3171                        extra_json: json!({}),
3172                        snippets: Vec::new(),
3173                    },
3174                    Message {
3175                        id: None,
3176                        idx: 1,
3177                        role: MessageRole::Agent,
3178                        author: None,
3179                        created_at: Some(1_700_000_010_200),
3180                        content: "unique semantic payload one".to_string(),
3181                        extra_json: json!({}),
3182                        snippets: Vec::new(),
3183                    },
3184                ],
3185            ),
3186        )?;
3187        storage.insert_conversation_tree(
3188            agent_id,
3189            None,
3190            &test_conversation_with_messages(
3191                "packet-memo-conv-two",
3192                vec![
3193                    Message {
3194                        id: None,
3195                        idx: 0,
3196                        role: MessageRole::Tool,
3197                        author: None,
3198                        created_at: Some(1_700_000_010_300),
3199                        content: "shared semantic payload".to_string(),
3200                        extra_json: json!({}),
3201                        snippets: Vec::new(),
3202                    },
3203                    Message {
3204                        id: None,
3205                        idx: 1,
3206                        role: MessageRole::Agent,
3207                        author: None,
3208                        created_at: Some(1_700_000_010_400),
3209                        content: "unique semantic payload two".to_string(),
3210                        extra_json: json!({}),
3211                        snippets: Vec::new(),
3212                    },
3213                ],
3214            ),
3215        )?;
3216
3217        let packet_inputs = packet_embedding_inputs_from_storage(&storage)?;
3218        let mut cache = ContentAddressedMemoCache::with_capacity(16);
3219        let prepared = prepare_window_with_memo(&packet_inputs, &mut cache);
3220        let stats = cache.stats().clone();
3221
3222        assert_eq!(packet_inputs.len(), 4);
3223        assert_eq!(prepared.len(), 4);
3224        assert_eq!(
3225            semantic_prep_memo_key("shared semantic payload")
3226                .content_hash
3227                .as_bytes()
3228                .len(),
3229            32
3230        );
3231        assert_eq!(stats.hits, 1);
3232        assert_eq!(stats.misses, 3);
3233        assert_eq!(stats.inserts, 3);
3234        assert_eq!(stats.live_entries, 3);
3235        Ok(())
3236    }
3237
3238    #[test]
3239    fn backfill_batch_saves_checkpoint_and_staged_index_until_complete() {
3240        let temp = tempdir().unwrap();
3241        let mut manifest = SemanticManifest::default();
3242        let indexer = SemanticIndexer::new("hash", None).unwrap();
3243        let messages = vec![
3244            EmbeddingInput::new(10, "first staged semantic message"),
3245            EmbeddingInput::new(11, "second staged semantic message"),
3246        ];
3247
3248        let outcome = indexer
3249            .run_backfill_batch(
3250                &messages,
3251                temp.path(),
3252                &mut manifest,
3253                SemanticBackfillBatchPlan {
3254                    tier: TierKind::Fast,
3255                    db_fingerprint: "db-fp-backfill-partial".to_string(),
3256                    model_revision: "hash".to_string(),
3257                    total_conversations: 2,
3258                    conversations_in_batch: 1,
3259                    last_offset: 1,
3260                    cursor_exhausted: false,
3261                },
3262            )
3263            .unwrap();
3264
3265        assert!(!outcome.published);
3266        assert!(outcome.checkpoint_saved);
3267        assert!(outcome.index_path.exists());
3268        assert!(!vector_index_path(temp.path(), indexer.embedder_id()).exists());
3269        let checkpoint = manifest.checkpoint.as_ref().expect("checkpoint");
3270        assert_eq!(checkpoint.tier, TierKind::Fast);
3271        assert_eq!(checkpoint.conversations_processed, 1);
3272        assert_eq!(checkpoint.docs_embedded, 2);
3273        assert_eq!(manifest.backlog.total_conversations, 2);
3274        assert!(SemanticManifest::path(temp.path()).exists());
3275    }
3276
3277    #[test]
3278    fn backfill_batch_does_not_publish_until_cursor_exhausted() -> Result<()> {
3279        let temp = tempdir()?;
3280        let mut manifest = SemanticManifest::default();
3281        let indexer = SemanticIndexer::new("hash", None)?;
3282        let db_fingerprint = "db-fp-backfill-cursor-not-exhausted";
3283
3284        let first = vec![EmbeddingInput::new(30, "first cursor batch")];
3285        let first_outcome = indexer.run_backfill_batch(
3286            &first,
3287            temp.path(),
3288            &mut manifest,
3289            SemanticBackfillBatchPlan {
3290                tier: TierKind::Fast,
3291                db_fingerprint: db_fingerprint.to_string(),
3292                model_revision: "hash".to_string(),
3293                total_conversations: 2,
3294                conversations_in_batch: 1,
3295                last_offset: 1,
3296                cursor_exhausted: false,
3297            },
3298        )?;
3299        anyhow::ensure!(!first_outcome.published, "first batch must not publish");
3300        anyhow::ensure!(
3301            first_outcome.checkpoint_saved,
3302            "first batch should save a checkpoint"
3303        );
3304
3305        let second = vec![EmbeddingInput::new(31, "second cursor batch")];
3306        let second_outcome = indexer.run_backfill_batch(
3307            &second,
3308            temp.path(),
3309            &mut manifest,
3310            SemanticBackfillBatchPlan {
3311                tier: TierKind::Fast,
3312                db_fingerprint: db_fingerprint.to_string(),
3313                model_revision: "hash".to_string(),
3314                total_conversations: 2,
3315                conversations_in_batch: 1,
3316                last_offset: 2,
3317                cursor_exhausted: false,
3318            },
3319        )?;
3320
3321        anyhow::ensure!(
3322            !second_outcome.published,
3323            "count-based completion would publish here; cursor state must win"
3324        );
3325        anyhow::ensure!(
3326            second_outcome.checkpoint_saved,
3327            "second batch should save a checkpoint"
3328        );
3329        anyhow::ensure!(
3330            !vector_index_path(temp.path(), indexer.embedder_id()).exists(),
3331            "non-exhausted cursor must not publish the final vector index"
3332        );
3333        let Some(checkpoint) = manifest.checkpoint.as_ref() else {
3334            bail!("checkpoint should remain after a non-exhausted cursor");
3335        };
3336        anyhow::ensure!(
3337            checkpoint.conversations_processed == 2,
3338            "wanted 2 processed conversations, got {}",
3339            checkpoint.conversations_processed
3340        );
3341        anyhow::ensure!(
3342            checkpoint.total_conversations == 2,
3343            "checkpoint should preserve the real DB total, got {}",
3344            checkpoint.total_conversations
3345        );
3346        anyhow::ensure!(
3347            !checkpoint.cursor_exhausted,
3348            "saved checkpoint should preserve the non-exhausted cursor state"
3349        );
3350        anyhow::ensure!(
3351            !checkpoint.is_complete(),
3352            "non-exhausted cursor checkpoint must not be complete"
3353        );
3354        anyhow::ensure!(
3355            checkpoint.progress_pct() == 99,
3356            "non-exhausted cursor checkpoint should report 99% progress, got {}",
3357            checkpoint.progress_pct()
3358        );
3359        anyhow::ensure!(
3360            second_outcome.conversations_processed == 2,
3361            "wanted outcome to report 2 processed conversations, got {}",
3362            second_outcome.conversations_processed
3363        );
3364        anyhow::ensure!(
3365            second_outcome.total_conversations == 2,
3366            "wanted outcome to preserve total 2, got {}",
3367            second_outcome.total_conversations
3368        );
3369        let progress_pct = second_outcome.progress_pct();
3370        anyhow::ensure!(
3371            (progress_pct - 99.0).abs() < f64::EPSILON,
3372            "non-published outcome should cap progress below 100%, got {}",
3373            progress_pct
3374        );
3375        Ok(())
3376    }
3377
3378    #[test]
3379    fn backfill_batch_resumes_staged_index_and_publishes_manifest_atomically() {
3380        let temp = tempdir().unwrap();
3381        let mut manifest = SemanticManifest::default();
3382        let indexer = SemanticIndexer::new("hash", None).unwrap();
3383        let db_fingerprint = "db-fp-backfill-complete";
3384        let staging_path = semantic_staging_index_path(
3385            temp.path(),
3386            TierKind::Fast,
3387            indexer.embedder_id(),
3388            db_fingerprint,
3389        );
3390
3391        let first = vec![EmbeddingInput::new(20, "first resume batch")];
3392        let first_outcome = indexer
3393            .run_backfill_batch(
3394                &first,
3395                temp.path(),
3396                &mut manifest,
3397                SemanticBackfillBatchPlan {
3398                    tier: TierKind::Fast,
3399                    db_fingerprint: db_fingerprint.to_string(),
3400                    model_revision: "hash".to_string(),
3401                    total_conversations: 2,
3402                    conversations_in_batch: 1,
3403                    last_offset: 1,
3404                    cursor_exhausted: false,
3405                },
3406            )
3407            .unwrap();
3408        assert_eq!(first_outcome.index_path, staging_path);
3409        assert!(staging_path.exists());
3410
3411        let second = vec![EmbeddingInput::new(21, "second resume batch")];
3412        let second_outcome = indexer
3413            .run_backfill_batch(
3414                &second,
3415                temp.path(),
3416                &mut manifest,
3417                SemanticBackfillBatchPlan {
3418                    tier: TierKind::Fast,
3419                    db_fingerprint: db_fingerprint.to_string(),
3420                    model_revision: "hash".to_string(),
3421                    total_conversations: 2,
3422                    conversations_in_batch: 1,
3423                    last_offset: 2,
3424                    cursor_exhausted: true,
3425                },
3426            )
3427            .unwrap();
3428
3429        assert!(second_outcome.published);
3430        assert!(!second_outcome.checkpoint_saved);
3431        assert!((second_outcome.progress_pct() - 100.0).abs() < f64::EPSILON);
3432        assert!(!staging_path.exists());
3433        let final_path = vector_index_path(temp.path(), indexer.embedder_id());
3434        assert_eq!(second_outcome.index_path, final_path);
3435        assert!(final_path.exists());
3436        assert!(manifest.checkpoint.is_none());
3437        let artifact = manifest.fast_tier.as_ref().expect("published fast tier");
3438        assert!(artifact.ready);
3439        assert_eq!(artifact.conversation_count, 2);
3440        assert_eq!(artifact.doc_count, 2);
3441        assert_eq!(manifest.backlog.fast_tier_processed, 2);
3442
3443        let loaded = SemanticManifest::load(temp.path()).unwrap().unwrap();
3444        assert!(loaded.checkpoint.is_none());
3445        assert!(loaded.fast_tier.as_ref().is_some_and(|record| record.ready));
3446    }
3447
3448    #[test]
3449    fn backfill_publish_compacts_resumed_wal_before_rename() {
3450        let temp = tempdir().unwrap();
3451        let mut manifest = SemanticManifest::default();
3452        let indexer = SemanticIndexer::new("hash", None).unwrap();
3453        let db_fingerprint = "db-fp-backfill-small-resume";
3454        let first: Vec<EmbeddingInput> = (0..20)
3455            .map(|idx| EmbeddingInput::new(100 + idx, format!("first batch message {idx}")))
3456            .collect();
3457
3458        let first_outcome = indexer
3459            .run_backfill_batch(
3460                &first,
3461                temp.path(),
3462                &mut manifest,
3463                SemanticBackfillBatchPlan {
3464                    tier: TierKind::Fast,
3465                    db_fingerprint: db_fingerprint.to_string(),
3466                    model_revision: "hash".to_string(),
3467                    total_conversations: 2,
3468                    conversations_in_batch: 1,
3469                    last_offset: 1,
3470                    cursor_exhausted: false,
3471                },
3472            )
3473            .unwrap();
3474        assert!(first_outcome.checkpoint_saved);
3475
3476        let second = vec![EmbeddingInput::new(200, "small final resume batch")];
3477        let second_outcome = indexer
3478            .run_backfill_batch(
3479                &second,
3480                temp.path(),
3481                &mut manifest,
3482                SemanticBackfillBatchPlan {
3483                    tier: TierKind::Fast,
3484                    db_fingerprint: db_fingerprint.to_string(),
3485                    model_revision: "hash".to_string(),
3486                    total_conversations: 2,
3487                    conversations_in_batch: 1,
3488                    last_offset: 2,
3489                    cursor_exhausted: true,
3490                },
3491            )
3492            .unwrap();
3493
3494        assert!(second_outcome.published);
3495        let final_path = vector_index_path(temp.path(), indexer.embedder_id());
3496        let mut final_wal_path = final_path.as_os_str().to_os_string();
3497        final_wal_path.push(".wal");
3498        assert!(!PathBuf::from(final_wal_path).exists());
3499
3500        let published_index = FsVectorIndex::open(&final_path).unwrap();
3501        assert_eq!(published_index.record_count(), 21);
3502        let artifact = manifest.fast_tier.as_ref().expect("published fast tier");
3503        assert_eq!(artifact.doc_count, 21);
3504        assert_eq!(artifact.conversation_count, 2);
3505    }
3506
3507    #[test]
3508    fn backfill_from_storage_fetches_canonical_batches_and_resumes() -> Result<()> {
3509        let temp = tempdir().unwrap();
3510        let db_path = temp.path().join("agent_search.db");
3511        let storage = FrankenStorage::open(&db_path)?;
3512        let agent_id = storage.ensure_agent(&Agent {
3513            id: None,
3514            slug: "codex".to_string(),
3515            name: "Codex".to_string(),
3516            version: None,
3517            kind: AgentKind::Cli,
3518        })?;
3519        storage.insert_conversation_tree(
3520            agent_id,
3521            None,
3522            &test_conversation("first", "first canonical semantic message"),
3523        )?;
3524        storage.insert_conversation_tree(
3525            agent_id,
3526            None,
3527            &test_conversation("second", "second canonical semantic message"),
3528        )?;
3529
3530        let mut manifest = SemanticManifest::default();
3531        let indexer = SemanticIndexer::new("hash", None)?;
3532
3533        let first = indexer.run_backfill_from_storage(
3534            &storage,
3535            temp.path(),
3536            &mut manifest,
3537            SemanticBackfillStoragePlan {
3538                tier: TierKind::Fast,
3539                db_fingerprint: "canonical-db-fp".to_string(),
3540                model_revision: "hash".to_string(),
3541                max_conversations: 1,
3542            },
3543        )?;
3544        assert!(!first.published);
3545        assert!(first.checkpoint_saved);
3546        assert_eq!(first.conversations_processed, 1);
3547        assert_eq!(first.total_conversations, 2);
3548        assert_eq!(first.embedded_docs, 1);
3549        assert!(first.last_offset > 0);
3550
3551        let second = indexer.run_backfill_from_storage(
3552            &storage,
3553            temp.path(),
3554            &mut manifest,
3555            SemanticBackfillStoragePlan {
3556                tier: TierKind::Fast,
3557                db_fingerprint: "canonical-db-fp".to_string(),
3558                model_revision: "hash".to_string(),
3559                max_conversations: 1,
3560            },
3561        )?;
3562        assert!(second.published);
3563        assert!(!second.checkpoint_saved);
3564        assert_eq!(second.conversations_processed, 2);
3565        assert_eq!(second.embedded_docs, 1);
3566        assert!(manifest.checkpoint.is_none());
3567        assert_eq!(
3568            manifest.fast_tier.as_ref().map(|record| record.doc_count),
3569            Some(2)
3570        );
3571        Ok(())
3572    }
3573
3574    #[test]
3575    fn canonical_embedding_batch_uses_conversation_packet_semantic_projection() -> Result<()> {
3576        let temp = tempdir().unwrap();
3577        let db_path = temp.path().join("agent_search.db");
3578        let storage = FrankenStorage::open(&db_path)?;
3579        let agent_id = storage.ensure_agent(&Agent {
3580            id: None,
3581            slug: "codex".to_string(),
3582            name: "Codex".to_string(),
3583            version: None,
3584            kind: AgentKind::Cli,
3585        })?;
3586        storage.insert_conversation_tree(
3587            agent_id,
3588            None,
3589            &test_conversation_with_messages(
3590                "packet-projection",
3591                vec![
3592                    Message {
3593                        id: None,
3594                        idx: 0,
3595                        role: MessageRole::User,
3596                        author: None,
3597                        created_at: Some(1_700_000_000_500),
3598                        content: "user semantic text".to_string(),
3599                        extra_json: json!({}),
3600                        snippets: Vec::new(),
3601                    },
3602                    Message {
3603                        id: None,
3604                        idx: 1,
3605                        role: MessageRole::Tool,
3606                        author: None,
3607                        created_at: Some(1_700_000_000_600),
3608                        content: "tool semantic text".to_string(),
3609                        extra_json: json!({}),
3610                        snippets: Vec::new(),
3611                    },
3612                    Message {
3613                        id: None,
3614                        idx: 2,
3615                        role: MessageRole::System,
3616                        author: None,
3617                        created_at: Some(1_700_000_000_700),
3618                        content: String::new(),
3619                        extra_json: json!({}),
3620                        snippets: Vec::new(),
3621                    },
3622                ],
3623            ),
3624        )?;
3625
3626        let batch = fetch_canonical_embedding_batch(&storage, 0, 1)?;
3627
3628        assert_eq!(batch.conversations_in_batch, 1);
3629        assert_eq!(batch.inputs.len(), 2);
3630        assert_eq!(batch.inputs[0].content, "user semantic text");
3631        assert_eq!(batch.inputs[1].content, "tool semantic text");
3632        assert_eq!(batch.inputs[0].role, role_code_from_str("user").unwrap());
3633        assert_eq!(batch.inputs[1].role, role_code_from_str("tool").unwrap());
3634        let normalized_source_id =
3635            normalized_index_source_id(Some("remote-laptop"), None, Some("builder-host"));
3636        let expected_hash = crc32fast::hash(normalized_source_id.as_bytes());
3637        assert_eq!(batch.inputs[0].source_id, expected_hash);
3638        assert_eq!(batch.inputs[1].source_id, expected_hash);
3639        Ok(())
3640    }
3641
3642    #[test]
3643    fn canonical_embedding_batch_pushes_last_message_id_filter_into_selection() -> Result<()> {
3644        let temp = tempdir().unwrap();
3645        let db_path = temp.path().join("agent_search.db");
3646        let storage = FrankenStorage::open(&db_path)?;
3647        let agent_id = storage.ensure_agent(&Agent {
3648            id: None,
3649            slug: "codex".to_string(),
3650            name: "Codex".to_string(),
3651            version: None,
3652            kind: AgentKind::Cli,
3653        })?;
3654        storage.insert_conversation_tree(
3655            agent_id,
3656            None,
3657            &test_conversation("before-watermark", "old semantic message"),
3658        )?;
3659        let watermark: i64 = storage.raw().query_row_map(
3660            "SELECT MAX(id) FROM messages",
3661            &[] as &[ParamValue],
3662            |row| row.get_typed(0),
3663        )?;
3664        storage.insert_conversation_tree(
3665            agent_id,
3666            None,
3667            &test_conversation("after-watermark", "new semantic message"),
3668        )?;
3669
3670        let batch = fetch_canonical_embedding_batch_inner(&storage, 0, 8, Some(watermark))?;
3671
3672        assert_eq!(
3673            batch.conversations_in_batch, 1,
3674            "last_message_id must be pushed into candidate selection so old conversations are not counted in the batch"
3675        );
3676        assert_eq!(batch.total_conversations, 2);
3677        anyhow::ensure!(
3678            batch.cursor_exhausted,
3679            "message-cursor selection should exhaust the remaining cursor"
3680        );
3681        assert_eq!(batch.inputs.len(), 1);
3682        assert_eq!(batch.inputs[0].content, "new semantic message");
3683        assert!(
3684            i64::try_from(batch.inputs[0].message_id).unwrap_or(i64::MAX) > watermark,
3685            "selected semantic input must be strictly newer than the checkpoint message id"
3686        );
3687        Ok(())
3688    }
3689
3690    #[test]
3691    fn canonical_embedding_batch_reports_unexhausted_cursor_at_sql_limit() -> Result<()> {
3692        let temp = tempdir()?;
3693        let db_path = temp.path().join("agent_search.db");
3694        let storage = FrankenStorage::open(&db_path)?;
3695        let agent_id = storage.ensure_agent(&Agent {
3696            id: None,
3697            slug: "codex".to_string(),
3698            name: "Codex".to_string(),
3699            version: None,
3700            kind: AgentKind::Cli,
3701        })?;
3702        storage.insert_conversation_tree(
3703            agent_id,
3704            None,
3705            &test_conversation("limit-first", "first limit semantic message"),
3706        )?;
3707        storage.insert_conversation_tree(
3708            agent_id,
3709            None,
3710            &test_conversation("limit-second", "second limit semantic message"),
3711        )?;
3712
3713        let first = fetch_canonical_embedding_batch_inner(&storage, 0, 1, None)?;
3714        anyhow::ensure!(
3715            first.conversations_in_batch == 1,
3716            "wanted first batch to select 1 conversation, got {}",
3717            first.conversations_in_batch
3718        );
3719        anyhow::ensure!(
3720            !first.cursor_exhausted,
3721            "over-fetching one candidate should prove another conversation remains"
3722        );
3723
3724        let second =
3725            fetch_canonical_embedding_batch_inner(&storage, first.last_conversation_id, 1, None)?;
3726        anyhow::ensure!(
3727            second.conversations_in_batch == 1,
3728            "wanted second batch to select 1 conversation, got {}",
3729            second.conversations_in_batch
3730        );
3731        anyhow::ensure!(
3732            second.cursor_exhausted,
3733            "the final page should report cursor exhaustion even when it exactly fills the requested limit"
3734        );
3735        Ok(())
3736    }
3737
3738    #[test]
3739    fn checkpoint_caps_stop_at_whole_conversation_boundary() -> Result<()> {
3740        let temp = tempdir().unwrap();
3741        let db_path = temp.path().join("agent_search.db");
3742        let storage = FrankenStorage::open(&db_path)?;
3743        let agent_id = storage.ensure_agent(&Agent {
3744            id: None,
3745            slug: "codex".to_string(),
3746            name: "Codex".to_string(),
3747            version: None,
3748            kind: AgentKind::Cli,
3749        })?;
3750        for external_id in ["cap-first", "cap-second", "cap-third"] {
3751            storage.insert_conversation_tree(
3752                agent_id,
3753                None,
3754                &test_conversation_with_messages(
3755                    external_id,
3756                    vec![
3757                        Message {
3758                            id: None,
3759                            idx: 0,
3760                            role: MessageRole::User,
3761                            author: None,
3762                            created_at: Some(1_700_000_000_500),
3763                            content: format!("{external_id} user semantic text"),
3764                            extra_json: json!({}),
3765                            snippets: Vec::new(),
3766                        },
3767                        Message {
3768                            id: None,
3769                            idx: 1,
3770                            role: MessageRole::Agent,
3771                            author: None,
3772                            created_at: Some(1_700_000_000_600),
3773                            content: format!("{external_id} assistant semantic text"),
3774                            extra_json: json!({}),
3775                            snippets: Vec::new(),
3776                        },
3777                    ],
3778                ),
3779            )?;
3780        }
3781
3782        let first_conversation_id: i64 = storage.raw().query_row_map(
3783            "SELECT MIN(id) FROM conversations",
3784            &[] as &[ParamValue],
3785            |row| row.get_typed(0),
3786        )?;
3787        let batch = fetch_canonical_embedding_batch_inner_with_caps(
3788            &storage,
3789            0,
3790            8,
3791            None,
3792            SemanticCheckpointCaps {
3793                max_messages: 3,
3794                max_bytes: 0,
3795            },
3796        )?;
3797
3798        assert_eq!(batch.conversations_in_batch, 1);
3799        anyhow::ensure!(
3800            !batch.cursor_exhausted,
3801            "checkpoint caps that stop before the next whole conversation must not publish"
3802        );
3803        assert_eq!(batch.last_conversation_id, first_conversation_id);
3804        assert_eq!(batch.inputs.len(), 2);
3805        assert!(
3806            batch
3807                .inputs
3808                .iter()
3809                .all(|input| input.content.contains("cap-first"))
3810        );
3811        assert_eq!(batch.total_conversations, 3);
3812        Ok(())
3813    }
3814
3815    #[test]
3816    fn packet_embedding_inputs_from_storage_since_only_emits_new_canonical_messages() -> Result<()>
3817    {
3818        let temp = tempdir().unwrap();
3819        let db_path = temp.path().join("agent_search.db");
3820        let storage = FrankenStorage::open(&db_path)?;
3821        let agent_id = storage.ensure_agent(&Agent {
3822            id: None,
3823            slug: "codex".to_string(),
3824            name: "Codex".to_string(),
3825            version: None,
3826            kind: AgentKind::Cli,
3827        })?;
3828        storage.insert_conversation_tree(
3829            agent_id,
3830            None,
3831            &test_conversation_with_messages(
3832                "packet-delta",
3833                vec![
3834                    Message {
3835                        id: None,
3836                        idx: 0,
3837                        role: MessageRole::User,
3838                        author: None,
3839                        created_at: Some(1_700_000_000_500),
3840                        content: "existing semantic text".to_string(),
3841                        extra_json: json!({}),
3842                        snippets: Vec::new(),
3843                    },
3844                    Message {
3845                        id: None,
3846                        idx: 1,
3847                        role: MessageRole::Agent,
3848                        author: None,
3849                        created_at: Some(1_700_000_000_600),
3850                        content: "existing assistant text".to_string(),
3851                        extra_json: json!({}),
3852                        snippets: Vec::new(),
3853                    },
3854                ],
3855            ),
3856        )?;
3857        let watermark: i64 = storage.raw().query_row_map(
3858            "SELECT MAX(id) FROM messages",
3859            &[] as &[ParamValue],
3860            |row| row.get_typed(0),
3861        )?;
3862
3863        storage.insert_conversation_tree(
3864            agent_id,
3865            None,
3866            &test_conversation_with_messages(
3867                "packet-delta",
3868                vec![
3869                    Message {
3870                        id: None,
3871                        idx: 0,
3872                        role: MessageRole::User,
3873                        author: None,
3874                        created_at: Some(1_700_000_000_500),
3875                        content: "existing semantic text".to_string(),
3876                        extra_json: json!({}),
3877                        snippets: Vec::new(),
3878                    },
3879                    Message {
3880                        id: None,
3881                        idx: 1,
3882                        role: MessageRole::Agent,
3883                        author: None,
3884                        created_at: Some(1_700_000_000_600),
3885                        content: "existing assistant text".to_string(),
3886                        extra_json: json!({}),
3887                        snippets: Vec::new(),
3888                    },
3889                    Message {
3890                        id: None,
3891                        idx: 2,
3892                        role: MessageRole::Agent,
3893                        author: None,
3894                        created_at: Some(1_700_000_000_700),
3895                        content: "new packet semantic text".to_string(),
3896                        extra_json: json!({}),
3897                        snippets: Vec::new(),
3898                    },
3899                    Message {
3900                        id: None,
3901                        idx: 3,
3902                        role: MessageRole::System,
3903                        author: None,
3904                        created_at: Some(1_700_000_000_800),
3905                        content: String::new(),
3906                        extra_json: json!({}),
3907                        snippets: Vec::new(),
3908                    },
3909                ],
3910            ),
3911        )?;
3912
3913        let batch = packet_embedding_inputs_from_storage_since(&storage, watermark)?;
3914
3915        assert_eq!(batch.conversations_in_batch, 1);
3916        assert_eq!(batch.inputs.len(), 1);
3917        assert_eq!(batch.inputs[0].content, "new packet semantic text");
3918        assert_eq!(
3919            batch.inputs[0].role,
3920            role_code_from_str("assistant").unwrap()
3921        );
3922        let normalized_source_id =
3923            normalized_index_source_id(Some("remote-laptop"), None, Some("builder-host"));
3924        assert_eq!(
3925            batch.inputs[0].source_id,
3926            crc32fast::hash(normalized_source_id.as_bytes())
3927        );
3928        let expected_raw_max_id: i64 = storage.raw().query_row_map(
3929            "SELECT MAX(id) FROM messages",
3930            &[] as &[ParamValue],
3931            |row| row.get_typed(0),
3932        )?;
3933        assert_eq!(batch.raw_max_message_id, Some(expected_raw_max_id));
3934        Ok(())
3935    }
3936
3937    #[test]
3938    fn packet_catch_up_emits_expected_semantic_docs_after_watermark() -> Result<()> {
3939        let temp = tempdir().unwrap();
3940        let db_path = temp.path().join("agent_search.db");
3941        let storage = FrankenStorage::open(&db_path)?;
3942        let agent_id = storage.ensure_agent(&Agent {
3943            id: None,
3944            slug: "codex".to_string(),
3945            name: "Codex".to_string(),
3946            version: None,
3947            kind: AgentKind::Cli,
3948        })?;
3949        let workspace_id = storage.ensure_workspace(Path::new("/tmp/workspace"), None)?;
3950
3951        storage.insert_conversation_tree(
3952            agent_id,
3953            Some(workspace_id),
3954            &test_conversation_with_messages(
3955                "legacy-packet-semantics",
3956                vec![
3957                    Message {
3958                        id: None,
3959                        idx: 0,
3960                        role: MessageRole::User,
3961                        author: None,
3962                        created_at: Some(1_700_000_000_500),
3963                        content: "before watermark".to_string(),
3964                        extra_json: json!({}),
3965                        snippets: Vec::new(),
3966                    },
3967                    Message {
3968                        id: None,
3969                        idx: 1,
3970                        role: MessageRole::Agent,
3971                        author: None,
3972                        created_at: Some(1_700_000_000_600),
3973                        content: "before watermark assistant".to_string(),
3974                        extra_json: json!({}),
3975                        snippets: Vec::new(),
3976                    },
3977                ],
3978            ),
3979        )?;
3980
3981        let watermark: i64 = storage.raw().query_row_map(
3982            "SELECT MAX(id) FROM messages",
3983            &[] as &[ParamValue],
3984            |row| row.get_typed(0),
3985        )?;
3986
3987        storage.insert_conversation_tree(
3988            agent_id,
3989            Some(workspace_id),
3990            &test_conversation_with_messages(
3991                "legacy-packet-semantics",
3992                vec![
3993                    Message {
3994                        id: None,
3995                        idx: 0,
3996                        role: MessageRole::User,
3997                        author: None,
3998                        created_at: Some(1_700_000_000_500),
3999                        content: "before watermark".to_string(),
4000                        extra_json: json!({}),
4001                        snippets: Vec::new(),
4002                    },
4003                    Message {
4004                        id: None,
4005                        idx: 1,
4006                        role: MessageRole::Agent,
4007                        author: None,
4008                        created_at: Some(1_700_000_000_600),
4009                        content: "before watermark assistant".to_string(),
4010                        extra_json: json!({}),
4011                        snippets: Vec::new(),
4012                    },
4013                    Message {
4014                        id: None,
4015                        idx: 2,
4016                        role: MessageRole::Agent,
4017                        author: None,
4018                        created_at: Some(1_700_000_000_700),
4019                        content: "after watermark assistant".to_string(),
4020                        extra_json: json!({}),
4021                        snippets: Vec::new(),
4022                    },
4023                    Message {
4024                        id: None,
4025                        idx: 3,
4026                        role: MessageRole::System,
4027                        author: None,
4028                        created_at: Some(1_700_000_000_800),
4029                        content: String::new(),
4030                        extra_json: json!({}),
4031                        snippets: Vec::new(),
4032                    },
4033                ],
4034            ),
4035        )?;
4036        storage.insert_conversation_tree(
4037            agent_id,
4038            Some(workspace_id),
4039            &test_conversation_with_messages(
4040                "legacy-packet-semantics-second-conv",
4041                vec![
4042                    Message {
4043                        id: None,
4044                        idx: 0,
4045                        role: MessageRole::Tool,
4046                        author: None,
4047                        created_at: Some(1_700_000_000_900),
4048                        content: "after watermark tool".to_string(),
4049                        extra_json: json!({}),
4050                        snippets: Vec::new(),
4051                    },
4052                    Message {
4053                        id: None,
4054                        idx: 1,
4055                        role: MessageRole::System,
4056                        author: None,
4057                        created_at: Some(1_700_000_001_000),
4058                        content: String::new(),
4059                        extra_json: json!({}),
4060                        snippets: Vec::new(),
4061                    },
4062                ],
4063            ),
4064        )?;
4065
4066        let packet_batch = packet_embedding_inputs_from_storage_since(&storage, watermark)?;
4067        let normalized_source_id =
4068            normalized_index_source_id(Some("remote-laptop"), None, Some("builder-host"));
4069        let source_id_hash = crc32fast::hash(normalized_source_id.as_bytes());
4070        let expected = vec![
4071            ComparableSemanticInput {
4072                message_id: u64::try_from(watermark + 1).unwrap(),
4073                created_at_ms: 1_700_000_000_700,
4074                agent_id: u32::try_from(agent_id).unwrap(),
4075                workspace_id: u32::try_from(workspace_id).unwrap(),
4076                source_id: source_id_hash,
4077                role: role_code_from_str("assistant").unwrap(),
4078                content: "after watermark assistant".to_string(),
4079            },
4080            ComparableSemanticInput {
4081                message_id: u64::try_from(watermark + 3).unwrap(),
4082                created_at_ms: 1_700_000_000_900,
4083                agent_id: u32::try_from(agent_id).unwrap(),
4084                workspace_id: u32::try_from(workspace_id).unwrap(),
4085                source_id: source_id_hash,
4086                role: role_code_from_str("tool").unwrap(),
4087                content: "after watermark tool".to_string(),
4088            },
4089        ];
4090
4091        assert_eq!(comparable_semantic_inputs(packet_batch.inputs), expected);
4092        assert_eq!(packet_batch.conversations_in_batch, 2);
4093        assert_eq!(packet_batch.raw_max_message_id, Some(watermark + 4));
4094        Ok(())
4095    }
4096
4097    #[test]
4098    fn packet_embedding_inputs_for_message_ids_matches_since_selection() -> Result<()> {
4099        let temp = tempdir().unwrap();
4100        let db_path = temp.path().join("agent_search.db");
4101        let storage = FrankenStorage::open(&db_path)?;
4102        let agent_id = storage.ensure_agent(&Agent {
4103            id: None,
4104            slug: "codex".to_string(),
4105            name: "Codex".to_string(),
4106            version: None,
4107            kind: AgentKind::Cli,
4108        })?;
4109        let workspace_id = storage.ensure_workspace(Path::new("/tmp/workspace"), None)?;
4110
4111        storage.insert_conversation_tree(
4112            agent_id,
4113            Some(workspace_id),
4114            &test_conversation_with_messages(
4115                "selected-vs-since",
4116                vec![
4117                    Message {
4118                        id: None,
4119                        idx: 0,
4120                        role: MessageRole::User,
4121                        author: None,
4122                        created_at: Some(1_700_000_100_100),
4123                        content: "before watermark".to_string(),
4124                        extra_json: json!({}),
4125                        snippets: Vec::new(),
4126                    },
4127                    Message {
4128                        id: None,
4129                        idx: 1,
4130                        role: MessageRole::Agent,
4131                        author: None,
4132                        created_at: Some(1_700_000_100_200),
4133                        content: "before watermark assistant".to_string(),
4134                        extra_json: json!({}),
4135                        snippets: Vec::new(),
4136                    },
4137                ],
4138            ),
4139        )?;
4140
4141        let watermark: i64 = storage.raw().query_row_map(
4142            "SELECT MAX(id) FROM messages",
4143            &[] as &[ParamValue],
4144            |row| row.get_typed(0),
4145        )?;
4146
4147        storage.insert_conversation_tree(
4148            agent_id,
4149            Some(workspace_id),
4150            &test_conversation_with_messages(
4151                "selected-vs-since",
4152                vec![
4153                    Message {
4154                        id: None,
4155                        idx: 0,
4156                        role: MessageRole::User,
4157                        author: None,
4158                        created_at: Some(1_700_000_100_100),
4159                        content: "before watermark".to_string(),
4160                        extra_json: json!({}),
4161                        snippets: Vec::new(),
4162                    },
4163                    Message {
4164                        id: None,
4165                        idx: 1,
4166                        role: MessageRole::Agent,
4167                        author: None,
4168                        created_at: Some(1_700_000_100_200),
4169                        content: "before watermark assistant".to_string(),
4170                        extra_json: json!({}),
4171                        snippets: Vec::new(),
4172                    },
4173                    Message {
4174                        id: None,
4175                        idx: 2,
4176                        role: MessageRole::Tool,
4177                        author: None,
4178                        created_at: Some(1_700_000_100_300),
4179                        content: "after watermark tool".to_string(),
4180                        extra_json: json!({}),
4181                        snippets: Vec::new(),
4182                    },
4183                    Message {
4184                        id: None,
4185                        idx: 3,
4186                        role: MessageRole::System,
4187                        author: None,
4188                        created_at: Some(1_700_000_100_400),
4189                        content: String::new(),
4190                        extra_json: json!({}),
4191                        snippets: Vec::new(),
4192                    },
4193                ],
4194            ),
4195        )?;
4196        storage.insert_conversation_tree(
4197            agent_id,
4198            Some(workspace_id),
4199            &test_conversation_with_messages(
4200                "selected-vs-since-second",
4201                vec![Message {
4202                    id: None,
4203                    idx: 0,
4204                    role: MessageRole::Agent,
4205                    author: None,
4206                    created_at: Some(1_700_000_100_500),
4207                    content: "after watermark assistant".to_string(),
4208                    extra_json: json!({}),
4209                    snippets: Vec::new(),
4210                }],
4211            ),
4212        )?;
4213
4214        let since_batch = packet_embedding_inputs_from_storage_since(&storage, watermark)?;
4215        let conversation_ids: Vec<i64> = storage.raw().query_map_collect(
4216            "SELECT DISTINCT conversation_id
4217             FROM messages
4218             WHERE id > ?1
4219             ORDER BY conversation_id ASC",
4220            &[ParamValue::from(watermark)],
4221            |row| row.get_typed(0),
4222        )?;
4223        let selected_message_ids: HashSet<i64> = storage
4224            .raw()
4225            .query_map_collect(
4226                "SELECT id
4227                 FROM messages
4228                 WHERE id > ?1
4229                 ORDER BY id ASC",
4230                &[ParamValue::from(watermark)],
4231                |row| row.get_typed(0),
4232            )?
4233            .into_iter()
4234            .collect();
4235        let selected_inputs = packet_embedding_inputs_from_storage_for_message_ids(
4236            &storage,
4237            &conversation_ids,
4238            &selected_message_ids,
4239        )?;
4240
4241        assert_eq!(
4242            comparable_semantic_inputs(selected_inputs),
4243            comparable_semantic_inputs(since_batch.inputs)
4244        );
4245        Ok(())
4246    }
4247
4248    #[test]
4249    fn default_batch_size_uses_new_value() {
4250        // The test setup must not leak a caller-provided CASS_SEMANTIC_BATCH_SIZE
4251        // override, which would mask the constant bump we're asserting on.
4252        let _guard = EnvVarGuard::remove("CASS_SEMANTIC_BATCH_SIZE");
4253        let indexer = SemanticIndexer::new("hash", None).unwrap();
4254        assert_eq!(indexer.batch_size(), DEFAULT_SEMANTIC_BATCH_SIZE);
4255    }
4256
4257    #[test]
4258    fn semantic_watchdog_and_checkpoint_caps_have_derived_defaults() {
4259        let _warn = EnvVarGuard::remove("CASS_SEMANTIC_EMBED_BATCH_WARN_AFTER_MS");
4260        let _fail = EnvVarGuard::remove("CASS_SEMANTIC_EMBED_BATCH_FAIL_AFTER_MS");
4261        let _messages = EnvVarGuard::remove("CASS_SEMANTIC_MAX_MESSAGES_PER_CHECKPOINT");
4262        let _bytes = EnvVarGuard::remove("CASS_SEMANTIC_MAX_BYTES_PER_CHECKPOINT");
4263
4264        assert_eq!(
4265            resolved_semantic_embed_batch_warn_after_ms(),
4266            DEFAULT_SEMANTIC_EMBED_BATCH_WARN_AFTER_MS
4267        );
4268        assert_eq!(
4269            resolved_semantic_embed_batch_fail_after_ms(),
4270            DEFAULT_SEMANTIC_EMBED_BATCH_FAIL_AFTER_MS
4271        );
4272        assert_eq!(
4273            SemanticCheckpointCaps::from_env(),
4274            SemanticCheckpointCaps {
4275                max_messages: DEFAULT_SEMANTIC_MAX_MESSAGES_PER_CHECKPOINT,
4276                max_bytes: DEFAULT_SEMANTIC_MAX_BYTES_PER_CHECKPOINT,
4277            }
4278        );
4279    }
4280
4281    #[test]
4282    fn parallel_prep_enabled_reuses_truthy_env_parser() {
4283        for (value, expected) in [
4284            ("1", true),
4285            ("true", true),
4286            (" YeS ", true),
4287            ("on", true),
4288            ("0", false),
4289            ("false", false),
4290            ("off", false),
4291        ] {
4292            let _guard = EnvVarGuard::set("CASS_SEMANTIC_PREP_PARALLEL", value);
4293            assert_eq!(parallel_prep_enabled(), expected, "env value {value:?}");
4294        }
4295
4296        let _guard = EnvVarGuard::remove("CASS_SEMANTIC_PREP_PARALLEL");
4297        assert!(!parallel_prep_enabled());
4298    }
4299
4300    #[test]
4301    fn saturating_u64_from_usize_covers_bounds() {
4302        assert_eq!(saturating_u64_from_usize(0), 0);
4303        assert_eq!(saturating_u64_from_usize(42), 42);
4304        assert_eq!(
4305            saturating_u64_from_usize(usize::MAX),
4306            u64::try_from(usize::MAX).unwrap_or(u64::MAX)
4307        );
4308    }
4309
4310    /// `coding_agent_session_search-ibuuh.32` (sink #3 equivalence gate):
4311    /// the packet-driven `semantic_inputs_from_packets` helper must
4312    /// produce the same `EmbeddingInput` list a fresh storage replay
4313    /// returns for the same canonical corpus. Once this passes, callers
4314    /// that already hold packets (rebuild pipeline, salvage replay,
4315    /// repair flows) can drive the semantic preparation consumer
4316    /// without a second canonical-row round-trip.
4317    #[test]
4318    fn semantic_inputs_from_packets_matches_storage_replay() -> Result<()> {
4319        let temp = tempdir().unwrap();
4320        let db_path = temp.path().join("agent_search.db");
4321        let storage = FrankenStorage::open(&db_path)?;
4322
4323        let agent_id_codex = storage.ensure_agent(&Agent {
4324            id: None,
4325            slug: "codex".to_string(),
4326            name: "Codex".to_string(),
4327            version: None,
4328            kind: AgentKind::Cli,
4329        })?;
4330        let agent_id_claude = storage.ensure_agent(&Agent {
4331            id: None,
4332            slug: "claude_code".to_string(),
4333            name: "Claude Code".to_string(),
4334            version: None,
4335            kind: AgentKind::Cli,
4336        })?;
4337        let workspace_id =
4338            storage.ensure_workspace(Path::new("/tmp/semantic-equivalence-ws"), None)?;
4339
4340        // Two conversations on different agents, mixed roles, including
4341        // an empty-content system message that the semantic projection
4342        // must filter (matches the legacy storage replay).
4343        storage.insert_conversation_tree(
4344            agent_id_codex,
4345            Some(workspace_id),
4346            &test_conversation_with_messages(
4347                "packet-equiv-1",
4348                vec![
4349                    Message {
4350                        id: None,
4351                        idx: 0,
4352                        role: MessageRole::User,
4353                        author: None,
4354                        created_at: Some(1_700_000_000_500),
4355                        content: "first user prompt".to_string(),
4356                        extra_json: json!({}),
4357                        snippets: Vec::new(),
4358                    },
4359                    Message {
4360                        id: None,
4361                        idx: 1,
4362                        role: MessageRole::Agent,
4363                        author: None,
4364                        created_at: Some(1_700_000_000_600),
4365                        content: "first assistant reply".to_string(),
4366                        extra_json: json!({}),
4367                        snippets: Vec::new(),
4368                    },
4369                    Message {
4370                        id: None,
4371                        idx: 2,
4372                        role: MessageRole::System,
4373                        author: None,
4374                        created_at: Some(1_700_000_000_700),
4375                        // Empty content is filtered by both paths.
4376                        content: String::new(),
4377                        extra_json: json!({}),
4378                        snippets: Vec::new(),
4379                    },
4380                ],
4381            ),
4382        )?;
4383        storage.insert_conversation_tree(
4384            agent_id_claude,
4385            Some(workspace_id),
4386            &test_conversation_with_messages(
4387                "packet-equiv-2",
4388                vec![
4389                    Message {
4390                        id: None,
4391                        idx: 0,
4392                        role: MessageRole::Tool,
4393                        author: Some("ripgrep".to_string()),
4394                        created_at: Some(1_700_000_001_500),
4395                        content: "tool output line".to_string(),
4396                        extra_json: json!({}),
4397                        snippets: Vec::new(),
4398                    },
4399                    Message {
4400                        id: None,
4401                        idx: 1,
4402                        role: MessageRole::Agent,
4403                        author: None,
4404                        created_at: Some(1_700_000_001_600),
4405                        content: "second assistant reply".to_string(),
4406                        extra_json: json!({}),
4407                        snippets: Vec::new(),
4408                    },
4409                ],
4410            ),
4411        )?;
4412
4413        // Legacy path: the storage-driven replay that the rebuild
4414        // pipeline currently uses.
4415        let storage_inputs = packet_embedding_inputs_from_storage(&storage)?;
4416
4417        // Packet-driven path: re-fetch the canonical envelopes (so we
4418        // get the storage-internal agent/workspace ids the rebuild path
4419        // would normally pair with packets), then convert those rows
4420        // into ConversationPackets via canonical replay and feed them
4421        // through `semantic_inputs_from_packets`.
4422        let conversation_ids: Vec<i64> = storage.raw().query_map_collect(
4423            "SELECT DISTINCT m.conversation_id
4424             FROM messages m
4425             JOIN conversations c ON c.id = m.conversation_id
4426             ORDER BY m.conversation_id ASC",
4427            &[] as &[ParamValue],
4428            |row| row.get_typed(0),
4429        )?;
4430        let envelopes = fetch_canonical_embedding_conversations(&storage, &conversation_ids)?;
4431        let mut grouped_messages =
4432            storage.fetch_messages_for_lexical_rebuild_batch(&conversation_ids, None, None)?;
4433        let mut packets: Vec<ConversationPacket> = Vec::with_capacity(envelopes.len());
4434        let mut contexts: Vec<SemanticPacketContext> = Vec::with_capacity(envelopes.len());
4435        for envelope in &envelopes {
4436            let messages = grouped_messages
4437                .remove(&envelope.conversation_id)
4438                .unwrap_or_default();
4439            let provenance = canonical_embedding_packet_provenance(envelope);
4440            let canonical = canonical_embedding_conversation(envelope, &provenance, messages);
4441            packets.push(ConversationPacket::from_canonical_replay(
4442                &canonical, provenance,
4443            ));
4444            contexts.push(SemanticPacketContext {
4445                conversation_id: envelope.conversation_id,
4446                agent_id: saturating_u32_from_i64(envelope.agent_id),
4447                workspace_id: saturating_u32_from_i64(envelope.workspace_id.unwrap_or(0)),
4448            });
4449        }
4450        let packet_inputs = semantic_inputs_from_packets(&packets, &contexts)?;
4451
4452        // The two paths must produce the same EmbeddingInput list
4453        // (sortable comparison normalizes ordering across the two
4454        // helpers' iteration orders).
4455        assert!(
4456            !storage_inputs.is_empty(),
4457            "fixture should produce non-empty semantic inputs (sanity)"
4458        );
4459        assert_eq!(
4460            comparable_semantic_inputs(storage_inputs.clone()),
4461            comparable_semantic_inputs(packet_inputs.clone()),
4462            "packet-driven semantic preparation must match storage replay byte-for-byte"
4463        );
4464
4465        // Sanity-pin a couple of contract details so a regression in
4466        // either path (e.g. role normalization or empty-content
4467        // filtering) trips a clear assertion rather than a generic
4468        // length mismatch.
4469        let storage_count = storage_inputs.len();
4470        let packet_count = packet_inputs.len();
4471        assert_eq!(
4472            storage_count, packet_count,
4473            "storage and packet semantic input counts must agree exactly"
4474        );
4475        // Empty-content system message must NOT appear in the output.
4476        assert!(
4477            packet_inputs.iter().all(|input| !input.content.is_empty()),
4478            "empty content must be filtered by the packet semantic projection"
4479        );
4480        // The remote-host source_id pins the cross-path provenance hash.
4481        let normalized_source_id =
4482            normalized_index_source_id(Some("remote-laptop"), None, Some("builder-host"));
4483        let expected_hash = crc32fast::hash(normalized_source_id.as_bytes());
4484        assert!(
4485            packet_inputs
4486                .iter()
4487                .all(|input| input.source_id == expected_hash),
4488            "every emitted EmbeddingInput must hash provenance via the packet's normalized source_id"
4489        );
4490
4491        Ok(())
4492    }
4493
4494    /// Length-mismatch defense: if a caller hands `semantic_inputs_from_packets`
4495    /// a packet/context slice pair of different lengths, the helper must
4496    /// return an error rather than silently mis-correlating ids. Pinning
4497    /// this is part of the bead's "shadow / compare mode plus explicit
4498    /// kill-switch" acceptance language.
4499    #[test]
4500    fn semantic_inputs_from_packets_rejects_length_mismatch() {
4501        let provenance = ConversationPacketProvenance::local();
4502        let canonical = test_conversation("packet-mismatch", "hello");
4503        let packet = ConversationPacket::from_canonical_replay(&canonical, provenance);
4504        let result = semantic_inputs_from_packets(&[packet], &[]);
4505        assert!(
4506            result.is_err(),
4507            "expected error on packet/context length mismatch"
4508        );
4509        let err = result.unwrap_err().to_string();
4510        assert!(
4511            err.contains("length mismatch"),
4512            "error should mention length mismatch, got: {err}"
4513        );
4514    }
4515}