1use futures::StreamExt as _;
5use zeph_llm::provider::{LlmProvider as _, Message};
6
7const CHARS_PER_TOKEN: usize = 4;
9
10const CHUNK_CHARS: usize = 400 * CHARS_PER_TOKEN;
12
13const CHUNK_OVERLAP_CHARS: usize = 80 * CHARS_PER_TOKEN;
15
16fn chunk_text(text: &str) -> Vec<&str> {
22 if text.len() <= CHUNK_CHARS {
23 return vec![text];
24 }
25
26 let mut chunks = Vec::new();
27 let mut start = 0;
28
29 while start < text.len() {
30 let end = if start + CHUNK_CHARS >= text.len() {
31 text.len()
32 } else {
33 let boundary = text.floor_char_boundary(start + CHUNK_CHARS);
35 let slice = &text[start..boundary];
37 if let Some(pos) = slice.rfind("\n\n") {
38 start + pos + 2
39 } else if let Some(pos) = slice.rfind('\n') {
40 start + pos + 1
41 } else if let Some(pos) = slice.rfind(' ') {
42 start + pos + 1
43 } else {
44 boundary
45 }
46 };
47
48 chunks.push(&text[start..end]);
49 if end >= text.len() {
50 break;
51 }
52 let next = end.saturating_sub(CHUNK_OVERLAP_CHARS);
54 start = text.ceil_char_boundary(next);
55 if start >= end {
56 start = end; }
58 }
59
60 chunks
61}
62
63use crate::admission::log_admission_decision;
64use crate::embedding_store::{MessageKind, SearchFilter};
65use crate::error::MemoryError;
66use crate::types::{ConversationId, MessageId};
67
68use super::SemanticMemory;
69use super::algorithms::{apply_mmr, apply_temporal_decay};
70
/// Optional tool-execution metadata attached to embeddings of tool output.
///
/// All fields are optional; `Default` yields a context with no metadata,
/// in which case storage falls back to the plain (non-tool) path.
#[derive(Debug, Clone, Default)]
pub struct EmbedContext {
    /// Name of the tool that produced the output (e.g. "shell").
    pub tool_name: Option<String>,
    /// Exit code of the tool run, when one was reported.
    pub exit_code: Option<i32>,
    /// When the tool ran — tests use RFC 3339 strings (e.g.
    /// "2026-04-04T00:00:00Z"); NOTE(review): confirm format with producers.
    pub timestamp: Option<String>,
}
80
/// A message returned by recall, paired with its final ranking score.
#[derive(Debug)]
pub struct RecalledMessage {
    /// The recalled chat message.
    pub message: Message,
    /// Final merged relevance score (higher ranks first); narrowed from the
    /// internal f64 ranking score.
    pub score: f32,
}
86
87impl SemanticMemory {
88 pub async fn remember(
98 &self,
99 conversation_id: ConversationId,
100 role: &str,
101 content: &str,
102 goal_text: Option<&str>,
103 ) -> Result<Option<MessageId>, MemoryError> {
104 if let Some(ref admission) = self.admission_control {
106 let decision = admission
107 .evaluate(
108 content,
109 role,
110 &self.provider,
111 self.qdrant.as_ref(),
112 goal_text,
113 )
114 .await;
115 let preview: String = content.chars().take(100).collect();
116 log_admission_decision(&decision, &preview, role, admission.threshold());
117 if !decision.admitted {
118 return Ok(None);
119 }
120 }
121
122 let message_id = self
123 .sqlite
124 .save_message(conversation_id, role, content)
125 .await?;
126
127 self.embed_and_store_regular(message_id, conversation_id, role, content)
128 .await;
129
130 Ok(Some(message_id))
131 }
132
133 pub async fn remember_with_parts(
142 &self,
143 conversation_id: ConversationId,
144 role: &str,
145 content: &str,
146 parts_json: &str,
147 goal_text: Option<&str>,
148 ) -> Result<(Option<MessageId>, bool), MemoryError> {
149 if let Some(ref admission) = self.admission_control {
151 let decision = admission
152 .evaluate(
153 content,
154 role,
155 &self.provider,
156 self.qdrant.as_ref(),
157 goal_text,
158 )
159 .await;
160 let preview: String = content.chars().take(100).collect();
161 log_admission_decision(&decision, &preview, role, admission.threshold());
162 if !decision.admitted {
163 return Ok((None, false));
164 }
165 }
166
167 let message_id = self
168 .sqlite
169 .save_message_with_parts(conversation_id, role, content, parts_json)
170 .await?;
171
172 let embedding_stored = self
173 .embed_and_store_regular(message_id, conversation_id, role, content)
174 .await;
175
176 Ok((Some(message_id), embedding_stored))
177 }
178
179 pub async fn remember_tool_output(
191 &self,
192 conversation_id: ConversationId,
193 role: &str,
194 content: &str,
195 parts_json: &str,
196 embed_ctx: EmbedContext,
197 ) -> Result<(Option<MessageId>, bool), MemoryError> {
198 if let Some(ref admission) = self.admission_control {
199 let decision = admission
200 .evaluate(content, role, &self.provider, self.qdrant.as_ref(), None)
201 .await;
202 let preview: String = content.chars().take(100).collect();
203 log_admission_decision(&decision, &preview, role, admission.threshold());
204 if !decision.admitted {
205 return Ok((None, false));
206 }
207 }
208
209 let message_id = self
210 .sqlite
211 .save_message_with_parts(conversation_id, role, content, parts_json)
212 .await?;
213
214 let embedding_stored = self
215 .embed_chunks_with_tool_context(message_id, conversation_id, role, content, embed_ctx)
216 .await;
217
218 Ok((Some(message_id), embedding_stored))
219 }
220
221 async fn embed_and_store_regular(
226 &self,
227 message_id: MessageId,
228 conversation_id: ConversationId,
229 role: &str,
230 content: &str,
231 ) -> bool {
232 let Some(qdrant) = &self.qdrant else {
233 return false;
234 };
235 let embed_provider = self.effective_embed_provider();
236 if !embed_provider.supports_embeddings() {
237 return false;
238 }
239
240 let chunks = chunk_text(content);
241 let chunk_count = chunks.len();
242
243 let vectors = match embed_provider.embed_batch(&chunks).await {
244 Ok(v) => v,
245 Err(e) => {
246 tracing::warn!("Failed to embed chunks for msg {message_id}: {e:#}");
247 return false;
248 }
249 };
250
251 let Some(first) = vectors.first() else {
252 return false;
253 };
254 let vector_size = u64::try_from(first.len()).unwrap_or(896);
255 if let Err(e) = qdrant.ensure_collection(vector_size).await {
256 tracing::warn!("Failed to ensure Qdrant collection: {e:#}");
257 return false;
258 }
259
260 let mut stored = false;
261 for (chunk_index, vector) in vectors.into_iter().enumerate() {
262 let chunk_index_u32 = u32::try_from(chunk_index).unwrap_or(u32::MAX);
263 match qdrant
264 .store(
265 message_id,
266 conversation_id,
267 role,
268 vector,
269 MessageKind::Regular,
270 &self.embedding_model,
271 chunk_index_u32,
272 )
273 .await
274 {
275 Ok(_) => stored = true,
276 Err(e) => tracing::warn!(
277 "Failed to store chunk {chunk_index}/{chunk_count} \
278 for msg {message_id}: {e:#}"
279 ),
280 }
281 }
282
283 stored
284 }
285
    /// Chunks and embeds `content`, storing each vector with tool metadata
    /// (name, exit code, timestamp) when `embed_ctx.tool_name` is set, or via
    /// the plain store path otherwise.
    ///
    /// Returns `true` when at least one chunk vector was stored. All failures
    /// are logged and swallowed so message persistence is never blocked.
    async fn embed_chunks_with_tool_context(
        &self,
        message_id: MessageId,
        conversation_id: ConversationId,
        role: &str,
        content: &str,
        embed_ctx: EmbedContext,
    ) -> bool {
        let Some(qdrant) = &self.qdrant else {
            return false;
        };
        let embed_provider = self.effective_embed_provider();
        if !embed_provider.supports_embeddings() {
            return false;
        }

        let chunks = chunk_text(content);
        let chunk_count = chunks.len();
        let mut stored = false;

        let vectors = match embed_provider.embed_batch(&chunks).await {
            Ok(v) => v,
            Err(e) => {
                tracing::warn!("Failed to embed tool-output chunks for msg {message_id}: {e:#}");
                return false;
            }
        };

        // Size the collection from the first vector; 896 is only a fallback
        // for the practically impossible usize->u64 conversion failure.
        // An empty batch skips this and falls through the empty loop below.
        if let Some(first) = vectors.first() {
            let vector_size = u64::try_from(first.len()).unwrap_or(896);
            if let Err(e) = qdrant.ensure_collection(vector_size).await {
                tracing::warn!("Failed to ensure Qdrant collection: {e:#}");
                return false;
            }
        }

        for (chunk_index, vector) in vectors.into_iter().enumerate() {
            let chunk_index_u32 = u32::try_from(chunk_index).unwrap_or(u32::MAX);
            // NOTE(review): both branches store MessageKind::Regular, even for
            // tool output — confirm a dedicated tool-output kind isn't intended.
            let result = if let Some(ref tool_name) = embed_ctx.tool_name {
                qdrant
                    .store_with_tool_context(
                        message_id,
                        conversation_id,
                        role,
                        vector,
                        MessageKind::Regular,
                        &self.embedding_model,
                        chunk_index_u32,
                        tool_name,
                        embed_ctx.exit_code,
                        embed_ctx.timestamp.as_deref(),
                    )
                    .await
                    .map(|_| ())
            } else {
                qdrant
                    .store(
                        message_id,
                        conversation_id,
                        role,
                        vector,
                        MessageKind::Regular,
                        &self.embedding_model,
                        chunk_index_u32,
                    )
                    .await
                    .map(|_| ())
            };
            match result {
                Ok(()) => stored = true,
                Err(e) => tracing::warn!(
                    "Failed to store tool-output chunk {chunk_index}/{chunk_count} \
                     for msg {message_id}: {e:#}"
                ),
            }
        }

        stored
    }
370
371 pub async fn save_only(
379 &self,
380 conversation_id: ConversationId,
381 role: &str,
382 content: &str,
383 parts_json: &str,
384 ) -> Result<MessageId, MemoryError> {
385 self.sqlite
386 .save_message_with_parts(conversation_id, role, content, parts_json)
387 .await
388 }
389
390 pub async fn recall(
400 &self,
401 query: &str,
402 limit: usize,
403 filter: Option<SearchFilter>,
404 ) -> Result<Vec<RecalledMessage>, MemoryError> {
405 let conversation_id = filter.as_ref().and_then(|f| f.conversation_id);
406
407 tracing::debug!(
408 query_len = query.len(),
409 limit,
410 has_filter = filter.is_some(),
411 conversation_id = conversation_id.map(|c| c.0),
412 has_qdrant = self.qdrant.is_some(),
413 "recall: starting hybrid search"
414 );
415
416 let keyword_results = match self
417 .sqlite
418 .keyword_search(query, limit * 2, conversation_id)
419 .await
420 {
421 Ok(results) => results,
422 Err(e) => {
423 tracing::warn!("FTS5 keyword search failed: {e:#}");
424 Vec::new()
425 }
426 };
427
428 let vector_results = if let Some(qdrant) = &self.qdrant
429 && self.provider.supports_embeddings()
430 {
431 let query_vector = self.provider.embed(query).await?;
432 let vector_size = u64::try_from(query_vector.len()).unwrap_or(896);
433 qdrant.ensure_collection(vector_size).await?;
434 qdrant.search(&query_vector, limit * 2, filter).await?
435 } else {
436 Vec::new()
437 };
438
439 self.recall_merge_and_rank(keyword_results, vector_results, limit)
440 .await
441 }
442
443 pub(super) async fn recall_fts5_raw(
444 &self,
445 query: &str,
446 limit: usize,
447 conversation_id: Option<ConversationId>,
448 ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
449 self.sqlite
450 .keyword_search(query, limit * 2, conversation_id)
451 .await
452 }
453
454 pub(super) async fn recall_vectors_raw(
455 &self,
456 query: &str,
457 limit: usize,
458 filter: Option<SearchFilter>,
459 ) -> Result<Vec<crate::embedding_store::SearchResult>, MemoryError> {
460 let Some(qdrant) = &self.qdrant else {
461 return Ok(Vec::new());
462 };
463 if !self.provider.supports_embeddings() {
464 return Ok(Vec::new());
465 }
466 let query_vector = self.provider.embed(query).await?;
467 let vector_size = u64::try_from(query_vector.len()).unwrap_or(896);
468 qdrant.ensure_collection(vector_size).await?;
469 qdrant.search(&query_vector, limit * 2, filter).await
470 }
471
    /// Merges keyword (FTS5) and vector (Qdrant) hits into one ranked list.
    ///
    /// Pipeline, in order:
    /// 1. Normalize each source by its own max score, then combine per
    ///    message with `vector_weight` / `keyword_weight`.
    /// 2. Optionally apply temporal decay and re-sort.
    /// 3. Optionally re-rank with MMR (falls back to plain truncation when
    ///    disabled or when vectors can't be fetched).
    /// 4. Optionally blend in stored importance scores and re-sort.
    /// 5. Optionally boost messages in the "semantic" tier and re-sort.
    /// 6. Best-effort bookkeeping (access counts, training markers), then
    ///    resolve ids to message bodies and attach final scores.
    #[allow(clippy::cast_possible_truncation, clippy::too_many_lines)]
    pub(super) async fn recall_merge_and_rank(
        &self,
        keyword_results: Vec<(MessageId, f64)>,
        vector_results: Vec<crate::embedding_store::SearchResult>,
        limit: usize,
    ) -> Result<Vec<RecalledMessage>, MemoryError> {
        tracing::debug!(
            vector_count = vector_results.len(),
            keyword_count = keyword_results.len(),
            limit,
            "recall: merging search results"
        );

        // message id -> combined weighted score.
        let mut scores: std::collections::HashMap<MessageId, f64> =
            std::collections::HashMap::new();

        if !vector_results.is_empty() {
            // Normalize by the max vector score so both sources land on a
            // comparable scale before weighting.
            let max_vs = vector_results
                .iter()
                .map(|r| r.score)
                .fold(f32::NEG_INFINITY, f32::max);
            // Guard against non-positive maxima (would flip or blow up scores).
            let norm = if max_vs > 0.0 { max_vs } else { 1.0 };
            for r in &vector_results {
                let normalized = f64::from(r.score / norm);
                *scores.entry(r.message_id).or_default() += normalized * self.vector_weight;
            }
        }

        if !keyword_results.is_empty() {
            let max_ks = keyword_results
                .iter()
                .map(|r| r.1)
                .fold(f64::NEG_INFINITY, f64::max);
            let norm = if max_ks > 0.0 { max_ks } else { 1.0 };
            for &(msg_id, score) in &keyword_results {
                let normalized = score / norm;
                *scores.entry(msg_id).or_default() += normalized * self.keyword_weight;
            }
        }

        if scores.is_empty() {
            tracing::debug!("recall: empty merge, no overlapping scores");
            return Ok(Vec::new());
        }

        // Sort descending by combined score; NaN-safe via the Equal fallback.
        let mut ranked: Vec<(MessageId, f64)> = scores.into_iter().collect();
        ranked.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));

        tracing::debug!(
            merged = ranked.len(),
            top_score = ranked.first().map(|r| r.1),
            bottom_score = ranked.last().map(|r| r.1),
            vector_weight = %self.vector_weight,
            keyword_weight = %self.keyword_weight,
            "recall: weighted merge complete"
        );

        // Stage 2: temporal decay (best-effort; a timestamp-fetch failure
        // leaves the ranking unchanged).
        if self.temporal_decay_enabled && self.temporal_decay_half_life_days > 0 {
            let ids: Vec<MessageId> = ranked.iter().map(|r| r.0).collect();
            match self.sqlite.message_timestamps(&ids).await {
                Ok(timestamps) => {
                    apply_temporal_decay(
                        &mut ranked,
                        &timestamps,
                        self.temporal_decay_half_life_days,
                    );
                    ranked
                        .sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
                    tracing::debug!(
                        half_life_days = self.temporal_decay_half_life_days,
                        top_score_after = ranked.first().map(|r| r.1),
                        "recall: temporal decay applied"
                    );
                }
                Err(e) => {
                    tracing::warn!("temporal decay: failed to fetch timestamps: {e:#}");
                }
            }
        }

        // Stage 3: MMR diversification. Every non-MMR path truncates to
        // `limit`; apply_mmr itself enforces the limit on the MMR path.
        if self.mmr_enabled && !vector_results.is_empty() {
            if let Some(qdrant) = &self.qdrant {
                let ids: Vec<MessageId> = ranked.iter().map(|r| r.0).collect();
                match qdrant.get_vectors(&ids).await {
                    Ok(vec_map) if !vec_map.is_empty() => {
                        let ranked_len_before = ranked.len();
                        ranked = apply_mmr(&ranked, &vec_map, self.mmr_lambda, limit);
                        tracing::debug!(
                            before = ranked_len_before,
                            after = ranked.len(),
                            lambda = %self.mmr_lambda,
                            "recall: mmr re-ranked"
                        );
                    }
                    Ok(_) => {
                        ranked.truncate(limit);
                    }
                    Err(e) => {
                        tracing::warn!("MMR: failed to fetch vectors: {e:#}");
                        ranked.truncate(limit);
                    }
                }
            } else {
                ranked.truncate(limit);
            }
        } else {
            ranked.truncate(limit);
        }

        // Stage 4: blend stored importance scores (best-effort).
        if self.importance_enabled && !ranked.is_empty() {
            let ids: Vec<MessageId> = ranked.iter().map(|r| r.0).collect();
            match self.sqlite.fetch_importance_scores(&ids).await {
                Ok(scores) => {
                    for (msg_id, score) in &mut ranked {
                        if let Some(&imp) = scores.get(msg_id) {
                            *score += imp * self.importance_weight;
                        }
                    }
                    ranked
                        .sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
                    tracing::debug!(
                        importance_weight = %self.importance_weight,
                        "recall: importance scores blended"
                    );
                }
                Err(e) => {
                    tracing::warn!("importance scoring: failed to fetch scores: {e:#}");
                }
            }
        }

        // Stage 5: flat bonus for "semantic"-tier messages. Skipped entirely
        // when the boost factor is (effectively) 1.0.
        if (self.tier_boost_semantic - 1.0).abs() > f64::EPSILON && !ranked.is_empty() {
            let ids: Vec<MessageId> = ranked.iter().map(|r| r.0).collect();
            match self.sqlite.fetch_tiers(&ids).await {
                Ok(tiers) => {
                    let bonus = self.tier_boost_semantic - 1.0;
                    let mut boosted = false;
                    for (msg_id, score) in &mut ranked {
                        if tiers.get(msg_id).map(String::as_str) == Some("semantic") {
                            *score += bonus;
                            boosted = true;
                        }
                    }
                    // Only re-sort (and log) when at least one score changed.
                    if boosted {
                        ranked.sort_by(|a, b| {
                            b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal)
                        });
                        tracing::debug!(
                            tier_boost = %self.tier_boost_semantic,
                            "recall: semantic tier boost applied"
                        );
                    }
                }
                Err(e) => {
                    tracing::warn!("tier boost: failed to fetch tiers: {e:#}");
                }
            }
        }

        let ids: Vec<MessageId> = ranked.iter().map(|r| r.0).collect();

        // Stage 6: bookkeeping — both updates are best-effort and never fail
        // the recall itself.
        if !ids.is_empty()
            && let Err(e) = self.batch_increment_access_count(ids.clone()).await
        {
            tracing::warn!("recall: failed to increment access counts: {e:#}");
        }

        if let Err(e) = self.sqlite.mark_training_recalled(&ids).await {
            tracing::debug!(
                error = %e,
                "recall: failed to mark training data as recalled (non-fatal)"
            );
        }

        // Resolve ids to bodies; ids that no longer resolve are dropped.
        let messages = self.sqlite.messages_by_ids(&ids).await?;
        let msg_map: std::collections::HashMap<MessageId, _> = messages.into_iter().collect();

        let recalled: Vec<RecalledMessage> = ranked
            .iter()
            .filter_map(|(msg_id, score)| {
                msg_map.get(msg_id).map(|msg| RecalledMessage {
                    message: msg.clone(),
                    #[expect(clippy::cast_possible_truncation)]
                    score: *score as f32,
                })
            })
            .collect();

        tracing::debug!(final_count = recalled.len(), "recall: final results");

        Ok(recalled)
    }
677
    /// Recall driven by a synchronous routing decision: the router picks
    /// which backends (keyword / vector / both / time-filtered keyword) to
    /// consult before the shared merge-and-rank stage.
    ///
    /// Error policy per route: single-source routes (Keyword, Semantic,
    /// Episodic) propagate their backend's error; dual-source routes
    /// (Hybrid, Graph) tolerate FTS5 failure since vectors can still answer.
    pub async fn recall_routed(
        &self,
        query: &str,
        limit: usize,
        filter: Option<SearchFilter>,
        router: &dyn crate::router::MemoryRouter,
    ) -> Result<Vec<RecalledMessage>, MemoryError> {
        use crate::router::MemoryRoute;

        let route = router.route(query);
        tracing::debug!(?route, query_len = query.len(), "memory routing decision");

        let conversation_id = filter.as_ref().and_then(|f| f.conversation_id);

        let (keyword_results, vector_results): (
            Vec<(MessageId, f64)>,
            Vec<crate::embedding_store::SearchResult>,
        ) = match route {
            MemoryRoute::Keyword => {
                let kw = self.recall_fts5_raw(query, limit, conversation_id).await?;
                (kw, Vec::new())
            }
            MemoryRoute::Semantic => {
                let vr = self.recall_vectors_raw(query, limit, filter).await?;
                (Vec::new(), vr)
            }
            MemoryRoute::Hybrid => {
                let kw = match self.recall_fts5_raw(query, limit, conversation_id).await {
                    Ok(r) => r,
                    Err(e) => {
                        tracing::warn!("FTS5 keyword search failed: {e:#}");
                        Vec::new()
                    }
                };
                let vr = self.recall_vectors_raw(query, limit, filter).await?;
                (kw, vr)
            }
            // Episodic: resolve temporal phrases into a concrete time range,
            // strip them from the query, and keyword-search within the range.
            MemoryRoute::Episodic => {
                let range = crate::router::resolve_temporal_range(query, chrono::Utc::now());
                let cleaned = crate::router::strip_temporal_keywords(query);
                // Fall back to the original query if stripping removed everything.
                let search_query = if cleaned.is_empty() { query } else { &cleaned };
                let kw = if let Some(ref r) = range {
                    self.sqlite
                        .keyword_search_with_time_range(
                            search_query,
                            limit,
                            conversation_id,
                            r.after.as_deref(),
                            r.before.as_deref(),
                        )
                        .await?
                } else {
                    self.recall_fts5_raw(search_query, limit, conversation_id)
                        .await?
                };
                tracing::debug!(
                    has_range = range.is_some(),
                    cleaned_query = %search_query,
                    keyword_count = kw.len(),
                    "recall: episodic path"
                );
                (kw, Vec::new())
            }
            // Graph route currently falls back to hybrid search.
            MemoryRoute::Graph => {
                let kw = match self.recall_fts5_raw(query, limit, conversation_id).await {
                    Ok(r) => r,
                    Err(e) => {
                        tracing::warn!("FTS5 keyword search failed (graph→hybrid fallback): {e:#}");
                        Vec::new()
                    }
                };
                let vr = self.recall_vectors_raw(query, limit, filter).await?;
                (kw, vr)
            }
        };

        tracing::debug!(
            keyword_count = keyword_results.len(),
            vector_count = vector_results.len(),
            "recall: routed search results"
        );

        self.recall_merge_and_rank(keyword_results, vector_results, limit)
            .await
    }
781
    /// Recall driven by an asynchronous routing decision (a router that must
    /// itself await, e.g. one backed by a model call).
    ///
    /// NOTE(review): mirrors `recall_routed` arm-for-arm (the Episodic arm
    /// here just skips the extra debug log) — keep the two in sync.
    pub async fn recall_routed_async(
        &self,
        query: &str,
        limit: usize,
        filter: Option<crate::embedding_store::SearchFilter>,
        router: &dyn crate::router::AsyncMemoryRouter,
    ) -> Result<Vec<RecalledMessage>, MemoryError> {
        use crate::router::MemoryRoute;

        let decision = router.route_async(query).await;
        let route = decision.route;
        tracing::debug!(
            ?route,
            confidence = decision.confidence,
            query_len = query.len(),
            "memory routing decision (async)"
        );

        let conversation_id = filter.as_ref().and_then(|f| f.conversation_id);

        let (keyword_results, vector_results): (
            Vec<(crate::types::MessageId, f64)>,
            Vec<crate::embedding_store::SearchResult>,
        ) = match route {
            MemoryRoute::Keyword => {
                let kw = self.recall_fts5_raw(query, limit, conversation_id).await?;
                (kw, Vec::new())
            }
            MemoryRoute::Semantic => {
                let vr = self.recall_vectors_raw(query, limit, filter).await?;
                (Vec::new(), vr)
            }
            // Hybrid tolerates FTS5 failure since vectors can still answer.
            MemoryRoute::Hybrid => {
                let kw = match self.recall_fts5_raw(query, limit, conversation_id).await {
                    Ok(r) => r,
                    Err(e) => {
                        tracing::warn!("FTS5 keyword search failed: {e:#}");
                        Vec::new()
                    }
                };
                let vr = self.recall_vectors_raw(query, limit, filter).await?;
                (kw, vr)
            }
            // Episodic: time-bounded keyword search over the cleaned query.
            MemoryRoute::Episodic => {
                let range = crate::router::resolve_temporal_range(query, chrono::Utc::now());
                let cleaned = crate::router::strip_temporal_keywords(query);
                let search_query = if cleaned.is_empty() { query } else { &cleaned };
                let kw = if let Some(ref r) = range {
                    self.sqlite
                        .keyword_search_with_time_range(
                            search_query,
                            limit,
                            conversation_id,
                            r.after.as_deref(),
                            r.before.as_deref(),
                        )
                        .await?
                } else {
                    self.recall_fts5_raw(search_query, limit, conversation_id)
                        .await?
                };
                (kw, Vec::new())
            }
            // Graph route currently falls back to hybrid search.
            MemoryRoute::Graph => {
                let kw = match self.recall_fts5_raw(query, limit, conversation_id).await {
                    Ok(r) => r,
                    Err(e) => {
                        tracing::warn!("FTS5 keyword search failed (graph→hybrid fallback): {e:#}");
                        Vec::new()
                    }
                };
                let vr = self.recall_vectors_raw(query, limit, filter).await?;
                (kw, vr)
            }
        };

        tracing::debug!(
            keyword_count = keyword_results.len(),
            vector_count = vector_results.len(),
            "recall: routed search results (async)"
        );

        self.recall_merge_and_rank(keyword_results, vector_results, limit)
            .await
    }
877
878 pub async fn recall_graph(
892 &self,
893 query: &str,
894 limit: usize,
895 max_hops: u32,
896 at_timestamp: Option<&str>,
897 temporal_decay_rate: f64,
898 edge_types: &[crate::graph::EdgeType],
899 ) -> Result<Vec<crate::graph::types::GraphFact>, MemoryError> {
900 let Some(store) = &self.graph_store else {
901 return Ok(Vec::new());
902 };
903
904 tracing::debug!(
905 query_len = query.len(),
906 limit,
907 max_hops,
908 "graph: starting recall"
909 );
910
911 let results = crate::graph::retrieval::graph_recall(
912 store,
913 self.qdrant.as_deref(),
914 &self.provider,
915 query,
916 limit,
917 max_hops,
918 at_timestamp,
919 temporal_decay_rate,
920 edge_types,
921 )
922 .await?;
923
924 tracing::debug!(result_count = results.len(), "graph: recall complete");
925
926 Ok(results)
927 }
928
929 pub async fn recall_graph_activated(
938 &self,
939 query: &str,
940 limit: usize,
941 params: crate::graph::SpreadingActivationParams,
942 edge_types: &[crate::graph::EdgeType],
943 ) -> Result<Vec<crate::graph::activation::ActivatedFact>, MemoryError> {
944 let Some(store) = &self.graph_store else {
945 return Ok(Vec::new());
946 };
947
948 tracing::debug!(
949 query_len = query.len(),
950 limit,
951 "spreading activation: starting graph recall"
952 );
953
954 let embeddings = self.qdrant.as_deref();
955 let results = crate::graph::retrieval::graph_recall_activated(
956 store,
957 embeddings,
958 &self.provider,
959 query,
960 limit,
961 params,
962 edge_types,
963 )
964 .await?;
965
966 tracing::debug!(
967 result_count = results.len(),
968 "spreading activation: graph recall complete"
969 );
970
971 Ok(results)
972 }
973
974 async fn batch_increment_access_count(
982 &self,
983 message_ids: Vec<MessageId>,
984 ) -> Result<(), MemoryError> {
985 if message_ids.is_empty() {
986 return Ok(());
987 }
988 self.sqlite.increment_access_counts(&message_ids).await
989 }
990
991 pub async fn has_embedding(&self, message_id: MessageId) -> Result<bool, MemoryError> {
997 match &self.qdrant {
998 Some(qdrant) => qdrant.has_embedding(message_id).await,
999 None => Ok(false),
1000 }
1001 }
1002
1003 pub async fn embed_missing(&self) -> Result<usize, MemoryError> {
1017 if self.qdrant.is_none() || !self.effective_embed_provider().supports_embeddings() {
1018 return Ok(0);
1019 }
1020
1021 let mut stream = std::pin::pin!(self.sqlite.stream_unembedded_messages(1000));
1022
1023 let mut count = 0usize;
1024 let mut total = 0usize;
1025 while let Some(row) = stream.next().await {
1026 let (msg_id, conversation_id, role, content) = match row {
1027 Ok(r) => r,
1028 Err(e) => {
1029 tracing::warn!("embed_missing: failed to read row: {e:#}");
1030 continue;
1031 }
1032 };
1033 total += 1;
1034 if self
1035 .embed_and_store_regular(msg_id, conversation_id, &role, &content)
1036 .await
1037 {
1038 count += 1;
1039 }
1040 }
1041
1042 if total > 0 {
1043 tracing::info!("Embedded {count}/{total} missing messages");
1044 }
1045 Ok(count)
1046 }
1047}
1048
#[cfg(test)]
mod tests {
    use super::*;

    /// `Default` must leave every optional field unset.
    #[test]
    fn embed_context_default_all_none() {
        let EmbedContext {
            tool_name,
            exit_code,
            timestamp,
        } = EmbedContext::default();
        assert!(tool_name.is_none());
        assert!(exit_code.is_none());
        assert!(timestamp.is_none());
    }

    /// Values set at construction are readable back unchanged.
    #[test]
    fn embed_context_fields_set_correctly() {
        let ctx = EmbedContext {
            tool_name: Some(String::from("shell")),
            exit_code: Some(0),
            timestamp: Some(String::from("2026-04-04T00:00:00Z")),
        };
        assert_eq!(ctx.tool_name.as_deref(), Some("shell"));
        assert_eq!(ctx.exit_code, Some(0));
        assert_eq!(ctx.timestamp.as_deref(), Some("2026-04-04T00:00:00Z"));
    }

    /// A failing tool run keeps its non-zero exit code even without a timestamp.
    #[test]
    fn embed_context_non_zero_exit_code() {
        let ctx = EmbedContext {
            tool_name: Some(String::from("shell")),
            exit_code: Some(1),
            timestamp: None,
        };
        assert_eq!(ctx.exit_code, Some(1));
        assert!(ctx.timestamp.is_none());
    }
}
1083}