Skip to main content

zeph_memory/semantic/
recall.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use zeph_llm::provider::{LlmProvider as _, Message};
5
6/// Approximate characters per token (conservative estimate for mixed content).
7const CHARS_PER_TOKEN: usize = 4;
8
9/// Target chunk size in characters (~400 tokens).
10const CHUNK_CHARS: usize = 400 * CHARS_PER_TOKEN;
11
12/// Overlap between adjacent chunks in characters (~80 tokens).
13const CHUNK_OVERLAP_CHARS: usize = 80 * CHARS_PER_TOKEN;
14
15/// Split `text` into overlapping chunks suitable for embedding.
16///
17/// For text shorter than `CHUNK_CHARS`, returns a single chunk.
18/// Splits at UTF-8 character boundaries on paragraph (`\n\n`), line (`\n`),
19/// space (` `), or raw character boundaries as a last resort.
20fn chunk_text(text: &str) -> Vec<&str> {
21    if text.len() <= CHUNK_CHARS {
22        return vec![text];
23    }
24
25    let mut chunks = Vec::new();
26    let mut start = 0;
27
28    while start < text.len() {
29        let end = if start + CHUNK_CHARS >= text.len() {
30            text.len()
31        } else {
32            // Find a clean UTF-8 char boundary at or before start + CHUNK_CHARS.
33            let boundary = text.floor_char_boundary(start + CHUNK_CHARS);
34            // Prefer to split at a paragraph or line break for cleaner chunks.
35            let slice = &text[start..boundary];
36            if let Some(pos) = slice.rfind("\n\n") {
37                start + pos + 2
38            } else if let Some(pos) = slice.rfind('\n') {
39                start + pos + 1
40            } else if let Some(pos) = slice.rfind(' ') {
41                start + pos + 1
42            } else {
43                boundary
44            }
45        };
46
47        chunks.push(&text[start..end]);
48        if end >= text.len() {
49            break;
50        }
51        // Next chunk starts with overlap.
52        let next = end.saturating_sub(CHUNK_OVERLAP_CHARS);
53        start = text.ceil_char_boundary(next);
54        if start >= end {
55            start = end; // safeguard against infinite loop
56        }
57    }
58
59    chunks
60}
61
62use crate::admission::log_admission_decision;
63use crate::embedding_store::{MessageKind, SearchFilter};
64use crate::error::MemoryError;
65use crate::types::{ConversationId, MessageId};
66
67use super::SemanticMemory;
68use super::algorithms::{apply_mmr, apply_temporal_decay};
69
/// Tool execution metadata stored as Qdrant payload fields alongside embeddings.
///
/// Stored as payload — NOT prepended to content — to avoid corrupting embedding vectors.
#[derive(Debug, Clone, Default)]
pub struct EmbedContext {
    /// Name of the tool that produced the output. Presence of a value selects
    /// the payload-enriched store path (`store_with_tool_context`).
    pub tool_name: Option<String>,
    /// Exit code of the tool run, when one applies.
    pub exit_code: Option<i32>,
    /// Timestamp of the tool run, stored as an opaque string payload field.
    pub timestamp: Option<String>,
}
79
/// A message returned by recall, paired with its final blended ranking score.
#[derive(Debug)]
pub struct RecalledMessage {
    /// The recalled conversation message.
    pub message: Message,
    /// Final relevance score after merge, decay, MMR, and boosts (higher is better).
    pub score: f32,
}
85
86impl SemanticMemory {
87    /// Save a message to `SQLite` and optionally embed and store in Qdrant.
88    ///
89    /// Returns `Ok(Some(message_id))` when admitted and persisted.
90    /// Returns `Ok(None)` when A-MAC admission control rejects the message (not an error).
91    ///
92    /// # Errors
93    ///
94    /// Returns an error if the `SQLite` save fails. Embedding failures are logged but not
95    /// propagated.
96    pub async fn remember(
97        &self,
98        conversation_id: ConversationId,
99        role: &str,
100        content: &str,
101        goal_text: Option<&str>,
102    ) -> Result<Option<MessageId>, MemoryError> {
103        // A-MAC admission gate.
104        if let Some(ref admission) = self.admission_control {
105            let decision = admission
106                .evaluate(
107                    content,
108                    role,
109                    &self.provider,
110                    self.qdrant.as_ref(),
111                    goal_text,
112                )
113                .await;
114            let preview: String = content.chars().take(100).collect();
115            log_admission_decision(&decision, &preview, role, admission.threshold());
116            if !decision.admitted {
117                return Ok(None);
118            }
119        }
120
121        let message_id = self
122            .sqlite
123            .save_message(conversation_id, role, content)
124            .await?;
125
126        if let Some(qdrant) = &self.qdrant
127            && self.provider.supports_embeddings()
128        {
129            let chunks = chunk_text(content);
130            let chunk_count = chunks.len();
131            let mut collection_ready = false;
132
133            for (chunk_index, chunk) in chunks.into_iter().enumerate() {
134                let chunk_index_u32 = u32::try_from(chunk_index).unwrap_or(u32::MAX);
135                match self.provider.embed(chunk).await {
136                    Ok(vector) => {
137                        if !collection_ready {
138                            let vector_size = u64::try_from(vector.len()).unwrap_or(896);
139                            if let Err(e) = qdrant.ensure_collection(vector_size).await {
140                                tracing::warn!("Failed to ensure Qdrant collection: {e:#}");
141                                break;
142                            }
143                            collection_ready = true;
144                        }
145                        if let Err(e) = qdrant
146                            .store(
147                                message_id,
148                                conversation_id,
149                                role,
150                                vector,
151                                MessageKind::Regular,
152                                &self.embedding_model,
153                                chunk_index_u32,
154                            )
155                            .await
156                        {
157                            tracing::warn!(
158                                "Failed to store chunk {chunk_index}/{chunk_count} \
159                                 for msg {message_id}: {e:#}"
160                            );
161                        }
162                    }
163                    Err(e) => {
164                        tracing::warn!(
165                            "Failed to embed chunk {chunk_index}/{chunk_count} \
166                             for msg {message_id}: {e:#}"
167                        );
168                    }
169                }
170            }
171        }
172
173        Ok(Some(message_id))
174    }
175
176    /// Save a message with pre-serialized parts JSON to `SQLite` and optionally embed in Qdrant.
177    ///
178    /// Returns `Ok((Some(message_id), embedding_stored))` when admitted and persisted.
179    /// Returns `Ok((None, false))` when A-MAC admission control rejects the message.
180    ///
181    /// # Errors
182    ///
183    /// Returns an error if the `SQLite` save fails.
184    pub async fn remember_with_parts(
185        &self,
186        conversation_id: ConversationId,
187        role: &str,
188        content: &str,
189        parts_json: &str,
190        goal_text: Option<&str>,
191    ) -> Result<(Option<MessageId>, bool), MemoryError> {
192        // A-MAC admission gate.
193        if let Some(ref admission) = self.admission_control {
194            let decision = admission
195                .evaluate(
196                    content,
197                    role,
198                    &self.provider,
199                    self.qdrant.as_ref(),
200                    goal_text,
201                )
202                .await;
203            let preview: String = content.chars().take(100).collect();
204            log_admission_decision(&decision, &preview, role, admission.threshold());
205            if !decision.admitted {
206                return Ok((None, false));
207            }
208        }
209
210        let message_id = self
211            .sqlite
212            .save_message_with_parts(conversation_id, role, content, parts_json)
213            .await?;
214
215        let mut embedding_stored = false;
216
217        if let Some(qdrant) = &self.qdrant
218            && self.provider.supports_embeddings()
219        {
220            let chunks = chunk_text(content);
221            let chunk_count = chunks.len();
222            let mut collection_ready = false;
223
224            for (chunk_index, chunk) in chunks.into_iter().enumerate() {
225                let chunk_index_u32 = u32::try_from(chunk_index).unwrap_or(u32::MAX);
226                match self.provider.embed(chunk).await {
227                    Ok(vector) => {
228                        if !collection_ready {
229                            let vector_size = u64::try_from(vector.len()).unwrap_or(896);
230                            if let Err(e) = qdrant.ensure_collection(vector_size).await {
231                                tracing::warn!("Failed to ensure Qdrant collection: {e:#}");
232                                break;
233                            }
234                            collection_ready = true;
235                        }
236                        if let Err(e) = qdrant
237                            .store(
238                                message_id,
239                                conversation_id,
240                                role,
241                                vector,
242                                MessageKind::Regular,
243                                &self.embedding_model,
244                                chunk_index_u32,
245                            )
246                            .await
247                        {
248                            tracing::warn!(
249                                "Failed to store chunk {chunk_index}/{chunk_count} \
250                                 for msg {message_id}: {e:#}"
251                            );
252                        } else {
253                            embedding_stored = true;
254                        }
255                    }
256                    Err(e) => {
257                        tracing::warn!(
258                            "Failed to embed chunk {chunk_index}/{chunk_count} \
259                             for msg {message_id}: {e:#}"
260                        );
261                    }
262                }
263            }
264        }
265
266        Ok((Some(message_id), embedding_stored))
267    }
268
269    /// Save a tool output to `SQLite` and embed with tool metadata in Qdrant payload.
270    ///
271    /// Tool metadata (`tool_name`, `exit_code`, `timestamp`) is stored as Qdrant payload fields
272    /// so it is available for filtering without corrupting the embedding vector.
273    ///
274    /// Returns `Ok(Some(message_id))` when admitted and persisted.
275    /// Returns `Ok(None)` when A-MAC admission control rejects the message.
276    ///
277    /// # Errors
278    ///
279    /// Returns an error if the `SQLite` save fails.
280    pub async fn remember_tool_output(
281        &self,
282        conversation_id: ConversationId,
283        role: &str,
284        content: &str,
285        parts_json: &str,
286        embed_ctx: EmbedContext,
287    ) -> Result<(Option<MessageId>, bool), MemoryError> {
288        if let Some(ref admission) = self.admission_control {
289            let decision = admission
290                .evaluate(content, role, &self.provider, self.qdrant.as_ref(), None)
291                .await;
292            let preview: String = content.chars().take(100).collect();
293            log_admission_decision(&decision, &preview, role, admission.threshold());
294            if !decision.admitted {
295                return Ok((None, false));
296            }
297        }
298
299        let message_id = self
300            .sqlite
301            .save_message_with_parts(conversation_id, role, content, parts_json)
302            .await?;
303
304        let embedding_stored = self
305            .embed_chunks_with_tool_context(message_id, conversation_id, role, content, embed_ctx)
306            .await;
307
308        Ok((Some(message_id), embedding_stored))
309    }
310
311    /// Embed content chunks, enriching Qdrant payload with tool metadata when present.
312    ///
313    /// Returns `true` if at least one chunk was successfully stored.
314    async fn embed_chunks_with_tool_context(
315        &self,
316        message_id: MessageId,
317        conversation_id: ConversationId,
318        role: &str,
319        content: &str,
320        embed_ctx: EmbedContext,
321    ) -> bool {
322        let Some(qdrant) = &self.qdrant else {
323            return false;
324        };
325        if !self.provider.supports_embeddings() {
326            return false;
327        }
328
329        let chunks = chunk_text(content);
330        let chunk_count = chunks.len();
331        let mut collection_ready = false;
332        let mut stored = false;
333
334        for (chunk_index, chunk) in chunks.into_iter().enumerate() {
335            let chunk_index_u32 = u32::try_from(chunk_index).unwrap_or(u32::MAX);
336            let Ok(vector) = self.provider.embed(chunk).await else {
337                tracing::warn!(
338                    "Failed to embed tool-output chunk {chunk_index}/{chunk_count} \
339                     for msg {message_id}"
340                );
341                continue;
342            };
343            if !collection_ready {
344                let vector_size = u64::try_from(vector.len()).unwrap_or(896);
345                if let Err(e) = qdrant.ensure_collection(vector_size).await {
346                    tracing::warn!("Failed to ensure Qdrant collection: {e:#}");
347                    break;
348                }
349                collection_ready = true;
350            }
351            let result = if let Some(ref tool_name) = embed_ctx.tool_name {
352                qdrant
353                    .store_with_tool_context(
354                        message_id,
355                        conversation_id,
356                        role,
357                        vector,
358                        MessageKind::Regular,
359                        &self.embedding_model,
360                        chunk_index_u32,
361                        tool_name,
362                        embed_ctx.exit_code,
363                        embed_ctx.timestamp.as_deref(),
364                    )
365                    .await
366                    .map(|_| ())
367            } else {
368                qdrant
369                    .store(
370                        message_id,
371                        conversation_id,
372                        role,
373                        vector,
374                        MessageKind::Regular,
375                        &self.embedding_model,
376                        chunk_index_u32,
377                    )
378                    .await
379                    .map(|_| ())
380            };
381            match result {
382                Ok(()) => stored = true,
383                Err(e) => tracing::warn!(
384                    "Failed to store tool-output chunk {chunk_index}/{chunk_count} \
385                     for msg {message_id}: {e:#}"
386                ),
387            }
388        }
389
390        stored
391    }
392
393    /// Save a message to `SQLite` without generating an embedding.
394    ///
395    /// Use this when embedding is intentionally skipped (e.g. autosave disabled for assistant).
396    ///
397    /// # Errors
398    ///
399    /// Returns an error if the `SQLite` save fails.
400    pub async fn save_only(
401        &self,
402        conversation_id: ConversationId,
403        role: &str,
404        content: &str,
405        parts_json: &str,
406    ) -> Result<MessageId, MemoryError> {
407        self.sqlite
408            .save_message_with_parts(conversation_id, role, content, parts_json)
409            .await
410    }
411
    /// Recall relevant messages using hybrid search (vector + FTS5 keyword).
    ///
    /// When Qdrant is available, runs both vector and keyword searches, then merges
    /// results using weighted scoring. When Qdrant is unavailable, falls back to
    /// FTS5-only keyword search.
    ///
    /// # Errors
    ///
    /// Returns an error if embedding generation, Qdrant collection setup/search, or the
    /// final merge-and-rank step fails. FTS5 keyword-search failures are NOT propagated:
    /// they are logged and degraded to an empty keyword result set.
    pub async fn recall(
        &self,
        query: &str,
        limit: usize,
        filter: Option<SearchFilter>,
    ) -> Result<Vec<RecalledMessage>, MemoryError> {
        let conversation_id = filter.as_ref().and_then(|f| f.conversation_id);

        tracing::debug!(
            query_len = query.len(),
            limit,
            has_filter = filter.is_some(),
            conversation_id = conversation_id.map(|c| c.0),
            has_qdrant = self.qdrant.is_some(),
            "recall: starting hybrid search"
        );

        // Keyword search is best-effort: failure degrades to vector-only recall.
        // Over-fetch (limit * 2) to give the merge step headroom before truncation.
        let keyword_results = match self
            .sqlite
            .keyword_search(query, limit * 2, conversation_id)
            .await
        {
            Ok(results) => results,
            Err(e) => {
                tracing::warn!("FTS5 keyword search failed: {e:#}");
                Vec::new()
            }
        };

        // Vector search needs both Qdrant and an embedding-capable provider; unlike the
        // keyword path above, errors here DO propagate to the caller.
        let vector_results = if let Some(qdrant) = &self.qdrant
            && self.provider.supports_embeddings()
        {
            let query_vector = self.provider.embed(query).await?;
            // Size the collection from the actual embedding dimension.
            let vector_size = u64::try_from(query_vector.len()).unwrap_or(896);
            qdrant.ensure_collection(vector_size).await?;
            qdrant.search(&query_vector, limit * 2, filter).await?
        } else {
            Vec::new()
        };

        self.recall_merge_and_rank(keyword_results, vector_results, limit)
            .await
    }
464
    /// Raw FTS5 keyword search used by the routed recall paths.
    ///
    /// Over-fetches `limit * 2` candidates so the downstream merge/rank step has
    /// headroom before the final truncation to `limit`.
    ///
    /// # Errors
    ///
    /// Returns an error if the FTS5 query fails.
    pub(super) async fn recall_fts5_raw(
        &self,
        query: &str,
        limit: usize,
        conversation_id: Option<ConversationId>,
    ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
        self.sqlite
            .keyword_search(query, limit * 2, conversation_id)
            .await
    }
475
476    pub(super) async fn recall_vectors_raw(
477        &self,
478        query: &str,
479        limit: usize,
480        filter: Option<SearchFilter>,
481    ) -> Result<Vec<crate::embedding_store::SearchResult>, MemoryError> {
482        let Some(qdrant) = &self.qdrant else {
483            return Ok(Vec::new());
484        };
485        if !self.provider.supports_embeddings() {
486            return Ok(Vec::new());
487        }
488        let query_vector = self.provider.embed(query).await?;
489        let vector_size = u64::try_from(query_vector.len()).unwrap_or(896);
490        qdrant.ensure_collection(vector_size).await?;
491        qdrant.search(&query_vector, limit * 2, filter).await
492    }
493
    /// Merge raw keyword and vector results, apply weighted scoring, temporal decay, and MMR
    /// re-ranking, then resolve to `RecalledMessage` objects.
    ///
    /// This is the shared post-processing step used by all recall paths. Pipeline stages
    /// (each optional stage degrades gracefully when its data fetch fails):
    /// 1. Max-normalize each result set and combine with `vector_weight`/`keyword_weight`.
    /// 2. Temporal decay (when enabled and half-life > 0).
    /// 3. MMR diversity re-ranking, or plain truncation to `limit` when MMR is off
    ///    or vectors are unavailable.
    /// 4. Importance-score blending (when enabled).
    /// 5. Additive semantic-tier boost.
    /// 6. Access-count / RL-training bookkeeping, then message resolution from SQLite.
    ///
    /// # Errors
    ///
    /// Returns an error if the `SQLite` `messages_by_ids` query fails.
    #[allow(clippy::cast_possible_truncation, clippy::too_many_lines)]
    pub(super) async fn recall_merge_and_rank(
        &self,
        keyword_results: Vec<(MessageId, f64)>,
        vector_results: Vec<crate::embedding_store::SearchResult>,
        limit: usize,
    ) -> Result<Vec<RecalledMessage>, MemoryError> {
        tracing::debug!(
            vector_count = vector_results.len(),
            keyword_count = keyword_results.len(),
            limit,
            "recall: merging search results"
        );

        // Combined score per message; a message found by both searches accumulates
        // both weighted contributions.
        let mut scores: std::collections::HashMap<MessageId, f64> =
            std::collections::HashMap::new();

        // Stage 1a: normalize vector scores by the max (when positive) and weight them.
        if !vector_results.is_empty() {
            let max_vs = vector_results
                .iter()
                .map(|r| r.score)
                .fold(f32::NEG_INFINITY, f32::max);
            let norm = if max_vs > 0.0 { max_vs } else { 1.0 };
            for r in &vector_results {
                let normalized = f64::from(r.score / norm);
                *scores.entry(r.message_id).or_default() += normalized * self.vector_weight;
            }
        }

        // Stage 1b: same normalization for keyword (FTS5) scores.
        if !keyword_results.is_empty() {
            let max_ks = keyword_results
                .iter()
                .map(|r| r.1)
                .fold(f64::NEG_INFINITY, f64::max);
            let norm = if max_ks > 0.0 { max_ks } else { 1.0 };
            for &(msg_id, score) in &keyword_results {
                let normalized = score / norm;
                *scores.entry(msg_id).or_default() += normalized * self.keyword_weight;
            }
        }

        // Both result sets empty: nothing to rank.
        if scores.is_empty() {
            tracing::debug!("recall: empty merge, no overlapping scores");
            return Ok(Vec::new());
        }

        let mut ranked: Vec<(MessageId, f64)> = scores.into_iter().collect();
        ranked.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));

        tracing::debug!(
            merged = ranked.len(),
            top_score = ranked.first().map(|r| r.1),
            bottom_score = ranked.last().map(|r| r.1),
            vector_weight = %self.vector_weight,
            keyword_weight = %self.keyword_weight,
            "recall: weighted merge complete"
        );

        // Stage 2: temporal decay — down-weight older messages; a timestamp fetch
        // failure leaves the ranking undecayed.
        if self.temporal_decay_enabled && self.temporal_decay_half_life_days > 0 {
            let ids: Vec<MessageId> = ranked.iter().map(|r| r.0).collect();
            match self.sqlite.message_timestamps(&ids).await {
                Ok(timestamps) => {
                    apply_temporal_decay(
                        &mut ranked,
                        &timestamps,
                        self.temporal_decay_half_life_days,
                    );
                    ranked
                        .sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
                    tracing::debug!(
                        half_life_days = self.temporal_decay_half_life_days,
                        top_score_after = ranked.first().map(|r| r.1),
                        "recall: temporal decay applied"
                    );
                }
                Err(e) => {
                    tracing::warn!("temporal decay: failed to fetch timestamps: {e:#}");
                }
            }
        }

        // Stage 3: MMR diversity re-ranking (needs stored vectors); every fallback
        // path still truncates to `limit`.
        if self.mmr_enabled && !vector_results.is_empty() {
            if let Some(qdrant) = &self.qdrant {
                let ids: Vec<MessageId> = ranked.iter().map(|r| r.0).collect();
                match qdrant.get_vectors(&ids).await {
                    Ok(vec_map) if !vec_map.is_empty() => {
                        let ranked_len_before = ranked.len();
                        ranked = apply_mmr(&ranked, &vec_map, self.mmr_lambda, limit);
                        tracing::debug!(
                            before = ranked_len_before,
                            after = ranked.len(),
                            lambda = %self.mmr_lambda,
                            "recall: mmr re-ranked"
                        );
                    }
                    Ok(_) => {
                        ranked.truncate(limit);
                    }
                    Err(e) => {
                        tracing::warn!("MMR: failed to fetch vectors: {e:#}");
                        ranked.truncate(limit);
                    }
                }
            } else {
                ranked.truncate(limit);
            }
        } else {
            ranked.truncate(limit);
        }

        // Stage 4: blend stored importance scores additively; a fetch failure
        // leaves the ranking unchanged.
        if self.importance_enabled && !ranked.is_empty() {
            let ids: Vec<MessageId> = ranked.iter().map(|r| r.0).collect();
            match self.sqlite.fetch_importance_scores(&ids).await {
                Ok(scores) => {
                    for (msg_id, score) in &mut ranked {
                        if let Some(&imp) = scores.get(msg_id) {
                            *score += imp * self.importance_weight;
                        }
                    }
                    ranked
                        .sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
                    tracing::debug!(
                        importance_weight = %self.importance_weight,
                        "recall: importance scores blended"
                    );
                }
                Err(e) => {
                    tracing::warn!("importance scoring: failed to fetch scores: {e:#}");
                }
            }
        }

        // Stage 5: tier boost — semantic-tier messages receive an additive bonus so
        // distilled facts rank above episodic messages with the same base score.
        // Additive (not multiplicative) so the effect is consistent regardless of base
        // score magnitude. Skipped when the configured boost is effectively 1.0.
        if (self.tier_boost_semantic - 1.0).abs() > f64::EPSILON && !ranked.is_empty() {
            let ids: Vec<MessageId> = ranked.iter().map(|r| r.0).collect();
            match self.sqlite.fetch_tiers(&ids).await {
                Ok(tiers) => {
                    let bonus = self.tier_boost_semantic - 1.0;
                    let mut boosted = false;
                    for (msg_id, score) in &mut ranked {
                        if tiers.get(msg_id).map(String::as_str) == Some("semantic") {
                            *score += bonus;
                            boosted = true;
                        }
                    }
                    // Only re-sort when at least one score actually changed.
                    if boosted {
                        ranked.sort_by(|a, b| {
                            b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal)
                        });
                        tracing::debug!(
                            tier_boost = %self.tier_boost_semantic,
                            "recall: semantic tier boost applied"
                        );
                    }
                }
                Err(e) => {
                    tracing::warn!("tier boost: failed to fetch tiers: {e:#}");
                }
            }
        }

        let ids: Vec<MessageId> = ranked.iter().map(|r| r.0).collect();

        // Stage 6: bookkeeping — both updates are best-effort and non-fatal.
        if !ids.is_empty()
            && let Err(e) = self.batch_increment_access_count(ids.clone()).await
        {
            tracing::warn!("recall: failed to increment access counts: {e:#}");
        }

        // Update RL admission training data: mark recalled messages as positive examples.
        if let Err(e) = self.sqlite.mark_training_recalled(&ids).await {
            tracing::debug!(
                error = %e,
                "recall: failed to mark training data as recalled (non-fatal)"
            );
        }

        // Resolve ids to full messages; ids missing from SQLite are silently dropped
        // by the filter_map below.
        let messages = self.sqlite.messages_by_ids(&ids).await?;
        let msg_map: std::collections::HashMap<MessageId, _> = messages.into_iter().collect();

        let recalled: Vec<RecalledMessage> = ranked
            .iter()
            .filter_map(|(msg_id, score)| {
                msg_map.get(msg_id).map(|msg| RecalledMessage {
                    message: msg.clone(),
                    #[expect(clippy::cast_possible_truncation)]
                    score: *score as f32,
                })
            })
            .collect();

        tracing::debug!(final_count = recalled.len(), "recall: final results");

        Ok(recalled)
    }
699
    /// Recall messages using query-aware routing.
    ///
    /// Delegates to FTS5-only, vector-only, or hybrid search based on the router decision,
    /// then runs the shared merge and ranking pipeline.
    ///
    /// # Errors
    ///
    /// Returns an error if any underlying search or database operation fails.
    /// NOTE(review): the Keyword, Semantic, and Episodic arms propagate FTS5/vector
    /// errors with `?`, while Hybrid and Graph swallow FTS5 errors and fall back to
    /// vector-only — presumably intentional (hybrid has a fallback, keyword-only does
    /// not), but worth confirming.
    pub async fn recall_routed(
        &self,
        query: &str,
        limit: usize,
        filter: Option<SearchFilter>,
        router: &dyn crate::router::MemoryRouter,
    ) -> Result<Vec<RecalledMessage>, MemoryError> {
        use crate::router::MemoryRoute;

        let route = router.route(query);
        tracing::debug!(?route, query_len = query.len(), "memory routing decision");

        let conversation_id = filter.as_ref().and_then(|f| f.conversation_id);

        // Each arm produces a (keyword, vector) pair; empty Vec means that search
        // leg was skipped for this route.
        let (keyword_results, vector_results): (
            Vec<(MessageId, f64)>,
            Vec<crate::embedding_store::SearchResult>,
        ) = match route {
            // Keyword-only: FTS5 errors propagate (no fallback leg exists).
            MemoryRoute::Keyword => {
                let kw = self.recall_fts5_raw(query, limit, conversation_id).await?;
                (kw, Vec::new())
            }
            // Vector-only: embedding/search errors propagate.
            MemoryRoute::Semantic => {
                let vr = self.recall_vectors_raw(query, limit, filter).await?;
                (Vec::new(), vr)
            }
            // Hybrid: FTS5 failure degrades to vector-only; vector errors propagate.
            MemoryRoute::Hybrid => {
                let kw = match self.recall_fts5_raw(query, limit, conversation_id).await {
                    Ok(r) => r,
                    Err(e) => {
                        tracing::warn!("FTS5 keyword search failed: {e:#}");
                        Vec::new()
                    }
                };
                let vr = self.recall_vectors_raw(query, limit, filter).await?;
                (kw, vr)
            }
            // Episodic: FTS5 keyword search with an optional timestamp-range filter.
            // Temporal keywords are stripped from the query before passing to FTS5 to
            // prevent BM25 score distortion (e.g. "yesterday" matching messages that
            // literally contain the word "yesterday" regardless of actual relevance).
            // Vector search is skipped for speed; temporal decay in recall_merge_and_rank
            // provides recency boosting for the FTS5 results.
            // Known trade-off (MVP): semantically similar but lexically different messages
            // may be missed. See issue #1629 for a future hybrid_temporal mode.
            MemoryRoute::Episodic => {
                let range = crate::router::resolve_temporal_range(query, chrono::Utc::now());
                let cleaned = crate::router::strip_temporal_keywords(query);
                // Stripping may consume the whole query; fall back to the original.
                let search_query = if cleaned.is_empty() { query } else { &cleaned };
                let kw = if let Some(ref r) = range {
                    self.sqlite
                        .keyword_search_with_time_range(
                            search_query,
                            limit,
                            conversation_id,
                            r.after.as_deref(),
                            r.before.as_deref(),
                        )
                        .await?
                } else {
                    self.recall_fts5_raw(search_query, limit, conversation_id)
                        .await?
                };
                tracing::debug!(
                    has_range = range.is_some(),
                    cleaned_query = %search_query,
                    keyword_count = kw.len(),
                    "recall: episodic path"
                );
                (kw, Vec::new())
            }
            // Graph routing triggers graph_recall separately in agent/context.rs.
            // For the message-based recall, behave like Hybrid.
            MemoryRoute::Graph => {
                let kw = match self.recall_fts5_raw(query, limit, conversation_id).await {
                    Ok(r) => r,
                    Err(e) => {
                        tracing::warn!("FTS5 keyword search failed (graph→hybrid fallback): {e:#}");
                        Vec::new()
                    }
                };
                let vr = self.recall_vectors_raw(query, limit, filter).await?;
                (kw, vr)
            }
        };

        tracing::debug!(
            keyword_count = keyword_results.len(),
            vector_count = vector_results.len(),
            "recall: routed search results"
        );

        self.recall_merge_and_rank(keyword_results, vector_results, limit)
            .await
    }
803
804    /// Async variant of [`recall_routed`](Self::recall_routed) that uses
805    /// [`AsyncMemoryRouter::route_async`](crate::router::AsyncMemoryRouter::route_async) when
806    /// available, enabling LLM-based routing for `LlmRouter` and `HybridRouter`.
807    ///
808    /// Falls back to [`recall_routed`](Self::recall_routed) for routers that only implement
809    /// the sync `MemoryRouter` trait (e.g. `HeuristicRouter`).
810    ///
811    /// # Errors
812    ///
813    /// Returns an error if any underlying search or database operation fails.
814    pub async fn recall_routed_async(
815        &self,
816        query: &str,
817        limit: usize,
818        filter: Option<crate::embedding_store::SearchFilter>,
819        router: &dyn crate::router::AsyncMemoryRouter,
820    ) -> Result<Vec<RecalledMessage>, MemoryError> {
821        use crate::router::MemoryRoute;
822
823        let decision = router.route_async(query).await;
824        let route = decision.route;
825        tracing::debug!(
826            ?route,
827            confidence = decision.confidence,
828            query_len = query.len(),
829            "memory routing decision (async)"
830        );
831
832        let conversation_id = filter.as_ref().and_then(|f| f.conversation_id);
833
834        let (keyword_results, vector_results): (
835            Vec<(crate::types::MessageId, f64)>,
836            Vec<crate::embedding_store::SearchResult>,
837        ) = match route {
838            MemoryRoute::Keyword => {
839                let kw = self.recall_fts5_raw(query, limit, conversation_id).await?;
840                (kw, Vec::new())
841            }
842            MemoryRoute::Semantic => {
843                let vr = self.recall_vectors_raw(query, limit, filter).await?;
844                (Vec::new(), vr)
845            }
846            MemoryRoute::Hybrid => {
847                let kw = match self.recall_fts5_raw(query, limit, conversation_id).await {
848                    Ok(r) => r,
849                    Err(e) => {
850                        tracing::warn!("FTS5 keyword search failed: {e:#}");
851                        Vec::new()
852                    }
853                };
854                let vr = self.recall_vectors_raw(query, limit, filter).await?;
855                (kw, vr)
856            }
857            MemoryRoute::Episodic => {
858                let range = crate::router::resolve_temporal_range(query, chrono::Utc::now());
859                let cleaned = crate::router::strip_temporal_keywords(query);
860                let search_query = if cleaned.is_empty() { query } else { &cleaned };
861                let kw = if let Some(ref r) = range {
862                    self.sqlite
863                        .keyword_search_with_time_range(
864                            search_query,
865                            limit,
866                            conversation_id,
867                            r.after.as_deref(),
868                            r.before.as_deref(),
869                        )
870                        .await?
871                } else {
872                    self.recall_fts5_raw(search_query, limit, conversation_id)
873                        .await?
874                };
875                (kw, Vec::new())
876            }
877            MemoryRoute::Graph => {
878                let kw = match self.recall_fts5_raw(query, limit, conversation_id).await {
879                    Ok(r) => r,
880                    Err(e) => {
881                        tracing::warn!("FTS5 keyword search failed (graph→hybrid fallback): {e:#}");
882                        Vec::new()
883                    }
884                };
885                let vr = self.recall_vectors_raw(query, limit, filter).await?;
886                (kw, vr)
887            }
888        };
889
890        tracing::debug!(
891            keyword_count = keyword_results.len(),
892            vector_count = vector_results.len(),
893            "recall: routed search results (async)"
894        );
895
896        self.recall_merge_and_rank(keyword_results, vector_results, limit)
897            .await
898    }
899
900    /// Retrieve graph facts relevant to `query` via BFS traversal.
901    ///
902    /// Returns an empty `Vec` if no `graph_store` is configured.
903    ///
904    /// # Parameters
905    ///
906    /// - `at_timestamp`: when `Some`, only edges valid at that `SQLite` datetime string are returned.
907    ///   When `None`, only currently active edges are used.
908    /// - `temporal_decay_rate`: non-negative decay rate (1/day). `0.0` preserves original ordering.
909    ///
910    /// # Errors
911    ///
912    /// Returns an error if the underlying graph query fails.
913    pub async fn recall_graph(
914        &self,
915        query: &str,
916        limit: usize,
917        max_hops: u32,
918        at_timestamp: Option<&str>,
919        temporal_decay_rate: f64,
920        edge_types: &[crate::graph::EdgeType],
921    ) -> Result<Vec<crate::graph::types::GraphFact>, MemoryError> {
922        let Some(store) = &self.graph_store else {
923            return Ok(Vec::new());
924        };
925
926        tracing::debug!(
927            query_len = query.len(),
928            limit,
929            max_hops,
930            "graph: starting recall"
931        );
932
933        let results = crate::graph::retrieval::graph_recall(
934            store,
935            self.qdrant.as_deref(),
936            &self.provider,
937            query,
938            limit,
939            max_hops,
940            at_timestamp,
941            temporal_decay_rate,
942            edge_types,
943        )
944        .await?;
945
946        tracing::debug!(result_count = results.len(), "graph: recall complete");
947
948        Ok(results)
949    }
950
951    /// Retrieve graph facts via SYNAPSE spreading activation.
952    ///
953    /// Delegates to [`crate::graph::retrieval::graph_recall_activated`].
954    /// Used in place of [`recall_graph`] when `spreading_activation.enabled = true`.
955    ///
956    /// # Errors
957    ///
958    /// Returns an error if the underlying graph query fails.
959    pub async fn recall_graph_activated(
960        &self,
961        query: &str,
962        limit: usize,
963        params: crate::graph::SpreadingActivationParams,
964        edge_types: &[crate::graph::EdgeType],
965    ) -> Result<Vec<crate::graph::activation::ActivatedFact>, MemoryError> {
966        let Some(store) = &self.graph_store else {
967            return Ok(Vec::new());
968        };
969
970        tracing::debug!(
971            query_len = query.len(),
972            limit,
973            "spreading activation: starting graph recall"
974        );
975
976        let embeddings = self.qdrant.as_deref();
977        let results = crate::graph::retrieval::graph_recall_activated(
978            store,
979            embeddings,
980            &self.provider,
981            query,
982            limit,
983            params,
984            edge_types,
985        )
986        .await?;
987
988        tracing::debug!(
989            result_count = results.len(),
990            "spreading activation: graph recall complete"
991        );
992
993        Ok(results)
994    }
995
996    /// Increment access count and update `last_accessed` for a batch of message IDs.
997    ///
998    /// Skips the update if `message_ids` is empty to avoid an invalid `IN ()` clause.
999    ///
1000    /// # Errors
1001    ///
1002    /// Returns an error if the `SQLite` update fails.
1003    async fn batch_increment_access_count(
1004        &self,
1005        message_ids: Vec<MessageId>,
1006    ) -> Result<(), MemoryError> {
1007        if message_ids.is_empty() {
1008            return Ok(());
1009        }
1010        self.sqlite.increment_access_counts(&message_ids).await
1011    }
1012
1013    /// Check whether an embedding exists for a given message ID.
1014    ///
1015    /// # Errors
1016    ///
    /// Returns an error if the underlying embedding-store query fails.
1018    pub async fn has_embedding(&self, message_id: MessageId) -> Result<bool, MemoryError> {
1019        match &self.qdrant {
1020            Some(qdrant) => qdrant.has_embedding(message_id).await,
1021            None => Ok(false),
1022        }
1023    }
1024
1025    /// Embed all messages that do not yet have embeddings.
1026    ///
1027    /// Returns the count of successfully embedded messages.
1028    ///
1029    /// # Errors
1030    ///
1031    /// Returns an error if collection initialization or database query fails.
1032    /// Individual embedding failures are logged but do not stop processing.
1033    pub async fn embed_missing(&self) -> Result<usize, MemoryError> {
1034        let Some(qdrant) = &self.qdrant else {
1035            return Ok(0);
1036        };
1037        if !self.provider.supports_embeddings() {
1038            return Ok(0);
1039        }
1040
1041        let unembedded = self.sqlite.unembedded_message_ids(Some(1000)).await?;
1042
1043        if unembedded.is_empty() {
1044            return Ok(0);
1045        }
1046
1047        let probe = self.provider.embed("probe").await?;
1048        let vector_size = u64::try_from(probe.len())?;
1049        qdrant.ensure_collection(vector_size).await?;
1050
1051        let mut count = 0;
1052        for (msg_id, conversation_id, role, content) in &unembedded {
1053            let chunks = chunk_text(content);
1054            let chunk_count = chunks.len();
1055            let mut stored = 0usize;
1056
1057            for (chunk_index, chunk) in chunks.into_iter().enumerate() {
1058                let chunk_index_u32 = u32::try_from(chunk_index).unwrap_or(u32::MAX);
1059                match self.provider.embed(chunk).await {
1060                    Ok(vector) => {
1061                        if let Err(e) = qdrant
1062                            .store(
1063                                *msg_id,
1064                                *conversation_id,
1065                                role,
1066                                vector,
1067                                MessageKind::Regular,
1068                                &self.embedding_model,
1069                                chunk_index_u32,
1070                            )
1071                            .await
1072                        {
1073                            tracing::warn!(
1074                                "Failed to store chunk {chunk_index}/{chunk_count} \
1075                                 for msg {msg_id}: {e:#}"
1076                            );
1077                        } else {
1078                            stored += 1;
1079                        }
1080                    }
1081                    Err(e) => {
1082                        tracing::warn!(
1083                            "Failed to embed chunk {chunk_index}/{chunk_count} \
1084                             for msg {msg_id}: {e:#}"
1085                        );
1086                    }
1087                }
1088            }
1089
1090            if stored > 0 {
1091                count += 1;
1092            }
1093        }
1094
1095        tracing::info!("Embedded {count}/{} missing messages", unembedded.len());
1096        Ok(count)
1097    }
1098}
1099
#[cfg(test)]
mod tests {
    use super::*;

    /// `EmbedContext::default()` must leave every optional field unset.
    #[test]
    fn embed_context_default_all_none() {
        let ctx = EmbedContext::default();
        assert!(ctx.timestamp.is_none());
        assert!(ctx.exit_code.is_none());
        assert!(ctx.tool_name.is_none());
    }

    /// All fields round-trip unchanged through an explicit construction.
    #[test]
    fn embed_context_fields_set_correctly() {
        let ctx = EmbedContext {
            timestamp: Some("2026-04-04T00:00:00Z".to_string()),
            exit_code: Some(0),
            tool_name: Some("shell".to_string()),
        };
        assert_eq!(ctx.timestamp.as_deref(), Some("2026-04-04T00:00:00Z"));
        assert_eq!(ctx.exit_code, Some(0));
        assert_eq!(ctx.tool_name.as_deref(), Some("shell"));
    }

    /// Non-zero exit codes are preserved as-is.
    #[test]
    fn embed_context_non_zero_exit_code() {
        let ctx = EmbedContext {
            tool_name: Some("shell".to_string()),
            exit_code: Some(1),
            timestamp: None,
        };
        assert!(ctx.timestamp.is_none());
        assert_eq!(ctx.exit_code, Some(1));
    }
}