1use crate::chunking::{chunk_text_semantic, ChunkingConfig, Tokenizer};
5use crate::context_layers::ContextLayerGenerator;
6use crate::context_uri::ContextUri;
7use crate::db::MemoryDatabase;
8use crate::embeddings::EmbeddingService;
9use crate::types::{
10 CleanupLogEntry, DirectoryListing, EmbeddingHealth, KnowledgeCoverageRecord,
11 KnowledgeItemRecord, KnowledgePromotionRequest, KnowledgePromotionResult, KnowledgeSpaceRecord,
12 LayerType, MemoryChunk, MemoryConfig, MemoryContext, MemoryError, MemoryLayer, MemoryNode,
13 MemoryResult, MemoryRetrievalMeta, MemorySearchResult, MemoryStats, MemoryTier, NodeType,
14 StoreMessageRequest, TreeNode,
15};
16use chrono::Utc;
17use std::collections::HashSet;
18use std::path::Path;
19use std::sync::Arc;
20use tandem_orchestrator::{
21 build_knowledge_coverage_key, normalize_knowledge_segment, KnowledgeBinding, KnowledgePackItem,
22 KnowledgePreflightRequest, KnowledgePreflightResult, KnowledgeReuseDecision,
23 KnowledgeReuseMode, KnowledgeScope, KnowledgeTrustLevel,
24};
25use tandem_providers::{MemoryConsolidationConfig, ProviderRegistry};
26use tokio::sync::Mutex;
27
/// Coordinates chunking, embedding, persistence, and retrieval for the
/// tiered (session / project / global) memory store.
pub struct MemoryManager {
    // Shared handle to the SQLite-backed memory database.
    db: Arc<MemoryDatabase>,
    // Embedding backend; a tokio Mutex so the guard can be held across awaits.
    embedding_service: Arc<Mutex<EmbeddingService>>,
    // Token counter used for budgets and layer sizing.
    tokenizer: Tokenizer,
}
34
// Maximum number of knowledge items returned in a preflight pack.
const MAX_KNOWLEDGE_PACK_ITEMS: usize = 3;
// Chunk `source` prefix identifying guide documentation entries.
const GUIDE_DOC_SOURCE_PREFIX: &str = "guide_docs:";
// Half-life (30 days, in ms) for the guide-doc recency decay.
const GUIDE_DOC_RECENCY_HALFLIFE_MS: f64 = 30.0 * 24.0 * 60.0 * 60.0 * 1000.0;
// Fraction of the final score contributed by recency (rest is similarity).
const GUIDE_DOC_RECENCY_WEIGHT: f64 = 0.12;
39
40impl MemoryManager {
41 fn guide_doc_similarity(similarity: f64, chunk: &MemoryChunk, now_ms: i64) -> f64 {
42 if !chunk.source.starts_with(GUIDE_DOC_SOURCE_PREFIX) {
43 return similarity.clamp(0.0, 1.0);
44 }
45
46 let normalized_similarity = similarity.clamp(0.0, 1.0);
47 let source_mtime = chunk
48 .source_mtime
49 .filter(|value| *value > 0)
50 .unwrap_or_else(|| chunk.created_at.timestamp_millis());
51 let age_ms = (now_ms - source_mtime).max(0) as f64;
52 let recency_score = 1.0 / (1.0 + (age_ms / GUIDE_DOC_RECENCY_HALFLIFE_MS));
53 ((1.0 - GUIDE_DOC_RECENCY_WEIGHT) * normalized_similarity
54 + (GUIDE_DOC_RECENCY_WEIGHT * recency_score))
55 .clamp(0.0, 1.0)
56 }
57
58 fn is_malformed_database_error(err: &crate::types::MemoryError) -> bool {
59 err.to_string()
60 .to_lowercase()
61 .contains("database disk image is malformed")
62 }
63
    /// Borrows the shared database handle for callers needing direct access.
    pub fn db(&self) -> &Arc<MemoryDatabase> {
        &self.db
    }
67
    /// Opens (or creates) the memory database at `db_path` using the default
    /// embedding service.
    pub async fn new(db_path: &Path) -> MemoryResult<Self> {
        Self::new_with_embedding_service(db_path, EmbeddingService::new()).await
    }
72
73 pub async fn new_with_embedding_service(
76 db_path: &Path,
77 embedding_service: EmbeddingService,
78 ) -> MemoryResult<Self> {
79 let db = Arc::new(MemoryDatabase::new(db_path).await?);
80 let embedding_service = Arc::new(Mutex::new(embedding_service));
81 let tokenizer = Tokenizer::new()?;
82
83 Ok(Self {
84 db,
85 embedding_service,
86 tokenizer,
87 })
88 }
89
    /// Chunks `request.content`, embeds each chunk, and persists them to the
    /// requested tier, returning the new chunk ids.
    ///
    /// Storage failures trigger a vector-table repair and one retry per
    /// chunk; a still-malformed database is reset wholesale before a final
    /// retry. Returns an empty vec when chunking yields nothing.
    pub async fn store_message(&self, request: StoreMessageRequest) -> MemoryResult<Vec<String>> {
        // Proactively repair vector tables; a `true` result means a repair ran.
        if self
            .db
            .ensure_vector_tables_healthy()
            .await
            .unwrap_or(false)
        {
            tracing::warn!("Memory vector tables were repaired before storing message chunks");
        }

        // Per-project config when available; defaults otherwise.
        let config = if let Some(ref pid) = request.project_id {
            self.db.get_or_create_config(pid).await?
        } else {
            MemoryConfig::default()
        };

        let chunking_config = ChunkingConfig {
            chunk_size: config.chunk_size as usize,
            chunk_overlap: config.chunk_overlap as usize,
            separator: None,
        };

        let text_chunks = chunk_text_semantic(&request.content, &chunking_config)?;

        if text_chunks.is_empty() {
            return Ok(Vec::new());
        }

        let mut chunk_ids = Vec::with_capacity(text_chunks.len());
        // tokio Mutex: safe to hold across the awaits below.
        let embedding_service = self.embedding_service.lock().await;

        for text_chunk in text_chunks {
            let chunk_id = uuid::Uuid::new_v4().to_string();

            let embedding = embedding_service.embed(&text_chunk.content).await?;

            let chunk = MemoryChunk {
                id: chunk_id.clone(),
                content: text_chunk.content,
                tier: request.tier,
                session_id: request.session_id.clone(),
                project_id: request.project_id.clone(),
                source: request.source.clone(),
                source_path: request.source_path.clone(),
                source_mtime: request.source_mtime,
                source_size: request.source_size,
                source_hash: request.source_hash.clone(),
                created_at: Utc::now(),
                token_count: text_chunk.token_count as i64,
                metadata: request.metadata.clone(),
            };

            if let Err(err) = self.db.store_chunk(&chunk, &embedding).await {
                tracing::warn!("Failed to store memory chunk {}: {}", chunk.id, err);
                // Try a targeted repair first, then the general health check.
                let repaired = {
                    let repaired_after_error =
                        self.db.try_repair_after_error(&err).await.unwrap_or(false);
                    repaired_after_error
                        || self
                            .db
                            .ensure_vector_tables_healthy()
                            .await
                            .unwrap_or(false)
                };
                if repaired {
                    tracing::warn!(
                        "Retrying memory chunk insert after vector table repair: {}",
                        chunk.id
                    );
                    if let Err(retry_err) = self.db.store_chunk(&chunk, &embedding).await {
                        if Self::is_malformed_database_error(&retry_err) {
                            // Last resort: wipe and recreate the memory tables.
                            tracing::warn!(
                                "Memory DB still malformed after vector repair. Resetting memory tables and retrying chunk insert: {}",
                                chunk.id
                            );
                            self.db.reset_all_memory_tables().await?;
                            self.db.store_chunk(&chunk, &embedding).await?;
                        } else {
                            return Err(retry_err);
                        }
                    }
                } else {
                    return Err(err);
                }
            }
            chunk_ids.push(chunk_id);
        }

        if config.auto_cleanup {
            self.maybe_cleanup(&request.project_id).await?;
        }

        Ok(chunk_ids)
    }
195
196 pub async fn search(
198 &self,
199 query: &str,
200 tier: Option<MemoryTier>,
201 project_id: Option<&str>,
202 session_id: Option<&str>,
203 limit: Option<i64>,
204 ) -> MemoryResult<Vec<MemorySearchResult>> {
205 let effective_limit = limit.unwrap_or(5);
206
207 let embedding_service = self.embedding_service.lock().await;
209 let query_embedding = embedding_service.embed(query).await?;
210 drop(embedding_service);
211
212 let mut results = Vec::new();
213
214 let tiers_to_search = match tier {
216 Some(t) => vec![t],
217 None => {
218 if project_id.is_some() {
219 vec![MemoryTier::Session, MemoryTier::Project, MemoryTier::Global]
220 } else {
221 vec![MemoryTier::Session, MemoryTier::Global]
222 }
223 }
224 };
225
226 let now_ms = Utc::now().timestamp_millis();
227 for search_tier in tiers_to_search {
228 let tier_results = match self
229 .db
230 .search_similar(
231 &query_embedding,
232 search_tier,
233 project_id,
234 session_id,
235 effective_limit,
236 )
237 .await
238 {
239 Ok(results) => results,
240 Err(err) => {
241 tracing::warn!(
242 "Memory tier search failed for {:?}: {}. Attempting vector repair.",
243 search_tier,
244 err
245 );
246 let repaired = {
247 let repaired_after_error =
248 self.db.try_repair_after_error(&err).await.unwrap_or(false);
249 repaired_after_error
250 || self
251 .db
252 .ensure_vector_tables_healthy()
253 .await
254 .unwrap_or(false)
255 };
256 if repaired {
257 match self
258 .db
259 .search_similar(
260 &query_embedding,
261 search_tier,
262 project_id,
263 session_id,
264 effective_limit,
265 )
266 .await
267 {
268 Ok(results) => results,
269 Err(retry_err) => {
270 tracing::warn!(
271 "Memory tier search still failing for {:?} after repair: {}",
272 search_tier,
273 retry_err
274 );
275 continue;
276 }
277 }
278 } else {
279 continue;
280 }
281 }
282 };
283
284 for (chunk, distance) in tier_results {
285 let similarity = 1.0 - distance.clamp(0.0, 1.0);
289 let similarity = if search_tier == MemoryTier::Global {
290 Self::guide_doc_similarity(similarity, &chunk, now_ms)
291 } else {
292 similarity
293 };
294
295 results.push(MemorySearchResult { chunk, similarity });
296 }
297 }
298
299 results.sort_by(|a, b| b.similarity.partial_cmp(&a.similarity).unwrap());
301 results.truncate(effective_limit as usize);
302
303 Ok(results)
304 }
305
    /// Inserts or updates a knowledge space record.
    pub async fn upsert_knowledge_space(&self, space: &KnowledgeSpaceRecord) -> MemoryResult<()> {
        self.db.upsert_knowledge_space(space).await
    }

    /// Fetches a knowledge space by id, if it exists.
    pub async fn get_knowledge_space(
        &self,
        id: &str,
    ) -> MemoryResult<Option<KnowledgeSpaceRecord>> {
        self.db.get_knowledge_space(id).await
    }

    /// Lists knowledge spaces, optionally filtered by project.
    pub async fn list_knowledge_spaces(
        &self,
        project_id: Option<&str>,
    ) -> MemoryResult<Vec<KnowledgeSpaceRecord>> {
        self.db.list_knowledge_spaces(project_id).await
    }

    /// Inserts or updates a knowledge item record.
    pub async fn upsert_knowledge_item(&self, item: &KnowledgeItemRecord) -> MemoryResult<()> {
        self.db.upsert_knowledge_item(item).await
    }

    /// Fetches a knowledge item by id, if it exists.
    pub async fn get_knowledge_item(&self, id: &str) -> MemoryResult<Option<KnowledgeItemRecord>> {
        self.db.get_knowledge_item(id).await
    }

    /// Lists knowledge items in a space, optionally narrowed to a coverage key.
    pub async fn list_knowledge_items(
        &self,
        space_id: &str,
        coverage_key: Option<&str>,
    ) -> MemoryResult<Vec<KnowledgeItemRecord>> {
        self.db.list_knowledge_items(space_id, coverage_key).await
    }

    /// Inserts or updates a coverage record.
    pub async fn upsert_knowledge_coverage(
        &self,
        coverage: &KnowledgeCoverageRecord,
    ) -> MemoryResult<()> {
        self.db.upsert_knowledge_coverage(coverage).await
    }

    /// Fetches the coverage record for a (coverage key, space) pair.
    pub async fn get_knowledge_coverage(
        &self,
        coverage_key: &str,
        space_id: &str,
    ) -> MemoryResult<Option<KnowledgeCoverageRecord>> {
        self.db.get_knowledge_coverage(coverage_key, space_id).await
    }

    /// Promotes a knowledge item per `request`; `None` when no promotion
    /// applied (delegated to the database layer).
    pub async fn promote_knowledge_item(
        &self,
        request: &KnowledgePromotionRequest,
    ) -> MemoryResult<Option<KnowledgePromotionResult>> {
        self.db.promote_knowledge_item(request).await
    }
361
362 fn space_matches_ref(
363 space: &KnowledgeSpaceRecord,
364 space_ref: &tandem_orchestrator::KnowledgeSpaceRef,
365 project_id: &str,
366 ) -> bool {
367 if space.scope != space_ref.scope {
368 return false;
369 }
370 match space_ref.scope {
371 KnowledgeScope::Project | KnowledgeScope::Run => {
372 let target_project = space_ref.project_id.as_deref().unwrap_or(project_id);
373 if space.project_id.as_deref() != Some(target_project) {
374 return false;
375 }
376 }
377 KnowledgeScope::Global => {}
378 }
379 if let Some(namespace) = space_ref.namespace.as_deref() {
380 if space.namespace.as_deref() != Some(namespace) {
381 return false;
382 }
383 }
384 true
385 }
386
387 fn select_preflight_namespace(
388 binding: &KnowledgeBinding,
389 spaces: &[KnowledgeSpaceRecord],
390 ) -> Option<String> {
391 if let Some(namespace) = binding.namespace.clone() {
392 return Some(namespace);
393 }
394 if binding.read_spaces.len() == 1 {
395 if let Some(namespace) = binding.read_spaces[0].namespace.clone() {
396 return Some(namespace);
397 }
398 }
399 if spaces.len() == 1 {
400 return spaces[0].namespace.clone();
401 }
402 let mut unique = HashSet::new();
403 for space in spaces {
404 if let Some(namespace) = space.namespace.as_ref() {
405 unique.insert(namespace);
406 }
407 }
408 if unique.len() == 1 {
409 unique.into_iter().next().map(|value| value.to_string())
410 } else {
411 None
412 }
413 }
414
415 fn binding_uses_explicit_spaces(binding: &KnowledgeBinding) -> bool {
416 !binding.read_spaces.is_empty() || !binding.promote_spaces.is_empty()
417 }
418
419 fn namespace_matches(space_namespace: Option<&str>, binding_namespace: Option<&str>) -> bool {
420 match (space_namespace, binding_namespace) {
421 (None, None) => true,
422 (Some(space), Some(binding)) => {
423 normalize_knowledge_segment(space) == normalize_knowledge_segment(binding)
424 }
425 _ => false,
426 }
427 }
428
429 fn is_fresh_enough(
430 freshness_expires_at_ms: Option<u64>,
431 freshness_policy_ms: Option<u64>,
432 coverage_last_promoted_at_ms: Option<u64>,
433 item_created_at_ms: u64,
434 now_ms: u64,
435 ) -> bool {
436 if let Some(expires_at_ms) = freshness_expires_at_ms {
437 return expires_at_ms > now_ms;
438 }
439 let Some(policy_ms) = freshness_policy_ms else {
440 return true;
441 };
442 let basis_ms = coverage_last_promoted_at_ms.unwrap_or(item_created_at_ms);
443 now_ms.saturating_sub(basis_ms) <= policy_ms
444 }
445
    /// Resolves the set of knowledge spaces a preflight should consult,
    /// deduplicated by space id.
    ///
    /// With explicit read/promote spaces on the binding, those refs are
    /// resolved (by id or by scope match); Run-scoped refs without an id are
    /// skipped. Otherwise, project spaces matching the binding's (inferred)
    /// namespace are used. Returns empty when nothing applies.
    async fn resolve_preflight_spaces(
        &self,
        request: &KnowledgePreflightRequest,
        _coverage_key: &str,
    ) -> MemoryResult<Vec<KnowledgeSpaceRecord>> {
        let binding = &request.binding;
        let mut spaces = Vec::new();
        let mut seen_space_ids = HashSet::new();

        // Dedup helper: only the first occurrence of a space id is kept.
        let push_space = |space: KnowledgeSpaceRecord,
                          spaces: &mut Vec<KnowledgeSpaceRecord>,
                          seen_space_ids: &mut HashSet<String>| {
            if seen_space_ids.insert(space.id.clone()) {
                spaces.push(space);
            }
        };

        if Self::binding_uses_explicit_spaces(binding) {
            for space_ref in binding
                .read_spaces
                .iter()
                .chain(binding.promote_spaces.iter())
            {
                // A concrete space id short-circuits scope matching.
                if let Some(space_id) = space_ref.space_id.as_deref() {
                    if let Some(space) = self.get_knowledge_space(space_id).await? {
                        push_space(space, &mut spaces, &mut seen_space_ids);
                    }
                    continue;
                }

                match space_ref.scope {
                    // Run-scoped refs without an explicit id resolve to nothing.
                    KnowledgeScope::Run => {}
                    KnowledgeScope::Project => {
                        let candidate_project_id = space_ref
                            .project_id
                            .as_deref()
                            .unwrap_or(&request.project_id);
                        let project_spaces = self
                            .list_knowledge_spaces(Some(candidate_project_id))
                            .await?;
                        for space in project_spaces.into_iter().filter(|space| {
                            Self::space_matches_ref(space, space_ref, &request.project_id)
                        }) {
                            push_space(space, &mut spaces, &mut seen_space_ids);
                        }
                    }
                    KnowledgeScope::Global => {
                        let global_spaces = self.list_knowledge_spaces(None).await?;
                        for space in global_spaces.into_iter().filter(|space| {
                            Self::space_matches_ref(space, space_ref, &request.project_id)
                        }) {
                            push_space(space, &mut spaces, &mut seen_space_ids);
                        }
                    }
                }
            }
            return Ok(spaces);
        }

        // Implicit resolution needs a project to scope the lookup.
        if request.project_id.trim().is_empty() {
            return Ok(spaces);
        }

        let project_spaces = self
            .list_knowledge_spaces(Some(&request.project_id))
            .await?;
        let requested_namespace = if binding.namespace.is_some() {
            binding.namespace.clone()
        } else {
            Self::select_preflight_namespace(binding, &project_spaces)
        };
        // No unambiguous namespace -> no implicit spaces.
        let Some(requested_namespace) = requested_namespace else {
            return Ok(spaces);
        };

        for space in project_spaces.into_iter().filter(|space| {
            space.scope == KnowledgeScope::Project
                && Self::namespace_matches(
                    space.namespace.as_deref(),
                    Some(requested_namespace.as_str()),
                )
        }) {
            push_space(space, &mut spaces, &mut seen_space_ids);
        }
        Ok(spaces)
    }
532
    /// Decides whether prior knowledge can be reused for a task.
    ///
    /// Pipeline: compute the coverage key, bail early if reuse is disabled or
    /// no spaces resolve, then partition matching items into fresh vs. stale
    /// (by trust floor and freshness policy). Fresh items win (sorted by
    /// trust, then expiry, then title; capped at MAX_KNOWLEDGE_PACK_ITEMS);
    /// stale-only yields RefreshRequired; otherwise NoPriorKnowledge.
    pub async fn preflight_knowledge(
        &self,
        request: &KnowledgePreflightRequest,
    ) -> MemoryResult<KnowledgePreflightResult> {
        let binding = &request.binding;
        let project_spaces = if request.project_id.trim().is_empty() {
            Vec::new()
        } else {
            self.list_knowledge_spaces(Some(&request.project_id))
                .await?
        };
        let namespace = binding
            .namespace
            .clone()
            .or_else(|| Self::select_preflight_namespace(binding, &project_spaces));
        let coverage_key = build_knowledge_coverage_key(
            &request.project_id,
            namespace.as_deref(),
            &request.task_family,
            &request.subject,
        );

        // Reuse disabled: report and stop before any space resolution.
        if !binding.enabled || binding.reuse_mode == KnowledgeReuseMode::Disabled {
            return Ok(KnowledgePreflightResult {
                project_id: request.project_id.clone(),
                namespace,
                task_family: request.task_family.clone(),
                subject: request.subject.clone(),
                coverage_key,
                decision: KnowledgeReuseDecision::Disabled,
                reuse_reason: None,
                skip_reason: Some("knowledge reuse is disabled for this binding".to_string()),
                freshness_reason: None,
                items: Vec::new(),
            });
        }

        let spaces = self
            .resolve_preflight_spaces(request, &coverage_key)
            .await?;
        if spaces.is_empty() {
            return Ok(KnowledgePreflightResult {
                project_id: request.project_id.clone(),
                namespace,
                task_family: request.task_family.clone(),
                subject: request.subject.clone(),
                coverage_key,
                decision: KnowledgeReuseDecision::NoPriorKnowledge,
                reuse_reason: None,
                skip_reason: Some("no reusable knowledge spaces were found".to_string()),
                freshness_reason: None,
                items: Vec::new(),
            });
        }

        let now_ms = chrono::Utc::now().timestamp_millis().max(0) as u64;
        let mut fresh_items = Vec::new();
        let mut stale_items = Vec::new();
        // Holds the reason from the most recently seen stale item.
        let mut freshest_reason = None;

        for space in &spaces {
            let items = self
                .list_knowledge_items(&space.id, Some(&coverage_key))
                .await?;
            let coverage = self
                .get_knowledge_coverage(&coverage_key, &space.id)
                .await?;
            for item in items {
                // Skip inactive items, items without a trust level, and items
                // below the binding's trust floor.
                if !item.status.is_active() {
                    continue;
                }
                let Some(trust_level) = item.status.as_trust_level() else {
                    continue;
                };
                if !trust_level.meets_floor(binding.trust_floor) {
                    continue;
                }
                // Item-level expiry wins; coverage-level expiry is fallback.
                let freshness_expires_at_ms = item.freshness_expires_at_ms.or_else(|| {
                    coverage
                        .as_ref()
                        .and_then(|coverage| coverage.freshness_expires_at_ms)
                });
                let pack_item = KnowledgePackItem {
                    item_id: item.id.clone(),
                    space_id: space.id.clone(),
                    coverage_key: item.coverage_key.clone(),
                    title: item.title.clone(),
                    summary: item.summary.clone(),
                    trust_level,
                    status: item.status.to_string(),
                    artifact_refs: item.artifact_refs.clone(),
                    source_memory_ids: item.source_memory_ids.clone(),
                    freshness_expires_at_ms,
                };
                if Self::is_fresh_enough(
                    freshness_expires_at_ms,
                    binding.freshness_ms,
                    coverage
                        .as_ref()
                        .and_then(|coverage| coverage.last_promoted_at_ms),
                    item.created_at_ms,
                    now_ms,
                ) {
                    fresh_items.push(pack_item);
                } else {
                    freshest_reason = Some(match freshness_expires_at_ms {
                        Some(expires_at_ms) => format!(
                            "coverage `{}` in space `{}` expired at {}",
                            coverage_key, space.id, expires_at_ms
                        ),
                        None => format!(
                            "coverage `{}` in space `{}` lacks freshness metadata",
                            coverage_key, space.id
                        ),
                    });
                    stale_items.push(pack_item);
                }
            }
        }

        // Highest trust first; fresher expiry first; title as tie-breaker.
        fresh_items.sort_by(|left, right| {
            right
                .trust_level
                .rank()
                .cmp(&left.trust_level.rank())
                .then_with(|| {
                    right
                        .freshness_expires_at_ms
                        .unwrap_or(0)
                        .cmp(&left.freshness_expires_at_ms.unwrap_or(0))
                })
                .then_with(|| left.title.cmp(&right.title))
        });
        stale_items.sort_by(|left, right| {
            right
                .trust_level
                .rank()
                .cmp(&left.trust_level.rank())
                .then_with(|| left.title.cmp(&right.title))
        });

        if let Some(freshest_trust_level) = fresh_items.first().map(|item| item.trust_level) {
            let selected = fresh_items
                .into_iter()
                .take(MAX_KNOWLEDGE_PACK_ITEMS)
                .collect::<Vec<_>>();
            let decision = match freshest_trust_level {
                KnowledgeTrustLevel::ApprovedDefault => {
                    KnowledgeReuseDecision::ReuseApprovedDefault
                }
                _ => KnowledgeReuseDecision::ReusePromoted,
            };
            let selected_count = selected.len();
            return Ok(KnowledgePreflightResult {
                project_id: request.project_id.clone(),
                namespace,
                task_family: request.task_family.clone(),
                subject: request.subject.clone(),
                coverage_key,
                decision,
                reuse_reason: Some(format!(
                    "reusing {} promoted knowledge item(s) from {} space(s)",
                    selected_count,
                    spaces.len()
                )),
                skip_reason: None,
                freshness_reason: None,
                items: selected,
            });
        }

        // Only stale matches: signal that a refresh is required, still
        // surfacing the stale items for context.
        if !stale_items.is_empty() {
            let selected = stale_items
                .into_iter()
                .take(MAX_KNOWLEDGE_PACK_ITEMS)
                .collect::<Vec<_>>();
            return Ok(KnowledgePreflightResult {
                project_id: request.project_id.clone(),
                namespace,
                task_family: request.task_family.clone(),
                subject: request.subject.clone(),
                coverage_key,
                decision: KnowledgeReuseDecision::RefreshRequired,
                reuse_reason: None,
                skip_reason: Some(
                    "prior knowledge exists but is not fresh enough to reuse".to_string(),
                ),
                freshness_reason: freshest_reason.or_else(|| {
                    Some("matching knowledge exists but freshness policy rejected it".to_string())
                }),
                items: selected,
            });
        }

        Ok(KnowledgePreflightResult {
            project_id: request.project_id.clone(),
            namespace,
            task_family: request.task_family.clone(),
            subject: request.subject.clone(),
            coverage_key,
            decision: KnowledgeReuseDecision::NoPriorKnowledge,
            reuse_reason: None,
            skip_reason: Some("no active promoted knowledge matched this coverage key".to_string()),
            freshness_reason: None,
            items: Vec::new(),
        })
    }
740
741 pub async fn retrieve_context(
746 &self,
747 query: &str,
748 project_id: Option<&str>,
749 session_id: Option<&str>,
750 token_budget: Option<i64>,
751 ) -> MemoryResult<MemoryContext> {
752 let (context, _) = self
753 .retrieve_context_with_meta(query, project_id, session_id, token_budget)
754 .await?;
755 Ok(context)
756 }
757
    /// Assembles a token-budgeted context for `query`: the current session's
    /// chunks, semantically relevant history, and project/global facts, plus
    /// retrieval metadata (counts and score range).
    ///
    /// When the combined token count exceeds the budget (explicit
    /// `token_budget` or the project config's), chunks are trimmed via
    /// [`Self::trim_context`]. Score min/max reflect the raw search results,
    /// before trimming.
    pub async fn retrieve_context_with_meta(
        &self,
        query: &str,
        project_id: Option<&str>,
        session_id: Option<&str>,
        token_budget: Option<i64>,
    ) -> MemoryResult<(MemoryContext, MemoryRetrievalMeta)> {
        let config = if let Some(pid) = project_id {
            self.db.get_or_create_config(pid).await?
        } else {
            MemoryConfig::default()
        };
        let budget = token_budget.unwrap_or(config.token_budget);
        let retrieval_limit = config.retrieval_k.max(1);

        // Always include the active session's chunks verbatim.
        let current_session = if let Some(sid) = session_id {
            self.db.get_session_chunks(sid).await?
        } else {
            Vec::new()
        };

        let search_results = self
            .search(query, None, project_id, session_id, Some(retrieval_limit))
            .await?;

        // Track the similarity range across all results for the metadata.
        let mut score_min: Option<f64> = None;
        let mut score_max: Option<f64> = None;
        for result in &search_results {
            score_min = Some(match score_min {
                Some(current) => current.min(result.similarity),
                None => result.similarity,
            });
            score_max = Some(match score_max {
                Some(current) => current.max(result.similarity),
                None => result.similarity,
            });
        }

        let mut current_session = current_session;
        let mut relevant_history = Vec::new();
        let mut project_facts = Vec::new();

        // Route results by tier; session hits already present in the active
        // session are not duplicated into history.
        for result in search_results {
            match result.chunk.tier {
                MemoryTier::Project => {
                    project_facts.push(result.chunk);
                }
                MemoryTier::Global => {
                    project_facts.push(result.chunk);
                }
                MemoryTier::Session => {
                    if !current_session.iter().any(|c| c.id == result.chunk.id) {
                        relevant_history.push(result.chunk);
                    }
                }
            }
        }

        let mut total_tokens: i64 = current_session.iter().map(|c| c.token_count).sum();
        total_tokens += relevant_history.iter().map(|c| c.token_count).sum::<i64>();
        total_tokens += project_facts.iter().map(|c| c.token_count).sum::<i64>();

        // Over budget: trim, then recompute the real total.
        if total_tokens > budget {
            let excess = total_tokens - budget;
            self.trim_context(
                &mut current_session,
                &mut relevant_history,
                &mut project_facts,
                excess,
            )?;
            total_tokens = current_session.iter().map(|c| c.token_count).sum::<i64>()
                + relevant_history.iter().map(|c| c.token_count).sum::<i64>()
                + project_facts.iter().map(|c| c.token_count).sum::<i64>();
        }

        let context = MemoryContext {
            current_session,
            relevant_history,
            project_facts,
            total_tokens,
        };
        let chunks_total = context.current_session.len()
            + context.relevant_history.len()
            + context.project_facts.len();
        let meta = MemoryRetrievalMeta {
            used: chunks_total > 0,
            chunks_total,
            session_chunks: context.current_session.len(),
            history_chunks: context.relevant_history.len(),
            project_fact_chunks: context.project_facts.len(),
            score_min,
            score_max,
        };

        Ok((context, meta))
    }
860
861 fn trim_context(
863 &self,
864 current_session: &mut Vec<MemoryChunk>,
865 relevant_history: &mut Vec<MemoryChunk>,
866 project_facts: &mut Vec<MemoryChunk>,
867 excess_tokens: i64,
868 ) -> MemoryResult<()> {
869 let mut tokens_to_remove = excess_tokens;
870
871 while tokens_to_remove > 0 && !relevant_history.is_empty() {
873 if let Some(chunk) = relevant_history.pop() {
874 tokens_to_remove -= chunk.token_count;
875 }
876 }
877
878 while tokens_to_remove > 0 && !project_facts.is_empty() {
880 if let Some(chunk) = project_facts.pop() {
881 tokens_to_remove -= chunk.token_count;
882 }
883 }
884
885 while tokens_to_remove > 0 && !current_session.is_empty() {
886 if let Some(chunk) = current_session.pop() {
887 tokens_to_remove -= chunk.token_count;
888 }
889 }
890
891 Ok(())
892 }
893
    /// Deletes all memory for a session, logs the manual cleanup, and
    /// returns how many chunks were removed.
    pub async fn clear_session(&self, session_id: &str) -> MemoryResult<u64> {
        let count = self.db.clear_session_memory(session_id).await?;

        self.db
            .log_cleanup(
                "manual",
                MemoryTier::Session,
                None,
                Some(session_id),
                count as i64,
                0,
            )
            .await?;

        Ok(count)
    }
912
    /// Deletes all memory for a project, logs the manual cleanup, and
    /// returns how many chunks were removed.
    pub async fn clear_project(&self, project_id: &str) -> MemoryResult<u64> {
        let count = self.db.clear_project_memory(project_id).await?;

        self.db
            .log_cleanup(
                "manual",
                MemoryTier::Project,
                Some(project_id),
                None,
                count as i64,
                0,
            )
            .await?;

        Ok(count)
    }
931
    /// Returns aggregate statistics for the memory store.
    pub async fn get_stats(&self) -> MemoryResult<MemoryStats> {
        self.db.get_stats().await
    }

    /// Fetches (creating on first access) the memory config for a project.
    pub async fn get_config(&self, project_id: &str) -> MemoryResult<MemoryConfig> {
        self.db.get_or_create_config(project_id).await
    }

    /// Persists a project's memory configuration.
    pub async fn set_config(&self, project_id: &str, config: &MemoryConfig) -> MemoryResult<()> {
        self.db.update_config(project_id, config).await
    }

    /// Looks up a context-tree node by its URI, if one exists.
    pub async fn resolve_uri(&self, uri: &str) -> MemoryResult<Option<MemoryNode>> {
        self.db.get_node_by_uri(uri).await
    }
950
951 pub async fn list_directory(&self, uri: &str) -> MemoryResult<DirectoryListing> {
952 let nodes = self.db.list_directory(uri).await?;
953 let directories: Vec<MemoryNode> = nodes
954 .iter()
955 .filter(|n| n.node_type == NodeType::Directory)
956 .cloned()
957 .collect();
958 let files: Vec<MemoryNode> = nodes
959 .iter()
960 .filter(|n| n.node_type == NodeType::File)
961 .cloned()
962 .collect();
963
964 Ok(DirectoryListing {
965 uri: uri.to_string(),
966 nodes,
967 total_children: directories.len() + files.len(),
968 directories,
969 files,
970 })
971 }
972
    /// Returns the subtree rooted at `uri`, limited to `max_depth` levels.
    pub async fn tree(&self, uri: &str, max_depth: usize) -> MemoryResult<Vec<TreeNode>> {
        self.db.get_children_tree(uri, max_depth).await
    }
976
977 pub async fn create_context_node(
978 &self,
979 uri: &str,
980 node_type: NodeType,
981 metadata: Option<serde_json::Value>,
982 ) -> MemoryResult<String> {
983 let parsed_uri =
984 ContextUri::parse(uri).map_err(|e| MemoryError::InvalidConfig(e.message))?;
985 let parent_uri = parsed_uri.parent().map(|p| p.to_string());
986 self.db
987 .create_node(uri, parent_uri.as_deref(), node_type, metadata.as_ref())
988 .await
989 }
990
    /// Fetches a single layer record for a node, if present.
    pub async fn get_context_layer(
        &self,
        node_id: &str,
        layer_type: LayerType,
    ) -> MemoryResult<Option<MemoryLayer>> {
        self.db.get_layer(node_id, layer_type).await
    }
998
999 pub async fn store_content_with_layers(
1000 &self,
1001 uri: &str,
1002 content: &str,
1003 metadata: Option<serde_json::Value>,
1004 ) -> MemoryResult<String> {
1005 let parsed_uri =
1006 ContextUri::parse(uri).map_err(|e| MemoryError::InvalidConfig(e.message))?;
1007 let node_type = if parsed_uri
1008 .last_segment()
1009 .map(|s| s.ends_with(".md") || s.ends_with(".txt") || s.contains("."))
1010 .unwrap_or(false)
1011 {
1012 NodeType::File
1013 } else {
1014 NodeType::Directory
1015 };
1016
1017 let parent_uri = parsed_uri.parent().map(|p| p.to_string());
1018 let node_id = self
1019 .db
1020 .create_node(uri, parent_uri.as_deref(), node_type, metadata.as_ref())
1021 .await?;
1022
1023 let token_count = self.tokenizer.count_tokens(content) as i64;
1024 self.db
1025 .create_layer(&node_id, LayerType::L2, content, token_count, None)
1026 .await?;
1027
1028 Ok(node_id)
1029 }
1030
    /// Generates L0/L1 summary layers for a node from its L2 content via the
    /// provider registry.
    ///
    /// A no-op when the node has no L2 layer. Existing L0/L1 layers are kept
    /// as-is — only missing layers are created.
    pub async fn generate_layers_for_node(
        &self,
        node_id: &str,
        providers: &ProviderRegistry,
    ) -> MemoryResult<()> {
        let l2_layer = self.db.get_layer(node_id, LayerType::L2).await?;
        let l2_content = match l2_layer {
            Some(layer) => layer.content,
            None => return Ok(()),
        };

        let generator = ContextLayerGenerator::new(Arc::new(providers.clone()));

        // Produces both summary granularities in a single call.
        let (l0_content, l1_content) = generator.generate_layers(&l2_content).await?;

        let l0_tokens = self.tokenizer.count_tokens(&l0_content) as i64;
        let l1_tokens = self.tokenizer.count_tokens(&l1_content) as i64;

        // Only create layers that do not already exist.
        if self.db.get_layer(node_id, LayerType::L0).await?.is_none() {
            self.db
                .create_layer(node_id, LayerType::L0, &l0_content, l0_tokens, None)
                .await?;
        }

        if self.db.get_layer(node_id, LayerType::L1).await?.is_none() {
            self.db
                .create_layer(node_id, LayerType::L1, &l1_content, l1_tokens, None)
                .await?;
        }

        Ok(())
    }
1063
1064 pub async fn get_layer_content(
1065 &self,
1066 node_id: &str,
1067 layer_type: LayerType,
1068 ) -> MemoryResult<Option<String>> {
1069 let layer = self.db.get_layer(node_id, layer_type).await?;
1070 Ok(layer.map(|l| l.content))
1071 }
1072
1073 pub async fn store_content_with_layers_auto(
1074 &self,
1075 uri: &str,
1076 content: &str,
1077 metadata: Option<serde_json::Value>,
1078 providers: Option<&ProviderRegistry>,
1079 ) -> MemoryResult<String> {
1080 let node_id = self
1081 .store_content_with_layers(uri, content, metadata)
1082 .await?;
1083
1084 if let Some(p) = providers {
1085 if let Err(e) = self.generate_layers_for_node(&node_id, p).await {
1086 tracing::warn!("Failed to generate layers for node {}: {}", node_id, e);
1087 }
1088 }
1089
1090 Ok(node_id)
1091 }
1092
    /// Removes expired session memory and returns the number of chunks
    /// cleaned.
    ///
    /// With a project id, the project's retention policy applies (and only
    /// when its `auto_cleanup` flag is set, with the cleanup logged).
    /// Without one, a hard-coded 30-day retention is used and no log entry
    /// is written. Large cleanups (>100 chunks) trigger a vacuum.
    pub async fn run_cleanup(&self, project_id: Option<&str>) -> MemoryResult<u64> {
        let mut total_cleaned = 0u64;

        if let Some(pid) = project_id {
            let config = self.db.get_or_create_config(pid).await?;

            if config.auto_cleanup {
                let cleaned = self
                    .db
                    .cleanup_old_sessions(config.session_retention_days)
                    .await?;
                total_cleaned += cleaned;

                if cleaned > 0 {
                    self.db
                        .log_cleanup(
                            "auto",
                            MemoryTier::Session,
                            Some(pid),
                            None,
                            cleaned as i64,
                            0,
                        )
                        .await?;
                }
            }
        } else {
            // Global sweep: fixed 30-day retention, not logged.
            let cleaned = self.db.cleanup_old_sessions(30).await?;
            total_cleaned += cleaned;
        }

        // Reclaim disk space only after a substantial cleanup.
        if total_cleaned > 100 {
            self.db.vacuum().await?;
        }

        Ok(total_cleaned)
    }
1137
    /// Checks whether a project has exceeded its chunk budget after a store.
    ///
    /// NOTE: currently observation-only — it logs the excess but performs no
    /// eviction; actual pruning is not implemented here.
    async fn maybe_cleanup(&self, project_id: &Option<String>) -> MemoryResult<()> {
        if let Some(pid) = project_id {
            let stats = self.db.get_stats().await?;
            let config = self.db.get_or_create_config(pid).await?;

            if stats.project_chunks > config.max_chunks {
                let excess = stats.project_chunks - config.max_chunks;
                tracing::info!("Project {} has {} excess chunks", pid, excess);
            }
        }

        Ok(())
    }
1156
    /// Returns recent cleanup log entries.
    ///
    /// Currently a stub: always returns an empty list and ignores `_limit`.
    pub async fn get_cleanup_log(&self, _limit: i64) -> MemoryResult<Vec<CleanupLogEntry>> {
        Ok(Vec::new())
    }
1163
    /// Counts tokens in `text` using the manager's tokenizer.
    pub fn count_tokens(&self, text: &str) -> usize {
        self.tokenizer.count_tokens(text)
    }
1168
1169 pub async fn embedding_health(&self) -> EmbeddingHealth {
1171 let service = self.embedding_service.lock().await;
1172 if service.is_available() {
1173 EmbeddingHealth {
1174 status: "ok".to_string(),
1175 reason: None,
1176 }
1177 } else {
1178 EmbeddingHealth {
1179 status: "degraded_disabled".to_string(),
1180 reason: service.disabled_reason().map(ToString::to_string),
1181 }
1182 }
1183 }
1184
    /// Summarizes a session's chunks into a single project-tier memory chunk
    /// via the cheapest available LLM, then clears the original session.
    ///
    /// Returns the summary text, or `None` when consolidation is disabled,
    /// the session is empty, the LLM call fails (logged, not propagated), or
    /// the summary comes back blank. Embedding/storage errors do propagate.
    pub async fn consolidate_session(
        &self,
        session_id: &str,
        project_id: Option<&str>,
        providers: &ProviderRegistry,
        config: &MemoryConsolidationConfig,
    ) -> MemoryResult<Option<String>> {
        if !config.enabled {
            return Ok(None);
        }

        let chunks = self.db.get_session_chunks(session_id).await?;
        if chunks.is_empty() {
            return Ok(None);
        }

        // Concatenate chunk contents with a visible separator for the prompt.
        let mut text_parts = Vec::new();
        for chunk in &chunks {
            text_parts.push(chunk.content.clone());
        }
        let full_text = text_parts.join("\n\n---\n\n");

        let prompt = format!(
            "Please provide a concise but comprehensive summary of the following chat session. \
            Focus on the key decisions, technical details, code changes, and unresolved issues. \
            Do NOT include conversational filler, greetings, or sign-offs. \
            This summary will be used as long-term memory to recall the context of this work.\n\n\
            Session transcripts:\n\n{}",
            full_text
        );

        // Empty-string overrides are treated as unset.
        let provider_override = config.provider.as_deref().filter(|s| !s.is_empty());
        let model_override = config.model.as_deref().filter(|s| !s.is_empty());

        // LLM failure is best-effort: log and keep the session untouched.
        let summary_text = match providers
            .complete_cheapest(&prompt, provider_override, model_override)
            .await
        {
            Ok(s) => s,
            Err(e) => {
                tracing::warn!("Memory consolidation LLM failed for session {session_id}: {e}");
                return Ok(None);
            }
        };

        if summary_text.trim().is_empty() {
            return Ok(None);
        }

        let embedding = {
            let service = self.embedding_service.lock().await;
            service
                .embed(&summary_text)
                .await
                .map_err(|e| crate::types::MemoryError::Embedding(e.to_string()))?
        };

        // The summary is stored at Project tier, detached from the session.
        let chunk_id = uuid::Uuid::new_v4().to_string();
        let chunk = MemoryChunk {
            id: chunk_id,
            content: summary_text.clone(),
            tier: MemoryTier::Project,
            session_id: None, project_id: project_id.map(ToString::to_string),
            created_at: Utc::now(),
            source: "consolidation".to_string(),
            token_count: self.count_tokens(&summary_text) as i64,
            source_path: None,
            source_mtime: None,
            source_size: None,
            source_hash: None,
            metadata: None,
        };

        self.db.store_chunk(&chunk, &embedding).await?;

        // Only clear the session once the summary is safely persisted.
        self.db.clear_session_memory(session_id).await?;

        tracing::info!(
            "Session {session_id} consolidated into summary chunk. Original chunks cleared."
        );

        Ok(Some(summary_text))
    }
1275}
1276
1277pub async fn create_memory_manager(app_data_dir: &Path) -> MemoryResult<MemoryManager> {
1279 let db_path = app_data_dir.join("tandem_memory.db");
1280 MemoryManager::new(&db_path).await
1281}