zeph_memory/semantic/recall.rs

// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
// SPDX-License-Identifier: MIT OR Apache-2.0

use futures::StreamExt as _;
use zeph_llm::provider::{LlmProvider as _, Message};

/// Approximate characters per token (conservative estimate for mixed content).
const CHARS_PER_TOKEN: usize = 4;

/// Target chunk size in characters (~400 tokens).
const CHUNK_CHARS: usize = 400 * CHARS_PER_TOKEN;

/// Overlap between adjacent chunks in characters (~80 tokens).
const CHUNK_OVERLAP_CHARS: usize = 80 * CHARS_PER_TOKEN;

/// Split `text` into overlapping chunks suitable for embedding.
///
/// For text no longer than `CHUNK_CHARS`, returns a single chunk.
/// Splits at UTF-8 character boundaries on paragraph (`\n\n`), line (`\n`),
/// space (` `), or raw character boundaries as a last resort.
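///
/// A shape-only sketch of the contract (values are illustrative; marked
/// `ignore` because this is a private helper):
///
/// ```ignore
/// assert_eq!(chunk_text("short"), vec!["short"]);
/// let long = "paragraph one.\n\n".repeat(500);
/// assert!(chunk_text(&long).len() > 1);
/// ```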
fn chunk_text(text: &str) -> Vec<&str> {
    if text.len() <= CHUNK_CHARS {
        return vec![text];
    }

    let mut chunks = Vec::new();
    let mut start = 0;

    while start < text.len() {
        let end = if start + CHUNK_CHARS >= text.len() {
            text.len()
        } else {
            // Find a clean UTF-8 char boundary at or before start + CHUNK_CHARS.
            let boundary = text.floor_char_boundary(start + CHUNK_CHARS);
            // Prefer to split at a paragraph or line break for cleaner chunks.
            let slice = &text[start..boundary];
            if let Some(pos) = slice.rfind("\n\n") {
                start + pos + 2
            } else if let Some(pos) = slice.rfind('\n') {
                start + pos + 1
            } else if let Some(pos) = slice.rfind(' ') {
                start + pos + 1
            } else {
                boundary
            }
        };

        chunks.push(&text[start..end]);
        if end >= text.len() {
            break;
        }
        // Next chunk starts with overlap, but must advance strictly past the
        // previous start: a split point within CHUNK_OVERLAP_CHARS of `start`
        // (e.g. a newline right at the beginning of the window) would otherwise
        // move `start` backwards and re-emit the same chunk forever.
        let next = text.ceil_char_boundary(end.saturating_sub(CHUNK_OVERLAP_CHARS));
        start = if next > start { next } else { end };
    }

    chunks
}

use crate::admission::log_admission_decision;
use crate::embedding_store::{MessageKind, SearchFilter};
use crate::error::MemoryError;
use crate::types::{ConversationId, MessageId};

use super::SemanticMemory;
use super::algorithms::{apply_mmr, apply_temporal_decay};

/// Tool execution metadata stored as Qdrant payload fields alongside embeddings.
///
/// Stored as payload — NOT prepended to content — to avoid corrupting embedding vectors.
#[derive(Debug, Clone, Default)]
pub struct EmbedContext {
    pub tool_name: Option<String>,
    pub exit_code: Option<i32>,
    pub timestamp: Option<String>,
}

#[derive(Debug)]
pub struct RecalledMessage {
    pub message: Message,
    pub score: f32,
}

impl SemanticMemory {
    /// Save a message to `SQLite` and optionally embed and store in Qdrant.
    ///
    /// Returns `Ok(Some(message_id))` when admitted and persisted.
    /// Returns `Ok(None)` when A-MAC admission control rejects the message (not an error).
    ///
    /// # Errors
    ///
    /// Returns an error if the `SQLite` save fails. Embedding failures are logged but not
    /// propagated.
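    ///
    /// Call sketch (assumes an async context; `memory` and `conv_id` are
    /// illustrative bindings):
    ///
    /// ```ignore
    /// match memory.remember(conv_id, "user", "prefers tabs over spaces", None).await? {
    ///     Some(id) => tracing::debug!(?id, "admitted and saved"),
    ///     None => tracing::debug!("rejected by A-MAC admission control"),
    /// }
    /// ```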
    pub async fn remember(
        &self,
        conversation_id: ConversationId,
        role: &str,
        content: &str,
        goal_text: Option<&str>,
    ) -> Result<Option<MessageId>, MemoryError> {
        // A-MAC admission gate.
        if let Some(ref admission) = self.admission_control {
            let decision = admission
                .evaluate(
                    content,
                    role,
                    &self.provider,
                    self.qdrant.as_ref(),
                    goal_text,
                )
                .await;
            let preview: String = content.chars().take(100).collect();
            log_admission_decision(&decision, &preview, role, admission.threshold());
            if !decision.admitted {
                return Ok(None);
            }
        }

        let message_id = self
            .sqlite
            .save_message(conversation_id, role, content)
            .await?;

        self.embed_and_store_regular(message_id, conversation_id, role, content)
            .await;

        Ok(Some(message_id))
    }

    /// Save a message with pre-serialized parts JSON to `SQLite` and optionally embed in Qdrant.
    ///
    /// Returns `Ok((Some(message_id), embedding_stored))` when admitted and persisted.
    /// Returns `Ok((None, false))` when A-MAC admission control rejects the message.
    ///
    /// # Errors
    ///
    /// Returns an error if the `SQLite` save fails.
    pub async fn remember_with_parts(
        &self,
        conversation_id: ConversationId,
        role: &str,
        content: &str,
        parts_json: &str,
        goal_text: Option<&str>,
    ) -> Result<(Option<MessageId>, bool), MemoryError> {
        // A-MAC admission gate.
        if let Some(ref admission) = self.admission_control {
            let decision = admission
                .evaluate(
                    content,
                    role,
                    &self.provider,
                    self.qdrant.as_ref(),
                    goal_text,
                )
                .await;
            let preview: String = content.chars().take(100).collect();
            log_admission_decision(&decision, &preview, role, admission.threshold());
            if !decision.admitted {
                return Ok((None, false));
            }
        }

        let message_id = self
            .sqlite
            .save_message_with_parts(conversation_id, role, content, parts_json)
            .await?;

        let embedding_stored = self
            .embed_and_store_regular(message_id, conversation_id, role, content)
            .await;

        Ok((Some(message_id), embedding_stored))
    }

    /// Save a tool output to `SQLite` and embed with tool metadata in Qdrant payload.
    ///
    /// Tool metadata (`tool_name`, `exit_code`, `timestamp`) is stored as Qdrant payload fields
    /// so it is available for filtering without corrupting the embedding vector.
    ///
    /// Returns `Ok((Some(message_id), embedding_stored))` when admitted and persisted.
    /// Returns `Ok((None, false))` when A-MAC admission control rejects the message.
    ///
    /// # Errors
    ///
    /// Returns an error if the `SQLite` save fails.
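    ///
    /// Call sketch (`memory`, `conv_id`, `output`, and `parts_json` are illustrative
    /// bindings):
    ///
    /// ```ignore
    /// let ctx = EmbedContext {
    ///     tool_name: Some("shell".into()),
    ///     exit_code: Some(0),
    ///     timestamp: Some("2026-04-04T00:00:00Z".into()),
    /// };
    /// let (id, embedded) = memory
    ///     .remember_tool_output(conv_id, "tool", output, parts_json, ctx)
    ///     .await?;
    /// ```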
    pub async fn remember_tool_output(
        &self,
        conversation_id: ConversationId,
        role: &str,
        content: &str,
        parts_json: &str,
        embed_ctx: EmbedContext,
    ) -> Result<(Option<MessageId>, bool), MemoryError> {
        if let Some(ref admission) = self.admission_control {
            let decision = admission
                .evaluate(content, role, &self.provider, self.qdrant.as_ref(), None)
                .await;
            let preview: String = content.chars().take(100).collect();
            log_admission_decision(&decision, &preview, role, admission.threshold());
            if !decision.admitted {
                return Ok((None, false));
            }
        }

        let message_id = self
            .sqlite
            .save_message_with_parts(conversation_id, role, content, parts_json)
            .await?;

        let embedding_stored = self
            .embed_chunks_with_tool_context(message_id, conversation_id, role, content, embed_ctx)
            .await;

        Ok((Some(message_id), embedding_stored))
    }

    /// Save a categorized message to `SQLite` and embed with category payload in Qdrant.
    ///
    /// The `category` is stored in both the `messages.category` column and as a Qdrant payload
    /// field for recall filtering. Uses the A-MAC admission gate.
    ///
    /// Returns `Ok(Some(message_id))` when admitted; `Ok(None)` when rejected.
    ///
    /// # Errors
    ///
    /// Returns an error if the `SQLite` save fails.
    pub async fn remember_categorized(
        &self,
        conversation_id: ConversationId,
        role: &str,
        content: &str,
        category: Option<&str>,
        goal_text: Option<&str>,
    ) -> Result<Option<MessageId>, MemoryError> {
        if let Some(ref admission) = self.admission_control {
            let decision = admission
                .evaluate(
                    content,
                    role,
                    &self.provider,
                    self.qdrant.as_ref(),
                    goal_text,
                )
                .await;
            let preview: String = content.chars().take(100).collect();
            log_admission_decision(&decision, &preview, role, admission.threshold());
            if !decision.admitted {
                return Ok(None);
            }
        }

        let message_id = self
            .sqlite
            .save_message_with_category(conversation_id, role, content, category)
            .await?;

        self.embed_and_store_with_category(message_id, conversation_id, role, content, category)
            .await;

        Ok(Some(message_id))
    }

    /// Recall messages filtered by category.
    ///
    /// When `category` is `None`, or when no base `filter` is supplied, this behaves
    /// identically to [`recall`](Self::recall): the category constraint is only applied
    /// on top of an existing filter.
    ///
    /// # Errors
    ///
    /// Returns an error if the search fails.
    pub async fn recall_with_category(
        &self,
        query: &str,
        limit: usize,
        filter: Option<SearchFilter>,
        category: Option<&str>,
    ) -> Result<Vec<RecalledMessage>, MemoryError> {
        let filter_with_category = filter.map(|mut f| {
            f.category = category.map(str::to_owned);
            f
        });
        self.recall(query, limit, filter_with_category).await
    }

    /// Embed content chunks and store each with an optional category payload field.
    async fn embed_and_store_with_category(
        &self,
        message_id: MessageId,
        conversation_id: ConversationId,
        role: &str,
        content: &str,
        category: Option<&str>,
    ) -> bool {
        let Some(qdrant) = &self.qdrant else {
            return false;
        };
        let embed_provider = self.effective_embed_provider();
        if !embed_provider.supports_embeddings() {
            return false;
        }

        let chunks = chunk_text(content);
        let chunk_count = chunks.len();

        let vectors = match embed_provider.embed_batch(&chunks).await {
            Ok(v) => v,
            Err(e) => {
                tracing::warn!("Failed to embed categorized chunks for msg {message_id}: {e:#}");
                return false;
            }
        };

        let Some(first) = vectors.first() else {
            return false;
        };
        let vector_size = u64::try_from(first.len()).unwrap_or(896);
        if let Err(e) = qdrant.ensure_collection(vector_size).await {
            tracing::warn!("Failed to ensure Qdrant collection for categorized msg: {e:#}");
            return false;
        }

        let mut stored = false;
        for (chunk_index, vector) in vectors.into_iter().enumerate() {
            let chunk_index_u32 = u32::try_from(chunk_index).unwrap_or(u32::MAX);
            match qdrant
                .store_with_category(
                    message_id,
                    conversation_id,
                    role,
                    vector,
                    MessageKind::Regular,
                    &self.embedding_model,
                    chunk_index_u32,
                    category,
                )
                .await
            {
                Ok(_) => stored = true,
                Err(e) => tracing::warn!(
                    "Failed to store categorized chunk {chunk_index}/{chunk_count} \
                     for msg {message_id}: {e:#}"
                ),
            }
        }

        stored
    }

    /// Embed content chunks and store each as a regular (non-tool) message vector.
    ///
    /// Handles: chunking → `embed_batch` → `ensure_collection` → per-chunk `store`.
    /// Returns `true` if at least one chunk was successfully stored.
    async fn embed_and_store_regular(
        &self,
        message_id: MessageId,
        conversation_id: ConversationId,
        role: &str,
        content: &str,
    ) -> bool {
        let Some(qdrant) = &self.qdrant else {
            return false;
        };
        let embed_provider = self.effective_embed_provider();
        if !embed_provider.supports_embeddings() {
            return false;
        }

        let chunks = chunk_text(content);
        let chunk_count = chunks.len();

        let vectors = match embed_provider.embed_batch(&chunks).await {
            Ok(v) => v,
            Err(e) => {
                tracing::warn!("Failed to embed chunks for msg {message_id}: {e:#}");
                return false;
            }
        };

        let Some(first) = vectors.first() else {
            return false;
        };
        let vector_size = u64::try_from(first.len()).unwrap_or(896);
        if let Err(e) = qdrant.ensure_collection(vector_size).await {
            tracing::warn!("Failed to ensure Qdrant collection: {e:#}");
            return false;
        }

        let mut stored = false;
        for (chunk_index, vector) in vectors.into_iter().enumerate() {
            let chunk_index_u32 = u32::try_from(chunk_index).unwrap_or(u32::MAX);
            match qdrant
                .store(
                    message_id,
                    conversation_id,
                    role,
                    vector,
                    MessageKind::Regular,
                    &self.embedding_model,
                    chunk_index_u32,
                )
                .await
            {
                Ok(_) => stored = true,
                Err(e) => tracing::warn!(
                    "Failed to store chunk {chunk_index}/{chunk_count} \
                     for msg {message_id}: {e:#}"
                ),
            }
        }

        stored
    }

    /// Embed content chunks, enriching Qdrant payload with tool metadata when present.
    ///
    /// Returns `true` if at least one chunk was successfully stored.
    async fn embed_chunks_with_tool_context(
        &self,
        message_id: MessageId,
        conversation_id: ConversationId,
        role: &str,
        content: &str,
        embed_ctx: EmbedContext,
    ) -> bool {
        let Some(qdrant) = &self.qdrant else {
            return false;
        };
        let embed_provider = self.effective_embed_provider();
        if !embed_provider.supports_embeddings() {
            return false;
        }

        let chunks = chunk_text(content);
        let chunk_count = chunks.len();
        let mut stored = false;

        // Embed all chunks in a single batch call.
        // Batch semantics are atomic: if the batch fails, skip embedding for this message.
        let vectors = match embed_provider.embed_batch(&chunks).await {
            Ok(v) => v,
            Err(e) => {
                tracing::warn!("Failed to embed tool-output chunks for msg {message_id}: {e:#}");
                return false;
            }
        };

        if let Some(first) = vectors.first() {
            let vector_size = u64::try_from(first.len()).unwrap_or(896);
            if let Err(e) = qdrant.ensure_collection(vector_size).await {
                tracing::warn!("Failed to ensure Qdrant collection: {e:#}");
                return false;
            }
        }

        for (chunk_index, vector) in vectors.into_iter().enumerate() {
            let chunk_index_u32 = u32::try_from(chunk_index).unwrap_or(u32::MAX);
            let result = if let Some(ref tool_name) = embed_ctx.tool_name {
                qdrant
                    .store_with_tool_context(
                        message_id,
                        conversation_id,
                        role,
                        vector,
                        MessageKind::Regular,
                        &self.embedding_model,
                        chunk_index_u32,
                        tool_name,
                        embed_ctx.exit_code,
                        embed_ctx.timestamp.as_deref(),
                    )
                    .await
                    .map(|_| ())
            } else {
                qdrant
                    .store(
                        message_id,
                        conversation_id,
                        role,
                        vector,
                        MessageKind::Regular,
                        &self.embedding_model,
                        chunk_index_u32,
                    )
                    .await
                    .map(|_| ())
            };
            match result {
                Ok(()) => stored = true,
                Err(e) => tracing::warn!(
                    "Failed to store tool-output chunk {chunk_index}/{chunk_count} \
                     for msg {message_id}: {e:#}"
                ),
            }
        }

        stored
    }

    /// Save a message to `SQLite` without generating an embedding.
    ///
    /// Use this when embedding is intentionally skipped (e.g. autosave disabled for assistant).
    ///
    /// # Errors
    ///
    /// Returns an error if the `SQLite` save fails.
    pub async fn save_only(
        &self,
        conversation_id: ConversationId,
        role: &str,
        content: &str,
        parts_json: &str,
    ) -> Result<MessageId, MemoryError> {
        self.sqlite
            .save_message_with_parts(conversation_id, role, content, parts_json)
            .await
    }

    /// Recall relevant messages using hybrid search (vector + FTS5 keyword).
    ///
    /// When Qdrant is available, runs both vector and keyword searches, then merges
    /// results using weighted scoring. When Qdrant is unavailable, falls back to
    /// FTS5-only keyword search.
    ///
    /// # Errors
    ///
    /// Returns an error if embedding generation or the Qdrant search fails. FTS5 keyword
    /// failures are logged and the search degrades to vector-only results.
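    ///
    /// Call sketch (bindings are illustrative, and `SearchFilter`'s `Default`
    /// construction is assumed rather than taken from this file):
    ///
    /// ```ignore
    /// let filter = SearchFilter { conversation_id: Some(conv_id), ..Default::default() };
    /// let hits = memory.recall("how did we fix the login bug?", 5, Some(filter)).await?;
    /// for hit in &hits {
    ///     tracing::info!(score = hit.score, "recalled message");
    /// }
    /// ```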
    pub async fn recall(
        &self,
        query: &str,
        limit: usize,
        filter: Option<SearchFilter>,
    ) -> Result<Vec<RecalledMessage>, MemoryError> {
        let conversation_id = filter.as_ref().and_then(|f| f.conversation_id);

        tracing::debug!(
            query_len = query.len(),
            limit,
            has_filter = filter.is_some(),
            conversation_id = conversation_id.map(|c| c.0),
            has_qdrant = self.qdrant.is_some(),
            "recall: starting hybrid search"
        );

        let keyword_results = match self
            .sqlite
            .keyword_search(query, limit * 2, conversation_id)
            .await
        {
            Ok(results) => results,
            Err(e) => {
                tracing::warn!("FTS5 keyword search failed: {e:#}");
                Vec::new()
            }
        };

        let vector_results = if let Some(qdrant) = &self.qdrant
            && self.provider.supports_embeddings()
        {
            let query_vector = self.provider.embed(query).await?;
            let vector_size = u64::try_from(query_vector.len()).unwrap_or(896);
            qdrant.ensure_collection(vector_size).await?;
            qdrant.search(&query_vector, limit * 2, filter).await?
        } else {
            Vec::new()
        };

        self.recall_merge_and_rank(keyword_results, vector_results, limit)
            .await
    }

    pub(super) async fn recall_fts5_raw(
        &self,
        query: &str,
        limit: usize,
        conversation_id: Option<ConversationId>,
    ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
        self.sqlite
            .keyword_search(query, limit * 2, conversation_id)
            .await
    }

    pub(super) async fn recall_vectors_raw(
        &self,
        query: &str,
        limit: usize,
        filter: Option<SearchFilter>,
    ) -> Result<Vec<crate::embedding_store::SearchResult>, MemoryError> {
        let Some(qdrant) = &self.qdrant else {
            return Ok(Vec::new());
        };
        if !self.provider.supports_embeddings() {
            return Ok(Vec::new());
        }
        let query_vector = self.provider.embed(query).await?;
        let vector_size = u64::try_from(query_vector.len()).unwrap_or(896);
        qdrant.ensure_collection(vector_size).await?;
        qdrant.search(&query_vector, limit * 2, filter).await
    }

    /// Merge raw keyword and vector results, apply weighted scoring, temporal decay, and MMR
    /// re-ranking, then resolve to `RecalledMessage` objects.
    ///
    /// This is the shared post-processing step used by all recall paths.
    ///
    /// # Errors
    ///
    /// Returns an error if the `SQLite` `messages_by_ids` query fails.
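    ///
    /// Scoring sketch: each source is normalized by its own maximum, then weighted.
    /// With illustrative weights `vector_weight = 0.7` and `keyword_weight = 0.3`, a
    /// message holding the top vector score and half the top keyword score merges to
    /// `1.0 * 0.7 + 0.5 * 0.3 = 0.85`.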
    #[allow(clippy::cast_possible_truncation, clippy::too_many_lines)]
    pub(super) async fn recall_merge_and_rank(
        &self,
        keyword_results: Vec<(MessageId, f64)>,
        vector_results: Vec<crate::embedding_store::SearchResult>,
        limit: usize,
    ) -> Result<Vec<RecalledMessage>, MemoryError> {
        tracing::debug!(
            vector_count = vector_results.len(),
            keyword_count = keyword_results.len(),
            limit,
            "recall: merging search results"
        );

        let mut scores: std::collections::HashMap<MessageId, f64> =
            std::collections::HashMap::new();

        if !vector_results.is_empty() {
            let max_vs = vector_results
                .iter()
                .map(|r| r.score)
                .fold(f32::NEG_INFINITY, f32::max);
            let norm = if max_vs > 0.0 { max_vs } else { 1.0 };
            for r in &vector_results {
                let normalized = f64::from(r.score / norm);
                *scores.entry(r.message_id).or_default() += normalized * self.vector_weight;
            }
        }

        if !keyword_results.is_empty() {
            let max_ks = keyword_results
                .iter()
                .map(|r| r.1)
                .fold(f64::NEG_INFINITY, f64::max);
            let norm = if max_ks > 0.0 { max_ks } else { 1.0 };
            for &(msg_id, score) in &keyword_results {
                let normalized = score / norm;
                *scores.entry(msg_id).or_default() += normalized * self.keyword_weight;
            }
        }

        if scores.is_empty() {
            tracing::debug!("recall: no keyword or vector results to merge");
            return Ok(Vec::new());
        }

        let mut ranked: Vec<(MessageId, f64)> = scores.into_iter().collect();
        ranked.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));

        tracing::debug!(
            merged = ranked.len(),
            top_score = ranked.first().map(|r| r.1),
            bottom_score = ranked.last().map(|r| r.1),
            vector_weight = %self.vector_weight,
            keyword_weight = %self.keyword_weight,
            "recall: weighted merge complete"
        );

        if self.temporal_decay_enabled && self.temporal_decay_half_life_days > 0 {
            let ids: Vec<MessageId> = ranked.iter().map(|r| r.0).collect();
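            // By the usual half-life convention, a message's weight halves every
            // `temporal_decay_half_life_days`; the exact curve is defined in
            // `apply_temporal_decay` (see `super::algorithms`).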
            match self.sqlite.message_timestamps(&ids).await {
                Ok(timestamps) => {
                    apply_temporal_decay(
                        &mut ranked,
                        &timestamps,
                        self.temporal_decay_half_life_days,
                    );
                    ranked
                        .sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
                    tracing::debug!(
                        half_life_days = self.temporal_decay_half_life_days,
                        top_score_after = ranked.first().map(|r| r.1),
                        "recall: temporal decay applied"
                    );
                }
                Err(e) => {
                    tracing::warn!("temporal decay: failed to fetch timestamps: {e:#}");
                }
            }
        }

        if self.mmr_enabled && !vector_results.is_empty() {
            if let Some(qdrant) = &self.qdrant {
                let ids: Vec<MessageId> = ranked.iter().map(|r| r.0).collect();
                match qdrant.get_vectors(&ids).await {
                    Ok(vec_map) if !vec_map.is_empty() => {
                        let ranked_len_before = ranked.len();
                        ranked = apply_mmr(&ranked, &vec_map, self.mmr_lambda, limit);
                        tracing::debug!(
                            before = ranked_len_before,
                            after = ranked.len(),
                            lambda = %self.mmr_lambda,
                            "recall: mmr re-ranked"
                        );
                    }
                    Ok(_) => {
                        ranked.truncate(limit);
                    }
                    Err(e) => {
                        tracing::warn!("MMR: failed to fetch vectors: {e:#}");
                        ranked.truncate(limit);
                    }
                }
            } else {
                ranked.truncate(limit);
            }
        } else {
            ranked.truncate(limit);
        }

        if self.importance_enabled && !ranked.is_empty() {
            let ids: Vec<MessageId> = ranked.iter().map(|r| r.0).collect();
            match self.sqlite.fetch_importance_scores(&ids).await {
                Ok(scores) => {
                    for (msg_id, score) in &mut ranked {
                        if let Some(&imp) = scores.get(msg_id) {
                            *score += imp * self.importance_weight;
                        }
                    }
                    ranked
                        .sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
                    tracing::debug!(
                        importance_weight = %self.importance_weight,
                        "recall: importance scores blended"
                    );
                }
                Err(e) => {
                    tracing::warn!("importance scoring: failed to fetch scores: {e:#}");
                }
            }
        }

        // Apply tier boost: semantic-tier messages receive an additive bonus so distilled facts
        // rank above episodic messages with the same base score. Additive (not multiplicative)
        // so the effect is consistent regardless of base score magnitude.
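        // Example: a configured `tier_boost_semantic` of 1.2 becomes a flat +0.2
        // for every semantic-tier message.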
        if (self.tier_boost_semantic - 1.0).abs() > f64::EPSILON && !ranked.is_empty() {
            let ids: Vec<MessageId> = ranked.iter().map(|r| r.0).collect();
            match self.sqlite.fetch_tiers(&ids).await {
                Ok(tiers) => {
                    let bonus = self.tier_boost_semantic - 1.0;
                    let mut boosted = false;
                    for (msg_id, score) in &mut ranked {
                        if tiers.get(msg_id).map(String::as_str) == Some("semantic") {
                            *score += bonus;
                            boosted = true;
                        }
                    }
                    if boosted {
                        ranked.sort_by(|a, b| {
                            b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal)
                        });
                        tracing::debug!(
                            tier_boost = %self.tier_boost_semantic,
                            "recall: semantic tier boost applied"
                        );
                    }
                }
                Err(e) => {
                    tracing::warn!("tier boost: failed to fetch tiers: {e:#}");
                }
            }
        }

        let ids: Vec<MessageId> = ranked.iter().map(|r| r.0).collect();

        if !ids.is_empty()
            && let Err(e) = self.batch_increment_access_count(ids.clone()).await
        {
            tracing::warn!("recall: failed to increment access counts: {e:#}");
        }

        // Update RL admission training data: mark recalled messages as positive examples.
        if let Err(e) = self.sqlite.mark_training_recalled(&ids).await {
            tracing::debug!(
                error = %e,
                "recall: failed to mark training data as recalled (non-fatal)"
            );
        }

        let messages = self.sqlite.messages_by_ids(&ids).await?;
        let msg_map: std::collections::HashMap<MessageId, _> = messages.into_iter().collect();

        let recalled: Vec<RecalledMessage> = ranked
            .iter()
            .filter_map(|(msg_id, score)| {
                msg_map.get(msg_id).map(|msg| RecalledMessage {
                    message: msg.clone(),
                    #[expect(clippy::cast_possible_truncation)]
                    score: *score as f32,
                })
            })
            .collect();

        tracing::debug!(final_count = recalled.len(), "recall: final results");

        Ok(recalled)
    }

    /// Recall messages using query-aware routing.
    ///
    /// Delegates to FTS5-only, vector-only, or hybrid search based on the router decision,
    /// then runs the shared merge and ranking pipeline.
    ///
    /// # Errors
    ///
    /// Returns an error if any underlying search or database operation fails.
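    ///
    /// Call sketch (`memory` and `router` are illustrative bindings; any
    /// `MemoryRouter` implementor works):
    ///
    /// ```ignore
    /// let hits = memory
    ///     .recall_routed("what broke in yesterday's deploy?", 8, None, &router)
    ///     .await?;
    /// ```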
    pub async fn recall_routed(
        &self,
        query: &str,
        limit: usize,
        filter: Option<SearchFilter>,
        router: &dyn crate::router::MemoryRouter,
    ) -> Result<Vec<RecalledMessage>, MemoryError> {
        use crate::router::MemoryRoute;

        let route = router.route(query);
        tracing::debug!(?route, query_len = query.len(), "memory routing decision");

        let conversation_id = filter.as_ref().and_then(|f| f.conversation_id);

        let (keyword_results, vector_results): (
            Vec<(MessageId, f64)>,
            Vec<crate::embedding_store::SearchResult>,
        ) = match route {
            MemoryRoute::Keyword => {
                let kw = self.recall_fts5_raw(query, limit, conversation_id).await?;
                (kw, Vec::new())
            }
            MemoryRoute::Semantic => {
                let vr = self.recall_vectors_raw(query, limit, filter).await?;
                (Vec::new(), vr)
            }
            MemoryRoute::Hybrid => {
                let kw = match self.recall_fts5_raw(query, limit, conversation_id).await {
                    Ok(r) => r,
                    Err(e) => {
                        tracing::warn!("FTS5 keyword search failed: {e:#}");
                        Vec::new()
                    }
                };
                let vr = self.recall_vectors_raw(query, limit, filter).await?;
                (kw, vr)
            }
            // Episodic: FTS5 keyword search with an optional timestamp-range filter.
            // Temporal keywords are stripped from the query before passing to FTS5 to
            // prevent BM25 score distortion (e.g. "yesterday" matching messages that
            // literally contain the word "yesterday" regardless of actual relevance).
            // Vector search is skipped for speed; temporal decay in recall_merge_and_rank
            // provides recency boosting for the FTS5 results.
            // Known trade-off (MVP): semantically similar but lexically different messages
            // may be missed. See issue #1629 for a future hybrid_temporal mode.
            MemoryRoute::Episodic => {
                let range = crate::router::resolve_temporal_range(query, chrono::Utc::now());
                let cleaned = crate::router::strip_temporal_keywords(query);
                let search_query = if cleaned.is_empty() { query } else { &cleaned };
                let kw = if let Some(ref r) = range {
                    self.sqlite
                        .keyword_search_with_time_range(
                            search_query,
                            limit,
                            conversation_id,
                            r.after.as_deref(),
                            r.before.as_deref(),
                        )
                        .await?
                } else {
                    self.recall_fts5_raw(search_query, limit, conversation_id)
                        .await?
                };
                tracing::debug!(
                    has_range = range.is_some(),
                    cleaned_query = %search_query,
                    keyword_count = kw.len(),
                    "recall: episodic path"
                );
                (kw, Vec::new())
            }
            // Graph routing triggers graph_recall separately in agent/context.rs.
            // For the message-based recall, behave like Hybrid.
            MemoryRoute::Graph => {
                let kw = match self.recall_fts5_raw(query, limit, conversation_id).await {
                    Ok(r) => r,
                    Err(e) => {
                        tracing::warn!("FTS5 keyword search failed (graph→hybrid fallback): {e:#}");
                        Vec::new()
                    }
                };
                let vr = self.recall_vectors_raw(query, limit, filter).await?;
                (kw, vr)
            }
        };

        tracing::debug!(
            keyword_count = keyword_results.len(),
            vector_count = vector_results.len(),
            "recall: routed search results"
        );

        self.recall_merge_and_rank(keyword_results, vector_results, limit)
            .await
    }

    /// Async variant of [`recall_routed`](Self::recall_routed) that uses
    /// [`AsyncMemoryRouter::route_async`](crate::router::AsyncMemoryRouter::route_async) when
    /// available, enabling LLM-based routing for `LlmRouter` and `HybridRouter`.
    ///
    /// Falls back to [`recall_routed`](Self::recall_routed) for routers that only implement
    /// the sync `MemoryRouter` trait (e.g. `HeuristicRouter`).
    ///
    /// # Errors
    ///
    /// Returns an error if any underlying search or database operation fails.
    pub async fn recall_routed_async(
        &self,
        query: &str,
        limit: usize,
        filter: Option<crate::embedding_store::SearchFilter>,
        router: &dyn crate::router::AsyncMemoryRouter,
    ) -> Result<Vec<RecalledMessage>, MemoryError> {
        use crate::router::MemoryRoute;

        let decision = router.route_async(query).await;
        let route = decision.route;
        tracing::debug!(
            ?route,
            confidence = decision.confidence,
            query_len = query.len(),
            "memory routing decision (async)"
        );

        let conversation_id = filter.as_ref().and_then(|f| f.conversation_id);

        let (keyword_results, vector_results): (
            Vec<(crate::types::MessageId, f64)>,
            Vec<crate::embedding_store::SearchResult>,
        ) = match route {
            MemoryRoute::Keyword => {
                let kw = self.recall_fts5_raw(query, limit, conversation_id).await?;
                (kw, Vec::new())
            }
            MemoryRoute::Semantic => {
                let vr = self.recall_vectors_raw(query, limit, filter).await?;
                (Vec::new(), vr)
            }
            MemoryRoute::Hybrid => {
                let kw = match self.recall_fts5_raw(query, limit, conversation_id).await {
                    Ok(r) => r,
                    Err(e) => {
                        tracing::warn!("FTS5 keyword search failed: {e:#}");
                        Vec::new()
                    }
                };
                let vr = self.recall_vectors_raw(query, limit, filter).await?;
                (kw, vr)
            }
            MemoryRoute::Episodic => {
                let range = crate::router::resolve_temporal_range(query, chrono::Utc::now());
                let cleaned = crate::router::strip_temporal_keywords(query);
                let search_query = if cleaned.is_empty() { query } else { &cleaned };
                let kw = if let Some(ref r) = range {
                    self.sqlite
                        .keyword_search_with_time_range(
                            search_query,
                            limit,
                            conversation_id,
                            r.after.as_deref(),
                            r.before.as_deref(),
                        )
                        .await?
                } else {
                    self.recall_fts5_raw(search_query, limit, conversation_id)
                        .await?
                };
                (kw, Vec::new())
            }
            MemoryRoute::Graph => {
                let kw = match self.recall_fts5_raw(query, limit, conversation_id).await {
                    Ok(r) => r,
                    Err(e) => {
                        tracing::warn!("FTS5 keyword search failed (graph→hybrid fallback): {e:#}");
                        Vec::new()
                    }
                };
                let vr = self.recall_vectors_raw(query, limit, filter).await?;
                (kw, vr)
            }
        };

        tracing::debug!(
            keyword_count = keyword_results.len(),
            vector_count = vector_results.len(),
            "recall: routed search results (async)"
        );

        self.recall_merge_and_rank(keyword_results, vector_results, limit)
            .await
    }

    /// Retrieve graph facts relevant to `query` via BFS traversal.
    ///
    /// Returns an empty `Vec` if no `graph_store` is configured.
    ///
    /// # Parameters
    ///
    /// - `at_timestamp`: when `Some`, only edges valid at that `SQLite` datetime string are returned.
    ///   When `None`, only currently active edges are used.
    /// - `temporal_decay_rate`: non-negative decay rate (1/day). `0.0` preserves original ordering.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying graph query fails.
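    ///
    /// Call sketch (bindings are illustrative; the empty `edge_types` slice is a
    /// placeholder whose semantics belong to the graph layer):
    ///
    /// ```ignore
    /// let facts = memory
    ///     .recall_graph("who maintains the billing service?", 10, 2, None, 0.0, &[])
    ///     .await?;
    /// ```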
    pub async fn recall_graph(
        &self,
        query: &str,
        limit: usize,
        max_hops: u32,
        at_timestamp: Option<&str>,
        temporal_decay_rate: f64,
        edge_types: &[crate::graph::EdgeType],
    ) -> Result<Vec<crate::graph::types::GraphFact>, MemoryError> {
        let Some(store) = &self.graph_store else {
            return Ok(Vec::new());
        };

        tracing::debug!(
            query_len = query.len(),
            limit,
            max_hops,
            "graph: starting recall"
        );

        let results = crate::graph::retrieval::graph_recall(
            store,
            self.qdrant.as_deref(),
            &self.provider,
            query,
            limit,
            max_hops,
            at_timestamp,
            temporal_decay_rate,
            edge_types,
        )
        .await?;

        tracing::debug!(result_count = results.len(), "graph: recall complete");

        Ok(results)
    }

    /// Retrieve graph facts via SYNAPSE spreading activation.
    ///
    /// Delegates to [`crate::graph::retrieval::graph_recall_activated`].
    /// Used in place of [`recall_graph`](Self::recall_graph) when
    /// `spreading_activation.enabled = true`.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying graph query fails.
    pub async fn recall_graph_activated(
        &self,
        query: &str,
        limit: usize,
        params: crate::graph::SpreadingActivationParams,
        edge_types: &[crate::graph::EdgeType],
    ) -> Result<Vec<crate::graph::activation::ActivatedFact>, MemoryError> {
        let Some(store) = &self.graph_store else {
            return Ok(Vec::new());
        };

        tracing::debug!(
            query_len = query.len(),
            limit,
            "spreading activation: starting graph recall"
        );

        let embeddings = self.qdrant.as_deref();
        let results = crate::graph::retrieval::graph_recall_activated(
            store,
            embeddings,
            &self.provider,
            query,
            limit,
            params,
            edge_types,
        )
        .await?;

        tracing::debug!(
            result_count = results.len(),
            "spreading activation: graph recall complete"
        );

        Ok(results)
    }

    /// Increment access count and update `last_accessed` for a batch of message IDs.
    ///
    /// Skips the update if `message_ids` is empty to avoid an invalid `IN ()` clause.
    ///
    /// # Errors
    ///
    /// Returns an error if the `SQLite` update fails.
    async fn batch_increment_access_count(
        &self,
        message_ids: Vec<MessageId>,
    ) -> Result<(), MemoryError> {
        if message_ids.is_empty() {
            return Ok(());
        }
        self.sqlite.increment_access_counts(&message_ids).await
    }

    /// Check whether an embedding exists for a given message ID.
    ///
    /// Returns `Ok(false)` when no Qdrant store is configured.
    ///
    /// # Errors
    ///
    /// Returns an error if the Qdrant lookup fails.
    pub async fn has_embedding(&self, message_id: MessageId) -> Result<bool, MemoryError> {
        match &self.qdrant {
            Some(qdrant) => qdrant.has_embedding(message_id).await,
            None => Ok(false),
        }
    }

    /// Embed all messages that do not yet have embeddings.
    ///
    /// Streams up to 1000 unembedded rows from `SQLite` one at a time, embedding and storing
    /// each before advancing the cursor. This avoids loading all message content into memory
    /// at once, which can cause a significant RAM spike when messages contain large tool output
    /// or code blocks.
    ///
    /// Returns the count of successfully embedded messages.
    ///
    /// # Errors
    ///
    /// The current implementation never propagates an error: row-read failures and
    /// individual embedding failures are logged and skipped.
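    ///
    /// Call sketch (`memory` is an illustrative binding):
    ///
    /// ```ignore
    /// let embedded = memory.embed_missing().await?;
    /// tracing::info!(embedded, "embedding backfill complete");
    /// ```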
    pub async fn embed_missing(&self) -> Result<usize, MemoryError> {
        if self.qdrant.is_none() || !self.effective_embed_provider().supports_embeddings() {
            return Ok(0);
        }

        let mut stream = std::pin::pin!(self.sqlite.stream_unembedded_messages(1000));

        let mut count = 0usize;
        let mut total = 0usize;
        while let Some(row) = stream.next().await {
            let (msg_id, conversation_id, role, content) = match row {
                Ok(r) => r,
                Err(e) => {
                    tracing::warn!("embed_missing: failed to read row: {e:#}");
                    continue;
                }
            };
            total += 1;
            if self
                .embed_and_store_regular(msg_id, conversation_id, &role, &content)
                .await
            {
                count += 1;
            }
        }

        if total > 0 {
            tracing::info!("Embedded {count}/{total} missing messages");
        }
        Ok(count)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn embed_context_default_all_none() {
        let ctx = EmbedContext::default();
        assert!(ctx.tool_name.is_none());
        assert!(ctx.exit_code.is_none());
        assert!(ctx.timestamp.is_none());
    }

    #[test]
    fn embed_context_fields_set_correctly() {
        let ctx = EmbedContext {
            tool_name: Some("shell".to_string()),
            exit_code: Some(0),
            timestamp: Some("2026-04-04T00:00:00Z".to_string()),
        };
        assert_eq!(ctx.tool_name.as_deref(), Some("shell"));
        assert_eq!(ctx.exit_code, Some(0));
        assert_eq!(ctx.timestamp.as_deref(), Some("2026-04-04T00:00:00Z"));
    }

    #[test]
    fn embed_context_non_zero_exit_code() {
        let ctx = EmbedContext {
            tool_name: Some("shell".to_string()),
            exit_code: Some(1),
            timestamp: None,
        };
        assert_eq!(ctx.exit_code, Some(1));
        assert!(ctx.timestamp.is_none());
    }
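
    // Sketches for `chunk_text`; the inputs are arbitrary, but the asserted
    // invariants follow directly from the constants and splitting rules above.
    #[test]
    fn chunk_text_short_input_is_single_chunk() {
        assert_eq!(chunk_text("hello"), vec!["hello"]);
    }

    #[test]
    fn chunk_text_long_input_splits_within_size_cap() {
        let text = "lorem ipsum dolor sit amet.\n\n".repeat(200);
        let chunks = chunk_text(&text);
        assert!(chunks.len() > 1);
        for c in &chunks {
            assert!(c.len() <= CHUNK_CHARS);
        }
    }

    #[test]
    fn chunk_text_advances_past_early_split_points() {
        // Regression sketch: a newline right after the window start must not
        // let the overlap step move `start` backwards.
        let text = format!("a\n{}", "b".repeat(3 * CHUNK_CHARS));
        let chunks = chunk_text(&text);
        assert!(chunks.len() >= 2);
        assert!(chunks.last().is_some_and(|c| c.ends_with('b')));
    }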
}