Skip to main content

coding_agent_search/indexer/
semantic.rs

1use std::collections::{HashMap, HashSet};
2use std::fs;
3use std::io::IsTerminal;
4use std::path::{Path, PathBuf};
5use std::time::{Instant, SystemTime, UNIX_EPOCH};
6
7use anyhow::{Context, Result, bail};
8use frankensearch::index::{
9    HNSW_DEFAULT_EF_CONSTRUCTION as FS_HNSW_DEFAULT_EF_CONSTRUCTION,
10    HNSW_DEFAULT_M as FS_HNSW_DEFAULT_M, HnswConfig as FsHnswConfig, HnswIndex as FsHnswIndex,
11    Quantization as FsQuantization, VectorIndex as FsVectorIndex,
12    VectorIndexWriter as FsVectorIndexWriter,
13};
14use frankensqlite::compat::{ConnectionExt, ParamValue, RowExt};
15use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
16use rayon::prelude::*;
17
18use crate::indexer::memoization::{
19    ContentAddressedMemoCache, MemoCacheAuditRecord, MemoContentHash, MemoKey, MemoLookup,
20};
21use crate::indexer::responsiveness;
22use crate::indexer::semantic_progress::{
23    SemanticProgressEvent, SemanticProgressFields, SemanticProgressSink,
24};
25use crate::model::conversation_packet::{ConversationPacket, ConversationPacketProvenance};
26use crate::model::types::{Conversation, Message};
27use crate::search::canonicalize::{canonicalize_for_embedding, content_hash};
28use crate::search::embedder::Embedder;
29use crate::search::fastembed_embedder::FastEmbedder;
30use crate::search::hash_embedder::HashEmbedder;
31use crate::search::policy::{CHUNKING_STRATEGY_VERSION, SEMANTIC_SCHEMA_VERSION, SemanticPolicy};
32use crate::search::semantic_manifest::{
33    ArtifactRecord, BuildCheckpoint, SemanticManifest, SemanticShardManifest, SemanticShardRecord,
34    TierKind,
35};
36use crate::search::tantivy::{
37    normalized_index_origin_host, normalized_index_origin_kind, normalized_index_source_id,
38};
39use crate::search::vector_index::{
40    ROLE_USER, SemanticDocId, VECTOR_INDEX_DIR, role_code_from_str, vector_index_path,
41};
42use crate::storage::sqlite::FrankenStorage;
43
44/// Default embedder batch size. 128 is a sweet spot for ONNX MiniLM models on
45/// modern CPUs: big enough to amortize dispatch overhead and keep the tensor
46/// kernels saturated, small enough that one batch fits comfortably in L2 and
47/// memory reservation stays bounded for large corpora.
48const DEFAULT_SEMANTIC_BATCH_SIZE: usize = 128;
49const DEFAULT_SEMANTIC_PREP_MEMO_CAPACITY: usize = 4_096;
50const DEFAULT_SEMANTIC_EMBED_BATCH_WARN_AFTER_MS: u64 = 30_000;
51const DEFAULT_SEMANTIC_EMBED_BATCH_FAIL_AFTER_MS: u64 = 300_000;
52const DEFAULT_SEMANTIC_MAX_MESSAGES_PER_CHECKPOINT: usize = 10_000;
53const DEFAULT_SEMANTIC_MAX_BYTES_PER_CHECKPOINT: u64 = 8 * 1024 * 1024;
54const SEMANTIC_PREP_MEMO_ALGORITHM: &str = "semantic_prepare_window";
55const SEMANTIC_PREP_MEMO_VERSION: &str = "canonicalize_for_embedding:v2:stable-content-hash";
56
57fn resolved_env_usize(key: &str, default: usize) -> usize {
58    dotenvy::var(key)
59        .ok()
60        .and_then(|v| v.parse::<usize>().ok())
61        .unwrap_or(default)
62}
63
64fn resolved_env_u64(key: &str, default: u64) -> u64 {
65    dotenvy::var(key)
66        .ok()
67        .and_then(|v| v.parse::<u64>().ok())
68        .unwrap_or(default)
69}
70
71fn resolved_default_batch_size() -> usize {
72    dotenvy::var("CASS_SEMANTIC_BATCH_SIZE")
73        .ok()
74        .and_then(|v| v.parse::<usize>().ok())
75        .filter(|v| *v > 0)
76        .unwrap_or(DEFAULT_SEMANTIC_BATCH_SIZE)
77}
78
79fn resolved_semantic_prep_memo_capacity() -> usize {
80    dotenvy::var("CASS_SEMANTIC_PREP_MEMO_CAPACITY")
81        .ok()
82        .and_then(|v| v.parse::<usize>().ok())
83        .filter(|v| *v > 0)
84        .unwrap_or(DEFAULT_SEMANTIC_PREP_MEMO_CAPACITY)
85}
86
87fn resolved_semantic_embed_batch_warn_after_ms() -> u64 {
88    resolved_env_u64(
89        "CASS_SEMANTIC_EMBED_BATCH_WARN_AFTER_MS",
90        DEFAULT_SEMANTIC_EMBED_BATCH_WARN_AFTER_MS,
91    )
92}
93
94fn resolved_semantic_embed_batch_fail_after_ms() -> u64 {
95    resolved_env_u64(
96        "CASS_SEMANTIC_EMBED_BATCH_FAIL_AFTER_MS",
97        DEFAULT_SEMANTIC_EMBED_BATCH_FAIL_AFTER_MS,
98    )
99}
100
101/// Opt in to the rayon-parallel canonicalize+hash prep step. **Default: OFF.**
102///
103/// The parallel path is kept because canonicalize+hash CAN dominate the
104/// embedding wall-clock on pathological inputs (very long messages, costly
105/// Unicode normalization). But criterion baselines captured under
106/// `tests/artifacts/perf/2026-04-21-profile-run/baselines.md` showed a
107/// 1.2×–2.3× **regression** on the hash embedder across every batch size
108/// tested (2 000 messages, mixed markdown/code/unicode): rayon's per-task
109/// scheduling overhead is larger than the per-message canonicalize+hash cost
110/// when the embedder itself is cheap. For the production ONNX (MiniLM)
111/// embedder, per-batch inference already dwarfs prep, so parallel prep never
112/// buys meaningful wall-clock — the prep step is ≤ 1% of total embed time.
113///
114/// Set `CASS_SEMANTIC_PREP_PARALLEL=1` / `true` / `yes` / `on` to opt in.
115fn parallel_prep_enabled() -> bool {
116    env_truthy("CASS_SEMANTIC_PREP_PARALLEL")
117}
118
119fn saturating_u64_from_usize(value: usize) -> u64 {
120    u64::try_from(value).unwrap_or(u64::MAX)
121}
122
123fn saturating_u64_from_millis(value: u128) -> u64 {
124    u64::try_from(value).unwrap_or(u64::MAX)
125}
126
127#[derive(Debug, Clone)]
128pub struct EmbeddingInput {
129    pub message_id: u64,
130    pub created_at_ms: i64,
131    pub agent_id: u32,
132    pub workspace_id: u32,
133    pub source_id: u32,
134    pub role: u8,
135    pub chunk_idx: u8,
136    pub content: String,
137}
138
139impl EmbeddingInput {
140    pub fn new(message_id: u64, content: impl Into<String>) -> Self {
141        Self {
142            message_id,
143            created_at_ms: 0,
144            agent_id: 0,
145            workspace_id: 0,
146            source_id: 0,
147            role: ROLE_USER,
148            chunk_idx: 0,
149            content: content.into(),
150        }
151    }
152}
153
154#[derive(Debug, Clone)]
155pub struct EmbeddedMessage {
156    pub message_id: u64,
157    pub created_at_ms: i64,
158    pub agent_id: u32,
159    pub workspace_id: u32,
160    pub source_id: u32,
161    pub role: u8,
162    pub chunk_idx: u8,
163    pub content_hash: [u8; 32],
164    pub embedding: Vec<f32>,
165}
166
167#[derive(Debug, Clone)]
168pub struct SemanticBackfillBatchPlan {
169    pub tier: TierKind,
170    pub db_fingerprint: String,
171    pub model_revision: String,
172    pub total_conversations: u64,
173    pub conversations_in_batch: u64,
174    pub last_offset: i64,
175}
176
177#[derive(Debug, Clone)]
178pub struct SemanticBackfillStoragePlan {
179    pub tier: TierKind,
180    pub db_fingerprint: String,
181    pub model_revision: String,
182    pub max_conversations: usize,
183}
184
185#[derive(Debug, Clone)]
186pub struct SemanticBackfillBatchOutcome {
187    pub tier: TierKind,
188    pub embedder_id: String,
189    pub embedded_docs: u64,
190    pub conversations_processed: u64,
191    pub total_conversations: u64,
192    pub last_offset: i64,
193    pub checkpoint_saved: bool,
194    pub published: bool,
195    pub index_path: PathBuf,
196    pub manifest_path: PathBuf,
197}
198
199#[derive(Debug, Clone)]
200pub struct SemanticShardBuildPlan {
201    pub tier: TierKind,
202    pub db_fingerprint: String,
203    pub model_revision: String,
204    pub total_conversations: u64,
205    pub max_records_per_shard: usize,
206    pub build_ann: bool,
207}
208
209#[derive(Debug, Clone)]
210pub struct SemanticShardBuildOutcome {
211    pub tier: TierKind,
212    pub embedder_id: String,
213    pub shard_count: u32,
214    pub doc_count: u64,
215    pub total_conversations: u64,
216    pub index_paths: Vec<PathBuf>,
217    pub ann_index_paths: Vec<PathBuf>,
218    pub shard_manifest_path: PathBuf,
219    pub complete: bool,
220}
221
222#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
223#[serde(rename_all = "snake_case")]
224pub(crate) enum SemanticBackfillSchedulerState {
225    Running,
226    Paused,
227    Disabled,
228}
229
230impl SemanticBackfillSchedulerState {
231    pub(crate) fn as_str(self) -> &'static str {
232        match self {
233            Self::Running => "running",
234            Self::Paused => "paused",
235            Self::Disabled => "disabled",
236        }
237    }
238}
239
240#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
241#[serde(rename_all = "snake_case")]
242pub(crate) enum SemanticBackfillSchedulerReason {
243    IdleBudgetAvailable,
244    OperatorDisabled,
245    PolicyDisabled,
246    ForegroundPressure,
247    LexicalRepairActive,
248    CapacityBelowFloor,
249    ThreadBudgetZero,
250    BatchBudgetZero,
251}
252
253impl SemanticBackfillSchedulerReason {
254    pub(crate) fn next_step(self) -> &'static str {
255        match self {
256            Self::IdleBudgetAvailable => "background semantic backfill is within idle budgets",
257            Self::OperatorDisabled => {
258                "background semantic backfill is disabled by CASS_SEMANTIC_BACKFILL_DISABLE"
259            }
260            Self::PolicyDisabled => "semantic policy disables background semantic backfill",
261            Self::ForegroundPressure => {
262                "foreground pressure is present; retry after the idle delay"
263            }
264            Self::LexicalRepairActive => "lexical repair is active; semantic backfill is yielding",
265            Self::CapacityBelowFloor => {
266                "machine responsiveness capacity is below the semantic backfill floor"
267            }
268            Self::ThreadBudgetZero => "semantic backfill thread budget is zero",
269            Self::BatchBudgetZero => "semantic backfill batch budget is zero",
270        }
271    }
272}
273
274#[derive(Debug, Clone, Copy, PartialEq, Eq)]
275pub(crate) struct SemanticBackfillSchedulerSignals {
276    pub foreground_pressure: bool,
277    pub lexical_repair_active: bool,
278    pub force: bool,
279    pub operator_disabled: bool,
280}
281
282impl SemanticBackfillSchedulerSignals {
283    pub(crate) fn from_env() -> Self {
284        Self {
285            foreground_pressure: env_truthy("CASS_SEMANTIC_BACKFILL_FOREGROUND_ACTIVE"),
286            lexical_repair_active: env_truthy("CASS_SEMANTIC_BACKFILL_LEXICAL_REPAIR_ACTIVE"),
287            force: env_truthy("CASS_SEMANTIC_BACKFILL_FORCE"),
288            operator_disabled: env_truthy("CASS_SEMANTIC_BACKFILL_DISABLE"),
289        }
290    }
291}
292
293#[derive(Debug, Clone, serde::Serialize)]
294pub(crate) struct SemanticBackfillSchedulerDecision {
295    pub state: SemanticBackfillSchedulerState,
296    pub reason: SemanticBackfillSchedulerReason,
297    pub requested_batch_conversations: usize,
298    pub scheduled_batch_conversations: usize,
299    pub current_capacity_pct: u32,
300    pub min_capacity_pct: u32,
301    pub max_backfill_threads: usize,
302    pub idle_delay_seconds: u64,
303    pub chunk_timeout_seconds: u64,
304    pub foreground_pressure: bool,
305    pub lexical_repair_active: bool,
306    pub forced: bool,
307    pub next_eligible_after_ms: u64,
308}
309
310impl SemanticBackfillSchedulerDecision {
311    pub(crate) fn should_run(&self) -> bool {
312        matches!(self.state, SemanticBackfillSchedulerState::Running)
313    }
314}
315
316fn env_truthy(key: &str) -> bool {
317    dotenvy::var(key)
318        .ok()
319        .map(|value| {
320            matches!(
321                value.trim().to_ascii_lowercase().as_str(),
322                "1" | "true" | "yes" | "on"
323            )
324        })
325        .unwrap_or(false)
326}
327
328fn env_backfill_min_capacity_pct() -> u32 {
329    dotenvy::var("CASS_SEMANTIC_BACKFILL_MIN_CAPACITY_PCT")
330        .ok()
331        .and_then(|value| value.trim().parse::<u32>().ok())
332        .map(|value| value.clamp(1, 100))
333        .unwrap_or(75)
334}
335
336pub(crate) fn semantic_backfill_scheduler_decision(
337    policy: &SemanticPolicy,
338    requested_batch_conversations: usize,
339    signals: &SemanticBackfillSchedulerSignals,
340) -> SemanticBackfillSchedulerDecision {
341    semantic_backfill_scheduler_decision_for_capacity(
342        policy,
343        requested_batch_conversations,
344        signals,
345        responsiveness::current_capacity_pct(),
346    )
347}
348
349pub(crate) fn semantic_backfill_scheduler_decision_for_capacity(
350    policy: &SemanticPolicy,
351    requested_batch_conversations: usize,
352    signals: &SemanticBackfillSchedulerSignals,
353    current_capacity_pct: u32,
354) -> SemanticBackfillSchedulerDecision {
355    let min_capacity_pct = env_backfill_min_capacity_pct();
356    let paused_delay_ms = policy.idle_delay_seconds.saturating_mul(1000);
357    let mut decision = SemanticBackfillSchedulerDecision {
358        state: SemanticBackfillSchedulerState::Running,
359        reason: SemanticBackfillSchedulerReason::IdleBudgetAvailable,
360        requested_batch_conversations,
361        scheduled_batch_conversations: requested_batch_conversations,
362        current_capacity_pct: current_capacity_pct.clamp(0, 100),
363        min_capacity_pct,
364        max_backfill_threads: policy.max_backfill_threads,
365        idle_delay_seconds: policy.idle_delay_seconds,
366        chunk_timeout_seconds: policy.chunk_timeout_seconds,
367        foreground_pressure: signals.foreground_pressure,
368        lexical_repair_active: signals.lexical_repair_active,
369        forced: signals.force,
370        next_eligible_after_ms: 0,
371    };
372
373    if requested_batch_conversations == 0 {
374        return stopped_scheduler_decision(
375            decision,
376            SemanticBackfillSchedulerState::Disabled,
377            SemanticBackfillSchedulerReason::BatchBudgetZero,
378            paused_delay_ms,
379        );
380    }
381    if policy.max_backfill_threads == 0 && !signals.force {
382        return stopped_scheduler_decision(
383            decision,
384            SemanticBackfillSchedulerState::Disabled,
385            SemanticBackfillSchedulerReason::ThreadBudgetZero,
386            paused_delay_ms,
387        );
388    }
389    if signals.operator_disabled && !signals.force {
390        return stopped_scheduler_decision(
391            decision,
392            SemanticBackfillSchedulerState::Disabled,
393            SemanticBackfillSchedulerReason::OperatorDisabled,
394            paused_delay_ms,
395        );
396    }
397    if !policy.mode.should_build_semantic() && !signals.force {
398        return stopped_scheduler_decision(
399            decision,
400            SemanticBackfillSchedulerState::Disabled,
401            SemanticBackfillSchedulerReason::PolicyDisabled,
402            paused_delay_ms,
403        );
404    }
405    if signals.lexical_repair_active && !signals.force {
406        return stopped_scheduler_decision(
407            decision,
408            SemanticBackfillSchedulerState::Paused,
409            SemanticBackfillSchedulerReason::LexicalRepairActive,
410            paused_delay_ms,
411        );
412    }
413    if signals.foreground_pressure && !signals.force {
414        return stopped_scheduler_decision(
415            decision,
416            SemanticBackfillSchedulerState::Paused,
417            SemanticBackfillSchedulerReason::ForegroundPressure,
418            paused_delay_ms,
419        );
420    }
421    if current_capacity_pct < min_capacity_pct && !signals.force {
422        return stopped_scheduler_decision(
423            decision,
424            SemanticBackfillSchedulerState::Paused,
425            SemanticBackfillSchedulerReason::CapacityBelowFloor,
426            paused_delay_ms,
427        );
428    }
429
430    let capacity = current_capacity_pct.clamp(1, 100) as usize;
431    let scaled = requested_batch_conversations.saturating_mul(capacity) / 100;
432    decision.scheduled_batch_conversations = scaled.max(1).min(requested_batch_conversations);
433    decision
434}
435
436fn stopped_scheduler_decision(
437    mut decision: SemanticBackfillSchedulerDecision,
438    state: SemanticBackfillSchedulerState,
439    reason: SemanticBackfillSchedulerReason,
440    next_eligible_after_ms: u64,
441) -> SemanticBackfillSchedulerDecision {
442    decision.state = state;
443    decision.reason = reason;
444    decision.scheduled_batch_conversations = 0;
445    decision.next_eligible_after_ms = next_eligible_after_ms;
446    decision
447}
448
449fn now_ms() -> i64 {
450    SystemTime::now()
451        .duration_since(UNIX_EPOCH)
452        .map(|duration| duration.as_millis() as i64)
453        .unwrap_or(0)
454}
455
456fn hnsw_index_path(data_dir: &Path, embedder_id: &str) -> PathBuf {
457    data_dir
458        .join(VECTOR_INDEX_DIR)
459        .join(format!("hnsw-{embedder_id}.chsw"))
460}
461
462fn safe_path_component(raw: &str) -> String {
463    raw.chars()
464        .map(|ch| {
465            if ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_') {
466                ch
467            } else {
468                '_'
469            }
470        })
471        .collect()
472}
473
474fn semantic_staging_index_path(
475    data_dir: &Path,
476    tier: TierKind,
477    embedder_id: &str,
478    db_fingerprint: &str,
479) -> PathBuf {
480    let fingerprint_hash = crc32fast::hash(db_fingerprint.as_bytes());
481    data_dir.join(VECTOR_INDEX_DIR).join(format!(
482        ".staging-{}-{}-{fingerprint_hash:08x}.fsvi",
483        tier.as_str(),
484        safe_path_component(embedder_id)
485    ))
486}
487
488fn semantic_generation_fingerprint_component(db_fingerprint: &str) -> String {
489    blake3::hash(db_fingerprint.as_bytes())
490        .to_hex()
491        .chars()
492        .take(16)
493        .collect()
494}
495
496fn semantic_shard_generation_dir(
497    data_dir: &Path,
498    tier: TierKind,
499    embedder_id: &str,
500    db_fingerprint: &str,
501) -> PathBuf {
502    let fingerprint_hash = semantic_generation_fingerprint_component(db_fingerprint);
503    data_dir.join(VECTOR_INDEX_DIR).join("shards").join(format!(
504        "{}-{}-{fingerprint_hash}",
505        tier.as_str(),
506        safe_path_component(embedder_id),
507    ))
508}
509
510fn semantic_shard_index_path(
511    data_dir: &Path,
512    tier: TierKind,
513    embedder_id: &str,
514    db_fingerprint: &str,
515    shard_index: u32,
516) -> PathBuf {
517    semantic_shard_generation_dir(data_dir, tier, embedder_id, db_fingerprint)
518        .join(format!("shard-{shard_index:05}.fsvi"))
519}
520
521fn semantic_shard_ann_index_path(
522    data_dir: &Path,
523    tier: TierKind,
524    embedder_id: &str,
525    db_fingerprint: &str,
526    shard_index: u32,
527) -> PathBuf {
528    semantic_shard_generation_dir(data_dir, tier, embedder_id, db_fingerprint)
529        .join(format!("shard-{shard_index:05}.chsw"))
530}
531
532#[cfg(not(windows))]
533fn sync_parent_directory(path: &Path) -> Result<()> {
534    let Some(parent) = path.parent() else {
535        return Ok(());
536    };
537    let directory = fs::File::open(parent)
538        .with_context(|| format!("opening parent directory {}", parent.display()))?;
539    directory
540        .sync_all()
541        .with_context(|| format!("syncing parent directory {}", parent.display()))
542}
543
544#[cfg(windows)]
545fn sync_parent_directory(_path: &Path) -> Result<()> {
546    Ok(())
547}
548
549fn semantic_doc_id_for_embedded(embedded: &EmbeddedMessage) -> String {
550    SemanticDocId {
551        message_id: embedded.message_id,
552        chunk_idx: embedded.chunk_idx,
553        agent_id: embedded.agent_id,
554        workspace_id: embedded.workspace_id,
555        source_id: embedded.source_id,
556        role: embedded.role,
557        created_at_ms: embedded.created_at_ms,
558        content_hash: Some(embedded.content_hash),
559    }
560    .to_doc_id_string()
561}
562
563struct CanonicalEmbeddingConversationRow {
564    conversation_id: i64,
565    agent_slug: String,
566    agent_id: i64,
567    workspace: Option<PathBuf>,
568    workspace_id: Option<i64>,
569    external_id: Option<String>,
570    title: Option<String>,
571    source_path: PathBuf,
572    started_at: Option<i64>,
573    ended_at: Option<i64>,
574    source_id: Option<String>,
575    origin_host: Option<String>,
576}
577
578struct CanonicalEmbeddingBatch {
579    inputs: Vec<EmbeddingInput>,
580    conversations_in_batch: u64,
581    last_conversation_id: i64,
582    total_conversations: u64,
583}
584
585#[derive(Debug, Clone, Copy, PartialEq, Eq)]
586struct SemanticCheckpointCaps {
587    max_messages: usize,
588    max_bytes: u64,
589}
590
591impl SemanticCheckpointCaps {
592    fn unlimited() -> Self {
593        Self {
594            max_messages: 0,
595            max_bytes: 0,
596        }
597    }
598
599    fn from_env() -> Self {
600        Self {
601            max_messages: resolved_env_usize(
602                "CASS_SEMANTIC_MAX_MESSAGES_PER_CHECKPOINT",
603                DEFAULT_SEMANTIC_MAX_MESSAGES_PER_CHECKPOINT,
604            ),
605            max_bytes: resolved_env_u64(
606                "CASS_SEMANTIC_MAX_BYTES_PER_CHECKPOINT",
607                DEFAULT_SEMANTIC_MAX_BYTES_PER_CHECKPOINT,
608            ),
609        }
610    }
611
612    fn message_limited(self) -> bool {
613        self.max_messages > 0
614    }
615
616    fn byte_limited(self) -> bool {
617        self.max_bytes > 0
618    }
619}
620
621pub(crate) struct CanonicalIncrementalEmbeddingBatch {
622    pub inputs: Vec<EmbeddingInput>,
623    pub conversations_in_batch: u64,
624    pub raw_max_message_id: Option<i64>,
625}
626
627fn total_semantic_conversations(storage: &FrankenStorage) -> Result<u64> {
628    let hinted_fallback_sql = "SELECT c.id
629             FROM conversations c
630             WHERE EXISTS (
631                 SELECT 1
632                 FROM messages INDEXED BY sqlite_autoindex_messages_1
633                 WHERE conversation_id = c.id
634                 LIMIT 1
635             )";
636    let fallback_sql = "SELECT c.id
637             FROM conversations c
638             WHERE EXISTS (
639                 SELECT 1
640                 FROM messages
641                 WHERE conversation_id = c.id
642                 LIMIT 1
643             )";
644    let count: i64 = storage
645        .raw()
646        .query_row_map(
647            "SELECT COUNT(*) FROM conversations WHERE message_count > 0",
648            &[] as &[ParamValue],
649            |row| row.get_typed(0),
650        )
651        .or_else(|err| {
652            if !err.to_string().contains("no such column: message_count") {
653                return Err(err);
654            }
655            let conversation_ids: Vec<i64> = storage
656                .raw()
657                .query_map_collect(hinted_fallback_sql, &[] as &[ParamValue], |row| {
658                    row.get_typed(0)
659                })
660                .or_else(|err| {
661                    if err
662                        .to_string()
663                        .contains("no such index: sqlite_autoindex_messages_1")
664                    {
665                        return storage.raw().query_map_collect(
666                            fallback_sql,
667                            &[] as &[ParamValue],
668                            |row| row.get_typed(0),
669                        );
670                    }
671                    Err(err)
672                })?;
673            Ok(i64::try_from(conversation_ids.len()).unwrap_or(i64::MAX))
674        })
675        .with_context(|| "counting canonical conversations with semantic messages")?;
676    Ok(u64::try_from(count.max(0)).unwrap_or(u64::MAX))
677}
678
679pub(crate) fn message_id_from_db(raw: i64) -> Option<u64> {
680    u64::try_from(raw).ok()
681}
682
683pub(crate) fn saturating_u32_from_i64(raw: i64) -> u32 {
684    match u32::try_from(raw) {
685        Ok(value) => value,
686        Err(_) if raw.is_negative() => 0,
687        Err(_) => u32::MAX,
688    }
689}
690
691fn canonical_embedding_created_at_ms(message_id: u64, created_at: Option<i64>) -> i64 {
692    // `created_at_ms` feeds time-range filters in the vector index
693    // (src/search/vector_index.rs range predicates) and contributes to
694    // `stable_hit_hash`. Defaulting a NULL created_at to 0 silently
695    // masquerades the message as Unix-epoch (1970), which is indistinguishable
696    // from a legitimately-ancient row in downstream filters. Emit a warn
697    // so operators see NULL-created_at rows in the logs instead of only
698    // finding them by puzzling over 1970 timestamps in semantic hits.
699    created_at.unwrap_or_else(|| {
700        tracing::warn!(
701            message_id,
702            "semantic backfill: row has NULL created_at; defaulting to 0 (Unix epoch). \
703             Downstream time-range filters will treat this message as 1970-01-01."
704        );
705        0
706    })
707}
708
709fn canonical_embedding_packet_provenance(
710    row: &CanonicalEmbeddingConversationRow,
711) -> ConversationPacketProvenance {
712    let source_id =
713        normalized_index_source_id(row.source_id.as_deref(), None, row.origin_host.as_deref());
714    ConversationPacketProvenance {
715        origin_kind: normalized_index_origin_kind(&source_id, None),
716        origin_host: normalized_index_origin_host(row.origin_host.as_deref()),
717        source_id,
718    }
719}
720
721fn canonical_embedding_conversation(
722    row: &CanonicalEmbeddingConversationRow,
723    provenance: &ConversationPacketProvenance,
724    messages: Vec<Message>,
725) -> Conversation {
726    Conversation {
727        id: Some(row.conversation_id),
728        agent_slug: row.agent_slug.clone(),
729        workspace: row.workspace.clone(),
730        external_id: row.external_id.clone(),
731        title: row.title.clone(),
732        source_path: row.source_path.clone(),
733        started_at: row.started_at,
734        ended_at: row.ended_at,
735        approx_tokens: None,
736        metadata_json: serde_json::Value::Null,
737        messages,
738        source_id: provenance.source_id.clone(),
739        origin_host: provenance.origin_host.clone(),
740    }
741}
742
743fn embedding_input_from_packet_message(
744    conversation_id: i64,
745    agent_id: u32,
746    workspace_id: u32,
747    source_id_hash: u32,
748    message: &crate::model::conversation_packet::ConversationPacketMessage,
749) -> Option<EmbeddingInput> {
750    let Some(raw_message_id) = message.message_id else {
751        tracing::warn!(
752            conversation_id,
753            message_idx = message.idx,
754            "skipping semantic backfill message without canonical id in ConversationPacket replay"
755        );
756        return None;
757    };
758    let Some(message_id) = message_id_from_db(raw_message_id) else {
759        tracing::warn!(
760            conversation_id,
761            raw_message_id,
762            "skipping out-of-range id during semantic backfill"
763        );
764        return None;
765    };
766    Some(EmbeddingInput {
767        message_id,
768        created_at_ms: canonical_embedding_created_at_ms(message_id, message.created_at),
769        agent_id,
770        workspace_id,
771        source_id: source_id_hash,
772        role: role_code_from_str(&message.role).unwrap_or(ROLE_USER),
773        chunk_idx: 0,
774        content: message.content.clone(),
775    })
776}
777
778fn embedding_inputs_from_conversation_packet(
779    row: &CanonicalEmbeddingConversationRow,
780    packet: &ConversationPacket,
781) -> Vec<EmbeddingInput> {
782    let agent_id = saturating_u32_from_i64(row.agent_id);
783    let workspace_id = saturating_u32_from_i64(row.workspace_id.unwrap_or(0));
784    let source_id_hash = crc32fast::hash(packet.payload.provenance.source_id.as_bytes());
785    packet
786        .projections
787        .semantic
788        .message_indices
789        .iter()
790        .filter_map(|message_index| {
791            packet
792                .payload
793                .messages
794                .get(*message_index)
795                .and_then(|message| {
796                    embedding_input_from_packet_message(
797                        row.conversation_id,
798                        agent_id,
799                        workspace_id,
800                        source_id_hash,
801                        message,
802                    )
803                })
804        })
805        .collect()
806}
807
808fn fetch_canonical_embedding_conversations(
809    storage: &FrankenStorage,
810    conversation_ids: &[i64],
811) -> Result<Vec<CanonicalEmbeddingConversationRow>> {
812    let mut envelope_sql = String::from(
813        "SELECT c.id,
814                COALESCE(a.slug, 'unknown'),
815                COALESCE(c.agent_id, 0),
816                c.workspace_id,
817                w.path,
818                c.external_id,
819                c.title,
820                c.source_path,
821                c.started_at,
822                c.ended_at,
823                c.source_id,
824                c.origin_host
825         FROM conversations c
826         LEFT JOIN agents a ON a.id = c.agent_id
827         LEFT JOIN workspaces w ON w.id = c.workspace_id
828         WHERE c.id IN (",
829    );
830    let mut params = Vec::with_capacity(conversation_ids.len());
831    for (idx, conversation_id) in conversation_ids.iter().enumerate() {
832        if idx > 0 {
833            envelope_sql.push_str(", ");
834        }
835        envelope_sql.push_str(&format!("?{}", idx + 1));
836        params.push(ParamValue::from(*conversation_id));
837    }
838    envelope_sql.push_str(") ORDER BY c.id ASC");
839
840    storage
841        .raw()
842        .query_map_collect(&envelope_sql, &params, |row| {
843            let workspace_path: Option<String> = row.get_typed(4)?;
844            Ok(CanonicalEmbeddingConversationRow {
845                conversation_id: row.get_typed(0)?,
846                agent_slug: row.get_typed(1)?,
847                agent_id: row.get_typed(2)?,
848                workspace_id: row.get_typed(3)?,
849                workspace: workspace_path.map(PathBuf::from),
850                external_id: row.get_typed(5)?,
851                title: row.get_typed(6)?,
852                source_path: PathBuf::from(row.get_typed::<String>(7)?),
853                started_at: row.get_typed(8)?,
854                ended_at: row.get_typed(9)?,
855                source_id: row.get_typed(10)?,
856                origin_host: row.get_typed(11)?,
857            })
858        })
859        .with_context(|| {
860            format!(
861                "fetching semantic backfill conversation envelopes for {} conversations",
862                conversation_ids.len()
863            )
864        })
865}
866
867/// Per-packet semantic context that supplies the database-internal
868/// agent / workspace ids the canonical embedding row carries but the
869/// `ConversationPacket` does not (those ids are storage-internal,
870/// not part of the packet contract).
871///
872/// `coding_agent_session_search-ibuuh.32` (sink #3): when a caller
873/// already holds packets (rebuild pipeline, salvage replay, repair
874/// flows, etc.) it can pair them with their canonical
875/// agent_id/workspace_id and drive the semantic preparation consumer
876/// without a second storage round-trip.
877#[allow(dead_code)]
878#[derive(Debug, Clone, Copy)]
879pub(crate) struct SemanticPacketContext {
880    pub conversation_id: i64,
881    pub agent_id: u32,
882    pub workspace_id: u32,
883}
884
885/// Packet-driven counterpart to
886/// [`packet_embedding_inputs_from_storage`]: derives the same
887/// `EmbeddingInput` list a fresh storage replay would produce, but
888/// without re-querying canonical conversation rows.
889///
890/// Invariants:
891/// - The `i`th element of `contexts` describes the `i`th packet.
892/// - Returns `Err` if the lengths disagree, so a callsite cannot
893///   silently mis-correlate packets and contexts.
894/// - `source_id_hash` is derived from `packet.payload.provenance.source_id`
895///   the same way `embedding_inputs_from_conversation_packet` derives
896///   it from the canonical row, so the produced `EmbeddingInput.source_id`
897///   matches both paths byte-for-byte.
898///
899/// The `semantic_inputs_from_packets_matches_storage_replay`
900/// equivalence test pins every produced `EmbeddingInput` field is
901/// identical to what the legacy storage-side replay returns for the
902/// same canonical corpus, so callers that already hold packets can
903/// switch to this helper without changing semantic-index output.
904#[allow(dead_code)]
905pub(crate) fn semantic_inputs_from_packets(
906    packets: &[ConversationPacket],
907    contexts: &[SemanticPacketContext],
908) -> Result<Vec<EmbeddingInput>> {
909    if packets.len() != contexts.len() {
910        anyhow::bail!(
911            "semantic_inputs_from_packets length mismatch: {} packets vs {} contexts",
912            packets.len(),
913            contexts.len()
914        );
915    }
916    let mut inputs = Vec::new();
917    for (packet, context) in packets.iter().zip(contexts.iter()) {
918        let source_id_hash = crc32fast::hash(packet.payload.provenance.source_id.as_bytes());
919        for &message_index in &packet.projections.semantic.message_indices {
920            let Some(message) = packet.payload.messages.get(message_index) else {
921                anyhow::bail!(
922                    "packet semantic projection references missing message index {} \
923                     (packet has {} messages)",
924                    message_index,
925                    packet.payload.messages.len()
926                );
927            };
928            if let Some(input) = embedding_input_from_packet_message(
929                context.conversation_id,
930                context.agent_id,
931                context.workspace_id,
932                source_id_hash,
933                message,
934            ) {
935                inputs.push(input);
936            }
937        }
938    }
939    tracing::debug!(
940        packets = packets.len(),
941        packet_driven = true,
942        semantic_inputs = inputs.len(),
943        "built semantic inputs from in-memory ConversationPacket batch"
944    );
945    Ok(inputs)
946}
947
948fn fetch_canonical_embedding_batch(
949    storage: &FrankenStorage,
950    after_conversation_id: i64,
951    max_conversations: usize,
952) -> Result<CanonicalEmbeddingBatch> {
953    fetch_canonical_embedding_batch_inner(storage, after_conversation_id, max_conversations, None)
954}
955
956/// Variant of [`fetch_canonical_embedding_batch`] that additionally
957/// filters out canonical messages with `message_id <= after_message_id`
958/// when set. This is how sub-fix 2 (`last_message_id` cursor) enforces
959/// the "resume MUST advance past `last_message_id`" rule on a partially
960/// embedded conversation.
961fn fetch_canonical_embedding_batch_inner(
962    storage: &FrankenStorage,
963    after_conversation_id: i64,
964    max_conversations: usize,
965    after_message_id: Option<i64>,
966) -> Result<CanonicalEmbeddingBatch> {
967    fetch_canonical_embedding_batch_inner_with_caps(
968        storage,
969        after_conversation_id,
970        max_conversations,
971        after_message_id,
972        SemanticCheckpointCaps::unlimited(),
973    )
974}
975
976fn fetch_canonical_embedding_batch_inner_with_caps(
977    storage: &FrankenStorage,
978    after_conversation_id: i64,
979    max_conversations: usize,
980    after_message_id: Option<i64>,
981    caps: SemanticCheckpointCaps,
982) -> Result<CanonicalEmbeddingBatch> {
983    let total_conversations = total_semantic_conversations(storage)?;
984    let max_conversations_i64 = i64::try_from(max_conversations.max(1)).unwrap_or(i64::MAX);
985    let mut params = vec![
986        ParamValue::from(after_conversation_id),
987        ParamValue::from(max_conversations_i64),
988    ];
989    let message_cursor_predicate = if let Some(after_message_id) = after_message_id {
990        params.push(ParamValue::from(after_message_id));
991        " AND id > ?3"
992    } else {
993        ""
994    };
995    let hinted_sql = format!(
996        "SELECT c.id
997         FROM conversations c
998         WHERE c.id > ?1
999           AND EXISTS (
1000               SELECT 1
1001               FROM messages INDEXED BY sqlite_autoindex_messages_1
1002               WHERE conversation_id = c.id{message_cursor_predicate}
1003               LIMIT 1
1004           )
1005         ORDER BY c.id ASC
1006         LIMIT ?2"
1007    );
1008    let fallback_sql = format!(
1009        "SELECT c.id
1010         FROM conversations c
1011         WHERE c.id > ?1
1012           AND EXISTS (
1013               SELECT 1
1014               FROM messages
1015               WHERE conversation_id = c.id{message_cursor_predicate}
1016               LIMIT 1
1017           )
1018         ORDER BY c.id ASC
1019         LIMIT ?2"
1020    );
1021    let conversation_ids: Vec<i64> = storage
1022        .raw()
1023        .query_map_collect(&hinted_sql, &params, |row| row.get_typed(0))
1024        .or_else(|err| {
1025            if err
1026                .to_string()
1027                .contains("no such index: sqlite_autoindex_messages_1")
1028            {
1029                return storage
1030                    .raw()
1031                    .query_map_collect(&fallback_sql, &params, |row| row.get_typed(0));
1032            }
1033            Err(err)
1034        })
1035        .with_context(|| {
1036            format!("fetching semantic backfill conversation ids after {after_conversation_id}")
1037        })?;
1038
1039    if conversation_ids.is_empty() {
1040        return Ok(CanonicalEmbeddingBatch {
1041            inputs: Vec::new(),
1042            conversations_in_batch: 0,
1043            last_conversation_id: after_conversation_id,
1044            total_conversations,
1045        });
1046    }
1047
1048    let conversations = fetch_canonical_embedding_conversations(storage, &conversation_ids)?;
1049
1050    let mut grouped_messages =
1051        storage.fetch_messages_for_lexical_rebuild_batch(&conversation_ids, None, None)?;
1052    let (conversations, capped_last_conversation_id) = select_checkpoint_capped_conversations(
1053        conversations,
1054        &mut grouped_messages,
1055        after_message_id,
1056        caps,
1057    );
1058    let (inputs, _) = packet_embedding_inputs_from_materialized_canonical_messages(
1059        &conversations,
1060        &mut grouped_messages,
1061        |_| true,
1062    );
1063
1064    let conversations_in_batch = u64::try_from(conversations.len()).unwrap_or(u64::MAX);
1065    tracing::debug!(
1066        conversations_in_batch,
1067        packet_driven = true,
1068        semantic_inputs = inputs.len(),
1069        max_messages_per_checkpoint = caps.max_messages,
1070        max_bytes_per_checkpoint = caps.max_bytes,
1071        ?after_message_id,
1072        "built semantic backfill batch from ConversationPacket canonical replay"
1073    );
1074
1075    Ok(CanonicalEmbeddingBatch {
1076        inputs,
1077        conversations_in_batch,
1078        last_conversation_id: capped_last_conversation_id.unwrap_or(after_conversation_id),
1079        total_conversations,
1080    })
1081}
1082
1083fn select_checkpoint_capped_conversations(
1084    conversations: Vec<CanonicalEmbeddingConversationRow>,
1085    grouped_messages: &mut HashMap<i64, Vec<Message>>,
1086    after_message_id: Option<i64>,
1087    caps: SemanticCheckpointCaps,
1088) -> (Vec<CanonicalEmbeddingConversationRow>, Option<i64>) {
1089    let mut selected = Vec::new();
1090    let mut selected_messages = 0usize;
1091    let mut selected_bytes = 0u64;
1092    let mut last_conversation_id = None;
1093
1094    for conversation in conversations {
1095        let mut messages = grouped_messages
1096            .remove(&conversation.conversation_id)
1097            .unwrap_or_default();
1098        if let Some(min_exclusive) = after_message_id {
1099            messages.retain(|message| message.id.is_some_and(|id| id > min_exclusive));
1100        }
1101        if messages.is_empty() {
1102            continue;
1103        }
1104
1105        let message_count = messages.len();
1106        let byte_count = messages
1107            .iter()
1108            .map(|message| saturating_u64_from_usize(message.content.len()))
1109            .fold(0u64, u64::saturating_add);
1110        let would_exceed_messages = caps.message_limited()
1111            && !selected.is_empty()
1112            && selected_messages.saturating_add(message_count) > caps.max_messages;
1113        let would_exceed_bytes = caps.byte_limited()
1114            && !selected.is_empty()
1115            && selected_bytes.saturating_add(byte_count) > caps.max_bytes;
1116        if would_exceed_messages || would_exceed_bytes {
1117            tracing::debug!(
1118                conversation_id = conversation.conversation_id,
1119                selected_conversations = selected.len(),
1120                selected_messages,
1121                selected_bytes,
1122                candidate_messages = message_count,
1123                candidate_bytes = byte_count,
1124                max_messages_per_checkpoint = caps.max_messages,
1125                max_bytes_per_checkpoint = caps.max_bytes,
1126                "semantic checkpoint cap stopped this batch before the next full conversation"
1127            );
1128            break;
1129        }
1130
1131        selected_messages = selected_messages.saturating_add(message_count);
1132        selected_bytes = selected_bytes.saturating_add(byte_count);
1133        last_conversation_id = Some(conversation.conversation_id);
1134        grouped_messages.insert(conversation.conversation_id, messages);
1135        selected.push(conversation);
1136    }
1137
1138    (selected, last_conversation_id)
1139}
1140
1141pub(crate) fn packet_embedding_inputs_from_storage(
1142    storage: &FrankenStorage,
1143) -> Result<Vec<EmbeddingInput>> {
1144    Ok(fetch_canonical_embedding_batch(storage, 0, usize::MAX)?.inputs)
1145}
1146
1147fn packet_embedding_inputs_from_selected_canonical_messages<F>(
1148    storage: &FrankenStorage,
1149    conversation_ids: &[i64],
1150    include_message: F,
1151) -> Result<(Vec<EmbeddingInput>, Option<i64>)>
1152where
1153    F: FnMut(&Message) -> bool,
1154{
1155    if conversation_ids.is_empty() {
1156        return Ok((Vec::new(), None));
1157    }
1158
1159    let conversations = fetch_canonical_embedding_conversations(storage, conversation_ids)?;
1160    let mut grouped_messages =
1161        storage.fetch_messages_for_lexical_rebuild_batch(conversation_ids, None, None)?;
1162    Ok(
1163        packet_embedding_inputs_from_materialized_canonical_messages(
1164            &conversations,
1165            &mut grouped_messages,
1166            include_message,
1167        ),
1168    )
1169}
1170
1171fn packet_embedding_inputs_from_materialized_canonical_messages<F>(
1172    conversations: &[CanonicalEmbeddingConversationRow],
1173    grouped_messages: &mut HashMap<i64, Vec<Message>>,
1174    mut include_message: F,
1175) -> (Vec<EmbeddingInput>, Option<i64>)
1176where
1177    F: FnMut(&Message) -> bool,
1178{
1179    let mut inputs = Vec::new();
1180    let mut raw_max_message_id: Option<i64> = None;
1181
1182    for conversation in conversations {
1183        let mut messages = grouped_messages
1184            .remove(&conversation.conversation_id)
1185            .unwrap_or_default();
1186        messages.retain(|message| {
1187            let keep = include_message(message);
1188            if keep && let Some(message_id) = message.id {
1189                raw_max_message_id =
1190                    Some(raw_max_message_id.map_or(message_id, |current| current.max(message_id)));
1191            }
1192            keep
1193        });
1194        if messages.is_empty() {
1195            continue;
1196        }
1197
1198        let provenance = canonical_embedding_packet_provenance(conversation);
1199        let canonical = canonical_embedding_conversation(conversation, &provenance, messages);
1200        let packet = ConversationPacket::from_canonical_replay(&canonical, provenance);
1201        inputs.extend(embedding_inputs_from_conversation_packet(
1202            conversation,
1203            &packet,
1204        ));
1205    }
1206
1207    (inputs, raw_max_message_id)
1208}
1209
1210pub(crate) fn packet_embedding_inputs_from_storage_since(
1211    storage: &FrankenStorage,
1212    since_message_id: i64,
1213) -> Result<CanonicalIncrementalEmbeddingBatch> {
1214    let conversation_ids: Vec<i64> = storage
1215        .raw()
1216        .query_map_collect(
1217            "SELECT DISTINCT m.conversation_id
1218             FROM messages m
1219             WHERE m.id > ?1
1220             ORDER BY m.conversation_id ASC",
1221            &[ParamValue::from(since_message_id)],
1222            |row| row.get_typed(0),
1223        )
1224        .with_context(|| {
1225            format!(
1226                "fetching canonical semantic catch-up conversation ids after message {since_message_id}"
1227            )
1228        })?;
1229
1230    if conversation_ids.is_empty() {
1231        return Ok(CanonicalIncrementalEmbeddingBatch {
1232            inputs: Vec::new(),
1233            conversations_in_batch: 0,
1234            raw_max_message_id: None,
1235        });
1236    }
1237
1238    let (inputs, raw_max_message_id) = packet_embedding_inputs_from_selected_canonical_messages(
1239        storage,
1240        &conversation_ids,
1241        |message| message.id.is_some_and(|id| id > since_message_id),
1242    )?;
1243
1244    let conversations_in_batch = u64::try_from(conversation_ids.len()).unwrap_or(u64::MAX);
1245    tracing::debug!(
1246        since_message_id,
1247        conversations_in_batch,
1248        packet_driven = true,
1249        semantic_inputs = inputs.len(),
1250        "built semantic catch-up batch from ConversationPacket canonical replay"
1251    );
1252
1253    Ok(CanonicalIncrementalEmbeddingBatch {
1254        inputs,
1255        conversations_in_batch,
1256        raw_max_message_id,
1257    })
1258}
1259
1260pub(crate) fn packet_embedding_inputs_from_storage_for_message_ids(
1261    storage: &FrankenStorage,
1262    conversation_ids: &[i64],
1263    message_ids: &HashSet<i64>,
1264) -> Result<Vec<EmbeddingInput>> {
1265    if conversation_ids.is_empty() || message_ids.is_empty() {
1266        return Ok(Vec::new());
1267    }
1268
1269    let (inputs, raw_max_message_id) = packet_embedding_inputs_from_selected_canonical_messages(
1270        storage,
1271        conversation_ids,
1272        |message| message.id.is_some_and(|id| message_ids.contains(&id)),
1273    )?;
1274    tracing::debug!(
1275        conversations_in_batch = conversation_ids.len(),
1276        selected_message_ids = message_ids.len(),
1277        semantic_inputs = inputs.len(),
1278        raw_max_message_id,
1279        packet_driven = true,
1280        "built selected semantic batch from ConversationPacket canonical replay"
1281    );
1282
1283    Ok(inputs)
1284}
1285
1286struct Prepared<'a> {
1287    msg: &'a EmbeddingInput,
1288    canonical: String,
1289    hash: [u8; 32],
1290}
1291
1292#[derive(Debug, Clone, PartialEq, Eq)]
1293struct MemoizedPreparedMessage {
1294    canonical: String,
1295    hash: [u8; 32],
1296}
1297
1298fn semantic_prep_memo_key(content: &str) -> MemoKey {
1299    MemoKey::new(
1300        MemoContentHash::from_bytes(content_hash(content).to_vec()),
1301        SEMANTIC_PREP_MEMO_ALGORITHM,
1302        SEMANTIC_PREP_MEMO_VERSION,
1303    )
1304}
1305
1306fn memo_counter_delta(after: u64, before: u64) -> u64 {
1307    after.saturating_sub(before)
1308}
1309
1310fn trace_semantic_prep_memo_window(
1311    window_index: usize,
1312    window_len: usize,
1313    prepared_len: usize,
1314    entry_capacity: usize,
1315    before: &crate::indexer::memoization::MemoCacheStats,
1316    after: &crate::indexer::memoization::MemoCacheStats,
1317) {
1318    tracing::trace!(
1319        algorithm = SEMANTIC_PREP_MEMO_ALGORITHM,
1320        algorithm_version = SEMANTIC_PREP_MEMO_VERSION,
1321        window_index,
1322        window_len,
1323        prepared_messages = prepared_len,
1324        skipped_messages = window_len.saturating_sub(prepared_len),
1325        hit_delta = memo_counter_delta(after.hits, before.hits),
1326        miss_delta = memo_counter_delta(after.misses, before.misses),
1327        insert_delta = memo_counter_delta(after.inserts, before.inserts),
1328        evictions_capacity_delta =
1329            memo_counter_delta(after.evictions_capacity, before.evictions_capacity),
1330        quarantined_delta = memo_counter_delta(after.quarantined, before.quarantined),
1331        live_entries = after.live_entries,
1332        entry_capacity,
1333        "semantic prep memo cache window"
1334    );
1335}
1336
1337fn trace_semantic_prep_memo_audit(audit: &MemoCacheAuditRecord) {
1338    tracing::trace!(?audit, "semantic prep memo cache audit");
1339}
1340
1341fn prepare_window_with_memo<'a>(
1342    window: &'a [EmbeddingInput],
1343    cache: &mut ContentAddressedMemoCache<MemoizedPreparedMessage>,
1344) -> Vec<Prepared<'a>> {
1345    window
1346        .iter()
1347        .filter_map(|msg| {
1348            let key = semantic_prep_memo_key(&msg.content);
1349            let (lookup, lookup_audit) = cache.get_with_audit(&key);
1350            trace_semantic_prep_memo_audit(&lookup_audit);
1351            match lookup {
1352                MemoLookup::Hit { value } => Some(Prepared {
1353                    msg,
1354                    canonical: value.canonical,
1355                    hash: value.hash,
1356                }),
1357                MemoLookup::Miss | MemoLookup::Quarantined { .. } => {
1358                    let canonical = canonicalize_for_embedding(&msg.content);
1359                    if canonical.is_empty() {
1360                        return None;
1361                    }
1362                    let hash = content_hash(&canonical);
1363                    let insert_audit = cache.insert_with_audit(
1364                        key,
1365                        MemoizedPreparedMessage {
1366                            canonical: canonical.clone(),
1367                            hash,
1368                        },
1369                    );
1370                    trace_semantic_prep_memo_audit(&insert_audit);
1371                    Some(Prepared {
1372                        msg,
1373                        canonical,
1374                        hash,
1375                    })
1376                }
1377            }
1378        })
1379        .collect()
1380}
1381
1382/// Canonicalize + hash a window of messages. Default is serial; opt in to
1383/// the rayon-parallel path via `CASS_SEMANTIC_PREP_PARALLEL=1` (see the
1384/// `parallel_prep_enabled` docstring for why it is not the default).
1385/// Parallel results preserve input order via `par_iter().filter_map().collect()`.
1386/// Messages whose canonical form is empty are filtered out so the embedder
1387/// batch is never polluted with useless inputs.
1388fn prepare_window<'a>(window: &'a [EmbeddingInput], serial: bool) -> Vec<Prepared<'a>> {
1389    let prep = |msg: &'a EmbeddingInput| -> Option<Prepared<'a>> {
1390        let canonical = canonicalize_for_embedding(&msg.content);
1391        if canonical.is_empty() {
1392            return None;
1393        }
1394        let hash = content_hash(&canonical);
1395        Some(Prepared {
1396            msg,
1397            canonical,
1398            hash,
1399        })
1400    };
1401
1402    if serial {
1403        window.iter().filter_map(prep).collect()
1404    } else {
1405        window.par_iter().filter_map(prep).collect()
1406    }
1407}
1408
1409fn flush_prepared_batch(
1410    batch: &[Prepared<'_>],
1411    embeddings: &mut Vec<EmbeddedMessage>,
1412    pb: &ProgressBar,
1413    embedder: &dyn Embedder,
1414) -> Result<()> {
1415    if batch.is_empty() {
1416        return Ok(());
1417    }
1418
1419    let texts: Vec<&str> = batch.iter().map(|p| p.canonical.as_str()).collect();
1420    let vectors = embedder
1421        .embed_batch_sync(&texts)
1422        .map_err(|e| anyhow::anyhow!("embedding failed: {e}"))?;
1423
1424    if vectors.len() != batch.len() {
1425        bail!(
1426            "embedder returned {} embeddings for {} inputs",
1427            vectors.len(),
1428            batch.len()
1429        );
1430    }
1431
1432    for (prepared, vector) in batch.iter().zip(vectors) {
1433        if vector.len() != embedder.dimension() {
1434            bail!(
1435                "embedding dimension mismatch: expected {}, got {}",
1436                embedder.dimension(),
1437                vector.len()
1438            );
1439        }
1440        embeddings.push(EmbeddedMessage {
1441            message_id: prepared.msg.message_id,
1442            created_at_ms: prepared.msg.created_at_ms,
1443            agent_id: prepared.msg.agent_id,
1444            workspace_id: prepared.msg.workspace_id,
1445            source_id: prepared.msg.source_id,
1446            role: prepared.msg.role,
1447            chunk_idx: prepared.msg.chunk_idx,
1448            content_hash: prepared.hash,
1449            embedding: vector,
1450        });
1451    }
1452
1453    pb.inc(saturating_u64_from_usize(batch.len()));
1454    Ok(())
1455}
1456
1457pub struct SemanticIndexer {
1458    embedder: Box<dyn Embedder>,
1459    batch_size: usize,
1460}
1461
1462impl SemanticIndexer {
1463    pub fn new(embedder_type: &str, data_dir: Option<&Path>) -> Result<Self> {
1464        let embedder: Box<dyn Embedder> = match embedder_type {
1465            "fastembed" | "minilm" | "snowflake-arctic-s" | "nomic-embed" => {
1466                let dir = data_dir
1467                    .ok_or_else(|| anyhow::anyhow!("data_dir required for fastembed embedder"))?;
1468                let embedder_name = if embedder_type == "fastembed" {
1469                    "minilm"
1470                } else {
1471                    embedder_type
1472                };
1473                Box::new(
1474                    FastEmbedder::load_by_name(dir, embedder_name)
1475                        .map_err(|e| anyhow::anyhow!("fastembed unavailable: {e}"))?,
1476                )
1477            }
1478            "hash" => Box::new(HashEmbedder::default()),
1479            other => bail!("unknown embedder: {other}"),
1480        };
1481
1482        Ok(Self {
1483            embedder,
1484            batch_size: resolved_default_batch_size(),
1485        })
1486    }
1487
1488    pub fn with_batch_size(mut self, batch_size: usize) -> Result<Self> {
1489        if batch_size == 0 {
1490            bail!("batch_size must be > 0");
1491        }
1492        self.batch_size = batch_size;
1493        Ok(self)
1494    }
1495
1496    pub fn batch_size(&self) -> usize {
1497        self.batch_size
1498    }
1499
1500    pub fn embedder_id(&self) -> &str {
1501        self.embedder.id()
1502    }
1503
1504    pub fn embedder_dimension(&self) -> usize {
1505        self.embedder.dimension()
1506    }
1507
1508    pub fn embed_messages(&self, messages: &[EmbeddingInput]) -> Result<Vec<EmbeddedMessage>> {
1509        self.embed_messages_with_sink(messages, &SemanticProgressSink::disabled())
1510    }
1511
1512    /// Variant of [`embed_messages`] that emits `embed_batch_*` events
1513    /// into the given JSONL sink. The sink is silent unless
1514    /// `CASS_SEMANTIC_PROGRESS_JSONL` is set, so this path is safe to
1515    /// take in production.
1516    pub fn embed_messages_with_sink(
1517        &self,
1518        messages: &[EmbeddingInput],
1519        sink: &SemanticProgressSink,
1520    ) -> Result<Vec<EmbeddedMessage>> {
1521        if messages.is_empty() {
1522            return Ok(Vec::new());
1523        }
1524
1525        let show_progress = std::io::stderr().is_terminal();
1526        let pb = ProgressBar::new(saturating_u64_from_usize(messages.len()));
1527        if show_progress {
1528            let style = ProgressStyle::default_bar()
1529                .template("{spinner:.green} [{bar:40.cyan/blue}] {pos}/{len} messages embedded")
1530                .unwrap_or_else(|_| ProgressStyle::default_bar());
1531            pb.set_style(style);
1532        } else {
1533            pb.set_draw_target(ProgressDrawTarget::hidden());
1534        }
1535
1536        let mut embeddings = Vec::with_capacity(messages.len());
1537
1538        // Process the corpus in windows of ~4 batches. Within each window,
1539        // rayon parallelizes the canonicalize + hash prep across cores; the
1540        // ONNX embedder is then fed serially in `batch_size` chunks so its
1541        // internal thread pool stays saturated without being starved by the
1542        // single-threaded prep loop we had before. `with_batch_size` and
1543        // `resolved_default_batch_size` both guarantee `batch_size >= 1`,
1544        // so saturating_mul(4) is always >= batch_size — no further clamp.
1545        let window = self.batch_size.saturating_mul(4);
1546        let serial_prep = !parallel_prep_enabled();
1547        let prep_memo_capacity = resolved_semantic_prep_memo_capacity();
1548        let mut prep_memo =
1549            serial_prep.then(|| ContentAddressedMemoCache::with_capacity(prep_memo_capacity));
1550        let mut batch_index: u64 = 0;
1551        let mut rows_processed: u64 = 0;
1552        let rows_total = u64::try_from(messages.len()).ok();
1553        let warn_after_ms = resolved_semantic_embed_batch_warn_after_ms();
1554        let fail_after_ms = resolved_semantic_embed_batch_fail_after_ms();
1555        for (window_index, window_slice) in messages.chunks(window).enumerate() {
1556            let prepared_window = match prep_memo.as_mut() {
1557                Some(cache) => {
1558                    let stats_before = cache.stats().clone();
1559                    let prepared_window = prepare_window_with_memo(window_slice, cache);
1560                    trace_semantic_prep_memo_window(
1561                        window_index,
1562                        window_slice.len(),
1563                        prepared_window.len(),
1564                        prep_memo_capacity,
1565                        &stats_before,
1566                        cache.stats(),
1567                    );
1568                    prepared_window
1569                }
1570                None => prepare_window(window_slice, false),
1571            };
1572            let skipped_in_window = window_slice.len() - prepared_window.len();
1573            if skipped_in_window > 0 {
1574                pb.inc(saturating_u64_from_usize(skipped_in_window));
1575            }
1576
1577            for batch in prepared_window.chunks(self.batch_size) {
1578                let batch_rows = u64::try_from(batch.len()).unwrap_or(u64::MAX);
1579                // Sum the canonicalized byte count so an operator can
1580                // distinguish a stalled inference from a stalled query —
1581                // a tiny `bytes` value paired with a long batch wall-time
1582                // points at the model; a huge `bytes` paired with a short
1583                // wall-time points at the storage side.
1584                let batch_bytes: u64 = batch
1585                    .iter()
1586                    .map(|p| saturating_u64_from_usize(p.canonical.len()))
1587                    .sum();
1588                if sink.is_active() {
1589                    sink.emit(
1590                        SemanticProgressEvent::EmbedBatchStart,
1591                        SemanticProgressFields {
1592                            batch_index: Some(batch_index),
1593                            batch_rows: Some(batch_rows),
1594                            rows_processed: Some(rows_processed),
1595                            rows_total,
1596                            bytes: Some(batch_bytes),
1597                            ..Default::default()
1598                        },
1599                    );
1600                }
1601                let batch_started = Instant::now();
1602                flush_prepared_batch(batch, &mut embeddings, &pb, self.embedder.as_ref())?;
1603                let elapsed_ms = saturating_u64_from_millis(batch_started.elapsed().as_millis());
1604                rows_processed = rows_processed.saturating_add(batch_rows);
1605                if warn_after_ms > 0 && elapsed_ms > warn_after_ms {
1606                    tracing::warn!(
1607                        batch_index,
1608                        elapsed_ms,
1609                        warn_after_ms,
1610                        batch_rows,
1611                        batch_bytes,
1612                        embedder = self.embedder_id(),
1613                        "semantic embed batch exceeded watchdog warning threshold"
1614                    );
1615                }
1616                if fail_after_ms > 0 && elapsed_ms > fail_after_ms {
1617                    bail!(
1618                        "semantic embed batch {batch_index} took {elapsed_ms}ms, exceeding \
1619                         CASS_SEMANTIC_EMBED_BATCH_FAIL_AFTER_MS={fail_after_ms}"
1620                    );
1621                }
1622                if sink.is_active() {
1623                    sink.emit(
1624                        SemanticProgressEvent::EmbedBatchDone,
1625                        SemanticProgressFields {
1626                            batch_index: Some(batch_index),
1627                            batch_rows: Some(batch_rows),
1628                            rows_processed: Some(rows_processed),
1629                            rows_total,
1630                            bytes: Some(batch_bytes),
1631                            ..Default::default()
1632                        },
1633                    );
1634                }
1635                batch_index = batch_index.saturating_add(1);
1636            }
1637        }
1638
1639        if let Some(cache) = prep_memo.as_ref() {
1640            let stats = cache.stats();
1641            tracing::debug!(
1642                algorithm = SEMANTIC_PREP_MEMO_ALGORITHM,
1643                algorithm_version = SEMANTIC_PREP_MEMO_VERSION,
1644                hits = stats.hits,
1645                misses = stats.misses,
1646                inserts = stats.inserts,
1647                quarantined = stats.quarantined,
1648                live_entries = stats.live_entries,
1649                entry_capacity = prep_memo_capacity,
1650                "semantic prep memo cache summary"
1651            );
1652        }
1653
1654        pb.finish_with_message("Embedding complete");
1655        Ok(embeddings)
1656    }
1657
1658    pub fn build_and_save_index<I>(
1659        &self,
1660        embedded_messages: I,
1661        data_dir: &Path,
1662    ) -> Result<FsVectorIndex>
1663    where
1664        I: IntoIterator<Item = EmbeddedMessage>,
1665    {
1666        let index_path = vector_index_path(data_dir, self.embedder_id());
1667        self.build_and_save_index_at_path(embedded_messages, &index_path)
1668    }
1669
1670    pub fn build_and_save_index_shards<I>(
1671        &self,
1672        embedded_messages: I,
1673        data_dir: &Path,
1674        plan: SemanticShardBuildPlan,
1675    ) -> Result<SemanticShardBuildOutcome>
1676    where
1677        I: IntoIterator<Item = EmbeddedMessage>,
1678    {
1679        if plan.db_fingerprint.trim().is_empty() {
1680            bail!("semantic shard build requires a non-empty DB fingerprint");
1681        }
1682        if plan.max_records_per_shard == 0 {
1683            bail!("semantic shard build requires max_records_per_shard > 0");
1684        }
1685
1686        let mut shard_records = Vec::new();
1687        let mut index_paths = Vec::new();
1688        let mut ann_index_paths = Vec::new();
1689        let mut current_records = Vec::with_capacity(plan.max_records_per_shard);
1690        let mut shard_index = 0u32;
1691        let mut total_docs = 0u64;
1692
1693        for embedded in embedded_messages {
1694            current_records.push(embedded);
1695            if current_records.len() >= plan.max_records_per_shard {
1696                let records = std::mem::take(&mut current_records);
1697                let (record, path, ann_path) =
1698                    self.write_semantic_shard(records, data_dir, &plan, shard_index)?;
1699                total_docs = total_docs.saturating_add(record.doc_count);
1700                shard_records.push(record);
1701                index_paths.push(path);
1702                if let Some(path) = ann_path {
1703                    ann_index_paths.push(path);
1704                }
1705                shard_index = shard_index
1706                    .checked_add(1)
1707                    .context("semantic shard index overflow")?;
1708            }
1709        }
1710
1711        if !current_records.is_empty() {
1712            let records = std::mem::take(&mut current_records);
1713            let (record, path, ann_path) =
1714                self.write_semantic_shard(records, data_dir, &plan, shard_index)?;
1715            total_docs = total_docs.saturating_add(record.doc_count);
1716            shard_records.push(record);
1717            index_paths.push(path);
1718            if let Some(path) = ann_path {
1719                ann_index_paths.push(path);
1720            }
1721        }
1722
1723        let shard_count = u32::try_from(shard_records.len())
1724            .context("semantic shard generation exceeded u32 shard count")?;
1725        for record in &mut shard_records {
1726            record.shard_count = shard_count;
1727        }
1728
1729        let mut shard_manifest = SemanticShardManifest::load_or_default(data_dir)
1730            .map_err(|err| anyhow::anyhow!("loading semantic shard manifest for publish: {err}"))?;
1731        shard_manifest.replace_shards_for_generation(
1732            plan.tier,
1733            self.embedder_id(),
1734            &plan.db_fingerprint,
1735            shard_records,
1736        );
1737        shard_manifest
1738            .save(data_dir)
1739            .map_err(|err| anyhow::anyhow!("saving semantic shard manifest: {err}"))?;
1740        let summary = shard_manifest.summary(plan.tier, self.embedder_id(), &plan.db_fingerprint);
1741
1742        tracing::info!(
1743            tier = plan.tier.as_str(),
1744            embedder = self.embedder_id(),
1745            shard_count,
1746            doc_count = total_docs,
1747            total_conversations = plan.total_conversations,
1748            "published semantic shard generation sidecar"
1749        );
1750
1751        Ok(SemanticShardBuildOutcome {
1752            tier: plan.tier,
1753            embedder_id: self.embedder_id().to_string(),
1754            shard_count,
1755            doc_count: total_docs,
1756            total_conversations: plan.total_conversations,
1757            index_paths,
1758            ann_index_paths,
1759            shard_manifest_path: SemanticShardManifest::path(data_dir),
1760            complete: summary.complete,
1761        })
1762    }
1763
1764    fn write_semantic_shard(
1765        &self,
1766        embedded_messages: Vec<EmbeddedMessage>,
1767        data_dir: &Path,
1768        plan: &SemanticShardBuildPlan,
1769        shard_index: u32,
1770    ) -> Result<(SemanticShardRecord, PathBuf, Option<PathBuf>)> {
1771        let started_at_ms = now_ms();
1772        let shard_path = semantic_shard_index_path(
1773            data_dir,
1774            plan.tier,
1775            self.embedder_id(),
1776            &plan.db_fingerprint,
1777            shard_index,
1778        );
1779        let shard_index_file = self.build_and_save_index_at_path(embedded_messages, &shard_path)?;
1780        let size_bytes = fs::metadata(&shard_path)
1781            .with_context(|| format!("stat semantic shard {}", shard_path.display()))?
1782            .len();
1783        let (ann_index_path, ann_size_bytes, ann_ready, ann_absolute_path) = if plan.build_ann {
1784            let ann_path = semantic_shard_ann_index_path(
1785                data_dir,
1786                plan.tier,
1787                self.embedder_id(),
1788                &plan.db_fingerprint,
1789                shard_index,
1790            );
1791            let config = FsHnswConfig {
1792                m: FS_HNSW_DEFAULT_M,
1793                ef_construction: FS_HNSW_DEFAULT_EF_CONSTRUCTION,
1794                ..FsHnswConfig::default()
1795            };
1796            let hnsw = FsHnswIndex::build_from_vector_index(&shard_index_file, config)
1797                .map_err(|err| anyhow::anyhow!("build shard HNSW index failed: {err}"))?;
1798            hnsw.save(&ann_path)
1799                .map_err(|err| anyhow::anyhow!("save shard HNSW index failed: {err}"))?;
1800            let ann_size_bytes = fs::metadata(&ann_path)
1801                .with_context(|| format!("stat semantic shard ANN {}", ann_path.display()))?
1802                .len();
1803            let relative_ann_path = ann_path
1804                .strip_prefix(data_dir)
1805                .unwrap_or(ann_path.as_path())
1806                .to_string_lossy()
1807                .to_string();
1808            (
1809                Some(relative_ann_path),
1810                ann_size_bytes,
1811                true,
1812                Some(ann_path),
1813            )
1814        } else {
1815            (None, 0, false, None)
1816        };
1817        let relative_index_path = shard_path
1818            .strip_prefix(data_dir)
1819            .unwrap_or(shard_path.as_path())
1820            .to_string_lossy()
1821            .to_string();
1822        let record = SemanticShardRecord {
1823            tier: plan.tier,
1824            embedder_id: self.embedder_id().to_string(),
1825            model_revision: plan.model_revision.clone(),
1826            schema_version: SEMANTIC_SCHEMA_VERSION,
1827            chunking_version: CHUNKING_STRATEGY_VERSION,
1828            dimension: self.embedder_dimension(),
1829            shard_index,
1830            shard_count: 0,
1831            doc_count: u64::try_from(shard_index_file.record_count()).unwrap_or(u64::MAX),
1832            total_conversations: plan.total_conversations,
1833            db_fingerprint: plan.db_fingerprint.clone(),
1834            index_path: relative_index_path,
1835            quantization: "f16".to_string(),
1836            mmap_ready: true,
1837            ann_index_path,
1838            ann_size_bytes,
1839            ann_ready,
1840            size_bytes,
1841            started_at_ms,
1842            completed_at_ms: now_ms(),
1843            ready: true,
1844        };
1845        Ok((record, shard_path, ann_absolute_path))
1846    }
1847
1848    fn build_and_save_index_at_path<I>(
1849        &self,
1850        embedded_messages: I,
1851        index_path: &Path,
1852    ) -> Result<FsVectorIndex>
1853    where
1854        I: IntoIterator<Item = EmbeddedMessage>,
1855    {
1856        if let Some(parent) = index_path.parent() {
1857            std::fs::create_dir_all(parent)?;
1858        }
1859
1860        // Store as f16 by default (smaller, faster I/O). Embeddings are validated by the writer.
1861        let mut writer: FsVectorIndexWriter = FsVectorIndex::create_with_revision(
1862            index_path,
1863            self.embedder_id(),
1864            "1.0",
1865            self.embedder_dimension(),
1866            FsQuantization::F16,
1867        )
1868        .map_err(|err| anyhow::anyhow!("create fsvi index failed: {err}"))?;
1869
1870        let write_result: Result<()> = (|| {
1871            for embedded in embedded_messages {
1872                if embedded.embedding.len() != self.embedder_dimension() {
1873                    bail!(
1874                        "embedding dimension mismatch: expected {}, got {}",
1875                        self.embedder_dimension(),
1876                        embedded.embedding.len()
1877                    );
1878                }
1879                let doc_id = semantic_doc_id_for_embedded(&embedded);
1880                writer
1881                    .write_record(&doc_id, &embedded.embedding)
1882                    .map_err(|err| anyhow::anyhow!("write fsvi record failed: {err}"))?;
1883            }
1884            Ok(())
1885        })();
1886
1887        if let Err(e) = &write_result {
1888            // Clean up partial index file to prevent corruption
1889            tracing::warn!("removing partial vector index after write failure: {e}");
1890            if let Err(rm_err) = std::fs::remove_file(index_path) {
1891                tracing::error!(
1892                    "failed to remove partial index file {}: {rm_err}",
1893                    index_path.display()
1894                );
1895            }
1896            return Err(anyhow::anyhow!("{e}"));
1897        }
1898
1899        writer
1900            .finish()
1901            .map_err(|err| anyhow::anyhow!("finish fsvi index failed: {err}"))?;
1902
1903        FsVectorIndex::open(index_path)
1904            .map_err(|err| anyhow::anyhow!("open fsvi index failed: {err}"))
1905    }
1906
1907    /// Append new embeddings to an existing FSVI index via the WAL.
1908    ///
1909    /// Used for incremental semantic indexing in watch mode. Opens the
1910    /// existing index, appends a batch of new embeddings, and compacts if
1911    /// the WAL has grown large enough.
1912    ///
1913    /// Returns the number of entries appended.
1914    pub fn append_to_index(
1915        &self,
1916        embedded_messages: impl IntoIterator<Item = EmbeddedMessage>,
1917        data_dir: &Path,
1918    ) -> Result<usize> {
1919        let index_path = vector_index_path(data_dir, self.embedder_id());
1920        self.append_to_index_path(embedded_messages, &index_path)
1921    }
1922
1923    fn append_to_index_path(
1924        &self,
1925        embedded_messages: impl IntoIterator<Item = EmbeddedMessage>,
1926        index_path: &Path,
1927    ) -> Result<usize> {
1928        let mut index = FsVectorIndex::open(index_path)
1929            .map_err(|err| anyhow::anyhow!("open fsvi index for append: {err}"))?;
1930
1931        let entries: Vec<(String, Vec<f32>)> = embedded_messages
1932            .into_iter()
1933            .map(|em| {
1934                let doc_id = semantic_doc_id_for_embedded(&em);
1935                (doc_id, em.embedding)
1936            })
1937            .collect();
1938
1939        let count = entries.len();
1940        if count == 0 {
1941            return Ok(0);
1942        }
1943
1944        index
1945            .append_batch(&entries)
1946            .map_err(|err| anyhow::anyhow!("append_batch: {err}"))?;
1947
1948        if index.needs_compaction() {
1949            index
1950                .compact()
1951                .map_err(|err| anyhow::anyhow!("compaction: {err}"))?;
1952        }
1953
1954        Ok(count)
1955    }
1956
1957    fn write_backfill_staging_index(
1958        &self,
1959        embedded_messages: Vec<EmbeddedMessage>,
1960        staging_path: &Path,
1961        resume_existing: bool,
1962    ) -> Result<FsVectorIndex> {
1963        if resume_existing && staging_path.exists() {
1964            self.append_to_index_path(embedded_messages, staging_path)?;
1965            FsVectorIndex::open(staging_path)
1966                .map_err(|err| anyhow::anyhow!("open staged semantic index failed: {err}"))
1967        } else {
1968            self.build_and_save_index_at_path(embedded_messages, staging_path)
1969        }
1970    }
1971
1972    pub fn run_backfill_batch(
1973        &self,
1974        messages: &[EmbeddingInput],
1975        data_dir: &Path,
1976        manifest: &mut SemanticManifest,
1977        plan: SemanticBackfillBatchPlan,
1978    ) -> Result<SemanticBackfillBatchOutcome> {
1979        self.run_backfill_batch_with_sink(
1980            messages,
1981            data_dir,
1982            manifest,
1983            plan,
1984            None,
1985            &SemanticProgressSink::disabled(),
1986        )
1987    }
1988
1989    /// Variant of [`run_backfill_batch`] that emits semantic progress
1990    /// events to the given JSONL sink and persists `last_message_id`
1991    /// into the resumable checkpoint when supplied. The sink is silent
1992    /// unless `CASS_SEMANTIC_PROGRESS_JSONL` is set, so this path is
1993    /// safe to take in production.
1994    pub fn run_backfill_batch_with_sink(
1995        &self,
1996        messages: &[EmbeddingInput],
1997        data_dir: &Path,
1998        manifest: &mut SemanticManifest,
1999        plan: SemanticBackfillBatchPlan,
2000        last_message_id: Option<i64>,
2001        sink: &SemanticProgressSink,
2002    ) -> Result<SemanticBackfillBatchOutcome> {
2003        if plan.db_fingerprint.trim().is_empty() {
2004            bail!("semantic backfill requires a non-empty DB fingerprint");
2005        }
2006        if plan.total_conversations == 0 && plan.conversations_in_batch > 0 {
2007            bail!("semantic backfill batch cannot process conversations when total is zero");
2008        }
2009
2010        let manifest_path = SemanticManifest::path(data_dir);
2011        let staging_path = semantic_staging_index_path(
2012            data_dir,
2013            plan.tier,
2014            self.embedder_id(),
2015            &plan.db_fingerprint,
2016        );
2017        let final_path = vector_index_path(data_dir, self.embedder_id());
2018
2019        let prior_checkpoint = manifest
2020            .checkpoint
2021            .as_ref()
2022            .filter(|checkpoint| {
2023                checkpoint.tier == plan.tier
2024                    && checkpoint.embedder_id == self.embedder_id()
2025                    && checkpoint.is_valid(&plan.db_fingerprint)
2026            })
2027            .cloned();
2028        let prior_conversations = prior_checkpoint
2029            .as_ref()
2030            .map_or(0, |checkpoint| checkpoint.conversations_processed);
2031        let prior_docs = prior_checkpoint
2032            .as_ref()
2033            .map_or(0, |checkpoint| checkpoint.docs_embedded);
2034
2035        let embeddings = self.embed_messages_with_sink(messages, sink)?;
2036        let embedded_docs = u64::try_from(embeddings.len()).unwrap_or(u64::MAX);
2037        if sink.is_active() {
2038            sink.emit(
2039                SemanticProgressEvent::StagingWriteStart,
2040                SemanticProgressFields {
2041                    batch_rows: Some(embedded_docs),
2042                    note: Some(staging_path.display().to_string()),
2043                    ..Default::default()
2044                },
2045            );
2046        }
2047        let mut staged_index = self.write_backfill_staging_index(
2048            embeddings,
2049            &staging_path,
2050            prior_checkpoint.is_some(),
2051        )?;
2052        if sink.is_active() {
2053            sink.emit(
2054                SemanticProgressEvent::StagingWriteDone,
2055                SemanticProgressFields {
2056                    batch_rows: Some(embedded_docs),
2057                    note: Some(staging_path.display().to_string()),
2058                    ..Default::default()
2059                },
2060            );
2061        }
2062        let conversations_processed = prior_conversations
2063            .saturating_add(plan.conversations_in_batch)
2064            .min(plan.total_conversations);
2065        let complete = conversations_processed >= plan.total_conversations;
2066
2067        manifest.refresh_backlog(plan.total_conversations, &plan.db_fingerprint);
2068
2069        if complete {
2070            let db_fingerprint = plan.db_fingerprint.clone();
2071            if staged_index.wal_record_count() > 0 {
2072                staged_index.compact().map_err(|err| {
2073                    anyhow::anyhow!("compact staged semantic index failed: {err}")
2074                })?;
2075            }
2076            drop(staged_index);
2077            if sink.is_active() {
2078                sink.emit(
2079                    SemanticProgressEvent::PublishStart,
2080                    SemanticProgressFields {
2081                        rows_processed: Some(conversations_processed),
2082                        rows_total: Some(plan.total_conversations),
2083                        last_conversation_id: Some(plan.last_offset),
2084                        last_message_id,
2085                        note: Some(final_path.display().to_string()),
2086                        ..Default::default()
2087                    },
2088                );
2089            }
2090            fs::rename(&staging_path, &final_path).with_context(|| {
2091                format!(
2092                    "publishing staged semantic index {} to {}",
2093                    staging_path.display(),
2094                    final_path.display()
2095                )
2096            })?;
2097            sync_parent_directory(&final_path)?;
2098            let published_index = FsVectorIndex::open(&final_path)
2099                .map_err(|err| anyhow::anyhow!("open published semantic index failed: {err}"))?;
2100            let size_bytes = fs::metadata(&final_path)
2101                .with_context(|| format!("stat published semantic index {}", final_path.display()))?
2102                .len();
2103            let relative_index_path = final_path
2104                .strip_prefix(data_dir)
2105                .unwrap_or(final_path.as_path())
2106                .to_string_lossy()
2107                .to_string();
2108            manifest.publish_artifact(ArtifactRecord {
2109                tier: plan.tier,
2110                embedder_id: self.embedder_id().to_string(),
2111                model_revision: plan.model_revision,
2112                schema_version: SEMANTIC_SCHEMA_VERSION,
2113                chunking_version: CHUNKING_STRATEGY_VERSION,
2114                dimension: self.embedder_dimension(),
2115                doc_count: u64::try_from(published_index.record_count()).unwrap_or(u64::MAX),
2116                conversation_count: conversations_processed,
2117                db_fingerprint: plan.db_fingerprint,
2118                index_path: relative_index_path,
2119                size_bytes,
2120                started_at_ms: prior_checkpoint
2121                    .as_ref()
2122                    .map_or_else(now_ms, |checkpoint| checkpoint.saved_at_ms),
2123                completed_at_ms: now_ms(),
2124                ready: true,
2125            });
2126            manifest.refresh_backlog(plan.total_conversations, &db_fingerprint);
2127            manifest.save(data_dir)?;
2128            if sink.is_active() {
2129                sink.emit(
2130                    SemanticProgressEvent::PublishDone,
2131                    SemanticProgressFields {
2132                        rows_processed: Some(conversations_processed),
2133                        rows_total: Some(plan.total_conversations),
2134                        last_conversation_id: Some(plan.last_offset),
2135                        last_message_id,
2136                        note: Some(final_path.display().to_string()),
2137                        ..Default::default()
2138                    },
2139                );
2140            }
2141        } else {
2142            let docs_embedded_on_disk =
2143                u64::try_from(staged_index.record_count()).unwrap_or(u64::MAX);
2144            let checkpoint_docs = prior_docs
2145                .saturating_add(embedded_docs)
2146                .max(docs_embedded_on_disk);
2147            if sink.is_active() {
2148                sink.emit(
2149                    SemanticProgressEvent::CheckpointSaveStart,
2150                    SemanticProgressFields {
2151                        rows_processed: Some(conversations_processed),
2152                        rows_total: Some(plan.total_conversations),
2153                        last_conversation_id: Some(plan.last_offset),
2154                        last_message_id,
2155                        ..Default::default()
2156                    },
2157                );
2158            }
2159            // Preserve any existing `last_message_id` cursor when the
2160            // caller did not supply a fresher one — see sub-fix 2 for
2161            // why durable message-PK resume matters.
2162            let prior_last_message_id = prior_checkpoint
2163                .as_ref()
2164                .and_then(|checkpoint| checkpoint.last_message_id);
2165            manifest.save_checkpoint(BuildCheckpoint {
2166                tier: plan.tier,
2167                embedder_id: self.embedder_id().to_string(),
2168                last_offset: plan.last_offset,
2169                docs_embedded: checkpoint_docs,
2170                conversations_processed,
2171                total_conversations: plan.total_conversations,
2172                db_fingerprint: plan.db_fingerprint,
2173                schema_version: SEMANTIC_SCHEMA_VERSION,
2174                chunking_version: CHUNKING_STRATEGY_VERSION,
2175                saved_at_ms: now_ms(),
2176                last_message_id: last_message_id.or(prior_last_message_id),
2177            });
2178            manifest.save(data_dir)?;
2179            if sink.is_active() {
2180                sink.emit(
2181                    SemanticProgressEvent::CheckpointSaveDone,
2182                    SemanticProgressFields {
2183                        rows_processed: Some(conversations_processed),
2184                        rows_total: Some(plan.total_conversations),
2185                        last_conversation_id: Some(plan.last_offset),
2186                        last_message_id: last_message_id.or(prior_last_message_id),
2187                        ..Default::default()
2188                    },
2189                );
2190            }
2191        }
2192
2193        Ok(SemanticBackfillBatchOutcome {
2194            tier: plan.tier,
2195            embedder_id: self.embedder_id().to_string(),
2196            embedded_docs,
2197            conversations_processed,
2198            total_conversations: plan.total_conversations,
2199            last_offset: plan.last_offset,
2200            checkpoint_saved: !complete,
2201            published: complete,
2202            index_path: if complete { final_path } else { staging_path },
2203            manifest_path,
2204        })
2205    }
2206
2207    pub fn run_backfill_from_storage(
2208        &self,
2209        storage: &FrankenStorage,
2210        data_dir: &Path,
2211        manifest: &mut SemanticManifest,
2212        plan: SemanticBackfillStoragePlan,
2213    ) -> Result<SemanticBackfillBatchOutcome> {
2214        self.run_backfill_from_storage_with_sink(
2215            storage,
2216            data_dir,
2217            manifest,
2218            plan,
2219            &SemanticProgressSink::disabled(),
2220        )
2221    }
2222
2223    /// Variant of [`run_backfill_from_storage`] that emits semantic
2224    /// progress events to a JSONL sink and persists `last_message_id`
2225    /// in the resumable checkpoint. The sink is silent unless
2226    /// `CASS_SEMANTIC_PROGRESS_JSONL` is set.
2227    pub fn run_backfill_from_storage_with_sink(
2228        &self,
2229        storage: &FrankenStorage,
2230        data_dir: &Path,
2231        manifest: &mut SemanticManifest,
2232        plan: SemanticBackfillStoragePlan,
2233        sink: &SemanticProgressSink,
2234    ) -> Result<SemanticBackfillBatchOutcome> {
2235        self.run_backfill_from_storage_with_caps_and_sink(
2236            storage,
2237            data_dir,
2238            manifest,
2239            plan,
2240            SemanticCheckpointCaps::unlimited(),
2241            sink,
2242        )
2243    }
2244
2245    /// Variant of [`run_backfill_from_storage_with_sink`] for CLI backfill
2246    /// runs. It applies operator checkpoint caps from
2247    /// `CASS_SEMANTIC_MAX_MESSAGES_PER_CHECKPOINT` and
2248    /// `CASS_SEMANTIC_MAX_BYTES_PER_CHECKPOINT` while keeping each selected
2249    /// conversation whole, so message-cursor resume cannot strand the tail of
2250    /// a partially selected conversation.
2251    pub fn run_capped_backfill_from_storage_with_sink(
2252        &self,
2253        storage: &FrankenStorage,
2254        data_dir: &Path,
2255        manifest: &mut SemanticManifest,
2256        plan: SemanticBackfillStoragePlan,
2257        sink: &SemanticProgressSink,
2258    ) -> Result<SemanticBackfillBatchOutcome> {
2259        self.run_backfill_from_storage_with_caps_and_sink(
2260            storage,
2261            data_dir,
2262            manifest,
2263            plan,
2264            SemanticCheckpointCaps::from_env(),
2265            sink,
2266        )
2267    }
2268
2269    fn run_backfill_from_storage_with_caps_and_sink(
2270        &self,
2271        storage: &FrankenStorage,
2272        data_dir: &Path,
2273        manifest: &mut SemanticManifest,
2274        plan: SemanticBackfillStoragePlan,
2275        caps: SemanticCheckpointCaps,
2276        sink: &SemanticProgressSink,
2277    ) -> Result<SemanticBackfillBatchOutcome> {
2278        let prior_checkpoint = manifest.checkpoint.as_ref().filter(|checkpoint| {
2279            checkpoint.tier == plan.tier
2280                && checkpoint.embedder_id == self.embedder_id()
2281                && checkpoint.is_valid(&plan.db_fingerprint)
2282        });
2283        let after_conversation_id = prior_checkpoint.map_or(0, |checkpoint| checkpoint.last_offset);
2284        let prior_last_message_id =
2285            prior_checkpoint.and_then(|checkpoint| checkpoint.last_message_id);
2286
2287        if sink.is_active() {
2288            sink.emit(
2289                SemanticProgressEvent::SelectionStart,
2290                SemanticProgressFields {
2291                    last_conversation_id: Some(after_conversation_id),
2292                    last_message_id: prior_last_message_id,
2293                    rows_total: Some(saturating_u64_from_usize(plan.max_conversations)),
2294                    note: Some(plan.tier.as_str().to_string()),
2295                    ..Default::default()
2296                },
2297            );
2298        }
2299
2300        let batch = match fetch_canonical_embedding_batch_inner_with_caps(
2301            storage,
2302            after_conversation_id,
2303            plan.max_conversations,
2304            prior_last_message_id,
2305            caps,
2306        ) {
2307            Ok(batch) => batch,
2308            Err(err) => {
2309                if sink.is_active() {
2310                    sink.emit(
2311                        SemanticProgressEvent::Error,
2312                        SemanticProgressFields {
2313                            error: Some(format!("selection: {err}")),
2314                            last_conversation_id: Some(after_conversation_id),
2315                            last_message_id: prior_last_message_id,
2316                            ..Default::default()
2317                        },
2318                    );
2319                }
2320                return Err(err);
2321            }
2322        };
2323
2324        if sink.is_active() {
2325            sink.emit(
2326                SemanticProgressEvent::SelectionDone,
2327                SemanticProgressFields {
2328                    last_conversation_id: Some(batch.last_conversation_id),
2329                    last_message_id: prior_last_message_id,
2330                    conversations_in_batch: Some(batch.conversations_in_batch),
2331                    rows_total: Some(batch.total_conversations),
2332                    ..Default::default()
2333                },
2334            );
2335            // PacketReplay {start,progress,done} bracket the
2336            // envelope/messages/packet build done by
2337            // `fetch_canonical_embedding_batch`. We can't easily plumb
2338            // a callback inside that helper without refactoring it, so
2339            // the start/done bracket here straddles the work that
2340            // already happened (replay always finishes before we see
2341            // the result). A future refactor — flagged in the
2342            // SQL-shape follow-up — can move the packet-replay work
2343            // into a streaming iterator and emit `progress` ticks
2344            // per conversation. For now, the bracket still gives
2345            // operators a clear "we got past replay" signal.
2346            sink.emit(
2347                SemanticProgressEvent::PacketReplayStart,
2348                SemanticProgressFields {
2349                    conversations_in_batch: Some(batch.conversations_in_batch),
2350                    ..Default::default()
2351                },
2352            );
2353            sink.emit(
2354                SemanticProgressEvent::PacketReplayProgress,
2355                SemanticProgressFields {
2356                    conversations_in_batch: Some(batch.conversations_in_batch),
2357                    rows_processed: Some(saturating_u64_from_usize(batch.inputs.len())),
2358                    bytes: Some(
2359                        batch
2360                            .inputs
2361                            .iter()
2362                            .map(|i| saturating_u64_from_usize(i.content.len()))
2363                            .sum(),
2364                    ),
2365                    ..Default::default()
2366                },
2367            );
2368            sink.emit(
2369                SemanticProgressEvent::PacketReplayDone,
2370                SemanticProgressFields {
2371                    conversations_in_batch: Some(batch.conversations_in_batch),
2372                    rows_processed: Some(saturating_u64_from_usize(batch.inputs.len())),
2373                    ..Default::default()
2374                },
2375            );
2376        }
2377
2378        // Compute the freshest `last_message_id` from the inputs we are
2379        // about to embed. EmbeddingInput.message_id is u64 (canonical
2380        // message PK); we coerce to i64 since the manifest stores i64.
2381        let batch_last_message_id = batch
2382            .inputs
2383            .iter()
2384            .map(|input| i64::try_from(input.message_id).unwrap_or(i64::MAX))
2385            .max();
2386        let next_last_message_id = match (prior_last_message_id, batch_last_message_id) {
2387            (Some(prior), Some(batch_max)) => Some(prior.max(batch_max)),
2388            (Some(prior), None) => Some(prior),
2389            (None, Some(batch_max)) => Some(batch_max),
2390            (None, None) => None,
2391        };
2392
2393        let outcome = self.run_backfill_batch_with_sink(
2394            &batch.inputs,
2395            data_dir,
2396            manifest,
2397            SemanticBackfillBatchPlan {
2398                tier: plan.tier,
2399                db_fingerprint: plan.db_fingerprint,
2400                model_revision: plan.model_revision,
2401                total_conversations: batch.total_conversations,
2402                conversations_in_batch: batch.conversations_in_batch,
2403                last_offset: batch.last_conversation_id,
2404            },
2405            next_last_message_id,
2406            sink,
2407        );
2408
2409        match &outcome {
2410            Ok(o) => {
2411                if sink.is_active() {
2412                    sink.emit(
2413                        SemanticProgressEvent::Complete,
2414                        SemanticProgressFields {
2415                            rows_processed: Some(o.conversations_processed),
2416                            rows_total: Some(o.total_conversations),
2417                            last_conversation_id: Some(o.last_offset),
2418                            last_message_id: next_last_message_id,
2419                            ..Default::default()
2420                        },
2421                    );
2422                }
2423            }
2424            Err(err) => {
2425                if sink.is_active() {
2426                    sink.emit(
2427                        SemanticProgressEvent::Error,
2428                        SemanticProgressFields {
2429                            error: Some(err.to_string()),
2430                            last_conversation_id: Some(batch.last_conversation_id),
2431                            last_message_id: next_last_message_id,
2432                            ..Default::default()
2433                        },
2434                    );
2435                }
2436            }
2437        }
2438
2439        outcome
2440    }
2441
2442    /// Build and save an HNSW index for approximate nearest neighbor search.
2443    ///
2444    /// This creates an HNSW graph structure from the existing VectorIndex,
2445    /// enabling O(log n) approximate search with the `--approximate` flag.
2446    ///
2447    /// # Arguments
2448    /// * `vector_index` - The VectorIndex to build HNSW from
2449    /// * `data_dir` - Directory to save the HNSW index
2450    /// * `m` - Max connections per node (default: 16)
2451    /// * `ef_construction` - Search width during build (default: 200)
2452    ///
2453    /// # Returns
2454    /// Path to the saved HNSW index file
2455    pub fn build_hnsw_index(
2456        &self,
2457        vector_index: &FsVectorIndex,
2458        data_dir: &Path,
2459        m: Option<usize>,
2460        ef_construction: Option<usize>,
2461    ) -> Result<PathBuf> {
2462        let m = m.unwrap_or(FS_HNSW_DEFAULT_M);
2463        let ef_construction = ef_construction.unwrap_or(FS_HNSW_DEFAULT_EF_CONSTRUCTION);
2464
2465        tracing::info!(
2466            embedder = self.embedder_id(),
2467            count = vector_index.record_count(),
2468            m,
2469            ef_construction,
2470            "Building HNSW index for approximate nearest neighbor search"
2471        );
2472
2473        let config = FsHnswConfig {
2474            m,
2475            ef_construction,
2476            ..FsHnswConfig::default()
2477        };
2478        let hnsw = FsHnswIndex::build_from_vector_index(vector_index, config)
2479            .map_err(|err| anyhow::anyhow!("build HNSW index failed: {err}"))?;
2480
2481        let hnsw_path = hnsw_index_path(data_dir, self.embedder_id());
2482        if let Some(parent) = hnsw_path.parent() {
2483            std::fs::create_dir_all(parent)?;
2484        }
2485        hnsw.save(&hnsw_path)
2486            .map_err(|err| anyhow::anyhow!("save HNSW index failed: {err}"))?;
2487
2488        tracing::info!(?hnsw_path, "Saved HNSW index");
2489        Ok(hnsw_path)
2490    }
2491}
2492
2493#[cfg(test)]
2494mod tests {
2495    use super::*;
2496    use crate::model::types::{Agent, AgentKind, Conversation, Message, MessageRole};
2497    use crate::storage::sqlite::FrankenStorage;
2498    use serde_json::json;
2499    use std::path::Path;
2500    use tempfile::tempdir;
2501
2502    #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
2503    struct ComparableSemanticInput {
2504        message_id: u64,
2505        created_at_ms: i64,
2506        agent_id: u32,
2507        workspace_id: u32,
2508        source_id: u32,
2509        role: u8,
2510        content: String,
2511    }
2512
2513    fn comparable_semantic_inputs(mut inputs: Vec<EmbeddingInput>) -> Vec<ComparableSemanticInput> {
2514        let mut comparable: Vec<ComparableSemanticInput> = inputs
2515            .drain(..)
2516            .map(|input| ComparableSemanticInput {
2517                message_id: input.message_id,
2518                created_at_ms: input.created_at_ms,
2519                agent_id: input.agent_id,
2520                workspace_id: input.workspace_id,
2521                source_id: input.source_id,
2522                role: input.role,
2523                content: input.content,
2524            })
2525            .collect();
2526        comparable.sort();
2527        comparable
2528    }
2529
2530    fn test_conversation(external_id: &str, body: &str) -> Conversation {
2531        test_conversation_fixture(
2532            external_id,
2533            vec![Message {
2534                id: None,
2535                idx: 0,
2536                role: MessageRole::User,
2537                author: None,
2538                created_at: Some(1_700_000_000_500),
2539                content: body.to_string(),
2540                extra_json: json!({}),
2541                snippets: Vec::new(),
2542            }],
2543            "local",
2544            None,
2545        )
2546    }
2547
2548    fn test_conversation_with_messages(external_id: &str, messages: Vec<Message>) -> Conversation {
2549        test_conversation_fixture(external_id, messages, "remote-laptop", Some("builder-host"))
2550    }
2551
2552    fn test_conversation_fixture(
2553        external_id: &str,
2554        messages: Vec<Message>,
2555        source_id: &str,
2556        origin_host: Option<&str>,
2557    ) -> Conversation {
2558        Conversation {
2559            id: None,
2560            agent_slug: "codex".to_string(),
2561            workspace: None,
2562            external_id: Some(external_id.to_string()),
2563            title: Some(format!("semantic {external_id}")),
2564            source_path: PathBuf::from(format!("/tmp/{external_id}.jsonl")),
2565            started_at: Some(1_700_000_000_000),
2566            ended_at: Some(1_700_000_001_000),
2567            approx_tokens: None,
2568            metadata_json: json!({}),
2569            messages,
2570            source_id: source_id.to_string(),
2571            origin_host: origin_host.map(str::to_string),
2572        }
2573    }
2574
2575    fn default_scheduler_signals() -> SemanticBackfillSchedulerSignals {
2576        SemanticBackfillSchedulerSignals {
2577            foreground_pressure: false,
2578            lexical_repair_active: false,
2579            force: false,
2580            operator_disabled: false,
2581        }
2582    }
2583
2584    struct EnvVarGuard {
2585        key: &'static str,
2586        prior: Option<String>,
2587    }
2588
2589    impl EnvVarGuard {
2590        fn set(key: &'static str, value: &str) -> Self {
2591            let prior = std::env::var(key).ok();
2592            // SAFETY: focused tests temporarily mutate process env and restore on drop.
2593            unsafe {
2594                std::env::set_var(key, value);
2595            }
2596            Self { key, prior }
2597        }
2598
2599        fn remove(key: &'static str) -> Self {
2600            let prior = std::env::var(key).ok();
2601            // SAFETY: focused tests temporarily mutate process env and restore on drop.
2602            unsafe {
2603                std::env::remove_var(key);
2604            }
2605            Self { key, prior }
2606        }
2607    }
2608
2609    impl Drop for EnvVarGuard {
2610        fn drop(&mut self) {
2611            // SAFETY: restores the process env value captured by this test guard.
2612            unsafe {
2613                match self.prior.as_deref() {
2614                    Some(value) => std::env::set_var(self.key, value),
2615                    None => std::env::remove_var(self.key),
2616                }
2617            }
2618        }
2619    }
2620
2621    #[test]
2622    fn semantic_backfill_scheduler_runs_and_scales_batch_under_idle_budget() {
2623        let policy = SemanticPolicy::compiled_defaults();
2624        let decision = semantic_backfill_scheduler_decision_for_capacity(
2625            &policy,
2626            64,
2627            &default_scheduler_signals(),
2628            80,
2629        );
2630
2631        assert!(decision.should_run());
2632        assert_eq!(decision.state, SemanticBackfillSchedulerState::Running);
2633        assert_eq!(
2634            decision.reason,
2635            SemanticBackfillSchedulerReason::IdleBudgetAvailable
2636        );
2637        assert_eq!(decision.scheduled_batch_conversations, 51);
2638        assert_eq!(decision.current_capacity_pct, 80);
2639        assert_eq!(decision.next_eligible_after_ms, 0);
2640    }
2641
2642    #[test]
2643    fn semantic_backfill_scheduler_reason_next_steps_are_stable() {
2644        for (reason, expected) in [
2645            (
2646                SemanticBackfillSchedulerReason::IdleBudgetAvailable,
2647                "background semantic backfill is within idle budgets",
2648            ),
2649            (
2650                SemanticBackfillSchedulerReason::OperatorDisabled,
2651                "background semantic backfill is disabled by CASS_SEMANTIC_BACKFILL_DISABLE",
2652            ),
2653            (
2654                SemanticBackfillSchedulerReason::PolicyDisabled,
2655                "semantic policy disables background semantic backfill",
2656            ),
2657            (
2658                SemanticBackfillSchedulerReason::ForegroundPressure,
2659                "foreground pressure is present; retry after the idle delay",
2660            ),
2661            (
2662                SemanticBackfillSchedulerReason::LexicalRepairActive,
2663                "lexical repair is active; semantic backfill is yielding",
2664            ),
2665            (
2666                SemanticBackfillSchedulerReason::CapacityBelowFloor,
2667                "machine responsiveness capacity is below the semantic backfill floor",
2668            ),
2669            (
2670                SemanticBackfillSchedulerReason::ThreadBudgetZero,
2671                "semantic backfill thread budget is zero",
2672            ),
2673            (
2674                SemanticBackfillSchedulerReason::BatchBudgetZero,
2675                "semantic backfill batch budget is zero",
2676            ),
2677        ] {
2678            assert_eq!(reason.next_step(), expected, "{reason:?}");
2679        }
2680    }
2681
2682    #[test]
2683    fn semantic_backfill_scheduler_yields_to_foreground_and_lexical_pressure() {
2684        let policy = SemanticPolicy::compiled_defaults();
2685        let foreground = SemanticBackfillSchedulerSignals {
2686            foreground_pressure: true,
2687            ..default_scheduler_signals()
2688        };
2689        let foreground_decision =
2690            semantic_backfill_scheduler_decision_for_capacity(&policy, 64, &foreground, 100);
2691        assert!(!foreground_decision.should_run());
2692        assert_eq!(
2693            foreground_decision.state,
2694            SemanticBackfillSchedulerState::Paused
2695        );
2696        assert_eq!(
2697            foreground_decision.reason,
2698            SemanticBackfillSchedulerReason::ForegroundPressure
2699        );
2700        assert_eq!(
2701            foreground_decision.next_eligible_after_ms,
2702            policy.idle_delay_seconds * 1000
2703        );
2704
2705        let lexical_repair = SemanticBackfillSchedulerSignals {
2706            lexical_repair_active: true,
2707            ..default_scheduler_signals()
2708        };
2709        let lexical_decision =
2710            semantic_backfill_scheduler_decision_for_capacity(&policy, 64, &lexical_repair, 100);
2711        assert!(!lexical_decision.should_run());
2712        assert_eq!(
2713            lexical_decision.state,
2714            SemanticBackfillSchedulerState::Paused
2715        );
2716        assert_eq!(
2717            lexical_decision.reason,
2718            SemanticBackfillSchedulerReason::LexicalRepairActive
2719        );
2720    }
2721
2722    #[test]
2723    fn semantic_backfill_scheduler_honors_policy_disable_and_force_override() {
2724        let mut policy = SemanticPolicy::compiled_defaults();
2725        policy.mode = crate::search::policy::SemanticMode::LexicalOnly;
2726
2727        let disabled = semantic_backfill_scheduler_decision_for_capacity(
2728            &policy,
2729            64,
2730            &default_scheduler_signals(),
2731            100,
2732        );
2733        assert!(!disabled.should_run());
2734        assert_eq!(disabled.state, SemanticBackfillSchedulerState::Disabled);
2735        assert_eq!(
2736            disabled.reason,
2737            SemanticBackfillSchedulerReason::PolicyDisabled
2738        );
2739
2740        let forced = SemanticBackfillSchedulerSignals {
2741            force: true,
2742            ..default_scheduler_signals()
2743        };
2744        let forced_decision =
2745            semantic_backfill_scheduler_decision_for_capacity(&policy, 64, &forced, 100);
2746        assert!(forced_decision.should_run());
2747        assert_eq!(
2748            forced_decision.reason,
2749            SemanticBackfillSchedulerReason::IdleBudgetAvailable
2750        );
2751        assert!(forced_decision.forced);
2752    }
2753
2754    #[test]
2755    fn test_batch_embedding() {
2756        let indexer = SemanticIndexer::new("hash", None).unwrap();
2757        let messages = vec![
2758            EmbeddingInput::new(1, "Hello world"),
2759            EmbeddingInput::new(2, "Goodbye world"),
2760        ];
2761
2762        let embeddings = indexer.embed_messages(&messages).unwrap();
2763
2764        assert_eq!(embeddings.len(), 2);
2765        assert_eq!(embeddings[0].message_id, 1);
2766        assert_eq!(embeddings[1].message_id, 2);
2767        assert_eq!(embeddings[0].embedding.len(), indexer.embedder_dimension());
2768    }
2769
2770    #[test]
2771    fn test_progress_indicator() {
2772        let indexer = SemanticIndexer::new("hash", None).unwrap();
2773        let messages: Vec<_> = (0..1000)
2774            .map(|i| EmbeddingInput::new(i as u64, format!("Message {}", i)))
2775            .collect();
2776
2777        let embeddings = indexer.embed_messages(&messages).unwrap();
2778        assert_eq!(embeddings.len(), messages.len());
2779    }
2780
2781    #[test]
2782    fn test_build_and_save_index() {
2783        let indexer = SemanticIndexer::new("hash", None).unwrap();
2784        let messages = vec![
2785            EmbeddingInput::new(1, "Hello world"),
2786            EmbeddingInput::new(2, "Goodbye world"),
2787        ];
2788
2789        let embeddings = indexer.embed_messages(&messages).unwrap();
2790        let tmp = tempdir().unwrap();
2791        let index = indexer
2792            .build_and_save_index(embeddings, tmp.path())
2793            .unwrap();
2794        assert_eq!(index.embedder_id(), indexer.embedder_id());
2795        assert_eq!(index.dimension(), indexer.embedder_dimension());
2796        assert_eq!(index.record_count(), 2);
2797    }
2798
2799    #[test]
2800    fn sharded_index_build_writes_sidecar_without_runtime_publish() {
2801        let indexer = SemanticIndexer::new("hash", None).unwrap();
2802        let messages: Vec<_> = (0..5)
2803            .map(|idx| EmbeddingInput::new(idx, format!("semantic shard message {idx}")))
2804            .collect();
2805        let embeddings = indexer.embed_messages(&messages).unwrap();
2806        let tmp = tempdir().unwrap();
2807
2808        let outcome = indexer
2809            .build_and_save_index_shards(
2810                embeddings,
2811                tmp.path(),
2812                SemanticShardBuildPlan {
2813                    tier: TierKind::Fast,
2814                    db_fingerprint: "db-fp-sharded-build".to_string(),
2815                    model_revision: "hash".to_string(),
2816                    total_conversations: 5,
2817                    max_records_per_shard: 2,
2818                    build_ann: false,
2819                },
2820            )
2821            .unwrap();
2822
2823        assert_eq!(outcome.shard_count, 3);
2824        assert_eq!(outcome.doc_count, 5);
2825        assert_eq!(outcome.total_conversations, 5);
2826        assert!(outcome.complete);
2827        assert_eq!(outcome.index_paths.len(), 3);
2828        for path in &outcome.index_paths {
2829            let shard = FsVectorIndex::open(path).unwrap();
2830            assert_eq!(shard.embedder_id(), indexer.embedder_id());
2831            assert!(shard.record_count() > 0);
2832        }
2833
2834        let shards = SemanticShardManifest::load(tmp.path()).unwrap().unwrap();
2835        let summary = shards.summary(TierKind::Fast, indexer.embedder_id(), "db-fp-sharded-build");
2836        assert!(summary.complete);
2837        assert_eq!(summary.ready_shards, 3);
2838        assert_eq!(summary.ann_ready_shards, 0);
2839        assert_eq!(summary.doc_count, 5);
2840        assert_eq!(summary.total_conversations, 5);
2841
2842        assert!(
2843            SemanticManifest::load(tmp.path()).unwrap().is_none(),
2844            "sidecar shards must not publish the main runtime manifest"
2845        );
2846        assert!(!vector_index_path(tmp.path(), indexer.embedder_id()).exists());
2847    }
2848
2849    #[test]
2850    fn sharded_index_build_rejects_zero_sized_shards() {
2851        let indexer = SemanticIndexer::new("hash", None).unwrap();
2852        let err = indexer
2853            .build_and_save_index_shards(
2854                std::iter::empty(),
2855                tempdir().unwrap().path(),
2856                SemanticShardBuildPlan {
2857                    tier: TierKind::Fast,
2858                    db_fingerprint: "db-fp-sharded-build".to_string(),
2859                    model_revision: "hash".to_string(),
2860                    total_conversations: 0,
2861                    max_records_per_shard: 0,
2862                    build_ann: false,
2863                },
2864            )
2865            .unwrap_err();
2866
2867        assert!(err.to_string().contains("max_records_per_shard > 0"));
2868    }
2869
2870    #[test]
2871    fn sharded_ann_build_records_per_shard_accelerators() {
2872        let indexer = SemanticIndexer::new("hash", None).unwrap();
2873        let messages: Vec<_> = (0..8)
2874            .map(|idx| EmbeddingInput::new(idx, format!("semantic ann shard message {idx}")))
2875            .collect();
2876        let embeddings = indexer.embed_messages(&messages).unwrap();
2877        let tmp = tempdir().unwrap();
2878
2879        let outcome = indexer
2880            .build_and_save_index_shards(
2881                embeddings,
2882                tmp.path(),
2883                SemanticShardBuildPlan {
2884                    tier: TierKind::Fast,
2885                    db_fingerprint: "db-fp-sharded-ann-build".to_string(),
2886                    model_revision: "hash".to_string(),
2887                    total_conversations: 8,
2888                    max_records_per_shard: 4,
2889                    build_ann: true,
2890                },
2891            )
2892            .unwrap();
2893
2894        assert_eq!(outcome.shard_count, 2);
2895        assert_eq!(outcome.ann_index_paths.len(), 2);
2896        for path in &outcome.ann_index_paths {
2897            assert!(path.exists(), "ANN shard missing at {}", path.display());
2898        }
2899
2900        let shards = SemanticShardManifest::load(tmp.path()).unwrap().unwrap();
2901        let summary = shards.summary(
2902            TierKind::Fast,
2903            indexer.embedder_id(),
2904            "db-fp-sharded-ann-build",
2905        );
2906        assert!(summary.complete);
2907        assert_eq!(summary.ann_ready_shards, 2);
2908        assert!(summary.ann_size_bytes > 0);
2909        assert!(
2910            shards
2911                .shards
2912                .iter()
2913                .all(|record| record.ann_index_path.is_some() && record.ann_ready)
2914        );
2915    }
2916
2917    /// Golden-output regression: any change to the embedding prep pipeline,
2918    /// the canonicalizer, the hash embedder's deterministic projection, or
2919    /// the ordering semantics of `embed_messages` must not silently mutate
2920    /// the bytes we write to the vector index. This digest is derived from a
2921    /// frozen 64-message corpus processed through the hash embedder; a
2922    /// mismatch means one of those contracts moved.
2923    #[test]
2924    fn embed_messages_golden_digest_hash_embedder() {
2925        use ring::digest::{Context, SHA256};
2926
2927        let corpus: Vec<EmbeddingInput> = (0..64)
2928            .map(|i| {
2929                let body = match i % 5 {
2930                    0 => format!("plain text message number {i}"),
2931                    1 => format!("**bold** line {i} with _emphasis_"),
2932                    2 => format!("```rust\nfn f_{i}() {{ println!(\"{i}\"); }}\n```"),
2933                    3 => format!("   whitespace {i}   "),
2934                    _ => format!("unicode \u{00E9}\u{0301} + emoji \u{1F600} {i}"),
2935                };
2936                EmbeddingInput::new(i as u64, body)
2937            })
2938            .collect();
2939
2940        let indexer = SemanticIndexer::new("hash", None)
2941            .unwrap()
2942            .with_batch_size(16)
2943            .unwrap();
2944        let embeddings = indexer.embed_messages(&corpus).unwrap();
2945
2946        // Digest over (message_id, content_hash, embedding f32 bytes) for every
2947        // embedded message, in the order emitted. Preserves order + content +
2948        // numeric equality without having to compare raw floats directly.
2949        let mut ctx = Context::new(&SHA256);
2950        for em in &embeddings {
2951            ctx.update(&em.message_id.to_le_bytes());
2952            ctx.update(&em.content_hash);
2953            for v in &em.embedding {
2954                ctx.update(&v.to_le_bytes());
2955            }
2956        }
2957        let digest = hex::encode(ctx.finish().as_ref());
2958
2959        // Captured 2026-04-21 against a freshly built hash embedder, batch
2960        // size 16, the frozen 64-message corpus above. Stable so long as
2961        // the prep pipeline, canonicalizer, and HashEmbedder::embed
2962        // implementation are all byte-preserving. If you intentionally
2963        // changed any of those, update this value AND record the reason
2964        // in the commit message.
2965        const EXPECTED: &str = "22d9ae7076925a4b70a194b0f519dfb1d465cc757368c296ef24055a02038c2c";
2966        assert_eq!(
2967            digest, EXPECTED,
2968            "embed_messages golden digest drifted; if this was intentional, \
2969             update EXPECTED in this test and record the reason in the commit message"
2970        );
2971    }
2972
2973    #[test]
2974    fn parallel_prep_matches_serial_prep_bitwise() {
2975        // Mix of short, long, empty, markdown, code-block, and unicode inputs
2976        // to make sure the canonicalizer is exercised across all of its paths.
2977        let inputs: Vec<EmbeddingInput> = (0..500)
2978            .map(|i| {
2979                let text = match i % 7 {
2980                    0 => format!("Plain message number {i} with some ordinary words."),
2981                    1 => format!("**Bold** and _italic_ markdown line {i}"),
2982                    2 => format!(
2983                        "```rust\nfn example_{i}() {{\n    println!(\"code block {i}\");\n}}\n```\nfollow-up text"
2984                    ),
2985                    3 => String::new(), // empty — should be filtered
2986                    4 => format!("   whitespace   galore   {i}   "),
2987                    5 => format!("Unicode \u{00E9}\u{0301} (combining accent) and emoji \u{1F600} line {i}"),
2988                    _ => format!(
2989                        "Mixed line {i}: `inline_code`, [link](http://x), {{braces}}, and \u{201C}curly quotes\u{201D}."
2990                    ),
2991                };
2992                EmbeddingInput::new(i as u64, text)
2993            })
2994            .collect();
2995
2996        let serial = prepare_window(&inputs, true);
2997        let parallel = prepare_window(&inputs, false);
2998
2999        assert_eq!(
3000            serial.len(),
3001            parallel.len(),
3002            "serial and parallel prep should skip the same number of empty canonicals"
3003        );
3004
3005        for (s, p) in serial.iter().zip(parallel.iter()) {
3006            assert_eq!(
3007                s.msg.message_id, p.msg.message_id,
3008                "ordering must be preserved between serial and parallel prep"
3009            );
3010            assert_eq!(
3011                s.canonical, p.canonical,
3012                "canonical form diverged between serial and parallel prep"
3013            );
3014            assert_eq!(
3015                s.hash, p.hash,
3016                "content hash diverged between serial and parallel prep"
3017            );
3018        }
3019    }
3020
3021    #[test]
3022    fn parallel_prep_filters_empty_canonicals() {
3023        let inputs = vec![
3024            EmbeddingInput::new(1, "valid content"),
3025            EmbeddingInput::new(2, ""),
3026            EmbeddingInput::new(3, "   \n\n   \t  "),
3027            EmbeddingInput::new(4, "more valid content"),
3028        ];
3029
3030        let prepared = prepare_window(&inputs, false);
3031        let ids: Vec<u64> = prepared.iter().map(|p| p.msg.message_id).collect();
3032
3033        assert!(ids.contains(&1));
3034        assert!(ids.contains(&4));
3035        // ids 2 and 3 should be dropped because their canonicals are empty.
3036        assert!(!ids.contains(&2));
3037        assert!(!ids.contains(&3));
3038    }
3039
3040    #[test]
3041    fn memoized_serial_prep_matches_stateless_prepare_window() {
3042        let inputs = vec![
3043            EmbeddingInput::new(1, "repeat me exactly"),
3044            EmbeddingInput::new(2, "repeat me exactly"),
3045            EmbeddingInput::new(3, "unique payload"),
3046            EmbeddingInput::new(4, ""),
3047            EmbeddingInput::new(5, "repeat me exactly"),
3048        ];
3049
3050        let baseline = prepare_window(&inputs, true);
3051        let mut cache = ContentAddressedMemoCache::with_capacity(16);
3052        let memoized = prepare_window_with_memo(&inputs, &mut cache);
3053
3054        assert_eq!(baseline.len(), memoized.len());
3055        for (plain, cached) in baseline.iter().zip(memoized.iter()) {
3056            assert_eq!(plain.msg.message_id, cached.msg.message_id);
3057            assert_eq!(plain.canonical, cached.canonical);
3058            assert_eq!(plain.hash, cached.hash);
3059        }
3060    }
3061
3062    #[test]
3063    fn semantic_prep_memo_key_uses_stable_content_hash_bytes() {
3064        let key = semantic_prep_memo_key("repeat me exactly");
3065        let expected = content_hash("repeat me exactly");
3066
3067        assert_eq!(key.content_hash.as_bytes(), expected.as_slice());
3068        assert_eq!(key.content_hash.as_bytes().len(), expected.len());
3069        assert_eq!(key.algorithm, SEMANTIC_PREP_MEMO_ALGORITHM);
3070        assert_eq!(key.algorithm_version, SEMANTIC_PREP_MEMO_VERSION);
3071    }
3072
3073    #[test]
3074    fn memoized_serial_prep_reuses_duplicate_content_across_windows() {
3075        let inputs = vec![
3076            EmbeddingInput::new(1, "repeat me exactly"),
3077            EmbeddingInput::new(2, "repeat me exactly"),
3078            EmbeddingInput::new(3, "unique payload"),
3079            EmbeddingInput::new(4, ""),
3080            EmbeddingInput::new(5, "repeat me exactly"),
3081        ];
3082
3083        let mut cache = ContentAddressedMemoCache::with_capacity(16);
3084        let prepared = prepare_window_with_memo(&inputs, &mut cache);
3085        let stats = cache.stats().clone();
3086
3087        assert_eq!(prepared.len(), 4);
3088        assert_eq!(stats.hits, 2);
3089        assert_eq!(stats.misses, 3);
3090        assert_eq!(stats.inserts, 2);
3091        assert_eq!(stats.live_entries, 2);
3092    }
3093
3094    #[test]
3095    fn packet_embedding_inputs_reuse_memoized_prep_for_duplicate_content() -> Result<()> {
3096        let temp = tempdir().unwrap();
3097        let db_path = temp.path().join("agent_search.db");
3098        let storage = FrankenStorage::open(&db_path)?;
3099        let agent_id = storage.ensure_agent(&Agent {
3100            id: None,
3101            slug: "codex".to_string(),
3102            name: "Codex".to_string(),
3103            version: None,
3104            kind: AgentKind::Cli,
3105        })?;
3106
3107        storage.insert_conversation_tree(
3108            agent_id,
3109            None,
3110            &test_conversation_with_messages(
3111                "packet-memo-conv-one",
3112                vec![
3113                    Message {
3114                        id: None,
3115                        idx: 0,
3116                        role: MessageRole::User,
3117                        author: None,
3118                        created_at: Some(1_700_000_010_100),
3119                        content: "shared semantic payload".to_string(),
3120                        extra_json: json!({}),
3121                        snippets: Vec::new(),
3122                    },
3123                    Message {
3124                        id: None,
3125                        idx: 1,
3126                        role: MessageRole::Agent,
3127                        author: None,
3128                        created_at: Some(1_700_000_010_200),
3129                        content: "unique semantic payload one".to_string(),
3130                        extra_json: json!({}),
3131                        snippets: Vec::new(),
3132                    },
3133                ],
3134            ),
3135        )?;
3136        storage.insert_conversation_tree(
3137            agent_id,
3138            None,
3139            &test_conversation_with_messages(
3140                "packet-memo-conv-two",
3141                vec![
3142                    Message {
3143                        id: None,
3144                        idx: 0,
3145                        role: MessageRole::Tool,
3146                        author: None,
3147                        created_at: Some(1_700_000_010_300),
3148                        content: "shared semantic payload".to_string(),
3149                        extra_json: json!({}),
3150                        snippets: Vec::new(),
3151                    },
3152                    Message {
3153                        id: None,
3154                        idx: 1,
3155                        role: MessageRole::Agent,
3156                        author: None,
3157                        created_at: Some(1_700_000_010_400),
3158                        content: "unique semantic payload two".to_string(),
3159                        extra_json: json!({}),
3160                        snippets: Vec::new(),
3161                    },
3162                ],
3163            ),
3164        )?;
3165
3166        let packet_inputs = packet_embedding_inputs_from_storage(&storage)?;
3167        let mut cache = ContentAddressedMemoCache::with_capacity(16);
3168        let prepared = prepare_window_with_memo(&packet_inputs, &mut cache);
3169        let stats = cache.stats().clone();
3170
3171        assert_eq!(packet_inputs.len(), 4);
3172        assert_eq!(prepared.len(), 4);
3173        assert_eq!(
3174            semantic_prep_memo_key("shared semantic payload")
3175                .content_hash
3176                .as_bytes()
3177                .len(),
3178            32
3179        );
3180        assert_eq!(stats.hits, 1);
3181        assert_eq!(stats.misses, 3);
3182        assert_eq!(stats.inserts, 3);
3183        assert_eq!(stats.live_entries, 3);
3184        Ok(())
3185    }
3186
3187    #[test]
3188    fn backfill_batch_saves_checkpoint_and_staged_index_until_complete() {
3189        let temp = tempdir().unwrap();
3190        let mut manifest = SemanticManifest::default();
3191        let indexer = SemanticIndexer::new("hash", None).unwrap();
3192        let messages = vec![
3193            EmbeddingInput::new(10, "first staged semantic message"),
3194            EmbeddingInput::new(11, "second staged semantic message"),
3195        ];
3196
3197        let outcome = indexer
3198            .run_backfill_batch(
3199                &messages,
3200                temp.path(),
3201                &mut manifest,
3202                SemanticBackfillBatchPlan {
3203                    tier: TierKind::Fast,
3204                    db_fingerprint: "db-fp-backfill-partial".to_string(),
3205                    model_revision: "hash".to_string(),
3206                    total_conversations: 2,
3207                    conversations_in_batch: 1,
3208                    last_offset: 1,
3209                },
3210            )
3211            .unwrap();
3212
3213        assert!(!outcome.published);
3214        assert!(outcome.checkpoint_saved);
3215        assert!(outcome.index_path.exists());
3216        assert!(!vector_index_path(temp.path(), indexer.embedder_id()).exists());
3217        let checkpoint = manifest.checkpoint.as_ref().expect("checkpoint");
3218        assert_eq!(checkpoint.tier, TierKind::Fast);
3219        assert_eq!(checkpoint.conversations_processed, 1);
3220        assert_eq!(checkpoint.docs_embedded, 2);
3221        assert_eq!(manifest.backlog.total_conversations, 2);
3222        assert!(SemanticManifest::path(temp.path()).exists());
3223    }
3224
3225    #[test]
3226    fn backfill_batch_resumes_staged_index_and_publishes_manifest_atomically() {
3227        let temp = tempdir().unwrap();
3228        let mut manifest = SemanticManifest::default();
3229        let indexer = SemanticIndexer::new("hash", None).unwrap();
3230        let db_fingerprint = "db-fp-backfill-complete";
3231        let staging_path = semantic_staging_index_path(
3232            temp.path(),
3233            TierKind::Fast,
3234            indexer.embedder_id(),
3235            db_fingerprint,
3236        );
3237
3238        let first = vec![EmbeddingInput::new(20, "first resume batch")];
3239        let first_outcome = indexer
3240            .run_backfill_batch(
3241                &first,
3242                temp.path(),
3243                &mut manifest,
3244                SemanticBackfillBatchPlan {
3245                    tier: TierKind::Fast,
3246                    db_fingerprint: db_fingerprint.to_string(),
3247                    model_revision: "hash".to_string(),
3248                    total_conversations: 2,
3249                    conversations_in_batch: 1,
3250                    last_offset: 1,
3251                },
3252            )
3253            .unwrap();
3254        assert_eq!(first_outcome.index_path, staging_path);
3255        assert!(staging_path.exists());
3256
3257        let second = vec![EmbeddingInput::new(21, "second resume batch")];
3258        let second_outcome = indexer
3259            .run_backfill_batch(
3260                &second,
3261                temp.path(),
3262                &mut manifest,
3263                SemanticBackfillBatchPlan {
3264                    tier: TierKind::Fast,
3265                    db_fingerprint: db_fingerprint.to_string(),
3266                    model_revision: "hash".to_string(),
3267                    total_conversations: 2,
3268                    conversations_in_batch: 1,
3269                    last_offset: 2,
3270                },
3271            )
3272            .unwrap();
3273
3274        assert!(second_outcome.published);
3275        assert!(!second_outcome.checkpoint_saved);
3276        assert!(!staging_path.exists());
3277        let final_path = vector_index_path(temp.path(), indexer.embedder_id());
3278        assert_eq!(second_outcome.index_path, final_path);
3279        assert!(final_path.exists());
3280        assert!(manifest.checkpoint.is_none());
3281        let artifact = manifest.fast_tier.as_ref().expect("published fast tier");
3282        assert!(artifact.ready);
3283        assert_eq!(artifact.conversation_count, 2);
3284        assert_eq!(artifact.doc_count, 2);
3285        assert_eq!(manifest.backlog.fast_tier_processed, 2);
3286
3287        let loaded = SemanticManifest::load(temp.path()).unwrap().unwrap();
3288        assert!(loaded.checkpoint.is_none());
3289        assert!(loaded.fast_tier.as_ref().is_some_and(|record| record.ready));
3290    }
3291
3292    #[test]
3293    fn backfill_publish_compacts_resumed_wal_before_rename() {
3294        let temp = tempdir().unwrap();
3295        let mut manifest = SemanticManifest::default();
3296        let indexer = SemanticIndexer::new("hash", None).unwrap();
3297        let db_fingerprint = "db-fp-backfill-small-resume";
3298        let first: Vec<EmbeddingInput> = (0..20)
3299            .map(|idx| EmbeddingInput::new(100 + idx, format!("first batch message {idx}")))
3300            .collect();
3301
3302        let first_outcome = indexer
3303            .run_backfill_batch(
3304                &first,
3305                temp.path(),
3306                &mut manifest,
3307                SemanticBackfillBatchPlan {
3308                    tier: TierKind::Fast,
3309                    db_fingerprint: db_fingerprint.to_string(),
3310                    model_revision: "hash".to_string(),
3311                    total_conversations: 2,
3312                    conversations_in_batch: 1,
3313                    last_offset: 1,
3314                },
3315            )
3316            .unwrap();
3317        assert!(first_outcome.checkpoint_saved);
3318
3319        let second = vec![EmbeddingInput::new(200, "small final resume batch")];
3320        let second_outcome = indexer
3321            .run_backfill_batch(
3322                &second,
3323                temp.path(),
3324                &mut manifest,
3325                SemanticBackfillBatchPlan {
3326                    tier: TierKind::Fast,
3327                    db_fingerprint: db_fingerprint.to_string(),
3328                    model_revision: "hash".to_string(),
3329                    total_conversations: 2,
3330                    conversations_in_batch: 1,
3331                    last_offset: 2,
3332                },
3333            )
3334            .unwrap();
3335
3336        assert!(second_outcome.published);
3337        let final_path = vector_index_path(temp.path(), indexer.embedder_id());
3338        let mut final_wal_path = final_path.as_os_str().to_os_string();
3339        final_wal_path.push(".wal");
3340        assert!(!PathBuf::from(final_wal_path).exists());
3341
3342        let published_index = FsVectorIndex::open(&final_path).unwrap();
3343        assert_eq!(published_index.record_count(), 21);
3344        let artifact = manifest.fast_tier.as_ref().expect("published fast tier");
3345        assert_eq!(artifact.doc_count, 21);
3346        assert_eq!(artifact.conversation_count, 2);
3347    }
3348
3349    #[test]
3350    fn backfill_from_storage_fetches_canonical_batches_and_resumes() -> Result<()> {
3351        let temp = tempdir().unwrap();
3352        let db_path = temp.path().join("agent_search.db");
3353        let storage = FrankenStorage::open(&db_path)?;
3354        let agent_id = storage.ensure_agent(&Agent {
3355            id: None,
3356            slug: "codex".to_string(),
3357            name: "Codex".to_string(),
3358            version: None,
3359            kind: AgentKind::Cli,
3360        })?;
3361        storage.insert_conversation_tree(
3362            agent_id,
3363            None,
3364            &test_conversation("first", "first canonical semantic message"),
3365        )?;
3366        storage.insert_conversation_tree(
3367            agent_id,
3368            None,
3369            &test_conversation("second", "second canonical semantic message"),
3370        )?;
3371
3372        let mut manifest = SemanticManifest::default();
3373        let indexer = SemanticIndexer::new("hash", None)?;
3374
3375        let first = indexer.run_backfill_from_storage(
3376            &storage,
3377            temp.path(),
3378            &mut manifest,
3379            SemanticBackfillStoragePlan {
3380                tier: TierKind::Fast,
3381                db_fingerprint: "canonical-db-fp".to_string(),
3382                model_revision: "hash".to_string(),
3383                max_conversations: 1,
3384            },
3385        )?;
3386        assert!(!first.published);
3387        assert!(first.checkpoint_saved);
3388        assert_eq!(first.conversations_processed, 1);
3389        assert_eq!(first.total_conversations, 2);
3390        assert_eq!(first.embedded_docs, 1);
3391        assert!(first.last_offset > 0);
3392
3393        let second = indexer.run_backfill_from_storage(
3394            &storage,
3395            temp.path(),
3396            &mut manifest,
3397            SemanticBackfillStoragePlan {
3398                tier: TierKind::Fast,
3399                db_fingerprint: "canonical-db-fp".to_string(),
3400                model_revision: "hash".to_string(),
3401                max_conversations: 1,
3402            },
3403        )?;
3404        assert!(second.published);
3405        assert!(!second.checkpoint_saved);
3406        assert_eq!(second.conversations_processed, 2);
3407        assert_eq!(second.embedded_docs, 1);
3408        assert!(manifest.checkpoint.is_none());
3409        assert_eq!(
3410            manifest.fast_tier.as_ref().map(|record| record.doc_count),
3411            Some(2)
3412        );
3413        Ok(())
3414    }
3415
3416    #[test]
3417    fn canonical_embedding_batch_uses_conversation_packet_semantic_projection() -> Result<()> {
3418        let temp = tempdir().unwrap();
3419        let db_path = temp.path().join("agent_search.db");
3420        let storage = FrankenStorage::open(&db_path)?;
3421        let agent_id = storage.ensure_agent(&Agent {
3422            id: None,
3423            slug: "codex".to_string(),
3424            name: "Codex".to_string(),
3425            version: None,
3426            kind: AgentKind::Cli,
3427        })?;
3428        storage.insert_conversation_tree(
3429            agent_id,
3430            None,
3431            &test_conversation_with_messages(
3432                "packet-projection",
3433                vec![
3434                    Message {
3435                        id: None,
3436                        idx: 0,
3437                        role: MessageRole::User,
3438                        author: None,
3439                        created_at: Some(1_700_000_000_500),
3440                        content: "user semantic text".to_string(),
3441                        extra_json: json!({}),
3442                        snippets: Vec::new(),
3443                    },
3444                    Message {
3445                        id: None,
3446                        idx: 1,
3447                        role: MessageRole::Tool,
3448                        author: None,
3449                        created_at: Some(1_700_000_000_600),
3450                        content: "tool semantic text".to_string(),
3451                        extra_json: json!({}),
3452                        snippets: Vec::new(),
3453                    },
3454                    Message {
3455                        id: None,
3456                        idx: 2,
3457                        role: MessageRole::System,
3458                        author: None,
3459                        created_at: Some(1_700_000_000_700),
3460                        content: String::new(),
3461                        extra_json: json!({}),
3462                        snippets: Vec::new(),
3463                    },
3464                ],
3465            ),
3466        )?;
3467
3468        let batch = fetch_canonical_embedding_batch(&storage, 0, 1)?;
3469
3470        assert_eq!(batch.conversations_in_batch, 1);
3471        assert_eq!(batch.inputs.len(), 2);
3472        assert_eq!(batch.inputs[0].content, "user semantic text");
3473        assert_eq!(batch.inputs[1].content, "tool semantic text");
3474        assert_eq!(batch.inputs[0].role, role_code_from_str("user").unwrap());
3475        assert_eq!(batch.inputs[1].role, role_code_from_str("tool").unwrap());
3476        let normalized_source_id =
3477            normalized_index_source_id(Some("remote-laptop"), None, Some("builder-host"));
3478        let expected_hash = crc32fast::hash(normalized_source_id.as_bytes());
3479        assert_eq!(batch.inputs[0].source_id, expected_hash);
3480        assert_eq!(batch.inputs[1].source_id, expected_hash);
3481        Ok(())
3482    }
3483
3484    #[test]
3485    fn canonical_embedding_batch_pushes_last_message_id_filter_into_selection() -> Result<()> {
3486        let temp = tempdir().unwrap();
3487        let db_path = temp.path().join("agent_search.db");
3488        let storage = FrankenStorage::open(&db_path)?;
3489        let agent_id = storage.ensure_agent(&Agent {
3490            id: None,
3491            slug: "codex".to_string(),
3492            name: "Codex".to_string(),
3493            version: None,
3494            kind: AgentKind::Cli,
3495        })?;
3496        storage.insert_conversation_tree(
3497            agent_id,
3498            None,
3499            &test_conversation("before-watermark", "old semantic message"),
3500        )?;
3501        let watermark: i64 = storage.raw().query_row_map(
3502            "SELECT MAX(id) FROM messages",
3503            &[] as &[ParamValue],
3504            |row| row.get_typed(0),
3505        )?;
3506        storage.insert_conversation_tree(
3507            agent_id,
3508            None,
3509            &test_conversation("after-watermark", "new semantic message"),
3510        )?;
3511
3512        let batch = fetch_canonical_embedding_batch_inner(&storage, 0, 8, Some(watermark))?;
3513
3514        assert_eq!(
3515            batch.conversations_in_batch, 1,
3516            "last_message_id must be pushed into candidate selection so old conversations are not counted in the batch"
3517        );
3518        assert_eq!(batch.total_conversations, 2);
3519        assert_eq!(batch.inputs.len(), 1);
3520        assert_eq!(batch.inputs[0].content, "new semantic message");
3521        assert!(
3522            i64::try_from(batch.inputs[0].message_id).unwrap_or(i64::MAX) > watermark,
3523            "selected semantic input must be strictly newer than the checkpoint message id"
3524        );
3525        Ok(())
3526    }
3527
3528    #[test]
3529    fn checkpoint_caps_stop_at_whole_conversation_boundary() -> Result<()> {
3530        let temp = tempdir().unwrap();
3531        let db_path = temp.path().join("agent_search.db");
3532        let storage = FrankenStorage::open(&db_path)?;
3533        let agent_id = storage.ensure_agent(&Agent {
3534            id: None,
3535            slug: "codex".to_string(),
3536            name: "Codex".to_string(),
3537            version: None,
3538            kind: AgentKind::Cli,
3539        })?;
3540        for external_id in ["cap-first", "cap-second", "cap-third"] {
3541            storage.insert_conversation_tree(
3542                agent_id,
3543                None,
3544                &test_conversation_with_messages(
3545                    external_id,
3546                    vec![
3547                        Message {
3548                            id: None,
3549                            idx: 0,
3550                            role: MessageRole::User,
3551                            author: None,
3552                            created_at: Some(1_700_000_000_500),
3553                            content: format!("{external_id} user semantic text"),
3554                            extra_json: json!({}),
3555                            snippets: Vec::new(),
3556                        },
3557                        Message {
3558                            id: None,
3559                            idx: 1,
3560                            role: MessageRole::Agent,
3561                            author: None,
3562                            created_at: Some(1_700_000_000_600),
3563                            content: format!("{external_id} assistant semantic text"),
3564                            extra_json: json!({}),
3565                            snippets: Vec::new(),
3566                        },
3567                    ],
3568                ),
3569            )?;
3570        }
3571
3572        let first_conversation_id: i64 = storage.raw().query_row_map(
3573            "SELECT MIN(id) FROM conversations",
3574            &[] as &[ParamValue],
3575            |row| row.get_typed(0),
3576        )?;
3577        let batch = fetch_canonical_embedding_batch_inner_with_caps(
3578            &storage,
3579            0,
3580            8,
3581            None,
3582            SemanticCheckpointCaps {
3583                max_messages: 3,
3584                max_bytes: 0,
3585            },
3586        )?;
3587
3588        assert_eq!(batch.conversations_in_batch, 1);
3589        assert_eq!(batch.last_conversation_id, first_conversation_id);
3590        assert_eq!(batch.inputs.len(), 2);
3591        assert!(
3592            batch
3593                .inputs
3594                .iter()
3595                .all(|input| input.content.contains("cap-first"))
3596        );
3597        assert_eq!(batch.total_conversations, 3);
3598        Ok(())
3599    }
3600
3601    #[test]
3602    fn packet_embedding_inputs_from_storage_since_only_emits_new_canonical_messages() -> Result<()>
3603    {
3604        let temp = tempdir().unwrap();
3605        let db_path = temp.path().join("agent_search.db");
3606        let storage = FrankenStorage::open(&db_path)?;
3607        let agent_id = storage.ensure_agent(&Agent {
3608            id: None,
3609            slug: "codex".to_string(),
3610            name: "Codex".to_string(),
3611            version: None,
3612            kind: AgentKind::Cli,
3613        })?;
3614        storage.insert_conversation_tree(
3615            agent_id,
3616            None,
3617            &test_conversation_with_messages(
3618                "packet-delta",
3619                vec![
3620                    Message {
3621                        id: None,
3622                        idx: 0,
3623                        role: MessageRole::User,
3624                        author: None,
3625                        created_at: Some(1_700_000_000_500),
3626                        content: "existing semantic text".to_string(),
3627                        extra_json: json!({}),
3628                        snippets: Vec::new(),
3629                    },
3630                    Message {
3631                        id: None,
3632                        idx: 1,
3633                        role: MessageRole::Agent,
3634                        author: None,
3635                        created_at: Some(1_700_000_000_600),
3636                        content: "existing assistant text".to_string(),
3637                        extra_json: json!({}),
3638                        snippets: Vec::new(),
3639                    },
3640                ],
3641            ),
3642        )?;
3643        let watermark: i64 = storage.raw().query_row_map(
3644            "SELECT MAX(id) FROM messages",
3645            &[] as &[ParamValue],
3646            |row| row.get_typed(0),
3647        )?;
3648
3649        storage.insert_conversation_tree(
3650            agent_id,
3651            None,
3652            &test_conversation_with_messages(
3653                "packet-delta",
3654                vec![
3655                    Message {
3656                        id: None,
3657                        idx: 0,
3658                        role: MessageRole::User,
3659                        author: None,
3660                        created_at: Some(1_700_000_000_500),
3661                        content: "existing semantic text".to_string(),
3662                        extra_json: json!({}),
3663                        snippets: Vec::new(),
3664                    },
3665                    Message {
3666                        id: None,
3667                        idx: 1,
3668                        role: MessageRole::Agent,
3669                        author: None,
3670                        created_at: Some(1_700_000_000_600),
3671                        content: "existing assistant text".to_string(),
3672                        extra_json: json!({}),
3673                        snippets: Vec::new(),
3674                    },
3675                    Message {
3676                        id: None,
3677                        idx: 2,
3678                        role: MessageRole::Agent,
3679                        author: None,
3680                        created_at: Some(1_700_000_000_700),
3681                        content: "new packet semantic text".to_string(),
3682                        extra_json: json!({}),
3683                        snippets: Vec::new(),
3684                    },
3685                    Message {
3686                        id: None,
3687                        idx: 3,
3688                        role: MessageRole::System,
3689                        author: None,
3690                        created_at: Some(1_700_000_000_800),
3691                        content: String::new(),
3692                        extra_json: json!({}),
3693                        snippets: Vec::new(),
3694                    },
3695                ],
3696            ),
3697        )?;
3698
3699        let batch = packet_embedding_inputs_from_storage_since(&storage, watermark)?;
3700
3701        assert_eq!(batch.conversations_in_batch, 1);
3702        assert_eq!(batch.inputs.len(), 1);
3703        assert_eq!(batch.inputs[0].content, "new packet semantic text");
3704        assert_eq!(
3705            batch.inputs[0].role,
3706            role_code_from_str("assistant").unwrap()
3707        );
3708        let normalized_source_id =
3709            normalized_index_source_id(Some("remote-laptop"), None, Some("builder-host"));
3710        assert_eq!(
3711            batch.inputs[0].source_id,
3712            crc32fast::hash(normalized_source_id.as_bytes())
3713        );
3714        let expected_raw_max_id: i64 = storage.raw().query_row_map(
3715            "SELECT MAX(id) FROM messages",
3716            &[] as &[ParamValue],
3717            |row| row.get_typed(0),
3718        )?;
3719        assert_eq!(batch.raw_max_message_id, Some(expected_raw_max_id));
3720        Ok(())
3721    }
3722
3723    #[test]
3724    fn packet_catch_up_emits_expected_semantic_docs_after_watermark() -> Result<()> {
3725        let temp = tempdir().unwrap();
3726        let db_path = temp.path().join("agent_search.db");
3727        let storage = FrankenStorage::open(&db_path)?;
3728        let agent_id = storage.ensure_agent(&Agent {
3729            id: None,
3730            slug: "codex".to_string(),
3731            name: "Codex".to_string(),
3732            version: None,
3733            kind: AgentKind::Cli,
3734        })?;
3735        let workspace_id = storage.ensure_workspace(Path::new("/tmp/workspace"), None)?;
3736
3737        storage.insert_conversation_tree(
3738            agent_id,
3739            Some(workspace_id),
3740            &test_conversation_with_messages(
3741                "legacy-packet-semantics",
3742                vec![
3743                    Message {
3744                        id: None,
3745                        idx: 0,
3746                        role: MessageRole::User,
3747                        author: None,
3748                        created_at: Some(1_700_000_000_500),
3749                        content: "before watermark".to_string(),
3750                        extra_json: json!({}),
3751                        snippets: Vec::new(),
3752                    },
3753                    Message {
3754                        id: None,
3755                        idx: 1,
3756                        role: MessageRole::Agent,
3757                        author: None,
3758                        created_at: Some(1_700_000_000_600),
3759                        content: "before watermark assistant".to_string(),
3760                        extra_json: json!({}),
3761                        snippets: Vec::new(),
3762                    },
3763                ],
3764            ),
3765        )?;
3766
3767        let watermark: i64 = storage.raw().query_row_map(
3768            "SELECT MAX(id) FROM messages",
3769            &[] as &[ParamValue],
3770            |row| row.get_typed(0),
3771        )?;
3772
3773        storage.insert_conversation_tree(
3774            agent_id,
3775            Some(workspace_id),
3776            &test_conversation_with_messages(
3777                "legacy-packet-semantics",
3778                vec![
3779                    Message {
3780                        id: None,
3781                        idx: 0,
3782                        role: MessageRole::User,
3783                        author: None,
3784                        created_at: Some(1_700_000_000_500),
3785                        content: "before watermark".to_string(),
3786                        extra_json: json!({}),
3787                        snippets: Vec::new(),
3788                    },
3789                    Message {
3790                        id: None,
3791                        idx: 1,
3792                        role: MessageRole::Agent,
3793                        author: None,
3794                        created_at: Some(1_700_000_000_600),
3795                        content: "before watermark assistant".to_string(),
3796                        extra_json: json!({}),
3797                        snippets: Vec::new(),
3798                    },
3799                    Message {
3800                        id: None,
3801                        idx: 2,
3802                        role: MessageRole::Agent,
3803                        author: None,
3804                        created_at: Some(1_700_000_000_700),
3805                        content: "after watermark assistant".to_string(),
3806                        extra_json: json!({}),
3807                        snippets: Vec::new(),
3808                    },
3809                    Message {
3810                        id: None,
3811                        idx: 3,
3812                        role: MessageRole::System,
3813                        author: None,
3814                        created_at: Some(1_700_000_000_800),
3815                        content: String::new(),
3816                        extra_json: json!({}),
3817                        snippets: Vec::new(),
3818                    },
3819                ],
3820            ),
3821        )?;
3822        storage.insert_conversation_tree(
3823            agent_id,
3824            Some(workspace_id),
3825            &test_conversation_with_messages(
3826                "legacy-packet-semantics-second-conv",
3827                vec![
3828                    Message {
3829                        id: None,
3830                        idx: 0,
3831                        role: MessageRole::Tool,
3832                        author: None,
3833                        created_at: Some(1_700_000_000_900),
3834                        content: "after watermark tool".to_string(),
3835                        extra_json: json!({}),
3836                        snippets: Vec::new(),
3837                    },
3838                    Message {
3839                        id: None,
3840                        idx: 1,
3841                        role: MessageRole::System,
3842                        author: None,
3843                        created_at: Some(1_700_000_001_000),
3844                        content: String::new(),
3845                        extra_json: json!({}),
3846                        snippets: Vec::new(),
3847                    },
3848                ],
3849            ),
3850        )?;
3851
3852        let packet_batch = packet_embedding_inputs_from_storage_since(&storage, watermark)?;
3853        let normalized_source_id =
3854            normalized_index_source_id(Some("remote-laptop"), None, Some("builder-host"));
3855        let source_id_hash = crc32fast::hash(normalized_source_id.as_bytes());
3856        let expected = vec![
3857            ComparableSemanticInput {
3858                message_id: u64::try_from(watermark + 1).unwrap(),
3859                created_at_ms: 1_700_000_000_700,
3860                agent_id: u32::try_from(agent_id).unwrap(),
3861                workspace_id: u32::try_from(workspace_id).unwrap(),
3862                source_id: source_id_hash,
3863                role: role_code_from_str("assistant").unwrap(),
3864                content: "after watermark assistant".to_string(),
3865            },
3866            ComparableSemanticInput {
3867                message_id: u64::try_from(watermark + 3).unwrap(),
3868                created_at_ms: 1_700_000_000_900,
3869                agent_id: u32::try_from(agent_id).unwrap(),
3870                workspace_id: u32::try_from(workspace_id).unwrap(),
3871                source_id: source_id_hash,
3872                role: role_code_from_str("tool").unwrap(),
3873                content: "after watermark tool".to_string(),
3874            },
3875        ];
3876
3877        assert_eq!(comparable_semantic_inputs(packet_batch.inputs), expected);
3878        assert_eq!(packet_batch.conversations_in_batch, 2);
3879        assert_eq!(packet_batch.raw_max_message_id, Some(watermark + 4));
3880        Ok(())
3881    }
3882
3883    #[test]
3884    fn packet_embedding_inputs_for_message_ids_matches_since_selection() -> Result<()> {
3885        let temp = tempdir().unwrap();
3886        let db_path = temp.path().join("agent_search.db");
3887        let storage = FrankenStorage::open(&db_path)?;
3888        let agent_id = storage.ensure_agent(&Agent {
3889            id: None,
3890            slug: "codex".to_string(),
3891            name: "Codex".to_string(),
3892            version: None,
3893            kind: AgentKind::Cli,
3894        })?;
3895        let workspace_id = storage.ensure_workspace(Path::new("/tmp/workspace"), None)?;
3896
3897        storage.insert_conversation_tree(
3898            agent_id,
3899            Some(workspace_id),
3900            &test_conversation_with_messages(
3901                "selected-vs-since",
3902                vec![
3903                    Message {
3904                        id: None,
3905                        idx: 0,
3906                        role: MessageRole::User,
3907                        author: None,
3908                        created_at: Some(1_700_000_100_100),
3909                        content: "before watermark".to_string(),
3910                        extra_json: json!({}),
3911                        snippets: Vec::new(),
3912                    },
3913                    Message {
3914                        id: None,
3915                        idx: 1,
3916                        role: MessageRole::Agent,
3917                        author: None,
3918                        created_at: Some(1_700_000_100_200),
3919                        content: "before watermark assistant".to_string(),
3920                        extra_json: json!({}),
3921                        snippets: Vec::new(),
3922                    },
3923                ],
3924            ),
3925        )?;
3926
3927        let watermark: i64 = storage.raw().query_row_map(
3928            "SELECT MAX(id) FROM messages",
3929            &[] as &[ParamValue],
3930            |row| row.get_typed(0),
3931        )?;
3932
3933        storage.insert_conversation_tree(
3934            agent_id,
3935            Some(workspace_id),
3936            &test_conversation_with_messages(
3937                "selected-vs-since",
3938                vec![
3939                    Message {
3940                        id: None,
3941                        idx: 0,
3942                        role: MessageRole::User,
3943                        author: None,
3944                        created_at: Some(1_700_000_100_100),
3945                        content: "before watermark".to_string(),
3946                        extra_json: json!({}),
3947                        snippets: Vec::new(),
3948                    },
3949                    Message {
3950                        id: None,
3951                        idx: 1,
3952                        role: MessageRole::Agent,
3953                        author: None,
3954                        created_at: Some(1_700_000_100_200),
3955                        content: "before watermark assistant".to_string(),
3956                        extra_json: json!({}),
3957                        snippets: Vec::new(),
3958                    },
3959                    Message {
3960                        id: None,
3961                        idx: 2,
3962                        role: MessageRole::Tool,
3963                        author: None,
3964                        created_at: Some(1_700_000_100_300),
3965                        content: "after watermark tool".to_string(),
3966                        extra_json: json!({}),
3967                        snippets: Vec::new(),
3968                    },
3969                    Message {
3970                        id: None,
3971                        idx: 3,
3972                        role: MessageRole::System,
3973                        author: None,
3974                        created_at: Some(1_700_000_100_400),
3975                        content: String::new(),
3976                        extra_json: json!({}),
3977                        snippets: Vec::new(),
3978                    },
3979                ],
3980            ),
3981        )?;
3982        storage.insert_conversation_tree(
3983            agent_id,
3984            Some(workspace_id),
3985            &test_conversation_with_messages(
3986                "selected-vs-since-second",
3987                vec![Message {
3988                    id: None,
3989                    idx: 0,
3990                    role: MessageRole::Agent,
3991                    author: None,
3992                    created_at: Some(1_700_000_100_500),
3993                    content: "after watermark assistant".to_string(),
3994                    extra_json: json!({}),
3995                    snippets: Vec::new(),
3996                }],
3997            ),
3998        )?;
3999
4000        let since_batch = packet_embedding_inputs_from_storage_since(&storage, watermark)?;
4001        let conversation_ids: Vec<i64> = storage.raw().query_map_collect(
4002            "SELECT DISTINCT conversation_id
4003             FROM messages
4004             WHERE id > ?1
4005             ORDER BY conversation_id ASC",
4006            &[ParamValue::from(watermark)],
4007            |row| row.get_typed(0),
4008        )?;
4009        let selected_message_ids: HashSet<i64> = storage
4010            .raw()
4011            .query_map_collect(
4012                "SELECT id
4013                 FROM messages
4014                 WHERE id > ?1
4015                 ORDER BY id ASC",
4016                &[ParamValue::from(watermark)],
4017                |row| row.get_typed(0),
4018            )?
4019            .into_iter()
4020            .collect();
4021        let selected_inputs = packet_embedding_inputs_from_storage_for_message_ids(
4022            &storage,
4023            &conversation_ids,
4024            &selected_message_ids,
4025        )?;
4026
4027        assert_eq!(
4028            comparable_semantic_inputs(selected_inputs),
4029            comparable_semantic_inputs(since_batch.inputs)
4030        );
4031        Ok(())
4032    }
4033
4034    #[test]
4035    fn default_batch_size_uses_new_value() {
4036        // The test setup must not leak a caller-provided CASS_SEMANTIC_BATCH_SIZE
4037        // override, which would mask the constant bump we're asserting on.
4038        let _guard = EnvVarGuard::remove("CASS_SEMANTIC_BATCH_SIZE");
4039        let indexer = SemanticIndexer::new("hash", None).unwrap();
4040        assert_eq!(indexer.batch_size(), DEFAULT_SEMANTIC_BATCH_SIZE);
4041    }
4042
4043    #[test]
4044    fn semantic_watchdog_and_checkpoint_caps_have_derived_defaults() {
4045        let _warn = EnvVarGuard::remove("CASS_SEMANTIC_EMBED_BATCH_WARN_AFTER_MS");
4046        let _fail = EnvVarGuard::remove("CASS_SEMANTIC_EMBED_BATCH_FAIL_AFTER_MS");
4047        let _messages = EnvVarGuard::remove("CASS_SEMANTIC_MAX_MESSAGES_PER_CHECKPOINT");
4048        let _bytes = EnvVarGuard::remove("CASS_SEMANTIC_MAX_BYTES_PER_CHECKPOINT");
4049
4050        assert_eq!(
4051            resolved_semantic_embed_batch_warn_after_ms(),
4052            DEFAULT_SEMANTIC_EMBED_BATCH_WARN_AFTER_MS
4053        );
4054        assert_eq!(
4055            resolved_semantic_embed_batch_fail_after_ms(),
4056            DEFAULT_SEMANTIC_EMBED_BATCH_FAIL_AFTER_MS
4057        );
4058        assert_eq!(
4059            SemanticCheckpointCaps::from_env(),
4060            SemanticCheckpointCaps {
4061                max_messages: DEFAULT_SEMANTIC_MAX_MESSAGES_PER_CHECKPOINT,
4062                max_bytes: DEFAULT_SEMANTIC_MAX_BYTES_PER_CHECKPOINT,
4063            }
4064        );
4065    }
4066
4067    #[test]
4068    fn parallel_prep_enabled_reuses_truthy_env_parser() {
4069        for (value, expected) in [
4070            ("1", true),
4071            ("true", true),
4072            (" YeS ", true),
4073            ("on", true),
4074            ("0", false),
4075            ("false", false),
4076            ("off", false),
4077        ] {
4078            let _guard = EnvVarGuard::set("CASS_SEMANTIC_PREP_PARALLEL", value);
4079            assert_eq!(parallel_prep_enabled(), expected, "env value {value:?}");
4080        }
4081
4082        let _guard = EnvVarGuard::remove("CASS_SEMANTIC_PREP_PARALLEL");
4083        assert!(!parallel_prep_enabled());
4084    }
4085
4086    #[test]
4087    fn saturating_u64_from_usize_covers_bounds() {
4088        assert_eq!(saturating_u64_from_usize(0), 0);
4089        assert_eq!(saturating_u64_from_usize(42), 42);
4090        assert_eq!(
4091            saturating_u64_from_usize(usize::MAX),
4092            u64::try_from(usize::MAX).unwrap_or(u64::MAX)
4093        );
4094    }
4095
4096    /// `coding_agent_session_search-ibuuh.32` (sink #3 equivalence gate):
4097    /// the packet-driven `semantic_inputs_from_packets` helper must
4098    /// produce the same `EmbeddingInput` list a fresh storage replay
4099    /// returns for the same canonical corpus. Once this passes, callers
4100    /// that already hold packets (rebuild pipeline, salvage replay,
4101    /// repair flows) can drive the semantic preparation consumer
4102    /// without a second canonical-row round-trip.
4103    #[test]
4104    fn semantic_inputs_from_packets_matches_storage_replay() -> Result<()> {
4105        let temp = tempdir().unwrap();
4106        let db_path = temp.path().join("agent_search.db");
4107        let storage = FrankenStorage::open(&db_path)?;
4108
4109        let agent_id_codex = storage.ensure_agent(&Agent {
4110            id: None,
4111            slug: "codex".to_string(),
4112            name: "Codex".to_string(),
4113            version: None,
4114            kind: AgentKind::Cli,
4115        })?;
4116        let agent_id_claude = storage.ensure_agent(&Agent {
4117            id: None,
4118            slug: "claude_code".to_string(),
4119            name: "Claude Code".to_string(),
4120            version: None,
4121            kind: AgentKind::Cli,
4122        })?;
4123        let workspace_id =
4124            storage.ensure_workspace(Path::new("/tmp/semantic-equivalence-ws"), None)?;
4125
4126        // Two conversations on different agents, mixed roles, including
4127        // an empty-content system message that the semantic projection
4128        // must filter (matches the legacy storage replay).
4129        storage.insert_conversation_tree(
4130            agent_id_codex,
4131            Some(workspace_id),
4132            &test_conversation_with_messages(
4133                "packet-equiv-1",
4134                vec![
4135                    Message {
4136                        id: None,
4137                        idx: 0,
4138                        role: MessageRole::User,
4139                        author: None,
4140                        created_at: Some(1_700_000_000_500),
4141                        content: "first user prompt".to_string(),
4142                        extra_json: json!({}),
4143                        snippets: Vec::new(),
4144                    },
4145                    Message {
4146                        id: None,
4147                        idx: 1,
4148                        role: MessageRole::Agent,
4149                        author: None,
4150                        created_at: Some(1_700_000_000_600),
4151                        content: "first assistant reply".to_string(),
4152                        extra_json: json!({}),
4153                        snippets: Vec::new(),
4154                    },
4155                    Message {
4156                        id: None,
4157                        idx: 2,
4158                        role: MessageRole::System,
4159                        author: None,
4160                        created_at: Some(1_700_000_000_700),
4161                        // Empty content is filtered by both paths.
4162                        content: String::new(),
4163                        extra_json: json!({}),
4164                        snippets: Vec::new(),
4165                    },
4166                ],
4167            ),
4168        )?;
4169        storage.insert_conversation_tree(
4170            agent_id_claude,
4171            Some(workspace_id),
4172            &test_conversation_with_messages(
4173                "packet-equiv-2",
4174                vec![
4175                    Message {
4176                        id: None,
4177                        idx: 0,
4178                        role: MessageRole::Tool,
4179                        author: Some("ripgrep".to_string()),
4180                        created_at: Some(1_700_000_001_500),
4181                        content: "tool output line".to_string(),
4182                        extra_json: json!({}),
4183                        snippets: Vec::new(),
4184                    },
4185                    Message {
4186                        id: None,
4187                        idx: 1,
4188                        role: MessageRole::Agent,
4189                        author: None,
4190                        created_at: Some(1_700_000_001_600),
4191                        content: "second assistant reply".to_string(),
4192                        extra_json: json!({}),
4193                        snippets: Vec::new(),
4194                    },
4195                ],
4196            ),
4197        )?;
4198
4199        // Legacy path: the storage-driven replay that the rebuild
4200        // pipeline currently uses.
4201        let storage_inputs = packet_embedding_inputs_from_storage(&storage)?;
4202
4203        // Packet-driven path: re-fetch the canonical envelopes (so we
4204        // get the storage-internal agent/workspace ids the rebuild path
4205        // would normally pair with packets), then convert those rows
4206        // into ConversationPackets via canonical replay and feed them
4207        // through `semantic_inputs_from_packets`.
4208        let conversation_ids: Vec<i64> = storage.raw().query_map_collect(
4209            "SELECT DISTINCT m.conversation_id
4210             FROM messages m
4211             JOIN conversations c ON c.id = m.conversation_id
4212             ORDER BY m.conversation_id ASC",
4213            &[] as &[ParamValue],
4214            |row| row.get_typed(0),
4215        )?;
4216        let envelopes = fetch_canonical_embedding_conversations(&storage, &conversation_ids)?;
4217        let mut grouped_messages =
4218            storage.fetch_messages_for_lexical_rebuild_batch(&conversation_ids, None, None)?;
4219        let mut packets: Vec<ConversationPacket> = Vec::with_capacity(envelopes.len());
4220        let mut contexts: Vec<SemanticPacketContext> = Vec::with_capacity(envelopes.len());
4221        for envelope in &envelopes {
4222            let messages = grouped_messages
4223                .remove(&envelope.conversation_id)
4224                .unwrap_or_default();
4225            let provenance = canonical_embedding_packet_provenance(envelope);
4226            let canonical = canonical_embedding_conversation(envelope, &provenance, messages);
4227            packets.push(ConversationPacket::from_canonical_replay(
4228                &canonical, provenance,
4229            ));
4230            contexts.push(SemanticPacketContext {
4231                conversation_id: envelope.conversation_id,
4232                agent_id: saturating_u32_from_i64(envelope.agent_id),
4233                workspace_id: saturating_u32_from_i64(envelope.workspace_id.unwrap_or(0)),
4234            });
4235        }
4236        let packet_inputs = semantic_inputs_from_packets(&packets, &contexts)?;
4237
4238        // The two paths must produce the same EmbeddingInput list
4239        // (sortable comparison normalizes ordering across the two
4240        // helpers' iteration orders).
4241        assert!(
4242            !storage_inputs.is_empty(),
4243            "fixture should produce non-empty semantic inputs (sanity)"
4244        );
4245        assert_eq!(
4246            comparable_semantic_inputs(storage_inputs.clone()),
4247            comparable_semantic_inputs(packet_inputs.clone()),
4248            "packet-driven semantic preparation must match storage replay byte-for-byte"
4249        );
4250
4251        // Sanity-pin a couple of contract details so a regression in
4252        // either path (e.g. role normalization or empty-content
4253        // filtering) trips a clear assertion rather than a generic
4254        // length mismatch.
4255        let storage_count = storage_inputs.len();
4256        let packet_count = packet_inputs.len();
4257        assert_eq!(
4258            storage_count, packet_count,
4259            "storage and packet semantic input counts must agree exactly"
4260        );
4261        // Empty-content system message must NOT appear in the output.
4262        assert!(
4263            packet_inputs.iter().all(|input| !input.content.is_empty()),
4264            "empty content must be filtered by the packet semantic projection"
4265        );
4266        // The remote-host source_id pins the cross-path provenance hash.
4267        let normalized_source_id =
4268            normalized_index_source_id(Some("remote-laptop"), None, Some("builder-host"));
4269        let expected_hash = crc32fast::hash(normalized_source_id.as_bytes());
4270        assert!(
4271            packet_inputs
4272                .iter()
4273                .all(|input| input.source_id == expected_hash),
4274            "every emitted EmbeddingInput must hash provenance via the packet's normalized source_id"
4275        );
4276
4277        Ok(())
4278    }
4279
4280    /// Length-mismatch defense: if a caller hands `semantic_inputs_from_packets`
4281    /// a packet/context slice pair of different lengths, the helper must
4282    /// return an error rather than silently mis-correlating ids. Pinning
4283    /// this is part of the bead's "shadow / compare mode plus explicit
4284    /// kill-switch" acceptance language.
4285    #[test]
4286    fn semantic_inputs_from_packets_rejects_length_mismatch() {
4287        let provenance = ConversationPacketProvenance::local();
4288        let canonical = test_conversation("packet-mismatch", "hello");
4289        let packet = ConversationPacket::from_canonical_replay(&canonical, provenance);
4290        let result = semantic_inputs_from_packets(&[packet], &[]);
4291        assert!(
4292            result.is_err(),
4293            "expected error on packet/context length mismatch"
4294        );
4295        let err = result.unwrap_err().to_string();
4296        assert!(
4297            err.contains("length mismatch"),
4298            "error should mention length mismatch, got: {err}"
4299        );
4300    }
4301}