1use crate::chunking::{chunk_text_semantic, ChunkingConfig, Tokenizer};
5use crate::context_layers::ContextLayerGenerator;
6use crate::context_uri::ContextUri;
7use crate::db::MemoryDatabase;
8use crate::embeddings::EmbeddingService;
9use crate::types::{
10 CleanupLogEntry, DirectoryListing, EmbeddingHealth, KnowledgeCoverageRecord,
11 KnowledgeItemRecord, KnowledgePromotionRequest, KnowledgePromotionResult, KnowledgeSpaceRecord,
12 LayerType, MemoryChunk, MemoryConfig, MemoryContext, MemoryError, MemoryLayer, MemoryNode,
13 MemoryResult, MemoryRetrievalMeta, MemorySearchResult, MemoryStats, MemoryTier, NodeType,
14 StoreMessageRequest, TreeNode,
15};
16use chrono::Utc;
17use std::collections::HashSet;
18use std::path::Path;
19use std::sync::Arc;
20use tandem_orchestrator::{
21 build_knowledge_coverage_key, normalize_knowledge_segment, KnowledgeBinding, KnowledgePackItem,
22 KnowledgePreflightRequest, KnowledgePreflightResult, KnowledgeReuseDecision,
23 KnowledgeReuseMode, KnowledgeScope, KnowledgeTrustLevel,
24};
25use tandem_providers::{MemoryConsolidationConfig, ProviderRegistry};
26use tokio::sync::Mutex;
27
/// Coordinates tiered memory storage: persistence, embeddings, and token
/// accounting for sessions, projects, and global knowledge.
pub struct MemoryManager {
    // SQLite-backed store for chunks, context nodes, layers, and knowledge records.
    db: Arc<MemoryDatabase>,
    // Shared embedding backend; the async mutex serializes embed calls across tasks.
    embedding_service: Arc<Mutex<EmbeddingService>>,
    // Token counter used for budgeting and layer bookkeeping.
    tokenizer: Tokenizer,
}
34
/// Maximum number of knowledge items returned in a preflight pack.
const MAX_KNOWLEDGE_PACK_ITEMS: usize = 3;
/// Chunk-source prefix identifying guide documentation entries.
const GUIDE_DOC_SOURCE_PREFIX: &str = "guide_docs:";
/// Recency half-life for guide-doc scoring: 30 days, in milliseconds.
const GUIDE_DOC_RECENCY_HALFLIFE_MS: f64 = 30.0 * 24.0 * 60.0 * 60.0 * 1000.0;
/// Weight of the recency term when blending guide-doc similarity scores.
const GUIDE_DOC_RECENCY_WEIGHT: f64 = 0.12;
39
40impl MemoryManager {
41 fn guide_doc_similarity(similarity: f64, chunk: &MemoryChunk, now_ms: i64) -> f64 {
42 if !chunk.source.starts_with(GUIDE_DOC_SOURCE_PREFIX) {
43 return similarity.clamp(0.0, 1.0);
44 }
45
46 let normalized_similarity = similarity.clamp(0.0, 1.0);
47 let source_mtime = chunk
48 .source_mtime
49 .filter(|value| *value > 0)
50 .unwrap_or_else(|| chunk.created_at.timestamp_millis());
51 let age_ms = (now_ms - source_mtime).max(0) as f64;
52 let recency_score = 1.0 / (1.0 + (age_ms / GUIDE_DOC_RECENCY_HALFLIFE_MS));
53 ((1.0 - GUIDE_DOC_RECENCY_WEIGHT) * normalized_similarity
54 + (GUIDE_DOC_RECENCY_WEIGHT * recency_score))
55 .clamp(0.0, 1.0)
56 }
57
58 fn is_malformed_database_error(err: &crate::types::MemoryError) -> bool {
59 err.to_string()
60 .to_lowercase()
61 .contains("database disk image is malformed")
62 }
63
    /// Direct access to the shared database handle.
    pub fn db(&self) -> &Arc<MemoryDatabase> {
        &self.db
    }
67
68 pub async fn new(db_path: &Path) -> MemoryResult<Self> {
70 let db = Arc::new(MemoryDatabase::new(db_path).await?);
71 let embedding_service = Arc::new(Mutex::new(EmbeddingService::new()));
72 let tokenizer = Tokenizer::new()?;
73
74 Ok(Self {
75 db,
76 embedding_service,
77 tokenizer,
78 })
79 }
80
    /// Chunk, embed, and persist a message, returning the stored chunk ids.
    ///
    /// Runs a vector-table health check up front, then retries a failed
    /// insert after a targeted repair — and, if the database still reports
    /// corruption, after a full memory-table reset.
    pub async fn store_message(&self, request: StoreMessageRequest) -> MemoryResult<Vec<String>> {
        // `unwrap_or(false)` treats a failed health check as "nothing repaired".
        if self
            .db
            .ensure_vector_tables_healthy()
            .await
            .unwrap_or(false)
        {
            tracing::warn!("Memory vector tables were repaired before storing message chunks");
        }

        // Per-project chunking config when a project id is present; defaults otherwise.
        let config = if let Some(ref pid) = request.project_id {
            self.db.get_or_create_config(pid).await?
        } else {
            MemoryConfig::default()
        };

        let chunking_config = ChunkingConfig {
            chunk_size: config.chunk_size as usize,
            chunk_overlap: config.chunk_overlap as usize,
            separator: None,
        };

        let text_chunks = chunk_text_semantic(&request.content, &chunking_config)?;

        if text_chunks.is_empty() {
            return Ok(Vec::new());
        }

        let mut chunk_ids = Vec::with_capacity(text_chunks.len());
        // Hold the embedding lock for the whole batch so chunks embed serially.
        let embedding_service = self.embedding_service.lock().await;

        for text_chunk in text_chunks {
            let chunk_id = uuid::Uuid::new_v4().to_string();

            let embedding = embedding_service.embed(&text_chunk.content).await?;

            let chunk = MemoryChunk {
                id: chunk_id.clone(),
                content: text_chunk.content,
                tier: request.tier,
                session_id: request.session_id.clone(),
                project_id: request.project_id.clone(),
                source: request.source.clone(),
                source_path: request.source_path.clone(),
                source_mtime: request.source_mtime,
                source_size: request.source_size,
                source_hash: request.source_hash.clone(),
                created_at: Utc::now(),
                token_count: text_chunk.token_count as i64,
                metadata: request.metadata.clone(),
            };

            if let Err(err) = self.db.store_chunk(&chunk, &embedding).await {
                tracing::warn!("Failed to store memory chunk {}: {}", chunk.id, err);
                // Try a targeted repair first, then a general health pass.
                let repaired = {
                    let repaired_after_error =
                        self.db.try_repair_after_error(&err).await.unwrap_or(false);
                    repaired_after_error
                        || self
                            .db
                            .ensure_vector_tables_healthy()
                            .await
                            .unwrap_or(false)
                };
                if repaired {
                    tracing::warn!(
                        "Retrying memory chunk insert after vector table repair: {}",
                        chunk.id
                    );
                    if let Err(retry_err) = self.db.store_chunk(&chunk, &embedding).await {
                        if Self::is_malformed_database_error(&retry_err) {
                            // Last resort: wipe and rebuild all memory tables,
                            // then insert once more (propagating any failure).
                            tracing::warn!(
                                "Memory DB still malformed after vector repair. Resetting memory tables and retrying chunk insert: {}",
                                chunk.id
                            );
                            self.db.reset_all_memory_tables().await?;
                            self.db.store_chunk(&chunk, &embedding).await?;
                        } else {
                            return Err(retry_err);
                        }
                    }
                } else {
                    return Err(err);
                }
            }
            chunk_ids.push(chunk_id);
        }

        if config.auto_cleanup {
            self.maybe_cleanup(&request.project_id).await?;
        }

        Ok(chunk_ids)
    }
186
187 pub async fn search(
189 &self,
190 query: &str,
191 tier: Option<MemoryTier>,
192 project_id: Option<&str>,
193 session_id: Option<&str>,
194 limit: Option<i64>,
195 ) -> MemoryResult<Vec<MemorySearchResult>> {
196 let effective_limit = limit.unwrap_or(5);
197
198 let embedding_service = self.embedding_service.lock().await;
200 let query_embedding = embedding_service.embed(query).await?;
201 drop(embedding_service);
202
203 let mut results = Vec::new();
204
205 let tiers_to_search = match tier {
207 Some(t) => vec![t],
208 None => {
209 if project_id.is_some() {
210 vec![MemoryTier::Session, MemoryTier::Project, MemoryTier::Global]
211 } else {
212 vec![MemoryTier::Session, MemoryTier::Global]
213 }
214 }
215 };
216
217 let now_ms = Utc::now().timestamp_millis();
218 for search_tier in tiers_to_search {
219 let tier_results = match self
220 .db
221 .search_similar(
222 &query_embedding,
223 search_tier,
224 project_id,
225 session_id,
226 effective_limit,
227 )
228 .await
229 {
230 Ok(results) => results,
231 Err(err) => {
232 tracing::warn!(
233 "Memory tier search failed for {:?}: {}. Attempting vector repair.",
234 search_tier,
235 err
236 );
237 let repaired = {
238 let repaired_after_error =
239 self.db.try_repair_after_error(&err).await.unwrap_or(false);
240 repaired_after_error
241 || self
242 .db
243 .ensure_vector_tables_healthy()
244 .await
245 .unwrap_or(false)
246 };
247 if repaired {
248 match self
249 .db
250 .search_similar(
251 &query_embedding,
252 search_tier,
253 project_id,
254 session_id,
255 effective_limit,
256 )
257 .await
258 {
259 Ok(results) => results,
260 Err(retry_err) => {
261 tracing::warn!(
262 "Memory tier search still failing for {:?} after repair: {}",
263 search_tier,
264 retry_err
265 );
266 continue;
267 }
268 }
269 } else {
270 continue;
271 }
272 }
273 };
274
275 for (chunk, distance) in tier_results {
276 let similarity = 1.0 - distance.clamp(0.0, 1.0);
280 let similarity = if search_tier == MemoryTier::Global {
281 Self::guide_doc_similarity(similarity, &chunk, now_ms)
282 } else {
283 similarity
284 };
285
286 results.push(MemorySearchResult { chunk, similarity });
287 }
288 }
289
290 results.sort_by(|a, b| b.similarity.partial_cmp(&a.similarity).unwrap());
292 results.truncate(effective_limit as usize);
293
294 Ok(results)
295 }
296
    /// Insert or update a knowledge space record.
    pub async fn upsert_knowledge_space(&self, space: &KnowledgeSpaceRecord) -> MemoryResult<()> {
        self.db.upsert_knowledge_space(space).await
    }

    /// Look up a knowledge space by id.
    pub async fn get_knowledge_space(
        &self,
        id: &str,
    ) -> MemoryResult<Option<KnowledgeSpaceRecord>> {
        self.db.get_knowledge_space(id).await
    }

    /// List knowledge spaces, optionally filtered to a project.
    pub async fn list_knowledge_spaces(
        &self,
        project_id: Option<&str>,
    ) -> MemoryResult<Vec<KnowledgeSpaceRecord>> {
        self.db.list_knowledge_spaces(project_id).await
    }

    /// Insert or update a knowledge item record.
    pub async fn upsert_knowledge_item(&self, item: &KnowledgeItemRecord) -> MemoryResult<()> {
        self.db.upsert_knowledge_item(item).await
    }

    /// Look up a knowledge item by id.
    pub async fn get_knowledge_item(&self, id: &str) -> MemoryResult<Option<KnowledgeItemRecord>> {
        self.db.get_knowledge_item(id).await
    }

    /// List items in a space, optionally narrowed to one coverage key.
    pub async fn list_knowledge_items(
        &self,
        space_id: &str,
        coverage_key: Option<&str>,
    ) -> MemoryResult<Vec<KnowledgeItemRecord>> {
        self.db.list_knowledge_items(space_id, coverage_key).await
    }

    /// Insert or update coverage bookkeeping for a space.
    pub async fn upsert_knowledge_coverage(
        &self,
        coverage: &KnowledgeCoverageRecord,
    ) -> MemoryResult<()> {
        self.db.upsert_knowledge_coverage(coverage).await
    }

    /// Fetch coverage bookkeeping for a coverage key within a space.
    pub async fn get_knowledge_coverage(
        &self,
        coverage_key: &str,
        space_id: &str,
    ) -> MemoryResult<Option<KnowledgeCoverageRecord>> {
        self.db.get_knowledge_coverage(coverage_key, space_id).await
    }

    /// Delegate a knowledge-item promotion to the database layer.
    pub async fn promote_knowledge_item(
        &self,
        request: &KnowledgePromotionRequest,
    ) -> MemoryResult<Option<KnowledgePromotionResult>> {
        self.db.promote_knowledge_item(request).await
    }
352
353 fn space_matches_ref(
354 space: &KnowledgeSpaceRecord,
355 space_ref: &tandem_orchestrator::KnowledgeSpaceRef,
356 project_id: &str,
357 ) -> bool {
358 if space.scope != space_ref.scope {
359 return false;
360 }
361 match space_ref.scope {
362 KnowledgeScope::Project | KnowledgeScope::Run => {
363 let target_project = space_ref.project_id.as_deref().unwrap_or(project_id);
364 if space.project_id.as_deref() != Some(target_project) {
365 return false;
366 }
367 }
368 KnowledgeScope::Global => {}
369 }
370 if let Some(namespace) = space_ref.namespace.as_deref() {
371 if space.namespace.as_deref() != Some(namespace) {
372 return false;
373 }
374 }
375 true
376 }
377
378 fn select_preflight_namespace(
379 binding: &KnowledgeBinding,
380 spaces: &[KnowledgeSpaceRecord],
381 ) -> Option<String> {
382 if let Some(namespace) = binding.namespace.clone() {
383 return Some(namespace);
384 }
385 if binding.read_spaces.len() == 1 {
386 if let Some(namespace) = binding.read_spaces[0].namespace.clone() {
387 return Some(namespace);
388 }
389 }
390 if spaces.len() == 1 {
391 return spaces[0].namespace.clone();
392 }
393 let mut unique = HashSet::new();
394 for space in spaces {
395 if let Some(namespace) = space.namespace.as_ref() {
396 unique.insert(namespace);
397 }
398 }
399 if unique.len() == 1 {
400 unique.into_iter().next().map(|value| value.to_string())
401 } else {
402 None
403 }
404 }
405
406 fn binding_uses_explicit_spaces(binding: &KnowledgeBinding) -> bool {
407 !binding.read_spaces.is_empty() || !binding.promote_spaces.is_empty()
408 }
409
410 fn namespace_matches(space_namespace: Option<&str>, binding_namespace: Option<&str>) -> bool {
411 match (space_namespace, binding_namespace) {
412 (None, None) => true,
413 (Some(space), Some(binding)) => {
414 normalize_knowledge_segment(space) == normalize_knowledge_segment(binding)
415 }
416 _ => false,
417 }
418 }
419
420 fn is_fresh_enough(
421 freshness_expires_at_ms: Option<u64>,
422 freshness_policy_ms: Option<u64>,
423 coverage_last_promoted_at_ms: Option<u64>,
424 item_created_at_ms: u64,
425 now_ms: u64,
426 ) -> bool {
427 if let Some(expires_at_ms) = freshness_expires_at_ms {
428 return expires_at_ms > now_ms;
429 }
430 let Some(policy_ms) = freshness_policy_ms else {
431 return true;
432 };
433 let basis_ms = coverage_last_promoted_at_ms.unwrap_or(item_created_at_ms);
434 now_ms.saturating_sub(basis_ms) <= policy_ms
435 }
436
    /// Determine which knowledge spaces a preflight request should consult.
    ///
    /// With explicit read/promote refs on the binding, resolves those refs
    /// (by id, or by scope + project/namespace match). Otherwise falls back
    /// to project-scoped spaces whose namespace matches the binding's — or an
    /// unambiguously inferred — namespace. Duplicates are removed by space id.
    async fn resolve_preflight_spaces(
        &self,
        request: &KnowledgePreflightRequest,
        _coverage_key: &str,
    ) -> MemoryResult<Vec<KnowledgeSpaceRecord>> {
        let binding = &request.binding;
        let mut spaces = Vec::new();
        let mut seen_space_ids = HashSet::new();

        // Dedupe helper: only the first occurrence of each space id is kept.
        let push_space = |space: KnowledgeSpaceRecord,
                          spaces: &mut Vec<KnowledgeSpaceRecord>,
                          seen_space_ids: &mut HashSet<String>| {
            if seen_space_ids.insert(space.id.clone()) {
                spaces.push(space);
            }
        };

        if Self::binding_uses_explicit_spaces(binding) {
            for space_ref in binding
                .read_spaces
                .iter()
                .chain(binding.promote_spaces.iter())
            {
                // A direct id reference wins over scope-based resolution.
                if let Some(space_id) = space_ref.space_id.as_deref() {
                    if let Some(space) = self.get_knowledge_space(space_id).await? {
                        push_space(space, &mut spaces, &mut seen_space_ids);
                    }
                    continue;
                }

                match space_ref.scope {
                    // Run-scoped refs without an id are not resolvable here.
                    KnowledgeScope::Run => {}
                    KnowledgeScope::Project => {
                        let candidate_project_id = space_ref
                            .project_id
                            .as_deref()
                            .unwrap_or(&request.project_id);
                        let project_spaces = self
                            .list_knowledge_spaces(Some(candidate_project_id))
                            .await?;
                        for space in project_spaces.into_iter().filter(|space| {
                            Self::space_matches_ref(space, space_ref, &request.project_id)
                        }) {
                            push_space(space, &mut spaces, &mut seen_space_ids);
                        }
                    }
                    KnowledgeScope::Global => {
                        let global_spaces = self.list_knowledge_spaces(None).await?;
                        for space in global_spaces.into_iter().filter(|space| {
                            Self::space_matches_ref(space, space_ref, &request.project_id)
                        }) {
                            push_space(space, &mut spaces, &mut seen_space_ids);
                        }
                    }
                }
            }
            return Ok(spaces);
        }

        // Implicit resolution needs a concrete project id.
        if request.project_id.trim().is_empty() {
            return Ok(spaces);
        }

        let project_spaces = self
            .list_knowledge_spaces(Some(&request.project_id))
            .await?;
        let requested_namespace = if binding.namespace.is_some() {
            binding.namespace.clone()
        } else {
            Self::select_preflight_namespace(binding, &project_spaces)
        };
        // No namespace could be inferred unambiguously: match nothing.
        let Some(requested_namespace) = requested_namespace else {
            return Ok(spaces);
        };

        for space in project_spaces.into_iter().filter(|space| {
            space.scope == KnowledgeScope::Project
                && Self::namespace_matches(
                    space.namespace.as_deref(),
                    Some(requested_namespace.as_str()),
                )
        }) {
            push_space(space, &mut spaces, &mut seen_space_ids);
        }
        Ok(spaces)
    }
523
    /// Decide whether prior knowledge can be reused for a task.
    ///
    /// Resolves the coverage key and candidate spaces, partitions matching
    /// items into fresh vs. stale, and returns one of four decisions:
    /// `Disabled`, `NoPriorKnowledge`, `ReusePromoted`/`ReuseApprovedDefault`
    /// (with up to [`MAX_KNOWLEDGE_PACK_ITEMS`] fresh items), or
    /// `RefreshRequired` (stale items only).
    pub async fn preflight_knowledge(
        &self,
        request: &KnowledgePreflightRequest,
    ) -> MemoryResult<KnowledgePreflightResult> {
        let binding = &request.binding;
        let project_spaces = if request.project_id.trim().is_empty() {
            Vec::new()
        } else {
            self.list_knowledge_spaces(Some(&request.project_id))
                .await?
        };
        let namespace = binding
            .namespace
            .clone()
            .or_else(|| Self::select_preflight_namespace(binding, &project_spaces));
        let coverage_key = build_knowledge_coverage_key(
            &request.project_id,
            namespace.as_deref(),
            &request.task_family,
            &request.subject,
        );

        // Reuse can be switched off per binding.
        if !binding.enabled || binding.reuse_mode == KnowledgeReuseMode::Disabled {
            return Ok(KnowledgePreflightResult {
                project_id: request.project_id.clone(),
                namespace,
                task_family: request.task_family.clone(),
                subject: request.subject.clone(),
                coverage_key,
                decision: KnowledgeReuseDecision::Disabled,
                reuse_reason: None,
                skip_reason: Some("knowledge reuse is disabled for this binding".to_string()),
                freshness_reason: None,
                items: Vec::new(),
            });
        }

        let spaces = self
            .resolve_preflight_spaces(request, &coverage_key)
            .await?;
        if spaces.is_empty() {
            return Ok(KnowledgePreflightResult {
                project_id: request.project_id.clone(),
                namespace,
                task_family: request.task_family.clone(),
                subject: request.subject.clone(),
                coverage_key,
                decision: KnowledgeReuseDecision::NoPriorKnowledge,
                reuse_reason: None,
                skip_reason: Some("no reusable knowledge spaces were found".to_string()),
                freshness_reason: None,
                items: Vec::new(),
            });
        }

        let now_ms = chrono::Utc::now().timestamp_millis().max(0) as u64;
        let mut fresh_items = Vec::new();
        let mut stale_items = Vec::new();
        let mut freshest_reason = None;

        for space in &spaces {
            let items = self
                .list_knowledge_items(&space.id, Some(&coverage_key))
                .await?;
            let coverage = self
                .get_knowledge_coverage(&coverage_key, &space.id)
                .await?;
            for item in items {
                // Skip inactive items and those without a usable trust level
                // or below the binding's trust floor.
                if !item.status.is_active() {
                    continue;
                }
                let Some(trust_level) = item.status.as_trust_level() else {
                    continue;
                };
                if !trust_level.meets_floor(binding.trust_floor) {
                    continue;
                }
                // Item-level expiry wins; fall back to coverage-level expiry.
                let freshness_expires_at_ms = item.freshness_expires_at_ms.or_else(|| {
                    coverage
                        .as_ref()
                        .and_then(|coverage| coverage.freshness_expires_at_ms)
                });
                let pack_item = KnowledgePackItem {
                    item_id: item.id.clone(),
                    space_id: space.id.clone(),
                    coverage_key: item.coverage_key.clone(),
                    title: item.title.clone(),
                    summary: item.summary.clone(),
                    trust_level,
                    status: item.status.to_string(),
                    artifact_refs: item.artifact_refs.clone(),
                    source_memory_ids: item.source_memory_ids.clone(),
                    freshness_expires_at_ms,
                };
                if Self::is_fresh_enough(
                    freshness_expires_at_ms,
                    binding.freshness_ms,
                    coverage
                        .as_ref()
                        .and_then(|coverage| coverage.last_promoted_at_ms),
                    item.created_at_ms,
                    now_ms,
                ) {
                    fresh_items.push(pack_item);
                } else {
                    // Remember why the most recent stale candidate failed.
                    freshest_reason = Some(match freshness_expires_at_ms {
                        Some(expires_at_ms) => format!(
                            "coverage `{}` in space `{}` expired at {}",
                            coverage_key, space.id, expires_at_ms
                        ),
                        None => format!(
                            "coverage `{}` in space `{}` lacks freshness metadata",
                            coverage_key, space.id
                        ),
                    });
                    stale_items.push(pack_item);
                }
            }
        }

        // Rank fresh items by trust, then latest expiry, then title.
        fresh_items.sort_by(|left, right| {
            right
                .trust_level
                .rank()
                .cmp(&left.trust_level.rank())
                .then_with(|| {
                    right
                        .freshness_expires_at_ms
                        .unwrap_or(0)
                        .cmp(&left.freshness_expires_at_ms.unwrap_or(0))
                })
                .then_with(|| left.title.cmp(&right.title))
        });
        // Stale items rank by trust, then title.
        stale_items.sort_by(|left, right| {
            right
                .trust_level
                .rank()
                .cmp(&left.trust_level.rank())
                .then_with(|| left.title.cmp(&right.title))
        });

        if let Some(freshest_trust_level) = fresh_items.first().map(|item| item.trust_level) {
            let selected = fresh_items
                .into_iter()
                .take(MAX_KNOWLEDGE_PACK_ITEMS)
                .collect::<Vec<_>>();
            let decision = match freshest_trust_level {
                KnowledgeTrustLevel::ApprovedDefault => {
                    KnowledgeReuseDecision::ReuseApprovedDefault
                }
                _ => KnowledgeReuseDecision::ReusePromoted,
            };
            let selected_count = selected.len();
            return Ok(KnowledgePreflightResult {
                project_id: request.project_id.clone(),
                namespace,
                task_family: request.task_family.clone(),
                subject: request.subject.clone(),
                coverage_key,
                decision,
                reuse_reason: Some(format!(
                    "reusing {} promoted knowledge item(s) from {} space(s)",
                    selected_count,
                    spaces.len()
                )),
                skip_reason: None,
                freshness_reason: None,
                items: selected,
            });
        }

        if !stale_items.is_empty() {
            let selected = stale_items
                .into_iter()
                .take(MAX_KNOWLEDGE_PACK_ITEMS)
                .collect::<Vec<_>>();
            return Ok(KnowledgePreflightResult {
                project_id: request.project_id.clone(),
                namespace,
                task_family: request.task_family.clone(),
                subject: request.subject.clone(),
                coverage_key,
                decision: KnowledgeReuseDecision::RefreshRequired,
                reuse_reason: None,
                skip_reason: Some(
                    "prior knowledge exists but is not fresh enough to reuse".to_string(),
                ),
                freshness_reason: freshest_reason.or_else(|| {
                    Some("matching knowledge exists but freshness policy rejected it".to_string())
                }),
                items: selected,
            });
        }

        Ok(KnowledgePreflightResult {
            project_id: request.project_id.clone(),
            namespace,
            task_family: request.task_family.clone(),
            subject: request.subject.clone(),
            coverage_key,
            decision: KnowledgeReuseDecision::NoPriorKnowledge,
            reuse_reason: None,
            skip_reason: Some("no active promoted knowledge matched this coverage key".to_string()),
            freshness_reason: None,
            items: Vec::new(),
        })
    }
731
732 pub async fn retrieve_context(
737 &self,
738 query: &str,
739 project_id: Option<&str>,
740 session_id: Option<&str>,
741 token_budget: Option<i64>,
742 ) -> MemoryResult<MemoryContext> {
743 let (context, _) = self
744 .retrieve_context_with_meta(query, project_id, session_id, token_budget)
745 .await?;
746 Ok(context)
747 }
748
    /// Assemble a token-budgeted memory context plus retrieval statistics.
    ///
    /// Combines the live session's chunks with semantically relevant history
    /// and project/global facts, trimming lowest-priority material when the
    /// total token count exceeds the budget.
    pub async fn retrieve_context_with_meta(
        &self,
        query: &str,
        project_id: Option<&str>,
        session_id: Option<&str>,
        token_budget: Option<i64>,
    ) -> MemoryResult<(MemoryContext, MemoryRetrievalMeta)> {
        let config = if let Some(pid) = project_id {
            self.db.get_or_create_config(pid).await?
        } else {
            MemoryConfig::default()
        };
        let budget = token_budget.unwrap_or(config.token_budget);
        let retrieval_limit = config.retrieval_k.max(1);

        // Everything already in the active session is always a candidate.
        let current_session = if let Some(sid) = session_id {
            self.db.get_session_chunks(sid).await?
        } else {
            Vec::new()
        };

        let search_results = self
            .search(query, None, project_id, session_id, Some(retrieval_limit))
            .await?;

        // Track the similarity range for the retrieval metadata.
        let mut score_min: Option<f64> = None;
        let mut score_max: Option<f64> = None;
        for result in &search_results {
            score_min = Some(match score_min {
                Some(current) => current.min(result.similarity),
                None => result.similarity,
            });
            score_max = Some(match score_max {
                Some(current) => current.max(result.similarity),
                None => result.similarity,
            });
        }

        let mut current_session = current_session;
        let mut relevant_history = Vec::new();
        let mut project_facts = Vec::new();

        // Bucket results by tier; global facts share the project bucket, and
        // session hits already present in the live session are dropped.
        for result in search_results {
            match result.chunk.tier {
                MemoryTier::Project => {
                    project_facts.push(result.chunk);
                }
                MemoryTier::Global => {
                    project_facts.push(result.chunk);
                }
                MemoryTier::Session => {
                    if !current_session.iter().any(|c| c.id == result.chunk.id) {
                        relevant_history.push(result.chunk);
                    }
                }
            }
        }

        let mut total_tokens: i64 = current_session.iter().map(|c| c.token_count).sum();
        total_tokens += relevant_history.iter().map(|c| c.token_count).sum::<i64>();
        total_tokens += project_facts.iter().map(|c| c.token_count).sum::<i64>();

        if total_tokens > budget {
            let excess = total_tokens - budget;
            self.trim_context(
                &mut current_session,
                &mut relevant_history,
                &mut project_facts,
                excess,
            )?;
            // Recompute the total after trimming.
            total_tokens = current_session.iter().map(|c| c.token_count).sum::<i64>()
                + relevant_history.iter().map(|c| c.token_count).sum::<i64>()
                + project_facts.iter().map(|c| c.token_count).sum::<i64>();
        }

        let context = MemoryContext {
            current_session,
            relevant_history,
            project_facts,
            total_tokens,
        };
        let chunks_total = context.current_session.len()
            + context.relevant_history.len()
            + context.project_facts.len();
        let meta = MemoryRetrievalMeta {
            used: chunks_total > 0,
            chunks_total,
            session_chunks: context.current_session.len(),
            history_chunks: context.relevant_history.len(),
            project_fact_chunks: context.project_facts.len(),
            score_min,
            score_max,
        };

        Ok((context, meta))
    }
851
852 fn trim_context(
854 &self,
855 current_session: &mut Vec<MemoryChunk>,
856 relevant_history: &mut Vec<MemoryChunk>,
857 project_facts: &mut Vec<MemoryChunk>,
858 excess_tokens: i64,
859 ) -> MemoryResult<()> {
860 let mut tokens_to_remove = excess_tokens;
861
862 while tokens_to_remove > 0 && !relevant_history.is_empty() {
864 if let Some(chunk) = relevant_history.pop() {
865 tokens_to_remove -= chunk.token_count;
866 }
867 }
868
869 while tokens_to_remove > 0 && !project_facts.is_empty() {
871 if let Some(chunk) = project_facts.pop() {
872 tokens_to_remove -= chunk.token_count;
873 }
874 }
875
876 while tokens_to_remove > 0 && !current_session.is_empty() {
877 if let Some(chunk) = current_session.pop() {
878 tokens_to_remove -= chunk.token_count;
879 }
880 }
881
882 Ok(())
883 }
884
    /// Delete all chunks for a session, logging the manual cleanup.
    /// Returns the number of chunks removed.
    pub async fn clear_session(&self, session_id: &str) -> MemoryResult<u64> {
        let count = self.db.clear_session_memory(session_id).await?;

        self.db
            .log_cleanup(
                "manual",
                MemoryTier::Session,
                None,
                Some(session_id),
                count as i64,
                0,
            )
            .await?;

        Ok(count)
    }

    /// Delete all chunks for a project, logging the manual cleanup.
    /// Returns the number of chunks removed.
    pub async fn clear_project(&self, project_id: &str) -> MemoryResult<u64> {
        let count = self.db.clear_project_memory(project_id).await?;

        self.db
            .log_cleanup(
                "manual",
                MemoryTier::Project,
                Some(project_id),
                None,
                count as i64,
                0,
            )
            .await?;

        Ok(count)
    }
922
    /// Aggregate statistics over the whole memory store.
    pub async fn get_stats(&self) -> MemoryResult<MemoryStats> {
        self.db.get_stats().await
    }

    /// Per-project memory config, created with defaults on first access.
    pub async fn get_config(&self, project_id: &str) -> MemoryResult<MemoryConfig> {
        self.db.get_or_create_config(project_id).await
    }

    /// Persist a project's memory config.
    pub async fn set_config(&self, project_id: &str, config: &MemoryConfig) -> MemoryResult<()> {
        self.db.update_config(project_id, config).await
    }

    /// Resolve a context URI to its node, if one exists.
    pub async fn resolve_uri(&self, uri: &str) -> MemoryResult<Option<MemoryNode>> {
        self.db.get_node_by_uri(uri).await
    }
941
942 pub async fn list_directory(&self, uri: &str) -> MemoryResult<DirectoryListing> {
943 let nodes = self.db.list_directory(uri).await?;
944 let directories: Vec<MemoryNode> = nodes
945 .iter()
946 .filter(|n| n.node_type == NodeType::Directory)
947 .cloned()
948 .collect();
949 let files: Vec<MemoryNode> = nodes
950 .iter()
951 .filter(|n| n.node_type == NodeType::File)
952 .cloned()
953 .collect();
954
955 Ok(DirectoryListing {
956 uri: uri.to_string(),
957 nodes,
958 total_children: directories.len() + files.len(),
959 directories,
960 files,
961 })
962 }
963
    /// Fetch the subtree rooted at `uri`, limited to `max_depth` levels.
    pub async fn tree(&self, uri: &str, max_depth: usize) -> MemoryResult<Vec<TreeNode>> {
        self.db.get_children_tree(uri, max_depth).await
    }
967
    /// Create a context node at `uri`, linked to its parent when one exists.
    ///
    /// # Errors
    /// Returns `MemoryError::InvalidConfig` when `uri` fails to parse.
    pub async fn create_context_node(
        &self,
        uri: &str,
        node_type: NodeType,
        metadata: Option<serde_json::Value>,
    ) -> MemoryResult<String> {
        let parsed_uri =
            ContextUri::parse(uri).map_err(|e| MemoryError::InvalidConfig(e.message))?;
        let parent_uri = parsed_uri.parent().map(|p| p.to_string());
        self.db
            .create_node(uri, parent_uri.as_deref(), node_type, metadata.as_ref())
            .await
    }
981
    /// Fetch a node's layer record of the given type, if present.
    pub async fn get_context_layer(
        &self,
        node_id: &str,
        layer_type: LayerType,
    ) -> MemoryResult<Option<MemoryLayer>> {
        self.db.get_layer(node_id, layer_type).await
    }
989
990 pub async fn store_content_with_layers(
991 &self,
992 uri: &str,
993 content: &str,
994 metadata: Option<serde_json::Value>,
995 ) -> MemoryResult<String> {
996 let parsed_uri =
997 ContextUri::parse(uri).map_err(|e| MemoryError::InvalidConfig(e.message))?;
998 let node_type = if parsed_uri
999 .last_segment()
1000 .map(|s| s.ends_with(".md") || s.ends_with(".txt") || s.contains("."))
1001 .unwrap_or(false)
1002 {
1003 NodeType::File
1004 } else {
1005 NodeType::Directory
1006 };
1007
1008 let parent_uri = parsed_uri.parent().map(|p| p.to_string());
1009 let node_id = self
1010 .db
1011 .create_node(uri, parent_uri.as_deref(), node_type, metadata.as_ref())
1012 .await?;
1013
1014 let token_count = self.tokenizer.count_tokens(content) as i64;
1015 self.db
1016 .create_layer(&node_id, LayerType::L2, content, token_count, None)
1017 .await?;
1018
1019 Ok(node_id)
1020 }
1021
    /// Generate and persist L0/L1 summary layers from a node's L2 content.
    ///
    /// No-op when the node has no L2 layer. Existing L0/L1 layers are kept;
    /// only missing ones are written.
    pub async fn generate_layers_for_node(
        &self,
        node_id: &str,
        providers: &ProviderRegistry,
    ) -> MemoryResult<()> {
        let l2_layer = self.db.get_layer(node_id, LayerType::L2).await?;
        let l2_content = match l2_layer {
            Some(layer) => layer.content,
            None => return Ok(()),
        };

        let generator = ContextLayerGenerator::new(Arc::new(providers.clone()));

        let (l0_content, l1_content) = generator.generate_layers(&l2_content).await?;

        let l0_tokens = self.tokenizer.count_tokens(&l0_content) as i64;
        let l1_tokens = self.tokenizer.count_tokens(&l1_content) as i64;

        // Only fill gaps — never overwrite an existing summary layer.
        if self.db.get_layer(node_id, LayerType::L0).await?.is_none() {
            self.db
                .create_layer(node_id, LayerType::L0, &l0_content, l0_tokens, None)
                .await?;
        }

        if self.db.get_layer(node_id, LayerType::L1).await?.is_none() {
            self.db
                .create_layer(node_id, LayerType::L1, &l1_content, l1_tokens, None)
                .await?;
        }

        Ok(())
    }
1054
1055 pub async fn get_layer_content(
1056 &self,
1057 node_id: &str,
1058 layer_type: LayerType,
1059 ) -> MemoryResult<Option<String>> {
1060 let layer = self.db.get_layer(node_id, layer_type).await?;
1061 Ok(layer.map(|l| l.content))
1062 }
1063
1064 pub async fn store_content_with_layers_auto(
1065 &self,
1066 uri: &str,
1067 content: &str,
1068 metadata: Option<serde_json::Value>,
1069 providers: Option<&ProviderRegistry>,
1070 ) -> MemoryResult<String> {
1071 let node_id = self
1072 .store_content_with_layers(uri, content, metadata)
1073 .await?;
1074
1075 if let Some(p) = providers {
1076 if let Err(e) = self.generate_layers_for_node(&node_id, p).await {
1077 tracing::warn!("Failed to generate layers for node {}: {}", node_id, e);
1078 }
1079 }
1080
1081 Ok(node_id)
1082 }
1083
    /// Run session-retention cleanup, optionally scoped to one project.
    ///
    /// Returns the number of chunks removed. Vacuums the database when more
    /// than 100 chunks were cleaned in this pass.
    pub async fn run_cleanup(&self, project_id: Option<&str>) -> MemoryResult<u64> {
        let mut total_cleaned = 0u64;

        if let Some(pid) = project_id {
            let config = self.db.get_or_create_config(pid).await?;

            // Respect the project's auto-cleanup opt-in.
            if config.auto_cleanup {
                let cleaned = self
                    .db
                    .cleanup_old_sessions(config.session_retention_days)
                    .await?;
                total_cleaned += cleaned;

                if cleaned > 0 {
                    self.db
                        .log_cleanup(
                            "auto",
                            MemoryTier::Session,
                            Some(pid),
                            None,
                            cleaned as i64,
                            0,
                        )
                        .await?;
                }
            }
        } else {
            // No project context: apply a fixed 30-day retention default.
            let cleaned = self.db.cleanup_old_sessions(30).await?;
            total_cleaned += cleaned;
        }

        // Reclaim disk space only after a substantial cleanup.
        if total_cleaned > 100 {
            self.db.vacuum().await?;
        }

        Ok(total_cleaned)
    }
1128
    /// Check whether a project exceeds its chunk budget after a write.
    ///
    /// NOTE(review): currently this only logs the excess — no chunks are
    /// deleted here. Confirm whether actual eviction was intended.
    async fn maybe_cleanup(&self, project_id: &Option<String>) -> MemoryResult<()> {
        if let Some(pid) = project_id {
            let stats = self.db.get_stats().await?;
            let config = self.db.get_or_create_config(pid).await?;

            if stats.project_chunks > config.max_chunks {
                let excess = stats.project_chunks - config.max_chunks;
                tracing::info!("Project {} has {} excess chunks", pid, excess);
            }
        }

        Ok(())
    }
1147
    /// Cleanup history. Currently a stub: always returns an empty list and
    /// ignores `_limit`.
    pub async fn get_cleanup_log(&self, _limit: i64) -> MemoryResult<Vec<CleanupLogEntry>> {
        Ok(Vec::new())
    }

    /// Count tokens in `text` using the manager's tokenizer.
    pub fn count_tokens(&self, text: &str) -> usize {
        self.tokenizer.count_tokens(text)
    }
1159
1160 pub async fn embedding_health(&self) -> EmbeddingHealth {
1162 let service = self.embedding_service.lock().await;
1163 if service.is_available() {
1164 EmbeddingHealth {
1165 status: "ok".to_string(),
1166 reason: None,
1167 }
1168 } else {
1169 EmbeddingHealth {
1170 status: "degraded_disabled".to_string(),
1171 reason: service.disabled_reason().map(ToString::to_string),
1172 }
1173 }
1174 }
1175
    /// Summarize a session's chunks into one project-tier chunk via an LLM.
    ///
    /// Returns `Ok(None)` when consolidation is disabled, the session is
    /// empty, the LLM call fails (logged, not propagated), or the summary is
    /// blank. On success the original session chunks are cleared and the
    /// summary text is returned.
    pub async fn consolidate_session(
        &self,
        session_id: &str,
        project_id: Option<&str>,
        providers: &ProviderRegistry,
        config: &MemoryConsolidationConfig,
    ) -> MemoryResult<Option<String>> {
        if !config.enabled {
            return Ok(None);
        }

        let chunks = self.db.get_session_chunks(session_id).await?;
        if chunks.is_empty() {
            return Ok(None);
        }

        // Join all chunk contents with a separator so transcript boundaries
        // survive in the prompt.
        let mut text_parts = Vec::new();
        for chunk in &chunks {
            text_parts.push(chunk.content.clone());
        }
        let full_text = text_parts.join("\n\n---\n\n");

        let prompt = format!(
            "Please provide a concise but comprehensive summary of the following chat session. \
            Focus on the key decisions, technical details, code changes, and unresolved issues. \
            Do NOT include conversational filler, greetings, or sign-offs. \
            This summary will be used as long-term memory to recall the context of this work.\n\n\
            Session transcripts:\n\n{}",
            full_text
        );

        // Empty-string overrides are treated as unset.
        let provider_override = config.provider.as_deref().filter(|s| !s.is_empty());
        let model_override = config.model.as_deref().filter(|s| !s.is_empty());

        // Best-effort: an LLM failure aborts consolidation without erroring.
        let summary_text = match providers
            .complete_cheapest(&prompt, provider_override, model_override)
            .await
        {
            Ok(s) => s,
            Err(e) => {
                tracing::warn!("Memory consolidation LLM failed for session {session_id}: {e}");
                return Ok(None);
            }
        };

        if summary_text.trim().is_empty() {
            return Ok(None);
        }

        let embedding = {
            let service = self.embedding_service.lock().await;
            service
                .embed(&summary_text)
                .await
                .map_err(|e| crate::types::MemoryError::Embedding(e.to_string()))?
        };

        let chunk_id = uuid::Uuid::new_v4().to_string();
        // Store the summary as a project-tier chunk detached from the session.
        let chunk = MemoryChunk {
            id: chunk_id,
            content: summary_text.clone(),
            tier: MemoryTier::Project,
            session_id: None,
            project_id: project_id.map(ToString::to_string),
            created_at: Utc::now(),
            source: "consolidation".to_string(),
            token_count: self.count_tokens(&summary_text) as i64,
            source_path: None,
            source_mtime: None,
            source_size: None,
            source_hash: None,
            metadata: None,
        };

        self.db.store_chunk(&chunk, &embedding).await?;

        // The summary replaces the raw chunks; drop the originals.
        self.db.clear_session_memory(session_id).await?;

        tracing::info!(
            "Session {session_id} consolidated into summary chunk. Original chunks cleared."
        );

        Ok(Some(summary_text))
    }
1266}
1267
1268pub async fn create_memory_manager(app_data_dir: &Path) -> MemoryResult<MemoryManager> {
1270 let db_path = app_data_dir.join("tandem_memory.db");
1271 MemoryManager::new(&db_path).await
1272}