Skip to main content

zeph_memory/semantic/
recall.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use futures::{StreamExt as _, TryStreamExt as _};
5use zeph_llm::provider::{LlmProvider as _, Message};
6
/// Approximate number of UTF-8 bytes per token (conservative estimate for mixed content).
const CHARS_PER_TOKEN: usize = 4;

/// Target chunk size (~400 tokens), measured in bytes of UTF-8 text.
const CHUNK_CHARS: usize = 400 * CHARS_PER_TOKEN;

/// Overlap between adjacent chunks (~80 tokens), measured in bytes of UTF-8 text.
const CHUNK_OVERLAP_CHARS: usize = 80 * CHARS_PER_TOKEN;

/// Split `text` into overlapping chunks suitable for embedding.
///
/// Text of at most `CHUNK_CHARS` bytes is returned as a single chunk (this includes the
/// empty string). Longer text is cut into windows of at most `CHUNK_CHARS` bytes; each
/// window ends, in order of preference, right after the last paragraph break (`\n\n`),
/// line break (`\n`), or space found in it, or at the last UTF-8 character boundary that
/// fits. Every chunk after the first starts about `CHUNK_OVERLAP_CHARS` bytes before the
/// end of the previous one.
///
/// Separators are only considered *past* the overlap region of the current window.
/// A cut inside the overlap region would place the next start at or before the current
/// start, so the same chunk would be produced forever.
fn chunk_text(text: &str) -> Vec<&str> {
    /// Largest char boundary `<= index`, clamped to `text.len()`.
    fn floor_boundary(text: &str, index: usize) -> usize {
        let mut i = index.min(text.len());
        // Index 0 is always a boundary, so this cannot underflow.
        while !text.is_char_boundary(i) {
            i -= 1;
        }
        i
    }

    /// Smallest char boundary `>= index`, clamped to `text.len()`.
    fn ceil_boundary(text: &str, index: usize) -> usize {
        let mut i = index.min(text.len());
        // `text.len()` is always a boundary, so this cannot run past the end.
        while !text.is_char_boundary(i) {
            i += 1;
        }
        i
    }

    if text.len() <= CHUNK_CHARS {
        return vec![text];
    }

    let mut chunks = Vec::new();
    let mut start = 0;

    while start < text.len() {
        let end = if start + CHUNK_CHARS >= text.len() {
            text.len()
        } else {
            // Hard upper limit of this window, snapped down to a char boundary.
            let boundary = floor_boundary(text, start + CHUNK_CHARS);
            // Only accept separators beyond the overlap region so that
            // `end - CHUNK_OVERLAP_CHARS` is strictly greater than `start`.
            let search_from = ceil_boundary(text, start + CHUNK_OVERLAP_CHARS).min(boundary);
            let window = &text[search_from..boundary];
            if let Some(pos) = window.rfind("\n\n") {
                search_from + pos + 2
            } else if let Some(pos) = window.rfind('\n') {
                search_from + pos + 1
            } else if let Some(pos) = window.rfind(' ') {
                search_from + pos + 1
            } else {
                boundary
            }
        };

        chunks.push(&text[start..end]);
        if end >= text.len() {
            break;
        }

        // Next chunk starts with overlap. If backing up would not move forward,
        // continue from `end` without overlap so the loop always terminates.
        let next = ceil_boundary(text, end.saturating_sub(CHUNK_OVERLAP_CHARS));
        start = if next > start { next } else { end };
    }

    chunks
}
62
63use crate::admission::log_admission_decision;
64use crate::embedding_store::{MessageKind, SearchFilter};
65use crate::error::MemoryError;
66use crate::types::{ConversationId, MessageId};
67
68use super::SemanticMemory;
69use super::algorithms::{apply_mmr, apply_temporal_decay};
70
/// Metadata about a tool execution that accompanies an embedding.
///
/// The values are written as Qdrant payload fields next to the vector. They are never
/// prepended to the embedded text, which keeps the embedding vector itself unaffected.
#[derive(Clone, Debug, Default)]
pub struct EmbedContext {
    /// Name of the tool that produced the content, when known.
    pub tool_name: Option<String>,
    /// Exit code reported for the tool run, when known.
    pub exit_code: Option<i32>,
    /// Timestamp of the tool run as text, when known.
    pub timestamp: Option<String>,
}
80
81#[derive(Debug)]
82pub struct RecalledMessage {
83    pub message: Message,
84    pub score: f32,
85}
86
/// Upper bound on background embed tasks that one `SemanticMemory` instance keeps in
/// flight at the same time; further tasks are skipped once this many are pending.
const MAX_EMBED_BG_TASKS: usize = 64;
89
90/// Shared arguments for background embed tasks.
91struct EmbedBgArgs {
92    qdrant: std::sync::Arc<crate::embedding_store::EmbeddingStore>,
93    embed_provider: zeph_llm::any::AnyProvider,
94    embedding_model: String,
95    message_id: MessageId,
96    conversation_id: ConversationId,
97    role: String,
98    content: String,
99}
100
101/// Background task: embed chunks and store as regular message vectors.
102///
103/// All errors are logged as warnings; the function never panics.
104async fn embed_and_store_regular_bg(args: EmbedBgArgs) {
105    let EmbedBgArgs {
106        qdrant,
107        embed_provider,
108        embedding_model,
109        message_id,
110        conversation_id,
111        role,
112        content,
113    } = args;
114    let chunks = chunk_text(&content);
115    let chunk_count = chunks.len();
116
117    let vectors = match embed_provider.embed_batch(&chunks).await {
118        Ok(v) => v,
119        Err(e) => {
120            tracing::warn!("bg embed_regular: failed to embed chunks for msg {message_id}: {e:#}");
121            return;
122        }
123    };
124
125    let Some(first) = vectors.first() else {
126        return;
127    };
128    let vector_size = first.len() as u64;
129    if let Err(e) = qdrant.ensure_collection(vector_size).await {
130        tracing::warn!("bg embed_regular: failed to ensure Qdrant collection: {e:#}");
131        return;
132    }
133
134    for (chunk_index, vector) in vectors.into_iter().enumerate() {
135        let chunk_index_u32 = u32::try_from(chunk_index).unwrap_or(u32::MAX);
136        if let Err(e) = qdrant
137            .store(
138                message_id,
139                conversation_id,
140                &role,
141                vector,
142                MessageKind::Regular,
143                &embedding_model,
144                chunk_index_u32,
145            )
146            .await
147        {
148            tracing::warn!(
149                "bg embed_regular: failed to store chunk {chunk_index}/{chunk_count} \
150                 for msg {message_id}: {e:#}"
151            );
152        }
153    }
154}
155
156/// Background task: embed chunks with tool context metadata and store in Qdrant.
157///
158/// All errors are logged as warnings; the function never panics.
159async fn embed_chunks_with_tool_context_bg(args: EmbedBgArgs, embed_ctx: EmbedContext) {
160    let EmbedBgArgs {
161        qdrant,
162        embed_provider,
163        embedding_model,
164        message_id,
165        conversation_id,
166        role,
167        content,
168    } = args;
169    let chunks = chunk_text(&content);
170    let chunk_count = chunks.len();
171
172    let vectors = match embed_provider.embed_batch(&chunks).await {
173        Ok(v) => v,
174        Err(e) => {
175            tracing::warn!(
176                "bg embed_tool: failed to embed tool-output chunks for msg {message_id}: {e:#}"
177            );
178            return;
179        }
180    };
181
182    if let Some(first) = vectors.first() {
183        let vector_size = first.len() as u64;
184        if let Err(e) = qdrant.ensure_collection(vector_size).await {
185            tracing::warn!("bg embed_tool: failed to ensure Qdrant collection: {e:#}");
186            return;
187        }
188    }
189
190    for (chunk_index, vector) in vectors.into_iter().enumerate() {
191        let chunk_index_u32 = u32::try_from(chunk_index).unwrap_or(u32::MAX);
192        let result = if let Some(ref tool_name) = embed_ctx.tool_name {
193            qdrant
194                .store_with_tool_context(
195                    message_id,
196                    conversation_id,
197                    &role,
198                    vector,
199                    MessageKind::Regular,
200                    &embedding_model,
201                    chunk_index_u32,
202                    tool_name,
203                    embed_ctx.exit_code,
204                    embed_ctx.timestamp.as_deref(),
205                )
206                .await
207                .map(|_| ())
208        } else {
209            qdrant
210                .store(
211                    message_id,
212                    conversation_id,
213                    &role,
214                    vector,
215                    MessageKind::Regular,
216                    &embedding_model,
217                    chunk_index_u32,
218                )
219                .await
220                .map(|_| ())
221        };
222        if let Err(e) = result {
223            tracing::warn!(
224                "bg embed_tool: failed to store chunk {chunk_index}/{chunk_count} \
225                 for msg {message_id}: {e:#}"
226            );
227        }
228    }
229}
230
231/// Background task: embed chunks with optional category and store in Qdrant.
232///
233/// All errors are logged as warnings; the function never panics.
234async fn embed_and_store_with_category_bg(args: EmbedBgArgs, category: Option<String>) {
235    let EmbedBgArgs {
236        qdrant,
237        embed_provider,
238        embedding_model,
239        message_id,
240        conversation_id,
241        role,
242        content,
243    } = args;
244    let chunks = chunk_text(&content);
245    let chunk_count = chunks.len();
246
247    let vectors = match embed_provider.embed_batch(&chunks).await {
248        Ok(v) => v,
249        Err(e) => {
250            tracing::warn!(
251                "bg embed_category: failed to embed categorized chunks for msg {message_id}: {e:#}"
252            );
253            return;
254        }
255    };
256
257    let Some(first) = vectors.first() else {
258        return;
259    };
260    let vector_size = first.len() as u64;
261    if let Err(e) = qdrant.ensure_collection(vector_size).await {
262        tracing::warn!("bg embed_category: failed to ensure Qdrant collection: {e:#}");
263        return;
264    }
265
266    for (chunk_index, vector) in vectors.into_iter().enumerate() {
267        let chunk_index_u32 = u32::try_from(chunk_index).unwrap_or(u32::MAX);
268        if let Err(e) = qdrant
269            .store_with_category(
270                message_id,
271                conversation_id,
272                &role,
273                vector,
274                MessageKind::Regular,
275                &embedding_model,
276                chunk_index_u32,
277                category.as_deref(),
278            )
279            .await
280        {
281            tracing::warn!(
282                "bg embed_category: failed to store chunk {chunk_index}/{chunk_count} \
283                 for msg {message_id}: {e:#}"
284            );
285        }
286    }
287}
288
289impl SemanticMemory {
290    /// Save a message to `SQLite` and optionally embed and store in Qdrant.
291    ///
292    /// Returns `Ok(Some(message_id))` when admitted and persisted.
293    /// Returns `Ok(None)` when A-MAC admission control rejects the message (not an error).
294    ///
295    /// # Errors
296    ///
297    /// Returns an error if the `SQLite` save fails. Embedding failures are logged but not
298    /// propagated.
299    #[cfg_attr(
300        feature = "profiling",
301        tracing::instrument(name = "memory.remember", skip_all, fields(content_len = %content.len()))
302    )]
303    pub async fn remember(
304        &self,
305        conversation_id: ConversationId,
306        role: &str,
307        content: &str,
308        goal_text: Option<&str>,
309    ) -> Result<Option<MessageId>, MemoryError> {
310        // A-MAC admission gate.
311        if let Some(ref admission) = self.admission_control {
312            let decision = admission
313                .evaluate(
314                    content,
315                    role,
316                    &self.provider,
317                    self.qdrant.as_ref(),
318                    goal_text,
319                )
320                .await;
321            let preview: String = content.chars().take(100).collect();
322            log_admission_decision(&decision, &preview, role, admission.threshold());
323            if !decision.admitted {
324                return Ok(None);
325            }
326        }
327
328        let message_id = self
329            .sqlite
330            .save_message(conversation_id, role, content)
331            .await?;
332
333        self.embed_and_store_regular(message_id, conversation_id, role, content);
334
335        Ok(Some(message_id))
336    }
337
338    /// Save a message with pre-serialized parts JSON to `SQLite` and optionally embed in Qdrant.
339    ///
340    /// Returns `Ok((Some(message_id), embedding_stored))` when admitted and persisted.
341    /// Returns `Ok((None, false))` when A-MAC admission control rejects the message.
342    ///
343    /// # Errors
344    ///
345    /// Returns an error if the `SQLite` save fails.
346    #[cfg_attr(
347        feature = "profiling",
348        tracing::instrument(name = "memory.remember", skip_all, fields(content_len = %content.len()))
349    )]
350    pub async fn remember_with_parts(
351        &self,
352        conversation_id: ConversationId,
353        role: &str,
354        content: &str,
355        parts_json: &str,
356        goal_text: Option<&str>,
357    ) -> Result<(Option<MessageId>, bool), MemoryError> {
358        // A-MAC admission gate.
359        if let Some(ref admission) = self.admission_control {
360            let decision = admission
361                .evaluate(
362                    content,
363                    role,
364                    &self.provider,
365                    self.qdrant.as_ref(),
366                    goal_text,
367                )
368                .await;
369            let preview: String = content.chars().take(100).collect();
370            log_admission_decision(&decision, &preview, role, admission.threshold());
371            if !decision.admitted {
372                return Ok((None, false));
373            }
374        }
375
376        let message_id = self
377            .sqlite
378            .save_message_with_parts(conversation_id, role, content, parts_json)
379            .await?;
380
381        let embedding_stored =
382            self.embed_and_store_regular(message_id, conversation_id, role, content);
383
384        Ok((Some(message_id), embedding_stored))
385    }
386
387    /// Save a tool output to `SQLite` and embed with tool metadata in Qdrant payload.
388    ///
389    /// Tool metadata (`tool_name`, `exit_code`, `timestamp`) is stored as Qdrant payload fields
390    /// so it is available for filtering without corrupting the embedding vector.
391    ///
392    /// Returns `Ok(Some(message_id))` when admitted and persisted.
393    /// Returns `Ok(None)` when A-MAC admission control rejects the message.
394    ///
395    /// # Errors
396    ///
397    /// Returns an error if the `SQLite` save fails.
398    #[cfg_attr(
399        feature = "profiling",
400        tracing::instrument(name = "memory.remember", skip_all, fields(content_len = %content.len()))
401    )]
402    pub async fn remember_tool_output(
403        &self,
404        conversation_id: ConversationId,
405        role: &str,
406        content: &str,
407        parts_json: &str,
408        embed_ctx: EmbedContext,
409    ) -> Result<(Option<MessageId>, bool), MemoryError> {
410        if let Some(ref admission) = self.admission_control {
411            let decision = admission
412                .evaluate(content, role, &self.provider, self.qdrant.as_ref(), None)
413                .await;
414            let preview: String = content.chars().take(100).collect();
415            log_admission_decision(&decision, &preview, role, admission.threshold());
416            if !decision.admitted {
417                return Ok((None, false));
418            }
419        }
420
421        let message_id = self
422            .sqlite
423            .save_message_with_parts(conversation_id, role, content, parts_json)
424            .await?;
425
426        let embedding_stored = self.embed_chunks_with_tool_context(
427            message_id,
428            conversation_id,
429            role,
430            content,
431            embed_ctx,
432        );
433
434        Ok((Some(message_id), embedding_stored))
435    }
436
437    /// Save a categorized message to `SQLite` and embed with category payload in Qdrant.
438    ///
439    /// The `category` is stored in both the `messages.category` column and as a Qdrant payload
440    /// field for recall filtering. Uses A-MAC admission gate.
441    ///
442    /// Returns `Ok(Some(message_id))` when admitted; `Ok(None)` when rejected.
443    ///
444    /// # Errors
445    ///
446    /// Returns an error if the `SQLite` save fails.
447    #[cfg_attr(
448        feature = "profiling",
449        tracing::instrument(name = "memory.remember", skip_all, fields(content_len = %content.len()))
450    )]
451    pub async fn remember_categorized(
452        &self,
453        conversation_id: ConversationId,
454        role: &str,
455        content: &str,
456        category: Option<&str>,
457        goal_text: Option<&str>,
458    ) -> Result<Option<MessageId>, MemoryError> {
459        if let Some(ref admission) = self.admission_control {
460            let decision = admission
461                .evaluate(
462                    content,
463                    role,
464                    &self.provider,
465                    self.qdrant.as_ref(),
466                    goal_text,
467                )
468                .await;
469            let preview: String = content.chars().take(100).collect();
470            log_admission_decision(&decision, &preview, role, admission.threshold());
471            if !decision.admitted {
472                return Ok(None);
473            }
474        }
475
476        let message_id = self
477            .sqlite
478            .save_message_with_category(conversation_id, role, content, category)
479            .await?;
480
481        self.embed_and_store_with_category(message_id, conversation_id, role, content, category);
482
483        Ok(Some(message_id))
484    }
485
486    /// Recall messages filtered by category.
487    ///
488    /// When `category` is `None`, behaves identically to [`Self::recall`].
489    ///
490    /// # Errors
491    ///
492    /// Returns an error if the search fails.
493    pub async fn recall_with_category(
494        &self,
495        query: &str,
496        limit: usize,
497        filter: Option<SearchFilter>,
498        category: Option<&str>,
499    ) -> Result<Vec<RecalledMessage>, MemoryError> {
500        let filter_with_category = filter.map(|mut f| {
501            f.category = category.map(str::to_owned);
502            f
503        });
504        self.recall(query, limit, filter_with_category).await
505    }
506
507    /// Reap completed background embed tasks (non-blocking).
508    ///
509    /// Call at turn boundaries to release handles for finished tasks.
510    pub fn reap_embed_tasks(&self) {
511        if let Ok(mut tasks) = self.embed_tasks.lock() {
512            while tasks.try_join_next().is_some() {}
513        }
514    }
515
516    /// Spawn `fut` as a bounded background embed task.
517    ///
518    /// If the task limit is reached, the task is dropped and a debug message is logged.
519    fn spawn_embed_bg<F>(&self, fut: F) -> bool
520    where
521        F: std::future::Future<Output = ()> + Send + 'static,
522    {
523        let Ok(mut tasks) = self.embed_tasks.lock() else {
524            return false;
525        };
526        // Reap any finished tasks before checking capacity.
527        while tasks.try_join_next().is_some() {}
528        if tasks.len() >= MAX_EMBED_BG_TASKS {
529            tracing::debug!("background embed task limit reached, skipping");
530            return false;
531        }
532        tasks.spawn(fut);
533        // embedding dispatched to background; metric not incremented
534        false
535    }
536
537    /// Embed content chunks and store each with an optional category payload field.
538    ///
539    /// Spawns a bounded background task; returns immediately.
540    fn embed_and_store_with_category(
541        &self,
542        message_id: MessageId,
543        conversation_id: ConversationId,
544        role: &str,
545        content: &str,
546        category: Option<&str>,
547    ) -> bool {
548        let Some(qdrant) = self.qdrant.clone() else {
549            return false;
550        };
551        let embed_provider = self.effective_embed_provider().clone();
552        if !embed_provider.supports_embeddings() {
553            return false;
554        }
555        self.spawn_embed_bg(embed_and_store_with_category_bg(
556            EmbedBgArgs {
557                qdrant,
558                embed_provider,
559                embedding_model: self.embedding_model.clone(),
560                message_id,
561                conversation_id,
562                role: role.to_owned(),
563                content: content.to_owned(),
564            },
565            category.map(str::to_owned),
566        ))
567    }
568
569    /// Embed content chunks and store each as a regular (non-tool) message vector.
570    ///
571    /// Spawns a bounded background task; returns immediately.
572    fn embed_and_store_regular(
573        &self,
574        message_id: MessageId,
575        conversation_id: ConversationId,
576        role: &str,
577        content: &str,
578    ) -> bool {
579        let Some(qdrant) = self.qdrant.clone() else {
580            return false;
581        };
582        let embed_provider = self.effective_embed_provider().clone();
583        if !embed_provider.supports_embeddings() {
584            return false;
585        }
586        self.spawn_embed_bg(embed_and_store_regular_bg(EmbedBgArgs {
587            qdrant,
588            embed_provider,
589            embedding_model: self.embedding_model.clone(),
590            message_id,
591            conversation_id,
592            role: role.to_owned(),
593            content: content.to_owned(),
594        }))
595    }
596
597    /// Embed content chunks, enriching Qdrant payload with tool metadata when present.
598    ///
599    /// Spawns a bounded background task; returns immediately.
600    fn embed_chunks_with_tool_context(
601        &self,
602        message_id: MessageId,
603        conversation_id: ConversationId,
604        role: &str,
605        content: &str,
606        embed_ctx: EmbedContext,
607    ) -> bool {
608        let Some(qdrant) = self.qdrant.clone() else {
609            return false;
610        };
611        let embed_provider = self.effective_embed_provider().clone();
612        if !embed_provider.supports_embeddings() {
613            return false;
614        }
615        self.spawn_embed_bg(embed_chunks_with_tool_context_bg(
616            EmbedBgArgs {
617                qdrant,
618                embed_provider,
619                embedding_model: self.embedding_model.clone(),
620                message_id,
621                conversation_id,
622                role: role.to_owned(),
623                content: content.to_owned(),
624            },
625            embed_ctx,
626        ))
627    }
628
629    /// Save a message to `SQLite` without generating an embedding.
630    ///
631    /// Use this when embedding is intentionally skipped (e.g. autosave disabled for assistant).
632    ///
633    /// # Errors
634    ///
635    /// Returns an error if the `SQLite` save fails.
636    pub async fn save_only(
637        &self,
638        conversation_id: ConversationId,
639        role: &str,
640        content: &str,
641        parts_json: &str,
642    ) -> Result<MessageId, MemoryError> {
643        self.sqlite
644            .save_message_with_parts(conversation_id, role, content, parts_json)
645            .await
646    }
647
648    /// Recall relevant messages using hybrid search (vector + FTS5 keyword).
649    ///
650    /// When Qdrant is available, runs both vector and keyword searches, then merges
651    /// results using weighted scoring. When Qdrant is unavailable, falls back to
652    /// FTS5-only keyword search.
653    ///
654    /// # Errors
655    ///
656    /// Returns an error if embedding generation, Qdrant search, or FTS5 query fails.
657    #[cfg_attr(
658        feature = "profiling",
659        tracing::instrument(name = "memory.recall", skip_all, fields(query_len = %query.len(), result_count = tracing::field::Empty, top_score = tracing::field::Empty))
660    )]
661    pub async fn recall(
662        &self,
663        query: &str,
664        limit: usize,
665        filter: Option<SearchFilter>,
666    ) -> Result<Vec<RecalledMessage>, MemoryError> {
667        let conversation_id = filter.as_ref().and_then(|f| f.conversation_id);
668
669        tracing::debug!(
670            query_len = query.len(),
671            limit,
672            has_filter = filter.is_some(),
673            conversation_id = conversation_id.map(|c| c.0),
674            has_qdrant = self.qdrant.is_some(),
675            "recall: starting hybrid search"
676        );
677
678        let keyword_results = match self
679            .sqlite
680            .keyword_search(query, limit * 2, conversation_id)
681            .await
682        {
683            Ok(results) => results,
684            Err(e) => {
685                tracing::warn!("FTS5 keyword search failed: {e:#}");
686                Vec::new()
687            }
688        };
689
690        let vector_results = if let Some(qdrant) = &self.qdrant
691            && self.provider.supports_embeddings()
692        {
693            let query_vector = self.provider.embed(query).await?;
694            let vector_size = u64::try_from(query_vector.len()).unwrap_or(896);
695            qdrant.ensure_collection(vector_size).await?;
696            qdrant.search(&query_vector, limit * 2, filter).await?
697        } else {
698            Vec::new()
699        };
700
701        let results = self
702            .recall_merge_and_rank(keyword_results, vector_results, limit)
703            .await?;
704        #[cfg(feature = "profiling")]
705        {
706            let span = tracing::Span::current();
707            span.record("result_count", results.len());
708            if let Some(top) = results.first() {
709                span.record("top_score", top.score);
710            }
711        }
712        Ok(results)
713    }
714
715    pub(super) async fn recall_fts5_raw(
716        &self,
717        query: &str,
718        limit: usize,
719        conversation_id: Option<ConversationId>,
720    ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
721        self.sqlite
722            .keyword_search(query, limit * 2, conversation_id)
723            .await
724    }
725
726    pub(super) async fn recall_vectors_raw(
727        &self,
728        query: &str,
729        limit: usize,
730        filter: Option<SearchFilter>,
731    ) -> Result<Vec<crate::embedding_store::SearchResult>, MemoryError> {
732        let Some(qdrant) = &self.qdrant else {
733            return Ok(Vec::new());
734        };
735        if !self.provider.supports_embeddings() {
736            return Ok(Vec::new());
737        }
738        let query_vector = self.provider.embed(query).await?;
739        let vector_size = u64::try_from(query_vector.len()).unwrap_or(896);
740        qdrant.ensure_collection(vector_size).await?;
741        qdrant.search(&query_vector, limit * 2, filter).await
742    }
743
744    /// Merge raw keyword and vector results, apply weighted scoring, temporal decay, and MMR
745    /// re-ranking, then resolve to `RecalledMessage` objects.
746    ///
747    /// This is the shared post-processing step used by all recall paths.
748    ///
749    /// # Errors
750    ///
751    /// Returns an error if the `SQLite` `messages_by_ids` query fails.
752    #[allow(clippy::cast_possible_truncation, clippy::too_many_lines)]
753    pub(super) async fn recall_merge_and_rank(
754        &self,
755        keyword_results: Vec<(MessageId, f64)>,
756        vector_results: Vec<crate::embedding_store::SearchResult>,
757        limit: usize,
758    ) -> Result<Vec<RecalledMessage>, MemoryError> {
759        tracing::debug!(
760            vector_count = vector_results.len(),
761            keyword_count = keyword_results.len(),
762            limit,
763            "recall: merging search results"
764        );
765
766        let mut scores: std::collections::HashMap<MessageId, f64> =
767            std::collections::HashMap::new();
768
769        if !vector_results.is_empty() {
770            let max_vs = vector_results
771                .iter()
772                .map(|r| r.score)
773                .fold(f32::NEG_INFINITY, f32::max);
774            let norm = if max_vs > 0.0 { max_vs } else { 1.0 };
775            for r in &vector_results {
776                let normalized = f64::from(r.score / norm);
777                *scores.entry(r.message_id).or_default() += normalized * self.vector_weight;
778            }
779        }
780
781        if !keyword_results.is_empty() {
782            let max_ks = keyword_results
783                .iter()
784                .map(|r| r.1)
785                .fold(f64::NEG_INFINITY, f64::max);
786            let norm = if max_ks > 0.0 { max_ks } else { 1.0 };
787            for &(msg_id, score) in &keyword_results {
788                let normalized = score / norm;
789                *scores.entry(msg_id).or_default() += normalized * self.keyword_weight;
790            }
791        }
792
793        if scores.is_empty() {
794            tracing::debug!("recall: empty merge, no overlapping scores");
795            return Ok(Vec::new());
796        }
797
798        let mut ranked: Vec<(MessageId, f64)> = scores.into_iter().collect();
799        ranked.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
800
801        tracing::debug!(
802            merged = ranked.len(),
803            top_score = ranked.first().map(|r| r.1),
804            bottom_score = ranked.last().map(|r| r.1),
805            vector_weight = %self.vector_weight,
806            keyword_weight = %self.keyword_weight,
807            "recall: weighted merge complete"
808        );
809
810        if self.temporal_decay_enabled && self.temporal_decay_half_life_days > 0 {
811            let ids: Vec<MessageId> = ranked.iter().map(|r| r.0).collect();
812            match self.sqlite.message_timestamps(&ids).await {
813                Ok(timestamps) => {
814                    apply_temporal_decay(
815                        &mut ranked,
816                        &timestamps,
817                        self.temporal_decay_half_life_days,
818                    );
819                    ranked
820                        .sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
821                    tracing::debug!(
822                        half_life_days = self.temporal_decay_half_life_days,
823                        top_score_after = ranked.first().map(|r| r.1),
824                        "recall: temporal decay applied"
825                    );
826                }
827                Err(e) => {
828                    tracing::warn!("temporal decay: failed to fetch timestamps: {e:#}");
829                }
830            }
831        }
832
833        if self.mmr_enabled && !vector_results.is_empty() {
834            if let Some(qdrant) = &self.qdrant {
835                let ids: Vec<MessageId> = ranked.iter().map(|r| r.0).collect();
836                match qdrant.get_vectors(&ids).await {
837                    Ok(vec_map) if !vec_map.is_empty() => {
838                        let ranked_len_before = ranked.len();
839                        ranked = apply_mmr(&ranked, &vec_map, self.mmr_lambda, limit);
840                        tracing::debug!(
841                            before = ranked_len_before,
842                            after = ranked.len(),
843                            lambda = %self.mmr_lambda,
844                            "recall: mmr re-ranked"
845                        );
846                    }
847                    Ok(_) => {
848                        ranked.truncate(limit);
849                    }
850                    Err(e) => {
851                        tracing::warn!("MMR: failed to fetch vectors: {e:#}");
852                        ranked.truncate(limit);
853                    }
854                }
855            } else {
856                ranked.truncate(limit);
857            }
858        } else {
859            ranked.truncate(limit);
860        }
861
862        if self.importance_enabled && !ranked.is_empty() {
863            let ids: Vec<MessageId> = ranked.iter().map(|r| r.0).collect();
864            match self.sqlite.fetch_importance_scores(&ids).await {
865                Ok(scores) => {
866                    for (msg_id, score) in &mut ranked {
867                        if let Some(&imp) = scores.get(msg_id) {
868                            *score += imp * self.importance_weight;
869                        }
870                    }
871                    ranked
872                        .sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
873                    tracing::debug!(
874                        importance_weight = %self.importance_weight,
875                        "recall: importance scores blended"
876                    );
877                }
878                Err(e) => {
879                    tracing::warn!("importance scoring: failed to fetch scores: {e:#}");
880                }
881            }
882        }
883
884        // Apply tier boost: semantic-tier messages receive an additive bonus so distilled facts
885        // rank above episodic messages with the same base score. Additive (not multiplicative)
886        // so the effect is consistent regardless of base score magnitude.
887        if (self.tier_boost_semantic - 1.0).abs() > f64::EPSILON && !ranked.is_empty() {
888            let ids: Vec<MessageId> = ranked.iter().map(|r| r.0).collect();
889            match self.sqlite.fetch_tiers(&ids).await {
890                Ok(tiers) => {
891                    let bonus = self.tier_boost_semantic - 1.0;
892                    let mut boosted = false;
893                    for (msg_id, score) in &mut ranked {
894                        if tiers.get(msg_id).map(String::as_str) == Some("semantic") {
895                            *score += bonus;
896                            boosted = true;
897                        }
898                    }
899                    if boosted {
900                        ranked.sort_by(|a, b| {
901                            b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal)
902                        });
903                        tracing::debug!(
904                            tier_boost = %self.tier_boost_semantic,
905                            "recall: semantic tier boost applied"
906                        );
907                    }
908                }
909                Err(e) => {
910                    tracing::warn!("tier boost: failed to fetch tiers: {e:#}");
911                }
912            }
913        }
914
915        let ids: Vec<MessageId> = ranked.iter().map(|r| r.0).collect();
916
917        if !ids.is_empty()
918            && let Err(e) = self.batch_increment_access_count(ids.clone()).await
919        {
920            tracing::warn!("recall: failed to increment access counts: {e:#}");
921        }
922
923        // Update RL admission training data: mark recalled messages as positive examples.
924        if let Err(e) = self.sqlite.mark_training_recalled(&ids).await {
925            tracing::debug!(
926                error = %e,
927                "recall: failed to mark training data as recalled (non-fatal)"
928            );
929        }
930
931        let messages = self.sqlite.messages_by_ids(&ids).await?;
932        let msg_map: std::collections::HashMap<MessageId, _> = messages.into_iter().collect();
933
934        let recalled: Vec<RecalledMessage> = ranked
935            .iter()
936            .filter_map(|(msg_id, score)| {
937                msg_map.get(msg_id).map(|msg| RecalledMessage {
938                    message: msg.clone(),
939                    #[expect(clippy::cast_possible_truncation)]
940                    score: *score as f32,
941                })
942            })
943            .collect();
944
945        tracing::debug!(final_count = recalled.len(), "recall: final results");
946
947        Ok(recalled)
948    }
949
950    /// Recall messages using query-aware routing.
951    ///
952    /// Delegates to FTS5-only, vector-only, or hybrid search based on the router decision,
953    /// then runs the shared merge and ranking pipeline.
954    ///
955    /// # Errors
956    ///
957    /// Returns an error if any underlying search or database operation fails.
958    #[cfg_attr(
959        feature = "profiling",
960        tracing::instrument(name = "memory.recall", skip_all, fields(query_len = %query.len(), result_count = tracing::field::Empty))
961    )]
962    pub async fn recall_routed(
963        &self,
964        query: &str,
965        limit: usize,
966        filter: Option<SearchFilter>,
967        router: &dyn crate::router::MemoryRouter,
968    ) -> Result<Vec<RecalledMessage>, MemoryError> {
969        use crate::router::MemoryRoute;
970
971        let route = router.route(query);
972        tracing::debug!(?route, query_len = query.len(), "memory routing decision");
973
974        let conversation_id = filter.as_ref().and_then(|f| f.conversation_id);
975
976        let (keyword_results, vector_results): (
977            Vec<(MessageId, f64)>,
978            Vec<crate::embedding_store::SearchResult>,
979        ) = match route {
980            MemoryRoute::Keyword => {
981                let kw = self.recall_fts5_raw(query, limit, conversation_id).await?;
982                (kw, Vec::new())
983            }
984            MemoryRoute::Semantic => {
985                let vr = self.recall_vectors_raw(query, limit, filter).await?;
986                (Vec::new(), vr)
987            }
988            MemoryRoute::Hybrid => {
989                let kw = match self.recall_fts5_raw(query, limit, conversation_id).await {
990                    Ok(r) => r,
991                    Err(e) => {
992                        tracing::warn!("FTS5 keyword search failed: {e:#}");
993                        Vec::new()
994                    }
995                };
996                let vr = self.recall_vectors_raw(query, limit, filter).await?;
997                (kw, vr)
998            }
999            // Episodic: FTS5 keyword search with an optional timestamp-range filter.
1000            // Temporal keywords are stripped from the query before passing to FTS5 to
1001            // prevent BM25 score distortion (e.g. "yesterday" matching messages that
1002            // literally contain the word "yesterday" regardless of actual relevance).
1003            // Vector search is skipped for speed; temporal decay in recall_merge_and_rank
1004            // provides recency boosting for the FTS5 results.
1005            // Known trade-off (MVP): semantically similar but lexically different messages
1006            // may be missed. See issue #1629 for a future hybrid_temporal mode.
1007            MemoryRoute::Episodic => {
1008                let range = crate::router::resolve_temporal_range(query, chrono::Utc::now());
1009                let cleaned = crate::router::strip_temporal_keywords(query);
1010                let search_query = if cleaned.is_empty() { query } else { &cleaned };
1011                let kw = if let Some(ref r) = range {
1012                    self.sqlite
1013                        .keyword_search_with_time_range(
1014                            search_query,
1015                            limit,
1016                            conversation_id,
1017                            r.after.as_deref(),
1018                            r.before.as_deref(),
1019                        )
1020                        .await?
1021                } else {
1022                    self.recall_fts5_raw(search_query, limit, conversation_id)
1023                        .await?
1024                };
1025                tracing::debug!(
1026                    has_range = range.is_some(),
1027                    cleaned_query = %search_query,
1028                    keyword_count = kw.len(),
1029                    "recall: episodic path"
1030                );
1031                (kw, Vec::new())
1032            }
1033            // Graph routing triggers graph_recall separately in agent/context.rs.
1034            // For the message-based recall, behave like Hybrid.
1035            MemoryRoute::Graph => {
1036                let kw = match self.recall_fts5_raw(query, limit, conversation_id).await {
1037                    Ok(r) => r,
1038                    Err(e) => {
1039                        tracing::warn!("FTS5 keyword search failed (graph→hybrid fallback): {e:#}");
1040                        Vec::new()
1041                    }
1042                };
1043                let vr = self.recall_vectors_raw(query, limit, filter).await?;
1044                (kw, vr)
1045            }
1046        };
1047
1048        tracing::debug!(
1049            keyword_count = keyword_results.len(),
1050            vector_count = vector_results.len(),
1051            "recall: routed search results"
1052        );
1053
1054        self.recall_merge_and_rank(keyword_results, vector_results, limit)
1055            .await
1056    }
1057
1058    /// Async variant of [`recall_routed`](Self::recall_routed) that uses
1059    /// [`AsyncMemoryRouter::route_async`](crate::router::AsyncMemoryRouter::route_async) when
1060    /// available, enabling LLM-based routing for `LlmRouter` and `HybridRouter`.
1061    ///
1062    /// Falls back to [`recall_routed`](Self::recall_routed) for routers that only implement
1063    /// the sync `MemoryRouter` trait (e.g. `HeuristicRouter`).
1064    ///
1065    /// # Errors
1066    ///
1067    /// Returns an error if any underlying search or database operation fails.
1068    #[cfg_attr(
1069        feature = "profiling",
1070        tracing::instrument(name = "memory.recall", skip_all, fields(query_len = %query.len(), result_count = tracing::field::Empty))
1071    )]
1072    pub async fn recall_routed_async(
1073        &self,
1074        query: &str,
1075        limit: usize,
1076        filter: Option<crate::embedding_store::SearchFilter>,
1077        router: &dyn crate::router::AsyncMemoryRouter,
1078    ) -> Result<Vec<RecalledMessage>, MemoryError> {
1079        use crate::router::MemoryRoute;
1080
1081        let decision = router.route_async(query).await;
1082        let route = decision.route;
1083        tracing::debug!(
1084            ?route,
1085            confidence = decision.confidence,
1086            query_len = query.len(),
1087            "memory routing decision (async)"
1088        );
1089
1090        let conversation_id = filter.as_ref().and_then(|f| f.conversation_id);
1091
1092        let (keyword_results, vector_results): (
1093            Vec<(crate::types::MessageId, f64)>,
1094            Vec<crate::embedding_store::SearchResult>,
1095        ) = match route {
1096            MemoryRoute::Keyword => {
1097                let kw = self.recall_fts5_raw(query, limit, conversation_id).await?;
1098                (kw, Vec::new())
1099            }
1100            MemoryRoute::Semantic => {
1101                let vr = self.recall_vectors_raw(query, limit, filter).await?;
1102                (Vec::new(), vr)
1103            }
1104            MemoryRoute::Hybrid => {
1105                let kw = match self.recall_fts5_raw(query, limit, conversation_id).await {
1106                    Ok(r) => r,
1107                    Err(e) => {
1108                        tracing::warn!("FTS5 keyword search failed: {e:#}");
1109                        Vec::new()
1110                    }
1111                };
1112                let vr = self.recall_vectors_raw(query, limit, filter).await?;
1113                (kw, vr)
1114            }
1115            MemoryRoute::Episodic => {
1116                let range = crate::router::resolve_temporal_range(query, chrono::Utc::now());
1117                let cleaned = crate::router::strip_temporal_keywords(query);
1118                let search_query = if cleaned.is_empty() { query } else { &cleaned };
1119                let kw = if let Some(ref r) = range {
1120                    self.sqlite
1121                        .keyword_search_with_time_range(
1122                            search_query,
1123                            limit,
1124                            conversation_id,
1125                            r.after.as_deref(),
1126                            r.before.as_deref(),
1127                        )
1128                        .await?
1129                } else {
1130                    self.recall_fts5_raw(search_query, limit, conversation_id)
1131                        .await?
1132                };
1133                (kw, Vec::new())
1134            }
1135            MemoryRoute::Graph => {
1136                let kw = match self.recall_fts5_raw(query, limit, conversation_id).await {
1137                    Ok(r) => r,
1138                    Err(e) => {
1139                        tracing::warn!("FTS5 keyword search failed (graph→hybrid fallback): {e:#}");
1140                        Vec::new()
1141                    }
1142                };
1143                let vr = self.recall_vectors_raw(query, limit, filter).await?;
1144                (kw, vr)
1145            }
1146        };
1147
1148        tracing::debug!(
1149            keyword_count = keyword_results.len(),
1150            vector_count = vector_results.len(),
1151            "recall: routed search results (async)"
1152        );
1153
1154        self.recall_merge_and_rank(keyword_results, vector_results, limit)
1155            .await
1156    }
1157
1158    /// Retrieve graph facts relevant to `query` via BFS traversal.
1159    ///
1160    /// Returns an empty `Vec` if no `graph_store` is configured.
1161    ///
1162    /// # Parameters
1163    ///
1164    /// - `at_timestamp`: when `Some`, only edges valid at that `SQLite` datetime string are returned.
1165    ///   When `None`, only currently active edges are used.
1166    /// - `temporal_decay_rate`: non-negative decay rate (1/day). `0.0` preserves original ordering.
1167    ///
1168    /// # Errors
1169    ///
1170    /// Returns an error if the underlying graph query fails.
1171    #[cfg_attr(
1172        feature = "profiling",
1173        tracing::instrument(name = "memory.recall_graph", skip_all, fields(result_count = tracing::field::Empty))
1174    )]
1175    pub async fn recall_graph(
1176        &self,
1177        query: &str,
1178        limit: usize,
1179        max_hops: u32,
1180        at_timestamp: Option<&str>,
1181        temporal_decay_rate: f64,
1182        edge_types: &[crate::graph::EdgeType],
1183    ) -> Result<Vec<crate::graph::types::GraphFact>, MemoryError> {
1184        let Some(store) = &self.graph_store else {
1185            return Ok(Vec::new());
1186        };
1187
1188        tracing::debug!(
1189            query_len = query.len(),
1190            limit,
1191            max_hops,
1192            "graph: starting recall"
1193        );
1194
1195        let results = crate::graph::retrieval::graph_recall(
1196            store,
1197            self.qdrant.as_deref(),
1198            &self.provider,
1199            query,
1200            limit,
1201            max_hops,
1202            at_timestamp,
1203            temporal_decay_rate,
1204            edge_types,
1205        )
1206        .await?;
1207
1208        tracing::debug!(result_count = results.len(), "graph: recall complete");
1209        #[cfg(feature = "profiling")]
1210        tracing::Span::current().record("result_count", results.len());
1211
1212        Ok(results)
1213    }
1214
1215    /// Retrieve graph facts via SYNAPSE spreading activation.
1216    ///
1217    /// Delegates to [`crate::graph::retrieval::graph_recall_activated`].
1218    /// Used in place of [`Self::recall_graph`] when `spreading_activation.enabled = true`.
1219    ///
1220    /// # Errors
1221    ///
1222    /// Returns an error if the underlying graph query fails.
1223    #[cfg_attr(
1224        feature = "profiling",
1225        tracing::instrument(name = "memory.recall_graph", skip_all, fields(result_count = tracing::field::Empty))
1226    )]
1227    pub async fn recall_graph_activated(
1228        &self,
1229        query: &str,
1230        limit: usize,
1231        params: crate::graph::SpreadingActivationParams,
1232        edge_types: &[crate::graph::EdgeType],
1233    ) -> Result<Vec<crate::graph::activation::ActivatedFact>, MemoryError> {
1234        let Some(store) = &self.graph_store else {
1235            return Ok(Vec::new());
1236        };
1237
1238        tracing::debug!(
1239            query_len = query.len(),
1240            limit,
1241            "spreading activation: starting graph recall"
1242        );
1243
1244        let embeddings = self.qdrant.as_deref();
1245        let results = crate::graph::retrieval::graph_recall_activated(
1246            store,
1247            embeddings,
1248            &self.provider,
1249            query,
1250            limit,
1251            params,
1252            edge_types,
1253        )
1254        .await?;
1255
1256        tracing::debug!(
1257            result_count = results.len(),
1258            "spreading activation: graph recall complete"
1259        );
1260
1261        Ok(results)
1262    }
1263
1264    /// Increment access count and update `last_accessed` for a batch of message IDs.
1265    ///
1266    /// Skips the update if `message_ids` is empty to avoid an invalid `IN ()` clause.
1267    ///
1268    /// # Errors
1269    ///
1270    /// Returns an error if the `SQLite` update fails.
1271    async fn batch_increment_access_count(
1272        &self,
1273        message_ids: Vec<MessageId>,
1274    ) -> Result<(), MemoryError> {
1275        if message_ids.is_empty() {
1276            return Ok(());
1277        }
1278        self.sqlite.increment_access_counts(&message_ids).await
1279    }
1280
1281    /// Check whether an embedding exists for a given message ID.
1282    ///
1283    /// # Errors
1284    ///
1285    /// Returns an error if the `SQLite` query fails.
1286    pub async fn has_embedding(&self, message_id: MessageId) -> Result<bool, MemoryError> {
1287        match &self.qdrant {
1288            Some(qdrant) => qdrant.has_embedding(message_id).await,
1289            None => Ok(false),
1290        }
1291    }
1292
1293    /// Embed all messages that do not yet have embeddings.
1294    ///
1295    /// Processes unembedded messages in micro-batches of 32, using `buffer_unordered(4)` for
1296    /// concurrent embedding within each batch. Bounded peak memory: at most 32 messages of content
1297    /// plus their embedding vectors are live at any time.
1298    ///
1299    /// When `progress_tx` is `Some`, sends `Some(BackfillProgress)` after each message and
1300    /// `None` on completion (or on timeout/error in the caller).
1301    ///
1302    /// Returns the count of successfully embedded messages.
1303    ///
1304    /// # Errors
1305    ///
1306    /// Returns an error if collection initialization or the streaming query setup fails.
1307    /// Individual embedding failures are logged but do not stop processing.
1308    pub async fn embed_missing(
1309        &self,
1310        progress_tx: Option<tokio::sync::watch::Sender<Option<super::BackfillProgress>>>,
1311    ) -> Result<usize, MemoryError> {
1312        if self.qdrant.is_none() || !self.effective_embed_provider().supports_embeddings() {
1313            return Ok(0);
1314        }
1315
1316        let total = self.sqlite.count_unembedded_messages().await?;
1317        if total == 0 {
1318            return Ok(0);
1319        }
1320
1321        if let Some(tx) = &progress_tx {
1322            let _ = tx.send(Some(super::BackfillProgress { done: 0, total }));
1323        }
1324
1325        let mut done = 0usize;
1326        let mut succeeded = 0usize;
1327
1328        loop {
1329            const BATCH_SIZE: usize = 32;
1330            const BATCH_SIZE_I64: i64 = 32;
1331            let rows: Vec<_> = self
1332                .sqlite
1333                .stream_unembedded_messages(BATCH_SIZE_I64)
1334                .try_collect()
1335                .await?;
1336
1337            if rows.is_empty() {
1338                break;
1339            }
1340
1341            let batch_len = rows.len();
1342
1343            let results: Vec<bool> = futures::stream::iter(rows)
1344                .map(|(msg_id, conv_id, role, content)| async move {
1345                    self.embed_and_store_regular(msg_id, conv_id, &role, &content)
1346                })
1347                .buffer_unordered(4)
1348                .collect()
1349                .await;
1350
1351            for ok in &results {
1352                done += 1;
1353                if *ok {
1354                    succeeded += 1;
1355                }
1356                if let Some(tx) = &progress_tx {
1357                    let _ = tx.send(Some(super::BackfillProgress { done, total }));
1358                }
1359            }
1360
1361            let batch_succeeded = results.iter().filter(|&&b| b).count();
1362            if batch_succeeded > 0 {
1363                tracing::debug!("Backfill batch: {batch_succeeded}/{batch_len} embedded");
1364            }
1365
1366            if batch_len < BATCH_SIZE {
1367                break;
1368            }
1369        }
1370
1371        if let Some(tx) = &progress_tx {
1372            let _ = tx.send(None);
1373        }
1374
1375        if done > 0 {
1376            tracing::info!("Embedded {succeeded}/{total} missing messages");
1377        }
1378        Ok(succeeded)
1379    }
1380}
1381
#[cfg(test)]
mod tests {
    use super::*;

    /// A defaulted context carries no metadata in any field.
    #[test]
    fn embed_context_default_all_none() {
        let defaulted = EmbedContext::default();
        assert_eq!(defaulted.tool_name, None);
        assert_eq!(defaulted.exit_code, None);
        assert_eq!(defaulted.timestamp, None);
    }

    /// Values placed in each field are read back unchanged.
    #[test]
    fn embed_context_fields_set_correctly() {
        let tool = "shell";
        let stamp = "2026-04-04T00:00:00Z";
        let populated = EmbedContext {
            tool_name: Some(tool.to_string()),
            exit_code: Some(0),
            timestamp: Some(stamp.to_string()),
        };
        assert_eq!(populated.tool_name.as_deref(), Some("shell"));
        assert_eq!(populated.exit_code, Some(0));
        assert_eq!(
            populated.timestamp.as_deref(),
            Some("2026-04-04T00:00:00Z")
        );
    }

    /// A failing exit code is kept as-is and an omitted timestamp stays absent.
    #[test]
    fn embed_context_non_zero_exit_code() {
        let failed = EmbedContext {
            tool_name: Some("shell".to_string()),
            exit_code: Some(1),
            timestamp: None,
        };
        assert_eq!(failed.exit_code, Some(1));
        assert_eq!(failed.timestamp, None);
    }
}