1use std::collections::{HashMap, HashSet};
2use std::fs;
3use std::io::IsTerminal;
4use std::path::{Path, PathBuf};
5use std::time::{Instant, SystemTime, UNIX_EPOCH};
6
7use anyhow::{Context, Result, bail};
8use frankensearch::index::{
9 HNSW_DEFAULT_EF_CONSTRUCTION as FS_HNSW_DEFAULT_EF_CONSTRUCTION,
10 HNSW_DEFAULT_M as FS_HNSW_DEFAULT_M, HnswConfig as FsHnswConfig, HnswIndex as FsHnswIndex,
11 Quantization as FsQuantization, VectorIndex as FsVectorIndex,
12 VectorIndexWriter as FsVectorIndexWriter,
13};
14use frankensqlite::compat::{ConnectionExt, ParamValue, RowExt};
15use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
16use rayon::prelude::*;
17
18use crate::indexer::memoization::{
19 ContentAddressedMemoCache, MemoCacheAuditRecord, MemoContentHash, MemoKey, MemoLookup,
20};
21use crate::indexer::responsiveness;
22use crate::indexer::semantic_progress::{
23 SemanticProgressEvent, SemanticProgressFields, SemanticProgressSink,
24};
25use crate::model::conversation_packet::{ConversationPacket, ConversationPacketProvenance};
26use crate::model::types::{Conversation, Message};
27use crate::search::canonicalize::{canonicalize_for_embedding, content_hash};
28use crate::search::embedder::Embedder;
29use crate::search::fastembed_embedder::FastEmbedder;
30use crate::search::hash_embedder::HashEmbedder;
31use crate::search::policy::{CHUNKING_STRATEGY_VERSION, SEMANTIC_SCHEMA_VERSION, SemanticPolicy};
32use crate::search::semantic_manifest::{
33 ArtifactRecord, BuildCheckpoint, SemanticManifest, SemanticShardManifest, SemanticShardRecord,
34 TierKind,
35};
36use crate::search::tantivy::{
37 normalized_index_origin_host, normalized_index_origin_kind, normalized_index_source_id,
38};
39use crate::search::vector_index::{
40 ROLE_USER, SemanticDocId, VECTOR_INDEX_DIR, role_code_from_str, vector_index_path,
41};
42use crate::storage::sqlite::FrankenStorage;
43
/// Fallback embedding batch size used when `CASS_SEMANTIC_BATCH_SIZE` is unset,
/// unparsable, or zero.
const DEFAULT_SEMANTIC_BATCH_SIZE: usize = 128;
/// Fallback preparation memo-cache capacity used when
/// `CASS_SEMANTIC_PREP_MEMO_CAPACITY` is unset, unparsable, or zero.
const DEFAULT_SEMANTIC_PREP_MEMO_CAPACITY: usize = 4_096;
/// Fallback for `CASS_SEMANTIC_EMBED_BATCH_WARN_AFTER_MS` (milliseconds).
const DEFAULT_SEMANTIC_EMBED_BATCH_WARN_AFTER_MS: u64 = 30_000;
/// Fallback for `CASS_SEMANTIC_EMBED_BATCH_FAIL_AFTER_MS` (milliseconds).
const DEFAULT_SEMANTIC_EMBED_BATCH_FAIL_AFTER_MS: u64 = 300_000;
/// Fallback for `CASS_SEMANTIC_MAX_MESSAGES_PER_CHECKPOINT`.
const DEFAULT_SEMANTIC_MAX_MESSAGES_PER_CHECKPOINT: usize = 10_000;
/// Fallback for `CASS_SEMANTIC_MAX_BYTES_PER_CHECKPOINT` (8 MiB).
const DEFAULT_SEMANTIC_MAX_BYTES_PER_CHECKPOINT: u64 = 8 * 1024 * 1024;
// NOTE(review): the two memo identifiers below are not used in the visible part of
// this file; presumably they key the semantic preparation memo cache — confirm.
const SEMANTIC_PREP_MEMO_ALGORITHM: &str = "semantic_prepare_window";
const SEMANTIC_PREP_MEMO_VERSION: &str = "canonicalize_for_embedding:v2:stable-content-hash";
56
57fn resolved_env_usize(key: &str, default: usize) -> usize {
58 dotenvy::var(key)
59 .ok()
60 .and_then(|v| v.parse::<usize>().ok())
61 .unwrap_or(default)
62}
63
64fn resolved_env_u64(key: &str, default: u64) -> u64 {
65 dotenvy::var(key)
66 .ok()
67 .and_then(|v| v.parse::<u64>().ok())
68 .unwrap_or(default)
69}
70
71fn resolved_default_batch_size() -> usize {
72 dotenvy::var("CASS_SEMANTIC_BATCH_SIZE")
73 .ok()
74 .and_then(|v| v.parse::<usize>().ok())
75 .filter(|v| *v > 0)
76 .unwrap_or(DEFAULT_SEMANTIC_BATCH_SIZE)
77}
78
79fn resolved_semantic_prep_memo_capacity() -> usize {
80 dotenvy::var("CASS_SEMANTIC_PREP_MEMO_CAPACITY")
81 .ok()
82 .and_then(|v| v.parse::<usize>().ok())
83 .filter(|v| *v > 0)
84 .unwrap_or(DEFAULT_SEMANTIC_PREP_MEMO_CAPACITY)
85}
86
87fn resolved_semantic_embed_batch_warn_after_ms() -> u64 {
88 resolved_env_u64(
89 "CASS_SEMANTIC_EMBED_BATCH_WARN_AFTER_MS",
90 DEFAULT_SEMANTIC_EMBED_BATCH_WARN_AFTER_MS,
91 )
92}
93
94fn resolved_semantic_embed_batch_fail_after_ms() -> u64 {
95 resolved_env_u64(
96 "CASS_SEMANTIC_EMBED_BATCH_FAIL_AFTER_MS",
97 DEFAULT_SEMANTIC_EMBED_BATCH_FAIL_AFTER_MS,
98 )
99}
100
101fn parallel_prep_enabled() -> bool {
116 env_truthy("CASS_SEMANTIC_PREP_PARALLEL")
117}
118
/// Converts a `usize` to `u64`, saturating at `u64::MAX` if it does not fit.
fn saturating_u64_from_usize(value: usize) -> u64 {
    match u64::try_from(value) {
        Ok(widened) => widened,
        Err(_) => u64::MAX,
    }
}
122
/// Converts a `u128` millisecond count to `u64`, saturating at `u64::MAX`.
fn saturating_u64_from_millis(value: u128) -> u64 {
    match u64::try_from(value) {
        Ok(narrowed) => narrowed,
        Err(_) => u64::MAX,
    }
}
126
127#[derive(Debug, Clone)]
128pub struct EmbeddingInput {
129 pub message_id: u64,
130 pub created_at_ms: i64,
131 pub agent_id: u32,
132 pub workspace_id: u32,
133 pub source_id: u32,
134 pub role: u8,
135 pub chunk_idx: u8,
136 pub content: String,
137}
138
139impl EmbeddingInput {
140 pub fn new(message_id: u64, content: impl Into<String>) -> Self {
141 Self {
142 message_id,
143 created_at_ms: 0,
144 agent_id: 0,
145 workspace_id: 0,
146 source_id: 0,
147 role: ROLE_USER,
148 chunk_idx: 0,
149 content: content.into(),
150 }
151 }
152}
153
/// An embedded message: the metadata fields of [`EmbeddingInput`] plus the
/// content hash and the embedding vector.
///
/// Every field except `embedding` is copied into a `SemanticDocId` by
/// `semantic_doc_id_for_embedded`.
#[derive(Debug, Clone)]
pub struct EmbeddedMessage {
    pub message_id: u64,
    pub created_at_ms: i64,
    pub agent_id: u32,
    pub workspace_id: u32,
    pub source_id: u32,
    pub role: u8,
    pub chunk_idx: u8,
    /// 32-byte hash of the content.
    // NOTE(review): presumably computed over the canonicalized text — confirm at the producer.
    pub content_hash: [u8; 32],
    /// Embedding vector; its dimension depends on the embedder that produced it.
    pub embedding: Vec<f32>,
}
166
/// Inputs describing one semantic backfill batch.
// NOTE(review): producers and consumers of this plan are outside the visible part of
// the file; the field notes below are inferred from names and should be confirmed.
#[derive(Debug, Clone)]
pub struct SemanticBackfillBatchPlan {
    /// Semantic tier the batch belongs to.
    pub tier: TierKind,
    /// Fingerprint identifying the database state being indexed.
    pub db_fingerprint: String,
    /// Revision of the embedding model.
    pub model_revision: String,
    pub total_conversations: u64,
    pub conversations_in_batch: u64,
    /// Cursor position after this batch (presumably a conversation id — confirm).
    pub last_offset: i64,
    /// Presumably true when no further conversations remain after this batch.
    pub cursor_exhausted: bool,
}
177
/// Inputs for a storage-driven semantic backfill batch.
// NOTE(review): usage is outside the visible part of the file; field meanings are
// inferred from names and should be confirmed.
#[derive(Debug, Clone)]
pub struct SemanticBackfillStoragePlan {
    /// Semantic tier to build.
    pub tier: TierKind,
    /// Fingerprint identifying the database state being indexed.
    pub db_fingerprint: String,
    /// Revision of the embedding model.
    pub model_revision: String,
    /// Upper bound on conversations per batch (presumably — confirm against the caller).
    pub max_conversations: usize,
}
185
186#[derive(Debug, Clone)]
187pub struct SemanticBackfillBatchOutcome {
188 pub tier: TierKind,
189 pub embedder_id: String,
190 pub embedded_docs: u64,
191 pub conversations_processed: u64,
192 pub total_conversations: u64,
193 pub last_offset: i64,
194 pub checkpoint_saved: bool,
195 pub published: bool,
196 pub index_path: PathBuf,
197 pub manifest_path: PathBuf,
198}
199
200impl SemanticBackfillBatchOutcome {
201 pub fn progress_pct(&self) -> f64 {
202 if self.total_conversations == 0 {
203 return if self.published { 100.0 } else { 0.0 };
204 }
205
206 let pct = (self.conversations_processed as f64 / self.total_conversations as f64) * 100.0;
207 let pct = pct.min(100.0);
208 if pct >= 100.0 && !self.published {
209 99.0
210 } else {
211 pct
212 }
213 }
214}
215
/// Inputs for building sharded semantic vector indexes.
// NOTE(review): usage is outside the visible part of the file; field meanings are
// inferred from names and should be confirmed.
#[derive(Debug, Clone)]
pub struct SemanticShardBuildPlan {
    /// Semantic tier to build.
    pub tier: TierKind,
    /// Fingerprint identifying the database state being indexed.
    pub db_fingerprint: String,
    /// Revision of the embedding model.
    pub model_revision: String,
    pub total_conversations: u64,
    /// Upper bound on records stored in a single shard.
    pub max_records_per_shard: usize,
    /// Whether to also build an ANN index per shard (presumably the `.chsw` files — confirm).
    pub build_ann: bool,
}
225
/// Result of a sharded semantic index build.
// NOTE(review): producers are outside the visible part of the file; field meanings
// are inferred from names and should be confirmed.
#[derive(Debug, Clone)]
pub struct SemanticShardBuildOutcome {
    pub tier: TierKind,
    pub embedder_id: String,
    /// Number of shards written.
    pub shard_count: u32,
    /// Number of documents across all shards.
    pub doc_count: u64,
    pub total_conversations: u64,
    /// Paths of the per-shard vector index files.
    pub index_paths: Vec<PathBuf>,
    /// Paths of the per-shard ANN index files, if any were built.
    pub ann_index_paths: Vec<PathBuf>,
    pub shard_manifest_path: PathBuf,
    /// Presumably true when the build covered every conversation — confirm.
    pub complete: bool,
}
238
239#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
240#[serde(rename_all = "snake_case")]
241pub(crate) enum SemanticBackfillSchedulerState {
242 Running,
243 Paused,
244 Disabled,
245}
246
247impl SemanticBackfillSchedulerState {
248 pub(crate) fn as_str(self) -> &'static str {
249 match self {
250 Self::Running => "running",
251 Self::Paused => "paused",
252 Self::Disabled => "disabled",
253 }
254 }
255}
256
257#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
258#[serde(rename_all = "snake_case")]
259pub(crate) enum SemanticBackfillSchedulerReason {
260 IdleBudgetAvailable,
261 OperatorDisabled,
262 PolicyDisabled,
263 ForegroundPressure,
264 LexicalRepairActive,
265 CapacityBelowFloor,
266 ThreadBudgetZero,
267 BatchBudgetZero,
268}
269
270impl SemanticBackfillSchedulerReason {
271 pub(crate) fn next_step(self) -> &'static str {
272 match self {
273 Self::IdleBudgetAvailable => "background semantic backfill is within idle budgets",
274 Self::OperatorDisabled => {
275 "background semantic backfill is disabled by CASS_SEMANTIC_BACKFILL_DISABLE"
276 }
277 Self::PolicyDisabled => "semantic policy disables background semantic backfill",
278 Self::ForegroundPressure => {
279 "foreground pressure is present; retry after the idle delay"
280 }
281 Self::LexicalRepairActive => "lexical repair is active; semantic backfill is yielding",
282 Self::CapacityBelowFloor => {
283 "machine responsiveness capacity is below the semantic backfill floor"
284 }
285 Self::ThreadBudgetZero => "semantic backfill thread budget is zero",
286 Self::BatchBudgetZero => "semantic backfill batch budget is zero",
287 }
288 }
289}
290
291#[derive(Debug, Clone, Copy, PartialEq, Eq)]
292pub(crate) struct SemanticBackfillSchedulerSignals {
293 pub foreground_pressure: bool,
294 pub lexical_repair_active: bool,
295 pub force: bool,
296 pub operator_disabled: bool,
297}
298
299impl SemanticBackfillSchedulerSignals {
300 pub(crate) fn from_env() -> Self {
301 Self {
302 foreground_pressure: env_truthy("CASS_SEMANTIC_BACKFILL_FOREGROUND_ACTIVE"),
303 lexical_repair_active: env_truthy("CASS_SEMANTIC_BACKFILL_LEXICAL_REPAIR_ACTIVE"),
304 force: env_truthy("CASS_SEMANTIC_BACKFILL_FORCE"),
305 operator_disabled: env_truthy("CASS_SEMANTIC_BACKFILL_DISABLE"),
306 }
307 }
308}
309
310#[derive(Debug, Clone, serde::Serialize)]
311pub(crate) struct SemanticBackfillSchedulerDecision {
312 pub state: SemanticBackfillSchedulerState,
313 pub reason: SemanticBackfillSchedulerReason,
314 pub requested_batch_conversations: usize,
315 pub scheduled_batch_conversations: usize,
316 pub current_capacity_pct: u32,
317 pub min_capacity_pct: u32,
318 pub max_backfill_threads: usize,
319 pub idle_delay_seconds: u64,
320 pub chunk_timeout_seconds: u64,
321 pub foreground_pressure: bool,
322 pub lexical_repair_active: bool,
323 pub forced: bool,
324 pub next_eligible_after_ms: u64,
325}
326
327impl SemanticBackfillSchedulerDecision {
328 pub(crate) fn should_run(&self) -> bool {
329 matches!(self.state, SemanticBackfillSchedulerState::Running)
330 }
331}
332
333fn env_truthy(key: &str) -> bool {
334 dotenvy::var(key)
335 .ok()
336 .map(|value| {
337 matches!(
338 value.trim().to_ascii_lowercase().as_str(),
339 "1" | "true" | "yes" | "on"
340 )
341 })
342 .unwrap_or(false)
343}
344
345fn env_backfill_min_capacity_pct() -> u32 {
346 dotenvy::var("CASS_SEMANTIC_BACKFILL_MIN_CAPACITY_PCT")
347 .ok()
348 .and_then(|value| value.trim().parse::<u32>().ok())
349 .map(|value| value.clamp(1, 100))
350 .unwrap_or(75)
351}
352
353pub(crate) fn semantic_backfill_scheduler_decision(
354 policy: &SemanticPolicy,
355 requested_batch_conversations: usize,
356 signals: &SemanticBackfillSchedulerSignals,
357) -> SemanticBackfillSchedulerDecision {
358 semantic_backfill_scheduler_decision_for_capacity(
359 policy,
360 requested_batch_conversations,
361 signals,
362 responsiveness::current_capacity_pct(),
363 )
364}
365
366pub(crate) fn semantic_backfill_scheduler_decision_for_capacity(
367 policy: &SemanticPolicy,
368 requested_batch_conversations: usize,
369 signals: &SemanticBackfillSchedulerSignals,
370 current_capacity_pct: u32,
371) -> SemanticBackfillSchedulerDecision {
372 let min_capacity_pct = env_backfill_min_capacity_pct();
373 let paused_delay_ms = policy.idle_delay_seconds.saturating_mul(1000);
374 let mut decision = SemanticBackfillSchedulerDecision {
375 state: SemanticBackfillSchedulerState::Running,
376 reason: SemanticBackfillSchedulerReason::IdleBudgetAvailable,
377 requested_batch_conversations,
378 scheduled_batch_conversations: requested_batch_conversations,
379 current_capacity_pct: current_capacity_pct.clamp(0, 100),
380 min_capacity_pct,
381 max_backfill_threads: policy.max_backfill_threads,
382 idle_delay_seconds: policy.idle_delay_seconds,
383 chunk_timeout_seconds: policy.chunk_timeout_seconds,
384 foreground_pressure: signals.foreground_pressure,
385 lexical_repair_active: signals.lexical_repair_active,
386 forced: signals.force,
387 next_eligible_after_ms: 0,
388 };
389
390 if requested_batch_conversations == 0 {
391 return stopped_scheduler_decision(
392 decision,
393 SemanticBackfillSchedulerState::Disabled,
394 SemanticBackfillSchedulerReason::BatchBudgetZero,
395 paused_delay_ms,
396 );
397 }
398 if policy.max_backfill_threads == 0 && !signals.force {
399 return stopped_scheduler_decision(
400 decision,
401 SemanticBackfillSchedulerState::Disabled,
402 SemanticBackfillSchedulerReason::ThreadBudgetZero,
403 paused_delay_ms,
404 );
405 }
406 if signals.operator_disabled && !signals.force {
407 return stopped_scheduler_decision(
408 decision,
409 SemanticBackfillSchedulerState::Disabled,
410 SemanticBackfillSchedulerReason::OperatorDisabled,
411 paused_delay_ms,
412 );
413 }
414 if !policy.mode.should_build_semantic() && !signals.force {
415 return stopped_scheduler_decision(
416 decision,
417 SemanticBackfillSchedulerState::Disabled,
418 SemanticBackfillSchedulerReason::PolicyDisabled,
419 paused_delay_ms,
420 );
421 }
422 if signals.lexical_repair_active && !signals.force {
423 return stopped_scheduler_decision(
424 decision,
425 SemanticBackfillSchedulerState::Paused,
426 SemanticBackfillSchedulerReason::LexicalRepairActive,
427 paused_delay_ms,
428 );
429 }
430 if signals.foreground_pressure && !signals.force {
431 return stopped_scheduler_decision(
432 decision,
433 SemanticBackfillSchedulerState::Paused,
434 SemanticBackfillSchedulerReason::ForegroundPressure,
435 paused_delay_ms,
436 );
437 }
438 if current_capacity_pct < min_capacity_pct && !signals.force {
439 return stopped_scheduler_decision(
440 decision,
441 SemanticBackfillSchedulerState::Paused,
442 SemanticBackfillSchedulerReason::CapacityBelowFloor,
443 paused_delay_ms,
444 );
445 }
446
447 let capacity = current_capacity_pct.clamp(1, 100) as usize;
448 let scaled = requested_batch_conversations.saturating_mul(capacity) / 100;
449 decision.scheduled_batch_conversations = scaled.max(1).min(requested_batch_conversations);
450 decision
451}
452
453fn stopped_scheduler_decision(
454 mut decision: SemanticBackfillSchedulerDecision,
455 state: SemanticBackfillSchedulerState,
456 reason: SemanticBackfillSchedulerReason,
457 next_eligible_after_ms: u64,
458) -> SemanticBackfillSchedulerDecision {
459 decision.state = state;
460 decision.reason = reason;
461 decision.scheduled_batch_conversations = 0;
462 decision.next_eligible_after_ms = next_eligible_after_ms;
463 decision
464}
465
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock is before the epoch. The millisecond count
/// saturates at `i64::MAX` rather than wrapping on conversion.
fn now_ms() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| i64::try_from(elapsed.as_millis()).unwrap_or(i64::MAX))
        .unwrap_or(0)
}
472
473fn hnsw_index_path(data_dir: &Path, embedder_id: &str) -> PathBuf {
474 data_dir
475 .join(VECTOR_INDEX_DIR)
476 .join(format!("hnsw-{embedder_id}.chsw"))
477}
478
/// Makes `raw` safe to embed in a file name.
///
/// ASCII letters, digits, `-`, and `_` are kept; every other character is
/// replaced by `_`.
fn safe_path_component(raw: &str) -> String {
    let mut sanitized = String::with_capacity(raw.len());
    for ch in raw.chars() {
        let keep = ch.is_ascii_alphanumeric() || ch == '-' || ch == '_';
        sanitized.push(if keep { ch } else { '_' });
    }
    sanitized
}
490
491fn semantic_staging_index_path(
492 data_dir: &Path,
493 tier: TierKind,
494 embedder_id: &str,
495 db_fingerprint: &str,
496) -> PathBuf {
497 let fingerprint_hash = crc32fast::hash(db_fingerprint.as_bytes());
498 data_dir.join(VECTOR_INDEX_DIR).join(format!(
499 ".staging-{}-{}-{fingerprint_hash:08x}.fsvi",
500 tier.as_str(),
501 safe_path_component(embedder_id)
502 ))
503}
504
505fn semantic_generation_fingerprint_component(db_fingerprint: &str) -> String {
506 blake3::hash(db_fingerprint.as_bytes())
507 .to_hex()
508 .chars()
509 .take(16)
510 .collect()
511}
512
513fn semantic_shard_generation_dir(
514 data_dir: &Path,
515 tier: TierKind,
516 embedder_id: &str,
517 db_fingerprint: &str,
518) -> PathBuf {
519 let fingerprint_hash = semantic_generation_fingerprint_component(db_fingerprint);
520 data_dir.join(VECTOR_INDEX_DIR).join("shards").join(format!(
521 "{}-{}-{fingerprint_hash}",
522 tier.as_str(),
523 safe_path_component(embedder_id),
524 ))
525}
526
527fn semantic_shard_index_path(
528 data_dir: &Path,
529 tier: TierKind,
530 embedder_id: &str,
531 db_fingerprint: &str,
532 shard_index: u32,
533) -> PathBuf {
534 semantic_shard_generation_dir(data_dir, tier, embedder_id, db_fingerprint)
535 .join(format!("shard-{shard_index:05}.fsvi"))
536}
537
538fn semantic_shard_ann_index_path(
539 data_dir: &Path,
540 tier: TierKind,
541 embedder_id: &str,
542 db_fingerprint: &str,
543 shard_index: u32,
544) -> PathBuf {
545 semantic_shard_generation_dir(data_dir, tier, embedder_id, db_fingerprint)
546 .join(format!("shard-{shard_index:05}.chsw"))
547}
548
549#[cfg(not(windows))]
550fn sync_parent_directory(path: &Path) -> Result<()> {
551 let Some(parent) = path.parent() else {
552 return Ok(());
553 };
554 let directory = fs::File::open(parent)
555 .with_context(|| format!("opening parent directory {}", parent.display()))?;
556 directory
557 .sync_all()
558 .with_context(|| format!("syncing parent directory {}", parent.display()))
559}
560
/// Windows counterpart of the directory sync: performs no work and always
/// succeeds.
#[cfg(windows)]
fn sync_parent_directory(_path: &Path) -> Result<()> {
    Ok(())
}
565
566fn semantic_doc_id_for_embedded(embedded: &EmbeddedMessage) -> String {
567 SemanticDocId {
568 message_id: embedded.message_id,
569 chunk_idx: embedded.chunk_idx,
570 agent_id: embedded.agent_id,
571 workspace_id: embedded.workspace_id,
572 source_id: embedded.source_id,
573 role: embedded.role,
574 created_at_ms: embedded.created_at_ms,
575 content_hash: Some(embedded.content_hash),
576 }
577 .to_doc_id_string()
578}
579
/// Conversation envelope loaded by `fetch_canonical_embedding_conversations`,
/// one per row of the `conversations` table joined with `agents` and
/// `workspaces`.
struct CanonicalEmbeddingConversationRow {
    /// `conversations.id`.
    conversation_id: i64,
    /// `agents.slug`, or `'unknown'` when the join finds no agent.
    agent_slug: String,
    /// `conversations.agent_id`, or `0` when NULL.
    agent_id: i64,
    /// `workspaces.path` for the joined workspace, if any.
    workspace: Option<PathBuf>,
    /// `conversations.workspace_id`.
    workspace_id: Option<i64>,
    external_id: Option<String>,
    title: Option<String>,
    source_path: PathBuf,
    started_at: Option<i64>,
    ended_at: Option<i64>,
    /// Raw `conversations.source_id`; normalized before use in provenance.
    source_id: Option<String>,
    /// Raw `conversations.origin_host`; normalized before use in provenance.
    origin_host: Option<String>,
}
594
/// One page of embedding inputs read from canonical storage, plus cursor state.
struct CanonicalEmbeddingBatch {
    /// Embedding inputs built from the selected conversations.
    inputs: Vec<EmbeddingInput>,
    /// Number of conversations selected into this batch.
    conversations_in_batch: u64,
    /// Id of the last selected conversation, or the incoming cursor when none was selected.
    last_conversation_id: i64,
    /// Count of conversations that have messages, as reported by `total_semantic_conversations`.
    total_conversations: u64,
    /// True when neither the SQL limit nor a checkpoint cap left work for a later batch.
    cursor_exhausted: bool,
}
602
603#[derive(Debug, Clone, Copy, PartialEq, Eq)]
604struct SemanticCheckpointCaps {
605 max_messages: usize,
606 max_bytes: u64,
607}
608
609impl SemanticCheckpointCaps {
610 fn unlimited() -> Self {
611 Self {
612 max_messages: 0,
613 max_bytes: 0,
614 }
615 }
616
617 fn from_env() -> Self {
618 Self {
619 max_messages: resolved_env_usize(
620 "CASS_SEMANTIC_MAX_MESSAGES_PER_CHECKPOINT",
621 DEFAULT_SEMANTIC_MAX_MESSAGES_PER_CHECKPOINT,
622 ),
623 max_bytes: resolved_env_u64(
624 "CASS_SEMANTIC_MAX_BYTES_PER_CHECKPOINT",
625 DEFAULT_SEMANTIC_MAX_BYTES_PER_CHECKPOINT,
626 ),
627 }
628 }
629
630 fn message_limited(self) -> bool {
631 self.max_messages > 0
632 }
633
634 fn byte_limited(self) -> bool {
635 self.max_bytes > 0
636 }
637}
638
/// Embedding inputs gathered by an incremental pass.
// NOTE(review): the producer of this struct is outside the visible part of the file;
// field meanings below are inferred from names and should be confirmed.
pub(crate) struct CanonicalIncrementalEmbeddingBatch {
    /// Embedding inputs in the batch.
    pub inputs: Vec<EmbeddingInput>,
    /// Number of conversations that contributed inputs.
    pub conversations_in_batch: u64,
    /// Presumably the largest raw message id seen, `None` when no message was read.
    pub raw_max_message_id: Option<i64>,
}
644
/// Counts conversations that have at least one message.
///
/// The fast path counts rows with `message_count > 0`. If that column does not
/// exist, the function falls back to selecting conversation ids that have a
/// message row: first with an `INDEXED BY` hint, then without the hint if the
/// named index does not exist. Any other error is returned to the caller.
///
/// # Errors
///
/// Returns an error, with context, if no query variant succeeds.
fn total_semantic_conversations(storage: &FrankenStorage) -> Result<u64> {
    let hinted_fallback_sql = "SELECT c.id
 FROM conversations c
 WHERE EXISTS (
 SELECT 1
 FROM messages INDEXED BY sqlite_autoindex_messages_1
 WHERE conversation_id = c.id
 LIMIT 1
 )";
    let fallback_sql = "SELECT c.id
 FROM conversations c
 WHERE EXISTS (
 SELECT 1
 FROM messages
 WHERE conversation_id = c.id
 LIMIT 1
 )";
    let count: i64 = storage
        .raw()
        .query_row_map(
            "SELECT COUNT(*) FROM conversations WHERE message_count > 0",
            &[] as &[ParamValue],
            |row| row.get_typed(0),
        )
        .or_else(|err| {
            // Only a missing `message_count` column triggers the fallback.
            if !err.to_string().contains("no such column: message_count") {
                return Err(err);
            }
            let conversation_ids: Vec<i64> = storage
                .raw()
                .query_map_collect(hinted_fallback_sql, &[] as &[ParamValue], |row| {
                    row.get_typed(0)
                })
                .or_else(|err| {
                    // Retry without the index hint when the hinted index is absent.
                    if err
                        .to_string()
                        .contains("no such index: sqlite_autoindex_messages_1")
                    {
                        return storage.raw().query_map_collect(
                            fallback_sql,
                            &[] as &[ParamValue],
                            |row| row.get_typed(0),
                        );
                    }
                    Err(err)
                })?;
            // The fallback counts by collecting ids; saturate rather than overflow.
            Ok(i64::try_from(conversation_ids.len()).unwrap_or(i64::MAX))
        })
        .with_context(|| "counting canonical conversations with semantic messages")?;
    // Negative counts are clamped to zero before widening.
    Ok(u64::try_from(count.max(0)).unwrap_or(u64::MAX))
}
696
/// Converts a signed database id to `u64`; negative ids yield `None`.
pub(crate) fn message_id_from_db(raw: i64) -> Option<u64> {
    (raw >= 0).then(|| raw.unsigned_abs())
}
700
/// Converts an `i64` to `u32`, mapping negatives to `0` and values above
/// `u32::MAX` to `u32::MAX`.
pub(crate) fn saturating_u32_from_i64(raw: i64) -> u32 {
    let bounded = raw.clamp(0, i64::from(u32::MAX));
    u32::try_from(bounded).unwrap_or(u32::MAX)
}
708
709fn canonical_embedding_created_at_ms(message_id: u64, created_at: Option<i64>) -> i64 {
710 created_at.unwrap_or_else(|| {
718 tracing::warn!(
719 message_id,
720 "semantic backfill: row has NULL created_at; defaulting to 0 (Unix epoch). \
721 Downstream time-range filters will treat this message as 1970-01-01."
722 );
723 0
724 })
725}
726
727fn canonical_embedding_packet_provenance(
728 row: &CanonicalEmbeddingConversationRow,
729) -> ConversationPacketProvenance {
730 let source_id =
731 normalized_index_source_id(row.source_id.as_deref(), None, row.origin_host.as_deref());
732 ConversationPacketProvenance {
733 origin_kind: normalized_index_origin_kind(&source_id, None),
734 origin_host: normalized_index_origin_host(row.origin_host.as_deref()),
735 source_id,
736 }
737}
738
739fn canonical_embedding_conversation(
740 row: &CanonicalEmbeddingConversationRow,
741 provenance: &ConversationPacketProvenance,
742 messages: Vec<Message>,
743) -> Conversation {
744 Conversation {
745 id: Some(row.conversation_id),
746 agent_slug: row.agent_slug.clone(),
747 workspace: row.workspace.clone(),
748 external_id: row.external_id.clone(),
749 title: row.title.clone(),
750 source_path: row.source_path.clone(),
751 started_at: row.started_at,
752 ended_at: row.ended_at,
753 approx_tokens: None,
754 metadata_json: serde_json::Value::Null,
755 messages,
756 source_id: provenance.source_id.clone(),
757 origin_host: provenance.origin_host.clone(),
758 }
759}
760
761fn embedding_input_from_packet_message(
762 conversation_id: i64,
763 agent_id: u32,
764 workspace_id: u32,
765 source_id_hash: u32,
766 message: &crate::model::conversation_packet::ConversationPacketMessage,
767) -> Option<EmbeddingInput> {
768 let Some(raw_message_id) = message.message_id else {
769 tracing::warn!(
770 conversation_id,
771 message_idx = message.idx,
772 "skipping semantic backfill message without canonical id in ConversationPacket replay"
773 );
774 return None;
775 };
776 let Some(message_id) = message_id_from_db(raw_message_id) else {
777 tracing::warn!(
778 conversation_id,
779 raw_message_id,
780 "skipping out-of-range id during semantic backfill"
781 );
782 return None;
783 };
784 Some(EmbeddingInput {
785 message_id,
786 created_at_ms: canonical_embedding_created_at_ms(message_id, message.created_at),
787 agent_id,
788 workspace_id,
789 source_id: source_id_hash,
790 role: role_code_from_str(&message.role).unwrap_or(ROLE_USER),
791 chunk_idx: 0,
792 content: message.content.clone(),
793 })
794}
795
796fn embedding_inputs_from_conversation_packet(
797 row: &CanonicalEmbeddingConversationRow,
798 packet: &ConversationPacket,
799) -> Vec<EmbeddingInput> {
800 let agent_id = saturating_u32_from_i64(row.agent_id);
801 let workspace_id = saturating_u32_from_i64(row.workspace_id.unwrap_or(0));
802 let source_id_hash = crc32fast::hash(packet.payload.provenance.source_id.as_bytes());
803 packet
804 .projections
805 .semantic
806 .message_indices
807 .iter()
808 .filter_map(|message_index| {
809 packet
810 .payload
811 .messages
812 .get(*message_index)
813 .and_then(|message| {
814 embedding_input_from_packet_message(
815 row.conversation_id,
816 agent_id,
817 workspace_id,
818 source_id_hash,
819 message,
820 )
821 })
822 })
823 .collect()
824}
825
826fn fetch_canonical_embedding_conversations(
827 storage: &FrankenStorage,
828 conversation_ids: &[i64],
829) -> Result<Vec<CanonicalEmbeddingConversationRow>> {
830 let mut envelope_sql = String::from(
831 "SELECT c.id,
832 COALESCE(a.slug, 'unknown'),
833 COALESCE(c.agent_id, 0),
834 c.workspace_id,
835 w.path,
836 c.external_id,
837 c.title,
838 c.source_path,
839 c.started_at,
840 c.ended_at,
841 c.source_id,
842 c.origin_host
843 FROM conversations c
844 LEFT JOIN agents a ON a.id = c.agent_id
845 LEFT JOIN workspaces w ON w.id = c.workspace_id
846 WHERE c.id IN (",
847 );
848 let mut params = Vec::with_capacity(conversation_ids.len());
849 for (idx, conversation_id) in conversation_ids.iter().enumerate() {
850 if idx > 0 {
851 envelope_sql.push_str(", ");
852 }
853 envelope_sql.push_str(&format!("?{}", idx + 1));
854 params.push(ParamValue::from(*conversation_id));
855 }
856 envelope_sql.push_str(") ORDER BY c.id ASC");
857
858 storage
859 .raw()
860 .query_map_collect(&envelope_sql, ¶ms, |row| {
861 let workspace_path: Option<String> = row.get_typed(4)?;
862 Ok(CanonicalEmbeddingConversationRow {
863 conversation_id: row.get_typed(0)?,
864 agent_slug: row.get_typed(1)?,
865 agent_id: row.get_typed(2)?,
866 workspace_id: row.get_typed(3)?,
867 workspace: workspace_path.map(PathBuf::from),
868 external_id: row.get_typed(5)?,
869 title: row.get_typed(6)?,
870 source_path: PathBuf::from(row.get_typed::<String>(7)?),
871 started_at: row.get_typed(8)?,
872 ended_at: row.get_typed(9)?,
873 source_id: row.get_typed(10)?,
874 origin_host: row.get_typed(11)?,
875 })
876 })
877 .with_context(|| {
878 format!(
879 "fetching semantic backfill conversation envelopes for {} conversations",
880 conversation_ids.len()
881 )
882 })
883}
884
/// Per-packet identifiers that `semantic_inputs_from_packets` attaches to every
/// embedding input built from the corresponding packet.
// NOTE(review): `dead_code` is allowed here; no caller constructs this type in the
// visible part of the file — confirm whether the allowance is still needed.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy)]
pub(crate) struct SemanticPacketContext {
    /// Conversation id, used in warning logs for skipped messages.
    pub conversation_id: i64,
    /// Agent id copied onto each input.
    pub agent_id: u32,
    /// Workspace id copied onto each input.
    pub workspace_id: u32,
}
902
903#[allow(dead_code)]
923pub(crate) fn semantic_inputs_from_packets(
924 packets: &[ConversationPacket],
925 contexts: &[SemanticPacketContext],
926) -> Result<Vec<EmbeddingInput>> {
927 if packets.len() != contexts.len() {
928 anyhow::bail!(
929 "semantic_inputs_from_packets length mismatch: {} packets vs {} contexts",
930 packets.len(),
931 contexts.len()
932 );
933 }
934 let mut inputs = Vec::new();
935 for (packet, context) in packets.iter().zip(contexts.iter()) {
936 let source_id_hash = crc32fast::hash(packet.payload.provenance.source_id.as_bytes());
937 for &message_index in &packet.projections.semantic.message_indices {
938 let Some(message) = packet.payload.messages.get(message_index) else {
939 anyhow::bail!(
940 "packet semantic projection references missing message index {} \
941 (packet has {} messages)",
942 message_index,
943 packet.payload.messages.len()
944 );
945 };
946 if let Some(input) = embedding_input_from_packet_message(
947 context.conversation_id,
948 context.agent_id,
949 context.workspace_id,
950 source_id_hash,
951 message,
952 ) {
953 inputs.push(input);
954 }
955 }
956 }
957 tracing::debug!(
958 packets = packets.len(),
959 packet_driven = true,
960 semantic_inputs = inputs.len(),
961 "built semantic inputs from in-memory ConversationPacket batch"
962 );
963 Ok(inputs)
964}
965
966fn fetch_canonical_embedding_batch(
967 storage: &FrankenStorage,
968 after_conversation_id: i64,
969 max_conversations: usize,
970) -> Result<CanonicalEmbeddingBatch> {
971 fetch_canonical_embedding_batch_inner(storage, after_conversation_id, max_conversations, None)
972}
973
974fn fetch_canonical_embedding_batch_inner(
980 storage: &FrankenStorage,
981 after_conversation_id: i64,
982 max_conversations: usize,
983 after_message_id: Option<i64>,
984) -> Result<CanonicalEmbeddingBatch> {
985 fetch_canonical_embedding_batch_inner_with_caps(
986 storage,
987 after_conversation_id,
988 max_conversations,
989 after_message_id,
990 SemanticCheckpointCaps::unlimited(),
991 )
992}
993
994fn fetch_canonical_embedding_batch_inner_with_caps(
995 storage: &FrankenStorage,
996 after_conversation_id: i64,
997 max_conversations: usize,
998 after_message_id: Option<i64>,
999 caps: SemanticCheckpointCaps,
1000) -> Result<CanonicalEmbeddingBatch> {
1001 let total_conversations = total_semantic_conversations(storage)?;
1002 let max_conversations = max_conversations.max(1);
1003 let query_limit = max_conversations.saturating_add(1);
1004 let query_limit_i64 = i64::try_from(query_limit).unwrap_or(i64::MAX);
1005 let mut params = vec![
1006 ParamValue::from(after_conversation_id),
1007 ParamValue::from(query_limit_i64),
1008 ];
1009 let message_cursor_predicate = if let Some(after_message_id) = after_message_id {
1010 params.push(ParamValue::from(after_message_id));
1011 " AND id > ?3"
1012 } else {
1013 ""
1014 };
1015 let hinted_sql = format!(
1016 "SELECT c.id
1017 FROM conversations c
1018 WHERE c.id > ?1
1019 AND EXISTS (
1020 SELECT 1
1021 FROM messages INDEXED BY sqlite_autoindex_messages_1
1022 WHERE conversation_id = c.id{message_cursor_predicate}
1023 LIMIT 1
1024 )
1025 ORDER BY c.id ASC
1026 LIMIT ?2"
1027 );
1028 let fallback_sql = format!(
1029 "SELECT c.id
1030 FROM conversations c
1031 WHERE c.id > ?1
1032 AND EXISTS (
1033 SELECT 1
1034 FROM messages
1035 WHERE conversation_id = c.id{message_cursor_predicate}
1036 LIMIT 1
1037 )
1038 ORDER BY c.id ASC
1039 LIMIT ?2"
1040 );
1041 let mut conversation_ids: Vec<i64> = storage
1042 .raw()
1043 .query_map_collect(&hinted_sql, ¶ms, |row| row.get_typed(0))
1044 .or_else(|err| {
1045 if err
1046 .to_string()
1047 .contains("no such index: sqlite_autoindex_messages_1")
1048 {
1049 return storage
1050 .raw()
1051 .query_map_collect(&fallback_sql, ¶ms, |row| row.get_typed(0));
1052 }
1053 Err(err)
1054 })
1055 .with_context(|| {
1056 format!("fetching semantic backfill conversation ids after {after_conversation_id}")
1057 })?;
1058
1059 if conversation_ids.is_empty() {
1060 return Ok(CanonicalEmbeddingBatch {
1061 inputs: Vec::new(),
1062 conversations_in_batch: 0,
1063 last_conversation_id: after_conversation_id,
1064 total_conversations,
1065 cursor_exhausted: true,
1066 });
1067 }
1068
1069 let has_more_from_sql_limit = conversation_ids.len() > max_conversations;
1070 if has_more_from_sql_limit {
1071 conversation_ids.truncate(max_conversations);
1072 }
1073
1074 let conversations = fetch_canonical_embedding_conversations(storage, &conversation_ids)?;
1075
1076 let mut grouped_messages =
1077 storage.fetch_messages_for_lexical_rebuild_batch(&conversation_ids, None, None)?;
1078 let CheckpointCappedSelection {
1079 conversations,
1080 last_conversation_id,
1081 stopped_before_candidate,
1082 } = select_checkpoint_capped_conversations(
1083 conversations,
1084 &mut grouped_messages,
1085 after_message_id,
1086 caps,
1087 );
1088 let (inputs, _) = packet_embedding_inputs_from_materialized_canonical_messages(
1089 &conversations,
1090 &mut grouped_messages,
1091 |_| true,
1092 );
1093
1094 let conversations_in_batch = u64::try_from(conversations.len()).unwrap_or(u64::MAX);
1095 let cursor_exhausted = !has_more_from_sql_limit && !stopped_before_candidate;
1096 tracing::debug!(
1097 conversations_in_batch,
1098 cursor_exhausted,
1099 packet_driven = true,
1100 semantic_inputs = inputs.len(),
1101 max_messages_per_checkpoint = caps.max_messages,
1102 max_bytes_per_checkpoint = caps.max_bytes,
1103 ?after_message_id,
1104 "built semantic backfill batch from ConversationPacket canonical replay"
1105 );
1106
1107 Ok(CanonicalEmbeddingBatch {
1108 inputs,
1109 conversations_in_batch,
1110 last_conversation_id: last_conversation_id.unwrap_or(after_conversation_id),
1111 total_conversations,
1112 cursor_exhausted,
1113 })
1114}
1115
/// Result of applying checkpoint caps to a page of candidate conversations.
struct CheckpointCappedSelection {
    /// Conversations accepted into the batch, in their original order.
    conversations: Vec<CanonicalEmbeddingConversationRow>,
    /// Id of the last accepted conversation; `None` when nothing was accepted.
    last_conversation_id: Option<i64>,
    /// True when a cap stopped selection before a candidate that still had messages.
    stopped_before_candidate: bool,
}
1121
/// Picks the leading conversations whose combined messages fit within `caps`.
///
/// For each conversation, in order, its messages are taken out of
/// `grouped_messages` and, when `after_message_id` is set, reduced to those
/// with an id strictly greater than it. Conversations left with no messages are
/// skipped. Selection stops at the first conversation that would push the
/// running message count or byte total past a cap. The first accepted
/// conversation is never rejected, so an oversized conversation still makes
/// progress. Accepted conversations have their filtered messages put back into
/// `grouped_messages`.
fn select_checkpoint_capped_conversations(
    conversations: Vec<CanonicalEmbeddingConversationRow>,
    grouped_messages: &mut HashMap<i64, Vec<Message>>,
    after_message_id: Option<i64>,
    caps: SemanticCheckpointCaps,
) -> CheckpointCappedSelection {
    let mut selected = Vec::new();
    let mut selected_messages = 0usize;
    let mut selected_bytes = 0u64;
    let mut last_conversation_id = None;
    let mut stopped_before_candidate = false;

    for conversation in conversations {
        let mut messages = grouped_messages
            .remove(&conversation.conversation_id)
            .unwrap_or_default();
        if let Some(min_exclusive) = after_message_id {
            // Messages without an id are dropped when a message cursor is active.
            messages.retain(|message| message.id.is_some_and(|id| id > min_exclusive));
        }
        if messages.is_empty() {
            // NOTE(review): a skipped conversation does not advance
            // `last_conversation_id` — confirm this is intended for the cursor.
            continue;
        }

        let message_count = messages.len();
        let byte_count = messages
            .iter()
            .map(|message| saturating_u64_from_usize(message.content.len()))
            .fold(0u64, u64::saturating_add);
        // Caps apply only once something has been selected.
        let would_exceed_messages = caps.message_limited()
            && !selected.is_empty()
            && selected_messages.saturating_add(message_count) > caps.max_messages;
        let would_exceed_bytes = caps.byte_limited()
            && !selected.is_empty()
            && selected_bytes.saturating_add(byte_count) > caps.max_bytes;
        if would_exceed_messages || would_exceed_bytes {
            stopped_before_candidate = true;
            tracing::debug!(
                conversation_id = conversation.conversation_id,
                selected_conversations = selected.len(),
                selected_messages,
                selected_bytes,
                candidate_messages = message_count,
                candidate_bytes = byte_count,
                max_messages_per_checkpoint = caps.max_messages,
                max_bytes_per_checkpoint = caps.max_bytes,
                "semantic checkpoint cap stopped this batch before the next full conversation"
            );
            break;
        }

        selected_messages = selected_messages.saturating_add(message_count);
        selected_bytes = selected_bytes.saturating_add(byte_count);
        last_conversation_id = Some(conversation.conversation_id);
        // Put the filtered messages back for the caller.
        grouped_messages.insert(conversation.conversation_id, messages);
        selected.push(conversation);
    }

    CheckpointCappedSelection {
        conversations: selected,
        last_conversation_id,
        stopped_before_candidate,
    }
}
1185
1186pub(crate) fn packet_embedding_inputs_from_storage(
1187 storage: &FrankenStorage,
1188) -> Result<Vec<EmbeddingInput>> {
1189 Ok(fetch_canonical_embedding_batch(storage, 0, usize::MAX)?.inputs)
1190}
1191
1192fn packet_embedding_inputs_from_selected_canonical_messages<F>(
1193 storage: &FrankenStorage,
1194 conversation_ids: &[i64],
1195 include_message: F,
1196) -> Result<(Vec<EmbeddingInput>, Option<i64>)>
1197where
1198 F: FnMut(&Message) -> bool,
1199{
1200 if conversation_ids.is_empty() {
1201 return Ok((Vec::new(), None));
1202 }
1203
1204 let conversations = fetch_canonical_embedding_conversations(storage, conversation_ids)?;
1205 let mut grouped_messages =
1206 storage.fetch_messages_for_lexical_rebuild_batch(conversation_ids, None, None)?;
1207 Ok(
1208 packet_embedding_inputs_from_materialized_canonical_messages(
1209 &conversations,
1210 &mut grouped_messages,
1211 include_message,
1212 ),
1213 )
1214}
1215
1216fn packet_embedding_inputs_from_materialized_canonical_messages<F>(
1217 conversations: &[CanonicalEmbeddingConversationRow],
1218 grouped_messages: &mut HashMap<i64, Vec<Message>>,
1219 mut include_message: F,
1220) -> (Vec<EmbeddingInput>, Option<i64>)
1221where
1222 F: FnMut(&Message) -> bool,
1223{
1224 let mut inputs = Vec::new();
1225 let mut raw_max_message_id: Option<i64> = None;
1226
1227 for conversation in conversations {
1228 let mut messages = grouped_messages
1229 .remove(&conversation.conversation_id)
1230 .unwrap_or_default();
1231 messages.retain(|message| {
1232 let keep = include_message(message);
1233 if keep && let Some(message_id) = message.id {
1234 raw_max_message_id =
1235 Some(raw_max_message_id.map_or(message_id, |current| current.max(message_id)));
1236 }
1237 keep
1238 });
1239 if messages.is_empty() {
1240 continue;
1241 }
1242
1243 let provenance = canonical_embedding_packet_provenance(conversation);
1244 let canonical = canonical_embedding_conversation(conversation, &provenance, messages);
1245 let packet = ConversationPacket::from_canonical_replay(&canonical, provenance);
1246 inputs.extend(embedding_inputs_from_conversation_packet(
1247 conversation,
1248 &packet,
1249 ));
1250 }
1251
1252 (inputs, raw_max_message_id)
1253}
1254
1255pub(crate) fn packet_embedding_inputs_from_storage_since(
1256 storage: &FrankenStorage,
1257 since_message_id: i64,
1258) -> Result<CanonicalIncrementalEmbeddingBatch> {
1259 let conversation_ids: Vec<i64> = storage
1260 .raw()
1261 .query_map_collect(
1262 "SELECT DISTINCT m.conversation_id
1263 FROM messages m
1264 WHERE m.id > ?1
1265 ORDER BY m.conversation_id ASC",
1266 &[ParamValue::from(since_message_id)],
1267 |row| row.get_typed(0),
1268 )
1269 .with_context(|| {
1270 format!(
1271 "fetching canonical semantic catch-up conversation ids after message {since_message_id}"
1272 )
1273 })?;
1274
1275 if conversation_ids.is_empty() {
1276 return Ok(CanonicalIncrementalEmbeddingBatch {
1277 inputs: Vec::new(),
1278 conversations_in_batch: 0,
1279 raw_max_message_id: None,
1280 });
1281 }
1282
1283 let (inputs, raw_max_message_id) = packet_embedding_inputs_from_selected_canonical_messages(
1284 storage,
1285 &conversation_ids,
1286 |message| message.id.is_some_and(|id| id > since_message_id),
1287 )?;
1288
1289 let conversations_in_batch = u64::try_from(conversation_ids.len()).unwrap_or(u64::MAX);
1290 tracing::debug!(
1291 since_message_id,
1292 conversations_in_batch,
1293 packet_driven = true,
1294 semantic_inputs = inputs.len(),
1295 "built semantic catch-up batch from ConversationPacket canonical replay"
1296 );
1297
1298 Ok(CanonicalIncrementalEmbeddingBatch {
1299 inputs,
1300 conversations_in_batch,
1301 raw_max_message_id,
1302 })
1303}
1304
1305pub(crate) fn packet_embedding_inputs_from_storage_for_message_ids(
1306 storage: &FrankenStorage,
1307 conversation_ids: &[i64],
1308 message_ids: &HashSet<i64>,
1309) -> Result<Vec<EmbeddingInput>> {
1310 if conversation_ids.is_empty() || message_ids.is_empty() {
1311 return Ok(Vec::new());
1312 }
1313
1314 let (inputs, raw_max_message_id) = packet_embedding_inputs_from_selected_canonical_messages(
1315 storage,
1316 conversation_ids,
1317 |message| message.id.is_some_and(|id| message_ids.contains(&id)),
1318 )?;
1319 tracing::debug!(
1320 conversations_in_batch = conversation_ids.len(),
1321 selected_message_ids = message_ids.len(),
1322 semantic_inputs = inputs.len(),
1323 raw_max_message_id,
1324 packet_driven = true,
1325 "built selected semantic batch from ConversationPacket canonical replay"
1326 );
1327
1328 Ok(inputs)
1329}
1330
/// An embedding input that survived canonicalization and is ready to be sent
/// to the embedder.
struct Prepared<'a> {
    /// The input this entry was prepared from.
    msg: &'a EmbeddingInput,
    /// Output of `canonicalize_for_embedding` for the input's content; never
    /// empty, because inputs with empty canonical text are dropped.
    canonical: String,
    /// `content_hash` of `canonical`.
    hash: [u8; 32],
}
1336
/// Value stored in the semantic prep memo cache: the canonicalization result
/// for one piece of raw message content.
#[derive(Debug, Clone, PartialEq, Eq)]
struct MemoizedPreparedMessage {
    /// Canonicalized text produced by `canonicalize_for_embedding`.
    canonical: String,
    /// `content_hash` of `canonical`.
    hash: [u8; 32],
}
1342
1343fn semantic_prep_memo_key(content: &str) -> MemoKey {
1344 MemoKey::new(
1345 MemoContentHash::from_bytes(content_hash(content).to_vec()),
1346 SEMANTIC_PREP_MEMO_ALGORITHM,
1347 SEMANTIC_PREP_MEMO_VERSION,
1348 )
1349}
1350
/// Returns how much a counter grew between two snapshots.
///
/// Yields `after - before`, clamped to `0` when `after` is not larger than
/// `before`, so the result never underflows.
fn memo_counter_delta(after: u64, before: u64) -> u64 {
    if after > before {
        after - before
    } else {
        0
    }
}
1354
1355fn trace_semantic_prep_memo_window(
1356 window_index: usize,
1357 window_len: usize,
1358 prepared_len: usize,
1359 entry_capacity: usize,
1360 before: &crate::indexer::memoization::MemoCacheStats,
1361 after: &crate::indexer::memoization::MemoCacheStats,
1362) {
1363 tracing::trace!(
1364 algorithm = SEMANTIC_PREP_MEMO_ALGORITHM,
1365 algorithm_version = SEMANTIC_PREP_MEMO_VERSION,
1366 window_index,
1367 window_len,
1368 prepared_messages = prepared_len,
1369 skipped_messages = window_len.saturating_sub(prepared_len),
1370 hit_delta = memo_counter_delta(after.hits, before.hits),
1371 miss_delta = memo_counter_delta(after.misses, before.misses),
1372 insert_delta = memo_counter_delta(after.inserts, before.inserts),
1373 evictions_capacity_delta =
1374 memo_counter_delta(after.evictions_capacity, before.evictions_capacity),
1375 quarantined_delta = memo_counter_delta(after.quarantined, before.quarantined),
1376 live_entries = after.live_entries,
1377 entry_capacity,
1378 "semantic prep memo cache window"
1379 );
1380}
1381
1382fn trace_semantic_prep_memo_audit(audit: &MemoCacheAuditRecord) {
1383 tracing::trace!(?audit, "semantic prep memo cache audit");
1384}
1385
1386fn prepare_window_with_memo<'a>(
1387 window: &'a [EmbeddingInput],
1388 cache: &mut ContentAddressedMemoCache<MemoizedPreparedMessage>,
1389) -> Vec<Prepared<'a>> {
1390 window
1391 .iter()
1392 .filter_map(|msg| {
1393 let key = semantic_prep_memo_key(&msg.content);
1394 let (lookup, lookup_audit) = cache.get_with_audit(&key);
1395 trace_semantic_prep_memo_audit(&lookup_audit);
1396 match lookup {
1397 MemoLookup::Hit { value } => Some(Prepared {
1398 msg,
1399 canonical: value.canonical,
1400 hash: value.hash,
1401 }),
1402 MemoLookup::Miss | MemoLookup::Quarantined { .. } => {
1403 let canonical = canonicalize_for_embedding(&msg.content);
1404 if canonical.is_empty() {
1405 return None;
1406 }
1407 let hash = content_hash(&canonical);
1408 let insert_audit = cache.insert_with_audit(
1409 key,
1410 MemoizedPreparedMessage {
1411 canonical: canonical.clone(),
1412 hash,
1413 },
1414 );
1415 trace_semantic_prep_memo_audit(&insert_audit);
1416 Some(Prepared {
1417 msg,
1418 canonical,
1419 hash,
1420 })
1421 }
1422 }
1423 })
1424 .collect()
1425}
1426
1427fn prepare_window<'a>(window: &'a [EmbeddingInput], serial: bool) -> Vec<Prepared<'a>> {
1434 let prep = |msg: &'a EmbeddingInput| -> Option<Prepared<'a>> {
1435 let canonical = canonicalize_for_embedding(&msg.content);
1436 if canonical.is_empty() {
1437 return None;
1438 }
1439 let hash = content_hash(&canonical);
1440 Some(Prepared {
1441 msg,
1442 canonical,
1443 hash,
1444 })
1445 };
1446
1447 if serial {
1448 window.iter().filter_map(prep).collect()
1449 } else {
1450 window.par_iter().filter_map(prep).collect()
1451 }
1452}
1453
1454fn flush_prepared_batch(
1455 batch: &[Prepared<'_>],
1456 embeddings: &mut Vec<EmbeddedMessage>,
1457 pb: &ProgressBar,
1458 embedder: &dyn Embedder,
1459) -> Result<()> {
1460 if batch.is_empty() {
1461 return Ok(());
1462 }
1463
1464 let texts: Vec<&str> = batch.iter().map(|p| p.canonical.as_str()).collect();
1465 let vectors = embedder
1466 .embed_batch_sync(&texts)
1467 .map_err(|e| anyhow::anyhow!("embedding failed: {e}"))?;
1468
1469 if vectors.len() != batch.len() {
1470 bail!(
1471 "embedder returned {} embeddings for {} inputs",
1472 vectors.len(),
1473 batch.len()
1474 );
1475 }
1476
1477 for (prepared, vector) in batch.iter().zip(vectors) {
1478 if vector.len() != embedder.dimension() {
1479 bail!(
1480 "embedding dimension mismatch: expected {}, got {}",
1481 embedder.dimension(),
1482 vector.len()
1483 );
1484 }
1485 embeddings.push(EmbeddedMessage {
1486 message_id: prepared.msg.message_id,
1487 created_at_ms: prepared.msg.created_at_ms,
1488 agent_id: prepared.msg.agent_id,
1489 workspace_id: prepared.msg.workspace_id,
1490 source_id: prepared.msg.source_id,
1491 role: prepared.msg.role,
1492 chunk_idx: prepared.msg.chunk_idx,
1493 content_hash: prepared.hash,
1494 embedding: vector,
1495 });
1496 }
1497
1498 pb.inc(saturating_u64_from_usize(batch.len()));
1499 Ok(())
1500}
1501
/// Embeds messages with a boxed [`Embedder`] and writes the resulting vectors
/// to on-disk vector indexes.
pub struct SemanticIndexer {
    /// Embedding backend chosen in [`SemanticIndexer::new`].
    embedder: Box<dyn Embedder>,
    /// Number of prepared inputs sent to the embedder per call;
    /// `with_batch_size` rejects zero.
    batch_size: usize,
}
1506
1507impl SemanticIndexer {
1508 pub fn new(embedder_type: &str, data_dir: Option<&Path>) -> Result<Self> {
1509 let embedder: Box<dyn Embedder> = match embedder_type {
1510 "fastembed" | "minilm" | "snowflake-arctic-s" | "nomic-embed" => {
1511 let dir = data_dir
1512 .ok_or_else(|| anyhow::anyhow!("data_dir required for fastembed embedder"))?;
1513 let embedder_name = if embedder_type == "fastembed" {
1514 "minilm"
1515 } else {
1516 embedder_type
1517 };
1518 Box::new(
1519 FastEmbedder::load_by_name(dir, embedder_name)
1520 .map_err(|e| anyhow::anyhow!("fastembed unavailable: {e}"))?,
1521 )
1522 }
1523 "hash" => Box::new(HashEmbedder::default()),
1524 other => bail!("unknown embedder: {other}"),
1525 };
1526
1527 Ok(Self {
1528 embedder,
1529 batch_size: resolved_default_batch_size(),
1530 })
1531 }
1532
1533 pub fn with_batch_size(mut self, batch_size: usize) -> Result<Self> {
1534 if batch_size == 0 {
1535 bail!("batch_size must be > 0");
1536 }
1537 self.batch_size = batch_size;
1538 Ok(self)
1539 }
1540
    /// Returns the number of prepared inputs embedded per embedder call.
    pub fn batch_size(&self) -> usize {
        self.batch_size
    }
1544
    /// Returns the identifier reported by the underlying embedder.
    pub fn embedder_id(&self) -> &str {
        self.embedder.id()
    }
1548
    /// Returns the vector dimension reported by the underlying embedder.
    pub fn embedder_dimension(&self) -> usize {
        self.embedder.dimension()
    }
1552
1553 pub fn embed_messages(&self, messages: &[EmbeddingInput]) -> Result<Vec<EmbeddedMessage>> {
1554 self.embed_messages_with_sink(messages, &SemanticProgressSink::disabled())
1555 }
1556
    /// Canonicalizes and embeds `messages`, reporting per-batch progress to
    /// `sink`.
    ///
    /// Inputs are prepared in windows of `batch_size * 4` and then embedded in
    /// batches of `batch_size`. Inputs whose canonical text is empty are
    /// skipped and produce no output entry. Around every embedder call an
    /// `EmbedBatchStart` / `EmbedBatchDone` event is emitted when the sink is
    /// active.
    ///
    /// # Errors
    ///
    /// Fails when the embedder fails or returns malformed output, or when a
    /// batch takes longer than the configured fail-after threshold.
    ///
    /// # Panics
    ///
    /// `slice::chunks` panics on a chunk size of zero, so `batch_size` must be
    /// non-zero (`with_batch_size` rejects zero; the default is assumed to be
    /// non-zero — confirm against `resolved_default_batch_size`).
    pub fn embed_messages_with_sink(
        &self,
        messages: &[EmbeddingInput],
        sink: &SemanticProgressSink,
    ) -> Result<Vec<EmbeddedMessage>> {
        if messages.is_empty() {
            return Ok(Vec::new());
        }

        // Only draw the progress bar when stderr is an interactive terminal.
        let show_progress = std::io::stderr().is_terminal();
        let pb = ProgressBar::new(saturating_u64_from_usize(messages.len()));
        if show_progress {
            let style = ProgressStyle::default_bar()
                .template("{spinner:.green} [{bar:40.cyan/blue}] {pos}/{len} messages embedded")
                .unwrap_or_else(|_| ProgressStyle::default_bar());
            pb.set_style(style);
        } else {
            pb.set_draw_target(ProgressDrawTarget::hidden());
        }

        let mut embeddings = Vec::with_capacity(messages.len());

        // Prepare several embed batches' worth of inputs at a time.
        let window = self.batch_size.saturating_mul(4);
        let serial_prep = !parallel_prep_enabled();
        let prep_memo_capacity = resolved_semantic_prep_memo_capacity();
        // The memo cache is only created for the serial prep path.
        let mut prep_memo =
            serial_prep.then(|| ContentAddressedMemoCache::with_capacity(prep_memo_capacity));
        let mut batch_index: u64 = 0;
        let mut rows_processed: u64 = 0;
        let rows_total = u64::try_from(messages.len()).ok();
        // A threshold of 0 disables the corresponding watchdog check below.
        let warn_after_ms = resolved_semantic_embed_batch_warn_after_ms();
        let fail_after_ms = resolved_semantic_embed_batch_fail_after_ms();
        for (window_index, window_slice) in messages.chunks(window).enumerate() {
            let prepared_window = match prep_memo.as_mut() {
                Some(cache) => {
                    // Snapshot the stats so the trace can report per-window deltas.
                    let stats_before = cache.stats().clone();
                    let prepared_window = prepare_window_with_memo(window_slice, cache);
                    trace_semantic_prep_memo_window(
                        window_index,
                        window_slice.len(),
                        prepared_window.len(),
                        prep_memo_capacity,
                        &stats_before,
                        cache.stats(),
                    );
                    prepared_window
                }
                // No cache means parallel prep is enabled, hence `serial = false`.
                None => prepare_window(window_slice, false),
            };
            // Skipped inputs still count toward the progress bar so it can
            // reach its total.
            let skipped_in_window = window_slice.len() - prepared_window.len();
            if skipped_in_window > 0 {
                pb.inc(saturating_u64_from_usize(skipped_in_window));
            }

            for batch in prepared_window.chunks(self.batch_size) {
                let batch_rows = u64::try_from(batch.len()).unwrap_or(u64::MAX);
                let batch_bytes: u64 = batch
                    .iter()
                    .map(|p| saturating_u64_from_usize(p.canonical.len()))
                    .sum();
                if sink.is_active() {
                    sink.emit(
                        SemanticProgressEvent::EmbedBatchStart,
                        SemanticProgressFields {
                            batch_index: Some(batch_index),
                            batch_rows: Some(batch_rows),
                            rows_processed: Some(rows_processed),
                            rows_total,
                            bytes: Some(batch_bytes),
                            ..Default::default()
                        },
                    );
                }
                let batch_started = Instant::now();
                flush_prepared_batch(batch, &mut embeddings, &pb, self.embedder.as_ref())?;
                let elapsed_ms = saturating_u64_from_millis(batch_started.elapsed().as_millis());
                // NOTE(review): `rows_processed` only counts prepared rows while
                // `rows_total` counts every input, so sink consumers will not see
                // the two meet when inputs were skipped — confirm this is intended.
                rows_processed = rows_processed.saturating_add(batch_rows);
                if warn_after_ms > 0 && elapsed_ms > warn_after_ms {
                    tracing::warn!(
                        batch_index,
                        elapsed_ms,
                        warn_after_ms,
                        batch_rows,
                        batch_bytes,
                        embedder = self.embedder_id(),
                        "semantic embed batch exceeded watchdog warning threshold"
                    );
                }
                // The fail watchdog is evaluated after the batch has finished; it
                // aborts the run before `EmbedBatchDone` is emitted.
                if fail_after_ms > 0 && elapsed_ms > fail_after_ms {
                    bail!(
                        "semantic embed batch {batch_index} took {elapsed_ms}ms, exceeding \
                         CASS_SEMANTIC_EMBED_BATCH_FAIL_AFTER_MS={fail_after_ms}"
                    );
                }
                if sink.is_active() {
                    sink.emit(
                        SemanticProgressEvent::EmbedBatchDone,
                        SemanticProgressFields {
                            batch_index: Some(batch_index),
                            batch_rows: Some(batch_rows),
                            rows_processed: Some(rows_processed),
                            rows_total,
                            bytes: Some(batch_bytes),
                            ..Default::default()
                        },
                    );
                }
                batch_index = batch_index.saturating_add(1);
            }
        }

        // One-line summary of memo cache effectiveness for the whole run.
        if let Some(cache) = prep_memo.as_ref() {
            let stats = cache.stats();
            tracing::debug!(
                algorithm = SEMANTIC_PREP_MEMO_ALGORITHM,
                algorithm_version = SEMANTIC_PREP_MEMO_VERSION,
                hits = stats.hits,
                misses = stats.misses,
                inserts = stats.inserts,
                quarantined = stats.quarantined,
                live_entries = stats.live_entries,
                entry_capacity = prep_memo_capacity,
                "semantic prep memo cache summary"
            );
        }

        pb.finish_with_message("Embedding complete");
        Ok(embeddings)
    }
1702
1703 pub fn build_and_save_index<I>(
1704 &self,
1705 embedded_messages: I,
1706 data_dir: &Path,
1707 ) -> Result<FsVectorIndex>
1708 where
1709 I: IntoIterator<Item = EmbeddedMessage>,
1710 {
1711 let index_path = vector_index_path(data_dir, self.embedder_id());
1712 self.build_and_save_index_at_path(embedded_messages, &index_path)
1713 }
1714
1715 pub fn build_and_save_index_shards<I>(
1716 &self,
1717 embedded_messages: I,
1718 data_dir: &Path,
1719 plan: SemanticShardBuildPlan,
1720 ) -> Result<SemanticShardBuildOutcome>
1721 where
1722 I: IntoIterator<Item = EmbeddedMessage>,
1723 {
1724 if plan.db_fingerprint.trim().is_empty() {
1725 bail!("semantic shard build requires a non-empty DB fingerprint");
1726 }
1727 if plan.max_records_per_shard == 0 {
1728 bail!("semantic shard build requires max_records_per_shard > 0");
1729 }
1730
1731 let mut shard_records = Vec::new();
1732 let mut index_paths = Vec::new();
1733 let mut ann_index_paths = Vec::new();
1734 let mut current_records = Vec::with_capacity(plan.max_records_per_shard);
1735 let mut shard_index = 0u32;
1736 let mut total_docs = 0u64;
1737
1738 for embedded in embedded_messages {
1739 current_records.push(embedded);
1740 if current_records.len() >= plan.max_records_per_shard {
1741 let records = std::mem::take(&mut current_records);
1742 let (record, path, ann_path) =
1743 self.write_semantic_shard(records, data_dir, &plan, shard_index)?;
1744 total_docs = total_docs.saturating_add(record.doc_count);
1745 shard_records.push(record);
1746 index_paths.push(path);
1747 if let Some(path) = ann_path {
1748 ann_index_paths.push(path);
1749 }
1750 shard_index = shard_index
1751 .checked_add(1)
1752 .context("semantic shard index overflow")?;
1753 }
1754 }
1755
1756 if !current_records.is_empty() {
1757 let records = std::mem::take(&mut current_records);
1758 let (record, path, ann_path) =
1759 self.write_semantic_shard(records, data_dir, &plan, shard_index)?;
1760 total_docs = total_docs.saturating_add(record.doc_count);
1761 shard_records.push(record);
1762 index_paths.push(path);
1763 if let Some(path) = ann_path {
1764 ann_index_paths.push(path);
1765 }
1766 }
1767
1768 let shard_count = u32::try_from(shard_records.len())
1769 .context("semantic shard generation exceeded u32 shard count")?;
1770 for record in &mut shard_records {
1771 record.shard_count = shard_count;
1772 }
1773
1774 let mut shard_manifest = SemanticShardManifest::load_or_default(data_dir)
1775 .map_err(|err| anyhow::anyhow!("loading semantic shard manifest for publish: {err}"))?;
1776 shard_manifest.replace_shards_for_generation(
1777 plan.tier,
1778 self.embedder_id(),
1779 &plan.db_fingerprint,
1780 shard_records,
1781 );
1782 shard_manifest
1783 .save(data_dir)
1784 .map_err(|err| anyhow::anyhow!("saving semantic shard manifest: {err}"))?;
1785 let summary = shard_manifest.summary(plan.tier, self.embedder_id(), &plan.db_fingerprint);
1786
1787 tracing::info!(
1788 tier = plan.tier.as_str(),
1789 embedder = self.embedder_id(),
1790 shard_count,
1791 doc_count = total_docs,
1792 total_conversations = plan.total_conversations,
1793 "published semantic shard generation sidecar"
1794 );
1795
1796 Ok(SemanticShardBuildOutcome {
1797 tier: plan.tier,
1798 embedder_id: self.embedder_id().to_string(),
1799 shard_count,
1800 doc_count: total_docs,
1801 total_conversations: plan.total_conversations,
1802 index_paths,
1803 ann_index_paths,
1804 shard_manifest_path: SemanticShardManifest::path(data_dir),
1805 complete: summary.complete,
1806 })
1807 }
1808
1809 fn write_semantic_shard(
1810 &self,
1811 embedded_messages: Vec<EmbeddedMessage>,
1812 data_dir: &Path,
1813 plan: &SemanticShardBuildPlan,
1814 shard_index: u32,
1815 ) -> Result<(SemanticShardRecord, PathBuf, Option<PathBuf>)> {
1816 let started_at_ms = now_ms();
1817 let shard_path = semantic_shard_index_path(
1818 data_dir,
1819 plan.tier,
1820 self.embedder_id(),
1821 &plan.db_fingerprint,
1822 shard_index,
1823 );
1824 let shard_index_file = self.build_and_save_index_at_path(embedded_messages, &shard_path)?;
1825 let size_bytes = fs::metadata(&shard_path)
1826 .with_context(|| format!("stat semantic shard {}", shard_path.display()))?
1827 .len();
1828 let (ann_index_path, ann_size_bytes, ann_ready, ann_absolute_path) = if plan.build_ann {
1829 let ann_path = semantic_shard_ann_index_path(
1830 data_dir,
1831 plan.tier,
1832 self.embedder_id(),
1833 &plan.db_fingerprint,
1834 shard_index,
1835 );
1836 let config = FsHnswConfig {
1837 m: FS_HNSW_DEFAULT_M,
1838 ef_construction: FS_HNSW_DEFAULT_EF_CONSTRUCTION,
1839 ..FsHnswConfig::default()
1840 };
1841 let hnsw = FsHnswIndex::build_from_vector_index(&shard_index_file, config)
1842 .map_err(|err| anyhow::anyhow!("build shard HNSW index failed: {err}"))?;
1843 hnsw.save(&ann_path)
1844 .map_err(|err| anyhow::anyhow!("save shard HNSW index failed: {err}"))?;
1845 let ann_size_bytes = fs::metadata(&ann_path)
1846 .with_context(|| format!("stat semantic shard ANN {}", ann_path.display()))?
1847 .len();
1848 let relative_ann_path = ann_path
1849 .strip_prefix(data_dir)
1850 .unwrap_or(ann_path.as_path())
1851 .to_string_lossy()
1852 .to_string();
1853 (
1854 Some(relative_ann_path),
1855 ann_size_bytes,
1856 true,
1857 Some(ann_path),
1858 )
1859 } else {
1860 (None, 0, false, None)
1861 };
1862 let relative_index_path = shard_path
1863 .strip_prefix(data_dir)
1864 .unwrap_or(shard_path.as_path())
1865 .to_string_lossy()
1866 .to_string();
1867 let record = SemanticShardRecord {
1868 tier: plan.tier,
1869 embedder_id: self.embedder_id().to_string(),
1870 model_revision: plan.model_revision.clone(),
1871 schema_version: SEMANTIC_SCHEMA_VERSION,
1872 chunking_version: CHUNKING_STRATEGY_VERSION,
1873 dimension: self.embedder_dimension(),
1874 shard_index,
1875 shard_count: 0,
1876 doc_count: u64::try_from(shard_index_file.record_count()).unwrap_or(u64::MAX),
1877 total_conversations: plan.total_conversations,
1878 db_fingerprint: plan.db_fingerprint.clone(),
1879 index_path: relative_index_path,
1880 quantization: "f16".to_string(),
1881 mmap_ready: true,
1882 ann_index_path,
1883 ann_size_bytes,
1884 ann_ready,
1885 size_bytes,
1886 started_at_ms,
1887 completed_at_ms: now_ms(),
1888 ready: true,
1889 };
1890 Ok((record, shard_path, ann_absolute_path))
1891 }
1892
1893 fn build_and_save_index_at_path<I>(
1894 &self,
1895 embedded_messages: I,
1896 index_path: &Path,
1897 ) -> Result<FsVectorIndex>
1898 where
1899 I: IntoIterator<Item = EmbeddedMessage>,
1900 {
1901 if let Some(parent) = index_path.parent() {
1902 std::fs::create_dir_all(parent)?;
1903 }
1904
1905 let mut writer: FsVectorIndexWriter = FsVectorIndex::create_with_revision(
1907 index_path,
1908 self.embedder_id(),
1909 "1.0",
1910 self.embedder_dimension(),
1911 FsQuantization::F16,
1912 )
1913 .map_err(|err| anyhow::anyhow!("create fsvi index failed: {err}"))?;
1914
1915 let write_result: Result<()> = (|| {
1916 for embedded in embedded_messages {
1917 if embedded.embedding.len() != self.embedder_dimension() {
1918 bail!(
1919 "embedding dimension mismatch: expected {}, got {}",
1920 self.embedder_dimension(),
1921 embedded.embedding.len()
1922 );
1923 }
1924 let doc_id = semantic_doc_id_for_embedded(&embedded);
1925 writer
1926 .write_record(&doc_id, &embedded.embedding)
1927 .map_err(|err| anyhow::anyhow!("write fsvi record failed: {err}"))?;
1928 }
1929 Ok(())
1930 })();
1931
1932 if let Err(e) = &write_result {
1933 tracing::warn!("removing partial vector index after write failure: {e}");
1935 if let Err(rm_err) = std::fs::remove_file(index_path) {
1936 tracing::error!(
1937 "failed to remove partial index file {}: {rm_err}",
1938 index_path.display()
1939 );
1940 }
1941 return Err(anyhow::anyhow!("{e}"));
1942 }
1943
1944 writer
1945 .finish()
1946 .map_err(|err| anyhow::anyhow!("finish fsvi index failed: {err}"))?;
1947
1948 FsVectorIndex::open(index_path)
1949 .map_err(|err| anyhow::anyhow!("open fsvi index failed: {err}"))
1950 }
1951
1952 pub fn append_to_index(
1960 &self,
1961 embedded_messages: impl IntoIterator<Item = EmbeddedMessage>,
1962 data_dir: &Path,
1963 ) -> Result<usize> {
1964 let index_path = vector_index_path(data_dir, self.embedder_id());
1965 self.append_to_index_path(embedded_messages, &index_path)
1966 }
1967
1968 fn append_to_index_path(
1969 &self,
1970 embedded_messages: impl IntoIterator<Item = EmbeddedMessage>,
1971 index_path: &Path,
1972 ) -> Result<usize> {
1973 let mut index = FsVectorIndex::open(index_path)
1974 .map_err(|err| anyhow::anyhow!("open fsvi index for append: {err}"))?;
1975
1976 let entries: Vec<(String, Vec<f32>)> = embedded_messages
1977 .into_iter()
1978 .map(|em| {
1979 let doc_id = semantic_doc_id_for_embedded(&em);
1980 (doc_id, em.embedding)
1981 })
1982 .collect();
1983
1984 let count = entries.len();
1985 if count == 0 {
1986 return Ok(0);
1987 }
1988
1989 index
1990 .append_batch(&entries)
1991 .map_err(|err| anyhow::anyhow!("append_batch: {err}"))?;
1992
1993 if index.needs_compaction() {
1994 index
1995 .compact()
1996 .map_err(|err| anyhow::anyhow!("compaction: {err}"))?;
1997 }
1998
1999 Ok(count)
2000 }
2001
2002 fn write_backfill_staging_index(
2003 &self,
2004 embedded_messages: Vec<EmbeddedMessage>,
2005 staging_path: &Path,
2006 resume_existing: bool,
2007 ) -> Result<FsVectorIndex> {
2008 if resume_existing && staging_path.exists() {
2009 self.append_to_index_path(embedded_messages, staging_path)?;
2010 FsVectorIndex::open(staging_path)
2011 .map_err(|err| anyhow::anyhow!("open staged semantic index failed: {err}"))
2012 } else {
2013 self.build_and_save_index_at_path(embedded_messages, staging_path)
2014 }
2015 }
2016
2017 pub fn run_backfill_batch(
2018 &self,
2019 messages: &[EmbeddingInput],
2020 data_dir: &Path,
2021 manifest: &mut SemanticManifest,
2022 plan: SemanticBackfillBatchPlan,
2023 ) -> Result<SemanticBackfillBatchOutcome> {
2024 self.run_backfill_batch_with_sink(
2025 messages,
2026 data_dir,
2027 manifest,
2028 plan,
2029 None,
2030 &SemanticProgressSink::disabled(),
2031 )
2032 }
2033
2034 pub fn run_backfill_batch_with_sink(
2040 &self,
2041 messages: &[EmbeddingInput],
2042 data_dir: &Path,
2043 manifest: &mut SemanticManifest,
2044 plan: SemanticBackfillBatchPlan,
2045 last_message_id: Option<i64>,
2046 sink: &SemanticProgressSink,
2047 ) -> Result<SemanticBackfillBatchOutcome> {
2048 if plan.db_fingerprint.trim().is_empty() {
2049 bail!("semantic backfill requires a non-empty DB fingerprint");
2050 }
2051 if plan.total_conversations == 0 && plan.conversations_in_batch > 0 {
2052 bail!("semantic backfill batch cannot process conversations when total is zero");
2053 }
2054
2055 let manifest_path = SemanticManifest::path(data_dir);
2056 let staging_path = semantic_staging_index_path(
2057 data_dir,
2058 plan.tier,
2059 self.embedder_id(),
2060 &plan.db_fingerprint,
2061 );
2062 let final_path = vector_index_path(data_dir, self.embedder_id());
2063
2064 let prior_checkpoint = manifest
2065 .checkpoint
2066 .as_ref()
2067 .filter(|checkpoint| {
2068 checkpoint.tier == plan.tier
2069 && checkpoint.embedder_id == self.embedder_id()
2070 && checkpoint.is_valid(&plan.db_fingerprint)
2071 })
2072 .cloned();
2073 let prior_conversations = prior_checkpoint
2074 .as_ref()
2075 .map_or(0, |checkpoint| checkpoint.conversations_processed);
2076 let prior_docs = prior_checkpoint
2077 .as_ref()
2078 .map_or(0, |checkpoint| checkpoint.docs_embedded);
2079
2080 let embeddings = self.embed_messages_with_sink(messages, sink)?;
2081 let embedded_docs = u64::try_from(embeddings.len()).unwrap_or(u64::MAX);
2082 if sink.is_active() {
2083 sink.emit(
2084 SemanticProgressEvent::StagingWriteStart,
2085 SemanticProgressFields {
2086 batch_rows: Some(embedded_docs),
2087 note: Some(staging_path.display().to_string()),
2088 ..Default::default()
2089 },
2090 );
2091 }
2092 let mut staged_index = self.write_backfill_staging_index(
2093 embeddings,
2094 &staging_path,
2095 prior_checkpoint.is_some(),
2096 )?;
2097 if sink.is_active() {
2098 sink.emit(
2099 SemanticProgressEvent::StagingWriteDone,
2100 SemanticProgressFields {
2101 batch_rows: Some(embedded_docs),
2102 note: Some(staging_path.display().to_string()),
2103 ..Default::default()
2104 },
2105 );
2106 }
2107 let counted_conversations_processed =
2108 prior_conversations.saturating_add(plan.conversations_in_batch);
2109 let conversations_processed = if plan.cursor_exhausted {
2110 plan.total_conversations
2111 } else {
2112 counted_conversations_processed.min(plan.total_conversations)
2113 };
2114 let complete = plan.cursor_exhausted;
2115
2116 manifest.refresh_backlog(plan.total_conversations, &plan.db_fingerprint);
2117
2118 if complete {
2119 let db_fingerprint = plan.db_fingerprint.clone();
2120 if staged_index.wal_record_count() > 0 {
2121 staged_index.compact().map_err(|err| {
2122 anyhow::anyhow!("compact staged semantic index failed: {err}")
2123 })?;
2124 }
2125 drop(staged_index);
2126 if sink.is_active() {
2127 sink.emit(
2128 SemanticProgressEvent::PublishStart,
2129 SemanticProgressFields {
2130 rows_processed: Some(conversations_processed),
2131 rows_total: Some(plan.total_conversations),
2132 last_conversation_id: Some(plan.last_offset),
2133 last_message_id,
2134 note: Some(final_path.display().to_string()),
2135 ..Default::default()
2136 },
2137 );
2138 }
2139 fs::rename(&staging_path, &final_path).with_context(|| {
2140 format!(
2141 "publishing staged semantic index {} to {}",
2142 staging_path.display(),
2143 final_path.display()
2144 )
2145 })?;
2146 sync_parent_directory(&final_path)?;
2147 let published_index = FsVectorIndex::open(&final_path)
2148 .map_err(|err| anyhow::anyhow!("open published semantic index failed: {err}"))?;
2149 let size_bytes = fs::metadata(&final_path)
2150 .with_context(|| format!("stat published semantic index {}", final_path.display()))?
2151 .len();
2152 let relative_index_path = final_path
2153 .strip_prefix(data_dir)
2154 .unwrap_or(final_path.as_path())
2155 .to_string_lossy()
2156 .to_string();
2157 manifest.publish_artifact(ArtifactRecord {
2158 tier: plan.tier,
2159 embedder_id: self.embedder_id().to_string(),
2160 model_revision: plan.model_revision,
2161 schema_version: SEMANTIC_SCHEMA_VERSION,
2162 chunking_version: CHUNKING_STRATEGY_VERSION,
2163 dimension: self.embedder_dimension(),
2164 doc_count: u64::try_from(published_index.record_count()).unwrap_or(u64::MAX),
2165 conversation_count: conversations_processed,
2166 db_fingerprint: plan.db_fingerprint,
2167 index_path: relative_index_path,
2168 size_bytes,
2169 started_at_ms: prior_checkpoint
2170 .as_ref()
2171 .map_or_else(now_ms, |checkpoint| checkpoint.saved_at_ms),
2172 completed_at_ms: now_ms(),
2173 ready: true,
2174 });
2175 manifest.refresh_backlog(plan.total_conversations, &db_fingerprint);
2176 manifest.save(data_dir)?;
2177 if sink.is_active() {
2178 sink.emit(
2179 SemanticProgressEvent::PublishDone,
2180 SemanticProgressFields {
2181 rows_processed: Some(conversations_processed),
2182 rows_total: Some(plan.total_conversations),
2183 last_conversation_id: Some(plan.last_offset),
2184 last_message_id,
2185 note: Some(final_path.display().to_string()),
2186 ..Default::default()
2187 },
2188 );
2189 }
2190 } else {
2191 let docs_embedded_on_disk =
2192 u64::try_from(staged_index.record_count()).unwrap_or(u64::MAX);
2193 let checkpoint_docs = prior_docs
2194 .saturating_add(embedded_docs)
2195 .max(docs_embedded_on_disk);
2196 if sink.is_active() {
2197 sink.emit(
2198 SemanticProgressEvent::CheckpointSaveStart,
2199 SemanticProgressFields {
2200 rows_processed: Some(conversations_processed),
2201 rows_total: Some(plan.total_conversations),
2202 last_conversation_id: Some(plan.last_offset),
2203 last_message_id,
2204 ..Default::default()
2205 },
2206 );
2207 }
2208 let prior_last_message_id = prior_checkpoint
2212 .as_ref()
2213 .and_then(|checkpoint| checkpoint.last_message_id);
2214 manifest.save_checkpoint(BuildCheckpoint {
2215 tier: plan.tier,
2216 embedder_id: self.embedder_id().to_string(),
2217 last_offset: plan.last_offset,
2218 docs_embedded: checkpoint_docs,
2219 conversations_processed,
2220 total_conversations: plan.total_conversations,
2221 db_fingerprint: plan.db_fingerprint,
2222 schema_version: SEMANTIC_SCHEMA_VERSION,
2223 chunking_version: CHUNKING_STRATEGY_VERSION,
2224 saved_at_ms: now_ms(),
2225 last_message_id: last_message_id.or(prior_last_message_id),
2226 cursor_exhausted: plan.cursor_exhausted,
2227 });
2228 manifest.save(data_dir)?;
2229 if sink.is_active() {
2230 sink.emit(
2231 SemanticProgressEvent::CheckpointSaveDone,
2232 SemanticProgressFields {
2233 rows_processed: Some(conversations_processed),
2234 rows_total: Some(plan.total_conversations),
2235 last_conversation_id: Some(plan.last_offset),
2236 last_message_id: last_message_id.or(prior_last_message_id),
2237 ..Default::default()
2238 },
2239 );
2240 }
2241 }
2242
2243 Ok(SemanticBackfillBatchOutcome {
2244 tier: plan.tier,
2245 embedder_id: self.embedder_id().to_string(),
2246 embedded_docs,
2247 conversations_processed,
2248 total_conversations: plan.total_conversations,
2249 last_offset: plan.last_offset,
2250 checkpoint_saved: !complete,
2251 published: complete,
2252 index_path: if complete { final_path } else { staging_path },
2253 manifest_path,
2254 })
2255 }
2256
2257 pub fn run_backfill_from_storage(
2258 &self,
2259 storage: &FrankenStorage,
2260 data_dir: &Path,
2261 manifest: &mut SemanticManifest,
2262 plan: SemanticBackfillStoragePlan,
2263 ) -> Result<SemanticBackfillBatchOutcome> {
2264 self.run_backfill_from_storage_with_sink(
2265 storage,
2266 data_dir,
2267 manifest,
2268 plan,
2269 &SemanticProgressSink::disabled(),
2270 )
2271 }
2272
2273 pub fn run_backfill_from_storage_with_sink(
2278 &self,
2279 storage: &FrankenStorage,
2280 data_dir: &Path,
2281 manifest: &mut SemanticManifest,
2282 plan: SemanticBackfillStoragePlan,
2283 sink: &SemanticProgressSink,
2284 ) -> Result<SemanticBackfillBatchOutcome> {
2285 self.run_backfill_from_storage_with_caps_and_sink(
2286 storage,
2287 data_dir,
2288 manifest,
2289 plan,
2290 SemanticCheckpointCaps::unlimited(),
2291 sink,
2292 )
2293 }
2294
2295 pub fn run_capped_backfill_from_storage_with_sink(
2302 &self,
2303 storage: &FrankenStorage,
2304 data_dir: &Path,
2305 manifest: &mut SemanticManifest,
2306 plan: SemanticBackfillStoragePlan,
2307 sink: &SemanticProgressSink,
2308 ) -> Result<SemanticBackfillBatchOutcome> {
2309 self.run_backfill_from_storage_with_caps_and_sink(
2310 storage,
2311 data_dir,
2312 manifest,
2313 plan,
2314 SemanticCheckpointCaps::from_env(),
2315 sink,
2316 )
2317 }
2318
2319 fn run_backfill_from_storage_with_caps_and_sink(
2320 &self,
2321 storage: &FrankenStorage,
2322 data_dir: &Path,
2323 manifest: &mut SemanticManifest,
2324 plan: SemanticBackfillStoragePlan,
2325 caps: SemanticCheckpointCaps,
2326 sink: &SemanticProgressSink,
2327 ) -> Result<SemanticBackfillBatchOutcome> {
2328 let prior_checkpoint = manifest.checkpoint.as_ref().filter(|checkpoint| {
2329 checkpoint.tier == plan.tier
2330 && checkpoint.embedder_id == self.embedder_id()
2331 && checkpoint.is_valid(&plan.db_fingerprint)
2332 });
2333 let after_conversation_id = prior_checkpoint.map_or(0, |checkpoint| checkpoint.last_offset);
2334 let prior_last_message_id =
2335 prior_checkpoint.and_then(|checkpoint| checkpoint.last_message_id);
2336
2337 if sink.is_active() {
2338 sink.emit(
2339 SemanticProgressEvent::SelectionStart,
2340 SemanticProgressFields {
2341 last_conversation_id: Some(after_conversation_id),
2342 last_message_id: prior_last_message_id,
2343 rows_total: Some(saturating_u64_from_usize(plan.max_conversations)),
2344 note: Some(plan.tier.as_str().to_string()),
2345 ..Default::default()
2346 },
2347 );
2348 }
2349
2350 let batch = match fetch_canonical_embedding_batch_inner_with_caps(
2351 storage,
2352 after_conversation_id,
2353 plan.max_conversations,
2354 prior_last_message_id,
2355 caps,
2356 ) {
2357 Ok(batch) => batch,
2358 Err(err) => {
2359 if sink.is_active() {
2360 sink.emit(
2361 SemanticProgressEvent::Error,
2362 SemanticProgressFields {
2363 error: Some(format!("selection: {err}")),
2364 last_conversation_id: Some(after_conversation_id),
2365 last_message_id: prior_last_message_id,
2366 ..Default::default()
2367 },
2368 );
2369 }
2370 return Err(err);
2371 }
2372 };
2373
2374 if sink.is_active() {
2375 sink.emit(
2376 SemanticProgressEvent::SelectionDone,
2377 SemanticProgressFields {
2378 last_conversation_id: Some(batch.last_conversation_id),
2379 last_message_id: prior_last_message_id,
2380 conversations_in_batch: Some(batch.conversations_in_batch),
2381 rows_total: Some(batch.total_conversations),
2382 ..Default::default()
2383 },
2384 );
2385 sink.emit(
2397 SemanticProgressEvent::PacketReplayStart,
2398 SemanticProgressFields {
2399 conversations_in_batch: Some(batch.conversations_in_batch),
2400 ..Default::default()
2401 },
2402 );
2403 sink.emit(
2404 SemanticProgressEvent::PacketReplayProgress,
2405 SemanticProgressFields {
2406 conversations_in_batch: Some(batch.conversations_in_batch),
2407 rows_processed: Some(saturating_u64_from_usize(batch.inputs.len())),
2408 bytes: Some(
2409 batch
2410 .inputs
2411 .iter()
2412 .map(|i| saturating_u64_from_usize(i.content.len()))
2413 .sum(),
2414 ),
2415 ..Default::default()
2416 },
2417 );
2418 sink.emit(
2419 SemanticProgressEvent::PacketReplayDone,
2420 SemanticProgressFields {
2421 conversations_in_batch: Some(batch.conversations_in_batch),
2422 rows_processed: Some(saturating_u64_from_usize(batch.inputs.len())),
2423 ..Default::default()
2424 },
2425 );
2426 }
2427
2428 let batch_last_message_id = batch
2432 .inputs
2433 .iter()
2434 .map(|input| i64::try_from(input.message_id).unwrap_or(i64::MAX))
2435 .max();
2436 let next_last_message_id = match (prior_last_message_id, batch_last_message_id) {
2437 (Some(prior), Some(batch_max)) => Some(prior.max(batch_max)),
2438 (Some(prior), None) => Some(prior),
2439 (None, Some(batch_max)) => Some(batch_max),
2440 (None, None) => None,
2441 };
2442
2443 let outcome = self.run_backfill_batch_with_sink(
2444 &batch.inputs,
2445 data_dir,
2446 manifest,
2447 SemanticBackfillBatchPlan {
2448 tier: plan.tier,
2449 db_fingerprint: plan.db_fingerprint,
2450 model_revision: plan.model_revision,
2451 total_conversations: batch.total_conversations,
2452 conversations_in_batch: batch.conversations_in_batch,
2453 last_offset: batch.last_conversation_id,
2454 cursor_exhausted: batch.cursor_exhausted,
2455 },
2456 next_last_message_id,
2457 sink,
2458 );
2459
2460 match &outcome {
2461 Ok(o) => {
2462 if sink.is_active() {
2463 sink.emit(
2464 SemanticProgressEvent::Complete,
2465 SemanticProgressFields {
2466 rows_processed: Some(o.conversations_processed),
2467 rows_total: Some(o.total_conversations),
2468 last_conversation_id: Some(o.last_offset),
2469 last_message_id: next_last_message_id,
2470 ..Default::default()
2471 },
2472 );
2473 }
2474 }
2475 Err(err) => {
2476 if sink.is_active() {
2477 sink.emit(
2478 SemanticProgressEvent::Error,
2479 SemanticProgressFields {
2480 error: Some(err.to_string()),
2481 last_conversation_id: Some(batch.last_conversation_id),
2482 last_message_id: next_last_message_id,
2483 ..Default::default()
2484 },
2485 );
2486 }
2487 }
2488 }
2489
2490 outcome
2491 }
2492
2493 pub fn build_hnsw_index(
2507 &self,
2508 vector_index: &FsVectorIndex,
2509 data_dir: &Path,
2510 m: Option<usize>,
2511 ef_construction: Option<usize>,
2512 ) -> Result<PathBuf> {
2513 let m = m.unwrap_or(FS_HNSW_DEFAULT_M);
2514 let ef_construction = ef_construction.unwrap_or(FS_HNSW_DEFAULT_EF_CONSTRUCTION);
2515
2516 tracing::info!(
2517 embedder = self.embedder_id(),
2518 count = vector_index.record_count(),
2519 m,
2520 ef_construction,
2521 "Building HNSW index for approximate nearest neighbor search"
2522 );
2523
2524 let config = FsHnswConfig {
2525 m,
2526 ef_construction,
2527 ..FsHnswConfig::default()
2528 };
2529 let hnsw = FsHnswIndex::build_from_vector_index(vector_index, config)
2530 .map_err(|err| anyhow::anyhow!("build HNSW index failed: {err}"))?;
2531
2532 let hnsw_path = hnsw_index_path(data_dir, self.embedder_id());
2533 if let Some(parent) = hnsw_path.parent() {
2534 std::fs::create_dir_all(parent)?;
2535 }
2536 hnsw.save(&hnsw_path)
2537 .map_err(|err| anyhow::anyhow!("save HNSW index failed: {err}"))?;
2538
2539 tracing::info!(?hnsw_path, "Saved HNSW index");
2540 Ok(hnsw_path)
2541 }
2542}
2543
2544#[cfg(test)]
2545mod tests {
2546 use super::*;
2547 use crate::model::types::{Agent, AgentKind, Conversation, Message, MessageRole};
2548 use crate::storage::sqlite::FrankenStorage;
2549 use serde_json::json;
2550 use std::path::Path;
2551 use tempfile::tempdir;
2552
/// Test-only, fully comparable projection of an `EmbeddingInput`.
///
/// The derived `Ord` compares fields in declaration order (`message_id` first),
/// so the field order below determines how sorted collections are arranged.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct ComparableSemanticInput {
    message_id: u64,
    created_at_ms: i64,
    agent_id: u32,
    workspace_id: u32,
    source_id: u32,
    role: u8,
    content: String,
}
2563
2564 fn comparable_semantic_inputs(mut inputs: Vec<EmbeddingInput>) -> Vec<ComparableSemanticInput> {
2565 let mut comparable: Vec<ComparableSemanticInput> = inputs
2566 .drain(..)
2567 .map(|input| ComparableSemanticInput {
2568 message_id: input.message_id,
2569 created_at_ms: input.created_at_ms,
2570 agent_id: input.agent_id,
2571 workspace_id: input.workspace_id,
2572 source_id: input.source_id,
2573 role: input.role,
2574 content: input.content,
2575 })
2576 .collect();
2577 comparable.sort();
2578 comparable
2579 }
2580
2581 fn test_conversation(external_id: &str, body: &str) -> Conversation {
2582 test_conversation_fixture(
2583 external_id,
2584 vec![Message {
2585 id: None,
2586 idx: 0,
2587 role: MessageRole::User,
2588 author: None,
2589 created_at: Some(1_700_000_000_500),
2590 content: body.to_string(),
2591 extra_json: json!({}),
2592 snippets: Vec::new(),
2593 }],
2594 "local",
2595 None,
2596 )
2597 }
2598
2599 fn test_conversation_with_messages(external_id: &str, messages: Vec<Message>) -> Conversation {
2600 test_conversation_fixture(external_id, messages, "remote-laptop", Some("builder-host"))
2601 }
2602
2603 fn test_conversation_fixture(
2604 external_id: &str,
2605 messages: Vec<Message>,
2606 source_id: &str,
2607 origin_host: Option<&str>,
2608 ) -> Conversation {
2609 Conversation {
2610 id: None,
2611 agent_slug: "codex".to_string(),
2612 workspace: None,
2613 external_id: Some(external_id.to_string()),
2614 title: Some(format!("semantic {external_id}")),
2615 source_path: PathBuf::from(format!("/tmp/{external_id}.jsonl")),
2616 started_at: Some(1_700_000_000_000),
2617 ended_at: Some(1_700_000_001_000),
2618 approx_tokens: None,
2619 metadata_json: json!({}),
2620 messages,
2621 source_id: source_id.to_string(),
2622 origin_host: origin_host.map(str::to_string),
2623 }
2624 }
2625
2626 fn default_scheduler_signals() -> SemanticBackfillSchedulerSignals {
2627 SemanticBackfillSchedulerSignals {
2628 foreground_pressure: false,
2629 lexical_repair_active: false,
2630 force: false,
2631 operator_disabled: false,
2632 }
2633 }
2634
/// Guard that overrides or clears one process environment variable and puts the
/// previous state back when dropped.
struct EnvVarGuard {
    /// Name of the variable being overridden.
    key: &'static str,
    /// Value observed before the override; `None` means the variable was unset.
    /// Kept as an `OsString` so a non-UTF-8 value is restored rather than lost.
    prior: Option<std::ffi::OsString>,
}

impl EnvVarGuard {
    /// Sets `key` to `value`, remembering the previous value for restoration.
    fn set(key: &'static str, value: &str) -> Self {
        let prior = std::env::var_os(key);
        // SAFETY: mutating the environment is only sound while no other thread
        // accesses it concurrently. NOTE(review): confirm tests using this
        // guard are serialized.
        unsafe {
            std::env::set_var(key, value);
        }
        Self { key, prior }
    }

    /// Removes `key`, remembering the previous value for restoration.
    fn remove(key: &'static str) -> Self {
        let prior = std::env::var_os(key);
        // SAFETY: same single-threaded-access requirement as in `set`.
        unsafe {
            std::env::remove_var(key);
        }
        Self { key, prior }
    }
}

impl Drop for EnvVarGuard {
    /// Restores the variable to the state captured at construction.
    fn drop(&mut self) {
        // SAFETY: same single-threaded-access requirement as in `set`.
        unsafe {
            match self.prior.as_deref() {
                Some(value) => std::env::set_var(self.key, value),
                None => std::env::remove_var(self.key),
            }
        }
    }
}
2671
/// With default signals and a capacity of 80, the scheduler runs and reports a
/// scheduled batch of 51 conversations (from the `64` passed in, presumably the
/// requested batch size).
#[test]
fn semantic_backfill_scheduler_runs_and_scales_batch_under_idle_budget() {
    let signals = default_scheduler_signals();
    let policy = SemanticPolicy::compiled_defaults();

    let decision = semantic_backfill_scheduler_decision_for_capacity(&policy, 64, &signals, 80);

    assert!(decision.should_run());
    assert_eq!(decision.state, SemanticBackfillSchedulerState::Running);
    assert_eq!(
        decision.reason,
        SemanticBackfillSchedulerReason::IdleBudgetAvailable
    );
    assert_eq!(decision.scheduled_batch_conversations, 51);
    assert_eq!(decision.current_capacity_pct, 80);
    assert_eq!(decision.next_eligible_after_ms, 0);
}
2692
/// Pins the exact `next_step()` text for every scheduler reason listed here so
/// wording changes are caught.
#[test]
fn semantic_backfill_scheduler_reason_next_steps_are_stable() {
    type Reason = SemanticBackfillSchedulerReason;

    let expectations = [
        (
            Reason::IdleBudgetAvailable,
            "background semantic backfill is within idle budgets",
        ),
        (
            Reason::OperatorDisabled,
            "background semantic backfill is disabled by CASS_SEMANTIC_BACKFILL_DISABLE",
        ),
        (
            Reason::PolicyDisabled,
            "semantic policy disables background semantic backfill",
        ),
        (
            Reason::ForegroundPressure,
            "foreground pressure is present; retry after the idle delay",
        ),
        (
            Reason::LexicalRepairActive,
            "lexical repair is active; semantic backfill is yielding",
        ),
        (
            Reason::CapacityBelowFloor,
            "machine responsiveness capacity is below the semantic backfill floor",
        ),
        (
            Reason::ThreadBudgetZero,
            "semantic backfill thread budget is zero",
        ),
        (
            Reason::BatchBudgetZero,
            "semantic backfill batch budget is zero",
        ),
    ];

    for (reason, expected) in expectations {
        assert_eq!(reason.next_step(), expected, "{reason:?}");
    }
}
2732
/// Foreground pressure and active lexical repair each pause the scheduler; the
/// foreground case also reports the idle delay as the next eligible time.
#[test]
fn semantic_backfill_scheduler_yields_to_foreground_and_lexical_pressure() {
    let policy = SemanticPolicy::compiled_defaults();
    let decide = |signals: &SemanticBackfillSchedulerSignals| {
        semantic_backfill_scheduler_decision_for_capacity(&policy, 64, signals, 100)
    };

    // Foreground pressure.
    let foreground_decision = decide(&SemanticBackfillSchedulerSignals {
        foreground_pressure: true,
        ..default_scheduler_signals()
    });
    assert!(!foreground_decision.should_run());
    assert_eq!(
        foreground_decision.state,
        SemanticBackfillSchedulerState::Paused
    );
    assert_eq!(
        foreground_decision.reason,
        SemanticBackfillSchedulerReason::ForegroundPressure
    );
    assert_eq!(
        foreground_decision.next_eligible_after_ms,
        policy.idle_delay_seconds * 1000
    );

    // Lexical repair in progress.
    let lexical_decision = decide(&SemanticBackfillSchedulerSignals {
        lexical_repair_active: true,
        ..default_scheduler_signals()
    });
    assert!(!lexical_decision.should_run());
    assert_eq!(
        lexical_decision.state,
        SemanticBackfillSchedulerState::Paused
    );
    assert_eq!(
        lexical_decision.reason,
        SemanticBackfillSchedulerReason::LexicalRepairActive
    );
}
2772
/// A lexical-only policy disables the scheduler, while the `force` signal makes
/// it run anyway and marks the decision as forced.
#[test]
fn semantic_backfill_scheduler_honors_policy_disable_and_force_override() {
    let policy = {
        let mut lexical_only = SemanticPolicy::compiled_defaults();
        lexical_only.mode = crate::search::policy::SemanticMode::LexicalOnly;
        lexical_only
    };
    let decide = |signals: &SemanticBackfillSchedulerSignals| {
        semantic_backfill_scheduler_decision_for_capacity(&policy, 64, signals, 100)
    };

    let disabled = decide(&default_scheduler_signals());
    assert!(!disabled.should_run());
    assert_eq!(disabled.state, SemanticBackfillSchedulerState::Disabled);
    assert_eq!(
        disabled.reason,
        SemanticBackfillSchedulerReason::PolicyDisabled
    );

    let forced_decision = decide(&SemanticBackfillSchedulerSignals {
        force: true,
        ..default_scheduler_signals()
    });
    assert!(forced_decision.should_run());
    assert_eq!(
        forced_decision.reason,
        SemanticBackfillSchedulerReason::IdleBudgetAvailable
    );
    assert!(forced_decision.forced);
}
2804
/// Embedding two inputs with the `"hash"` embedder yields two embeddings in
/// input order, each with the embedder's dimension.
#[test]
fn test_batch_embedding() {
    let indexer = SemanticIndexer::new("hash", None).unwrap();
    let inputs = vec![
        EmbeddingInput::new(1, "Hello world"),
        EmbeddingInput::new(2, "Goodbye world"),
    ];

    let embedded = indexer.embed_messages(&inputs).unwrap();

    assert_eq!(embedded.len(), 2);
    let (first, second) = (&embedded[0], &embedded[1]);
    assert_eq!(first.message_id, 1);
    assert_eq!(second.message_id, 2);
    assert_eq!(first.embedding.len(), indexer.embedder_dimension());
}
2820
/// Embedding 1000 inputs returns one embedding per input.
#[test]
fn test_progress_indicator() {
    let indexer = SemanticIndexer::new("hash", None).unwrap();
    let mut inputs = Vec::with_capacity(1000);
    for id in 0..1000u64 {
        inputs.push(EmbeddingInput::new(id, format!("Message {id}")));
    }

    let embedded = indexer.embed_messages(&inputs).unwrap();

    assert_eq!(embedded.len(), inputs.len());
}
2831
/// Building an index from two embeddings records the indexer's embedder id and
/// dimension and holds two records.
#[test]
fn test_build_and_save_index() {
    let tmp = tempdir().unwrap();
    let indexer = SemanticIndexer::new("hash", None).unwrap();
    let inputs = vec![
        EmbeddingInput::new(1, "Hello world"),
        EmbeddingInput::new(2, "Goodbye world"),
    ];
    let embedded = indexer.embed_messages(&inputs).unwrap();

    let index = indexer.build_and_save_index(embedded, tmp.path()).unwrap();

    assert_eq!(index.embedder_id(), indexer.embedder_id());
    assert_eq!(index.dimension(), indexer.embedder_dimension());
    assert_eq!(index.record_count(), 2);
}
2849
/// Five embeddings with at most two records per shard produce three shards and
/// a complete shard manifest, with no main manifest and no main vector index.
#[test]
fn sharded_index_build_writes_sidecar_without_runtime_publish() {
    const DB_FINGERPRINT: &str = "db-fp-sharded-build";

    let tmp = tempdir().unwrap();
    let indexer = SemanticIndexer::new("hash", None).unwrap();
    let inputs: Vec<_> = (0..5)
        .map(|idx| EmbeddingInput::new(idx, format!("semantic shard message {idx}")))
        .collect();
    let embedded = indexer.embed_messages(&inputs).unwrap();

    let shard_plan = SemanticShardBuildPlan {
        tier: TierKind::Fast,
        db_fingerprint: DB_FINGERPRINT.to_string(),
        model_revision: "hash".to_string(),
        total_conversations: 5,
        max_records_per_shard: 2,
        build_ann: false,
    };
    let outcome = indexer
        .build_and_save_index_shards(embedded, tmp.path(), shard_plan)
        .unwrap();

    assert_eq!(outcome.shard_count, 3);
    assert_eq!(outcome.doc_count, 5);
    assert_eq!(outcome.total_conversations, 5);
    assert!(outcome.complete);
    assert_eq!(outcome.index_paths.len(), 3);
    for shard_path in &outcome.index_paths {
        let shard = FsVectorIndex::open(shard_path).unwrap();
        assert_eq!(shard.embedder_id(), indexer.embedder_id());
        assert!(shard.record_count() > 0);
    }

    let shard_manifest = SemanticShardManifest::load(tmp.path()).unwrap().unwrap();
    let summary = shard_manifest.summary(TierKind::Fast, indexer.embedder_id(), DB_FINGERPRINT);
    assert!(summary.complete);
    assert_eq!(summary.ready_shards, 3);
    assert_eq!(summary.ann_ready_shards, 0);
    assert_eq!(summary.doc_count, 5);
    assert_eq!(summary.total_conversations, 5);

    let main_manifest = SemanticManifest::load(tmp.path()).unwrap();
    assert!(
        main_manifest.is_none(),
        "sidecar shards must not publish the main runtime manifest"
    );
    let main_index_path = vector_index_path(tmp.path(), indexer.embedder_id());
    assert!(!main_index_path.exists());
}
2899
/// A shard plan with `max_records_per_shard` of zero is rejected with an error
/// that mentions the `max_records_per_shard > 0` requirement.
#[test]
fn sharded_index_build_rejects_zero_sized_shards() {
    let tmp = tempdir().unwrap();
    let indexer = SemanticIndexer::new("hash", None).unwrap();
    let zero_sized_plan = SemanticShardBuildPlan {
        tier: TierKind::Fast,
        db_fingerprint: "db-fp-sharded-build".to_string(),
        model_revision: "hash".to_string(),
        total_conversations: 0,
        max_records_per_shard: 0,
        build_ann: false,
    };

    let err = indexer
        .build_and_save_index_shards(std::iter::empty(), tmp.path(), zero_sized_plan)
        .unwrap_err();

    let message = err.to_string();
    assert!(message.contains("max_records_per_shard > 0"));
}
2920
/// With `build_ann` enabled, eight embeddings at four records per shard produce
/// two shards, each with an ANN file on disk and marked ANN-ready in the shard
/// manifest.
#[test]
fn sharded_ann_build_records_per_shard_accelerators() {
    const DB_FINGERPRINT: &str = "db-fp-sharded-ann-build";

    let tmp = tempdir().unwrap();
    let indexer = SemanticIndexer::new("hash", None).unwrap();
    let inputs: Vec<_> = (0..8)
        .map(|idx| EmbeddingInput::new(idx, format!("semantic ann shard message {idx}")))
        .collect();
    let embedded = indexer.embed_messages(&inputs).unwrap();

    let shard_plan = SemanticShardBuildPlan {
        tier: TierKind::Fast,
        db_fingerprint: DB_FINGERPRINT.to_string(),
        model_revision: "hash".to_string(),
        total_conversations: 8,
        max_records_per_shard: 4,
        build_ann: true,
    };
    let outcome = indexer
        .build_and_save_index_shards(embedded, tmp.path(), shard_plan)
        .unwrap();

    assert_eq!(outcome.shard_count, 2);
    assert_eq!(outcome.ann_index_paths.len(), 2);
    for ann_path in &outcome.ann_index_paths {
        assert!(
            ann_path.exists(),
            "ANN shard missing at {}",
            ann_path.display()
        );
    }

    let shard_manifest = SemanticShardManifest::load(tmp.path()).unwrap().unwrap();
    let summary = shard_manifest.summary(TierKind::Fast, indexer.embedder_id(), DB_FINGERPRINT);
    assert!(summary.complete);
    assert_eq!(summary.ann_ready_shards, 2);
    assert!(summary.ann_size_bytes > 0);

    let every_shard_has_ann = shard_manifest
        .shards
        .iter()
        .all(|record| record.ann_index_path.is_some() && record.ann_ready);
    assert!(every_shard_has_ann);
}
2967
/// Golden-digest regression test for `embed_messages` with the `"hash"` embedder.
///
/// A fixed 64-message corpus is embedded with a batch size of 16, and a SHA-256
/// digest over every embedding's message id, content hash, and vector
/// components is compared against a pinned hex value.
#[test]
fn embed_messages_golden_digest_hash_embedder() {
    // Local import; inside this function `Context` names the ring digest
    // context rather than anything brought in by `use super::*`.
    use ring::digest::{Context, SHA256};

    // Five rotating content shapes: plain text, markdown emphasis, a fenced
    // code block, padded whitespace, and combining-accent/emoji unicode.
    let corpus: Vec<EmbeddingInput> = (0..64)
        .map(|i| {
            let body = match i % 5 {
                0 => format!("plain text message number {i}"),
                1 => format!("**bold** line {i} with _emphasis_"),
                2 => format!("```rust\nfn f_{i}() {{ println!(\"{i}\"); }}\n```"),
                3 => format!(" whitespace {i} "),
                _ => format!("unicode \u{00E9}\u{0301} + emoji \u{1F600} {i}"),
            };
            EmbeddingInput::new(i as u64, body)
        })
        .collect();

    let indexer = SemanticIndexer::new("hash", None)
        .unwrap()
        .with_batch_size(16)
        .unwrap();
    let embeddings = indexer.embed_messages(&corpus).unwrap();

    // Feed order matters: id, then content hash, then each vector component,
    // all in little-endian byte order, in the order embeddings were returned.
    let mut ctx = Context::new(&SHA256);
    for em in &embeddings {
        ctx.update(&em.message_id.to_le_bytes());
        ctx.update(&em.content_hash);
        for v in &em.embedding {
            ctx.update(&v.to_le_bytes());
        }
    }
    let digest = hex::encode(ctx.finish().as_ref());

    // Pinned digest of the expected output; update only for intentional changes.
    const EXPECTED: &str = "22d9ae7076925a4b70a194b0f519dfb1d465cc757368c296ef24055a02038c2c";
    assert_eq!(
        digest, EXPECTED,
        "embed_messages golden digest drifted; if this was intentional, \
         update EXPECTED in this test and record the reason in the commit message"
    );
}
3023
/// `prepare_window` must yield the same messages, canonical text, and content
/// hashes in the same order whether its flag is `true` or `false` (serial vs
/// parallel preparation, going by the test's naming).
#[test]
fn parallel_prep_matches_serial_prep_bitwise() {
    // Seven rotating content shapes, including an empty string.
    let text_for = |i: u64| -> String {
        match i % 7 {
            0 => format!("Plain message number {i} with some ordinary words."),
            1 => format!("**Bold** and _italic_ markdown line {i}"),
            2 => format!(
                "```rust\nfn example_{i}() {{\n println!(\"code block {i}\");\n}}\n```\nfollow-up text"
            ),
            3 => String::new(),
            4 => format!(" whitespace galore {i} "),
            5 => format!("Unicode \u{00E9}\u{0301} (combining accent) and emoji \u{1F600} line {i}"),
            _ => format!(
                "Mixed line {i}: `inline_code`, [link](http://x), {{braces}}, and \u{201C}curly quotes\u{201D}."
            ),
        }
    };
    let inputs: Vec<EmbeddingInput> = (0..500u64)
        .map(|i| EmbeddingInput::new(i, text_for(i)))
        .collect();

    let serial = prepare_window(&inputs, true);
    let parallel = prepare_window(&inputs, false);

    assert_eq!(
        serial.len(),
        parallel.len(),
        "serial and parallel prep should skip the same number of empty canonicals"
    );

    for (from_serial, from_parallel) in serial.iter().zip(&parallel) {
        assert_eq!(
            from_serial.msg.message_id, from_parallel.msg.message_id,
            "ordering must be preserved between serial and parallel prep"
        );
        assert_eq!(
            from_serial.canonical, from_parallel.canonical,
            "canonical form diverged between serial and parallel prep"
        );
        assert_eq!(
            from_serial.hash, from_parallel.hash,
            "content hash diverged between serial and parallel prep"
        );
    }
}
3071
/// Inputs that are empty or whitespace-only are absent from the prepared
/// window, while inputs with real content are kept.
#[test]
fn parallel_prep_filters_empty_canonicals() {
    let inputs = vec![
        EmbeddingInput::new(1, "valid content"),
        EmbeddingInput::new(2, ""),
        EmbeddingInput::new(3, " \n\n \t "),
        EmbeddingInput::new(4, "more valid content"),
    ];

    let prepared = prepare_window(&inputs, false);
    let surviving_ids: Vec<u64> = prepared.iter().map(|item| item.msg.message_id).collect();

    for kept in [1u64, 4] {
        assert!(surviving_ids.contains(&kept));
    }
    for dropped in [2u64, 3] {
        assert!(!surviving_ids.contains(&dropped));
    }
}
3090
/// The memoized preparation path must produce the same message ids, canonical
/// text, and hashes as `prepare_window` for the same inputs.
#[test]
fn memoized_serial_prep_matches_stateless_prepare_window() {
    let inputs = vec![
        EmbeddingInput::new(1, "repeat me exactly"),
        EmbeddingInput::new(2, "repeat me exactly"),
        EmbeddingInput::new(3, "unique payload"),
        EmbeddingInput::new(4, ""),
        EmbeddingInput::new(5, "repeat me exactly"),
    ];

    let mut cache = ContentAddressedMemoCache::with_capacity(16);
    let baseline = prepare_window(&inputs, true);
    let memoized = prepare_window_with_memo(&inputs, &mut cache);

    assert_eq!(baseline.len(), memoized.len());
    for (expected, actual) in baseline.iter().zip(&memoized) {
        assert_eq!(expected.msg.message_id, actual.msg.message_id);
        assert_eq!(expected.canonical, actual.canonical);
        assert_eq!(expected.hash, actual.hash);
    }
}
3112
/// The memo key for a text carries the bytes of `content_hash` for that text
/// plus the prep algorithm name and version constants.
#[test]
fn semantic_prep_memo_key_uses_stable_content_hash_bytes() {
    let text = "repeat me exactly";
    let key = semantic_prep_memo_key(text);
    let expected = content_hash(text);

    let key_bytes = key.content_hash.as_bytes();
    assert_eq!(key_bytes, expected.as_slice());
    assert_eq!(key_bytes.len(), expected.len());
    assert_eq!(key.algorithm, SEMANTIC_PREP_MEMO_ALGORITHM);
    assert_eq!(key.algorithm_version, SEMANTIC_PREP_MEMO_VERSION);
}
3123
/// Repeated content hits the memo cache: five inputs (three identical, one
/// unique, one empty) yield four prepared items with two hits, three misses,
/// and two inserted live entries.
#[test]
fn memoized_serial_prep_reuses_duplicate_content_across_windows() {
    let repeated = "repeat me exactly";
    let inputs = vec![
        EmbeddingInput::new(1, repeated),
        EmbeddingInput::new(2, repeated),
        EmbeddingInput::new(3, "unique payload"),
        EmbeddingInput::new(4, ""),
        EmbeddingInput::new(5, repeated),
    ];

    let mut cache = ContentAddressedMemoCache::with_capacity(16);
    let prepared = prepare_window_with_memo(&inputs, &mut cache);
    let stats = cache.stats().clone();

    assert_eq!(prepared.len(), 4);
    assert_eq!(stats.hits, 2);
    assert_eq!(stats.misses, 3);
    assert_eq!(stats.inserts, 2);
    assert_eq!(stats.live_entries, 2);
}
3144
/// Inputs derived from storage share memoized prep work: two conversations
/// that both contain "shared semantic payload" produce four inputs, with one
/// cache hit and three misses/inserts.
#[test]
fn packet_embedding_inputs_reuse_memoized_prep_for_duplicate_content() -> Result<()> {
    // Builds one message; every field not passed in is the same across messages.
    let message = |idx, role, created_at, content: &str| Message {
        id: None,
        idx,
        role,
        author: None,
        created_at: Some(created_at),
        content: content.to_string(),
        extra_json: json!({}),
        snippets: Vec::new(),
    };

    let temp = tempdir().unwrap();
    let db_path = temp.path().join("agent_search.db");
    let storage = FrankenStorage::open(&db_path)?;
    let agent_id = storage.ensure_agent(&Agent {
        id: None,
        slug: "codex".to_string(),
        name: "Codex".to_string(),
        version: None,
        kind: AgentKind::Cli,
    })?;

    let conversation_one = test_conversation_with_messages(
        "packet-memo-conv-one",
        vec![
            message(
                0,
                MessageRole::User,
                1_700_000_010_100,
                "shared semantic payload",
            ),
            message(
                1,
                MessageRole::Agent,
                1_700_000_010_200,
                "unique semantic payload one",
            ),
        ],
    );
    storage.insert_conversation_tree(agent_id, None, &conversation_one)?;

    let conversation_two = test_conversation_with_messages(
        "packet-memo-conv-two",
        vec![
            message(
                0,
                MessageRole::Tool,
                1_700_000_010_300,
                "shared semantic payload",
            ),
            message(
                1,
                MessageRole::Agent,
                1_700_000_010_400,
                "unique semantic payload two",
            ),
        ],
    );
    storage.insert_conversation_tree(agent_id, None, &conversation_two)?;

    let packet_inputs = packet_embedding_inputs_from_storage(&storage)?;
    let mut cache = ContentAddressedMemoCache::with_capacity(16);
    let prepared = prepare_window_with_memo(&packet_inputs, &mut cache);
    let stats = cache.stats().clone();

    assert_eq!(packet_inputs.len(), 4);
    assert_eq!(prepared.len(), 4);

    let shared_key = semantic_prep_memo_key("shared semantic payload");
    assert_eq!(shared_key.content_hash.as_bytes().len(), 32);

    assert_eq!(stats.hits, 1);
    assert_eq!(stats.misses, 3);
    assert_eq!(stats.inserts, 3);
    assert_eq!(stats.live_entries, 3);
    Ok(())
}
3237
/// A batch whose cursor is not exhausted saves a checkpoint and a staged index
/// but does not publish the final vector index.
#[test]
fn backfill_batch_saves_checkpoint_and_staged_index_until_complete() {
    let temp = tempdir().unwrap();
    let indexer = SemanticIndexer::new("hash", None).unwrap();
    let mut manifest = SemanticManifest::default();
    let inputs = vec![
        EmbeddingInput::new(10, "first staged semantic message"),
        EmbeddingInput::new(11, "second staged semantic message"),
    ];
    let partial_plan = SemanticBackfillBatchPlan {
        tier: TierKind::Fast,
        db_fingerprint: "db-fp-backfill-partial".to_string(),
        model_revision: "hash".to_string(),
        total_conversations: 2,
        conversations_in_batch: 1,
        last_offset: 1,
        cursor_exhausted: false,
    };

    let outcome = indexer
        .run_backfill_batch(&inputs, temp.path(), &mut manifest, partial_plan)
        .unwrap();

    assert!(!outcome.published);
    assert!(outcome.checkpoint_saved);
    assert!(outcome.index_path.exists());
    let published_path = vector_index_path(temp.path(), indexer.embedder_id());
    assert!(!published_path.exists());

    let checkpoint = manifest.checkpoint.as_ref().expect("checkpoint");
    assert_eq!(checkpoint.tier, TierKind::Fast);
    assert_eq!(checkpoint.conversations_processed, 1);
    assert_eq!(checkpoint.docs_embedded, 2);
    assert_eq!(manifest.backlog.total_conversations, 2);
    assert!(SemanticManifest::path(temp.path()).exists());
}
3276
3277 #[test]
3278 fn backfill_batch_does_not_publish_until_cursor_exhausted() -> Result<()> {
3279 let temp = tempdir()?;
3280 let mut manifest = SemanticManifest::default();
3281 let indexer = SemanticIndexer::new("hash", None)?;
3282 let db_fingerprint = "db-fp-backfill-cursor-not-exhausted";
3283
3284 let first = vec![EmbeddingInput::new(30, "first cursor batch")];
3285 let first_outcome = indexer.run_backfill_batch(
3286 &first,
3287 temp.path(),
3288 &mut manifest,
3289 SemanticBackfillBatchPlan {
3290 tier: TierKind::Fast,
3291 db_fingerprint: db_fingerprint.to_string(),
3292 model_revision: "hash".to_string(),
3293 total_conversations: 2,
3294 conversations_in_batch: 1,
3295 last_offset: 1,
3296 cursor_exhausted: false,
3297 },
3298 )?;
3299 anyhow::ensure!(!first_outcome.published, "first batch must not publish");
3300 anyhow::ensure!(
3301 first_outcome.checkpoint_saved,
3302 "first batch should save a checkpoint"
3303 );
3304
3305 let second = vec![EmbeddingInput::new(31, "second cursor batch")];
3306 let second_outcome = indexer.run_backfill_batch(
3307 &second,
3308 temp.path(),
3309 &mut manifest,
3310 SemanticBackfillBatchPlan {
3311 tier: TierKind::Fast,
3312 db_fingerprint: db_fingerprint.to_string(),
3313 model_revision: "hash".to_string(),
3314 total_conversations: 2,
3315 conversations_in_batch: 1,
3316 last_offset: 2,
3317 cursor_exhausted: false,
3318 },
3319 )?;
3320
3321 anyhow::ensure!(
3322 !second_outcome.published,
3323 "count-based completion would publish here; cursor state must win"
3324 );
3325 anyhow::ensure!(
3326 second_outcome.checkpoint_saved,
3327 "second batch should save a checkpoint"
3328 );
3329 anyhow::ensure!(
3330 !vector_index_path(temp.path(), indexer.embedder_id()).exists(),
3331 "non-exhausted cursor must not publish the final vector index"
3332 );
3333 let Some(checkpoint) = manifest.checkpoint.as_ref() else {
3334 bail!("checkpoint should remain after a non-exhausted cursor");
3335 };
3336 anyhow::ensure!(
3337 checkpoint.conversations_processed == 2,
3338 "wanted 2 processed conversations, got {}",
3339 checkpoint.conversations_processed
3340 );
3341 anyhow::ensure!(
3342 checkpoint.total_conversations == 2,
3343 "checkpoint should preserve the real DB total, got {}",
3344 checkpoint.total_conversations
3345 );
3346 anyhow::ensure!(
3347 !checkpoint.cursor_exhausted,
3348 "saved checkpoint should preserve the non-exhausted cursor state"
3349 );
3350 anyhow::ensure!(
3351 !checkpoint.is_complete(),
3352 "non-exhausted cursor checkpoint must not be complete"
3353 );
3354 anyhow::ensure!(
3355 checkpoint.progress_pct() == 99,
3356 "non-exhausted cursor checkpoint should report 99% progress, got {}",
3357 checkpoint.progress_pct()
3358 );
3359 anyhow::ensure!(
3360 second_outcome.conversations_processed == 2,
3361 "wanted outcome to report 2 processed conversations, got {}",
3362 second_outcome.conversations_processed
3363 );
3364 anyhow::ensure!(
3365 second_outcome.total_conversations == 2,
3366 "wanted outcome to preserve total 2, got {}",
3367 second_outcome.total_conversations
3368 );
3369 let progress_pct = second_outcome.progress_pct();
3370 anyhow::ensure!(
3371 (progress_pct - 99.0).abs() < f64::EPSILON,
3372 "non-published outcome should cap progress below 100%, got {}",
3373 progress_pct
3374 );
3375 Ok(())
3376 }
3377
3378 #[test]
3379 fn backfill_batch_resumes_staged_index_and_publishes_manifest_atomically() {
3380 let temp = tempdir().unwrap();
3381 let mut manifest = SemanticManifest::default();
3382 let indexer = SemanticIndexer::new("hash", None).unwrap();
3383 let db_fingerprint = "db-fp-backfill-complete";
3384 let staging_path = semantic_staging_index_path(
3385 temp.path(),
3386 TierKind::Fast,
3387 indexer.embedder_id(),
3388 db_fingerprint,
3389 );
3390
3391 let first = vec![EmbeddingInput::new(20, "first resume batch")];
3392 let first_outcome = indexer
3393 .run_backfill_batch(
3394 &first,
3395 temp.path(),
3396 &mut manifest,
3397 SemanticBackfillBatchPlan {
3398 tier: TierKind::Fast,
3399 db_fingerprint: db_fingerprint.to_string(),
3400 model_revision: "hash".to_string(),
3401 total_conversations: 2,
3402 conversations_in_batch: 1,
3403 last_offset: 1,
3404 cursor_exhausted: false,
3405 },
3406 )
3407 .unwrap();
3408 assert_eq!(first_outcome.index_path, staging_path);
3409 assert!(staging_path.exists());
3410
3411 let second = vec![EmbeddingInput::new(21, "second resume batch")];
3412 let second_outcome = indexer
3413 .run_backfill_batch(
3414 &second,
3415 temp.path(),
3416 &mut manifest,
3417 SemanticBackfillBatchPlan {
3418 tier: TierKind::Fast,
3419 db_fingerprint: db_fingerprint.to_string(),
3420 model_revision: "hash".to_string(),
3421 total_conversations: 2,
3422 conversations_in_batch: 1,
3423 last_offset: 2,
3424 cursor_exhausted: true,
3425 },
3426 )
3427 .unwrap();
3428
3429 assert!(second_outcome.published);
3430 assert!(!second_outcome.checkpoint_saved);
3431 assert!((second_outcome.progress_pct() - 100.0).abs() < f64::EPSILON);
3432 assert!(!staging_path.exists());
3433 let final_path = vector_index_path(temp.path(), indexer.embedder_id());
3434 assert_eq!(second_outcome.index_path, final_path);
3435 assert!(final_path.exists());
3436 assert!(manifest.checkpoint.is_none());
3437 let artifact = manifest.fast_tier.as_ref().expect("published fast tier");
3438 assert!(artifact.ready);
3439 assert_eq!(artifact.conversation_count, 2);
3440 assert_eq!(artifact.doc_count, 2);
3441 assert_eq!(manifest.backlog.fast_tier_processed, 2);
3442
3443 let loaded = SemanticManifest::load(temp.path()).unwrap().unwrap();
3444 assert!(loaded.checkpoint.is_none());
3445 assert!(loaded.fast_tier.as_ref().is_some_and(|record| record.ready));
3446 }
3447
3448 #[test]
3449 fn backfill_publish_compacts_resumed_wal_before_rename() {
3450 let temp = tempdir().unwrap();
3451 let mut manifest = SemanticManifest::default();
3452 let indexer = SemanticIndexer::new("hash", None).unwrap();
3453 let db_fingerprint = "db-fp-backfill-small-resume";
3454 let first: Vec<EmbeddingInput> = (0..20)
3455 .map(|idx| EmbeddingInput::new(100 + idx, format!("first batch message {idx}")))
3456 .collect();
3457
3458 let first_outcome = indexer
3459 .run_backfill_batch(
3460 &first,
3461 temp.path(),
3462 &mut manifest,
3463 SemanticBackfillBatchPlan {
3464 tier: TierKind::Fast,
3465 db_fingerprint: db_fingerprint.to_string(),
3466 model_revision: "hash".to_string(),
3467 total_conversations: 2,
3468 conversations_in_batch: 1,
3469 last_offset: 1,
3470 cursor_exhausted: false,
3471 },
3472 )
3473 .unwrap();
3474 assert!(first_outcome.checkpoint_saved);
3475
3476 let second = vec![EmbeddingInput::new(200, "small final resume batch")];
3477 let second_outcome = indexer
3478 .run_backfill_batch(
3479 &second,
3480 temp.path(),
3481 &mut manifest,
3482 SemanticBackfillBatchPlan {
3483 tier: TierKind::Fast,
3484 db_fingerprint: db_fingerprint.to_string(),
3485 model_revision: "hash".to_string(),
3486 total_conversations: 2,
3487 conversations_in_batch: 1,
3488 last_offset: 2,
3489 cursor_exhausted: true,
3490 },
3491 )
3492 .unwrap();
3493
3494 assert!(second_outcome.published);
3495 let final_path = vector_index_path(temp.path(), indexer.embedder_id());
3496 let mut final_wal_path = final_path.as_os_str().to_os_string();
3497 final_wal_path.push(".wal");
3498 assert!(!PathBuf::from(final_wal_path).exists());
3499
3500 let published_index = FsVectorIndex::open(&final_path).unwrap();
3501 assert_eq!(published_index.record_count(), 21);
3502 let artifact = manifest.fast_tier.as_ref().expect("published fast tier");
3503 assert_eq!(artifact.doc_count, 21);
3504 assert_eq!(artifact.conversation_count, 2);
3505 }
3506
3507 #[test]
3508 fn backfill_from_storage_fetches_canonical_batches_and_resumes() -> Result<()> {
3509 let temp = tempdir().unwrap();
3510 let db_path = temp.path().join("agent_search.db");
3511 let storage = FrankenStorage::open(&db_path)?;
3512 let agent_id = storage.ensure_agent(&Agent {
3513 id: None,
3514 slug: "codex".to_string(),
3515 name: "Codex".to_string(),
3516 version: None,
3517 kind: AgentKind::Cli,
3518 })?;
3519 storage.insert_conversation_tree(
3520 agent_id,
3521 None,
3522 &test_conversation("first", "first canonical semantic message"),
3523 )?;
3524 storage.insert_conversation_tree(
3525 agent_id,
3526 None,
3527 &test_conversation("second", "second canonical semantic message"),
3528 )?;
3529
3530 let mut manifest = SemanticManifest::default();
3531 let indexer = SemanticIndexer::new("hash", None)?;
3532
3533 let first = indexer.run_backfill_from_storage(
3534 &storage,
3535 temp.path(),
3536 &mut manifest,
3537 SemanticBackfillStoragePlan {
3538 tier: TierKind::Fast,
3539 db_fingerprint: "canonical-db-fp".to_string(),
3540 model_revision: "hash".to_string(),
3541 max_conversations: 1,
3542 },
3543 )?;
3544 assert!(!first.published);
3545 assert!(first.checkpoint_saved);
3546 assert_eq!(first.conversations_processed, 1);
3547 assert_eq!(first.total_conversations, 2);
3548 assert_eq!(first.embedded_docs, 1);
3549 assert!(first.last_offset > 0);
3550
3551 let second = indexer.run_backfill_from_storage(
3552 &storage,
3553 temp.path(),
3554 &mut manifest,
3555 SemanticBackfillStoragePlan {
3556 tier: TierKind::Fast,
3557 db_fingerprint: "canonical-db-fp".to_string(),
3558 model_revision: "hash".to_string(),
3559 max_conversations: 1,
3560 },
3561 )?;
3562 assert!(second.published);
3563 assert!(!second.checkpoint_saved);
3564 assert_eq!(second.conversations_processed, 2);
3565 assert_eq!(second.embedded_docs, 1);
3566 assert!(manifest.checkpoint.is_none());
3567 assert_eq!(
3568 manifest.fast_tier.as_ref().map(|record| record.doc_count),
3569 Some(2)
3570 );
3571 Ok(())
3572 }
3573
3574 #[test]
3575 fn canonical_embedding_batch_uses_conversation_packet_semantic_projection() -> Result<()> {
3576 let temp = tempdir().unwrap();
3577 let db_path = temp.path().join("agent_search.db");
3578 let storage = FrankenStorage::open(&db_path)?;
3579 let agent_id = storage.ensure_agent(&Agent {
3580 id: None,
3581 slug: "codex".to_string(),
3582 name: "Codex".to_string(),
3583 version: None,
3584 kind: AgentKind::Cli,
3585 })?;
3586 storage.insert_conversation_tree(
3587 agent_id,
3588 None,
3589 &test_conversation_with_messages(
3590 "packet-projection",
3591 vec![
3592 Message {
3593 id: None,
3594 idx: 0,
3595 role: MessageRole::User,
3596 author: None,
3597 created_at: Some(1_700_000_000_500),
3598 content: "user semantic text".to_string(),
3599 extra_json: json!({}),
3600 snippets: Vec::new(),
3601 },
3602 Message {
3603 id: None,
3604 idx: 1,
3605 role: MessageRole::Tool,
3606 author: None,
3607 created_at: Some(1_700_000_000_600),
3608 content: "tool semantic text".to_string(),
3609 extra_json: json!({}),
3610 snippets: Vec::new(),
3611 },
3612 Message {
3613 id: None,
3614 idx: 2,
3615 role: MessageRole::System,
3616 author: None,
3617 created_at: Some(1_700_000_000_700),
3618 content: String::new(),
3619 extra_json: json!({}),
3620 snippets: Vec::new(),
3621 },
3622 ],
3623 ),
3624 )?;
3625
3626 let batch = fetch_canonical_embedding_batch(&storage, 0, 1)?;
3627
3628 assert_eq!(batch.conversations_in_batch, 1);
3629 assert_eq!(batch.inputs.len(), 2);
3630 assert_eq!(batch.inputs[0].content, "user semantic text");
3631 assert_eq!(batch.inputs[1].content, "tool semantic text");
3632 assert_eq!(batch.inputs[0].role, role_code_from_str("user").unwrap());
3633 assert_eq!(batch.inputs[1].role, role_code_from_str("tool").unwrap());
3634 let normalized_source_id =
3635 normalized_index_source_id(Some("remote-laptop"), None, Some("builder-host"));
3636 let expected_hash = crc32fast::hash(normalized_source_id.as_bytes());
3637 assert_eq!(batch.inputs[0].source_id, expected_hash);
3638 assert_eq!(batch.inputs[1].source_id, expected_hash);
3639 Ok(())
3640 }
3641
3642 #[test]
3643 fn canonical_embedding_batch_pushes_last_message_id_filter_into_selection() -> Result<()> {
3644 let temp = tempdir().unwrap();
3645 let db_path = temp.path().join("agent_search.db");
3646 let storage = FrankenStorage::open(&db_path)?;
3647 let agent_id = storage.ensure_agent(&Agent {
3648 id: None,
3649 slug: "codex".to_string(),
3650 name: "Codex".to_string(),
3651 version: None,
3652 kind: AgentKind::Cli,
3653 })?;
3654 storage.insert_conversation_tree(
3655 agent_id,
3656 None,
3657 &test_conversation("before-watermark", "old semantic message"),
3658 )?;
3659 let watermark: i64 = storage.raw().query_row_map(
3660 "SELECT MAX(id) FROM messages",
3661 &[] as &[ParamValue],
3662 |row| row.get_typed(0),
3663 )?;
3664 storage.insert_conversation_tree(
3665 agent_id,
3666 None,
3667 &test_conversation("after-watermark", "new semantic message"),
3668 )?;
3669
3670 let batch = fetch_canonical_embedding_batch_inner(&storage, 0, 8, Some(watermark))?;
3671
3672 assert_eq!(
3673 batch.conversations_in_batch, 1,
3674 "last_message_id must be pushed into candidate selection so old conversations are not counted in the batch"
3675 );
3676 assert_eq!(batch.total_conversations, 2);
3677 anyhow::ensure!(
3678 batch.cursor_exhausted,
3679 "message-cursor selection should exhaust the remaining cursor"
3680 );
3681 assert_eq!(batch.inputs.len(), 1);
3682 assert_eq!(batch.inputs[0].content, "new semantic message");
3683 assert!(
3684 i64::try_from(batch.inputs[0].message_id).unwrap_or(i64::MAX) > watermark,
3685 "selected semantic input must be strictly newer than the checkpoint message id"
3686 );
3687 Ok(())
3688 }
3689
3690 #[test]
3691 fn canonical_embedding_batch_reports_unexhausted_cursor_at_sql_limit() -> Result<()> {
3692 let temp = tempdir()?;
3693 let db_path = temp.path().join("agent_search.db");
3694 let storage = FrankenStorage::open(&db_path)?;
3695 let agent_id = storage.ensure_agent(&Agent {
3696 id: None,
3697 slug: "codex".to_string(),
3698 name: "Codex".to_string(),
3699 version: None,
3700 kind: AgentKind::Cli,
3701 })?;
3702 storage.insert_conversation_tree(
3703 agent_id,
3704 None,
3705 &test_conversation("limit-first", "first limit semantic message"),
3706 )?;
3707 storage.insert_conversation_tree(
3708 agent_id,
3709 None,
3710 &test_conversation("limit-second", "second limit semantic message"),
3711 )?;
3712
3713 let first = fetch_canonical_embedding_batch_inner(&storage, 0, 1, None)?;
3714 anyhow::ensure!(
3715 first.conversations_in_batch == 1,
3716 "wanted first batch to select 1 conversation, got {}",
3717 first.conversations_in_batch
3718 );
3719 anyhow::ensure!(
3720 !first.cursor_exhausted,
3721 "over-fetching one candidate should prove another conversation remains"
3722 );
3723
3724 let second =
3725 fetch_canonical_embedding_batch_inner(&storage, first.last_conversation_id, 1, None)?;
3726 anyhow::ensure!(
3727 second.conversations_in_batch == 1,
3728 "wanted second batch to select 1 conversation, got {}",
3729 second.conversations_in_batch
3730 );
3731 anyhow::ensure!(
3732 second.cursor_exhausted,
3733 "the final page should report cursor exhaustion even when it exactly fills the requested limit"
3734 );
3735 Ok(())
3736 }
3737
3738 #[test]
3739 fn checkpoint_caps_stop_at_whole_conversation_boundary() -> Result<()> {
3740 let temp = tempdir().unwrap();
3741 let db_path = temp.path().join("agent_search.db");
3742 let storage = FrankenStorage::open(&db_path)?;
3743 let agent_id = storage.ensure_agent(&Agent {
3744 id: None,
3745 slug: "codex".to_string(),
3746 name: "Codex".to_string(),
3747 version: None,
3748 kind: AgentKind::Cli,
3749 })?;
3750 for external_id in ["cap-first", "cap-second", "cap-third"] {
3751 storage.insert_conversation_tree(
3752 agent_id,
3753 None,
3754 &test_conversation_with_messages(
3755 external_id,
3756 vec![
3757 Message {
3758 id: None,
3759 idx: 0,
3760 role: MessageRole::User,
3761 author: None,
3762 created_at: Some(1_700_000_000_500),
3763 content: format!("{external_id} user semantic text"),
3764 extra_json: json!({}),
3765 snippets: Vec::new(),
3766 },
3767 Message {
3768 id: None,
3769 idx: 1,
3770 role: MessageRole::Agent,
3771 author: None,
3772 created_at: Some(1_700_000_000_600),
3773 content: format!("{external_id} assistant semantic text"),
3774 extra_json: json!({}),
3775 snippets: Vec::new(),
3776 },
3777 ],
3778 ),
3779 )?;
3780 }
3781
3782 let first_conversation_id: i64 = storage.raw().query_row_map(
3783 "SELECT MIN(id) FROM conversations",
3784 &[] as &[ParamValue],
3785 |row| row.get_typed(0),
3786 )?;
3787 let batch = fetch_canonical_embedding_batch_inner_with_caps(
3788 &storage,
3789 0,
3790 8,
3791 None,
3792 SemanticCheckpointCaps {
3793 max_messages: 3,
3794 max_bytes: 0,
3795 },
3796 )?;
3797
3798 assert_eq!(batch.conversations_in_batch, 1);
3799 anyhow::ensure!(
3800 !batch.cursor_exhausted,
3801 "checkpoint caps that stop before the next whole conversation must not publish"
3802 );
3803 assert_eq!(batch.last_conversation_id, first_conversation_id);
3804 assert_eq!(batch.inputs.len(), 2);
3805 assert!(
3806 batch
3807 .inputs
3808 .iter()
3809 .all(|input| input.content.contains("cap-first"))
3810 );
3811 assert_eq!(batch.total_conversations, 3);
3812 Ok(())
3813 }
3814
3815 #[test]
3816 fn packet_embedding_inputs_from_storage_since_only_emits_new_canonical_messages() -> Result<()>
3817 {
3818 let temp = tempdir().unwrap();
3819 let db_path = temp.path().join("agent_search.db");
3820 let storage = FrankenStorage::open(&db_path)?;
3821 let agent_id = storage.ensure_agent(&Agent {
3822 id: None,
3823 slug: "codex".to_string(),
3824 name: "Codex".to_string(),
3825 version: None,
3826 kind: AgentKind::Cli,
3827 })?;
3828 storage.insert_conversation_tree(
3829 agent_id,
3830 None,
3831 &test_conversation_with_messages(
3832 "packet-delta",
3833 vec![
3834 Message {
3835 id: None,
3836 idx: 0,
3837 role: MessageRole::User,
3838 author: None,
3839 created_at: Some(1_700_000_000_500),
3840 content: "existing semantic text".to_string(),
3841 extra_json: json!({}),
3842 snippets: Vec::new(),
3843 },
3844 Message {
3845 id: None,
3846 idx: 1,
3847 role: MessageRole::Agent,
3848 author: None,
3849 created_at: Some(1_700_000_000_600),
3850 content: "existing assistant text".to_string(),
3851 extra_json: json!({}),
3852 snippets: Vec::new(),
3853 },
3854 ],
3855 ),
3856 )?;
3857 let watermark: i64 = storage.raw().query_row_map(
3858 "SELECT MAX(id) FROM messages",
3859 &[] as &[ParamValue],
3860 |row| row.get_typed(0),
3861 )?;
3862
3863 storage.insert_conversation_tree(
3864 agent_id,
3865 None,
3866 &test_conversation_with_messages(
3867 "packet-delta",
3868 vec![
3869 Message {
3870 id: None,
3871 idx: 0,
3872 role: MessageRole::User,
3873 author: None,
3874 created_at: Some(1_700_000_000_500),
3875 content: "existing semantic text".to_string(),
3876 extra_json: json!({}),
3877 snippets: Vec::new(),
3878 },
3879 Message {
3880 id: None,
3881 idx: 1,
3882 role: MessageRole::Agent,
3883 author: None,
3884 created_at: Some(1_700_000_000_600),
3885 content: "existing assistant text".to_string(),
3886 extra_json: json!({}),
3887 snippets: Vec::new(),
3888 },
3889 Message {
3890 id: None,
3891 idx: 2,
3892 role: MessageRole::Agent,
3893 author: None,
3894 created_at: Some(1_700_000_000_700),
3895 content: "new packet semantic text".to_string(),
3896 extra_json: json!({}),
3897 snippets: Vec::new(),
3898 },
3899 Message {
3900 id: None,
3901 idx: 3,
3902 role: MessageRole::System,
3903 author: None,
3904 created_at: Some(1_700_000_000_800),
3905 content: String::new(),
3906 extra_json: json!({}),
3907 snippets: Vec::new(),
3908 },
3909 ],
3910 ),
3911 )?;
3912
3913 let batch = packet_embedding_inputs_from_storage_since(&storage, watermark)?;
3914
3915 assert_eq!(batch.conversations_in_batch, 1);
3916 assert_eq!(batch.inputs.len(), 1);
3917 assert_eq!(batch.inputs[0].content, "new packet semantic text");
3918 assert_eq!(
3919 batch.inputs[0].role,
3920 role_code_from_str("assistant").unwrap()
3921 );
3922 let normalized_source_id =
3923 normalized_index_source_id(Some("remote-laptop"), None, Some("builder-host"));
3924 assert_eq!(
3925 batch.inputs[0].source_id,
3926 crc32fast::hash(normalized_source_id.as_bytes())
3927 );
3928 let expected_raw_max_id: i64 = storage.raw().query_row_map(
3929 "SELECT MAX(id) FROM messages",
3930 &[] as &[ParamValue],
3931 |row| row.get_typed(0),
3932 )?;
3933 assert_eq!(batch.raw_max_message_id, Some(expected_raw_max_id));
3934 Ok(())
3935 }
3936
3937 #[test]
3938 fn packet_catch_up_emits_expected_semantic_docs_after_watermark() -> Result<()> {
3939 let temp = tempdir().unwrap();
3940 let db_path = temp.path().join("agent_search.db");
3941 let storage = FrankenStorage::open(&db_path)?;
3942 let agent_id = storage.ensure_agent(&Agent {
3943 id: None,
3944 slug: "codex".to_string(),
3945 name: "Codex".to_string(),
3946 version: None,
3947 kind: AgentKind::Cli,
3948 })?;
3949 let workspace_id = storage.ensure_workspace(Path::new("/tmp/workspace"), None)?;
3950
3951 storage.insert_conversation_tree(
3952 agent_id,
3953 Some(workspace_id),
3954 &test_conversation_with_messages(
3955 "legacy-packet-semantics",
3956 vec![
3957 Message {
3958 id: None,
3959 idx: 0,
3960 role: MessageRole::User,
3961 author: None,
3962 created_at: Some(1_700_000_000_500),
3963 content: "before watermark".to_string(),
3964 extra_json: json!({}),
3965 snippets: Vec::new(),
3966 },
3967 Message {
3968 id: None,
3969 idx: 1,
3970 role: MessageRole::Agent,
3971 author: None,
3972 created_at: Some(1_700_000_000_600),
3973 content: "before watermark assistant".to_string(),
3974 extra_json: json!({}),
3975 snippets: Vec::new(),
3976 },
3977 ],
3978 ),
3979 )?;
3980
3981 let watermark: i64 = storage.raw().query_row_map(
3982 "SELECT MAX(id) FROM messages",
3983 &[] as &[ParamValue],
3984 |row| row.get_typed(0),
3985 )?;
3986
3987 storage.insert_conversation_tree(
3988 agent_id,
3989 Some(workspace_id),
3990 &test_conversation_with_messages(
3991 "legacy-packet-semantics",
3992 vec![
3993 Message {
3994 id: None,
3995 idx: 0,
3996 role: MessageRole::User,
3997 author: None,
3998 created_at: Some(1_700_000_000_500),
3999 content: "before watermark".to_string(),
4000 extra_json: json!({}),
4001 snippets: Vec::new(),
4002 },
4003 Message {
4004 id: None,
4005 idx: 1,
4006 role: MessageRole::Agent,
4007 author: None,
4008 created_at: Some(1_700_000_000_600),
4009 content: "before watermark assistant".to_string(),
4010 extra_json: json!({}),
4011 snippets: Vec::new(),
4012 },
4013 Message {
4014 id: None,
4015 idx: 2,
4016 role: MessageRole::Agent,
4017 author: None,
4018 created_at: Some(1_700_000_000_700),
4019 content: "after watermark assistant".to_string(),
4020 extra_json: json!({}),
4021 snippets: Vec::new(),
4022 },
4023 Message {
4024 id: None,
4025 idx: 3,
4026 role: MessageRole::System,
4027 author: None,
4028 created_at: Some(1_700_000_000_800),
4029 content: String::new(),
4030 extra_json: json!({}),
4031 snippets: Vec::new(),
4032 },
4033 ],
4034 ),
4035 )?;
4036 storage.insert_conversation_tree(
4037 agent_id,
4038 Some(workspace_id),
4039 &test_conversation_with_messages(
4040 "legacy-packet-semantics-second-conv",
4041 vec![
4042 Message {
4043 id: None,
4044 idx: 0,
4045 role: MessageRole::Tool,
4046 author: None,
4047 created_at: Some(1_700_000_000_900),
4048 content: "after watermark tool".to_string(),
4049 extra_json: json!({}),
4050 snippets: Vec::new(),
4051 },
4052 Message {
4053 id: None,
4054 idx: 1,
4055 role: MessageRole::System,
4056 author: None,
4057 created_at: Some(1_700_000_001_000),
4058 content: String::new(),
4059 extra_json: json!({}),
4060 snippets: Vec::new(),
4061 },
4062 ],
4063 ),
4064 )?;
4065
4066 let packet_batch = packet_embedding_inputs_from_storage_since(&storage, watermark)?;
4067 let normalized_source_id =
4068 normalized_index_source_id(Some("remote-laptop"), None, Some("builder-host"));
4069 let source_id_hash = crc32fast::hash(normalized_source_id.as_bytes());
4070 let expected = vec![
4071 ComparableSemanticInput {
4072 message_id: u64::try_from(watermark + 1).unwrap(),
4073 created_at_ms: 1_700_000_000_700,
4074 agent_id: u32::try_from(agent_id).unwrap(),
4075 workspace_id: u32::try_from(workspace_id).unwrap(),
4076 source_id: source_id_hash,
4077 role: role_code_from_str("assistant").unwrap(),
4078 content: "after watermark assistant".to_string(),
4079 },
4080 ComparableSemanticInput {
4081 message_id: u64::try_from(watermark + 3).unwrap(),
4082 created_at_ms: 1_700_000_000_900,
4083 agent_id: u32::try_from(agent_id).unwrap(),
4084 workspace_id: u32::try_from(workspace_id).unwrap(),
4085 source_id: source_id_hash,
4086 role: role_code_from_str("tool").unwrap(),
4087 content: "after watermark tool".to_string(),
4088 },
4089 ];
4090
4091 assert_eq!(comparable_semantic_inputs(packet_batch.inputs), expected);
4092 assert_eq!(packet_batch.conversations_in_batch, 2);
4093 assert_eq!(packet_batch.raw_max_message_id, Some(watermark + 4));
4094 Ok(())
4095 }
4096
4097 #[test]
4098 fn packet_embedding_inputs_for_message_ids_matches_since_selection() -> Result<()> {
4099 let temp = tempdir().unwrap();
4100 let db_path = temp.path().join("agent_search.db");
4101 let storage = FrankenStorage::open(&db_path)?;
4102 let agent_id = storage.ensure_agent(&Agent {
4103 id: None,
4104 slug: "codex".to_string(),
4105 name: "Codex".to_string(),
4106 version: None,
4107 kind: AgentKind::Cli,
4108 })?;
4109 let workspace_id = storage.ensure_workspace(Path::new("/tmp/workspace"), None)?;
4110
4111 storage.insert_conversation_tree(
4112 agent_id,
4113 Some(workspace_id),
4114 &test_conversation_with_messages(
4115 "selected-vs-since",
4116 vec![
4117 Message {
4118 id: None,
4119 idx: 0,
4120 role: MessageRole::User,
4121 author: None,
4122 created_at: Some(1_700_000_100_100),
4123 content: "before watermark".to_string(),
4124 extra_json: json!({}),
4125 snippets: Vec::new(),
4126 },
4127 Message {
4128 id: None,
4129 idx: 1,
4130 role: MessageRole::Agent,
4131 author: None,
4132 created_at: Some(1_700_000_100_200),
4133 content: "before watermark assistant".to_string(),
4134 extra_json: json!({}),
4135 snippets: Vec::new(),
4136 },
4137 ],
4138 ),
4139 )?;
4140
4141 let watermark: i64 = storage.raw().query_row_map(
4142 "SELECT MAX(id) FROM messages",
4143 &[] as &[ParamValue],
4144 |row| row.get_typed(0),
4145 )?;
4146
4147 storage.insert_conversation_tree(
4148 agent_id,
4149 Some(workspace_id),
4150 &test_conversation_with_messages(
4151 "selected-vs-since",
4152 vec![
4153 Message {
4154 id: None,
4155 idx: 0,
4156 role: MessageRole::User,
4157 author: None,
4158 created_at: Some(1_700_000_100_100),
4159 content: "before watermark".to_string(),
4160 extra_json: json!({}),
4161 snippets: Vec::new(),
4162 },
4163 Message {
4164 id: None,
4165 idx: 1,
4166 role: MessageRole::Agent,
4167 author: None,
4168 created_at: Some(1_700_000_100_200),
4169 content: "before watermark assistant".to_string(),
4170 extra_json: json!({}),
4171 snippets: Vec::new(),
4172 },
4173 Message {
4174 id: None,
4175 idx: 2,
4176 role: MessageRole::Tool,
4177 author: None,
4178 created_at: Some(1_700_000_100_300),
4179 content: "after watermark tool".to_string(),
4180 extra_json: json!({}),
4181 snippets: Vec::new(),
4182 },
4183 Message {
4184 id: None,
4185 idx: 3,
4186 role: MessageRole::System,
4187 author: None,
4188 created_at: Some(1_700_000_100_400),
4189 content: String::new(),
4190 extra_json: json!({}),
4191 snippets: Vec::new(),
4192 },
4193 ],
4194 ),
4195 )?;
4196 storage.insert_conversation_tree(
4197 agent_id,
4198 Some(workspace_id),
4199 &test_conversation_with_messages(
4200 "selected-vs-since-second",
4201 vec![Message {
4202 id: None,
4203 idx: 0,
4204 role: MessageRole::Agent,
4205 author: None,
4206 created_at: Some(1_700_000_100_500),
4207 content: "after watermark assistant".to_string(),
4208 extra_json: json!({}),
4209 snippets: Vec::new(),
4210 }],
4211 ),
4212 )?;
4213
4214 let since_batch = packet_embedding_inputs_from_storage_since(&storage, watermark)?;
4215 let conversation_ids: Vec<i64> = storage.raw().query_map_collect(
4216 "SELECT DISTINCT conversation_id
4217 FROM messages
4218 WHERE id > ?1
4219 ORDER BY conversation_id ASC",
4220 &[ParamValue::from(watermark)],
4221 |row| row.get_typed(0),
4222 )?;
4223 let selected_message_ids: HashSet<i64> = storage
4224 .raw()
4225 .query_map_collect(
4226 "SELECT id
4227 FROM messages
4228 WHERE id > ?1
4229 ORDER BY id ASC",
4230 &[ParamValue::from(watermark)],
4231 |row| row.get_typed(0),
4232 )?
4233 .into_iter()
4234 .collect();
4235 let selected_inputs = packet_embedding_inputs_from_storage_for_message_ids(
4236 &storage,
4237 &conversation_ids,
4238 &selected_message_ids,
4239 )?;
4240
4241 assert_eq!(
4242 comparable_semantic_inputs(selected_inputs),
4243 comparable_semantic_inputs(since_batch.inputs)
4244 );
4245 Ok(())
4246 }
4247
4248 #[test]
4249 fn default_batch_size_uses_new_value() {
4250 let _guard = EnvVarGuard::remove("CASS_SEMANTIC_BATCH_SIZE");
4253 let indexer = SemanticIndexer::new("hash", None).unwrap();
4254 assert_eq!(indexer.batch_size(), DEFAULT_SEMANTIC_BATCH_SIZE);
4255 }
4256
4257 #[test]
4258 fn semantic_watchdog_and_checkpoint_caps_have_derived_defaults() {
4259 let _warn = EnvVarGuard::remove("CASS_SEMANTIC_EMBED_BATCH_WARN_AFTER_MS");
4260 let _fail = EnvVarGuard::remove("CASS_SEMANTIC_EMBED_BATCH_FAIL_AFTER_MS");
4261 let _messages = EnvVarGuard::remove("CASS_SEMANTIC_MAX_MESSAGES_PER_CHECKPOINT");
4262 let _bytes = EnvVarGuard::remove("CASS_SEMANTIC_MAX_BYTES_PER_CHECKPOINT");
4263
4264 assert_eq!(
4265 resolved_semantic_embed_batch_warn_after_ms(),
4266 DEFAULT_SEMANTIC_EMBED_BATCH_WARN_AFTER_MS
4267 );
4268 assert_eq!(
4269 resolved_semantic_embed_batch_fail_after_ms(),
4270 DEFAULT_SEMANTIC_EMBED_BATCH_FAIL_AFTER_MS
4271 );
4272 assert_eq!(
4273 SemanticCheckpointCaps::from_env(),
4274 SemanticCheckpointCaps {
4275 max_messages: DEFAULT_SEMANTIC_MAX_MESSAGES_PER_CHECKPOINT,
4276 max_bytes: DEFAULT_SEMANTIC_MAX_BYTES_PER_CHECKPOINT,
4277 }
4278 );
4279 }
4280
4281 #[test]
4282 fn parallel_prep_enabled_reuses_truthy_env_parser() {
4283 for (value, expected) in [
4284 ("1", true),
4285 ("true", true),
4286 (" YeS ", true),
4287 ("on", true),
4288 ("0", false),
4289 ("false", false),
4290 ("off", false),
4291 ] {
4292 let _guard = EnvVarGuard::set("CASS_SEMANTIC_PREP_PARALLEL", value);
4293 assert_eq!(parallel_prep_enabled(), expected, "env value {value:?}");
4294 }
4295
4296 let _guard = EnvVarGuard::remove("CASS_SEMANTIC_PREP_PARALLEL");
4297 assert!(!parallel_prep_enabled());
4298 }
4299
4300 #[test]
4301 fn saturating_u64_from_usize_covers_bounds() {
4302 assert_eq!(saturating_u64_from_usize(0), 0);
4303 assert_eq!(saturating_u64_from_usize(42), 42);
4304 assert_eq!(
4305 saturating_u64_from_usize(usize::MAX),
4306 u64::try_from(usize::MAX).unwrap_or(u64::MAX)
4307 );
4308 }
4309
#[test]
/// Equivalence check: the inputs produced by `semantic_inputs_from_packets`
/// for packets rebuilt from canonical storage rows must equal the inputs
/// produced by `packet_embedding_inputs_from_storage` for the same database.
///
/// The fixture stores two conversations under two different agents in one
/// workspace. One of the stored messages has empty content, and the test
/// asserts that no emitted packet input has empty content.
///
/// # Errors
///
/// Returns any error raised while creating the temp directory, opening the
/// storage, seeding the fixture, or running either preparation path.
fn semantic_inputs_from_packets_matches_storage_replay() -> Result<()> {
    // Fixture builders: only the fields that vary between rows are parameters.
    let cli_agent = |slug: &str, name: &str| Agent {
        id: None,
        slug: slug.to_string(),
        name: name.to_string(),
        version: None,
        kind: AgentKind::Cli,
    };
    let message = |idx, role, author: Option<&str>, created_at, content: &str| Message {
        id: None,
        idx,
        role,
        author: author.map(str::to_string),
        created_at: Some(created_at),
        content: content.to_string(),
        extra_json: json!({}),
        snippets: Vec::new(),
    };

    // Propagate setup failures through the test's `Result` instead of panicking.
    let temp = tempdir()?;
    let db_path = temp.path().join("agent_search.db");
    let storage = FrankenStorage::open(&db_path)?;

    let agent_id_codex = storage.ensure_agent(&cli_agent("codex", "Codex"))?;
    let agent_id_claude = storage.ensure_agent(&cli_agent("claude_code", "Claude Code"))?;
    let workspace_id =
        storage.ensure_workspace(Path::new("/tmp/semantic-equivalence-ws"), None)?;

    // Conversation 1: user prompt, agent reply, and a system message with empty content.
    storage.insert_conversation_tree(
        agent_id_codex,
        Some(workspace_id),
        &test_conversation_with_messages(
            "packet-equiv-1",
            vec![
                message(
                    0,
                    MessageRole::User,
                    None,
                    1_700_000_000_500,
                    "first user prompt",
                ),
                message(
                    1,
                    MessageRole::Agent,
                    None,
                    1_700_000_000_600,
                    "first assistant reply",
                ),
                message(2, MessageRole::System, None, 1_700_000_000_700, ""),
            ],
        ),
    )?;
    // Conversation 2: an authored tool message followed by an agent reply.
    storage.insert_conversation_tree(
        agent_id_claude,
        Some(workspace_id),
        &test_conversation_with_messages(
            "packet-equiv-2",
            vec![
                message(
                    0,
                    MessageRole::Tool,
                    Some("ripgrep"),
                    1_700_000_001_500,
                    "tool output line",
                ),
                message(
                    1,
                    MessageRole::Agent,
                    None,
                    1_700_000_001_600,
                    "second assistant reply",
                ),
            ],
        ),
    )?;

    // Path A: inputs derived directly from storage.
    let storage_inputs = packet_embedding_inputs_from_storage(&storage)?;

    // Path B: rebuild one packet per conversation from the stored rows, then
    // project the packets into semantic inputs.
    let conversation_ids: Vec<i64> = storage.raw().query_map_collect(
        "SELECT DISTINCT m.conversation_id
         FROM messages m
         JOIN conversations c ON c.id = m.conversation_id
         ORDER BY m.conversation_id ASC",
        &[] as &[ParamValue],
        |row| row.get_typed(0),
    )?;
    let envelopes = fetch_canonical_embedding_conversations(&storage, &conversation_ids)?;
    let mut grouped_messages =
        storage.fetch_messages_for_lexical_rebuild_batch(&conversation_ids, None, None)?;
    let mut packets: Vec<ConversationPacket> = Vec::with_capacity(envelopes.len());
    let mut contexts: Vec<SemanticPacketContext> = Vec::with_capacity(envelopes.len());
    for envelope in &envelopes {
        // A conversation with no grouped messages replays as an empty list.
        let messages = grouped_messages
            .remove(&envelope.conversation_id)
            .unwrap_or_default();
        let provenance = canonical_embedding_packet_provenance(envelope);
        let canonical = canonical_embedding_conversation(envelope, &provenance, messages);
        packets.push(ConversationPacket::from_canonical_replay(
            &canonical, provenance,
        ));
        contexts.push(SemanticPacketContext {
            conversation_id: envelope.conversation_id,
            agent_id: saturating_u32_from_i64(envelope.agent_id),
            workspace_id: saturating_u32_from_i64(envelope.workspace_id.unwrap_or(0)),
        });
    }
    let packet_inputs = semantic_inputs_from_packets(&packets, &contexts)?;

    // Borrow-only checks run first so the final comparison can consume both
    // vectors without cloning them.
    assert!(
        !storage_inputs.is_empty(),
        "fixture should produce non-empty semantic inputs (sanity)"
    );
    assert_eq!(
        storage_inputs.len(),
        packet_inputs.len(),
        "storage and packet semantic input counts must agree exactly"
    );
    assert!(
        packet_inputs.iter().all(|input| !input.content.is_empty()),
        "empty content must be filtered by the packet semantic projection"
    );

    // NOTE(review): the provenance arguments below presumably mirror what
    // `test_conversation_with_messages` stamps on its conversations; confirm
    // against that helper.
    let normalized_source_id =
        normalized_index_source_id(Some("remote-laptop"), None, Some("builder-host"));
    let expected_hash = crc32fast::hash(normalized_source_id.as_bytes());
    assert!(
        packet_inputs
            .iter()
            .all(|input| input.source_id == expected_hash),
        "every emitted EmbeddingInput must hash provenance via the packet's normalized source_id"
    );

    assert_eq!(
        comparable_semantic_inputs(storage_inputs),
        comparable_semantic_inputs(packet_inputs),
        "packet-driven semantic preparation must match storage replay byte-for-byte"
    );

    Ok(())
}
4493
#[test]
/// Passing one packet with an empty context slice to
/// `semantic_inputs_from_packets` must return an error whose rendered message
/// contains "length mismatch".
fn semantic_inputs_from_packets_rejects_length_mismatch() {
    let origin = ConversationPacketProvenance::local();
    let conversation = test_conversation("packet-mismatch", "hello");
    let packets = [ConversationPacket::from_canonical_replay(
        &conversation,
        origin,
    )];

    // One packet, zero contexts: the call must not succeed.
    let Err(failure) = semantic_inputs_from_packets(&packets, &[]) else {
        panic!("expected error on packet/context length mismatch");
    };

    let err = failure.to_string();
    assert!(
        err.contains("length mismatch"),
        "error should mention length mismatch, got: {err}"
    );
}
4515}