1use futures::StreamExt as _;
5use zeph_llm::provider::{LlmProvider as _, Message};
6
/// Rough chars-per-token heuristic used to size chunks.
const CHARS_PER_TOKEN: usize = 4;

/// Target chunk size: ~400 tokens.
const CHUNK_CHARS: usize = 400 * CHARS_PER_TOKEN;

/// Overlap between consecutive chunks: ~80 tokens, so context that spans a
/// chunk boundary is present in both neighbours.
const CHUNK_OVERLAP_CHARS: usize = 80 * CHARS_PER_TOKEN;

/// Largest byte index `<= i` (clamped to `text.len()`) that is a `char`
/// boundary of `text`. Stable-Rust replacement for `str::floor_char_boundary`.
fn floor_boundary(text: &str, i: usize) -> usize {
    let mut i = i.min(text.len());
    while !text.is_char_boundary(i) {
        i -= 1;
    }
    i
}

/// Smallest byte index `>= i` (clamped to `text.len()`) that is a `char`
/// boundary of `text`. Stable-Rust replacement for `str::ceil_char_boundary`.
fn ceil_boundary(text: &str, i: usize) -> usize {
    let mut i = i.min(text.len());
    while !text.is_char_boundary(i) {
        i += 1;
    }
    i
}

/// Split `text` into chunks of at most `CHUNK_CHARS` bytes, preferring to cut
/// at a paragraph break (`"\n\n"`), then a line break, then a space, and
/// overlapping consecutive chunks by up to `CHUNK_OVERLAP_CHARS` bytes.
///
/// All cuts land on `char` boundaries, so slicing never panics on multi-byte
/// UTF-8. Termination is guaranteed: the next chunk always starts strictly
/// after the previous one.
fn chunk_text(text: &str) -> Vec<&str> {
    if text.len() <= CHUNK_CHARS {
        return vec![text];
    }

    let mut chunks = Vec::new();
    let mut start = 0;

    while start < text.len() {
        let end = if start + CHUNK_CHARS >= text.len() {
            text.len()
        } else {
            // Prefer a natural break inside the window; fall back to a hard
            // cut at the char boundary nearest the size limit.
            let boundary = floor_boundary(text, start + CHUNK_CHARS);
            let slice = &text[start..boundary];
            if let Some(pos) = slice.rfind("\n\n") {
                start + pos + 2
            } else if let Some(pos) = slice.rfind('\n') {
                start + pos + 1
            } else if let Some(pos) = slice.rfind(' ') {
                start + pos + 1
            } else {
                boundary
            }
        };

        chunks.push(&text[start..end]);
        if end >= text.len() {
            break;
        }
        // Step back for overlap, but never to (or before) the current chunk's
        // start: a paragraph break within CHUNK_OVERLAP_CHARS of `start` would
        // otherwise make the next start fail to advance and loop forever on
        // the same chunk. In that case continue from `end` with no overlap.
        let next = ceil_boundary(text, end.saturating_sub(CHUNK_OVERLAP_CHARS));
        start = if next > start { next } else { end };
    }

    chunks
}
62
63use crate::admission::log_admission_decision;
64use crate::embedding_store::{MessageKind, SearchFilter};
65use crate::error::MemoryError;
66use crate::types::{ConversationId, MessageId};
67
68use super::SemanticMemory;
69use super::algorithms::{apply_mmr, apply_temporal_decay};
70
/// Optional tool-execution metadata attached to the embeddings of a
/// tool-output message (see `remember_tool_output`).
#[derive(Debug, Clone, Default)]
pub struct EmbedContext {
    // Name of the tool that produced the output (e.g. "shell"); when None,
    // the output is stored without tool context.
    pub tool_name: Option<String>,
    // Process exit code reported by the tool, when applicable.
    pub exit_code: Option<i32>,
    // Timestamp of the tool run, stored as an opaque string; looks like
    // RFC 3339 in the tests — format not enforced here.
    pub timestamp: Option<String>,
}
80
/// A message returned by recall, paired with its final blended ranking score.
#[derive(Debug)]
pub struct RecalledMessage {
    // The recalled conversation message.
    pub message: Message,
    // Blended relevance score (keyword + vector, after decay/boost stages);
    // results are sorted descending, so higher means more relevant.
    pub score: f32,
}
86
87impl SemanticMemory {
88 pub async fn remember(
98 &self,
99 conversation_id: ConversationId,
100 role: &str,
101 content: &str,
102 goal_text: Option<&str>,
103 ) -> Result<Option<MessageId>, MemoryError> {
104 if let Some(ref admission) = self.admission_control {
106 let decision = admission
107 .evaluate(
108 content,
109 role,
110 &self.provider,
111 self.qdrant.as_ref(),
112 goal_text,
113 )
114 .await;
115 let preview: String = content.chars().take(100).collect();
116 log_admission_decision(&decision, &preview, role, admission.threshold());
117 if !decision.admitted {
118 return Ok(None);
119 }
120 }
121
122 let message_id = self
123 .sqlite
124 .save_message(conversation_id, role, content)
125 .await?;
126
127 self.embed_and_store_regular(message_id, conversation_id, role, content)
128 .await;
129
130 Ok(Some(message_id))
131 }
132
133 pub async fn remember_with_parts(
142 &self,
143 conversation_id: ConversationId,
144 role: &str,
145 content: &str,
146 parts_json: &str,
147 goal_text: Option<&str>,
148 ) -> Result<(Option<MessageId>, bool), MemoryError> {
149 if let Some(ref admission) = self.admission_control {
151 let decision = admission
152 .evaluate(
153 content,
154 role,
155 &self.provider,
156 self.qdrant.as_ref(),
157 goal_text,
158 )
159 .await;
160 let preview: String = content.chars().take(100).collect();
161 log_admission_decision(&decision, &preview, role, admission.threshold());
162 if !decision.admitted {
163 return Ok((None, false));
164 }
165 }
166
167 let message_id = self
168 .sqlite
169 .save_message_with_parts(conversation_id, role, content, parts_json)
170 .await?;
171
172 let embedding_stored = self
173 .embed_and_store_regular(message_id, conversation_id, role, content)
174 .await;
175
176 Ok((Some(message_id), embedding_stored))
177 }
178
179 pub async fn remember_tool_output(
191 &self,
192 conversation_id: ConversationId,
193 role: &str,
194 content: &str,
195 parts_json: &str,
196 embed_ctx: EmbedContext,
197 ) -> Result<(Option<MessageId>, bool), MemoryError> {
198 if let Some(ref admission) = self.admission_control {
199 let decision = admission
200 .evaluate(content, role, &self.provider, self.qdrant.as_ref(), None)
201 .await;
202 let preview: String = content.chars().take(100).collect();
203 log_admission_decision(&decision, &preview, role, admission.threshold());
204 if !decision.admitted {
205 return Ok((None, false));
206 }
207 }
208
209 let message_id = self
210 .sqlite
211 .save_message_with_parts(conversation_id, role, content, parts_json)
212 .await?;
213
214 let embedding_stored = self
215 .embed_chunks_with_tool_context(message_id, conversation_id, role, content, embed_ctx)
216 .await;
217
218 Ok((Some(message_id), embedding_stored))
219 }
220
221 pub async fn remember_categorized(
232 &self,
233 conversation_id: ConversationId,
234 role: &str,
235 content: &str,
236 category: Option<&str>,
237 goal_text: Option<&str>,
238 ) -> Result<Option<MessageId>, MemoryError> {
239 if let Some(ref admission) = self.admission_control {
240 let decision = admission
241 .evaluate(
242 content,
243 role,
244 &self.provider,
245 self.qdrant.as_ref(),
246 goal_text,
247 )
248 .await;
249 let preview: String = content.chars().take(100).collect();
250 log_admission_decision(&decision, &preview, role, admission.threshold());
251 if !decision.admitted {
252 return Ok(None);
253 }
254 }
255
256 let message_id = self
257 .sqlite
258 .save_message_with_category(conversation_id, role, content, category)
259 .await?;
260
261 self.embed_and_store_with_category(message_id, conversation_id, role, content, category)
262 .await;
263
264 Ok(Some(message_id))
265 }
266
    /// Hybrid recall restricted to a single category.
    ///
    /// Copies `category` into the supplied filter (overwriting any category
    /// already set on it — a `None` category clears it) and delegates to
    /// [`Self::recall`].
    ///
    /// NOTE(review): when `filter` is `None`, the `category` argument is
    /// silently dropped because `Option::map` never runs — confirm whether a
    /// default filter carrying the category should be constructed instead.
    ///
    /// # Errors
    /// Propagates errors from [`Self::recall`].
    pub async fn recall_with_category(
        &self,
        query: &str,
        limit: usize,
        filter: Option<SearchFilter>,
        category: Option<&str>,
    ) -> Result<Vec<RecalledMessage>, MemoryError> {
        let filter_with_category = filter.map(|mut f| {
            f.category = category.map(str::to_owned);
            f
        });
        self.recall(query, limit, filter_with_category).await
    }
287
    /// Chunk `content`, embed every chunk, and store each vector in Qdrant
    /// tagged with `category`.
    ///
    /// Returns `true` when at least one chunk vector was stored. Returns
    /// `false` (without error) when Qdrant is not configured, the embed
    /// provider lacks embedding support, batch embedding fails, or no vectors
    /// come back; individual store failures are logged and skipped.
    async fn embed_and_store_with_category(
        &self,
        message_id: MessageId,
        conversation_id: ConversationId,
        role: &str,
        content: &str,
        category: Option<&str>,
    ) -> bool {
        // Embedding requires both a vector store and a provider that can
        // produce embeddings.
        let Some(qdrant) = &self.qdrant else {
            return false;
        };
        let embed_provider = self.effective_embed_provider();
        if !embed_provider.supports_embeddings() {
            return false;
        }

        let chunks = chunk_text(content);
        let chunk_count = chunks.len();

        let vectors = match embed_provider.embed_batch(&chunks).await {
            Ok(v) => v,
            Err(e) => {
                tracing::warn!("Failed to embed categorized chunks for msg {message_id}: {e:#}");
                return false;
            }
        };

        let Some(first) = vectors.first() else {
            return false;
        };
        // Size the collection from the first vector; the 896 fallback only
        // applies if the length cannot fit in u64 (practically unreachable).
        let vector_size = u64::try_from(first.len()).unwrap_or(896);
        if let Err(e) = qdrant.ensure_collection(vector_size).await {
            tracing::warn!("Failed to ensure Qdrant collection for categorized msg: {e:#}");
            return false;
        }

        // Best-effort per chunk: one failure does not abort the rest.
        let mut stored = false;
        for (chunk_index, vector) in vectors.into_iter().enumerate() {
            let chunk_index_u32 = u32::try_from(chunk_index).unwrap_or(u32::MAX);
            match qdrant
                .store_with_category(
                    message_id,
                    conversation_id,
                    role,
                    vector,
                    MessageKind::Regular,
                    &self.embedding_model,
                    chunk_index_u32,
                    category,
                )
                .await
            {
                Ok(_) => stored = true,
                Err(e) => tracing::warn!(
                    "Failed to store categorized chunk {chunk_index}/{chunk_count} \
                     for msg {message_id}: {e:#}"
                ),
            }
        }

        stored
    }
351
    /// Chunk `content`, embed every chunk, and store each vector in Qdrant as
    /// a regular (uncategorized) message embedding.
    ///
    /// Returns `true` when at least one chunk vector was stored. Returns
    /// `false` (without error) when Qdrant is not configured, the embed
    /// provider lacks embedding support, batch embedding fails, or no vectors
    /// come back; individual store failures are logged and skipped.
    async fn embed_and_store_regular(
        &self,
        message_id: MessageId,
        conversation_id: ConversationId,
        role: &str,
        content: &str,
    ) -> bool {
        // Embedding requires both a vector store and a provider that can
        // produce embeddings.
        let Some(qdrant) = &self.qdrant else {
            return false;
        };
        let embed_provider = self.effective_embed_provider();
        if !embed_provider.supports_embeddings() {
            return false;
        }

        let chunks = chunk_text(content);
        let chunk_count = chunks.len();

        let vectors = match embed_provider.embed_batch(&chunks).await {
            Ok(v) => v,
            Err(e) => {
                tracing::warn!("Failed to embed chunks for msg {message_id}: {e:#}");
                return false;
            }
        };

        let Some(first) = vectors.first() else {
            return false;
        };
        // Size the collection from the first vector; the 896 fallback only
        // applies if the length cannot fit in u64 (practically unreachable).
        let vector_size = u64::try_from(first.len()).unwrap_or(896);
        if let Err(e) = qdrant.ensure_collection(vector_size).await {
            tracing::warn!("Failed to ensure Qdrant collection: {e:#}");
            return false;
        }

        // Best-effort per chunk: one failure does not abort the rest.
        let mut stored = false;
        for (chunk_index, vector) in vectors.into_iter().enumerate() {
            let chunk_index_u32 = u32::try_from(chunk_index).unwrap_or(u32::MAX);
            match qdrant
                .store(
                    message_id,
                    conversation_id,
                    role,
                    vector,
                    MessageKind::Regular,
                    &self.embedding_model,
                    chunk_index_u32,
                )
                .await
            {
                Ok(_) => stored = true,
                Err(e) => tracing::warn!(
                    "Failed to store chunk {chunk_index}/{chunk_count} \
                     for msg {message_id}: {e:#}"
                ),
            }
        }

        stored
    }
416
    /// Chunk `content`, embed every chunk, and store each vector in Qdrant,
    /// attaching tool metadata (name, exit code, timestamp) when
    /// `embed_ctx.tool_name` is set; otherwise stores plain embeddings.
    ///
    /// Returns `true` when at least one chunk vector was stored; `false`
    /// (without error) when Qdrant is not configured, the provider lacks
    /// embedding support, or batch embedding fails. Individual store
    /// failures are logged and skipped.
    async fn embed_chunks_with_tool_context(
        &self,
        message_id: MessageId,
        conversation_id: ConversationId,
        role: &str,
        content: &str,
        embed_ctx: EmbedContext,
    ) -> bool {
        // Embedding requires both a vector store and a provider that can
        // produce embeddings.
        let Some(qdrant) = &self.qdrant else {
            return false;
        };
        let embed_provider = self.effective_embed_provider();
        if !embed_provider.supports_embeddings() {
            return false;
        }

        let chunks = chunk_text(content);
        let chunk_count = chunks.len();
        let mut stored = false;

        let vectors = match embed_provider.embed_batch(&chunks).await {
            Ok(v) => v,
            Err(e) => {
                tracing::warn!("Failed to embed tool-output chunks for msg {message_id}: {e:#}");
                return false;
            }
        };

        // Ensure the collection exists before storing; when `vectors` is
        // empty the store loop below is a no-op and `false` is returned.
        if let Some(first) = vectors.first() {
            // The 896 fallback only applies if the length cannot fit in u64
            // (practically unreachable).
            let vector_size = u64::try_from(first.len()).unwrap_or(896);
            if let Err(e) = qdrant.ensure_collection(vector_size).await {
                tracing::warn!("Failed to ensure Qdrant collection: {e:#}");
                return false;
            }
        }

        // Best-effort per chunk: one failure does not abort the rest.
        for (chunk_index, vector) in vectors.into_iter().enumerate() {
            let chunk_index_u32 = u32::try_from(chunk_index).unwrap_or(u32::MAX);
            let result = if let Some(ref tool_name) = embed_ctx.tool_name {
                qdrant
                    .store_with_tool_context(
                        message_id,
                        conversation_id,
                        role,
                        vector,
                        MessageKind::Regular,
                        &self.embedding_model,
                        chunk_index_u32,
                        tool_name,
                        embed_ctx.exit_code,
                        embed_ctx.timestamp.as_deref(),
                    )
                    .await
                    .map(|_| ())
            } else {
                // No tool name: fall back to a plain store so the chunk is
                // still recallable.
                qdrant
                    .store(
                        message_id,
                        conversation_id,
                        role,
                        vector,
                        MessageKind::Regular,
                        &self.embedding_model,
                        chunk_index_u32,
                    )
                    .await
                    .map(|_| ())
            };
            match result {
                Ok(()) => stored = true,
                Err(e) => tracing::warn!(
                    "Failed to store tool-output chunk {chunk_index}/{chunk_count} \
                     for msg {message_id}: {e:#}"
                ),
            }
        }

        stored
    }
501
    /// Persist a message (content plus structured parts JSON) without running
    /// admission control and without creating any embedding.
    ///
    /// # Errors
    /// Propagates SQLite storage failures.
    pub async fn save_only(
        &self,
        conversation_id: ConversationId,
        role: &str,
        content: &str,
        parts_json: &str,
    ) -> Result<MessageId, MemoryError> {
        self.sqlite
            .save_message_with_parts(conversation_id, role, content, parts_json)
            .await
    }
520
    /// Hybrid recall: FTS5 keyword search blended with Qdrant vector search.
    ///
    /// Both branches over-fetch `limit * 2` candidates before
    /// [`Self::recall_merge_and_rank`] blends, re-ranks, and truncates them.
    /// Keyword-search failures are logged and degrade to vector-only
    /// results; vector-path failures propagate as errors.
    ///
    /// NOTE(review): the query is embedded with `self.provider`, while stored
    /// chunks are embedded via `self.effective_embed_provider()` — confirm
    /// the two resolve to the same embedding model/dimensions.
    ///
    /// # Errors
    /// Returns an error when query embedding, collection setup, the vector
    /// search, or the final merge/fetch fails.
    pub async fn recall(
        &self,
        query: &str,
        limit: usize,
        filter: Option<SearchFilter>,
    ) -> Result<Vec<RecalledMessage>, MemoryError> {
        let conversation_id = filter.as_ref().and_then(|f| f.conversation_id);

        tracing::debug!(
            query_len = query.len(),
            limit,
            has_filter = filter.is_some(),
            conversation_id = conversation_id.map(|c| c.0),
            has_qdrant = self.qdrant.is_some(),
            "recall: starting hybrid search"
        );

        // Keyword search is best-effort: on failure, continue with vectors.
        let keyword_results = match self
            .sqlite
            .keyword_search(query, limit * 2, conversation_id)
            .await
        {
            Ok(results) => results,
            Err(e) => {
                tracing::warn!("FTS5 keyword search failed: {e:#}");
                Vec::new()
            }
        };

        // Vector search only when Qdrant is configured and the provider can
        // embed the query.
        let vector_results = if let Some(qdrant) = &self.qdrant
            && self.provider.supports_embeddings()
        {
            let query_vector = self.provider.embed(query).await?;
            let vector_size = u64::try_from(query_vector.len()).unwrap_or(896);
            qdrant.ensure_collection(vector_size).await?;
            qdrant.search(&query_vector, limit * 2, filter).await?
        } else {
            Vec::new()
        };

        self.recall_merge_and_rank(keyword_results, vector_results, limit)
            .await
    }
573
574 pub(super) async fn recall_fts5_raw(
575 &self,
576 query: &str,
577 limit: usize,
578 conversation_id: Option<ConversationId>,
579 ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
580 self.sqlite
581 .keyword_search(query, limit * 2, conversation_id)
582 .await
583 }
584
585 pub(super) async fn recall_vectors_raw(
586 &self,
587 query: &str,
588 limit: usize,
589 filter: Option<SearchFilter>,
590 ) -> Result<Vec<crate::embedding_store::SearchResult>, MemoryError> {
591 let Some(qdrant) = &self.qdrant else {
592 return Ok(Vec::new());
593 };
594 if !self.provider.supports_embeddings() {
595 return Ok(Vec::new());
596 }
597 let query_vector = self.provider.embed(query).await?;
598 let vector_size = u64::try_from(query_vector.len()).unwrap_or(896);
599 qdrant.ensure_collection(vector_size).await?;
600 qdrant.search(&query_vector, limit * 2, filter).await
601 }
602
    /// Blend keyword and vector candidates into the final ranked recall list.
    ///
    /// Pipeline, in order:
    /// 1. Normalize each result set by its max score and sum them per message,
    ///    weighted by `vector_weight` / `keyword_weight`.
    /// 2. Optionally apply temporal decay, MMR diversification (which also
    ///    truncates to `limit`), importance blending, and a semantic-tier
    ///    boost; each stage re-sorts descending and degrades gracefully
    ///    (logged warning) when its data fetch fails.
    /// 3. Bump access counts and training-recall markers (best-effort), then
    ///    hydrate the surviving ids from SQLite.
    ///
    /// # Errors
    /// Returns an error only when the final message hydration fails.
    #[allow(clippy::cast_possible_truncation, clippy::too_many_lines)]
    pub(super) async fn recall_merge_and_rank(
        &self,
        keyword_results: Vec<(MessageId, f64)>,
        vector_results: Vec<crate::embedding_store::SearchResult>,
        limit: usize,
    ) -> Result<Vec<RecalledMessage>, MemoryError> {
        tracing::debug!(
            vector_count = vector_results.len(),
            keyword_count = keyword_results.len(),
            limit,
            "recall: merging search results"
        );

        let mut scores: std::collections::HashMap<MessageId, f64> =
            std::collections::HashMap::new();

        // Normalize vector scores by their max (only when positive, to avoid
        // sign flips) and fold in, scaled by the vector weight.
        if !vector_results.is_empty() {
            let max_vs = vector_results
                .iter()
                .map(|r| r.score)
                .fold(f32::NEG_INFINITY, f32::max);
            let norm = if max_vs > 0.0 { max_vs } else { 1.0 };
            for r in &vector_results {
                let normalized = f64::from(r.score / norm);
                *scores.entry(r.message_id).or_default() += normalized * self.vector_weight;
            }
        }

        // Same for keyword scores, scaled by the keyword weight.
        if !keyword_results.is_empty() {
            let max_ks = keyword_results
                .iter()
                .map(|r| r.1)
                .fold(f64::NEG_INFINITY, f64::max);
            let norm = if max_ks > 0.0 { max_ks } else { 1.0 };
            for &(msg_id, score) in &keyword_results {
                let normalized = score / norm;
                *scores.entry(msg_id).or_default() += normalized * self.keyword_weight;
            }
        }

        if scores.is_empty() {
            tracing::debug!("recall: empty merge, no overlapping scores");
            return Ok(Vec::new());
        }

        // Descending by blended score; NaN-safe via partial_cmp fallback.
        let mut ranked: Vec<(MessageId, f64)> = scores.into_iter().collect();
        ranked.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));

        tracing::debug!(
            merged = ranked.len(),
            top_score = ranked.first().map(|r| r.1),
            bottom_score = ranked.last().map(|r| r.1),
            vector_weight = %self.vector_weight,
            keyword_weight = %self.keyword_weight,
            "recall: weighted merge complete"
        );

        // Stage: temporal decay (best-effort; skipped on timestamp-fetch
        // failure).
        if self.temporal_decay_enabled && self.temporal_decay_half_life_days > 0 {
            let ids: Vec<MessageId> = ranked.iter().map(|r| r.0).collect();
            match self.sqlite.message_timestamps(&ids).await {
                Ok(timestamps) => {
                    apply_temporal_decay(
                        &mut ranked,
                        &timestamps,
                        self.temporal_decay_half_life_days,
                    );
                    ranked
                        .sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
                    tracing::debug!(
                        half_life_days = self.temporal_decay_half_life_days,
                        top_score_after = ranked.first().map(|r| r.1),
                        "recall: temporal decay applied"
                    );
                }
                Err(e) => {
                    tracing::warn!("temporal decay: failed to fetch timestamps: {e:#}");
                }
            }
        }

        // Stage: MMR diversification. Every branch ends with at most `limit`
        // entries — either via apply_mmr or an explicit truncate.
        if self.mmr_enabled && !vector_results.is_empty() {
            if let Some(qdrant) = &self.qdrant {
                let ids: Vec<MessageId> = ranked.iter().map(|r| r.0).collect();
                match qdrant.get_vectors(&ids).await {
                    Ok(vec_map) if !vec_map.is_empty() => {
                        let ranked_len_before = ranked.len();
                        ranked = apply_mmr(&ranked, &vec_map, self.mmr_lambda, limit);
                        tracing::debug!(
                            before = ranked_len_before,
                            after = ranked.len(),
                            lambda = %self.mmr_lambda,
                            "recall: mmr re-ranked"
                        );
                    }
                    Ok(_) => {
                        ranked.truncate(limit);
                    }
                    Err(e) => {
                        tracing::warn!("MMR: failed to fetch vectors: {e:#}");
                        ranked.truncate(limit);
                    }
                }
            } else {
                ranked.truncate(limit);
            }
        } else {
            ranked.truncate(limit);
        }

        // Stage: blend per-message importance scores (best-effort).
        if self.importance_enabled && !ranked.is_empty() {
            let ids: Vec<MessageId> = ranked.iter().map(|r| r.0).collect();
            match self.sqlite.fetch_importance_scores(&ids).await {
                Ok(scores) => {
                    for (msg_id, score) in &mut ranked {
                        if let Some(&imp) = scores.get(msg_id) {
                            *score += imp * self.importance_weight;
                        }
                    }
                    ranked
                        .sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
                    tracing::debug!(
                        importance_weight = %self.importance_weight,
                        "recall: importance scores blended"
                    );
                }
                Err(e) => {
                    tracing::warn!("importance scoring: failed to fetch scores: {e:#}");
                }
            }
        }

        // Stage: flat bonus for messages in the "semantic" tier, enabled when
        // tier_boost_semantic deviates from the neutral value 1.0.
        if (self.tier_boost_semantic - 1.0).abs() > f64::EPSILON && !ranked.is_empty() {
            let ids: Vec<MessageId> = ranked.iter().map(|r| r.0).collect();
            match self.sqlite.fetch_tiers(&ids).await {
                Ok(tiers) => {
                    let bonus = self.tier_boost_semantic - 1.0;
                    let mut boosted = false;
                    for (msg_id, score) in &mut ranked {
                        if tiers.get(msg_id).map(String::as_str) == Some("semantic") {
                            *score += bonus;
                            boosted = true;
                        }
                    }
                    if boosted {
                        ranked.sort_by(|a, b| {
                            b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal)
                        });
                        tracing::debug!(
                            tier_boost = %self.tier_boost_semantic,
                            "recall: semantic tier boost applied"
                        );
                    }
                }
                Err(e) => {
                    tracing::warn!("tier boost: failed to fetch tiers: {e:#}");
                }
            }
        }

        let ids: Vec<MessageId> = ranked.iter().map(|r| r.0).collect();

        // Bookkeeping is best-effort: recall results are returned even if
        // these updates fail.
        if !ids.is_empty()
            && let Err(e) = self.batch_increment_access_count(ids.clone()).await
        {
            tracing::warn!("recall: failed to increment access counts: {e:#}");
        }

        if let Err(e) = self.sqlite.mark_training_recalled(&ids).await {
            tracing::debug!(
                error = %e,
                "recall: failed to mark training data as recalled (non-fatal)"
            );
        }

        // Hydrate message bodies; ids missing from the map are dropped.
        let messages = self.sqlite.messages_by_ids(&ids).await?;
        let msg_map: std::collections::HashMap<MessageId, _> = messages.into_iter().collect();

        let recalled: Vec<RecalledMessage> = ranked
            .iter()
            .filter_map(|(msg_id, score)| {
                msg_map.get(msg_id).map(|msg| RecalledMessage {
                    message: msg.clone(),
                    #[expect(clippy::cast_possible_truncation)]
                    score: *score as f32,
                })
            })
            .collect();

        tracing::debug!(final_count = recalled.len(), "recall: final results");

        Ok(recalled)
    }
808
    /// Recall with a synchronous router choosing the retrieval strategy.
    ///
    /// Route handling:
    /// - `Keyword`: FTS5 only; errors propagate.
    /// - `Semantic`: vectors only; errors propagate.
    /// - `Hybrid` / `Graph`: keyword is best-effort (logged on failure),
    ///   vector errors propagate. `Graph` currently falls back to hybrid.
    /// - `Episodic`: strips temporal keywords from the query and, when a
    ///   temporal range resolves, constrains the keyword search to it.
    ///
    /// Results are blended by [`Self::recall_merge_and_rank`].
    ///
    /// # Errors
    /// Propagates search and merge failures per the rules above.
    pub async fn recall_routed(
        &self,
        query: &str,
        limit: usize,
        filter: Option<SearchFilter>,
        router: &dyn crate::router::MemoryRouter,
    ) -> Result<Vec<RecalledMessage>, MemoryError> {
        use crate::router::MemoryRoute;

        let route = router.route(query);
        tracing::debug!(?route, query_len = query.len(), "memory routing decision");

        let conversation_id = filter.as_ref().and_then(|f| f.conversation_id);

        let (keyword_results, vector_results): (
            Vec<(MessageId, f64)>,
            Vec<crate::embedding_store::SearchResult>,
        ) = match route {
            MemoryRoute::Keyword => {
                let kw = self.recall_fts5_raw(query, limit, conversation_id).await?;
                (kw, Vec::new())
            }
            MemoryRoute::Semantic => {
                let vr = self.recall_vectors_raw(query, limit, filter).await?;
                (Vec::new(), vr)
            }
            MemoryRoute::Hybrid => {
                // Keyword failures degrade to vector-only results.
                let kw = match self.recall_fts5_raw(query, limit, conversation_id).await {
                    Ok(r) => r,
                    Err(e) => {
                        tracing::warn!("FTS5 keyword search failed: {e:#}");
                        Vec::new()
                    }
                };
                let vr = self.recall_vectors_raw(query, limit, filter).await?;
                (kw, vr)
            }
            MemoryRoute::Episodic => {
                let range = crate::router::resolve_temporal_range(query, chrono::Utc::now());
                let cleaned = crate::router::strip_temporal_keywords(query);
                // Fall back to the raw query if stripping removed everything.
                let search_query = if cleaned.is_empty() { query } else { &cleaned };
                let kw = if let Some(ref r) = range {
                    self.sqlite
                        .keyword_search_with_time_range(
                            search_query,
                            limit,
                            conversation_id,
                            r.after.as_deref(),
                            r.before.as_deref(),
                        )
                        .await?
                } else {
                    self.recall_fts5_raw(search_query, limit, conversation_id)
                        .await?
                };
                tracing::debug!(
                    has_range = range.is_some(),
                    cleaned_query = %search_query,
                    keyword_count = kw.len(),
                    "recall: episodic path"
                );
                (kw, Vec::new())
            }
            MemoryRoute::Graph => {
                // Graph route currently behaves like hybrid.
                let kw = match self.recall_fts5_raw(query, limit, conversation_id).await {
                    Ok(r) => r,
                    Err(e) => {
                        tracing::warn!("FTS5 keyword search failed (graph→hybrid fallback): {e:#}");
                        Vec::new()
                    }
                };
                let vr = self.recall_vectors_raw(query, limit, filter).await?;
                (kw, vr)
            }
        };

        tracing::debug!(
            keyword_count = keyword_results.len(),
            vector_count = vector_results.len(),
            "recall: routed search results"
        );

        self.recall_merge_and_rank(keyword_results, vector_results, limit)
            .await
    }
912
    /// Recall with an async router choosing the retrieval strategy.
    ///
    /// Same route semantics as [`Self::recall_routed`]; the router is awaited
    /// and its confidence is logged alongside the chosen route.
    ///
    /// # Errors
    /// Propagates search and merge failures (keyword search is best-effort
    /// only on the `Hybrid` and `Graph` routes).
    pub async fn recall_routed_async(
        &self,
        query: &str,
        limit: usize,
        filter: Option<crate::embedding_store::SearchFilter>,
        router: &dyn crate::router::AsyncMemoryRouter,
    ) -> Result<Vec<RecalledMessage>, MemoryError> {
        use crate::router::MemoryRoute;

        let decision = router.route_async(query).await;
        let route = decision.route;
        tracing::debug!(
            ?route,
            confidence = decision.confidence,
            query_len = query.len(),
            "memory routing decision (async)"
        );

        let conversation_id = filter.as_ref().and_then(|f| f.conversation_id);

        let (keyword_results, vector_results): (
            Vec<(crate::types::MessageId, f64)>,
            Vec<crate::embedding_store::SearchResult>,
        ) = match route {
            MemoryRoute::Keyword => {
                let kw = self.recall_fts5_raw(query, limit, conversation_id).await?;
                (kw, Vec::new())
            }
            MemoryRoute::Semantic => {
                let vr = self.recall_vectors_raw(query, limit, filter).await?;
                (Vec::new(), vr)
            }
            MemoryRoute::Hybrid => {
                // Keyword failures degrade to vector-only results.
                let kw = match self.recall_fts5_raw(query, limit, conversation_id).await {
                    Ok(r) => r,
                    Err(e) => {
                        tracing::warn!("FTS5 keyword search failed: {e:#}");
                        Vec::new()
                    }
                };
                let vr = self.recall_vectors_raw(query, limit, filter).await?;
                (kw, vr)
            }
            MemoryRoute::Episodic => {
                let range = crate::router::resolve_temporal_range(query, chrono::Utc::now());
                let cleaned = crate::router::strip_temporal_keywords(query);
                // Fall back to the raw query if stripping removed everything.
                let search_query = if cleaned.is_empty() { query } else { &cleaned };
                let kw = if let Some(ref r) = range {
                    self.sqlite
                        .keyword_search_with_time_range(
                            search_query,
                            limit,
                            conversation_id,
                            r.after.as_deref(),
                            r.before.as_deref(),
                        )
                        .await?
                } else {
                    self.recall_fts5_raw(search_query, limit, conversation_id)
                        .await?
                };
                (kw, Vec::new())
            }
            MemoryRoute::Graph => {
                // Graph route currently behaves like hybrid.
                let kw = match self.recall_fts5_raw(query, limit, conversation_id).await {
                    Ok(r) => r,
                    Err(e) => {
                        tracing::warn!("FTS5 keyword search failed (graph→hybrid fallback): {e:#}");
                        Vec::new()
                    }
                };
                let vr = self.recall_vectors_raw(query, limit, filter).await?;
                (kw, vr)
            }
        };

        tracing::debug!(
            keyword_count = keyword_results.len(),
            vector_count = vector_results.len(),
            "recall: routed search results (async)"
        );

        self.recall_merge_and_rank(keyword_results, vector_results, limit)
            .await
    }
1008
1009 pub async fn recall_graph(
1023 &self,
1024 query: &str,
1025 limit: usize,
1026 max_hops: u32,
1027 at_timestamp: Option<&str>,
1028 temporal_decay_rate: f64,
1029 edge_types: &[crate::graph::EdgeType],
1030 ) -> Result<Vec<crate::graph::types::GraphFact>, MemoryError> {
1031 let Some(store) = &self.graph_store else {
1032 return Ok(Vec::new());
1033 };
1034
1035 tracing::debug!(
1036 query_len = query.len(),
1037 limit,
1038 max_hops,
1039 "graph: starting recall"
1040 );
1041
1042 let results = crate::graph::retrieval::graph_recall(
1043 store,
1044 self.qdrant.as_deref(),
1045 &self.provider,
1046 query,
1047 limit,
1048 max_hops,
1049 at_timestamp,
1050 temporal_decay_rate,
1051 edge_types,
1052 )
1053 .await?;
1054
1055 tracing::debug!(result_count = results.len(), "graph: recall complete");
1056
1057 Ok(results)
1058 }
1059
1060 pub async fn recall_graph_activated(
1069 &self,
1070 query: &str,
1071 limit: usize,
1072 params: crate::graph::SpreadingActivationParams,
1073 edge_types: &[crate::graph::EdgeType],
1074 ) -> Result<Vec<crate::graph::activation::ActivatedFact>, MemoryError> {
1075 let Some(store) = &self.graph_store else {
1076 return Ok(Vec::new());
1077 };
1078
1079 tracing::debug!(
1080 query_len = query.len(),
1081 limit,
1082 "spreading activation: starting graph recall"
1083 );
1084
1085 let embeddings = self.qdrant.as_deref();
1086 let results = crate::graph::retrieval::graph_recall_activated(
1087 store,
1088 embeddings,
1089 &self.provider,
1090 query,
1091 limit,
1092 params,
1093 edge_types,
1094 )
1095 .await?;
1096
1097 tracing::debug!(
1098 result_count = results.len(),
1099 "spreading activation: graph recall complete"
1100 );
1101
1102 Ok(results)
1103 }
1104
1105 async fn batch_increment_access_count(
1113 &self,
1114 message_ids: Vec<MessageId>,
1115 ) -> Result<(), MemoryError> {
1116 if message_ids.is_empty() {
1117 return Ok(());
1118 }
1119 self.sqlite.increment_access_counts(&message_ids).await
1120 }
1121
1122 pub async fn has_embedding(&self, message_id: MessageId) -> Result<bool, MemoryError> {
1128 match &self.qdrant {
1129 Some(qdrant) => qdrant.has_embedding(message_id).await,
1130 None => Ok(false),
1131 }
1132 }
1133
    /// Backfill embeddings for messages stored without one.
    ///
    /// Streams unembedded messages from SQLite (the literal 1000 is passed to
    /// `stream_unembedded_messages` — presumably a page/batch size; confirm
    /// against its definition) and runs the regular chunk-embed-store
    /// pipeline on each. Row read errors and per-message embedding failures
    /// are logged and skipped.
    ///
    /// Returns the number of messages successfully embedded.
    ///
    /// # Errors
    /// Currently never returns `Err`; the `Result` keeps the signature
    /// uniform with the other storage APIs.
    pub async fn embed_missing(&self) -> Result<usize, MemoryError> {
        // Nothing to do without a vector store or an embedding-capable
        // provider.
        if self.qdrant.is_none() || !self.effective_embed_provider().supports_embeddings() {
            return Ok(0);
        }

        let mut stream = std::pin::pin!(self.sqlite.stream_unembedded_messages(1000));

        let mut count = 0usize;
        let mut total = 0usize;
        while let Some(row) = stream.next().await {
            let (msg_id, conversation_id, role, content) = match row {
                Ok(r) => r,
                Err(e) => {
                    tracing::warn!("embed_missing: failed to read row: {e:#}");
                    continue;
                }
            };
            total += 1;
            if self
                .embed_and_store_regular(msg_id, conversation_id, &role, &content)
                .await
            {
                count += 1;
            }
        }

        if total > 0 {
            tracing::info!("Embedded {count}/{total} missing messages");
        }
        Ok(count)
    }
1178}
1179
#[cfg(test)]
mod tests {
    use super::*;

    /// `Default` must leave every metadata field unset.
    #[test]
    fn embed_context_default_all_none() {
        let EmbedContext {
            tool_name,
            exit_code,
            timestamp,
        } = EmbedContext::default();
        assert!(tool_name.is_none());
        assert!(exit_code.is_none());
        assert!(timestamp.is_none());
    }

    /// Explicitly set fields survive construction unchanged.
    #[test]
    fn embed_context_fields_set_correctly() {
        let ctx = EmbedContext {
            tool_name: Some(String::from("shell")),
            exit_code: Some(0),
            timestamp: Some(String::from("2026-04-04T00:00:00Z")),
        };
        assert_eq!(ctx.tool_name.as_deref(), Some("shell"));
        assert_eq!(ctx.exit_code, Some(0));
        assert_eq!(ctx.timestamp.as_deref(), Some("2026-04-04T00:00:00Z"));
    }

    /// Non-zero exit codes are preserved as-is.
    #[test]
    fn embed_context_non_zero_exit_code() {
        let ctx = EmbedContext {
            tool_name: Some(String::from("shell")),
            exit_code: Some(1),
            timestamp: None,
        };
        assert_eq!(ctx.exit_code, Some(1));
        assert!(ctx.timestamp.is_none());
    }
}