zeph_memory/semantic/recall.rs

// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
// SPDX-License-Identifier: MIT OR Apache-2.0

use futures::StreamExt as _;
use zeph_llm::provider::{LlmProvider as _, Message};

/// Approximate characters per token (conservative estimate for mixed content).
const CHARS_PER_TOKEN: usize = 4;

/// Target chunk size in characters (~400 tokens).
const CHUNK_CHARS: usize = 400 * CHARS_PER_TOKEN;

/// Overlap between adjacent chunks in characters (~80 tokens).
const CHUNK_OVERLAP_CHARS: usize = 80 * CHARS_PER_TOKEN;
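// With the values above, CHUNK_CHARS = 1600 and CHUNK_OVERLAP_CHARS = 320, so
// consecutive chunks advance by roughly 1280 characters; e.g. a 10_000-character
// message splits into about eight chunks when clean split points are available.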

/// Split `text` into overlapping chunks suitable for embedding.
///
/// For text no longer than `CHUNK_CHARS`, returns a single chunk.
/// Splits at UTF-8 character boundaries on paragraph (`\n\n`), line (`\n`),
/// space (` `), or raw character boundaries as a last resort.
fn chunk_text(text: &str) -> Vec<&str> {
    if text.len() <= CHUNK_CHARS {
        return vec![text];
    }

    let mut chunks = Vec::new();
    let mut start = 0;

    while start < text.len() {
        let end = if start + CHUNK_CHARS >= text.len() {
            text.len()
        } else {
            // Find a clean UTF-8 char boundary at or before start + CHUNK_CHARS.
            let boundary = text.floor_char_boundary(start + CHUNK_CHARS);
            // Prefer to split at a paragraph or line break for cleaner chunks.
            let slice = &text[start..boundary];
            if let Some(pos) = slice.rfind("\n\n") {
                start + pos + 2
            } else if let Some(pos) = slice.rfind('\n') {
                start + pos + 1
            } else if let Some(pos) = slice.rfind(' ') {
                start + pos + 1
            } else {
                boundary
            }
        };

        chunks.push(&text[start..end]);
        if end >= text.len() {
            break;
        }
        // Next chunk starts with overlap. If the overlapped start would not
        // advance past the previous start (possible when a chunk is shorter
        // than the overlap), jump to `end` so the loop always makes progress.
        let next = end.saturating_sub(CHUNK_OVERLAP_CHARS);
        let candidate = text.ceil_char_boundary(next);
        start = if candidate > start { candidate } else { end };
    }

    chunks
}

use crate::admission::log_admission_decision;
use crate::embedding_store::{MessageKind, SearchFilter};
use crate::error::MemoryError;
use crate::types::{ConversationId, MessageId};

use super::SemanticMemory;
use super::algorithms::{apply_mmr, apply_temporal_decay};

/// Tool execution metadata stored as Qdrant payload fields alongside embeddings.
///
/// Stored as payload — NOT prepended to content — to avoid corrupting embedding vectors.
#[derive(Debug, Clone, Default)]
pub struct EmbedContext {
    pub tool_name: Option<String>,
    pub exit_code: Option<i32>,
    pub timestamp: Option<String>,
}

#[derive(Debug)]
pub struct RecalledMessage {
    pub message: Message,
    pub score: f32,
}

impl SemanticMemory {
    /// Save a message to `SQLite` and optionally embed and store in Qdrant.
    ///
    /// Returns `Ok(Some(message_id))` when admitted and persisted.
    /// Returns `Ok(None)` when A-MAC admission control rejects the message (not an error).
    ///
    /// # Errors
    ///
    /// Returns an error if the `SQLite` save fails. Embedding failures are logged but not
    /// propagated.
    pub async fn remember(
        &self,
        conversation_id: ConversationId,
        role: &str,
        content: &str,
        goal_text: Option<&str>,
    ) -> Result<Option<MessageId>, MemoryError> {
        // A-MAC admission gate.
        if let Some(ref admission) = self.admission_control {
            let decision = admission
                .evaluate(
                    content,
                    role,
                    &self.provider,
                    self.qdrant.as_ref(),
                    goal_text,
                )
                .await;
            let preview: String = content.chars().take(100).collect();
            log_admission_decision(&decision, &preview, role, admission.threshold());
            if !decision.admitted {
                return Ok(None);
            }
        }

        let message_id = self
            .sqlite
            .save_message(conversation_id, role, content)
            .await?;

        self.embed_and_store_regular(message_id, conversation_id, role, content)
            .await;

        Ok(Some(message_id))
    }

    /// Save a message with pre-serialized parts JSON to `SQLite` and optionally embed in Qdrant.
    ///
    /// Returns `Ok((Some(message_id), embedding_stored))` when admitted and persisted.
    /// Returns `Ok((None, false))` when A-MAC admission control rejects the message.
    ///
    /// # Errors
    ///
    /// Returns an error if the `SQLite` save fails.
    pub async fn remember_with_parts(
        &self,
        conversation_id: ConversationId,
        role: &str,
        content: &str,
        parts_json: &str,
        goal_text: Option<&str>,
    ) -> Result<(Option<MessageId>, bool), MemoryError> {
        // A-MAC admission gate.
        if let Some(ref admission) = self.admission_control {
            let decision = admission
                .evaluate(
                    content,
                    role,
                    &self.provider,
                    self.qdrant.as_ref(),
                    goal_text,
                )
                .await;
            let preview: String = content.chars().take(100).collect();
            log_admission_decision(&decision, &preview, role, admission.threshold());
            if !decision.admitted {
                return Ok((None, false));
            }
        }

        let message_id = self
            .sqlite
            .save_message_with_parts(conversation_id, role, content, parts_json)
            .await?;

        let embedding_stored = self
            .embed_and_store_regular(message_id, conversation_id, role, content)
            .await;

        Ok((Some(message_id), embedding_stored))
    }

    /// Save a tool output to `SQLite` and embed with tool metadata in Qdrant payload.
    ///
    /// Tool metadata (`tool_name`, `exit_code`, `timestamp`) is stored as Qdrant payload fields
    /// so it is available for filtering without corrupting the embedding vector.
    ///
    /// Returns `Ok((Some(message_id), embedding_stored))` when admitted and persisted.
    /// Returns `Ok((None, false))` when A-MAC admission control rejects the message.
    ///
    /// # Errors
    ///
    /// Returns an error if the `SQLite` save fails.
    pub async fn remember_tool_output(
        &self,
        conversation_id: ConversationId,
        role: &str,
        content: &str,
        parts_json: &str,
        embed_ctx: EmbedContext,
    ) -> Result<(Option<MessageId>, bool), MemoryError> {
        if let Some(ref admission) = self.admission_control {
            let decision = admission
                .evaluate(content, role, &self.provider, self.qdrant.as_ref(), None)
                .await;
            let preview: String = content.chars().take(100).collect();
            log_admission_decision(&decision, &preview, role, admission.threshold());
            if !decision.admitted {
                return Ok((None, false));
            }
        }

        let message_id = self
            .sqlite
            .save_message_with_parts(conversation_id, role, content, parts_json)
            .await?;

        let embedding_stored = self
            .embed_chunks_with_tool_context(message_id, conversation_id, role, content, embed_ctx)
            .await;

        Ok((Some(message_id), embedding_stored))
    }

    /// Embed content chunks and store each as a regular (non-tool) message vector.
    ///
    /// Handles: chunking → `embed_batch` → `ensure_collection` → per-chunk `store`.
    /// Returns `true` if at least one chunk was successfully stored.
    async fn embed_and_store_regular(
        &self,
        message_id: MessageId,
        conversation_id: ConversationId,
        role: &str,
        content: &str,
    ) -> bool {
        let Some(qdrant) = &self.qdrant else {
            return false;
        };
        let embed_provider = self.effective_embed_provider();
        if !embed_provider.supports_embeddings() {
            return false;
        }

        let chunks = chunk_text(content);
        let chunk_count = chunks.len();

        let vectors = match embed_provider.embed_batch(&chunks).await {
            Ok(v) => v,
            Err(e) => {
                tracing::warn!("Failed to embed chunks for msg {message_id}: {e:#}");
                return false;
            }
        };

        let Some(first) = vectors.first() else {
            return false;
        };
        let vector_size = u64::try_from(first.len()).unwrap_or(896);
        if let Err(e) = qdrant.ensure_collection(vector_size).await {
            tracing::warn!("Failed to ensure Qdrant collection: {e:#}");
            return false;
        }

        let mut stored = false;
        for (chunk_index, vector) in vectors.into_iter().enumerate() {
            let chunk_index_u32 = u32::try_from(chunk_index).unwrap_or(u32::MAX);
            match qdrant
                .store(
                    message_id,
                    conversation_id,
                    role,
                    vector,
                    MessageKind::Regular,
                    &self.embedding_model,
                    chunk_index_u32,
                )
                .await
            {
                Ok(_) => stored = true,
                Err(e) => tracing::warn!(
                    "Failed to store chunk {chunk_index}/{chunk_count} \
                     for msg {message_id}: {e:#}"
                ),
            }
        }

        stored
    }

    /// Embed content chunks, enriching Qdrant payload with tool metadata when present.
    ///
    /// Returns `true` if at least one chunk was successfully stored.
    async fn embed_chunks_with_tool_context(
        &self,
        message_id: MessageId,
        conversation_id: ConversationId,
        role: &str,
        content: &str,
        embed_ctx: EmbedContext,
    ) -> bool {
        let Some(qdrant) = &self.qdrant else {
            return false;
        };
        let embed_provider = self.effective_embed_provider();
        if !embed_provider.supports_embeddings() {
            return false;
        }

        let chunks = chunk_text(content);
        let chunk_count = chunks.len();
        let mut stored = false;

        // Embed all chunks in a single batch call.
        // Batch semantics are atomic: if the batch fails, skip embedding for this message.
        let vectors = match embed_provider.embed_batch(&chunks).await {
            Ok(v) => v,
            Err(e) => {
                tracing::warn!("Failed to embed tool-output chunks for msg {message_id}: {e:#}");
                return false;
            }
        };

        if let Some(first) = vectors.first() {
            let vector_size = u64::try_from(first.len()).unwrap_or(896);
            if let Err(e) = qdrant.ensure_collection(vector_size).await {
                tracing::warn!("Failed to ensure Qdrant collection: {e:#}");
                return false;
            }
        }

        for (chunk_index, vector) in vectors.into_iter().enumerate() {
            let chunk_index_u32 = u32::try_from(chunk_index).unwrap_or(u32::MAX);
            let result = if let Some(ref tool_name) = embed_ctx.tool_name {
                qdrant
                    .store_with_tool_context(
                        message_id,
                        conversation_id,
                        role,
                        vector,
                        MessageKind::Regular,
                        &self.embedding_model,
                        chunk_index_u32,
                        tool_name,
                        embed_ctx.exit_code,
                        embed_ctx.timestamp.as_deref(),
                    )
                    .await
                    .map(|_| ())
            } else {
                qdrant
                    .store(
                        message_id,
                        conversation_id,
                        role,
                        vector,
                        MessageKind::Regular,
                        &self.embedding_model,
                        chunk_index_u32,
                    )
                    .await
                    .map(|_| ())
            };
            match result {
                Ok(()) => stored = true,
                Err(e) => tracing::warn!(
                    "Failed to store tool-output chunk {chunk_index}/{chunk_count} \
                     for msg {message_id}: {e:#}"
                ),
            }
        }

        stored
    }

    /// Save a message to `SQLite` without generating an embedding.
    ///
    /// Use this when embedding is intentionally skipped (e.g. autosave disabled for assistant).
    ///
    /// # Errors
    ///
    /// Returns an error if the `SQLite` save fails.
    pub async fn save_only(
        &self,
        conversation_id: ConversationId,
        role: &str,
        content: &str,
        parts_json: &str,
    ) -> Result<MessageId, MemoryError> {
        self.sqlite
            .save_message_with_parts(conversation_id, role, content, parts_json)
            .await
    }

    /// Recall relevant messages using hybrid search (vector + FTS5 keyword).
    ///
    /// When Qdrant is available, runs both vector and keyword searches, then merges
    /// results using weighted scoring. When Qdrant is unavailable, falls back to
    /// FTS5-only keyword search.
    ///
    /// # Errors
    ///
    /// Returns an error if embedding generation, Qdrant search, or FTS5 query fails.
    pub async fn recall(
        &self,
        query: &str,
        limit: usize,
        filter: Option<SearchFilter>,
    ) -> Result<Vec<RecalledMessage>, MemoryError> {
        let conversation_id = filter.as_ref().and_then(|f| f.conversation_id);

        tracing::debug!(
            query_len = query.len(),
            limit,
            has_filter = filter.is_some(),
            conversation_id = conversation_id.map(|c| c.0),
            has_qdrant = self.qdrant.is_some(),
            "recall: starting hybrid search"
        );

        let keyword_results = match self
            .sqlite
            .keyword_search(query, limit * 2, conversation_id)
            .await
        {
            Ok(results) => results,
            Err(e) => {
                tracing::warn!("FTS5 keyword search failed: {e:#}");
                Vec::new()
            }
        };

        let vector_results = if let Some(qdrant) = &self.qdrant
            && self.provider.supports_embeddings()
        {
            let query_vector = self.provider.embed(query).await?;
            let vector_size = u64::try_from(query_vector.len()).unwrap_or(896);
            qdrant.ensure_collection(vector_size).await?;
            qdrant.search(&query_vector, limit * 2, filter).await?
        } else {
            Vec::new()
        };

        self.recall_merge_and_rank(keyword_results, vector_results, limit)
            .await
    }

    pub(super) async fn recall_fts5_raw(
        &self,
        query: &str,
        limit: usize,
        conversation_id: Option<ConversationId>,
    ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
        self.sqlite
            .keyword_search(query, limit * 2, conversation_id)
            .await
    }

    pub(super) async fn recall_vectors_raw(
        &self,
        query: &str,
        limit: usize,
        filter: Option<SearchFilter>,
    ) -> Result<Vec<crate::embedding_store::SearchResult>, MemoryError> {
        let Some(qdrant) = &self.qdrant else {
            return Ok(Vec::new());
        };
        if !self.provider.supports_embeddings() {
            return Ok(Vec::new());
        }
        let query_vector = self.provider.embed(query).await?;
        let vector_size = u64::try_from(query_vector.len()).unwrap_or(896);
        qdrant.ensure_collection(vector_size).await?;
        qdrant.search(&query_vector, limit * 2, filter).await
    }

    /// Merge raw keyword and vector results, apply weighted scoring, temporal decay, and MMR
    /// re-ranking, then resolve to `RecalledMessage` objects.
    ///
    /// This is the shared post-processing step used by all recall paths.
    ///
    /// # Errors
    ///
    /// Returns an error if the `SQLite` `messages_by_ids` query fails.
    #[allow(clippy::cast_possible_truncation, clippy::too_many_lines)]
    pub(super) async fn recall_merge_and_rank(
        &self,
        keyword_results: Vec<(MessageId, f64)>,
        vector_results: Vec<crate::embedding_store::SearchResult>,
        limit: usize,
    ) -> Result<Vec<RecalledMessage>, MemoryError> {
        tracing::debug!(
            vector_count = vector_results.len(),
            keyword_count = keyword_results.len(),
            limit,
            "recall: merging search results"
        );

        let mut scores: std::collections::HashMap<MessageId, f64> =
            std::collections::HashMap::new();

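        // Worked example (assuming vector_weight = 0.7 and keyword_weight = 0.3):
        // a message that tops both lists scores 0.7 * 1.0 + 0.3 * 1.0 = 1.0, while
        // a vector-only hit at half the best vector score gets 0.7 * 0.5 = 0.35.
        // Each list is normalized by its own maximum before weighting, below.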
        if !vector_results.is_empty() {
            let max_vs = vector_results
                .iter()
                .map(|r| r.score)
                .fold(f32::NEG_INFINITY, f32::max);
            let norm = if max_vs > 0.0 { max_vs } else { 1.0 };
            for r in &vector_results {
                let normalized = f64::from(r.score / norm);
                *scores.entry(r.message_id).or_default() += normalized * self.vector_weight;
            }
        }

        if !keyword_results.is_empty() {
            let max_ks = keyword_results
                .iter()
                .map(|r| r.1)
                .fold(f64::NEG_INFINITY, f64::max);
            let norm = if max_ks > 0.0 { max_ks } else { 1.0 };
            for &(msg_id, score) in &keyword_results {
                let normalized = score / norm;
                *scores.entry(msg_id).or_default() += normalized * self.keyword_weight;
            }
        }

        if scores.is_empty() {
            tracing::debug!("recall: no keyword or vector results to merge");
            return Ok(Vec::new());
        }

        let mut ranked: Vec<(MessageId, f64)> = scores.into_iter().collect();
        ranked.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));

        tracing::debug!(
            merged = ranked.len(),
            top_score = ranked.first().map(|r| r.1),
            bottom_score = ranked.last().map(|r| r.1),
            vector_weight = %self.vector_weight,
            keyword_weight = %self.keyword_weight,
            "recall: weighted merge complete"
        );

        if self.temporal_decay_enabled && self.temporal_decay_half_life_days > 0 {
            let ids: Vec<MessageId> = ranked.iter().map(|r| r.0).collect();
            match self.sqlite.message_timestamps(&ids).await {
                Ok(timestamps) => {
                    apply_temporal_decay(
                        &mut ranked,
                        &timestamps,
                        self.temporal_decay_half_life_days,
                    );
                    ranked
                        .sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
                    tracing::debug!(
                        half_life_days = self.temporal_decay_half_life_days,
                        top_score_after = ranked.first().map(|r| r.1),
                        "recall: temporal decay applied"
                    );
                }
                Err(e) => {
                    tracing::warn!("temporal decay: failed to fetch timestamps: {e:#}");
                }
            }
        }

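        // MMR (maximal marginal relevance) trades relevance against diversity
        // among the selected results. Under the usual convention (assumed here),
        // a lambda near 1.0 keeps the pure relevance ordering, while lower values
        // penalize results too similar to ones already picked.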
        if self.mmr_enabled && !vector_results.is_empty() {
            if let Some(qdrant) = &self.qdrant {
                let ids: Vec<MessageId> = ranked.iter().map(|r| r.0).collect();
                match qdrant.get_vectors(&ids).await {
                    Ok(vec_map) if !vec_map.is_empty() => {
                        let ranked_len_before = ranked.len();
                        ranked = apply_mmr(&ranked, &vec_map, self.mmr_lambda, limit);
                        tracing::debug!(
                            before = ranked_len_before,
                            after = ranked.len(),
                            lambda = %self.mmr_lambda,
                            "recall: mmr re-ranked"
                        );
                    }
                    Ok(_) => {
                        ranked.truncate(limit);
                    }
                    Err(e) => {
                        tracing::warn!("MMR: failed to fetch vectors: {e:#}");
                        ranked.truncate(limit);
                    }
                }
            } else {
                ranked.truncate(limit);
            }
        } else {
            ranked.truncate(limit);
        }

        if self.importance_enabled && !ranked.is_empty() {
            let ids: Vec<MessageId> = ranked.iter().map(|r| r.0).collect();
            match self.sqlite.fetch_importance_scores(&ids).await {
                Ok(scores) => {
                    for (msg_id, score) in &mut ranked {
                        if let Some(&imp) = scores.get(msg_id) {
                            *score += imp * self.importance_weight;
                        }
                    }
                    ranked
                        .sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
                    tracing::debug!(
                        importance_weight = %self.importance_weight,
                        "recall: importance scores blended"
                    );
                }
                Err(e) => {
                    tracing::warn!("importance scoring: failed to fetch scores: {e:#}");
                }
            }
        }

        // Apply tier boost: semantic-tier messages receive an additive bonus so distilled facts
        // rank above episodic messages with the same base score. Additive (not multiplicative)
        // so the effect is consistent regardless of base score magnitude.
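        // e.g. with tier_boost_semantic = 1.2, each semantic-tier message gains
        // a flat +0.2 on top of its merged score (value here is illustrative).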
        if (self.tier_boost_semantic - 1.0).abs() > f64::EPSILON && !ranked.is_empty() {
            let ids: Vec<MessageId> = ranked.iter().map(|r| r.0).collect();
            match self.sqlite.fetch_tiers(&ids).await {
                Ok(tiers) => {
                    let bonus = self.tier_boost_semantic - 1.0;
                    let mut boosted = false;
                    for (msg_id, score) in &mut ranked {
                        if tiers.get(msg_id).map(String::as_str) == Some("semantic") {
                            *score += bonus;
                            boosted = true;
                        }
                    }
                    if boosted {
                        ranked.sort_by(|a, b| {
                            b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal)
                        });
                        tracing::debug!(
                            tier_boost = %self.tier_boost_semantic,
                            "recall: semantic tier boost applied"
                        );
                    }
                }
                Err(e) => {
                    tracing::warn!("tier boost: failed to fetch tiers: {e:#}");
                }
            }
        }

        let ids: Vec<MessageId> = ranked.iter().map(|r| r.0).collect();

        if !ids.is_empty()
            && let Err(e) = self.batch_increment_access_count(ids.clone()).await
        {
            tracing::warn!("recall: failed to increment access counts: {e:#}");
        }

        // Update RL admission training data: mark recalled messages as positive examples.
        if let Err(e) = self.sqlite.mark_training_recalled(&ids).await {
            tracing::debug!(
                error = %e,
                "recall: failed to mark training data as recalled (non-fatal)"
            );
        }

        let messages = self.sqlite.messages_by_ids(&ids).await?;
        let msg_map: std::collections::HashMap<MessageId, _> = messages.into_iter().collect();

        let recalled: Vec<RecalledMessage> = ranked
            .iter()
            .filter_map(|(msg_id, score)| {
                msg_map.get(msg_id).map(|msg| RecalledMessage {
                    message: msg.clone(),
                    #[expect(clippy::cast_possible_truncation)]
                    score: *score as f32,
                })
            })
            .collect();

        tracing::debug!(final_count = recalled.len(), "recall: final results");

        Ok(recalled)
    }

    /// Recall messages using query-aware routing.
    ///
    /// Delegates to FTS5-only, vector-only, or hybrid search based on the router decision,
    /// then runs the shared merge and ranking pipeline.
    ///
    /// # Errors
    ///
    /// Returns an error if any underlying search or database operation fails.
    pub async fn recall_routed(
        &self,
        query: &str,
        limit: usize,
        filter: Option<SearchFilter>,
        router: &dyn crate::router::MemoryRouter,
    ) -> Result<Vec<RecalledMessage>, MemoryError> {
        use crate::router::MemoryRoute;

        let route = router.route(query);
        tracing::debug!(?route, query_len = query.len(), "memory routing decision");

        let conversation_id = filter.as_ref().and_then(|f| f.conversation_id);

        let (keyword_results, vector_results): (
            Vec<(MessageId, f64)>,
            Vec<crate::embedding_store::SearchResult>,
        ) = match route {
            MemoryRoute::Keyword => {
                let kw = self.recall_fts5_raw(query, limit, conversation_id).await?;
                (kw, Vec::new())
            }
            MemoryRoute::Semantic => {
                let vr = self.recall_vectors_raw(query, limit, filter).await?;
                (Vec::new(), vr)
            }
            MemoryRoute::Hybrid => {
                let kw = match self.recall_fts5_raw(query, limit, conversation_id).await {
                    Ok(r) => r,
                    Err(e) => {
                        tracing::warn!("FTS5 keyword search failed: {e:#}");
                        Vec::new()
                    }
                };
                let vr = self.recall_vectors_raw(query, limit, filter).await?;
                (kw, vr)
            }
            // Episodic: FTS5 keyword search with an optional timestamp-range filter.
            // Temporal keywords are stripped from the query before passing to FTS5 to
            // prevent BM25 score distortion (e.g. "yesterday" matching messages that
            // literally contain the word "yesterday" regardless of actual relevance).
            // Vector search is skipped for speed; temporal decay in recall_merge_and_rank
            // provides recency boosting for the FTS5 results.
            // Known trade-off (MVP): semantically similar but lexically different messages
            // may be missed. See issue #1629 for a future hybrid_temporal mode.
            MemoryRoute::Episodic => {
                let range = crate::router::resolve_temporal_range(query, chrono::Utc::now());
                let cleaned = crate::router::strip_temporal_keywords(query);
                let search_query = if cleaned.is_empty() { query } else { &cleaned };
                let kw = if let Some(ref r) = range {
                    self.sqlite
                        .keyword_search_with_time_range(
                            search_query,
                            limit,
                            conversation_id,
                            r.after.as_deref(),
                            r.before.as_deref(),
                        )
                        .await?
                } else {
                    self.recall_fts5_raw(search_query, limit, conversation_id)
                        .await?
                };
                tracing::debug!(
                    has_range = range.is_some(),
                    cleaned_query = %search_query,
                    keyword_count = kw.len(),
                    "recall: episodic path"
                );
                (kw, Vec::new())
            }
            // Graph routing triggers graph_recall separately in agent/context.rs.
            // For the message-based recall, behave like Hybrid.
            MemoryRoute::Graph => {
                let kw = match self.recall_fts5_raw(query, limit, conversation_id).await {
                    Ok(r) => r,
                    Err(e) => {
                        tracing::warn!("FTS5 keyword search failed (graph→hybrid fallback): {e:#}");
                        Vec::new()
                    }
                };
                let vr = self.recall_vectors_raw(query, limit, filter).await?;
                (kw, vr)
            }
        };

        tracing::debug!(
            keyword_count = keyword_results.len(),
            vector_count = vector_results.len(),
            "recall: routed search results"
        );

        self.recall_merge_and_rank(keyword_results, vector_results, limit)
            .await
    }

    /// Async variant of [`recall_routed`](Self::recall_routed) that uses
    /// [`AsyncMemoryRouter::route_async`](crate::router::AsyncMemoryRouter::route_async) when
    /// available, enabling LLM-based routing for `LlmRouter` and `HybridRouter`.
    ///
    /// Falls back to [`recall_routed`](Self::recall_routed) for routers that only implement
    /// the sync `MemoryRouter` trait (e.g. `HeuristicRouter`).
    ///
    /// # Errors
    ///
    /// Returns an error if any underlying search or database operation fails.
    pub async fn recall_routed_async(
        &self,
        query: &str,
        limit: usize,
        filter: Option<crate::embedding_store::SearchFilter>,
        router: &dyn crate::router::AsyncMemoryRouter,
    ) -> Result<Vec<RecalledMessage>, MemoryError> {
        use crate::router::MemoryRoute;

        let decision = router.route_async(query).await;
        let route = decision.route;
        tracing::debug!(
            ?route,
            confidence = decision.confidence,
            query_len = query.len(),
            "memory routing decision (async)"
        );

        let conversation_id = filter.as_ref().and_then(|f| f.conversation_id);

        let (keyword_results, vector_results): (
            Vec<(crate::types::MessageId, f64)>,
            Vec<crate::embedding_store::SearchResult>,
        ) = match route {
            MemoryRoute::Keyword => {
                let kw = self.recall_fts5_raw(query, limit, conversation_id).await?;
                (kw, Vec::new())
            }
            MemoryRoute::Semantic => {
                let vr = self.recall_vectors_raw(query, limit, filter).await?;
                (Vec::new(), vr)
            }
            MemoryRoute::Hybrid => {
                let kw = match self.recall_fts5_raw(query, limit, conversation_id).await {
                    Ok(r) => r,
                    Err(e) => {
                        tracing::warn!("FTS5 keyword search failed: {e:#}");
                        Vec::new()
                    }
                };
                let vr = self.recall_vectors_raw(query, limit, filter).await?;
                (kw, vr)
            }
            MemoryRoute::Episodic => {
                let range = crate::router::resolve_temporal_range(query, chrono::Utc::now());
                let cleaned = crate::router::strip_temporal_keywords(query);
                let search_query = if cleaned.is_empty() { query } else { &cleaned };
                let kw = if let Some(ref r) = range {
                    self.sqlite
                        .keyword_search_with_time_range(
                            search_query,
                            limit,
                            conversation_id,
                            r.after.as_deref(),
                            r.before.as_deref(),
                        )
                        .await?
                } else {
                    self.recall_fts5_raw(search_query, limit, conversation_id)
                        .await?
                };
                (kw, Vec::new())
            }
            MemoryRoute::Graph => {
                let kw = match self.recall_fts5_raw(query, limit, conversation_id).await {
                    Ok(r) => r,
                    Err(e) => {
                        tracing::warn!("FTS5 keyword search failed (graph→hybrid fallback): {e:#}");
                        Vec::new()
                    }
                };
                let vr = self.recall_vectors_raw(query, limit, filter).await?;
                (kw, vr)
            }
        };

        tracing::debug!(
            keyword_count = keyword_results.len(),
            vector_count = vector_results.len(),
            "recall: routed search results (async)"
        );

        self.recall_merge_and_rank(keyword_results, vector_results, limit)
            .await
    }

    /// Retrieve graph facts relevant to `query` via BFS traversal.
    ///
    /// Returns an empty `Vec` if no `graph_store` is configured.
    ///
    /// # Parameters
    ///
    /// - `at_timestamp`: when `Some`, only edges valid at that `SQLite` datetime string are returned.
    ///   When `None`, only currently active edges are used.
    /// - `temporal_decay_rate`: non-negative decay rate (1/day). `0.0` preserves original ordering.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying graph query fails.
    pub async fn recall_graph(
        &self,
        query: &str,
        limit: usize,
        max_hops: u32,
        at_timestamp: Option<&str>,
        temporal_decay_rate: f64,
        edge_types: &[crate::graph::EdgeType],
    ) -> Result<Vec<crate::graph::types::GraphFact>, MemoryError> {
        let Some(store) = &self.graph_store else {
            return Ok(Vec::new());
        };

        tracing::debug!(
            query_len = query.len(),
            limit,
            max_hops,
            "graph: starting recall"
        );

        let results = crate::graph::retrieval::graph_recall(
            store,
            self.qdrant.as_deref(),
            &self.provider,
            query,
            limit,
            max_hops,
            at_timestamp,
            temporal_decay_rate,
            edge_types,
        )
        .await?;

        tracing::debug!(result_count = results.len(), "graph: recall complete");

        Ok(results)
    }

    /// Retrieve graph facts via SYNAPSE spreading activation.
    ///
    /// Delegates to [`crate::graph::retrieval::graph_recall_activated`].
    /// Used in place of [`recall_graph`] when `spreading_activation.enabled = true`.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying graph query fails.
    pub async fn recall_graph_activated(
        &self,
        query: &str,
        limit: usize,
        params: crate::graph::SpreadingActivationParams,
        edge_types: &[crate::graph::EdgeType],
    ) -> Result<Vec<crate::graph::activation::ActivatedFact>, MemoryError> {
        let Some(store) = &self.graph_store else {
            return Ok(Vec::new());
        };

        tracing::debug!(
            query_len = query.len(),
            limit,
            "spreading activation: starting graph recall"
        );

        let embeddings = self.qdrant.as_deref();
        let results = crate::graph::retrieval::graph_recall_activated(
            store,
            embeddings,
            &self.provider,
            query,
            limit,
            params,
            edge_types,
        )
        .await?;

        tracing::debug!(
            result_count = results.len(),
            "spreading activation: graph recall complete"
        );

        Ok(results)
    }

    /// Increment access count and update `last_accessed` for a batch of message IDs.
    ///
    /// Skips the update if `message_ids` is empty to avoid an invalid `IN ()` clause.
    ///
    /// # Errors
    ///
    /// Returns an error if the `SQLite` update fails.
    async fn batch_increment_access_count(
        &self,
        message_ids: Vec<MessageId>,
    ) -> Result<(), MemoryError> {
        if message_ids.is_empty() {
            return Ok(());
        }
        self.sqlite.increment_access_counts(&message_ids).await
    }

    /// Check whether an embedding exists for a given message ID.
    ///
    /// # Errors
    ///
    /// Returns an error if the Qdrant query fails.
    pub async fn has_embedding(&self, message_id: MessageId) -> Result<bool, MemoryError> {
        match &self.qdrant {
            Some(qdrant) => qdrant.has_embedding(message_id).await,
            None => Ok(false),
        }
    }

    /// Embed all messages that do not yet have embeddings.
    ///
    /// Streams up to 1000 unembedded rows from `SQLite` one at a time, embedding and storing
    /// each before advancing the cursor. This avoids loading all message content into memory
    /// at once, which can cause a significant RAM spike when messages contain large tool output
    /// or code blocks.
    ///
    /// Returns the count of successfully embedded messages.
    ///
    /// # Errors
    ///
    /// Returns an error if collection initialization or the streaming query setup fails.
    /// Individual embedding failures are logged but do not stop processing.
    pub async fn embed_missing(&self) -> Result<usize, MemoryError> {
        if self.qdrant.is_none() || !self.effective_embed_provider().supports_embeddings() {
            return Ok(0);
        }

        let mut stream = std::pin::pin!(self.sqlite.stream_unembedded_messages(1000));

        let mut count = 0usize;
        let mut total = 0usize;
        while let Some(row) = stream.next().await {
            let (msg_id, conversation_id, role, content) = match row {
                Ok(r) => r,
                Err(e) => {
                    tracing::warn!("embed_missing: failed to read row: {e:#}");
                    continue;
                }
            };
            total += 1;
            if self
                .embed_and_store_regular(msg_id, conversation_id, &role, &content)
                .await
            {
                count += 1;
            }
        }

        if total > 0 {
            tracing::info!("Embedded {count}/{total} missing messages");
        }
        Ok(count)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn embed_context_default_all_none() {
        let ctx = EmbedContext::default();
        assert!(ctx.tool_name.is_none());
        assert!(ctx.exit_code.is_none());
        assert!(ctx.timestamp.is_none());
    }

    #[test]
    fn embed_context_fields_set_correctly() {
        let ctx = EmbedContext {
            tool_name: Some("shell".to_string()),
            exit_code: Some(0),
            timestamp: Some("2026-04-04T00:00:00Z".to_string()),
        };
        assert_eq!(ctx.tool_name.as_deref(), Some("shell"));
        assert_eq!(ctx.exit_code, Some(0));
        assert_eq!(ctx.timestamp.as_deref(), Some("2026-04-04T00:00:00Z"));
    }

    #[test]
    fn embed_context_non_zero_exit_code() {
        let ctx = EmbedContext {
            tool_name: Some("shell".to_string()),
            exit_code: Some(1),
            timestamp: None,
        };
        assert_eq!(ctx.exit_code, Some(1));
        assert!(ctx.timestamp.is_none());
    }
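
    // The tests below exercise `chunk_text` as documented above. Exact split
    // points depend on the separator search, so assertions stick to properties
    // that follow directly from the constants; the inputs are synthetic examples.
    #[test]
    fn chunk_text_short_input_returns_single_chunk() {
        let text = "short message";
        assert_eq!(chunk_text(text), vec![text]);
    }

    #[test]
    fn chunk_text_long_input_produces_bounded_overlapping_chunks() {
        let paragraph = "lorem ipsum dolor sit amet ".repeat(20);
        let text = vec![paragraph; 10].join("\n\n");
        let chunks = chunk_text(&text);
        assert!(chunks.len() > 1, "text longer than CHUNK_CHARS must be split");
        // No chunk exceeds the target size.
        assert!(chunks.iter().all(|c| c.len() <= CHUNK_CHARS));
        // Adjacent chunks overlap, so summed chunk lengths exceed the input length.
        let total: usize = chunks.iter().map(|c| c.len()).sum();
        assert!(total > text.len());
    }

    #[test]
    fn chunk_text_terminates_on_early_separator() {
        // A space very early in the text followed by a long unbroken run: the
        // forward-progress guard must still advance and terminate.
        let text = format!("a {}", "x".repeat(5000));
        let chunks = chunk_text(&text);
        assert!(!chunks.is_empty());
        assert!(chunks.iter().all(|c| !c.is_empty()));
    }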
}