1use std::collections::{HashMap, HashSet};
2use std::fs;
3use std::io::IsTerminal;
4use std::path::{Path, PathBuf};
5use std::time::{SystemTime, UNIX_EPOCH};
6
7use anyhow::{Context, Result, bail};
8use frankensearch::index::{
9 HNSW_DEFAULT_EF_CONSTRUCTION as FS_HNSW_DEFAULT_EF_CONSTRUCTION,
10 HNSW_DEFAULT_M as FS_HNSW_DEFAULT_M, HnswConfig as FsHnswConfig, HnswIndex as FsHnswIndex,
11 Quantization as FsQuantization, VectorIndex as FsVectorIndex,
12 VectorIndexWriter as FsVectorIndexWriter,
13};
14use frankensqlite::compat::{ConnectionExt, ParamValue, RowExt};
15use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
16use rayon::prelude::*;
17
18use crate::indexer::memoization::{
19 ContentAddressedMemoCache, MemoCacheAuditRecord, MemoContentHash, MemoKey, MemoLookup,
20};
21use crate::indexer::responsiveness;
22use crate::indexer::semantic_progress::{
23 SemanticProgressEvent, SemanticProgressFields, SemanticProgressSink,
24};
25use crate::model::conversation_packet::{ConversationPacket, ConversationPacketProvenance};
26use crate::model::types::{Conversation, Message};
27use crate::search::canonicalize::{canonicalize_for_embedding, content_hash};
28use crate::search::embedder::Embedder;
29use crate::search::fastembed_embedder::FastEmbedder;
30use crate::search::hash_embedder::HashEmbedder;
31use crate::search::policy::{CHUNKING_STRATEGY_VERSION, SEMANTIC_SCHEMA_VERSION, SemanticPolicy};
32use crate::search::semantic_manifest::{
33 ArtifactRecord, BuildCheckpoint, SemanticManifest, SemanticShardManifest, SemanticShardRecord,
34 TierKind,
35};
36use crate::search::tantivy::{
37 normalized_index_origin_host, normalized_index_origin_kind, normalized_index_source_id,
38};
39use crate::search::vector_index::{
40 ROLE_USER, SemanticDocId, VECTOR_INDEX_DIR, role_code_from_str, vector_index_path,
41};
42use crate::storage::sqlite::FrankenStorage;
43
/// Fallback semantic batch size, returned by `resolved_default_batch_size` when
/// `CASS_SEMANTIC_BATCH_SIZE` is unset, unparsable, or zero.
const DEFAULT_SEMANTIC_BATCH_SIZE: usize = 128;
/// Fallback prep-memo capacity, returned by `resolved_semantic_prep_memo_capacity`
/// when `CASS_SEMANTIC_PREP_MEMO_CAPACITY` is unset, unparsable, or zero.
const DEFAULT_SEMANTIC_PREP_MEMO_CAPACITY: usize = 4_096;
/// Algorithm label placed in every key built by `semantic_prep_memo_key` and
/// emitted in the prep-memo trace events.
const SEMANTIC_PREP_MEMO_ALGORITHM: &str = "semantic_prepare_window";
/// Version label placed next to the algorithm label in every prep-memo key.
/// NOTE(review): presumably this must change whenever canonicalization or
/// hashing changes so stale entries stop matching — confirm.
const SEMANTIC_PREP_MEMO_VERSION: &str = "canonicalize_for_embedding:v2:stable-content-hash";
52
53fn resolved_default_batch_size() -> usize {
54 dotenvy::var("CASS_SEMANTIC_BATCH_SIZE")
55 .ok()
56 .and_then(|v| v.parse::<usize>().ok())
57 .filter(|v| *v > 0)
58 .unwrap_or(DEFAULT_SEMANTIC_BATCH_SIZE)
59}
60
61fn resolved_semantic_prep_memo_capacity() -> usize {
62 dotenvy::var("CASS_SEMANTIC_PREP_MEMO_CAPACITY")
63 .ok()
64 .and_then(|v| v.parse::<usize>().ok())
65 .filter(|v| *v > 0)
66 .unwrap_or(DEFAULT_SEMANTIC_PREP_MEMO_CAPACITY)
67}
68
69fn parallel_prep_enabled() -> bool {
84 env_truthy("CASS_SEMANTIC_PREP_PARALLEL")
85}
86
/// Converts a `usize` to `u64`, yielding `u64::MAX` if the value does not fit.
fn saturating_u64_from_usize(value: usize) -> u64 {
    match u64::try_from(value) {
        Ok(widened) => widened,
        Err(_) => u64::MAX,
    }
}
90
/// One message's text together with the identifiers that travel with it into
/// the embedding pipeline.
#[derive(Debug, Clone)]
pub struct EmbeddingInput {
    /// Message id (the packet builders in this file derive it from the raw
    /// database id via `message_id_from_db`).
    pub message_id: u64,
    /// Creation time in milliseconds; the packet builders store 0 when the
    /// source message has no timestamp.
    pub created_at_ms: i64,
    /// Agent id (saturated from the database value by the packet builders).
    pub agent_id: u32,
    /// Workspace id; the packet builders use 0 when the conversation has none.
    pub workspace_id: u32,
    /// Source identifier; the packet builders store the CRC32 of the packet's
    /// provenance source id string here.
    pub source_id: u32,
    /// Role code; `EmbeddingInput::new` and the packet builders default to
    /// `ROLE_USER`.
    pub role: u8,
    /// Chunk index; every constructor visible in this part of the file sets 0.
    pub chunk_idx: u8,
    /// Message text to embed.
    pub content: String,
}
102
103impl EmbeddingInput {
104 pub fn new(message_id: u64, content: impl Into<String>) -> Self {
105 Self {
106 message_id,
107 created_at_ms: 0,
108 agent_id: 0,
109 workspace_id: 0,
110 source_id: 0,
111 role: ROLE_USER,
112 chunk_idx: 0,
113 content: content.into(),
114 }
115 }
116}
117
/// An embedded message: the identifiers of an [`EmbeddingInput`] plus a content
/// hash and the embedding vector.
///
/// `semantic_doc_id_for_embedded` copies every field except `embedding` into a
/// `SemanticDocId`.
#[derive(Debug, Clone)]
pub struct EmbeddedMessage {
    pub message_id: u64,
    pub created_at_ms: i64,
    pub agent_id: u32,
    pub workspace_id: u32,
    pub source_id: u32,
    pub role: u8,
    pub chunk_idx: u8,
    /// 32-byte content hash.
    /// NOTE(review): presumably the hash of the canonicalized content — confirm
    /// against the code that builds this struct.
    pub content_hash: [u8; 32],
    /// Embedding vector for the message.
    pub embedding: Vec<f32>,
}
130
/// Parameters describing one semantic backfill batch.
///
/// NOTE(review): nothing in this part of the file reads these fields; the
/// meaning of `last_offset` in particular should be confirmed against the
/// batch builder before relying on it.
#[derive(Debug, Clone)]
pub struct SemanticBackfillBatchPlan {
    /// Semantic tier the batch targets.
    pub tier: TierKind,
    /// Database fingerprint string.
    pub db_fingerprint: String,
    /// Model revision string.
    pub model_revision: String,
    pub total_conversations: u64,
    pub conversations_in_batch: u64,
    pub last_offset: i64,
}
140
/// Parameters for a semantic backfill batch that is read from storage.
///
/// NOTE(review): the consumer of this plan is outside the visible part of the
/// file; field meanings are taken from their names only.
#[derive(Debug, Clone)]
pub struct SemanticBackfillStoragePlan {
    /// Semantic tier the batch targets.
    pub tier: TierKind,
    /// Database fingerprint string.
    pub db_fingerprint: String,
    /// Model revision string.
    pub model_revision: String,
    /// Upper bound on conversations for the batch (presumably per call — confirm).
    pub max_conversations: usize,
}
148
/// Result summary of one semantic backfill batch.
///
/// NOTE(review): the producer of this struct is outside the visible part of
/// the file; field meanings are taken from their names only.
#[derive(Debug, Clone)]
pub struct SemanticBackfillBatchOutcome {
    pub tier: TierKind,
    pub embedder_id: String,
    pub embedded_docs: u64,
    pub conversations_processed: u64,
    pub total_conversations: u64,
    pub last_offset: i64,
    pub checkpoint_saved: bool,
    pub published: bool,
    /// Path of the vector index associated with this batch.
    pub index_path: PathBuf,
    /// Path of the manifest associated with this batch.
    pub manifest_path: PathBuf,
}
162
/// Parameters for building sharded semantic indexes.
///
/// NOTE(review): the consumer of this plan is outside the visible part of the
/// file; field meanings are taken from their names only.
#[derive(Debug, Clone)]
pub struct SemanticShardBuildPlan {
    pub tier: TierKind,
    pub db_fingerprint: String,
    pub model_revision: String,
    pub total_conversations: u64,
    pub max_records_per_shard: usize,
    /// Whether an ANN index should be built (presumably one per shard — confirm).
    pub build_ann: bool,
}
172
/// Result summary of a sharded semantic index build.
///
/// NOTE(review): the producer of this struct is outside the visible part of
/// the file; field meanings are taken from their names only.
#[derive(Debug, Clone)]
pub struct SemanticShardBuildOutcome {
    pub tier: TierKind,
    pub embedder_id: String,
    pub shard_count: u32,
    pub doc_count: u64,
    pub total_conversations: u64,
    /// Paths of the vector index files.
    pub index_paths: Vec<PathBuf>,
    /// Paths of the ANN index files.
    pub ann_index_paths: Vec<PathBuf>,
    pub shard_manifest_path: PathBuf,
    pub complete: bool,
}
185
/// Outcome state of a semantic backfill scheduling decision; serialized in
/// snake_case.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum SemanticBackfillSchedulerState {
    /// Backfill may proceed with the scheduled batch.
    Running,
    /// Backfill is yielding for now; the scheduler uses this for lexical
    /// repair, foreground pressure, and capacity below the floor.
    Paused,
    /// Backfill is switched off; the scheduler uses this for zero batch or
    /// thread budgets and for operator or policy disablement.
    Disabled,
}
193
194impl SemanticBackfillSchedulerState {
195 pub(crate) fn as_str(self) -> &'static str {
196 match self {
197 Self::Running => "running",
198 Self::Paused => "paused",
199 Self::Disabled => "disabled",
200 }
201 }
202}
203
/// Why the scheduler reached its decision; serialized in snake_case.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum SemanticBackfillSchedulerReason {
    /// No gate fired; backfill runs within idle budgets.
    IdleBudgetAvailable,
    /// `CASS_SEMANTIC_BACKFILL_DISABLE` is truthy.
    OperatorDisabled,
    /// The semantic policy mode does not build semantic assets.
    PolicyDisabled,
    /// Foreground pressure was signalled.
    ForegroundPressure,
    /// Lexical repair was signalled as active.
    LexicalRepairActive,
    /// Current capacity is below the configured minimum percentage.
    CapacityBelowFloor,
    /// The policy allows zero backfill threads.
    ThreadBudgetZero,
    /// The caller requested a batch of zero conversations.
    BatchBudgetZero,
}
216
217impl SemanticBackfillSchedulerReason {
218 pub(crate) fn next_step(self) -> &'static str {
219 match self {
220 Self::IdleBudgetAvailable => "background semantic backfill is within idle budgets",
221 Self::OperatorDisabled => {
222 "background semantic backfill is disabled by CASS_SEMANTIC_BACKFILL_DISABLE"
223 }
224 Self::PolicyDisabled => "semantic policy disables background semantic backfill",
225 Self::ForegroundPressure => {
226 "foreground pressure is present; retry after the idle delay"
227 }
228 Self::LexicalRepairActive => "lexical repair is active; semantic backfill is yielding",
229 Self::CapacityBelowFloor => {
230 "machine responsiveness capacity is below the semantic backfill floor"
231 }
232 Self::ThreadBudgetZero => "semantic backfill thread budget is zero",
233 Self::BatchBudgetZero => "semantic backfill batch budget is zero",
234 }
235 }
236}
237
/// External signals consulted by the semantic backfill scheduler.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct SemanticBackfillSchedulerSignals {
    /// Foreground work is active; pauses backfill unless `force` is set.
    pub foreground_pressure: bool,
    /// Lexical repair is active; pauses backfill unless `force` is set.
    pub lexical_repair_active: bool,
    /// Bypasses every scheduler gate except the zero-batch check.
    pub force: bool,
    /// Operator kill switch; disables backfill unless `force` is set.
    pub operator_disabled: bool,
}
245
246impl SemanticBackfillSchedulerSignals {
247 pub(crate) fn from_env() -> Self {
248 Self {
249 foreground_pressure: env_truthy("CASS_SEMANTIC_BACKFILL_FOREGROUND_ACTIVE"),
250 lexical_repair_active: env_truthy("CASS_SEMANTIC_BACKFILL_LEXICAL_REPAIR_ACTIVE"),
251 force: env_truthy("CASS_SEMANTIC_BACKFILL_FORCE"),
252 operator_disabled: env_truthy("CASS_SEMANTIC_BACKFILL_DISABLE"),
253 }
254 }
255}
256
/// Serializable record of a scheduler decision and the inputs behind it.
#[derive(Debug, Clone, serde::Serialize)]
pub(crate) struct SemanticBackfillSchedulerDecision {
    pub state: SemanticBackfillSchedulerState,
    pub reason: SemanticBackfillSchedulerReason,
    /// Batch size the caller asked for.
    pub requested_batch_conversations: usize,
    /// Batch size actually granted: the request scaled by capacity when
    /// running, 0 when paused or disabled.
    pub scheduled_batch_conversations: usize,
    /// Capacity reading used for the decision, capped at 100.
    pub current_capacity_pct: u32,
    /// Minimum capacity required to run (from the environment, default 75).
    pub min_capacity_pct: u32,
    /// Copied from the semantic policy.
    pub max_backfill_threads: usize,
    /// Copied from the semantic policy.
    pub idle_delay_seconds: u64,
    /// Copied from the semantic policy.
    pub chunk_timeout_seconds: u64,
    /// Copied from the scheduler signals.
    pub foreground_pressure: bool,
    /// Copied from the scheduler signals.
    pub lexical_repair_active: bool,
    /// Copied from the `force` scheduler signal.
    pub forced: bool,
    /// 0 when running; otherwise the policy idle delay in milliseconds.
    pub next_eligible_after_ms: u64,
}
273
274impl SemanticBackfillSchedulerDecision {
275 pub(crate) fn should_run(&self) -> bool {
276 matches!(self.state, SemanticBackfillSchedulerState::Running)
277 }
278}
279
280fn env_truthy(key: &str) -> bool {
281 dotenvy::var(key)
282 .ok()
283 .map(|value| {
284 matches!(
285 value.trim().to_ascii_lowercase().as_str(),
286 "1" | "true" | "yes" | "on"
287 )
288 })
289 .unwrap_or(false)
290}
291
292fn env_backfill_min_capacity_pct() -> u32 {
293 dotenvy::var("CASS_SEMANTIC_BACKFILL_MIN_CAPACITY_PCT")
294 .ok()
295 .and_then(|value| value.trim().parse::<u32>().ok())
296 .map(|value| value.clamp(1, 100))
297 .unwrap_or(75)
298}
299
300pub(crate) fn semantic_backfill_scheduler_decision(
301 policy: &SemanticPolicy,
302 requested_batch_conversations: usize,
303 signals: &SemanticBackfillSchedulerSignals,
304) -> SemanticBackfillSchedulerDecision {
305 semantic_backfill_scheduler_decision_for_capacity(
306 policy,
307 requested_batch_conversations,
308 signals,
309 responsiveness::current_capacity_pct(),
310 )
311}
312
313pub(crate) fn semantic_backfill_scheduler_decision_for_capacity(
314 policy: &SemanticPolicy,
315 requested_batch_conversations: usize,
316 signals: &SemanticBackfillSchedulerSignals,
317 current_capacity_pct: u32,
318) -> SemanticBackfillSchedulerDecision {
319 let min_capacity_pct = env_backfill_min_capacity_pct();
320 let paused_delay_ms = policy.idle_delay_seconds.saturating_mul(1000);
321 let mut decision = SemanticBackfillSchedulerDecision {
322 state: SemanticBackfillSchedulerState::Running,
323 reason: SemanticBackfillSchedulerReason::IdleBudgetAvailable,
324 requested_batch_conversations,
325 scheduled_batch_conversations: requested_batch_conversations,
326 current_capacity_pct: current_capacity_pct.clamp(0, 100),
327 min_capacity_pct,
328 max_backfill_threads: policy.max_backfill_threads,
329 idle_delay_seconds: policy.idle_delay_seconds,
330 chunk_timeout_seconds: policy.chunk_timeout_seconds,
331 foreground_pressure: signals.foreground_pressure,
332 lexical_repair_active: signals.lexical_repair_active,
333 forced: signals.force,
334 next_eligible_after_ms: 0,
335 };
336
337 if requested_batch_conversations == 0 {
338 return stopped_scheduler_decision(
339 decision,
340 SemanticBackfillSchedulerState::Disabled,
341 SemanticBackfillSchedulerReason::BatchBudgetZero,
342 paused_delay_ms,
343 );
344 }
345 if policy.max_backfill_threads == 0 && !signals.force {
346 return stopped_scheduler_decision(
347 decision,
348 SemanticBackfillSchedulerState::Disabled,
349 SemanticBackfillSchedulerReason::ThreadBudgetZero,
350 paused_delay_ms,
351 );
352 }
353 if signals.operator_disabled && !signals.force {
354 return stopped_scheduler_decision(
355 decision,
356 SemanticBackfillSchedulerState::Disabled,
357 SemanticBackfillSchedulerReason::OperatorDisabled,
358 paused_delay_ms,
359 );
360 }
361 if !policy.mode.should_build_semantic() && !signals.force {
362 return stopped_scheduler_decision(
363 decision,
364 SemanticBackfillSchedulerState::Disabled,
365 SemanticBackfillSchedulerReason::PolicyDisabled,
366 paused_delay_ms,
367 );
368 }
369 if signals.lexical_repair_active && !signals.force {
370 return stopped_scheduler_decision(
371 decision,
372 SemanticBackfillSchedulerState::Paused,
373 SemanticBackfillSchedulerReason::LexicalRepairActive,
374 paused_delay_ms,
375 );
376 }
377 if signals.foreground_pressure && !signals.force {
378 return stopped_scheduler_decision(
379 decision,
380 SemanticBackfillSchedulerState::Paused,
381 SemanticBackfillSchedulerReason::ForegroundPressure,
382 paused_delay_ms,
383 );
384 }
385 if current_capacity_pct < min_capacity_pct && !signals.force {
386 return stopped_scheduler_decision(
387 decision,
388 SemanticBackfillSchedulerState::Paused,
389 SemanticBackfillSchedulerReason::CapacityBelowFloor,
390 paused_delay_ms,
391 );
392 }
393
394 let capacity = current_capacity_pct.clamp(1, 100) as usize;
395 let scaled = requested_batch_conversations.saturating_mul(capacity) / 100;
396 decision.scheduled_batch_conversations = scaled.max(1).min(requested_batch_conversations);
397 decision
398}
399
400fn stopped_scheduler_decision(
401 mut decision: SemanticBackfillSchedulerDecision,
402 state: SemanticBackfillSchedulerState,
403 reason: SemanticBackfillSchedulerReason,
404 next_eligible_after_ms: u64,
405) -> SemanticBackfillSchedulerDecision {
406 decision.state = state;
407 decision.reason = reason;
408 decision.scheduled_batch_conversations = 0;
409 decision.next_eligible_after_ms = next_eligible_after_ms;
410 decision
411}
412
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch.
fn now_ms() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as i64,
        Err(_) => 0,
    }
}
419
420fn hnsw_index_path(data_dir: &Path, embedder_id: &str) -> PathBuf {
421 data_dir
422 .join(VECTOR_INDEX_DIR)
423 .join(format!("hnsw-{embedder_id}.chsw"))
424}
425
/// Makes `raw` safe to embed in a file name.
///
/// ASCII letters, ASCII digits, `-`, and `_` are kept; every other character
/// (including non-ASCII ones) becomes a single `_`.
fn safe_path_component(raw: &str) -> String {
    let mut sanitized = String::with_capacity(raw.len());
    for ch in raw.chars() {
        let allowed = ch.is_ascii_alphanumeric() || ch == '-' || ch == '_';
        sanitized.push(if allowed { ch } else { '_' });
    }
    sanitized
}
437
438fn semantic_staging_index_path(
439 data_dir: &Path,
440 tier: TierKind,
441 embedder_id: &str,
442 db_fingerprint: &str,
443) -> PathBuf {
444 let fingerprint_hash = crc32fast::hash(db_fingerprint.as_bytes());
445 data_dir.join(VECTOR_INDEX_DIR).join(format!(
446 ".staging-{}-{}-{fingerprint_hash:08x}.fsvi",
447 tier.as_str(),
448 safe_path_component(embedder_id)
449 ))
450}
451
452fn semantic_generation_fingerprint_component(db_fingerprint: &str) -> String {
453 blake3::hash(db_fingerprint.as_bytes())
454 .to_hex()
455 .chars()
456 .take(16)
457 .collect()
458}
459
460fn semantic_shard_generation_dir(
461 data_dir: &Path,
462 tier: TierKind,
463 embedder_id: &str,
464 db_fingerprint: &str,
465) -> PathBuf {
466 let fingerprint_hash = semantic_generation_fingerprint_component(db_fingerprint);
467 data_dir.join(VECTOR_INDEX_DIR).join("shards").join(format!(
468 "{}-{}-{fingerprint_hash}",
469 tier.as_str(),
470 safe_path_component(embedder_id),
471 ))
472}
473
474fn semantic_shard_index_path(
475 data_dir: &Path,
476 tier: TierKind,
477 embedder_id: &str,
478 db_fingerprint: &str,
479 shard_index: u32,
480) -> PathBuf {
481 semantic_shard_generation_dir(data_dir, tier, embedder_id, db_fingerprint)
482 .join(format!("shard-{shard_index:05}.fsvi"))
483}
484
485fn semantic_shard_ann_index_path(
486 data_dir: &Path,
487 tier: TierKind,
488 embedder_id: &str,
489 db_fingerprint: &str,
490 shard_index: u32,
491) -> PathBuf {
492 semantic_shard_generation_dir(data_dir, tier, embedder_id, db_fingerprint)
493 .join(format!("shard-{shard_index:05}.chsw"))
494}
495
496#[cfg(not(windows))]
497fn sync_parent_directory(path: &Path) -> Result<()> {
498 let Some(parent) = path.parent() else {
499 return Ok(());
500 };
501 let directory = fs::File::open(parent)
502 .with_context(|| format!("opening parent directory {}", parent.display()))?;
503 directory
504 .sync_all()
505 .with_context(|| format!("syncing parent directory {}", parent.display()))
506}
507
/// Windows variant of the parent-directory sync: performs no work and always
/// succeeds.
#[cfg(windows)]
fn sync_parent_directory(path: &Path) -> Result<()> {
    // The argument is intentionally unused on this platform.
    let _ = path;
    Ok(())
}
512
513fn semantic_doc_id_for_embedded(embedded: &EmbeddedMessage) -> String {
514 SemanticDocId {
515 message_id: embedded.message_id,
516 chunk_idx: embedded.chunk_idx,
517 agent_id: embedded.agent_id,
518 workspace_id: embedded.workspace_id,
519 source_id: embedded.source_id,
520 role: embedded.role,
521 created_at_ms: embedded.created_at_ms,
522 content_hash: Some(embedded.content_hash),
523 }
524 .to_doc_id_string()
525}
526
/// Conversation envelope loaded by `fetch_canonical_embedding_conversations`
/// from `conversations` left-joined with `agents` and `workspaces`.
struct CanonicalEmbeddingConversationRow {
    /// `conversations.id`.
    conversation_id: i64,
    /// Agent slug, or `'unknown'` when the join finds none.
    agent_slug: String,
    /// `conversations.agent_id`, or 0 when NULL.
    agent_id: i64,
    /// `workspaces.path` for the conversation's workspace, if any.
    workspace: Option<PathBuf>,
    /// `conversations.workspace_id`, if any.
    workspace_id: Option<i64>,
    external_id: Option<String>,
    title: Option<String>,
    source_path: PathBuf,
    started_at: Option<i64>,
    ended_at: Option<i64>,
    /// Raw `conversations.source_id`; normalized before use in provenance.
    source_id: Option<String>,
    /// Raw `conversations.origin_host`; normalized before use in provenance.
    origin_host: Option<String>,
}
541
/// One page of embedding inputs produced by the conversation-id cursor scan.
struct CanonicalEmbeddingBatch {
    /// Embedding inputs built from the selected conversations.
    inputs: Vec<EmbeddingInput>,
    /// Number of conversation ids selected for this page.
    conversations_in_batch: u64,
    /// Last conversation id of the page, or the incoming cursor when the page
    /// is empty.
    last_conversation_id: i64,
    /// Count returned by `total_semantic_conversations` for this call.
    total_conversations: u64,
}
548
/// Embedding inputs for messages newer than a message-id cursor.
pub(crate) struct CanonicalIncrementalEmbeddingBatch {
    /// Embedding inputs built from the selected messages.
    pub inputs: Vec<EmbeddingInput>,
    /// Number of distinct conversations that had messages past the cursor.
    pub conversations_in_batch: u64,
    /// Largest raw message id among the selected messages, or `None` when no
    /// message was selected.
    pub raw_max_message_id: Option<i64>,
}
554
/// Counts conversations that have at least one message.
///
/// The primary query relies on `conversations.message_count`. If the error
/// text reports that this column does not exist, the function falls back to
/// selecting the ids of conversations with an existing message row and
/// counting them; that fallback first tries an `INDEXED BY` hint and retries
/// without it if the error text reports that the index does not exist. Any
/// other error is returned with added context. A negative count is reported
/// as 0.
fn total_semantic_conversations(storage: &FrankenStorage) -> Result<u64> {
    // Fallback query that names the index SQLite should use on `messages`.
    let hinted_fallback_sql = "SELECT c.id
         FROM conversations c
         WHERE EXISTS (
             SELECT 1
             FROM messages INDEXED BY sqlite_autoindex_messages_1
             WHERE conversation_id = c.id
             LIMIT 1
         )";
    // Same query without the index hint.
    let fallback_sql = "SELECT c.id
         FROM conversations c
         WHERE EXISTS (
             SELECT 1
             FROM messages
             WHERE conversation_id = c.id
             LIMIT 1
         )";
    let count: i64 = storage
        .raw()
        .query_row_map(
            "SELECT COUNT(*) FROM conversations WHERE message_count > 0",
            &[] as &[ParamValue],
            |row| row.get_typed(0),
        )
        .or_else(|err| {
            // Only a missing `message_count` column triggers the fallback.
            if !err.to_string().contains("no such column: message_count") {
                return Err(err);
            }
            let conversation_ids: Vec<i64> = storage
                .raw()
                .query_map_collect(hinted_fallback_sql, &[] as &[ParamValue], |row| {
                    row.get_typed(0)
                })
                .or_else(|err| {
                    // Retry unhinted only when the named index is absent.
                    if err
                        .to_string()
                        .contains("no such index: sqlite_autoindex_messages_1")
                    {
                        return storage.raw().query_map_collect(
                            fallback_sql,
                            &[] as &[ParamValue],
                            |row| row.get_typed(0),
                        );
                    }
                    Err(err)
                })?;
            Ok(i64::try_from(conversation_ids.len()).unwrap_or(i64::MAX))
        })
        .with_context(|| "counting canonical conversations with semantic messages")?;
    Ok(u64::try_from(count.max(0)).unwrap_or(u64::MAX))
}
606
/// Converts a raw database id to `u64`, returning `None` for negative values.
pub(crate) fn message_id_from_db(raw: i64) -> Option<u64> {
    if raw.is_negative() {
        None
    } else {
        Some(raw.unsigned_abs())
    }
}
610
/// Converts `raw` to `u32`, pinning negative values to 0 and values above
/// `u32::MAX` to `u32::MAX`.
pub(crate) fn saturating_u32_from_i64(raw: i64) -> u32 {
    let bounded = raw.clamp(0, i64::from(u32::MAX));
    // `bounded` is within the `u32` range, so the fallback is never taken.
    u32::try_from(bounded).unwrap_or(u32::MAX)
}
618
619fn canonical_embedding_created_at_ms(message_id: u64, created_at: Option<i64>) -> i64 {
620 created_at.unwrap_or_else(|| {
628 tracing::warn!(
629 message_id,
630 "semantic backfill: row has NULL created_at; defaulting to 0 (Unix epoch). \
631 Downstream time-range filters will treat this message as 1970-01-01."
632 );
633 0
634 })
635}
636
637fn canonical_embedding_packet_provenance(
638 row: &CanonicalEmbeddingConversationRow,
639) -> ConversationPacketProvenance {
640 let source_id =
641 normalized_index_source_id(row.source_id.as_deref(), None, row.origin_host.as_deref());
642 ConversationPacketProvenance {
643 origin_kind: normalized_index_origin_kind(&source_id, None),
644 origin_host: normalized_index_origin_host(row.origin_host.as_deref()),
645 source_id,
646 }
647}
648
649fn canonical_embedding_conversation(
650 row: &CanonicalEmbeddingConversationRow,
651 provenance: &ConversationPacketProvenance,
652 messages: Vec<Message>,
653) -> Conversation {
654 Conversation {
655 id: Some(row.conversation_id),
656 agent_slug: row.agent_slug.clone(),
657 workspace: row.workspace.clone(),
658 external_id: row.external_id.clone(),
659 title: row.title.clone(),
660 source_path: row.source_path.clone(),
661 started_at: row.started_at,
662 ended_at: row.ended_at,
663 approx_tokens: None,
664 metadata_json: serde_json::Value::Null,
665 messages,
666 source_id: provenance.source_id.clone(),
667 origin_host: provenance.origin_host.clone(),
668 }
669}
670
671fn embedding_input_from_packet_message(
672 conversation_id: i64,
673 agent_id: u32,
674 workspace_id: u32,
675 source_id_hash: u32,
676 message: &crate::model::conversation_packet::ConversationPacketMessage,
677) -> Option<EmbeddingInput> {
678 let Some(raw_message_id) = message.message_id else {
679 tracing::warn!(
680 conversation_id,
681 message_idx = message.idx,
682 "skipping semantic backfill message without canonical id in ConversationPacket replay"
683 );
684 return None;
685 };
686 let Some(message_id) = message_id_from_db(raw_message_id) else {
687 tracing::warn!(
688 conversation_id,
689 raw_message_id,
690 "skipping out-of-range id during semantic backfill"
691 );
692 return None;
693 };
694 Some(EmbeddingInput {
695 message_id,
696 created_at_ms: canonical_embedding_created_at_ms(message_id, message.created_at),
697 agent_id,
698 workspace_id,
699 source_id: source_id_hash,
700 role: role_code_from_str(&message.role).unwrap_or(ROLE_USER),
701 chunk_idx: 0,
702 content: message.content.clone(),
703 })
704}
705
706fn embedding_inputs_from_conversation_packet(
707 row: &CanonicalEmbeddingConversationRow,
708 packet: &ConversationPacket,
709) -> Vec<EmbeddingInput> {
710 let agent_id = saturating_u32_from_i64(row.agent_id);
711 let workspace_id = saturating_u32_from_i64(row.workspace_id.unwrap_or(0));
712 let source_id_hash = crc32fast::hash(packet.payload.provenance.source_id.as_bytes());
713 packet
714 .projections
715 .semantic
716 .message_indices
717 .iter()
718 .filter_map(|message_index| {
719 packet
720 .payload
721 .messages
722 .get(*message_index)
723 .and_then(|message| {
724 embedding_input_from_packet_message(
725 row.conversation_id,
726 agent_id,
727 workspace_id,
728 source_id_hash,
729 message,
730 )
731 })
732 })
733 .collect()
734}
735
736fn fetch_canonical_embedding_conversations(
737 storage: &FrankenStorage,
738 conversation_ids: &[i64],
739) -> Result<Vec<CanonicalEmbeddingConversationRow>> {
740 let mut envelope_sql = String::from(
741 "SELECT c.id,
742 COALESCE(a.slug, 'unknown'),
743 COALESCE(c.agent_id, 0),
744 c.workspace_id,
745 w.path,
746 c.external_id,
747 c.title,
748 c.source_path,
749 c.started_at,
750 c.ended_at,
751 c.source_id,
752 c.origin_host
753 FROM conversations c
754 LEFT JOIN agents a ON a.id = c.agent_id
755 LEFT JOIN workspaces w ON w.id = c.workspace_id
756 WHERE c.id IN (",
757 );
758 let mut params = Vec::with_capacity(conversation_ids.len());
759 for (idx, conversation_id) in conversation_ids.iter().enumerate() {
760 if idx > 0 {
761 envelope_sql.push_str(", ");
762 }
763 envelope_sql.push_str(&format!("?{}", idx + 1));
764 params.push(ParamValue::from(*conversation_id));
765 }
766 envelope_sql.push_str(") ORDER BY c.id ASC");
767
768 storage
769 .raw()
770 .query_map_collect(&envelope_sql, ¶ms, |row| {
771 let workspace_path: Option<String> = row.get_typed(4)?;
772 Ok(CanonicalEmbeddingConversationRow {
773 conversation_id: row.get_typed(0)?,
774 agent_slug: row.get_typed(1)?,
775 agent_id: row.get_typed(2)?,
776 workspace_id: row.get_typed(3)?,
777 workspace: workspace_path.map(PathBuf::from),
778 external_id: row.get_typed(5)?,
779 title: row.get_typed(6)?,
780 source_path: PathBuf::from(row.get_typed::<String>(7)?),
781 started_at: row.get_typed(8)?,
782 ended_at: row.get_typed(9)?,
783 source_id: row.get_typed(10)?,
784 origin_host: row.get_typed(11)?,
785 })
786 })
787 .with_context(|| {
788 format!(
789 "fetching semantic backfill conversation envelopes for {} conversations",
790 conversation_ids.len()
791 )
792 })
793}
794
/// Per-packet identifiers that `semantic_inputs_from_packets` stamps onto
/// every embedding input it builds from the matching packet.
///
/// NOTE(review): marked `dead_code`, so presumably not yet wired into a
/// production caller — confirm before removing the allowance.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy)]
pub(crate) struct SemanticPacketContext {
    /// Conversation id, used in warnings about skipped messages.
    pub conversation_id: i64,
    /// Agent id copied onto each input.
    pub agent_id: u32,
    /// Workspace id copied onto each input.
    pub workspace_id: u32,
}
812
813#[allow(dead_code)]
833pub(crate) fn semantic_inputs_from_packets(
834 packets: &[ConversationPacket],
835 contexts: &[SemanticPacketContext],
836) -> Result<Vec<EmbeddingInput>> {
837 if packets.len() != contexts.len() {
838 anyhow::bail!(
839 "semantic_inputs_from_packets length mismatch: {} packets vs {} contexts",
840 packets.len(),
841 contexts.len()
842 );
843 }
844 let mut inputs = Vec::new();
845 for (packet, context) in packets.iter().zip(contexts.iter()) {
846 let source_id_hash = crc32fast::hash(packet.payload.provenance.source_id.as_bytes());
847 for &message_index in &packet.projections.semantic.message_indices {
848 let Some(message) = packet.payload.messages.get(message_index) else {
849 anyhow::bail!(
850 "packet semantic projection references missing message index {} \
851 (packet has {} messages)",
852 message_index,
853 packet.payload.messages.len()
854 );
855 };
856 if let Some(input) = embedding_input_from_packet_message(
857 context.conversation_id,
858 context.agent_id,
859 context.workspace_id,
860 source_id_hash,
861 message,
862 ) {
863 inputs.push(input);
864 }
865 }
866 }
867 tracing::debug!(
868 packets = packets.len(),
869 packet_driven = true,
870 semantic_inputs = inputs.len(),
871 "built semantic inputs from in-memory ConversationPacket batch"
872 );
873 Ok(inputs)
874}
875
876fn fetch_canonical_embedding_batch(
877 storage: &FrankenStorage,
878 after_conversation_id: i64,
879 max_conversations: usize,
880) -> Result<CanonicalEmbeddingBatch> {
881 fetch_canonical_embedding_batch_inner(storage, after_conversation_id, max_conversations, None)
882}
883
884fn fetch_canonical_embedding_batch_inner(
890 storage: &FrankenStorage,
891 after_conversation_id: i64,
892 max_conversations: usize,
893 after_message_id: Option<i64>,
894) -> Result<CanonicalEmbeddingBatch> {
895 let total_conversations = total_semantic_conversations(storage)?;
896 let max_conversations_i64 = i64::try_from(max_conversations.max(1)).unwrap_or(i64::MAX);
897 let mut params = vec![
898 ParamValue::from(after_conversation_id),
899 ParamValue::from(max_conversations_i64),
900 ];
901 let message_cursor_predicate = if let Some(after_message_id) = after_message_id {
902 params.push(ParamValue::from(after_message_id));
903 " AND id > ?3"
904 } else {
905 ""
906 };
907 let hinted_sql = format!(
908 "SELECT c.id
909 FROM conversations c
910 WHERE c.id > ?1
911 AND EXISTS (
912 SELECT 1
913 FROM messages INDEXED BY sqlite_autoindex_messages_1
914 WHERE conversation_id = c.id{message_cursor_predicate}
915 LIMIT 1
916 )
917 ORDER BY c.id ASC
918 LIMIT ?2"
919 );
920 let fallback_sql = format!(
921 "SELECT c.id
922 FROM conversations c
923 WHERE c.id > ?1
924 AND EXISTS (
925 SELECT 1
926 FROM messages
927 WHERE conversation_id = c.id{message_cursor_predicate}
928 LIMIT 1
929 )
930 ORDER BY c.id ASC
931 LIMIT ?2"
932 );
933 let conversation_ids: Vec<i64> = storage
934 .raw()
935 .query_map_collect(&hinted_sql, ¶ms, |row| row.get_typed(0))
936 .or_else(|err| {
937 if err
938 .to_string()
939 .contains("no such index: sqlite_autoindex_messages_1")
940 {
941 return storage
942 .raw()
943 .query_map_collect(&fallback_sql, ¶ms, |row| row.get_typed(0));
944 }
945 Err(err)
946 })
947 .with_context(|| {
948 format!("fetching semantic backfill conversation ids after {after_conversation_id}")
949 })?;
950
951 if conversation_ids.is_empty() {
952 return Ok(CanonicalEmbeddingBatch {
953 inputs: Vec::new(),
954 conversations_in_batch: 0,
955 last_conversation_id: after_conversation_id,
956 total_conversations,
957 });
958 }
959
960 let conversations = fetch_canonical_embedding_conversations(storage, &conversation_ids)?;
961
962 let mut grouped_messages =
963 storage.fetch_messages_for_lexical_rebuild_batch(&conversation_ids, None, None)?;
964 let (inputs, _) = packet_embedding_inputs_from_materialized_canonical_messages(
965 &conversations,
966 &mut grouped_messages,
967 |message| match after_message_id {
968 Some(min_exclusive) => message.id.is_some_and(|id| id > min_exclusive),
969 None => true,
970 },
971 );
972
973 let conversations_in_batch = u64::try_from(conversation_ids.len()).unwrap_or(u64::MAX);
974 tracing::debug!(
975 conversations_in_batch,
976 packet_driven = true,
977 semantic_inputs = inputs.len(),
978 ?after_message_id,
979 "built semantic backfill batch from ConversationPacket canonical replay"
980 );
981
982 Ok(CanonicalEmbeddingBatch {
983 inputs,
984 conversations_in_batch,
985 last_conversation_id: *conversation_ids.last().unwrap_or(&after_conversation_id),
986 total_conversations,
987 })
988}
989
990pub(crate) fn packet_embedding_inputs_from_storage(
991 storage: &FrankenStorage,
992) -> Result<Vec<EmbeddingInput>> {
993 Ok(fetch_canonical_embedding_batch(storage, 0, usize::MAX)?.inputs)
994}
995
996fn packet_embedding_inputs_from_selected_canonical_messages<F>(
997 storage: &FrankenStorage,
998 conversation_ids: &[i64],
999 include_message: F,
1000) -> Result<(Vec<EmbeddingInput>, Option<i64>)>
1001where
1002 F: FnMut(&Message) -> bool,
1003{
1004 if conversation_ids.is_empty() {
1005 return Ok((Vec::new(), None));
1006 }
1007
1008 let conversations = fetch_canonical_embedding_conversations(storage, conversation_ids)?;
1009 let mut grouped_messages =
1010 storage.fetch_messages_for_lexical_rebuild_batch(conversation_ids, None, None)?;
1011 Ok(
1012 packet_embedding_inputs_from_materialized_canonical_messages(
1013 &conversations,
1014 &mut grouped_messages,
1015 include_message,
1016 ),
1017 )
1018}
1019
1020fn packet_embedding_inputs_from_materialized_canonical_messages<F>(
1021 conversations: &[CanonicalEmbeddingConversationRow],
1022 grouped_messages: &mut HashMap<i64, Vec<Message>>,
1023 mut include_message: F,
1024) -> (Vec<EmbeddingInput>, Option<i64>)
1025where
1026 F: FnMut(&Message) -> bool,
1027{
1028 let mut inputs = Vec::new();
1029 let mut raw_max_message_id: Option<i64> = None;
1030
1031 for conversation in conversations {
1032 let mut messages = grouped_messages
1033 .remove(&conversation.conversation_id)
1034 .unwrap_or_default();
1035 messages.retain(|message| {
1036 let keep = include_message(message);
1037 if keep && let Some(message_id) = message.id {
1038 raw_max_message_id =
1039 Some(raw_max_message_id.map_or(message_id, |current| current.max(message_id)));
1040 }
1041 keep
1042 });
1043 if messages.is_empty() {
1044 continue;
1045 }
1046
1047 let provenance = canonical_embedding_packet_provenance(conversation);
1048 let canonical = canonical_embedding_conversation(conversation, &provenance, messages);
1049 let packet = ConversationPacket::from_canonical_replay(&canonical, provenance);
1050 inputs.extend(embedding_inputs_from_conversation_packet(
1051 conversation,
1052 &packet,
1053 ));
1054 }
1055
1056 (inputs, raw_max_message_id)
1057}
1058
1059pub(crate) fn packet_embedding_inputs_from_storage_since(
1060 storage: &FrankenStorage,
1061 since_message_id: i64,
1062) -> Result<CanonicalIncrementalEmbeddingBatch> {
1063 let conversation_ids: Vec<i64> = storage
1064 .raw()
1065 .query_map_collect(
1066 "SELECT DISTINCT m.conversation_id
1067 FROM messages m
1068 WHERE m.id > ?1
1069 ORDER BY m.conversation_id ASC",
1070 &[ParamValue::from(since_message_id)],
1071 |row| row.get_typed(0),
1072 )
1073 .with_context(|| {
1074 format!(
1075 "fetching canonical semantic catch-up conversation ids after message {since_message_id}"
1076 )
1077 })?;
1078
1079 if conversation_ids.is_empty() {
1080 return Ok(CanonicalIncrementalEmbeddingBatch {
1081 inputs: Vec::new(),
1082 conversations_in_batch: 0,
1083 raw_max_message_id: None,
1084 });
1085 }
1086
1087 let (inputs, raw_max_message_id) = packet_embedding_inputs_from_selected_canonical_messages(
1088 storage,
1089 &conversation_ids,
1090 |message| message.id.is_some_and(|id| id > since_message_id),
1091 )?;
1092
1093 let conversations_in_batch = u64::try_from(conversation_ids.len()).unwrap_or(u64::MAX);
1094 tracing::debug!(
1095 since_message_id,
1096 conversations_in_batch,
1097 packet_driven = true,
1098 semantic_inputs = inputs.len(),
1099 "built semantic catch-up batch from ConversationPacket canonical replay"
1100 );
1101
1102 Ok(CanonicalIncrementalEmbeddingBatch {
1103 inputs,
1104 conversations_in_batch,
1105 raw_max_message_id,
1106 })
1107}
1108
1109pub(crate) fn packet_embedding_inputs_from_storage_for_message_ids(
1110 storage: &FrankenStorage,
1111 conversation_ids: &[i64],
1112 message_ids: &HashSet<i64>,
1113) -> Result<Vec<EmbeddingInput>> {
1114 if conversation_ids.is_empty() || message_ids.is_empty() {
1115 return Ok(Vec::new());
1116 }
1117
1118 let (inputs, raw_max_message_id) = packet_embedding_inputs_from_selected_canonical_messages(
1119 storage,
1120 conversation_ids,
1121 |message| message.id.is_some_and(|id| message_ids.contains(&id)),
1122 )?;
1123 tracing::debug!(
1124 conversations_in_batch = conversation_ids.len(),
1125 selected_message_ids = message_ids.len(),
1126 semantic_inputs = inputs.len(),
1127 raw_max_message_id,
1128 packet_driven = true,
1129 "built selected semantic batch from ConversationPacket canonical replay"
1130 );
1131
1132 Ok(inputs)
1133}
1134
/// An embedding input that has been canonicalized and is ready to be embedded.
struct Prepared<'a> {
    /// The original input this entry was prepared from.
    msg: &'a EmbeddingInput,
    /// Output of `canonicalize_for_embedding` for the input's content; this is
    /// the text that is sent to the embedder.
    canonical: String,
    /// `content_hash` of `canonical`.
    hash: [u8; 32],
}
1140
/// Value stored in the semantic-prep memo cache: the result of preparing one
/// piece of message content, so identical content need not be canonicalized
/// and hashed again.
#[derive(Debug, Clone, PartialEq, Eq)]
struct MemoizedPreparedMessage {
    /// Canonicalized text produced for the content.
    canonical: String,
    /// `content_hash` of `canonical`.
    hash: [u8; 32],
}
1146
1147fn semantic_prep_memo_key(content: &str) -> MemoKey {
1148 MemoKey::new(
1149 MemoContentHash::from_bytes(content_hash(content).to_vec()),
1150 SEMANTIC_PREP_MEMO_ALGORITHM,
1151 SEMANTIC_PREP_MEMO_VERSION,
1152 )
1153}
1154
/// Returns how much a counter grew between two snapshots.
///
/// Yields `0` instead of underflowing when `before` is larger than `after`.
fn memo_counter_delta(after: u64, before: u64) -> u64 {
    after.checked_sub(before).unwrap_or(0)
}
1158
1159fn trace_semantic_prep_memo_window(
1160 window_index: usize,
1161 window_len: usize,
1162 prepared_len: usize,
1163 entry_capacity: usize,
1164 before: &crate::indexer::memoization::MemoCacheStats,
1165 after: &crate::indexer::memoization::MemoCacheStats,
1166) {
1167 tracing::trace!(
1168 algorithm = SEMANTIC_PREP_MEMO_ALGORITHM,
1169 algorithm_version = SEMANTIC_PREP_MEMO_VERSION,
1170 window_index,
1171 window_len,
1172 prepared_messages = prepared_len,
1173 skipped_messages = window_len.saturating_sub(prepared_len),
1174 hit_delta = memo_counter_delta(after.hits, before.hits),
1175 miss_delta = memo_counter_delta(after.misses, before.misses),
1176 insert_delta = memo_counter_delta(after.inserts, before.inserts),
1177 evictions_capacity_delta =
1178 memo_counter_delta(after.evictions_capacity, before.evictions_capacity),
1179 quarantined_delta = memo_counter_delta(after.quarantined, before.quarantined),
1180 live_entries = after.live_entries,
1181 entry_capacity,
1182 "semantic prep memo cache window"
1183 );
1184}
1185
1186fn trace_semantic_prep_memo_audit(audit: &MemoCacheAuditRecord) {
1187 tracing::trace!(?audit, "semantic prep memo cache audit");
1188}
1189
1190fn prepare_window_with_memo<'a>(
1191 window: &'a [EmbeddingInput],
1192 cache: &mut ContentAddressedMemoCache<MemoizedPreparedMessage>,
1193) -> Vec<Prepared<'a>> {
1194 window
1195 .iter()
1196 .filter_map(|msg| {
1197 let key = semantic_prep_memo_key(&msg.content);
1198 let (lookup, lookup_audit) = cache.get_with_audit(&key);
1199 trace_semantic_prep_memo_audit(&lookup_audit);
1200 match lookup {
1201 MemoLookup::Hit { value } => Some(Prepared {
1202 msg,
1203 canonical: value.canonical,
1204 hash: value.hash,
1205 }),
1206 MemoLookup::Miss | MemoLookup::Quarantined { .. } => {
1207 let canonical = canonicalize_for_embedding(&msg.content);
1208 if canonical.is_empty() {
1209 return None;
1210 }
1211 let hash = content_hash(&canonical);
1212 let insert_audit = cache.insert_with_audit(
1213 key,
1214 MemoizedPreparedMessage {
1215 canonical: canonical.clone(),
1216 hash,
1217 },
1218 );
1219 trace_semantic_prep_memo_audit(&insert_audit);
1220 Some(Prepared {
1221 msg,
1222 canonical,
1223 hash,
1224 })
1225 }
1226 }
1227 })
1228 .collect()
1229}
1230
1231fn prepare_window<'a>(window: &'a [EmbeddingInput], serial: bool) -> Vec<Prepared<'a>> {
1238 let prep = |msg: &'a EmbeddingInput| -> Option<Prepared<'a>> {
1239 let canonical = canonicalize_for_embedding(&msg.content);
1240 if canonical.is_empty() {
1241 return None;
1242 }
1243 let hash = content_hash(&canonical);
1244 Some(Prepared {
1245 msg,
1246 canonical,
1247 hash,
1248 })
1249 };
1250
1251 if serial {
1252 window.iter().filter_map(prep).collect()
1253 } else {
1254 window.par_iter().filter_map(prep).collect()
1255 }
1256}
1257
1258fn flush_prepared_batch(
1259 batch: &[Prepared<'_>],
1260 embeddings: &mut Vec<EmbeddedMessage>,
1261 pb: &ProgressBar,
1262 embedder: &dyn Embedder,
1263) -> Result<()> {
1264 if batch.is_empty() {
1265 return Ok(());
1266 }
1267
1268 let texts: Vec<&str> = batch.iter().map(|p| p.canonical.as_str()).collect();
1269 let vectors = embedder
1270 .embed_batch_sync(&texts)
1271 .map_err(|e| anyhow::anyhow!("embedding failed: {e}"))?;
1272
1273 if vectors.len() != batch.len() {
1274 bail!(
1275 "embedder returned {} embeddings for {} inputs",
1276 vectors.len(),
1277 batch.len()
1278 );
1279 }
1280
1281 for (prepared, vector) in batch.iter().zip(vectors) {
1282 if vector.len() != embedder.dimension() {
1283 bail!(
1284 "embedding dimension mismatch: expected {}, got {}",
1285 embedder.dimension(),
1286 vector.len()
1287 );
1288 }
1289 embeddings.push(EmbeddedMessage {
1290 message_id: prepared.msg.message_id,
1291 created_at_ms: prepared.msg.created_at_ms,
1292 agent_id: prepared.msg.agent_id,
1293 workspace_id: prepared.msg.workspace_id,
1294 source_id: prepared.msg.source_id,
1295 role: prepared.msg.role,
1296 chunk_idx: prepared.msg.chunk_idx,
1297 content_hash: prepared.hash,
1298 embedding: vector,
1299 });
1300 }
1301
1302 pb.inc(saturating_u64_from_usize(batch.len()));
1303 Ok(())
1304}
1305
/// Embeds messages with a boxed [`Embedder`] and writes the resulting vectors
/// into on-disk vector indexes.
pub struct SemanticIndexer {
    /// Embedder used to turn canonical message text into vectors.
    embedder: Box<dyn Embedder>,
    /// Maximum number of prepared inputs handed to the embedder per call.
    batch_size: usize,
}
1310
1311impl SemanticIndexer {
1312 pub fn new(embedder_type: &str, data_dir: Option<&Path>) -> Result<Self> {
1313 let embedder: Box<dyn Embedder> = match embedder_type {
1314 "fastembed" | "minilm" | "snowflake-arctic-s" | "nomic-embed" => {
1315 let dir = data_dir
1316 .ok_or_else(|| anyhow::anyhow!("data_dir required for fastembed embedder"))?;
1317 let embedder_name = if embedder_type == "fastembed" {
1318 "minilm"
1319 } else {
1320 embedder_type
1321 };
1322 Box::new(
1323 FastEmbedder::load_by_name(dir, embedder_name)
1324 .map_err(|e| anyhow::anyhow!("fastembed unavailable: {e}"))?,
1325 )
1326 }
1327 "hash" => Box::new(HashEmbedder::default()),
1328 other => bail!("unknown embedder: {other}"),
1329 };
1330
1331 Ok(Self {
1332 embedder,
1333 batch_size: resolved_default_batch_size(),
1334 })
1335 }
1336
1337 pub fn with_batch_size(mut self, batch_size: usize) -> Result<Self> {
1338 if batch_size == 0 {
1339 bail!("batch_size must be > 0");
1340 }
1341 self.batch_size = batch_size;
1342 Ok(self)
1343 }
1344
    /// Returns the configured embedding batch size.
    pub fn batch_size(&self) -> usize {
        self.batch_size
    }
1348
    /// Returns the identifier reported by the underlying embedder.
    pub fn embedder_id(&self) -> &str {
        self.embedder.id()
    }
1352
    /// Returns the vector dimension reported by the underlying embedder.
    pub fn embedder_dimension(&self) -> usize {
        self.embedder.dimension()
    }
1356
1357 pub fn embed_messages(&self, messages: &[EmbeddingInput]) -> Result<Vec<EmbeddedMessage>> {
1358 self.embed_messages_with_sink(messages, &SemanticProgressSink::disabled())
1359 }
1360
    /// Canonicalizes and embeds `messages`, reporting per-batch progress to `sink`.
    ///
    /// Inputs are prepared in windows of `batch_size * 4` messages; each
    /// prepared window is then embedded in chunks of at most `batch_size`.
    /// Inputs dropped during preparation produce no embedding but still
    /// advance the progress bar. When `sink` is active, an `EmbedBatchStart`
    /// and an `EmbedBatchDone` event are emitted around every embedder call.
    ///
    /// Returns one `EmbeddedMessage` per input that survived preparation, in
    /// input order. An empty `messages` slice returns an empty vector.
    ///
    /// # Errors
    ///
    /// Propagates the first error returned by `flush_prepared_batch`.
    pub fn embed_messages_with_sink(
        &self,
        messages: &[EmbeddingInput],
        sink: &SemanticProgressSink,
    ) -> Result<Vec<EmbeddedMessage>> {
        if messages.is_empty() {
            return Ok(Vec::new());
        }

        // Only render the progress bar when stderr is an interactive terminal;
        // otherwise keep the bar object but hide its output.
        let show_progress = std::io::stderr().is_terminal();
        let pb = ProgressBar::new(saturating_u64_from_usize(messages.len()));
        if show_progress {
            let style = ProgressStyle::default_bar()
                .template("{spinner:.green} [{bar:40.cyan/blue}] {pos}/{len} messages embedded")
                .unwrap_or_else(|_| ProgressStyle::default_bar());
            pb.set_style(style);
        } else {
            pb.set_draw_target(ProgressDrawTarget::hidden());
        }

        let mut embeddings = Vec::with_capacity(messages.len());

        // Preparation window: four embedder batches' worth of inputs at a time.
        let window = self.batch_size.saturating_mul(4);
        // The memo cache exists only on the serial preparation path; when
        // parallel preparation is enabled `prep_memo` is `None`.
        let serial_prep = !parallel_prep_enabled();
        let prep_memo_capacity = resolved_semantic_prep_memo_capacity();
        let mut prep_memo =
            serial_prep.then(|| ContentAddressedMemoCache::with_capacity(prep_memo_capacity));
        // `batch_index` counts embedder calls; `rows_processed` counts
        // prepared rows that have been embedded so far.
        let mut batch_index: u64 = 0;
        let mut rows_processed: u64 = 0;
        let rows_total = u64::try_from(messages.len()).ok();
        for (window_index, window_slice) in messages.chunks(window).enumerate() {
            let prepared_window = match prep_memo.as_mut() {
                Some(cache) => {
                    // Snapshot the stats so the per-window deltas can be traced.
                    let stats_before = cache.stats().clone();
                    let prepared_window = prepare_window_with_memo(window_slice, cache);
                    trace_semantic_prep_memo_window(
                        window_index,
                        window_slice.len(),
                        prepared_window.len(),
                        prep_memo_capacity,
                        &stats_before,
                        cache.stats(),
                    );
                    prepared_window
                }
                // No cache means parallel preparation was selected above.
                None => prepare_window(window_slice, false),
            };
            // Inputs dropped during preparation never reach the embedder, so
            // account for them on the progress bar here.
            let skipped_in_window = window_slice.len() - prepared_window.len();
            if skipped_in_window > 0 {
                pb.inc(saturating_u64_from_usize(skipped_in_window));
            }

            for batch in prepared_window.chunks(self.batch_size) {
                let batch_rows = u64::try_from(batch.len()).unwrap_or(u64::MAX);
                // Byte size of the canonical text sent to the embedder.
                let batch_bytes: u64 = batch.iter().map(|p| p.canonical.len() as u64).sum();
                if sink.is_active() {
                    sink.emit(
                        SemanticProgressEvent::EmbedBatchStart,
                        SemanticProgressFields {
                            batch_index: Some(batch_index),
                            batch_rows: Some(batch_rows),
                            rows_processed: Some(rows_processed),
                            rows_total,
                            bytes: Some(batch_bytes),
                            ..Default::default()
                        },
                    );
                }
                flush_prepared_batch(batch, &mut embeddings, &pb, self.embedder.as_ref())?;
                rows_processed = rows_processed.saturating_add(batch_rows);
                if sink.is_active() {
                    sink.emit(
                        SemanticProgressEvent::EmbedBatchDone,
                        SemanticProgressFields {
                            batch_index: Some(batch_index),
                            batch_rows: Some(batch_rows),
                            rows_processed: Some(rows_processed),
                            rows_total,
                            bytes: Some(batch_bytes),
                            ..Default::default()
                        },
                    );
                }
                batch_index = batch_index.saturating_add(1);
            }
        }

        // One summary line for the whole run when the memo cache was in use.
        if let Some(cache) = prep_memo.as_ref() {
            let stats = cache.stats();
            tracing::debug!(
                algorithm = SEMANTIC_PREP_MEMO_ALGORITHM,
                algorithm_version = SEMANTIC_PREP_MEMO_VERSION,
                hits = stats.hits,
                misses = stats.misses,
                inserts = stats.inserts,
                quarantined = stats.quarantined,
                live_entries = stats.live_entries,
                entry_capacity = prep_memo_capacity,
                "semantic prep memo cache summary"
            );
        }

        pb.finish_with_message("Embedding complete");
        Ok(embeddings)
    }
1482
1483 pub fn build_and_save_index<I>(
1484 &self,
1485 embedded_messages: I,
1486 data_dir: &Path,
1487 ) -> Result<FsVectorIndex>
1488 where
1489 I: IntoIterator<Item = EmbeddedMessage>,
1490 {
1491 let index_path = vector_index_path(data_dir, self.embedder_id());
1492 self.build_and_save_index_at_path(embedded_messages, &index_path)
1493 }
1494
1495 pub fn build_and_save_index_shards<I>(
1496 &self,
1497 embedded_messages: I,
1498 data_dir: &Path,
1499 plan: SemanticShardBuildPlan,
1500 ) -> Result<SemanticShardBuildOutcome>
1501 where
1502 I: IntoIterator<Item = EmbeddedMessage>,
1503 {
1504 if plan.db_fingerprint.trim().is_empty() {
1505 bail!("semantic shard build requires a non-empty DB fingerprint");
1506 }
1507 if plan.max_records_per_shard == 0 {
1508 bail!("semantic shard build requires max_records_per_shard > 0");
1509 }
1510
1511 let mut shard_records = Vec::new();
1512 let mut index_paths = Vec::new();
1513 let mut ann_index_paths = Vec::new();
1514 let mut current_records = Vec::with_capacity(plan.max_records_per_shard);
1515 let mut shard_index = 0u32;
1516 let mut total_docs = 0u64;
1517
1518 for embedded in embedded_messages {
1519 current_records.push(embedded);
1520 if current_records.len() >= plan.max_records_per_shard {
1521 let records = std::mem::take(&mut current_records);
1522 let (record, path, ann_path) =
1523 self.write_semantic_shard(records, data_dir, &plan, shard_index)?;
1524 total_docs = total_docs.saturating_add(record.doc_count);
1525 shard_records.push(record);
1526 index_paths.push(path);
1527 if let Some(path) = ann_path {
1528 ann_index_paths.push(path);
1529 }
1530 shard_index = shard_index
1531 .checked_add(1)
1532 .context("semantic shard index overflow")?;
1533 }
1534 }
1535
1536 if !current_records.is_empty() {
1537 let records = std::mem::take(&mut current_records);
1538 let (record, path, ann_path) =
1539 self.write_semantic_shard(records, data_dir, &plan, shard_index)?;
1540 total_docs = total_docs.saturating_add(record.doc_count);
1541 shard_records.push(record);
1542 index_paths.push(path);
1543 if let Some(path) = ann_path {
1544 ann_index_paths.push(path);
1545 }
1546 }
1547
1548 let shard_count = u32::try_from(shard_records.len())
1549 .context("semantic shard generation exceeded u32 shard count")?;
1550 for record in &mut shard_records {
1551 record.shard_count = shard_count;
1552 }
1553
1554 let mut shard_manifest = SemanticShardManifest::load_or_default(data_dir)
1555 .map_err(|err| anyhow::anyhow!("loading semantic shard manifest for publish: {err}"))?;
1556 shard_manifest.replace_shards_for_generation(
1557 plan.tier,
1558 self.embedder_id(),
1559 &plan.db_fingerprint,
1560 shard_records,
1561 );
1562 shard_manifest
1563 .save(data_dir)
1564 .map_err(|err| anyhow::anyhow!("saving semantic shard manifest: {err}"))?;
1565 let summary = shard_manifest.summary(plan.tier, self.embedder_id(), &plan.db_fingerprint);
1566
1567 tracing::info!(
1568 tier = plan.tier.as_str(),
1569 embedder = self.embedder_id(),
1570 shard_count,
1571 doc_count = total_docs,
1572 total_conversations = plan.total_conversations,
1573 "published semantic shard generation sidecar"
1574 );
1575
1576 Ok(SemanticShardBuildOutcome {
1577 tier: plan.tier,
1578 embedder_id: self.embedder_id().to_string(),
1579 shard_count,
1580 doc_count: total_docs,
1581 total_conversations: plan.total_conversations,
1582 index_paths,
1583 ann_index_paths,
1584 shard_manifest_path: SemanticShardManifest::path(data_dir),
1585 complete: summary.complete,
1586 })
1587 }
1588
1589 fn write_semantic_shard(
1590 &self,
1591 embedded_messages: Vec<EmbeddedMessage>,
1592 data_dir: &Path,
1593 plan: &SemanticShardBuildPlan,
1594 shard_index: u32,
1595 ) -> Result<(SemanticShardRecord, PathBuf, Option<PathBuf>)> {
1596 let started_at_ms = now_ms();
1597 let shard_path = semantic_shard_index_path(
1598 data_dir,
1599 plan.tier,
1600 self.embedder_id(),
1601 &plan.db_fingerprint,
1602 shard_index,
1603 );
1604 let shard_index_file = self.build_and_save_index_at_path(embedded_messages, &shard_path)?;
1605 let size_bytes = fs::metadata(&shard_path)
1606 .with_context(|| format!("stat semantic shard {}", shard_path.display()))?
1607 .len();
1608 let (ann_index_path, ann_size_bytes, ann_ready, ann_absolute_path) = if plan.build_ann {
1609 let ann_path = semantic_shard_ann_index_path(
1610 data_dir,
1611 plan.tier,
1612 self.embedder_id(),
1613 &plan.db_fingerprint,
1614 shard_index,
1615 );
1616 let config = FsHnswConfig {
1617 m: FS_HNSW_DEFAULT_M,
1618 ef_construction: FS_HNSW_DEFAULT_EF_CONSTRUCTION,
1619 ..FsHnswConfig::default()
1620 };
1621 let hnsw = FsHnswIndex::build_from_vector_index(&shard_index_file, config)
1622 .map_err(|err| anyhow::anyhow!("build shard HNSW index failed: {err}"))?;
1623 hnsw.save(&ann_path)
1624 .map_err(|err| anyhow::anyhow!("save shard HNSW index failed: {err}"))?;
1625 let ann_size_bytes = fs::metadata(&ann_path)
1626 .with_context(|| format!("stat semantic shard ANN {}", ann_path.display()))?
1627 .len();
1628 let relative_ann_path = ann_path
1629 .strip_prefix(data_dir)
1630 .unwrap_or(ann_path.as_path())
1631 .to_string_lossy()
1632 .to_string();
1633 (
1634 Some(relative_ann_path),
1635 ann_size_bytes,
1636 true,
1637 Some(ann_path),
1638 )
1639 } else {
1640 (None, 0, false, None)
1641 };
1642 let relative_index_path = shard_path
1643 .strip_prefix(data_dir)
1644 .unwrap_or(shard_path.as_path())
1645 .to_string_lossy()
1646 .to_string();
1647 let record = SemanticShardRecord {
1648 tier: plan.tier,
1649 embedder_id: self.embedder_id().to_string(),
1650 model_revision: plan.model_revision.clone(),
1651 schema_version: SEMANTIC_SCHEMA_VERSION,
1652 chunking_version: CHUNKING_STRATEGY_VERSION,
1653 dimension: self.embedder_dimension(),
1654 shard_index,
1655 shard_count: 0,
1656 doc_count: u64::try_from(shard_index_file.record_count()).unwrap_or(u64::MAX),
1657 total_conversations: plan.total_conversations,
1658 db_fingerprint: plan.db_fingerprint.clone(),
1659 index_path: relative_index_path,
1660 quantization: "f16".to_string(),
1661 mmap_ready: true,
1662 ann_index_path,
1663 ann_size_bytes,
1664 ann_ready,
1665 size_bytes,
1666 started_at_ms,
1667 completed_at_ms: now_ms(),
1668 ready: true,
1669 };
1670 Ok((record, shard_path, ann_absolute_path))
1671 }
1672
    /// Writes `embedded_messages` into a new vector index file at `index_path`
    /// and returns the index re-opened from disk.
    ///
    /// Parent directories are created as needed. The index is created with
    /// this embedder's id and dimension, the literal revision string `"1.0"`,
    /// and F16 quantization. Each record is keyed by
    /// `semantic_doc_id_for_embedded`.
    ///
    /// If writing any record fails, the partially written file is removed on
    /// a best-effort basis (a failed removal is only logged) and an error
    /// carrying the original failure's message is returned.
    ///
    /// # Errors
    ///
    /// Fails when the parent directory cannot be created, when the index
    /// cannot be created, when an embedding's length differs from the
    /// embedder dimension, when a record cannot be written, or when finishing
    /// or re-opening the index fails.
    fn build_and_save_index_at_path<I>(
        &self,
        embedded_messages: I,
        index_path: &Path,
    ) -> Result<FsVectorIndex>
    where
        I: IntoIterator<Item = EmbeddedMessage>,
    {
        if let Some(parent) = index_path.parent() {
            std::fs::create_dir_all(parent)?;
        }

        let mut writer: FsVectorIndexWriter = FsVectorIndex::create_with_revision(
            index_path,
            self.embedder_id(),
            "1.0",
            self.embedder_dimension(),
            FsQuantization::F16,
        )
        .map_err(|err| anyhow::anyhow!("create fsvi index failed: {err}"))?;

        // Run the write loop inside an immediately-invoked closure so that an
        // early `bail!`/`?` lands in `write_result` instead of returning from
        // this function, leaving room for the cleanup below.
        let write_result: Result<()> = (|| {
            for embedded in embedded_messages {
                if embedded.embedding.len() != self.embedder_dimension() {
                    bail!(
                        "embedding dimension mismatch: expected {}, got {}",
                        self.embedder_dimension(),
                        embedded.embedding.len()
                    );
                }
                let doc_id = semantic_doc_id_for_embedded(&embedded);
                writer
                    .write_record(&doc_id, &embedded.embedding)
                    .map_err(|err| anyhow::anyhow!("write fsvi record failed: {err}"))?;
            }
            Ok(())
        })();

        if let Err(e) = &write_result {
            tracing::warn!("removing partial vector index after write failure: {e}");
            // NOTE(review): `writer` has not been dropped at this point —
            // confirm the removal behaves as intended on platforms that
            // refuse to delete files that are still open.
            if let Err(rm_err) = std::fs::remove_file(index_path) {
                tracing::error!(
                    "failed to remove partial index file {}: {rm_err}",
                    index_path.display()
                );
            }
            // Re-created from the `Display` text because `e` is only borrowed here.
            return Err(anyhow::anyhow!("{e}"));
        }

        writer
            .finish()
            .map_err(|err| anyhow::anyhow!("finish fsvi index failed: {err}"))?;

        FsVectorIndex::open(index_path)
            .map_err(|err| anyhow::anyhow!("open fsvi index failed: {err}"))
    }
1731
1732 pub fn append_to_index(
1740 &self,
1741 embedded_messages: impl IntoIterator<Item = EmbeddedMessage>,
1742 data_dir: &Path,
1743 ) -> Result<usize> {
1744 let index_path = vector_index_path(data_dir, self.embedder_id());
1745 self.append_to_index_path(embedded_messages, &index_path)
1746 }
1747
1748 fn append_to_index_path(
1749 &self,
1750 embedded_messages: impl IntoIterator<Item = EmbeddedMessage>,
1751 index_path: &Path,
1752 ) -> Result<usize> {
1753 let mut index = FsVectorIndex::open(index_path)
1754 .map_err(|err| anyhow::anyhow!("open fsvi index for append: {err}"))?;
1755
1756 let entries: Vec<(String, Vec<f32>)> = embedded_messages
1757 .into_iter()
1758 .map(|em| {
1759 let doc_id = semantic_doc_id_for_embedded(&em);
1760 (doc_id, em.embedding)
1761 })
1762 .collect();
1763
1764 let count = entries.len();
1765 if count == 0 {
1766 return Ok(0);
1767 }
1768
1769 index
1770 .append_batch(&entries)
1771 .map_err(|err| anyhow::anyhow!("append_batch: {err}"))?;
1772
1773 if index.needs_compaction() {
1774 index
1775 .compact()
1776 .map_err(|err| anyhow::anyhow!("compaction: {err}"))?;
1777 }
1778
1779 Ok(count)
1780 }
1781
1782 fn write_backfill_staging_index(
1783 &self,
1784 embedded_messages: Vec<EmbeddedMessage>,
1785 staging_path: &Path,
1786 resume_existing: bool,
1787 ) -> Result<FsVectorIndex> {
1788 if resume_existing && staging_path.exists() {
1789 self.append_to_index_path(embedded_messages, staging_path)?;
1790 FsVectorIndex::open(staging_path)
1791 .map_err(|err| anyhow::anyhow!("open staged semantic index failed: {err}"))
1792 } else {
1793 self.build_and_save_index_at_path(embedded_messages, staging_path)
1794 }
1795 }
1796
1797 pub fn run_backfill_batch(
1798 &self,
1799 messages: &[EmbeddingInput],
1800 data_dir: &Path,
1801 manifest: &mut SemanticManifest,
1802 plan: SemanticBackfillBatchPlan,
1803 ) -> Result<SemanticBackfillBatchOutcome> {
1804 self.run_backfill_batch_with_sink(
1805 messages,
1806 data_dir,
1807 manifest,
1808 plan,
1809 None,
1810 &SemanticProgressSink::disabled(),
1811 )
1812 }
1813
    /// Embeds one backfill batch, writes it to the staging index, and then
    /// either publishes the staging index or saves a resumable checkpoint.
    ///
    /// A checkpoint stored in `manifest` is honoured only when it matches the
    /// plan's tier, this embedder's id, and is valid for the plan's DB
    /// fingerprint. With such a checkpoint the batch is appended to an
    /// existing staging file; otherwise the staging index is built from
    /// scratch.
    ///
    /// The batch is considered complete once the processed conversation count
    /// (prior checkpoint plus this batch, clamped to the total) reaches
    /// `plan.total_conversations`:
    /// - complete: the staging file is compacted if it has WAL records,
    ///   renamed onto the final index path, and recorded in the manifest as a
    ///   ready artifact;
    /// - incomplete: a `BuildCheckpoint` is stored in the manifest.
    ///
    /// In both cases the manifest is saved to `data_dir`. `last_message_id`
    /// is reported in progress events and stored in the checkpoint, falling
    /// back to the prior checkpoint's value when `None`.
    ///
    /// # Errors
    ///
    /// Fails when the DB fingerprint is blank, when conversations are
    /// reported for a zero total, or when embedding, staging, compaction,
    /// publishing (rename / directory sync / open / stat), or saving the
    /// manifest fails.
    pub fn run_backfill_batch_with_sink(
        &self,
        messages: &[EmbeddingInput],
        data_dir: &Path,
        manifest: &mut SemanticManifest,
        plan: SemanticBackfillBatchPlan,
        last_message_id: Option<i64>,
        sink: &SemanticProgressSink,
    ) -> Result<SemanticBackfillBatchOutcome> {
        if plan.db_fingerprint.trim().is_empty() {
            bail!("semantic backfill requires a non-empty DB fingerprint");
        }
        if plan.total_conversations == 0 && plan.conversations_in_batch > 0 {
            bail!("semantic backfill batch cannot process conversations when total is zero");
        }

        let manifest_path = SemanticManifest::path(data_dir);
        let staging_path = semantic_staging_index_path(
            data_dir,
            plan.tier,
            self.embedder_id(),
            &plan.db_fingerprint,
        );
        let final_path = vector_index_path(data_dir, self.embedder_id());

        // Only resume from a checkpoint that belongs to this exact build
        // (same tier, same embedder, still valid for this DB fingerprint).
        let prior_checkpoint = manifest
            .checkpoint
            .as_ref()
            .filter(|checkpoint| {
                checkpoint.tier == plan.tier
                    && checkpoint.embedder_id == self.embedder_id()
                    && checkpoint.is_valid(&plan.db_fingerprint)
            })
            .cloned();
        let prior_conversations = prior_checkpoint
            .as_ref()
            .map_or(0, |checkpoint| checkpoint.conversations_processed);
        let prior_docs = prior_checkpoint
            .as_ref()
            .map_or(0, |checkpoint| checkpoint.docs_embedded);

        let embeddings = self.embed_messages_with_sink(messages, sink)?;
        let embedded_docs = u64::try_from(embeddings.len()).unwrap_or(u64::MAX);
        if sink.is_active() {
            sink.emit(
                SemanticProgressEvent::StagingWriteStart,
                SemanticProgressFields {
                    batch_rows: Some(embedded_docs),
                    note: Some(staging_path.display().to_string()),
                    ..Default::default()
                },
            );
        }
        // Append to the staging file when resuming, otherwise start a new one.
        let mut staged_index = self.write_backfill_staging_index(
            embeddings,
            &staging_path,
            prior_checkpoint.is_some(),
        )?;
        if sink.is_active() {
            sink.emit(
                SemanticProgressEvent::StagingWriteDone,
                SemanticProgressFields {
                    batch_rows: Some(embedded_docs),
                    note: Some(staging_path.display().to_string()),
                    ..Default::default()
                },
            );
        }
        // Clamp to the total so the progress count can never exceed it.
        let conversations_processed = prior_conversations
            .saturating_add(plan.conversations_in_batch)
            .min(plan.total_conversations);
        let complete = conversations_processed >= plan.total_conversations;

        manifest.refresh_backlog(plan.total_conversations, &plan.db_fingerprint);

        if complete {
            // `plan.db_fingerprint` is moved into the artifact record below;
            // keep a copy for the backlog refresh that follows it.
            let db_fingerprint = plan.db_fingerprint.clone();
            if staged_index.wal_record_count() > 0 {
                staged_index.compact().map_err(|err| {
                    anyhow::anyhow!("compact staged semantic index failed: {err}")
                })?;
            }
            // Release the staging handle before the file is renamed.
            drop(staged_index);
            if sink.is_active() {
                sink.emit(
                    SemanticProgressEvent::PublishStart,
                    SemanticProgressFields {
                        rows_processed: Some(conversations_processed),
                        rows_total: Some(plan.total_conversations),
                        last_conversation_id: Some(plan.last_offset),
                        last_message_id,
                        note: Some(final_path.display().to_string()),
                        ..Default::default()
                    },
                );
            }
            // Publish by renaming the staging file onto the final path.
            fs::rename(&staging_path, &final_path).with_context(|| {
                format!(
                    "publishing staged semantic index {} to {}",
                    staging_path.display(),
                    final_path.display()
                )
            })?;
            sync_parent_directory(&final_path)?;
            let published_index = FsVectorIndex::open(&final_path)
                .map_err(|err| anyhow::anyhow!("open published semantic index failed: {err}"))?;
            let size_bytes = fs::metadata(&final_path)
                .with_context(|| format!("stat published semantic index {}", final_path.display()))?
                .len();
            // Stored relative to `data_dir` when the file lives underneath it.
            let relative_index_path = final_path
                .strip_prefix(data_dir)
                .unwrap_or(final_path.as_path())
                .to_string_lossy()
                .to_string();
            manifest.publish_artifact(ArtifactRecord {
                tier: plan.tier,
                embedder_id: self.embedder_id().to_string(),
                model_revision: plan.model_revision,
                schema_version: SEMANTIC_SCHEMA_VERSION,
                chunking_version: CHUNKING_STRATEGY_VERSION,
                dimension: self.embedder_dimension(),
                doc_count: u64::try_from(published_index.record_count()).unwrap_or(u64::MAX),
                conversation_count: conversations_processed,
                db_fingerprint: plan.db_fingerprint,
                index_path: relative_index_path,
                size_bytes,
                // Uses the prior checkpoint's save time when resuming, else "now".
                started_at_ms: prior_checkpoint
                    .as_ref()
                    .map_or_else(now_ms, |checkpoint| checkpoint.saved_at_ms),
                completed_at_ms: now_ms(),
                ready: true,
            });
            manifest.refresh_backlog(plan.total_conversations, &db_fingerprint);
            manifest.save(data_dir)?;
            if sink.is_active() {
                sink.emit(
                    SemanticProgressEvent::PublishDone,
                    SemanticProgressFields {
                        rows_processed: Some(conversations_processed),
                        rows_total: Some(plan.total_conversations),
                        last_conversation_id: Some(plan.last_offset),
                        last_message_id,
                        note: Some(final_path.display().to_string()),
                        ..Default::default()
                    },
                );
            }
        } else {
            // Record whichever is larger: the running total of embedded docs
            // or the number of records actually present in the staging index.
            let docs_embedded_on_disk =
                u64::try_from(staged_index.record_count()).unwrap_or(u64::MAX);
            let checkpoint_docs = prior_docs
                .saturating_add(embedded_docs)
                .max(docs_embedded_on_disk);
            if sink.is_active() {
                sink.emit(
                    SemanticProgressEvent::CheckpointSaveStart,
                    SemanticProgressFields {
                        rows_processed: Some(conversations_processed),
                        rows_total: Some(plan.total_conversations),
                        last_conversation_id: Some(plan.last_offset),
                        last_message_id,
                        ..Default::default()
                    },
                );
            }
            // Keep the previously stored watermark when this call supplies none.
            let prior_last_message_id = prior_checkpoint
                .as_ref()
                .and_then(|checkpoint| checkpoint.last_message_id);
            manifest.save_checkpoint(BuildCheckpoint {
                tier: plan.tier,
                embedder_id: self.embedder_id().to_string(),
                last_offset: plan.last_offset,
                docs_embedded: checkpoint_docs,
                conversations_processed,
                total_conversations: plan.total_conversations,
                db_fingerprint: plan.db_fingerprint,
                schema_version: SEMANTIC_SCHEMA_VERSION,
                chunking_version: CHUNKING_STRATEGY_VERSION,
                saved_at_ms: now_ms(),
                last_message_id: last_message_id.or(prior_last_message_id),
            });
            manifest.save(data_dir)?;
            if sink.is_active() {
                sink.emit(
                    SemanticProgressEvent::CheckpointSaveDone,
                    SemanticProgressFields {
                        rows_processed: Some(conversations_processed),
                        rows_total: Some(plan.total_conversations),
                        last_conversation_id: Some(plan.last_offset),
                        last_message_id: last_message_id.or(prior_last_message_id),
                        ..Default::default()
                    },
                );
            }
        }

        Ok(SemanticBackfillBatchOutcome {
            tier: plan.tier,
            embedder_id: self.embedder_id().to_string(),
            embedded_docs,
            conversations_processed,
            total_conversations: plan.total_conversations,
            last_offset: plan.last_offset,
            checkpoint_saved: !complete,
            published: complete,
            // Points at the published index when complete, else the staging file.
            index_path: if complete { final_path } else { staging_path },
            manifest_path,
        })
    }
2031
2032 pub fn run_backfill_from_storage(
2033 &self,
2034 storage: &FrankenStorage,
2035 data_dir: &Path,
2036 manifest: &mut SemanticManifest,
2037 plan: SemanticBackfillStoragePlan,
2038 ) -> Result<SemanticBackfillBatchOutcome> {
2039 self.run_backfill_from_storage_with_sink(
2040 storage,
2041 data_dir,
2042 manifest,
2043 plan,
2044 &SemanticProgressSink::disabled(),
2045 )
2046 }
2047
    /// Selects the next batch of conversations from `storage`, embeds it, and
    /// advances the backfill, emitting progress events to `sink`.
    ///
    /// Selection resumes after the conversation offset and message watermark
    /// stored in the manifest's checkpoint, provided that checkpoint matches
    /// the plan's tier, this embedder's id, and is valid for the plan's DB
    /// fingerprint; otherwise selection starts from offset `0` with no
    /// watermark. The selected inputs are then handed to
    /// [`Self::run_backfill_batch_with_sink`] together with the new message
    /// watermark (the larger of the prior watermark and the batch's highest
    /// message id).
    ///
    /// When `sink` is active, selection, packet-replay, and a final
    /// `Complete` or `Error` event are emitted.
    ///
    /// # Errors
    ///
    /// Returns the selection error if fetching the batch fails, or the error
    /// from [`Self::run_backfill_batch_with_sink`].
    pub fn run_backfill_from_storage_with_sink(
        &self,
        storage: &FrankenStorage,
        data_dir: &Path,
        manifest: &mut SemanticManifest,
        plan: SemanticBackfillStoragePlan,
        sink: &SemanticProgressSink,
    ) -> Result<SemanticBackfillBatchOutcome> {
        // Only trust a checkpoint that belongs to this exact build.
        let prior_checkpoint = manifest.checkpoint.as_ref().filter(|checkpoint| {
            checkpoint.tier == plan.tier
                && checkpoint.embedder_id == self.embedder_id()
                && checkpoint.is_valid(&plan.db_fingerprint)
        });
        let after_conversation_id = prior_checkpoint.map_or(0, |checkpoint| checkpoint.last_offset);
        let prior_last_message_id =
            prior_checkpoint.and_then(|checkpoint| checkpoint.last_message_id);

        if sink.is_active() {
            sink.emit(
                SemanticProgressEvent::SelectionStart,
                SemanticProgressFields {
                    last_conversation_id: Some(after_conversation_id),
                    last_message_id: prior_last_message_id,
                    rows_total: Some(plan.max_conversations as u64),
                    note: Some(plan.tier.as_str().to_string()),
                    ..Default::default()
                },
            );
        }

        // Report a selection failure on the sink before propagating it.
        let batch = match fetch_canonical_embedding_batch_inner(
            storage,
            after_conversation_id,
            plan.max_conversations,
            prior_last_message_id,
        ) {
            Ok(batch) => batch,
            Err(err) => {
                if sink.is_active() {
                    sink.emit(
                        SemanticProgressEvent::Error,
                        SemanticProgressFields {
                            error: Some(format!("selection: {err}")),
                            last_conversation_id: Some(after_conversation_id),
                            last_message_id: prior_last_message_id,
                            ..Default::default()
                        },
                    );
                }
                return Err(err);
            }
        };

        if sink.is_active() {
            sink.emit(
                SemanticProgressEvent::SelectionDone,
                SemanticProgressFields {
                    last_conversation_id: Some(batch.last_conversation_id),
                    last_message_id: prior_last_message_id,
                    conversations_in_batch: Some(batch.conversations_in_batch),
                    rows_total: Some(batch.total_conversations),
                    ..Default::default()
                },
            );
            // The packet-replay events below are emitted back-to-back once the
            // batch has already been fetched, summarising its inputs.
            sink.emit(
                SemanticProgressEvent::PacketReplayStart,
                SemanticProgressFields {
                    conversations_in_batch: Some(batch.conversations_in_batch),
                    ..Default::default()
                },
            );
            sink.emit(
                SemanticProgressEvent::PacketReplayProgress,
                SemanticProgressFields {
                    conversations_in_batch: Some(batch.conversations_in_batch),
                    rows_processed: Some(batch.inputs.len() as u64),
                    bytes: Some(batch.inputs.iter().map(|i| i.content.len() as u64).sum()),
                    ..Default::default()
                },
            );
            sink.emit(
                SemanticProgressEvent::PacketReplayDone,
                SemanticProgressFields {
                    conversations_in_batch: Some(batch.conversations_in_batch),
                    rows_processed: Some(batch.inputs.len() as u64),
                    ..Default::default()
                },
            );
        }

        // Highest message id in this batch; ids that do not fit in `i64`
        // saturate to `i64::MAX`.
        let batch_last_message_id = batch
            .inputs
            .iter()
            .map(|input| i64::try_from(input.message_id).unwrap_or(i64::MAX))
            .max();
        // The watermark never moves backwards: keep the larger of the prior
        // watermark and this batch's maximum.
        let next_last_message_id = match (prior_last_message_id, batch_last_message_id) {
            (Some(prior), Some(batch_max)) => Some(prior.max(batch_max)),
            (Some(prior), None) => Some(prior),
            (None, Some(batch_max)) => Some(batch_max),
            (None, None) => None,
        };

        let outcome = self.run_backfill_batch_with_sink(
            &batch.inputs,
            data_dir,
            manifest,
            SemanticBackfillBatchPlan {
                tier: plan.tier,
                db_fingerprint: plan.db_fingerprint,
                model_revision: plan.model_revision,
                total_conversations: batch.total_conversations,
                conversations_in_batch: batch.conversations_in_batch,
                last_offset: batch.last_conversation_id,
            },
            next_last_message_id,
            sink,
        );

        // Emit the terminal event for this run, then hand the result back unchanged.
        match &outcome {
            Ok(o) => {
                if sink.is_active() {
                    sink.emit(
                        SemanticProgressEvent::Complete,
                        SemanticProgressFields {
                            rows_processed: Some(o.conversations_processed),
                            rows_total: Some(o.total_conversations),
                            last_conversation_id: Some(o.last_offset),
                            last_message_id: next_last_message_id,
                            ..Default::default()
                        },
                    );
                }
            }
            Err(err) => {
                if sink.is_active() {
                    sink.emit(
                        SemanticProgressEvent::Error,
                        SemanticProgressFields {
                            error: Some(err.to_string()),
                            last_conversation_id: Some(batch.last_conversation_id),
                            last_message_id: next_last_message_id,
                            ..Default::default()
                        },
                    );
                }
            }
        }

        outcome
    }
2220
2221 pub fn build_hnsw_index(
2235 &self,
2236 vector_index: &FsVectorIndex,
2237 data_dir: &Path,
2238 m: Option<usize>,
2239 ef_construction: Option<usize>,
2240 ) -> Result<PathBuf> {
2241 let m = m.unwrap_or(FS_HNSW_DEFAULT_M);
2242 let ef_construction = ef_construction.unwrap_or(FS_HNSW_DEFAULT_EF_CONSTRUCTION);
2243
2244 tracing::info!(
2245 embedder = self.embedder_id(),
2246 count = vector_index.record_count(),
2247 m,
2248 ef_construction,
2249 "Building HNSW index for approximate nearest neighbor search"
2250 );
2251
2252 let config = FsHnswConfig {
2253 m,
2254 ef_construction,
2255 ..FsHnswConfig::default()
2256 };
2257 let hnsw = FsHnswIndex::build_from_vector_index(vector_index, config)
2258 .map_err(|err| anyhow::anyhow!("build HNSW index failed: {err}"))?;
2259
2260 let hnsw_path = hnsw_index_path(data_dir, self.embedder_id());
2261 if let Some(parent) = hnsw_path.parent() {
2262 std::fs::create_dir_all(parent)?;
2263 }
2264 hnsw.save(&hnsw_path)
2265 .map_err(|err| anyhow::anyhow!("save HNSW index failed: {err}"))?;
2266
2267 tracing::info!(?hnsw_path, "Saved HNSW index");
2268 Ok(hnsw_path)
2269 }
2270}
2271
2272#[cfg(test)]
2273mod tests {
2274 use super::*;
2275 use crate::model::types::{Agent, AgentKind, Conversation, Message, MessageRole};
2276 use crate::storage::sqlite::FrankenStorage;
2277 use serde_json::json;
2278 use std::path::Path;
2279 use tempfile::tempdir;
2280
/// Owned, totally ordered copy of the `EmbeddingInput` fields that
/// `comparable_semantic_inputs` extracts, so two sets of inputs can be sorted
/// and compared with `assert_eq!`.
///
/// The derived `Ord` compares fields in declaration order, which makes
/// `message_id` the primary sort key; do not reorder the fields casually.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct ComparableSemanticInput {
    message_id: u64,
    created_at_ms: i64,
    agent_id: u32,
    workspace_id: u32,
    source_id: u32,
    role: u8,
    content: String,
}
2291
2292 fn comparable_semantic_inputs(mut inputs: Vec<EmbeddingInput>) -> Vec<ComparableSemanticInput> {
2293 let mut comparable: Vec<ComparableSemanticInput> = inputs
2294 .drain(..)
2295 .map(|input| ComparableSemanticInput {
2296 message_id: input.message_id,
2297 created_at_ms: input.created_at_ms,
2298 agent_id: input.agent_id,
2299 workspace_id: input.workspace_id,
2300 source_id: input.source_id,
2301 role: input.role,
2302 content: input.content,
2303 })
2304 .collect();
2305 comparable.sort();
2306 comparable
2307 }
2308
2309 fn test_conversation(external_id: &str, body: &str) -> Conversation {
2310 test_conversation_fixture(
2311 external_id,
2312 vec![Message {
2313 id: None,
2314 idx: 0,
2315 role: MessageRole::User,
2316 author: None,
2317 created_at: Some(1_700_000_000_500),
2318 content: body.to_string(),
2319 extra_json: json!({}),
2320 snippets: Vec::new(),
2321 }],
2322 "local",
2323 None,
2324 )
2325 }
2326
2327 fn test_conversation_with_messages(external_id: &str, messages: Vec<Message>) -> Conversation {
2328 test_conversation_fixture(external_id, messages, "remote-laptop", Some("builder-host"))
2329 }
2330
2331 fn test_conversation_fixture(
2332 external_id: &str,
2333 messages: Vec<Message>,
2334 source_id: &str,
2335 origin_host: Option<&str>,
2336 ) -> Conversation {
2337 Conversation {
2338 id: None,
2339 agent_slug: "codex".to_string(),
2340 workspace: None,
2341 external_id: Some(external_id.to_string()),
2342 title: Some(format!("semantic {external_id}")),
2343 source_path: PathBuf::from(format!("/tmp/{external_id}.jsonl")),
2344 started_at: Some(1_700_000_000_000),
2345 ended_at: Some(1_700_000_001_000),
2346 approx_tokens: None,
2347 metadata_json: json!({}),
2348 messages,
2349 source_id: source_id.to_string(),
2350 origin_host: origin_host.map(str::to_string),
2351 }
2352 }
2353
2354 fn default_scheduler_signals() -> SemanticBackfillSchedulerSignals {
2355 SemanticBackfillSchedulerSignals {
2356 foreground_pressure: false,
2357 lexical_repair_active: false,
2358 force: false,
2359 operator_disabled: false,
2360 }
2361 }
2362
/// Scope guard that overrides (or clears) one environment variable and puts
/// back whatever was there before when the guard is dropped.
struct EnvVarGuard {
    key: &'static str,
    /// Value the variable held before the guard touched it; `None` means it
    /// was unset. Kept as an `OsString` so a non-Unicode value is restored
    /// intact instead of being mistaken for "unset".
    prior: Option<std::ffi::OsString>,
}

impl EnvVarGuard {
    /// Sets `key` to `value`, remembering the previous value for restoration.
    fn set(key: &'static str, value: &str) -> Self {
        // `var_os` (not `var`) so a prior value that is not valid Unicode is
        // still captured; `var(..).ok()` would silently turn it into `None`.
        let prior = std::env::var_os(key);
        // SAFETY: mutating the process environment is only sound while no
        // other thread reads or writes it concurrently.
        // NOTE(review): confirm tests using this guard are serialized.
        unsafe {
            std::env::set_var(key, value);
        }
        Self { key, prior }
    }

    /// Removes `key` from the environment, remembering the previous value for
    /// restoration.
    fn remove(key: &'static str) -> Self {
        let prior = std::env::var_os(key);
        // SAFETY: same single-threaded-environment requirement as `set`.
        unsafe {
            std::env::remove_var(key);
        }
        Self { key, prior }
    }
}

impl Drop for EnvVarGuard {
    /// Restores the captured value, or removes the variable if it was unset
    /// when the guard was created.
    fn drop(&mut self) {
        // SAFETY: same single-threaded-environment requirement as `set`.
        unsafe {
            match self.prior.as_deref() {
                Some(value) => std::env::set_var(self.key, value),
                None => std::env::remove_var(self.key),
            }
        }
    }
}
2399
/// With no pressure signals, the scheduler runs and reports a scheduled batch
/// of 51 for the inputs 64 and 80 used here.
#[test]
fn semantic_backfill_scheduler_runs_and_scales_batch_under_idle_budget() {
    let policy = SemanticPolicy::compiled_defaults();
    let signals = default_scheduler_signals();
    let verdict = semantic_backfill_scheduler_decision_for_capacity(&policy, 64, &signals, 80);

    assert!(verdict.should_run());
    assert_eq!(verdict.state, SemanticBackfillSchedulerState::Running);
    assert_eq!(
        verdict.reason,
        SemanticBackfillSchedulerReason::IdleBudgetAvailable
    );
    assert_eq!(verdict.scheduled_batch_conversations, 51);
    assert_eq!(verdict.current_capacity_pct, 80);
    assert_eq!(verdict.next_eligible_after_ms, 0);
}
2420
/// Pins the `next_step()` text of every scheduler reason listed below.
#[test]
fn semantic_backfill_scheduler_reason_next_steps_are_stable() {
    type Reason = SemanticBackfillSchedulerReason;

    let expectations = [
        (
            Reason::IdleBudgetAvailable,
            "background semantic backfill is within idle budgets",
        ),
        (
            Reason::OperatorDisabled,
            "background semantic backfill is disabled by CASS_SEMANTIC_BACKFILL_DISABLE",
        ),
        (
            Reason::PolicyDisabled,
            "semantic policy disables background semantic backfill",
        ),
        (
            Reason::ForegroundPressure,
            "foreground pressure is present; retry after the idle delay",
        ),
        (
            Reason::LexicalRepairActive,
            "lexical repair is active; semantic backfill is yielding",
        ),
        (
            Reason::CapacityBelowFloor,
            "machine responsiveness capacity is below the semantic backfill floor",
        ),
        (
            Reason::ThreadBudgetZero,
            "semantic backfill thread budget is zero",
        ),
        (
            Reason::BatchBudgetZero,
            "semantic backfill batch budget is zero",
        ),
    ];

    for (reason, expected) in expectations {
        assert_eq!(reason.next_step(), expected, "{reason:?}");
    }
}
2460
/// Foreground pressure and an active lexical repair each pause the scheduler,
/// with the matching reason reported.
#[test]
fn semantic_backfill_scheduler_yields_to_foreground_and_lexical_pressure() {
    let policy = SemanticPolicy::compiled_defaults();
    let decide = |signals: &SemanticBackfillSchedulerSignals| {
        semantic_backfill_scheduler_decision_for_capacity(&policy, 64, signals, 100)
    };

    // Foreground pressure: paused, eligible again after the policy idle delay.
    let mut busy_foreground = default_scheduler_signals();
    busy_foreground.foreground_pressure = true;
    let paused_for_foreground = decide(&busy_foreground);
    assert!(!paused_for_foreground.should_run());
    assert_eq!(
        paused_for_foreground.state,
        SemanticBackfillSchedulerState::Paused
    );
    assert_eq!(
        paused_for_foreground.reason,
        SemanticBackfillSchedulerReason::ForegroundPressure
    );
    assert_eq!(
        paused_for_foreground.next_eligible_after_ms,
        policy.idle_delay_seconds * 1000
    );

    // Lexical repair in progress: paused with its own reason.
    let mut repairing = default_scheduler_signals();
    repairing.lexical_repair_active = true;
    let paused_for_repair = decide(&repairing);
    assert!(!paused_for_repair.should_run());
    assert_eq!(
        paused_for_repair.state,
        SemanticBackfillSchedulerState::Paused
    );
    assert_eq!(
        paused_for_repair.reason,
        SemanticBackfillSchedulerReason::LexicalRepairActive
    );
}
2500
/// A lexical-only policy disables the scheduler, while the `force` signal
/// still lets it run under that same policy.
#[test]
fn semantic_backfill_scheduler_honors_policy_disable_and_force_override() {
    let mut policy = SemanticPolicy::compiled_defaults();
    policy.mode = crate::search::policy::SemanticMode::LexicalOnly;
    let decide = |signals: &SemanticBackfillSchedulerSignals| {
        semantic_backfill_scheduler_decision_for_capacity(&policy, 64, signals, 100)
    };

    let baseline_signals = default_scheduler_signals();
    let blocked = decide(&baseline_signals);
    assert!(!blocked.should_run());
    assert_eq!(blocked.state, SemanticBackfillSchedulerState::Disabled);
    assert_eq!(
        blocked.reason,
        SemanticBackfillSchedulerReason::PolicyDisabled
    );

    let mut forced_signals = default_scheduler_signals();
    forced_signals.force = true;
    let overridden = decide(&forced_signals);
    assert!(overridden.should_run());
    assert_eq!(
        overridden.reason,
        SemanticBackfillSchedulerReason::IdleBudgetAvailable
    );
    assert!(overridden.forced);
}
2532
/// Embedding two inputs yields two embeddings in input order, each with the
/// embedder's dimension.
#[test]
fn test_batch_embedding() {
    let indexer = SemanticIndexer::new("hash", None).unwrap();
    let inputs = vec![
        EmbeddingInput::new(1, "Hello world"),
        EmbeddingInput::new(2, "Goodbye world"),
    ];

    let embedded = indexer.embed_messages(&inputs).unwrap();
    let expected_dimension = indexer.embedder_dimension();

    assert_eq!(embedded.len(), 2);
    assert_eq!(embedded[0].message_id, 1);
    assert_eq!(embedded[1].message_id, 2);
    assert_eq!(embedded[0].embedding.len(), expected_dimension);
}
2548
/// Embeds 1000 inputs and checks that none are dropped along the way.
#[test]
fn test_progress_indicator() {
    let indexer = SemanticIndexer::new("hash", None).unwrap();

    let mut inputs = Vec::with_capacity(1000);
    for i in 0..1000 {
        inputs.push(EmbeddingInput::new(i as u64, format!("Message {i}")));
    }

    let embedded = indexer.embed_messages(&inputs).unwrap();
    assert_eq!(embedded.len(), inputs.len());
}
2559
/// Building and saving an index from two embeddings reports the indexer's
/// embedder id and dimension, and a record count of two.
#[test]
fn test_build_and_save_index() {
    let indexer = SemanticIndexer::new("hash", None).unwrap();
    let inputs = vec![
        EmbeddingInput::new(1, "Hello world"),
        EmbeddingInput::new(2, "Goodbye world"),
    ];
    let embedded = indexer.embed_messages(&inputs).unwrap();

    let scratch = tempdir().unwrap();
    let saved = indexer
        .build_and_save_index(embedded, scratch.path())
        .unwrap();

    assert_eq!(saved.embedder_id(), indexer.embedder_id());
    assert_eq!(saved.dimension(), indexer.embedder_dimension());
    assert_eq!(saved.record_count(), 2);
}
2577
/// Five embeddings at two records per shard produce three readable shards and
/// a complete shard manifest, without publishing the main manifest or index.
#[test]
fn sharded_index_build_writes_sidecar_without_runtime_publish() {
    const DB_FINGERPRINT: &str = "db-fp-sharded-build";

    let indexer = SemanticIndexer::new("hash", None).unwrap();
    let inputs: Vec<_> = (0..5)
        .map(|idx| EmbeddingInput::new(idx, format!("semantic shard message {idx}")))
        .collect();
    let embedded = indexer.embed_messages(&inputs).unwrap();
    let scratch = tempdir().unwrap();

    let plan = SemanticShardBuildPlan {
        tier: TierKind::Fast,
        db_fingerprint: DB_FINGERPRINT.to_string(),
        model_revision: "hash".to_string(),
        total_conversations: 5,
        max_records_per_shard: 2,
        build_ann: false,
    };
    let outcome = indexer
        .build_and_save_index_shards(embedded, scratch.path(), plan)
        .unwrap();

    assert_eq!(outcome.shard_count, 3);
    assert_eq!(outcome.doc_count, 5);
    assert_eq!(outcome.total_conversations, 5);
    assert!(outcome.complete);
    assert_eq!(outcome.index_paths.len(), 3);
    // Every shard file must open and belong to this embedder.
    for shard_path in &outcome.index_paths {
        let opened = FsVectorIndex::open(shard_path).unwrap();
        assert_eq!(opened.embedder_id(), indexer.embedder_id());
        assert!(opened.record_count() > 0);
    }

    let shard_manifest = SemanticShardManifest::load(scratch.path())
        .unwrap()
        .unwrap();
    let summary = shard_manifest.summary(TierKind::Fast, indexer.embedder_id(), DB_FINGERPRINT);
    assert!(summary.complete);
    assert_eq!(summary.ready_shards, 3);
    assert_eq!(summary.ann_ready_shards, 0);
    assert_eq!(summary.doc_count, 5);
    assert_eq!(summary.total_conversations, 5);

    let runtime_manifest = SemanticManifest::load(scratch.path()).unwrap();
    assert!(
        runtime_manifest.is_none(),
        "sidecar shards must not publish the main runtime manifest"
    );
    let runtime_index = vector_index_path(scratch.path(), indexer.embedder_id());
    assert!(!runtime_index.exists());
}
2627
/// A plan with `max_records_per_shard == 0` is rejected with an error that
/// names the violated requirement.
#[test]
fn sharded_index_build_rejects_zero_sized_shards() {
    let indexer = SemanticIndexer::new("hash", None).unwrap();
    let scratch = tempdir().unwrap();
    let zero_sized_plan = SemanticShardBuildPlan {
        tier: TierKind::Fast,
        db_fingerprint: "db-fp-sharded-build".to_string(),
        model_revision: "hash".to_string(),
        total_conversations: 0,
        max_records_per_shard: 0,
        build_ann: false,
    };

    let failure = indexer
        .build_and_save_index_shards(std::iter::empty(), scratch.path(), zero_sized_plan)
        .unwrap_err();

    let rendered = failure.to_string();
    assert!(rendered.contains("max_records_per_shard > 0"));
}
2648
/// With `build_ann` enabled, each of the two shards gets an ANN file on disk
/// and the shard manifest marks every shard ANN-ready.
#[test]
fn sharded_ann_build_records_per_shard_accelerators() {
    const DB_FINGERPRINT: &str = "db-fp-sharded-ann-build";

    let indexer = SemanticIndexer::new("hash", None).unwrap();
    let inputs: Vec<_> = (0..8)
        .map(|idx| EmbeddingInput::new(idx, format!("semantic ann shard message {idx}")))
        .collect();
    let embedded = indexer.embed_messages(&inputs).unwrap();
    let scratch = tempdir().unwrap();

    let plan = SemanticShardBuildPlan {
        tier: TierKind::Fast,
        db_fingerprint: DB_FINGERPRINT.to_string(),
        model_revision: "hash".to_string(),
        total_conversations: 8,
        max_records_per_shard: 4,
        build_ann: true,
    };
    let outcome = indexer
        .build_and_save_index_shards(embedded, scratch.path(), plan)
        .unwrap();

    assert_eq!(outcome.shard_count, 2);
    assert_eq!(outcome.ann_index_paths.len(), 2);
    for ann_path in &outcome.ann_index_paths {
        assert!(
            ann_path.exists(),
            "ANN shard missing at {}",
            ann_path.display()
        );
    }

    let shard_manifest = SemanticShardManifest::load(scratch.path())
        .unwrap()
        .unwrap();
    let summary = shard_manifest.summary(TierKind::Fast, indexer.embedder_id(), DB_FINGERPRINT);
    assert!(summary.complete);
    assert_eq!(summary.ann_ready_shards, 2);
    assert!(summary.ann_size_bytes > 0);

    let every_shard_has_ann = shard_manifest
        .shards
        .iter()
        .all(|record| record.ann_index_path.is_some() && record.ann_ready);
    assert!(every_shard_has_ann);
}
2695
/// Golden-digest regression test for `embed_messages` with the `"hash"`
/// embedder.
///
/// Embeds a fixed 64-message corpus in batches of 16, folds every embedding's
/// message id, content hash, and vector values into one SHA-256 digest, and
/// compares it against a pinned hex value. Any change to the corpus, to the
/// order of the `ctx.update` calls, or to the embedding output changes the
/// digest.
#[test]
fn embed_messages_golden_digest_hash_embedder() {
    use ring::digest::{Context, SHA256};

    // Fixed corpus: five text shapes cycled by `i % 5`.
    let corpus: Vec<EmbeddingInput> = (0..64)
        .map(|i| {
            let body = match i % 5 {
                0 => format!("plain text message number {i}"),
                1 => format!("**bold** line {i} with _emphasis_"),
                2 => format!("```rust\nfn f_{i}() {{ println!(\"{i}\"); }}\n```"),
                3 => format!(" whitespace {i} "),
                _ => format!("unicode \u{00E9}\u{0301} + emoji \u{1F600} {i}"),
            };
            EmbeddingInput::new(i as u64, body)
        })
        .collect();

    let indexer = SemanticIndexer::new("hash", None)
        .unwrap()
        .with_batch_size(16)
        .unwrap();
    let embeddings = indexer.embed_messages(&corpus).unwrap();

    // Digest input, per embedding in returned order: message id (LE bytes),
    // then the content hash, then each vector component (LE bytes).
    let mut ctx = Context::new(&SHA256);
    for em in &embeddings {
        ctx.update(&em.message_id.to_le_bytes());
        ctx.update(&em.content_hash);
        for v in &em.embedding {
            ctx.update(&v.to_le_bytes());
        }
    }
    let digest = hex::encode(ctx.finish().as_ref());

    // Pinned digest; only update it for an intentional output change.
    const EXPECTED: &str = "22d9ae7076925a4b70a194b0f519dfb1d465cc757368c296ef24055a02038c2c";
    assert_eq!(
        digest, EXPECTED,
        "embed_messages golden digest drifted; if this was intentional, \
         update EXPECTED in this test and record the reason in the commit message"
    );
}
2751
/// Runs `prepare_window` with both values of its boolean flag over a mixed
/// 500-input corpus and requires identical ordering, canonical text, and
/// content hashes from the two runs.
#[test]
fn parallel_prep_matches_serial_prep_bitwise() {
    // Seven text shapes cycled by `i % 7`, including an empty body.
    let mut inputs: Vec<EmbeddingInput> = Vec::with_capacity(500);
    for i in 0..500 {
        let text = match i % 7 {
            0 => format!("Plain message number {i} with some ordinary words."),
            1 => format!("**Bold** and _italic_ markdown line {i}"),
            2 => format!(
                "```rust\nfn example_{i}() {{\n println!(\"code block {i}\");\n}}\n```\nfollow-up text"
            ),
            3 => String::new(),
            4 => format!(" whitespace galore {i} "),
            5 => format!("Unicode \u{00E9}\u{0301} (combining accent) and emoji \u{1F600} line {i}"),
            _ => format!(
                "Mixed line {i}: `inline_code`, [link](http://x), {{braces}}, and \u{201C}curly quotes\u{201D}."
            ),
        };
        inputs.push(EmbeddingInput::new(i as u64, text));
    }

    let serial_run = prepare_window(&inputs, true);
    let parallel_run = prepare_window(&inputs, false);

    assert_eq!(
        serial_run.len(),
        parallel_run.len(),
        "serial and parallel prep should skip the same number of empty canonicals"
    );

    let paired = serial_run.iter().zip(parallel_run.iter());
    for (from_serial, from_parallel) in paired {
        assert_eq!(
            from_serial.msg.message_id, from_parallel.msg.message_id,
            "ordering must be preserved between serial and parallel prep"
        );
        assert_eq!(
            from_serial.canonical, from_parallel.canonical,
            "canonical form diverged between serial and parallel prep"
        );
        assert_eq!(
            from_serial.hash, from_parallel.hash,
            "content hash diverged between serial and parallel prep"
        );
    }
}
2799
/// Inputs with empty or whitespace-only content are dropped by
/// `prepare_window`; the two inputs with real content are kept.
#[test]
fn parallel_prep_filters_empty_canonicals() {
    let inputs = vec![
        EmbeddingInput::new(1, "valid content"),
        EmbeddingInput::new(2, ""),
        EmbeddingInput::new(3, " \n\n \t "),
        EmbeddingInput::new(4, "more valid content"),
    ];

    let prepared = prepare_window(&inputs, false);
    let survived = |id: u64| prepared.iter().any(|entry| entry.msg.message_id == id);

    assert!(survived(1));
    assert!(survived(4));
    assert!(!survived(2));
    assert!(!survived(3));
}
2818
/// The memoized preparation path must return the same ids, canonical text,
/// and hashes as plain `prepare_window` for the same inputs.
#[test]
fn memoized_serial_prep_matches_stateless_prepare_window() {
    let inputs = vec![
        EmbeddingInput::new(1, "repeat me exactly"),
        EmbeddingInput::new(2, "repeat me exactly"),
        EmbeddingInput::new(3, "unique payload"),
        EmbeddingInput::new(4, ""),
        EmbeddingInput::new(5, "repeat me exactly"),
    ];

    let stateless = prepare_window(&inputs, true);
    let mut memo = ContentAddressedMemoCache::with_capacity(16);
    let with_memo = prepare_window_with_memo(&inputs, &mut memo);

    assert_eq!(stateless.len(), with_memo.len());
    let paired = stateless.iter().zip(with_memo.iter());
    for (expected, actual) in paired {
        assert_eq!(expected.msg.message_id, actual.msg.message_id);
        assert_eq!(expected.canonical, actual.canonical);
        assert_eq!(expected.hash, actual.hash);
    }
}
2840
/// The memo key carries the bytes of `content_hash` for the same text, plus
/// the module's memo algorithm name and version.
#[test]
fn semantic_prep_memo_key_uses_stable_content_hash_bytes() {
    const CONTENT: &str = "repeat me exactly";

    let key = semantic_prep_memo_key(CONTENT);
    let reference_hash = content_hash(CONTENT);
    let key_bytes = key.content_hash.as_bytes();

    assert_eq!(key_bytes, reference_hash.as_slice());
    assert_eq!(key_bytes.len(), reference_hash.len());
    assert_eq!(key.algorithm, SEMANTIC_PREP_MEMO_ALGORITHM);
    assert_eq!(key.algorithm_version, SEMANTIC_PREP_MEMO_VERSION);
}
2851
/// Pins the memo-cache counters after preparing five inputs, three of which
/// share identical content and one of which is empty.
#[test]
fn memoized_serial_prep_reuses_duplicate_content_across_windows() {
    let inputs = vec![
        EmbeddingInput::new(1, "repeat me exactly"),
        EmbeddingInput::new(2, "repeat me exactly"),
        EmbeddingInput::new(3, "unique payload"),
        EmbeddingInput::new(4, ""),
        EmbeddingInput::new(5, "repeat me exactly"),
    ];

    let mut memo = ContentAddressedMemoCache::with_capacity(16);
    let prepared = prepare_window_with_memo(&inputs, &mut memo);
    let counters = memo.stats().clone();

    assert_eq!(prepared.len(), 4);
    assert_eq!(counters.hits, 2);
    assert_eq!(counters.misses, 3);
    assert_eq!(counters.inserts, 2);
    assert_eq!(counters.live_entries, 2);
}
2872
/// Two stored conversations share one message body; preparing the packet
/// inputs through the memo cache must hit once for the duplicate.
#[test]
fn packet_embedding_inputs_reuse_memoized_prep_for_duplicate_content() -> Result<()> {
    // Local builder so each fixture message only spells out what differs.
    let message = |idx, role, created_at, content: &str| Message {
        id: None,
        idx,
        role,
        author: None,
        created_at: Some(created_at),
        content: content.to_string(),
        extra_json: json!({}),
        snippets: Vec::new(),
    };

    let scratch = tempdir().unwrap();
    let db_path = scratch.path().join("agent_search.db");
    let storage = FrankenStorage::open(&db_path)?;
    let codex = Agent {
        id: None,
        slug: "codex".to_string(),
        name: "Codex".to_string(),
        version: None,
        kind: AgentKind::Cli,
    };
    let agent_id = storage.ensure_agent(&codex)?;

    let conversation_one = test_conversation_with_messages(
        "packet-memo-conv-one",
        vec![
            message(
                0,
                MessageRole::User,
                1_700_000_010_100,
                "shared semantic payload",
            ),
            message(
                1,
                MessageRole::Agent,
                1_700_000_010_200,
                "unique semantic payload one",
            ),
        ],
    );
    storage.insert_conversation_tree(agent_id, None, &conversation_one)?;

    let conversation_two = test_conversation_with_messages(
        "packet-memo-conv-two",
        vec![
            message(
                0,
                MessageRole::Tool,
                1_700_000_010_300,
                "shared semantic payload",
            ),
            message(
                1,
                MessageRole::Agent,
                1_700_000_010_400,
                "unique semantic payload two",
            ),
        ],
    );
    storage.insert_conversation_tree(agent_id, None, &conversation_two)?;

    let packet_inputs = packet_embedding_inputs_from_storage(&storage)?;
    let mut memo = ContentAddressedMemoCache::with_capacity(16);
    let prepared = prepare_window_with_memo(&packet_inputs, &mut memo);
    let counters = memo.stats().clone();

    assert_eq!(packet_inputs.len(), 4);
    assert_eq!(prepared.len(), 4);

    let shared_key = semantic_prep_memo_key("shared semantic payload");
    assert_eq!(shared_key.content_hash.as_bytes().len(), 32);

    assert_eq!(counters.hits, 1);
    assert_eq!(counters.misses, 3);
    assert_eq!(counters.inserts, 3);
    assert_eq!(counters.live_entries, 3);
    Ok(())
}
2965
/// A batch covering one of two conversations saves a checkpoint and leaves an
/// index at the outcome path, without creating the final vector index path.
#[test]
fn backfill_batch_saves_checkpoint_and_staged_index_until_complete() {
    let scratch = tempdir().unwrap();
    let data_dir = scratch.path();
    let mut manifest = SemanticManifest::default();
    let indexer = SemanticIndexer::new("hash", None).unwrap();
    let inputs = vec![
        EmbeddingInput::new(10, "first staged semantic message"),
        EmbeddingInput::new(11, "second staged semantic message"),
    ];
    let partial_plan = SemanticBackfillBatchPlan {
        tier: TierKind::Fast,
        db_fingerprint: "db-fp-backfill-partial".to_string(),
        model_revision: "hash".to_string(),
        total_conversations: 2,
        conversations_in_batch: 1,
        last_offset: 1,
    };

    let outcome = indexer
        .run_backfill_batch(&inputs, data_dir, &mut manifest, partial_plan)
        .unwrap();

    assert!(!outcome.published);
    assert!(outcome.checkpoint_saved);
    assert!(outcome.index_path.exists());
    let published_path = vector_index_path(data_dir, indexer.embedder_id());
    assert!(!published_path.exists());

    let checkpoint = manifest.checkpoint.as_ref().expect("checkpoint");
    assert_eq!(checkpoint.tier, TierKind::Fast);
    assert_eq!(checkpoint.conversations_processed, 1);
    assert_eq!(checkpoint.docs_embedded, 2);
    assert_eq!(manifest.backlog.total_conversations, 2);
    assert!(SemanticManifest::path(data_dir).exists());
}
3003
/// The first of two batches writes to the staging path; the second publishes
/// the final index, removes the staging file, and clears the checkpoint in
/// both the in-memory and the reloaded manifest.
#[test]
fn backfill_batch_resumes_staged_index_and_publishes_manifest_atomically() {
    let scratch = tempdir().unwrap();
    let data_dir = scratch.path();
    let mut manifest = SemanticManifest::default();
    let indexer = SemanticIndexer::new("hash", None).unwrap();
    let db_fingerprint = "db-fp-backfill-complete";
    let staging_path = semantic_staging_index_path(
        data_dir,
        TierKind::Fast,
        indexer.embedder_id(),
        db_fingerprint,
    );
    // Both batches share everything except the offset they end on.
    let plan_ending_at = |last_offset| SemanticBackfillBatchPlan {
        tier: TierKind::Fast,
        db_fingerprint: db_fingerprint.to_string(),
        model_revision: "hash".to_string(),
        total_conversations: 2,
        conversations_in_batch: 1,
        last_offset,
    };

    let opening_batch = vec![EmbeddingInput::new(20, "first resume batch")];
    let staged = indexer
        .run_backfill_batch(&opening_batch, data_dir, &mut manifest, plan_ending_at(1))
        .unwrap();
    assert_eq!(staged.index_path, staging_path);
    assert!(staging_path.exists());

    let closing_batch = vec![EmbeddingInput::new(21, "second resume batch")];
    let finished = indexer
        .run_backfill_batch(&closing_batch, data_dir, &mut manifest, plan_ending_at(2))
        .unwrap();

    assert!(finished.published);
    assert!(!finished.checkpoint_saved);
    assert!(!staging_path.exists());
    let final_path = vector_index_path(data_dir, indexer.embedder_id());
    assert_eq!(finished.index_path, final_path);
    assert!(final_path.exists());
    assert!(manifest.checkpoint.is_none());

    let artifact = manifest.fast_tier.as_ref().expect("published fast tier");
    assert!(artifact.ready);
    assert_eq!(artifact.conversation_count, 2);
    assert_eq!(artifact.doc_count, 2);
    assert_eq!(manifest.backlog.fast_tier_processed, 2);

    // The manifest saved on disk must tell the same story.
    let reloaded = SemanticManifest::load(data_dir).unwrap().unwrap();
    assert!(reloaded.checkpoint.is_none());
    let reloaded_ready = reloaded
        .fast_tier
        .as_ref()
        .is_some_and(|record| record.ready);
    assert!(reloaded_ready);
}
3070
/// After a 20-message batch and a 1-message resume batch, the published index
/// holds all 21 records and has no `.wal` file next to it.
#[test]
fn backfill_publish_compacts_resumed_wal_before_rename() {
    let scratch = tempdir().unwrap();
    let data_dir = scratch.path();
    let mut manifest = SemanticManifest::default();
    let indexer = SemanticIndexer::new("hash", None).unwrap();
    let db_fingerprint = "db-fp-backfill-small-resume";
    let plan_ending_at = |last_offset| SemanticBackfillBatchPlan {
        tier: TierKind::Fast,
        db_fingerprint: db_fingerprint.to_string(),
        model_revision: "hash".to_string(),
        total_conversations: 2,
        conversations_in_batch: 1,
        last_offset,
    };

    let large_first: Vec<EmbeddingInput> = (0..20)
        .map(|idx| EmbeddingInput::new(100 + idx, format!("first batch message {idx}")))
        .collect();
    let staged = indexer
        .run_backfill_batch(&large_first, data_dir, &mut manifest, plan_ending_at(1))
        .unwrap();
    assert!(staged.checkpoint_saved);

    let small_final = vec![EmbeddingInput::new(200, "small final resume batch")];
    let finished = indexer
        .run_backfill_batch(&small_final, data_dir, &mut manifest, plan_ending_at(2))
        .unwrap();
    assert!(finished.published);

    let final_path = vector_index_path(data_dir, indexer.embedder_id());
    // Sibling path "<final_path>.wal" must not have been left behind.
    let mut wal_name = final_path.as_os_str().to_os_string();
    wal_name.push(".wal");
    let wal_path = PathBuf::from(wal_name);
    assert!(!wal_path.exists());

    let published_index = FsVectorIndex::open(&final_path).unwrap();
    assert_eq!(published_index.record_count(), 21);
    let artifact = manifest.fast_tier.as_ref().expect("published fast tier");
    assert_eq!(artifact.doc_count, 21);
    assert_eq!(artifact.conversation_count, 2);
}
3127
/// Two storage-driven backfill runs of one conversation each: the first saves
/// a checkpoint, the second publishes a fast tier with two documents.
#[test]
fn backfill_from_storage_fetches_canonical_batches_and_resumes() -> Result<()> {
    let scratch = tempdir().unwrap();
    let data_dir = scratch.path();
    let storage = FrankenStorage::open(&data_dir.join("agent_search.db"))?;
    let codex = Agent {
        id: None,
        slug: "codex".to_string(),
        name: "Codex".to_string(),
        version: None,
        kind: AgentKind::Cli,
    };
    let agent_id = storage.ensure_agent(&codex)?;

    let first_conversation = test_conversation("first", "first canonical semantic message");
    storage.insert_conversation_tree(agent_id, None, &first_conversation)?;
    let second_conversation = test_conversation("second", "second canonical semantic message");
    storage.insert_conversation_tree(agent_id, None, &second_conversation)?;

    let mut manifest = SemanticManifest::default();
    let indexer = SemanticIndexer::new("hash", None)?;
    // Both runs use an identical plan: one conversation per run.
    let one_conversation_plan = || SemanticBackfillStoragePlan {
        tier: TierKind::Fast,
        db_fingerprint: "canonical-db-fp".to_string(),
        model_revision: "hash".to_string(),
        max_conversations: 1,
    };

    let opening_run = indexer.run_backfill_from_storage(
        &storage,
        data_dir,
        &mut manifest,
        one_conversation_plan(),
    )?;
    assert!(!opening_run.published);
    assert!(opening_run.checkpoint_saved);
    assert_eq!(opening_run.conversations_processed, 1);
    assert_eq!(opening_run.total_conversations, 2);
    assert_eq!(opening_run.embedded_docs, 1);
    assert!(opening_run.last_offset > 0);

    let closing_run = indexer.run_backfill_from_storage(
        &storage,
        data_dir,
        &mut manifest,
        one_conversation_plan(),
    )?;
    assert!(closing_run.published);
    assert!(!closing_run.checkpoint_saved);
    assert_eq!(closing_run.conversations_processed, 2);
    assert_eq!(closing_run.embedded_docs, 1);
    assert!(manifest.checkpoint.is_none());

    let published_docs = manifest.fast_tier.as_ref().map(|record| record.doc_count);
    assert_eq!(published_docs, Some(2));
    Ok(())
}
3194
/// A conversation with user, tool, and empty system messages yields exactly
/// the two non-empty inputs, with role codes and a CRC32 source id derived
/// from the normalized source identity.
#[test]
fn canonical_embedding_batch_uses_conversation_packet_semantic_projection() -> Result<()> {
    let message = |idx, role, created_at, content: &str| Message {
        id: None,
        idx,
        role,
        author: None,
        created_at: Some(created_at),
        content: content.to_string(),
        extra_json: json!({}),
        snippets: Vec::new(),
    };

    let scratch = tempdir().unwrap();
    let db_path = scratch.path().join("agent_search.db");
    let storage = FrankenStorage::open(&db_path)?;
    let codex = Agent {
        id: None,
        slug: "codex".to_string(),
        name: "Codex".to_string(),
        version: None,
        kind: AgentKind::Cli,
    };
    let agent_id = storage.ensure_agent(&codex)?;

    let conversation = test_conversation_with_messages(
        "packet-projection",
        vec![
            message(0, MessageRole::User, 1_700_000_000_500, "user semantic text"),
            message(1, MessageRole::Tool, 1_700_000_000_600, "tool semantic text"),
            // Empty system message: absent from the asserted inputs below.
            message(2, MessageRole::System, 1_700_000_000_700, ""),
        ],
    );
    storage.insert_conversation_tree(agent_id, None, &conversation)?;

    let batch = fetch_canonical_embedding_batch(&storage, 0, 1)?;

    assert_eq!(batch.conversations_in_batch, 1);
    assert_eq!(batch.inputs.len(), 2);

    let user_input = &batch.inputs[0];
    let tool_input = &batch.inputs[1];
    assert_eq!(user_input.content, "user semantic text");
    assert_eq!(tool_input.content, "tool semantic text");
    assert_eq!(user_input.role, role_code_from_str("user").unwrap());
    assert_eq!(tool_input.role, role_code_from_str("tool").unwrap());

    let normalized_source_id =
        normalized_index_source_id(Some("remote-laptop"), None, Some("builder-host"));
    let expected_source_hash = crc32fast::hash(normalized_source_id.as_bytes());
    assert_eq!(user_input.source_id, expected_source_hash);
    assert_eq!(tool_input.source_id, expected_source_hash);
    Ok(())
}
3262
/// With a message-id watermark taken between two inserts, the batch contains
/// only the conversation inserted afterwards, while the total still counts
/// both.
#[test]
fn canonical_embedding_batch_pushes_last_message_id_filter_into_selection() -> Result<()> {
    let scratch = tempdir().unwrap();
    let db_path = scratch.path().join("agent_search.db");
    let storage = FrankenStorage::open(&db_path)?;
    let codex = Agent {
        id: None,
        slug: "codex".to_string(),
        name: "Codex".to_string(),
        version: None,
        kind: AgentKind::Cli,
    };
    let agent_id = storage.ensure_agent(&codex)?;

    let older = test_conversation("before-watermark", "old semantic message");
    storage.insert_conversation_tree(agent_id, None, &older)?;

    // Highest message id at this point becomes the watermark.
    let no_params: &[ParamValue] = &[];
    let watermark: i64 =
        storage
            .raw()
            .query_row_map("SELECT MAX(id) FROM messages", no_params, |row| {
                row.get_typed(0)
            })?;

    let newer = test_conversation("after-watermark", "new semantic message");
    storage.insert_conversation_tree(agent_id, None, &newer)?;

    let batch = fetch_canonical_embedding_batch_inner(&storage, 0, 8, Some(watermark))?;

    assert_eq!(
        batch.conversations_in_batch, 1,
        "last_message_id must be pushed into candidate selection so old conversations are not counted in the batch"
    );
    assert_eq!(batch.total_conversations, 2);
    assert_eq!(batch.inputs.len(), 1);

    let selected = &batch.inputs[0];
    assert_eq!(selected.content, "new semantic message");
    let selected_id = i64::try_from(selected.message_id).unwrap_or(i64::MAX);
    assert!(
        selected_id > watermark,
        "selected semantic input must be strictly newer than the checkpoint message id"
    );
    Ok(())
}
3306
3307 #[test]
3308 fn packet_embedding_inputs_from_storage_since_only_emits_new_canonical_messages() -> Result<()>
3309 {
3310 let temp = tempdir().unwrap();
3311 let db_path = temp.path().join("agent_search.db");
3312 let storage = FrankenStorage::open(&db_path)?;
3313 let agent_id = storage.ensure_agent(&Agent {
3314 id: None,
3315 slug: "codex".to_string(),
3316 name: "Codex".to_string(),
3317 version: None,
3318 kind: AgentKind::Cli,
3319 })?;
3320 storage.insert_conversation_tree(
3321 agent_id,
3322 None,
3323 &test_conversation_with_messages(
3324 "packet-delta",
3325 vec![
3326 Message {
3327 id: None,
3328 idx: 0,
3329 role: MessageRole::User,
3330 author: None,
3331 created_at: Some(1_700_000_000_500),
3332 content: "existing semantic text".to_string(),
3333 extra_json: json!({}),
3334 snippets: Vec::new(),
3335 },
3336 Message {
3337 id: None,
3338 idx: 1,
3339 role: MessageRole::Agent,
3340 author: None,
3341 created_at: Some(1_700_000_000_600),
3342 content: "existing assistant text".to_string(),
3343 extra_json: json!({}),
3344 snippets: Vec::new(),
3345 },
3346 ],
3347 ),
3348 )?;
3349 let watermark: i64 = storage.raw().query_row_map(
3350 "SELECT MAX(id) FROM messages",
3351 &[] as &[ParamValue],
3352 |row| row.get_typed(0),
3353 )?;
3354
3355 storage.insert_conversation_tree(
3356 agent_id,
3357 None,
3358 &test_conversation_with_messages(
3359 "packet-delta",
3360 vec![
3361 Message {
3362 id: None,
3363 idx: 0,
3364 role: MessageRole::User,
3365 author: None,
3366 created_at: Some(1_700_000_000_500),
3367 content: "existing semantic text".to_string(),
3368 extra_json: json!({}),
3369 snippets: Vec::new(),
3370 },
3371 Message {
3372 id: None,
3373 idx: 1,
3374 role: MessageRole::Agent,
3375 author: None,
3376 created_at: Some(1_700_000_000_600),
3377 content: "existing assistant text".to_string(),
3378 extra_json: json!({}),
3379 snippets: Vec::new(),
3380 },
3381 Message {
3382 id: None,
3383 idx: 2,
3384 role: MessageRole::Agent,
3385 author: None,
3386 created_at: Some(1_700_000_000_700),
3387 content: "new packet semantic text".to_string(),
3388 extra_json: json!({}),
3389 snippets: Vec::new(),
3390 },
3391 Message {
3392 id: None,
3393 idx: 3,
3394 role: MessageRole::System,
3395 author: None,
3396 created_at: Some(1_700_000_000_800),
3397 content: String::new(),
3398 extra_json: json!({}),
3399 snippets: Vec::new(),
3400 },
3401 ],
3402 ),
3403 )?;
3404
3405 let batch = packet_embedding_inputs_from_storage_since(&storage, watermark)?;
3406
3407 assert_eq!(batch.conversations_in_batch, 1);
3408 assert_eq!(batch.inputs.len(), 1);
3409 assert_eq!(batch.inputs[0].content, "new packet semantic text");
3410 assert_eq!(
3411 batch.inputs[0].role,
3412 role_code_from_str("assistant").unwrap()
3413 );
3414 let normalized_source_id =
3415 normalized_index_source_id(Some("remote-laptop"), None, Some("builder-host"));
3416 assert_eq!(
3417 batch.inputs[0].source_id,
3418 crc32fast::hash(normalized_source_id.as_bytes())
3419 );
3420 let expected_raw_max_id: i64 = storage.raw().query_row_map(
3421 "SELECT MAX(id) FROM messages",
3422 &[] as &[ParamValue],
3423 |row| row.get_typed(0),
3424 )?;
3425 assert_eq!(batch.raw_max_message_id, Some(expected_raw_max_id));
3426 Ok(())
3427 }
3428
3429 #[test]
3430 fn packet_catch_up_emits_expected_semantic_docs_after_watermark() -> Result<()> {
3431 let temp = tempdir().unwrap();
3432 let db_path = temp.path().join("agent_search.db");
3433 let storage = FrankenStorage::open(&db_path)?;
3434 let agent_id = storage.ensure_agent(&Agent {
3435 id: None,
3436 slug: "codex".to_string(),
3437 name: "Codex".to_string(),
3438 version: None,
3439 kind: AgentKind::Cli,
3440 })?;
3441 let workspace_id = storage.ensure_workspace(Path::new("/tmp/workspace"), None)?;
3442
3443 storage.insert_conversation_tree(
3444 agent_id,
3445 Some(workspace_id),
3446 &test_conversation_with_messages(
3447 "legacy-packet-semantics",
3448 vec![
3449 Message {
3450 id: None,
3451 idx: 0,
3452 role: MessageRole::User,
3453 author: None,
3454 created_at: Some(1_700_000_000_500),
3455 content: "before watermark".to_string(),
3456 extra_json: json!({}),
3457 snippets: Vec::new(),
3458 },
3459 Message {
3460 id: None,
3461 idx: 1,
3462 role: MessageRole::Agent,
3463 author: None,
3464 created_at: Some(1_700_000_000_600),
3465 content: "before watermark assistant".to_string(),
3466 extra_json: json!({}),
3467 snippets: Vec::new(),
3468 },
3469 ],
3470 ),
3471 )?;
3472
3473 let watermark: i64 = storage.raw().query_row_map(
3474 "SELECT MAX(id) FROM messages",
3475 &[] as &[ParamValue],
3476 |row| row.get_typed(0),
3477 )?;
3478
3479 storage.insert_conversation_tree(
3480 agent_id,
3481 Some(workspace_id),
3482 &test_conversation_with_messages(
3483 "legacy-packet-semantics",
3484 vec![
3485 Message {
3486 id: None,
3487 idx: 0,
3488 role: MessageRole::User,
3489 author: None,
3490 created_at: Some(1_700_000_000_500),
3491 content: "before watermark".to_string(),
3492 extra_json: json!({}),
3493 snippets: Vec::new(),
3494 },
3495 Message {
3496 id: None,
3497 idx: 1,
3498 role: MessageRole::Agent,
3499 author: None,
3500 created_at: Some(1_700_000_000_600),
3501 content: "before watermark assistant".to_string(),
3502 extra_json: json!({}),
3503 snippets: Vec::new(),
3504 },
3505 Message {
3506 id: None,
3507 idx: 2,
3508 role: MessageRole::Agent,
3509 author: None,
3510 created_at: Some(1_700_000_000_700),
3511 content: "after watermark assistant".to_string(),
3512 extra_json: json!({}),
3513 snippets: Vec::new(),
3514 },
3515 Message {
3516 id: None,
3517 idx: 3,
3518 role: MessageRole::System,
3519 author: None,
3520 created_at: Some(1_700_000_000_800),
3521 content: String::new(),
3522 extra_json: json!({}),
3523 snippets: Vec::new(),
3524 },
3525 ],
3526 ),
3527 )?;
3528 storage.insert_conversation_tree(
3529 agent_id,
3530 Some(workspace_id),
3531 &test_conversation_with_messages(
3532 "legacy-packet-semantics-second-conv",
3533 vec![
3534 Message {
3535 id: None,
3536 idx: 0,
3537 role: MessageRole::Tool,
3538 author: None,
3539 created_at: Some(1_700_000_000_900),
3540 content: "after watermark tool".to_string(),
3541 extra_json: json!({}),
3542 snippets: Vec::new(),
3543 },
3544 Message {
3545 id: None,
3546 idx: 1,
3547 role: MessageRole::System,
3548 author: None,
3549 created_at: Some(1_700_000_001_000),
3550 content: String::new(),
3551 extra_json: json!({}),
3552 snippets: Vec::new(),
3553 },
3554 ],
3555 ),
3556 )?;
3557
3558 let packet_batch = packet_embedding_inputs_from_storage_since(&storage, watermark)?;
3559 let normalized_source_id =
3560 normalized_index_source_id(Some("remote-laptop"), None, Some("builder-host"));
3561 let source_id_hash = crc32fast::hash(normalized_source_id.as_bytes());
3562 let expected = vec![
3563 ComparableSemanticInput {
3564 message_id: u64::try_from(watermark + 1).unwrap(),
3565 created_at_ms: 1_700_000_000_700,
3566 agent_id: u32::try_from(agent_id).unwrap(),
3567 workspace_id: u32::try_from(workspace_id).unwrap(),
3568 source_id: source_id_hash,
3569 role: role_code_from_str("assistant").unwrap(),
3570 content: "after watermark assistant".to_string(),
3571 },
3572 ComparableSemanticInput {
3573 message_id: u64::try_from(watermark + 3).unwrap(),
3574 created_at_ms: 1_700_000_000_900,
3575 agent_id: u32::try_from(agent_id).unwrap(),
3576 workspace_id: u32::try_from(workspace_id).unwrap(),
3577 source_id: source_id_hash,
3578 role: role_code_from_str("tool").unwrap(),
3579 content: "after watermark tool".to_string(),
3580 },
3581 ];
3582
3583 assert_eq!(comparable_semantic_inputs(packet_batch.inputs), expected);
3584 assert_eq!(packet_batch.conversations_in_batch, 2);
3585 assert_eq!(packet_batch.raw_max_message_id, Some(watermark + 4));
3586 Ok(())
3587 }
3588
3589 #[test]
3590 fn packet_embedding_inputs_for_message_ids_matches_since_selection() -> Result<()> {
3591 let temp = tempdir().unwrap();
3592 let db_path = temp.path().join("agent_search.db");
3593 let storage = FrankenStorage::open(&db_path)?;
3594 let agent_id = storage.ensure_agent(&Agent {
3595 id: None,
3596 slug: "codex".to_string(),
3597 name: "Codex".to_string(),
3598 version: None,
3599 kind: AgentKind::Cli,
3600 })?;
3601 let workspace_id = storage.ensure_workspace(Path::new("/tmp/workspace"), None)?;
3602
3603 storage.insert_conversation_tree(
3604 agent_id,
3605 Some(workspace_id),
3606 &test_conversation_with_messages(
3607 "selected-vs-since",
3608 vec![
3609 Message {
3610 id: None,
3611 idx: 0,
3612 role: MessageRole::User,
3613 author: None,
3614 created_at: Some(1_700_000_100_100),
3615 content: "before watermark".to_string(),
3616 extra_json: json!({}),
3617 snippets: Vec::new(),
3618 },
3619 Message {
3620 id: None,
3621 idx: 1,
3622 role: MessageRole::Agent,
3623 author: None,
3624 created_at: Some(1_700_000_100_200),
3625 content: "before watermark assistant".to_string(),
3626 extra_json: json!({}),
3627 snippets: Vec::new(),
3628 },
3629 ],
3630 ),
3631 )?;
3632
3633 let watermark: i64 = storage.raw().query_row_map(
3634 "SELECT MAX(id) FROM messages",
3635 &[] as &[ParamValue],
3636 |row| row.get_typed(0),
3637 )?;
3638
3639 storage.insert_conversation_tree(
3640 agent_id,
3641 Some(workspace_id),
3642 &test_conversation_with_messages(
3643 "selected-vs-since",
3644 vec![
3645 Message {
3646 id: None,
3647 idx: 0,
3648 role: MessageRole::User,
3649 author: None,
3650 created_at: Some(1_700_000_100_100),
3651 content: "before watermark".to_string(),
3652 extra_json: json!({}),
3653 snippets: Vec::new(),
3654 },
3655 Message {
3656 id: None,
3657 idx: 1,
3658 role: MessageRole::Agent,
3659 author: None,
3660 created_at: Some(1_700_000_100_200),
3661 content: "before watermark assistant".to_string(),
3662 extra_json: json!({}),
3663 snippets: Vec::new(),
3664 },
3665 Message {
3666 id: None,
3667 idx: 2,
3668 role: MessageRole::Tool,
3669 author: None,
3670 created_at: Some(1_700_000_100_300),
3671 content: "after watermark tool".to_string(),
3672 extra_json: json!({}),
3673 snippets: Vec::new(),
3674 },
3675 Message {
3676 id: None,
3677 idx: 3,
3678 role: MessageRole::System,
3679 author: None,
3680 created_at: Some(1_700_000_100_400),
3681 content: String::new(),
3682 extra_json: json!({}),
3683 snippets: Vec::new(),
3684 },
3685 ],
3686 ),
3687 )?;
3688 storage.insert_conversation_tree(
3689 agent_id,
3690 Some(workspace_id),
3691 &test_conversation_with_messages(
3692 "selected-vs-since-second",
3693 vec![Message {
3694 id: None,
3695 idx: 0,
3696 role: MessageRole::Agent,
3697 author: None,
3698 created_at: Some(1_700_000_100_500),
3699 content: "after watermark assistant".to_string(),
3700 extra_json: json!({}),
3701 snippets: Vec::new(),
3702 }],
3703 ),
3704 )?;
3705
3706 let since_batch = packet_embedding_inputs_from_storage_since(&storage, watermark)?;
3707 let conversation_ids: Vec<i64> = storage.raw().query_map_collect(
3708 "SELECT DISTINCT conversation_id
3709 FROM messages
3710 WHERE id > ?1
3711 ORDER BY conversation_id ASC",
3712 &[ParamValue::from(watermark)],
3713 |row| row.get_typed(0),
3714 )?;
3715 let selected_message_ids: HashSet<i64> = storage
3716 .raw()
3717 .query_map_collect(
3718 "SELECT id
3719 FROM messages
3720 WHERE id > ?1
3721 ORDER BY id ASC",
3722 &[ParamValue::from(watermark)],
3723 |row| row.get_typed(0),
3724 )?
3725 .into_iter()
3726 .collect();
3727 let selected_inputs = packet_embedding_inputs_from_storage_for_message_ids(
3728 &storage,
3729 &conversation_ids,
3730 &selected_message_ids,
3731 )?;
3732
3733 assert_eq!(
3734 comparable_semantic_inputs(selected_inputs),
3735 comparable_semantic_inputs(since_batch.inputs)
3736 );
3737 Ok(())
3738 }
3739
3740 #[test]
3741 fn default_batch_size_uses_new_value() {
3742 let _guard = EnvVarGuard::remove("CASS_SEMANTIC_BATCH_SIZE");
3745 let indexer = SemanticIndexer::new("hash", None).unwrap();
3746 assert_eq!(indexer.batch_size(), DEFAULT_SEMANTIC_BATCH_SIZE);
3747 }
3748
3749 #[test]
3750 fn parallel_prep_enabled_reuses_truthy_env_parser() {
3751 for (value, expected) in [
3752 ("1", true),
3753 ("true", true),
3754 (" YeS ", true),
3755 ("on", true),
3756 ("0", false),
3757 ("false", false),
3758 ("off", false),
3759 ] {
3760 let _guard = EnvVarGuard::set("CASS_SEMANTIC_PREP_PARALLEL", value);
3761 assert_eq!(parallel_prep_enabled(), expected, "env value {value:?}");
3762 }
3763
3764 let _guard = EnvVarGuard::remove("CASS_SEMANTIC_PREP_PARALLEL");
3765 assert!(!parallel_prep_enabled());
3766 }
3767
3768 #[test]
3769 fn saturating_u64_from_usize_covers_bounds() {
3770 assert_eq!(saturating_u64_from_usize(0), 0);
3771 assert_eq!(saturating_u64_from_usize(42), 42);
3772 assert_eq!(
3773 saturating_u64_from_usize(usize::MAX),
3774 u64::try_from(usize::MAX).unwrap_or(u64::MAX)
3775 );
3776 }
3777
3778 #[test]
3786 fn semantic_inputs_from_packets_matches_storage_replay() -> Result<()> {
3787 let temp = tempdir().unwrap();
3788 let db_path = temp.path().join("agent_search.db");
3789 let storage = FrankenStorage::open(&db_path)?;
3790
3791 let agent_id_codex = storage.ensure_agent(&Agent {
3792 id: None,
3793 slug: "codex".to_string(),
3794 name: "Codex".to_string(),
3795 version: None,
3796 kind: AgentKind::Cli,
3797 })?;
3798 let agent_id_claude = storage.ensure_agent(&Agent {
3799 id: None,
3800 slug: "claude_code".to_string(),
3801 name: "Claude Code".to_string(),
3802 version: None,
3803 kind: AgentKind::Cli,
3804 })?;
3805 let workspace_id =
3806 storage.ensure_workspace(Path::new("/tmp/semantic-equivalence-ws"), None)?;
3807
3808 storage.insert_conversation_tree(
3812 agent_id_codex,
3813 Some(workspace_id),
3814 &test_conversation_with_messages(
3815 "packet-equiv-1",
3816 vec![
3817 Message {
3818 id: None,
3819 idx: 0,
3820 role: MessageRole::User,
3821 author: None,
3822 created_at: Some(1_700_000_000_500),
3823 content: "first user prompt".to_string(),
3824 extra_json: json!({}),
3825 snippets: Vec::new(),
3826 },
3827 Message {
3828 id: None,
3829 idx: 1,
3830 role: MessageRole::Agent,
3831 author: None,
3832 created_at: Some(1_700_000_000_600),
3833 content: "first assistant reply".to_string(),
3834 extra_json: json!({}),
3835 snippets: Vec::new(),
3836 },
3837 Message {
3838 id: None,
3839 idx: 2,
3840 role: MessageRole::System,
3841 author: None,
3842 created_at: Some(1_700_000_000_700),
3843 content: String::new(),
3845 extra_json: json!({}),
3846 snippets: Vec::new(),
3847 },
3848 ],
3849 ),
3850 )?;
3851 storage.insert_conversation_tree(
3852 agent_id_claude,
3853 Some(workspace_id),
3854 &test_conversation_with_messages(
3855 "packet-equiv-2",
3856 vec![
3857 Message {
3858 id: None,
3859 idx: 0,
3860 role: MessageRole::Tool,
3861 author: Some("ripgrep".to_string()),
3862 created_at: Some(1_700_000_001_500),
3863 content: "tool output line".to_string(),
3864 extra_json: json!({}),
3865 snippets: Vec::new(),
3866 },
3867 Message {
3868 id: None,
3869 idx: 1,
3870 role: MessageRole::Agent,
3871 author: None,
3872 created_at: Some(1_700_000_001_600),
3873 content: "second assistant reply".to_string(),
3874 extra_json: json!({}),
3875 snippets: Vec::new(),
3876 },
3877 ],
3878 ),
3879 )?;
3880
3881 let storage_inputs = packet_embedding_inputs_from_storage(&storage)?;
3884
3885 let conversation_ids: Vec<i64> = storage.raw().query_map_collect(
3891 "SELECT DISTINCT m.conversation_id
3892 FROM messages m
3893 JOIN conversations c ON c.id = m.conversation_id
3894 ORDER BY m.conversation_id ASC",
3895 &[] as &[ParamValue],
3896 |row| row.get_typed(0),
3897 )?;
3898 let envelopes = fetch_canonical_embedding_conversations(&storage, &conversation_ids)?;
3899 let mut grouped_messages =
3900 storage.fetch_messages_for_lexical_rebuild_batch(&conversation_ids, None, None)?;
3901 let mut packets: Vec<ConversationPacket> = Vec::with_capacity(envelopes.len());
3902 let mut contexts: Vec<SemanticPacketContext> = Vec::with_capacity(envelopes.len());
3903 for envelope in &envelopes {
3904 let messages = grouped_messages
3905 .remove(&envelope.conversation_id)
3906 .unwrap_or_default();
3907 let provenance = canonical_embedding_packet_provenance(envelope);
3908 let canonical = canonical_embedding_conversation(envelope, &provenance, messages);
3909 packets.push(ConversationPacket::from_canonical_replay(
3910 &canonical, provenance,
3911 ));
3912 contexts.push(SemanticPacketContext {
3913 conversation_id: envelope.conversation_id,
3914 agent_id: saturating_u32_from_i64(envelope.agent_id),
3915 workspace_id: saturating_u32_from_i64(envelope.workspace_id.unwrap_or(0)),
3916 });
3917 }
3918 let packet_inputs = semantic_inputs_from_packets(&packets, &contexts)?;
3919
3920 assert!(
3924 !storage_inputs.is_empty(),
3925 "fixture should produce non-empty semantic inputs (sanity)"
3926 );
3927 assert_eq!(
3928 comparable_semantic_inputs(storage_inputs.clone()),
3929 comparable_semantic_inputs(packet_inputs.clone()),
3930 "packet-driven semantic preparation must match storage replay byte-for-byte"
3931 );
3932
3933 let storage_count = storage_inputs.len();
3938 let packet_count = packet_inputs.len();
3939 assert_eq!(
3940 storage_count, packet_count,
3941 "storage and packet semantic input counts must agree exactly"
3942 );
3943 assert!(
3945 packet_inputs.iter().all(|input| !input.content.is_empty()),
3946 "empty content must be filtered by the packet semantic projection"
3947 );
3948 let normalized_source_id =
3950 normalized_index_source_id(Some("remote-laptop"), None, Some("builder-host"));
3951 let expected_hash = crc32fast::hash(normalized_source_id.as_bytes());
3952 assert!(
3953 packet_inputs
3954 .iter()
3955 .all(|input| input.source_id == expected_hash),
3956 "every emitted EmbeddingInput must hash provenance via the packet's normalized source_id"
3957 );
3958
3959 Ok(())
3960 }
3961
3962 #[test]
3968 fn semantic_inputs_from_packets_rejects_length_mismatch() {
3969 let provenance = ConversationPacketProvenance::local();
3970 let canonical = test_conversation("packet-mismatch", "hello");
3971 let packet = ConversationPacket::from_canonical_replay(&canonical, provenance);
3972 let result = semantic_inputs_from_packets(&[packet], &[]);
3973 assert!(
3974 result.is_err(),
3975 "expected error on packet/context length mismatch"
3976 );
3977 let err = result.unwrap_err().to_string();
3978 assert!(
3979 err.contains("length mismatch"),
3980 "error should mention length mismatch, got: {err}"
3981 );
3982 }
3983}