// zeph_memory/semantic/recall.rs
1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use futures::{StreamExt as _, TryStreamExt as _};
5use zeph_llm::provider::{LlmProvider as _, Message};
6
/// Approximate characters per token (conservative estimate for mixed content).
const CHARS_PER_TOKEN: usize = 4;

/// Target chunk size in characters (~400 tokens).
const CHUNK_CHARS: usize = 400 * CHARS_PER_TOKEN;

/// Overlap between adjacent chunks in characters (~80 tokens).
const CHUNK_OVERLAP_CHARS: usize = 80 * CHARS_PER_TOKEN;

/// Split `text` into overlapping chunks suitable for embedding.
///
/// For text shorter than `CHUNK_CHARS`, returns a single chunk.
/// Splits at UTF-8 character boundaries on paragraph (`\n\n`), line (`\n`),
/// space (` `), or raw character boundaries as a last resort.
fn chunk_text(text: &str) -> Vec<&str> {
    // Stable equivalents of the unstable `str::floor_char_boundary` /
    // `str::ceil_char_boundary` (nightly feature `round_char_boundary`),
    // so this module builds on stable Rust. A UTF-8 sequence is at most
    // 4 bytes, so each scan terminates within 3 steps.
    fn floor_boundary(s: &str, index: usize) -> usize {
        if index >= s.len() {
            return s.len();
        }
        let mut i = index;
        while !s.is_char_boundary(i) {
            i -= 1;
        }
        i
    }
    fn ceil_boundary(s: &str, index: usize) -> usize {
        if index >= s.len() {
            return s.len();
        }
        let mut i = index;
        while !s.is_char_boundary(i) {
            i += 1;
        }
        i
    }

    if text.len() <= CHUNK_CHARS {
        return vec![text];
    }

    let mut chunks = Vec::new();
    let mut start = 0;

    while start < text.len() {
        let end = if start + CHUNK_CHARS >= text.len() {
            text.len()
        } else {
            // Find a clean UTF-8 char boundary at or before start + CHUNK_CHARS.
            let boundary = floor_boundary(text, start + CHUNK_CHARS);
            // Prefer to split at a paragraph or line break for cleaner chunks.
            let slice = &text[start..boundary];
            if let Some(pos) = slice.rfind("\n\n") {
                start + pos + 2
            } else if let Some(pos) = slice.rfind('\n') {
                start + pos + 1
            } else if let Some(pos) = slice.rfind(' ') {
                start + pos + 1
            } else {
                boundary
            }
        };

        chunks.push(&text[start..end]);
        if end >= text.len() {
            break;
        }
        // Next chunk starts with overlap, but must always advance past the
        // current position to prevent infinite loops when rfind finds a match
        // very early in the slice (end barely advances, overlap rewinds start).
        let next = end.saturating_sub(CHUNK_OVERLAP_CHARS);
        let new_start = ceil_boundary(text, next);
        start = if new_start > start { new_start } else { end };
    }

    chunks
}
62
63use crate::admission::log_admission_decision;
64use crate::embedding_store::{MessageKind, SearchFilter};
65use crate::error::MemoryError;
66use crate::types::{ConversationId, MessageId};
67
68use super::SemanticMemory;
69use super::algorithms::{apply_mmr, apply_temporal_decay};
70
/// Tool execution metadata stored as Qdrant payload fields alongside embeddings.
///
/// Stored as payload — NOT prepended to content — to avoid corrupting embedding vectors.
#[derive(Debug, Clone, Default)]
pub struct EmbedContext {
    /// Name of the tool that produced the output; when present, chunks are
    /// stored via `store_with_tool_context` instead of plain `store`.
    pub tool_name: Option<String>,
    /// Exit code of the tool invocation, when applicable.
    pub exit_code: Option<i32>,
    /// Timestamp of the tool run, passed through as an opaque string payload.
    pub timestamp: Option<String>,
}
80
/// A message returned by recall, paired with its ranking score.
#[derive(Debug)]
pub struct RecalledMessage {
    /// The recalled conversation message.
    pub message: Message,
    /// Score assigned by the recall ranking pipeline (weighted merge of
    /// vector and keyword results, plus optional decay/boost adjustments).
    pub score: f32,
}
86
/// Maximum number of concurrent background embed tasks per `SemanticMemory` instance.
///
/// When the limit is reached, new embed work is dropped (logged at debug
/// level) rather than queued — see `spawn_embed_bg`.
const MAX_EMBED_BG_TASKS: usize = 64;

/// Shared arguments for background embed tasks.
struct EmbedBgArgs {
    // Qdrant-backed store receiving the embedding vectors.
    qdrant: std::sync::Arc<crate::embedding_store::EmbeddingStore>,
    // Provider used to generate the embeddings (`embed_batch`).
    embed_provider: zeph_llm::any::AnyProvider,
    // Model identifier recorded alongside each stored vector.
    embedding_model: String,
    // Row id of the message being embedded.
    message_id: MessageId,
    // Conversation the message belongs to.
    conversation_id: ConversationId,
    // Author role string stored with each vector.
    role: String,
    // Full message text; chunked via `chunk_text` before embedding.
    content: String,
}
100
101/// Background task: embed chunks and store as regular message vectors.
102///
103/// All errors are logged as warnings; the function never panics.
104async fn embed_and_store_regular_bg(args: EmbedBgArgs) {
105    let EmbedBgArgs {
106        qdrant,
107        embed_provider,
108        embedding_model,
109        message_id,
110        conversation_id,
111        role,
112        content,
113    } = args;
114    let chunks = chunk_text(&content);
115    let chunk_count = chunks.len();
116
117    let vectors = match embed_provider.embed_batch(&chunks).await {
118        Ok(v) => v,
119        Err(e) => {
120            tracing::warn!("bg embed_regular: failed to embed chunks for msg {message_id}: {e:#}");
121            return;
122        }
123    };
124
125    let Some(first) = vectors.first() else {
126        return;
127    };
128    let vector_size = first.len() as u64;
129    if let Err(e) = qdrant.ensure_collection(vector_size).await {
130        tracing::warn!("bg embed_regular: failed to ensure Qdrant collection: {e:#}");
131        return;
132    }
133
134    for (chunk_index, vector) in vectors.into_iter().enumerate() {
135        let chunk_index_u32 = u32::try_from(chunk_index).unwrap_or(u32::MAX);
136        if let Err(e) = qdrant
137            .store(
138                message_id,
139                conversation_id,
140                &role,
141                vector,
142                MessageKind::Regular,
143                &embedding_model,
144                chunk_index_u32,
145            )
146            .await
147        {
148            tracing::warn!(
149                "bg embed_regular: failed to store chunk {chunk_index}/{chunk_count} \
150                 for msg {message_id}: {e:#}"
151            );
152        }
153    }
154}
155
156/// Background task: embed chunks with tool context metadata and store in Qdrant.
157///
158/// All errors are logged as warnings; the function never panics.
159async fn embed_chunks_with_tool_context_bg(args: EmbedBgArgs, embed_ctx: EmbedContext) {
160    let EmbedBgArgs {
161        qdrant,
162        embed_provider,
163        embedding_model,
164        message_id,
165        conversation_id,
166        role,
167        content,
168    } = args;
169    let chunks = chunk_text(&content);
170    let chunk_count = chunks.len();
171
172    let vectors = match embed_provider.embed_batch(&chunks).await {
173        Ok(v) => v,
174        Err(e) => {
175            tracing::warn!(
176                "bg embed_tool: failed to embed tool-output chunks for msg {message_id}: {e:#}"
177            );
178            return;
179        }
180    };
181
182    if let Some(first) = vectors.first() {
183        let vector_size = first.len() as u64;
184        if let Err(e) = qdrant.ensure_collection(vector_size).await {
185            tracing::warn!("bg embed_tool: failed to ensure Qdrant collection: {e:#}");
186            return;
187        }
188    }
189
190    for (chunk_index, vector) in vectors.into_iter().enumerate() {
191        let chunk_index_u32 = u32::try_from(chunk_index).unwrap_or(u32::MAX);
192        let result = if let Some(ref tool_name) = embed_ctx.tool_name {
193            qdrant
194                .store_with_tool_context(
195                    message_id,
196                    conversation_id,
197                    &role,
198                    vector,
199                    MessageKind::Regular,
200                    &embedding_model,
201                    chunk_index_u32,
202                    tool_name,
203                    embed_ctx.exit_code,
204                    embed_ctx.timestamp.as_deref(),
205                )
206                .await
207                .map(|_| ())
208        } else {
209            qdrant
210                .store(
211                    message_id,
212                    conversation_id,
213                    &role,
214                    vector,
215                    MessageKind::Regular,
216                    &embedding_model,
217                    chunk_index_u32,
218                )
219                .await
220                .map(|_| ())
221        };
222        if let Err(e) = result {
223            tracing::warn!(
224                "bg embed_tool: failed to store chunk {chunk_index}/{chunk_count} \
225                 for msg {message_id}: {e:#}"
226            );
227        }
228    }
229}
230
231/// Background task: embed chunks with optional category and store in Qdrant.
232///
233/// All errors are logged as warnings; the function never panics.
234async fn embed_and_store_with_category_bg(args: EmbedBgArgs, category: Option<String>) {
235    let EmbedBgArgs {
236        qdrant,
237        embed_provider,
238        embedding_model,
239        message_id,
240        conversation_id,
241        role,
242        content,
243    } = args;
244    let chunks = chunk_text(&content);
245    let chunk_count = chunks.len();
246
247    let vectors = match embed_provider.embed_batch(&chunks).await {
248        Ok(v) => v,
249        Err(e) => {
250            tracing::warn!(
251                "bg embed_category: failed to embed categorized chunks for msg {message_id}: {e:#}"
252            );
253            return;
254        }
255    };
256
257    let Some(first) = vectors.first() else {
258        return;
259    };
260    let vector_size = first.len() as u64;
261    if let Err(e) = qdrant.ensure_collection(vector_size).await {
262        tracing::warn!("bg embed_category: failed to ensure Qdrant collection: {e:#}");
263        return;
264    }
265
266    for (chunk_index, vector) in vectors.into_iter().enumerate() {
267        let chunk_index_u32 = u32::try_from(chunk_index).unwrap_or(u32::MAX);
268        if let Err(e) = qdrant
269            .store_with_category(
270                message_id,
271                conversation_id,
272                &role,
273                vector,
274                MessageKind::Regular,
275                &embedding_model,
276                chunk_index_u32,
277                category.as_deref(),
278            )
279            .await
280        {
281            tracing::warn!(
282                "bg embed_category: failed to store chunk {chunk_index}/{chunk_count} \
283                 for msg {message_id}: {e:#}"
284            );
285        }
286    }
287}
288
289impl SemanticMemory {
290    /// Save a message to `SQLite` and optionally embed and store in Qdrant.
291    ///
292    /// Returns `Ok(Some(message_id))` when admitted and persisted.
293    /// Returns `Ok(None)` when A-MAC admission control rejects the message (not an error).
294    ///
295    /// # Errors
296    ///
297    /// Returns an error if the `SQLite` save fails. Embedding failures are logged but not
298    /// propagated.
299    #[cfg_attr(
300        feature = "profiling",
301        tracing::instrument(name = "memory.remember", skip_all, fields(content_len = %content.len()))
302    )]
303    pub async fn remember(
304        &self,
305        conversation_id: ConversationId,
306        role: &str,
307        content: &str,
308        goal_text: Option<&str>,
309    ) -> Result<Option<MessageId>, MemoryError> {
310        // A-MAC admission gate.
311        if let Some(ref admission) = self.admission_control {
312            let decision = admission
313                .evaluate(
314                    content,
315                    role,
316                    &self.provider,
317                    self.qdrant.as_ref(),
318                    goal_text,
319                )
320                .await;
321            let preview: String = content.chars().take(100).collect();
322            log_admission_decision(&decision, &preview, role, admission.threshold());
323            if !decision.admitted {
324                return Ok(None);
325            }
326        }
327
328        let message_id = self
329            .sqlite
330            .save_message(conversation_id, role, content)
331            .await?;
332
333        self.embed_and_store_regular(message_id, conversation_id, role, content);
334
335        Ok(Some(message_id))
336    }
337
338    /// Save a message with pre-serialized parts JSON to `SQLite` and optionally embed in Qdrant.
339    ///
340    /// Returns `Ok((Some(message_id), embedding_stored))` when admitted and persisted.
341    /// Returns `Ok((None, false))` when A-MAC admission control rejects the message.
342    ///
343    /// # Errors
344    ///
345    /// Returns an error if the `SQLite` save fails.
346    #[cfg_attr(
347        feature = "profiling",
348        tracing::instrument(name = "memory.remember", skip_all, fields(content_len = %content.len()))
349    )]
350    pub async fn remember_with_parts(
351        &self,
352        conversation_id: ConversationId,
353        role: &str,
354        content: &str,
355        parts_json: &str,
356        goal_text: Option<&str>,
357    ) -> Result<(Option<MessageId>, bool), MemoryError> {
358        // A-MAC admission gate.
359        if let Some(ref admission) = self.admission_control {
360            let decision = admission
361                .evaluate(
362                    content,
363                    role,
364                    &self.provider,
365                    self.qdrant.as_ref(),
366                    goal_text,
367                )
368                .await;
369            let preview: String = content.chars().take(100).collect();
370            log_admission_decision(&decision, &preview, role, admission.threshold());
371            if !decision.admitted {
372                return Ok((None, false));
373            }
374        }
375
376        let message_id = self
377            .sqlite
378            .save_message_with_parts(conversation_id, role, content, parts_json)
379            .await?;
380
381        let embedding_stored =
382            self.embed_and_store_regular(message_id, conversation_id, role, content);
383
384        Ok((Some(message_id), embedding_stored))
385    }
386
387    /// Save a tool output to `SQLite` and embed with tool metadata in Qdrant payload.
388    ///
389    /// Tool metadata (`tool_name`, `exit_code`, `timestamp`) is stored as Qdrant payload fields
390    /// so it is available for filtering without corrupting the embedding vector.
391    ///
392    /// Returns `Ok(Some(message_id))` when admitted and persisted.
393    /// Returns `Ok(None)` when A-MAC admission control rejects the message.
394    ///
395    /// # Errors
396    ///
397    /// Returns an error if the `SQLite` save fails.
398    #[cfg_attr(
399        feature = "profiling",
400        tracing::instrument(name = "memory.remember", skip_all, fields(content_len = %content.len()))
401    )]
402    pub async fn remember_tool_output(
403        &self,
404        conversation_id: ConversationId,
405        role: &str,
406        content: &str,
407        parts_json: &str,
408        embed_ctx: EmbedContext,
409    ) -> Result<(Option<MessageId>, bool), MemoryError> {
410        if let Some(ref admission) = self.admission_control {
411            let decision = admission
412                .evaluate(content, role, &self.provider, self.qdrant.as_ref(), None)
413                .await;
414            let preview: String = content.chars().take(100).collect();
415            log_admission_decision(&decision, &preview, role, admission.threshold());
416            if !decision.admitted {
417                return Ok((None, false));
418            }
419        }
420
421        let message_id = self
422            .sqlite
423            .save_message_with_parts(conversation_id, role, content, parts_json)
424            .await?;
425
426        let embedding_stored = self.embed_chunks_with_tool_context(
427            message_id,
428            conversation_id,
429            role,
430            content,
431            embed_ctx,
432        );
433
434        Ok((Some(message_id), embedding_stored))
435    }
436
437    /// Save a categorized message to `SQLite` and embed with category payload in Qdrant.
438    ///
439    /// The `category` is stored in both the `messages.category` column and as a Qdrant payload
440    /// field for recall filtering. Uses A-MAC admission gate.
441    ///
442    /// Returns `Ok(Some(message_id))` when admitted; `Ok(None)` when rejected.
443    ///
444    /// # Errors
445    ///
446    /// Returns an error if the `SQLite` save fails.
447    #[cfg_attr(
448        feature = "profiling",
449        tracing::instrument(name = "memory.remember", skip_all, fields(content_len = %content.len()))
450    )]
451    pub async fn remember_categorized(
452        &self,
453        conversation_id: ConversationId,
454        role: &str,
455        content: &str,
456        category: Option<&str>,
457        goal_text: Option<&str>,
458    ) -> Result<Option<MessageId>, MemoryError> {
459        if let Some(ref admission) = self.admission_control {
460            let decision = admission
461                .evaluate(
462                    content,
463                    role,
464                    &self.provider,
465                    self.qdrant.as_ref(),
466                    goal_text,
467                )
468                .await;
469            let preview: String = content.chars().take(100).collect();
470            log_admission_decision(&decision, &preview, role, admission.threshold());
471            if !decision.admitted {
472                return Ok(None);
473            }
474        }
475
476        let message_id = self
477            .sqlite
478            .save_message_with_category(conversation_id, role, content, category)
479            .await?;
480
481        self.embed_and_store_with_category(message_id, conversation_id, role, content, category);
482
483        Ok(Some(message_id))
484    }
485
    /// Recall messages filtered by category.
    ///
    /// When `category` is `None`, behaves identically to [`Self::recall`].
    ///
    /// NOTE(review): when `filter` is `None`, a non-`None` `category` is
    /// silently dropped because there is no `SearchFilter` to attach it to —
    /// confirm this is intended (callers may expect category-only filtering).
    ///
    /// # Errors
    ///
    /// Returns an error if the search fails.
    pub async fn recall_with_category(
        &self,
        query: &str,
        limit: usize,
        filter: Option<SearchFilter>,
        category: Option<&str>,
    ) -> Result<Vec<RecalledMessage>, MemoryError> {
        // Overwrites any category already set on the incoming filter.
        let filter_with_category = filter.map(|mut f| {
            f.category = category.map(str::to_owned);
            f
        });
        self.recall(query, limit, filter_with_category).await
    }
506
507    /// Reap completed background embed tasks (non-blocking).
508    ///
509    /// Call at turn boundaries to release handles for finished tasks.
510    pub fn reap_embed_tasks(&self) {
511        if let Ok(mut tasks) = self.embed_tasks.lock() {
512            while tasks.try_join_next().is_some() {}
513        }
514    }
515
516    /// Spawn `fut` as a bounded background embed task.
517    ///
518    /// If the task limit is reached, the task is dropped and a debug message is logged.
519    fn spawn_embed_bg<F>(&self, fut: F) -> bool
520    where
521        F: std::future::Future<Output = ()> + Send + 'static,
522    {
523        let Ok(mut tasks) = self.embed_tasks.lock() else {
524            return false;
525        };
526        // Reap any finished tasks before checking capacity.
527        while tasks.try_join_next().is_some() {}
528        if tasks.len() >= MAX_EMBED_BG_TASKS {
529            tracing::debug!("background embed task limit reached, skipping");
530            return false;
531        }
532        tasks.spawn(fut);
533        true
534    }
535
536    /// Embed content chunks and store each with an optional category payload field.
537    ///
538    /// Spawns a bounded background task; returns immediately.
539    fn embed_and_store_with_category(
540        &self,
541        message_id: MessageId,
542        conversation_id: ConversationId,
543        role: &str,
544        content: &str,
545        category: Option<&str>,
546    ) -> bool {
547        let Some(qdrant) = self.qdrant.clone() else {
548            return false;
549        };
550        let embed_provider = self.effective_embed_provider().clone();
551        if !embed_provider.supports_embeddings() {
552            return false;
553        }
554        self.spawn_embed_bg(embed_and_store_with_category_bg(
555            EmbedBgArgs {
556                qdrant,
557                embed_provider,
558                embedding_model: self.embedding_model.clone(),
559                message_id,
560                conversation_id,
561                role: role.to_owned(),
562                content: content.to_owned(),
563            },
564            category.map(str::to_owned),
565        ))
566    }
567
568    /// Embed content chunks and store each as a regular (non-tool) message vector.
569    ///
570    /// Spawns a bounded background task; returns immediately.
571    fn embed_and_store_regular(
572        &self,
573        message_id: MessageId,
574        conversation_id: ConversationId,
575        role: &str,
576        content: &str,
577    ) -> bool {
578        let Some(qdrant) = self.qdrant.clone() else {
579            return false;
580        };
581        let embed_provider = self.effective_embed_provider().clone();
582        if !embed_provider.supports_embeddings() {
583            return false;
584        }
585        self.spawn_embed_bg(embed_and_store_regular_bg(EmbedBgArgs {
586            qdrant,
587            embed_provider,
588            embedding_model: self.embedding_model.clone(),
589            message_id,
590            conversation_id,
591            role: role.to_owned(),
592            content: content.to_owned(),
593        }))
594    }
595
596    /// Embed content chunks, enriching Qdrant payload with tool metadata when present.
597    ///
598    /// Spawns a bounded background task; returns immediately.
599    fn embed_chunks_with_tool_context(
600        &self,
601        message_id: MessageId,
602        conversation_id: ConversationId,
603        role: &str,
604        content: &str,
605        embed_ctx: EmbedContext,
606    ) -> bool {
607        let Some(qdrant) = self.qdrant.clone() else {
608            return false;
609        };
610        let embed_provider = self.effective_embed_provider().clone();
611        if !embed_provider.supports_embeddings() {
612            return false;
613        }
614        self.spawn_embed_bg(embed_chunks_with_tool_context_bg(
615            EmbedBgArgs {
616                qdrant,
617                embed_provider,
618                embedding_model: self.embedding_model.clone(),
619                message_id,
620                conversation_id,
621                role: role.to_owned(),
622                content: content.to_owned(),
623            },
624            embed_ctx,
625        ))
626    }
627
628    /// Save a message to `SQLite` without generating an embedding.
629    ///
630    /// Use this when embedding is intentionally skipped (e.g. autosave disabled for assistant).
631    ///
632    /// # Errors
633    ///
634    /// Returns an error if the `SQLite` save fails.
635    pub async fn save_only(
636        &self,
637        conversation_id: ConversationId,
638        role: &str,
639        content: &str,
640        parts_json: &str,
641    ) -> Result<MessageId, MemoryError> {
642        self.sqlite
643            .save_message_with_parts(conversation_id, role, content, parts_json)
644            .await
645    }
646
647    /// Recall relevant messages using hybrid search (vector + FTS5 keyword).
648    ///
649    /// When Qdrant is available, runs both vector and keyword searches, then merges
650    /// results using weighted scoring. When Qdrant is unavailable, falls back to
651    /// FTS5-only keyword search.
652    ///
653    /// # Errors
654    ///
655    /// Returns an error if embedding generation, Qdrant search, or FTS5 query fails.
656    #[cfg_attr(
657        feature = "profiling",
658        tracing::instrument(name = "memory.recall", skip_all, fields(query_len = %query.len(), result_count = tracing::field::Empty, top_score = tracing::field::Empty))
659    )]
660    pub async fn recall(
661        &self,
662        query: &str,
663        limit: usize,
664        filter: Option<SearchFilter>,
665    ) -> Result<Vec<RecalledMessage>, MemoryError> {
666        let conversation_id = filter.as_ref().and_then(|f| f.conversation_id);
667
668        tracing::debug!(
669            query_len = query.len(),
670            limit,
671            has_filter = filter.is_some(),
672            conversation_id = conversation_id.map(|c| c.0),
673            has_qdrant = self.qdrant.is_some(),
674            "recall: starting hybrid search"
675        );
676
677        let keyword_results = match self
678            .sqlite
679            .keyword_search(query, limit * 2, conversation_id)
680            .await
681        {
682            Ok(results) => results,
683            Err(e) => {
684                tracing::warn!("FTS5 keyword search failed: {e:#}");
685                Vec::new()
686            }
687        };
688
689        let vector_results = if let Some(qdrant) = &self.qdrant
690            && self.provider.supports_embeddings()
691        {
692            let query_vector = self.provider.embed(query).await?;
693            let vector_size = u64::try_from(query_vector.len()).unwrap_or(896);
694            qdrant.ensure_collection(vector_size).await?;
695            qdrant.search(&query_vector, limit * 2, filter).await?
696        } else {
697            Vec::new()
698        };
699
700        let results = self
701            .recall_merge_and_rank(keyword_results, vector_results, limit)
702            .await?;
703        #[cfg(feature = "profiling")]
704        {
705            let span = tracing::Span::current();
706            span.record("result_count", results.len());
707            if let Some(top) = results.first() {
708                span.record("top_score", top.score);
709            }
710        }
711        Ok(results)
712    }
713
714    pub(super) async fn recall_fts5_raw(
715        &self,
716        query: &str,
717        limit: usize,
718        conversation_id: Option<ConversationId>,
719    ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
720        self.sqlite
721            .keyword_search(query, limit * 2, conversation_id)
722            .await
723    }
724
725    pub(super) async fn recall_vectors_raw(
726        &self,
727        query: &str,
728        limit: usize,
729        filter: Option<SearchFilter>,
730    ) -> Result<Vec<crate::embedding_store::SearchResult>, MemoryError> {
731        let Some(qdrant) = &self.qdrant else {
732            return Ok(Vec::new());
733        };
734        if !self.provider.supports_embeddings() {
735            return Ok(Vec::new());
736        }
737        let query_vector = self.provider.embed(query).await?;
738        let vector_size = u64::try_from(query_vector.len()).unwrap_or(896);
739        qdrant.ensure_collection(vector_size).await?;
740        qdrant.search(&query_vector, limit * 2, filter).await
741    }
742
743    /// Merge raw keyword and vector results, apply weighted scoring, temporal decay, and MMR
744    /// re-ranking, then resolve to `RecalledMessage` objects.
745    ///
746    /// This is the shared post-processing step used by all recall paths.
747    ///
748    /// # Errors
749    ///
750    /// Returns an error if the `SQLite` `messages_by_ids` query fails.
751    #[allow(clippy::cast_possible_truncation, clippy::too_many_lines)]
752    pub(super) async fn recall_merge_and_rank(
753        &self,
754        keyword_results: Vec<(MessageId, f64)>,
755        vector_results: Vec<crate::embedding_store::SearchResult>,
756        limit: usize,
757    ) -> Result<Vec<RecalledMessage>, MemoryError> {
758        tracing::debug!(
759            vector_count = vector_results.len(),
760            keyword_count = keyword_results.len(),
761            limit,
762            "recall: merging search results"
763        );
764
765        let mut scores: std::collections::HashMap<MessageId, f64> =
766            std::collections::HashMap::new();
767
768        if !vector_results.is_empty() {
769            let max_vs = vector_results
770                .iter()
771                .map(|r| r.score)
772                .fold(f32::NEG_INFINITY, f32::max);
773            let norm = if max_vs > 0.0 { max_vs } else { 1.0 };
774            for r in &vector_results {
775                let normalized = f64::from(r.score / norm);
776                *scores.entry(r.message_id).or_default() += normalized * self.vector_weight;
777            }
778        }
779
780        if !keyword_results.is_empty() {
781            let max_ks = keyword_results
782                .iter()
783                .map(|r| r.1)
784                .fold(f64::NEG_INFINITY, f64::max);
785            let norm = if max_ks > 0.0 { max_ks } else { 1.0 };
786            for &(msg_id, score) in &keyword_results {
787                let normalized = score / norm;
788                *scores.entry(msg_id).or_default() += normalized * self.keyword_weight;
789            }
790        }
791
792        if scores.is_empty() {
793            tracing::debug!("recall: empty merge, no overlapping scores");
794            return Ok(Vec::new());
795        }
796
797        let mut ranked: Vec<(MessageId, f64)> = scores.into_iter().collect();
798        ranked.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
799
800        tracing::debug!(
801            merged = ranked.len(),
802            top_score = ranked.first().map(|r| r.1),
803            bottom_score = ranked.last().map(|r| r.1),
804            vector_weight = %self.vector_weight,
805            keyword_weight = %self.keyword_weight,
806            "recall: weighted merge complete"
807        );
808
809        if self.temporal_decay_enabled && self.temporal_decay_half_life_days > 0 {
810            let ids: Vec<MessageId> = ranked.iter().map(|r| r.0).collect();
811            match self.sqlite.message_timestamps(&ids).await {
812                Ok(timestamps) => {
813                    apply_temporal_decay(
814                        &mut ranked,
815                        &timestamps,
816                        self.temporal_decay_half_life_days,
817                    );
818                    ranked
819                        .sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
820                    tracing::debug!(
821                        half_life_days = self.temporal_decay_half_life_days,
822                        top_score_after = ranked.first().map(|r| r.1),
823                        "recall: temporal decay applied"
824                    );
825                }
826                Err(e) => {
827                    tracing::warn!("temporal decay: failed to fetch timestamps: {e:#}");
828                }
829            }
830        }
831
832        if self.mmr_enabled && !vector_results.is_empty() {
833            if let Some(qdrant) = &self.qdrant {
834                let ids: Vec<MessageId> = ranked.iter().map(|r| r.0).collect();
835                match qdrant.get_vectors(&ids).await {
836                    Ok(vec_map) if !vec_map.is_empty() => {
837                        let ranked_len_before = ranked.len();
838                        ranked = apply_mmr(&ranked, &vec_map, self.mmr_lambda, limit);
839                        tracing::debug!(
840                            before = ranked_len_before,
841                            after = ranked.len(),
842                            lambda = %self.mmr_lambda,
843                            "recall: mmr re-ranked"
844                        );
845                    }
846                    Ok(_) => {
847                        ranked.truncate(limit);
848                    }
849                    Err(e) => {
850                        tracing::warn!("MMR: failed to fetch vectors: {e:#}");
851                        ranked.truncate(limit);
852                    }
853                }
854            } else {
855                ranked.truncate(limit);
856            }
857        } else {
858            ranked.truncate(limit);
859        }
860
861        if self.importance_enabled && !ranked.is_empty() {
862            let ids: Vec<MessageId> = ranked.iter().map(|r| r.0).collect();
863            match self.sqlite.fetch_importance_scores(&ids).await {
864                Ok(scores) => {
865                    for (msg_id, score) in &mut ranked {
866                        if let Some(&imp) = scores.get(msg_id) {
867                            *score += imp * self.importance_weight;
868                        }
869                    }
870                    ranked
871                        .sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
872                    tracing::debug!(
873                        importance_weight = %self.importance_weight,
874                        "recall: importance scores blended"
875                    );
876                }
877                Err(e) => {
878                    tracing::warn!("importance scoring: failed to fetch scores: {e:#}");
879                }
880            }
881        }
882
883        // Apply tier boost: semantic-tier messages receive an additive bonus so distilled facts
884        // rank above episodic messages with the same base score. Additive (not multiplicative)
885        // so the effect is consistent regardless of base score magnitude.
886        if (self.tier_boost_semantic - 1.0).abs() > f64::EPSILON && !ranked.is_empty() {
887            let ids: Vec<MessageId> = ranked.iter().map(|r| r.0).collect();
888            match self.sqlite.fetch_tiers(&ids).await {
889                Ok(tiers) => {
890                    let bonus = self.tier_boost_semantic - 1.0;
891                    let mut boosted = false;
892                    for (msg_id, score) in &mut ranked {
893                        if tiers.get(msg_id).map(String::as_str) == Some("semantic") {
894                            *score += bonus;
895                            boosted = true;
896                        }
897                    }
898                    if boosted {
899                        ranked.sort_by(|a, b| {
900                            b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal)
901                        });
902                        tracing::debug!(
903                            tier_boost = %self.tier_boost_semantic,
904                            "recall: semantic tier boost applied"
905                        );
906                    }
907                }
908                Err(e) => {
909                    tracing::warn!("tier boost: failed to fetch tiers: {e:#}");
910                }
911            }
912        }
913
914        let ids: Vec<MessageId> = ranked.iter().map(|r| r.0).collect();
915
916        if !ids.is_empty()
917            && let Err(e) = self.batch_increment_access_count(ids.clone()).await
918        {
919            tracing::warn!("recall: failed to increment access counts: {e:#}");
920        }
921
922        // Update RL admission training data: mark recalled messages as positive examples.
923        if let Err(e) = self.sqlite.mark_training_recalled(&ids).await {
924            tracing::debug!(
925                error = %e,
926                "recall: failed to mark training data as recalled (non-fatal)"
927            );
928        }
929
930        let messages = self.sqlite.messages_by_ids(&ids).await?;
931        let msg_map: std::collections::HashMap<MessageId, _> = messages.into_iter().collect();
932
933        let recalled: Vec<RecalledMessage> = ranked
934            .iter()
935            .filter_map(|(msg_id, score)| {
936                msg_map.get(msg_id).map(|msg| RecalledMessage {
937                    message: msg.clone(),
938                    #[expect(clippy::cast_possible_truncation)]
939                    score: *score as f32,
940                })
941            })
942            .collect();
943
944        tracing::debug!(final_count = recalled.len(), "recall: final results");
945
946        Ok(recalled)
947    }
948
949    /// Recall messages using query-aware routing.
950    ///
951    /// Delegates to FTS5-only, vector-only, or hybrid search based on the router decision,
952    /// then runs the shared merge and ranking pipeline.
953    ///
954    /// # Errors
955    ///
956    /// Returns an error if any underlying search or database operation fails.
957    #[cfg_attr(
958        feature = "profiling",
959        tracing::instrument(name = "memory.recall", skip_all, fields(query_len = %query.len(), result_count = tracing::field::Empty))
960    )]
961    pub async fn recall_routed(
962        &self,
963        query: &str,
964        limit: usize,
965        filter: Option<SearchFilter>,
966        router: &dyn crate::router::MemoryRouter,
967    ) -> Result<Vec<RecalledMessage>, MemoryError> {
968        use crate::router::MemoryRoute;
969
970        let route = router.route(query);
971        tracing::debug!(?route, query_len = query.len(), "memory routing decision");
972
973        let conversation_id = filter.as_ref().and_then(|f| f.conversation_id);
974
975        let (keyword_results, vector_results): (
976            Vec<(MessageId, f64)>,
977            Vec<crate::embedding_store::SearchResult>,
978        ) = match route {
979            MemoryRoute::Keyword => {
980                let kw = self.recall_fts5_raw(query, limit, conversation_id).await?;
981                (kw, Vec::new())
982            }
983            MemoryRoute::Semantic => {
984                let vr = self.recall_vectors_raw(query, limit, filter).await?;
985                (Vec::new(), vr)
986            }
987            MemoryRoute::Hybrid => {
988                let kw = match self.recall_fts5_raw(query, limit, conversation_id).await {
989                    Ok(r) => r,
990                    Err(e) => {
991                        tracing::warn!("FTS5 keyword search failed: {e:#}");
992                        Vec::new()
993                    }
994                };
995                let vr = self.recall_vectors_raw(query, limit, filter).await?;
996                (kw, vr)
997            }
998            // Episodic: FTS5 keyword search with an optional timestamp-range filter.
999            // Temporal keywords are stripped from the query before passing to FTS5 to
1000            // prevent BM25 score distortion (e.g. "yesterday" matching messages that
1001            // literally contain the word "yesterday" regardless of actual relevance).
1002            // Vector search is skipped for speed; temporal decay in recall_merge_and_rank
1003            // provides recency boosting for the FTS5 results.
1004            // Known trade-off (MVP): semantically similar but lexically different messages
1005            // may be missed. See issue #1629 for a future hybrid_temporal mode.
1006            MemoryRoute::Episodic => {
1007                let range = crate::router::resolve_temporal_range(query, chrono::Utc::now());
1008                let cleaned = crate::router::strip_temporal_keywords(query);
1009                let search_query = if cleaned.is_empty() { query } else { &cleaned };
1010                let kw = if let Some(ref r) = range {
1011                    self.sqlite
1012                        .keyword_search_with_time_range(
1013                            search_query,
1014                            limit,
1015                            conversation_id,
1016                            r.after.as_deref(),
1017                            r.before.as_deref(),
1018                        )
1019                        .await?
1020                } else {
1021                    self.recall_fts5_raw(search_query, limit, conversation_id)
1022                        .await?
1023                };
1024                tracing::debug!(
1025                    has_range = range.is_some(),
1026                    cleaned_query = %search_query,
1027                    keyword_count = kw.len(),
1028                    "recall: episodic path"
1029                );
1030                (kw, Vec::new())
1031            }
1032            // Graph routing triggers graph_recall separately in agent/context.rs.
1033            // For the message-based recall, behave like Hybrid.
1034            MemoryRoute::Graph => {
1035                let kw = match self.recall_fts5_raw(query, limit, conversation_id).await {
1036                    Ok(r) => r,
1037                    Err(e) => {
1038                        tracing::warn!("FTS5 keyword search failed (graph→hybrid fallback): {e:#}");
1039                        Vec::new()
1040                    }
1041                };
1042                let vr = self.recall_vectors_raw(query, limit, filter).await?;
1043                (kw, vr)
1044            }
1045        };
1046
1047        tracing::debug!(
1048            keyword_count = keyword_results.len(),
1049            vector_count = vector_results.len(),
1050            "recall: routed search results"
1051        );
1052
1053        self.recall_merge_and_rank(keyword_results, vector_results, limit)
1054            .await
1055    }
1056
1057    /// Async variant of [`recall_routed`](Self::recall_routed) that uses
1058    /// [`AsyncMemoryRouter::route_async`](crate::router::AsyncMemoryRouter::route_async) when
1059    /// available, enabling LLM-based routing for `LlmRouter` and `HybridRouter`.
1060    ///
1061    /// Falls back to [`recall_routed`](Self::recall_routed) for routers that only implement
1062    /// the sync `MemoryRouter` trait (e.g. `HeuristicRouter`).
1063    ///
1064    /// # Errors
1065    ///
1066    /// Returns an error if any underlying search or database operation fails.
1067    #[cfg_attr(
1068        feature = "profiling",
1069        tracing::instrument(name = "memory.recall", skip_all, fields(query_len = %query.len(), result_count = tracing::field::Empty))
1070    )]
1071    pub async fn recall_routed_async(
1072        &self,
1073        query: &str,
1074        limit: usize,
1075        filter: Option<crate::embedding_store::SearchFilter>,
1076        router: &dyn crate::router::AsyncMemoryRouter,
1077    ) -> Result<Vec<RecalledMessage>, MemoryError> {
1078        use crate::router::MemoryRoute;
1079
1080        let decision = router.route_async(query).await;
1081        let route = decision.route;
1082        tracing::debug!(
1083            ?route,
1084            confidence = decision.confidence,
1085            query_len = query.len(),
1086            "memory routing decision (async)"
1087        );
1088
1089        let conversation_id = filter.as_ref().and_then(|f| f.conversation_id);
1090
1091        let (keyword_results, vector_results): (
1092            Vec<(crate::types::MessageId, f64)>,
1093            Vec<crate::embedding_store::SearchResult>,
1094        ) = match route {
1095            MemoryRoute::Keyword => {
1096                let kw = self.recall_fts5_raw(query, limit, conversation_id).await?;
1097                (kw, Vec::new())
1098            }
1099            MemoryRoute::Semantic => {
1100                let vr = self.recall_vectors_raw(query, limit, filter).await?;
1101                (Vec::new(), vr)
1102            }
1103            MemoryRoute::Hybrid => {
1104                let kw = match self.recall_fts5_raw(query, limit, conversation_id).await {
1105                    Ok(r) => r,
1106                    Err(e) => {
1107                        tracing::warn!("FTS5 keyword search failed: {e:#}");
1108                        Vec::new()
1109                    }
1110                };
1111                let vr = self.recall_vectors_raw(query, limit, filter).await?;
1112                (kw, vr)
1113            }
1114            MemoryRoute::Episodic => {
1115                let range = crate::router::resolve_temporal_range(query, chrono::Utc::now());
1116                let cleaned = crate::router::strip_temporal_keywords(query);
1117                let search_query = if cleaned.is_empty() { query } else { &cleaned };
1118                let kw = if let Some(ref r) = range {
1119                    self.sqlite
1120                        .keyword_search_with_time_range(
1121                            search_query,
1122                            limit,
1123                            conversation_id,
1124                            r.after.as_deref(),
1125                            r.before.as_deref(),
1126                        )
1127                        .await?
1128                } else {
1129                    self.recall_fts5_raw(search_query, limit, conversation_id)
1130                        .await?
1131                };
1132                (kw, Vec::new())
1133            }
1134            MemoryRoute::Graph => {
1135                let kw = match self.recall_fts5_raw(query, limit, conversation_id).await {
1136                    Ok(r) => r,
1137                    Err(e) => {
1138                        tracing::warn!("FTS5 keyword search failed (graph→hybrid fallback): {e:#}");
1139                        Vec::new()
1140                    }
1141                };
1142                let vr = self.recall_vectors_raw(query, limit, filter).await?;
1143                (kw, vr)
1144            }
1145        };
1146
1147        tracing::debug!(
1148            keyword_count = keyword_results.len(),
1149            vector_count = vector_results.len(),
1150            "recall: routed search results (async)"
1151        );
1152
1153        self.recall_merge_and_rank(keyword_results, vector_results, limit)
1154            .await
1155    }
1156
1157    /// Retrieve graph facts relevant to `query` via BFS traversal.
1158    ///
1159    /// Returns an empty `Vec` if no `graph_store` is configured.
1160    ///
1161    /// # Parameters
1162    ///
1163    /// - `at_timestamp`: when `Some`, only edges valid at that `SQLite` datetime string are returned.
1164    ///   When `None`, only currently active edges are used.
1165    /// - `temporal_decay_rate`: non-negative decay rate (1/day). `0.0` preserves original ordering.
1166    ///
1167    /// # Errors
1168    ///
1169    /// Returns an error if the underlying graph query fails.
1170    #[cfg_attr(
1171        feature = "profiling",
1172        tracing::instrument(name = "memory.recall_graph", skip_all, fields(result_count = tracing::field::Empty))
1173    )]
1174    pub async fn recall_graph(
1175        &self,
1176        query: &str,
1177        limit: usize,
1178        max_hops: u32,
1179        at_timestamp: Option<&str>,
1180        temporal_decay_rate: f64,
1181        edge_types: &[crate::graph::EdgeType],
1182    ) -> Result<Vec<crate::graph::types::GraphFact>, MemoryError> {
1183        let Some(store) = &self.graph_store else {
1184            return Ok(Vec::new());
1185        };
1186
1187        tracing::debug!(
1188            query_len = query.len(),
1189            limit,
1190            max_hops,
1191            "graph: starting recall"
1192        );
1193
1194        let results = crate::graph::retrieval::graph_recall(
1195            store,
1196            self.qdrant.as_deref(),
1197            &self.provider,
1198            query,
1199            limit,
1200            max_hops,
1201            at_timestamp,
1202            temporal_decay_rate,
1203            edge_types,
1204        )
1205        .await?;
1206
1207        tracing::debug!(result_count = results.len(), "graph: recall complete");
1208        #[cfg(feature = "profiling")]
1209        tracing::Span::current().record("result_count", results.len());
1210
1211        Ok(results)
1212    }
1213
1214    /// Retrieve graph facts via SYNAPSE spreading activation.
1215    ///
1216    /// Delegates to [`crate::graph::retrieval::graph_recall_activated`].
1217    /// Used in place of [`Self::recall_graph`] when `spreading_activation.enabled = true`.
1218    ///
1219    /// # Errors
1220    ///
1221    /// Returns an error if the underlying graph query fails.
1222    #[cfg_attr(
1223        feature = "profiling",
1224        tracing::instrument(name = "memory.recall_graph", skip_all, fields(result_count = tracing::field::Empty))
1225    )]
1226    pub async fn recall_graph_activated(
1227        &self,
1228        query: &str,
1229        limit: usize,
1230        params: crate::graph::SpreadingActivationParams,
1231        edge_types: &[crate::graph::EdgeType],
1232    ) -> Result<Vec<crate::graph::activation::ActivatedFact>, MemoryError> {
1233        let Some(store) = &self.graph_store else {
1234            return Ok(Vec::new());
1235        };
1236
1237        tracing::debug!(
1238            query_len = query.len(),
1239            limit,
1240            "spreading activation: starting graph recall"
1241        );
1242
1243        let embeddings = self.qdrant.as_deref();
1244        let results = crate::graph::retrieval::graph_recall_activated(
1245            store,
1246            embeddings,
1247            &self.provider,
1248            query,
1249            limit,
1250            params,
1251            edge_types,
1252        )
1253        .await?;
1254
1255        tracing::debug!(
1256            result_count = results.len(),
1257            "spreading activation: graph recall complete"
1258        );
1259
1260        Ok(results)
1261    }
1262
1263    /// Increment access count and update `last_accessed` for a batch of message IDs.
1264    ///
1265    /// Skips the update if `message_ids` is empty to avoid an invalid `IN ()` clause.
1266    ///
1267    /// # Errors
1268    ///
1269    /// Returns an error if the `SQLite` update fails.
1270    async fn batch_increment_access_count(
1271        &self,
1272        message_ids: Vec<MessageId>,
1273    ) -> Result<(), MemoryError> {
1274        if message_ids.is_empty() {
1275            return Ok(());
1276        }
1277        self.sqlite.increment_access_counts(&message_ids).await
1278    }
1279
1280    /// Check whether an embedding exists for a given message ID.
1281    ///
1282    /// # Errors
1283    ///
1284    /// Returns an error if the `SQLite` query fails.
1285    pub async fn has_embedding(&self, message_id: MessageId) -> Result<bool, MemoryError> {
1286        match &self.qdrant {
1287            Some(qdrant) => qdrant.has_embedding(message_id).await,
1288            None => Ok(false),
1289        }
1290    }
1291
1292    /// Embed all messages that do not yet have embeddings.
1293    ///
1294    /// Processes unembedded messages in micro-batches of 32, using `buffer_unordered(4)` for
1295    /// concurrent embedding within each batch. Bounded peak memory: at most 32 messages of content
1296    /// plus their embedding vectors are live at any time.
1297    ///
1298    /// When `progress_tx` is `Some`, sends `Some(BackfillProgress)` after each message and
1299    /// `None` on completion (or on timeout/error in the caller).
1300    ///
1301    /// Returns the count of successfully embedded messages.
1302    ///
1303    /// # Errors
1304    ///
1305    /// Returns an error if collection initialization or the streaming query setup fails.
1306    /// Individual embedding failures are logged but do not stop processing.
1307    pub async fn embed_missing(
1308        &self,
1309        progress_tx: Option<tokio::sync::watch::Sender<Option<super::BackfillProgress>>>,
1310    ) -> Result<usize, MemoryError> {
1311        if self.qdrant.is_none() || !self.effective_embed_provider().supports_embeddings() {
1312            return Ok(0);
1313        }
1314
1315        let total = self.sqlite.count_unembedded_messages().await?;
1316        if total == 0 {
1317            return Ok(0);
1318        }
1319
1320        if let Some(tx) = &progress_tx {
1321            let _ = tx.send(Some(super::BackfillProgress { done: 0, total }));
1322        }
1323
1324        let mut done = 0usize;
1325        let mut succeeded = 0usize;
1326
1327        loop {
1328            const BATCH_SIZE: usize = 32;
1329            const BATCH_SIZE_I64: i64 = 32;
1330            let rows: Vec<_> = self
1331                .sqlite
1332                .stream_unembedded_messages(BATCH_SIZE_I64)
1333                .try_collect()
1334                .await?;
1335
1336            if rows.is_empty() {
1337                break;
1338            }
1339
1340            let batch_len = rows.len();
1341
1342            let results: Vec<bool> = futures::stream::iter(rows)
1343                .map(|(msg_id, conv_id, role, content)| async move {
1344                    self.embed_and_store_regular(msg_id, conv_id, &role, &content)
1345                })
1346                .buffer_unordered(4)
1347                .collect()
1348                .await;
1349
1350            for ok in &results {
1351                done += 1;
1352                if *ok {
1353                    succeeded += 1;
1354                }
1355                if let Some(tx) = &progress_tx {
1356                    let _ = tx.send(Some(super::BackfillProgress { done, total }));
1357                }
1358            }
1359
1360            let batch_succeeded = results.iter().filter(|&&b| b).count();
1361            if batch_succeeded > 0 {
1362                tracing::debug!("Backfill batch: {batch_succeeded}/{batch_len} embedded");
1363            }
1364
1365            if batch_len < BATCH_SIZE {
1366                break;
1367            }
1368        }
1369
1370        if let Some(tx) = &progress_tx {
1371            let _ = tx.send(None);
1372        }
1373
1374        if done > 0 {
1375            tracing::info!("Embedded {succeeded}/{total} missing messages");
1376        }
1377        Ok(succeeded)
1378    }
1379}
1380
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn embed_context_default_all_none() {
        // A default context carries no tool metadata at all.
        let context = EmbedContext::default();
        assert!(context.tool_name.is_none());
        assert!(context.exit_code.is_none());
        assert!(context.timestamp.is_none());
    }

    #[test]
    fn embed_context_fields_set_correctly() {
        let context = EmbedContext {
            tool_name: Some("shell".to_string()),
            exit_code: Some(0),
            timestamp: Some("2026-04-04T00:00:00Z".to_string()),
        };
        assert_eq!(context.tool_name.as_deref(), Some("shell"));
        assert_eq!(context.exit_code, Some(0));
        assert_eq!(context.timestamp.as_deref(), Some("2026-04-04T00:00:00Z"));
    }

    #[test]
    fn embed_context_non_zero_exit_code() {
        // A failing tool run: non-zero exit code, no timestamp attached.
        let context = EmbedContext {
            tool_name: Some("shell".to_string()),
            exit_code: Some(1),
            timestamp: None,
        };
        assert_eq!(context.exit_code, Some(1));
        assert!(context.timestamp.is_none());
    }

    /// Build a minimal in-memory `SemanticMemory` with every optional
    /// subsystem (Qdrant, graph store, MMR, decay, importance) disabled.
    async fn make_semantic_memory() -> crate::semantic::SemanticMemory {
        use std::sync::Arc;
        use std::sync::atomic::AtomicU64;
        use zeph_llm::any::AnyProvider;
        use zeph_llm::mock::MockProvider;

        let sqlite = crate::store::SqliteStore::new(":memory:").await.unwrap();
        let provider = AnyProvider::Mock(MockProvider::default());
        crate::semantic::SemanticMemory {
            sqlite,
            qdrant: None,
            provider,
            embed_provider: None,
            embedding_model: "test-model".into(),
            vector_weight: 0.7,
            keyword_weight: 0.3,
            temporal_decay_enabled: false,
            temporal_decay_half_life_days: 30,
            mmr_enabled: false,
            mmr_lambda: 0.7,
            importance_enabled: false,
            importance_weight: 0.15,
            token_counter: Arc::new(crate::token_counter::TokenCounter::new()),
            graph_store: None,
            community_detection_failures: Arc::new(AtomicU64::new(0)),
            graph_extraction_count: Arc::new(AtomicU64::new(0)),
            graph_extraction_failures: Arc::new(AtomicU64::new(0)),
            tier_boost_semantic: 1.3,
            admission_control: None,
            key_facts_dedup_threshold: 0.95,
            embed_tasks: std::sync::Mutex::new(tokio::task::JoinSet::new()),
        }
    }

    #[tokio::test]
    async fn spawn_embed_bg_returns_true_when_capacity_available() {
        let memory = make_semantic_memory().await;
        let spawned = memory.spawn_embed_bg(std::future::ready(()));
        assert!(
            spawned,
            "spawn_embed_bg must return true when a task was successfully spawned"
        );
    }

    #[tokio::test]
    async fn spawn_embed_bg_returns_false_at_capacity() {
        let memory = make_semantic_memory().await;

        // Saturate the JoinSet with futures that never resolve.
        {
            let mut tasks = memory.embed_tasks.lock().unwrap();
            for _ in 0..MAX_EMBED_BG_TASKS {
                tasks.spawn(std::future::pending::<()>());
            }
        }

        let spawned = memory.spawn_embed_bg(std::future::ready(()));
        assert!(
            !spawned,
            "spawn_embed_bg must return false when the task limit is reached"
        );
    }
}