Skip to main content

semantic_memory/
search.rs

1//! Hybrid search engine: BM25 + vector similarity + Reciprocal Rank Fusion.
2
3use crate::config::{DerivedVectorBackendPolicy, SearchConfig};
4use crate::episodes;
5use crate::error::MemoryError;
6use crate::types::{
7    ExplainedResult, ScoreBreakdown, SearchContext, SearchResult, SearchSource, SearchSourceType,
8    VectorSearchReceiptV1,
9};
10use rusqlite::types::Value as SqlValue;
11use rusqlite::Connection;
12#[cfg(feature = "turbo-quant-codec")]
13use rusqlite::OptionalExtension;
14use stack_ids::DigestBuilder;
15#[cfg(feature = "turbo-quant-codec")]
16use std::collections::BinaryHeap;
17use std::collections::{HashMap, HashSet};
18use std::sync::atomic::{AtomicUsize, Ordering};
19
20/// Per-table row count above which vector search emits a warning.
21const VECTOR_SCAN_WARN_THRESHOLD: usize = 50_000;
22/// Per-table row count above which brute-force vector search is refused.
23const VECTOR_SCAN_HARD_LIMIT: usize = 250_000;
24
25static VECTOR_SCAN_WARN_LIMIT: AtomicUsize = AtomicUsize::new(VECTOR_SCAN_WARN_THRESHOLD);
26static VECTOR_SCAN_BLOCK_LIMIT: AtomicUsize = AtomicUsize::new(VECTOR_SCAN_HARD_LIMIT);
27
28/// Sanitize a raw query string for safe use in an FTS5 MATCH expression.
29///
30/// Replaces any character that is not alphanumeric, whitespace, or a Unicode
31/// letter/digit with a space, then strips FTS5 boolean keywords (`AND`, `OR`,
32/// `NOT`, `NEAR`).  Returns `None` when no searchable tokens remain.
33///
34/// This uses an allowlist strategy so that *any* FTS5 operator or punctuation
35/// — including `?`, `.`, `/`, `!`, etc. — is neutralised without needing an
36/// exhaustive denylist.
37pub fn sanitize_fts_query(raw: &str) -> Option<String> {
38    let cleaned: String = raw
39        .chars()
40        .map(|c| {
41            if c.is_alphanumeric() || c.is_whitespace() || c == '_' {
42                c
43            } else {
44                ' '
45            }
46        })
47        .collect();
48
49    let tokens: Vec<&str> = cleaned
50        .split_whitespace()
51        .filter(|t| !matches!(t.to_uppercase().as_str(), "AND" | "OR" | "NOT" | "NEAR"))
52        .collect();
53
54    if tokens.is_empty() {
55        None
56    } else {
57        Some(
58            tokens
59                .into_iter()
60                .map(|token| format!("\"{}\"", token.replace('"', "\"\"")))
61                .collect::<Vec<_>>()
62                .join(" OR "),
63        )
64    }
65}
66
67/// Compute cosine similarity between two vectors.
68pub fn cosine_similarity(a: &[f32], b: &[f32]) -> Result<f32, MemoryError> {
69    if a.len() != b.len() {
70        return Err(MemoryError::EmbeddingDimensionMismatch {
71            expected: a.len(),
72            actual: b.len(),
73        });
74    }
75    if let Some((index, _)) = a.iter().enumerate().find(|(_, value)| !value.is_finite()) {
76        return Err(MemoryError::NonFiniteEmbeddingValue { index });
77    }
78    if let Some((index, _)) = b.iter().enumerate().find(|(_, value)| !value.is_finite()) {
79        return Err(MemoryError::NonFiniteEmbeddingValue { index });
80    }
81    let dot: f32 = a.iter().zip(b.iter()).map(|(x, y)| x * y).sum();
82    let norm_a: f32 = a.iter().map(|x| x * x).sum::<f32>().sqrt();
83    let norm_b: f32 = b.iter().map(|x| x * x).sum::<f32>().sqrt();
84    if norm_a == 0.0 || norm_b == 0.0 {
85        return Ok(0.0);
86    }
87    let similarity = dot / (norm_a * norm_b);
88    if !similarity.is_finite() {
89        return Err(MemoryError::Other(
90            "cosine similarity produced a non-finite score".to_string(),
91        ));
92    }
93    Ok(similarity)
94}
95
96fn days_since(timestamp: &str, evaluation_time: chrono::DateTime<chrono::Utc>) -> Option<f64> {
97    let dt = parse_search_timestamp(timestamp)?;
98    let duration = evaluation_time.naive_utc() - dt;
99    Some(duration.num_seconds() as f64 / 86_400.0)
100}
101
102fn parse_search_timestamp(timestamp: &str) -> Option<chrono::NaiveDateTime> {
103    if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(timestamp, "%Y-%m-%d %H:%M:%S") {
104        return Some(dt);
105    }
106    if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(timestamp, "%Y-%m-%d %H:%M:%S%.f") {
107        return Some(dt);
108    }
109    if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(timestamp) {
110        return Some(dt.naive_utc());
111    }
112    tracing::warn!(
113        timestamp,
114        "failed to parse search timestamp for recency scoring; recency contribution dropped"
115    );
116    None
117}
118
119fn recency_contribution(
120    config: &SearchConfig,
121    context: &SearchContext,
122    updated_at: Option<&str>,
123    best_rank: Option<usize>,
124) -> Option<f64> {
125    match (config.recency_half_life_days, updated_at) {
126        (Some(half_life), Some(ts)) if half_life > 0.0 => {
127            let age_days = days_since(ts, context.evaluation_time).map(|days| days.max(0.0))?;
128            let decay = 2.0_f64.powf(-age_days / half_life);
129            let rank = best_rank.unwrap_or(1).max(1) as f64;
130            Some(config.recency_weight * decay / (config.rrf_k + rank))
131        }
132        _ => None,
133    }
134}
135
136pub(crate) fn search_result_id(source: &SearchSource) -> String {
137    match source {
138        SearchSource::Fact { fact_id, .. } => format!("fact:{fact_id}"),
139        SearchSource::Chunk { chunk_id, .. } => format!("chunk:{chunk_id}"),
140        SearchSource::Message { message_id, .. } => format!("msg:{message_id}"),
141        SearchSource::Episode { episode_id, .. } => format!("episode:{episode_id}"),
142        SearchSource::Projection { projection_id, .. } => format!("projection:{projection_id}"),
143    }
144}
145
146pub fn source_dedup_key(source: &SearchSource) -> (u8, String) {
147    match source {
148        SearchSource::Fact { fact_id, .. } => (0, fact_id.clone()),
149        SearchSource::Chunk { chunk_id, .. } => (1, chunk_id.clone()),
150        SearchSource::Message {
151            message_id,
152            session_id,
153            ..
154        } => (2, format!("{session_id}:{message_id}")),
155        SearchSource::Episode { episode_id, .. } => (3, episode_id.clone()),
156        SearchSource::Projection { projection_id, .. } => (4, projection_id.clone()),
157    }
158}
159
160/// A BM25 search hit from FTS5.
161#[derive(Debug, Clone)]
162pub struct Bm25Hit {
163    /// Search item key such as `fact:{uuid}` or `episode:{episode_id}`.
164    pub id: String,
165    /// Text content returned to callers.
166    pub content: String,
167    /// Source info.
168    pub source: SearchSource,
169    /// Raw BM25 score reported by SQLite FTS5.
170    pub raw_score: f64,
171    /// Timestamp used for recency scoring.
172    pub updated_at: Option<String>,
173}
174
175/// A vector search hit.
176#[derive(Debug, Clone)]
177pub struct VectorHit {
178    /// Search item key such as `fact:{uuid}` or `episode:{episode_id}`.
179    pub id: String,
180    /// Text content returned to callers.
181    pub content: String,
182    /// Source info.
183    pub source: SearchSource,
184    /// Final similarity used for vector ranking.
185    pub similarity: f64,
186    /// Timestamp used for recency scoring.
187    pub updated_at: Option<String>,
188    /// Rank from the underlying retrieval stage before exact reranking.
189    pub source_rank: Option<usize>,
190    /// Similarity from the underlying retrieval stage before exact reranking.
191    pub source_similarity: Option<f64>,
192    /// Whether exact f32 reranking changed or confirmed this candidate ordering.
193    pub reranked_from_f32: bool,
194}
195
196#[allow(dead_code)]
197struct VectorRow {
198    id: String,
199    content: String,
200    blob: Vec<u8>,
201    updated_at: Option<String>,
202    source_type: SearchSourceType,
203    filter_namespace: Option<String>,
204    filter_session_id: Option<String>,
205    source: SearchSource,
206}
207
208struct RrfCandidate {
209    content: String,
210    source: SearchSource,
211    updated_at: Option<String>,
212    bm25_score: Option<f64>,
213    bm25_rank: Option<usize>,
214    vector_score: Option<f64>,
215    vector_rank: Option<usize>,
216    vector_source_rank: Option<usize>,
217    vector_source_score: Option<f64>,
218    vector_reranked_from_f32: bool,
219}
220
221impl RrfCandidate {
222    fn explained(self, config: &SearchConfig, context: &SearchContext) -> ExplainedResult {
223        let bm25_contribution = self
224            .bm25_rank
225            .map(|rank| config.bm25_weight / (config.rrf_k + rank as f64));
226        let vector_contribution = self
227            .vector_rank
228            .map(|rank| config.vector_weight / (config.rrf_k + rank as f64));
229        let best_rank = match (self.bm25_rank, self.vector_rank) {
230            (Some(a), Some(b)) => Some(a.min(b)),
231            (Some(a), None) | (None, Some(a)) => Some(a),
232            (None, None) => None,
233        };
234        let recency_score =
235            recency_contribution(config, context, self.updated_at.as_deref(), best_rank);
236        let rrf_score = bm25_contribution.unwrap_or(0.0)
237            + vector_contribution.unwrap_or(0.0)
238            + recency_score.unwrap_or(0.0);
239
240        let breakdown = ScoreBreakdown {
241            rrf_score,
242            bm25_score: self.bm25_score,
243            vector_score: self.vector_score,
244            recency_score,
245            bm25_rank: self.bm25_rank,
246            vector_rank: self.vector_rank,
247            vector_source_rank: self.vector_source_rank,
248            vector_source_score: self.vector_source_score,
249            bm25_contribution,
250            vector_contribution,
251            vector_reranked_from_f32: self.vector_reranked_from_f32,
252            bm25_weight: config.bm25_weight,
253            vector_weight: config.vector_weight,
254            recency_weight: config.recency_half_life_days.map(|_| config.recency_weight),
255            rrf_k: config.rrf_k,
256        };
257
258        ExplainedResult {
259            result: SearchResult {
260                content: self.content,
261                source: self.source,
262                score: rrf_score,
263                bm25_rank: breakdown.bm25_rank,
264                vector_rank: breakdown.vector_rank,
265                cosine_similarity: breakdown.vector_score,
266            },
267            breakdown,
268        }
269    }
270}
271
272fn scan_vector_rows(
273    rows: impl Iterator<Item = Result<VectorRow, rusqlite::Error>>,
274    query_embedding: &[f32],
275    min_similarity: f64,
276    table_label: &str,
277) -> Result<(Vec<VectorHit>, usize), MemoryError> {
278    let expected_dims = query_embedding.len();
279    let mut hits = Vec::new();
280    let mut row_count = 0usize;
281    let warn_limit = VECTOR_SCAN_WARN_LIMIT.load(Ordering::Relaxed);
282    let hard_limit = VECTOR_SCAN_BLOCK_LIMIT.load(Ordering::Relaxed);
283
284    for row in rows {
285        let row = row?;
286        row_count += 1;
287        if warn_limit > 0 && row_count == warn_limit.saturating_add(1) {
288            tracing::warn!(
289                table = table_label,
290                count = row_count,
291                threshold = warn_limit,
292                "vector scan warning threshold exceeded"
293            );
294        }
295        if hard_limit > 0 && row_count > hard_limit {
296            return Err(MemoryError::VectorScanLimitExceeded {
297                table: table_label.to_string(),
298                scanned: row_count,
299                limit: hard_limit,
300            });
301        }
302
303        let stored_embedding = match crate::db::decode_f32_le(&row.blob, expected_dims) {
304            Ok(embedding) => embedding,
305            Err(error) => {
306                tracing::warn!(
307                    error = %error,
308                    table = table_label,
309                    item = %row.id,
310                    "Skipping row with invalid embedding blob"
311                );
312                continue;
313            }
314        };
315
316        if stored_embedding.len() != expected_dims {
317            tracing::warn!(
318                expected = expected_dims,
319                actual = stored_embedding.len(),
320                "Skipping {} with wrong embedding dimensions",
321                table_label
322            );
323            continue;
324        }
325
326        let similarity = cosine_similarity(query_embedding, &stored_embedding)? as f64;
327        if similarity >= min_similarity {
328            hits.push(VectorHit {
329                id: row.id,
330                content: row.content,
331                source: row.source,
332                similarity,
333                updated_at: row.updated_at,
334                source_rank: None,
335                source_similarity: None,
336                reranked_from_f32: false,
337            });
338        }
339    }
340
341    Ok((hits, row_count))
342}
343
344fn rank_vector_hits(mut hits: Vec<VectorHit>, pool_size: usize) -> Vec<VectorHit> {
345    hits.sort_by(|a, b| {
346        b.similarity.partial_cmp(&a.similarity).unwrap_or_else(|| {
347            if a.similarity.is_nan() {
348                std::cmp::Ordering::Greater
349            } else {
350                std::cmp::Ordering::Less
351            }
352        })
353    });
354
355    for (idx, hit) in hits.iter_mut().enumerate() {
356        hit.source_rank = Some(idx + 1);
357        hit.source_similarity = Some(hit.similarity);
358    }
359
360    hits.truncate(pool_size);
361    hits
362}
363
364/// Run BM25 search over facts_fts, chunks_fts, episodes_fts, and optionally messages_fts.
365pub(crate) fn bm25_search(
366    conn: &Connection,
367    sanitized_query: &str,
368    pool_size: usize,
369    namespaces: Option<&[&str]>,
370    source_types: Option<&[SearchSourceType]>,
371    session_ids: Option<&[&str]>,
372) -> Result<Vec<Bm25Hit>, MemoryError> {
373    let mut hits = Vec::new();
374
375    let search_facts = source_types
376        .map(|st| st.contains(&SearchSourceType::Facts))
377        .unwrap_or(true);
378    let search_chunks = source_types
379        .map(|st| st.contains(&SearchSourceType::Chunks))
380        .unwrap_or(true);
381    let search_messages = source_types
382        .map(|st| st.contains(&SearchSourceType::Messages))
383        .unwrap_or(false);
384    let search_episodes = source_types
385        .map(|st| st.contains(&SearchSourceType::Episodes))
386        .unwrap_or(true);
387
388    if search_facts {
389        let (ns_clause, ns_params) = build_filter_clause("f.namespace", namespaces, 3);
390        let sql = format!(
391            "SELECT fm.fact_id, f.content, f.namespace, bm25(facts_fts) AS score, f.updated_at
392             FROM facts_fts
393             JOIN facts_rowid_map fm ON facts_fts.rowid = fm.rowid
394             JOIN facts f ON f.id = fm.fact_id
395             WHERE facts_fts MATCH ?1 {}
396             ORDER BY score ASC
397             LIMIT ?2",
398            ns_clause
399        );
400
401        let mut params = vec![
402            SqlValue::Text(sanitized_query.to_string()),
403            SqlValue::Integer(pool_size as i64),
404        ];
405        params.extend(ns_params);
406
407        let mut stmt = conn.prepare(&sql)?;
408        let rows = stmt.query_map(rusqlite::params_from_iter(&params), |row| {
409            let fact_id: String = row.get(0)?;
410            let content: String = row.get(1)?;
411            let namespace: String = row.get(2)?;
412            let raw_score: f64 = row.get(3)?;
413            let updated_at: Option<String> = row.get(4)?;
414            Ok(Bm25Hit {
415                id: format!("fact:{fact_id}"),
416                content,
417                source: SearchSource::Fact { fact_id, namespace },
418                raw_score,
419                updated_at,
420            })
421        })?;
422
423        for row in rows {
424            hits.push(row?);
425        }
426    }
427
428    if search_chunks {
429        let (ns_clause, ns_params) = build_filter_clause("d.namespace", namespaces, 3);
430        let sql = format!(
431            "SELECT cm.chunk_id, c.content, c.document_id, d.title, c.chunk_index,
432                    bm25(chunks_fts) AS score, c.created_at
433             FROM chunks_fts
434             JOIN chunks_rowid_map cm ON chunks_fts.rowid = cm.rowid
435             JOIN chunks c ON c.id = cm.chunk_id
436             JOIN documents d ON d.id = c.document_id
437             WHERE chunks_fts MATCH ?1 {}
438             ORDER BY score ASC
439             LIMIT ?2",
440            ns_clause
441        );
442
443        let mut params = vec![
444            SqlValue::Text(sanitized_query.to_string()),
445            SqlValue::Integer(pool_size as i64),
446        ];
447        params.extend(ns_params);
448
449        let mut stmt = conn.prepare(&sql)?;
450        let rows = stmt.query_map(rusqlite::params_from_iter(&params), |row| {
451            let chunk_id: String = row.get(0)?;
452            let content: String = row.get(1)?;
453            let document_id: String = row.get(2)?;
454            let document_title: String = row.get(3)?;
455            let chunk_index: i64 = row.get(4)?;
456            let raw_score: f64 = row.get(5)?;
457            let updated_at: Option<String> = row.get(6)?;
458            Ok(Bm25Hit {
459                id: format!("chunk:{chunk_id}"),
460                content,
461                source: SearchSource::Chunk {
462                    chunk_id,
463                    document_id,
464                    document_title,
465                    chunk_index: chunk_index as usize,
466                },
467                raw_score,
468                updated_at,
469            })
470        })?;
471
472        for row in rows {
473            hits.push(row?);
474        }
475    }
476
477    if search_messages {
478        let (sid_clause, sid_params) = build_filter_clause("m.session_id", session_ids, 3);
479        let sql = format!(
480            "SELECT mm.message_id, m.content, m.session_id, m.role,
481                    bm25(messages_fts) AS score, m.created_at
482             FROM messages_fts
483             JOIN messages_rowid_map mm ON messages_fts.rowid = mm.rowid
484             JOIN messages m ON m.id = mm.message_id
485             WHERE messages_fts MATCH ?1 {}
486             ORDER BY score ASC
487             LIMIT ?2",
488            sid_clause
489        );
490
491        let mut params = vec![
492            SqlValue::Text(sanitized_query.to_string()),
493            SqlValue::Integer(pool_size as i64),
494        ];
495        params.extend(sid_params);
496
497        let mut stmt = conn.prepare(&sql)?;
498        let rows = stmt.query_map(rusqlite::params_from_iter(&params), |row| {
499            let message_id: i64 = row.get(0)?;
500            let content: String = row.get(1)?;
501            let session_id: String = row.get(2)?;
502            let role: String = row.get(3)?;
503            let raw_score: f64 = row.get(4)?;
504            let updated_at: Option<String> = row.get(5)?;
505            Ok(Bm25Hit {
506                id: format!("msg:{message_id}"),
507                content,
508                source: SearchSource::Message {
509                    message_id,
510                    session_id,
511                    role,
512                },
513                raw_score,
514                updated_at,
515            })
516        })?;
517
518        for row in rows {
519            hits.push(row?);
520        }
521    }
522
523    if search_episodes {
524        let (ns_clause, ns_params) = build_filter_clause("d.namespace", namespaces, 3);
525        let sql = format!(
526            "SELECT e.episode_id, e.document_id, e.search_text, e.effect_type, e.outcome,
527                    bm25(episodes_fts) AS score, e.updated_at
528             FROM episodes_fts
529             JOIN episodes_rowid_map rm ON episodes_fts.rowid = rm.rowid
530             JOIN episodes e ON e.episode_id = rm.episode_id
531             JOIN documents d ON d.id = e.document_id
532             WHERE episodes_fts MATCH ?1 {}
533             ORDER BY score ASC
534             LIMIT ?2",
535            ns_clause
536        );
537
538        let mut params = vec![
539            SqlValue::Text(sanitized_query.to_string()),
540            SqlValue::Integer(pool_size as i64),
541        ];
542        params.extend(ns_params);
543
544        let mut stmt = conn.prepare(&sql)?;
545        let rows = stmt.query_map(rusqlite::params_from_iter(&params), |row| {
546            let episode_id: String = row.get(0)?;
547            let document_id: String = row.get(1)?;
548            let content: String = row.get(2)?;
549            let effect_type: String = row.get(3)?;
550            let outcome: String = row.get(4)?;
551            let raw_score: f64 = row.get(5)?;
552            let updated_at: Option<String> = row.get(6)?;
553            Ok(Bm25Hit {
554                id: episodes::episode_item_key(&episode_id),
555                content,
556                source: SearchSource::Episode {
557                    episode_id,
558                    document_id,
559                    effect_type,
560                    outcome,
561                },
562                raw_score,
563                updated_at,
564            })
565        })?;
566
567        for row in rows {
568            hits.push(row?);
569        }
570    }
571
572    Ok(hits)
573}
574
575/// Run brute-force vector search over facts, chunks, episodes, and optionally messages.
576pub(crate) fn vector_search(
577    conn: &Connection,
578    query_embedding: &[f32],
579    pool_size: usize,
580    min_similarity: f64,
581    namespaces: Option<&[&str]>,
582    source_types: Option<&[SearchSourceType]>,
583    session_ids: Option<&[&str]>,
584) -> Result<Vec<VectorHit>, MemoryError> {
585    let mut hits = Vec::new();
586
587    let search_facts = source_types
588        .map(|st| st.contains(&SearchSourceType::Facts))
589        .unwrap_or(true);
590    let search_chunks = source_types
591        .map(|st| st.contains(&SearchSourceType::Chunks))
592        .unwrap_or(true);
593    let search_messages = source_types
594        .map(|st| st.contains(&SearchSourceType::Messages))
595        .unwrap_or(false);
596    let search_episodes = source_types
597        .map(|st| st.contains(&SearchSourceType::Episodes))
598        .unwrap_or(true);
599
600    if search_facts {
601        let (ns_clause, ns_params) = build_filter_clause("namespace", namespaces, 1);
602        let sql = format!(
603            "SELECT id, content, namespace, embedding, updated_at
604             FROM facts
605             WHERE embedding IS NOT NULL {}",
606            ns_clause
607        );
608
609        let mut stmt = conn.prepare(&sql)?;
610        let rows = stmt.query_map(rusqlite::params_from_iter(&ns_params), |row| {
611            let id: String = row.get(0)?;
612            let content: String = row.get(1)?;
613            let namespace: String = row.get(2)?;
614            let blob: Vec<u8> = row.get(3)?;
615            let updated_at: Option<String> = row.get(4)?;
616            Ok(VectorRow {
617                id: format!("fact:{id}"),
618                content,
619                blob,
620                updated_at,
621                source_type: SearchSourceType::Facts,
622                filter_namespace: Some(namespace.clone()),
623                filter_session_id: None,
624                source: SearchSource::Fact {
625                    fact_id: id,
626                    namespace,
627                },
628            })
629        })?;
630
631        let (fact_hits, fact_count) =
632            scan_vector_rows(rows, query_embedding, min_similarity, "fact")?;
633        hits.extend(fact_hits);
634
635        if vector_scan_warn_exceeded(fact_count) {
636            tracing::warn!(
637                count = fact_count,
638                "facts table exceeds vector scan threshold ({} rows)",
639                fact_count
640            );
641        }
642    }
643
644    if search_chunks {
645        let (ns_clause, ns_params) = build_filter_clause("d.namespace", namespaces, 1);
646        let sql = format!(
647            "SELECT c.id, c.content, c.document_id, d.title, c.chunk_index, c.embedding, c.created_at, d.namespace
648             FROM chunks c
649             JOIN documents d ON d.id = c.document_id
650             WHERE c.embedding IS NOT NULL {}",
651            ns_clause
652        );
653
654        let mut stmt = conn.prepare(&sql)?;
655        let rows = stmt.query_map(rusqlite::params_from_iter(&ns_params), |row| {
656            let id: String = row.get(0)?;
657            let content: String = row.get(1)?;
658            let document_id: String = row.get(2)?;
659            let document_title: String = row.get(3)?;
660            let chunk_index: i64 = row.get(4)?;
661            let blob: Vec<u8> = row.get(5)?;
662            let updated_at: Option<String> = row.get(6)?;
663            let namespace: String = row.get(7)?;
664            Ok(VectorRow {
665                id: format!("chunk:{id}"),
666                content,
667                blob,
668                updated_at,
669                source_type: SearchSourceType::Chunks,
670                filter_namespace: Some(namespace),
671                filter_session_id: None,
672                source: SearchSource::Chunk {
673                    chunk_id: id,
674                    document_id,
675                    document_title,
676                    chunk_index: chunk_index as usize,
677                },
678            })
679        })?;
680
681        let (chunk_hits, chunk_count) =
682            scan_vector_rows(rows, query_embedding, min_similarity, "chunk")?;
683        hits.extend(chunk_hits);
684
685        if vector_scan_warn_exceeded(chunk_count) {
686            tracing::warn!(
687                count = chunk_count,
688                "chunks table exceeds vector scan threshold ({} rows)",
689                chunk_count
690            );
691        }
692    }
693
694    if search_messages {
695        let (sid_clause, sid_params) = build_filter_clause("m.session_id", session_ids, 1);
696        let sql = format!(
697            "SELECT m.id, m.content, m.session_id, m.role, m.embedding, m.created_at
698             FROM messages m
699             WHERE m.embedding IS NOT NULL {}",
700            sid_clause
701        );
702
703        let mut stmt = conn.prepare(&sql)?;
704        let rows = stmt.query_map(rusqlite::params_from_iter(&sid_params), |row| {
705            let message_id: i64 = row.get(0)?;
706            let content: String = row.get(1)?;
707            let session_id: String = row.get(2)?;
708            let role: String = row.get(3)?;
709            let blob: Vec<u8> = row.get(4)?;
710            let updated_at: Option<String> = row.get(5)?;
711            Ok(VectorRow {
712                id: format!("msg:{message_id}"),
713                content,
714                blob,
715                updated_at,
716                source_type: SearchSourceType::Messages,
717                filter_namespace: None,
718                filter_session_id: Some(session_id.clone()),
719                source: SearchSource::Message {
720                    message_id,
721                    session_id,
722                    role,
723                },
724            })
725        })?;
726
727        let (message_hits, message_count) =
728            scan_vector_rows(rows, query_embedding, min_similarity, "message")?;
729        hits.extend(message_hits);
730
731        if vector_scan_warn_exceeded(message_count) {
732            tracing::warn!(
733                count = message_count,
734                "messages table exceeds vector scan threshold ({} rows)",
735                message_count
736            );
737        }
738    }
739
740    if search_episodes {
741        let (ns_clause, ns_params) = build_filter_clause("d.namespace", namespaces, 1);
742        let sql = format!(
743            "SELECT e.episode_id, e.document_id, e.search_text, e.effect_type, e.outcome, e.embedding, e.updated_at, d.namespace
744             FROM episodes e
745             JOIN documents d ON d.id = e.document_id
746             WHERE e.embedding IS NOT NULL {}",
747            ns_clause
748        );
749
750        let mut stmt = conn.prepare(&sql)?;
751        let rows = stmt.query_map(rusqlite::params_from_iter(&ns_params), |row| {
752            let episode_id: String = row.get(0)?;
753            let document_id: String = row.get(1)?;
754            let content: String = row.get(2)?;
755            let effect_type: String = row.get(3)?;
756            let outcome: String = row.get(4)?;
757            let blob: Vec<u8> = row.get(5)?;
758            let updated_at: Option<String> = row.get(6)?;
759            let namespace: String = row.get(7)?;
760            Ok(VectorRow {
761                id: episodes::episode_item_key(&episode_id),
762                content,
763                blob,
764                updated_at,
765                source_type: SearchSourceType::Episodes,
766                filter_namespace: Some(namespace),
767                filter_session_id: None,
768                source: SearchSource::Episode {
769                    episode_id,
770                    document_id,
771                    effect_type,
772                    outcome,
773                },
774            })
775        })?;
776
777        let (episode_hits, episode_count) =
778            scan_vector_rows(rows, query_embedding, min_similarity, "episode")?;
779        hits.extend(episode_hits);
780
781        if vector_scan_warn_exceeded(episode_count) {
782            tracing::warn!(
783                count = episode_count,
784                "episodes table exceeds vector scan threshold ({} rows)",
785                episode_count
786            );
787        }
788    }
789
790    Ok(rank_vector_hits(hits, pool_size))
791}
792
793fn brute_force_vector_outcome(
794    conn: &Connection,
795    query_embedding: &[f32],
796    pool_size: usize,
797    min_similarity: f64,
798    namespaces: Option<&[&str]>,
799    source_types: Option<&[SearchSourceType]>,
800    session_ids: Option<&[&str]>,
801) -> Result<VectorSearchOutcome, MemoryError> {
802    let hits = vector_search(
803        conn,
804        query_embedding,
805        pool_size,
806        min_similarity,
807        namespaces,
808        source_types,
809        session_ids,
810    )?;
811    Ok(VectorSearchOutcome {
812        requested_candidates: pool_size,
813        returned_candidates: hits.len(),
814        post_filter_candidates: hits.len(),
815        hits,
816        candidate_backend: "brute_force_f32".to_string(),
817        fallback: None,
818        exact_rerank: true,
819        degradations: Vec::new(),
820        receipt_metadata: VectorReceiptMetadata::default(),
821    })
822}
823
824#[allow(clippy::too_many_arguments)]
825fn vector_search_with_backend(
826    conn: &Connection,
827    query_embedding: &[f32],
828    pool_size: usize,
829    min_similarity: f64,
830    config: &SearchConfig,
831    context: &SearchContext,
832    namespaces: Option<&[&str]>,
833    source_types: Option<&[SearchSourceType]>,
834    session_ids: Option<&[&str]>,
835) -> Result<VectorSearchOutcome, MemoryError> {
836    if context.exactness_profile == crate::types::ExactnessProfile::PreferExact {
837        return brute_force_vector_outcome(
838            conn,
839            query_embedding,
840            pool_size,
841            min_similarity,
842            namespaces,
843            source_types,
844            session_ids,
845        );
846    }
847
848    match config.derived_vector_backend {
849        DerivedVectorBackendPolicy::Disabled => brute_force_vector_outcome(
850            conn,
851            query_embedding,
852            pool_size,
853            min_similarity,
854            namespaces,
855            source_types,
856            session_ids,
857        ),
858        DerivedVectorBackendPolicy::TurboQuantCandidateOnly => turbo_quant_vector_outcome(
859            conn,
860            query_embedding,
861            pool_size,
862            min_similarity,
863            config,
864            namespaces,
865            source_types,
866            session_ids,
867        ),
868    }
869}
870
871#[cfg(not(feature = "turbo-quant-codec"))]
872#[allow(clippy::too_many_arguments)]
873fn turbo_quant_vector_outcome(
874    conn: &Connection,
875    query_embedding: &[f32],
876    pool_size: usize,
877    min_similarity: f64,
878    _config: &SearchConfig,
879    namespaces: Option<&[&str]>,
880    source_types: Option<&[SearchSourceType]>,
881    session_ids: Option<&[&str]>,
882) -> Result<VectorSearchOutcome, MemoryError> {
883    let mut outcome = brute_force_vector_outcome(
884        conn,
885        query_embedding,
886        pool_size,
887        min_similarity,
888        namespaces,
889        source_types,
890        session_ids,
891    )?;
892    outcome.candidate_backend = "turbo_quant_candidate_then_exact_f32".to_string();
893    outcome.fallback = Some("turbo_quant_feature_disabled".to_string());
894    outcome
895        .degradations
896        .push("TurboQuant backend requested without turbo-quant-codec feature".to_string());
897    Ok(outcome)
898}
899
900#[cfg(feature = "turbo-quant-codec")]
901#[allow(clippy::too_many_arguments)]
902fn turbo_quant_vector_outcome(
903    conn: &Connection,
904    query_embedding: &[f32],
905    pool_size: usize,
906    min_similarity: f64,
907    config: &SearchConfig,
908    namespaces: Option<&[&str]>,
909    source_types: Option<&[SearchSourceType]>,
910    session_ids: Option<&[&str]>,
911) -> Result<VectorSearchOutcome, MemoryError> {
912    use crate::vector_codec::{TurboQuantCodec, VectorArtifactV1, VectorCodec};
913
914    if !config.turbo_quant_require_exact_rerank {
915        return Err(MemoryError::InvalidConfig {
916            field: "search.turbo_quant_require_exact_rerank",
917            reason: "TurboQuant candidate backend requires exact f32 rerank".to_string(),
918        });
919    }
920
921    let dim = query_embedding.len();
922    let codec = TurboQuantCodec::new(
923        dim,
924        config.turbo_quant_bits,
925        config.turbo_quant_projections,
926        config.turbo_quant_seed,
927    )?;
928    let profile = codec.profile().clone();
929    let profile_digest = profile.digest();
930    let mut metadata = VectorReceiptMetadata {
931        codec_family: Some("turbo_quant".to_string()),
932        codec_profile_digest: Some(profile_digest.clone()),
933        ..VectorReceiptMetadata::default()
934    };
935
936    let filtered = namespaces.is_some_and(|values| !values.is_empty())
937        || source_types.is_some_and(|values| !values.is_empty())
938        || session_ids.is_some_and(|values| !values.is_empty());
939    metadata.filter_strategy = Some(if filtered {
940        "adaptive_oversampling_after_approximate_scoring".to_string()
941    } else {
942        "unfiltered_top_k_heap".to_string()
943    });
944
945    let raw_count = authoritative_vector_row_count(conn)?;
946    let (current_source_snapshot_digest, current_source_row_count) =
947        crate::db::current_source_snapshot_digest(conn, dim)?;
948    let Some(generation) =
949        crate::db::current_derived_vector_generation(conn, "turbo_quant", &profile_digest)?
950    else {
951        metadata.artifact_missing_count = Some(raw_count);
952        metadata.vector_artifact_missing_count = Some(raw_count);
953        let mut outcome = brute_force_vector_outcome(
954            conn,
955            query_embedding,
956            pool_size,
957            min_similarity,
958            namespaces,
959            source_types,
960            session_ids,
961        )?;
962        outcome.candidate_backend = "turbo_quant_candidate_then_exact_f32".to_string();
963        outcome.fallback = Some("turbo_quant_generation_missing_or_invalidated".to_string());
964        outcome.degradations.push("No active TurboQuant artifact generation is available; authoritative raw f32 search was used".to_string());
965        outcome.receipt_metadata = metadata;
966        return Ok(outcome);
967    };
968
969    metadata.artifact_generation_id = Some(generation.generation_id.clone());
970    metadata.vector_artifact_manifest_digest = Some(generation.artifact_manifest_digest.clone());
971    metadata.artifact_count = Some(generation.artifact_count);
972
973    let artifacts =
974        crate::db::load_derived_vector_artifacts_by_generation(conn, &generation.generation_id)?;
975    metadata.vector_artifact_count = Some(artifacts.len());
976
977    if generation.dim != dim
978        || generation.encoding != "turbo_code_wire_v1"
979        || generation.status != "active"
980        || generation.source_row_count != raw_count
981        || generation.source_row_count != current_source_row_count
982        || generation.source_snapshot_digest != current_source_snapshot_digest
983        || generation.artifact_count != artifacts.len()
984    {
985        let missing = raw_count.saturating_sub(artifacts.len());
986        metadata.artifact_missing_count = Some(missing);
987        metadata.vector_artifact_missing_count = Some(missing);
988        let mut outcome = brute_force_vector_outcome(
989            conn,
990            query_embedding,
991            pool_size,
992            min_similarity,
993            namespaces,
994            source_types,
995            session_ids,
996        )?;
997        outcome.candidate_backend = "turbo_quant_candidate_then_exact_f32".to_string();
998        outcome.fallback = Some("turbo_quant_generation_incomplete_or_stale".to_string());
999        outcome.degradations.push(format!(
1000            "TurboQuant generation validation failed: generation={}, status={}, dim={}, source_rows={}, artifacts={}, authoritative_rows={}, snapshot_current={}",
1001            generation.generation_id,
1002            generation.status,
1003            generation.dim,
1004            generation.source_row_count,
1005            artifacts.len(),
1006            raw_count,
1007            generation.source_snapshot_digest == current_source_snapshot_digest
1008        ));
1009        outcome.receipt_metadata = metadata;
1010        return Ok(outcome);
1011    }
1012
1013    let prepared = codec.prepare_query(query_embedding)?;
1014    let candidate_cap = if filtered {
1015        artifacts
1016            .len()
1017            .min(pool_size.saturating_mul(16).max(pool_size))
1018    } else {
1019        pool_size.min(artifacts.len())
1020    };
1021    let mut scored = BinaryHeap::with_capacity(candidate_cap.saturating_add(1));
1022    let mut corrupt_count = 0usize;
1023    let mut scanned_count = 0usize;
1024    for (seq, artifact_row) in artifacts.into_iter().enumerate() {
1025        scanned_count += 1;
1026        if artifact_row.encoding != "turbo_code_wire_v1"
1027            || artifact_row.dim != dim
1028            || artifact_row.status != "active"
1029        {
1030            corrupt_count += 1;
1031            continue;
1032        }
1033        let artifact = VectorArtifactV1::new(profile.clone(), artifact_row.encoded);
1034        if artifact.profile_digest != artifact_row.codec_profile_digest
1035            || artifact.artifact_digest != artifact_row.encoded_digest
1036        {
1037            corrupt_count += 1;
1038            continue;
1039        }
1040        let approx = match codec.score_inner_product_prepared(&artifact, &prepared) {
1041            Ok(score) if score.is_finite() => score as f64,
1042            Ok(_) => {
1043                corrupt_count += 1;
1044                continue;
1045            }
1046            Err(err) => {
1047                tracing::warn!(
1048                    error = %err,
1049                    item = %artifact_row.item_key,
1050                    "corrupt TurboQuant artifact encountered; falling back to raw f32"
1051                );
1052                corrupt_count += 1;
1053                continue;
1054            }
1055        };
1056        if candidate_cap == 0 {
1057            continue;
1058        }
1059        let candidate = ApproxCandidate {
1060            score: approx,
1061            seq,
1062            item_key: artifact_row.item_key,
1063        };
1064        if scored.len() < candidate_cap {
1065            scored.push(candidate);
1066        } else if scored
1067            .peek()
1068            .is_some_and(|worst: &ApproxCandidate| candidate.score > worst.score)
1069        {
1070            scored.pop();
1071            scored.push(candidate);
1072        }
1073    }
1074
1075    metadata.artifact_corruption_count = Some(corrupt_count);
1076    metadata.approximate_scanned_count = Some(scanned_count);
1077    if corrupt_count > 0 {
1078        let mut outcome = brute_force_vector_outcome(
1079            conn,
1080            query_embedding,
1081            pool_size,
1082            min_similarity,
1083            namespaces,
1084            source_types,
1085            session_ids,
1086        )?;
1087        outcome.candidate_backend = "turbo_quant_candidate_then_exact_f32".to_string();
1088        outcome.fallback = Some("turbo_quant_artifact_validation_failed".to_string());
1089        outcome.degradations.push(format!(
1090            "TurboQuant artifact validation failed: {corrupt_count} corrupt artifacts in generation {}",
1091            generation.generation_id
1092        ));
1093        outcome.receipt_metadata = metadata;
1094        return Ok(outcome);
1095    }
1096
1097    let mut scored = scored.into_vec();
1098    scored.sort_by(|a, b| {
1099        b.score
1100            .partial_cmp(&a.score)
1101            .unwrap_or(std::cmp::Ordering::Equal)
1102            .then_with(|| a.seq.cmp(&b.seq))
1103    });
1104    let approximate_returned = scored.len();
1105    metadata.approximate_candidate_count = Some(approximate_returned);
1106    metadata.approximate_returned_count = Some(approximate_returned);
1107    let mut exact_hits = Vec::new();
1108    let mut raw_rows_loaded_count = 0usize;
1109    let mut missing_count = 0usize;
1110    for (approx_rank_0, candidate) in scored.into_iter().enumerate() {
1111        let Some(row) = load_vector_row_by_item_key(conn, &candidate.item_key)? else {
1112            missing_count += 1;
1113            continue;
1114        };
1115        raw_rows_loaded_count += 1;
1116        if !vector_row_matches_filters(&row, namespaces, source_types, session_ids) {
1117            continue;
1118        }
1119        let stored_embedding = crate::db::decode_f32_le(&row.blob, dim)?;
1120        let similarity = cosine_similarity(query_embedding, &stored_embedding)? as f64;
1121        if similarity >= min_similarity {
1122            exact_hits.push(VectorHit {
1123                id: row.id,
1124                content: row.content,
1125                source: row.source,
1126                similarity,
1127                updated_at: row.updated_at,
1128                source_rank: Some(approx_rank_0 + 1),
1129                source_similarity: Some(candidate.score),
1130                reranked_from_f32: true,
1131            });
1132        }
1133    }
1134    let post_filter_candidates = exact_hits.len();
1135    metadata.artifact_missing_count = Some(missing_count);
1136    metadata.vector_artifact_missing_count = Some(missing_count);
1137    metadata.vector_artifact_stale_count = Some(0);
1138    metadata.raw_rows_loaded_count = Some(raw_rows_loaded_count);
1139    metadata.exact_rerank_count = Some(raw_rows_loaded_count);
1140    let mut degradations = Vec::new();
1141    if filtered && post_filter_candidates < pool_size && candidate_cap < scanned_count {
1142        degradations.push(format!(
1143            "TurboQuant filter-aware candidate generation under-returned {post_filter_candidates} candidates for requested pool {pool_size} after scanning {scanned_count} artifacts with candidate budget {candidate_cap}"
1144        ));
1145    }
1146    if missing_count > 0 {
1147        degradations.push(format!(
1148            "TurboQuant exact rerank skipped {missing_count} candidates whose authoritative rows were missing"
1149        ));
1150    }
1151    let hits = rank_vector_hits(exact_hits, pool_size);
1152    Ok(VectorSearchOutcome {
1153        hits,
1154        candidate_backend: "turbo_quant_candidate_then_exact_f32".to_string(),
1155        requested_candidates: pool_size,
1156        returned_candidates: approximate_returned,
1157        post_filter_candidates,
1158        fallback: None,
1159        exact_rerank: true,
1160        degradations,
1161        receipt_metadata: metadata,
1162    })
1163}
1164
1165#[cfg(feature = "turbo-quant-codec")]
1166#[derive(Debug, Clone)]
1167struct ApproxCandidate {
1168    score: f64,
1169    seq: usize,
1170    item_key: String,
1171}
1172
1173#[cfg(feature = "turbo-quant-codec")]
1174impl PartialEq for ApproxCandidate {
1175    fn eq(&self, other: &Self) -> bool {
1176        self.score == other.score && self.seq == other.seq
1177    }
1178}
1179
1180#[cfg(feature = "turbo-quant-codec")]
1181impl Eq for ApproxCandidate {}
1182
1183#[cfg(feature = "turbo-quant-codec")]
1184impl PartialOrd for ApproxCandidate {
1185    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1186        Some(self.cmp(other))
1187    }
1188}
1189
1190#[cfg(feature = "turbo-quant-codec")]
1191impl Ord for ApproxCandidate {
1192    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1193        other
1194            .score
1195            .partial_cmp(&self.score)
1196            .unwrap_or(std::cmp::Ordering::Equal)
1197            .then_with(|| other.seq.cmp(&self.seq))
1198    }
1199}
1200
1201#[cfg(feature = "turbo-quant-codec")]
1202fn vector_row_matches_filters(
1203    row: &VectorRow,
1204    namespaces: Option<&[&str]>,
1205    source_types: Option<&[SearchSourceType]>,
1206    session_ids: Option<&[&str]>,
1207) -> bool {
1208    if source_types.is_some_and(|values| !values.contains(&row.source_type)) {
1209        return false;
1210    }
1211    if let Some(namespaces) = namespaces.filter(|values| !values.is_empty()) {
1212        let Some(namespace) = row.filter_namespace.as_deref() else {
1213            return false;
1214        };
1215        if !namespaces.contains(&namespace) {
1216            return false;
1217        }
1218    }
1219    if let Some(session_ids) = session_ids.filter(|values| !values.is_empty()) {
1220        let Some(session_id) = row.filter_session_id.as_deref() else {
1221            return false;
1222        };
1223        if !session_ids.contains(&session_id) {
1224            return false;
1225        }
1226    }
1227    true
1228}
1229
1230#[cfg(feature = "turbo-quant-codec")]
1231fn authoritative_vector_row_count(conn: &Connection) -> Result<usize, MemoryError> {
1232    let count: i64 = conn.query_row(
1233        "SELECT
1234             (SELECT COUNT(*) FROM facts WHERE embedding IS NOT NULL) +
1235             (SELECT COUNT(*) FROM chunks WHERE embedding IS NOT NULL) +
1236             (SELECT COUNT(*) FROM messages WHERE embedding IS NOT NULL) +
1237             (SELECT COUNT(*) FROM episodes WHERE embedding IS NOT NULL)",
1238        [],
1239        |row| row.get(0),
1240    )?;
1241    usize::try_from(count)
1242        .map_err(|err| MemoryError::Other(format!("authoritative vector count overflow: {err}")))
1243}
1244
1245#[cfg(feature = "turbo-quant-codec")]
1246fn load_vector_row_by_item_key(
1247    conn: &Connection,
1248    item_key: &str,
1249) -> Result<Option<VectorRow>, MemoryError> {
1250    let Some((domain, id)) = item_key.split_once(':') else {
1251        return Ok(None);
1252    };
1253    match domain {
1254        "fact" => conn
1255            .query_row(
1256                "SELECT id, content, namespace, embedding, updated_at
1257                 FROM facts WHERE id = ?1 AND embedding IS NOT NULL",
1258                [id],
1259                |row| {
1260                    let fact_id: String = row.get(0)?;
1261                    let content: String = row.get(1)?;
1262                    let namespace: String = row.get(2)?;
1263                    let blob: Vec<u8> = row.get(3)?;
1264                    let updated_at: Option<String> = row.get(4)?;
1265                    Ok(VectorRow {
1266                        id: format!("fact:{fact_id}"),
1267                        content,
1268                        blob,
1269                        updated_at,
1270                        source_type: SearchSourceType::Facts,
1271                        filter_namespace: Some(namespace.clone()),
1272                        filter_session_id: None,
1273                        source: SearchSource::Fact { fact_id, namespace },
1274                    })
1275                },
1276            )
1277            .optional()
1278            .map_err(MemoryError::from),
1279        "chunk" => conn
1280            .query_row(
1281                "SELECT c.id, c.content, c.document_id, d.title, c.chunk_index, c.embedding, c.created_at, d.namespace
1282                 FROM chunks c
1283                 JOIN documents d ON d.id = c.document_id
1284                 WHERE c.id = ?1 AND c.embedding IS NOT NULL",
1285                [id],
1286                |row| {
1287                    let chunk_id: String = row.get(0)?;
1288                    let content: String = row.get(1)?;
1289                    let document_id: String = row.get(2)?;
1290                    let document_title: String = row.get(3)?;
1291                    let chunk_index: i64 = row.get(4)?;
1292                    let blob: Vec<u8> = row.get(5)?;
1293                    let updated_at: Option<String> = row.get(6)?;
1294                    let namespace: String = row.get(7)?;
1295                    Ok(VectorRow {
1296                        id: format!("chunk:{chunk_id}"),
1297                        content,
1298                        blob,
1299                        updated_at,
1300                        source_type: SearchSourceType::Chunks,
1301                        filter_namespace: Some(namespace),
1302                        filter_session_id: None,
1303                        source: SearchSource::Chunk {
1304                            chunk_id,
1305                            document_id,
1306                            document_title,
1307                            chunk_index: chunk_index as usize,
1308                        },
1309                    })
1310                },
1311            )
1312            .optional()
1313            .map_err(MemoryError::from),
1314        "msg" => {
1315            let Ok(message_id) = id.parse::<i64>() else {
1316                return Ok(None);
1317            };
1318            conn.query_row(
1319                "SELECT id, content, session_id, role, embedding, created_at
1320                 FROM messages WHERE id = ?1 AND embedding IS NOT NULL",
1321                [message_id],
1322                |row| {
1323                    let message_id: i64 = row.get(0)?;
1324                    let content: String = row.get(1)?;
1325                    let session_id: String = row.get(2)?;
1326                    let role: String = row.get(3)?;
1327                    let blob: Vec<u8> = row.get(4)?;
1328                    let updated_at: Option<String> = row.get(5)?;
1329                    Ok(VectorRow {
1330                        id: format!("msg:{message_id}"),
1331                        content,
1332                        blob,
1333                        updated_at,
1334                        source_type: SearchSourceType::Messages,
1335                        filter_namespace: None,
1336                        filter_session_id: Some(session_id.clone()),
1337                        source: SearchSource::Message {
1338                            message_id,
1339                            session_id,
1340                            role,
1341                        },
1342                    })
1343                },
1344            )
1345            .optional()
1346            .map_err(MemoryError::from)
1347        }
1348        "episode" => conn
1349            .query_row(
1350                "SELECT e.episode_id, e.document_id, e.search_text, e.effect_type, e.outcome, e.embedding, e.updated_at, d.namespace
1351                 FROM episodes e
1352                 JOIN documents d ON d.id = e.document_id
1353                 WHERE e.episode_id = ?1 AND e.embedding IS NOT NULL",
1354                [id],
1355                |row| {
1356                    let episode_id: String = row.get(0)?;
1357                    let document_id: String = row.get(1)?;
1358                    let content: String = row.get(2)?;
1359                    let effect_type: String = row.get(3)?;
1360                    let outcome: String = row.get(4)?;
1361                    let blob: Vec<u8> = row.get(5)?;
1362                    let updated_at: Option<String> = row.get(6)?;
1363                    let namespace: String = row.get(7)?;
1364                    Ok(VectorRow {
1365                        id: episodes::episode_item_key(&episode_id),
1366                        content,
1367                        blob,
1368                        updated_at,
1369                        source_type: SearchSourceType::Episodes,
1370                        filter_namespace: Some(namespace),
1371                        filter_session_id: None,
1372                        source: SearchSource::Episode {
1373                            episode_id,
1374                            document_id,
1375                            effect_type,
1376                            outcome,
1377                        },
1378                    })
1379                },
1380            )
1381            .optional()
1382            .map_err(MemoryError::from),
1383        _ => Ok(None),
1384    }
1385}
1386
1387fn vector_scan_warn_exceeded(count: usize) -> bool {
1388    let limit = VECTOR_SCAN_WARN_LIMIT.load(Ordering::Relaxed);
1389    limit > 0 && count > limit
1390}
1391
1392#[derive(Debug, Clone)]
1393pub(crate) struct SearchExecution {
1394    pub results: Vec<ExplainedResult>,
1395    pub receipt: Option<VectorSearchReceiptV1>,
1396}
1397
1398#[derive(Debug, Clone, Default)]
1399struct VectorReceiptMetadata {
1400    codec_family: Option<String>,
1401    codec_profile_digest: Option<String>,
1402    artifact_count: Option<usize>,
1403    artifact_corruption_count: Option<usize>,
1404    artifact_missing_count: Option<usize>,
1405    vector_artifact_manifest_digest: Option<String>,
1406    artifact_generation_id: Option<String>,
1407    approximate_scanned_count: Option<usize>,
1408    approximate_returned_count: Option<usize>,
1409    raw_rows_loaded_count: Option<usize>,
1410    filter_strategy: Option<String>,
1411    vector_artifact_count: Option<usize>,
1412    vector_artifact_missing_count: Option<usize>,
1413    vector_artifact_stale_count: Option<usize>,
1414    exact_rerank_count: Option<usize>,
1415    approximate_candidate_count: Option<usize>,
1416}
1417
1418#[derive(Debug, Clone)]
1419struct VectorSearchOutcome {
1420    hits: Vec<VectorHit>,
1421    candidate_backend: String,
1422    requested_candidates: usize,
1423    returned_candidates: usize,
1424    post_filter_candidates: usize,
1425    fallback: Option<String>,
1426    exact_rerank: bool,
1427    degradations: Vec<String>,
1428    receipt_metadata: VectorReceiptMetadata,
1429}
1430
1431fn rrf_fuse_detailed_with_context(
1432    bm25_hits: &[Bm25Hit],
1433    vector_hits: &[VectorHit],
1434    config: &SearchConfig,
1435    context: &SearchContext,
1436    top_k: usize,
1437) -> Vec<ExplainedResult> {
1438    // CONVENTION EXCEPTION: O(1) lookup required for performance-critical search path
1439    let mut candidates: HashMap<(u8, String), RrfCandidate> = HashMap::new();
1440
1441    for (rank_0, hit) in bm25_hits.iter().enumerate() {
1442        let key = source_dedup_key(&hit.source);
1443        let rank = rank_0 + 1;
1444        candidates
1445            .entry(key)
1446            .and_modify(|candidate| {
1447                candidate.bm25_rank = Some(rank);
1448                candidate.bm25_score = Some(hit.raw_score);
1449                if candidate.updated_at.is_none() {
1450                    candidate.updated_at = hit.updated_at.clone();
1451                }
1452            })
1453            .or_insert_with(|| RrfCandidate {
1454                content: hit.content.clone(),
1455                source: hit.source.clone(),
1456                updated_at: hit.updated_at.clone(),
1457                bm25_score: Some(hit.raw_score),
1458                bm25_rank: Some(rank),
1459                vector_score: None,
1460                vector_rank: None,
1461                vector_source_rank: None,
1462                vector_source_score: None,
1463                vector_reranked_from_f32: false,
1464            });
1465    }
1466
1467    for (rank_0, hit) in vector_hits.iter().enumerate() {
1468        let key = source_dedup_key(&hit.source);
1469        let rank = rank_0 + 1;
1470        candidates
1471            .entry(key)
1472            .and_modify(|candidate| {
1473                candidate.vector_rank = Some(rank);
1474                candidate.vector_score = Some(hit.similarity);
1475                candidate.vector_source_rank = hit.source_rank.or(Some(rank));
1476                candidate.vector_source_score = hit.source_similarity.or(Some(hit.similarity));
1477                candidate.vector_reranked_from_f32 = hit.reranked_from_f32;
1478                if candidate.updated_at.is_none() {
1479                    candidate.updated_at = hit.updated_at.clone();
1480                }
1481            })
1482            .or_insert_with(|| RrfCandidate {
1483                content: hit.content.clone(),
1484                source: hit.source.clone(),
1485                updated_at: hit.updated_at.clone(),
1486                bm25_score: None,
1487                bm25_rank: None,
1488                vector_score: Some(hit.similarity),
1489                vector_rank: Some(rank),
1490                vector_source_rank: hit.source_rank.or(Some(rank)),
1491                vector_source_score: hit.source_similarity.or(Some(hit.similarity)),
1492                vector_reranked_from_f32: hit.reranked_from_f32,
1493            });
1494    }
1495
1496    let mut explained: Vec<ExplainedResult> = candidates
1497        .into_values()
1498        .map(|candidate| candidate.explained(config, context))
1499        .collect();
1500
1501    explained.sort_by(|a, b| {
1502        b.result
1503            .score
1504            .partial_cmp(&a.result.score)
1505            .unwrap_or(std::cmp::Ordering::Equal)
1506            .then_with(|| {
1507                source_dedup_key(&a.result.source).cmp(&source_dedup_key(&b.result.source))
1508            })
1509    });
1510    explained.truncate(top_k);
1511    explained
1512}
1513
1514fn rrf_fuse_detailed(
1515    bm25_hits: &[Bm25Hit],
1516    vector_hits: &[VectorHit],
1517    config: &SearchConfig,
1518    top_k: usize,
1519) -> Vec<ExplainedResult> {
1520    let context = SearchContext::default_now();
1521    rrf_fuse_detailed_with_context(bm25_hits, vector_hits, config, &context, top_k)
1522}
1523
1524pub fn rrf_fuse_with_context(
1525    bm25_hits: &[Bm25Hit],
1526    vector_hits: &[VectorHit],
1527    config: &SearchConfig,
1528    context: &SearchContext,
1529    top_k: usize,
1530) -> Vec<SearchResult> {
1531    rrf_fuse_detailed_with_context(bm25_hits, vector_hits, config, context, top_k)
1532        .into_iter()
1533        .map(|result| result.result)
1534        .collect()
1535}
1536
1537/// Fuse BM25 and vector results via Reciprocal Rank Fusion.
1538pub fn rrf_fuse(
1539    bm25_hits: &[Bm25Hit],
1540    vector_hits: &[VectorHit],
1541    config: &SearchConfig,
1542    top_k: usize,
1543) -> Vec<SearchResult> {
1544    rrf_fuse_detailed(bm25_hits, vector_hits, config, top_k)
1545        .into_iter()
1546        .map(|result| result.result)
1547        .collect()
1548}
1549
1550pub(crate) fn query_embedding_digest(query_embedding: &[f32]) -> String {
1551    let mut builder = DigestBuilder::new();
1552    builder
1553        .update_str("semantic-memory.query_embedding.v1")
1554        .separator()
1555        .update(&(query_embedding.len() as u64).to_le_bytes())
1556        .separator();
1557    for value in query_embedding {
1558        builder.update(&value.to_le_bytes());
1559    }
1560    format!("blake3:{}", builder.finalize().hex())
1561}
1562
1563#[allow(clippy::too_many_arguments)]
1564fn build_receipt(
1565    context: &SearchContext,
1566    query_embedding: &[f32],
1567    search_profile: &str,
1568    candidate_backend: &str,
1569    requested_candidates: usize,
1570    returned_candidates: usize,
1571    post_filter_candidates: usize,
1572    fallback: Option<String>,
1573    exact_rerank: bool,
1574    results: &[ExplainedResult],
1575    degradations: Vec<String>,
1576) -> Option<VectorSearchReceiptV1> {
1577    build_receipt_with_metadata(
1578        context,
1579        query_embedding,
1580        search_profile,
1581        candidate_backend,
1582        requested_candidates,
1583        returned_candidates,
1584        post_filter_candidates,
1585        fallback,
1586        exact_rerank,
1587        results,
1588        degradations,
1589        VectorReceiptMetadata::default(),
1590    )
1591}
1592
1593#[allow(clippy::too_many_arguments)]
1594fn build_receipt_with_metadata(
1595    context: &SearchContext,
1596    query_embedding: &[f32],
1597    search_profile: &str,
1598    candidate_backend: &str,
1599    requested_candidates: usize,
1600    returned_candidates: usize,
1601    post_filter_candidates: usize,
1602    fallback: Option<String>,
1603    exact_rerank: bool,
1604    results: &[ExplainedResult],
1605    degradations: Vec<String>,
1606    metadata: VectorReceiptMetadata,
1607) -> Option<VectorSearchReceiptV1> {
1608    if !context.receipts_enabled() {
1609        return None;
1610    }
1611    Some(VectorSearchReceiptV1 {
1612        schema_version: "vector_search_receipt_v1".to_string(),
1613        receipt_digest: None,
1614        receipt_id: context
1615            .request_id
1616            .clone()
1617            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string()),
1618        evaluation_time: context.evaluation_time,
1619        trace_id: context.trace_id.clone(),
1620        attempt_family_id: context.attempt_family_id.clone(),
1621        attempt_id: context.attempt_id.clone(),
1622        replay_of: context.replay_of.clone(),
1623        query_embedding_digest: Some(query_embedding_digest(query_embedding)),
1624        query_text_digest: context.query_text_digest.clone(),
1625        query_input_digest: context.query_input_digest.clone(),
1626        filter_digest: context.filter_digest.clone(),
1627        redaction_state: context.redaction_state.clone(),
1628        budget_id: context.budget_id.clone(),
1629        deadline_at: context.deadline_at,
1630        search_profile: search_profile.to_string(),
1631        candidate_backend: candidate_backend.to_string(),
1632        codec_family: metadata.codec_family.clone(),
1633        codec_profile_digest: metadata.codec_profile_digest.clone(),
1634        artifact_profile_digest: metadata.codec_profile_digest.clone(),
1635        artifact_count: metadata.artifact_count,
1636        artifact_corruption_count: metadata.artifact_corruption_count,
1637        artifact_missing_count: metadata.artifact_missing_count,
1638        vector_artifact_manifest_digest: metadata.vector_artifact_manifest_digest,
1639        artifact_generation_id: metadata.artifact_generation_id,
1640        approximate_scanned_count: metadata.approximate_scanned_count,
1641        approximate_returned_count: metadata.approximate_returned_count,
1642        raw_rows_loaded_count: metadata.raw_rows_loaded_count,
1643        filter_strategy: metadata.filter_strategy,
1644        vector_artifact_count: metadata.vector_artifact_count.or(metadata.artifact_count),
1645        vector_artifact_missing_count: metadata
1646            .vector_artifact_missing_count
1647            .or(metadata.artifact_missing_count),
1648        vector_artifact_stale_count: metadata.vector_artifact_stale_count,
1649        exact_rerank_count: metadata.exact_rerank_count.or(if exact_rerank {
1650            Some(post_filter_candidates)
1651        } else {
1652            None
1653        }),
1654        approximate_candidate_count: metadata.approximate_candidate_count,
1655        approximate: candidate_backend.contains("hnsw")
1656            || candidate_backend.contains("turbo_quant"),
1657        requested_candidates,
1658        returned_candidates,
1659        post_filter_candidates,
1660        fallback_reason: fallback.clone(),
1661        fallback,
1662        exact_rerank,
1663        result_ids: results
1664            .iter()
1665            .map(|result| search_result_id(&result.result.source))
1666            .collect(),
1667        degradations,
1668    })
1669}
1670
1671#[cfg(feature = "hnsw")]
1672fn filters_are_active(
1673    namespaces: Option<&[&str]>,
1674    source_types: Option<&[SearchSourceType]>,
1675    session_ids: Option<&[&str]>,
1676) -> bool {
1677    namespaces.is_some_and(|values| !values.is_empty())
1678        || source_types.is_some_and(|values| !values.is_empty())
1679        || session_ids.is_some_and(|values| !values.is_empty())
1680}
1681
1682#[allow(clippy::too_many_arguments)]
1683pub(crate) fn hybrid_search_detailed_with_context(
1684    conn: &Connection,
1685    query: &str,
1686    query_embedding: &[f32],
1687    config: &SearchConfig,
1688    context: &SearchContext,
1689    top_k: usize,
1690    namespaces: Option<&[&str]>,
1691    source_types: Option<&[SearchSourceType]>,
1692    session_ids: Option<&[&str]>,
1693) -> Result<SearchExecution, MemoryError> {
1694    let bm25_hits = match sanitize_fts_query(query) {
1695        Some(sanitized) => bm25_search(
1696            conn,
1697            &sanitized,
1698            config.candidate_pool_size,
1699            namespaces,
1700            source_types,
1701            session_ids,
1702        )?,
1703        None => Vec::new(),
1704    };
1705
1706    let vector_outcome = vector_search_with_backend(
1707        conn,
1708        query_embedding,
1709        config.candidate_pool_size,
1710        config.min_similarity,
1711        config,
1712        context,
1713        namespaces,
1714        source_types,
1715        session_ids,
1716    )?;
1717
1718    let results =
1719        rrf_fuse_detailed_with_context(&bm25_hits, &vector_outcome.hits, config, context, top_k);
1720    let receipt = build_receipt_with_metadata(
1721        context,
1722        query_embedding,
1723        "hybrid",
1724        &vector_outcome.candidate_backend,
1725        vector_outcome.requested_candidates,
1726        vector_outcome.returned_candidates,
1727        vector_outcome.post_filter_candidates,
1728        vector_outcome.fallback,
1729        vector_outcome.exact_rerank,
1730        &results,
1731        vector_outcome.degradations,
1732        vector_outcome.receipt_metadata,
1733    );
1734    Ok(SearchExecution { results, receipt })
1735}
1736
1737#[allow(clippy::too_many_arguments)]
1738pub(crate) fn hybrid_search_detailed(
1739    conn: &Connection,
1740    query: &str,
1741    query_embedding: &[f32],
1742    config: &SearchConfig,
1743    top_k: usize,
1744    namespaces: Option<&[&str]>,
1745    source_types: Option<&[SearchSourceType]>,
1746    session_ids: Option<&[&str]>,
1747) -> Result<Vec<ExplainedResult>, MemoryError> {
1748    let context = SearchContext::default_now();
1749    Ok(hybrid_search_detailed_with_context(
1750        conn,
1751        query,
1752        query_embedding,
1753        config,
1754        &context,
1755        top_k,
1756        namespaces,
1757        source_types,
1758        session_ids,
1759    )?
1760    .results)
1761}
1762
1763/// Perform a hybrid search and return the exact score decomposition.
1764#[allow(clippy::too_many_arguments)]
1765pub fn hybrid_search_explained(
1766    conn: &Connection,
1767    query: &str,
1768    query_embedding: &[f32],
1769    config: &SearchConfig,
1770    top_k: usize,
1771    namespaces: Option<&[&str]>,
1772    source_types: Option<&[SearchSourceType]>,
1773    session_ids: Option<&[&str]>,
1774) -> Result<Vec<ExplainedResult>, MemoryError> {
1775    hybrid_search_detailed(
1776        conn,
1777        query,
1778        query_embedding,
1779        config,
1780        top_k,
1781        namespaces,
1782        source_types,
1783        session_ids,
1784    )
1785}
1786
1787/// Perform a hybrid search (BM25 + vector + RRF).
1788#[allow(clippy::too_many_arguments)]
1789pub fn hybrid_search(
1790    conn: &Connection,
1791    query: &str,
1792    query_embedding: &[f32],
1793    config: &SearchConfig,
1794    top_k: usize,
1795    namespaces: Option<&[&str]>,
1796    source_types: Option<&[SearchSourceType]>,
1797    session_ids: Option<&[&str]>,
1798) -> Result<Vec<SearchResult>, MemoryError> {
1799    Ok(hybrid_search_detailed(
1800        conn,
1801        query,
1802        query_embedding,
1803        config,
1804        top_k,
1805        namespaces,
1806        source_types,
1807        session_ids,
1808    )?
1809    .into_iter()
1810    .map(|result| result.result)
1811    .collect())
1812}
1813
1814#[cfg(feature = "hnsw")]
1815#[derive(Clone)]
1816struct HnswCandidateSeed {
1817    source_rank: usize,
1818    source_similarity: f64,
1819}
1820
1821#[cfg(feature = "hnsw")]
1822#[allow(clippy::type_complexity)]
1823fn resolve_hnsw_hits_batched(
1824    conn: &Connection,
1825    query_embedding: &[f32],
1826    config: &SearchConfig,
1827    namespaces: Option<&[&str]>,
1828    source_types: Option<&[SearchSourceType]>,
1829    session_ids: Option<&[&str]>,
1830    hnsw_hits: &[crate::hnsw::HnswHit],
1831) -> Result<Vec<VectorHit>, MemoryError> {
1832    let search_facts = source_types
1833        .map(|st| st.contains(&SearchSourceType::Facts))
1834        .unwrap_or(true);
1835    let search_chunks = source_types
1836        .map(|st| st.contains(&SearchSourceType::Chunks))
1837        .unwrap_or(true);
1838    let search_messages = source_types
1839        .map(|st| st.contains(&SearchSourceType::Messages))
1840        .unwrap_or(false);
1841    let search_episodes = source_types
1842        .map(|st| st.contains(&SearchSourceType::Episodes))
1843        .unwrap_or(true);
1844
1845    // CONVENTION EXCEPTION: O(1) lookup required for performance-critical search path
1846    let mut fact_entries: HashMap<String, HnswCandidateSeed> = HashMap::new();
1847    // CONVENTION EXCEPTION: O(1) lookup required for performance-critical search path
1848    let mut chunk_entries: HashMap<String, HnswCandidateSeed> = HashMap::new();
1849    // CONVENTION EXCEPTION: O(1) lookup required for performance-critical search path
1850    let mut message_entries: HashMap<i64, HnswCandidateSeed> = HashMap::new();
1851    // CONVENTION EXCEPTION: O(1) lookup required for performance-critical search path
1852    let mut episode_entries: HashMap<String, HnswCandidateSeed> = HashMap::new();
1853
1854    for (rank_0, hit) in hnsw_hits.iter().enumerate() {
1855        let similarity = hit.similarity() as f64;
1856        if similarity < config.min_similarity {
1857            continue;
1858        }
1859
1860        let (domain, raw_id) = hit.parse_key()?;
1861        let seed = HnswCandidateSeed {
1862            source_rank: rank_0 + 1,
1863            source_similarity: similarity,
1864        };
1865
1866        match domain {
1867            "fact" if search_facts => {
1868                fact_entries.entry(raw_id.to_string()).or_insert(seed);
1869            }
1870            "chunk" if search_chunks => {
1871                chunk_entries.entry(raw_id.to_string()).or_insert(seed);
1872            }
1873            "msg" if search_messages => {
1874                if let Ok(message_id) = raw_id.parse::<i64>() {
1875                    message_entries.entry(message_id).or_insert(seed);
1876                }
1877            }
1878            "episode" if search_episodes => {
1879                episode_entries.entry(raw_id.to_string()).or_insert(seed);
1880            }
1881            _ => {}
1882        }
1883    }
1884
1885    let mut hits = Vec::new();
1886    batch_load_fact_hits(
1887        conn,
1888        query_embedding,
1889        config,
1890        namespaces,
1891        &fact_entries,
1892        &mut hits,
1893    )?;
1894    batch_load_chunk_hits(
1895        conn,
1896        query_embedding,
1897        config,
1898        namespaces,
1899        &chunk_entries,
1900        &mut hits,
1901    )?;
1902    batch_load_message_hits(
1903        conn,
1904        query_embedding,
1905        config,
1906        session_ids,
1907        &message_entries,
1908        &mut hits,
1909    )?;
1910    batch_load_episode_hits(
1911        conn,
1912        query_embedding,
1913        config,
1914        namespaces,
1915        &episode_entries,
1916        &mut hits,
1917    )?;
1918
1919    hits.sort_by(|a, b| {
1920        b.similarity
1921            .partial_cmp(&a.similarity)
1922            .unwrap_or(std::cmp::Ordering::Equal)
1923            .then_with(|| {
1924                a.source_rank
1925                    .unwrap_or(usize::MAX)
1926                    .cmp(&b.source_rank.unwrap_or(usize::MAX))
1927            })
1928    });
1929    hits.truncate(config.candidate_pool_size);
1930    Ok(hits)
1931}
1932
1933#[cfg(feature = "hnsw")]
1934fn exact_similarity_from_blob(
1935    query_embedding: &[f32],
1936    blob: &[u8],
1937) -> Result<Option<f64>, MemoryError> {
1938    if blob.is_empty() {
1939        return Ok(None);
1940    }
1941    let stored = crate::db::bytes_to_embedding(blob)?;
1942    if stored.len() != query_embedding.len() {
1943        return Ok(None);
1944    }
1945    Ok(Some(cosine_similarity(query_embedding, &stored)? as f64))
1946}
1947
1948#[cfg(feature = "hnsw")]
1949#[allow(clippy::too_many_arguments)]
1950fn build_ranked_vector_hit(
1951    id: String,
1952    content: String,
1953    source: SearchSource,
1954    updated_at: Option<String>,
1955    embedding_blob: Option<Vec<u8>>,
1956    seed: &HnswCandidateSeed,
1957    query_embedding: &[f32],
1958    config: &SearchConfig,
1959) -> Result<Option<VectorHit>, MemoryError> {
1960    let similarity = if config.rerank_from_f32 {
1961        match embedding_blob {
1962            Some(blob) => exact_similarity_from_blob(query_embedding, &blob)?,
1963            None => None,
1964        }
1965        .unwrap_or(seed.source_similarity)
1966    } else {
1967        seed.source_similarity
1968    };
1969
1970    if similarity < config.min_similarity {
1971        return Ok(None);
1972    }
1973
1974    Ok(Some(VectorHit {
1975        id,
1976        content,
1977        source,
1978        similarity,
1979        updated_at,
1980        source_rank: Some(seed.source_rank),
1981        source_similarity: Some(seed.source_similarity),
1982        reranked_from_f32: config.rerank_from_f32,
1983    }))
1984}
1985
1986#[cfg(feature = "hnsw")]
1987fn batch_load_fact_hits(
1988    conn: &Connection,
1989    query_embedding: &[f32],
1990    config: &SearchConfig,
1991    namespaces: Option<&[&str]>,
1992    // CONVENTION EXCEPTION: O(1) lookup required for performance-critical search path
1993    entries: &HashMap<String, HnswCandidateSeed>,
1994    output: &mut Vec<VectorHit>,
1995) -> Result<(), MemoryError> {
1996    if entries.is_empty() {
1997        return Ok(());
1998    }
1999
2000    let placeholders = (1..=entries.len())
2001        .map(|idx| format!("?{idx}"))
2002        .collect::<Vec<_>>()
2003        .join(", ");
2004    let sql = format!(
2005        "SELECT id, content, namespace, updated_at, embedding
2006         FROM facts
2007         WHERE id IN ({placeholders})"
2008    );
2009    let params: Vec<SqlValue> = entries
2010        .keys()
2011        .map(|id| SqlValue::Text(id.clone()))
2012        .collect();
2013    let mut stmt = conn.prepare(&sql)?;
2014    let rows = stmt.query_map(rusqlite::params_from_iter(&params), |row| {
2015        Ok((
2016            row.get::<_, String>(0)?,
2017            row.get::<_, String>(1)?,
2018            row.get::<_, String>(2)?,
2019            row.get::<_, Option<String>>(3)?,
2020            row.get::<_, Option<Vec<u8>>>(4)?,
2021        ))
2022    })?;
2023
2024    for row in rows {
2025        let (fact_id, content, namespace, updated_at, embedding_blob) = row?;
2026        if let Some(filter) = namespaces {
2027            if !filter.contains(&namespace.as_str()) {
2028                continue;
2029            }
2030        }
2031        if let Some(seed) = entries.get(&fact_id) {
2032            if let Some(hit) = build_ranked_vector_hit(
2033                format!("fact:{fact_id}"),
2034                content,
2035                SearchSource::Fact { fact_id, namespace },
2036                updated_at,
2037                embedding_blob,
2038                seed,
2039                query_embedding,
2040                config,
2041            )? {
2042                output.push(hit);
2043            }
2044        }
2045    }
2046
2047    Ok(())
2048}
2049
2050#[cfg(feature = "hnsw")]
2051fn batch_load_chunk_hits(
2052    conn: &Connection,
2053    query_embedding: &[f32],
2054    config: &SearchConfig,
2055    namespaces: Option<&[&str]>,
2056    // CONVENTION EXCEPTION: O(1) lookup required for performance-critical search path
2057    entries: &HashMap<String, HnswCandidateSeed>,
2058    output: &mut Vec<VectorHit>,
2059) -> Result<(), MemoryError> {
2060    if entries.is_empty() {
2061        return Ok(());
2062    }
2063
2064    let placeholders = (1..=entries.len())
2065        .map(|idx| format!("?{idx}"))
2066        .collect::<Vec<_>>()
2067        .join(", ");
2068    let sql = format!(
2069        "SELECT c.id, c.content, c.document_id, d.title, c.chunk_index, c.created_at, d.namespace, c.embedding
2070         FROM chunks c
2071         JOIN documents d ON d.id = c.document_id
2072         WHERE c.id IN ({placeholders})"
2073    );
2074    let params: Vec<SqlValue> = entries
2075        .keys()
2076        .map(|id| SqlValue::Text(id.clone()))
2077        .collect();
2078    let mut stmt = conn.prepare(&sql)?;
2079    let rows = stmt.query_map(rusqlite::params_from_iter(&params), |row| {
2080        Ok((
2081            row.get::<_, String>(0)?,
2082            row.get::<_, String>(1)?,
2083            row.get::<_, String>(2)?,
2084            row.get::<_, String>(3)?,
2085            row.get::<_, i64>(4)?,
2086            row.get::<_, Option<String>>(5)?,
2087            row.get::<_, String>(6)?,
2088            row.get::<_, Option<Vec<u8>>>(7)?,
2089        ))
2090    })?;
2091
2092    for row in rows {
2093        let (
2094            chunk_id,
2095            content,
2096            document_id,
2097            document_title,
2098            chunk_index,
2099            updated_at,
2100            namespace,
2101            embedding_blob,
2102        ) = row?;
2103        if let Some(filter) = namespaces {
2104            if !filter.contains(&namespace.as_str()) {
2105                continue;
2106            }
2107        }
2108        if let Some(seed) = entries.get(&chunk_id) {
2109            if let Some(hit) = build_ranked_vector_hit(
2110                format!("chunk:{chunk_id}"),
2111                content,
2112                SearchSource::Chunk {
2113                    chunk_id,
2114                    document_id,
2115                    document_title,
2116                    chunk_index: chunk_index as usize,
2117                },
2118                updated_at,
2119                embedding_blob,
2120                seed,
2121                query_embedding,
2122                config,
2123            )? {
2124                output.push(hit);
2125            }
2126        }
2127    }
2128
2129    Ok(())
2130}
2131
2132#[cfg(feature = "hnsw")]
2133fn batch_load_message_hits(
2134    conn: &Connection,
2135    query_embedding: &[f32],
2136    config: &SearchConfig,
2137    session_ids: Option<&[&str]>,
2138    // CONVENTION EXCEPTION: O(1) lookup required for performance-critical search path
2139    entries: &HashMap<i64, HnswCandidateSeed>,
2140    output: &mut Vec<VectorHit>,
2141) -> Result<(), MemoryError> {
2142    if entries.is_empty() {
2143        return Ok(());
2144    }
2145
2146    let placeholders = (1..=entries.len())
2147        .map(|idx| format!("?{idx}"))
2148        .collect::<Vec<_>>()
2149        .join(", ");
2150    let sql = format!(
2151        "SELECT id, content, session_id, role, created_at, embedding
2152         FROM messages
2153         WHERE id IN ({placeholders})"
2154    );
2155    let params: Vec<SqlValue> = entries.keys().map(|id| SqlValue::Integer(*id)).collect();
2156    let mut stmt = conn.prepare(&sql)?;
2157    let rows = stmt.query_map(rusqlite::params_from_iter(&params), |row| {
2158        Ok((
2159            row.get::<_, i64>(0)?,
2160            row.get::<_, String>(1)?,
2161            row.get::<_, String>(2)?,
2162            row.get::<_, String>(3)?,
2163            row.get::<_, Option<String>>(4)?,
2164            row.get::<_, Option<Vec<u8>>>(5)?,
2165        ))
2166    })?;
2167
2168    for row in rows {
2169        let (message_id, content, session_id, role, updated_at, embedding_blob) = row?;
2170        if let Some(filter) = session_ids {
2171            if !filter.contains(&session_id.as_str()) {
2172                continue;
2173            }
2174        }
2175        if let Some(seed) = entries.get(&message_id) {
2176            if let Some(hit) = build_ranked_vector_hit(
2177                format!("msg:{message_id}"),
2178                content,
2179                SearchSource::Message {
2180                    message_id,
2181                    session_id,
2182                    role,
2183                },
2184                updated_at,
2185                embedding_blob,
2186                seed,
2187                query_embedding,
2188                config,
2189            )? {
2190                output.push(hit);
2191            }
2192        }
2193    }
2194
2195    Ok(())
2196}
2197
2198#[cfg(feature = "hnsw")]
2199fn batch_load_episode_hits(
2200    conn: &Connection,
2201    query_embedding: &[f32],
2202    config: &SearchConfig,
2203    namespaces: Option<&[&str]>,
2204    // CONVENTION EXCEPTION: O(1) lookup required for performance-critical search path
2205    entries: &HashMap<String, HnswCandidateSeed>,
2206    output: &mut Vec<VectorHit>,
2207) -> Result<(), MemoryError> {
2208    if entries.is_empty() {
2209        return Ok(());
2210    }
2211
2212    let placeholders = (1..=entries.len())
2213        .map(|idx| format!("?{idx}"))
2214        .collect::<Vec<_>>()
2215        .join(", ");
2216    let sql = format!(
2217        "SELECT e.episode_id, e.document_id, e.search_text, e.effect_type, e.outcome, e.updated_at, d.namespace, e.embedding
2218         FROM episodes e
2219         JOIN documents d ON d.id = e.document_id
2220         WHERE e.episode_id IN ({placeholders})"
2221    );
2222    let params: Vec<SqlValue> = entries
2223        .keys()
2224        .map(|id| SqlValue::Text(id.clone()))
2225        .collect();
2226    let mut stmt = conn.prepare(&sql)?;
2227    let rows = stmt.query_map(rusqlite::params_from_iter(&params), |row| {
2228        Ok((
2229            row.get::<_, String>(0)?,
2230            row.get::<_, String>(1)?,
2231            row.get::<_, String>(2)?,
2232            row.get::<_, String>(3)?,
2233            row.get::<_, String>(4)?,
2234            row.get::<_, Option<String>>(5)?,
2235            row.get::<_, String>(6)?,
2236            row.get::<_, Option<Vec<u8>>>(7)?,
2237        ))
2238    })?;
2239
2240    for row in rows {
2241        let (
2242            episode_id,
2243            document_id,
2244            content,
2245            effect_type,
2246            outcome,
2247            updated_at,
2248            namespace,
2249            embedding_blob,
2250        ) = row?;
2251        if let Some(filter) = namespaces {
2252            if !filter.contains(&namespace.as_str()) {
2253                continue;
2254            }
2255        }
2256        if let Some(seed) = entries.get(&episode_id) {
2257            if let Some(hit) = build_ranked_vector_hit(
2258                episodes::episode_item_key(&episode_id),
2259                content,
2260                SearchSource::Episode {
2261                    episode_id,
2262                    document_id,
2263                    effect_type,
2264                    outcome,
2265                },
2266                updated_at,
2267                embedding_blob,
2268                seed,
2269                query_embedding,
2270                config,
2271            )? {
2272                output.push(hit);
2273            }
2274        }
2275    }
2276
2277    Ok(())
2278}
2279
2280/// Perform a hybrid search using pre-computed HNSW hits for the vector component.
2281#[cfg(feature = "hnsw")]
2282#[allow(clippy::too_many_arguments)]
2283pub fn hybrid_search_with_hnsw(
2284    conn: &Connection,
2285    query: &str,
2286    query_embedding: &[f32],
2287    config: &SearchConfig,
2288    top_k: usize,
2289    namespaces: Option<&[&str]>,
2290    source_types: Option<&[SearchSourceType]>,
2291    session_ids: Option<&[&str]>,
2292    hnsw_hits: &[crate::hnsw::HnswHit],
2293) -> Result<Vec<SearchResult>, MemoryError> {
2294    Ok(hybrid_search_with_hnsw_detailed(
2295        conn,
2296        query,
2297        query_embedding,
2298        config,
2299        top_k,
2300        namespaces,
2301        source_types,
2302        session_ids,
2303        hnsw_hits,
2304    )?
2305    .into_iter()
2306    .map(|result| result.result)
2307    .collect())
2308}
2309
2310#[cfg(feature = "hnsw")]
2311#[allow(clippy::too_many_arguments)]
2312pub(crate) fn hybrid_search_with_hnsw_detailed_with_context(
2313    conn: &Connection,
2314    query: &str,
2315    query_embedding: &[f32],
2316    config: &SearchConfig,
2317    context: &SearchContext,
2318    top_k: usize,
2319    namespaces: Option<&[&str]>,
2320    source_types: Option<&[SearchSourceType]>,
2321    session_ids: Option<&[&str]>,
2322    hnsw_hits: &[crate::hnsw::HnswHit],
2323) -> Result<SearchExecution, MemoryError> {
2324    let bm25_hits = match sanitize_fts_query(query) {
2325        Some(sanitized) => bm25_search(
2326            conn,
2327            &sanitized,
2328            config.candidate_pool_size,
2329            namespaces,
2330            source_types,
2331            session_ids,
2332        )?,
2333        None => Vec::new(),
2334    };
2335
2336    let mut vector_hits = resolve_hnsw_hits_batched(
2337        conn,
2338        query_embedding,
2339        config,
2340        namespaces,
2341        source_types,
2342        session_ids,
2343        hnsw_hits,
2344    )?;
2345    let mut fallback = None;
2346    let mut degradations = Vec::new();
2347    let mut backend = "hnsw";
2348    let mut exact_rerank = config.rerank_from_f32;
2349
2350    if !hnsw_hits.is_empty()
2351        && vector_hits.len() < top_k
2352        && filters_are_active(namespaces, source_types, session_ids)
2353    {
2354        fallback = Some("hnsw_filtered_underreturn_fallback".to_string());
2355        degradations.push(format!(
2356            "HNSW returned {} post-filter vector candidates for requested top_k {}; exact filtered fallback was used",
2357            vector_hits.len(),
2358            top_k
2359        ));
2360        vector_hits = vector_search(
2361            conn,
2362            query_embedding,
2363            config.candidate_pool_size,
2364            config.min_similarity,
2365            namespaces,
2366            source_types,
2367            session_ids,
2368        )?;
2369        backend = "hnsw_then_brute_force_f32";
2370        exact_rerank = true;
2371    }
2372
2373    let results = rrf_fuse_detailed_with_context(&bm25_hits, &vector_hits, config, context, top_k);
2374    let receipt = build_receipt(
2375        context,
2376        query_embedding,
2377        "hybrid",
2378        backend,
2379        config.candidate_pool_size,
2380        hnsw_hits.len(),
2381        vector_hits.len(),
2382        fallback,
2383        exact_rerank,
2384        &results,
2385        degradations,
2386    );
2387
2388    Ok(SearchExecution { results, receipt })
2389}
2390
2391#[cfg(feature = "hnsw")]
2392#[allow(clippy::too_many_arguments)]
2393pub(crate) fn hybrid_search_with_hnsw_detailed(
2394    conn: &Connection,
2395    query: &str,
2396    query_embedding: &[f32],
2397    config: &SearchConfig,
2398    top_k: usize,
2399    namespaces: Option<&[&str]>,
2400    source_types: Option<&[SearchSourceType]>,
2401    session_ids: Option<&[&str]>,
2402    hnsw_hits: &[crate::hnsw::HnswHit],
2403) -> Result<Vec<ExplainedResult>, MemoryError> {
2404    let context = SearchContext::default_now();
2405    Ok(hybrid_search_with_hnsw_detailed_with_context(
2406        conn,
2407        query,
2408        query_embedding,
2409        config,
2410        &context,
2411        top_k,
2412        namespaces,
2413        source_types,
2414        session_ids,
2415        hnsw_hits,
2416    )?
2417    .results)
2418}
2419
2420/// Perform a hybrid HNSW-backed search and return the exact score decomposition.
2421#[cfg(feature = "hnsw")]
2422#[allow(clippy::too_many_arguments)]
2423pub fn hybrid_search_explained_with_hnsw(
2424    conn: &Connection,
2425    query: &str,
2426    query_embedding: &[f32],
2427    config: &SearchConfig,
2428    top_k: usize,
2429    namespaces: Option<&[&str]>,
2430    source_types: Option<&[SearchSourceType]>,
2431    session_ids: Option<&[&str]>,
2432    hnsw_hits: &[crate::hnsw::HnswHit],
2433) -> Result<Vec<ExplainedResult>, MemoryError> {
2434    hybrid_search_with_hnsw_detailed(
2435        conn,
2436        query,
2437        query_embedding,
2438        config,
2439        top_k,
2440        namespaces,
2441        source_types,
2442        session_ids,
2443        hnsw_hits,
2444    )
2445}
2446
2447pub(crate) fn fts_only_search_detailed(
2448    conn: &Connection,
2449    query: &str,
2450    config: &SearchConfig,
2451    top_k: usize,
2452    namespaces: Option<&[&str]>,
2453    source_types: Option<&[SearchSourceType]>,
2454    session_ids: Option<&[&str]>,
2455) -> Result<Vec<ExplainedResult>, MemoryError> {
2456    let sanitized = match sanitize_fts_query(query) {
2457        Some(value) => value,
2458        None => return Ok(Vec::new()),
2459    };
2460    let bm25_hits = bm25_search(
2461        conn,
2462        &sanitized,
2463        top_k,
2464        namespaces,
2465        source_types,
2466        session_ids,
2467    )?;
2468    Ok(rrf_fuse_detailed(&bm25_hits, &[], config, top_k))
2469}
2470
2471/// Full-text search only (no embeddings needed). Synchronous.
2472pub fn fts_only_search(
2473    conn: &Connection,
2474    query: &str,
2475    config: &SearchConfig,
2476    top_k: usize,
2477    namespaces: Option<&[&str]>,
2478    source_types: Option<&[SearchSourceType]>,
2479    session_ids: Option<&[&str]>,
2480) -> Result<Vec<SearchResult>, MemoryError> {
2481    Ok(fts_only_search_detailed(
2482        conn,
2483        query,
2484        config,
2485        top_k,
2486        namespaces,
2487        source_types,
2488        session_ids,
2489    )?
2490    .into_iter()
2491    .map(|result| result.result)
2492    .collect())
2493}
2494
2495#[allow(clippy::too_many_arguments)]
2496pub(crate) fn vector_only_search_detailed_with_context(
2497    conn: &Connection,
2498    query_embedding: &[f32],
2499    config: &SearchConfig,
2500    context: &SearchContext,
2501    top_k: usize,
2502    namespaces: Option<&[&str]>,
2503    source_types: Option<&[SearchSourceType]>,
2504    session_ids: Option<&[&str]>,
2505) -> Result<SearchExecution, MemoryError> {
2506    let vector_outcome = vector_search_with_backend(
2507        conn,
2508        query_embedding,
2509        top_k,
2510        config.min_similarity,
2511        config,
2512        context,
2513        namespaces,
2514        source_types,
2515        session_ids,
2516    )?;
2517    let results = rrf_fuse_detailed_with_context(&[], &vector_outcome.hits, config, context, top_k);
2518    let receipt = build_receipt_with_metadata(
2519        context,
2520        query_embedding,
2521        "vector_only",
2522        &vector_outcome.candidate_backend,
2523        vector_outcome.requested_candidates,
2524        vector_outcome.returned_candidates,
2525        vector_outcome.post_filter_candidates,
2526        vector_outcome.fallback,
2527        vector_outcome.exact_rerank,
2528        &results,
2529        vector_outcome.degradations,
2530        vector_outcome.receipt_metadata,
2531    );
2532    Ok(SearchExecution { results, receipt })
2533}
2534
2535pub(crate) fn vector_only_search_detailed(
2536    conn: &Connection,
2537    query_embedding: &[f32],
2538    config: &SearchConfig,
2539    top_k: usize,
2540    namespaces: Option<&[&str]>,
2541    source_types: Option<&[SearchSourceType]>,
2542    session_ids: Option<&[&str]>,
2543) -> Result<Vec<ExplainedResult>, MemoryError> {
2544    let context = SearchContext::default_now();
2545    Ok(vector_only_search_detailed_with_context(
2546        conn,
2547        query_embedding,
2548        config,
2549        &context,
2550        top_k,
2551        namespaces,
2552        source_types,
2553        session_ids,
2554    )?
2555    .results)
2556}
2557
2558/// Vector-only search. Called after embedding the query.
2559pub fn vector_only_search(
2560    conn: &Connection,
2561    query_embedding: &[f32],
2562    config: &SearchConfig,
2563    top_k: usize,
2564    namespaces: Option<&[&str]>,
2565    source_types: Option<&[SearchSourceType]>,
2566    session_ids: Option<&[&str]>,
2567) -> Result<Vec<SearchResult>, MemoryError> {
2568    Ok(vector_only_search_detailed(
2569        conn,
2570        query_embedding,
2571        config,
2572        top_k,
2573        namespaces,
2574        source_types,
2575        session_ids,
2576    )?
2577    .into_iter()
2578    .map(|result| result.result)
2579    .collect())
2580}
2581
2582#[cfg(test)]
2583mod digest_tests {
2584    use super::query_embedding_digest;
2585
2586    #[test]
2587    fn query_embedding_digest_includes_dimension_and_bytes() {
2588        let two_dims = query_embedding_digest(&[1.0, 2.0]);
2589        let three_dims = query_embedding_digest(&[1.0, 2.0, 0.0]);
2590        let changed_byte = query_embedding_digest(&[1.0, 2.000_001]);
2591
2592        assert!(two_dims.starts_with("blake3:"));
2593        assert_eq!(two_dims.len(), 71);
2594        assert_ne!(two_dims, three_dims);
2595        assert_ne!(two_dims, changed_byte);
2596        assert_eq!(two_dims, query_embedding_digest(&[1.0, 2.0]));
2597    }
2598}
2599
2600/// Vector-only search using pre-computed HNSW hits.
2601#[cfg(feature = "hnsw")]
2602#[allow(clippy::too_many_arguments)]
2603pub fn vector_only_search_with_hnsw(
2604    conn: &Connection,
2605    query_embedding: &[f32],
2606    config: &SearchConfig,
2607    top_k: usize,
2608    namespaces: Option<&[&str]>,
2609    source_types: Option<&[SearchSourceType]>,
2610    session_ids: Option<&[&str]>,
2611    hnsw_hits: &[crate::hnsw::HnswHit],
2612) -> Result<Vec<SearchResult>, MemoryError> {
2613    Ok(vector_only_search_with_hnsw_detailed(
2614        conn,
2615        query_embedding,
2616        config,
2617        top_k,
2618        namespaces,
2619        source_types,
2620        session_ids,
2621        hnsw_hits,
2622    )?
2623    .into_iter()
2624    .map(|result| result.result)
2625    .collect())
2626}
2627
2628#[cfg(feature = "hnsw")]
2629#[allow(clippy::too_many_arguments)]
2630pub(crate) fn vector_only_search_with_hnsw_detailed_with_context(
2631    conn: &Connection,
2632    query_embedding: &[f32],
2633    config: &SearchConfig,
2634    context: &SearchContext,
2635    top_k: usize,
2636    namespaces: Option<&[&str]>,
2637    source_types: Option<&[SearchSourceType]>,
2638    session_ids: Option<&[&str]>,
2639    hnsw_hits: &[crate::hnsw::HnswHit],
2640) -> Result<SearchExecution, MemoryError> {
2641    let mut vector_hits = resolve_hnsw_hits_batched(
2642        conn,
2643        query_embedding,
2644        config,
2645        namespaces,
2646        source_types,
2647        session_ids,
2648        hnsw_hits,
2649    )?;
2650    let mut fallback = None;
2651    let mut degradations = Vec::new();
2652    let mut backend = "hnsw";
2653    let mut exact_rerank = config.rerank_from_f32;
2654
2655    if !hnsw_hits.is_empty()
2656        && vector_hits.len() < top_k
2657        && filters_are_active(namespaces, source_types, session_ids)
2658    {
2659        fallback = Some("hnsw_filtered_underreturn_fallback".to_string());
2660        degradations.push(format!(
2661            "HNSW returned {} post-filter vector candidates for requested top_k {}; exact filtered fallback was used",
2662            vector_hits.len(),
2663            top_k
2664        ));
2665        vector_hits = vector_search(
2666            conn,
2667            query_embedding,
2668            top_k,
2669            config.min_similarity,
2670            namespaces,
2671            source_types,
2672            session_ids,
2673        )?;
2674        backend = "hnsw_then_brute_force_f32";
2675        exact_rerank = true;
2676    }
2677
2678    let results = rrf_fuse_detailed_with_context(&[], &vector_hits, config, context, top_k);
2679    let receipt = build_receipt(
2680        context,
2681        query_embedding,
2682        "vector_only",
2683        backend,
2684        top_k,
2685        hnsw_hits.len(),
2686        vector_hits.len(),
2687        fallback,
2688        exact_rerank,
2689        &results,
2690        degradations,
2691    );
2692    Ok(SearchExecution { results, receipt })
2693}
2694
2695#[cfg(feature = "hnsw")]
2696#[allow(clippy::too_many_arguments)]
2697pub(crate) fn vector_only_search_with_hnsw_detailed(
2698    conn: &Connection,
2699    query_embedding: &[f32],
2700    config: &SearchConfig,
2701    top_k: usize,
2702    namespaces: Option<&[&str]>,
2703    source_types: Option<&[SearchSourceType]>,
2704    session_ids: Option<&[&str]>,
2705    hnsw_hits: &[crate::hnsw::HnswHit],
2706) -> Result<Vec<ExplainedResult>, MemoryError> {
2707    let context = SearchContext::default_now();
2708    Ok(vector_only_search_with_hnsw_detailed_with_context(
2709        conn,
2710        query_embedding,
2711        config,
2712        &context,
2713        top_k,
2714        namespaces,
2715        source_types,
2716        session_ids,
2717        hnsw_hits,
2718    )?
2719    .results)
2720}
2721
2722fn build_filter_clause(
2723    column: &str,
2724    values: Option<&[&str]>,
2725    param_offset: usize,
2726) -> (String, Vec<SqlValue>) {
2727    match values {
2728        Some(values) if !values.is_empty() => {
2729            let placeholders = (0..values.len())
2730                .map(|idx| format!("?{}", param_offset + idx))
2731                .collect::<Vec<_>>();
2732            let clause = format!(" AND {} IN ({})", column, placeholders.join(", "));
2733            let params = values
2734                .iter()
2735                .map(|value| SqlValue::Text((*value).to_string()))
2736                .collect();
2737            (clause, params)
2738        }
2739        _ => (String::new(), Vec::new()),
2740    }
2741}
2742
2743/// Deduplicate results by (source_type, source_id), keeping the first occurrence.
2744pub fn deduplicate_results(results: Vec<SearchResult>) -> Vec<SearchResult> {
2745    let mut seen = HashSet::new();
2746    results
2747        .into_iter()
2748        .filter(|result| seen.insert(source_dedup_key(&result.source)))
2749        .collect()
2750}
2751
2752#[cfg(test)]
2753mod tests {
2754    use super::*;
2755
2756    fn vector_row(id: &str) -> VectorRow {
2757        VectorRow {
2758            id: id.to_string(),
2759            content: format!("content {id}"),
2760            blob: bytemuck::cast_slice(&[1.0_f32, 0.0]).to_vec(),
2761            updated_at: None,
2762            source_type: SearchSourceType::Facts,
2763            filter_namespace: Some("default".to_string()),
2764            filter_session_id: None,
2765            source: SearchSource::Fact {
2766                fact_id: id.to_string(),
2767                namespace: "default".to_string(),
2768            },
2769        }
2770    }
2771
2772    #[test]
2773    fn timestamp_parser_accepts_sql_fractional_and_rfc3339_and_warns_by_returning_none() {
2774        assert!(parse_search_timestamp("2026-05-07 12:34:56").is_some());
2775        assert!(parse_search_timestamp("2026-05-07 12:34:56.123").is_some());
2776        assert!(parse_search_timestamp("2026-05-07T12:34:56Z").is_some());
2777        assert!(parse_search_timestamp("not-a-timestamp").is_none());
2778    }
2779
2780    #[test]
2781    fn vector_scan_hard_limit_blocks_before_unbounded_scan() {
2782        let old_warn = VECTOR_SCAN_WARN_LIMIT.swap(1, Ordering::SeqCst);
2783        let old_hard = VECTOR_SCAN_BLOCK_LIMIT.swap(2, Ordering::SeqCst);
2784        let rows = ["a", "b", "c"].into_iter().map(|id| Ok(vector_row(id)));
2785        let result = scan_vector_rows(rows, &[1.0, 0.0], -1.0, "fact");
2786        VECTOR_SCAN_WARN_LIMIT.store(old_warn, Ordering::SeqCst);
2787        VECTOR_SCAN_BLOCK_LIMIT.store(old_hard, Ordering::SeqCst);
2788
2789        match result {
2790            Err(MemoryError::VectorScanLimitExceeded {
2791                table,
2792                scanned,
2793                limit,
2794            }) => {
2795                assert_eq!(table, "fact");
2796                assert_eq!(scanned, 3);
2797                assert_eq!(limit, 2);
2798            }
2799            other => panic!("expected vector scan limit error, got {other:?}"),
2800        }
2801    }
2802}