1use crate::config::{DerivedVectorBackendPolicy, SearchConfig};
4use crate::episodes;
5use crate::error::MemoryError;
6use crate::types::{
7 ExplainedResult, ScoreBreakdown, SearchContext, SearchResult, SearchSource, SearchSourceType,
8 VectorSearchReceiptV1,
9};
10use rusqlite::types::Value as SqlValue;
11use rusqlite::Connection;
12#[cfg(feature = "turbo-quant-codec")]
13use rusqlite::OptionalExtension;
14use stack_ids::DigestBuilder;
15#[cfg(feature = "turbo-quant-codec")]
16use std::collections::BinaryHeap;
17use std::collections::{HashMap, HashSet};
18use std::sync::atomic::{AtomicUsize, Ordering};
19
/// Default row count after which a single vector table scan logs a warning.
const VECTOR_SCAN_WARN_THRESHOLD: usize = 50_000;
/// Default row count after which a single vector table scan is aborted with an error.
const VECTOR_SCAN_HARD_LIMIT: usize = 250_000;

/// Warning limit read by `scan_vector_rows`; a value of 0 disables the warning.
/// NOTE(review): stored as an atomic presumably so it can be overridden at runtime —
/// the writer is not visible in this part of the file.
static VECTOR_SCAN_WARN_LIMIT: AtomicUsize = AtomicUsize::new(VECTOR_SCAN_WARN_THRESHOLD);
/// Hard limit read by `scan_vector_rows`; a value of 0 disables the limit.
static VECTOR_SCAN_BLOCK_LIMIT: AtomicUsize = AtomicUsize::new(VECTOR_SCAN_HARD_LIMIT);
27
/// Turns free-form user text into an FTS `MATCH` expression.
///
/// Every character that is not alphanumeric, whitespace or `_` is replaced by a
/// space, the text is split on whitespace, the operator keywords `AND`, `OR`,
/// `NOT` and `NEAR` (compared case-insensitively) are dropped, and each remaining
/// token is wrapped in double quotes. The quoted tokens are joined with ` OR `.
///
/// Returns `None` when no token survives.
pub fn sanitize_fts_query(raw: &str) -> Option<String> {
    const OPERATOR_KEYWORDS: [&str; 4] = ["AND", "OR", "NOT", "NEAR"];

    let mut cleaned = String::with_capacity(raw.len());
    for ch in raw.chars() {
        let keep = ch.is_alphanumeric() || ch.is_whitespace() || ch == '_';
        cleaned.push(if keep { ch } else { ' ' });
    }

    let mut quoted_terms: Vec<String> = Vec::new();
    for word in cleaned.split_whitespace() {
        let upper = word.to_uppercase();
        if OPERATOR_KEYWORDS.contains(&upper.as_str()) {
            continue;
        }
        // Embedded quotes are doubled before the token is wrapped in quotes.
        quoted_terms.push(format!("\"{}\"", word.replace('"', "\"\"")));
    }

    if quoted_terms.is_empty() {
        return None;
    }
    Some(quoted_terms.join(" OR "))
}
66
67pub fn cosine_similarity(a: &[f32], b: &[f32]) -> Result<f32, MemoryError> {
69 if a.len() != b.len() {
70 return Err(MemoryError::EmbeddingDimensionMismatch {
71 expected: a.len(),
72 actual: b.len(),
73 });
74 }
75 if let Some((index, _)) = a.iter().enumerate().find(|(_, value)| !value.is_finite()) {
76 return Err(MemoryError::NonFiniteEmbeddingValue { index });
77 }
78 if let Some((index, _)) = b.iter().enumerate().find(|(_, value)| !value.is_finite()) {
79 return Err(MemoryError::NonFiniteEmbeddingValue { index });
80 }
81 let dot: f32 = a.iter().zip(b.iter()).map(|(x, y)| x * y).sum();
82 let norm_a: f32 = a.iter().map(|x| x * x).sum::<f32>().sqrt();
83 let norm_b: f32 = b.iter().map(|x| x * x).sum::<f32>().sqrt();
84 if norm_a == 0.0 || norm_b == 0.0 {
85 return Ok(0.0);
86 }
87 let similarity = dot / (norm_a * norm_b);
88 if !similarity.is_finite() {
89 return Err(MemoryError::Other(
90 "cosine similarity produced a non-finite score".to_string(),
91 ));
92 }
93 Ok(similarity)
94}
95
96fn days_since(timestamp: &str, evaluation_time: chrono::DateTime<chrono::Utc>) -> Option<f64> {
97 let dt = parse_search_timestamp(timestamp)?;
98 let duration = evaluation_time.naive_utc() - dt;
99 Some(duration.num_seconds() as f64 / 86_400.0)
100}
101
102fn parse_search_timestamp(timestamp: &str) -> Option<chrono::NaiveDateTime> {
103 if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(timestamp, "%Y-%m-%d %H:%M:%S") {
104 return Some(dt);
105 }
106 if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(timestamp, "%Y-%m-%d %H:%M:%S%.f") {
107 return Some(dt);
108 }
109 if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(timestamp) {
110 return Some(dt.naive_utc());
111 }
112 tracing::warn!(
113 timestamp,
114 "failed to parse search timestamp for recency scoring; recency contribution dropped"
115 );
116 None
117}
118
119fn recency_contribution(
120 config: &SearchConfig,
121 context: &SearchContext,
122 updated_at: Option<&str>,
123 best_rank: Option<usize>,
124) -> Option<f64> {
125 match (config.recency_half_life_days, updated_at) {
126 (Some(half_life), Some(ts)) if half_life > 0.0 => {
127 let age_days = days_since(ts, context.evaluation_time).map(|days| days.max(0.0))?;
128 let decay = 2.0_f64.powf(-age_days / half_life);
129 let rank = best_rank.unwrap_or(1).max(1) as f64;
130 Some(config.recency_weight * decay / (config.rrf_k + rank))
131 }
132 _ => None,
133 }
134}
135
136pub(crate) fn search_result_id(source: &SearchSource) -> String {
137 match source {
138 SearchSource::Fact { fact_id, .. } => format!("fact:{fact_id}"),
139 SearchSource::Chunk { chunk_id, .. } => format!("chunk:{chunk_id}"),
140 SearchSource::Message { message_id, .. } => format!("msg:{message_id}"),
141 SearchSource::Episode { episode_id, .. } => format!("episode:{episode_id}"),
142 SearchSource::Projection { projection_id, .. } => format!("projection:{projection_id}"),
143 }
144}
145
146pub fn source_dedup_key(source: &SearchSource) -> (u8, String) {
147 match source {
148 SearchSource::Fact { fact_id, .. } => (0, fact_id.clone()),
149 SearchSource::Chunk { chunk_id, .. } => (1, chunk_id.clone()),
150 SearchSource::Message {
151 message_id,
152 session_id,
153 ..
154 } => (2, format!("{session_id}:{message_id}")),
155 SearchSource::Episode { episode_id, .. } => (3, episode_id.clone()),
156 SearchSource::Projection { projection_id, .. } => (4, projection_id.clone()),
157 }
158}
159
/// One lexical (FTS/BM25) match produced by `bm25_search`.
#[derive(Debug, Clone)]
pub struct Bm25Hit {
    /// Prefixed item key, e.g. `fact:{id}`, `chunk:{id}` or `msg:{id}`.
    pub id: String,
    /// Text of the matched item as read from the database.
    pub content: String,
    /// Which stored item the match refers to, with its identifying metadata.
    pub source: SearchSource,
    /// Value of SQLite's `bm25()` for the match; the queries order ascending on it.
    pub raw_score: f64,
    /// Timestamp read with the row (`updated_at`, or `created_at` for chunks and messages).
    pub updated_at: Option<String>,
}
174
/// One embedding-similarity match.
#[derive(Debug, Clone)]
pub struct VectorHit {
    /// Prefixed item key, e.g. `fact:{id}`, `chunk:{id}` or `msg:{id}`.
    pub id: String,
    /// Text of the matched item as read from the database.
    pub content: String,
    /// Which stored item the match refers to, with its identifying metadata.
    pub source: SearchSource,
    /// Cosine similarity between the query embedding and the stored f32 embedding.
    pub similarity: f64,
    /// Timestamp read with the row (`updated_at`, or `created_at` for chunks and messages).
    pub updated_at: Option<String>,
    /// 1-based rank assigned by whichever stage produced the candidate list
    /// (`None` until a ranking step has run).
    pub source_rank: Option<usize>,
    /// Score the candidate-producing stage assigned to this hit
    /// (`None` until a ranking step has run).
    pub source_similarity: Option<f64>,
    /// `true` when the hit came from an approximate candidate list and was
    /// re-scored against the raw f32 embedding; `false` for the direct scan.
    pub reranked_from_f32: bool,
}
195
/// A row carrying a stored embedding blob plus the metadata needed to turn it
/// into a `VectorHit`.
// NOTE(review): `dead_code` is allowed presumably because some fields are only
// read by feature-gated code paths — confirm before removing.
#[allow(dead_code)]
struct VectorRow {
    /// Prefixed item key, e.g. `fact:{id}`.
    id: String,
    /// Text of the item.
    content: String,
    /// Raw embedding bytes, decoded with `crate::db::decode_f32_le`.
    blob: Vec<u8>,
    /// Timestamp read with the row.
    updated_at: Option<String>,
    /// Which kind of table the row came from.
    source_type: SearchSourceType,
    /// Namespace of the row; `None` for message rows.
    filter_namespace: Option<String>,
    /// Session id of the row; set only for message rows.
    filter_session_id: Option<String>,
    /// Fully populated source descriptor handed on to the resulting hit.
    source: SearchSource,
}
207
/// Intermediate per-item record holding the lexical and vector signals that
/// `RrfCandidate::explained` fuses into a final score.
struct RrfCandidate {
    /// Text of the item.
    content: String,
    /// Which stored item the candidate refers to.
    source: SearchSource,
    /// Timestamp used for the recency term, when available.
    updated_at: Option<String>,
    /// BM25 score reported in the breakdown; `None` when there was no lexical match.
    bm25_score: Option<f64>,
    /// Rank in the lexical result list used for the BM25 contribution.
    bm25_rank: Option<usize>,
    /// Vector score reported in the breakdown and as `cosine_similarity`.
    vector_score: Option<f64>,
    /// Rank in the vector result list used for the vector contribution.
    vector_rank: Option<usize>,
    /// Rank reported by the stage that produced the vector candidates.
    vector_source_rank: Option<usize>,
    /// Score reported by the stage that produced the vector candidates.
    vector_source_score: Option<f64>,
    /// Whether the vector hit was re-scored against the raw f32 embedding.
    vector_reranked_from_f32: bool,
}
220
221impl RrfCandidate {
222 fn explained(self, config: &SearchConfig, context: &SearchContext) -> ExplainedResult {
223 let bm25_contribution = self
224 .bm25_rank
225 .map(|rank| config.bm25_weight / (config.rrf_k + rank as f64));
226 let vector_contribution = self
227 .vector_rank
228 .map(|rank| config.vector_weight / (config.rrf_k + rank as f64));
229 let best_rank = match (self.bm25_rank, self.vector_rank) {
230 (Some(a), Some(b)) => Some(a.min(b)),
231 (Some(a), None) | (None, Some(a)) => Some(a),
232 (None, None) => None,
233 };
234 let recency_score =
235 recency_contribution(config, context, self.updated_at.as_deref(), best_rank);
236 let rrf_score = bm25_contribution.unwrap_or(0.0)
237 + vector_contribution.unwrap_or(0.0)
238 + recency_score.unwrap_or(0.0);
239
240 let breakdown = ScoreBreakdown {
241 rrf_score,
242 bm25_score: self.bm25_score,
243 vector_score: self.vector_score,
244 recency_score,
245 bm25_rank: self.bm25_rank,
246 vector_rank: self.vector_rank,
247 vector_source_rank: self.vector_source_rank,
248 vector_source_score: self.vector_source_score,
249 bm25_contribution,
250 vector_contribution,
251 vector_reranked_from_f32: self.vector_reranked_from_f32,
252 bm25_weight: config.bm25_weight,
253 vector_weight: config.vector_weight,
254 recency_weight: config.recency_half_life_days.map(|_| config.recency_weight),
255 rrf_k: config.rrf_k,
256 };
257
258 ExplainedResult {
259 result: SearchResult {
260 content: self.content,
261 source: self.source,
262 score: rrf_score,
263 bm25_rank: breakdown.bm25_rank,
264 vector_rank: breakdown.vector_rank,
265 cosine_similarity: breakdown.vector_score,
266 },
267 breakdown,
268 }
269 }
270}
271
272fn scan_vector_rows(
273 rows: impl Iterator<Item = Result<VectorRow, rusqlite::Error>>,
274 query_embedding: &[f32],
275 min_similarity: f64,
276 table_label: &str,
277) -> Result<(Vec<VectorHit>, usize), MemoryError> {
278 let expected_dims = query_embedding.len();
279 let mut hits = Vec::new();
280 let mut row_count = 0usize;
281 let warn_limit = VECTOR_SCAN_WARN_LIMIT.load(Ordering::Relaxed);
282 let hard_limit = VECTOR_SCAN_BLOCK_LIMIT.load(Ordering::Relaxed);
283
284 for row in rows {
285 let row = row?;
286 row_count += 1;
287 if warn_limit > 0 && row_count == warn_limit.saturating_add(1) {
288 tracing::warn!(
289 table = table_label,
290 count = row_count,
291 threshold = warn_limit,
292 "vector scan warning threshold exceeded"
293 );
294 }
295 if hard_limit > 0 && row_count > hard_limit {
296 return Err(MemoryError::VectorScanLimitExceeded {
297 table: table_label.to_string(),
298 scanned: row_count,
299 limit: hard_limit,
300 });
301 }
302
303 let stored_embedding = match crate::db::decode_f32_le(&row.blob, expected_dims) {
304 Ok(embedding) => embedding,
305 Err(error) => {
306 tracing::warn!(
307 error = %error,
308 table = table_label,
309 item = %row.id,
310 "Skipping row with invalid embedding blob"
311 );
312 continue;
313 }
314 };
315
316 if stored_embedding.len() != expected_dims {
317 tracing::warn!(
318 expected = expected_dims,
319 actual = stored_embedding.len(),
320 "Skipping {} with wrong embedding dimensions",
321 table_label
322 );
323 continue;
324 }
325
326 let similarity = cosine_similarity(query_embedding, &stored_embedding)? as f64;
327 if similarity >= min_similarity {
328 hits.push(VectorHit {
329 id: row.id,
330 content: row.content,
331 source: row.source,
332 similarity,
333 updated_at: row.updated_at,
334 source_rank: None,
335 source_similarity: None,
336 reranked_from_f32: false,
337 });
338 }
339 }
340
341 Ok((hits, row_count))
342}
343
344fn rank_vector_hits(mut hits: Vec<VectorHit>, pool_size: usize) -> Vec<VectorHit> {
345 hits.sort_by(|a, b| {
346 b.similarity.partial_cmp(&a.similarity).unwrap_or_else(|| {
347 if a.similarity.is_nan() {
348 std::cmp::Ordering::Greater
349 } else {
350 std::cmp::Ordering::Less
351 }
352 })
353 });
354
355 for (idx, hit) in hits.iter_mut().enumerate() {
356 hit.source_rank = Some(idx + 1);
357 hit.source_similarity = Some(hit.similarity);
358 }
359
360 hits.truncate(pool_size);
361 hits
362}
363
364pub(crate) fn bm25_search(
366 conn: &Connection,
367 sanitized_query: &str,
368 pool_size: usize,
369 namespaces: Option<&[&str]>,
370 source_types: Option<&[SearchSourceType]>,
371 session_ids: Option<&[&str]>,
372) -> Result<Vec<Bm25Hit>, MemoryError> {
373 let mut hits = Vec::new();
374
375 let search_facts = source_types
376 .map(|st| st.contains(&SearchSourceType::Facts))
377 .unwrap_or(true);
378 let search_chunks = source_types
379 .map(|st| st.contains(&SearchSourceType::Chunks))
380 .unwrap_or(true);
381 let search_messages = source_types
382 .map(|st| st.contains(&SearchSourceType::Messages))
383 .unwrap_or(false);
384 let search_episodes = source_types
385 .map(|st| st.contains(&SearchSourceType::Episodes))
386 .unwrap_or(true);
387
388 if search_facts {
389 let (ns_clause, ns_params) = build_filter_clause("f.namespace", namespaces, 3);
390 let sql = format!(
391 "SELECT fm.fact_id, f.content, f.namespace, bm25(facts_fts) AS score, f.updated_at
392 FROM facts_fts
393 JOIN facts_rowid_map fm ON facts_fts.rowid = fm.rowid
394 JOIN facts f ON f.id = fm.fact_id
395 WHERE facts_fts MATCH ?1 {}
396 ORDER BY score ASC
397 LIMIT ?2",
398 ns_clause
399 );
400
401 let mut params = vec![
402 SqlValue::Text(sanitized_query.to_string()),
403 SqlValue::Integer(pool_size as i64),
404 ];
405 params.extend(ns_params);
406
407 let mut stmt = conn.prepare(&sql)?;
408 let rows = stmt.query_map(rusqlite::params_from_iter(¶ms), |row| {
409 let fact_id: String = row.get(0)?;
410 let content: String = row.get(1)?;
411 let namespace: String = row.get(2)?;
412 let raw_score: f64 = row.get(3)?;
413 let updated_at: Option<String> = row.get(4)?;
414 Ok(Bm25Hit {
415 id: format!("fact:{fact_id}"),
416 content,
417 source: SearchSource::Fact { fact_id, namespace },
418 raw_score,
419 updated_at,
420 })
421 })?;
422
423 for row in rows {
424 hits.push(row?);
425 }
426 }
427
428 if search_chunks {
429 let (ns_clause, ns_params) = build_filter_clause("d.namespace", namespaces, 3);
430 let sql = format!(
431 "SELECT cm.chunk_id, c.content, c.document_id, d.title, c.chunk_index,
432 bm25(chunks_fts) AS score, c.created_at
433 FROM chunks_fts
434 JOIN chunks_rowid_map cm ON chunks_fts.rowid = cm.rowid
435 JOIN chunks c ON c.id = cm.chunk_id
436 JOIN documents d ON d.id = c.document_id
437 WHERE chunks_fts MATCH ?1 {}
438 ORDER BY score ASC
439 LIMIT ?2",
440 ns_clause
441 );
442
443 let mut params = vec![
444 SqlValue::Text(sanitized_query.to_string()),
445 SqlValue::Integer(pool_size as i64),
446 ];
447 params.extend(ns_params);
448
449 let mut stmt = conn.prepare(&sql)?;
450 let rows = stmt.query_map(rusqlite::params_from_iter(¶ms), |row| {
451 let chunk_id: String = row.get(0)?;
452 let content: String = row.get(1)?;
453 let document_id: String = row.get(2)?;
454 let document_title: String = row.get(3)?;
455 let chunk_index: i64 = row.get(4)?;
456 let raw_score: f64 = row.get(5)?;
457 let updated_at: Option<String> = row.get(6)?;
458 Ok(Bm25Hit {
459 id: format!("chunk:{chunk_id}"),
460 content,
461 source: SearchSource::Chunk {
462 chunk_id,
463 document_id,
464 document_title,
465 chunk_index: chunk_index as usize,
466 },
467 raw_score,
468 updated_at,
469 })
470 })?;
471
472 for row in rows {
473 hits.push(row?);
474 }
475 }
476
477 if search_messages {
478 let (sid_clause, sid_params) = build_filter_clause("m.session_id", session_ids, 3);
479 let sql = format!(
480 "SELECT mm.message_id, m.content, m.session_id, m.role,
481 bm25(messages_fts) AS score, m.created_at
482 FROM messages_fts
483 JOIN messages_rowid_map mm ON messages_fts.rowid = mm.rowid
484 JOIN messages m ON m.id = mm.message_id
485 WHERE messages_fts MATCH ?1 {}
486 ORDER BY score ASC
487 LIMIT ?2",
488 sid_clause
489 );
490
491 let mut params = vec![
492 SqlValue::Text(sanitized_query.to_string()),
493 SqlValue::Integer(pool_size as i64),
494 ];
495 params.extend(sid_params);
496
497 let mut stmt = conn.prepare(&sql)?;
498 let rows = stmt.query_map(rusqlite::params_from_iter(¶ms), |row| {
499 let message_id: i64 = row.get(0)?;
500 let content: String = row.get(1)?;
501 let session_id: String = row.get(2)?;
502 let role: String = row.get(3)?;
503 let raw_score: f64 = row.get(4)?;
504 let updated_at: Option<String> = row.get(5)?;
505 Ok(Bm25Hit {
506 id: format!("msg:{message_id}"),
507 content,
508 source: SearchSource::Message {
509 message_id,
510 session_id,
511 role,
512 },
513 raw_score,
514 updated_at,
515 })
516 })?;
517
518 for row in rows {
519 hits.push(row?);
520 }
521 }
522
523 if search_episodes {
524 let (ns_clause, ns_params) = build_filter_clause("d.namespace", namespaces, 3);
525 let sql = format!(
526 "SELECT e.episode_id, e.document_id, e.search_text, e.effect_type, e.outcome,
527 bm25(episodes_fts) AS score, e.updated_at
528 FROM episodes_fts
529 JOIN episodes_rowid_map rm ON episodes_fts.rowid = rm.rowid
530 JOIN episodes e ON e.episode_id = rm.episode_id
531 JOIN documents d ON d.id = e.document_id
532 WHERE episodes_fts MATCH ?1 {}
533 ORDER BY score ASC
534 LIMIT ?2",
535 ns_clause
536 );
537
538 let mut params = vec![
539 SqlValue::Text(sanitized_query.to_string()),
540 SqlValue::Integer(pool_size as i64),
541 ];
542 params.extend(ns_params);
543
544 let mut stmt = conn.prepare(&sql)?;
545 let rows = stmt.query_map(rusqlite::params_from_iter(¶ms), |row| {
546 let episode_id: String = row.get(0)?;
547 let document_id: String = row.get(1)?;
548 let content: String = row.get(2)?;
549 let effect_type: String = row.get(3)?;
550 let outcome: String = row.get(4)?;
551 let raw_score: f64 = row.get(5)?;
552 let updated_at: Option<String> = row.get(6)?;
553 Ok(Bm25Hit {
554 id: episodes::episode_item_key(&episode_id),
555 content,
556 source: SearchSource::Episode {
557 episode_id,
558 document_id,
559 effect_type,
560 outcome,
561 },
562 raw_score,
563 updated_at,
564 })
565 })?;
566
567 for row in rows {
568 hits.push(row?);
569 }
570 }
571
572 Ok(hits)
573}
574
575pub(crate) fn vector_search(
577 conn: &Connection,
578 query_embedding: &[f32],
579 pool_size: usize,
580 min_similarity: f64,
581 namespaces: Option<&[&str]>,
582 source_types: Option<&[SearchSourceType]>,
583 session_ids: Option<&[&str]>,
584) -> Result<Vec<VectorHit>, MemoryError> {
585 let mut hits = Vec::new();
586
587 let search_facts = source_types
588 .map(|st| st.contains(&SearchSourceType::Facts))
589 .unwrap_or(true);
590 let search_chunks = source_types
591 .map(|st| st.contains(&SearchSourceType::Chunks))
592 .unwrap_or(true);
593 let search_messages = source_types
594 .map(|st| st.contains(&SearchSourceType::Messages))
595 .unwrap_or(false);
596 let search_episodes = source_types
597 .map(|st| st.contains(&SearchSourceType::Episodes))
598 .unwrap_or(true);
599
600 if search_facts {
601 let (ns_clause, ns_params) = build_filter_clause("namespace", namespaces, 1);
602 let sql = format!(
603 "SELECT id, content, namespace, embedding, updated_at
604 FROM facts
605 WHERE embedding IS NOT NULL {}",
606 ns_clause
607 );
608
609 let mut stmt = conn.prepare(&sql)?;
610 let rows = stmt.query_map(rusqlite::params_from_iter(&ns_params), |row| {
611 let id: String = row.get(0)?;
612 let content: String = row.get(1)?;
613 let namespace: String = row.get(2)?;
614 let blob: Vec<u8> = row.get(3)?;
615 let updated_at: Option<String> = row.get(4)?;
616 Ok(VectorRow {
617 id: format!("fact:{id}"),
618 content,
619 blob,
620 updated_at,
621 source_type: SearchSourceType::Facts,
622 filter_namespace: Some(namespace.clone()),
623 filter_session_id: None,
624 source: SearchSource::Fact {
625 fact_id: id,
626 namespace,
627 },
628 })
629 })?;
630
631 let (fact_hits, fact_count) =
632 scan_vector_rows(rows, query_embedding, min_similarity, "fact")?;
633 hits.extend(fact_hits);
634
635 if vector_scan_warn_exceeded(fact_count) {
636 tracing::warn!(
637 count = fact_count,
638 "facts table exceeds vector scan threshold ({} rows)",
639 fact_count
640 );
641 }
642 }
643
644 if search_chunks {
645 let (ns_clause, ns_params) = build_filter_clause("d.namespace", namespaces, 1);
646 let sql = format!(
647 "SELECT c.id, c.content, c.document_id, d.title, c.chunk_index, c.embedding, c.created_at, d.namespace
648 FROM chunks c
649 JOIN documents d ON d.id = c.document_id
650 WHERE c.embedding IS NOT NULL {}",
651 ns_clause
652 );
653
654 let mut stmt = conn.prepare(&sql)?;
655 let rows = stmt.query_map(rusqlite::params_from_iter(&ns_params), |row| {
656 let id: String = row.get(0)?;
657 let content: String = row.get(1)?;
658 let document_id: String = row.get(2)?;
659 let document_title: String = row.get(3)?;
660 let chunk_index: i64 = row.get(4)?;
661 let blob: Vec<u8> = row.get(5)?;
662 let updated_at: Option<String> = row.get(6)?;
663 let namespace: String = row.get(7)?;
664 Ok(VectorRow {
665 id: format!("chunk:{id}"),
666 content,
667 blob,
668 updated_at,
669 source_type: SearchSourceType::Chunks,
670 filter_namespace: Some(namespace),
671 filter_session_id: None,
672 source: SearchSource::Chunk {
673 chunk_id: id,
674 document_id,
675 document_title,
676 chunk_index: chunk_index as usize,
677 },
678 })
679 })?;
680
681 let (chunk_hits, chunk_count) =
682 scan_vector_rows(rows, query_embedding, min_similarity, "chunk")?;
683 hits.extend(chunk_hits);
684
685 if vector_scan_warn_exceeded(chunk_count) {
686 tracing::warn!(
687 count = chunk_count,
688 "chunks table exceeds vector scan threshold ({} rows)",
689 chunk_count
690 );
691 }
692 }
693
694 if search_messages {
695 let (sid_clause, sid_params) = build_filter_clause("m.session_id", session_ids, 1);
696 let sql = format!(
697 "SELECT m.id, m.content, m.session_id, m.role, m.embedding, m.created_at
698 FROM messages m
699 WHERE m.embedding IS NOT NULL {}",
700 sid_clause
701 );
702
703 let mut stmt = conn.prepare(&sql)?;
704 let rows = stmt.query_map(rusqlite::params_from_iter(&sid_params), |row| {
705 let message_id: i64 = row.get(0)?;
706 let content: String = row.get(1)?;
707 let session_id: String = row.get(2)?;
708 let role: String = row.get(3)?;
709 let blob: Vec<u8> = row.get(4)?;
710 let updated_at: Option<String> = row.get(5)?;
711 Ok(VectorRow {
712 id: format!("msg:{message_id}"),
713 content,
714 blob,
715 updated_at,
716 source_type: SearchSourceType::Messages,
717 filter_namespace: None,
718 filter_session_id: Some(session_id.clone()),
719 source: SearchSource::Message {
720 message_id,
721 session_id,
722 role,
723 },
724 })
725 })?;
726
727 let (message_hits, message_count) =
728 scan_vector_rows(rows, query_embedding, min_similarity, "message")?;
729 hits.extend(message_hits);
730
731 if vector_scan_warn_exceeded(message_count) {
732 tracing::warn!(
733 count = message_count,
734 "messages table exceeds vector scan threshold ({} rows)",
735 message_count
736 );
737 }
738 }
739
740 if search_episodes {
741 let (ns_clause, ns_params) = build_filter_clause("d.namespace", namespaces, 1);
742 let sql = format!(
743 "SELECT e.episode_id, e.document_id, e.search_text, e.effect_type, e.outcome, e.embedding, e.updated_at, d.namespace
744 FROM episodes e
745 JOIN documents d ON d.id = e.document_id
746 WHERE e.embedding IS NOT NULL {}",
747 ns_clause
748 );
749
750 let mut stmt = conn.prepare(&sql)?;
751 let rows = stmt.query_map(rusqlite::params_from_iter(&ns_params), |row| {
752 let episode_id: String = row.get(0)?;
753 let document_id: String = row.get(1)?;
754 let content: String = row.get(2)?;
755 let effect_type: String = row.get(3)?;
756 let outcome: String = row.get(4)?;
757 let blob: Vec<u8> = row.get(5)?;
758 let updated_at: Option<String> = row.get(6)?;
759 let namespace: String = row.get(7)?;
760 Ok(VectorRow {
761 id: episodes::episode_item_key(&episode_id),
762 content,
763 blob,
764 updated_at,
765 source_type: SearchSourceType::Episodes,
766 filter_namespace: Some(namespace),
767 filter_session_id: None,
768 source: SearchSource::Episode {
769 episode_id,
770 document_id,
771 effect_type,
772 outcome,
773 },
774 })
775 })?;
776
777 let (episode_hits, episode_count) =
778 scan_vector_rows(rows, query_embedding, min_similarity, "episode")?;
779 hits.extend(episode_hits);
780
781 if vector_scan_warn_exceeded(episode_count) {
782 tracing::warn!(
783 count = episode_count,
784 "episodes table exceeds vector scan threshold ({} rows)",
785 episode_count
786 );
787 }
788 }
789
790 Ok(rank_vector_hits(hits, pool_size))
791}
792
793fn brute_force_vector_outcome(
794 conn: &Connection,
795 query_embedding: &[f32],
796 pool_size: usize,
797 min_similarity: f64,
798 namespaces: Option<&[&str]>,
799 source_types: Option<&[SearchSourceType]>,
800 session_ids: Option<&[&str]>,
801) -> Result<VectorSearchOutcome, MemoryError> {
802 let hits = vector_search(
803 conn,
804 query_embedding,
805 pool_size,
806 min_similarity,
807 namespaces,
808 source_types,
809 session_ids,
810 )?;
811 Ok(VectorSearchOutcome {
812 requested_candidates: pool_size,
813 returned_candidates: hits.len(),
814 post_filter_candidates: hits.len(),
815 hits,
816 candidate_backend: "brute_force_f32".to_string(),
817 fallback: None,
818 exact_rerank: true,
819 degradations: Vec::new(),
820 receipt_metadata: VectorReceiptMetadata::default(),
821 })
822}
823
824#[allow(clippy::too_many_arguments)]
825fn vector_search_with_backend(
826 conn: &Connection,
827 query_embedding: &[f32],
828 pool_size: usize,
829 min_similarity: f64,
830 config: &SearchConfig,
831 context: &SearchContext,
832 namespaces: Option<&[&str]>,
833 source_types: Option<&[SearchSourceType]>,
834 session_ids: Option<&[&str]>,
835) -> Result<VectorSearchOutcome, MemoryError> {
836 if context.exactness_profile == crate::types::ExactnessProfile::PreferExact {
837 return brute_force_vector_outcome(
838 conn,
839 query_embedding,
840 pool_size,
841 min_similarity,
842 namespaces,
843 source_types,
844 session_ids,
845 );
846 }
847
848 match config.derived_vector_backend {
849 DerivedVectorBackendPolicy::Disabled => brute_force_vector_outcome(
850 conn,
851 query_embedding,
852 pool_size,
853 min_similarity,
854 namespaces,
855 source_types,
856 session_ids,
857 ),
858 DerivedVectorBackendPolicy::TurboQuantCandidateOnly => turbo_quant_vector_outcome(
859 conn,
860 query_embedding,
861 pool_size,
862 min_similarity,
863 config,
864 namespaces,
865 source_types,
866 session_ids,
867 ),
868 }
869}
870
871#[cfg(not(feature = "turbo-quant-codec"))]
872#[allow(clippy::too_many_arguments)]
873fn turbo_quant_vector_outcome(
874 conn: &Connection,
875 query_embedding: &[f32],
876 pool_size: usize,
877 min_similarity: f64,
878 _config: &SearchConfig,
879 namespaces: Option<&[&str]>,
880 source_types: Option<&[SearchSourceType]>,
881 session_ids: Option<&[&str]>,
882) -> Result<VectorSearchOutcome, MemoryError> {
883 let mut outcome = brute_force_vector_outcome(
884 conn,
885 query_embedding,
886 pool_size,
887 min_similarity,
888 namespaces,
889 source_types,
890 session_ids,
891 )?;
892 outcome.candidate_backend = "turbo_quant_candidate_then_exact_f32".to_string();
893 outcome.fallback = Some("turbo_quant_feature_disabled".to_string());
894 outcome
895 .degradations
896 .push("TurboQuant backend requested without turbo-quant-codec feature".to_string());
897 Ok(outcome)
898}
899
900#[cfg(feature = "turbo-quant-codec")]
901#[allow(clippy::too_many_arguments)]
902fn turbo_quant_vector_outcome(
903 conn: &Connection,
904 query_embedding: &[f32],
905 pool_size: usize,
906 min_similarity: f64,
907 config: &SearchConfig,
908 namespaces: Option<&[&str]>,
909 source_types: Option<&[SearchSourceType]>,
910 session_ids: Option<&[&str]>,
911) -> Result<VectorSearchOutcome, MemoryError> {
912 use crate::vector_codec::{TurboQuantCodec, VectorArtifactV1, VectorCodec};
913
914 if !config.turbo_quant_require_exact_rerank {
915 return Err(MemoryError::InvalidConfig {
916 field: "search.turbo_quant_require_exact_rerank",
917 reason: "TurboQuant candidate backend requires exact f32 rerank".to_string(),
918 });
919 }
920
921 let dim = query_embedding.len();
922 let codec = TurboQuantCodec::new(
923 dim,
924 config.turbo_quant_bits,
925 config.turbo_quant_projections,
926 config.turbo_quant_seed,
927 )?;
928 let profile = codec.profile().clone();
929 let profile_digest = profile.digest();
930 let mut metadata = VectorReceiptMetadata {
931 codec_family: Some("turbo_quant".to_string()),
932 codec_profile_digest: Some(profile_digest.clone()),
933 ..VectorReceiptMetadata::default()
934 };
935
936 let filtered = namespaces.is_some_and(|values| !values.is_empty())
937 || source_types.is_some_and(|values| !values.is_empty())
938 || session_ids.is_some_and(|values| !values.is_empty());
939 metadata.filter_strategy = Some(if filtered {
940 "adaptive_oversampling_after_approximate_scoring".to_string()
941 } else {
942 "unfiltered_top_k_heap".to_string()
943 });
944
945 let raw_count = authoritative_vector_row_count(conn)?;
946 let (current_source_snapshot_digest, current_source_row_count) =
947 crate::db::current_source_snapshot_digest(conn, dim)?;
948 let Some(generation) =
949 crate::db::current_derived_vector_generation(conn, "turbo_quant", &profile_digest)?
950 else {
951 metadata.artifact_missing_count = Some(raw_count);
952 metadata.vector_artifact_missing_count = Some(raw_count);
953 let mut outcome = brute_force_vector_outcome(
954 conn,
955 query_embedding,
956 pool_size,
957 min_similarity,
958 namespaces,
959 source_types,
960 session_ids,
961 )?;
962 outcome.candidate_backend = "turbo_quant_candidate_then_exact_f32".to_string();
963 outcome.fallback = Some("turbo_quant_generation_missing_or_invalidated".to_string());
964 outcome.degradations.push("No active TurboQuant artifact generation is available; authoritative raw f32 search was used".to_string());
965 outcome.receipt_metadata = metadata;
966 return Ok(outcome);
967 };
968
969 metadata.artifact_generation_id = Some(generation.generation_id.clone());
970 metadata.vector_artifact_manifest_digest = Some(generation.artifact_manifest_digest.clone());
971 metadata.artifact_count = Some(generation.artifact_count);
972
973 let artifacts =
974 crate::db::load_derived_vector_artifacts_by_generation(conn, &generation.generation_id)?;
975 metadata.vector_artifact_count = Some(artifacts.len());
976
977 if generation.dim != dim
978 || generation.encoding != "turbo_code_wire_v1"
979 || generation.status != "active"
980 || generation.source_row_count != raw_count
981 || generation.source_row_count != current_source_row_count
982 || generation.source_snapshot_digest != current_source_snapshot_digest
983 || generation.artifact_count != artifacts.len()
984 {
985 let missing = raw_count.saturating_sub(artifacts.len());
986 metadata.artifact_missing_count = Some(missing);
987 metadata.vector_artifact_missing_count = Some(missing);
988 let mut outcome = brute_force_vector_outcome(
989 conn,
990 query_embedding,
991 pool_size,
992 min_similarity,
993 namespaces,
994 source_types,
995 session_ids,
996 )?;
997 outcome.candidate_backend = "turbo_quant_candidate_then_exact_f32".to_string();
998 outcome.fallback = Some("turbo_quant_generation_incomplete_or_stale".to_string());
999 outcome.degradations.push(format!(
1000 "TurboQuant generation validation failed: generation={}, status={}, dim={}, source_rows={}, artifacts={}, authoritative_rows={}, snapshot_current={}",
1001 generation.generation_id,
1002 generation.status,
1003 generation.dim,
1004 generation.source_row_count,
1005 artifacts.len(),
1006 raw_count,
1007 generation.source_snapshot_digest == current_source_snapshot_digest
1008 ));
1009 outcome.receipt_metadata = metadata;
1010 return Ok(outcome);
1011 }
1012
1013 let prepared = codec.prepare_query(query_embedding)?;
1014 let candidate_cap = if filtered {
1015 artifacts
1016 .len()
1017 .min(pool_size.saturating_mul(16).max(pool_size))
1018 } else {
1019 pool_size.min(artifacts.len())
1020 };
1021 let mut scored = BinaryHeap::with_capacity(candidate_cap.saturating_add(1));
1022 let mut corrupt_count = 0usize;
1023 let mut scanned_count = 0usize;
1024 for (seq, artifact_row) in artifacts.into_iter().enumerate() {
1025 scanned_count += 1;
1026 if artifact_row.encoding != "turbo_code_wire_v1"
1027 || artifact_row.dim != dim
1028 || artifact_row.status != "active"
1029 {
1030 corrupt_count += 1;
1031 continue;
1032 }
1033 let artifact = VectorArtifactV1::new(profile.clone(), artifact_row.encoded);
1034 if artifact.profile_digest != artifact_row.codec_profile_digest
1035 || artifact.artifact_digest != artifact_row.encoded_digest
1036 {
1037 corrupt_count += 1;
1038 continue;
1039 }
1040 let approx = match codec.score_inner_product_prepared(&artifact, &prepared) {
1041 Ok(score) if score.is_finite() => score as f64,
1042 Ok(_) => {
1043 corrupt_count += 1;
1044 continue;
1045 }
1046 Err(err) => {
1047 tracing::warn!(
1048 error = %err,
1049 item = %artifact_row.item_key,
1050 "corrupt TurboQuant artifact encountered; falling back to raw f32"
1051 );
1052 corrupt_count += 1;
1053 continue;
1054 }
1055 };
1056 if candidate_cap == 0 {
1057 continue;
1058 }
1059 let candidate = ApproxCandidate {
1060 score: approx,
1061 seq,
1062 item_key: artifact_row.item_key,
1063 };
1064 if scored.len() < candidate_cap {
1065 scored.push(candidate);
1066 } else if scored
1067 .peek()
1068 .is_some_and(|worst: &ApproxCandidate| candidate.score > worst.score)
1069 {
1070 scored.pop();
1071 scored.push(candidate);
1072 }
1073 }
1074
1075 metadata.artifact_corruption_count = Some(corrupt_count);
1076 metadata.approximate_scanned_count = Some(scanned_count);
1077 if corrupt_count > 0 {
1078 let mut outcome = brute_force_vector_outcome(
1079 conn,
1080 query_embedding,
1081 pool_size,
1082 min_similarity,
1083 namespaces,
1084 source_types,
1085 session_ids,
1086 )?;
1087 outcome.candidate_backend = "turbo_quant_candidate_then_exact_f32".to_string();
1088 outcome.fallback = Some("turbo_quant_artifact_validation_failed".to_string());
1089 outcome.degradations.push(format!(
1090 "TurboQuant artifact validation failed: {corrupt_count} corrupt artifacts in generation {}",
1091 generation.generation_id
1092 ));
1093 outcome.receipt_metadata = metadata;
1094 return Ok(outcome);
1095 }
1096
1097 let mut scored = scored.into_vec();
1098 scored.sort_by(|a, b| {
1099 b.score
1100 .partial_cmp(&a.score)
1101 .unwrap_or(std::cmp::Ordering::Equal)
1102 .then_with(|| a.seq.cmp(&b.seq))
1103 });
1104 let approximate_returned = scored.len();
1105 metadata.approximate_candidate_count = Some(approximate_returned);
1106 metadata.approximate_returned_count = Some(approximate_returned);
1107 let mut exact_hits = Vec::new();
1108 let mut raw_rows_loaded_count = 0usize;
1109 let mut missing_count = 0usize;
1110 for (approx_rank_0, candidate) in scored.into_iter().enumerate() {
1111 let Some(row) = load_vector_row_by_item_key(conn, &candidate.item_key)? else {
1112 missing_count += 1;
1113 continue;
1114 };
1115 raw_rows_loaded_count += 1;
1116 if !vector_row_matches_filters(&row, namespaces, source_types, session_ids) {
1117 continue;
1118 }
1119 let stored_embedding = crate::db::decode_f32_le(&row.blob, dim)?;
1120 let similarity = cosine_similarity(query_embedding, &stored_embedding)? as f64;
1121 if similarity >= min_similarity {
1122 exact_hits.push(VectorHit {
1123 id: row.id,
1124 content: row.content,
1125 source: row.source,
1126 similarity,
1127 updated_at: row.updated_at,
1128 source_rank: Some(approx_rank_0 + 1),
1129 source_similarity: Some(candidate.score),
1130 reranked_from_f32: true,
1131 });
1132 }
1133 }
1134 let post_filter_candidates = exact_hits.len();
1135 metadata.artifact_missing_count = Some(missing_count);
1136 metadata.vector_artifact_missing_count = Some(missing_count);
1137 metadata.vector_artifact_stale_count = Some(0);
1138 metadata.raw_rows_loaded_count = Some(raw_rows_loaded_count);
1139 metadata.exact_rerank_count = Some(raw_rows_loaded_count);
1140 let mut degradations = Vec::new();
1141 if filtered && post_filter_candidates < pool_size && candidate_cap < scanned_count {
1142 degradations.push(format!(
1143 "TurboQuant filter-aware candidate generation under-returned {post_filter_candidates} candidates for requested pool {pool_size} after scanning {scanned_count} artifacts with candidate budget {candidate_cap}"
1144 ));
1145 }
1146 if missing_count > 0 {
1147 degradations.push(format!(
1148 "TurboQuant exact rerank skipped {missing_count} candidates whose authoritative rows were missing"
1149 ));
1150 }
1151 let hits = rank_vector_hits(exact_hits, pool_size);
1152 Ok(VectorSearchOutcome {
1153 hits,
1154 candidate_backend: "turbo_quant_candidate_then_exact_f32".to_string(),
1155 requested_candidates: pool_size,
1156 returned_candidates: approximate_returned,
1157 post_filter_candidates,
1158 fallback: None,
1159 exact_rerank: true,
1160 degradations,
1161 receipt_metadata: metadata,
1162 })
1163}
1164
/// A candidate produced by the approximate TurboQuant scan, kept until the
/// exact f32 rerank stage.
#[cfg(feature = "turbo-quant-codec")]
#[derive(Debug, Clone)]
struct ApproxCandidate {
    /// Approximate score reported by the codec for this artifact.
    score: f64,
    /// Position of the artifact in the scanned sequence; used to break score ties.
    seq: usize,
    /// Item key of the artifact row; used afterwards to look up the authoritative row.
    item_key: String,
}
1172
#[cfg(feature = "turbo-quant-codec")]
impl PartialEq for ApproxCandidate {
    /// Two candidates are equal when both the approximate score and the scan
    /// sequence number match; `item_key` does not take part in the comparison.
    fn eq(&self, other: &Self) -> bool {
        (self.score, self.seq) == (other.score, other.seq)
    }
}
1179
// Marker impl required by `Ord`; equality itself is defined by `PartialEq`.
// NOTE(review): `Eq` is only sound while `score` is never NaN — confirm that
// every producer filters non-finite scores before constructing a candidate.
#[cfg(feature = "turbo-quant-codec")]
impl Eq for ApproxCandidate {}
1182
#[cfg(feature = "turbo-quant-codec")]
impl PartialOrd for ApproxCandidate {
    /// Delegates to [`Ord::cmp`], so the partial order always agrees with the
    /// total order and never returns `None`.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
1189
#[cfg(feature = "turbo-quant-codec")]
impl Ord for ApproxCandidate {
    /// Reversed ordering: a candidate with a *lower* score compares as
    /// *greater*, so a max-heap of candidates exposes the lowest-scoring one
    /// at its top.
    ///
    /// Scores that cannot be compared are treated as equal. On equal scores the
    /// sequence numbers are compared in reverse as well, i.e. the candidate
    /// with the smaller `seq` compares as greater.
    ///
    /// NOTE(review): with this tie-break a heap evicts the *earliest* of
    /// several equally scored candidates first — confirm this matches the
    /// ordering the callers expect for ties.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        use std::cmp::Ordering;

        let by_score = other
            .score
            .partial_cmp(&self.score)
            .unwrap_or(Ordering::Equal);
        match by_score {
            Ordering::Equal => other.seq.cmp(&self.seq),
            decided => decided,
        }
    }
}
1200
/// Reports whether a loaded row satisfies every search filter.
///
/// * `source_types` — when `Some`, the row's source type must be in the list
///   (an empty list therefore rejects every row).
/// * `namespaces` — ignored when `None` or empty; otherwise the row must carry
///   a filter namespace and that namespace must be in the list.
/// * `session_ids` — ignored when `None` or empty; otherwise the row must carry
///   a filter session id and that id must be in the list.
#[cfg(feature = "turbo-quant-codec")]
fn vector_row_matches_filters(
    row: &VectorRow,
    namespaces: Option<&[&str]>,
    source_types: Option<&[SearchSourceType]>,
    session_ids: Option<&[&str]>,
) -> bool {
    let source_type_ok = match source_types {
        Some(allowed) => allowed.contains(&row.source_type),
        None => true,
    };

    let namespace_ok = match namespaces {
        Some(allowed) if !allowed.is_empty() => row
            .filter_namespace
            .as_deref()
            .is_some_and(|namespace| allowed.contains(&namespace)),
        _ => true,
    };

    let session_ok = match session_ids {
        Some(allowed) if !allowed.is_empty() => row
            .filter_session_id
            .as_deref()
            .is_some_and(|session_id| allowed.contains(&session_id)),
        _ => true,
    };

    source_type_ok && namespace_ok && session_ok
}
1229
/// Counts the rows that currently carry an embedding, summed over the `facts`,
/// `chunks`, `messages` and `episodes` tables.
///
/// # Errors
/// Returns the database error if the query fails, or `MemoryError::Other` if
/// the reported count does not fit into `usize`.
#[cfg(feature = "turbo-quant-codec")]
fn authoritative_vector_row_count(conn: &Connection) -> Result<usize, MemoryError> {
    const COUNT_SQL: &str = "SELECT
            (SELECT COUNT(*) FROM facts WHERE embedding IS NOT NULL) +
            (SELECT COUNT(*) FROM chunks WHERE embedding IS NOT NULL) +
            (SELECT COUNT(*) FROM messages WHERE embedding IS NOT NULL) +
            (SELECT COUNT(*) FROM episodes WHERE embedding IS NOT NULL)";

    let total: i64 = conn.query_row(COUNT_SQL, [], |row| row.get(0))?;
    let count = usize::try_from(total)
        .map_err(|err| MemoryError::Other(format!("authoritative vector count overflow: {err}")))?;
    Ok(count)
}
1244
/// Loads the row behind an item key of the form `<domain>:<id>`.
///
/// Recognised domains are `fact`, `chunk`, `msg` and `episode`. Only rows whose
/// embedding column is non-NULL are returned.
///
/// Returns `Ok(None)` when the key has no `:` separator, the domain is not
/// recognised, a `msg` id is not a valid `i64`, or no matching row exists.
///
/// # Errors
/// Any database error other than "no rows" is converted into `MemoryError`.
#[cfg(feature = "turbo-quant-codec")]
fn load_vector_row_by_item_key(
    conn: &Connection,
    item_key: &str,
) -> Result<Option<VectorRow>, MemoryError> {
    const FACT_SQL: &str = "SELECT id, content, namespace, embedding, updated_at
         FROM facts WHERE id = ?1 AND embedding IS NOT NULL";
    const CHUNK_SQL: &str = "SELECT c.id, c.content, c.document_id, d.title, c.chunk_index, c.embedding, c.created_at, d.namespace
         FROM chunks c
         JOIN documents d ON d.id = c.document_id
         WHERE c.id = ?1 AND c.embedding IS NOT NULL";
    const MESSAGE_SQL: &str = "SELECT id, content, session_id, role, embedding, created_at
         FROM messages WHERE id = ?1 AND embedding IS NOT NULL";
    const EPISODE_SQL: &str = "SELECT e.episode_id, e.document_id, e.search_text, e.effect_type, e.outcome, e.embedding, e.updated_at, d.namespace
         FROM episodes e
         JOIN documents d ON d.id = e.document_id
         WHERE e.episode_id = ?1 AND e.embedding IS NOT NULL";

    let Some((domain, raw_id)) = item_key.split_once(':') else {
        return Ok(None);
    };

    // Every arm yields `Result<Option<VectorRow>, rusqlite::Error>`; the error
    // conversion happens once after the match.
    let lookup = match domain {
        "fact" => conn
            .query_row(FACT_SQL, [raw_id], |row| {
                let fact_id: String = row.get(0)?;
                let content: String = row.get(1)?;
                let namespace: String = row.get(2)?;
                let blob: Vec<u8> = row.get(3)?;
                let updated_at: Option<String> = row.get(4)?;
                let id = format!("fact:{fact_id}");
                let filter_namespace = Some(namespace.clone());
                Ok(VectorRow {
                    id,
                    content,
                    blob,
                    updated_at,
                    source_type: SearchSourceType::Facts,
                    filter_namespace,
                    filter_session_id: None,
                    source: SearchSource::Fact { fact_id, namespace },
                })
            })
            .optional(),
        "chunk" => conn
            .query_row(CHUNK_SQL, [raw_id], |row| {
                let chunk_id: String = row.get(0)?;
                let content: String = row.get(1)?;
                let document_id: String = row.get(2)?;
                let document_title: String = row.get(3)?;
                let chunk_index: i64 = row.get(4)?;
                let blob: Vec<u8> = row.get(5)?;
                // Chunks expose `created_at` in the `updated_at` slot.
                let updated_at: Option<String> = row.get(6)?;
                let namespace: String = row.get(7)?;
                let id = format!("chunk:{chunk_id}");
                Ok(VectorRow {
                    id,
                    content,
                    blob,
                    updated_at,
                    source_type: SearchSourceType::Chunks,
                    filter_namespace: Some(namespace),
                    filter_session_id: None,
                    source: SearchSource::Chunk {
                        chunk_id,
                        document_id,
                        document_title,
                        chunk_index: chunk_index as usize,
                    },
                })
            })
            .optional(),
        "msg" => {
            let Ok(message_id) = raw_id.parse::<i64>() else {
                return Ok(None);
            };
            conn.query_row(MESSAGE_SQL, [message_id], |row| {
                let message_id: i64 = row.get(0)?;
                let content: String = row.get(1)?;
                let session_id: String = row.get(2)?;
                let role: String = row.get(3)?;
                let blob: Vec<u8> = row.get(4)?;
                // Messages expose `created_at` in the `updated_at` slot.
                let updated_at: Option<String> = row.get(5)?;
                let id = format!("msg:{message_id}");
                let filter_session_id = Some(session_id.clone());
                Ok(VectorRow {
                    id,
                    content,
                    blob,
                    updated_at,
                    source_type: SearchSourceType::Messages,
                    filter_namespace: None,
                    filter_session_id,
                    source: SearchSource::Message {
                        message_id,
                        session_id,
                        role,
                    },
                })
            })
            .optional()
        }
        "episode" => conn
            .query_row(EPISODE_SQL, [raw_id], |row| {
                let episode_id: String = row.get(0)?;
                let document_id: String = row.get(1)?;
                // The episode's `search_text` column serves as the row content.
                let content: String = row.get(2)?;
                let effect_type: String = row.get(3)?;
                let outcome: String = row.get(4)?;
                let blob: Vec<u8> = row.get(5)?;
                let updated_at: Option<String> = row.get(6)?;
                let namespace: String = row.get(7)?;
                let id = episodes::episode_item_key(&episode_id);
                Ok(VectorRow {
                    id,
                    content,
                    blob,
                    updated_at,
                    source_type: SearchSourceType::Episodes,
                    filter_namespace: Some(namespace),
                    filter_session_id: None,
                    source: SearchSource::Episode {
                        episode_id,
                        document_id,
                        effect_type,
                        outcome,
                    },
                })
            })
            .optional(),
        _ => return Ok(None),
    };

    lookup.map_err(MemoryError::from)
}
1386
1387fn vector_scan_warn_exceeded(count: usize) -> bool {
1388 let limit = VECTOR_SCAN_WARN_LIMIT.load(Ordering::Relaxed);
1389 limit > 0 && count > limit
1390}
1391
/// Result of one search run: the ranked results plus an optional receipt.
#[derive(Debug, Clone)]
pub(crate) struct SearchExecution {
    /// Fused, ranked results with their score explanations.
    pub results: Vec<ExplainedResult>,
    /// Receipt describing how the vector search ran; `None` when no receipt was built.
    pub receipt: Option<VectorSearchReceiptV1>,
}
1397
/// Optional backend bookkeeping that is copied into a vector-search receipt.
///
/// Every field defaults to `None`; a backend fills in only what it measured.
#[derive(Debug, Clone, Default)]
struct VectorReceiptMetadata {
    /// Copied to the receipt's `codec_family`.
    codec_family: Option<String>,
    /// Copied to both `codec_profile_digest` and `artifact_profile_digest` on the receipt.
    codec_profile_digest: Option<String>,
    /// Copied to `artifact_count`; also the fallback for `vector_artifact_count`.
    artifact_count: Option<usize>,
    /// Number of artifacts that failed validation or scoring during a scan.
    artifact_corruption_count: Option<usize>,
    /// Copied to `artifact_missing_count`; also the fallback for `vector_artifact_missing_count`.
    artifact_missing_count: Option<usize>,
    /// Copied to the receipt's `vector_artifact_manifest_digest`.
    vector_artifact_manifest_digest: Option<String>,
    /// Copied to the receipt's `artifact_generation_id`.
    artifact_generation_id: Option<String>,
    /// Number of artifacts visited by the approximate scan.
    approximate_scanned_count: Option<usize>,
    /// Number of candidates the approximate stage handed on.
    approximate_returned_count: Option<usize>,
    /// Number of authoritative rows loaded for exact scoring.
    raw_rows_loaded_count: Option<usize>,
    /// Copied to the receipt's `filter_strategy`.
    filter_strategy: Option<String>,
    /// Overrides `artifact_count` for the receipt's `vector_artifact_count` when set.
    vector_artifact_count: Option<usize>,
    /// Overrides `artifact_missing_count` for the receipt's `vector_artifact_missing_count` when set.
    vector_artifact_missing_count: Option<usize>,
    /// Copied to the receipt's `vector_artifact_stale_count`.
    vector_artifact_stale_count: Option<usize>,
    /// When set, takes precedence over the receipt's derived `exact_rerank_count`.
    exact_rerank_count: Option<usize>,
    /// Copied to the receipt's `approximate_candidate_count`.
    approximate_candidate_count: Option<usize>,
}
1417
/// Everything a vector backend reports back: the hits themselves plus the
/// figures that end up in the search receipt.
#[derive(Debug, Clone)]
struct VectorSearchOutcome {
    /// Vector hits handed to rank fusion.
    hits: Vec<VectorHit>,
    /// Name of the backend that produced the candidates.
    candidate_backend: String,
    /// Number of candidates that were asked for.
    requested_candidates: usize,
    /// Number of candidates the backend returned.
    returned_candidates: usize,
    /// Number of candidates left after filtering.
    post_filter_candidates: usize,
    /// Reason for falling back to another backend, if a fallback happened.
    fallback: Option<String>,
    /// Whether an exact rerank stage ran.
    exact_rerank: bool,
    /// Human-readable notes about degraded behaviour during the search.
    degradations: Vec<String>,
    /// Additional backend metadata for the receipt.
    receipt_metadata: VectorReceiptMetadata,
}
1430
1431fn rrf_fuse_detailed_with_context(
1432 bm25_hits: &[Bm25Hit],
1433 vector_hits: &[VectorHit],
1434 config: &SearchConfig,
1435 context: &SearchContext,
1436 top_k: usize,
1437) -> Vec<ExplainedResult> {
1438 let mut candidates: HashMap<(u8, String), RrfCandidate> = HashMap::new();
1440
1441 for (rank_0, hit) in bm25_hits.iter().enumerate() {
1442 let key = source_dedup_key(&hit.source);
1443 let rank = rank_0 + 1;
1444 candidates
1445 .entry(key)
1446 .and_modify(|candidate| {
1447 candidate.bm25_rank = Some(rank);
1448 candidate.bm25_score = Some(hit.raw_score);
1449 if candidate.updated_at.is_none() {
1450 candidate.updated_at = hit.updated_at.clone();
1451 }
1452 })
1453 .or_insert_with(|| RrfCandidate {
1454 content: hit.content.clone(),
1455 source: hit.source.clone(),
1456 updated_at: hit.updated_at.clone(),
1457 bm25_score: Some(hit.raw_score),
1458 bm25_rank: Some(rank),
1459 vector_score: None,
1460 vector_rank: None,
1461 vector_source_rank: None,
1462 vector_source_score: None,
1463 vector_reranked_from_f32: false,
1464 });
1465 }
1466
1467 for (rank_0, hit) in vector_hits.iter().enumerate() {
1468 let key = source_dedup_key(&hit.source);
1469 let rank = rank_0 + 1;
1470 candidates
1471 .entry(key)
1472 .and_modify(|candidate| {
1473 candidate.vector_rank = Some(rank);
1474 candidate.vector_score = Some(hit.similarity);
1475 candidate.vector_source_rank = hit.source_rank.or(Some(rank));
1476 candidate.vector_source_score = hit.source_similarity.or(Some(hit.similarity));
1477 candidate.vector_reranked_from_f32 = hit.reranked_from_f32;
1478 if candidate.updated_at.is_none() {
1479 candidate.updated_at = hit.updated_at.clone();
1480 }
1481 })
1482 .or_insert_with(|| RrfCandidate {
1483 content: hit.content.clone(),
1484 source: hit.source.clone(),
1485 updated_at: hit.updated_at.clone(),
1486 bm25_score: None,
1487 bm25_rank: None,
1488 vector_score: Some(hit.similarity),
1489 vector_rank: Some(rank),
1490 vector_source_rank: hit.source_rank.or(Some(rank)),
1491 vector_source_score: hit.source_similarity.or(Some(hit.similarity)),
1492 vector_reranked_from_f32: hit.reranked_from_f32,
1493 });
1494 }
1495
1496 let mut explained: Vec<ExplainedResult> = candidates
1497 .into_values()
1498 .map(|candidate| candidate.explained(config, context))
1499 .collect();
1500
1501 explained.sort_by(|a, b| {
1502 b.result
1503 .score
1504 .partial_cmp(&a.result.score)
1505 .unwrap_or(std::cmp::Ordering::Equal)
1506 .then_with(|| {
1507 source_dedup_key(&a.result.source).cmp(&source_dedup_key(&b.result.source))
1508 })
1509 });
1510 explained.truncate(top_k);
1511 explained
1512}
1513
1514fn rrf_fuse_detailed(
1515 bm25_hits: &[Bm25Hit],
1516 vector_hits: &[VectorHit],
1517 config: &SearchConfig,
1518 top_k: usize,
1519) -> Vec<ExplainedResult> {
1520 let context = SearchContext::default_now();
1521 rrf_fuse_detailed_with_context(bm25_hits, vector_hits, config, &context, top_k)
1522}
1523
1524pub fn rrf_fuse_with_context(
1525 bm25_hits: &[Bm25Hit],
1526 vector_hits: &[VectorHit],
1527 config: &SearchConfig,
1528 context: &SearchContext,
1529 top_k: usize,
1530) -> Vec<SearchResult> {
1531 rrf_fuse_detailed_with_context(bm25_hits, vector_hits, config, context, top_k)
1532 .into_iter()
1533 .map(|result| result.result)
1534 .collect()
1535}
1536
1537pub fn rrf_fuse(
1539 bm25_hits: &[Bm25Hit],
1540 vector_hits: &[VectorHit],
1541 config: &SearchConfig,
1542 top_k: usize,
1543) -> Vec<SearchResult> {
1544 rrf_fuse_detailed(bm25_hits, vector_hits, config, top_k)
1545 .into_iter()
1546 .map(|result| result.result)
1547 .collect()
1548}
1549
1550pub(crate) fn query_embedding_digest(query_embedding: &[f32]) -> String {
1551 let mut builder = DigestBuilder::new();
1552 builder
1553 .update_str("semantic-memory.query_embedding.v1")
1554 .separator()
1555 .update(&(query_embedding.len() as u64).to_le_bytes())
1556 .separator();
1557 for value in query_embedding {
1558 builder.update(&value.to_le_bytes());
1559 }
1560 format!("blake3:{}", builder.finalize().hex())
1561}
1562
1563#[allow(clippy::too_many_arguments)]
1564fn build_receipt(
1565 context: &SearchContext,
1566 query_embedding: &[f32],
1567 search_profile: &str,
1568 candidate_backend: &str,
1569 requested_candidates: usize,
1570 returned_candidates: usize,
1571 post_filter_candidates: usize,
1572 fallback: Option<String>,
1573 exact_rerank: bool,
1574 results: &[ExplainedResult],
1575 degradations: Vec<String>,
1576) -> Option<VectorSearchReceiptV1> {
1577 build_receipt_with_metadata(
1578 context,
1579 query_embedding,
1580 search_profile,
1581 candidate_backend,
1582 requested_candidates,
1583 returned_candidates,
1584 post_filter_candidates,
1585 fallback,
1586 exact_rerank,
1587 results,
1588 degradations,
1589 VectorReceiptMetadata::default(),
1590 )
1591}
1592
1593#[allow(clippy::too_many_arguments)]
1594fn build_receipt_with_metadata(
1595 context: &SearchContext,
1596 query_embedding: &[f32],
1597 search_profile: &str,
1598 candidate_backend: &str,
1599 requested_candidates: usize,
1600 returned_candidates: usize,
1601 post_filter_candidates: usize,
1602 fallback: Option<String>,
1603 exact_rerank: bool,
1604 results: &[ExplainedResult],
1605 degradations: Vec<String>,
1606 metadata: VectorReceiptMetadata,
1607) -> Option<VectorSearchReceiptV1> {
1608 if !context.receipts_enabled() {
1609 return None;
1610 }
1611 Some(VectorSearchReceiptV1 {
1612 schema_version: "vector_search_receipt_v1".to_string(),
1613 receipt_digest: None,
1614 receipt_id: context
1615 .request_id
1616 .clone()
1617 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string()),
1618 evaluation_time: context.evaluation_time,
1619 trace_id: context.trace_id.clone(),
1620 attempt_family_id: context.attempt_family_id.clone(),
1621 attempt_id: context.attempt_id.clone(),
1622 replay_of: context.replay_of.clone(),
1623 query_embedding_digest: Some(query_embedding_digest(query_embedding)),
1624 query_text_digest: context.query_text_digest.clone(),
1625 query_input_digest: context.query_input_digest.clone(),
1626 filter_digest: context.filter_digest.clone(),
1627 redaction_state: context.redaction_state.clone(),
1628 budget_id: context.budget_id.clone(),
1629 deadline_at: context.deadline_at,
1630 search_profile: search_profile.to_string(),
1631 candidate_backend: candidate_backend.to_string(),
1632 codec_family: metadata.codec_family.clone(),
1633 codec_profile_digest: metadata.codec_profile_digest.clone(),
1634 artifact_profile_digest: metadata.codec_profile_digest.clone(),
1635 artifact_count: metadata.artifact_count,
1636 artifact_corruption_count: metadata.artifact_corruption_count,
1637 artifact_missing_count: metadata.artifact_missing_count,
1638 vector_artifact_manifest_digest: metadata.vector_artifact_manifest_digest,
1639 artifact_generation_id: metadata.artifact_generation_id,
1640 approximate_scanned_count: metadata.approximate_scanned_count,
1641 approximate_returned_count: metadata.approximate_returned_count,
1642 raw_rows_loaded_count: metadata.raw_rows_loaded_count,
1643 filter_strategy: metadata.filter_strategy,
1644 vector_artifact_count: metadata.vector_artifact_count.or(metadata.artifact_count),
1645 vector_artifact_missing_count: metadata
1646 .vector_artifact_missing_count
1647 .or(metadata.artifact_missing_count),
1648 vector_artifact_stale_count: metadata.vector_artifact_stale_count,
1649 exact_rerank_count: metadata.exact_rerank_count.or(if exact_rerank {
1650 Some(post_filter_candidates)
1651 } else {
1652 None
1653 }),
1654 approximate_candidate_count: metadata.approximate_candidate_count,
1655 approximate: candidate_backend.contains("hnsw")
1656 || candidate_backend.contains("turbo_quant"),
1657 requested_candidates,
1658 returned_candidates,
1659 post_filter_candidates,
1660 fallback_reason: fallback.clone(),
1661 fallback,
1662 exact_rerank,
1663 result_ids: results
1664 .iter()
1665 .map(|result| search_result_id(&result.result.source))
1666 .collect(),
1667 degradations,
1668 })
1669}
1670
/// Returns `true` when at least one of the three filters is present and
/// non-empty.
#[cfg(feature = "hnsw")]
fn filters_are_active(
    namespaces: Option<&[&str]>,
    source_types: Option<&[SearchSourceType]>,
    session_ids: Option<&[&str]>,
) -> bool {
    let has_namespaces = matches!(namespaces, Some(values) if !values.is_empty());
    let has_source_types = matches!(source_types, Some(values) if !values.is_empty());
    let has_session_ids = matches!(session_ids, Some(values) if !values.is_empty());
    has_namespaces || has_source_types || has_session_ids
}
1681
1682#[allow(clippy::too_many_arguments)]
1683pub(crate) fn hybrid_search_detailed_with_context(
1684 conn: &Connection,
1685 query: &str,
1686 query_embedding: &[f32],
1687 config: &SearchConfig,
1688 context: &SearchContext,
1689 top_k: usize,
1690 namespaces: Option<&[&str]>,
1691 source_types: Option<&[SearchSourceType]>,
1692 session_ids: Option<&[&str]>,
1693) -> Result<SearchExecution, MemoryError> {
1694 let bm25_hits = match sanitize_fts_query(query) {
1695 Some(sanitized) => bm25_search(
1696 conn,
1697 &sanitized,
1698 config.candidate_pool_size,
1699 namespaces,
1700 source_types,
1701 session_ids,
1702 )?,
1703 None => Vec::new(),
1704 };
1705
1706 let vector_outcome = vector_search_with_backend(
1707 conn,
1708 query_embedding,
1709 config.candidate_pool_size,
1710 config.min_similarity,
1711 config,
1712 context,
1713 namespaces,
1714 source_types,
1715 session_ids,
1716 )?;
1717
1718 let results =
1719 rrf_fuse_detailed_with_context(&bm25_hits, &vector_outcome.hits, config, context, top_k);
1720 let receipt = build_receipt_with_metadata(
1721 context,
1722 query_embedding,
1723 "hybrid",
1724 &vector_outcome.candidate_backend,
1725 vector_outcome.requested_candidates,
1726 vector_outcome.returned_candidates,
1727 vector_outcome.post_filter_candidates,
1728 vector_outcome.fallback,
1729 vector_outcome.exact_rerank,
1730 &results,
1731 vector_outcome.degradations,
1732 vector_outcome.receipt_metadata,
1733 );
1734 Ok(SearchExecution { results, receipt })
1735}
1736
1737#[allow(clippy::too_many_arguments)]
1738pub(crate) fn hybrid_search_detailed(
1739 conn: &Connection,
1740 query: &str,
1741 query_embedding: &[f32],
1742 config: &SearchConfig,
1743 top_k: usize,
1744 namespaces: Option<&[&str]>,
1745 source_types: Option<&[SearchSourceType]>,
1746 session_ids: Option<&[&str]>,
1747) -> Result<Vec<ExplainedResult>, MemoryError> {
1748 let context = SearchContext::default_now();
1749 Ok(hybrid_search_detailed_with_context(
1750 conn,
1751 query,
1752 query_embedding,
1753 config,
1754 &context,
1755 top_k,
1756 namespaces,
1757 source_types,
1758 session_ids,
1759 )?
1760 .results)
1761}
1762
/// Public entry point for hybrid search that keeps the score explanations.
///
/// Forwards all arguments unchanged to `hybrid_search_detailed` and returns
/// its result, including any error.
#[allow(clippy::too_many_arguments)]
pub fn hybrid_search_explained(
    conn: &Connection,
    query: &str,
    query_embedding: &[f32],
    config: &SearchConfig,
    top_k: usize,
    namespaces: Option<&[&str]>,
    source_types: Option<&[SearchSourceType]>,
    session_ids: Option<&[&str]>,
) -> Result<Vec<ExplainedResult>, MemoryError> {
    hybrid_search_detailed(
        conn,
        query,
        query_embedding,
        config,
        top_k,
        namespaces,
        source_types,
        session_ids,
    )
}
1786
1787#[allow(clippy::too_many_arguments)]
1789pub fn hybrid_search(
1790 conn: &Connection,
1791 query: &str,
1792 query_embedding: &[f32],
1793 config: &SearchConfig,
1794 top_k: usize,
1795 namespaces: Option<&[&str]>,
1796 source_types: Option<&[SearchSourceType]>,
1797 session_ids: Option<&[&str]>,
1798) -> Result<Vec<SearchResult>, MemoryError> {
1799 Ok(hybrid_search_detailed(
1800 conn,
1801 query,
1802 query_embedding,
1803 config,
1804 top_k,
1805 namespaces,
1806 source_types,
1807 session_ids,
1808 )?
1809 .into_iter()
1810 .map(|result| result.result)
1811 .collect())
1812}
1813
/// What the HNSW index reported for one candidate before the row was loaded.
#[cfg(feature = "hnsw")]
#[derive(Clone)]
struct HnswCandidateSeed {
    /// 1-based position of the candidate in the HNSW hit list.
    source_rank: usize,
    /// Similarity reported by the HNSW hit, widened to `f64`.
    source_similarity: f64,
}
1820
/// Turns raw HNSW hits into fully loaded `VectorHit`s using one batched query
/// per domain.
///
/// * Hits whose reported similarity is below `config.min_similarity` are
///   dropped before their key is parsed.
/// * Without a `source_types` filter, facts, chunks and episodes are included
///   while messages are not; with a filter, exactly the listed types are.
/// * If the same id appears more than once, the first (best-ranked) occurrence
///   is kept. `msg` ids that are not valid `i64` values are ignored.
/// * The loaded hits are sorted by descending similarity, ties broken by the
///   ascending HNSW rank, and truncated to `config.candidate_pool_size`.
///
/// # Errors
/// Propagates key-parsing errors and any error from the batch loaders.
#[cfg(feature = "hnsw")]
#[allow(clippy::type_complexity)]
fn resolve_hnsw_hits_batched(
    conn: &Connection,
    query_embedding: &[f32],
    config: &SearchConfig,
    namespaces: Option<&[&str]>,
    source_types: Option<&[SearchSourceType]>,
    session_ids: Option<&[&str]>,
    hnsw_hits: &[crate::hnsw::HnswHit],
) -> Result<Vec<VectorHit>, MemoryError> {
    // Decides whether a domain takes part, given its default for "no filter".
    let domain_enabled = |kind: SearchSourceType, when_unfiltered: bool| {
        source_types.map_or(when_unfiltered, |allowed| allowed.contains(&kind))
    };
    let include_facts = domain_enabled(SearchSourceType::Facts, true);
    let include_chunks = domain_enabled(SearchSourceType::Chunks, true);
    let include_messages = domain_enabled(SearchSourceType::Messages, false);
    let include_episodes = domain_enabled(SearchSourceType::Episodes, true);

    let mut facts: HashMap<String, HnswCandidateSeed> = HashMap::new();
    let mut chunks: HashMap<String, HnswCandidateSeed> = HashMap::new();
    let mut messages: HashMap<i64, HnswCandidateSeed> = HashMap::new();
    let mut episodes_by_id: HashMap<String, HnswCandidateSeed> = HashMap::new();

    for (source_rank, hit) in (1usize..).zip(hnsw_hits) {
        let source_similarity = hit.similarity() as f64;
        if source_similarity < config.min_similarity {
            continue;
        }

        let (domain, raw_id) = hit.parse_key()?;
        let seed = HnswCandidateSeed {
            source_rank,
            source_similarity,
        };

        match domain {
            "fact" if include_facts => {
                facts.entry(raw_id.to_string()).or_insert(seed);
            }
            "chunk" if include_chunks => {
                chunks.entry(raw_id.to_string()).or_insert(seed);
            }
            "msg" if include_messages => {
                if let Ok(message_id) = raw_id.parse::<i64>() {
                    messages.entry(message_id).or_insert(seed);
                }
            }
            "episode" if include_episodes => {
                episodes_by_id.entry(raw_id.to_string()).or_insert(seed);
            }
            _ => {}
        }
    }

    let mut resolved = Vec::new();
    batch_load_fact_hits(
        conn,
        query_embedding,
        config,
        namespaces,
        &facts,
        &mut resolved,
    )?;
    batch_load_chunk_hits(
        conn,
        query_embedding,
        config,
        namespaces,
        &chunks,
        &mut resolved,
    )?;
    batch_load_message_hits(
        conn,
        query_embedding,
        config,
        session_ids,
        &messages,
        &mut resolved,
    )?;
    batch_load_episode_hits(
        conn,
        query_embedding,
        config,
        namespaces,
        &episodes_by_id,
        &mut resolved,
    )?;

    resolved.sort_by(|left, right| {
        let by_similarity = right
            .similarity
            .partial_cmp(&left.similarity)
            .unwrap_or(std::cmp::Ordering::Equal);
        by_similarity.then_with(|| {
            let left_rank = left.source_rank.unwrap_or(usize::MAX);
            let right_rank = right.source_rank.unwrap_or(usize::MAX);
            left_rank.cmp(&right_rank)
        })
    });
    resolved.truncate(config.candidate_pool_size);
    Ok(resolved)
}
1932
/// Recomputes the cosine similarity between the query and a stored embedding
/// blob.
///
/// Returns `Ok(None)` when the blob is empty or its decoded dimension differs
/// from the query's, so the caller can fall back to another score.
///
/// # Errors
/// Propagates blob-decoding errors and errors from `cosine_similarity`.
#[cfg(feature = "hnsw")]
fn exact_similarity_from_blob(
    query_embedding: &[f32],
    blob: &[u8],
) -> Result<Option<f64>, MemoryError> {
    if blob.is_empty() {
        return Ok(None);
    }

    let decoded = crate::db::bytes_to_embedding(blob)?;
    let exact = if decoded.len() == query_embedding.len() {
        Some(f64::from(cosine_similarity(query_embedding, &decoded)?))
    } else {
        None
    };
    Ok(exact)
}
1947
/// Builds the final `VectorHit` for one HNSW candidate whose row was loaded.
///
/// When `config.rerank_from_f32` is set and the row carries a usable embedding
/// blob, the similarity is recomputed exactly from that blob. In every other
/// case (reranking disabled, blob missing or empty, dimension mismatch) the
/// similarity reported by the index (`seed.source_similarity`) is used.
///
/// `reranked_from_f32` on the returned hit is `true` only when the exact
/// similarity was actually computed, so a fallback to the approximate score is
/// never reported as a rerank.
///
/// # Parameters
/// * `id`, `content`, `source`, `updated_at` — copied into the hit as is.
/// * `embedding_blob` — stored embedding bytes, if the row has any.
/// * `seed` — rank and similarity reported by the index for this candidate.
/// * `query_embedding` — the query vector used for the exact recomputation.
/// * `config` — supplies `rerank_from_f32` and `min_similarity`.
///
/// # Returns
/// `Ok(None)` when the chosen similarity is below `config.min_similarity`,
/// otherwise `Ok(Some(hit))`.
///
/// # Errors
/// Propagates errors from `exact_similarity_from_blob`.
#[cfg(feature = "hnsw")]
#[allow(clippy::too_many_arguments)]
fn build_ranked_vector_hit(
    id: String,
    content: String,
    source: SearchSource,
    updated_at: Option<String>,
    embedding_blob: Option<Vec<u8>>,
    seed: &HnswCandidateSeed,
    query_embedding: &[f32],
    config: &SearchConfig,
) -> Result<Option<VectorHit>, MemoryError> {
    // `Some` only if an exact f32 similarity could really be computed.
    let exact_similarity = if config.rerank_from_f32 {
        match embedding_blob {
            Some(blob) => exact_similarity_from_blob(query_embedding, &blob)?,
            None => None,
        }
    } else {
        None
    };
    let similarity = exact_similarity.unwrap_or(seed.source_similarity);

    if similarity < config.min_similarity {
        return Ok(None);
    }

    Ok(Some(VectorHit {
        id,
        content,
        source,
        similarity,
        updated_at,
        source_rank: Some(seed.source_rank),
        source_similarity: Some(seed.source_similarity),
        // Report a rerank only when the exact score is the one being returned.
        reranked_from_f32: exact_similarity.is_some(),
    }))
}
1985
/// Loads all fact rows referenced by `entries` with a single `IN (...)` query
/// and appends the resulting hits to `output`.
///
/// Does nothing when `entries` is empty. A row is skipped when a `namespaces`
/// filter is given and does not list the row's namespace, or when
/// `build_ranked_vector_hit` rejects it.
///
/// # Errors
/// Propagates database errors and errors from `build_ranked_vector_hit`.
#[cfg(feature = "hnsw")]
fn batch_load_fact_hits(
    conn: &Connection,
    query_embedding: &[f32],
    config: &SearchConfig,
    namespaces: Option<&[&str]>,
    entries: &HashMap<String, HnswCandidateSeed>,
    output: &mut Vec<VectorHit>,
) -> Result<(), MemoryError> {
    if entries.is_empty() {
        return Ok(());
    }

    // One numbered placeholder and one bound value per requested id.
    let mut slots = Vec::with_capacity(entries.len());
    let mut bound_ids: Vec<SqlValue> = Vec::with_capacity(entries.len());
    for (position, fact_id) in entries.keys().enumerate() {
        slots.push(format!("?{}", position + 1));
        bound_ids.push(SqlValue::Text(fact_id.clone()));
    }
    let placeholders = slots.join(", ");
    let sql = format!(
        "SELECT id, content, namespace, updated_at, embedding
         FROM facts
         WHERE id IN ({placeholders})"
    );

    let mut stmt = conn.prepare(&sql)?;
    let fetched = stmt.query_map(rusqlite::params_from_iter(&bound_ids), |row| {
        let fact_id: String = row.get(0)?;
        let content: String = row.get(1)?;
        let namespace: String = row.get(2)?;
        let updated_at: Option<String> = row.get(3)?;
        let embedding_blob: Option<Vec<u8>> = row.get(4)?;
        Ok((fact_id, content, namespace, updated_at, embedding_blob))
    })?;

    for record in fetched {
        let (fact_id, content, namespace, updated_at, embedding_blob) = record?;

        let rejected = namespaces.is_some_and(|allowed| !allowed.contains(&namespace.as_str()));
        if rejected {
            continue;
        }
        let Some(seed) = entries.get(&fact_id) else {
            continue;
        };

        let ranked = build_ranked_vector_hit(
            format!("fact:{fact_id}"),
            content,
            SearchSource::Fact { fact_id, namespace },
            updated_at,
            embedding_blob,
            seed,
            query_embedding,
            config,
        )?;
        if let Some(hit) = ranked {
            output.push(hit);
        }
    }

    Ok(())
}
2049
/// Loads all chunk rows referenced by `entries` (joined with their document)
/// with a single `IN (...)` query and appends the resulting hits to `output`.
///
/// Does nothing when `entries` is empty. A row is skipped when a `namespaces`
/// filter is given and does not list the document's namespace, or when
/// `build_ranked_vector_hit` rejects it. The chunk's `created_at` value is
/// used as the hit's `updated_at`.
///
/// # Errors
/// Propagates database errors and errors from `build_ranked_vector_hit`.
#[cfg(feature = "hnsw")]
fn batch_load_chunk_hits(
    conn: &Connection,
    query_embedding: &[f32],
    config: &SearchConfig,
    namespaces: Option<&[&str]>,
    entries: &HashMap<String, HnswCandidateSeed>,
    output: &mut Vec<VectorHit>,
) -> Result<(), MemoryError> {
    if entries.is_empty() {
        return Ok(());
    }

    // One numbered placeholder and one bound value per requested id.
    let mut slots = Vec::with_capacity(entries.len());
    let mut bound_ids: Vec<SqlValue> = Vec::with_capacity(entries.len());
    for (position, chunk_id) in entries.keys().enumerate() {
        slots.push(format!("?{}", position + 1));
        bound_ids.push(SqlValue::Text(chunk_id.clone()));
    }
    let placeholders = slots.join(", ");
    let sql = format!(
        "SELECT c.id, c.content, c.document_id, d.title, c.chunk_index, c.created_at, d.namespace, c.embedding
         FROM chunks c
         JOIN documents d ON d.id = c.document_id
         WHERE c.id IN ({placeholders})"
    );

    let mut stmt = conn.prepare(&sql)?;
    let fetched = stmt.query_map(rusqlite::params_from_iter(&bound_ids), |row| {
        let chunk_id: String = row.get(0)?;
        let content: String = row.get(1)?;
        let document_id: String = row.get(2)?;
        let document_title: String = row.get(3)?;
        let chunk_index: i64 = row.get(4)?;
        let updated_at: Option<String> = row.get(5)?;
        let namespace: String = row.get(6)?;
        let embedding_blob: Option<Vec<u8>> = row.get(7)?;
        Ok((
            chunk_id,
            content,
            document_id,
            document_title,
            chunk_index,
            updated_at,
            namespace,
            embedding_blob,
        ))
    })?;

    for record in fetched {
        let (
            chunk_id,
            content,
            document_id,
            document_title,
            chunk_index,
            updated_at,
            namespace,
            embedding_blob,
        ) = record?;

        let rejected = namespaces.is_some_and(|allowed| !allowed.contains(&namespace.as_str()));
        if rejected {
            continue;
        }
        let Some(seed) = entries.get(&chunk_id) else {
            continue;
        };

        let ranked = build_ranked_vector_hit(
            format!("chunk:{chunk_id}"),
            content,
            SearchSource::Chunk {
                chunk_id,
                document_id,
                document_title,
                chunk_index: chunk_index as usize,
            },
            updated_at,
            embedding_blob,
            seed,
            query_embedding,
            config,
        )?;
        if let Some(hit) = ranked {
            output.push(hit);
        }
    }

    Ok(())
}
2131
/// Loads all message rows referenced by `entries` with a single `IN (...)`
/// query and appends the resulting hits to `output`.
///
/// Does nothing when `entries` is empty. A row is skipped when a `session_ids`
/// filter is given and does not list the row's session, or when
/// `build_ranked_vector_hit` rejects it. The message's `created_at` value is
/// used as the hit's `updated_at`.
///
/// # Errors
/// Propagates database errors and errors from `build_ranked_vector_hit`.
#[cfg(feature = "hnsw")]
fn batch_load_message_hits(
    conn: &Connection,
    query_embedding: &[f32],
    config: &SearchConfig,
    session_ids: Option<&[&str]>,
    entries: &HashMap<i64, HnswCandidateSeed>,
    output: &mut Vec<VectorHit>,
) -> Result<(), MemoryError> {
    if entries.is_empty() {
        return Ok(());
    }

    // One numbered placeholder and one bound value per requested id.
    let mut slots = Vec::with_capacity(entries.len());
    let mut bound_ids: Vec<SqlValue> = Vec::with_capacity(entries.len());
    for (position, message_id) in entries.keys().enumerate() {
        slots.push(format!("?{}", position + 1));
        bound_ids.push(SqlValue::Integer(*message_id));
    }
    let placeholders = slots.join(", ");
    let sql = format!(
        "SELECT id, content, session_id, role, created_at, embedding
         FROM messages
         WHERE id IN ({placeholders})"
    );

    let mut stmt = conn.prepare(&sql)?;
    let fetched = stmt.query_map(rusqlite::params_from_iter(&bound_ids), |row| {
        let message_id: i64 = row.get(0)?;
        let content: String = row.get(1)?;
        let session_id: String = row.get(2)?;
        let role: String = row.get(3)?;
        let updated_at: Option<String> = row.get(4)?;
        let embedding_blob: Option<Vec<u8>> = row.get(5)?;
        Ok((
            message_id,
            content,
            session_id,
            role,
            updated_at,
            embedding_blob,
        ))
    })?;

    for record in fetched {
        let (message_id, content, session_id, role, updated_at, embedding_blob) = record?;

        let rejected = session_ids.is_some_and(|allowed| !allowed.contains(&session_id.as_str()));
        if rejected {
            continue;
        }
        let Some(seed) = entries.get(&message_id) else {
            continue;
        };

        let ranked = build_ranked_vector_hit(
            format!("msg:{message_id}"),
            content,
            SearchSource::Message {
                message_id,
                session_id,
                role,
            },
            updated_at,
            embedding_blob,
            seed,
            query_embedding,
            config,
        )?;
        if let Some(hit) = ranked {
            output.push(hit);
        }
    }

    Ok(())
}
2197
/// Resolves HNSW episode candidates against the `episodes` table (joined with
/// `documents` for the namespace) and appends the resulting ranked vector hits
/// to `output`.
///
/// A single `SELECT ... WHERE e.episode_id IN (...)` is prepared with one
/// numbered bind parameter per key in `entries`. For every returned row:
///
/// * the row is skipped when `namespaces` is `Some` and does not contain the
///   owning document's namespace;
/// * otherwise the row and its seed from `entries` are passed to
///   `build_ranked_vector_hit`, keyed by `episodes::episode_item_key`, and the
///   hit is pushed only if that call yields `Some`.
///
/// Returns `Ok(())` without querying when `entries` is empty.
///
/// # Errors
///
/// Returns an error if preparing or running the statement fails, if a column
/// cannot be decoded, or if `build_ranked_vector_hit` fails.
#[cfg(feature = "hnsw")]
fn batch_load_episode_hits(
    conn: &Connection,
    query_embedding: &[f32],
    config: &SearchConfig,
    namespaces: Option<&[&str]>,
    entries: &HashMap<String, HnswCandidateSeed>,
    output: &mut Vec<VectorHit>,
) -> Result<(), MemoryError> {
    if entries.is_empty() {
        return Ok(());
    }

    // Numbered placeholders `?1, ?2, ...`, one per candidate episode id.
    let placeholders = (1..=entries.len())
        .map(|idx| format!("?{idx}"))
        .collect::<Vec<_>>()
        .join(", ");
    let sql = format!(
        "SELECT e.episode_id, e.document_id, e.search_text, e.effect_type, e.outcome, e.updated_at, d.namespace, e.embedding
         FROM episodes e
         JOIN documents d ON d.id = e.document_id
         WHERE e.episode_id IN ({placeholders})"
    );
    let params: Vec<SqlValue> = entries
        .keys()
        .map(|id| SqlValue::Text(id.clone()))
        .collect();
    let mut stmt = conn.prepare(&sql)?;
    let rows = stmt.query_map(rusqlite::params_from_iter(&params), |row| {
        Ok((
            row.get::<_, String>(0)?,
            row.get::<_, String>(1)?,
            row.get::<_, String>(2)?,
            row.get::<_, String>(3)?,
            row.get::<_, String>(4)?,
            row.get::<_, Option<String>>(5)?,
            row.get::<_, String>(6)?,
            row.get::<_, Option<Vec<u8>>>(7)?,
        ))
    })?;

    for row in rows {
        // `search_text` is what gets surfaced as the hit content.
        let (
            episode_id,
            document_id,
            content,
            effect_type,
            outcome,
            updated_at,
            namespace,
            embedding_blob,
        ) = row?;
        if let Some(filter) = namespaces {
            if !filter.contains(&namespace.as_str()) {
                continue;
            }
        }
        if let Some(seed) = entries.get(&episode_id) {
            if let Some(hit) = build_ranked_vector_hit(
                // The item key is derived before `episode_id` is moved into the source.
                episodes::episode_item_key(&episode_id),
                content,
                SearchSource::Episode {
                    episode_id,
                    document_id,
                    effect_type,
                    outcome,
                },
                updated_at,
                embedding_blob,
                seed,
                query_embedding,
                config,
            )? {
                output.push(hit);
            }
        }
    }

    Ok(())
}
2279
/// Hybrid search seeded by precomputed HNSW hits, returning plain
/// [`SearchResult`]s.
///
/// Forwards every argument unchanged to `hybrid_search_with_hnsw_detailed` and
/// keeps only the `result` field of each explained entry, in the same order.
///
/// # Errors
///
/// Propagates any error from `hybrid_search_with_hnsw_detailed`.
#[cfg(feature = "hnsw")]
#[allow(clippy::too_many_arguments)]
pub fn hybrid_search_with_hnsw(
    conn: &Connection,
    query: &str,
    query_embedding: &[f32],
    config: &SearchConfig,
    top_k: usize,
    namespaces: Option<&[&str]>,
    source_types: Option<&[SearchSourceType]>,
    session_ids: Option<&[&str]>,
    hnsw_hits: &[crate::hnsw::HnswHit],
) -> Result<Vec<SearchResult>, MemoryError> {
    let explained = hybrid_search_with_hnsw_detailed(
        conn,
        query,
        query_embedding,
        config,
        top_k,
        namespaces,
        source_types,
        session_ids,
        hnsw_hits,
    )?;

    // Strip the explanation wrapper, leaving only the underlying result.
    let plain = explained.into_iter().map(|entry| entry.result).collect();
    Ok(plain)
}
2309
/// Runs a hybrid search whose vector side is seeded by precomputed HNSW hits
/// and returns the fused results together with a search receipt.
///
/// Steps, in order:
///
/// 1. BM25 candidates are fetched with `bm25_search` (pool size
///    `config.candidate_pool_size`) when `sanitize_fts_query` yields a query;
///    otherwise the lexical side is empty.
/// 2. `hnsw_hits` are resolved into vector hits via `resolve_hnsw_hits_batched`.
/// 3. If `hnsw_hits` is non-empty, fewer than `top_k` vector hits came out of
///    resolution, and `filters_are_active` reports true, the vector side is
///    replaced by `vector_search` over `config.candidate_pool_size` candidates.
///    The receipt then carries the fallback name, a degradation note, the
///    `hnsw_then_brute_force_f32` backend and `exact_rerank = true`.
/// 4. Both sides are fused with `rrf_fuse_detailed_with_context`.
///
/// # Errors
///
/// Propagates errors from `bm25_search`, `resolve_hnsw_hits_batched` and
/// `vector_search`.
#[cfg(feature = "hnsw")]
#[allow(clippy::too_many_arguments)]
pub(crate) fn hybrid_search_with_hnsw_detailed_with_context(
    conn: &Connection,
    query: &str,
    query_embedding: &[f32],
    config: &SearchConfig,
    context: &SearchContext,
    top_k: usize,
    namespaces: Option<&[&str]>,
    source_types: Option<&[SearchSourceType]>,
    session_ids: Option<&[&str]>,
    hnsw_hits: &[crate::hnsw::HnswHit],
) -> Result<SearchExecution, MemoryError> {
    // Lexical side: skipped entirely when sanitizing leaves no usable token.
    let bm25_hits = sanitize_fts_query(query)
        .map(|sanitized| {
            bm25_search(
                conn,
                &sanitized,
                config.candidate_pool_size,
                namespaces,
                source_types,
                session_ids,
            )
        })
        .transpose()?
        .unwrap_or_default();

    let resolved = resolve_hnsw_hits_batched(
        conn,
        query_embedding,
        config,
        namespaces,
        source_types,
        session_ids,
        hnsw_hits,
    )?;

    // HNSW is unaware of the filters, so filtering can leave too few hits.
    let underreturned = !hnsw_hits.is_empty()
        && resolved.len() < top_k
        && filters_are_active(namespaces, source_types, session_ids);

    let (vector_hits, fallback, degradations, backend, exact_rerank) = if underreturned {
        let note = format!(
            "HNSW returned {} post-filter vector candidates for requested top_k {}; exact filtered fallback was used",
            resolved.len(),
            top_k
        );
        let exact_hits = vector_search(
            conn,
            query_embedding,
            config.candidate_pool_size,
            config.min_similarity,
            namespaces,
            source_types,
            session_ids,
        )?;
        (
            exact_hits,
            Some("hnsw_filtered_underreturn_fallback".to_string()),
            vec![note],
            "hnsw_then_brute_force_f32",
            true,
        )
    } else {
        (resolved, None, Vec::new(), "hnsw", config.rerank_from_f32)
    };

    let results = rrf_fuse_detailed_with_context(&bm25_hits, &vector_hits, config, context, top_k);
    let receipt = build_receipt(
        context,
        query_embedding,
        "hybrid",
        backend,
        config.candidate_pool_size,
        hnsw_hits.len(),
        vector_hits.len(),
        fallback,
        exact_rerank,
        &results,
        degradations,
    );

    Ok(SearchExecution { results, receipt })
}
2390
/// HNSW-seeded hybrid search returning explained results, using a search
/// context created by `SearchContext::default_now()`.
///
/// Thin wrapper over `hybrid_search_with_hnsw_detailed_with_context`: the
/// receipt of the execution is dropped and only its `results` are returned.
///
/// # Errors
///
/// Propagates any error from the context-aware variant.
#[cfg(feature = "hnsw")]
#[allow(clippy::too_many_arguments)]
pub(crate) fn hybrid_search_with_hnsw_detailed(
    conn: &Connection,
    query: &str,
    query_embedding: &[f32],
    config: &SearchConfig,
    top_k: usize,
    namespaces: Option<&[&str]>,
    source_types: Option<&[SearchSourceType]>,
    session_ids: Option<&[&str]>,
    hnsw_hits: &[crate::hnsw::HnswHit],
) -> Result<Vec<ExplainedResult>, MemoryError> {
    let default_context = SearchContext::default_now();
    hybrid_search_with_hnsw_detailed_with_context(
        conn,
        query,
        query_embedding,
        config,
        &default_context,
        top_k,
        namespaces,
        source_types,
        session_ids,
        hnsw_hits,
    )
    .map(|execution| execution.results)
}
2419
/// Public entry point for HNSW-seeded hybrid search with per-result
/// explanations.
///
/// Passes all arguments straight through to
/// `hybrid_search_with_hnsw_detailed` and returns its output unchanged.
///
/// # Errors
///
/// Propagates any error from `hybrid_search_with_hnsw_detailed`.
#[cfg(feature = "hnsw")]
#[allow(clippy::too_many_arguments)]
pub fn hybrid_search_explained_with_hnsw(
    conn: &Connection,
    query: &str,
    query_embedding: &[f32],
    config: &SearchConfig,
    top_k: usize,
    namespaces: Option<&[&str]>,
    source_types: Option<&[SearchSourceType]>,
    session_ids: Option<&[&str]>,
    hnsw_hits: &[crate::hnsw::HnswHit],
) -> Result<Vec<ExplainedResult>, MemoryError> {
    let explained = hybrid_search_with_hnsw_detailed(
        conn,
        query,
        query_embedding,
        config,
        top_k,
        namespaces,
        source_types,
        session_ids,
        hnsw_hits,
    )?;
    Ok(explained)
}
2446
2447pub(crate) fn fts_only_search_detailed(
2448 conn: &Connection,
2449 query: &str,
2450 config: &SearchConfig,
2451 top_k: usize,
2452 namespaces: Option<&[&str]>,
2453 source_types: Option<&[SearchSourceType]>,
2454 session_ids: Option<&[&str]>,
2455) -> Result<Vec<ExplainedResult>, MemoryError> {
2456 let sanitized = match sanitize_fts_query(query) {
2457 Some(value) => value,
2458 None => return Ok(Vec::new()),
2459 };
2460 let bm25_hits = bm25_search(
2461 conn,
2462 &sanitized,
2463 top_k,
2464 namespaces,
2465 source_types,
2466 session_ids,
2467 )?;
2468 Ok(rrf_fuse_detailed(&bm25_hits, &[], config, top_k))
2469}
2470
2471pub fn fts_only_search(
2473 conn: &Connection,
2474 query: &str,
2475 config: &SearchConfig,
2476 top_k: usize,
2477 namespaces: Option<&[&str]>,
2478 source_types: Option<&[SearchSourceType]>,
2479 session_ids: Option<&[&str]>,
2480) -> Result<Vec<SearchResult>, MemoryError> {
2481 Ok(fts_only_search_detailed(
2482 conn,
2483 query,
2484 config,
2485 top_k,
2486 namespaces,
2487 source_types,
2488 session_ids,
2489 )?
2490 .into_iter()
2491 .map(|result| result.result)
2492 .collect())
2493}
2494
2495#[allow(clippy::too_many_arguments)]
2496pub(crate) fn vector_only_search_detailed_with_context(
2497 conn: &Connection,
2498 query_embedding: &[f32],
2499 config: &SearchConfig,
2500 context: &SearchContext,
2501 top_k: usize,
2502 namespaces: Option<&[&str]>,
2503 source_types: Option<&[SearchSourceType]>,
2504 session_ids: Option<&[&str]>,
2505) -> Result<SearchExecution, MemoryError> {
2506 let vector_outcome = vector_search_with_backend(
2507 conn,
2508 query_embedding,
2509 top_k,
2510 config.min_similarity,
2511 config,
2512 context,
2513 namespaces,
2514 source_types,
2515 session_ids,
2516 )?;
2517 let results = rrf_fuse_detailed_with_context(&[], &vector_outcome.hits, config, context, top_k);
2518 let receipt = build_receipt_with_metadata(
2519 context,
2520 query_embedding,
2521 "vector_only",
2522 &vector_outcome.candidate_backend,
2523 vector_outcome.requested_candidates,
2524 vector_outcome.returned_candidates,
2525 vector_outcome.post_filter_candidates,
2526 vector_outcome.fallback,
2527 vector_outcome.exact_rerank,
2528 &results,
2529 vector_outcome.degradations,
2530 vector_outcome.receipt_metadata,
2531 );
2532 Ok(SearchExecution { results, receipt })
2533}
2534
2535pub(crate) fn vector_only_search_detailed(
2536 conn: &Connection,
2537 query_embedding: &[f32],
2538 config: &SearchConfig,
2539 top_k: usize,
2540 namespaces: Option<&[&str]>,
2541 source_types: Option<&[SearchSourceType]>,
2542 session_ids: Option<&[&str]>,
2543) -> Result<Vec<ExplainedResult>, MemoryError> {
2544 let context = SearchContext::default_now();
2545 Ok(vector_only_search_detailed_with_context(
2546 conn,
2547 query_embedding,
2548 config,
2549 &context,
2550 top_k,
2551 namespaces,
2552 source_types,
2553 session_ids,
2554 )?
2555 .results)
2556}
2557
2558pub fn vector_only_search(
2560 conn: &Connection,
2561 query_embedding: &[f32],
2562 config: &SearchConfig,
2563 top_k: usize,
2564 namespaces: Option<&[&str]>,
2565 source_types: Option<&[SearchSourceType]>,
2566 session_ids: Option<&[&str]>,
2567) -> Result<Vec<SearchResult>, MemoryError> {
2568 Ok(vector_only_search_detailed(
2569 conn,
2570 query_embedding,
2571 config,
2572 top_k,
2573 namespaces,
2574 source_types,
2575 session_ids,
2576 )?
2577 .into_iter()
2578 .map(|result| result.result)
2579 .collect())
2580}
2581
#[cfg(test)]
mod digest_tests {
    use super::query_embedding_digest;

    /// The digest must carry the `blake3:` prefix, have a fixed length, be
    /// stable for equal input, and change when either the number of dimensions
    /// or a component value changes.
    #[test]
    fn query_embedding_digest_includes_dimension_and_bytes() {
        let baseline = query_embedding_digest(&[1.0, 2.0]);

        // Shape: prefixed, 71 characters in total.
        assert!(baseline.starts_with("blake3:"));
        assert_eq!(baseline.len(), 71);

        // Appending a zero component changes the dimension and so the digest.
        let extra_dimension = query_embedding_digest(&[1.0, 2.0, 0.0]);
        assert_ne!(baseline, extra_dimension);

        // A tiny change to one component must also be visible.
        let nudged_component = query_embedding_digest(&[1.0, 2.000_001]);
        assert_ne!(baseline, nudged_component);

        // Same input, same digest.
        assert_eq!(baseline, query_embedding_digest(&[1.0, 2.0]));
    }
}
2599
/// Vector-only search seeded by precomputed HNSW hits, returning plain
/// [`SearchResult`]s.
///
/// Forwards every argument unchanged to `vector_only_search_with_hnsw_detailed`
/// and keeps only the `result` field of each explained entry, in the same
/// order.
///
/// # Errors
///
/// Propagates any error from `vector_only_search_with_hnsw_detailed`.
#[cfg(feature = "hnsw")]
#[allow(clippy::too_many_arguments)]
pub fn vector_only_search_with_hnsw(
    conn: &Connection,
    query_embedding: &[f32],
    config: &SearchConfig,
    top_k: usize,
    namespaces: Option<&[&str]>,
    source_types: Option<&[SearchSourceType]>,
    session_ids: Option<&[&str]>,
    hnsw_hits: &[crate::hnsw::HnswHit],
) -> Result<Vec<SearchResult>, MemoryError> {
    let explained = vector_only_search_with_hnsw_detailed(
        conn,
        query_embedding,
        config,
        top_k,
        namespaces,
        source_types,
        session_ids,
        hnsw_hits,
    )?;

    let plain = explained.into_iter().map(|entry| entry.result).collect();
    Ok(plain)
}
2627
/// Runs a vector-only search seeded by precomputed HNSW hits and returns the
/// fused results together with a search receipt.
///
/// `hnsw_hits` are resolved into vector hits via `resolve_hnsw_hits_batched`.
/// If `hnsw_hits` is non-empty, fewer than `top_k` vector hits came out of
/// resolution, and `filters_are_active` reports true, the hits are replaced by
/// `vector_search` limited to `top_k`. The receipt then carries the fallback
/// name, a degradation note, the `hnsw_then_brute_force_f32` backend and
/// `exact_rerank = true`. The final hits are fused against an empty lexical
/// side with `rrf_fuse_detailed_with_context`.
///
/// # Errors
///
/// Propagates errors from `resolve_hnsw_hits_batched` and `vector_search`.
#[cfg(feature = "hnsw")]
#[allow(clippy::too_many_arguments)]
pub(crate) fn vector_only_search_with_hnsw_detailed_with_context(
    conn: &Connection,
    query_embedding: &[f32],
    config: &SearchConfig,
    context: &SearchContext,
    top_k: usize,
    namespaces: Option<&[&str]>,
    source_types: Option<&[SearchSourceType]>,
    session_ids: Option<&[&str]>,
    hnsw_hits: &[crate::hnsw::HnswHit],
) -> Result<SearchExecution, MemoryError> {
    let resolved = resolve_hnsw_hits_batched(
        conn,
        query_embedding,
        config,
        namespaces,
        source_types,
        session_ids,
        hnsw_hits,
    )?;

    // HNSW is unaware of the filters, so filtering can leave too few hits.
    let underreturned = !hnsw_hits.is_empty()
        && resolved.len() < top_k
        && filters_are_active(namespaces, source_types, session_ids);

    let (vector_hits, fallback, degradations, backend, exact_rerank) = if underreturned {
        let note = format!(
            "HNSW returned {} post-filter vector candidates for requested top_k {}; exact filtered fallback was used",
            resolved.len(),
            top_k
        );
        let exact_hits = vector_search(
            conn,
            query_embedding,
            top_k,
            config.min_similarity,
            namespaces,
            source_types,
            session_ids,
        )?;
        (
            exact_hits,
            Some("hnsw_filtered_underreturn_fallback".to_string()),
            vec![note],
            "hnsw_then_brute_force_f32",
            true,
        )
    } else {
        (resolved, None, Vec::new(), "hnsw", config.rerank_from_f32)
    };

    let results = rrf_fuse_detailed_with_context(&[], &vector_hits, config, context, top_k);
    let receipt = build_receipt(
        context,
        query_embedding,
        "vector_only",
        backend,
        top_k,
        hnsw_hits.len(),
        vector_hits.len(),
        fallback,
        exact_rerank,
        &results,
        degradations,
    );

    Ok(SearchExecution { results, receipt })
}
2694
/// HNSW-seeded vector-only search returning explained results, using a search
/// context created by `SearchContext::default_now()`.
///
/// Thin wrapper over `vector_only_search_with_hnsw_detailed_with_context`: the
/// receipt is dropped and only the execution's `results` are returned.
///
/// # Errors
///
/// Propagates any error from the context-aware variant.
#[cfg(feature = "hnsw")]
#[allow(clippy::too_many_arguments)]
pub(crate) fn vector_only_search_with_hnsw_detailed(
    conn: &Connection,
    query_embedding: &[f32],
    config: &SearchConfig,
    top_k: usize,
    namespaces: Option<&[&str]>,
    source_types: Option<&[SearchSourceType]>,
    session_ids: Option<&[&str]>,
    hnsw_hits: &[crate::hnsw::HnswHit],
) -> Result<Vec<ExplainedResult>, MemoryError> {
    let default_context = SearchContext::default_now();
    vector_only_search_with_hnsw_detailed_with_context(
        conn,
        query_embedding,
        config,
        &default_context,
        top_k,
        namespaces,
        source_types,
        session_ids,
        hnsw_hits,
    )
    .map(|execution| execution.results)
}
2721
2722fn build_filter_clause(
2723 column: &str,
2724 values: Option<&[&str]>,
2725 param_offset: usize,
2726) -> (String, Vec<SqlValue>) {
2727 match values {
2728 Some(values) if !values.is_empty() => {
2729 let placeholders = (0..values.len())
2730 .map(|idx| format!("?{}", param_offset + idx))
2731 .collect::<Vec<_>>();
2732 let clause = format!(" AND {} IN ({})", column, placeholders.join(", "));
2733 let params = values
2734 .iter()
2735 .map(|value| SqlValue::Text((*value).to_string()))
2736 .collect();
2737 (clause, params)
2738 }
2739 _ => (String::new(), Vec::new()),
2740 }
2741}
2742
2743pub fn deduplicate_results(results: Vec<SearchResult>) -> Vec<SearchResult> {
2745 let mut seen = HashSet::new();
2746 results
2747 .into_iter()
2748 .filter(|result| seen.insert(source_dedup_key(&result.source)))
2749 .collect()
2750}
2751
#[cfg(test)]
mod tests {
    use super::*;

    /// Overrides the process-wide vector scan limits and puts the previous
    /// values back when dropped, so a panic while the override is active does
    /// not leave the lowered limits installed for other tests.
    struct ScanLimitOverride {
        previous_warn: usize,
        previous_block: usize,
    }

    impl ScanLimitOverride {
        /// Installs `warn` and `block` as the active limits, remembering the
        /// values they replace.
        fn install(warn: usize, block: usize) -> Self {
            Self {
                previous_warn: VECTOR_SCAN_WARN_LIMIT.swap(warn, Ordering::SeqCst),
                previous_block: VECTOR_SCAN_BLOCK_LIMIT.swap(block, Ordering::SeqCst),
            }
        }
    }

    impl Drop for ScanLimitOverride {
        fn drop(&mut self) {
            VECTOR_SCAN_WARN_LIMIT.store(self.previous_warn, Ordering::SeqCst);
            VECTOR_SCAN_BLOCK_LIMIT.store(self.previous_block, Ordering::SeqCst);
        }
    }

    /// Builds a fact-backed `VectorRow` in the `default` namespace whose blob
    /// holds the two `f32` values `[1.0, 0.0]`.
    fn vector_row(id: &str) -> VectorRow {
        VectorRow {
            id: id.to_string(),
            content: format!("content {id}"),
            blob: bytemuck::cast_slice(&[1.0_f32, 0.0]).to_vec(),
            updated_at: None,
            source_type: SearchSourceType::Facts,
            filter_namespace: Some("default".to_string()),
            filter_session_id: None,
            source: SearchSource::Fact {
                fact_id: id.to_string(),
                namespace: "default".to_string(),
            },
        }
    }

    /// SQL-style, fractional-second and RFC 3339 timestamps parse; garbage
    /// input yields `None`.
    #[test]
    fn timestamp_parser_accepts_sql_fractional_and_rfc3339_and_warns_by_returning_none() {
        assert!(parse_search_timestamp("2026-05-07 12:34:56").is_some());
        assert!(parse_search_timestamp("2026-05-07 12:34:56.123").is_some());
        assert!(parse_search_timestamp("2026-05-07T12:34:56Z").is_some());
        assert!(parse_search_timestamp("not-a-timestamp").is_none());
    }

    /// With the block limit lowered to 2, scanning three rows must fail with
    /// `VectorScanLimitExceeded` reporting the table, the row count reached and
    /// the limit.
    #[test]
    fn vector_scan_hard_limit_blocks_before_unbounded_scan() {
        let result = {
            // The guard restores the limits at the end of this block, before
            // any assertion below can panic.
            let _limits = ScanLimitOverride::install(1, 2);
            let rows = ["a", "b", "c"].into_iter().map(|id| Ok(vector_row(id)));
            scan_vector_rows(rows, &[1.0, 0.0], -1.0, "fact")
        };

        match result {
            Err(MemoryError::VectorScanLimitExceeded {
                table,
                scanned,
                limit,
            }) => {
                assert_eq!(table, "fact");
                assert_eq!(scanned, 3);
                assert_eq!(limit, 2);
            }
            other => panic!("expected vector scan limit error, got {other:?}"),
        }
    }
}