1use crate::chunking::{chunk_text_semantic, ChunkingConfig, Tokenizer};
5use crate::context_layers::ContextLayerGenerator;
6use crate::context_uri::ContextUri;
7use crate::db::MemoryDatabase;
8use crate::embeddings::EmbeddingService;
9use crate::types::{
10 CleanupLogEntry, DirectoryListing, EmbeddingHealth, KnowledgeCoverageRecord,
11 KnowledgeItemRecord, KnowledgePromotionRequest, KnowledgePromotionResult, KnowledgeSpaceRecord,
12 LayerType, MemoryChunk, MemoryConfig, MemoryContext, MemoryError, MemoryLayer, MemoryNode,
13 MemoryResult, MemoryRetrievalMeta, MemorySearchResult, MemoryStats, MemoryTenantScope,
14 MemoryTier, NodeType, StoreMessageRequest, TreeNode,
15};
16use chrono::Utc;
17use std::collections::HashSet;
18use std::path::Path;
19use std::sync::Arc;
20use tandem_orchestrator::{
21 build_knowledge_coverage_key, normalize_knowledge_segment, KnowledgeBinding, KnowledgePackItem,
22 KnowledgePreflightRequest, KnowledgePreflightResult, KnowledgeReuseDecision,
23 KnowledgeReuseMode, KnowledgeScope, KnowledgeTrustLevel,
24};
25use tandem_providers::{MemoryConsolidationConfig, ProviderRegistry};
26use tokio::sync::Mutex;
27
/// Facade over the memory store: chunking and embedding of messages, vector search,
/// knowledge-space bookkeeping, context retrieval, and cleanup.
pub struct MemoryManager {
    /// Shared handle to the underlying memory database.
    db: Arc<MemoryDatabase>,
    /// Embedding backend behind an async mutex; callers lock it before calling `embed`.
    embedding_service: Arc<Mutex<EmbeddingService>>,
    /// Tokenizer constructed by `new_with_embedding_service`.
    tokenizer: Tokenizer,
}
34
/// Maximum number of knowledge pack items returned by a preflight (fresh or stale).
const MAX_KNOWLEDGE_PACK_ITEMS: usize = 3;
/// `MemoryChunk::source` prefix marking guide-doc chunks that receive a recency boost.
const GUIDE_DOC_SOURCE_PREFIX: &str = "guide_docs:";
/// Age (30 days, in milliseconds) at which a guide doc's recency score `1 / (1 + age / H)`
/// has decayed to 0.5.
const GUIDE_DOC_RECENCY_HALFLIFE_MS: f64 = 30.0 * 24.0 * 60.0 * 60.0 * 1000.0;
/// Weight of the recency score in the blended guide-doc score; the remaining 0.88 weights
/// the vector similarity.
const GUIDE_DOC_RECENCY_WEIGHT: f64 = 0.12;
39
40impl MemoryManager {
41 fn guide_doc_similarity(similarity: f64, chunk: &MemoryChunk, now_ms: i64) -> f64 {
42 if !chunk.source.starts_with(GUIDE_DOC_SOURCE_PREFIX) {
43 return similarity.clamp(0.0, 1.0);
44 }
45
46 let normalized_similarity = similarity.clamp(0.0, 1.0);
47 let source_mtime = chunk
48 .source_mtime
49 .filter(|value| *value > 0)
50 .unwrap_or_else(|| chunk.created_at.timestamp_millis());
51 let age_ms = (now_ms - source_mtime).max(0) as f64;
52 let recency_score = 1.0 / (1.0 + (age_ms / GUIDE_DOC_RECENCY_HALFLIFE_MS));
53 ((1.0 - GUIDE_DOC_RECENCY_WEIGHT) * normalized_similarity
54 + (GUIDE_DOC_RECENCY_WEIGHT * recency_score))
55 .clamp(0.0, 1.0)
56 }
57
58 fn is_malformed_database_error(err: &crate::types::MemoryError) -> bool {
59 err.to_string()
60 .to_lowercase()
61 .contains("database disk image is malformed")
62 }
63
64 pub fn db(&self) -> &Arc<MemoryDatabase> {
65 &self.db
66 }
67
68 pub async fn new(db_path: &Path) -> MemoryResult<Self> {
70 Self::new_with_embedding_service(db_path, EmbeddingService::new()).await
71 }
72
73 pub async fn new_with_embedding_service(
76 db_path: &Path,
77 embedding_service: EmbeddingService,
78 ) -> MemoryResult<Self> {
79 let db = Arc::new(MemoryDatabase::new(db_path).await?);
80 let embedding_service = Arc::new(Mutex::new(embedding_service));
81 let tokenizer = Tokenizer::new()?;
82
83 Ok(Self {
84 db,
85 embedding_service,
86 tokenizer,
87 })
88 }
89
90 pub async fn store_message(&self, request: StoreMessageRequest) -> MemoryResult<Vec<String>> {
97 if self
98 .db
99 .ensure_vector_tables_healthy()
100 .await
101 .unwrap_or(false)
102 {
103 tracing::warn!("Memory vector tables were repaired before storing message chunks");
104 }
105
106 let config = if let Some(ref pid) = request.project_id {
107 self.db
108 .get_or_create_config_for_tenant(pid, &request.tenant_scope)
109 .await?
110 } else {
111 MemoryConfig::default()
112 };
113
114 let chunking_config = ChunkingConfig {
116 chunk_size: config.chunk_size as usize,
117 chunk_overlap: config.chunk_overlap as usize,
118 separator: None,
119 };
120
121 let text_chunks = chunk_text_semantic(&request.content, &chunking_config)?;
122
123 if text_chunks.is_empty() {
124 return Ok(Vec::new());
125 }
126
127 let mut chunk_ids = Vec::with_capacity(text_chunks.len());
128 let embedding_service = self.embedding_service.lock().await;
129
130 for text_chunk in text_chunks {
131 let chunk_id = uuid::Uuid::new_v4().to_string();
132
133 let embedding = embedding_service.embed(&text_chunk.content).await?;
135
136 let chunk = MemoryChunk {
138 id: chunk_id.clone(),
139 content: text_chunk.content,
140 tier: request.tier,
141 session_id: request.session_id.clone(),
142 project_id: request.project_id.clone(),
143 source: request.source.clone(),
144 source_path: request.source_path.clone(),
145 source_mtime: request.source_mtime,
146 source_size: request.source_size,
147 source_hash: request.source_hash.clone(),
148 tenant_scope: request.tenant_scope.clone(),
149 created_at: Utc::now(),
150 token_count: text_chunk.token_count as i64,
151 metadata: request.metadata.clone(),
152 };
153
154 if let Err(err) = self.db.store_chunk(&chunk, &embedding).await {
156 tracing::warn!("Failed to store memory chunk {}: {}", chunk.id, err);
157 let repaired = {
158 let repaired_after_error =
159 self.db.try_repair_after_error(&err).await.unwrap_or(false);
160 repaired_after_error
161 || self
162 .db
163 .ensure_vector_tables_healthy()
164 .await
165 .unwrap_or(false)
166 };
167 if repaired {
168 tracing::warn!(
169 "Retrying memory chunk insert after vector table repair: {}",
170 chunk.id
171 );
172 if let Err(retry_err) = self.db.store_chunk(&chunk, &embedding).await {
173 if Self::is_malformed_database_error(&retry_err) {
174 tracing::warn!(
175 "Memory DB still malformed after vector repair. Resetting memory tables and retrying chunk insert: {}",
176 chunk.id
177 );
178 self.db.reset_all_memory_tables().await?;
179 self.db.store_chunk(&chunk, &embedding).await?;
180 } else {
181 return Err(retry_err);
182 }
183 }
184 } else {
185 return Err(err);
186 }
187 }
188 chunk_ids.push(chunk_id);
189 }
190
191 if config.auto_cleanup {
193 self.maybe_cleanup(&request.project_id, &request.tenant_scope)
194 .await?;
195 }
196
197 Ok(chunk_ids)
198 }
199
200 pub async fn search(
202 &self,
203 query: &str,
204 tier: Option<MemoryTier>,
205 project_id: Option<&str>,
206 session_id: Option<&str>,
207 limit: Option<i64>,
208 ) -> MemoryResult<Vec<MemorySearchResult>> {
209 self.search_for_tenant(
210 query,
211 tier,
212 project_id,
213 session_id,
214 &MemoryTenantScope::local(),
215 limit,
216 )
217 .await
218 }
219
220 pub async fn search_for_tenant(
221 &self,
222 query: &str,
223 tier: Option<MemoryTier>,
224 project_id: Option<&str>,
225 session_id: Option<&str>,
226 tenant_scope: &MemoryTenantScope,
227 limit: Option<i64>,
228 ) -> MemoryResult<Vec<MemorySearchResult>> {
229 let effective_limit = limit.unwrap_or(5);
230
231 let embedding_service = self.embedding_service.lock().await;
233 let query_embedding = embedding_service.embed(query).await?;
234 drop(embedding_service);
235
236 let mut results = Vec::new();
237
238 let tiers_to_search = match tier {
240 Some(t) => vec![t],
241 None => {
242 if project_id.is_some() {
243 vec![MemoryTier::Session, MemoryTier::Project, MemoryTier::Global]
244 } else {
245 vec![MemoryTier::Session, MemoryTier::Global]
246 }
247 }
248 };
249
250 let now_ms = Utc::now().timestamp_millis();
251 for search_tier in tiers_to_search {
252 let tier_results = match self
253 .db
254 .search_similar_for_tenant(
255 &query_embedding,
256 search_tier,
257 project_id,
258 session_id,
259 tenant_scope,
260 effective_limit,
261 )
262 .await
263 {
264 Ok(results) => results,
265 Err(err) => {
266 tracing::warn!(
267 "Memory tier search failed for {:?}: {}. Attempting vector repair.",
268 search_tier,
269 err
270 );
271 let repaired = {
272 let repaired_after_error =
273 self.db.try_repair_after_error(&err).await.unwrap_or(false);
274 repaired_after_error
275 || self
276 .db
277 .ensure_vector_tables_healthy()
278 .await
279 .unwrap_or(false)
280 };
281 if repaired {
282 match self
283 .db
284 .search_similar_for_tenant(
285 &query_embedding,
286 search_tier,
287 project_id,
288 session_id,
289 tenant_scope,
290 effective_limit,
291 )
292 .await
293 {
294 Ok(results) => results,
295 Err(retry_err) => {
296 tracing::warn!(
297 "Memory tier search still failing for {:?} after repair: {}",
298 search_tier,
299 retry_err
300 );
301 continue;
302 }
303 }
304 } else {
305 continue;
306 }
307 }
308 };
309
310 for (chunk, distance) in tier_results {
311 let similarity = 1.0 - distance.clamp(0.0, 1.0);
315 let similarity = if search_tier == MemoryTier::Global {
316 Self::guide_doc_similarity(similarity, &chunk, now_ms)
317 } else {
318 similarity
319 };
320
321 results.push(MemorySearchResult { chunk, similarity });
322 }
323 }
324
325 results.sort_by(|a, b| b.similarity.partial_cmp(&a.similarity).unwrap());
327 results.truncate(effective_limit as usize);
328
329 Ok(results)
330 }
331
332 pub async fn upsert_knowledge_space(&self, space: &KnowledgeSpaceRecord) -> MemoryResult<()> {
333 self.db.upsert_knowledge_space(space).await
334 }
335
336 pub async fn upsert_knowledge_space_for_tenant(
337 &self,
338 space: &KnowledgeSpaceRecord,
339 tenant_scope: &MemoryTenantScope,
340 ) -> MemoryResult<()> {
341 self.db
342 .upsert_knowledge_space_for_tenant(space, tenant_scope)
343 .await
344 }
345
346 pub async fn get_knowledge_space(
347 &self,
348 id: &str,
349 ) -> MemoryResult<Option<KnowledgeSpaceRecord>> {
350 self.db.get_knowledge_space(id).await
351 }
352
353 pub async fn get_knowledge_space_for_tenant(
354 &self,
355 id: &str,
356 tenant_scope: &MemoryTenantScope,
357 ) -> MemoryResult<Option<KnowledgeSpaceRecord>> {
358 self.db
359 .get_knowledge_space_for_tenant(id, tenant_scope)
360 .await
361 }
362
363 pub async fn list_knowledge_spaces(
364 &self,
365 project_id: Option<&str>,
366 ) -> MemoryResult<Vec<KnowledgeSpaceRecord>> {
367 self.db.list_knowledge_spaces(project_id).await
368 }
369
370 pub async fn list_knowledge_spaces_for_tenant(
371 &self,
372 project_id: Option<&str>,
373 tenant_scope: &MemoryTenantScope,
374 ) -> MemoryResult<Vec<KnowledgeSpaceRecord>> {
375 self.db
376 .list_knowledge_spaces_for_tenant(project_id, tenant_scope)
377 .await
378 }
379
380 pub async fn upsert_knowledge_item(&self, item: &KnowledgeItemRecord) -> MemoryResult<()> {
381 self.db.upsert_knowledge_item(item).await
382 }
383
384 pub async fn upsert_knowledge_item_for_tenant(
385 &self,
386 item: &KnowledgeItemRecord,
387 tenant_scope: &MemoryTenantScope,
388 ) -> MemoryResult<()> {
389 self.db
390 .upsert_knowledge_item_for_tenant(item, tenant_scope)
391 .await
392 }
393
394 pub async fn get_knowledge_item(&self, id: &str) -> MemoryResult<Option<KnowledgeItemRecord>> {
395 self.db.get_knowledge_item(id).await
396 }
397
398 pub async fn get_knowledge_item_for_tenant(
399 &self,
400 id: &str,
401 tenant_scope: &MemoryTenantScope,
402 ) -> MemoryResult<Option<KnowledgeItemRecord>> {
403 self.db
404 .get_knowledge_item_for_tenant(id, tenant_scope)
405 .await
406 }
407
408 pub async fn list_knowledge_items(
409 &self,
410 space_id: &str,
411 coverage_key: Option<&str>,
412 ) -> MemoryResult<Vec<KnowledgeItemRecord>> {
413 self.db.list_knowledge_items(space_id, coverage_key).await
414 }
415
416 pub async fn list_knowledge_items_for_tenant(
417 &self,
418 space_id: &str,
419 coverage_key: Option<&str>,
420 tenant_scope: &MemoryTenantScope,
421 ) -> MemoryResult<Vec<KnowledgeItemRecord>> {
422 self.db
423 .list_knowledge_items_for_tenant(space_id, coverage_key, tenant_scope)
424 .await
425 }
426
427 pub async fn upsert_knowledge_coverage(
428 &self,
429 coverage: &KnowledgeCoverageRecord,
430 ) -> MemoryResult<()> {
431 self.db.upsert_knowledge_coverage(coverage).await
432 }
433
434 pub async fn upsert_knowledge_coverage_for_tenant(
435 &self,
436 coverage: &KnowledgeCoverageRecord,
437 tenant_scope: &MemoryTenantScope,
438 ) -> MemoryResult<()> {
439 self.db
440 .upsert_knowledge_coverage_for_tenant(coverage, tenant_scope)
441 .await
442 }
443
444 pub async fn get_knowledge_coverage(
445 &self,
446 coverage_key: &str,
447 space_id: &str,
448 ) -> MemoryResult<Option<KnowledgeCoverageRecord>> {
449 self.db.get_knowledge_coverage(coverage_key, space_id).await
450 }
451
452 pub async fn get_knowledge_coverage_for_tenant(
453 &self,
454 coverage_key: &str,
455 space_id: &str,
456 tenant_scope: &MemoryTenantScope,
457 ) -> MemoryResult<Option<KnowledgeCoverageRecord>> {
458 self.db
459 .get_knowledge_coverage_for_tenant(coverage_key, space_id, tenant_scope)
460 .await
461 }
462
463 pub async fn promote_knowledge_item(
464 &self,
465 request: &KnowledgePromotionRequest,
466 ) -> MemoryResult<Option<KnowledgePromotionResult>> {
467 self.db.promote_knowledge_item(request).await
468 }
469
470 pub async fn promote_knowledge_item_for_tenant(
471 &self,
472 request: &KnowledgePromotionRequest,
473 tenant_scope: &MemoryTenantScope,
474 ) -> MemoryResult<Option<KnowledgePromotionResult>> {
475 self.db
476 .promote_knowledge_item_for_tenant(request, tenant_scope)
477 .await
478 }
479
480 fn space_matches_ref(
481 space: &KnowledgeSpaceRecord,
482 space_ref: &tandem_orchestrator::KnowledgeSpaceRef,
483 project_id: &str,
484 ) -> bool {
485 if space.scope != space_ref.scope {
486 return false;
487 }
488 match space_ref.scope {
489 KnowledgeScope::Project | KnowledgeScope::Run => {
490 let target_project = space_ref.project_id.as_deref().unwrap_or(project_id);
491 if space.project_id.as_deref() != Some(target_project) {
492 return false;
493 }
494 }
495 KnowledgeScope::Global => {}
496 }
497 if let Some(namespace) = space_ref.namespace.as_deref() {
498 if space.namespace.as_deref() != Some(namespace) {
499 return false;
500 }
501 }
502 true
503 }
504
505 fn select_preflight_namespace(
506 binding: &KnowledgeBinding,
507 spaces: &[KnowledgeSpaceRecord],
508 ) -> Option<String> {
509 if let Some(namespace) = binding.namespace.clone() {
510 return Some(namespace);
511 }
512 if binding.read_spaces.len() == 1 {
513 if let Some(namespace) = binding.read_spaces[0].namespace.clone() {
514 return Some(namespace);
515 }
516 }
517 if spaces.len() == 1 {
518 return spaces[0].namespace.clone();
519 }
520 let mut unique = HashSet::new();
521 for space in spaces {
522 if let Some(namespace) = space.namespace.as_ref() {
523 unique.insert(namespace);
524 }
525 }
526 if unique.len() == 1 {
527 unique.into_iter().next().map(|value| value.to_string())
528 } else {
529 None
530 }
531 }
532
533 fn binding_uses_explicit_spaces(binding: &KnowledgeBinding) -> bool {
534 !binding.read_spaces.is_empty() || !binding.promote_spaces.is_empty()
535 }
536
537 fn namespace_matches(space_namespace: Option<&str>, binding_namespace: Option<&str>) -> bool {
538 match (space_namespace, binding_namespace) {
539 (None, None) => true,
540 (Some(space), Some(binding)) => {
541 normalize_knowledge_segment(space) == normalize_knowledge_segment(binding)
542 }
543 _ => false,
544 }
545 }
546
547 fn is_fresh_enough(
548 freshness_expires_at_ms: Option<u64>,
549 freshness_policy_ms: Option<u64>,
550 coverage_last_promoted_at_ms: Option<u64>,
551 item_created_at_ms: u64,
552 now_ms: u64,
553 ) -> bool {
554 if let Some(expires_at_ms) = freshness_expires_at_ms {
555 return expires_at_ms > now_ms;
556 }
557 let Some(policy_ms) = freshness_policy_ms else {
558 return true;
559 };
560 let basis_ms = coverage_last_promoted_at_ms.unwrap_or(item_created_at_ms);
561 now_ms.saturating_sub(basis_ms) <= policy_ms
562 }
563
564 async fn resolve_preflight_spaces(
565 &self,
566 request: &KnowledgePreflightRequest,
567 _coverage_key: &str,
568 tenant_scope: &MemoryTenantScope,
569 ) -> MemoryResult<Vec<KnowledgeSpaceRecord>> {
570 let binding = &request.binding;
571 let mut spaces = Vec::new();
572 let mut seen_space_ids = HashSet::new();
573
574 let push_space = |space: KnowledgeSpaceRecord,
575 spaces: &mut Vec<KnowledgeSpaceRecord>,
576 seen_space_ids: &mut HashSet<String>| {
577 if seen_space_ids.insert(space.id.clone()) {
578 spaces.push(space);
579 }
580 };
581
582 if Self::binding_uses_explicit_spaces(binding) {
583 for space_ref in binding
584 .read_spaces
585 .iter()
586 .chain(binding.promote_spaces.iter())
587 {
588 if let Some(space_id) = space_ref.space_id.as_deref() {
589 if let Some(space) = self
590 .get_knowledge_space_for_tenant(space_id, tenant_scope)
591 .await?
592 {
593 push_space(space, &mut spaces, &mut seen_space_ids);
594 }
595 continue;
596 }
597
598 match space_ref.scope {
599 KnowledgeScope::Run => {}
600 KnowledgeScope::Project => {
601 let candidate_project_id = space_ref
602 .project_id
603 .as_deref()
604 .unwrap_or(&request.project_id);
605 let project_spaces = self
606 .list_knowledge_spaces_for_tenant(
607 Some(candidate_project_id),
608 tenant_scope,
609 )
610 .await?;
611 for space in project_spaces.into_iter().filter(|space| {
612 Self::space_matches_ref(space, space_ref, &request.project_id)
613 }) {
614 push_space(space, &mut spaces, &mut seen_space_ids);
615 }
616 }
617 KnowledgeScope::Global => {
618 let global_spaces = self
619 .list_knowledge_spaces_for_tenant(None, tenant_scope)
620 .await?;
621 for space in global_spaces.into_iter().filter(|space| {
622 Self::space_matches_ref(space, space_ref, &request.project_id)
623 }) {
624 push_space(space, &mut spaces, &mut seen_space_ids);
625 }
626 }
627 }
628 }
629 return Ok(spaces);
630 }
631
632 if request.project_id.trim().is_empty() {
633 return Ok(spaces);
634 }
635
636 let project_spaces = self
637 .list_knowledge_spaces_for_tenant(Some(&request.project_id), tenant_scope)
638 .await?;
639 let requested_namespace = if binding.namespace.is_some() {
640 binding.namespace.clone()
641 } else {
642 Self::select_preflight_namespace(binding, &project_spaces)
643 };
644 let Some(requested_namespace) = requested_namespace else {
645 return Ok(spaces);
646 };
647
648 for space in project_spaces.into_iter().filter(|space| {
649 space.scope == KnowledgeScope::Project
650 && Self::namespace_matches(
651 space.namespace.as_deref(),
652 Some(requested_namespace.as_str()),
653 )
654 }) {
655 push_space(space, &mut spaces, &mut seen_space_ids);
656 }
657 Ok(spaces)
658 }
659
660 pub async fn preflight_knowledge(
661 &self,
662 request: &KnowledgePreflightRequest,
663 ) -> MemoryResult<KnowledgePreflightResult> {
664 self.preflight_knowledge_for_tenant(request, &MemoryTenantScope::local())
665 .await
666 }
667
    /// Decides whether previously promoted knowledge can be reused for `request`, within
    /// `tenant_scope`.
    ///
    /// Steps:
    /// 1. Derive the namespace: the binding's own, else `select_preflight_namespace` over
    ///    the project's spaces. Build the coverage key from project, namespace, task family
    ///    and subject.
    /// 2. Return `Disabled` when the binding is disabled or its reuse mode is `Disabled`.
    /// 3. Resolve candidate spaces; return `NoPriorKnowledge` when there are none.
    /// 4. Collect active items for the coverage key whose trust level meets
    ///    `binding.trust_floor`. Split them into fresh and stale with `is_fresh_enough`; an
    ///    item's own expiry takes precedence over the coverage record's.
    /// 5. Return in order of preference:
    ///    - up to `MAX_KNOWLEDGE_PACK_ITEMS` fresh items (`ReuseApprovedDefault` or
    ///      `ReusePromoted`, depending on the top item's trust level);
    ///    - otherwise up to that many stale items with `RefreshRequired`;
    ///    - otherwise `NoPriorKnowledge`.
    ///
    /// # Errors
    /// Propagates database errors from listing spaces, items and coverage records.
    pub async fn preflight_knowledge_for_tenant(
        &self,
        request: &KnowledgePreflightRequest,
        tenant_scope: &MemoryTenantScope,
    ) -> MemoryResult<KnowledgePreflightResult> {
        let binding = &request.binding;
        // Listed even when reuse is disabled: the reported namespace/coverage key may
        // depend on the project's spaces.
        let project_spaces = if request.project_id.trim().is_empty() {
            Vec::new()
        } else {
            self.list_knowledge_spaces_for_tenant(Some(&request.project_id), tenant_scope)
                .await?
        };
        let namespace = binding
            .namespace
            .clone()
            .or_else(|| Self::select_preflight_namespace(binding, &project_spaces));
        let coverage_key = build_knowledge_coverage_key(
            &request.project_id,
            namespace.as_deref(),
            &request.task_family,
            &request.subject,
        );

        if !binding.enabled || binding.reuse_mode == KnowledgeReuseMode::Disabled {
            return Ok(KnowledgePreflightResult {
                project_id: request.project_id.clone(),
                namespace,
                task_family: request.task_family.clone(),
                subject: request.subject.clone(),
                coverage_key,
                decision: KnowledgeReuseDecision::Disabled,
                reuse_reason: None,
                skip_reason: Some("knowledge reuse is disabled for this binding".to_string()),
                freshness_reason: None,
                items: Vec::new(),
            });
        }

        let spaces = self
            .resolve_preflight_spaces(request, &coverage_key, tenant_scope)
            .await?;
        if spaces.is_empty() {
            return Ok(KnowledgePreflightResult {
                project_id: request.project_id.clone(),
                namespace,
                task_family: request.task_family.clone(),
                subject: request.subject.clone(),
                coverage_key,
                decision: KnowledgeReuseDecision::NoPriorKnowledge,
                reuse_reason: None,
                skip_reason: Some("no reusable knowledge spaces were found".to_string()),
                freshness_reason: None,
                items: Vec::new(),
            });
        }

        // Clamp pre-epoch clocks to 0 before converting to unsigned milliseconds.
        let now_ms = chrono::Utc::now().timestamp_millis().max(0) as u64;
        let mut fresh_items = Vec::new();
        let mut stale_items = Vec::new();
        // NOTE(review): despite the name, this ends up holding the reason for the *last*
        // stale item visited (space/item iteration order), not the freshest one — confirm
        // intent.
        let mut freshest_reason = None;

        for space in &spaces {
            let items = self
                .list_knowledge_items_for_tenant(&space.id, Some(&coverage_key), tenant_scope)
                .await?;
            let coverage = self
                .get_knowledge_coverage_for_tenant(&coverage_key, &space.id, tenant_scope)
                .await?;
            for item in items {
                // Skip inactive items, statuses without a trust level, and items below the
                // binding's trust floor.
                if !item.status.is_active() {
                    continue;
                }
                let Some(trust_level) = item.status.as_trust_level() else {
                    continue;
                };
                if !trust_level.meets_floor(binding.trust_floor) {
                    continue;
                }
                // The item's own expiry takes precedence over the coverage record's.
                let freshness_expires_at_ms = item.freshness_expires_at_ms.or_else(|| {
                    coverage
                        .as_ref()
                        .and_then(|coverage| coverage.freshness_expires_at_ms)
                });
                let pack_item = KnowledgePackItem {
                    item_id: item.id.clone(),
                    space_id: space.id.clone(),
                    coverage_key: item.coverage_key.clone(),
                    title: item.title.clone(),
                    summary: item.summary.clone(),
                    trust_level,
                    status: item.status.to_string(),
                    artifact_refs: item.artifact_refs.clone(),
                    source_memory_ids: item.source_memory_ids.clone(),
                    freshness_expires_at_ms,
                };
                if Self::is_fresh_enough(
                    freshness_expires_at_ms,
                    binding.freshness_ms,
                    coverage
                        .as_ref()
                        .and_then(|coverage| coverage.last_promoted_at_ms),
                    item.created_at_ms,
                    now_ms,
                ) {
                    fresh_items.push(pack_item);
                } else {
                    freshest_reason = Some(match freshness_expires_at_ms {
                        Some(expires_at_ms) => format!(
                            "coverage `{}` in space `{}` expired at {}",
                            coverage_key, space.id, expires_at_ms
                        ),
                        None => format!(
                            "coverage `{}` in space `{}` lacks freshness metadata",
                            coverage_key, space.id
                        ),
                    });
                    stale_items.push(pack_item);
                }
            }
        }

        // Fresh items: highest trust first, then latest expiry (missing expiry sorts as 0),
        // then title ascending.
        fresh_items.sort_by(|left, right| {
            right
                .trust_level
                .rank()
                .cmp(&left.trust_level.rank())
                .then_with(|| {
                    right
                        .freshness_expires_at_ms
                        .unwrap_or(0)
                        .cmp(&left.freshness_expires_at_ms.unwrap_or(0))
                })
                .then_with(|| left.title.cmp(&right.title))
        });
        // Stale items: highest trust first, then title ascending.
        stale_items.sort_by(|left, right| {
            right
                .trust_level
                .rank()
                .cmp(&left.trust_level.rank())
                .then_with(|| left.title.cmp(&right.title))
        });

        // The top-ranked fresh item's trust level determines the reuse decision.
        if let Some(freshest_trust_level) = fresh_items.first().map(|item| item.trust_level) {
            let selected = fresh_items
                .into_iter()
                .take(MAX_KNOWLEDGE_PACK_ITEMS)
                .collect::<Vec<_>>();
            let decision = match freshest_trust_level {
                KnowledgeTrustLevel::ApprovedDefault => {
                    KnowledgeReuseDecision::ReuseApprovedDefault
                }
                _ => KnowledgeReuseDecision::ReusePromoted,
            };
            let selected_count = selected.len();
            return Ok(KnowledgePreflightResult {
                project_id: request.project_id.clone(),
                namespace,
                task_family: request.task_family.clone(),
                subject: request.subject.clone(),
                coverage_key,
                decision,
                // NOTE(review): `spaces.len()` counts all resolved spaces, not only those
                // contributing selected items.
                reuse_reason: Some(format!(
                    "reusing {} promoted knowledge item(s) from {} space(s)",
                    selected_count,
                    spaces.len()
                )),
                skip_reason: None,
                freshness_reason: None,
                items: selected,
            });
        }

        if !stale_items.is_empty() {
            let selected = stale_items
                .into_iter()
                .take(MAX_KNOWLEDGE_PACK_ITEMS)
                .collect::<Vec<_>>();
            return Ok(KnowledgePreflightResult {
                project_id: request.project_id.clone(),
                namespace,
                task_family: request.task_family.clone(),
                subject: request.subject.clone(),
                coverage_key,
                decision: KnowledgeReuseDecision::RefreshRequired,
                reuse_reason: None,
                skip_reason: Some(
                    "prior knowledge exists but is not fresh enough to reuse".to_string(),
                ),
                freshness_reason: freshest_reason.or_else(|| {
                    Some("matching knowledge exists but freshness policy rejected it".to_string())
                }),
                items: selected,
            });
        }

        Ok(KnowledgePreflightResult {
            project_id: request.project_id.clone(),
            namespace,
            task_family: request.task_family.clone(),
            subject: request.subject.clone(),
            coverage_key,
            decision: KnowledgeReuseDecision::NoPriorKnowledge,
            reuse_reason: None,
            skip_reason: Some("no active promoted knowledge matched this coverage key".to_string()),
            freshness_reason: None,
            items: Vec::new(),
        })
    }
876
877 pub async fn retrieve_context(
882 &self,
883 query: &str,
884 project_id: Option<&str>,
885 session_id: Option<&str>,
886 token_budget: Option<i64>,
887 ) -> MemoryResult<MemoryContext> {
888 self.retrieve_context_for_tenant(
889 query,
890 project_id,
891 session_id,
892 &MemoryTenantScope::local(),
893 token_budget,
894 )
895 .await
896 }
897
898 pub async fn retrieve_context_for_tenant(
899 &self,
900 query: &str,
901 project_id: Option<&str>,
902 session_id: Option<&str>,
903 tenant_scope: &MemoryTenantScope,
904 token_budget: Option<i64>,
905 ) -> MemoryResult<MemoryContext> {
906 let (context, _) = self
907 .retrieve_context_with_meta_for_tenant(
908 query,
909 project_id,
910 session_id,
911 tenant_scope,
912 token_budget,
913 )
914 .await?;
915 Ok(context)
916 }
917
918 pub async fn retrieve_context_with_meta(
920 &self,
921 query: &str,
922 project_id: Option<&str>,
923 session_id: Option<&str>,
924 token_budget: Option<i64>,
925 ) -> MemoryResult<(MemoryContext, MemoryRetrievalMeta)> {
926 self.retrieve_context_with_meta_for_tenant(
927 query,
928 project_id,
929 session_id,
930 &MemoryTenantScope::local(),
931 token_budget,
932 )
933 .await
934 }
935
936 pub async fn retrieve_context_with_meta_for_tenant(
937 &self,
938 query: &str,
939 project_id: Option<&str>,
940 session_id: Option<&str>,
941 tenant_scope: &MemoryTenantScope,
942 token_budget: Option<i64>,
943 ) -> MemoryResult<(MemoryContext, MemoryRetrievalMeta)> {
944 let config = if let Some(pid) = project_id {
945 self.db
946 .get_or_create_config_for_tenant(pid, tenant_scope)
947 .await?
948 } else {
949 MemoryConfig::default()
950 };
951 let budget = token_budget.unwrap_or(config.token_budget);
952 let retrieval_limit = config.retrieval_k.max(1);
953
954 let current_session = if let Some(sid) = session_id {
956 self.db
957 .get_session_chunks_for_tenant(sid, tenant_scope)
958 .await?
959 } else {
960 Vec::new()
961 };
962
963 let search_results = self
965 .search_for_tenant(
966 query,
967 None,
968 project_id,
969 session_id,
970 tenant_scope,
971 Some(retrieval_limit),
972 )
973 .await?;
974
975 let mut score_min: Option<f64> = None;
976 let mut score_max: Option<f64> = None;
977 for result in &search_results {
978 score_min = Some(match score_min {
979 Some(current) => current.min(result.similarity),
980 None => result.similarity,
981 });
982 score_max = Some(match score_max {
983 Some(current) => current.max(result.similarity),
984 None => result.similarity,
985 });
986 }
987
988 let mut current_session = current_session;
989 let mut relevant_history = Vec::new();
990 let mut project_facts = Vec::new();
991
992 for result in search_results {
993 match result.chunk.tier {
994 MemoryTier::Project => {
995 project_facts.push(result.chunk);
996 }
997 MemoryTier::Global => {
998 project_facts.push(result.chunk);
999 }
1000 MemoryTier::Session => {
1001 if !current_session.iter().any(|c| c.id == result.chunk.id) {
1003 relevant_history.push(result.chunk);
1004 }
1005 }
1006 }
1007 }
1008
1009 let mut total_tokens: i64 = current_session.iter().map(|c| c.token_count).sum();
1011 total_tokens += relevant_history.iter().map(|c| c.token_count).sum::<i64>();
1012 total_tokens += project_facts.iter().map(|c| c.token_count).sum::<i64>();
1013
1014 if total_tokens > budget {
1016 let excess = total_tokens - budget;
1017 self.trim_context(
1018 &mut current_session,
1019 &mut relevant_history,
1020 &mut project_facts,
1021 excess,
1022 )?;
1023 total_tokens = current_session.iter().map(|c| c.token_count).sum::<i64>()
1024 + relevant_history.iter().map(|c| c.token_count).sum::<i64>()
1025 + project_facts.iter().map(|c| c.token_count).sum::<i64>();
1026 }
1027
1028 let context = MemoryContext {
1029 current_session,
1030 relevant_history,
1031 project_facts,
1032 total_tokens,
1033 };
1034 let chunks_total = context.current_session.len()
1035 + context.relevant_history.len()
1036 + context.project_facts.len();
1037 let meta = MemoryRetrievalMeta {
1038 used: chunks_total > 0,
1039 chunks_total,
1040 session_chunks: context.current_session.len(),
1041 history_chunks: context.relevant_history.len(),
1042 project_fact_chunks: context.project_facts.len(),
1043 score_min,
1044 score_max,
1045 };
1046
1047 Ok((context, meta))
1048 }
1049
1050 fn trim_context(
1052 &self,
1053 current_session: &mut Vec<MemoryChunk>,
1054 relevant_history: &mut Vec<MemoryChunk>,
1055 project_facts: &mut Vec<MemoryChunk>,
1056 excess_tokens: i64,
1057 ) -> MemoryResult<()> {
1058 let mut tokens_to_remove = excess_tokens;
1059
1060 while tokens_to_remove > 0 && !relevant_history.is_empty() {
1062 if let Some(chunk) = relevant_history.pop() {
1063 tokens_to_remove -= chunk.token_count;
1064 }
1065 }
1066
1067 while tokens_to_remove > 0 && !project_facts.is_empty() {
1069 if let Some(chunk) = project_facts.pop() {
1070 tokens_to_remove -= chunk.token_count;
1071 }
1072 }
1073
1074 while tokens_to_remove > 0 && !current_session.is_empty() {
1075 if let Some(chunk) = current_session.pop() {
1076 tokens_to_remove -= chunk.token_count;
1077 }
1078 }
1079
1080 Ok(())
1081 }
1082
1083 pub async fn clear_session(&self, session_id: &str) -> MemoryResult<u64> {
1085 self.clear_session_for_tenant(session_id, &MemoryTenantScope::local())
1086 .await
1087 }
1088
1089 pub async fn clear_session_for_tenant(
1090 &self,
1091 session_id: &str,
1092 tenant_scope: &MemoryTenantScope,
1093 ) -> MemoryResult<u64> {
1094 let count = self
1095 .db
1096 .clear_session_memory_for_tenant(session_id, tenant_scope)
1097 .await?;
1098
1099 self.db
1101 .log_cleanup_for_tenant(
1102 "manual",
1103 MemoryTier::Session,
1104 None,
1105 Some(session_id),
1106 count as i64,
1107 0,
1108 tenant_scope,
1109 )
1110 .await?;
1111
1112 Ok(count)
1113 }
1114
1115 pub async fn clear_project(&self, project_id: &str) -> MemoryResult<u64> {
1117 self.clear_project_for_tenant(project_id, &MemoryTenantScope::local())
1118 .await
1119 }
1120
1121 pub async fn clear_project_for_tenant(
1122 &self,
1123 project_id: &str,
1124 tenant_scope: &MemoryTenantScope,
1125 ) -> MemoryResult<u64> {
1126 let count = self
1127 .db
1128 .clear_project_memory_for_tenant(project_id, tenant_scope)
1129 .await?;
1130
1131 self.db
1133 .log_cleanup_for_tenant(
1134 "manual",
1135 MemoryTier::Project,
1136 Some(project_id),
1137 None,
1138 count as i64,
1139 0,
1140 tenant_scope,
1141 )
1142 .await?;
1143
1144 Ok(count)
1145 }
1146
1147 pub async fn get_stats(&self) -> MemoryResult<MemoryStats> {
1149 self.db.get_stats().await
1150 }
1151
1152 pub async fn get_stats_for_tenant(
1153 &self,
1154 tenant_scope: &MemoryTenantScope,
1155 ) -> MemoryResult<MemoryStats> {
1156 self.db.get_stats_for_tenant(tenant_scope).await
1157 }
1158
1159 pub async fn get_config(&self, project_id: &str) -> MemoryResult<MemoryConfig> {
1161 self.db.get_or_create_config(project_id).await
1162 }
1163
1164 pub async fn get_config_for_tenant(
1165 &self,
1166 project_id: &str,
1167 tenant_scope: &MemoryTenantScope,
1168 ) -> MemoryResult<MemoryConfig> {
1169 self.db
1170 .get_or_create_config_for_tenant(project_id, tenant_scope)
1171 .await
1172 }
1173
1174 pub async fn set_config(&self, project_id: &str, config: &MemoryConfig) -> MemoryResult<()> {
1176 self.db.update_config(project_id, config).await
1177 }
1178
1179 pub async fn set_config_for_tenant(
1180 &self,
1181 project_id: &str,
1182 config: &MemoryConfig,
1183 tenant_scope: &MemoryTenantScope,
1184 ) -> MemoryResult<()> {
1185 self.db
1186 .update_config_for_tenant(project_id, config, tenant_scope)
1187 .await
1188 }
1189
1190 pub async fn resolve_uri(&self, uri: &str) -> MemoryResult<Option<MemoryNode>> {
1191 self.db.get_node_by_uri(uri).await
1192 }
1193
1194 pub async fn list_directory(&self, uri: &str) -> MemoryResult<DirectoryListing> {
1195 let nodes = self.db.list_directory(uri).await?;
1196 let directories: Vec<MemoryNode> = nodes
1197 .iter()
1198 .filter(|n| n.node_type == NodeType::Directory)
1199 .cloned()
1200 .collect();
1201 let files: Vec<MemoryNode> = nodes
1202 .iter()
1203 .filter(|n| n.node_type == NodeType::File)
1204 .cloned()
1205 .collect();
1206
1207 Ok(DirectoryListing {
1208 uri: uri.to_string(),
1209 nodes,
1210 total_children: directories.len() + files.len(),
1211 directories,
1212 files,
1213 })
1214 }
1215
1216 pub async fn tree(&self, uri: &str, max_depth: usize) -> MemoryResult<Vec<TreeNode>> {
1217 self.db.get_children_tree(uri, max_depth).await
1218 }
1219
1220 pub async fn create_context_node(
1221 &self,
1222 uri: &str,
1223 node_type: NodeType,
1224 metadata: Option<serde_json::Value>,
1225 ) -> MemoryResult<String> {
1226 let parsed_uri =
1227 ContextUri::parse(uri).map_err(|e| MemoryError::InvalidConfig(e.message))?;
1228 let parent_uri = parsed_uri.parent().map(|p| p.to_string());
1229 self.db
1230 .create_node(uri, parent_uri.as_deref(), node_type, metadata.as_ref())
1231 .await
1232 }
1233
1234 pub async fn get_context_layer(
1235 &self,
1236 node_id: &str,
1237 layer_type: LayerType,
1238 ) -> MemoryResult<Option<MemoryLayer>> {
1239 self.db.get_layer(node_id, layer_type).await
1240 }
1241
1242 pub async fn store_content_with_layers(
1243 &self,
1244 uri: &str,
1245 content: &str,
1246 metadata: Option<serde_json::Value>,
1247 ) -> MemoryResult<String> {
1248 let parsed_uri =
1249 ContextUri::parse(uri).map_err(|e| MemoryError::InvalidConfig(e.message))?;
1250 let node_type = if parsed_uri
1251 .last_segment()
1252 .map(|s| s.ends_with(".md") || s.ends_with(".txt") || s.contains("."))
1253 .unwrap_or(false)
1254 {
1255 NodeType::File
1256 } else {
1257 NodeType::Directory
1258 };
1259
1260 let parent_uri = parsed_uri.parent().map(|p| p.to_string());
1261 let node_id = self
1262 .db
1263 .create_node(uri, parent_uri.as_deref(), node_type, metadata.as_ref())
1264 .await?;
1265
1266 let token_count = self.tokenizer.count_tokens(content) as i64;
1267 self.db
1268 .create_layer(&node_id, LayerType::L2, content, token_count, None)
1269 .await?;
1270
1271 Ok(node_id)
1272 }
1273
1274 pub async fn generate_layers_for_node(
1275 &self,
1276 node_id: &str,
1277 providers: &ProviderRegistry,
1278 ) -> MemoryResult<()> {
1279 let l2_layer = self.db.get_layer(node_id, LayerType::L2).await?;
1280 let l2_content = match l2_layer {
1281 Some(layer) => layer.content,
1282 None => return Ok(()),
1283 };
1284
1285 let generator = ContextLayerGenerator::new(Arc::new(providers.clone()));
1286
1287 let (l0_content, l1_content) = generator.generate_layers(&l2_content).await?;
1288
1289 let l0_tokens = self.tokenizer.count_tokens(&l0_content) as i64;
1290 let l1_tokens = self.tokenizer.count_tokens(&l1_content) as i64;
1291
1292 if self.db.get_layer(node_id, LayerType::L0).await?.is_none() {
1293 self.db
1294 .create_layer(node_id, LayerType::L0, &l0_content, l0_tokens, None)
1295 .await?;
1296 }
1297
1298 if self.db.get_layer(node_id, LayerType::L1).await?.is_none() {
1299 self.db
1300 .create_layer(node_id, LayerType::L1, &l1_content, l1_tokens, None)
1301 .await?;
1302 }
1303
1304 Ok(())
1305 }
1306
1307 pub async fn get_layer_content(
1308 &self,
1309 node_id: &str,
1310 layer_type: LayerType,
1311 ) -> MemoryResult<Option<String>> {
1312 let layer = self.db.get_layer(node_id, layer_type).await?;
1313 Ok(layer.map(|l| l.content))
1314 }
1315
1316 pub async fn store_content_with_layers_auto(
1317 &self,
1318 uri: &str,
1319 content: &str,
1320 metadata: Option<serde_json::Value>,
1321 providers: Option<&ProviderRegistry>,
1322 ) -> MemoryResult<String> {
1323 let node_id = self
1324 .store_content_with_layers(uri, content, metadata)
1325 .await?;
1326
1327 if let Some(p) = providers {
1328 if let Err(e) = self.generate_layers_for_node(&node_id, p).await {
1329 tracing::warn!("Failed to generate layers for node {}: {}", node_id, e);
1330 }
1331 }
1332
1333 Ok(node_id)
1334 }
1335
1336 pub async fn run_cleanup(&self, project_id: Option<&str>) -> MemoryResult<u64> {
1338 self.run_cleanup_for_tenant(project_id, &MemoryTenantScope::local())
1339 .await
1340 }
1341
1342 pub async fn run_cleanup_for_tenant(
1343 &self,
1344 project_id: Option<&str>,
1345 tenant_scope: &MemoryTenantScope,
1346 ) -> MemoryResult<u64> {
1347 let mut total_cleaned = 0u64;
1348
1349 if let Some(pid) = project_id {
1350 let config = self
1352 .db
1353 .get_or_create_config_for_tenant(pid, tenant_scope)
1354 .await?;
1355
1356 if config.auto_cleanup {
1357 let cleaned = self
1359 .db
1360 .cleanup_old_sessions_for_tenant(config.session_retention_days, tenant_scope)
1361 .await?;
1362 total_cleaned += cleaned;
1363
1364 if cleaned > 0 {
1365 self.db
1366 .log_cleanup_for_tenant(
1367 "auto",
1368 MemoryTier::Session,
1369 Some(pid),
1370 None,
1371 cleaned as i64,
1372 0,
1373 tenant_scope,
1374 )
1375 .await?;
1376 }
1377 }
1378 } else {
1379 let cleaned = self
1383 .db
1384 .cleanup_old_sessions_for_tenant(30, tenant_scope)
1385 .await?;
1386 total_cleaned += cleaned;
1387 }
1388
1389 if total_cleaned > 100 {
1391 self.db.vacuum().await?;
1392 }
1393
1394 Ok(total_cleaned)
1395 }
1396
1397 async fn maybe_cleanup(
1399 &self,
1400 project_id: &Option<String>,
1401 tenant_scope: &MemoryTenantScope,
1402 ) -> MemoryResult<()> {
1403 if let Some(pid) = project_id {
1404 let stats = self.db.get_stats_for_tenant(tenant_scope).await?;
1405 let config = self
1406 .db
1407 .get_or_create_config_for_tenant(pid, tenant_scope)
1408 .await?;
1409
1410 if stats.project_chunks > config.max_chunks {
1412 let excess = stats.project_chunks - config.max_chunks;
1414 tracing::info!("Project {} has {} excess chunks", pid, excess);
1417 }
1418 }
1419
1420 Ok(())
1421 }
1422
1423 pub async fn get_cleanup_log(&self, _limit: i64) -> MemoryResult<Vec<CleanupLogEntry>> {
1425 Ok(Vec::new())
1428 }
1429
1430 pub fn count_tokens(&self, text: &str) -> usize {
1432 self.tokenizer.count_tokens(text)
1433 }
1434
1435 pub async fn embedding_health(&self) -> EmbeddingHealth {
1437 let service = self.embedding_service.lock().await;
1438 if service.is_available() {
1439 EmbeddingHealth {
1440 status: "ok".to_string(),
1441 reason: None,
1442 }
1443 } else {
1444 EmbeddingHealth {
1445 status: "degraded_disabled".to_string(),
1446 reason: service.disabled_reason().map(ToString::to_string),
1447 }
1448 }
1449 }
1450
1451 pub async fn consolidate_session(
1453 &self,
1454 session_id: &str,
1455 project_id: Option<&str>,
1456 providers: &ProviderRegistry,
1457 config: &MemoryConsolidationConfig,
1458 ) -> MemoryResult<Option<String>> {
1459 if !config.enabled {
1460 return Ok(None);
1461 }
1462
1463 let chunks = self.db.get_session_chunks(session_id).await?;
1464 if chunks.is_empty() {
1465 return Ok(None);
1466 }
1467
1468 let mut text_parts = Vec::new();
1470 for chunk in &chunks {
1471 text_parts.push(chunk.content.clone());
1472 }
1473 let full_text = text_parts.join("\n\n---\n\n");
1474
1475 let prompt = format!(
1477 "Please provide a concise but comprehensive summary of the following chat session. \
1478 Focus on the key decisions, technical details, code changes, and unresolved issues. \
1479 Do NOT include conversational filler, greetings, or sign-offs. \
1480 This summary will be used as long-term memory to recall the context of this work.\n\n\
1481 Session transcripts:\n\n{}",
1482 full_text
1483 );
1484
1485 let provider_override = config.provider.as_deref().filter(|s| !s.is_empty());
1486 let model_override = config.model.as_deref().filter(|s| !s.is_empty());
1487
1488 let summary_text = match providers
1489 .complete_cheapest(&prompt, provider_override, model_override)
1490 .await
1491 {
1492 Ok(s) => s,
1493 Err(e) => {
1494 tracing::warn!("Memory consolidation LLM failed for session {session_id}: {e}");
1495 return Ok(None);
1496 }
1497 };
1498
1499 if summary_text.trim().is_empty() {
1500 return Ok(None);
1501 }
1502
1503 let embedding = {
1505 let service = self.embedding_service.lock().await;
1506 service
1507 .embed(&summary_text)
1508 .await
1509 .map_err(|e| crate::types::MemoryError::Embedding(e.to_string()))?
1510 };
1511
1512 let chunk_id = uuid::Uuid::new_v4().to_string();
1514 let chunk = MemoryChunk {
1515 id: chunk_id,
1516 content: summary_text.clone(),
1517 tier: MemoryTier::Project,
1518 session_id: None, project_id: project_id.map(ToString::to_string),
1520 created_at: Utc::now(),
1521 source: "consolidation".to_string(),
1522 token_count: self.count_tokens(&summary_text) as i64,
1523 source_path: None,
1524 source_mtime: None,
1525 source_size: None,
1526 source_hash: None,
1527 tenant_scope: MemoryTenantScope::local(),
1528 metadata: None,
1529 };
1530
1531 self.db.store_chunk(&chunk, &embedding).await?;
1532
1533 self.db.clear_session_memory(session_id).await?;
1535
1536 tracing::info!(
1537 "Session {session_id} consolidated into summary chunk. Original chunks cleared."
1538 );
1539
1540 Ok(Some(summary_text))
1541 }
1542}
1543
1544pub async fn create_memory_manager(app_data_dir: &Path) -> MemoryResult<MemoryManager> {
1546 let db_path = app_data_dir.join("tandem_memory.db");
1547 MemoryManager::new(&db_path).await
1548}