1use zeph_llm::provider::{LlmProvider as _, Message};
5
/// Rough bytes-per-token heuristic used to size chunks in characters.
const CHARS_PER_TOKEN: usize = 4;

/// Maximum chunk size in bytes (~400 tokens).
const CHUNK_CHARS: usize = 400 * CHARS_PER_TOKEN;

/// Overlap carried between consecutive chunks in bytes (~80 tokens).
const CHUNK_OVERLAP_CHARS: usize = 80 * CHARS_PER_TOKEN;

/// Splits `text` into chunks of at most `CHUNK_CHARS` bytes, preferring to
/// break at a paragraph (`"\n\n"`), then line (`'\n'`), then word (`' '`)
/// boundary, with up to `CHUNK_OVERLAP_CHARS` bytes of overlap between
/// consecutive chunks so context is not lost at chunk edges.
///
/// Text that fits in a single chunk is returned as-is in a one-element vec.
///
/// Forward progress is guaranteed: if stepping back by the overlap would put
/// the next chunk at or before the previous start (possible when a chunk is
/// shorter than the overlap, e.g. the only break candidate is near position
/// 0), the next chunk starts exactly at `end` with no overlap instead of
/// looping forever on the same window.
fn chunk_text(text: &str) -> Vec<&str> {
    // Largest char boundary <= i, clamped to text.len().
    // Stable-Rust stand-in for `str::floor_char_boundary`.
    fn floor_boundary(text: &str, mut i: usize) -> usize {
        if i >= text.len() {
            return text.len();
        }
        while !text.is_char_boundary(i) {
            i -= 1;
        }
        i
    }

    // Smallest char boundary >= i, clamped to text.len().
    // Stable-Rust stand-in for `str::ceil_char_boundary`.
    fn ceil_boundary(text: &str, mut i: usize) -> usize {
        if i >= text.len() {
            return text.len();
        }
        while !text.is_char_boundary(i) {
            i += 1;
        }
        i
    }

    if text.len() <= CHUNK_CHARS {
        return vec![text];
    }

    let mut chunks = Vec::new();
    let mut start = 0;

    while start < text.len() {
        let end = if start + CHUNK_CHARS >= text.len() {
            text.len()
        } else {
            let boundary = floor_boundary(text, start + CHUNK_CHARS);
            let slice = &text[start..boundary];
            // Prefer natural break points, from strongest to weakest.
            if let Some(pos) = slice.rfind("\n\n") {
                start + pos + 2
            } else if let Some(pos) = slice.rfind('\n') {
                start + pos + 1
            } else if let Some(pos) = slice.rfind(' ') {
                start + pos + 1
            } else {
                boundary
            }
        };

        chunks.push(&text[start..end]);
        if end >= text.len() {
            break;
        }
        // Step back by the overlap, but only if that actually advances past
        // the previous start; otherwise continue from `end` (no overlap) to
        // guarantee termination.
        let candidate = ceil_boundary(text, end.saturating_sub(CHUNK_OVERLAP_CHARS));
        start = if candidate > start { candidate } else { end };
    }

    chunks
}
61
62use crate::admission::log_admission_decision;
63use crate::embedding_store::{MessageKind, SearchFilter};
64use crate::error::MemoryError;
65use crate::types::{ConversationId, MessageId};
66
67use super::SemanticMemory;
68use super::algorithms::{apply_mmr, apply_temporal_decay};
69
/// Optional tool-execution metadata attached to embeddings of tool output.
///
/// Passed to [`SemanticMemory::remember_tool_output`]; when `tool_name` is
/// set, chunks are stored via `store_with_tool_context` so the extra fields
/// end up in the vector store payload.
#[derive(Debug, Clone, Default)]
pub struct EmbedContext {
    /// Name of the tool that produced the output, if known.
    pub tool_name: Option<String>,
    /// Exit code of the tool invocation, if known.
    pub exit_code: Option<i32>,
    /// Timestamp of the invocation as a string — tests use RFC 3339-style
    /// values, but the format is not enforced here.
    pub timestamp: Option<String>,
}
79
/// A message returned by recall, paired with its final ranking score.
#[derive(Debug)]
pub struct RecalledMessage {
    /// The recalled message payload.
    pub message: Message,
    /// Combined relevance score after merging, decay, and boosts
    /// (higher means more relevant; truncated from `f64` during ranking).
    pub score: f32,
}
85
86impl SemanticMemory {
87 pub async fn remember(
97 &self,
98 conversation_id: ConversationId,
99 role: &str,
100 content: &str,
101 goal_text: Option<&str>,
102 ) -> Result<Option<MessageId>, MemoryError> {
103 if let Some(ref admission) = self.admission_control {
105 let decision = admission
106 .evaluate(
107 content,
108 role,
109 &self.provider,
110 self.qdrant.as_ref(),
111 goal_text,
112 )
113 .await;
114 let preview: String = content.chars().take(100).collect();
115 log_admission_decision(&decision, &preview, role, admission.threshold());
116 if !decision.admitted {
117 return Ok(None);
118 }
119 }
120
121 let message_id = self
122 .sqlite
123 .save_message(conversation_id, role, content)
124 .await?;
125
126 if let Some(qdrant) = &self.qdrant
127 && self.provider.supports_embeddings()
128 {
129 let chunks = chunk_text(content);
130 let chunk_count = chunks.len();
131 let mut collection_ready = false;
132
133 for (chunk_index, chunk) in chunks.into_iter().enumerate() {
134 let chunk_index_u32 = u32::try_from(chunk_index).unwrap_or(u32::MAX);
135 match self.provider.embed(chunk).await {
136 Ok(vector) => {
137 if !collection_ready {
138 let vector_size = u64::try_from(vector.len()).unwrap_or(896);
139 if let Err(e) = qdrant.ensure_collection(vector_size).await {
140 tracing::warn!("Failed to ensure Qdrant collection: {e:#}");
141 break;
142 }
143 collection_ready = true;
144 }
145 if let Err(e) = qdrant
146 .store(
147 message_id,
148 conversation_id,
149 role,
150 vector,
151 MessageKind::Regular,
152 &self.embedding_model,
153 chunk_index_u32,
154 )
155 .await
156 {
157 tracing::warn!(
158 "Failed to store chunk {chunk_index}/{chunk_count} \
159 for msg {message_id}: {e:#}"
160 );
161 }
162 }
163 Err(e) => {
164 tracing::warn!(
165 "Failed to embed chunk {chunk_index}/{chunk_count} \
166 for msg {message_id}: {e:#}"
167 );
168 }
169 }
170 }
171 }
172
173 Ok(Some(message_id))
174 }
175
176 pub async fn remember_with_parts(
185 &self,
186 conversation_id: ConversationId,
187 role: &str,
188 content: &str,
189 parts_json: &str,
190 goal_text: Option<&str>,
191 ) -> Result<(Option<MessageId>, bool), MemoryError> {
192 if let Some(ref admission) = self.admission_control {
194 let decision = admission
195 .evaluate(
196 content,
197 role,
198 &self.provider,
199 self.qdrant.as_ref(),
200 goal_text,
201 )
202 .await;
203 let preview: String = content.chars().take(100).collect();
204 log_admission_decision(&decision, &preview, role, admission.threshold());
205 if !decision.admitted {
206 return Ok((None, false));
207 }
208 }
209
210 let message_id = self
211 .sqlite
212 .save_message_with_parts(conversation_id, role, content, parts_json)
213 .await?;
214
215 let mut embedding_stored = false;
216
217 if let Some(qdrant) = &self.qdrant
218 && self.provider.supports_embeddings()
219 {
220 let chunks = chunk_text(content);
221 let chunk_count = chunks.len();
222 let mut collection_ready = false;
223
224 for (chunk_index, chunk) in chunks.into_iter().enumerate() {
225 let chunk_index_u32 = u32::try_from(chunk_index).unwrap_or(u32::MAX);
226 match self.provider.embed(chunk).await {
227 Ok(vector) => {
228 if !collection_ready {
229 let vector_size = u64::try_from(vector.len()).unwrap_or(896);
230 if let Err(e) = qdrant.ensure_collection(vector_size).await {
231 tracing::warn!("Failed to ensure Qdrant collection: {e:#}");
232 break;
233 }
234 collection_ready = true;
235 }
236 if let Err(e) = qdrant
237 .store(
238 message_id,
239 conversation_id,
240 role,
241 vector,
242 MessageKind::Regular,
243 &self.embedding_model,
244 chunk_index_u32,
245 )
246 .await
247 {
248 tracing::warn!(
249 "Failed to store chunk {chunk_index}/{chunk_count} \
250 for msg {message_id}: {e:#}"
251 );
252 } else {
253 embedding_stored = true;
254 }
255 }
256 Err(e) => {
257 tracing::warn!(
258 "Failed to embed chunk {chunk_index}/{chunk_count} \
259 for msg {message_id}: {e:#}"
260 );
261 }
262 }
263 }
264 }
265
266 Ok((Some(message_id), embedding_stored))
267 }
268
269 pub async fn remember_tool_output(
281 &self,
282 conversation_id: ConversationId,
283 role: &str,
284 content: &str,
285 parts_json: &str,
286 embed_ctx: EmbedContext,
287 ) -> Result<(Option<MessageId>, bool), MemoryError> {
288 if let Some(ref admission) = self.admission_control {
289 let decision = admission
290 .evaluate(content, role, &self.provider, self.qdrant.as_ref(), None)
291 .await;
292 let preview: String = content.chars().take(100).collect();
293 log_admission_decision(&decision, &preview, role, admission.threshold());
294 if !decision.admitted {
295 return Ok((None, false));
296 }
297 }
298
299 let message_id = self
300 .sqlite
301 .save_message_with_parts(conversation_id, role, content, parts_json)
302 .await?;
303
304 let embedding_stored = self
305 .embed_chunks_with_tool_context(message_id, conversation_id, role, content, embed_ctx)
306 .await;
307
308 Ok((Some(message_id), embedding_stored))
309 }
310
311 async fn embed_chunks_with_tool_context(
315 &self,
316 message_id: MessageId,
317 conversation_id: ConversationId,
318 role: &str,
319 content: &str,
320 embed_ctx: EmbedContext,
321 ) -> bool {
322 let Some(qdrant) = &self.qdrant else {
323 return false;
324 };
325 if !self.provider.supports_embeddings() {
326 return false;
327 }
328
329 let chunks = chunk_text(content);
330 let chunk_count = chunks.len();
331 let mut collection_ready = false;
332 let mut stored = false;
333
334 for (chunk_index, chunk) in chunks.into_iter().enumerate() {
335 let chunk_index_u32 = u32::try_from(chunk_index).unwrap_or(u32::MAX);
336 let Ok(vector) = self.provider.embed(chunk).await else {
337 tracing::warn!(
338 "Failed to embed tool-output chunk {chunk_index}/{chunk_count} \
339 for msg {message_id}"
340 );
341 continue;
342 };
343 if !collection_ready {
344 let vector_size = u64::try_from(vector.len()).unwrap_or(896);
345 if let Err(e) = qdrant.ensure_collection(vector_size).await {
346 tracing::warn!("Failed to ensure Qdrant collection: {e:#}");
347 break;
348 }
349 collection_ready = true;
350 }
351 let result = if let Some(ref tool_name) = embed_ctx.tool_name {
352 qdrant
353 .store_with_tool_context(
354 message_id,
355 conversation_id,
356 role,
357 vector,
358 MessageKind::Regular,
359 &self.embedding_model,
360 chunk_index_u32,
361 tool_name,
362 embed_ctx.exit_code,
363 embed_ctx.timestamp.as_deref(),
364 )
365 .await
366 .map(|_| ())
367 } else {
368 qdrant
369 .store(
370 message_id,
371 conversation_id,
372 role,
373 vector,
374 MessageKind::Regular,
375 &self.embedding_model,
376 chunk_index_u32,
377 )
378 .await
379 .map(|_| ())
380 };
381 match result {
382 Ok(()) => stored = true,
383 Err(e) => tracing::warn!(
384 "Failed to store tool-output chunk {chunk_index}/{chunk_count} \
385 for msg {message_id}: {e:#}"
386 ),
387 }
388 }
389
390 stored
391 }
392
393 pub async fn save_only(
401 &self,
402 conversation_id: ConversationId,
403 role: &str,
404 content: &str,
405 parts_json: &str,
406 ) -> Result<MessageId, MemoryError> {
407 self.sqlite
408 .save_message_with_parts(conversation_id, role, content, parts_json)
409 .await
410 }
411
412 pub async fn recall(
422 &self,
423 query: &str,
424 limit: usize,
425 filter: Option<SearchFilter>,
426 ) -> Result<Vec<RecalledMessage>, MemoryError> {
427 let conversation_id = filter.as_ref().and_then(|f| f.conversation_id);
428
429 tracing::debug!(
430 query_len = query.len(),
431 limit,
432 has_filter = filter.is_some(),
433 conversation_id = conversation_id.map(|c| c.0),
434 has_qdrant = self.qdrant.is_some(),
435 "recall: starting hybrid search"
436 );
437
438 let keyword_results = match self
439 .sqlite
440 .keyword_search(query, limit * 2, conversation_id)
441 .await
442 {
443 Ok(results) => results,
444 Err(e) => {
445 tracing::warn!("FTS5 keyword search failed: {e:#}");
446 Vec::new()
447 }
448 };
449
450 let vector_results = if let Some(qdrant) = &self.qdrant
451 && self.provider.supports_embeddings()
452 {
453 let query_vector = self.provider.embed(query).await?;
454 let vector_size = u64::try_from(query_vector.len()).unwrap_or(896);
455 qdrant.ensure_collection(vector_size).await?;
456 qdrant.search(&query_vector, limit * 2, filter).await?
457 } else {
458 Vec::new()
459 };
460
461 self.recall_merge_and_rank(keyword_results, vector_results, limit)
462 .await
463 }
464
465 pub(super) async fn recall_fts5_raw(
466 &self,
467 query: &str,
468 limit: usize,
469 conversation_id: Option<ConversationId>,
470 ) -> Result<Vec<(MessageId, f64)>, MemoryError> {
471 self.sqlite
472 .keyword_search(query, limit * 2, conversation_id)
473 .await
474 }
475
476 pub(super) async fn recall_vectors_raw(
477 &self,
478 query: &str,
479 limit: usize,
480 filter: Option<SearchFilter>,
481 ) -> Result<Vec<crate::embedding_store::SearchResult>, MemoryError> {
482 let Some(qdrant) = &self.qdrant else {
483 return Ok(Vec::new());
484 };
485 if !self.provider.supports_embeddings() {
486 return Ok(Vec::new());
487 }
488 let query_vector = self.provider.embed(query).await?;
489 let vector_size = u64::try_from(query_vector.len()).unwrap_or(896);
490 qdrant.ensure_collection(vector_size).await?;
491 qdrant.search(&query_vector, limit * 2, filter).await
492 }
493
494 #[allow(clippy::cast_possible_truncation, clippy::too_many_lines)]
503 pub(super) async fn recall_merge_and_rank(
504 &self,
505 keyword_results: Vec<(MessageId, f64)>,
506 vector_results: Vec<crate::embedding_store::SearchResult>,
507 limit: usize,
508 ) -> Result<Vec<RecalledMessage>, MemoryError> {
509 tracing::debug!(
510 vector_count = vector_results.len(),
511 keyword_count = keyword_results.len(),
512 limit,
513 "recall: merging search results"
514 );
515
516 let mut scores: std::collections::HashMap<MessageId, f64> =
517 std::collections::HashMap::new();
518
519 if !vector_results.is_empty() {
520 let max_vs = vector_results
521 .iter()
522 .map(|r| r.score)
523 .fold(f32::NEG_INFINITY, f32::max);
524 let norm = if max_vs > 0.0 { max_vs } else { 1.0 };
525 for r in &vector_results {
526 let normalized = f64::from(r.score / norm);
527 *scores.entry(r.message_id).or_default() += normalized * self.vector_weight;
528 }
529 }
530
531 if !keyword_results.is_empty() {
532 let max_ks = keyword_results
533 .iter()
534 .map(|r| r.1)
535 .fold(f64::NEG_INFINITY, f64::max);
536 let norm = if max_ks > 0.0 { max_ks } else { 1.0 };
537 for &(msg_id, score) in &keyword_results {
538 let normalized = score / norm;
539 *scores.entry(msg_id).or_default() += normalized * self.keyword_weight;
540 }
541 }
542
543 if scores.is_empty() {
544 tracing::debug!("recall: empty merge, no overlapping scores");
545 return Ok(Vec::new());
546 }
547
548 let mut ranked: Vec<(MessageId, f64)> = scores.into_iter().collect();
549 ranked.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
550
551 tracing::debug!(
552 merged = ranked.len(),
553 top_score = ranked.first().map(|r| r.1),
554 bottom_score = ranked.last().map(|r| r.1),
555 vector_weight = %self.vector_weight,
556 keyword_weight = %self.keyword_weight,
557 "recall: weighted merge complete"
558 );
559
560 if self.temporal_decay_enabled && self.temporal_decay_half_life_days > 0 {
561 let ids: Vec<MessageId> = ranked.iter().map(|r| r.0).collect();
562 match self.sqlite.message_timestamps(&ids).await {
563 Ok(timestamps) => {
564 apply_temporal_decay(
565 &mut ranked,
566 ×tamps,
567 self.temporal_decay_half_life_days,
568 );
569 ranked
570 .sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
571 tracing::debug!(
572 half_life_days = self.temporal_decay_half_life_days,
573 top_score_after = ranked.first().map(|r| r.1),
574 "recall: temporal decay applied"
575 );
576 }
577 Err(e) => {
578 tracing::warn!("temporal decay: failed to fetch timestamps: {e:#}");
579 }
580 }
581 }
582
583 if self.mmr_enabled && !vector_results.is_empty() {
584 if let Some(qdrant) = &self.qdrant {
585 let ids: Vec<MessageId> = ranked.iter().map(|r| r.0).collect();
586 match qdrant.get_vectors(&ids).await {
587 Ok(vec_map) if !vec_map.is_empty() => {
588 let ranked_len_before = ranked.len();
589 ranked = apply_mmr(&ranked, &vec_map, self.mmr_lambda, limit);
590 tracing::debug!(
591 before = ranked_len_before,
592 after = ranked.len(),
593 lambda = %self.mmr_lambda,
594 "recall: mmr re-ranked"
595 );
596 }
597 Ok(_) => {
598 ranked.truncate(limit);
599 }
600 Err(e) => {
601 tracing::warn!("MMR: failed to fetch vectors: {e:#}");
602 ranked.truncate(limit);
603 }
604 }
605 } else {
606 ranked.truncate(limit);
607 }
608 } else {
609 ranked.truncate(limit);
610 }
611
612 if self.importance_enabled && !ranked.is_empty() {
613 let ids: Vec<MessageId> = ranked.iter().map(|r| r.0).collect();
614 match self.sqlite.fetch_importance_scores(&ids).await {
615 Ok(scores) => {
616 for (msg_id, score) in &mut ranked {
617 if let Some(&imp) = scores.get(msg_id) {
618 *score += imp * self.importance_weight;
619 }
620 }
621 ranked
622 .sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
623 tracing::debug!(
624 importance_weight = %self.importance_weight,
625 "recall: importance scores blended"
626 );
627 }
628 Err(e) => {
629 tracing::warn!("importance scoring: failed to fetch scores: {e:#}");
630 }
631 }
632 }
633
634 if (self.tier_boost_semantic - 1.0).abs() > f64::EPSILON && !ranked.is_empty() {
638 let ids: Vec<MessageId> = ranked.iter().map(|r| r.0).collect();
639 match self.sqlite.fetch_tiers(&ids).await {
640 Ok(tiers) => {
641 let bonus = self.tier_boost_semantic - 1.0;
642 let mut boosted = false;
643 for (msg_id, score) in &mut ranked {
644 if tiers.get(msg_id).map(String::as_str) == Some("semantic") {
645 *score += bonus;
646 boosted = true;
647 }
648 }
649 if boosted {
650 ranked.sort_by(|a, b| {
651 b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal)
652 });
653 tracing::debug!(
654 tier_boost = %self.tier_boost_semantic,
655 "recall: semantic tier boost applied"
656 );
657 }
658 }
659 Err(e) => {
660 tracing::warn!("tier boost: failed to fetch tiers: {e:#}");
661 }
662 }
663 }
664
665 let ids: Vec<MessageId> = ranked.iter().map(|r| r.0).collect();
666
667 if !ids.is_empty()
668 && let Err(e) = self.batch_increment_access_count(ids.clone()).await
669 {
670 tracing::warn!("recall: failed to increment access counts: {e:#}");
671 }
672
673 if let Err(e) = self.sqlite.mark_training_recalled(&ids).await {
675 tracing::debug!(
676 error = %e,
677 "recall: failed to mark training data as recalled (non-fatal)"
678 );
679 }
680
681 let messages = self.sqlite.messages_by_ids(&ids).await?;
682 let msg_map: std::collections::HashMap<MessageId, _> = messages.into_iter().collect();
683
684 let recalled: Vec<RecalledMessage> = ranked
685 .iter()
686 .filter_map(|(msg_id, score)| {
687 msg_map.get(msg_id).map(|msg| RecalledMessage {
688 message: msg.clone(),
689 #[expect(clippy::cast_possible_truncation)]
690 score: *score as f32,
691 })
692 })
693 .collect();
694
695 tracing::debug!(final_count = recalled.len(), "recall: final results");
696
697 Ok(recalled)
698 }
699
700 pub async fn recall_routed(
709 &self,
710 query: &str,
711 limit: usize,
712 filter: Option<SearchFilter>,
713 router: &dyn crate::router::MemoryRouter,
714 ) -> Result<Vec<RecalledMessage>, MemoryError> {
715 use crate::router::MemoryRoute;
716
717 let route = router.route(query);
718 tracing::debug!(?route, query_len = query.len(), "memory routing decision");
719
720 let conversation_id = filter.as_ref().and_then(|f| f.conversation_id);
721
722 let (keyword_results, vector_results): (
723 Vec<(MessageId, f64)>,
724 Vec<crate::embedding_store::SearchResult>,
725 ) = match route {
726 MemoryRoute::Keyword => {
727 let kw = self.recall_fts5_raw(query, limit, conversation_id).await?;
728 (kw, Vec::new())
729 }
730 MemoryRoute::Semantic => {
731 let vr = self.recall_vectors_raw(query, limit, filter).await?;
732 (Vec::new(), vr)
733 }
734 MemoryRoute::Hybrid => {
735 let kw = match self.recall_fts5_raw(query, limit, conversation_id).await {
736 Ok(r) => r,
737 Err(e) => {
738 tracing::warn!("FTS5 keyword search failed: {e:#}");
739 Vec::new()
740 }
741 };
742 let vr = self.recall_vectors_raw(query, limit, filter).await?;
743 (kw, vr)
744 }
745 MemoryRoute::Episodic => {
754 let range = crate::router::resolve_temporal_range(query, chrono::Utc::now());
755 let cleaned = crate::router::strip_temporal_keywords(query);
756 let search_query = if cleaned.is_empty() { query } else { &cleaned };
757 let kw = if let Some(ref r) = range {
758 self.sqlite
759 .keyword_search_with_time_range(
760 search_query,
761 limit,
762 conversation_id,
763 r.after.as_deref(),
764 r.before.as_deref(),
765 )
766 .await?
767 } else {
768 self.recall_fts5_raw(search_query, limit, conversation_id)
769 .await?
770 };
771 tracing::debug!(
772 has_range = range.is_some(),
773 cleaned_query = %search_query,
774 keyword_count = kw.len(),
775 "recall: episodic path"
776 );
777 (kw, Vec::new())
778 }
779 MemoryRoute::Graph => {
782 let kw = match self.recall_fts5_raw(query, limit, conversation_id).await {
783 Ok(r) => r,
784 Err(e) => {
785 tracing::warn!("FTS5 keyword search failed (graph→hybrid fallback): {e:#}");
786 Vec::new()
787 }
788 };
789 let vr = self.recall_vectors_raw(query, limit, filter).await?;
790 (kw, vr)
791 }
792 };
793
794 tracing::debug!(
795 keyword_count = keyword_results.len(),
796 vector_count = vector_results.len(),
797 "recall: routed search results"
798 );
799
800 self.recall_merge_and_rank(keyword_results, vector_results, limit)
801 .await
802 }
803
804 pub async fn recall_routed_async(
815 &self,
816 query: &str,
817 limit: usize,
818 filter: Option<crate::embedding_store::SearchFilter>,
819 router: &dyn crate::router::AsyncMemoryRouter,
820 ) -> Result<Vec<RecalledMessage>, MemoryError> {
821 use crate::router::MemoryRoute;
822
823 let decision = router.route_async(query).await;
824 let route = decision.route;
825 tracing::debug!(
826 ?route,
827 confidence = decision.confidence,
828 query_len = query.len(),
829 "memory routing decision (async)"
830 );
831
832 let conversation_id = filter.as_ref().and_then(|f| f.conversation_id);
833
834 let (keyword_results, vector_results): (
835 Vec<(crate::types::MessageId, f64)>,
836 Vec<crate::embedding_store::SearchResult>,
837 ) = match route {
838 MemoryRoute::Keyword => {
839 let kw = self.recall_fts5_raw(query, limit, conversation_id).await?;
840 (kw, Vec::new())
841 }
842 MemoryRoute::Semantic => {
843 let vr = self.recall_vectors_raw(query, limit, filter).await?;
844 (Vec::new(), vr)
845 }
846 MemoryRoute::Hybrid => {
847 let kw = match self.recall_fts5_raw(query, limit, conversation_id).await {
848 Ok(r) => r,
849 Err(e) => {
850 tracing::warn!("FTS5 keyword search failed: {e:#}");
851 Vec::new()
852 }
853 };
854 let vr = self.recall_vectors_raw(query, limit, filter).await?;
855 (kw, vr)
856 }
857 MemoryRoute::Episodic => {
858 let range = crate::router::resolve_temporal_range(query, chrono::Utc::now());
859 let cleaned = crate::router::strip_temporal_keywords(query);
860 let search_query = if cleaned.is_empty() { query } else { &cleaned };
861 let kw = if let Some(ref r) = range {
862 self.sqlite
863 .keyword_search_with_time_range(
864 search_query,
865 limit,
866 conversation_id,
867 r.after.as_deref(),
868 r.before.as_deref(),
869 )
870 .await?
871 } else {
872 self.recall_fts5_raw(search_query, limit, conversation_id)
873 .await?
874 };
875 (kw, Vec::new())
876 }
877 MemoryRoute::Graph => {
878 let kw = match self.recall_fts5_raw(query, limit, conversation_id).await {
879 Ok(r) => r,
880 Err(e) => {
881 tracing::warn!("FTS5 keyword search failed (graph→hybrid fallback): {e:#}");
882 Vec::new()
883 }
884 };
885 let vr = self.recall_vectors_raw(query, limit, filter).await?;
886 (kw, vr)
887 }
888 };
889
890 tracing::debug!(
891 keyword_count = keyword_results.len(),
892 vector_count = vector_results.len(),
893 "recall: routed search results (async)"
894 );
895
896 self.recall_merge_and_rank(keyword_results, vector_results, limit)
897 .await
898 }
899
900 pub async fn recall_graph(
914 &self,
915 query: &str,
916 limit: usize,
917 max_hops: u32,
918 at_timestamp: Option<&str>,
919 temporal_decay_rate: f64,
920 edge_types: &[crate::graph::EdgeType],
921 ) -> Result<Vec<crate::graph::types::GraphFact>, MemoryError> {
922 let Some(store) = &self.graph_store else {
923 return Ok(Vec::new());
924 };
925
926 tracing::debug!(
927 query_len = query.len(),
928 limit,
929 max_hops,
930 "graph: starting recall"
931 );
932
933 let results = crate::graph::retrieval::graph_recall(
934 store,
935 self.qdrant.as_deref(),
936 &self.provider,
937 query,
938 limit,
939 max_hops,
940 at_timestamp,
941 temporal_decay_rate,
942 edge_types,
943 )
944 .await?;
945
946 tracing::debug!(result_count = results.len(), "graph: recall complete");
947
948 Ok(results)
949 }
950
951 pub async fn recall_graph_activated(
960 &self,
961 query: &str,
962 limit: usize,
963 params: crate::graph::SpreadingActivationParams,
964 edge_types: &[crate::graph::EdgeType],
965 ) -> Result<Vec<crate::graph::activation::ActivatedFact>, MemoryError> {
966 let Some(store) = &self.graph_store else {
967 return Ok(Vec::new());
968 };
969
970 tracing::debug!(
971 query_len = query.len(),
972 limit,
973 "spreading activation: starting graph recall"
974 );
975
976 let embeddings = self.qdrant.as_deref();
977 let results = crate::graph::retrieval::graph_recall_activated(
978 store,
979 embeddings,
980 &self.provider,
981 query,
982 limit,
983 params,
984 edge_types,
985 )
986 .await?;
987
988 tracing::debug!(
989 result_count = results.len(),
990 "spreading activation: graph recall complete"
991 );
992
993 Ok(results)
994 }
995
996 async fn batch_increment_access_count(
1004 &self,
1005 message_ids: Vec<MessageId>,
1006 ) -> Result<(), MemoryError> {
1007 if message_ids.is_empty() {
1008 return Ok(());
1009 }
1010 self.sqlite.increment_access_counts(&message_ids).await
1011 }
1012
1013 pub async fn has_embedding(&self, message_id: MessageId) -> Result<bool, MemoryError> {
1019 match &self.qdrant {
1020 Some(qdrant) => qdrant.has_embedding(message_id).await,
1021 None => Ok(false),
1022 }
1023 }
1024
1025 pub async fn embed_missing(&self) -> Result<usize, MemoryError> {
1034 let Some(qdrant) = &self.qdrant else {
1035 return Ok(0);
1036 };
1037 if !self.provider.supports_embeddings() {
1038 return Ok(0);
1039 }
1040
1041 let unembedded = self.sqlite.unembedded_message_ids(Some(1000)).await?;
1042
1043 if unembedded.is_empty() {
1044 return Ok(0);
1045 }
1046
1047 let probe = self.provider.embed("probe").await?;
1048 let vector_size = u64::try_from(probe.len())?;
1049 qdrant.ensure_collection(vector_size).await?;
1050
1051 let mut count = 0;
1052 for (msg_id, conversation_id, role, content) in &unembedded {
1053 let chunks = chunk_text(content);
1054 let chunk_count = chunks.len();
1055 let mut stored = 0usize;
1056
1057 for (chunk_index, chunk) in chunks.into_iter().enumerate() {
1058 let chunk_index_u32 = u32::try_from(chunk_index).unwrap_or(u32::MAX);
1059 match self.provider.embed(chunk).await {
1060 Ok(vector) => {
1061 if let Err(e) = qdrant
1062 .store(
1063 *msg_id,
1064 *conversation_id,
1065 role,
1066 vector,
1067 MessageKind::Regular,
1068 &self.embedding_model,
1069 chunk_index_u32,
1070 )
1071 .await
1072 {
1073 tracing::warn!(
1074 "Failed to store chunk {chunk_index}/{chunk_count} \
1075 for msg {msg_id}: {e:#}"
1076 );
1077 } else {
1078 stored += 1;
1079 }
1080 }
1081 Err(e) => {
1082 tracing::warn!(
1083 "Failed to embed chunk {chunk_index}/{chunk_count} \
1084 for msg {msg_id}: {e:#}"
1085 );
1086 }
1087 }
1088 }
1089
1090 if stored > 0 {
1091 count += 1;
1092 }
1093 }
1094
1095 tracing::info!("Embedded {count}/{} missing messages", unembedded.len());
1096 Ok(count)
1097 }
1098}
1099
#[cfg(test)]
mod tests {
    use super::*;

    // Default construction leaves every metadata field unset.
    #[test]
    fn embed_context_default_all_none() {
        let EmbedContext {
            tool_name,
            exit_code,
            timestamp,
        } = EmbedContext::default();
        assert!(tool_name.is_none());
        assert!(exit_code.is_none());
        assert!(timestamp.is_none());
    }

    // Explicitly populated fields read back exactly as written.
    #[test]
    fn embed_context_fields_set_correctly() {
        let ctx = EmbedContext {
            tool_name: Some(String::from("shell")),
            exit_code: Some(0),
            timestamp: Some(String::from("2026-04-04T00:00:00Z")),
        };
        assert_eq!(ctx.tool_name.as_deref(), Some("shell"));
        assert_eq!(ctx.exit_code, Some(0));
        assert_eq!(ctx.timestamp.as_deref(), Some("2026-04-04T00:00:00Z"));
    }

    // A failing exit code is carried through unchanged, timestamp optional.
    #[test]
    fn embed_context_non_zero_exit_code() {
        let ctx = EmbedContext {
            tool_name: Some(String::from("shell")),
            exit_code: Some(1),
            timestamp: None,
        };
        assert_eq!(ctx.exit_code, Some(1));
        assert!(ctx.timestamp.is_none());
    }
}
1134}