1use futures::{StreamExt as _, TryStreamExt as _};
5use zeph_llm::provider::{LlmProvider as _, Message};
6
/// Rough character-to-token conversion heuristic.
const CHARS_PER_TOKEN: usize = 4;

/// Target chunk size (~400 tokens) for splitting long content before embedding.
const CHUNK_CHARS: usize = 400 * CHARS_PER_TOKEN;

/// Overlap (~80 tokens) carried between consecutive chunks to keep context.
const CHUNK_OVERLAP_CHARS: usize = 80 * CHARS_PER_TOKEN;

/// Split `text` into chunks of at most `CHUNK_CHARS` bytes, preferring to cut
/// at a paragraph break, then a newline, then a space, and overlapping
/// consecutive chunks by roughly `CHUNK_OVERLAP_CHARS` bytes.
///
/// All cuts land on `char` boundaries, so every returned slice is valid UTF-8.
fn chunk_text(text: &str) -> Vec<&str> {
    if text.len() <= CHUNK_CHARS {
        return vec![text];
    }

    let mut chunks = Vec::new();
    let mut cursor = 0;

    loop {
        let end = if text.len() - cursor <= CHUNK_CHARS {
            text.len()
        } else {
            // Largest char boundary not past the window end (equivalent to
            // `str::floor_char_boundary`, spelled with stable APIs).
            let mut window_end = cursor + CHUNK_CHARS;
            while !text.is_char_boundary(window_end) {
                window_end -= 1;
            }
            let window = &text[cursor..window_end];
            // Prefer natural boundaries; fall back to the raw window edge.
            window
                .rfind("\n\n")
                .map(|p| cursor + p + 2)
                .or_else(|| window.rfind('\n').map(|p| cursor + p + 1))
                .or_else(|| window.rfind(' ').map(|p| cursor + p + 1))
                .unwrap_or(window_end)
        };

        chunks.push(&text[cursor..end]);
        if end == text.len() {
            break;
        }

        // Step back by the overlap (rounded up to a char boundary), but never
        // move backwards overall — that guarantees forward progress.
        let mut next = end.saturating_sub(CHUNK_OVERLAP_CHARS);
        while !text.is_char_boundary(next) {
            next += 1;
        }
        cursor = if next > cursor { next } else { end };
    }

    chunks
}
62
63use crate::admission::log_admission_decision;
64use crate::embedding_store::{MessageKind, SearchFilter};
65use crate::error::MemoryError;
66use crate::types::{ConversationId, MessageId};
67
68use super::SemanticMemory;
69use super::algorithms::{apply_mmr, apply_temporal_decay};
70
/// Optional tool-call metadata attached to an embedding when a tool-output
/// message is stored (see `embed_chunks_with_tool_context_bg`).
#[derive(Debug, Clone, Default)]
pub struct EmbedContext {
    // Name of the tool that produced the output, if any.
    pub tool_name: Option<String>,
    // Process exit code of the tool invocation, if known.
    pub exit_code: Option<i32>,
    // Timestamp of the invocation, if known — stored as an opaque string;
    // NOTE(review): format is not validated here, confirm against producers.
    pub timestamp: Option<String>,
}
80
/// A message returned from recall, paired with its final merged ranking score.
#[derive(Debug)]
pub struct RecalledMessage {
    // The recalled conversation message, hydrated from SQLite.
    pub message: Message,
    // Final score after weighted merge and re-ranking (higher = more relevant).
    pub score: f32,
}
86
/// Upper bound on concurrently tracked background embedding tasks; further
/// spawn attempts are dropped (see `spawn_embed_bg`).
const MAX_EMBED_BG_TASKS: usize = 64;
89
/// Owned bundle of everything a detached background embedding task needs,
/// cloned out of `SemanticMemory` so the spawned future is `'static`.
struct EmbedBgArgs {
    // Shared handle to the vector store.
    qdrant: std::sync::Arc<crate::embedding_store::EmbeddingStore>,
    // Provider used for embedding (may differ from the chat provider).
    embed_provider: zeph_llm::any::AnyProvider,
    // Model name recorded alongside each stored vector.
    embedding_model: String,
    message_id: MessageId,
    conversation_id: ConversationId,
    role: String,
    content: String,
}
100
101async fn embed_and_store_regular_bg(args: EmbedBgArgs) {
105 let EmbedBgArgs {
106 qdrant,
107 embed_provider,
108 embedding_model,
109 message_id,
110 conversation_id,
111 role,
112 content,
113 } = args;
114 let chunks = chunk_text(&content);
115 let chunk_count = chunks.len();
116
117 let vectors = match embed_provider.embed_batch(&chunks).await {
118 Ok(v) => v,
119 Err(e) => {
120 tracing::warn!("bg embed_regular: failed to embed chunks for msg {message_id}: {e:#}");
121 return;
122 }
123 };
124
125 let Some(first) = vectors.first() else {
126 return;
127 };
128 let vector_size = first.len() as u64;
129 if let Err(e) = qdrant.ensure_collection(vector_size).await {
130 tracing::warn!("bg embed_regular: failed to ensure Qdrant collection: {e:#}");
131 return;
132 }
133
134 for (chunk_index, vector) in vectors.into_iter().enumerate() {
135 let chunk_index_u32 = u32::try_from(chunk_index).unwrap_or(u32::MAX);
136 if let Err(e) = qdrant
137 .store(
138 message_id,
139 conversation_id,
140 &role,
141 vector,
142 MessageKind::Regular,
143 &embedding_model,
144 chunk_index_u32,
145 )
146 .await
147 {
148 tracing::warn!(
149 "bg embed_regular: failed to store chunk {chunk_index}/{chunk_count} \
150 for msg {message_id}: {e:#}"
151 );
152 }
153 }
154}
155
156async fn embed_chunks_with_tool_context_bg(args: EmbedBgArgs, embed_ctx: EmbedContext) {
160 let EmbedBgArgs {
161 qdrant,
162 embed_provider,
163 embedding_model,
164 message_id,
165 conversation_id,
166 role,
167 content,
168 } = args;
169 let chunks = chunk_text(&content);
170 let chunk_count = chunks.len();
171
172 let vectors = match embed_provider.embed_batch(&chunks).await {
173 Ok(v) => v,
174 Err(e) => {
175 tracing::warn!(
176 "bg embed_tool: failed to embed tool-output chunks for msg {message_id}: {e:#}"
177 );
178 return;
179 }
180 };
181
182 if let Some(first) = vectors.first() {
183 let vector_size = first.len() as u64;
184 if let Err(e) = qdrant.ensure_collection(vector_size).await {
185 tracing::warn!("bg embed_tool: failed to ensure Qdrant collection: {e:#}");
186 return;
187 }
188 }
189
190 for (chunk_index, vector) in vectors.into_iter().enumerate() {
191 let chunk_index_u32 = u32::try_from(chunk_index).unwrap_or(u32::MAX);
192 let result = if let Some(ref tool_name) = embed_ctx.tool_name {
193 qdrant
194 .store_with_tool_context(
195 message_id,
196 conversation_id,
197 &role,
198 vector,
199 MessageKind::Regular,
200 &embedding_model,
201 chunk_index_u32,
202 tool_name,
203 embed_ctx.exit_code,
204 embed_ctx.timestamp.as_deref(),
205 )
206 .await
207 .map(|_| ())
208 } else {
209 qdrant
210 .store(
211 message_id,
212 conversation_id,
213 &role,
214 vector,
215 MessageKind::Regular,
216 &embedding_model,
217 chunk_index_u32,
218 )
219 .await
220 .map(|_| ())
221 };
222 if let Err(e) = result {
223 tracing::warn!(
224 "bg embed_tool: failed to store chunk {chunk_index}/{chunk_count} \
225 for msg {message_id}: {e:#}"
226 );
227 }
228 }
229}
230
231async fn embed_and_store_with_category_bg(args: EmbedBgArgs, category: Option<String>) {
235 let EmbedBgArgs {
236 qdrant,
237 embed_provider,
238 embedding_model,
239 message_id,
240 conversation_id,
241 role,
242 content,
243 } = args;
244 let chunks = chunk_text(&content);
245 let chunk_count = chunks.len();
246
247 let vectors = match embed_provider.embed_batch(&chunks).await {
248 Ok(v) => v,
249 Err(e) => {
250 tracing::warn!(
251 "bg embed_category: failed to embed categorized chunks for msg {message_id}: {e:#}"
252 );
253 return;
254 }
255 };
256
257 let Some(first) = vectors.first() else {
258 return;
259 };
260 let vector_size = first.len() as u64;
261 if let Err(e) = qdrant.ensure_collection(vector_size).await {
262 tracing::warn!("bg embed_category: failed to ensure Qdrant collection: {e:#}");
263 return;
264 }
265
266 for (chunk_index, vector) in vectors.into_iter().enumerate() {
267 let chunk_index_u32 = u32::try_from(chunk_index).unwrap_or(u32::MAX);
268 if let Err(e) = qdrant
269 .store_with_category(
270 message_id,
271 conversation_id,
272 &role,
273 vector,
274 MessageKind::Regular,
275 &embedding_model,
276 chunk_index_u32,
277 category.as_deref(),
278 )
279 .await
280 {
281 tracing::warn!(
282 "bg embed_category: failed to store chunk {chunk_index}/{chunk_count} \
283 for msg {message_id}: {e:#}"
284 );
285 }
286 }
287}
288
289impl SemanticMemory {
290 #[cfg_attr(
300 feature = "profiling",
301 tracing::instrument(name = "memory.remember", skip_all, fields(content_len = %content.len()))
302 )]
303 pub async fn remember(
304 &self,
305 conversation_id: ConversationId,
306 role: &str,
307 content: &str,
308 goal_text: Option<&str>,
309 ) -> Result<Option<MessageId>, MemoryError> {
310 if let Some(ref admission) = self.admission_control {
312 let decision = admission
313 .evaluate(
314 content,
315 role,
316 &self.provider,
317 self.qdrant.as_ref(),
318 goal_text,
319 )
320 .await;
321 let preview: String = content.chars().take(100).collect();
322 log_admission_decision(&decision, &preview, role, admission.threshold());
323 if !decision.admitted {
324 return Ok(None);
325 }
326 }
327
328 let message_id = self
329 .sqlite
330 .save_message(conversation_id, role, content)
331 .await?;
332
333 self.embed_and_store_regular(message_id, conversation_id, role, content);
334
335 Ok(Some(message_id))
336 }
337
338 #[cfg_attr(
347 feature = "profiling",
348 tracing::instrument(name = "memory.remember", skip_all, fields(content_len = %content.len()))
349 )]
350 pub async fn remember_with_parts(
351 &self,
352 conversation_id: ConversationId,
353 role: &str,
354 content: &str,
355 parts_json: &str,
356 goal_text: Option<&str>,
357 ) -> Result<(Option<MessageId>, bool), MemoryError> {
358 if let Some(ref admission) = self.admission_control {
360 let decision = admission
361 .evaluate(
362 content,
363 role,
364 &self.provider,
365 self.qdrant.as_ref(),
366 goal_text,
367 )
368 .await;
369 let preview: String = content.chars().take(100).collect();
370 log_admission_decision(&decision, &preview, role, admission.threshold());
371 if !decision.admitted {
372 return Ok((None, false));
373 }
374 }
375
376 let message_id = self
377 .sqlite
378 .save_message_with_parts(conversation_id, role, content, parts_json)
379 .await?;
380
381 let embedding_stored =
382 self.embed_and_store_regular(message_id, conversation_id, role, content);
383
384 Ok((Some(message_id), embedding_stored))
385 }
386
387 #[cfg_attr(
399 feature = "profiling",
400 tracing::instrument(name = "memory.remember", skip_all, fields(content_len = %content.len()))
401 )]
402 pub async fn remember_tool_output(
403 &self,
404 conversation_id: ConversationId,
405 role: &str,
406 content: &str,
407 parts_json: &str,
408 embed_ctx: EmbedContext,
409 ) -> Result<(Option<MessageId>, bool), MemoryError> {
410 if let Some(ref admission) = self.admission_control {
411 let decision = admission
412 .evaluate(content, role, &self.provider, self.qdrant.as_ref(), None)
413 .await;
414 let preview: String = content.chars().take(100).collect();
415 log_admission_decision(&decision, &preview, role, admission.threshold());
416 if !decision.admitted {
417 return Ok((None, false));
418 }
419 }
420
421 let message_id = self
422 .sqlite
423 .save_message_with_parts(conversation_id, role, content, parts_json)
424 .await?;
425
426 let embedding_stored = self.embed_chunks_with_tool_context(
427 message_id,
428 conversation_id,
429 role,
430 content,
431 embed_ctx,
432 );
433
434 Ok((Some(message_id), embedding_stored))
435 }
436
437 #[cfg_attr(
448 feature = "profiling",
449 tracing::instrument(name = "memory.remember", skip_all, fields(content_len = %content.len()))
450 )]
451 pub async fn remember_categorized(
452 &self,
453 conversation_id: ConversationId,
454 role: &str,
455 content: &str,
456 category: Option<&str>,
457 goal_text: Option<&str>,
458 ) -> Result<Option<MessageId>, MemoryError> {
459 if let Some(ref admission) = self.admission_control {
460 let decision = admission
461 .evaluate(
462 content,
463 role,
464 &self.provider,
465 self.qdrant.as_ref(),
466 goal_text,
467 )
468 .await;
469 let preview: String = content.chars().take(100).collect();
470 log_admission_decision(&decision, &preview, role, admission.threshold());
471 if !decision.admitted {
472 return Ok(None);
473 }
474 }
475
476 let message_id = self
477 .sqlite
478 .save_message_with_category(conversation_id, role, content, category)
479 .await?;
480
481 self.embed_and_store_with_category(message_id, conversation_id, role, content, category);
482
483 Ok(Some(message_id))
484 }
485
    /// Recall messages restricted to `category` by injecting it into `filter`,
    /// then delegating to [`SemanticMemory::recall`].
    ///
    /// NOTE(review): when `filter` is `None` the category is silently dropped
    /// and an unfiltered recall runs — confirm whether a default filter should
    /// be constructed in that case instead.
    pub async fn recall_with_category(
        &self,
        query: &str,
        limit: usize,
        filter: Option<SearchFilter>,
        category: Option<&str>,
    ) -> Result<Vec<RecalledMessage>, MemoryError> {
        let filter_with_category = filter.map(|mut f| {
            f.category = category.map(str::to_owned);
            f
        });
        self.recall(query, limit, filter_with_category).await
    }
506
507 pub fn reap_embed_tasks(&self) {
511 if let Ok(mut tasks) = self.embed_tasks.lock() {
512 while tasks.try_join_next().is_some() {}
513 }
514 }
515
516 fn spawn_embed_bg<F>(&self, fut: F) -> bool
520 where
521 F: std::future::Future<Output = ()> + Send + 'static,
522 {
523 let Ok(mut tasks) = self.embed_tasks.lock() else {
524 return false;
525 };
526 while tasks.try_join_next().is_some() {}
528 if tasks.len() >= MAX_EMBED_BG_TASKS {
529 tracing::debug!("background embed task limit reached, skipping");
530 return false;
531 }
532 tasks.spawn(fut);
533 true
534 }
535
536 fn embed_and_store_with_category(
540 &self,
541 message_id: MessageId,
542 conversation_id: ConversationId,
543 role: &str,
544 content: &str,
545 category: Option<&str>,
546 ) -> bool {
547 let Some(qdrant) = self.qdrant.clone() else {
548 return false;
549 };
550 let embed_provider = self.effective_embed_provider().clone();
551 if !embed_provider.supports_embeddings() {
552 return false;
553 }
554 self.spawn_embed_bg(embed_and_store_with_category_bg(
555 EmbedBgArgs {
556 qdrant,
557 embed_provider,
558 embedding_model: self.embedding_model.clone(),
559 message_id,
560 conversation_id,
561 role: role.to_owned(),
562 content: content.to_owned(),
563 },
564 category.map(str::to_owned),
565 ))
566 }
567
568 fn embed_and_store_regular(
572 &self,
573 message_id: MessageId,
574 conversation_id: ConversationId,
575 role: &str,
576 content: &str,
577 ) -> bool {
578 let Some(qdrant) = self.qdrant.clone() else {
579 return false;
580 };
581 let embed_provider = self.effective_embed_provider().clone();
582 if !embed_provider.supports_embeddings() {
583 return false;
584 }
585 self.spawn_embed_bg(embed_and_store_regular_bg(EmbedBgArgs {
586 qdrant,
587 embed_provider,
588 embedding_model: self.embedding_model.clone(),
589 message_id,
590 conversation_id,
591 role: role.to_owned(),
592 content: content.to_owned(),
593 }))
594 }
595
596 fn embed_chunks_with_tool_context(
600 &self,
601 message_id: MessageId,
602 conversation_id: ConversationId,
603 role: &str,
604 content: &str,
605 embed_ctx: EmbedContext,
606 ) -> bool {
607 let Some(qdrant) = self.qdrant.clone() else {
608 return false;
609 };
610 let embed_provider = self.effective_embed_provider().clone();
611 if !embed_provider.supports_embeddings() {
612 return false;
613 }
614 self.spawn_embed_bg(embed_chunks_with_tool_context_bg(
615 EmbedBgArgs {
616 qdrant,
617 embed_provider,
618 embedding_model: self.embedding_model.clone(),
619 message_id,
620 conversation_id,
621 role: role.to_owned(),
622 content: content.to_owned(),
623 },
624 embed_ctx,
625 ))
626 }
627
628 pub async fn save_only(
636 &self,
637 conversation_id: ConversationId,
638 role: &str,
639 content: &str,
640 parts_json: &str,
641 ) -> Result<MessageId, MemoryError> {
642 self.sqlite
643 .save_message_with_parts(conversation_id, role, content, parts_json)
644 .await
645 }
646
647 #[cfg_attr(
657 feature = "profiling",
658 tracing::instrument(name = "memory.recall", skip_all, fields(query_len = %query.len(), result_count = tracing::field::Empty, top_score = tracing::field::Empty))
659 )]
660 pub async fn recall(
661 &self,
662 query: &str,
663 limit: usize,
664 filter: Option<SearchFilter>,
665 ) -> Result<Vec<RecalledMessage>, MemoryError> {
666 let conversation_id = filter.as_ref().and_then(|f| f.conversation_id);
667
668 tracing::debug!(
669 query_len = query.len(),
670 limit,
671 has_filter = filter.is_some(),
672 conversation_id = conversation_id.map(|c| c.0),
673 has_qdrant = self.qdrant.is_some(),
674 "recall: starting hybrid search"
675 );
676
677 let keyword_results = match self
678 .sqlite
679 .keyword_search(query, limit * 2, conversation_id)
680 .await
681 {
682 Ok(results) => results,
683 Err(e) => {
684 tracing::warn!("FTS5 keyword search failed: {e:#}");
685 Vec::new()
686 }
687 };
688
689 let vector_results = if let Some(qdrant) = &self.qdrant
690 && self.provider.supports_embeddings()
691 {
692 let query_vector = self.provider.embed(query).await?;
693 let vector_size = u64::try_from(query_vector.len()).unwrap_or(896);
694 qdrant.ensure_collection(vector_size).await?;
695 qdrant.search(&query_vector, limit * 2, filter).await?
696 } else {
697 Vec::new()
698 };
699
700 let results = self
701 .recall_merge_and_rank(keyword_results, vector_results, limit)
702 .await?;
703 #[cfg(feature = "profiling")]
704 {
705 let span = tracing::Span::current();
706 span.record("result_count", results.len());
707 if let Some(top) = results.first() {
708 span.record("top_score", top.score);
709 }
710 }
711 Ok(results)
712 }
713
714 pub(super) async fn recall_fts5_raw(
715 &self,
716 query: &str,
717 limit: usize,
718 conversation_id: Option<ConversationId>,
719 ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
720 self.sqlite
721 .keyword_search(query, limit * 2, conversation_id)
722 .await
723 }
724
725 pub(super) async fn recall_vectors_raw(
726 &self,
727 query: &str,
728 limit: usize,
729 filter: Option<SearchFilter>,
730 ) -> Result<Vec<crate::embedding_store::SearchResult>, MemoryError> {
731 let Some(qdrant) = &self.qdrant else {
732 return Ok(Vec::new());
733 };
734 if !self.provider.supports_embeddings() {
735 return Ok(Vec::new());
736 }
737 let query_vector = self.provider.embed(query).await?;
738 let vector_size = u64::try_from(query_vector.len()).unwrap_or(896);
739 qdrant.ensure_collection(vector_size).await?;
740 qdrant.search(&query_vector, limit * 2, filter).await
741 }
742
    /// Merge keyword and vector candidates into one ranked list and apply the
    /// configured post-processing stages, in this order:
    ///
    /// 1. weighted score merge (vector/keyword, each max-normalized),
    /// 2. temporal decay (if enabled),
    /// 3. MMR diversification (if enabled) — this stage also truncates to `limit`,
    /// 4. importance-score blending (if enabled),
    /// 5. semantic-tier boost (if configured),
    ///
    /// then bumps access counters, marks training data as recalled, and
    /// hydrates the final messages from SQLite.
    #[allow(clippy::cast_possible_truncation, clippy::too_many_lines)]
    pub(super) async fn recall_merge_and_rank(
        &self,
        keyword_results: Vec<(MessageId, f64)>,
        vector_results: Vec<crate::embedding_store::SearchResult>,
        limit: usize,
    ) -> Result<Vec<RecalledMessage>, MemoryError> {
        tracing::debug!(
            vector_count = vector_results.len(),
            keyword_count = keyword_results.len(),
            limit,
            "recall: merging search results"
        );

        // Combined score per message; a message present in both result sets
        // accumulates both weighted contributions.
        let mut scores: std::collections::HashMap<MessageId, f64> =
            std::collections::HashMap::new();

        if !vector_results.is_empty() {
            // Normalize by the best vector score so weights stay comparable
            // across providers with different score scales.
            let max_vs = vector_results
                .iter()
                .map(|r| r.score)
                .fold(f32::NEG_INFINITY, f32::max);
            let norm = if max_vs > 0.0 { max_vs } else { 1.0 };
            for r in &vector_results {
                let normalized = f64::from(r.score / norm);
                *scores.entry(r.message_id).or_default() += normalized * self.vector_weight;
            }
        }

        if !keyword_results.is_empty() {
            // Same max-normalization for the FTS5 scores.
            let max_ks = keyword_results
                .iter()
                .map(|r| r.1)
                .fold(f64::NEG_INFINITY, f64::max);
            let norm = if max_ks > 0.0 { max_ks } else { 1.0 };
            for &(msg_id, score) in &keyword_results {
                let normalized = score / norm;
                *scores.entry(msg_id).or_default() += normalized * self.keyword_weight;
            }
        }

        if scores.is_empty() {
            tracing::debug!("recall: empty merge, no overlapping scores");
            return Ok(Vec::new());
        }

        // Sort descending by score; NaN comparisons are treated as equal.
        let mut ranked: Vec<(MessageId, f64)> = scores.into_iter().collect();
        ranked.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));

        tracing::debug!(
            merged = ranked.len(),
            top_score = ranked.first().map(|r| r.1),
            bottom_score = ranked.last().map(|r| r.1),
            vector_weight = %self.vector_weight,
            keyword_weight = %self.keyword_weight,
            "recall: weighted merge complete"
        );

        // Stage 2: decay older messages. Best-effort — a timestamp fetch
        // failure leaves the ranking untouched.
        if self.temporal_decay_enabled && self.temporal_decay_half_life_days > 0 {
            let ids: Vec<MessageId> = ranked.iter().map(|r| r.0).collect();
            match self.sqlite.message_timestamps(&ids).await {
                Ok(timestamps) => {
                    apply_temporal_decay(
                        &mut ranked,
                        &timestamps,
                        self.temporal_decay_half_life_days,
                    );
                    ranked
                        .sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
                    tracing::debug!(
                        half_life_days = self.temporal_decay_half_life_days,
                        top_score_after = ranked.first().map(|r| r.1),
                        "recall: temporal decay applied"
                    );
                }
                Err(e) => {
                    tracing::warn!("temporal decay: failed to fetch timestamps: {e:#}");
                }
            }
        }

        // Stage 3: MMR diversification. Every path through here ends with the
        // list truncated to `limit`.
        if self.mmr_enabled && !vector_results.is_empty() {
            if let Some(qdrant) = &self.qdrant {
                let ids: Vec<MessageId> = ranked.iter().map(|r| r.0).collect();
                match qdrant.get_vectors(&ids).await {
                    Ok(vec_map) if !vec_map.is_empty() => {
                        let ranked_len_before = ranked.len();
                        ranked = apply_mmr(&ranked, &vec_map, self.mmr_lambda, limit);
                        tracing::debug!(
                            before = ranked_len_before,
                            after = ranked.len(),
                            lambda = %self.mmr_lambda,
                            "recall: mmr re-ranked"
                        );
                    }
                    Ok(_) => {
                        ranked.truncate(limit);
                    }
                    Err(e) => {
                        tracing::warn!("MMR: failed to fetch vectors: {e:#}");
                        ranked.truncate(limit);
                    }
                }
            } else {
                ranked.truncate(limit);
            }
        } else {
            ranked.truncate(limit);
        }

        // Stage 4: additively blend persisted importance scores (best-effort).
        if self.importance_enabled && !ranked.is_empty() {
            let ids: Vec<MessageId> = ranked.iter().map(|r| r.0).collect();
            match self.sqlite.fetch_importance_scores(&ids).await {
                Ok(scores) => {
                    for (msg_id, score) in &mut ranked {
                        if let Some(&imp) = scores.get(msg_id) {
                            *score += imp * self.importance_weight;
                        }
                    }
                    ranked
                        .sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
                    tracing::debug!(
                        importance_weight = %self.importance_weight,
                        "recall: importance scores blended"
                    );
                }
                Err(e) => {
                    tracing::warn!("importance scoring: failed to fetch scores: {e:#}");
                }
            }
        }

        // Stage 5: flat bonus for messages in the "semantic" tier; skipped
        // entirely when the boost factor is effectively 1.0.
        if (self.tier_boost_semantic - 1.0).abs() > f64::EPSILON && !ranked.is_empty() {
            let ids: Vec<MessageId> = ranked.iter().map(|r| r.0).collect();
            match self.sqlite.fetch_tiers(&ids).await {
                Ok(tiers) => {
                    let bonus = self.tier_boost_semantic - 1.0;
                    let mut boosted = false;
                    for (msg_id, score) in &mut ranked {
                        if tiers.get(msg_id).map(String::as_str) == Some("semantic") {
                            *score += bonus;
                            boosted = true;
                        }
                    }
                    if boosted {
                        ranked.sort_by(|a, b| {
                            b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal)
                        });
                        tracing::debug!(
                            tier_boost = %self.tier_boost_semantic,
                            "recall: semantic tier boost applied"
                        );
                    }
                }
                Err(e) => {
                    tracing::warn!("tier boost: failed to fetch tiers: {e:#}");
                }
            }
        }

        let ids: Vec<MessageId> = ranked.iter().map(|r| r.0).collect();

        // Bookkeeping: access counters and training-data markers are both
        // non-fatal on failure.
        if !ids.is_empty()
            && let Err(e) = self.batch_increment_access_count(ids.clone()).await
        {
            tracing::warn!("recall: failed to increment access counts: {e:#}");
        }

        if let Err(e) = self.sqlite.mark_training_recalled(&ids).await {
            tracing::debug!(
                error = %e,
                "recall: failed to mark training data as recalled (non-fatal)"
            );
        }

        // Hydrate message bodies and pair them back with their final scores;
        // messages missing from SQLite are silently dropped.
        let messages = self.sqlite.messages_by_ids(&ids).await?;
        let msg_map: std::collections::HashMap<MessageId, _> = messages.into_iter().collect();

        let recalled: Vec<RecalledMessage> = ranked
            .iter()
            .filter_map(|(msg_id, score)| {
                msg_map.get(msg_id).map(|msg| RecalledMessage {
                    message: msg.clone(),
                    #[expect(clippy::cast_possible_truncation)]
                    score: *score as f32,
                })
            })
            .collect();

        tracing::debug!(final_count = recalled.len(), "recall: final results");

        Ok(recalled)
    }
948
    /// Recall using a synchronous routing decision: the router picks keyword,
    /// semantic, hybrid, episodic (time-ranged keyword), or graph retrieval,
    /// and the gathered candidates go through `recall_merge_and_rank`.
    ///
    /// NOTE(review): this duplicates the match in `recall_routed_async` —
    /// keep the two in sync when adding routes.
    #[cfg_attr(
        feature = "profiling",
        tracing::instrument(name = "memory.recall", skip_all, fields(query_len = %query.len(), result_count = tracing::field::Empty))
    )]
    pub async fn recall_routed(
        &self,
        query: &str,
        limit: usize,
        filter: Option<SearchFilter>,
        router: &dyn crate::router::MemoryRouter,
    ) -> Result<Vec<RecalledMessage>, MemoryError> {
        use crate::router::MemoryRoute;

        let route = router.route(query);
        tracing::debug!(?route, query_len = query.len(), "memory routing decision");

        let conversation_id = filter.as_ref().and_then(|f| f.conversation_id);

        let (keyword_results, vector_results): (
            Vec<(MessageId, f64)>,
            Vec<crate::embedding_store::SearchResult>,
        ) = match route {
            MemoryRoute::Keyword => {
                let kw = self.recall_fts5_raw(query, limit, conversation_id).await?;
                (kw, Vec::new())
            }
            MemoryRoute::Semantic => {
                let vr = self.recall_vectors_raw(query, limit, filter).await?;
                (Vec::new(), vr)
            }
            MemoryRoute::Hybrid => {
                // Keyword failures degrade to vector-only; vector errors propagate.
                let kw = match self.recall_fts5_raw(query, limit, conversation_id).await {
                    Ok(r) => r,
                    Err(e) => {
                        tracing::warn!("FTS5 keyword search failed: {e:#}");
                        Vec::new()
                    }
                };
                let vr = self.recall_vectors_raw(query, limit, filter).await?;
                (kw, vr)
            }
            MemoryRoute::Episodic => {
                // Resolve temporal phrases ("yesterday", "last week") into an
                // explicit time range and strip them from the keyword query.
                let range = crate::router::resolve_temporal_range(query, chrono::Utc::now());
                let cleaned = crate::router::strip_temporal_keywords(query);
                let search_query = if cleaned.is_empty() { query } else { &cleaned };
                let kw = if let Some(ref r) = range {
                    self.sqlite
                        .keyword_search_with_time_range(
                            search_query,
                            limit,
                            conversation_id,
                            r.after.as_deref(),
                            r.before.as_deref(),
                        )
                        .await?
                } else {
                    self.recall_fts5_raw(search_query, limit, conversation_id)
                        .await?
                };
                tracing::debug!(
                    has_range = range.is_some(),
                    cleaned_query = %search_query,
                    keyword_count = kw.len(),
                    "recall: episodic path"
                );
                (kw, Vec::new())
            }
            MemoryRoute::Graph => {
                // Graph route currently degrades to hybrid retrieval.
                let kw = match self.recall_fts5_raw(query, limit, conversation_id).await {
                    Ok(r) => r,
                    Err(e) => {
                        tracing::warn!("FTS5 keyword search failed (graph→hybrid fallback): {e:#}");
                        Vec::new()
                    }
                };
                let vr = self.recall_vectors_raw(query, limit, filter).await?;
                (kw, vr)
            }
        };

        tracing::debug!(
            keyword_count = keyword_results.len(),
            vector_count = vector_results.len(),
            "recall: routed search results"
        );

        self.recall_merge_and_rank(keyword_results, vector_results, limit)
            .await
    }
1056
    /// Recall using an asynchronous routing decision (e.g. an LLM-backed
    /// router). Mirrors `recall_routed` arm-for-arm, except the decision also
    /// carries a confidence value and the episodic path logs less.
    ///
    /// NOTE(review): duplicated route match — keep in sync with `recall_routed`.
    #[cfg_attr(
        feature = "profiling",
        tracing::instrument(name = "memory.recall", skip_all, fields(query_len = %query.len(), result_count = tracing::field::Empty))
    )]
    pub async fn recall_routed_async(
        &self,
        query: &str,
        limit: usize,
        filter: Option<crate::embedding_store::SearchFilter>,
        router: &dyn crate::router::AsyncMemoryRouter,
    ) -> Result<Vec<RecalledMessage>, MemoryError> {
        use crate::router::MemoryRoute;

        let decision = router.route_async(query).await;
        let route = decision.route;
        tracing::debug!(
            ?route,
            confidence = decision.confidence,
            query_len = query.len(),
            "memory routing decision (async)"
        );

        let conversation_id = filter.as_ref().and_then(|f| f.conversation_id);

        let (keyword_results, vector_results): (
            Vec<(crate::types::MessageId, f64)>,
            Vec<crate::embedding_store::SearchResult>,
        ) = match route {
            MemoryRoute::Keyword => {
                let kw = self.recall_fts5_raw(query, limit, conversation_id).await?;
                (kw, Vec::new())
            }
            MemoryRoute::Semantic => {
                let vr = self.recall_vectors_raw(query, limit, filter).await?;
                (Vec::new(), vr)
            }
            MemoryRoute::Hybrid => {
                // Keyword failures degrade to vector-only; vector errors propagate.
                let kw = match self.recall_fts5_raw(query, limit, conversation_id).await {
                    Ok(r) => r,
                    Err(e) => {
                        tracing::warn!("FTS5 keyword search failed: {e:#}");
                        Vec::new()
                    }
                };
                let vr = self.recall_vectors_raw(query, limit, filter).await?;
                (kw, vr)
            }
            MemoryRoute::Episodic => {
                // Resolve temporal phrases into an explicit time range and
                // strip them from the keyword query.
                let range = crate::router::resolve_temporal_range(query, chrono::Utc::now());
                let cleaned = crate::router::strip_temporal_keywords(query);
                let search_query = if cleaned.is_empty() { query } else { &cleaned };
                let kw = if let Some(ref r) = range {
                    self.sqlite
                        .keyword_search_with_time_range(
                            search_query,
                            limit,
                            conversation_id,
                            r.after.as_deref(),
                            r.before.as_deref(),
                        )
                        .await?
                } else {
                    self.recall_fts5_raw(search_query, limit, conversation_id)
                        .await?
                };
                (kw, Vec::new())
            }
            MemoryRoute::Graph => {
                // Graph route currently degrades to hybrid retrieval.
                let kw = match self.recall_fts5_raw(query, limit, conversation_id).await {
                    Ok(r) => r,
                    Err(e) => {
                        tracing::warn!("FTS5 keyword search failed (graph→hybrid fallback): {e:#}");
                        Vec::new()
                    }
                };
                let vr = self.recall_vectors_raw(query, limit, filter).await?;
                (kw, vr)
            }
        };

        tracing::debug!(
            keyword_count = keyword_results.len(),
            vector_count = vector_results.len(),
            "recall: routed search results (async)"
        );

        self.recall_merge_and_rank(keyword_results, vector_results, limit)
            .await
    }
1156
1157 #[cfg_attr(
1171 feature = "profiling",
1172 tracing::instrument(name = "memory.recall_graph", skip_all, fields(result_count = tracing::field::Empty))
1173 )]
1174 pub async fn recall_graph(
1175 &self,
1176 query: &str,
1177 limit: usize,
1178 max_hops: u32,
1179 at_timestamp: Option<&str>,
1180 temporal_decay_rate: f64,
1181 edge_types: &[crate::graph::EdgeType],
1182 ) -> Result<Vec<crate::graph::types::GraphFact>, MemoryError> {
1183 let Some(store) = &self.graph_store else {
1184 return Ok(Vec::new());
1185 };
1186
1187 tracing::debug!(
1188 query_len = query.len(),
1189 limit,
1190 max_hops,
1191 "graph: starting recall"
1192 );
1193
1194 let results = crate::graph::retrieval::graph_recall(
1195 store,
1196 self.qdrant.as_deref(),
1197 &self.provider,
1198 query,
1199 limit,
1200 max_hops,
1201 at_timestamp,
1202 temporal_decay_rate,
1203 edge_types,
1204 )
1205 .await?;
1206
1207 tracing::debug!(result_count = results.len(), "graph: recall complete");
1208 #[cfg(feature = "profiling")]
1209 tracing::Span::current().record("result_count", results.len());
1210
1211 Ok(results)
1212 }
1213
1214 #[cfg_attr(
1223 feature = "profiling",
1224 tracing::instrument(name = "memory.recall_graph", skip_all, fields(result_count = tracing::field::Empty))
1225 )]
1226 pub async fn recall_graph_activated(
1227 &self,
1228 query: &str,
1229 limit: usize,
1230 params: crate::graph::SpreadingActivationParams,
1231 edge_types: &[crate::graph::EdgeType],
1232 ) -> Result<Vec<crate::graph::activation::ActivatedFact>, MemoryError> {
1233 let Some(store) = &self.graph_store else {
1234 return Ok(Vec::new());
1235 };
1236
1237 tracing::debug!(
1238 query_len = query.len(),
1239 limit,
1240 "spreading activation: starting graph recall"
1241 );
1242
1243 let embeddings = self.qdrant.as_deref();
1244 let results = crate::graph::retrieval::graph_recall_activated(
1245 store,
1246 embeddings,
1247 &self.provider,
1248 query,
1249 limit,
1250 params,
1251 edge_types,
1252 )
1253 .await?;
1254
1255 tracing::debug!(
1256 result_count = results.len(),
1257 "spreading activation: graph recall complete"
1258 );
1259
1260 Ok(results)
1261 }
1262
1263 async fn batch_increment_access_count(
1271 &self,
1272 message_ids: Vec<MessageId>,
1273 ) -> Result<(), MemoryError> {
1274 if message_ids.is_empty() {
1275 return Ok(());
1276 }
1277 self.sqlite.increment_access_counts(&message_ids).await
1278 }
1279
1280 pub async fn has_embedding(&self, message_id: MessageId) -> Result<bool, MemoryError> {
1286 match &self.qdrant {
1287 Some(qdrant) => qdrant.has_embedding(message_id).await,
1288 None => Ok(false),
1289 }
1290 }
1291
    /// Backfill embeddings for messages that have none, in batches of 32,
    /// reporting progress through the optional watch channel (a final `None`
    /// signals completion).
    ///
    /// Returns the number of messages for which an embed was dispatched.
    /// NOTE(review): `embed_and_store_regular` only *spawns* background work,
    /// so "succeeded" here means dispatched, not embedded. Also, if
    /// `stream_unembedded_messages` does not exclude rows whose embeds are
    /// still in flight, this loop can re-fetch the same rows — confirm the
    /// store marks them promptly enough to guarantee termination.
    pub async fn embed_missing(
        &self,
        progress_tx: Option<tokio::sync::watch::Sender<Option<super::BackfillProgress>>>,
    ) -> Result<usize, MemoryError> {
        // Nothing to do without a vector store or an embedding-capable provider.
        if self.qdrant.is_none() || !self.effective_embed_provider().supports_embeddings() {
            return Ok(0);
        }

        let total = self.sqlite.count_unembedded_messages().await?;
        if total == 0 {
            return Ok(0);
        }

        if let Some(tx) = &progress_tx {
            let _ = tx.send(Some(super::BackfillProgress { done: 0, total }));
        }

        let mut done = 0usize;
        let mut succeeded = 0usize;

        loop {
            const BATCH_SIZE: usize = 32;
            const BATCH_SIZE_I64: i64 = 32;
            let rows: Vec<_> = self
                .sqlite
                .stream_unembedded_messages(BATCH_SIZE_I64)
                .try_collect()
                .await?;

            if rows.is_empty() {
                break;
            }

            let batch_len = rows.len();

            // Dispatch up to 4 embeds concurrently; each bool reports whether
            // a background task was spawned for that message.
            let results: Vec<bool> = futures::stream::iter(rows)
                .map(|(msg_id, conv_id, role, content)| async move {
                    self.embed_and_store_regular(msg_id, conv_id, &role, &content)
                })
                .buffer_unordered(4)
                .collect()
                .await;

            for ok in &results {
                done += 1;
                if *ok {
                    succeeded += 1;
                }
                if let Some(tx) = &progress_tx {
                    let _ = tx.send(Some(super::BackfillProgress { done, total }));
                }
            }

            let batch_succeeded = results.iter().filter(|&&b| b).count();
            if batch_succeeded > 0 {
                tracing::debug!("Backfill batch: {batch_succeeded}/{batch_len} embedded");
            }

            // A short batch means the unembedded backlog is exhausted.
            if batch_len < BATCH_SIZE {
                break;
            }
        }

        if let Some(tx) = &progress_tx {
            let _ = tx.send(None);
        }

        if done > 0 {
            tracing::info!("Embedded {succeeded}/{total} missing messages");
        }
        Ok(succeeded)
    }
1379}
1380
#[cfg(test)]
mod tests {
    use super::*;

    // EmbedContext is a plain data carrier; these first tests pin its Default
    // impl and field behavior.
    #[test]
    fn embed_context_default_all_none() {
        let ctx = EmbedContext::default();
        assert!(ctx.tool_name.is_none());
        assert!(ctx.exit_code.is_none());
        assert!(ctx.timestamp.is_none());
    }

    #[test]
    fn embed_context_fields_set_correctly() {
        let ctx = EmbedContext {
            tool_name: Some("shell".to_string()),
            exit_code: Some(0),
            timestamp: Some("2026-04-04T00:00:00Z".to_string()),
        };
        assert_eq!(ctx.tool_name.as_deref(), Some("shell"));
        assert_eq!(ctx.exit_code, Some(0));
        assert_eq!(ctx.timestamp.as_deref(), Some("2026-04-04T00:00:00Z"));
    }

    #[test]
    fn embed_context_non_zero_exit_code() {
        let ctx = EmbedContext {
            tool_name: Some("shell".to_string()),
            exit_code: Some(1),
            timestamp: None,
        };
        assert_eq!(ctx.exit_code, Some(1));
        assert!(ctx.timestamp.is_none());
    }

    // Builds a minimal in-memory SemanticMemory: mock provider, no vector or
    // graph store, and every optional ranking feature disabled.
    async fn make_semantic_memory() -> crate::semantic::SemanticMemory {
        use std::sync::Arc;
        use std::sync::atomic::AtomicU64;
        use zeph_llm::any::AnyProvider;
        use zeph_llm::mock::MockProvider;

        let provider = AnyProvider::Mock(MockProvider::default());
        let sqlite = crate::store::SqliteStore::new(":memory:").await.unwrap();
        crate::semantic::SemanticMemory {
            sqlite,
            qdrant: None,
            provider,
            embed_provider: None,
            embedding_model: "test-model".into(),
            vector_weight: 0.7,
            keyword_weight: 0.3,
            temporal_decay_enabled: false,
            temporal_decay_half_life_days: 30,
            mmr_enabled: false,
            mmr_lambda: 0.7,
            importance_enabled: false,
            importance_weight: 0.15,
            token_counter: Arc::new(crate::token_counter::TokenCounter::new()),
            graph_store: None,
            community_detection_failures: Arc::new(AtomicU64::new(0)),
            graph_extraction_count: Arc::new(AtomicU64::new(0)),
            graph_extraction_failures: Arc::new(AtomicU64::new(0)),
            tier_boost_semantic: 1.3,
            admission_control: None,
            key_facts_dedup_threshold: 0.95,
            embed_tasks: std::sync::Mutex::new(tokio::task::JoinSet::new()),
        }
    }

    #[tokio::test]
    async fn spawn_embed_bg_returns_true_when_capacity_available() {
        let memory = make_semantic_memory().await;
        let dispatched = memory.spawn_embed_bg(std::future::ready(()));
        assert!(
            dispatched,
            "spawn_embed_bg must return true when a task was successfully spawned"
        );
    }

    #[tokio::test]
    async fn spawn_embed_bg_returns_false_at_capacity() {
        let memory = make_semantic_memory().await;

        {
            // Fill the JoinSet to its cap with tasks that never complete so
            // the next spawn attempt must be rejected.
            let mut tasks = memory.embed_tasks.lock().unwrap();
            for _ in 0..MAX_EMBED_BG_TASKS {
                tasks.spawn(std::future::pending::<()>());
            }
        }

        let dispatched = memory.spawn_embed_bg(std::future::ready(()));
        assert!(
            !dispatched,
            "spawn_embed_bg must return false when the task limit is reached"
        );
    }
}
1478}