1use std::collections::{HashMap, HashSet};
2use std::fs;
3use std::io::IsTerminal;
4use std::path::{Path, PathBuf};
5use std::time::{Instant, SystemTime, UNIX_EPOCH};
6
7use anyhow::{Context, Result, bail};
8use frankensearch::index::{
9 HNSW_DEFAULT_EF_CONSTRUCTION as FS_HNSW_DEFAULT_EF_CONSTRUCTION,
10 HNSW_DEFAULT_M as FS_HNSW_DEFAULT_M, HnswConfig as FsHnswConfig, HnswIndex as FsHnswIndex,
11 Quantization as FsQuantization, VectorIndex as FsVectorIndex,
12 VectorIndexWriter as FsVectorIndexWriter,
13};
14use frankensqlite::compat::{ConnectionExt, ParamValue, RowExt};
15use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
16use rayon::prelude::*;
17
18use crate::indexer::memoization::{
19 ContentAddressedMemoCache, MemoCacheAuditRecord, MemoContentHash, MemoKey, MemoLookup,
20};
21use crate::indexer::responsiveness;
22use crate::indexer::semantic_progress::{
23 SemanticProgressEvent, SemanticProgressFields, SemanticProgressSink,
24};
25use crate::model::conversation_packet::{ConversationPacket, ConversationPacketProvenance};
26use crate::model::types::{Conversation, Message};
27use crate::search::canonicalize::{canonicalize_for_embedding, content_hash};
28use crate::search::embedder::Embedder;
29use crate::search::fastembed_embedder::FastEmbedder;
30use crate::search::hash_embedder::HashEmbedder;
31use crate::search::policy::{CHUNKING_STRATEGY_VERSION, SEMANTIC_SCHEMA_VERSION, SemanticPolicy};
32use crate::search::semantic_manifest::{
33 ArtifactRecord, BuildCheckpoint, SemanticManifest, SemanticShardManifest, SemanticShardRecord,
34 TierKind,
35};
36use crate::search::tantivy::{
37 normalized_index_origin_host, normalized_index_origin_kind, normalized_index_source_id,
38};
39use crate::search::vector_index::{
40 ROLE_USER, SemanticDocId, VECTOR_INDEX_DIR, role_code_from_str, vector_index_path,
41};
42use crate::storage::sqlite::FrankenStorage;
43
/// Fallback batch size returned by `resolved_default_batch_size` when
/// `CASS_SEMANTIC_BATCH_SIZE` is unset, unparsable, or zero.
const DEFAULT_SEMANTIC_BATCH_SIZE: usize = 128;
/// Fallback capacity returned by `resolved_semantic_prep_memo_capacity` when
/// `CASS_SEMANTIC_PREP_MEMO_CAPACITY` is unset, unparsable, or zero.
const DEFAULT_SEMANTIC_PREP_MEMO_CAPACITY: usize = 4_096;
/// Fallback for `CASS_SEMANTIC_EMBED_BATCH_WARN_AFTER_MS` (milliseconds).
const DEFAULT_SEMANTIC_EMBED_BATCH_WARN_AFTER_MS: u64 = 30_000;
/// Fallback for `CASS_SEMANTIC_EMBED_BATCH_FAIL_AFTER_MS` (milliseconds).
const DEFAULT_SEMANTIC_EMBED_BATCH_FAIL_AFTER_MS: u64 = 300_000;
/// Fallback for `CASS_SEMANTIC_MAX_MESSAGES_PER_CHECKPOINT`; see `SemanticCheckpointCaps::from_env`.
const DEFAULT_SEMANTIC_MAX_MESSAGES_PER_CHECKPOINT: usize = 10_000;
/// Fallback for `CASS_SEMANTIC_MAX_BYTES_PER_CHECKPOINT` (8 MiB); see `SemanticCheckpointCaps::from_env`.
const DEFAULT_SEMANTIC_MAX_BYTES_PER_CHECKPOINT: u64 = 8 * 1024 * 1024;
// NOTE(review): the two identifiers below are presumably the algorithm name and
// version used to key the semantic preparation memo cache — their use sites are
// outside this part of the file; confirm before changing either string.
const SEMANTIC_PREP_MEMO_ALGORITHM: &str = "semantic_prepare_window";
const SEMANTIC_PREP_MEMO_VERSION: &str = "canonicalize_for_embedding:v2:stable-content-hash";
56
57fn resolved_env_usize(key: &str, default: usize) -> usize {
58 dotenvy::var(key)
59 .ok()
60 .and_then(|v| v.parse::<usize>().ok())
61 .unwrap_or(default)
62}
63
64fn resolved_env_u64(key: &str, default: u64) -> u64 {
65 dotenvy::var(key)
66 .ok()
67 .and_then(|v| v.parse::<u64>().ok())
68 .unwrap_or(default)
69}
70
71fn resolved_default_batch_size() -> usize {
72 dotenvy::var("CASS_SEMANTIC_BATCH_SIZE")
73 .ok()
74 .and_then(|v| v.parse::<usize>().ok())
75 .filter(|v| *v > 0)
76 .unwrap_or(DEFAULT_SEMANTIC_BATCH_SIZE)
77}
78
79fn resolved_semantic_prep_memo_capacity() -> usize {
80 dotenvy::var("CASS_SEMANTIC_PREP_MEMO_CAPACITY")
81 .ok()
82 .and_then(|v| v.parse::<usize>().ok())
83 .filter(|v| *v > 0)
84 .unwrap_or(DEFAULT_SEMANTIC_PREP_MEMO_CAPACITY)
85}
86
87fn resolved_semantic_embed_batch_warn_after_ms() -> u64 {
88 resolved_env_u64(
89 "CASS_SEMANTIC_EMBED_BATCH_WARN_AFTER_MS",
90 DEFAULT_SEMANTIC_EMBED_BATCH_WARN_AFTER_MS,
91 )
92}
93
94fn resolved_semantic_embed_batch_fail_after_ms() -> u64 {
95 resolved_env_u64(
96 "CASS_SEMANTIC_EMBED_BATCH_FAIL_AFTER_MS",
97 DEFAULT_SEMANTIC_EMBED_BATCH_FAIL_AFTER_MS,
98 )
99}
100
101fn parallel_prep_enabled() -> bool {
116 env_truthy("CASS_SEMANTIC_PREP_PARALLEL")
117}
118
/// Converts a `usize` to `u64`, saturating at `u64::MAX` if it does not fit.
fn saturating_u64_from_usize(value: usize) -> u64 {
    match u64::try_from(value) {
        Ok(widened) => widened,
        Err(_) => u64::MAX,
    }
}
122
/// Narrows a `u128` millisecond count to `u64`, saturating at `u64::MAX`.
fn saturating_u64_from_millis(value: u128) -> u64 {
    let ceiling = u128::from(u64::MAX);
    // After capping at `ceiling` the narrowing cast cannot lose information.
    value.min(ceiling) as u64
}
126
/// One piece of message text queued for embedding, plus the identifiers that
/// travel with it.
#[derive(Debug, Clone)]
pub struct EmbeddingInput {
    /// Unsigned message id (`embedding_input_from_packet_message` derives it
    /// from the database id via `message_id_from_db`).
    pub message_id: u64,
    /// Creation timestamp; `canonical_embedding_created_at_ms` substitutes 0
    /// when the source row has none.
    pub created_at_ms: i64,
    /// Agent id, narrowed to `u32` by the builders in this file.
    pub agent_id: u32,
    /// Workspace id, narrowed to `u32`; the packet builders use 0 when the
    /// conversation has no workspace.
    pub workspace_id: u32,
    /// The packet builders in this file store a CRC32 of the provenance
    /// source-id string here.
    pub source_id: u32,
    /// Role code (`ROLE_USER` is the default used by `new` and by the packet
    /// builders when the role string is not recognised).
    pub role: u8,
    /// Chunk index; every constructor visible in this file sets 0.
    pub chunk_idx: u8,
    /// Text to embed.
    pub content: String,
}
138
139impl EmbeddingInput {
140 pub fn new(message_id: u64, content: impl Into<String>) -> Self {
141 Self {
142 message_id,
143 created_at_ms: 0,
144 agent_id: 0,
145 workspace_id: 0,
146 source_id: 0,
147 role: ROLE_USER,
148 chunk_idx: 0,
149 content: content.into(),
150 }
151 }
152}
153
/// An embedded message: the identifier fields of an [`EmbeddingInput`] plus
/// a content hash and the embedding vector.
///
/// `semantic_doc_id_for_embedded` turns every field except `embedding` into a
/// `SemanticDocId` string.
#[derive(Debug, Clone)]
pub struct EmbeddedMessage {
    pub message_id: u64,
    pub created_at_ms: i64,
    pub agent_id: u32,
    pub workspace_id: u32,
    pub source_id: u32,
    pub role: u8,
    pub chunk_idx: u8,
    /// 32-byte hash — presumably of the canonicalized content; TODO confirm
    /// against the code that fills it in.
    pub content_hash: [u8; 32],
    /// Embedding vector; its length is not fixed by this type.
    pub embedding: Vec<f32>,
}
166
/// Parameters describing one semantic backfill batch.
///
/// NOTE(review): the consumers of this plan are outside this part of the
/// file; the field notes below are inferred from names and types only.
#[derive(Debug, Clone)]
pub struct SemanticBackfillBatchPlan {
    /// Semantic tier the batch targets.
    pub tier: TierKind,
    /// Fingerprint string identifying the database state.
    pub db_fingerprint: String,
    /// Revision string of the embedding model.
    pub model_revision: String,
    pub total_conversations: u64,
    pub conversations_in_batch: u64,
    /// Presumably the resume cursor after this batch — TODO confirm.
    pub last_offset: i64,
}
176
/// Parameters for a storage-driven semantic backfill run.
///
/// NOTE(review): consumers are outside this part of the file; field meanings
/// are inferred from names and types only.
#[derive(Debug, Clone)]
pub struct SemanticBackfillStoragePlan {
    /// Semantic tier the run targets.
    pub tier: TierKind,
    /// Fingerprint string identifying the database state.
    pub db_fingerprint: String,
    /// Revision string of the embedding model.
    pub model_revision: String,
    /// Presumably the upper bound on conversations handled per batch — TODO confirm.
    pub max_conversations: usize,
}
184
/// Result summary of one semantic backfill batch.
///
/// NOTE(review): the code that fills this in is outside this part of the
/// file; field meanings are inferred from names and types only.
#[derive(Debug, Clone)]
pub struct SemanticBackfillBatchOutcome {
    pub tier: TierKind,
    pub embedder_id: String,
    /// Number of documents embedded.
    pub embedded_docs: u64,
    pub conversations_processed: u64,
    pub total_conversations: u64,
    /// Presumably the resume cursor after this batch — TODO confirm.
    pub last_offset: i64,
    pub checkpoint_saved: bool,
    pub published: bool,
    /// Path of the vector index file.
    pub index_path: PathBuf,
    /// Path of the manifest file.
    pub manifest_path: PathBuf,
}
198
/// Parameters for building sharded semantic indexes.
///
/// NOTE(review): consumers are outside this part of the file; field meanings
/// are inferred from names and types only.
#[derive(Debug, Clone)]
pub struct SemanticShardBuildPlan {
    pub tier: TierKind,
    pub db_fingerprint: String,
    pub model_revision: String,
    pub total_conversations: u64,
    /// Upper bound on records written to a single shard.
    pub max_records_per_shard: usize,
    /// Presumably whether an ANN (HNSW) index is built alongside each shard — TODO confirm.
    pub build_ann: bool,
}
208
/// Result summary of a sharded semantic index build.
///
/// NOTE(review): the code that fills this in is outside this part of the
/// file; field meanings are inferred from names and types only.
#[derive(Debug, Clone)]
pub struct SemanticShardBuildOutcome {
    pub tier: TierKind,
    pub embedder_id: String,
    pub shard_count: u32,
    pub doc_count: u64,
    pub total_conversations: u64,
    /// Paths of the per-shard vector index files.
    pub index_paths: Vec<PathBuf>,
    /// Paths of the per-shard ANN index files.
    pub ann_index_paths: Vec<PathBuf>,
    pub shard_manifest_path: PathBuf,
    pub complete: bool,
}
221
/// Outcome state of a semantic backfill scheduling decision.
///
/// Serialized in `snake_case`; `as_str` returns the same spellings.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum SemanticBackfillSchedulerState {
    /// The batch may run (`SemanticBackfillSchedulerDecision::should_run` is true).
    Running,
    /// Used by the scheduler for lexical repair, foreground pressure, and low capacity.
    Paused,
    /// Used by the scheduler for zero budgets, operator disable, and policy disable.
    Disabled,
}
229
230impl SemanticBackfillSchedulerState {
231 pub(crate) fn as_str(self) -> &'static str {
232 match self {
233 Self::Running => "running",
234 Self::Paused => "paused",
235 Self::Disabled => "disabled",
236 }
237 }
238}
239
/// Why the scheduler reached its decision. Serialized in `snake_case`.
///
/// `next_step` maps each variant to a human-readable explanation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum SemanticBackfillSchedulerReason {
    /// No gate fired; the batch is scheduled.
    IdleBudgetAvailable,
    /// `SemanticBackfillSchedulerSignals::operator_disabled` was set.
    OperatorDisabled,
    /// The semantic policy mode does not build semantic artifacts.
    PolicyDisabled,
    /// Foreground pressure was signalled.
    ForegroundPressure,
    /// Lexical repair was signalled as active.
    LexicalRepairActive,
    /// Current capacity was below the configured minimum percentage.
    CapacityBelowFloor,
    /// The policy's `max_backfill_threads` is zero.
    ThreadBudgetZero,
    /// Zero conversations were requested.
    BatchBudgetZero,
}
252
253impl SemanticBackfillSchedulerReason {
254 pub(crate) fn next_step(self) -> &'static str {
255 match self {
256 Self::IdleBudgetAvailable => "background semantic backfill is within idle budgets",
257 Self::OperatorDisabled => {
258 "background semantic backfill is disabled by CASS_SEMANTIC_BACKFILL_DISABLE"
259 }
260 Self::PolicyDisabled => "semantic policy disables background semantic backfill",
261 Self::ForegroundPressure => {
262 "foreground pressure is present; retry after the idle delay"
263 }
264 Self::LexicalRepairActive => "lexical repair is active; semantic backfill is yielding",
265 Self::CapacityBelowFloor => {
266 "machine responsiveness capacity is below the semantic backfill floor"
267 }
268 Self::ThreadBudgetZero => "semantic backfill thread budget is zero",
269 Self::BatchBudgetZero => "semantic backfill batch budget is zero",
270 }
271 }
272}
273
/// External signals consulted by the semantic backfill scheduler.
///
/// `from_env` populates every field from a `CASS_SEMANTIC_BACKFILL_*` variable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct SemanticBackfillSchedulerSignals {
    /// Pauses backfill unless `force` is set.
    pub foreground_pressure: bool,
    /// Pauses backfill unless `force` is set.
    pub lexical_repair_active: bool,
    /// Bypasses every scheduler gate except the zero-batch check.
    pub force: bool,
    /// Disables backfill unless `force` is set.
    pub operator_disabled: bool,
}
281
282impl SemanticBackfillSchedulerSignals {
283 pub(crate) fn from_env() -> Self {
284 Self {
285 foreground_pressure: env_truthy("CASS_SEMANTIC_BACKFILL_FOREGROUND_ACTIVE"),
286 lexical_repair_active: env_truthy("CASS_SEMANTIC_BACKFILL_LEXICAL_REPAIR_ACTIVE"),
287 force: env_truthy("CASS_SEMANTIC_BACKFILL_FORCE"),
288 operator_disabled: env_truthy("CASS_SEMANTIC_BACKFILL_DISABLE"),
289 }
290 }
291}
292
/// Serializable record of one scheduler decision and the inputs behind it.
#[derive(Debug, Clone, serde::Serialize)]
pub(crate) struct SemanticBackfillSchedulerDecision {
    pub state: SemanticBackfillSchedulerState,
    pub reason: SemanticBackfillSchedulerReason,
    /// Batch size the caller asked for.
    pub requested_batch_conversations: usize,
    /// Batch size actually granted; 0 whenever the decision is not `Running`.
    pub scheduled_batch_conversations: usize,
    /// Observed capacity percentage, capped at 100.
    pub current_capacity_pct: u32,
    /// Minimum capacity percentage required to run (see `env_backfill_min_capacity_pct`).
    pub min_capacity_pct: u32,
    /// Copied from the policy.
    pub max_backfill_threads: usize,
    /// Copied from the policy.
    pub idle_delay_seconds: u64,
    /// Copied from the policy.
    pub chunk_timeout_seconds: u64,
    pub foreground_pressure: bool,
    pub lexical_repair_active: bool,
    /// Mirrors `SemanticBackfillSchedulerSignals::force`.
    pub forced: bool,
    /// 0 when running; otherwise the policy's idle delay converted to milliseconds.
    pub next_eligible_after_ms: u64,
}
309
310impl SemanticBackfillSchedulerDecision {
311 pub(crate) fn should_run(&self) -> bool {
312 matches!(self.state, SemanticBackfillSchedulerState::Running)
313 }
314}
315
316fn env_truthy(key: &str) -> bool {
317 dotenvy::var(key)
318 .ok()
319 .map(|value| {
320 matches!(
321 value.trim().to_ascii_lowercase().as_str(),
322 "1" | "true" | "yes" | "on"
323 )
324 })
325 .unwrap_or(false)
326}
327
328fn env_backfill_min_capacity_pct() -> u32 {
329 dotenvy::var("CASS_SEMANTIC_BACKFILL_MIN_CAPACITY_PCT")
330 .ok()
331 .and_then(|value| value.trim().parse::<u32>().ok())
332 .map(|value| value.clamp(1, 100))
333 .unwrap_or(75)
334}
335
336pub(crate) fn semantic_backfill_scheduler_decision(
337 policy: &SemanticPolicy,
338 requested_batch_conversations: usize,
339 signals: &SemanticBackfillSchedulerSignals,
340) -> SemanticBackfillSchedulerDecision {
341 semantic_backfill_scheduler_decision_for_capacity(
342 policy,
343 requested_batch_conversations,
344 signals,
345 responsiveness::current_capacity_pct(),
346 )
347}
348
349pub(crate) fn semantic_backfill_scheduler_decision_for_capacity(
350 policy: &SemanticPolicy,
351 requested_batch_conversations: usize,
352 signals: &SemanticBackfillSchedulerSignals,
353 current_capacity_pct: u32,
354) -> SemanticBackfillSchedulerDecision {
355 let min_capacity_pct = env_backfill_min_capacity_pct();
356 let paused_delay_ms = policy.idle_delay_seconds.saturating_mul(1000);
357 let mut decision = SemanticBackfillSchedulerDecision {
358 state: SemanticBackfillSchedulerState::Running,
359 reason: SemanticBackfillSchedulerReason::IdleBudgetAvailable,
360 requested_batch_conversations,
361 scheduled_batch_conversations: requested_batch_conversations,
362 current_capacity_pct: current_capacity_pct.clamp(0, 100),
363 min_capacity_pct,
364 max_backfill_threads: policy.max_backfill_threads,
365 idle_delay_seconds: policy.idle_delay_seconds,
366 chunk_timeout_seconds: policy.chunk_timeout_seconds,
367 foreground_pressure: signals.foreground_pressure,
368 lexical_repair_active: signals.lexical_repair_active,
369 forced: signals.force,
370 next_eligible_after_ms: 0,
371 };
372
373 if requested_batch_conversations == 0 {
374 return stopped_scheduler_decision(
375 decision,
376 SemanticBackfillSchedulerState::Disabled,
377 SemanticBackfillSchedulerReason::BatchBudgetZero,
378 paused_delay_ms,
379 );
380 }
381 if policy.max_backfill_threads == 0 && !signals.force {
382 return stopped_scheduler_decision(
383 decision,
384 SemanticBackfillSchedulerState::Disabled,
385 SemanticBackfillSchedulerReason::ThreadBudgetZero,
386 paused_delay_ms,
387 );
388 }
389 if signals.operator_disabled && !signals.force {
390 return stopped_scheduler_decision(
391 decision,
392 SemanticBackfillSchedulerState::Disabled,
393 SemanticBackfillSchedulerReason::OperatorDisabled,
394 paused_delay_ms,
395 );
396 }
397 if !policy.mode.should_build_semantic() && !signals.force {
398 return stopped_scheduler_decision(
399 decision,
400 SemanticBackfillSchedulerState::Disabled,
401 SemanticBackfillSchedulerReason::PolicyDisabled,
402 paused_delay_ms,
403 );
404 }
405 if signals.lexical_repair_active && !signals.force {
406 return stopped_scheduler_decision(
407 decision,
408 SemanticBackfillSchedulerState::Paused,
409 SemanticBackfillSchedulerReason::LexicalRepairActive,
410 paused_delay_ms,
411 );
412 }
413 if signals.foreground_pressure && !signals.force {
414 return stopped_scheduler_decision(
415 decision,
416 SemanticBackfillSchedulerState::Paused,
417 SemanticBackfillSchedulerReason::ForegroundPressure,
418 paused_delay_ms,
419 );
420 }
421 if current_capacity_pct < min_capacity_pct && !signals.force {
422 return stopped_scheduler_decision(
423 decision,
424 SemanticBackfillSchedulerState::Paused,
425 SemanticBackfillSchedulerReason::CapacityBelowFloor,
426 paused_delay_ms,
427 );
428 }
429
430 let capacity = current_capacity_pct.clamp(1, 100) as usize;
431 let scaled = requested_batch_conversations.saturating_mul(capacity) / 100;
432 decision.scheduled_batch_conversations = scaled.max(1).min(requested_batch_conversations);
433 decision
434}
435
436fn stopped_scheduler_decision(
437 mut decision: SemanticBackfillSchedulerDecision,
438 state: SemanticBackfillSchedulerState,
439 reason: SemanticBackfillSchedulerReason,
440 next_eligible_after_ms: u64,
441) -> SemanticBackfillSchedulerDecision {
442 decision.state = state;
443 decision.reason = reason;
444 decision.scheduled_batch_conversations = 0;
445 decision.next_eligible_after_ms = next_eligible_after_ms;
446 decision
447}
448
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields 0 when the system clock is set before the epoch, and saturates at
/// `i64::MAX` rather than wrapping if the millisecond count does not fit.
fn now_ms() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        // `as_millis` is a `u128`; a checked conversion avoids a silently truncating cast.
        .map(|elapsed| i64::try_from(elapsed.as_millis()).unwrap_or(i64::MAX))
        .unwrap_or(0)
}
455
456fn hnsw_index_path(data_dir: &Path, embedder_id: &str) -> PathBuf {
457 data_dir
458 .join(VECTOR_INDEX_DIR)
459 .join(format!("hnsw-{embedder_id}.chsw"))
460}
461
/// Replaces every character other than ASCII letters, ASCII digits, `-`, and
/// `_` with `_`, so the result can be embedded in a file name.
fn safe_path_component(raw: &str) -> String {
    let mut sanitized = String::with_capacity(raw.len());
    for ch in raw.chars() {
        let keep = ch.is_ascii_alphanumeric() || ch == '-' || ch == '_';
        sanitized.push(if keep { ch } else { '_' });
    }
    sanitized
}
473
474fn semantic_staging_index_path(
475 data_dir: &Path,
476 tier: TierKind,
477 embedder_id: &str,
478 db_fingerprint: &str,
479) -> PathBuf {
480 let fingerprint_hash = crc32fast::hash(db_fingerprint.as_bytes());
481 data_dir.join(VECTOR_INDEX_DIR).join(format!(
482 ".staging-{}-{}-{fingerprint_hash:08x}.fsvi",
483 tier.as_str(),
484 safe_path_component(embedder_id)
485 ))
486}
487
488fn semantic_generation_fingerprint_component(db_fingerprint: &str) -> String {
489 blake3::hash(db_fingerprint.as_bytes())
490 .to_hex()
491 .chars()
492 .take(16)
493 .collect()
494}
495
496fn semantic_shard_generation_dir(
497 data_dir: &Path,
498 tier: TierKind,
499 embedder_id: &str,
500 db_fingerprint: &str,
501) -> PathBuf {
502 let fingerprint_hash = semantic_generation_fingerprint_component(db_fingerprint);
503 data_dir.join(VECTOR_INDEX_DIR).join("shards").join(format!(
504 "{}-{}-{fingerprint_hash}",
505 tier.as_str(),
506 safe_path_component(embedder_id),
507 ))
508}
509
510fn semantic_shard_index_path(
511 data_dir: &Path,
512 tier: TierKind,
513 embedder_id: &str,
514 db_fingerprint: &str,
515 shard_index: u32,
516) -> PathBuf {
517 semantic_shard_generation_dir(data_dir, tier, embedder_id, db_fingerprint)
518 .join(format!("shard-{shard_index:05}.fsvi"))
519}
520
521fn semantic_shard_ann_index_path(
522 data_dir: &Path,
523 tier: TierKind,
524 embedder_id: &str,
525 db_fingerprint: &str,
526 shard_index: u32,
527) -> PathBuf {
528 semantic_shard_generation_dir(data_dir, tier, embedder_id, db_fingerprint)
529 .join(format!("shard-{shard_index:05}.chsw"))
530}
531
532#[cfg(not(windows))]
533fn sync_parent_directory(path: &Path) -> Result<()> {
534 let Some(parent) = path.parent() else {
535 return Ok(());
536 };
537 let directory = fs::File::open(parent)
538 .with_context(|| format!("opening parent directory {}", parent.display()))?;
539 directory
540 .sync_all()
541 .with_context(|| format!("syncing parent directory {}", parent.display()))
542}
543
544#[cfg(windows)]
545fn sync_parent_directory(_path: &Path) -> Result<()> {
546 Ok(())
547}
548
549fn semantic_doc_id_for_embedded(embedded: &EmbeddedMessage) -> String {
550 SemanticDocId {
551 message_id: embedded.message_id,
552 chunk_idx: embedded.chunk_idx,
553 agent_id: embedded.agent_id,
554 workspace_id: embedded.workspace_id,
555 source_id: embedded.source_id,
556 role: embedded.role,
557 created_at_ms: embedded.created_at_ms,
558 content_hash: Some(embedded.content_hash),
559 }
560 .to_doc_id_string()
561}
562
/// Conversation envelope loaded by `fetch_canonical_embedding_conversations`
/// (conversation columns joined with the agent slug and workspace path).
struct CanonicalEmbeddingConversationRow {
    /// `conversations.id`.
    conversation_id: i64,
    /// Agent slug; the query substitutes `'unknown'` when it is NULL.
    agent_slug: String,
    /// `conversations.agent_id`; the query substitutes 0 when it is NULL.
    agent_id: i64,
    /// Workspace path from the joined `workspaces` row, if any.
    workspace: Option<PathBuf>,
    /// `conversations.workspace_id`, if any.
    workspace_id: Option<i64>,
    external_id: Option<String>,
    title: Option<String>,
    source_path: PathBuf,
    started_at: Option<i64>,
    ended_at: Option<i64>,
    /// Raw `conversations.source_id`; normalized in `canonical_embedding_packet_provenance`.
    source_id: Option<String>,
    /// Raw `conversations.origin_host`; normalized in `canonical_embedding_packet_provenance`.
    origin_host: Option<String>,
}
577
/// One batch of embedding inputs produced by the canonical replay in
/// `fetch_canonical_embedding_batch_inner_with_caps`.
struct CanonicalEmbeddingBatch {
    /// Embedding inputs built from the selected conversations.
    inputs: Vec<EmbeddingInput>,
    /// Number of conversations selected into this batch.
    conversations_in_batch: u64,
    /// Id of the last selected conversation, or the incoming cursor when none was selected.
    last_conversation_id: i64,
    /// Result of `total_semantic_conversations` at the time of the fetch.
    total_conversations: u64,
}
584
/// Per-checkpoint size limits applied when selecting conversations for a batch.
///
/// A value of 0 means "no limit" for that dimension (see `message_limited`
/// and `byte_limited`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct SemanticCheckpointCaps {
    /// Maximum number of messages per checkpoint; 0 disables the limit.
    max_messages: usize,
    /// Maximum number of message content bytes per checkpoint; 0 disables the limit.
    max_bytes: u64,
}
590
591impl SemanticCheckpointCaps {
592 fn unlimited() -> Self {
593 Self {
594 max_messages: 0,
595 max_bytes: 0,
596 }
597 }
598
599 fn from_env() -> Self {
600 Self {
601 max_messages: resolved_env_usize(
602 "CASS_SEMANTIC_MAX_MESSAGES_PER_CHECKPOINT",
603 DEFAULT_SEMANTIC_MAX_MESSAGES_PER_CHECKPOINT,
604 ),
605 max_bytes: resolved_env_u64(
606 "CASS_SEMANTIC_MAX_BYTES_PER_CHECKPOINT",
607 DEFAULT_SEMANTIC_MAX_BYTES_PER_CHECKPOINT,
608 ),
609 }
610 }
611
612 fn message_limited(self) -> bool {
613 self.max_messages > 0
614 }
615
616 fn byte_limited(self) -> bool {
617 self.max_bytes > 0
618 }
619}
620
/// Embedding inputs gathered for an incremental pass.
///
/// NOTE(review): the code that builds this value is outside this part of the
/// file; field meanings are inferred from names and types only.
pub(crate) struct CanonicalIncrementalEmbeddingBatch {
    pub inputs: Vec<EmbeddingInput>,
    pub conversations_in_batch: u64,
    /// Presumably the largest raw message id seen, if any — TODO confirm.
    pub raw_max_message_id: Option<i64>,
}
626
/// Counts the conversations that have at least one message.
///
/// The fast path is `COUNT(*)` over `conversations.message_count > 0`. When
/// that fails with "no such column: message_count", the function falls back
/// to collecting the ids of conversations for which a `messages` row exists,
/// first with an `INDEXED BY sqlite_autoindex_messages_1` hint and, if that
/// index does not exist, without the hint. Any other error is propagated.
///
/// # Errors
///
/// Returns the underlying query error wrapped with context.
fn total_semantic_conversations(storage: &FrankenStorage) -> Result<u64> {
    let hinted_fallback_sql = "SELECT c.id
         FROM conversations c
         WHERE EXISTS (
             SELECT 1
             FROM messages INDEXED BY sqlite_autoindex_messages_1
             WHERE conversation_id = c.id
             LIMIT 1
         )";
    let fallback_sql = "SELECT c.id
         FROM conversations c
         WHERE EXISTS (
             SELECT 1
             FROM messages
             WHERE conversation_id = c.id
             LIMIT 1
         )";
    let count: i64 = storage
        .raw()
        .query_row_map(
            "SELECT COUNT(*) FROM conversations WHERE message_count > 0",
            &[] as &[ParamValue],
            |row| row.get_typed(0),
        )
        .or_else(|err| {
            // Only a missing `message_count` column triggers the fallback;
            // the decision is made by matching the error text.
            if !err.to_string().contains("no such column: message_count") {
                return Err(err);
            }
            let conversation_ids: Vec<i64> = storage
                .raw()
                .query_map_collect(hinted_fallback_sql, &[] as &[ParamValue], |row| {
                    row.get_typed(0)
                })
                .or_else(|err| {
                    // Retry without the index hint when the named index is absent.
                    if err
                        .to_string()
                        .contains("no such index: sqlite_autoindex_messages_1")
                    {
                        return storage.raw().query_map_collect(
                            fallback_sql,
                            &[] as &[ParamValue],
                            |row| row.get_typed(0),
                        );
                    }
                    Err(err)
                })?;
            // The fallback counts rows client-side.
            Ok(i64::try_from(conversation_ids.len()).unwrap_or(i64::MAX))
        })
        .with_context(|| "counting canonical conversations with semantic messages")?;
    // A negative count is treated as zero.
    Ok(u64::try_from(count.max(0)).unwrap_or(u64::MAX))
}
678
/// Converts a database message id to its unsigned form.
///
/// Returns `None` for negative ids.
pub(crate) fn message_id_from_db(raw: i64) -> Option<u64> {
    if raw < 0 {
        None
    } else {
        // For a non-negative value the absolute value is the value itself.
        Some(raw.unsigned_abs())
    }
}
682
/// Narrows an `i64` to `u32`, saturating at both ends: negative values become
/// 0 and values above `u32::MAX` become `u32::MAX`.
pub(crate) fn saturating_u32_from_i64(raw: i64) -> u32 {
    let bounded = raw.clamp(0, i64::from(u32::MAX));
    // `bounded` is within `u32` range, so the fallback is never taken.
    u32::try_from(bounded).unwrap_or(u32::MAX)
}
690
691fn canonical_embedding_created_at_ms(message_id: u64, created_at: Option<i64>) -> i64 {
692 created_at.unwrap_or_else(|| {
700 tracing::warn!(
701 message_id,
702 "semantic backfill: row has NULL created_at; defaulting to 0 (Unix epoch). \
703 Downstream time-range filters will treat this message as 1970-01-01."
704 );
705 0
706 })
707}
708
709fn canonical_embedding_packet_provenance(
710 row: &CanonicalEmbeddingConversationRow,
711) -> ConversationPacketProvenance {
712 let source_id =
713 normalized_index_source_id(row.source_id.as_deref(), None, row.origin_host.as_deref());
714 ConversationPacketProvenance {
715 origin_kind: normalized_index_origin_kind(&source_id, None),
716 origin_host: normalized_index_origin_host(row.origin_host.as_deref()),
717 source_id,
718 }
719}
720
721fn canonical_embedding_conversation(
722 row: &CanonicalEmbeddingConversationRow,
723 provenance: &ConversationPacketProvenance,
724 messages: Vec<Message>,
725) -> Conversation {
726 Conversation {
727 id: Some(row.conversation_id),
728 agent_slug: row.agent_slug.clone(),
729 workspace: row.workspace.clone(),
730 external_id: row.external_id.clone(),
731 title: row.title.clone(),
732 source_path: row.source_path.clone(),
733 started_at: row.started_at,
734 ended_at: row.ended_at,
735 approx_tokens: None,
736 metadata_json: serde_json::Value::Null,
737 messages,
738 source_id: provenance.source_id.clone(),
739 origin_host: provenance.origin_host.clone(),
740 }
741}
742
743fn embedding_input_from_packet_message(
744 conversation_id: i64,
745 agent_id: u32,
746 workspace_id: u32,
747 source_id_hash: u32,
748 message: &crate::model::conversation_packet::ConversationPacketMessage,
749) -> Option<EmbeddingInput> {
750 let Some(raw_message_id) = message.message_id else {
751 tracing::warn!(
752 conversation_id,
753 message_idx = message.idx,
754 "skipping semantic backfill message without canonical id in ConversationPacket replay"
755 );
756 return None;
757 };
758 let Some(message_id) = message_id_from_db(raw_message_id) else {
759 tracing::warn!(
760 conversation_id,
761 raw_message_id,
762 "skipping out-of-range id during semantic backfill"
763 );
764 return None;
765 };
766 Some(EmbeddingInput {
767 message_id,
768 created_at_ms: canonical_embedding_created_at_ms(message_id, message.created_at),
769 agent_id,
770 workspace_id,
771 source_id: source_id_hash,
772 role: role_code_from_str(&message.role).unwrap_or(ROLE_USER),
773 chunk_idx: 0,
774 content: message.content.clone(),
775 })
776}
777
778fn embedding_inputs_from_conversation_packet(
779 row: &CanonicalEmbeddingConversationRow,
780 packet: &ConversationPacket,
781) -> Vec<EmbeddingInput> {
782 let agent_id = saturating_u32_from_i64(row.agent_id);
783 let workspace_id = saturating_u32_from_i64(row.workspace_id.unwrap_or(0));
784 let source_id_hash = crc32fast::hash(packet.payload.provenance.source_id.as_bytes());
785 packet
786 .projections
787 .semantic
788 .message_indices
789 .iter()
790 .filter_map(|message_index| {
791 packet
792 .payload
793 .messages
794 .get(*message_index)
795 .and_then(|message| {
796 embedding_input_from_packet_message(
797 row.conversation_id,
798 agent_id,
799 workspace_id,
800 source_id_hash,
801 message,
802 )
803 })
804 })
805 .collect()
806}
807
808fn fetch_canonical_embedding_conversations(
809 storage: &FrankenStorage,
810 conversation_ids: &[i64],
811) -> Result<Vec<CanonicalEmbeddingConversationRow>> {
812 let mut envelope_sql = String::from(
813 "SELECT c.id,
814 COALESCE(a.slug, 'unknown'),
815 COALESCE(c.agent_id, 0),
816 c.workspace_id,
817 w.path,
818 c.external_id,
819 c.title,
820 c.source_path,
821 c.started_at,
822 c.ended_at,
823 c.source_id,
824 c.origin_host
825 FROM conversations c
826 LEFT JOIN agents a ON a.id = c.agent_id
827 LEFT JOIN workspaces w ON w.id = c.workspace_id
828 WHERE c.id IN (",
829 );
830 let mut params = Vec::with_capacity(conversation_ids.len());
831 for (idx, conversation_id) in conversation_ids.iter().enumerate() {
832 if idx > 0 {
833 envelope_sql.push_str(", ");
834 }
835 envelope_sql.push_str(&format!("?{}", idx + 1));
836 params.push(ParamValue::from(*conversation_id));
837 }
838 envelope_sql.push_str(") ORDER BY c.id ASC");
839
840 storage
841 .raw()
842 .query_map_collect(&envelope_sql, ¶ms, |row| {
843 let workspace_path: Option<String> = row.get_typed(4)?;
844 Ok(CanonicalEmbeddingConversationRow {
845 conversation_id: row.get_typed(0)?,
846 agent_slug: row.get_typed(1)?,
847 agent_id: row.get_typed(2)?,
848 workspace_id: row.get_typed(3)?,
849 workspace: workspace_path.map(PathBuf::from),
850 external_id: row.get_typed(5)?,
851 title: row.get_typed(6)?,
852 source_path: PathBuf::from(row.get_typed::<String>(7)?),
853 started_at: row.get_typed(8)?,
854 ended_at: row.get_typed(9)?,
855 source_id: row.get_typed(10)?,
856 origin_host: row.get_typed(11)?,
857 })
858 })
859 .with_context(|| {
860 format!(
861 "fetching semantic backfill conversation envelopes for {} conversations",
862 conversation_ids.len()
863 )
864 })
865}
866
/// Per-packet identifiers supplied alongside each `ConversationPacket` to
/// `semantic_inputs_from_packets` (one context per packet, in the same order).
// NOTE(review): `dead_code` is allowed here presumably because the
// packet-driven path has no caller yet in every build — confirm and remove
// the attribute once it does.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy)]
pub(crate) struct SemanticPacketContext {
    /// Conversation id, used in warnings about skipped messages.
    pub conversation_id: i64,
    /// Agent id copied onto every input built from the packet.
    pub agent_id: u32,
    /// Workspace id copied onto every input built from the packet.
    pub workspace_id: u32,
}
884
885#[allow(dead_code)]
905pub(crate) fn semantic_inputs_from_packets(
906 packets: &[ConversationPacket],
907 contexts: &[SemanticPacketContext],
908) -> Result<Vec<EmbeddingInput>> {
909 if packets.len() != contexts.len() {
910 anyhow::bail!(
911 "semantic_inputs_from_packets length mismatch: {} packets vs {} contexts",
912 packets.len(),
913 contexts.len()
914 );
915 }
916 let mut inputs = Vec::new();
917 for (packet, context) in packets.iter().zip(contexts.iter()) {
918 let source_id_hash = crc32fast::hash(packet.payload.provenance.source_id.as_bytes());
919 for &message_index in &packet.projections.semantic.message_indices {
920 let Some(message) = packet.payload.messages.get(message_index) else {
921 anyhow::bail!(
922 "packet semantic projection references missing message index {} \
923 (packet has {} messages)",
924 message_index,
925 packet.payload.messages.len()
926 );
927 };
928 if let Some(input) = embedding_input_from_packet_message(
929 context.conversation_id,
930 context.agent_id,
931 context.workspace_id,
932 source_id_hash,
933 message,
934 ) {
935 inputs.push(input);
936 }
937 }
938 }
939 tracing::debug!(
940 packets = packets.len(),
941 packet_driven = true,
942 semantic_inputs = inputs.len(),
943 "built semantic inputs from in-memory ConversationPacket batch"
944 );
945 Ok(inputs)
946}
947
948fn fetch_canonical_embedding_batch(
949 storage: &FrankenStorage,
950 after_conversation_id: i64,
951 max_conversations: usize,
952) -> Result<CanonicalEmbeddingBatch> {
953 fetch_canonical_embedding_batch_inner(storage, after_conversation_id, max_conversations, None)
954}
955
956fn fetch_canonical_embedding_batch_inner(
962 storage: &FrankenStorage,
963 after_conversation_id: i64,
964 max_conversations: usize,
965 after_message_id: Option<i64>,
966) -> Result<CanonicalEmbeddingBatch> {
967 fetch_canonical_embedding_batch_inner_with_caps(
968 storage,
969 after_conversation_id,
970 max_conversations,
971 after_message_id,
972 SemanticCheckpointCaps::unlimited(),
973 )
974}
975
976fn fetch_canonical_embedding_batch_inner_with_caps(
977 storage: &FrankenStorage,
978 after_conversation_id: i64,
979 max_conversations: usize,
980 after_message_id: Option<i64>,
981 caps: SemanticCheckpointCaps,
982) -> Result<CanonicalEmbeddingBatch> {
983 let total_conversations = total_semantic_conversations(storage)?;
984 let max_conversations_i64 = i64::try_from(max_conversations.max(1)).unwrap_or(i64::MAX);
985 let mut params = vec![
986 ParamValue::from(after_conversation_id),
987 ParamValue::from(max_conversations_i64),
988 ];
989 let message_cursor_predicate = if let Some(after_message_id) = after_message_id {
990 params.push(ParamValue::from(after_message_id));
991 " AND id > ?3"
992 } else {
993 ""
994 };
995 let hinted_sql = format!(
996 "SELECT c.id
997 FROM conversations c
998 WHERE c.id > ?1
999 AND EXISTS (
1000 SELECT 1
1001 FROM messages INDEXED BY sqlite_autoindex_messages_1
1002 WHERE conversation_id = c.id{message_cursor_predicate}
1003 LIMIT 1
1004 )
1005 ORDER BY c.id ASC
1006 LIMIT ?2"
1007 );
1008 let fallback_sql = format!(
1009 "SELECT c.id
1010 FROM conversations c
1011 WHERE c.id > ?1
1012 AND EXISTS (
1013 SELECT 1
1014 FROM messages
1015 WHERE conversation_id = c.id{message_cursor_predicate}
1016 LIMIT 1
1017 )
1018 ORDER BY c.id ASC
1019 LIMIT ?2"
1020 );
1021 let conversation_ids: Vec<i64> = storage
1022 .raw()
1023 .query_map_collect(&hinted_sql, ¶ms, |row| row.get_typed(0))
1024 .or_else(|err| {
1025 if err
1026 .to_string()
1027 .contains("no such index: sqlite_autoindex_messages_1")
1028 {
1029 return storage
1030 .raw()
1031 .query_map_collect(&fallback_sql, ¶ms, |row| row.get_typed(0));
1032 }
1033 Err(err)
1034 })
1035 .with_context(|| {
1036 format!("fetching semantic backfill conversation ids after {after_conversation_id}")
1037 })?;
1038
1039 if conversation_ids.is_empty() {
1040 return Ok(CanonicalEmbeddingBatch {
1041 inputs: Vec::new(),
1042 conversations_in_batch: 0,
1043 last_conversation_id: after_conversation_id,
1044 total_conversations,
1045 });
1046 }
1047
1048 let conversations = fetch_canonical_embedding_conversations(storage, &conversation_ids)?;
1049
1050 let mut grouped_messages =
1051 storage.fetch_messages_for_lexical_rebuild_batch(&conversation_ids, None, None)?;
1052 let (conversations, capped_last_conversation_id) = select_checkpoint_capped_conversations(
1053 conversations,
1054 &mut grouped_messages,
1055 after_message_id,
1056 caps,
1057 );
1058 let (inputs, _) = packet_embedding_inputs_from_materialized_canonical_messages(
1059 &conversations,
1060 &mut grouped_messages,
1061 |_| true,
1062 );
1063
1064 let conversations_in_batch = u64::try_from(conversations.len()).unwrap_or(u64::MAX);
1065 tracing::debug!(
1066 conversations_in_batch,
1067 packet_driven = true,
1068 semantic_inputs = inputs.len(),
1069 max_messages_per_checkpoint = caps.max_messages,
1070 max_bytes_per_checkpoint = caps.max_bytes,
1071 ?after_message_id,
1072 "built semantic backfill batch from ConversationPacket canonical replay"
1073 );
1074
1075 Ok(CanonicalEmbeddingBatch {
1076 inputs,
1077 conversations_in_batch,
1078 last_conversation_id: capped_last_conversation_id.unwrap_or(after_conversation_id),
1079 total_conversations,
1080 })
1081}
1082
1083fn select_checkpoint_capped_conversations(
1084 conversations: Vec<CanonicalEmbeddingConversationRow>,
1085 grouped_messages: &mut HashMap<i64, Vec<Message>>,
1086 after_message_id: Option<i64>,
1087 caps: SemanticCheckpointCaps,
1088) -> (Vec<CanonicalEmbeddingConversationRow>, Option<i64>) {
1089 let mut selected = Vec::new();
1090 let mut selected_messages = 0usize;
1091 let mut selected_bytes = 0u64;
1092 let mut last_conversation_id = None;
1093
1094 for conversation in conversations {
1095 let mut messages = grouped_messages
1096 .remove(&conversation.conversation_id)
1097 .unwrap_or_default();
1098 if let Some(min_exclusive) = after_message_id {
1099 messages.retain(|message| message.id.is_some_and(|id| id > min_exclusive));
1100 }
1101 if messages.is_empty() {
1102 continue;
1103 }
1104
1105 let message_count = messages.len();
1106 let byte_count = messages
1107 .iter()
1108 .map(|message| saturating_u64_from_usize(message.content.len()))
1109 .fold(0u64, u64::saturating_add);
1110 let would_exceed_messages = caps.message_limited()
1111 && !selected.is_empty()
1112 && selected_messages.saturating_add(message_count) > caps.max_messages;
1113 let would_exceed_bytes = caps.byte_limited()
1114 && !selected.is_empty()
1115 && selected_bytes.saturating_add(byte_count) > caps.max_bytes;
1116 if would_exceed_messages || would_exceed_bytes {
1117 tracing::debug!(
1118 conversation_id = conversation.conversation_id,
1119 selected_conversations = selected.len(),
1120 selected_messages,
1121 selected_bytes,
1122 candidate_messages = message_count,
1123 candidate_bytes = byte_count,
1124 max_messages_per_checkpoint = caps.max_messages,
1125 max_bytes_per_checkpoint = caps.max_bytes,
1126 "semantic checkpoint cap stopped this batch before the next full conversation"
1127 );
1128 break;
1129 }
1130
1131 selected_messages = selected_messages.saturating_add(message_count);
1132 selected_bytes = selected_bytes.saturating_add(byte_count);
1133 last_conversation_id = Some(conversation.conversation_id);
1134 grouped_messages.insert(conversation.conversation_id, messages);
1135 selected.push(conversation);
1136 }
1137
1138 (selected, last_conversation_id)
1139}
1140
1141pub(crate) fn packet_embedding_inputs_from_storage(
1142 storage: &FrankenStorage,
1143) -> Result<Vec<EmbeddingInput>> {
1144 Ok(fetch_canonical_embedding_batch(storage, 0, usize::MAX)?.inputs)
1145}
1146
1147fn packet_embedding_inputs_from_selected_canonical_messages<F>(
1148 storage: &FrankenStorage,
1149 conversation_ids: &[i64],
1150 include_message: F,
1151) -> Result<(Vec<EmbeddingInput>, Option<i64>)>
1152where
1153 F: FnMut(&Message) -> bool,
1154{
1155 if conversation_ids.is_empty() {
1156 return Ok((Vec::new(), None));
1157 }
1158
1159 let conversations = fetch_canonical_embedding_conversations(storage, conversation_ids)?;
1160 let mut grouped_messages =
1161 storage.fetch_messages_for_lexical_rebuild_batch(conversation_ids, None, None)?;
1162 Ok(
1163 packet_embedding_inputs_from_materialized_canonical_messages(
1164 &conversations,
1165 &mut grouped_messages,
1166 include_message,
1167 ),
1168 )
1169}
1170
1171fn packet_embedding_inputs_from_materialized_canonical_messages<F>(
1172 conversations: &[CanonicalEmbeddingConversationRow],
1173 grouped_messages: &mut HashMap<i64, Vec<Message>>,
1174 mut include_message: F,
1175) -> (Vec<EmbeddingInput>, Option<i64>)
1176where
1177 F: FnMut(&Message) -> bool,
1178{
1179 let mut inputs = Vec::new();
1180 let mut raw_max_message_id: Option<i64> = None;
1181
1182 for conversation in conversations {
1183 let mut messages = grouped_messages
1184 .remove(&conversation.conversation_id)
1185 .unwrap_or_default();
1186 messages.retain(|message| {
1187 let keep = include_message(message);
1188 if keep && let Some(message_id) = message.id {
1189 raw_max_message_id =
1190 Some(raw_max_message_id.map_or(message_id, |current| current.max(message_id)));
1191 }
1192 keep
1193 });
1194 if messages.is_empty() {
1195 continue;
1196 }
1197
1198 let provenance = canonical_embedding_packet_provenance(conversation);
1199 let canonical = canonical_embedding_conversation(conversation, &provenance, messages);
1200 let packet = ConversationPacket::from_canonical_replay(&canonical, provenance);
1201 inputs.extend(embedding_inputs_from_conversation_packet(
1202 conversation,
1203 &packet,
1204 ));
1205 }
1206
1207 (inputs, raw_max_message_id)
1208}
1209
1210pub(crate) fn packet_embedding_inputs_from_storage_since(
1211 storage: &FrankenStorage,
1212 since_message_id: i64,
1213) -> Result<CanonicalIncrementalEmbeddingBatch> {
1214 let conversation_ids: Vec<i64> = storage
1215 .raw()
1216 .query_map_collect(
1217 "SELECT DISTINCT m.conversation_id
1218 FROM messages m
1219 WHERE m.id > ?1
1220 ORDER BY m.conversation_id ASC",
1221 &[ParamValue::from(since_message_id)],
1222 |row| row.get_typed(0),
1223 )
1224 .with_context(|| {
1225 format!(
1226 "fetching canonical semantic catch-up conversation ids after message {since_message_id}"
1227 )
1228 })?;
1229
1230 if conversation_ids.is_empty() {
1231 return Ok(CanonicalIncrementalEmbeddingBatch {
1232 inputs: Vec::new(),
1233 conversations_in_batch: 0,
1234 raw_max_message_id: None,
1235 });
1236 }
1237
1238 let (inputs, raw_max_message_id) = packet_embedding_inputs_from_selected_canonical_messages(
1239 storage,
1240 &conversation_ids,
1241 |message| message.id.is_some_and(|id| id > since_message_id),
1242 )?;
1243
1244 let conversations_in_batch = u64::try_from(conversation_ids.len()).unwrap_or(u64::MAX);
1245 tracing::debug!(
1246 since_message_id,
1247 conversations_in_batch,
1248 packet_driven = true,
1249 semantic_inputs = inputs.len(),
1250 "built semantic catch-up batch from ConversationPacket canonical replay"
1251 );
1252
1253 Ok(CanonicalIncrementalEmbeddingBatch {
1254 inputs,
1255 conversations_in_batch,
1256 raw_max_message_id,
1257 })
1258}
1259
1260pub(crate) fn packet_embedding_inputs_from_storage_for_message_ids(
1261 storage: &FrankenStorage,
1262 conversation_ids: &[i64],
1263 message_ids: &HashSet<i64>,
1264) -> Result<Vec<EmbeddingInput>> {
1265 if conversation_ids.is_empty() || message_ids.is_empty() {
1266 return Ok(Vec::new());
1267 }
1268
1269 let (inputs, raw_max_message_id) = packet_embedding_inputs_from_selected_canonical_messages(
1270 storage,
1271 conversation_ids,
1272 |message| message.id.is_some_and(|id| message_ids.contains(&id)),
1273 )?;
1274 tracing::debug!(
1275 conversations_in_batch = conversation_ids.len(),
1276 selected_message_ids = message_ids.len(),
1277 semantic_inputs = inputs.len(),
1278 raw_max_message_id,
1279 packet_driven = true,
1280 "built selected semantic batch from ConversationPacket canonical replay"
1281 );
1282
1283 Ok(inputs)
1284}
1285
1286struct Prepared<'a> {
1287 msg: &'a EmbeddingInput,
1288 canonical: String,
1289 hash: [u8; 32],
1290}
1291
/// Cached result of preparing one message: the value stored in the prep memo cache.
#[derive(Debug, Clone, PartialEq, Eq)]
struct MemoizedPreparedMessage {
    /// Canonicalized message text.
    canonical: String,
    /// Content hash of `canonical`.
    hash: [u8; 32],
}
1297
1298fn semantic_prep_memo_key(content: &str) -> MemoKey {
1299 MemoKey::new(
1300 MemoContentHash::from_bytes(content_hash(content).to_vec()),
1301 SEMANTIC_PREP_MEMO_ALGORITHM,
1302 SEMANTIC_PREP_MEMO_VERSION,
1303 )
1304}
1305
/// Returns how much a counter grew between two snapshots.
///
/// Yields `0` instead of underflowing when `after` is smaller than `before`.
fn memo_counter_delta(after: u64, before: u64) -> u64 {
    match after.checked_sub(before) {
        Some(growth) => growth,
        None => 0,
    }
}
1309
1310fn trace_semantic_prep_memo_window(
1311 window_index: usize,
1312 window_len: usize,
1313 prepared_len: usize,
1314 entry_capacity: usize,
1315 before: &crate::indexer::memoization::MemoCacheStats,
1316 after: &crate::indexer::memoization::MemoCacheStats,
1317) {
1318 tracing::trace!(
1319 algorithm = SEMANTIC_PREP_MEMO_ALGORITHM,
1320 algorithm_version = SEMANTIC_PREP_MEMO_VERSION,
1321 window_index,
1322 window_len,
1323 prepared_messages = prepared_len,
1324 skipped_messages = window_len.saturating_sub(prepared_len),
1325 hit_delta = memo_counter_delta(after.hits, before.hits),
1326 miss_delta = memo_counter_delta(after.misses, before.misses),
1327 insert_delta = memo_counter_delta(after.inserts, before.inserts),
1328 evictions_capacity_delta =
1329 memo_counter_delta(after.evictions_capacity, before.evictions_capacity),
1330 quarantined_delta = memo_counter_delta(after.quarantined, before.quarantined),
1331 live_entries = after.live_entries,
1332 entry_capacity,
1333 "semantic prep memo cache window"
1334 );
1335}
1336
1337fn trace_semantic_prep_memo_audit(audit: &MemoCacheAuditRecord) {
1338 tracing::trace!(?audit, "semantic prep memo cache audit");
1339}
1340
1341fn prepare_window_with_memo<'a>(
1342 window: &'a [EmbeddingInput],
1343 cache: &mut ContentAddressedMemoCache<MemoizedPreparedMessage>,
1344) -> Vec<Prepared<'a>> {
1345 window
1346 .iter()
1347 .filter_map(|msg| {
1348 let key = semantic_prep_memo_key(&msg.content);
1349 let (lookup, lookup_audit) = cache.get_with_audit(&key);
1350 trace_semantic_prep_memo_audit(&lookup_audit);
1351 match lookup {
1352 MemoLookup::Hit { value } => Some(Prepared {
1353 msg,
1354 canonical: value.canonical,
1355 hash: value.hash,
1356 }),
1357 MemoLookup::Miss | MemoLookup::Quarantined { .. } => {
1358 let canonical = canonicalize_for_embedding(&msg.content);
1359 if canonical.is_empty() {
1360 return None;
1361 }
1362 let hash = content_hash(&canonical);
1363 let insert_audit = cache.insert_with_audit(
1364 key,
1365 MemoizedPreparedMessage {
1366 canonical: canonical.clone(),
1367 hash,
1368 },
1369 );
1370 trace_semantic_prep_memo_audit(&insert_audit);
1371 Some(Prepared {
1372 msg,
1373 canonical,
1374 hash,
1375 })
1376 }
1377 }
1378 })
1379 .collect()
1380}
1381
1382fn prepare_window<'a>(window: &'a [EmbeddingInput], serial: bool) -> Vec<Prepared<'a>> {
1389 let prep = |msg: &'a EmbeddingInput| -> Option<Prepared<'a>> {
1390 let canonical = canonicalize_for_embedding(&msg.content);
1391 if canonical.is_empty() {
1392 return None;
1393 }
1394 let hash = content_hash(&canonical);
1395 Some(Prepared {
1396 msg,
1397 canonical,
1398 hash,
1399 })
1400 };
1401
1402 if serial {
1403 window.iter().filter_map(prep).collect()
1404 } else {
1405 window.par_iter().filter_map(prep).collect()
1406 }
1407}
1408
1409fn flush_prepared_batch(
1410 batch: &[Prepared<'_>],
1411 embeddings: &mut Vec<EmbeddedMessage>,
1412 pb: &ProgressBar,
1413 embedder: &dyn Embedder,
1414) -> Result<()> {
1415 if batch.is_empty() {
1416 return Ok(());
1417 }
1418
1419 let texts: Vec<&str> = batch.iter().map(|p| p.canonical.as_str()).collect();
1420 let vectors = embedder
1421 .embed_batch_sync(&texts)
1422 .map_err(|e| anyhow::anyhow!("embedding failed: {e}"))?;
1423
1424 if vectors.len() != batch.len() {
1425 bail!(
1426 "embedder returned {} embeddings for {} inputs",
1427 vectors.len(),
1428 batch.len()
1429 );
1430 }
1431
1432 for (prepared, vector) in batch.iter().zip(vectors) {
1433 if vector.len() != embedder.dimension() {
1434 bail!(
1435 "embedding dimension mismatch: expected {}, got {}",
1436 embedder.dimension(),
1437 vector.len()
1438 );
1439 }
1440 embeddings.push(EmbeddedMessage {
1441 message_id: prepared.msg.message_id,
1442 created_at_ms: prepared.msg.created_at_ms,
1443 agent_id: prepared.msg.agent_id,
1444 workspace_id: prepared.msg.workspace_id,
1445 source_id: prepared.msg.source_id,
1446 role: prepared.msg.role,
1447 chunk_idx: prepared.msg.chunk_idx,
1448 content_hash: prepared.hash,
1449 embedding: vector,
1450 });
1451 }
1452
1453 pb.inc(saturating_u64_from_usize(batch.len()));
1454 Ok(())
1455}
1456
1457pub struct SemanticIndexer {
1458 embedder: Box<dyn Embedder>,
1459 batch_size: usize,
1460}
1461
1462impl SemanticIndexer {
1463 pub fn new(embedder_type: &str, data_dir: Option<&Path>) -> Result<Self> {
1464 let embedder: Box<dyn Embedder> = match embedder_type {
1465 "fastembed" | "minilm" | "snowflake-arctic-s" | "nomic-embed" => {
1466 let dir = data_dir
1467 .ok_or_else(|| anyhow::anyhow!("data_dir required for fastembed embedder"))?;
1468 let embedder_name = if embedder_type == "fastembed" {
1469 "minilm"
1470 } else {
1471 embedder_type
1472 };
1473 Box::new(
1474 FastEmbedder::load_by_name(dir, embedder_name)
1475 .map_err(|e| anyhow::anyhow!("fastembed unavailable: {e}"))?,
1476 )
1477 }
1478 "hash" => Box::new(HashEmbedder::default()),
1479 other => bail!("unknown embedder: {other}"),
1480 };
1481
1482 Ok(Self {
1483 embedder,
1484 batch_size: resolved_default_batch_size(),
1485 })
1486 }
1487
1488 pub fn with_batch_size(mut self, batch_size: usize) -> Result<Self> {
1489 if batch_size == 0 {
1490 bail!("batch_size must be > 0");
1491 }
1492 self.batch_size = batch_size;
1493 Ok(self)
1494 }
1495
1496 pub fn batch_size(&self) -> usize {
1497 self.batch_size
1498 }
1499
1500 pub fn embedder_id(&self) -> &str {
1501 self.embedder.id()
1502 }
1503
1504 pub fn embedder_dimension(&self) -> usize {
1505 self.embedder.dimension()
1506 }
1507
1508 pub fn embed_messages(&self, messages: &[EmbeddingInput]) -> Result<Vec<EmbeddedMessage>> {
1509 self.embed_messages_with_sink(messages, &SemanticProgressSink::disabled())
1510 }
1511
1512 pub fn embed_messages_with_sink(
1517 &self,
1518 messages: &[EmbeddingInput],
1519 sink: &SemanticProgressSink,
1520 ) -> Result<Vec<EmbeddedMessage>> {
1521 if messages.is_empty() {
1522 return Ok(Vec::new());
1523 }
1524
1525 let show_progress = std::io::stderr().is_terminal();
1526 let pb = ProgressBar::new(saturating_u64_from_usize(messages.len()));
1527 if show_progress {
1528 let style = ProgressStyle::default_bar()
1529 .template("{spinner:.green} [{bar:40.cyan/blue}] {pos}/{len} messages embedded")
1530 .unwrap_or_else(|_| ProgressStyle::default_bar());
1531 pb.set_style(style);
1532 } else {
1533 pb.set_draw_target(ProgressDrawTarget::hidden());
1534 }
1535
1536 let mut embeddings = Vec::with_capacity(messages.len());
1537
1538 let window = self.batch_size.saturating_mul(4);
1546 let serial_prep = !parallel_prep_enabled();
1547 let prep_memo_capacity = resolved_semantic_prep_memo_capacity();
1548 let mut prep_memo =
1549 serial_prep.then(|| ContentAddressedMemoCache::with_capacity(prep_memo_capacity));
1550 let mut batch_index: u64 = 0;
1551 let mut rows_processed: u64 = 0;
1552 let rows_total = u64::try_from(messages.len()).ok();
1553 let warn_after_ms = resolved_semantic_embed_batch_warn_after_ms();
1554 let fail_after_ms = resolved_semantic_embed_batch_fail_after_ms();
1555 for (window_index, window_slice) in messages.chunks(window).enumerate() {
1556 let prepared_window = match prep_memo.as_mut() {
1557 Some(cache) => {
1558 let stats_before = cache.stats().clone();
1559 let prepared_window = prepare_window_with_memo(window_slice, cache);
1560 trace_semantic_prep_memo_window(
1561 window_index,
1562 window_slice.len(),
1563 prepared_window.len(),
1564 prep_memo_capacity,
1565 &stats_before,
1566 cache.stats(),
1567 );
1568 prepared_window
1569 }
1570 None => prepare_window(window_slice, false),
1571 };
1572 let skipped_in_window = window_slice.len() - prepared_window.len();
1573 if skipped_in_window > 0 {
1574 pb.inc(saturating_u64_from_usize(skipped_in_window));
1575 }
1576
1577 for batch in prepared_window.chunks(self.batch_size) {
1578 let batch_rows = u64::try_from(batch.len()).unwrap_or(u64::MAX);
1579 let batch_bytes: u64 = batch
1585 .iter()
1586 .map(|p| saturating_u64_from_usize(p.canonical.len()))
1587 .sum();
1588 if sink.is_active() {
1589 sink.emit(
1590 SemanticProgressEvent::EmbedBatchStart,
1591 SemanticProgressFields {
1592 batch_index: Some(batch_index),
1593 batch_rows: Some(batch_rows),
1594 rows_processed: Some(rows_processed),
1595 rows_total,
1596 bytes: Some(batch_bytes),
1597 ..Default::default()
1598 },
1599 );
1600 }
1601 let batch_started = Instant::now();
1602 flush_prepared_batch(batch, &mut embeddings, &pb, self.embedder.as_ref())?;
1603 let elapsed_ms = saturating_u64_from_millis(batch_started.elapsed().as_millis());
1604 rows_processed = rows_processed.saturating_add(batch_rows);
1605 if warn_after_ms > 0 && elapsed_ms > warn_after_ms {
1606 tracing::warn!(
1607 batch_index,
1608 elapsed_ms,
1609 warn_after_ms,
1610 batch_rows,
1611 batch_bytes,
1612 embedder = self.embedder_id(),
1613 "semantic embed batch exceeded watchdog warning threshold"
1614 );
1615 }
1616 if fail_after_ms > 0 && elapsed_ms > fail_after_ms {
1617 bail!(
1618 "semantic embed batch {batch_index} took {elapsed_ms}ms, exceeding \
1619 CASS_SEMANTIC_EMBED_BATCH_FAIL_AFTER_MS={fail_after_ms}"
1620 );
1621 }
1622 if sink.is_active() {
1623 sink.emit(
1624 SemanticProgressEvent::EmbedBatchDone,
1625 SemanticProgressFields {
1626 batch_index: Some(batch_index),
1627 batch_rows: Some(batch_rows),
1628 rows_processed: Some(rows_processed),
1629 rows_total,
1630 bytes: Some(batch_bytes),
1631 ..Default::default()
1632 },
1633 );
1634 }
1635 batch_index = batch_index.saturating_add(1);
1636 }
1637 }
1638
1639 if let Some(cache) = prep_memo.as_ref() {
1640 let stats = cache.stats();
1641 tracing::debug!(
1642 algorithm = SEMANTIC_PREP_MEMO_ALGORITHM,
1643 algorithm_version = SEMANTIC_PREP_MEMO_VERSION,
1644 hits = stats.hits,
1645 misses = stats.misses,
1646 inserts = stats.inserts,
1647 quarantined = stats.quarantined,
1648 live_entries = stats.live_entries,
1649 entry_capacity = prep_memo_capacity,
1650 "semantic prep memo cache summary"
1651 );
1652 }
1653
1654 pb.finish_with_message("Embedding complete");
1655 Ok(embeddings)
1656 }
1657
1658 pub fn build_and_save_index<I>(
1659 &self,
1660 embedded_messages: I,
1661 data_dir: &Path,
1662 ) -> Result<FsVectorIndex>
1663 where
1664 I: IntoIterator<Item = EmbeddedMessage>,
1665 {
1666 let index_path = vector_index_path(data_dir, self.embedder_id());
1667 self.build_and_save_index_at_path(embedded_messages, &index_path)
1668 }
1669
1670 pub fn build_and_save_index_shards<I>(
1671 &self,
1672 embedded_messages: I,
1673 data_dir: &Path,
1674 plan: SemanticShardBuildPlan,
1675 ) -> Result<SemanticShardBuildOutcome>
1676 where
1677 I: IntoIterator<Item = EmbeddedMessage>,
1678 {
1679 if plan.db_fingerprint.trim().is_empty() {
1680 bail!("semantic shard build requires a non-empty DB fingerprint");
1681 }
1682 if plan.max_records_per_shard == 0 {
1683 bail!("semantic shard build requires max_records_per_shard > 0");
1684 }
1685
1686 let mut shard_records = Vec::new();
1687 let mut index_paths = Vec::new();
1688 let mut ann_index_paths = Vec::new();
1689 let mut current_records = Vec::with_capacity(plan.max_records_per_shard);
1690 let mut shard_index = 0u32;
1691 let mut total_docs = 0u64;
1692
1693 for embedded in embedded_messages {
1694 current_records.push(embedded);
1695 if current_records.len() >= plan.max_records_per_shard {
1696 let records = std::mem::take(&mut current_records);
1697 let (record, path, ann_path) =
1698 self.write_semantic_shard(records, data_dir, &plan, shard_index)?;
1699 total_docs = total_docs.saturating_add(record.doc_count);
1700 shard_records.push(record);
1701 index_paths.push(path);
1702 if let Some(path) = ann_path {
1703 ann_index_paths.push(path);
1704 }
1705 shard_index = shard_index
1706 .checked_add(1)
1707 .context("semantic shard index overflow")?;
1708 }
1709 }
1710
1711 if !current_records.is_empty() {
1712 let records = std::mem::take(&mut current_records);
1713 let (record, path, ann_path) =
1714 self.write_semantic_shard(records, data_dir, &plan, shard_index)?;
1715 total_docs = total_docs.saturating_add(record.doc_count);
1716 shard_records.push(record);
1717 index_paths.push(path);
1718 if let Some(path) = ann_path {
1719 ann_index_paths.push(path);
1720 }
1721 }
1722
1723 let shard_count = u32::try_from(shard_records.len())
1724 .context("semantic shard generation exceeded u32 shard count")?;
1725 for record in &mut shard_records {
1726 record.shard_count = shard_count;
1727 }
1728
1729 let mut shard_manifest = SemanticShardManifest::load_or_default(data_dir)
1730 .map_err(|err| anyhow::anyhow!("loading semantic shard manifest for publish: {err}"))?;
1731 shard_manifest.replace_shards_for_generation(
1732 plan.tier,
1733 self.embedder_id(),
1734 &plan.db_fingerprint,
1735 shard_records,
1736 );
1737 shard_manifest
1738 .save(data_dir)
1739 .map_err(|err| anyhow::anyhow!("saving semantic shard manifest: {err}"))?;
1740 let summary = shard_manifest.summary(plan.tier, self.embedder_id(), &plan.db_fingerprint);
1741
1742 tracing::info!(
1743 tier = plan.tier.as_str(),
1744 embedder = self.embedder_id(),
1745 shard_count,
1746 doc_count = total_docs,
1747 total_conversations = plan.total_conversations,
1748 "published semantic shard generation sidecar"
1749 );
1750
1751 Ok(SemanticShardBuildOutcome {
1752 tier: plan.tier,
1753 embedder_id: self.embedder_id().to_string(),
1754 shard_count,
1755 doc_count: total_docs,
1756 total_conversations: plan.total_conversations,
1757 index_paths,
1758 ann_index_paths,
1759 shard_manifest_path: SemanticShardManifest::path(data_dir),
1760 complete: summary.complete,
1761 })
1762 }
1763
1764 fn write_semantic_shard(
1765 &self,
1766 embedded_messages: Vec<EmbeddedMessage>,
1767 data_dir: &Path,
1768 plan: &SemanticShardBuildPlan,
1769 shard_index: u32,
1770 ) -> Result<(SemanticShardRecord, PathBuf, Option<PathBuf>)> {
1771 let started_at_ms = now_ms();
1772 let shard_path = semantic_shard_index_path(
1773 data_dir,
1774 plan.tier,
1775 self.embedder_id(),
1776 &plan.db_fingerprint,
1777 shard_index,
1778 );
1779 let shard_index_file = self.build_and_save_index_at_path(embedded_messages, &shard_path)?;
1780 let size_bytes = fs::metadata(&shard_path)
1781 .with_context(|| format!("stat semantic shard {}", shard_path.display()))?
1782 .len();
1783 let (ann_index_path, ann_size_bytes, ann_ready, ann_absolute_path) = if plan.build_ann {
1784 let ann_path = semantic_shard_ann_index_path(
1785 data_dir,
1786 plan.tier,
1787 self.embedder_id(),
1788 &plan.db_fingerprint,
1789 shard_index,
1790 );
1791 let config = FsHnswConfig {
1792 m: FS_HNSW_DEFAULT_M,
1793 ef_construction: FS_HNSW_DEFAULT_EF_CONSTRUCTION,
1794 ..FsHnswConfig::default()
1795 };
1796 let hnsw = FsHnswIndex::build_from_vector_index(&shard_index_file, config)
1797 .map_err(|err| anyhow::anyhow!("build shard HNSW index failed: {err}"))?;
1798 hnsw.save(&ann_path)
1799 .map_err(|err| anyhow::anyhow!("save shard HNSW index failed: {err}"))?;
1800 let ann_size_bytes = fs::metadata(&ann_path)
1801 .with_context(|| format!("stat semantic shard ANN {}", ann_path.display()))?
1802 .len();
1803 let relative_ann_path = ann_path
1804 .strip_prefix(data_dir)
1805 .unwrap_or(ann_path.as_path())
1806 .to_string_lossy()
1807 .to_string();
1808 (
1809 Some(relative_ann_path),
1810 ann_size_bytes,
1811 true,
1812 Some(ann_path),
1813 )
1814 } else {
1815 (None, 0, false, None)
1816 };
1817 let relative_index_path = shard_path
1818 .strip_prefix(data_dir)
1819 .unwrap_or(shard_path.as_path())
1820 .to_string_lossy()
1821 .to_string();
1822 let record = SemanticShardRecord {
1823 tier: plan.tier,
1824 embedder_id: self.embedder_id().to_string(),
1825 model_revision: plan.model_revision.clone(),
1826 schema_version: SEMANTIC_SCHEMA_VERSION,
1827 chunking_version: CHUNKING_STRATEGY_VERSION,
1828 dimension: self.embedder_dimension(),
1829 shard_index,
1830 shard_count: 0,
1831 doc_count: u64::try_from(shard_index_file.record_count()).unwrap_or(u64::MAX),
1832 total_conversations: plan.total_conversations,
1833 db_fingerprint: plan.db_fingerprint.clone(),
1834 index_path: relative_index_path,
1835 quantization: "f16".to_string(),
1836 mmap_ready: true,
1837 ann_index_path,
1838 ann_size_bytes,
1839 ann_ready,
1840 size_bytes,
1841 started_at_ms,
1842 completed_at_ms: now_ms(),
1843 ready: true,
1844 };
1845 Ok((record, shard_path, ann_absolute_path))
1846 }
1847
1848 fn build_and_save_index_at_path<I>(
1849 &self,
1850 embedded_messages: I,
1851 index_path: &Path,
1852 ) -> Result<FsVectorIndex>
1853 where
1854 I: IntoIterator<Item = EmbeddedMessage>,
1855 {
1856 if let Some(parent) = index_path.parent() {
1857 std::fs::create_dir_all(parent)?;
1858 }
1859
1860 let mut writer: FsVectorIndexWriter = FsVectorIndex::create_with_revision(
1862 index_path,
1863 self.embedder_id(),
1864 "1.0",
1865 self.embedder_dimension(),
1866 FsQuantization::F16,
1867 )
1868 .map_err(|err| anyhow::anyhow!("create fsvi index failed: {err}"))?;
1869
1870 let write_result: Result<()> = (|| {
1871 for embedded in embedded_messages {
1872 if embedded.embedding.len() != self.embedder_dimension() {
1873 bail!(
1874 "embedding dimension mismatch: expected {}, got {}",
1875 self.embedder_dimension(),
1876 embedded.embedding.len()
1877 );
1878 }
1879 let doc_id = semantic_doc_id_for_embedded(&embedded);
1880 writer
1881 .write_record(&doc_id, &embedded.embedding)
1882 .map_err(|err| anyhow::anyhow!("write fsvi record failed: {err}"))?;
1883 }
1884 Ok(())
1885 })();
1886
1887 if let Err(e) = &write_result {
1888 tracing::warn!("removing partial vector index after write failure: {e}");
1890 if let Err(rm_err) = std::fs::remove_file(index_path) {
1891 tracing::error!(
1892 "failed to remove partial index file {}: {rm_err}",
1893 index_path.display()
1894 );
1895 }
1896 return Err(anyhow::anyhow!("{e}"));
1897 }
1898
1899 writer
1900 .finish()
1901 .map_err(|err| anyhow::anyhow!("finish fsvi index failed: {err}"))?;
1902
1903 FsVectorIndex::open(index_path)
1904 .map_err(|err| anyhow::anyhow!("open fsvi index failed: {err}"))
1905 }
1906
1907 pub fn append_to_index(
1915 &self,
1916 embedded_messages: impl IntoIterator<Item = EmbeddedMessage>,
1917 data_dir: &Path,
1918 ) -> Result<usize> {
1919 let index_path = vector_index_path(data_dir, self.embedder_id());
1920 self.append_to_index_path(embedded_messages, &index_path)
1921 }
1922
1923 fn append_to_index_path(
1924 &self,
1925 embedded_messages: impl IntoIterator<Item = EmbeddedMessage>,
1926 index_path: &Path,
1927 ) -> Result<usize> {
1928 let mut index = FsVectorIndex::open(index_path)
1929 .map_err(|err| anyhow::anyhow!("open fsvi index for append: {err}"))?;
1930
1931 let entries: Vec<(String, Vec<f32>)> = embedded_messages
1932 .into_iter()
1933 .map(|em| {
1934 let doc_id = semantic_doc_id_for_embedded(&em);
1935 (doc_id, em.embedding)
1936 })
1937 .collect();
1938
1939 let count = entries.len();
1940 if count == 0 {
1941 return Ok(0);
1942 }
1943
1944 index
1945 .append_batch(&entries)
1946 .map_err(|err| anyhow::anyhow!("append_batch: {err}"))?;
1947
1948 if index.needs_compaction() {
1949 index
1950 .compact()
1951 .map_err(|err| anyhow::anyhow!("compaction: {err}"))?;
1952 }
1953
1954 Ok(count)
1955 }
1956
1957 fn write_backfill_staging_index(
1958 &self,
1959 embedded_messages: Vec<EmbeddedMessage>,
1960 staging_path: &Path,
1961 resume_existing: bool,
1962 ) -> Result<FsVectorIndex> {
1963 if resume_existing && staging_path.exists() {
1964 self.append_to_index_path(embedded_messages, staging_path)?;
1965 FsVectorIndex::open(staging_path)
1966 .map_err(|err| anyhow::anyhow!("open staged semantic index failed: {err}"))
1967 } else {
1968 self.build_and_save_index_at_path(embedded_messages, staging_path)
1969 }
1970 }
1971
1972 pub fn run_backfill_batch(
1973 &self,
1974 messages: &[EmbeddingInput],
1975 data_dir: &Path,
1976 manifest: &mut SemanticManifest,
1977 plan: SemanticBackfillBatchPlan,
1978 ) -> Result<SemanticBackfillBatchOutcome> {
1979 self.run_backfill_batch_with_sink(
1980 messages,
1981 data_dir,
1982 manifest,
1983 plan,
1984 None,
1985 &SemanticProgressSink::disabled(),
1986 )
1987 }
1988
1989 pub fn run_backfill_batch_with_sink(
1995 &self,
1996 messages: &[EmbeddingInput],
1997 data_dir: &Path,
1998 manifest: &mut SemanticManifest,
1999 plan: SemanticBackfillBatchPlan,
2000 last_message_id: Option<i64>,
2001 sink: &SemanticProgressSink,
2002 ) -> Result<SemanticBackfillBatchOutcome> {
2003 if plan.db_fingerprint.trim().is_empty() {
2004 bail!("semantic backfill requires a non-empty DB fingerprint");
2005 }
2006 if plan.total_conversations == 0 && plan.conversations_in_batch > 0 {
2007 bail!("semantic backfill batch cannot process conversations when total is zero");
2008 }
2009
2010 let manifest_path = SemanticManifest::path(data_dir);
2011 let staging_path = semantic_staging_index_path(
2012 data_dir,
2013 plan.tier,
2014 self.embedder_id(),
2015 &plan.db_fingerprint,
2016 );
2017 let final_path = vector_index_path(data_dir, self.embedder_id());
2018
2019 let prior_checkpoint = manifest
2020 .checkpoint
2021 .as_ref()
2022 .filter(|checkpoint| {
2023 checkpoint.tier == plan.tier
2024 && checkpoint.embedder_id == self.embedder_id()
2025 && checkpoint.is_valid(&plan.db_fingerprint)
2026 })
2027 .cloned();
2028 let prior_conversations = prior_checkpoint
2029 .as_ref()
2030 .map_or(0, |checkpoint| checkpoint.conversations_processed);
2031 let prior_docs = prior_checkpoint
2032 .as_ref()
2033 .map_or(0, |checkpoint| checkpoint.docs_embedded);
2034
2035 let embeddings = self.embed_messages_with_sink(messages, sink)?;
2036 let embedded_docs = u64::try_from(embeddings.len()).unwrap_or(u64::MAX);
2037 if sink.is_active() {
2038 sink.emit(
2039 SemanticProgressEvent::StagingWriteStart,
2040 SemanticProgressFields {
2041 batch_rows: Some(embedded_docs),
2042 note: Some(staging_path.display().to_string()),
2043 ..Default::default()
2044 },
2045 );
2046 }
2047 let mut staged_index = self.write_backfill_staging_index(
2048 embeddings,
2049 &staging_path,
2050 prior_checkpoint.is_some(),
2051 )?;
2052 if sink.is_active() {
2053 sink.emit(
2054 SemanticProgressEvent::StagingWriteDone,
2055 SemanticProgressFields {
2056 batch_rows: Some(embedded_docs),
2057 note: Some(staging_path.display().to_string()),
2058 ..Default::default()
2059 },
2060 );
2061 }
2062 let conversations_processed = prior_conversations
2063 .saturating_add(plan.conversations_in_batch)
2064 .min(plan.total_conversations);
2065 let complete = conversations_processed >= plan.total_conversations;
2066
2067 manifest.refresh_backlog(plan.total_conversations, &plan.db_fingerprint);
2068
2069 if complete {
2070 let db_fingerprint = plan.db_fingerprint.clone();
2071 if staged_index.wal_record_count() > 0 {
2072 staged_index.compact().map_err(|err| {
2073 anyhow::anyhow!("compact staged semantic index failed: {err}")
2074 })?;
2075 }
2076 drop(staged_index);
2077 if sink.is_active() {
2078 sink.emit(
2079 SemanticProgressEvent::PublishStart,
2080 SemanticProgressFields {
2081 rows_processed: Some(conversations_processed),
2082 rows_total: Some(plan.total_conversations),
2083 last_conversation_id: Some(plan.last_offset),
2084 last_message_id,
2085 note: Some(final_path.display().to_string()),
2086 ..Default::default()
2087 },
2088 );
2089 }
2090 fs::rename(&staging_path, &final_path).with_context(|| {
2091 format!(
2092 "publishing staged semantic index {} to {}",
2093 staging_path.display(),
2094 final_path.display()
2095 )
2096 })?;
2097 sync_parent_directory(&final_path)?;
2098 let published_index = FsVectorIndex::open(&final_path)
2099 .map_err(|err| anyhow::anyhow!("open published semantic index failed: {err}"))?;
2100 let size_bytes = fs::metadata(&final_path)
2101 .with_context(|| format!("stat published semantic index {}", final_path.display()))?
2102 .len();
2103 let relative_index_path = final_path
2104 .strip_prefix(data_dir)
2105 .unwrap_or(final_path.as_path())
2106 .to_string_lossy()
2107 .to_string();
2108 manifest.publish_artifact(ArtifactRecord {
2109 tier: plan.tier,
2110 embedder_id: self.embedder_id().to_string(),
2111 model_revision: plan.model_revision,
2112 schema_version: SEMANTIC_SCHEMA_VERSION,
2113 chunking_version: CHUNKING_STRATEGY_VERSION,
2114 dimension: self.embedder_dimension(),
2115 doc_count: u64::try_from(published_index.record_count()).unwrap_or(u64::MAX),
2116 conversation_count: conversations_processed,
2117 db_fingerprint: plan.db_fingerprint,
2118 index_path: relative_index_path,
2119 size_bytes,
2120 started_at_ms: prior_checkpoint
2121 .as_ref()
2122 .map_or_else(now_ms, |checkpoint| checkpoint.saved_at_ms),
2123 completed_at_ms: now_ms(),
2124 ready: true,
2125 });
2126 manifest.refresh_backlog(plan.total_conversations, &db_fingerprint);
2127 manifest.save(data_dir)?;
2128 if sink.is_active() {
2129 sink.emit(
2130 SemanticProgressEvent::PublishDone,
2131 SemanticProgressFields {
2132 rows_processed: Some(conversations_processed),
2133 rows_total: Some(plan.total_conversations),
2134 last_conversation_id: Some(plan.last_offset),
2135 last_message_id,
2136 note: Some(final_path.display().to_string()),
2137 ..Default::default()
2138 },
2139 );
2140 }
2141 } else {
2142 let docs_embedded_on_disk =
2143 u64::try_from(staged_index.record_count()).unwrap_or(u64::MAX);
2144 let checkpoint_docs = prior_docs
2145 .saturating_add(embedded_docs)
2146 .max(docs_embedded_on_disk);
2147 if sink.is_active() {
2148 sink.emit(
2149 SemanticProgressEvent::CheckpointSaveStart,
2150 SemanticProgressFields {
2151 rows_processed: Some(conversations_processed),
2152 rows_total: Some(plan.total_conversations),
2153 last_conversation_id: Some(plan.last_offset),
2154 last_message_id,
2155 ..Default::default()
2156 },
2157 );
2158 }
2159 let prior_last_message_id = prior_checkpoint
2163 .as_ref()
2164 .and_then(|checkpoint| checkpoint.last_message_id);
2165 manifest.save_checkpoint(BuildCheckpoint {
2166 tier: plan.tier,
2167 embedder_id: self.embedder_id().to_string(),
2168 last_offset: plan.last_offset,
2169 docs_embedded: checkpoint_docs,
2170 conversations_processed,
2171 total_conversations: plan.total_conversations,
2172 db_fingerprint: plan.db_fingerprint,
2173 schema_version: SEMANTIC_SCHEMA_VERSION,
2174 chunking_version: CHUNKING_STRATEGY_VERSION,
2175 saved_at_ms: now_ms(),
2176 last_message_id: last_message_id.or(prior_last_message_id),
2177 });
2178 manifest.save(data_dir)?;
2179 if sink.is_active() {
2180 sink.emit(
2181 SemanticProgressEvent::CheckpointSaveDone,
2182 SemanticProgressFields {
2183 rows_processed: Some(conversations_processed),
2184 rows_total: Some(plan.total_conversations),
2185 last_conversation_id: Some(plan.last_offset),
2186 last_message_id: last_message_id.or(prior_last_message_id),
2187 ..Default::default()
2188 },
2189 );
2190 }
2191 }
2192
2193 Ok(SemanticBackfillBatchOutcome {
2194 tier: plan.tier,
2195 embedder_id: self.embedder_id().to_string(),
2196 embedded_docs,
2197 conversations_processed,
2198 total_conversations: plan.total_conversations,
2199 last_offset: plan.last_offset,
2200 checkpoint_saved: !complete,
2201 published: complete,
2202 index_path: if complete { final_path } else { staging_path },
2203 manifest_path,
2204 })
2205 }
2206
2207 pub fn run_backfill_from_storage(
2208 &self,
2209 storage: &FrankenStorage,
2210 data_dir: &Path,
2211 manifest: &mut SemanticManifest,
2212 plan: SemanticBackfillStoragePlan,
2213 ) -> Result<SemanticBackfillBatchOutcome> {
2214 self.run_backfill_from_storage_with_sink(
2215 storage,
2216 data_dir,
2217 manifest,
2218 plan,
2219 &SemanticProgressSink::disabled(),
2220 )
2221 }
2222
2223 pub fn run_backfill_from_storage_with_sink(
2228 &self,
2229 storage: &FrankenStorage,
2230 data_dir: &Path,
2231 manifest: &mut SemanticManifest,
2232 plan: SemanticBackfillStoragePlan,
2233 sink: &SemanticProgressSink,
2234 ) -> Result<SemanticBackfillBatchOutcome> {
2235 self.run_backfill_from_storage_with_caps_and_sink(
2236 storage,
2237 data_dir,
2238 manifest,
2239 plan,
2240 SemanticCheckpointCaps::unlimited(),
2241 sink,
2242 )
2243 }
2244
2245 pub fn run_capped_backfill_from_storage_with_sink(
2252 &self,
2253 storage: &FrankenStorage,
2254 data_dir: &Path,
2255 manifest: &mut SemanticManifest,
2256 plan: SemanticBackfillStoragePlan,
2257 sink: &SemanticProgressSink,
2258 ) -> Result<SemanticBackfillBatchOutcome> {
2259 self.run_backfill_from_storage_with_caps_and_sink(
2260 storage,
2261 data_dir,
2262 manifest,
2263 plan,
2264 SemanticCheckpointCaps::from_env(),
2265 sink,
2266 )
2267 }
2268
2269 fn run_backfill_from_storage_with_caps_and_sink(
2270 &self,
2271 storage: &FrankenStorage,
2272 data_dir: &Path,
2273 manifest: &mut SemanticManifest,
2274 plan: SemanticBackfillStoragePlan,
2275 caps: SemanticCheckpointCaps,
2276 sink: &SemanticProgressSink,
2277 ) -> Result<SemanticBackfillBatchOutcome> {
2278 let prior_checkpoint = manifest.checkpoint.as_ref().filter(|checkpoint| {
2279 checkpoint.tier == plan.tier
2280 && checkpoint.embedder_id == self.embedder_id()
2281 && checkpoint.is_valid(&plan.db_fingerprint)
2282 });
2283 let after_conversation_id = prior_checkpoint.map_or(0, |checkpoint| checkpoint.last_offset);
2284 let prior_last_message_id =
2285 prior_checkpoint.and_then(|checkpoint| checkpoint.last_message_id);
2286
2287 if sink.is_active() {
2288 sink.emit(
2289 SemanticProgressEvent::SelectionStart,
2290 SemanticProgressFields {
2291 last_conversation_id: Some(after_conversation_id),
2292 last_message_id: prior_last_message_id,
2293 rows_total: Some(saturating_u64_from_usize(plan.max_conversations)),
2294 note: Some(plan.tier.as_str().to_string()),
2295 ..Default::default()
2296 },
2297 );
2298 }
2299
2300 let batch = match fetch_canonical_embedding_batch_inner_with_caps(
2301 storage,
2302 after_conversation_id,
2303 plan.max_conversations,
2304 prior_last_message_id,
2305 caps,
2306 ) {
2307 Ok(batch) => batch,
2308 Err(err) => {
2309 if sink.is_active() {
2310 sink.emit(
2311 SemanticProgressEvent::Error,
2312 SemanticProgressFields {
2313 error: Some(format!("selection: {err}")),
2314 last_conversation_id: Some(after_conversation_id),
2315 last_message_id: prior_last_message_id,
2316 ..Default::default()
2317 },
2318 );
2319 }
2320 return Err(err);
2321 }
2322 };
2323
2324 if sink.is_active() {
2325 sink.emit(
2326 SemanticProgressEvent::SelectionDone,
2327 SemanticProgressFields {
2328 last_conversation_id: Some(batch.last_conversation_id),
2329 last_message_id: prior_last_message_id,
2330 conversations_in_batch: Some(batch.conversations_in_batch),
2331 rows_total: Some(batch.total_conversations),
2332 ..Default::default()
2333 },
2334 );
2335 sink.emit(
2347 SemanticProgressEvent::PacketReplayStart,
2348 SemanticProgressFields {
2349 conversations_in_batch: Some(batch.conversations_in_batch),
2350 ..Default::default()
2351 },
2352 );
2353 sink.emit(
2354 SemanticProgressEvent::PacketReplayProgress,
2355 SemanticProgressFields {
2356 conversations_in_batch: Some(batch.conversations_in_batch),
2357 rows_processed: Some(saturating_u64_from_usize(batch.inputs.len())),
2358 bytes: Some(
2359 batch
2360 .inputs
2361 .iter()
2362 .map(|i| saturating_u64_from_usize(i.content.len()))
2363 .sum(),
2364 ),
2365 ..Default::default()
2366 },
2367 );
2368 sink.emit(
2369 SemanticProgressEvent::PacketReplayDone,
2370 SemanticProgressFields {
2371 conversations_in_batch: Some(batch.conversations_in_batch),
2372 rows_processed: Some(saturating_u64_from_usize(batch.inputs.len())),
2373 ..Default::default()
2374 },
2375 );
2376 }
2377
2378 let batch_last_message_id = batch
2382 .inputs
2383 .iter()
2384 .map(|input| i64::try_from(input.message_id).unwrap_or(i64::MAX))
2385 .max();
2386 let next_last_message_id = match (prior_last_message_id, batch_last_message_id) {
2387 (Some(prior), Some(batch_max)) => Some(prior.max(batch_max)),
2388 (Some(prior), None) => Some(prior),
2389 (None, Some(batch_max)) => Some(batch_max),
2390 (None, None) => None,
2391 };
2392
2393 let outcome = self.run_backfill_batch_with_sink(
2394 &batch.inputs,
2395 data_dir,
2396 manifest,
2397 SemanticBackfillBatchPlan {
2398 tier: plan.tier,
2399 db_fingerprint: plan.db_fingerprint,
2400 model_revision: plan.model_revision,
2401 total_conversations: batch.total_conversations,
2402 conversations_in_batch: batch.conversations_in_batch,
2403 last_offset: batch.last_conversation_id,
2404 },
2405 next_last_message_id,
2406 sink,
2407 );
2408
2409 match &outcome {
2410 Ok(o) => {
2411 if sink.is_active() {
2412 sink.emit(
2413 SemanticProgressEvent::Complete,
2414 SemanticProgressFields {
2415 rows_processed: Some(o.conversations_processed),
2416 rows_total: Some(o.total_conversations),
2417 last_conversation_id: Some(o.last_offset),
2418 last_message_id: next_last_message_id,
2419 ..Default::default()
2420 },
2421 );
2422 }
2423 }
2424 Err(err) => {
2425 if sink.is_active() {
2426 sink.emit(
2427 SemanticProgressEvent::Error,
2428 SemanticProgressFields {
2429 error: Some(err.to_string()),
2430 last_conversation_id: Some(batch.last_conversation_id),
2431 last_message_id: next_last_message_id,
2432 ..Default::default()
2433 },
2434 );
2435 }
2436 }
2437 }
2438
2439 outcome
2440 }
2441
2442 pub fn build_hnsw_index(
2456 &self,
2457 vector_index: &FsVectorIndex,
2458 data_dir: &Path,
2459 m: Option<usize>,
2460 ef_construction: Option<usize>,
2461 ) -> Result<PathBuf> {
2462 let m = m.unwrap_or(FS_HNSW_DEFAULT_M);
2463 let ef_construction = ef_construction.unwrap_or(FS_HNSW_DEFAULT_EF_CONSTRUCTION);
2464
2465 tracing::info!(
2466 embedder = self.embedder_id(),
2467 count = vector_index.record_count(),
2468 m,
2469 ef_construction,
2470 "Building HNSW index for approximate nearest neighbor search"
2471 );
2472
2473 let config = FsHnswConfig {
2474 m,
2475 ef_construction,
2476 ..FsHnswConfig::default()
2477 };
2478 let hnsw = FsHnswIndex::build_from_vector_index(vector_index, config)
2479 .map_err(|err| anyhow::anyhow!("build HNSW index failed: {err}"))?;
2480
2481 let hnsw_path = hnsw_index_path(data_dir, self.embedder_id());
2482 if let Some(parent) = hnsw_path.parent() {
2483 std::fs::create_dir_all(parent)?;
2484 }
2485 hnsw.save(&hnsw_path)
2486 .map_err(|err| anyhow::anyhow!("save HNSW index failed: {err}"))?;
2487
2488 tracing::info!(?hnsw_path, "Saved HNSW index");
2489 Ok(hnsw_path)
2490 }
2491}
2492
2493#[cfg(test)]
2494mod tests {
2495 use super::*;
2496 use crate::model::types::{Agent, AgentKind, Conversation, Message, MessageRole};
2497 use crate::storage::sqlite::FrankenStorage;
2498 use serde_json::json;
2499 use std::path::Path;
2500 use tempfile::tempdir;
2501
    /// Plain, totally ordered copy of the `EmbeddingInput` fields that tests
    /// compare against each other.
    ///
    /// The derived `Ord` compares fields in declaration order, so `message_id`
    /// is the primary sort key; do not reorder the fields casually.
    #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
    struct ComparableSemanticInput {
        message_id: u64,
        created_at_ms: i64,
        agent_id: u32,
        workspace_id: u32,
        source_id: u32,
        role: u8,
        content: String,
    }
2512
2513 fn comparable_semantic_inputs(mut inputs: Vec<EmbeddingInput>) -> Vec<ComparableSemanticInput> {
2514 let mut comparable: Vec<ComparableSemanticInput> = inputs
2515 .drain(..)
2516 .map(|input| ComparableSemanticInput {
2517 message_id: input.message_id,
2518 created_at_ms: input.created_at_ms,
2519 agent_id: input.agent_id,
2520 workspace_id: input.workspace_id,
2521 source_id: input.source_id,
2522 role: input.role,
2523 content: input.content,
2524 })
2525 .collect();
2526 comparable.sort();
2527 comparable
2528 }
2529
2530 fn test_conversation(external_id: &str, body: &str) -> Conversation {
2531 test_conversation_fixture(
2532 external_id,
2533 vec![Message {
2534 id: None,
2535 idx: 0,
2536 role: MessageRole::User,
2537 author: None,
2538 created_at: Some(1_700_000_000_500),
2539 content: body.to_string(),
2540 extra_json: json!({}),
2541 snippets: Vec::new(),
2542 }],
2543 "local",
2544 None,
2545 )
2546 }
2547
2548 fn test_conversation_with_messages(external_id: &str, messages: Vec<Message>) -> Conversation {
2549 test_conversation_fixture(external_id, messages, "remote-laptop", Some("builder-host"))
2550 }
2551
2552 fn test_conversation_fixture(
2553 external_id: &str,
2554 messages: Vec<Message>,
2555 source_id: &str,
2556 origin_host: Option<&str>,
2557 ) -> Conversation {
2558 Conversation {
2559 id: None,
2560 agent_slug: "codex".to_string(),
2561 workspace: None,
2562 external_id: Some(external_id.to_string()),
2563 title: Some(format!("semantic {external_id}")),
2564 source_path: PathBuf::from(format!("/tmp/{external_id}.jsonl")),
2565 started_at: Some(1_700_000_000_000),
2566 ended_at: Some(1_700_000_001_000),
2567 approx_tokens: None,
2568 metadata_json: json!({}),
2569 messages,
2570 source_id: source_id.to_string(),
2571 origin_host: origin_host.map(str::to_string),
2572 }
2573 }
2574
2575 fn default_scheduler_signals() -> SemanticBackfillSchedulerSignals {
2576 SemanticBackfillSchedulerSignals {
2577 foreground_pressure: false,
2578 lexical_repair_active: false,
2579 force: false,
2580 operator_disabled: false,
2581 }
2582 }
2583
2584 struct EnvVarGuard {
2585 key: &'static str,
2586 prior: Option<String>,
2587 }
2588
2589 impl EnvVarGuard {
2590 fn set(key: &'static str, value: &str) -> Self {
2591 let prior = std::env::var(key).ok();
2592 unsafe {
2594 std::env::set_var(key, value);
2595 }
2596 Self { key, prior }
2597 }
2598
2599 fn remove(key: &'static str) -> Self {
2600 let prior = std::env::var(key).ok();
2601 unsafe {
2603 std::env::remove_var(key);
2604 }
2605 Self { key, prior }
2606 }
2607 }
2608
2609 impl Drop for EnvVarGuard {
2610 fn drop(&mut self) {
2611 unsafe {
2613 match self.prior.as_deref() {
2614 Some(value) => std::env::set_var(self.key, value),
2615 None => std::env::remove_var(self.key),
2616 }
2617 }
2618 }
2619 }
2620
2621 #[test]
2622 fn semantic_backfill_scheduler_runs_and_scales_batch_under_idle_budget() {
2623 let policy = SemanticPolicy::compiled_defaults();
2624 let decision = semantic_backfill_scheduler_decision_for_capacity(
2625 &policy,
2626 64,
2627 &default_scheduler_signals(),
2628 80,
2629 );
2630
2631 assert!(decision.should_run());
2632 assert_eq!(decision.state, SemanticBackfillSchedulerState::Running);
2633 assert_eq!(
2634 decision.reason,
2635 SemanticBackfillSchedulerReason::IdleBudgetAvailable
2636 );
2637 assert_eq!(decision.scheduled_batch_conversations, 51);
2638 assert_eq!(decision.current_capacity_pct, 80);
2639 assert_eq!(decision.next_eligible_after_ms, 0);
2640 }
2641
2642 #[test]
2643 fn semantic_backfill_scheduler_reason_next_steps_are_stable() {
2644 for (reason, expected) in [
2645 (
2646 SemanticBackfillSchedulerReason::IdleBudgetAvailable,
2647 "background semantic backfill is within idle budgets",
2648 ),
2649 (
2650 SemanticBackfillSchedulerReason::OperatorDisabled,
2651 "background semantic backfill is disabled by CASS_SEMANTIC_BACKFILL_DISABLE",
2652 ),
2653 (
2654 SemanticBackfillSchedulerReason::PolicyDisabled,
2655 "semantic policy disables background semantic backfill",
2656 ),
2657 (
2658 SemanticBackfillSchedulerReason::ForegroundPressure,
2659 "foreground pressure is present; retry after the idle delay",
2660 ),
2661 (
2662 SemanticBackfillSchedulerReason::LexicalRepairActive,
2663 "lexical repair is active; semantic backfill is yielding",
2664 ),
2665 (
2666 SemanticBackfillSchedulerReason::CapacityBelowFloor,
2667 "machine responsiveness capacity is below the semantic backfill floor",
2668 ),
2669 (
2670 SemanticBackfillSchedulerReason::ThreadBudgetZero,
2671 "semantic backfill thread budget is zero",
2672 ),
2673 (
2674 SemanticBackfillSchedulerReason::BatchBudgetZero,
2675 "semantic backfill batch budget is zero",
2676 ),
2677 ] {
2678 assert_eq!(reason.next_step(), expected, "{reason:?}");
2679 }
2680 }
2681
2682 #[test]
2683 fn semantic_backfill_scheduler_yields_to_foreground_and_lexical_pressure() {
2684 let policy = SemanticPolicy::compiled_defaults();
2685 let foreground = SemanticBackfillSchedulerSignals {
2686 foreground_pressure: true,
2687 ..default_scheduler_signals()
2688 };
2689 let foreground_decision =
2690 semantic_backfill_scheduler_decision_for_capacity(&policy, 64, &foreground, 100);
2691 assert!(!foreground_decision.should_run());
2692 assert_eq!(
2693 foreground_decision.state,
2694 SemanticBackfillSchedulerState::Paused
2695 );
2696 assert_eq!(
2697 foreground_decision.reason,
2698 SemanticBackfillSchedulerReason::ForegroundPressure
2699 );
2700 assert_eq!(
2701 foreground_decision.next_eligible_after_ms,
2702 policy.idle_delay_seconds * 1000
2703 );
2704
2705 let lexical_repair = SemanticBackfillSchedulerSignals {
2706 lexical_repair_active: true,
2707 ..default_scheduler_signals()
2708 };
2709 let lexical_decision =
2710 semantic_backfill_scheduler_decision_for_capacity(&policy, 64, &lexical_repair, 100);
2711 assert!(!lexical_decision.should_run());
2712 assert_eq!(
2713 lexical_decision.state,
2714 SemanticBackfillSchedulerState::Paused
2715 );
2716 assert_eq!(
2717 lexical_decision.reason,
2718 SemanticBackfillSchedulerReason::LexicalRepairActive
2719 );
2720 }
2721
2722 #[test]
2723 fn semantic_backfill_scheduler_honors_policy_disable_and_force_override() {
2724 let mut policy = SemanticPolicy::compiled_defaults();
2725 policy.mode = crate::search::policy::SemanticMode::LexicalOnly;
2726
2727 let disabled = semantic_backfill_scheduler_decision_for_capacity(
2728 &policy,
2729 64,
2730 &default_scheduler_signals(),
2731 100,
2732 );
2733 assert!(!disabled.should_run());
2734 assert_eq!(disabled.state, SemanticBackfillSchedulerState::Disabled);
2735 assert_eq!(
2736 disabled.reason,
2737 SemanticBackfillSchedulerReason::PolicyDisabled
2738 );
2739
2740 let forced = SemanticBackfillSchedulerSignals {
2741 force: true,
2742 ..default_scheduler_signals()
2743 };
2744 let forced_decision =
2745 semantic_backfill_scheduler_decision_for_capacity(&policy, 64, &forced, 100);
2746 assert!(forced_decision.should_run());
2747 assert_eq!(
2748 forced_decision.reason,
2749 SemanticBackfillSchedulerReason::IdleBudgetAvailable
2750 );
2751 assert!(forced_decision.forced);
2752 }
2753
2754 #[test]
2755 fn test_batch_embedding() {
2756 let indexer = SemanticIndexer::new("hash", None).unwrap();
2757 let messages = vec![
2758 EmbeddingInput::new(1, "Hello world"),
2759 EmbeddingInput::new(2, "Goodbye world"),
2760 ];
2761
2762 let embeddings = indexer.embed_messages(&messages).unwrap();
2763
2764 assert_eq!(embeddings.len(), 2);
2765 assert_eq!(embeddings[0].message_id, 1);
2766 assert_eq!(embeddings[1].message_id, 2);
2767 assert_eq!(embeddings[0].embedding.len(), indexer.embedder_dimension());
2768 }
2769
2770 #[test]
2771 fn test_progress_indicator() {
2772 let indexer = SemanticIndexer::new("hash", None).unwrap();
2773 let messages: Vec<_> = (0..1000)
2774 .map(|i| EmbeddingInput::new(i as u64, format!("Message {}", i)))
2775 .collect();
2776
2777 let embeddings = indexer.embed_messages(&messages).unwrap();
2778 assert_eq!(embeddings.len(), messages.len());
2779 }
2780
2781 #[test]
2782 fn test_build_and_save_index() {
2783 let indexer = SemanticIndexer::new("hash", None).unwrap();
2784 let messages = vec![
2785 EmbeddingInput::new(1, "Hello world"),
2786 EmbeddingInput::new(2, "Goodbye world"),
2787 ];
2788
2789 let embeddings = indexer.embed_messages(&messages).unwrap();
2790 let tmp = tempdir().unwrap();
2791 let index = indexer
2792 .build_and_save_index(embeddings, tmp.path())
2793 .unwrap();
2794 assert_eq!(index.embedder_id(), indexer.embedder_id());
2795 assert_eq!(index.dimension(), indexer.embedder_dimension());
2796 assert_eq!(index.record_count(), 2);
2797 }
2798
2799 #[test]
2800 fn sharded_index_build_writes_sidecar_without_runtime_publish() {
2801 let indexer = SemanticIndexer::new("hash", None).unwrap();
2802 let messages: Vec<_> = (0..5)
2803 .map(|idx| EmbeddingInput::new(idx, format!("semantic shard message {idx}")))
2804 .collect();
2805 let embeddings = indexer.embed_messages(&messages).unwrap();
2806 let tmp = tempdir().unwrap();
2807
2808 let outcome = indexer
2809 .build_and_save_index_shards(
2810 embeddings,
2811 tmp.path(),
2812 SemanticShardBuildPlan {
2813 tier: TierKind::Fast,
2814 db_fingerprint: "db-fp-sharded-build".to_string(),
2815 model_revision: "hash".to_string(),
2816 total_conversations: 5,
2817 max_records_per_shard: 2,
2818 build_ann: false,
2819 },
2820 )
2821 .unwrap();
2822
2823 assert_eq!(outcome.shard_count, 3);
2824 assert_eq!(outcome.doc_count, 5);
2825 assert_eq!(outcome.total_conversations, 5);
2826 assert!(outcome.complete);
2827 assert_eq!(outcome.index_paths.len(), 3);
2828 for path in &outcome.index_paths {
2829 let shard = FsVectorIndex::open(path).unwrap();
2830 assert_eq!(shard.embedder_id(), indexer.embedder_id());
2831 assert!(shard.record_count() > 0);
2832 }
2833
2834 let shards = SemanticShardManifest::load(tmp.path()).unwrap().unwrap();
2835 let summary = shards.summary(TierKind::Fast, indexer.embedder_id(), "db-fp-sharded-build");
2836 assert!(summary.complete);
2837 assert_eq!(summary.ready_shards, 3);
2838 assert_eq!(summary.ann_ready_shards, 0);
2839 assert_eq!(summary.doc_count, 5);
2840 assert_eq!(summary.total_conversations, 5);
2841
2842 assert!(
2843 SemanticManifest::load(tmp.path()).unwrap().is_none(),
2844 "sidecar shards must not publish the main runtime manifest"
2845 );
2846 assert!(!vector_index_path(tmp.path(), indexer.embedder_id()).exists());
2847 }
2848
2849 #[test]
2850 fn sharded_index_build_rejects_zero_sized_shards() {
2851 let indexer = SemanticIndexer::new("hash", None).unwrap();
2852 let err = indexer
2853 .build_and_save_index_shards(
2854 std::iter::empty(),
2855 tempdir().unwrap().path(),
2856 SemanticShardBuildPlan {
2857 tier: TierKind::Fast,
2858 db_fingerprint: "db-fp-sharded-build".to_string(),
2859 model_revision: "hash".to_string(),
2860 total_conversations: 0,
2861 max_records_per_shard: 0,
2862 build_ann: false,
2863 },
2864 )
2865 .unwrap_err();
2866
2867 assert!(err.to_string().contains("max_records_per_shard > 0"));
2868 }
2869
2870 #[test]
2871 fn sharded_ann_build_records_per_shard_accelerators() {
2872 let indexer = SemanticIndexer::new("hash", None).unwrap();
2873 let messages: Vec<_> = (0..8)
2874 .map(|idx| EmbeddingInput::new(idx, format!("semantic ann shard message {idx}")))
2875 .collect();
2876 let embeddings = indexer.embed_messages(&messages).unwrap();
2877 let tmp = tempdir().unwrap();
2878
2879 let outcome = indexer
2880 .build_and_save_index_shards(
2881 embeddings,
2882 tmp.path(),
2883 SemanticShardBuildPlan {
2884 tier: TierKind::Fast,
2885 db_fingerprint: "db-fp-sharded-ann-build".to_string(),
2886 model_revision: "hash".to_string(),
2887 total_conversations: 8,
2888 max_records_per_shard: 4,
2889 build_ann: true,
2890 },
2891 )
2892 .unwrap();
2893
2894 assert_eq!(outcome.shard_count, 2);
2895 assert_eq!(outcome.ann_index_paths.len(), 2);
2896 for path in &outcome.ann_index_paths {
2897 assert!(path.exists(), "ANN shard missing at {}", path.display());
2898 }
2899
2900 let shards = SemanticShardManifest::load(tmp.path()).unwrap().unwrap();
2901 let summary = shards.summary(
2902 TierKind::Fast,
2903 indexer.embedder_id(),
2904 "db-fp-sharded-ann-build",
2905 );
2906 assert!(summary.complete);
2907 assert_eq!(summary.ann_ready_shards, 2);
2908 assert!(summary.ann_size_bytes > 0);
2909 assert!(
2910 shards
2911 .shards
2912 .iter()
2913 .all(|record| record.ann_index_path.is_some() && record.ann_ready)
2914 );
2915 }
2916
2917 #[test]
2924 fn embed_messages_golden_digest_hash_embedder() {
2925 use ring::digest::{Context, SHA256};
2926
2927 let corpus: Vec<EmbeddingInput> = (0..64)
2928 .map(|i| {
2929 let body = match i % 5 {
2930 0 => format!("plain text message number {i}"),
2931 1 => format!("**bold** line {i} with _emphasis_"),
2932 2 => format!("```rust\nfn f_{i}() {{ println!(\"{i}\"); }}\n```"),
2933 3 => format!(" whitespace {i} "),
2934 _ => format!("unicode \u{00E9}\u{0301} + emoji \u{1F600} {i}"),
2935 };
2936 EmbeddingInput::new(i as u64, body)
2937 })
2938 .collect();
2939
2940 let indexer = SemanticIndexer::new("hash", None)
2941 .unwrap()
2942 .with_batch_size(16)
2943 .unwrap();
2944 let embeddings = indexer.embed_messages(&corpus).unwrap();
2945
2946 let mut ctx = Context::new(&SHA256);
2950 for em in &embeddings {
2951 ctx.update(&em.message_id.to_le_bytes());
2952 ctx.update(&em.content_hash);
2953 for v in &em.embedding {
2954 ctx.update(&v.to_le_bytes());
2955 }
2956 }
2957 let digest = hex::encode(ctx.finish().as_ref());
2958
2959 const EXPECTED: &str = "22d9ae7076925a4b70a194b0f519dfb1d465cc757368c296ef24055a02038c2c";
2966 assert_eq!(
2967 digest, EXPECTED,
2968 "embed_messages golden digest drifted; if this was intentional, \
2969 update EXPECTED in this test and record the reason in the commit message"
2970 );
2971 }
2972
2973 #[test]
2974 fn parallel_prep_matches_serial_prep_bitwise() {
2975 let inputs: Vec<EmbeddingInput> = (0..500)
2978 .map(|i| {
2979 let text = match i % 7 {
2980 0 => format!("Plain message number {i} with some ordinary words."),
2981 1 => format!("**Bold** and _italic_ markdown line {i}"),
2982 2 => format!(
2983 "```rust\nfn example_{i}() {{\n println!(\"code block {i}\");\n}}\n```\nfollow-up text"
2984 ),
2985 3 => String::new(), 4 => format!(" whitespace galore {i} "),
2987 5 => format!("Unicode \u{00E9}\u{0301} (combining accent) and emoji \u{1F600} line {i}"),
2988 _ => format!(
2989 "Mixed line {i}: `inline_code`, [link](http://x), {{braces}}, and \u{201C}curly quotes\u{201D}."
2990 ),
2991 };
2992 EmbeddingInput::new(i as u64, text)
2993 })
2994 .collect();
2995
2996 let serial = prepare_window(&inputs, true);
2997 let parallel = prepare_window(&inputs, false);
2998
2999 assert_eq!(
3000 serial.len(),
3001 parallel.len(),
3002 "serial and parallel prep should skip the same number of empty canonicals"
3003 );
3004
3005 for (s, p) in serial.iter().zip(parallel.iter()) {
3006 assert_eq!(
3007 s.msg.message_id, p.msg.message_id,
3008 "ordering must be preserved between serial and parallel prep"
3009 );
3010 assert_eq!(
3011 s.canonical, p.canonical,
3012 "canonical form diverged between serial and parallel prep"
3013 );
3014 assert_eq!(
3015 s.hash, p.hash,
3016 "content hash diverged between serial and parallel prep"
3017 );
3018 }
3019 }
3020
3021 #[test]
3022 fn parallel_prep_filters_empty_canonicals() {
3023 let inputs = vec![
3024 EmbeddingInput::new(1, "valid content"),
3025 EmbeddingInput::new(2, ""),
3026 EmbeddingInput::new(3, " \n\n \t "),
3027 EmbeddingInput::new(4, "more valid content"),
3028 ];
3029
3030 let prepared = prepare_window(&inputs, false);
3031 let ids: Vec<u64> = prepared.iter().map(|p| p.msg.message_id).collect();
3032
3033 assert!(ids.contains(&1));
3034 assert!(ids.contains(&4));
3035 assert!(!ids.contains(&2));
3037 assert!(!ids.contains(&3));
3038 }
3039
3040 #[test]
3041 fn memoized_serial_prep_matches_stateless_prepare_window() {
3042 let inputs = vec![
3043 EmbeddingInput::new(1, "repeat me exactly"),
3044 EmbeddingInput::new(2, "repeat me exactly"),
3045 EmbeddingInput::new(3, "unique payload"),
3046 EmbeddingInput::new(4, ""),
3047 EmbeddingInput::new(5, "repeat me exactly"),
3048 ];
3049
3050 let baseline = prepare_window(&inputs, true);
3051 let mut cache = ContentAddressedMemoCache::with_capacity(16);
3052 let memoized = prepare_window_with_memo(&inputs, &mut cache);
3053
3054 assert_eq!(baseline.len(), memoized.len());
3055 for (plain, cached) in baseline.iter().zip(memoized.iter()) {
3056 assert_eq!(plain.msg.message_id, cached.msg.message_id);
3057 assert_eq!(plain.canonical, cached.canonical);
3058 assert_eq!(plain.hash, cached.hash);
3059 }
3060 }
3061
3062 #[test]
3063 fn semantic_prep_memo_key_uses_stable_content_hash_bytes() {
3064 let key = semantic_prep_memo_key("repeat me exactly");
3065 let expected = content_hash("repeat me exactly");
3066
3067 assert_eq!(key.content_hash.as_bytes(), expected.as_slice());
3068 assert_eq!(key.content_hash.as_bytes().len(), expected.len());
3069 assert_eq!(key.algorithm, SEMANTIC_PREP_MEMO_ALGORITHM);
3070 assert_eq!(key.algorithm_version, SEMANTIC_PREP_MEMO_VERSION);
3071 }
3072
3073 #[test]
3074 fn memoized_serial_prep_reuses_duplicate_content_across_windows() {
3075 let inputs = vec![
3076 EmbeddingInput::new(1, "repeat me exactly"),
3077 EmbeddingInput::new(2, "repeat me exactly"),
3078 EmbeddingInput::new(3, "unique payload"),
3079 EmbeddingInput::new(4, ""),
3080 EmbeddingInput::new(5, "repeat me exactly"),
3081 ];
3082
3083 let mut cache = ContentAddressedMemoCache::with_capacity(16);
3084 let prepared = prepare_window_with_memo(&inputs, &mut cache);
3085 let stats = cache.stats().clone();
3086
3087 assert_eq!(prepared.len(), 4);
3088 assert_eq!(stats.hits, 2);
3089 assert_eq!(stats.misses, 3);
3090 assert_eq!(stats.inserts, 2);
3091 assert_eq!(stats.live_entries, 2);
3092 }
3093
3094 #[test]
3095 fn packet_embedding_inputs_reuse_memoized_prep_for_duplicate_content() -> Result<()> {
3096 let temp = tempdir().unwrap();
3097 let db_path = temp.path().join("agent_search.db");
3098 let storage = FrankenStorage::open(&db_path)?;
3099 let agent_id = storage.ensure_agent(&Agent {
3100 id: None,
3101 slug: "codex".to_string(),
3102 name: "Codex".to_string(),
3103 version: None,
3104 kind: AgentKind::Cli,
3105 })?;
3106
3107 storage.insert_conversation_tree(
3108 agent_id,
3109 None,
3110 &test_conversation_with_messages(
3111 "packet-memo-conv-one",
3112 vec![
3113 Message {
3114 id: None,
3115 idx: 0,
3116 role: MessageRole::User,
3117 author: None,
3118 created_at: Some(1_700_000_010_100),
3119 content: "shared semantic payload".to_string(),
3120 extra_json: json!({}),
3121 snippets: Vec::new(),
3122 },
3123 Message {
3124 id: None,
3125 idx: 1,
3126 role: MessageRole::Agent,
3127 author: None,
3128 created_at: Some(1_700_000_010_200),
3129 content: "unique semantic payload one".to_string(),
3130 extra_json: json!({}),
3131 snippets: Vec::new(),
3132 },
3133 ],
3134 ),
3135 )?;
3136 storage.insert_conversation_tree(
3137 agent_id,
3138 None,
3139 &test_conversation_with_messages(
3140 "packet-memo-conv-two",
3141 vec![
3142 Message {
3143 id: None,
3144 idx: 0,
3145 role: MessageRole::Tool,
3146 author: None,
3147 created_at: Some(1_700_000_010_300),
3148 content: "shared semantic payload".to_string(),
3149 extra_json: json!({}),
3150 snippets: Vec::new(),
3151 },
3152 Message {
3153 id: None,
3154 idx: 1,
3155 role: MessageRole::Agent,
3156 author: None,
3157 created_at: Some(1_700_000_010_400),
3158 content: "unique semantic payload two".to_string(),
3159 extra_json: json!({}),
3160 snippets: Vec::new(),
3161 },
3162 ],
3163 ),
3164 )?;
3165
3166 let packet_inputs = packet_embedding_inputs_from_storage(&storage)?;
3167 let mut cache = ContentAddressedMemoCache::with_capacity(16);
3168 let prepared = prepare_window_with_memo(&packet_inputs, &mut cache);
3169 let stats = cache.stats().clone();
3170
3171 assert_eq!(packet_inputs.len(), 4);
3172 assert_eq!(prepared.len(), 4);
3173 assert_eq!(
3174 semantic_prep_memo_key("shared semantic payload")
3175 .content_hash
3176 .as_bytes()
3177 .len(),
3178 32
3179 );
3180 assert_eq!(stats.hits, 1);
3181 assert_eq!(stats.misses, 3);
3182 assert_eq!(stats.inserts, 3);
3183 assert_eq!(stats.live_entries, 3);
3184 Ok(())
3185 }
3186
3187 #[test]
3188 fn backfill_batch_saves_checkpoint_and_staged_index_until_complete() {
3189 let temp = tempdir().unwrap();
3190 let mut manifest = SemanticManifest::default();
3191 let indexer = SemanticIndexer::new("hash", None).unwrap();
3192 let messages = vec![
3193 EmbeddingInput::new(10, "first staged semantic message"),
3194 EmbeddingInput::new(11, "second staged semantic message"),
3195 ];
3196
3197 let outcome = indexer
3198 .run_backfill_batch(
3199 &messages,
3200 temp.path(),
3201 &mut manifest,
3202 SemanticBackfillBatchPlan {
3203 tier: TierKind::Fast,
3204 db_fingerprint: "db-fp-backfill-partial".to_string(),
3205 model_revision: "hash".to_string(),
3206 total_conversations: 2,
3207 conversations_in_batch: 1,
3208 last_offset: 1,
3209 },
3210 )
3211 .unwrap();
3212
3213 assert!(!outcome.published);
3214 assert!(outcome.checkpoint_saved);
3215 assert!(outcome.index_path.exists());
3216 assert!(!vector_index_path(temp.path(), indexer.embedder_id()).exists());
3217 let checkpoint = manifest.checkpoint.as_ref().expect("checkpoint");
3218 assert_eq!(checkpoint.tier, TierKind::Fast);
3219 assert_eq!(checkpoint.conversations_processed, 1);
3220 assert_eq!(checkpoint.docs_embedded, 2);
3221 assert_eq!(manifest.backlog.total_conversations, 2);
3222 assert!(SemanticManifest::path(temp.path()).exists());
3223 }
3224
/// Drives two consecutive fast-tier backfill batches for the same DB fingerprint.
///
/// The first batch must land in the staging index; the second must publish the
/// final index, remove the staging file, clear the checkpoint, and leave a ready
/// fast-tier record in both the in-memory and the on-disk manifest.
#[test]
fn backfill_batch_resumes_staged_index_and_publishes_manifest_atomically() {
    let scratch = tempdir().unwrap();
    let data_dir = scratch.path();
    let mut manifest = SemanticManifest::default();
    let indexer = SemanticIndexer::new("hash", None).unwrap();
    let db_fingerprint = "db-fp-backfill-complete";
    let staging_path = semantic_staging_index_path(
        data_dir,
        TierKind::Fast,
        indexer.embedder_id(),
        db_fingerprint,
    );

    // Both batches share every plan field except the resume offset.
    let plan_ending_at = |last_offset| SemanticBackfillBatchPlan {
        tier: TierKind::Fast,
        db_fingerprint: db_fingerprint.to_string(),
        model_revision: "hash".to_string(),
        total_conversations: 2,
        conversations_in_batch: 1,
        last_offset,
    };

    // Batch one of two: the index is only staged.
    let opening_inputs = vec![EmbeddingInput::new(20, "first resume batch")];
    let staged = indexer
        .run_backfill_batch(&opening_inputs, data_dir, &mut manifest, plan_ending_at(1))
        .unwrap();
    assert_eq!(staged.index_path, staging_path);
    assert!(staging_path.exists());

    // Batch two of two: the staged index is published.
    let closing_inputs = vec![EmbeddingInput::new(21, "second resume batch")];
    let finished = indexer
        .run_backfill_batch(&closing_inputs, data_dir, &mut manifest, plan_ending_at(2))
        .unwrap();

    assert!(finished.published);
    assert!(!finished.checkpoint_saved);
    assert!(!staging_path.exists());
    let final_path = vector_index_path(data_dir, indexer.embedder_id());
    assert_eq!(finished.index_path, final_path);
    assert!(final_path.exists());
    assert!(manifest.checkpoint.is_none());
    let fast_tier = manifest.fast_tier.as_ref().expect("published fast tier");
    assert!(fast_tier.ready);
    assert_eq!(fast_tier.conversation_count, 2);
    assert_eq!(fast_tier.doc_count, 2);
    assert_eq!(manifest.backlog.fast_tier_processed, 2);

    // The manifest written to disk must agree with the in-memory one.
    let reloaded = SemanticManifest::load(data_dir).unwrap().unwrap();
    assert!(reloaded.checkpoint.is_none());
    assert!(reloaded.fast_tier.as_ref().is_some_and(|tier| tier.ready));
}
3291
/// Publishes after a resumed build (20 docs, then 1 doc) and checks that no
/// `.wal` sidecar exists next to the published index, and that the published
/// index and manifest both report all 21 documents.
#[test]
fn backfill_publish_compacts_resumed_wal_before_rename() {
    let scratch = tempdir().unwrap();
    let data_dir = scratch.path();
    let mut manifest = SemanticManifest::default();
    let indexer = SemanticIndexer::new("hash", None).unwrap();
    let db_fingerprint = "db-fp-backfill-small-resume";

    // Both batches share every plan field except the resume offset.
    let plan_ending_at = |last_offset| SemanticBackfillBatchPlan {
        tier: TierKind::Fast,
        db_fingerprint: db_fingerprint.to_string(),
        model_revision: "hash".to_string(),
        total_conversations: 2,
        conversations_in_batch: 1,
        last_offset,
    };

    let bulk_inputs: Vec<EmbeddingInput> = (0..20)
        .map(|idx| EmbeddingInput::new(100 + idx, format!("first batch message {idx}")))
        .collect();
    let staged = indexer
        .run_backfill_batch(&bulk_inputs, data_dir, &mut manifest, plan_ending_at(1))
        .unwrap();
    assert!(staged.checkpoint_saved);

    let tail_inputs = vec![EmbeddingInput::new(200, "small final resume batch")];
    let finished = indexer
        .run_backfill_batch(&tail_inputs, data_dir, &mut manifest, plan_ending_at(2))
        .unwrap();

    assert!(finished.published);
    let final_path = vector_index_path(data_dir, indexer.embedder_id());
    // The sidecar path is the published path with ".wal" appended verbatim.
    let wal_sidecar = {
        let mut raw = final_path.as_os_str().to_os_string();
        raw.push(".wal");
        PathBuf::from(raw)
    };
    assert!(!wal_sidecar.exists());

    let published_index = FsVectorIndex::open(&final_path).unwrap();
    assert_eq!(published_index.record_count(), 21);
    let fast_tier = manifest.fast_tier.as_ref().expect("published fast tier");
    assert_eq!(fast_tier.doc_count, 21);
    assert_eq!(fast_tier.conversation_count, 2);
}
3348
/// Backfills two stored conversations one at a time through
/// `run_backfill_from_storage`.
///
/// The first call must checkpoint without publishing; the second must publish,
/// clear the checkpoint, and report two documents in the fast tier.
#[test]
fn backfill_from_storage_fetches_canonical_batches_and_resumes() -> Result<()> {
    let scratch = tempdir().unwrap();
    let database_file = scratch.path().join("agent_search.db");
    let storage = FrankenStorage::open(&database_file)?;
    let codex = Agent {
        id: None,
        slug: "codex".to_string(),
        name: "Codex".to_string(),
        version: None,
        kind: AgentKind::Cli,
    };
    let agent_id = storage.ensure_agent(&codex)?;
    for (external_id, body) in [
        ("first", "first canonical semantic message"),
        ("second", "second canonical semantic message"),
    ] {
        storage.insert_conversation_tree(agent_id, None, &test_conversation(external_id, body))?;
    }

    let mut manifest = SemanticManifest::default();
    let indexer = SemanticIndexer::new("hash", None)?;
    // Each call is limited to one conversation, so two calls are needed.
    let one_conversation_plan = || SemanticBackfillStoragePlan {
        tier: TierKind::Fast,
        db_fingerprint: "canonical-db-fp".to_string(),
        model_revision: "hash".to_string(),
        max_conversations: 1,
    };

    let opening = indexer.run_backfill_from_storage(
        &storage,
        scratch.path(),
        &mut manifest,
        one_conversation_plan(),
    )?;
    assert!(!opening.published);
    assert!(opening.checkpoint_saved);
    assert_eq!(opening.conversations_processed, 1);
    assert_eq!(opening.total_conversations, 2);
    assert_eq!(opening.embedded_docs, 1);
    assert!(opening.last_offset > 0);

    let closing = indexer.run_backfill_from_storage(
        &storage,
        scratch.path(),
        &mut manifest,
        one_conversation_plan(),
    )?;
    assert!(closing.published);
    assert!(!closing.checkpoint_saved);
    assert_eq!(closing.conversations_processed, 2);
    assert_eq!(closing.embedded_docs, 1);
    assert!(manifest.checkpoint.is_none());
    assert_eq!(
        manifest.fast_tier.as_ref().map(|tier| tier.doc_count),
        Some(2)
    );
    Ok(())
}
3415
/// Stores one conversation with a user, a tool, and an empty-content system
/// message, then checks the canonical embedding batch built from it.
///
/// Only the user and tool messages are expected as inputs, with matching role
/// codes and a `source_id` equal to the CRC32 of the normalized source id.
#[test]
fn canonical_embedding_batch_uses_conversation_packet_semantic_projection() -> Result<()> {
    let scratch = tempdir().unwrap();
    let database_file = scratch.path().join("agent_search.db");
    let storage = FrankenStorage::open(&database_file)?;
    let codex = Agent {
        id: None,
        slug: "codex".to_string(),
        name: "Codex".to_string(),
        version: None,
        kind: AgentKind::Cli,
    };
    let agent_id = storage.ensure_agent(&codex)?;

    // Fixture helper: only idx, role, timestamp and content vary per message.
    let message = |idx, role, created_at, content: &str| Message {
        id: None,
        idx,
        role,
        author: None,
        created_at: Some(created_at),
        content: content.to_string(),
        extra_json: json!({}),
        snippets: Vec::new(),
    };
    let turns = vec![
        message(0, MessageRole::User, 1_700_000_000_500, "user semantic text"),
        message(1, MessageRole::Tool, 1_700_000_000_600, "tool semantic text"),
        message(2, MessageRole::System, 1_700_000_000_700, ""),
    ];
    storage.insert_conversation_tree(
        agent_id,
        None,
        &test_conversation_with_messages("packet-projection", turns),
    )?;

    let batch = fetch_canonical_embedding_batch(&storage, 0, 1)?;

    assert_eq!(batch.conversations_in_batch, 1);
    assert_eq!(batch.inputs.len(), 2);
    let (user_input, tool_input) = (&batch.inputs[0], &batch.inputs[1]);
    assert_eq!(user_input.content, "user semantic text");
    assert_eq!(tool_input.content, "tool semantic text");
    assert_eq!(user_input.role, role_code_from_str("user").unwrap());
    assert_eq!(tool_input.role, role_code_from_str("tool").unwrap());
    // NOTE(review): these provenance values presumably mirror what the
    // `test_conversation_with_messages` fixture sets — confirm against the helper.
    let normalized_source_id =
        normalized_index_source_id(Some("remote-laptop"), None, Some("builder-host"));
    let expected_hash = crc32fast::hash(normalized_source_id.as_bytes());
    assert_eq!(user_input.source_id, expected_hash);
    assert_eq!(tool_input.source_id, expected_hash);
    Ok(())
}
3483
/// Inserts one conversation before and one after a message-id watermark, then
/// fetches a canonical batch filtered by that watermark.
///
/// Only the newer conversation may be counted in the batch, while the total
/// conversation count still reports both.
#[test]
fn canonical_embedding_batch_pushes_last_message_id_filter_into_selection() -> Result<()> {
    let scratch = tempdir().unwrap();
    let database_file = scratch.path().join("agent_search.db");
    let storage = FrankenStorage::open(&database_file)?;
    let codex = Agent {
        id: None,
        slug: "codex".to_string(),
        name: "Codex".to_string(),
        version: None,
        kind: AgentKind::Cli,
    };
    let agent_id = storage.ensure_agent(&codex)?;

    let older = test_conversation("before-watermark", "old semantic message");
    storage.insert_conversation_tree(agent_id, None, &older)?;
    // Highest message id present before the second conversation is inserted.
    let watermark: i64 = storage.raw().query_row_map(
        "SELECT MAX(id) FROM messages",
        &[] as &[ParamValue],
        |row| row.get_typed(0),
    )?;
    let newer = test_conversation("after-watermark", "new semantic message");
    storage.insert_conversation_tree(agent_id, None, &newer)?;

    let batch = fetch_canonical_embedding_batch_inner(&storage, 0, 8, Some(watermark))?;

    assert_eq!(
        batch.conversations_in_batch, 1,
        "last_message_id must be pushed into candidate selection so old conversations are not counted in the batch"
    );
    assert_eq!(batch.total_conversations, 2);
    assert_eq!(batch.inputs.len(), 1);
    let only_input = &batch.inputs[0];
    assert_eq!(only_input.content, "new semantic message");
    // An id too large for i64 is treated as i64::MAX for the comparison.
    let selected_id = i64::try_from(only_input.message_id).unwrap_or(i64::MAX);
    assert!(
        selected_id > watermark,
        "selected semantic input must be strictly newer than the checkpoint message id"
    );
    Ok(())
}
3527
/// Stores three two-message conversations and fetches a batch with
/// `max_messages: 3`.
///
/// The batch must contain only the first conversation's two inputs and must
/// not include part of the second conversation.
#[test]
fn checkpoint_caps_stop_at_whole_conversation_boundary() -> Result<()> {
    let scratch = tempdir().unwrap();
    let database_file = scratch.path().join("agent_search.db");
    let storage = FrankenStorage::open(&database_file)?;
    let codex = Agent {
        id: None,
        slug: "codex".to_string(),
        name: "Codex".to_string(),
        version: None,
        kind: AgentKind::Cli,
    };
    let agent_id = storage.ensure_agent(&codex)?;

    // Fixture helper: only idx, role, timestamp and content vary per message.
    let message = |idx, role, created_at, content: String| Message {
        id: None,
        idx,
        role,
        author: None,
        created_at: Some(created_at),
        content,
        extra_json: json!({}),
        snippets: Vec::new(),
    };
    for external_id in ["cap-first", "cap-second", "cap-third"] {
        let turns = vec![
            message(
                0,
                MessageRole::User,
                1_700_000_000_500,
                format!("{external_id} user semantic text"),
            ),
            message(
                1,
                MessageRole::Agent,
                1_700_000_000_600,
                format!("{external_id} assistant semantic text"),
            ),
        ];
        storage.insert_conversation_tree(
            agent_id,
            None,
            &test_conversation_with_messages(external_id, turns),
        )?;
    }

    let first_conversation_id: i64 = storage.raw().query_row_map(
        "SELECT MIN(id) FROM conversations",
        &[] as &[ParamValue],
        |row| row.get_typed(0),
    )?;
    let caps = SemanticCheckpointCaps {
        max_messages: 3,
        max_bytes: 0,
    };
    let batch = fetch_canonical_embedding_batch_inner_with_caps(&storage, 0, 8, None, caps)?;

    assert_eq!(batch.conversations_in_batch, 1);
    assert_eq!(batch.last_conversation_id, first_conversation_id);
    assert_eq!(batch.inputs.len(), 2);
    // No input may come from any conversation other than "cap-first".
    assert!(
        !batch
            .inputs
            .iter()
            .any(|input| !input.content.contains("cap-first"))
    );
    assert_eq!(batch.total_conversations, 3);
    Ok(())
}
3600
/// Inserts a two-message conversation, records the message-id watermark, then
/// inserts the same conversation again with two extra messages.
///
/// The "since watermark" batch must contain only the new non-empty assistant
/// message, and `raw_max_message_id` must equal the current `MAX(id)`.
#[test]
fn packet_embedding_inputs_from_storage_since_only_emits_new_canonical_messages() -> Result<()>
{
    let scratch = tempdir().unwrap();
    let database_file = scratch.path().join("agent_search.db");
    let storage = FrankenStorage::open(&database_file)?;
    let codex = Agent {
        id: None,
        slug: "codex".to_string(),
        name: "Codex".to_string(),
        version: None,
        kind: AgentKind::Cli,
    };
    let agent_id = storage.ensure_agent(&codex)?;

    // Fixture helpers: `baseline` rebuilds the two messages that exist before
    // the watermark so both inserts carry identical leading messages.
    let message = |idx, role, created_at, content: &str| Message {
        id: None,
        idx,
        role,
        author: None,
        created_at: Some(created_at),
        content: content.to_string(),
        extra_json: json!({}),
        snippets: Vec::new(),
    };
    let baseline = || {
        vec![
            message(0, MessageRole::User, 1_700_000_000_500, "existing semantic text"),
            message(1, MessageRole::Agent, 1_700_000_000_600, "existing assistant text"),
        ]
    };

    storage.insert_conversation_tree(
        agent_id,
        None,
        &test_conversation_with_messages("packet-delta", baseline()),
    )?;
    let watermark: i64 = storage.raw().query_row_map(
        "SELECT MAX(id) FROM messages",
        &[] as &[ParamValue],
        |row| row.get_typed(0),
    )?;

    let mut extended = baseline();
    extended.push(message(
        2,
        MessageRole::Agent,
        1_700_000_000_700,
        "new packet semantic text",
    ));
    extended.push(message(3, MessageRole::System, 1_700_000_000_800, ""));
    storage.insert_conversation_tree(
        agent_id,
        None,
        &test_conversation_with_messages("packet-delta", extended),
    )?;

    let batch = packet_embedding_inputs_from_storage_since(&storage, watermark)?;

    assert_eq!(batch.conversations_in_batch, 1);
    assert_eq!(batch.inputs.len(), 1);
    let only_input = &batch.inputs[0];
    assert_eq!(only_input.content, "new packet semantic text");
    assert_eq!(only_input.role, role_code_from_str("assistant").unwrap());
    // NOTE(review): these provenance values presumably mirror what the
    // `test_conversation_with_messages` fixture sets — confirm against the helper.
    let normalized_source_id =
        normalized_index_source_id(Some("remote-laptop"), None, Some("builder-host"));
    assert_eq!(
        only_input.source_id,
        crc32fast::hash(normalized_source_id.as_bytes())
    );
    let expected_raw_max_id: i64 = storage.raw().query_row_map(
        "SELECT MAX(id) FROM messages",
        &[] as &[ParamValue],
        |row| row.get_typed(0),
    )?;
    assert_eq!(batch.raw_max_message_id, Some(expected_raw_max_id));
    Ok(())
}
3722
/// Checks the exact semantic inputs produced for messages newer than a
/// watermark across two conversations in one workspace.
///
/// Expected inputs are the assistant message at `watermark + 1` and the tool
/// message at `watermark + 3`; `raw_max_message_id` must be `watermark + 4`.
#[test]
fn packet_catch_up_emits_expected_semantic_docs_after_watermark() -> Result<()> {
    let scratch = tempdir().unwrap();
    let database_file = scratch.path().join("agent_search.db");
    let storage = FrankenStorage::open(&database_file)?;
    let codex = Agent {
        id: None,
        slug: "codex".to_string(),
        name: "Codex".to_string(),
        version: None,
        kind: AgentKind::Cli,
    };
    let agent_id = storage.ensure_agent(&codex)?;
    let workspace_id = storage.ensure_workspace(Path::new("/tmp/workspace"), None)?;

    // Fixture helpers: `baseline` rebuilds the two pre-watermark messages.
    let message = |idx, role, created_at, content: &str| Message {
        id: None,
        idx,
        role,
        author: None,
        created_at: Some(created_at),
        content: content.to_string(),
        extra_json: json!({}),
        snippets: Vec::new(),
    };
    let baseline = || {
        vec![
            message(0, MessageRole::User, 1_700_000_000_500, "before watermark"),
            message(
                1,
                MessageRole::Agent,
                1_700_000_000_600,
                "before watermark assistant",
            ),
        ]
    };

    storage.insert_conversation_tree(
        agent_id,
        Some(workspace_id),
        &test_conversation_with_messages("legacy-packet-semantics", baseline()),
    )?;

    let watermark: i64 = storage.raw().query_row_map(
        "SELECT MAX(id) FROM messages",
        &[] as &[ParamValue],
        |row| row.get_typed(0),
    )?;

    // Same conversation again, now with one assistant and one empty system message.
    let mut extended = baseline();
    extended.push(message(
        2,
        MessageRole::Agent,
        1_700_000_000_700,
        "after watermark assistant",
    ));
    extended.push(message(3, MessageRole::System, 1_700_000_000_800, ""));
    storage.insert_conversation_tree(
        agent_id,
        Some(workspace_id),
        &test_conversation_with_messages("legacy-packet-semantics", extended),
    )?;

    // A second conversation made only of post-watermark messages.
    let second_conversation = vec![
        message(0, MessageRole::Tool, 1_700_000_000_900, "after watermark tool"),
        message(1, MessageRole::System, 1_700_000_001_000, ""),
    ];
    storage.insert_conversation_tree(
        agent_id,
        Some(workspace_id),
        &test_conversation_with_messages(
            "legacy-packet-semantics-second-conv",
            second_conversation,
        ),
    )?;

    let packet_batch = packet_embedding_inputs_from_storage_since(&storage, watermark)?;
    // NOTE(review): these provenance values presumably mirror what the
    // `test_conversation_with_messages` fixture sets — confirm against the helper.
    let normalized_source_id =
        normalized_index_source_id(Some("remote-laptop"), None, Some("builder-host"));
    let source_id_hash = crc32fast::hash(normalized_source_id.as_bytes());
    let expected_agent = u32::try_from(agent_id).unwrap();
    let expected_workspace = u32::try_from(workspace_id).unwrap();
    let expected = vec![
        ComparableSemanticInput {
            message_id: u64::try_from(watermark + 1).unwrap(),
            created_at_ms: 1_700_000_000_700,
            agent_id: expected_agent,
            workspace_id: expected_workspace,
            source_id: source_id_hash,
            role: role_code_from_str("assistant").unwrap(),
            content: "after watermark assistant".to_string(),
        },
        ComparableSemanticInput {
            message_id: u64::try_from(watermark + 3).unwrap(),
            created_at_ms: 1_700_000_000_900,
            agent_id: expected_agent,
            workspace_id: expected_workspace,
            source_id: source_id_hash,
            role: role_code_from_str("tool").unwrap(),
            content: "after watermark tool".to_string(),
        },
    ];

    assert_eq!(comparable_semantic_inputs(packet_batch.inputs), expected);
    assert_eq!(packet_batch.conversations_in_batch, 2);
    assert_eq!(packet_batch.raw_max_message_id, Some(watermark + 4));
    Ok(())
}
3882
/// Checks that selecting inputs by explicit message ids yields the same
/// comparable inputs as the "since watermark" selection.
///
/// The explicit conversation and message id sets are queried directly from the
/// `messages` table using the same watermark.
#[test]
fn packet_embedding_inputs_for_message_ids_matches_since_selection() -> Result<()> {
    let scratch = tempdir().unwrap();
    let database_file = scratch.path().join("agent_search.db");
    let storage = FrankenStorage::open(&database_file)?;
    let codex = Agent {
        id: None,
        slug: "codex".to_string(),
        name: "Codex".to_string(),
        version: None,
        kind: AgentKind::Cli,
    };
    let agent_id = storage.ensure_agent(&codex)?;
    let workspace_id = storage.ensure_workspace(Path::new("/tmp/workspace"), None)?;

    // Fixture helpers: `baseline` rebuilds the two pre-watermark messages.
    let message = |idx, role, created_at, content: &str| Message {
        id: None,
        idx,
        role,
        author: None,
        created_at: Some(created_at),
        content: content.to_string(),
        extra_json: json!({}),
        snippets: Vec::new(),
    };
    let baseline = || {
        vec![
            message(0, MessageRole::User, 1_700_000_100_100, "before watermark"),
            message(
                1,
                MessageRole::Agent,
                1_700_000_100_200,
                "before watermark assistant",
            ),
        ]
    };

    storage.insert_conversation_tree(
        agent_id,
        Some(workspace_id),
        &test_conversation_with_messages("selected-vs-since", baseline()),
    )?;

    let watermark: i64 = storage.raw().query_row_map(
        "SELECT MAX(id) FROM messages",
        &[] as &[ParamValue],
        |row| row.get_typed(0),
    )?;

    // Same conversation again, now with one tool and one empty system message.
    let mut extended = baseline();
    extended.push(message(
        2,
        MessageRole::Tool,
        1_700_000_100_300,
        "after watermark tool",
    ));
    extended.push(message(3, MessageRole::System, 1_700_000_100_400, ""));
    storage.insert_conversation_tree(
        agent_id,
        Some(workspace_id),
        &test_conversation_with_messages("selected-vs-since", extended),
    )?;
    storage.insert_conversation_tree(
        agent_id,
        Some(workspace_id),
        &test_conversation_with_messages(
            "selected-vs-since-second",
            vec![message(
                0,
                MessageRole::Agent,
                1_700_000_100_500,
                "after watermark assistant",
            )],
        ),
    )?;

    let since_batch = packet_embedding_inputs_from_storage_since(&storage, watermark)?;

    // Rebuild the same selection explicitly from the raw messages table.
    let conversation_ids: Vec<i64> = storage.raw().query_map_collect(
        "SELECT DISTINCT conversation_id
             FROM messages
             WHERE id > ?1
             ORDER BY conversation_id ASC",
        &[ParamValue::from(watermark)],
        |row| row.get_typed(0),
    )?;
    let message_id_rows: Vec<i64> = storage.raw().query_map_collect(
        "SELECT id
             FROM messages
             WHERE id > ?1
             ORDER BY id ASC",
        &[ParamValue::from(watermark)],
        |row| row.get_typed(0),
    )?;
    let selected_message_ids: HashSet<i64> = message_id_rows.into_iter().collect();
    let selected_inputs = packet_embedding_inputs_from_storage_for_message_ids(
        &storage,
        &conversation_ids,
        &selected_message_ids,
    )?;

    assert_eq!(
        comparable_semantic_inputs(selected_inputs),
        comparable_semantic_inputs(since_batch.inputs)
    );
    Ok(())
}
4033
/// With `CASS_SEMANTIC_BATCH_SIZE` removed from the environment, a fresh
/// indexer must report `DEFAULT_SEMANTIC_BATCH_SIZE` as its batch size.
#[test]
fn default_batch_size_uses_new_value() {
    // The guard is held for the whole test so the variable stays unset.
    let _env = EnvVarGuard::remove("CASS_SEMANTIC_BATCH_SIZE");
    let hash_indexer = SemanticIndexer::new("hash", None).unwrap();
    let resolved = hash_indexer.batch_size();
    assert_eq!(resolved, DEFAULT_SEMANTIC_BATCH_SIZE);
}
4042
/// With the four watchdog and checkpoint-cap environment variables removed,
/// the resolved warn/fail thresholds and `SemanticCheckpointCaps::from_env()`
/// must equal their `DEFAULT_*` constants.
#[test]
fn semantic_watchdog_and_checkpoint_caps_have_derived_defaults() {
    // All four guards stay alive until the end of the test.
    let _warn_guard = EnvVarGuard::remove("CASS_SEMANTIC_EMBED_BATCH_WARN_AFTER_MS");
    let _fail_guard = EnvVarGuard::remove("CASS_SEMANTIC_EMBED_BATCH_FAIL_AFTER_MS");
    let _messages_guard = EnvVarGuard::remove("CASS_SEMANTIC_MAX_MESSAGES_PER_CHECKPOINT");
    let _bytes_guard = EnvVarGuard::remove("CASS_SEMANTIC_MAX_BYTES_PER_CHECKPOINT");

    let warn_after_ms = resolved_semantic_embed_batch_warn_after_ms();
    assert_eq!(warn_after_ms, DEFAULT_SEMANTIC_EMBED_BATCH_WARN_AFTER_MS);

    let fail_after_ms = resolved_semantic_embed_batch_fail_after_ms();
    assert_eq!(fail_after_ms, DEFAULT_SEMANTIC_EMBED_BATCH_FAIL_AFTER_MS);

    let resolved_caps = SemanticCheckpointCaps::from_env();
    assert_eq!(
        resolved_caps,
        SemanticCheckpointCaps {
            max_messages: DEFAULT_SEMANTIC_MAX_MESSAGES_PER_CHECKPOINT,
            max_bytes: DEFAULT_SEMANTIC_MAX_BYTES_PER_CHECKPOINT,
        }
    );
}
4066
/// Table-driven check of `parallel_prep_enabled()` against several values of
/// `CASS_SEMANTIC_PREP_PARALLEL`, followed by the unset case, which must
/// report `false`.
#[test]
fn parallel_prep_enabled_reuses_truthy_env_parser() {
    let cases: [(&str, bool); 7] = [
        ("1", true),
        ("true", true),
        (" YeS ", true),
        ("on", true),
        ("0", false),
        ("false", false),
        ("off", false),
    ];
    for &(value, expected) in &cases {
        // The guard is dropped at the end of each iteration.
        let _guard = EnvVarGuard::set("CASS_SEMANTIC_PREP_PARALLEL", value);
        assert_eq!(parallel_prep_enabled(), expected, "env value {value:?}");
    }

    let _unset = EnvVarGuard::remove("CASS_SEMANTIC_PREP_PARALLEL");
    assert!(!parallel_prep_enabled());
}
4085
/// Checks `saturating_u64_from_usize` at zero, at a small value, and at
/// `usize::MAX`.
#[test]
fn saturating_u64_from_usize_covers_bounds() {
    for (input, expected) in [(0usize, 0u64), (42, 42)] {
        assert_eq!(saturating_u64_from_usize(input), expected);
    }

    // The expected ceiling is computed the same way for any pointer width.
    let ceiling = u64::try_from(usize::MAX).unwrap_or(u64::MAX);
    assert_eq!(saturating_u64_from_usize(usize::MAX), ceiling);
}
4095
/// Checks that `semantic_inputs_from_packets` over hand-built packets yields
/// the same comparable inputs as `packet_embedding_inputs_from_storage`.
///
/// The fixture has two conversations from two agents in one workspace,
/// including an empty-content system message and a tool message with an author.
#[test]
fn semantic_inputs_from_packets_matches_storage_replay() -> Result<()> {
    let scratch = tempdir().unwrap();
    let database_file = scratch.path().join("agent_search.db");
    let storage = FrankenStorage::open(&database_file)?;

    let codex = Agent {
        id: None,
        slug: "codex".to_string(),
        name: "Codex".to_string(),
        version: None,
        kind: AgentKind::Cli,
    };
    let agent_id_codex = storage.ensure_agent(&codex)?;
    let claude = Agent {
        id: None,
        slug: "claude_code".to_string(),
        name: "Claude Code".to_string(),
        version: None,
        kind: AgentKind::Cli,
    };
    let agent_id_claude = storage.ensure_agent(&claude)?;
    let workspace_id =
        storage.ensure_workspace(Path::new("/tmp/semantic-equivalence-ws"), None)?;

    // Fixture helper: builds an author-less message; callers override `author`
    // with struct-update syntax where needed.
    let message = |idx, role, created_at, content: &str| Message {
        id: None,
        idx,
        role,
        author: None,
        created_at: Some(created_at),
        content: content.to_string(),
        extra_json: json!({}),
        snippets: Vec::new(),
    };

    let codex_turns = vec![
        message(0, MessageRole::User, 1_700_000_000_500, "first user prompt"),
        message(1, MessageRole::Agent, 1_700_000_000_600, "first assistant reply"),
        message(2, MessageRole::System, 1_700_000_000_700, ""),
    ];
    storage.insert_conversation_tree(
        agent_id_codex,
        Some(workspace_id),
        &test_conversation_with_messages("packet-equiv-1", codex_turns),
    )?;
    let claude_turns = vec![
        Message {
            author: Some("ripgrep".to_string()),
            ..message(0, MessageRole::Tool, 1_700_000_001_500, "tool output line")
        },
        message(1, MessageRole::Agent, 1_700_000_001_600, "second assistant reply"),
    ];
    storage.insert_conversation_tree(
        agent_id_claude,
        Some(workspace_id),
        &test_conversation_with_messages("packet-equiv-2", claude_turns),
    )?;

    // Reference side: inputs produced directly from storage.
    let storage_inputs = packet_embedding_inputs_from_storage(&storage)?;

    // Packet side: build packets and contexts per conversation, then project them.
    let conversation_ids: Vec<i64> = storage.raw().query_map_collect(
        "SELECT DISTINCT m.conversation_id
             FROM messages m
             JOIN conversations c ON c.id = m.conversation_id
             ORDER BY m.conversation_id ASC",
        &[] as &[ParamValue],
        |row| row.get_typed(0),
    )?;
    let envelopes = fetch_canonical_embedding_conversations(&storage, &conversation_ids)?;
    let mut grouped_messages =
        storage.fetch_messages_for_lexical_rebuild_batch(&conversation_ids, None, None)?;
    let (packets, contexts): (Vec<ConversationPacket>, Vec<SemanticPacketContext>) = envelopes
        .iter()
        .map(|envelope| {
            let messages = grouped_messages
                .remove(&envelope.conversation_id)
                .unwrap_or_default();
            let provenance = canonical_embedding_packet_provenance(envelope);
            let canonical = canonical_embedding_conversation(envelope, &provenance, messages);
            let packet = ConversationPacket::from_canonical_replay(&canonical, provenance);
            let context = SemanticPacketContext {
                conversation_id: envelope.conversation_id,
                agent_id: saturating_u32_from_i64(envelope.agent_id),
                workspace_id: saturating_u32_from_i64(envelope.workspace_id.unwrap_or(0)),
            };
            (packet, context)
        })
        .unzip();
    let packet_inputs = semantic_inputs_from_packets(&packets, &contexts)?;

    assert!(
        !storage_inputs.is_empty(),
        "fixture should produce non-empty semantic inputs (sanity)"
    );
    // Clones are needed because both vectors are inspected again below.
    assert_eq!(
        comparable_semantic_inputs(storage_inputs.clone()),
        comparable_semantic_inputs(packet_inputs.clone()),
        "packet-driven semantic preparation must match storage replay byte-for-byte"
    );

    assert_eq!(
        storage_inputs.len(),
        packet_inputs.len(),
        "storage and packet semantic input counts must agree exactly"
    );
    assert!(
        !packet_inputs.iter().any(|input| input.content.is_empty()),
        "empty content must be filtered by the packet semantic projection"
    );
    // NOTE(review): these provenance values presumably mirror what the
    // `test_conversation_with_messages` fixture sets — confirm against the helper.
    let normalized_source_id =
        normalized_index_source_id(Some("remote-laptop"), None, Some("builder-host"));
    let expected_hash = crc32fast::hash(normalized_source_id.as_bytes());
    assert!(
        !packet_inputs
            .iter()
            .any(|input| input.source_id != expected_hash),
        "every emitted EmbeddingInput must hash provenance via the packet's normalized source_id"
    );

    Ok(())
}
4279
/// Passing one packet with an empty context slice must fail, and the error
/// text must mention "length mismatch".
#[test]
fn semantic_inputs_from_packets_rejects_length_mismatch() {
    let canonical = test_conversation("packet-mismatch", "hello");
    let packet = ConversationPacket::from_canonical_replay(
        &canonical,
        ConversationPacketProvenance::local(),
    );

    // One packet, zero contexts: the lengths deliberately disagree.
    let Err(failure) = semantic_inputs_from_packets(&[packet], &[]) else {
        panic!("expected error on packet/context length mismatch");
    };
    let err = failure.to_string();
    assert!(
        err.contains("length mismatch"),
        "error should mention length mismatch, got: {err}"
    );
}
4301}