1use std::collections::HashSet;
2use std::fs;
3use std::io::IsTerminal;
4use std::path::{Path, PathBuf};
5use std::time::{SystemTime, UNIX_EPOCH};
6
7use anyhow::{Context, Result, bail};
8use frankensearch::index::{
9 HNSW_DEFAULT_EF_CONSTRUCTION as FS_HNSW_DEFAULT_EF_CONSTRUCTION,
10 HNSW_DEFAULT_M as FS_HNSW_DEFAULT_M, HnswConfig as FsHnswConfig, HnswIndex as FsHnswIndex,
11 Quantization as FsQuantization, VectorIndex as FsVectorIndex,
12 VectorIndexWriter as FsVectorIndexWriter,
13};
14use frankensqlite::compat::{ConnectionExt, ParamValue, RowExt};
15use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
16use rayon::prelude::*;
17
18use crate::indexer::memoization::{
19 ContentAddressedMemoCache, MemoCacheAuditRecord, MemoContentHash, MemoKey, MemoLookup,
20};
21use crate::indexer::responsiveness;
22use crate::model::conversation_packet::{ConversationPacket, ConversationPacketProvenance};
23use crate::model::types::{Conversation, Message};
24use crate::search::canonicalize::{canonicalize_for_embedding, content_hash};
25use crate::search::embedder::Embedder;
26use crate::search::fastembed_embedder::FastEmbedder;
27use crate::search::hash_embedder::HashEmbedder;
28use crate::search::policy::{CHUNKING_STRATEGY_VERSION, SEMANTIC_SCHEMA_VERSION, SemanticPolicy};
29use crate::search::semantic_manifest::{
30 ArtifactRecord, BuildCheckpoint, SemanticManifest, SemanticShardManifest, SemanticShardRecord,
31 TierKind,
32};
33use crate::search::tantivy::{
34 normalized_index_origin_host, normalized_index_origin_kind, normalized_index_source_id,
35};
36use crate::search::vector_index::{
37 ROLE_USER, SemanticDocId, VECTOR_INDEX_DIR, role_code_from_str, vector_index_path,
38};
39use crate::storage::sqlite::FrankenStorage;
40
/// Fallback returned by `resolved_default_batch_size` when
/// `CASS_SEMANTIC_BATCH_SIZE` is unset, unparsable, or zero.
const DEFAULT_SEMANTIC_BATCH_SIZE: usize = 128;
/// Fallback returned by `resolved_semantic_prep_memo_capacity` when
/// `CASS_SEMANTIC_PREP_MEMO_CAPACITY` is unset, unparsable, or zero.
const DEFAULT_SEMANTIC_PREP_MEMO_CAPACITY: usize = 4_096;
/// Algorithm label supplied to `MemoKey::new` by `semantic_prep_memo_key`.
const SEMANTIC_PREP_MEMO_ALGORITHM: &str = "semantic_prepare_window";
/// Version label supplied to `MemoKey::new` by `semantic_prep_memo_key`.
// NOTE(review): presumably must be bumped whenever canonicalization or hashing
// output changes so stale memo entries are not reused — confirm.
const SEMANTIC_PREP_MEMO_VERSION: &str = "canonicalize_for_embedding:v2:stable-content-hash";
49
50fn resolved_default_batch_size() -> usize {
51 dotenvy::var("CASS_SEMANTIC_BATCH_SIZE")
52 .ok()
53 .and_then(|v| v.parse::<usize>().ok())
54 .filter(|v| *v > 0)
55 .unwrap_or(DEFAULT_SEMANTIC_BATCH_SIZE)
56}
57
58fn resolved_semantic_prep_memo_capacity() -> usize {
59 dotenvy::var("CASS_SEMANTIC_PREP_MEMO_CAPACITY")
60 .ok()
61 .and_then(|v| v.parse::<usize>().ok())
62 .filter(|v| *v > 0)
63 .unwrap_or(DEFAULT_SEMANTIC_PREP_MEMO_CAPACITY)
64}
65
/// Reports whether `CASS_SEMANTIC_PREP_PARALLEL` is set to a value that
/// `env_truthy` accepts.
fn parallel_prep_enabled() -> bool {
    env_truthy("CASS_SEMANTIC_PREP_PARALLEL")
}
83
/// Converts a `usize` to `u64`, saturating at `u64::MAX` if the value does not fit.
fn saturating_u64_from_usize(value: usize) -> u64 {
    match u64::try_from(value) {
        Ok(converted) => converted,
        Err(_) => u64::MAX,
    }
}
87
/// One message queued for embedding, together with the identifiers that are
/// carried through to the resulting vector document.
#[derive(Debug, Clone)]
pub struct EmbeddingInput {
    /// Message id (non-negative; see `message_id_from_db`).
    pub message_id: u64,
    /// Message timestamp in milliseconds; `0` when the source row had none
    /// (see `canonical_embedding_created_at_ms`).
    pub created_at_ms: i64,
    /// Agent identifier, saturated into `u32`.
    pub agent_id: u32,
    /// Workspace identifier, saturated into `u32`; `0` when absent.
    pub workspace_id: u32,
    /// CRC32 of the normalized source id string when built from a packet.
    pub source_id: u32,
    /// Role code (see `role_code_from_str`); defaults to `ROLE_USER`.
    pub role: u8,
    /// Chunk index within the message; the builders in this file always use `0`.
    pub chunk_idx: u8,
    /// Raw message text prior to canonicalization.
    pub content: String,
}
99
100impl EmbeddingInput {
101 pub fn new(message_id: u64, content: impl Into<String>) -> Self {
102 Self {
103 message_id,
104 created_at_ms: 0,
105 agent_id: 0,
106 workspace_id: 0,
107 source_id: 0,
108 role: ROLE_USER,
109 chunk_idx: 0,
110 content: content.into(),
111 }
112 }
113}
114
/// An embedded message: the identifying fields of an `EmbeddingInput` plus the
/// content hash and the embedding vector.
#[derive(Debug, Clone)]
pub struct EmbeddedMessage {
    /// Message id.
    pub message_id: u64,
    /// Message timestamp in milliseconds.
    pub created_at_ms: i64,
    /// Agent identifier.
    pub agent_id: u32,
    /// Workspace identifier.
    pub workspace_id: u32,
    /// Source identifier.
    pub source_id: u32,
    /// Role code.
    pub role: u8,
    /// Chunk index within the message.
    pub chunk_idx: u8,
    /// 32-byte content hash; stored in the doc id by `semantic_doc_id_for_embedded`.
    // NOTE(review): presumably the hash of the canonicalized text — confirm against the producer.
    pub content_hash: [u8; 32],
    /// Embedding vector produced by the embedder.
    pub embedding: Vec<f32>,
}
127
/// Parameters describing one semantic backfill batch.
// NOTE(review): field meanings below are inferred from names and from the
// sibling outcome/checkpoint types; the consumer is outside this view.
#[derive(Debug, Clone)]
pub struct SemanticBackfillBatchPlan {
    /// Semantic tier the batch belongs to.
    pub tier: TierKind,
    /// Fingerprint of the database the batch was read from.
    pub db_fingerprint: String,
    /// Revision of the embedding model.
    pub model_revision: String,
    /// Total number of conversations eligible for backfill.
    pub total_conversations: u64,
    /// Number of conversations contained in this batch.
    pub conversations_in_batch: u64,
    /// Resume offset after this batch (presumably a conversation id — confirm).
    pub last_offset: i64,
}
137
/// Parameters for a storage-driven semantic backfill run.
// NOTE(review): field meanings are inferred from names; the consumer is outside this view.
#[derive(Debug, Clone)]
pub struct SemanticBackfillStoragePlan {
    /// Semantic tier to backfill.
    pub tier: TierKind,
    /// Fingerprint of the database being read.
    pub db_fingerprint: String,
    /// Revision of the embedding model.
    pub model_revision: String,
    /// Upper bound on conversations to process (presumably per batch — confirm).
    pub max_conversations: usize,
}
145
/// Result summary of one semantic backfill batch.
// NOTE(review): field meanings are inferred from names; the producer is outside this view.
#[derive(Debug, Clone)]
pub struct SemanticBackfillBatchOutcome {
    /// Semantic tier that was processed.
    pub tier: TierKind,
    /// Identifier of the embedder used.
    pub embedder_id: String,
    /// Number of documents embedded.
    pub embedded_docs: u64,
    /// Number of conversations processed so far.
    pub conversations_processed: u64,
    /// Total number of conversations eligible for backfill.
    pub total_conversations: u64,
    /// Resume offset after this batch.
    pub last_offset: i64,
    /// Whether a checkpoint was persisted.
    pub checkpoint_saved: bool,
    /// Whether the index was published.
    pub published: bool,
    /// Path of the vector index.
    pub index_path: PathBuf,
    /// Path of the semantic manifest.
    pub manifest_path: PathBuf,
}
159
/// Parameters for building sharded semantic indexes.
// NOTE(review): field meanings are inferred from names; the consumer is outside this view.
#[derive(Debug, Clone)]
pub struct SemanticShardBuildPlan {
    /// Semantic tier to build.
    pub tier: TierKind,
    /// Fingerprint of the database being indexed.
    pub db_fingerprint: String,
    /// Revision of the embedding model.
    pub model_revision: String,
    /// Total number of conversations to cover.
    pub total_conversations: u64,
    /// Maximum number of records written to a single shard.
    pub max_records_per_shard: usize,
    /// Whether an ANN index should be built alongside each shard.
    pub build_ann: bool,
}
169
/// Result summary of a sharded semantic index build.
// NOTE(review): field meanings are inferred from names; the producer is outside this view.
#[derive(Debug, Clone)]
pub struct SemanticShardBuildOutcome {
    /// Semantic tier that was built.
    pub tier: TierKind,
    /// Identifier of the embedder used.
    pub embedder_id: String,
    /// Number of shards written.
    pub shard_count: u32,
    /// Number of documents written across all shards.
    pub doc_count: u64,
    /// Total number of conversations covered.
    pub total_conversations: u64,
    /// Paths of the shard vector indexes.
    pub index_paths: Vec<PathBuf>,
    /// Paths of the shard ANN indexes.
    pub ann_index_paths: Vec<PathBuf>,
    /// Path of the shard manifest.
    pub shard_manifest_path: PathBuf,
    /// Whether the build is complete.
    pub complete: bool,
}
182
/// Coarse scheduler state for background semantic backfill.
///
/// Serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum SemanticBackfillSchedulerState {
    /// Backfill may run now.
    Running,
    /// Backfill is temporarily yielding (lexical repair, foreground pressure,
    /// or capacity below the floor).
    Paused,
    /// Backfill is switched off (zero batch or thread budget, operator, or policy).
    Disabled,
}
190
191impl SemanticBackfillSchedulerState {
192 pub(crate) fn as_str(self) -> &'static str {
193 match self {
194 Self::Running => "running",
195 Self::Paused => "paused",
196 Self::Disabled => "disabled",
197 }
198 }
199}
200
/// Why the scheduler chose its current state.
///
/// Serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum SemanticBackfillSchedulerReason {
    /// No gate blocked the run.
    IdleBudgetAvailable,
    /// The operator-disable signal is set (`CASS_SEMANTIC_BACKFILL_DISABLE`).
    OperatorDisabled,
    /// The semantic policy mode does not build semantic assets.
    PolicyDisabled,
    /// Foreground pressure was signalled.
    ForegroundPressure,
    /// Lexical repair was signalled as active.
    LexicalRepairActive,
    /// Current capacity is below the configured minimum.
    CapacityBelowFloor,
    /// The policy allows zero backfill threads.
    ThreadBudgetZero,
    /// The requested batch size was zero.
    BatchBudgetZero,
}
213
impl SemanticBackfillSchedulerReason {
    /// Returns a fixed human-readable explanation for this reason.
    pub(crate) fn next_step(self) -> &'static str {
        // Exhaustive on purpose: adding a variant must force a new message here.
        match self {
            Self::IdleBudgetAvailable => "background semantic backfill is within idle budgets",
            Self::OperatorDisabled => {
                "background semantic backfill is disabled by CASS_SEMANTIC_BACKFILL_DISABLE"
            }
            Self::PolicyDisabled => "semantic policy disables background semantic backfill",
            Self::ForegroundPressure => {
                "foreground pressure is present; retry after the idle delay"
            }
            Self::LexicalRepairActive => "lexical repair is active; semantic backfill is yielding",
            Self::CapacityBelowFloor => {
                "machine responsiveness capacity is below the semantic backfill floor"
            }
            Self::ThreadBudgetZero => "semantic backfill thread budget is zero",
            Self::BatchBudgetZero => "semantic backfill batch budget is zero",
        }
    }
}
234
/// External signals consulted by the backfill scheduler.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct SemanticBackfillSchedulerSignals {
    /// Foreground work is active; pauses backfill unless `force` is set.
    pub foreground_pressure: bool,
    /// Lexical repair is active; pauses backfill unless `force` is set.
    pub lexical_repair_active: bool,
    /// Overrides every gate except the zero-batch check.
    pub force: bool,
    /// Operator has disabled backfill; ignored when `force` is set.
    pub operator_disabled: bool,
}
242
243impl SemanticBackfillSchedulerSignals {
244 pub(crate) fn from_env() -> Self {
245 Self {
246 foreground_pressure: env_truthy("CASS_SEMANTIC_BACKFILL_FOREGROUND_ACTIVE"),
247 lexical_repair_active: env_truthy("CASS_SEMANTIC_BACKFILL_LEXICAL_REPAIR_ACTIVE"),
248 force: env_truthy("CASS_SEMANTIC_BACKFILL_FORCE"),
249 operator_disabled: env_truthy("CASS_SEMANTIC_BACKFILL_DISABLE"),
250 }
251 }
252}
253
/// Scheduler verdict plus the inputs that produced it.
#[derive(Debug, Clone, serde::Serialize)]
pub(crate) struct SemanticBackfillSchedulerDecision {
    /// Resulting scheduler state.
    pub state: SemanticBackfillSchedulerState,
    /// Reason for `state`.
    pub reason: SemanticBackfillSchedulerReason,
    /// Batch size the caller asked for.
    pub requested_batch_conversations: usize,
    /// Batch size actually granted; `0` whenever the decision is not `Running`.
    pub scheduled_batch_conversations: usize,
    /// Observed capacity, clamped to at most 100.
    pub current_capacity_pct: u32,
    /// Minimum capacity required to run (see `env_backfill_min_capacity_pct`).
    pub min_capacity_pct: u32,
    /// Copied from the policy's `max_backfill_threads`.
    pub max_backfill_threads: usize,
    /// Copied from the policy's `idle_delay_seconds`.
    pub idle_delay_seconds: u64,
    /// Copied from the policy's `chunk_timeout_seconds`.
    pub chunk_timeout_seconds: u64,
    /// Copied from the signals.
    pub foreground_pressure: bool,
    /// Copied from the signals.
    pub lexical_repair_active: bool,
    /// Whether the force signal was set.
    pub forced: bool,
    /// Retry delay in milliseconds; `0` when running, otherwise the policy's
    /// idle delay converted to milliseconds.
    pub next_eligible_after_ms: u64,
}
270
271impl SemanticBackfillSchedulerDecision {
272 pub(crate) fn should_run(&self) -> bool {
273 matches!(self.state, SemanticBackfillSchedulerState::Running)
274 }
275}
276
277fn env_truthy(key: &str) -> bool {
278 dotenvy::var(key)
279 .ok()
280 .map(|value| {
281 matches!(
282 value.trim().to_ascii_lowercase().as_str(),
283 "1" | "true" | "yes" | "on"
284 )
285 })
286 .unwrap_or(false)
287}
288
289fn env_backfill_min_capacity_pct() -> u32 {
290 dotenvy::var("CASS_SEMANTIC_BACKFILL_MIN_CAPACITY_PCT")
291 .ok()
292 .and_then(|value| value.trim().parse::<u32>().ok())
293 .map(|value| value.clamp(1, 100))
294 .unwrap_or(75)
295}
296
297pub(crate) fn semantic_backfill_scheduler_decision(
298 policy: &SemanticPolicy,
299 requested_batch_conversations: usize,
300 signals: &SemanticBackfillSchedulerSignals,
301) -> SemanticBackfillSchedulerDecision {
302 semantic_backfill_scheduler_decision_for_capacity(
303 policy,
304 requested_batch_conversations,
305 signals,
306 responsiveness::current_capacity_pct(),
307 )
308}
309
310pub(crate) fn semantic_backfill_scheduler_decision_for_capacity(
311 policy: &SemanticPolicy,
312 requested_batch_conversations: usize,
313 signals: &SemanticBackfillSchedulerSignals,
314 current_capacity_pct: u32,
315) -> SemanticBackfillSchedulerDecision {
316 let min_capacity_pct = env_backfill_min_capacity_pct();
317 let paused_delay_ms = policy.idle_delay_seconds.saturating_mul(1000);
318 let mut decision = SemanticBackfillSchedulerDecision {
319 state: SemanticBackfillSchedulerState::Running,
320 reason: SemanticBackfillSchedulerReason::IdleBudgetAvailable,
321 requested_batch_conversations,
322 scheduled_batch_conversations: requested_batch_conversations,
323 current_capacity_pct: current_capacity_pct.clamp(0, 100),
324 min_capacity_pct,
325 max_backfill_threads: policy.max_backfill_threads,
326 idle_delay_seconds: policy.idle_delay_seconds,
327 chunk_timeout_seconds: policy.chunk_timeout_seconds,
328 foreground_pressure: signals.foreground_pressure,
329 lexical_repair_active: signals.lexical_repair_active,
330 forced: signals.force,
331 next_eligible_after_ms: 0,
332 };
333
334 if requested_batch_conversations == 0 {
335 return stopped_scheduler_decision(
336 decision,
337 SemanticBackfillSchedulerState::Disabled,
338 SemanticBackfillSchedulerReason::BatchBudgetZero,
339 paused_delay_ms,
340 );
341 }
342 if policy.max_backfill_threads == 0 && !signals.force {
343 return stopped_scheduler_decision(
344 decision,
345 SemanticBackfillSchedulerState::Disabled,
346 SemanticBackfillSchedulerReason::ThreadBudgetZero,
347 paused_delay_ms,
348 );
349 }
350 if signals.operator_disabled && !signals.force {
351 return stopped_scheduler_decision(
352 decision,
353 SemanticBackfillSchedulerState::Disabled,
354 SemanticBackfillSchedulerReason::OperatorDisabled,
355 paused_delay_ms,
356 );
357 }
358 if !policy.mode.should_build_semantic() && !signals.force {
359 return stopped_scheduler_decision(
360 decision,
361 SemanticBackfillSchedulerState::Disabled,
362 SemanticBackfillSchedulerReason::PolicyDisabled,
363 paused_delay_ms,
364 );
365 }
366 if signals.lexical_repair_active && !signals.force {
367 return stopped_scheduler_decision(
368 decision,
369 SemanticBackfillSchedulerState::Paused,
370 SemanticBackfillSchedulerReason::LexicalRepairActive,
371 paused_delay_ms,
372 );
373 }
374 if signals.foreground_pressure && !signals.force {
375 return stopped_scheduler_decision(
376 decision,
377 SemanticBackfillSchedulerState::Paused,
378 SemanticBackfillSchedulerReason::ForegroundPressure,
379 paused_delay_ms,
380 );
381 }
382 if current_capacity_pct < min_capacity_pct && !signals.force {
383 return stopped_scheduler_decision(
384 decision,
385 SemanticBackfillSchedulerState::Paused,
386 SemanticBackfillSchedulerReason::CapacityBelowFloor,
387 paused_delay_ms,
388 );
389 }
390
391 let capacity = current_capacity_pct.clamp(1, 100) as usize;
392 let scaled = requested_batch_conversations.saturating_mul(capacity) / 100;
393 decision.scheduled_batch_conversations = scaled.max(1).min(requested_batch_conversations);
394 decision
395}
396
397fn stopped_scheduler_decision(
398 mut decision: SemanticBackfillSchedulerDecision,
399 state: SemanticBackfillSchedulerState,
400 reason: SemanticBackfillSchedulerReason,
401 next_eligible_after_ms: u64,
402) -> SemanticBackfillSchedulerDecision {
403 decision.state = state;
404 decision.reason = reason;
405 decision.scheduled_batch_conversations = 0;
406 decision.next_eligible_after_ms = next_eligible_after_ms;
407 decision
408}
409
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock is set before the epoch. The millisecond
/// count is a `u128`, so it is converted with a saturating `try_from` rather
/// than a truncating `as` cast, which would wrap on out-of-range values.
fn now_ms() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| {
            i64::try_from(elapsed.as_millis()).unwrap_or(i64::MAX)
        })
}
416
417fn hnsw_index_path(data_dir: &Path, embedder_id: &str) -> PathBuf {
418 data_dir
419 .join(VECTOR_INDEX_DIR)
420 .join(format!("hnsw-{embedder_id}.chsw"))
421}
422
/// Rewrites `raw` so it is safe to embed in a file name.
///
/// ASCII alphanumerics, `-`, and `_` are kept; every other character
/// (including non-ASCII) becomes a single `_`.
fn safe_path_component(raw: &str) -> String {
    let mut sanitized = String::with_capacity(raw.len());
    for ch in raw.chars() {
        let allowed = ch.is_ascii_alphanumeric() || ch == '-' || ch == '_';
        sanitized.push(if allowed { ch } else { '_' });
    }
    sanitized
}
434
435fn semantic_staging_index_path(
436 data_dir: &Path,
437 tier: TierKind,
438 embedder_id: &str,
439 db_fingerprint: &str,
440) -> PathBuf {
441 let fingerprint_hash = crc32fast::hash(db_fingerprint.as_bytes());
442 data_dir.join(VECTOR_INDEX_DIR).join(format!(
443 ".staging-{}-{}-{fingerprint_hash:08x}.fsvi",
444 tier.as_str(),
445 safe_path_component(embedder_id)
446 ))
447}
448
449fn semantic_generation_fingerprint_component(db_fingerprint: &str) -> String {
450 blake3::hash(db_fingerprint.as_bytes())
451 .to_hex()
452 .chars()
453 .take(16)
454 .collect()
455}
456
457fn semantic_shard_generation_dir(
458 data_dir: &Path,
459 tier: TierKind,
460 embedder_id: &str,
461 db_fingerprint: &str,
462) -> PathBuf {
463 let fingerprint_hash = semantic_generation_fingerprint_component(db_fingerprint);
464 data_dir.join(VECTOR_INDEX_DIR).join("shards").join(format!(
465 "{}-{}-{fingerprint_hash}",
466 tier.as_str(),
467 safe_path_component(embedder_id),
468 ))
469}
470
471fn semantic_shard_index_path(
472 data_dir: &Path,
473 tier: TierKind,
474 embedder_id: &str,
475 db_fingerprint: &str,
476 shard_index: u32,
477) -> PathBuf {
478 semantic_shard_generation_dir(data_dir, tier, embedder_id, db_fingerprint)
479 .join(format!("shard-{shard_index:05}.fsvi"))
480}
481
482fn semantic_shard_ann_index_path(
483 data_dir: &Path,
484 tier: TierKind,
485 embedder_id: &str,
486 db_fingerprint: &str,
487 shard_index: u32,
488) -> PathBuf {
489 semantic_shard_generation_dir(data_dir, tier, embedder_id, db_fingerprint)
490 .join(format!("shard-{shard_index:05}.chsw"))
491}
492
493#[cfg(not(windows))]
494fn sync_parent_directory(path: &Path) -> Result<()> {
495 let Some(parent) = path.parent() else {
496 return Ok(());
497 };
498 let directory = fs::File::open(parent)
499 .with_context(|| format!("opening parent directory {}", parent.display()))?;
500 directory
501 .sync_all()
502 .with_context(|| format!("syncing parent directory {}", parent.display()))
503}
504
/// Windows build of `sync_parent_directory`: a no-op that always returns `Ok(())`.
#[cfg(windows)]
fn sync_parent_directory(_path: &Path) -> Result<()> {
    Ok(())
}
509
510fn semantic_doc_id_for_embedded(embedded: &EmbeddedMessage) -> String {
511 SemanticDocId {
512 message_id: embedded.message_id,
513 chunk_idx: embedded.chunk_idx,
514 agent_id: embedded.agent_id,
515 workspace_id: embedded.workspace_id,
516 source_id: embedded.source_id,
517 role: embedded.role,
518 created_at_ms: embedded.created_at_ms,
519 content_hash: Some(embedded.content_hash),
520 }
521 .to_doc_id_string()
522}
523
/// Conversation envelope loaded by `fetch_canonical_embedding_conversations`.
///
/// Fields mirror the columns selected from `conversations`, joined with
/// `agents` and `workspaces`.
struct CanonicalEmbeddingConversationRow {
    /// `conversations.id`.
    conversation_id: i64,
    /// `agents.slug`, or `'unknown'` when no agent row matches.
    agent_slug: String,
    /// `conversations.agent_id`, or `0` when NULL.
    agent_id: i64,
    /// `workspaces.path`, when a workspace row matches.
    workspace: Option<PathBuf>,
    /// `conversations.workspace_id`.
    workspace_id: Option<i64>,
    /// `conversations.external_id`.
    external_id: Option<String>,
    /// `conversations.title`.
    title: Option<String>,
    /// `conversations.source_path`.
    source_path: PathBuf,
    /// `conversations.started_at`.
    started_at: Option<i64>,
    /// `conversations.ended_at`.
    ended_at: Option<i64>,
    /// `conversations.source_id` (not yet normalized).
    source_id: Option<String>,
    /// `conversations.origin_host` (not yet normalized).
    origin_host: Option<String>,
}
538
/// One page of embedding inputs produced by `fetch_canonical_embedding_batch`.
struct CanonicalEmbeddingBatch {
    /// Embedding inputs for every conversation in the page.
    inputs: Vec<EmbeddingInput>,
    /// Number of conversation ids in the page.
    conversations_in_batch: u64,
    /// Highest conversation id in the page, or the requested cursor when the
    /// page is empty.
    last_conversation_id: i64,
    /// Count of distinct conversations that have messages.
    total_conversations: u64,
}
545
/// Catch-up batch produced by `packet_embedding_inputs_from_storage_since`.
pub(crate) struct CanonicalIncrementalEmbeddingBatch {
    /// Embedding inputs for the selected messages.
    pub inputs: Vec<EmbeddingInput>,
    /// Number of conversations that contained a message newer than the cursor.
    pub conversations_in_batch: u64,
    /// Largest raw message id among the selected messages, if any.
    pub raw_max_message_id: Option<i64>,
}
551
552fn matching_semantic_checkpoint_offset(
553 manifest: &SemanticManifest,
554 tier: TierKind,
555 embedder_id: &str,
556 db_fingerprint: &str,
557) -> i64 {
558 manifest
559 .checkpoint
560 .as_ref()
561 .filter(|checkpoint| {
562 checkpoint.tier == tier
563 && checkpoint.embedder_id == embedder_id
564 && checkpoint.is_valid(db_fingerprint)
565 })
566 .map_or(0, |checkpoint| checkpoint.last_offset)
567}
568
569fn total_semantic_conversations(storage: &FrankenStorage) -> Result<u64> {
570 let count: i64 = storage
571 .raw()
572 .query_row_map(
573 "SELECT COUNT(DISTINCT m.conversation_id)
574 FROM messages m
575 JOIN conversations c ON c.id = m.conversation_id",
576 &[] as &[ParamValue],
577 |row| row.get_typed(0),
578 )
579 .with_context(|| "counting canonical conversations with semantic messages")?;
580 Ok(u64::try_from(count.max(0)).unwrap_or(u64::MAX))
581}
582
/// Converts a raw database id to `u64`, returning `None` for negative values.
pub(crate) fn message_id_from_db(raw: i64) -> Option<u64> {
    (raw >= 0).then(|| raw.unsigned_abs())
}
586
/// Converts an `i64` to `u32`, saturating at both ends.
///
/// Negative values become `0`; values above `u32::MAX` become `u32::MAX`.
pub(crate) fn saturating_u32_from_i64(raw: i64) -> u32 {
    if raw.is_negative() {
        return 0;
    }
    u32::try_from(raw).unwrap_or(u32::MAX)
}
594
595fn canonical_embedding_created_at_ms(message_id: u64, created_at: Option<i64>) -> i64 {
596 created_at.unwrap_or_else(|| {
604 tracing::warn!(
605 message_id,
606 "semantic backfill: row has NULL created_at; defaulting to 0 (Unix epoch). \
607 Downstream time-range filters will treat this message as 1970-01-01."
608 );
609 0
610 })
611}
612
613fn canonical_embedding_packet_provenance(
614 row: &CanonicalEmbeddingConversationRow,
615) -> ConversationPacketProvenance {
616 let source_id =
617 normalized_index_source_id(row.source_id.as_deref(), None, row.origin_host.as_deref());
618 ConversationPacketProvenance {
619 origin_kind: normalized_index_origin_kind(&source_id, None),
620 origin_host: normalized_index_origin_host(row.origin_host.as_deref()),
621 source_id,
622 }
623}
624
625fn canonical_embedding_conversation(
626 row: &CanonicalEmbeddingConversationRow,
627 provenance: &ConversationPacketProvenance,
628 messages: Vec<Message>,
629) -> Conversation {
630 Conversation {
631 id: Some(row.conversation_id),
632 agent_slug: row.agent_slug.clone(),
633 workspace: row.workspace.clone(),
634 external_id: row.external_id.clone(),
635 title: row.title.clone(),
636 source_path: row.source_path.clone(),
637 started_at: row.started_at,
638 ended_at: row.ended_at,
639 approx_tokens: None,
640 metadata_json: serde_json::Value::Null,
641 messages,
642 source_id: provenance.source_id.clone(),
643 origin_host: provenance.origin_host.clone(),
644 }
645}
646
647fn embedding_input_from_packet_message(
648 conversation_id: i64,
649 agent_id: u32,
650 workspace_id: u32,
651 source_id_hash: u32,
652 message: &crate::model::conversation_packet::ConversationPacketMessage,
653) -> Option<EmbeddingInput> {
654 let Some(raw_message_id) = message.message_id else {
655 tracing::warn!(
656 conversation_id,
657 message_idx = message.idx,
658 "skipping semantic backfill message without canonical id in ConversationPacket replay"
659 );
660 return None;
661 };
662 let Some(message_id) = message_id_from_db(raw_message_id) else {
663 tracing::warn!(
664 conversation_id,
665 raw_message_id,
666 "skipping out-of-range id during semantic backfill"
667 );
668 return None;
669 };
670 Some(EmbeddingInput {
671 message_id,
672 created_at_ms: canonical_embedding_created_at_ms(message_id, message.created_at),
673 agent_id,
674 workspace_id,
675 source_id: source_id_hash,
676 role: role_code_from_str(&message.role).unwrap_or(ROLE_USER),
677 chunk_idx: 0,
678 content: message.content.clone(),
679 })
680}
681
682fn embedding_inputs_from_conversation_packet(
683 row: &CanonicalEmbeddingConversationRow,
684 packet: &ConversationPacket,
685) -> Vec<EmbeddingInput> {
686 let agent_id = saturating_u32_from_i64(row.agent_id);
687 let workspace_id = saturating_u32_from_i64(row.workspace_id.unwrap_or(0));
688 let source_id_hash = crc32fast::hash(packet.payload.provenance.source_id.as_bytes());
689 packet
690 .projections
691 .semantic
692 .message_indices
693 .iter()
694 .filter_map(|message_index| {
695 packet
696 .payload
697 .messages
698 .get(*message_index)
699 .and_then(|message| {
700 embedding_input_from_packet_message(
701 row.conversation_id,
702 agent_id,
703 workspace_id,
704 source_id_hash,
705 message,
706 )
707 })
708 })
709 .collect()
710}
711
712fn fetch_canonical_embedding_conversations(
713 storage: &FrankenStorage,
714 conversation_ids: &[i64],
715) -> Result<Vec<CanonicalEmbeddingConversationRow>> {
716 let mut envelope_sql = String::from(
717 "SELECT c.id,
718 COALESCE(a.slug, 'unknown'),
719 COALESCE(c.agent_id, 0),
720 c.workspace_id,
721 w.path,
722 c.external_id,
723 c.title,
724 c.source_path,
725 c.started_at,
726 c.ended_at,
727 c.source_id,
728 c.origin_host
729 FROM conversations c
730 LEFT JOIN agents a ON a.id = c.agent_id
731 LEFT JOIN workspaces w ON w.id = c.workspace_id
732 WHERE c.id IN (",
733 );
734 let mut params = Vec::with_capacity(conversation_ids.len());
735 for (idx, conversation_id) in conversation_ids.iter().enumerate() {
736 if idx > 0 {
737 envelope_sql.push_str(", ");
738 }
739 envelope_sql.push_str(&format!("?{}", idx + 1));
740 params.push(ParamValue::from(*conversation_id));
741 }
742 envelope_sql.push_str(") ORDER BY c.id ASC");
743
744 storage
745 .raw()
746 .query_map_collect(&envelope_sql, ¶ms, |row| {
747 let workspace_path: Option<String> = row.get_typed(4)?;
748 Ok(CanonicalEmbeddingConversationRow {
749 conversation_id: row.get_typed(0)?,
750 agent_slug: row.get_typed(1)?,
751 agent_id: row.get_typed(2)?,
752 workspace_id: row.get_typed(3)?,
753 workspace: workspace_path.map(PathBuf::from),
754 external_id: row.get_typed(5)?,
755 title: row.get_typed(6)?,
756 source_path: PathBuf::from(row.get_typed::<String>(7)?),
757 started_at: row.get_typed(8)?,
758 ended_at: row.get_typed(9)?,
759 source_id: row.get_typed(10)?,
760 origin_host: row.get_typed(11)?,
761 })
762 })
763 .with_context(|| {
764 format!(
765 "fetching semantic backfill conversation envelopes for {} conversations",
766 conversation_ids.len()
767 )
768 })
769}
770
// NOTE(review): `dead_code` is allowed presumably because no production
// caller exists yet — confirm and drop the attribute once it is wired in.
#[allow(dead_code)]
/// Per-packet identifiers that `semantic_inputs_from_packets` pairs with each
/// `ConversationPacket`.
#[derive(Debug, Clone, Copy)]
pub(crate) struct SemanticPacketContext {
    /// Conversation id; used in skip warnings.
    pub conversation_id: i64,
    /// Agent id copied into every produced input.
    pub agent_id: u32,
    /// Workspace id copied into every produced input.
    pub workspace_id: u32,
}
788
789#[allow(dead_code)]
809pub(crate) fn semantic_inputs_from_packets(
810 packets: &[ConversationPacket],
811 contexts: &[SemanticPacketContext],
812) -> Result<Vec<EmbeddingInput>> {
813 if packets.len() != contexts.len() {
814 anyhow::bail!(
815 "semantic_inputs_from_packets length mismatch: {} packets vs {} contexts",
816 packets.len(),
817 contexts.len()
818 );
819 }
820 let mut inputs = Vec::new();
821 for (packet, context) in packets.iter().zip(contexts.iter()) {
822 let source_id_hash = crc32fast::hash(packet.payload.provenance.source_id.as_bytes());
823 for &message_index in &packet.projections.semantic.message_indices {
824 let Some(message) = packet.payload.messages.get(message_index) else {
825 anyhow::bail!(
826 "packet semantic projection references missing message index {} \
827 (packet has {} messages)",
828 message_index,
829 packet.payload.messages.len()
830 );
831 };
832 if let Some(input) = embedding_input_from_packet_message(
833 context.conversation_id,
834 context.agent_id,
835 context.workspace_id,
836 source_id_hash,
837 message,
838 ) {
839 inputs.push(input);
840 }
841 }
842 }
843 tracing::debug!(
844 packets = packets.len(),
845 packet_driven = true,
846 semantic_inputs = inputs.len(),
847 "built semantic inputs from in-memory ConversationPacket batch"
848 );
849 Ok(inputs)
850}
851
852fn fetch_canonical_embedding_batch(
853 storage: &FrankenStorage,
854 after_conversation_id: i64,
855 max_conversations: usize,
856) -> Result<CanonicalEmbeddingBatch> {
857 let total_conversations = total_semantic_conversations(storage)?;
858 let max_conversations_i64 = i64::try_from(max_conversations.max(1)).unwrap_or(i64::MAX);
859 let conversation_ids: Vec<i64> = storage
860 .raw()
861 .query_map_collect(
862 "SELECT DISTINCT m.conversation_id
863 FROM messages m
864 JOIN conversations c ON c.id = m.conversation_id
865 WHERE m.conversation_id > ?1
866 ORDER BY m.conversation_id ASC
867 LIMIT ?2",
868 &[
869 ParamValue::from(after_conversation_id),
870 ParamValue::from(max_conversations_i64),
871 ],
872 |row| row.get_typed(0),
873 )
874 .with_context(|| {
875 format!("fetching semantic backfill conversation ids after {after_conversation_id}")
876 })?;
877
878 if conversation_ids.is_empty() {
879 return Ok(CanonicalEmbeddingBatch {
880 inputs: Vec::new(),
881 conversations_in_batch: 0,
882 last_conversation_id: after_conversation_id,
883 total_conversations,
884 });
885 }
886
887 let conversations = fetch_canonical_embedding_conversations(storage, &conversation_ids)?;
888
889 let mut grouped_messages =
890 storage.fetch_messages_for_lexical_rebuild_batch(&conversation_ids, None, None)?;
891 let mut inputs = Vec::new();
892 for conversation in &conversations {
893 let messages = grouped_messages
894 .remove(&conversation.conversation_id)
895 .unwrap_or_default();
896 let provenance = canonical_embedding_packet_provenance(conversation);
897 let canonical = canonical_embedding_conversation(conversation, &provenance, messages);
898 let packet = ConversationPacket::from_canonical_replay(&canonical, provenance);
899 inputs.extend(embedding_inputs_from_conversation_packet(
900 conversation,
901 &packet,
902 ));
903 }
904
905 let conversations_in_batch = u64::try_from(conversation_ids.len()).unwrap_or(u64::MAX);
906 tracing::debug!(
907 conversations_in_batch,
908 packet_driven = true,
909 semantic_inputs = inputs.len(),
910 "built semantic backfill batch from ConversationPacket canonical replay"
911 );
912
913 Ok(CanonicalEmbeddingBatch {
914 inputs,
915 conversations_in_batch,
916 last_conversation_id: *conversation_ids.last().unwrap_or(&after_conversation_id),
917 total_conversations,
918 })
919}
920
921pub(crate) fn packet_embedding_inputs_from_storage(
922 storage: &FrankenStorage,
923) -> Result<Vec<EmbeddingInput>> {
924 Ok(fetch_canonical_embedding_batch(storage, 0, usize::MAX)?.inputs)
925}
926
927fn packet_embedding_inputs_from_selected_canonical_messages<F>(
928 storage: &FrankenStorage,
929 conversation_ids: &[i64],
930 mut include_message: F,
931) -> Result<(Vec<EmbeddingInput>, Option<i64>)>
932where
933 F: FnMut(&Message) -> bool,
934{
935 if conversation_ids.is_empty() {
936 return Ok((Vec::new(), None));
937 }
938
939 let conversations = fetch_canonical_embedding_conversations(storage, conversation_ids)?;
940 let mut grouped_messages =
941 storage.fetch_messages_for_lexical_rebuild_batch(conversation_ids, None, None)?;
942 let mut inputs = Vec::new();
943 let mut raw_max_message_id: Option<i64> = None;
944
945 for conversation in &conversations {
946 let mut messages = grouped_messages
947 .remove(&conversation.conversation_id)
948 .unwrap_or_default();
949 messages.retain(|message| {
950 let keep = include_message(message);
951 if keep && let Some(message_id) = message.id {
952 raw_max_message_id =
953 Some(raw_max_message_id.map_or(message_id, |current| current.max(message_id)));
954 }
955 keep
956 });
957 if messages.is_empty() {
958 continue;
959 }
960
961 let provenance = canonical_embedding_packet_provenance(conversation);
962 let canonical = canonical_embedding_conversation(conversation, &provenance, messages);
963 let packet = ConversationPacket::from_canonical_replay(&canonical, provenance);
964 inputs.extend(embedding_inputs_from_conversation_packet(
965 conversation,
966 &packet,
967 ));
968 }
969
970 Ok((inputs, raw_max_message_id))
971}
972
973pub(crate) fn packet_embedding_inputs_from_storage_since(
974 storage: &FrankenStorage,
975 since_message_id: i64,
976) -> Result<CanonicalIncrementalEmbeddingBatch> {
977 let conversation_ids: Vec<i64> = storage
978 .raw()
979 .query_map_collect(
980 "SELECT DISTINCT m.conversation_id
981 FROM messages m
982 WHERE m.id > ?1
983 ORDER BY m.conversation_id ASC",
984 &[ParamValue::from(since_message_id)],
985 |row| row.get_typed(0),
986 )
987 .with_context(|| {
988 format!(
989 "fetching canonical semantic catch-up conversation ids after message {since_message_id}"
990 )
991 })?;
992
993 if conversation_ids.is_empty() {
994 return Ok(CanonicalIncrementalEmbeddingBatch {
995 inputs: Vec::new(),
996 conversations_in_batch: 0,
997 raw_max_message_id: None,
998 });
999 }
1000
1001 let (inputs, raw_max_message_id) = packet_embedding_inputs_from_selected_canonical_messages(
1002 storage,
1003 &conversation_ids,
1004 |message| message.id.is_some_and(|id| id > since_message_id),
1005 )?;
1006
1007 let conversations_in_batch = u64::try_from(conversation_ids.len()).unwrap_or(u64::MAX);
1008 tracing::debug!(
1009 since_message_id,
1010 conversations_in_batch,
1011 packet_driven = true,
1012 semantic_inputs = inputs.len(),
1013 "built semantic catch-up batch from ConversationPacket canonical replay"
1014 );
1015
1016 Ok(CanonicalIncrementalEmbeddingBatch {
1017 inputs,
1018 conversations_in_batch,
1019 raw_max_message_id,
1020 })
1021}
1022
1023pub(crate) fn packet_embedding_inputs_from_storage_for_message_ids(
1024 storage: &FrankenStorage,
1025 conversation_ids: &[i64],
1026 message_ids: &HashSet<i64>,
1027) -> Result<Vec<EmbeddingInput>> {
1028 if conversation_ids.is_empty() || message_ids.is_empty() {
1029 return Ok(Vec::new());
1030 }
1031
1032 let (inputs, raw_max_message_id) = packet_embedding_inputs_from_selected_canonical_messages(
1033 storage,
1034 conversation_ids,
1035 |message| message.id.is_some_and(|id| message_ids.contains(&id)),
1036 )?;
1037 tracing::debug!(
1038 conversations_in_batch = conversation_ids.len(),
1039 selected_message_ids = message_ids.len(),
1040 semantic_inputs = inputs.len(),
1041 raw_max_message_id,
1042 packet_driven = true,
1043 "built selected semantic batch from ConversationPacket canonical replay"
1044 );
1045
1046 Ok(inputs)
1047}
1048
/// A message that survived canonicalization and is ready to be embedded.
struct Prepared<'a> {
    /// The originating input (borrowed from the window being prepared).
    msg: &'a EmbeddingInput,
    /// Non-empty output of `canonicalize_for_embedding` for the message content.
    canonical: String,
    /// `content_hash` of `canonical`.
    hash: [u8; 32],
}
1054
/// Cached result of preparing one message, stored in the prep memo cache.
#[derive(Debug, Clone, PartialEq, Eq)]
struct MemoizedPreparedMessage {
    /// Canonicalized text (only non-empty results are inserted).
    canonical: String,
    /// `content_hash` of `canonical`.
    hash: [u8; 32],
}
1060
1061fn semantic_prep_memo_key(content: &str) -> MemoKey {
1062 MemoKey::new(
1063 MemoContentHash::from_bytes(content_hash(content).to_vec()),
1064 SEMANTIC_PREP_MEMO_ALGORITHM,
1065 SEMANTIC_PREP_MEMO_VERSION,
1066 )
1067}
1068
/// Returns how much a counter grew between two snapshots, or `0` if it did
/// not grow (for example after a reset).
fn memo_counter_delta(after: u64, before: u64) -> u64 {
    if after > before { after - before } else { 0 }
}
1072
1073fn trace_semantic_prep_memo_window(
1074 window_index: usize,
1075 window_len: usize,
1076 prepared_len: usize,
1077 entry_capacity: usize,
1078 before: &crate::indexer::memoization::MemoCacheStats,
1079 after: &crate::indexer::memoization::MemoCacheStats,
1080) {
1081 tracing::trace!(
1082 algorithm = SEMANTIC_PREP_MEMO_ALGORITHM,
1083 algorithm_version = SEMANTIC_PREP_MEMO_VERSION,
1084 window_index,
1085 window_len,
1086 prepared_messages = prepared_len,
1087 skipped_messages = window_len.saturating_sub(prepared_len),
1088 hit_delta = memo_counter_delta(after.hits, before.hits),
1089 miss_delta = memo_counter_delta(after.misses, before.misses),
1090 insert_delta = memo_counter_delta(after.inserts, before.inserts),
1091 evictions_capacity_delta =
1092 memo_counter_delta(after.evictions_capacity, before.evictions_capacity),
1093 quarantined_delta = memo_counter_delta(after.quarantined, before.quarantined),
1094 live_entries = after.live_entries,
1095 entry_capacity,
1096 "semantic prep memo cache window"
1097 );
1098}
1099
/// Emits a trace event carrying the `Debug` rendering of a memo cache audit record.
fn trace_semantic_prep_memo_audit(audit: &MemoCacheAuditRecord) {
    tracing::trace!(?audit, "semantic prep memo cache audit");
}
1103
1104fn prepare_window_with_memo<'a>(
1105 window: &'a [EmbeddingInput],
1106 cache: &mut ContentAddressedMemoCache<MemoizedPreparedMessage>,
1107) -> Vec<Prepared<'a>> {
1108 window
1109 .iter()
1110 .filter_map(|msg| {
1111 let key = semantic_prep_memo_key(&msg.content);
1112 let (lookup, lookup_audit) = cache.get_with_audit(&key);
1113 trace_semantic_prep_memo_audit(&lookup_audit);
1114 match lookup {
1115 MemoLookup::Hit { value } => Some(Prepared {
1116 msg,
1117 canonical: value.canonical,
1118 hash: value.hash,
1119 }),
1120 MemoLookup::Miss | MemoLookup::Quarantined { .. } => {
1121 let canonical = canonicalize_for_embedding(&msg.content);
1122 if canonical.is_empty() {
1123 return None;
1124 }
1125 let hash = content_hash(&canonical);
1126 let insert_audit = cache.insert_with_audit(
1127 key,
1128 MemoizedPreparedMessage {
1129 canonical: canonical.clone(),
1130 hash,
1131 },
1132 );
1133 trace_semantic_prep_memo_audit(&insert_audit);
1134 Some(Prepared {
1135 msg,
1136 canonical,
1137 hash,
1138 })
1139 }
1140 }
1141 })
1142 .collect()
1143}
1144
1145fn prepare_window<'a>(window: &'a [EmbeddingInput], serial: bool) -> Vec<Prepared<'a>> {
1152 let prep = |msg: &'a EmbeddingInput| -> Option<Prepared<'a>> {
1153 let canonical = canonicalize_for_embedding(&msg.content);
1154 if canonical.is_empty() {
1155 return None;
1156 }
1157 let hash = content_hash(&canonical);
1158 Some(Prepared {
1159 msg,
1160 canonical,
1161 hash,
1162 })
1163 };
1164
1165 if serial {
1166 window.iter().filter_map(prep).collect()
1167 } else {
1168 window.par_iter().filter_map(prep).collect()
1169 }
1170}
1171
1172fn flush_prepared_batch(
1173 batch: &[Prepared<'_>],
1174 embeddings: &mut Vec<EmbeddedMessage>,
1175 pb: &ProgressBar,
1176 embedder: &dyn Embedder,
1177) -> Result<()> {
1178 if batch.is_empty() {
1179 return Ok(());
1180 }
1181
1182 let texts: Vec<&str> = batch.iter().map(|p| p.canonical.as_str()).collect();
1183 let vectors = embedder
1184 .embed_batch_sync(&texts)
1185 .map_err(|e| anyhow::anyhow!("embedding failed: {e}"))?;
1186
1187 if vectors.len() != batch.len() {
1188 bail!(
1189 "embedder returned {} embeddings for {} inputs",
1190 vectors.len(),
1191 batch.len()
1192 );
1193 }
1194
1195 for (prepared, vector) in batch.iter().zip(vectors) {
1196 if vector.len() != embedder.dimension() {
1197 bail!(
1198 "embedding dimension mismatch: expected {}, got {}",
1199 embedder.dimension(),
1200 vector.len()
1201 );
1202 }
1203 embeddings.push(EmbeddedMessage {
1204 message_id: prepared.msg.message_id,
1205 created_at_ms: prepared.msg.created_at_ms,
1206 agent_id: prepared.msg.agent_id,
1207 workspace_id: prepared.msg.workspace_id,
1208 source_id: prepared.msg.source_id,
1209 role: prepared.msg.role,
1210 chunk_idx: prepared.msg.chunk_idx,
1211 content_hash: prepared.hash,
1212 embedding: vector,
1213 });
1214 }
1215
1216 pb.inc(saturating_u64_from_usize(batch.len()));
1217 Ok(())
1218}
1219
1220pub struct SemanticIndexer {
1221 embedder: Box<dyn Embedder>,
1222 batch_size: usize,
1223}
1224
1225impl SemanticIndexer {
1226 pub fn new(embedder_type: &str, data_dir: Option<&Path>) -> Result<Self> {
1227 let embedder: Box<dyn Embedder> = match embedder_type {
1228 "fastembed" | "minilm" | "snowflake-arctic-s" | "nomic-embed" => {
1229 let dir = data_dir
1230 .ok_or_else(|| anyhow::anyhow!("data_dir required for fastembed embedder"))?;
1231 let embedder_name = if embedder_type == "fastembed" {
1232 "minilm"
1233 } else {
1234 embedder_type
1235 };
1236 Box::new(
1237 FastEmbedder::load_by_name(dir, embedder_name)
1238 .map_err(|e| anyhow::anyhow!("fastembed unavailable: {e}"))?,
1239 )
1240 }
1241 "hash" => Box::new(HashEmbedder::default()),
1242 other => bail!("unknown embedder: {other}"),
1243 };
1244
1245 Ok(Self {
1246 embedder,
1247 batch_size: resolved_default_batch_size(),
1248 })
1249 }
1250
1251 pub fn with_batch_size(mut self, batch_size: usize) -> Result<Self> {
1252 if batch_size == 0 {
1253 bail!("batch_size must be > 0");
1254 }
1255 self.batch_size = batch_size;
1256 Ok(self)
1257 }
1258
1259 pub fn batch_size(&self) -> usize {
1260 self.batch_size
1261 }
1262
1263 pub fn embedder_id(&self) -> &str {
1264 self.embedder.id()
1265 }
1266
1267 pub fn embedder_dimension(&self) -> usize {
1268 self.embedder.dimension()
1269 }
1270
1271 pub fn embed_messages(&self, messages: &[EmbeddingInput]) -> Result<Vec<EmbeddedMessage>> {
1272 if messages.is_empty() {
1273 return Ok(Vec::new());
1274 }
1275
1276 let show_progress = std::io::stderr().is_terminal();
1277 let pb = ProgressBar::new(saturating_u64_from_usize(messages.len()));
1278 if show_progress {
1279 let style = ProgressStyle::default_bar()
1280 .template("{spinner:.green} [{bar:40.cyan/blue}] {pos}/{len} messages embedded")
1281 .unwrap_or_else(|_| ProgressStyle::default_bar());
1282 pb.set_style(style);
1283 } else {
1284 pb.set_draw_target(ProgressDrawTarget::hidden());
1285 }
1286
1287 let mut embeddings = Vec::with_capacity(messages.len());
1288
1289 let window = self.batch_size.saturating_mul(4);
1297 let serial_prep = !parallel_prep_enabled();
1298 let prep_memo_capacity = resolved_semantic_prep_memo_capacity();
1299 let mut prep_memo =
1300 serial_prep.then(|| ContentAddressedMemoCache::with_capacity(prep_memo_capacity));
1301 for (window_index, window_slice) in messages.chunks(window).enumerate() {
1302 let prepared_window = match prep_memo.as_mut() {
1303 Some(cache) => {
1304 let stats_before = cache.stats().clone();
1305 let prepared_window = prepare_window_with_memo(window_slice, cache);
1306 trace_semantic_prep_memo_window(
1307 window_index,
1308 window_slice.len(),
1309 prepared_window.len(),
1310 prep_memo_capacity,
1311 &stats_before,
1312 cache.stats(),
1313 );
1314 prepared_window
1315 }
1316 None => prepare_window(window_slice, false),
1317 };
1318 let skipped_in_window = window_slice.len() - prepared_window.len();
1319 if skipped_in_window > 0 {
1320 pb.inc(saturating_u64_from_usize(skipped_in_window));
1321 }
1322
1323 for batch in prepared_window.chunks(self.batch_size) {
1324 flush_prepared_batch(batch, &mut embeddings, &pb, self.embedder.as_ref())?;
1325 }
1326 }
1327
1328 if let Some(cache) = prep_memo.as_ref() {
1329 let stats = cache.stats();
1330 tracing::debug!(
1331 algorithm = SEMANTIC_PREP_MEMO_ALGORITHM,
1332 algorithm_version = SEMANTIC_PREP_MEMO_VERSION,
1333 hits = stats.hits,
1334 misses = stats.misses,
1335 inserts = stats.inserts,
1336 quarantined = stats.quarantined,
1337 live_entries = stats.live_entries,
1338 entry_capacity = prep_memo_capacity,
1339 "semantic prep memo cache summary"
1340 );
1341 }
1342
1343 pb.finish_with_message("Embedding complete");
1344 Ok(embeddings)
1345 }
1346
1347 pub fn build_and_save_index<I>(
1348 &self,
1349 embedded_messages: I,
1350 data_dir: &Path,
1351 ) -> Result<FsVectorIndex>
1352 where
1353 I: IntoIterator<Item = EmbeddedMessage>,
1354 {
1355 let index_path = vector_index_path(data_dir, self.embedder_id());
1356 self.build_and_save_index_at_path(embedded_messages, &index_path)
1357 }
1358
1359 pub fn build_and_save_index_shards<I>(
1360 &self,
1361 embedded_messages: I,
1362 data_dir: &Path,
1363 plan: SemanticShardBuildPlan,
1364 ) -> Result<SemanticShardBuildOutcome>
1365 where
1366 I: IntoIterator<Item = EmbeddedMessage>,
1367 {
1368 if plan.db_fingerprint.trim().is_empty() {
1369 bail!("semantic shard build requires a non-empty DB fingerprint");
1370 }
1371 if plan.max_records_per_shard == 0 {
1372 bail!("semantic shard build requires max_records_per_shard > 0");
1373 }
1374
1375 let mut shard_records = Vec::new();
1376 let mut index_paths = Vec::new();
1377 let mut ann_index_paths = Vec::new();
1378 let mut current_records = Vec::with_capacity(plan.max_records_per_shard);
1379 let mut shard_index = 0u32;
1380 let mut total_docs = 0u64;
1381
1382 for embedded in embedded_messages {
1383 current_records.push(embedded);
1384 if current_records.len() >= plan.max_records_per_shard {
1385 let records = std::mem::take(&mut current_records);
1386 let (record, path, ann_path) =
1387 self.write_semantic_shard(records, data_dir, &plan, shard_index)?;
1388 total_docs = total_docs.saturating_add(record.doc_count);
1389 shard_records.push(record);
1390 index_paths.push(path);
1391 if let Some(path) = ann_path {
1392 ann_index_paths.push(path);
1393 }
1394 shard_index = shard_index
1395 .checked_add(1)
1396 .context("semantic shard index overflow")?;
1397 }
1398 }
1399
1400 if !current_records.is_empty() {
1401 let records = std::mem::take(&mut current_records);
1402 let (record, path, ann_path) =
1403 self.write_semantic_shard(records, data_dir, &plan, shard_index)?;
1404 total_docs = total_docs.saturating_add(record.doc_count);
1405 shard_records.push(record);
1406 index_paths.push(path);
1407 if let Some(path) = ann_path {
1408 ann_index_paths.push(path);
1409 }
1410 }
1411
1412 let shard_count = u32::try_from(shard_records.len())
1413 .context("semantic shard generation exceeded u32 shard count")?;
1414 for record in &mut shard_records {
1415 record.shard_count = shard_count;
1416 }
1417
1418 let mut shard_manifest = SemanticShardManifest::load_or_default(data_dir)
1419 .map_err(|err| anyhow::anyhow!("loading semantic shard manifest for publish: {err}"))?;
1420 shard_manifest.replace_shards_for_generation(
1421 plan.tier,
1422 self.embedder_id(),
1423 &plan.db_fingerprint,
1424 shard_records,
1425 );
1426 shard_manifest
1427 .save(data_dir)
1428 .map_err(|err| anyhow::anyhow!("saving semantic shard manifest: {err}"))?;
1429 let summary = shard_manifest.summary(plan.tier, self.embedder_id(), &plan.db_fingerprint);
1430
1431 tracing::info!(
1432 tier = plan.tier.as_str(),
1433 embedder = self.embedder_id(),
1434 shard_count,
1435 doc_count = total_docs,
1436 total_conversations = plan.total_conversations,
1437 "published semantic shard generation sidecar"
1438 );
1439
1440 Ok(SemanticShardBuildOutcome {
1441 tier: plan.tier,
1442 embedder_id: self.embedder_id().to_string(),
1443 shard_count,
1444 doc_count: total_docs,
1445 total_conversations: plan.total_conversations,
1446 index_paths,
1447 ann_index_paths,
1448 shard_manifest_path: SemanticShardManifest::path(data_dir),
1449 complete: summary.complete,
1450 })
1451 }
1452
1453 fn write_semantic_shard(
1454 &self,
1455 embedded_messages: Vec<EmbeddedMessage>,
1456 data_dir: &Path,
1457 plan: &SemanticShardBuildPlan,
1458 shard_index: u32,
1459 ) -> Result<(SemanticShardRecord, PathBuf, Option<PathBuf>)> {
1460 let started_at_ms = now_ms();
1461 let shard_path = semantic_shard_index_path(
1462 data_dir,
1463 plan.tier,
1464 self.embedder_id(),
1465 &plan.db_fingerprint,
1466 shard_index,
1467 );
1468 let shard_index_file = self.build_and_save_index_at_path(embedded_messages, &shard_path)?;
1469 let size_bytes = fs::metadata(&shard_path)
1470 .with_context(|| format!("stat semantic shard {}", shard_path.display()))?
1471 .len();
1472 let (ann_index_path, ann_size_bytes, ann_ready, ann_absolute_path) = if plan.build_ann {
1473 let ann_path = semantic_shard_ann_index_path(
1474 data_dir,
1475 plan.tier,
1476 self.embedder_id(),
1477 &plan.db_fingerprint,
1478 shard_index,
1479 );
1480 let config = FsHnswConfig {
1481 m: FS_HNSW_DEFAULT_M,
1482 ef_construction: FS_HNSW_DEFAULT_EF_CONSTRUCTION,
1483 ..FsHnswConfig::default()
1484 };
1485 let hnsw = FsHnswIndex::build_from_vector_index(&shard_index_file, config)
1486 .map_err(|err| anyhow::anyhow!("build shard HNSW index failed: {err}"))?;
1487 hnsw.save(&ann_path)
1488 .map_err(|err| anyhow::anyhow!("save shard HNSW index failed: {err}"))?;
1489 let ann_size_bytes = fs::metadata(&ann_path)
1490 .with_context(|| format!("stat semantic shard ANN {}", ann_path.display()))?
1491 .len();
1492 let relative_ann_path = ann_path
1493 .strip_prefix(data_dir)
1494 .unwrap_or(ann_path.as_path())
1495 .to_string_lossy()
1496 .to_string();
1497 (
1498 Some(relative_ann_path),
1499 ann_size_bytes,
1500 true,
1501 Some(ann_path),
1502 )
1503 } else {
1504 (None, 0, false, None)
1505 };
1506 let relative_index_path = shard_path
1507 .strip_prefix(data_dir)
1508 .unwrap_or(shard_path.as_path())
1509 .to_string_lossy()
1510 .to_string();
1511 let record = SemanticShardRecord {
1512 tier: plan.tier,
1513 embedder_id: self.embedder_id().to_string(),
1514 model_revision: plan.model_revision.clone(),
1515 schema_version: SEMANTIC_SCHEMA_VERSION,
1516 chunking_version: CHUNKING_STRATEGY_VERSION,
1517 dimension: self.embedder_dimension(),
1518 shard_index,
1519 shard_count: 0,
1520 doc_count: u64::try_from(shard_index_file.record_count()).unwrap_or(u64::MAX),
1521 total_conversations: plan.total_conversations,
1522 db_fingerprint: plan.db_fingerprint.clone(),
1523 index_path: relative_index_path,
1524 quantization: "f16".to_string(),
1525 mmap_ready: true,
1526 ann_index_path,
1527 ann_size_bytes,
1528 ann_ready,
1529 size_bytes,
1530 started_at_ms,
1531 completed_at_ms: now_ms(),
1532 ready: true,
1533 };
1534 Ok((record, shard_path, ann_absolute_path))
1535 }
1536
1537 fn build_and_save_index_at_path<I>(
1538 &self,
1539 embedded_messages: I,
1540 index_path: &Path,
1541 ) -> Result<FsVectorIndex>
1542 where
1543 I: IntoIterator<Item = EmbeddedMessage>,
1544 {
1545 if let Some(parent) = index_path.parent() {
1546 std::fs::create_dir_all(parent)?;
1547 }
1548
1549 let mut writer: FsVectorIndexWriter = FsVectorIndex::create_with_revision(
1551 index_path,
1552 self.embedder_id(),
1553 "1.0",
1554 self.embedder_dimension(),
1555 FsQuantization::F16,
1556 )
1557 .map_err(|err| anyhow::anyhow!("create fsvi index failed: {err}"))?;
1558
1559 let write_result: Result<()> = (|| {
1560 for embedded in embedded_messages {
1561 if embedded.embedding.len() != self.embedder_dimension() {
1562 bail!(
1563 "embedding dimension mismatch: expected {}, got {}",
1564 self.embedder_dimension(),
1565 embedded.embedding.len()
1566 );
1567 }
1568 let doc_id = semantic_doc_id_for_embedded(&embedded);
1569 writer
1570 .write_record(&doc_id, &embedded.embedding)
1571 .map_err(|err| anyhow::anyhow!("write fsvi record failed: {err}"))?;
1572 }
1573 Ok(())
1574 })();
1575
1576 if let Err(e) = &write_result {
1577 tracing::warn!("removing partial vector index after write failure: {e}");
1579 if let Err(rm_err) = std::fs::remove_file(index_path) {
1580 tracing::error!(
1581 "failed to remove partial index file {}: {rm_err}",
1582 index_path.display()
1583 );
1584 }
1585 return Err(anyhow::anyhow!("{e}"));
1586 }
1587
1588 writer
1589 .finish()
1590 .map_err(|err| anyhow::anyhow!("finish fsvi index failed: {err}"))?;
1591
1592 FsVectorIndex::open(index_path)
1593 .map_err(|err| anyhow::anyhow!("open fsvi index failed: {err}"))
1594 }
1595
1596 pub fn append_to_index(
1604 &self,
1605 embedded_messages: impl IntoIterator<Item = EmbeddedMessage>,
1606 data_dir: &Path,
1607 ) -> Result<usize> {
1608 let index_path = vector_index_path(data_dir, self.embedder_id());
1609 self.append_to_index_path(embedded_messages, &index_path)
1610 }
1611
1612 fn append_to_index_path(
1613 &self,
1614 embedded_messages: impl IntoIterator<Item = EmbeddedMessage>,
1615 index_path: &Path,
1616 ) -> Result<usize> {
1617 let mut index = FsVectorIndex::open(index_path)
1618 .map_err(|err| anyhow::anyhow!("open fsvi index for append: {err}"))?;
1619
1620 let entries: Vec<(String, Vec<f32>)> = embedded_messages
1621 .into_iter()
1622 .map(|em| {
1623 let doc_id = semantic_doc_id_for_embedded(&em);
1624 (doc_id, em.embedding)
1625 })
1626 .collect();
1627
1628 let count = entries.len();
1629 if count == 0 {
1630 return Ok(0);
1631 }
1632
1633 index
1634 .append_batch(&entries)
1635 .map_err(|err| anyhow::anyhow!("append_batch: {err}"))?;
1636
1637 if index.needs_compaction() {
1638 index
1639 .compact()
1640 .map_err(|err| anyhow::anyhow!("compaction: {err}"))?;
1641 }
1642
1643 Ok(count)
1644 }
1645
1646 fn write_backfill_staging_index(
1647 &self,
1648 embedded_messages: Vec<EmbeddedMessage>,
1649 staging_path: &Path,
1650 resume_existing: bool,
1651 ) -> Result<FsVectorIndex> {
1652 if resume_existing && staging_path.exists() {
1653 self.append_to_index_path(embedded_messages, staging_path)?;
1654 FsVectorIndex::open(staging_path)
1655 .map_err(|err| anyhow::anyhow!("open staged semantic index failed: {err}"))
1656 } else {
1657 self.build_and_save_index_at_path(embedded_messages, staging_path)
1658 }
1659 }
1660
1661 pub fn run_backfill_batch(
1662 &self,
1663 messages: &[EmbeddingInput],
1664 data_dir: &Path,
1665 manifest: &mut SemanticManifest,
1666 plan: SemanticBackfillBatchPlan,
1667 ) -> Result<SemanticBackfillBatchOutcome> {
1668 if plan.db_fingerprint.trim().is_empty() {
1669 bail!("semantic backfill requires a non-empty DB fingerprint");
1670 }
1671 if plan.total_conversations == 0 && plan.conversations_in_batch > 0 {
1672 bail!("semantic backfill batch cannot process conversations when total is zero");
1673 }
1674
1675 let manifest_path = SemanticManifest::path(data_dir);
1676 let staging_path = semantic_staging_index_path(
1677 data_dir,
1678 plan.tier,
1679 self.embedder_id(),
1680 &plan.db_fingerprint,
1681 );
1682 let final_path = vector_index_path(data_dir, self.embedder_id());
1683
1684 let prior_checkpoint = manifest
1685 .checkpoint
1686 .as_ref()
1687 .filter(|checkpoint| {
1688 checkpoint.tier == plan.tier
1689 && checkpoint.embedder_id == self.embedder_id()
1690 && checkpoint.is_valid(&plan.db_fingerprint)
1691 })
1692 .cloned();
1693 let prior_conversations = prior_checkpoint
1694 .as_ref()
1695 .map_or(0, |checkpoint| checkpoint.conversations_processed);
1696 let prior_docs = prior_checkpoint
1697 .as_ref()
1698 .map_or(0, |checkpoint| checkpoint.docs_embedded);
1699
1700 let embeddings = self.embed_messages(messages)?;
1701 let embedded_docs = u64::try_from(embeddings.len()).unwrap_or(u64::MAX);
1702 let mut staged_index = self.write_backfill_staging_index(
1703 embeddings,
1704 &staging_path,
1705 prior_checkpoint.is_some(),
1706 )?;
1707 let conversations_processed = prior_conversations
1708 .saturating_add(plan.conversations_in_batch)
1709 .min(plan.total_conversations);
1710 let complete = conversations_processed >= plan.total_conversations;
1711
1712 manifest.refresh_backlog(plan.total_conversations, &plan.db_fingerprint);
1713
1714 if complete {
1715 let db_fingerprint = plan.db_fingerprint.clone();
1716 if staged_index.wal_record_count() > 0 {
1717 staged_index.compact().map_err(|err| {
1718 anyhow::anyhow!("compact staged semantic index failed: {err}")
1719 })?;
1720 }
1721 drop(staged_index);
1722 fs::rename(&staging_path, &final_path).with_context(|| {
1723 format!(
1724 "publishing staged semantic index {} to {}",
1725 staging_path.display(),
1726 final_path.display()
1727 )
1728 })?;
1729 sync_parent_directory(&final_path)?;
1730 let published_index = FsVectorIndex::open(&final_path)
1731 .map_err(|err| anyhow::anyhow!("open published semantic index failed: {err}"))?;
1732 let size_bytes = fs::metadata(&final_path)
1733 .with_context(|| format!("stat published semantic index {}", final_path.display()))?
1734 .len();
1735 let relative_index_path = final_path
1736 .strip_prefix(data_dir)
1737 .unwrap_or(final_path.as_path())
1738 .to_string_lossy()
1739 .to_string();
1740 manifest.publish_artifact(ArtifactRecord {
1741 tier: plan.tier,
1742 embedder_id: self.embedder_id().to_string(),
1743 model_revision: plan.model_revision,
1744 schema_version: SEMANTIC_SCHEMA_VERSION,
1745 chunking_version: CHUNKING_STRATEGY_VERSION,
1746 dimension: self.embedder_dimension(),
1747 doc_count: u64::try_from(published_index.record_count()).unwrap_or(u64::MAX),
1748 conversation_count: conversations_processed,
1749 db_fingerprint: plan.db_fingerprint,
1750 index_path: relative_index_path,
1751 size_bytes,
1752 started_at_ms: prior_checkpoint
1753 .as_ref()
1754 .map_or_else(now_ms, |checkpoint| checkpoint.saved_at_ms),
1755 completed_at_ms: now_ms(),
1756 ready: true,
1757 });
1758 manifest.refresh_backlog(plan.total_conversations, &db_fingerprint);
1759 manifest.save(data_dir)?;
1760 } else {
1761 let docs_embedded_on_disk =
1762 u64::try_from(staged_index.record_count()).unwrap_or(u64::MAX);
1763 let checkpoint_docs = prior_docs
1764 .saturating_add(embedded_docs)
1765 .max(docs_embedded_on_disk);
1766 manifest.save_checkpoint(BuildCheckpoint {
1767 tier: plan.tier,
1768 embedder_id: self.embedder_id().to_string(),
1769 last_offset: plan.last_offset,
1770 docs_embedded: checkpoint_docs,
1771 conversations_processed,
1772 total_conversations: plan.total_conversations,
1773 db_fingerprint: plan.db_fingerprint,
1774 schema_version: SEMANTIC_SCHEMA_VERSION,
1775 chunking_version: CHUNKING_STRATEGY_VERSION,
1776 saved_at_ms: now_ms(),
1777 });
1778 manifest.save(data_dir)?;
1779 }
1780
1781 Ok(SemanticBackfillBatchOutcome {
1782 tier: plan.tier,
1783 embedder_id: self.embedder_id().to_string(),
1784 embedded_docs,
1785 conversations_processed,
1786 total_conversations: plan.total_conversations,
1787 last_offset: plan.last_offset,
1788 checkpoint_saved: !complete,
1789 published: complete,
1790 index_path: if complete { final_path } else { staging_path },
1791 manifest_path,
1792 })
1793 }
1794
1795 pub fn run_backfill_from_storage(
1796 &self,
1797 storage: &FrankenStorage,
1798 data_dir: &Path,
1799 manifest: &mut SemanticManifest,
1800 plan: SemanticBackfillStoragePlan,
1801 ) -> Result<SemanticBackfillBatchOutcome> {
1802 let after_conversation_id = matching_semantic_checkpoint_offset(
1803 manifest,
1804 plan.tier,
1805 self.embedder_id(),
1806 &plan.db_fingerprint,
1807 );
1808 let batch = fetch_canonical_embedding_batch(
1809 storage,
1810 after_conversation_id,
1811 plan.max_conversations,
1812 )?;
1813 self.run_backfill_batch(
1814 &batch.inputs,
1815 data_dir,
1816 manifest,
1817 SemanticBackfillBatchPlan {
1818 tier: plan.tier,
1819 db_fingerprint: plan.db_fingerprint,
1820 model_revision: plan.model_revision,
1821 total_conversations: batch.total_conversations,
1822 conversations_in_batch: batch.conversations_in_batch,
1823 last_offset: batch.last_conversation_id,
1824 },
1825 )
1826 }
1827
1828 pub fn build_hnsw_index(
1842 &self,
1843 vector_index: &FsVectorIndex,
1844 data_dir: &Path,
1845 m: Option<usize>,
1846 ef_construction: Option<usize>,
1847 ) -> Result<PathBuf> {
1848 let m = m.unwrap_or(FS_HNSW_DEFAULT_M);
1849 let ef_construction = ef_construction.unwrap_or(FS_HNSW_DEFAULT_EF_CONSTRUCTION);
1850
1851 tracing::info!(
1852 embedder = self.embedder_id(),
1853 count = vector_index.record_count(),
1854 m,
1855 ef_construction,
1856 "Building HNSW index for approximate nearest neighbor search"
1857 );
1858
1859 let config = FsHnswConfig {
1860 m,
1861 ef_construction,
1862 ..FsHnswConfig::default()
1863 };
1864 let hnsw = FsHnswIndex::build_from_vector_index(vector_index, config)
1865 .map_err(|err| anyhow::anyhow!("build HNSW index failed: {err}"))?;
1866
1867 let hnsw_path = hnsw_index_path(data_dir, self.embedder_id());
1868 if let Some(parent) = hnsw_path.parent() {
1869 std::fs::create_dir_all(parent)?;
1870 }
1871 hnsw.save(&hnsw_path)
1872 .map_err(|err| anyhow::anyhow!("save HNSW index failed: {err}"))?;
1873
1874 tracing::info!(?hnsw_path, "Saved HNSW index");
1875 Ok(hnsw_path)
1876 }
1877}
1878
1879#[cfg(test)]
1880mod tests {
1881 use super::*;
1882 use crate::model::types::{Agent, AgentKind, Conversation, Message, MessageRole};
1883 use crate::storage::sqlite::FrankenStorage;
1884 use serde_json::json;
1885 use std::path::Path;
1886 use tempfile::tempdir;
1887
/// Orderable, comparable projection of an embedding input used by tests.
///
/// The derived ordering compares fields in declaration order, starting with
/// `message_id`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct ComparableSemanticInput {
    message_id: u64,
    created_at_ms: i64,
    agent_id: u32,
    workspace_id: u32,
    source_id: u32,
    role: u8,
    content: String,
}
1898
1899 fn comparable_semantic_inputs(mut inputs: Vec<EmbeddingInput>) -> Vec<ComparableSemanticInput> {
1900 let mut comparable: Vec<ComparableSemanticInput> = inputs
1901 .drain(..)
1902 .map(|input| ComparableSemanticInput {
1903 message_id: input.message_id,
1904 created_at_ms: input.created_at_ms,
1905 agent_id: input.agent_id,
1906 workspace_id: input.workspace_id,
1907 source_id: input.source_id,
1908 role: input.role,
1909 content: input.content,
1910 })
1911 .collect();
1912 comparable.sort();
1913 comparable
1914 }
1915
1916 fn test_conversation(external_id: &str, body: &str) -> Conversation {
1917 test_conversation_fixture(
1918 external_id,
1919 vec![Message {
1920 id: None,
1921 idx: 0,
1922 role: MessageRole::User,
1923 author: None,
1924 created_at: Some(1_700_000_000_500),
1925 content: body.to_string(),
1926 extra_json: json!({}),
1927 snippets: Vec::new(),
1928 }],
1929 "local",
1930 None,
1931 )
1932 }
1933
1934 fn test_conversation_with_messages(external_id: &str, messages: Vec<Message>) -> Conversation {
1935 test_conversation_fixture(external_id, messages, "remote-laptop", Some("builder-host"))
1936 }
1937
1938 fn test_conversation_fixture(
1939 external_id: &str,
1940 messages: Vec<Message>,
1941 source_id: &str,
1942 origin_host: Option<&str>,
1943 ) -> Conversation {
1944 Conversation {
1945 id: None,
1946 agent_slug: "codex".to_string(),
1947 workspace: None,
1948 external_id: Some(external_id.to_string()),
1949 title: Some(format!("semantic {external_id}")),
1950 source_path: PathBuf::from(format!("/tmp/{external_id}.jsonl")),
1951 started_at: Some(1_700_000_000_000),
1952 ended_at: Some(1_700_000_001_000),
1953 approx_tokens: None,
1954 metadata_json: json!({}),
1955 messages,
1956 source_id: source_id.to_string(),
1957 origin_host: origin_host.map(str::to_string),
1958 }
1959 }
1960
1961 fn default_scheduler_signals() -> SemanticBackfillSchedulerSignals {
1962 SemanticBackfillSchedulerSignals {
1963 foreground_pressure: false,
1964 lexical_repair_active: false,
1965 force: false,
1966 operator_disabled: false,
1967 }
1968 }
1969
/// Test helper that overrides or removes one environment variable and puts
/// the previous state back when dropped.
struct EnvVarGuard {
    /// Name of the variable being overridden.
    key: &'static str,
    /// Raw value seen before the override; `None` means the variable was unset.
    /// Kept as an `OsString` so a non-Unicode value is restored rather than
    /// mistaken for "unset".
    prior: Option<std::ffi::OsString>,
}

impl EnvVarGuard {
    /// Sets `key` to `value`, remembering whatever was there before.
    fn set(key: &'static str, value: &str) -> Self {
        let prior = std::env::var_os(key);
        // SAFETY: changing the process environment is only sound while no
        // other thread reads or writes it at the same time.
        // NOTE(review): confirm tests using this guard are serialized.
        unsafe {
            std::env::set_var(key, value);
        }
        Self { key, prior }
    }

    /// Removes `key`, remembering whatever was there before.
    fn remove(key: &'static str) -> Self {
        let prior = std::env::var_os(key);
        // SAFETY: same requirement as in `set` — no concurrent environment
        // access from other threads.
        unsafe {
            std::env::remove_var(key);
        }
        Self { key, prior }
    }
}

impl Drop for EnvVarGuard {
    /// Restores the variable to the state captured when the guard was created.
    fn drop(&mut self) {
        // SAFETY: same requirement as in `set` — no concurrent environment
        // access from other threads.
        unsafe {
            match self.prior.as_deref() {
                Some(value) => std::env::set_var(self.key, value),
                None => std::env::remove_var(self.key),
            }
        }
    }
}
2006
/// With default policy, no pressure signals and capacity 80, the scheduler
/// should run and scale the requested 64 down to 51 conversations.
#[test]
fn semantic_backfill_scheduler_runs_and_scales_batch_under_idle_budget() {
    let policy = SemanticPolicy::compiled_defaults();
    let signals = default_scheduler_signals();

    let decision = semantic_backfill_scheduler_decision_for_capacity(&policy, 64, &signals, 80);

    assert!(decision.should_run());
    assert_eq!(decision.state, SemanticBackfillSchedulerState::Running);
    assert_eq!(
        decision.reason,
        SemanticBackfillSchedulerReason::IdleBudgetAvailable
    );
    assert_eq!(decision.scheduled_batch_conversations, 51);
    assert_eq!(decision.current_capacity_pct, 80);
    assert_eq!(decision.next_eligible_after_ms, 0);
}
2027
/// Pins the exact `next_step` text for every scheduler reason listed here.
#[test]
fn semantic_backfill_scheduler_reason_next_steps_are_stable() {
    let expectations = [
        (
            SemanticBackfillSchedulerReason::IdleBudgetAvailable,
            "background semantic backfill is within idle budgets",
        ),
        (
            SemanticBackfillSchedulerReason::OperatorDisabled,
            "background semantic backfill is disabled by CASS_SEMANTIC_BACKFILL_DISABLE",
        ),
        (
            SemanticBackfillSchedulerReason::PolicyDisabled,
            "semantic policy disables background semantic backfill",
        ),
        (
            SemanticBackfillSchedulerReason::ForegroundPressure,
            "foreground pressure is present; retry after the idle delay",
        ),
        (
            SemanticBackfillSchedulerReason::LexicalRepairActive,
            "lexical repair is active; semantic backfill is yielding",
        ),
        (
            SemanticBackfillSchedulerReason::CapacityBelowFloor,
            "machine responsiveness capacity is below the semantic backfill floor",
        ),
        (
            SemanticBackfillSchedulerReason::ThreadBudgetZero,
            "semantic backfill thread budget is zero",
        ),
        (
            SemanticBackfillSchedulerReason::BatchBudgetZero,
            "semantic backfill batch budget is zero",
        ),
    ];

    for (reason, expected) in expectations {
        assert_eq!(reason.next_step(), expected, "{reason:?}");
    }
}
2067
/// Foreground pressure and active lexical repair must each pause the
/// scheduler, even at full capacity. Foreground pressure also sets the retry
/// delay to the policy's idle delay in milliseconds.
#[test]
fn semantic_backfill_scheduler_yields_to_foreground_and_lexical_pressure() {
    let policy = SemanticPolicy::compiled_defaults();

    let under_foreground_pressure = SemanticBackfillSchedulerSignals {
        foreground_pressure: true,
        ..default_scheduler_signals()
    };
    let foreground_decision = semantic_backfill_scheduler_decision_for_capacity(
        &policy,
        64,
        &under_foreground_pressure,
        100,
    );
    assert!(!foreground_decision.should_run());
    assert_eq!(
        foreground_decision.state,
        SemanticBackfillSchedulerState::Paused
    );
    assert_eq!(
        foreground_decision.reason,
        SemanticBackfillSchedulerReason::ForegroundPressure
    );
    assert_eq!(
        foreground_decision.next_eligible_after_ms,
        policy.idle_delay_seconds * 1000
    );

    let during_lexical_repair = SemanticBackfillSchedulerSignals {
        lexical_repair_active: true,
        ..default_scheduler_signals()
    };
    let lexical_decision = semantic_backfill_scheduler_decision_for_capacity(
        &policy,
        64,
        &during_lexical_repair,
        100,
    );
    assert!(!lexical_decision.should_run());
    assert_eq!(
        lexical_decision.state,
        SemanticBackfillSchedulerState::Paused
    );
    assert_eq!(
        lexical_decision.reason,
        SemanticBackfillSchedulerReason::LexicalRepairActive
    );
}
2107
/// A lexical-only policy disables the scheduler, and the `force` signal
/// overrides that and marks the decision as forced.
#[test]
fn semantic_backfill_scheduler_honors_policy_disable_and_force_override() {
    let mut policy = SemanticPolicy::compiled_defaults();
    policy.mode = crate::search::policy::SemanticMode::LexicalOnly;

    let unforced_signals = default_scheduler_signals();
    let disabled =
        semantic_backfill_scheduler_decision_for_capacity(&policy, 64, &unforced_signals, 100);
    assert!(!disabled.should_run());
    assert_eq!(disabled.state, SemanticBackfillSchedulerState::Disabled);
    assert_eq!(
        disabled.reason,
        SemanticBackfillSchedulerReason::PolicyDisabled
    );

    let forced_signals = SemanticBackfillSchedulerSignals {
        force: true,
        ..default_scheduler_signals()
    };
    let forced_decision =
        semantic_backfill_scheduler_decision_for_capacity(&policy, 64, &forced_signals, 100);
    assert!(forced_decision.should_run());
    assert_eq!(
        forced_decision.reason,
        SemanticBackfillSchedulerReason::IdleBudgetAvailable
    );
    assert!(forced_decision.forced);
}
2139
/// Two inputs embedded with the hash embedder come back in order, each with
/// a vector of the embedder's dimension.
#[test]
fn test_batch_embedding() {
    let indexer = SemanticIndexer::new("hash", None).unwrap();
    let inputs = vec![
        EmbeddingInput::new(1, "Hello world"),
        EmbeddingInput::new(2, "Goodbye world"),
    ];

    let embedded = indexer.embed_messages(&inputs).unwrap();

    assert_eq!(embedded.len(), 2);
    assert_eq!(embedded[0].message_id, 1);
    assert_eq!(embedded[1].message_id, 2);
    assert_eq!(embedded[0].embedding.len(), indexer.embedder_dimension());
}
2155
/// Embedding 1000 inputs yields exactly one embedding per input.
#[test]
fn test_progress_indicator() {
    let indexer = SemanticIndexer::new("hash", None).unwrap();
    let inputs: Vec<_> = (0u64..1000)
        .map(|id| EmbeddingInput::new(id, format!("Message {id}")))
        .collect();

    let embedded = indexer.embed_messages(&inputs).unwrap();
    assert_eq!(embedded.len(), inputs.len());
}
2166
/// Building an index from two embeddings produces an index that reports the
/// indexer's embedder id and dimension and holds two records.
#[test]
fn test_build_and_save_index() {
    let indexer = SemanticIndexer::new("hash", None).unwrap();
    let inputs = vec![
        EmbeddingInput::new(1, "Hello world"),
        EmbeddingInput::new(2, "Goodbye world"),
    ];
    let embedded = indexer.embed_messages(&inputs).unwrap();

    let data_dir = tempdir().unwrap();
    let built = indexer
        .build_and_save_index(embedded, data_dir.path())
        .unwrap();

    assert_eq!(built.embedder_id(), indexer.embedder_id());
    assert_eq!(built.dimension(), indexer.embedder_dimension());
    assert_eq!(built.record_count(), 2);
}
2184
#[test]
// Builds five embeddings into shards capped at two records each and expects:
// three readable shard files, a complete shard manifest with no ANN shards,
// and neither a main semantic manifest nor a main vector index file.
fn sharded_index_build_writes_sidecar_without_runtime_publish() {
    const DB_FINGERPRINT: &str = "db-fp-sharded-build";

    let indexer = SemanticIndexer::new("hash", None).unwrap();
    let inputs: Vec<_> = (0..5)
        .map(|idx| EmbeddingInput::new(idx, format!("semantic shard message {idx}")))
        .collect();
    let embedded = indexer.embed_messages(&inputs).unwrap();
    let index_root = tempdir().unwrap();

    let plan = SemanticShardBuildPlan {
        tier: TierKind::Fast,
        db_fingerprint: DB_FINGERPRINT.to_string(),
        model_revision: "hash".to_string(),
        total_conversations: 5,
        max_records_per_shard: 2,
        build_ann: false,
    };
    let outcome = indexer
        .build_and_save_index_shards(embedded, index_root.path(), plan)
        .unwrap();

    // Outcome-level accounting.
    assert_eq!(outcome.shard_count, 3);
    assert_eq!(outcome.doc_count, 5);
    assert_eq!(outcome.total_conversations, 5);
    assert!(outcome.complete);
    assert_eq!(outcome.index_paths.len(), 3);

    // Every shard file must open and carry at least one record.
    for shard_path in &outcome.index_paths {
        let opened = FsVectorIndex::open(shard_path).unwrap();
        assert_eq!(opened.embedder_id(), indexer.embedder_id());
        assert!(opened.record_count() > 0);
    }

    // The shard manifest written next to the shards must agree.
    let shard_manifest = SemanticShardManifest::load(index_root.path())
        .unwrap()
        .unwrap();
    let summary = shard_manifest.summary(TierKind::Fast, indexer.embedder_id(), DB_FINGERPRINT);
    assert!(summary.complete);
    assert_eq!(summary.ready_shards, 3);
    assert_eq!(summary.ann_ready_shards, 0);
    assert_eq!(summary.doc_count, 5);
    assert_eq!(summary.total_conversations, 5);

    // Nothing may have been published to the main runtime locations.
    let main_manifest = SemanticManifest::load(index_root.path()).unwrap();
    assert!(
        main_manifest.is_none(),
        "sidecar shards must not publish the main runtime manifest"
    );
    let main_index = vector_index_path(index_root.path(), indexer.embedder_id());
    assert!(!main_index.exists());
}
2234
#[test]
// A plan with `max_records_per_shard: 0` must be rejected with an error whose
// message mentions the `max_records_per_shard > 0` requirement.
fn sharded_index_build_rejects_zero_sized_shards() {
    let indexer = SemanticIndexer::new("hash", None).unwrap();
    let scratch = tempdir().unwrap();
    let invalid_plan = SemanticShardBuildPlan {
        tier: TierKind::Fast,
        db_fingerprint: "db-fp-sharded-build".to_string(),
        model_revision: "hash".to_string(),
        total_conversations: 0,
        max_records_per_shard: 0,
        build_ann: false,
    };

    let failure = indexer
        .build_and_save_index_shards(std::iter::empty(), scratch.path(), invalid_plan)
        .unwrap_err();

    let rendered = failure.to_string();
    assert!(rendered.contains("max_records_per_shard > 0"));
}
2255
#[test]
// Builds eight embeddings into two shards with `build_ann: true` and expects
// an ANN file on disk per shard, plus a shard manifest in which every record
// has an ANN path and is flagged ANN-ready.
fn sharded_ann_build_records_per_shard_accelerators() {
    const DB_FINGERPRINT: &str = "db-fp-sharded-ann-build";

    let indexer = SemanticIndexer::new("hash", None).unwrap();
    let inputs: Vec<_> = (0..8)
        .map(|idx| EmbeddingInput::new(idx, format!("semantic ann shard message {idx}")))
        .collect();
    let embedded = indexer.embed_messages(&inputs).unwrap();
    let index_root = tempdir().unwrap();

    let plan = SemanticShardBuildPlan {
        tier: TierKind::Fast,
        db_fingerprint: DB_FINGERPRINT.to_string(),
        model_revision: "hash".to_string(),
        total_conversations: 8,
        max_records_per_shard: 4,
        build_ann: true,
    };
    let outcome = indexer
        .build_and_save_index_shards(embedded, index_root.path(), plan)
        .unwrap();

    assert_eq!(outcome.shard_count, 2);
    assert_eq!(outcome.ann_index_paths.len(), 2);
    for ann_path in &outcome.ann_index_paths {
        assert!(
            ann_path.exists(),
            "ANN shard missing at {}",
            ann_path.display()
        );
    }

    let shard_manifest = SemanticShardManifest::load(index_root.path())
        .unwrap()
        .unwrap();
    let summary = shard_manifest.summary(TierKind::Fast, indexer.embedder_id(), DB_FINGERPRINT);
    assert!(summary.complete);
    assert_eq!(summary.ann_ready_shards, 2);
    assert!(summary.ann_size_bytes > 0);

    // No record may lack an ANN path or be flagged as not ANN-ready.
    let any_record_without_ann = shard_manifest
        .shards
        .iter()
        .any(|record| record.ann_index_path.is_none() || !record.ann_ready);
    assert!(!any_record_without_ann);
}
2302
#[test]
// Golden-digest regression test: embeds a fixed 64-message corpus with the
// "hash" embedder (batch size 16), feeds every record's id, content hash and
// vector components (little-endian) into SHA-256 in output order, and compares
// the hex digest against a pinned value.
fn embed_messages_golden_digest_hash_embedder() {
    use ring::digest::{Context, SHA256};

    const EXPECTED: &str = "22d9ae7076925a4b70a194b0f519dfb1d465cc757368c296ef24055a02038c2c";

    // Five rotating content shapes: plain, markdown, fenced code, padded
    // whitespace, and unicode with a combining accent plus emoji.
    let mut corpus: Vec<EmbeddingInput> = Vec::with_capacity(64);
    for i in 0..64 {
        let body = match i % 5 {
            0 => format!("plain text message number {i}"),
            1 => format!("**bold** line {i} with _emphasis_"),
            2 => format!("```rust\nfn f_{i}() {{ println!(\"{i}\"); }}\n```"),
            3 => format!(" whitespace {i} "),
            _ => format!("unicode \u{00E9}\u{0301} + emoji \u{1F600} {i}"),
        };
        corpus.push(EmbeddingInput::new(i as u64, body));
    }

    let indexer = SemanticIndexer::new("hash", None)
        .unwrap()
        .with_batch_size(16)
        .unwrap();
    let embedded = indexer.embed_messages(&corpus).unwrap();

    // The feed order below (id, content hash, then each component) is part of
    // the pinned digest; do not reorder.
    let mut hasher = Context::new(&SHA256);
    for record in &embedded {
        hasher.update(&record.message_id.to_le_bytes());
        hasher.update(&record.content_hash);
        for component in &record.embedding {
            hasher.update(&component.to_le_bytes());
        }
    }
    let actual = hex::encode(hasher.finish().as_ref());

    assert_eq!(
        actual, EXPECTED,
        "embed_messages golden digest drifted; if this was intentional, \
         update EXPECTED in this test and record the reason in the commit message"
    );
}
2358
#[test]
// Runs `prepare_window` over the same 500 mixed inputs with the flag set to
// `true` and to `false` (bound to `serial` / `parallel` respectively) and
// requires both results to agree element-by-element on message id, canonical
// text and content hash.
fn parallel_prep_matches_serial_prep_bitwise() {
    // Seven rotating content shapes, including an empty message.
    let mut inputs: Vec<EmbeddingInput> = Vec::with_capacity(500);
    for i in 0..500 {
        let text = match i % 7 {
            0 => format!("Plain message number {i} with some ordinary words."),
            1 => format!("**Bold** and _italic_ markdown line {i}"),
            2 => format!(
                "```rust\nfn example_{i}() {{\n println!(\"code block {i}\");\n}}\n```\nfollow-up text"
            ),
            3 => String::new(),
            4 => format!(" whitespace galore {i} "),
            5 => format!("Unicode \u{00E9}\u{0301} (combining accent) and emoji \u{1F600} line {i}"),
            _ => format!(
                "Mixed line {i}: `inline_code`, [link](http://x), {{braces}}, and \u{201C}curly quotes\u{201D}."
            ),
        };
        inputs.push(EmbeddingInput::new(i as u64, text));
    }

    let serial = prepare_window(&inputs, true);
    let parallel = prepare_window(&inputs, false);

    assert_eq!(
        serial.len(),
        parallel.len(),
        "serial and parallel prep should skip the same number of empty canonicals"
    );

    serial
        .iter()
        .zip(parallel.iter())
        .for_each(|(from_serial, from_parallel)| {
            assert_eq!(
                from_serial.msg.message_id, from_parallel.msg.message_id,
                "ordering must be preserved between serial and parallel prep"
            );
            assert_eq!(
                from_serial.canonical, from_parallel.canonical,
                "canonical form diverged between serial and parallel prep"
            );
            assert_eq!(
                from_serial.hash, from_parallel.hash,
                "content hash diverged between serial and parallel prep"
            );
        });
}
2406
#[test]
// Inputs whose content is empty or whitespace-only must be dropped by
// `prepare_window(.., false)`, and the surviving inputs must come back exactly
// once each, in input order.
fn parallel_prep_filters_empty_canonicals() {
    let inputs = vec![
        EmbeddingInput::new(1, "valid content"),
        EmbeddingInput::new(2, ""),
        EmbeddingInput::new(3, " \n\n \t "),
        EmbeddingInput::new(4, "more valid content"),
    ];

    let prepared = prepare_window(&inputs, false);
    let ids: Vec<u64> = prepared.iter().map(|p| p.msg.message_id).collect();

    // An exact comparison (rather than per-id `contains` checks) also rejects
    // duplicated or reordered survivors, which membership checks would let
    // through.
    assert_eq!(
        ids,
        vec![1, 4],
        "only the non-empty inputs should survive, once each and in input order"
    );
}
2425
#[test]
// The memo-backed preparation path must produce the same sequence of
// (message id, canonical text, hash) as `prepare_window(.., true)` for inputs
// that contain repeated and empty content.
fn memoized_serial_prep_matches_stateless_prepare_window() {
    let inputs = vec![
        EmbeddingInput::new(1, "repeat me exactly"),
        EmbeddingInput::new(2, "repeat me exactly"),
        EmbeddingInput::new(3, "unique payload"),
        EmbeddingInput::new(4, ""),
        EmbeddingInput::new(5, "repeat me exactly"),
    ];

    let mut memo = ContentAddressedMemoCache::with_capacity(16);
    let via_memo = prepare_window_with_memo(&inputs, &mut memo);
    let stateless = prepare_window(&inputs, true);

    assert_eq!(stateless.len(), via_memo.len());
    stateless
        .iter()
        .zip(via_memo.iter())
        .for_each(|(expected, actual)| {
            assert_eq!(expected.msg.message_id, actual.msg.message_id);
            assert_eq!(expected.canonical, actual.canonical);
            assert_eq!(expected.hash, actual.hash);
        });
}
2447
#[test]
// The memo key for a text must carry the same bytes as `content_hash` of that
// text, and must be tagged with the semantic-prep algorithm name and version
// constants.
fn semantic_prep_memo_key_uses_stable_content_hash_bytes() {
    let text = "repeat me exactly";
    let key = semantic_prep_memo_key(text);
    let expected = content_hash(text);

    let key_bytes = key.content_hash.as_bytes();
    assert_eq!(key_bytes, expected.as_slice());
    assert_eq!(key_bytes.len(), expected.len());

    assert_eq!(key.algorithm, SEMANTIC_PREP_MEMO_ALGORITHM);
    assert_eq!(key.algorithm_version, SEMANTIC_PREP_MEMO_VERSION);
}
2458
#[test]
// Prepares one window containing three copies of the same text, one unique
// text and one empty text, then pins the resulting cache counters.
fn memoized_serial_prep_reuses_duplicate_content_across_windows() {
    let inputs = vec![
        EmbeddingInput::new(1, "repeat me exactly"),
        EmbeddingInput::new(2, "repeat me exactly"),
        EmbeddingInput::new(3, "unique payload"),
        EmbeddingInput::new(4, ""),
        EmbeddingInput::new(5, "repeat me exactly"),
    ];

    let mut memo = ContentAddressedMemoCache::with_capacity(16);
    let prepared = prepare_window_with_memo(&inputs, &mut memo);
    let counters = memo.stats().clone();

    // Four of the five inputs are expected in the prepared output.
    assert_eq!(prepared.len(), 4);
    // Tuple order: (hits, misses, inserts, live_entries).
    assert_eq!(
        (
            counters.hits,
            counters.misses,
            counters.inserts,
            counters.live_entries
        ),
        (2, 3, 2, 2)
    );
}
2479
#[test]
// Stores two conversations that share one message text, loads the packet
// embedding inputs from storage, prepares them through the memo cache, and
// pins the cache counters: one hit for the shared text, three misses/inserts.
fn packet_embedding_inputs_reuse_memoized_prep_for_duplicate_content() -> Result<()> {
    // Builds a message with the fields these fixtures never vary left fixed.
    let msg = |idx, role, created_at, content: &str| Message {
        id: None,
        idx,
        role,
        author: None,
        created_at: Some(created_at),
        content: content.to_string(),
        extra_json: json!({}),
        snippets: Vec::new(),
    };

    let temp = tempdir().unwrap();
    let db_path = temp.path().join("agent_search.db");
    let storage = FrankenStorage::open(&db_path)?;
    let agent_id = storage.ensure_agent(&Agent {
        id: None,
        slug: "codex".to_string(),
        name: "Codex".to_string(),
        version: None,
        kind: AgentKind::Cli,
    })?;

    let conversations = [
        (
            "packet-memo-conv-one",
            vec![
                msg(
                    0,
                    MessageRole::User,
                    1_700_000_010_100,
                    "shared semantic payload",
                ),
                msg(
                    1,
                    MessageRole::Agent,
                    1_700_000_010_200,
                    "unique semantic payload one",
                ),
            ],
        ),
        (
            "packet-memo-conv-two",
            vec![
                msg(
                    0,
                    MessageRole::Tool,
                    1_700_000_010_300,
                    "shared semantic payload",
                ),
                msg(
                    1,
                    MessageRole::Agent,
                    1_700_000_010_400,
                    "unique semantic payload two",
                ),
            ],
        ),
    ];
    for (external_id, messages) in conversations {
        storage.insert_conversation_tree(
            agent_id,
            None,
            &test_conversation_with_messages(external_id, messages),
        )?;
    }

    let packet_inputs = packet_embedding_inputs_from_storage(&storage)?;
    let mut memo = ContentAddressedMemoCache::with_capacity(16);
    let prepared = prepare_window_with_memo(&packet_inputs, &mut memo);
    let counters = memo.stats().clone();

    assert_eq!(packet_inputs.len(), 4);
    assert_eq!(prepared.len(), 4);

    // The memo key's content hash is expected to be 32 bytes long.
    let shared_key = semantic_prep_memo_key("shared semantic payload");
    assert_eq!(shared_key.content_hash.as_bytes().len(), 32);

    assert_eq!(counters.hits, 1);
    assert_eq!(counters.misses, 3);
    assert_eq!(counters.inserts, 3);
    assert_eq!(counters.live_entries, 3);
    Ok(())
}
2572
#[test]
// Runs a single backfill batch covering one of two conversations and expects
// a saved checkpoint and an on-disk index at the outcome's path, with nothing
// published at the main vector index path.
fn backfill_batch_saves_checkpoint_and_staged_index_until_complete() {
    let temp = tempdir().unwrap();
    let data_dir = temp.path();
    let indexer = SemanticIndexer::new("hash", None).unwrap();
    let mut manifest = SemanticManifest::default();

    let batch = vec![
        EmbeddingInput::new(10, "first staged semantic message"),
        EmbeddingInput::new(11, "second staged semantic message"),
    ];
    let partial_plan = SemanticBackfillBatchPlan {
        tier: TierKind::Fast,
        db_fingerprint: "db-fp-backfill-partial".to_string(),
        model_revision: "hash".to_string(),
        total_conversations: 2,
        conversations_in_batch: 1,
        last_offset: 1,
    };

    let outcome = indexer
        .run_backfill_batch(&batch, data_dir, &mut manifest, partial_plan)
        .unwrap();

    // Not yet published: only the checkpointed index exists.
    assert!(!outcome.published);
    assert!(outcome.checkpoint_saved);
    assert!(outcome.index_path.exists());
    assert!(!vector_index_path(data_dir, indexer.embedder_id()).exists());

    let checkpoint = manifest.checkpoint.as_ref().expect("checkpoint");
    assert_eq!(checkpoint.tier, TierKind::Fast);
    assert_eq!(checkpoint.conversations_processed, 1);
    assert_eq!(checkpoint.docs_embedded, 2);
    assert_eq!(manifest.backlog.total_conversations, 2);

    // The manifest file itself must have been written.
    assert!(SemanticManifest::path(data_dir).exists());
}
2610
#[test]
// Runs two backfill batches against the same fingerprint. The first must land
// at the staging index path; the second must publish: the staging file is
// gone, the final index exists, the checkpoint is cleared, and both the
// in-memory and reloaded manifests show a ready fast tier.
fn backfill_batch_resumes_staged_index_and_publishes_manifest_atomically() {
    let temp = tempdir().unwrap();
    let data_dir = temp.path();
    let mut manifest = SemanticManifest::default();
    let indexer = SemanticIndexer::new("hash", None).unwrap();
    let db_fingerprint = "db-fp-backfill-complete";

    // Both batches use the same plan apart from the resume offset.
    let plan_at = |last_offset| SemanticBackfillBatchPlan {
        tier: TierKind::Fast,
        db_fingerprint: db_fingerprint.to_string(),
        model_revision: "hash".to_string(),
        total_conversations: 2,
        conversations_in_batch: 1,
        last_offset,
    };

    let staging_path = semantic_staging_index_path(
        data_dir,
        TierKind::Fast,
        indexer.embedder_id(),
        db_fingerprint,
    );

    let first_batch = vec![EmbeddingInput::new(20, "first resume batch")];
    let first_outcome = indexer
        .run_backfill_batch(&first_batch, data_dir, &mut manifest, plan_at(1))
        .unwrap();
    assert_eq!(first_outcome.index_path, staging_path);
    assert!(staging_path.exists());

    let second_batch = vec![EmbeddingInput::new(21, "second resume batch")];
    let second_outcome = indexer
        .run_backfill_batch(&second_batch, data_dir, &mut manifest, plan_at(2))
        .unwrap();

    assert!(second_outcome.published);
    assert!(!second_outcome.checkpoint_saved);
    assert!(!staging_path.exists());

    let final_path = vector_index_path(data_dir, indexer.embedder_id());
    assert_eq!(second_outcome.index_path, final_path);
    assert!(final_path.exists());

    assert!(manifest.checkpoint.is_none());
    let fast_tier = manifest.fast_tier.as_ref().expect("published fast tier");
    assert!(fast_tier.ready);
    assert_eq!(fast_tier.conversation_count, 2);
    assert_eq!(fast_tier.doc_count, 2);
    assert_eq!(manifest.backlog.fast_tier_processed, 2);

    // The persisted manifest must reflect the same published state.
    let reloaded = SemanticManifest::load(data_dir).unwrap().unwrap();
    assert!(reloaded.checkpoint.is_none());
    assert!(
        reloaded
            .fast_tier
            .as_ref()
            .is_some_and(|record| record.ready)
    );
}
2677
#[test]
// After a 20-document first batch and a single-document final batch, the
// published index must have no sibling `<index>.wal` file and must contain
// all 21 records.
fn backfill_publish_compacts_resumed_wal_before_rename() {
    let temp = tempdir().unwrap();
    let data_dir = temp.path();
    let mut manifest = SemanticManifest::default();
    let indexer = SemanticIndexer::new("hash", None).unwrap();
    let db_fingerprint = "db-fp-backfill-small-resume";

    // Both batches use the same plan apart from the resume offset.
    let plan_at = |last_offset| SemanticBackfillBatchPlan {
        tier: TierKind::Fast,
        db_fingerprint: db_fingerprint.to_string(),
        model_revision: "hash".to_string(),
        total_conversations: 2,
        conversations_in_batch: 1,
        last_offset,
    };

    let first_batch: Vec<EmbeddingInput> = (0..20)
        .map(|idx| EmbeddingInput::new(100 + idx, format!("first batch message {idx}")))
        .collect();
    let first_outcome = indexer
        .run_backfill_batch(&first_batch, data_dir, &mut manifest, plan_at(1))
        .unwrap();
    assert!(first_outcome.checkpoint_saved);

    let final_batch = vec![EmbeddingInput::new(200, "small final resume batch")];
    let second_outcome = indexer
        .run_backfill_batch(&final_batch, data_dir, &mut manifest, plan_at(2))
        .unwrap();
    assert!(second_outcome.published);

    // Append ".wal" to the full file name (not as a replacement extension).
    let final_path = vector_index_path(data_dir, indexer.embedder_id());
    let mut wal_name = final_path.as_os_str().to_owned();
    wal_name.push(".wal");
    let wal_path = PathBuf::from(wal_name);
    assert!(!wal_path.exists());

    let published_index = FsVectorIndex::open(&final_path).unwrap();
    assert_eq!(published_index.record_count(), 21);

    let fast_tier = manifest.fast_tier.as_ref().expect("published fast tier");
    assert_eq!(fast_tier.doc_count, 21);
    assert_eq!(fast_tier.conversation_count, 2);
}
2734
#[test]
// Stores two single-message conversations and runs the storage-driven
// backfill twice with `max_conversations: 1`. The first run must checkpoint
// after one conversation; the second must publish with two documents.
fn backfill_from_storage_fetches_canonical_batches_and_resumes() -> Result<()> {
    let temp = tempdir().unwrap();
    let db_path = temp.path().join("agent_search.db");
    let storage = FrankenStorage::open(&db_path)?;
    let agent_id = storage.ensure_agent(&Agent {
        id: None,
        slug: "codex".to_string(),
        name: "Codex".to_string(),
        version: None,
        kind: AgentKind::Cli,
    })?;

    let fixtures = [
        ("first", "first canonical semantic message"),
        ("second", "second canonical semantic message"),
    ];
    for (external_id, content) in fixtures {
        storage.insert_conversation_tree(
            agent_id,
            None,
            &test_conversation(external_id, content),
        )?;
    }

    let mut manifest = SemanticManifest::default();
    let indexer = SemanticIndexer::new("hash", None)?;

    // Both runs use an identical plan: one conversation per invocation.
    let one_conversation_plan = || SemanticBackfillStoragePlan {
        tier: TierKind::Fast,
        db_fingerprint: "canonical-db-fp".to_string(),
        model_revision: "hash".to_string(),
        max_conversations: 1,
    };

    let first_run = indexer.run_backfill_from_storage(
        &storage,
        temp.path(),
        &mut manifest,
        one_conversation_plan(),
    )?;
    assert!(!first_run.published);
    assert!(first_run.checkpoint_saved);
    assert_eq!(first_run.conversations_processed, 1);
    assert_eq!(first_run.total_conversations, 2);
    assert_eq!(first_run.embedded_docs, 1);
    assert!(first_run.last_offset > 0);

    let second_run = indexer.run_backfill_from_storage(
        &storage,
        temp.path(),
        &mut manifest,
        one_conversation_plan(),
    )?;
    assert!(second_run.published);
    assert!(!second_run.checkpoint_saved);
    assert_eq!(second_run.conversations_processed, 2);
    assert_eq!(second_run.embedded_docs, 1);

    assert!(manifest.checkpoint.is_none());
    let published_docs = manifest.fast_tier.as_ref().map(|record| record.doc_count);
    assert_eq!(published_docs, Some(2));
    Ok(())
}
2801
#[test]
// Stores one conversation with a user message, a tool message and an empty
// system message, then fetches a canonical embedding batch. Only the two
// non-empty messages are expected, with their role codes and a CRC32 of the
// normalized source id.
fn canonical_embedding_batch_uses_conversation_packet_semantic_projection() -> Result<()> {
    // Builds a message with the fields these fixtures never vary left fixed.
    let msg = |idx, role, created_at, content: &str| Message {
        id: None,
        idx,
        role,
        author: None,
        created_at: Some(created_at),
        content: content.to_string(),
        extra_json: json!({}),
        snippets: Vec::new(),
    };

    let temp = tempdir().unwrap();
    let db_path = temp.path().join("agent_search.db");
    let storage = FrankenStorage::open(&db_path)?;
    let agent_id = storage.ensure_agent(&Agent {
        id: None,
        slug: "codex".to_string(),
        name: "Codex".to_string(),
        version: None,
        kind: AgentKind::Cli,
    })?;

    let conversation = test_conversation_with_messages(
        "packet-projection",
        vec![
            msg(0, MessageRole::User, 1_700_000_000_500, "user semantic text"),
            msg(1, MessageRole::Tool, 1_700_000_000_600, "tool semantic text"),
            msg(2, MessageRole::System, 1_700_000_000_700, ""),
        ],
    );
    storage.insert_conversation_tree(agent_id, None, &conversation)?;

    let batch = fetch_canonical_embedding_batch(&storage, 0, 1)?;

    assert_eq!(batch.conversations_in_batch, 1);
    assert_eq!(batch.inputs.len(), 2);

    let user_input = &batch.inputs[0];
    let tool_input = &batch.inputs[1];
    assert_eq!(user_input.content, "user semantic text");
    assert_eq!(tool_input.content, "tool semantic text");
    assert_eq!(user_input.role, role_code_from_str("user").unwrap());
    assert_eq!(tool_input.role, role_code_from_str("tool").unwrap());

    // NOTE(review): the origin values below presumably mirror what the
    // `test_conversation_with_messages` fixture sets; confirm against it.
    let normalized_source_id =
        normalized_index_source_id(Some("remote-laptop"), None, Some("builder-host"));
    let expected_source_hash = crc32fast::hash(normalized_source_id.as_bytes());
    assert_eq!(user_input.source_id, expected_source_hash);
    assert_eq!(tool_input.source_id, expected_source_hash);
    Ok(())
}
2869
#[test]
// Inserts a conversation, records the max message id as a watermark, then
// re-inserts the same conversation with two extra messages (one non-empty,
// one empty). The "since watermark" query must emit only the new non-empty
// message and report the raw max message id.
fn packet_embedding_inputs_from_storage_since_only_emits_new_canonical_messages() -> Result<()>
{
    // Builds a message with the fields these fixtures never vary left fixed.
    let msg = |idx, role, created_at, content: &str| Message {
        id: None,
        idx,
        role,
        author: None,
        created_at: Some(created_at),
        content: content.to_string(),
        extra_json: json!({}),
        snippets: Vec::new(),
    };

    let temp = tempdir().unwrap();
    let db_path = temp.path().join("agent_search.db");
    let storage = FrankenStorage::open(&db_path)?;
    let agent_id = storage.ensure_agent(&Agent {
        id: None,
        slug: "codex".to_string(),
        name: "Codex".to_string(),
        version: None,
        kind: AgentKind::Cli,
    })?;

    // Reads the current highest row id in `messages`.
    let max_message_id = || -> Result<i64> {
        let id: i64 = storage.raw().query_row_map(
            "SELECT MAX(id) FROM messages",
            &[] as &[ParamValue],
            |row| row.get_typed(0),
        )?;
        Ok(id)
    };

    // Initial state: two messages.
    storage.insert_conversation_tree(
        agent_id,
        None,
        &test_conversation_with_messages(
            "packet-delta",
            vec![
                msg(
                    0,
                    MessageRole::User,
                    1_700_000_000_500,
                    "existing semantic text",
                ),
                msg(
                    1,
                    MessageRole::Agent,
                    1_700_000_000_600,
                    "existing assistant text",
                ),
            ],
        ),
    )?;
    let watermark = max_message_id()?;

    // Same conversation again, now with two additional trailing messages.
    storage.insert_conversation_tree(
        agent_id,
        None,
        &test_conversation_with_messages(
            "packet-delta",
            vec![
                msg(
                    0,
                    MessageRole::User,
                    1_700_000_000_500,
                    "existing semantic text",
                ),
                msg(
                    1,
                    MessageRole::Agent,
                    1_700_000_000_600,
                    "existing assistant text",
                ),
                msg(
                    2,
                    MessageRole::Agent,
                    1_700_000_000_700,
                    "new packet semantic text",
                ),
                msg(3, MessageRole::System, 1_700_000_000_800, ""),
            ],
        ),
    )?;

    let batch = packet_embedding_inputs_from_storage_since(&storage, watermark)?;

    assert_eq!(batch.conversations_in_batch, 1);
    assert_eq!(batch.inputs.len(), 1);

    let emitted = &batch.inputs[0];
    assert_eq!(emitted.content, "new packet semantic text");
    assert_eq!(emitted.role, role_code_from_str("assistant").unwrap());

    // NOTE(review): the origin values below presumably mirror what the
    // `test_conversation_with_messages` fixture sets; confirm against it.
    let normalized_source_id =
        normalized_index_source_id(Some("remote-laptop"), None, Some("builder-host"));
    assert_eq!(
        emitted.source_id,
        crc32fast::hash(normalized_source_id.as_bytes())
    );

    let expected_raw_max_id = max_message_id()?;
    assert_eq!(batch.raw_max_message_id, Some(expected_raw_max_id));
    Ok(())
}
2991
#[test]
// After a watermark is taken, one conversation gains two messages and a
// second conversation with two messages is added. The catch-up query must
// emit exactly the two non-empty post-watermark messages with full metadata,
// count both conversations, and report `watermark + 4` as the raw max id.
fn packet_catch_up_emits_expected_semantic_docs_after_watermark() -> Result<()> {
    // Builds a message with the fields these fixtures never vary left fixed.
    let msg = |idx, role, created_at, content: &str| Message {
        id: None,
        idx,
        role,
        author: None,
        created_at: Some(created_at),
        content: content.to_string(),
        extra_json: json!({}),
        snippets: Vec::new(),
    };

    let temp = tempdir().unwrap();
    let db_path = temp.path().join("agent_search.db");
    let storage = FrankenStorage::open(&db_path)?;
    let agent_id = storage.ensure_agent(&Agent {
        id: None,
        slug: "codex".to_string(),
        name: "Codex".to_string(),
        version: None,
        kind: AgentKind::Cli,
    })?;
    let workspace_id = storage.ensure_workspace(Path::new("/tmp/workspace"), None)?;

    // Pre-watermark state: two messages.
    storage.insert_conversation_tree(
        agent_id,
        Some(workspace_id),
        &test_conversation_with_messages(
            "legacy-packet-semantics",
            vec![
                msg(0, MessageRole::User, 1_700_000_000_500, "before watermark"),
                msg(
                    1,
                    MessageRole::Agent,
                    1_700_000_000_600,
                    "before watermark assistant",
                ),
            ],
        ),
    )?;

    let watermark: i64 = storage.raw().query_row_map(
        "SELECT MAX(id) FROM messages",
        &[] as &[ParamValue],
        |row| row.get_typed(0),
    )?;

    // Same conversation re-inserted with two extra trailing messages.
    storage.insert_conversation_tree(
        agent_id,
        Some(workspace_id),
        &test_conversation_with_messages(
            "legacy-packet-semantics",
            vec![
                msg(0, MessageRole::User, 1_700_000_000_500, "before watermark"),
                msg(
                    1,
                    MessageRole::Agent,
                    1_700_000_000_600,
                    "before watermark assistant",
                ),
                msg(
                    2,
                    MessageRole::Agent,
                    1_700_000_000_700,
                    "after watermark assistant",
                ),
                msg(3, MessageRole::System, 1_700_000_000_800, ""),
            ],
        ),
    )?;
    // A brand-new second conversation, entirely after the watermark.
    storage.insert_conversation_tree(
        agent_id,
        Some(workspace_id),
        &test_conversation_with_messages(
            "legacy-packet-semantics-second-conv",
            vec![
                msg(
                    0,
                    MessageRole::Tool,
                    1_700_000_000_900,
                    "after watermark tool",
                ),
                msg(1, MessageRole::System, 1_700_000_001_000, ""),
            ],
        ),
    )?;

    let packet_batch = packet_embedding_inputs_from_storage_since(&storage, watermark)?;

    // NOTE(review): the origin values below presumably mirror what the
    // `test_conversation_with_messages` fixture sets; confirm against it.
    let normalized_source_id =
        normalized_index_source_id(Some("remote-laptop"), None, Some("builder-host"));
    let source_id_hash = crc32fast::hash(normalized_source_id.as_bytes());
    let expected_agent = u32::try_from(agent_id).unwrap();
    let expected_workspace = u32::try_from(workspace_id).unwrap();

    // The expected ids are `watermark + 1` and `watermark + 3`; the ids in
    // between and after are not expected in the output.
    let expected = vec![
        ComparableSemanticInput {
            message_id: u64::try_from(watermark + 1).unwrap(),
            created_at_ms: 1_700_000_000_700,
            agent_id: expected_agent,
            workspace_id: expected_workspace,
            source_id: source_id_hash,
            role: role_code_from_str("assistant").unwrap(),
            content: "after watermark assistant".to_string(),
        },
        ComparableSemanticInput {
            message_id: u64::try_from(watermark + 3).unwrap(),
            created_at_ms: 1_700_000_000_900,
            agent_id: expected_agent,
            workspace_id: expected_workspace,
            source_id: source_id_hash,
            role: role_code_from_str("tool").unwrap(),
            content: "after watermark tool".to_string(),
        },
    ];

    assert_eq!(comparable_semantic_inputs(packet_batch.inputs), expected);
    assert_eq!(packet_batch.conversations_in_batch, 2);
    assert_eq!(packet_batch.raw_max_message_id, Some(watermark + 4));
    Ok(())
}
3151
#[test]
// Cross-checks two selection paths: inputs selected by an explicit set of
// post-watermark message ids (and their conversation ids) must equal the
// inputs produced by the "since watermark" query.
fn packet_embedding_inputs_for_message_ids_matches_since_selection() -> Result<()> {
    // Builds a message with the fields these fixtures never vary left fixed.
    let msg = |idx, role, created_at, content: &str| Message {
        id: None,
        idx,
        role,
        author: None,
        created_at: Some(created_at),
        content: content.to_string(),
        extra_json: json!({}),
        snippets: Vec::new(),
    };

    let temp = tempdir().unwrap();
    let db_path = temp.path().join("agent_search.db");
    let storage = FrankenStorage::open(&db_path)?;
    let agent_id = storage.ensure_agent(&Agent {
        id: None,
        slug: "codex".to_string(),
        name: "Codex".to_string(),
        version: None,
        kind: AgentKind::Cli,
    })?;
    let workspace_id = storage.ensure_workspace(Path::new("/tmp/workspace"), None)?;

    // Pre-watermark state: two messages.
    storage.insert_conversation_tree(
        agent_id,
        Some(workspace_id),
        &test_conversation_with_messages(
            "selected-vs-since",
            vec![
                msg(0, MessageRole::User, 1_700_000_100_100, "before watermark"),
                msg(
                    1,
                    MessageRole::Agent,
                    1_700_000_100_200,
                    "before watermark assistant",
                ),
            ],
        ),
    )?;

    let watermark: i64 = storage.raw().query_row_map(
        "SELECT MAX(id) FROM messages",
        &[] as &[ParamValue],
        |row| row.get_typed(0),
    )?;

    // Same conversation re-inserted with two extra trailing messages.
    storage.insert_conversation_tree(
        agent_id,
        Some(workspace_id),
        &test_conversation_with_messages(
            "selected-vs-since",
            vec![
                msg(0, MessageRole::User, 1_700_000_100_100, "before watermark"),
                msg(
                    1,
                    MessageRole::Agent,
                    1_700_000_100_200,
                    "before watermark assistant",
                ),
                msg(
                    2,
                    MessageRole::Tool,
                    1_700_000_100_300,
                    "after watermark tool",
                ),
                msg(3, MessageRole::System, 1_700_000_100_400, ""),
            ],
        ),
    )?;
    // A second conversation, entirely after the watermark.
    storage.insert_conversation_tree(
        agent_id,
        Some(workspace_id),
        &test_conversation_with_messages(
            "selected-vs-since-second",
            vec![msg(
                0,
                MessageRole::Agent,
                1_700_000_100_500,
                "after watermark assistant",
            )],
        ),
    )?;

    let since_batch = packet_embedding_inputs_from_storage_since(&storage, watermark)?;

    // Derive the explicit selection directly from the `messages` table.
    let conversation_ids: Vec<i64> = storage.raw().query_map_collect(
        "SELECT DISTINCT conversation_id
         FROM messages
         WHERE id > ?1
         ORDER BY conversation_id ASC",
        &[ParamValue::from(watermark)],
        |row| row.get_typed(0),
    )?;
    let selected_message_ids: HashSet<i64> = storage
        .raw()
        .query_map_collect(
            "SELECT id
             FROM messages
             WHERE id > ?1
             ORDER BY id ASC",
            &[ParamValue::from(watermark)],
            |row| row.get_typed(0),
        )?
        .into_iter()
        .collect();

    let selected_inputs = packet_embedding_inputs_from_storage_for_message_ids(
        &storage,
        &conversation_ids,
        &selected_message_ids,
    )?;

    let from_selection = comparable_semantic_inputs(selected_inputs);
    let from_since = comparable_semantic_inputs(since_batch.inputs);
    assert_eq!(from_selection, from_since);
    Ok(())
}
3302
/// A hash-backed `SemanticIndexer` constructed while the guard returned by
/// `EnvVarGuard::remove("CASS_SEMANTIC_BATCH_SIZE")` is alive must report
/// `DEFAULT_SEMANTIC_BATCH_SIZE` from `batch_size()`.
#[test]
fn default_batch_size_uses_new_value() {
    // Bound to a name (not `_`) so the guard lives until the end of the test.
    let _env_guard = EnvVarGuard::remove("CASS_SEMANTIC_BATCH_SIZE");

    let observed_batch_size = SemanticIndexer::new("hash", None).unwrap().batch_size();

    assert_eq!(observed_batch_size, DEFAULT_SEMANTIC_BATCH_SIZE);
}
3311
/// Table-driven check of `parallel_prep_enabled()` against values of
/// `CASS_SEMANTIC_PREP_PARALLEL`.
///
/// The values `"1"`, `"true"`, `" YeS "` and `"on"` must enable parallel prep;
/// `"0"`, `"false"` and `"off"` must not. Finally, under a guard obtained from
/// `EnvVarGuard::remove`, the function must report `false`.
#[test]
fn parallel_prep_enabled_reuses_truthy_env_parser() {
    const ENV_KEY: &str = "CASS_SEMANTIC_PREP_PARALLEL";

    let truthy = ["1", "true", " YeS ", "on"];
    let falsy = ["0", "false", "off"];

    // Truthy cases run first, then falsy ones, matching the original table order.
    let cases = truthy
        .iter()
        .map(|value| (*value, true))
        .chain(falsy.iter().map(|value| (*value, false)));

    for (value, expected) in cases {
        // A fresh guard per case; it is dropped at the end of each iteration.
        let _env_guard = EnvVarGuard::set(ENV_KEY, value);
        assert_eq!(parallel_prep_enabled(), expected, "env value {value:?}");
    }

    let _env_guard = EnvVarGuard::remove(ENV_KEY);
    assert!(!parallel_prep_enabled());
}
3330
/// Exercises `saturating_u64_from_usize` at zero, a small value, and
/// `usize::MAX`.
///
/// The expected result for `usize::MAX` is the exact `u64` conversion when it
/// fits, and `u64::MAX` otherwise.
#[test]
fn saturating_u64_from_usize_covers_bounds() {
    let widest = u64::try_from(usize::MAX).unwrap_or(u64::MAX);
    let cases: [(usize, u64); 3] = [(0, 0), (42, 42), (usize::MAX, widest)];

    for (input, expected) in cases {
        assert_eq!(saturating_u64_from_usize(input), expected);
    }
}
3340
/// Equivalence check between two ways of producing semantic embedding inputs.
///
/// Two conversations are inserted into a temporary `FrankenStorage`. Inputs are
/// then derived once through `packet_embedding_inputs_from_storage` and once by
/// assembling `ConversationPacket`s plus `SemanticPacketContext`s by hand and
/// passing them to `semantic_inputs_from_packets`. Both results must compare
/// equal after `comparable_semantic_inputs`, have the same length, contain no
/// empty content, and carry the expected `source_id` hash.
#[test]
fn semantic_inputs_from_packets_matches_storage_replay() -> Result<()> {
    // Small fixture builders so each record below fits on one line.
    let cli_agent = |slug: &str, name: &str| Agent {
        id: None,
        slug: slug.to_string(),
        name: name.to_string(),
        version: None,
        kind: AgentKind::Cli,
    };
    let message = |idx, role, author: Option<&str>, created_at, content: &str| Message {
        id: None,
        idx,
        role,
        author: author.map(String::from),
        created_at: Some(created_at),
        content: content.to_string(),
        extra_json: json!({}),
        snippets: Vec::new(),
    };

    let temp = tempdir().unwrap();
    let storage = FrankenStorage::open(&temp.path().join("agent_search.db"))?;

    let codex_agent = storage.ensure_agent(&cli_agent("codex", "Codex"))?;
    let claude_agent = storage.ensure_agent(&cli_agent("claude_code", "Claude Code"))?;
    let workspace_id =
        storage.ensure_workspace(Path::new("/tmp/semantic-equivalence-ws"), None)?;

    // The first conversation ends with an empty system message; the assertions
    // below require that no emitted input has empty content.
    let first_conversation = test_conversation_with_messages(
        "packet-equiv-1",
        vec![
            message(0, MessageRole::User, None, 1_700_000_000_500, "first user prompt"),
            message(1, MessageRole::Agent, None, 1_700_000_000_600, "first assistant reply"),
            message(2, MessageRole::System, None, 1_700_000_000_700, ""),
        ],
    );
    storage.insert_conversation_tree(codex_agent, Some(workspace_id), &first_conversation)?;

    let second_conversation = test_conversation_with_messages(
        "packet-equiv-2",
        vec![
            message(0, MessageRole::Tool, Some("ripgrep"), 1_700_000_001_500, "tool output line"),
            message(1, MessageRole::Agent, None, 1_700_000_001_600, "second assistant reply"),
        ],
    );
    storage.insert_conversation_tree(claude_agent, Some(workspace_id), &second_conversation)?;

    // Path 1: let the storage-driven helper produce the inputs.
    let storage_inputs = packet_embedding_inputs_from_storage(&storage)?;

    // Path 2: rebuild packets and contexts by hand, then project them.
    let no_params: &[ParamValue] = &[];
    let conversation_ids: Vec<i64> = storage.raw().query_map_collect(
        "SELECT DISTINCT m.conversation_id
 FROM messages m
 JOIN conversations c ON c.id = m.conversation_id
 ORDER BY m.conversation_id ASC",
        no_params,
        |row| row.get_typed(0),
    )?;
    let envelopes = fetch_canonical_embedding_conversations(&storage, &conversation_ids)?;
    let mut messages_by_conversation =
        storage.fetch_messages_for_lexical_rebuild_batch(&conversation_ids, None, None)?;

    let (packets, contexts): (Vec<ConversationPacket>, Vec<SemanticPacketContext>) = envelopes
        .iter()
        .map(|envelope| {
            // A conversation with no fetched messages falls back to the default value.
            let messages = messages_by_conversation
                .remove(&envelope.conversation_id)
                .unwrap_or_default();
            let provenance = canonical_embedding_packet_provenance(envelope);
            let canonical = canonical_embedding_conversation(envelope, &provenance, messages);
            let packet = ConversationPacket::from_canonical_replay(&canonical, provenance);
            let context = SemanticPacketContext {
                conversation_id: envelope.conversation_id,
                agent_id: saturating_u32_from_i64(envelope.agent_id),
                // A missing workspace id is passed through as 0.
                workspace_id: saturating_u32_from_i64(envelope.workspace_id.unwrap_or(0)),
            };
            (packet, context)
        })
        .unzip();
    let packet_inputs = semantic_inputs_from_packets(&packets, &contexts)?;

    assert!(
        !storage_inputs.is_empty(),
        "fixture should produce non-empty semantic inputs (sanity)"
    );
    assert_eq!(
        comparable_semantic_inputs(storage_inputs.clone()),
        comparable_semantic_inputs(packet_inputs.clone()),
        "packet-driven semantic preparation must match storage replay byte-for-byte"
    );

    let storage_count = storage_inputs.len();
    let packet_count = packet_inputs.len();
    assert_eq!(
        storage_count, packet_count,
        "storage and packet semantic input counts must agree exactly"
    );
    assert!(
        packet_inputs.iter().all(|input| !input.content.is_empty()),
        "empty content must be filtered by the packet semantic projection"
    );

    // NOTE(review): the "remote-laptop" / "builder-host" values presumably mirror the
    // provenance set by `test_conversation_with_messages` — confirm against that helper.
    let normalized_source_id =
        normalized_index_source_id(Some("remote-laptop"), None, Some("builder-host"));
    let expected_hash = crc32fast::hash(normalized_source_id.as_bytes());
    assert!(
        packet_inputs
            .iter()
            .all(|input| input.source_id == expected_hash),
        "every emitted EmbeddingInput must hash provenance via the packet's normalized source_id"
    );

    Ok(())
}
3524
/// Passing one packet together with an empty context slice to
/// `semantic_inputs_from_packets` must fail, and the rendered error must
/// contain the phrase "length mismatch".
#[test]
fn semantic_inputs_from_packets_rejects_length_mismatch() {
    let canonical = test_conversation("packet-mismatch", "hello");
    let packet = ConversationPacket::from_canonical_replay(
        &canonical,
        ConversationPacketProvenance::local(),
    );

    // One packet, zero contexts: the call is expected to return `Err`.
    let Err(failure) = semantic_inputs_from_packets(&[packet], &[]) else {
        panic!("expected error on packet/context length mismatch");
    };

    let err = failure.to_string();
    assert!(
        err.contains("length mismatch"),
        "error should mention length mismatch, got: {err}"
    );
}
3546}