1use crate::chunking::{chunk_text_semantic, ChunkingConfig, Tokenizer};
5use crate::context_layers::ContextLayerGenerator;
6use crate::context_uri::ContextUri;
7use crate::db::MemoryDatabase;
8use crate::embeddings::EmbeddingService;
9use crate::envelope::validate_memory_envelope_for_write;
10use crate::types::{
11 CleanupLogEntry, DirectoryListing, EmbeddingHealth, KnowledgeCoverageRecord,
12 KnowledgeItemRecord, KnowledgePromotionRequest, KnowledgePromotionResult, KnowledgeSpaceRecord,
13 LayerType, MemoryChunk, MemoryConfig, MemoryContext, MemoryError, MemoryLayer, MemoryNode,
14 MemoryResult, MemoryRetrievalMeta, MemorySearchResult, MemoryStats, MemoryTenantScope,
15 MemoryTier, NodeType, StoreMessageRequest, TreeNode,
16};
17use chrono::Utc;
18use std::collections::HashSet;
19use std::path::Path;
20use std::sync::Arc;
21use tandem_orchestrator::{
22 build_knowledge_coverage_key, normalize_knowledge_segment, KnowledgeBinding, KnowledgePackItem,
23 KnowledgePreflightRequest, KnowledgePreflightResult, KnowledgeReuseDecision,
24 KnowledgeReuseMode, KnowledgeScope, KnowledgeTrustLevel,
25};
26use tandem_providers::{MemoryConsolidationConfig, ProviderRegistry};
27use tokio::sync::Mutex;
28
/// Facade over the memory database, the embedding service, and a tokenizer.
///
/// Stores chunked and embedded messages, runs tiered similarity search, manages
/// knowledge spaces/items/coverage, and assembles token-budgeted context.
pub struct MemoryManager {
    /// Shared handle to the backing memory database.
    db: Arc<MemoryDatabase>,
    /// Embedding service behind an async (tokio) mutex; it is locked while `embed(..).await` runs.
    embedding_service: Arc<Mutex<EmbeddingService>>,
    /// Tokenizer created once in `new_with_embedding_service`.
    tokenizer: Tokenizer,
}
35
/// Upper bound on knowledge items returned by a single preflight decision.
const MAX_KNOWLEDGE_PACK_ITEMS: usize = 3;
/// `MemoryChunk::source` prefix that marks chunks originating from guide documents.
const GUIDE_DOC_SOURCE_PREFIX: &str = "guide_docs:";
/// 30 days in milliseconds: the age at which a guide doc's recency score falls to 0.5.
const GUIDE_DOC_RECENCY_HALFLIFE_MS: f64 = 30.0 * 86_400_000.0;
/// Share of a guide doc's blended score contributed by recency instead of similarity.
const GUIDE_DOC_RECENCY_WEIGHT: f64 = 0.12;
/// Over-fetch factor for vector search when an access filter may discard candidates.
const ACCESS_FILTER_CANDIDATE_MULTIPLIER: i64 = 5;
41
42impl MemoryManager {
43 fn guide_doc_similarity(similarity: f64, chunk: &MemoryChunk, now_ms: i64) -> f64 {
44 if !chunk.source.starts_with(GUIDE_DOC_SOURCE_PREFIX) {
45 return similarity.clamp(0.0, 1.0);
46 }
47
48 let normalized_similarity = similarity.clamp(0.0, 1.0);
49 let source_mtime = chunk
50 .source_mtime
51 .filter(|value| *value > 0)
52 .unwrap_or_else(|| chunk.created_at.timestamp_millis());
53 let age_ms = (now_ms - source_mtime).max(0) as f64;
54 let recency_score = 1.0 / (1.0 + (age_ms / GUIDE_DOC_RECENCY_HALFLIFE_MS));
55 ((1.0 - GUIDE_DOC_RECENCY_WEIGHT) * normalized_similarity
56 + (GUIDE_DOC_RECENCY_WEIGHT * recency_score))
57 .clamp(0.0, 1.0)
58 }
59
60 fn is_malformed_database_error(err: &crate::types::MemoryError) -> bool {
61 err.to_string()
62 .to_lowercase()
63 .contains("database disk image is malformed")
64 }
65
66 pub fn db(&self) -> &Arc<MemoryDatabase> {
67 &self.db
68 }
69
70 pub async fn new(db_path: &Path) -> MemoryResult<Self> {
72 Self::new_with_embedding_service(db_path, EmbeddingService::new()).await
73 }
74
75 pub async fn new_with_embedding_service(
78 db_path: &Path,
79 embedding_service: EmbeddingService,
80 ) -> MemoryResult<Self> {
81 let db = Arc::new(MemoryDatabase::new(db_path).await?);
82 let embedding_service = Arc::new(Mutex::new(embedding_service));
83 let tokenizer = Tokenizer::new()?;
84
85 Ok(Self {
86 db,
87 embedding_service,
88 tokenizer,
89 })
90 }
91
92 pub async fn store_message(&self, request: StoreMessageRequest) -> MemoryResult<Vec<String>> {
99 validate_memory_envelope_for_write(&request.tenant_scope, request.metadata.as_ref())?;
100
101 if self
102 .db
103 .ensure_vector_tables_healthy()
104 .await
105 .unwrap_or(false)
106 {
107 tracing::warn!("Memory vector tables were repaired before storing message chunks");
108 }
109
110 let config = if let Some(ref pid) = request.project_id {
111 self.db
112 .get_or_create_config_for_tenant(pid, &request.tenant_scope)
113 .await?
114 } else {
115 MemoryConfig::default()
116 };
117
118 let chunking_config = ChunkingConfig {
120 chunk_size: config.chunk_size as usize,
121 chunk_overlap: config.chunk_overlap as usize,
122 separator: None,
123 };
124
125 let text_chunks = chunk_text_semantic(&request.content, &chunking_config)?;
126
127 if text_chunks.is_empty() {
128 return Ok(Vec::new());
129 }
130
131 let mut chunk_ids = Vec::with_capacity(text_chunks.len());
132 let embedding_service = self.embedding_service.lock().await;
133
134 for text_chunk in text_chunks {
135 let chunk_id = uuid::Uuid::new_v4().to_string();
136
137 let embedding = embedding_service.embed(&text_chunk.content).await?;
139
140 let chunk = MemoryChunk {
142 id: chunk_id.clone(),
143 content: text_chunk.content,
144 tier: request.tier,
145 session_id: request.session_id.clone(),
146 project_id: request.project_id.clone(),
147 source: request.source.clone(),
148 source_path: request.source_path.clone(),
149 source_mtime: request.source_mtime,
150 source_size: request.source_size,
151 source_hash: request.source_hash.clone(),
152 tenant_scope: request.tenant_scope.clone(),
153 created_at: Utc::now(),
154 token_count: text_chunk.token_count as i64,
155 metadata: request.metadata.clone(),
156 };
157
158 if let Err(err) = self.db.store_chunk(&chunk, &embedding).await {
160 tracing::warn!("Failed to store memory chunk {}: {}", chunk.id, err);
161 let repaired = {
162 let repaired_after_error =
163 self.db.try_repair_after_error(&err).await.unwrap_or(false);
164 repaired_after_error
165 || self
166 .db
167 .ensure_vector_tables_healthy()
168 .await
169 .unwrap_or(false)
170 };
171 if repaired {
172 tracing::warn!(
173 "Retrying memory chunk insert after vector table repair: {}",
174 chunk.id
175 );
176 if let Err(retry_err) = self.db.store_chunk(&chunk, &embedding).await {
177 if Self::is_malformed_database_error(&retry_err) {
178 tracing::warn!(
179 "Memory DB still malformed after vector repair. Resetting memory tables and retrying chunk insert: {}",
180 chunk.id
181 );
182 self.db.reset_all_memory_tables().await?;
183 self.db.store_chunk(&chunk, &embedding).await?;
184 } else {
185 return Err(retry_err);
186 }
187 }
188 } else {
189 return Err(err);
190 }
191 }
192 chunk_ids.push(chunk_id);
193 }
194
195 if config.auto_cleanup {
197 self.maybe_cleanup(&request.project_id, &request.tenant_scope)
198 .await?;
199 }
200
201 Ok(chunk_ids)
202 }
203
204 pub async fn search(
206 &self,
207 query: &str,
208 tier: Option<MemoryTier>,
209 project_id: Option<&str>,
210 session_id: Option<&str>,
211 limit: Option<i64>,
212 ) -> MemoryResult<Vec<MemorySearchResult>> {
213 self.search_for_tenant(
214 query,
215 tier,
216 project_id,
217 session_id,
218 &MemoryTenantScope::local(),
219 limit,
220 )
221 .await
222 }
223
224 pub async fn search_for_tenant(
225 &self,
226 query: &str,
227 tier: Option<MemoryTier>,
228 project_id: Option<&str>,
229 session_id: Option<&str>,
230 tenant_scope: &MemoryTenantScope,
231 limit: Option<i64>,
232 ) -> MemoryResult<Vec<MemorySearchResult>> {
233 self.search_for_tenant_with_access_filter(
234 query,
235 tier,
236 project_id,
237 session_id,
238 tenant_scope,
239 limit,
240 None,
241 )
242 .await
243 }
244
245 #[allow(clippy::too_many_arguments)]
246 pub async fn search_for_tenant_with_access_filter(
247 &self,
248 query: &str,
249 tier: Option<MemoryTier>,
250 project_id: Option<&str>,
251 session_id: Option<&str>,
252 tenant_scope: &MemoryTenantScope,
253 limit: Option<i64>,
254 access_filter: Option<&crate::types::MemoryAccessFilter>,
255 ) -> MemoryResult<Vec<MemorySearchResult>> {
256 let effective_limit = limit.unwrap_or(5);
257 let candidate_limit = if access_filter.is_some() {
258 effective_limit.saturating_mul(ACCESS_FILTER_CANDIDATE_MULTIPLIER)
259 } else {
260 effective_limit
261 };
262
263 let embedding_service = self.embedding_service.lock().await;
265 let query_embedding = embedding_service.embed(query).await?;
266 drop(embedding_service);
267
268 let mut results = Vec::new();
269
270 let tiers_to_search = match tier {
272 Some(t) => vec![t],
273 None => {
274 if project_id.is_some() {
275 vec![MemoryTier::Session, MemoryTier::Project, MemoryTier::Global]
276 } else {
277 vec![MemoryTier::Session, MemoryTier::Global]
278 }
279 }
280 };
281
282 let now_ms = Utc::now().timestamp_millis();
283 for search_tier in tiers_to_search {
284 let tier_results = match self
285 .db
286 .search_similar_for_tenant(
287 &query_embedding,
288 search_tier,
289 project_id,
290 session_id,
291 tenant_scope,
292 candidate_limit,
293 )
294 .await
295 {
296 Ok(results) => results,
297 Err(err) => {
298 tracing::warn!(
299 "Memory tier search failed for {:?}: {}. Attempting vector repair.",
300 search_tier,
301 err
302 );
303 let repaired = {
304 let repaired_after_error =
305 self.db.try_repair_after_error(&err).await.unwrap_or(false);
306 repaired_after_error
307 || self
308 .db
309 .ensure_vector_tables_healthy()
310 .await
311 .unwrap_or(false)
312 };
313 if repaired {
314 match self
315 .db
316 .search_similar_for_tenant(
317 &query_embedding,
318 search_tier,
319 project_id,
320 session_id,
321 tenant_scope,
322 candidate_limit,
323 )
324 .await
325 {
326 Ok(results) => results,
327 Err(retry_err) => {
328 tracing::warn!(
329 "Memory tier search still failing for {:?} after repair: {}",
330 search_tier,
331 retry_err
332 );
333 continue;
334 }
335 }
336 } else {
337 continue;
338 }
339 }
340 };
341
342 for (chunk, distance) in tier_results {
343 if !memory_chunk_visible_to_access_filter(&chunk, access_filter) {
344 continue;
345 }
346 let similarity = 1.0 - distance.clamp(0.0, 1.0);
350 let similarity = if search_tier == MemoryTier::Global {
351 Self::guide_doc_similarity(similarity, &chunk, now_ms)
352 } else {
353 similarity
354 };
355
356 results.push(MemorySearchResult { chunk, similarity });
357 }
358 }
359
360 results.sort_by(|a, b| b.similarity.partial_cmp(&a.similarity).unwrap());
362 results.truncate(effective_limit as usize);
363
364 Ok(results)
365 }
366
367 pub async fn upsert_knowledge_space(&self, space: &KnowledgeSpaceRecord) -> MemoryResult<()> {
368 validate_memory_envelope_for_write(&MemoryTenantScope::local(), space.metadata.as_ref())?;
369 self.db.upsert_knowledge_space(space).await
370 }
371
372 pub async fn upsert_knowledge_space_for_tenant(
373 &self,
374 space: &KnowledgeSpaceRecord,
375 tenant_scope: &MemoryTenantScope,
376 ) -> MemoryResult<()> {
377 validate_memory_envelope_for_write(tenant_scope, space.metadata.as_ref())?;
378 self.db
379 .upsert_knowledge_space_for_tenant(space, tenant_scope)
380 .await
381 }
382
383 pub async fn get_knowledge_space(
384 &self,
385 id: &str,
386 ) -> MemoryResult<Option<KnowledgeSpaceRecord>> {
387 self.db.get_knowledge_space(id).await
388 }
389
390 pub async fn get_knowledge_space_for_tenant(
391 &self,
392 id: &str,
393 tenant_scope: &MemoryTenantScope,
394 ) -> MemoryResult<Option<KnowledgeSpaceRecord>> {
395 self.db
396 .get_knowledge_space_for_tenant(id, tenant_scope)
397 .await
398 }
399
400 pub async fn list_knowledge_spaces(
401 &self,
402 project_id: Option<&str>,
403 ) -> MemoryResult<Vec<KnowledgeSpaceRecord>> {
404 self.db.list_knowledge_spaces(project_id).await
405 }
406
407 pub async fn list_knowledge_spaces_for_tenant(
408 &self,
409 project_id: Option<&str>,
410 tenant_scope: &MemoryTenantScope,
411 ) -> MemoryResult<Vec<KnowledgeSpaceRecord>> {
412 self.db
413 .list_knowledge_spaces_for_tenant(project_id, tenant_scope)
414 .await
415 }
416
417 pub async fn upsert_knowledge_item(&self, item: &KnowledgeItemRecord) -> MemoryResult<()> {
418 validate_memory_envelope_for_write(&MemoryTenantScope::local(), item.metadata.as_ref())?;
419 self.db.upsert_knowledge_item(item).await
420 }
421
422 pub async fn upsert_knowledge_item_for_tenant(
423 &self,
424 item: &KnowledgeItemRecord,
425 tenant_scope: &MemoryTenantScope,
426 ) -> MemoryResult<()> {
427 validate_memory_envelope_for_write(tenant_scope, item.metadata.as_ref())?;
428 self.db
429 .upsert_knowledge_item_for_tenant(item, tenant_scope)
430 .await
431 }
432
433 pub async fn get_knowledge_item(&self, id: &str) -> MemoryResult<Option<KnowledgeItemRecord>> {
434 self.db.get_knowledge_item(id).await
435 }
436
437 pub async fn get_knowledge_item_for_tenant(
438 &self,
439 id: &str,
440 tenant_scope: &MemoryTenantScope,
441 ) -> MemoryResult<Option<KnowledgeItemRecord>> {
442 self.db
443 .get_knowledge_item_for_tenant(id, tenant_scope)
444 .await
445 }
446
447 pub async fn list_knowledge_items(
448 &self,
449 space_id: &str,
450 coverage_key: Option<&str>,
451 ) -> MemoryResult<Vec<KnowledgeItemRecord>> {
452 self.db.list_knowledge_items(space_id, coverage_key).await
453 }
454
455 pub async fn list_knowledge_items_for_tenant(
456 &self,
457 space_id: &str,
458 coverage_key: Option<&str>,
459 tenant_scope: &MemoryTenantScope,
460 ) -> MemoryResult<Vec<KnowledgeItemRecord>> {
461 self.db
462 .list_knowledge_items_for_tenant(space_id, coverage_key, tenant_scope)
463 .await
464 }
465
466 pub async fn upsert_knowledge_coverage(
467 &self,
468 coverage: &KnowledgeCoverageRecord,
469 ) -> MemoryResult<()> {
470 validate_memory_envelope_for_write(
471 &MemoryTenantScope::local(),
472 coverage.metadata.as_ref(),
473 )?;
474 self.db.upsert_knowledge_coverage(coverage).await
475 }
476
477 pub async fn upsert_knowledge_coverage_for_tenant(
478 &self,
479 coverage: &KnowledgeCoverageRecord,
480 tenant_scope: &MemoryTenantScope,
481 ) -> MemoryResult<()> {
482 validate_memory_envelope_for_write(tenant_scope, coverage.metadata.as_ref())?;
483 self.db
484 .upsert_knowledge_coverage_for_tenant(coverage, tenant_scope)
485 .await
486 }
487
488 pub async fn get_knowledge_coverage(
489 &self,
490 coverage_key: &str,
491 space_id: &str,
492 ) -> MemoryResult<Option<KnowledgeCoverageRecord>> {
493 self.db.get_knowledge_coverage(coverage_key, space_id).await
494 }
495
496 pub async fn get_knowledge_coverage_for_tenant(
497 &self,
498 coverage_key: &str,
499 space_id: &str,
500 tenant_scope: &MemoryTenantScope,
501 ) -> MemoryResult<Option<KnowledgeCoverageRecord>> {
502 self.db
503 .get_knowledge_coverage_for_tenant(coverage_key, space_id, tenant_scope)
504 .await
505 }
506
507 pub async fn promote_knowledge_item(
508 &self,
509 request: &KnowledgePromotionRequest,
510 ) -> MemoryResult<Option<KnowledgePromotionResult>> {
511 self.db.promote_knowledge_item(request).await
512 }
513
514 pub async fn promote_knowledge_item_for_tenant(
515 &self,
516 request: &KnowledgePromotionRequest,
517 tenant_scope: &MemoryTenantScope,
518 ) -> MemoryResult<Option<KnowledgePromotionResult>> {
519 self.db
520 .promote_knowledge_item_for_tenant(request, tenant_scope)
521 .await
522 }
523
524 fn space_matches_ref(
525 space: &KnowledgeSpaceRecord,
526 space_ref: &tandem_orchestrator::KnowledgeSpaceRef,
527 project_id: &str,
528 ) -> bool {
529 if space.scope != space_ref.scope {
530 return false;
531 }
532 match space_ref.scope {
533 KnowledgeScope::Project | KnowledgeScope::Run => {
534 let target_project = space_ref.project_id.as_deref().unwrap_or(project_id);
535 if space.project_id.as_deref() != Some(target_project) {
536 return false;
537 }
538 }
539 KnowledgeScope::Global => {}
540 }
541 if let Some(namespace) = space_ref.namespace.as_deref() {
542 if space.namespace.as_deref() != Some(namespace) {
543 return false;
544 }
545 }
546 true
547 }
548
549 fn select_preflight_namespace(
550 binding: &KnowledgeBinding,
551 spaces: &[KnowledgeSpaceRecord],
552 ) -> Option<String> {
553 if let Some(namespace) = binding.namespace.clone() {
554 return Some(namespace);
555 }
556 if binding.read_spaces.len() == 1 {
557 if let Some(namespace) = binding.read_spaces[0].namespace.clone() {
558 return Some(namespace);
559 }
560 }
561 if spaces.len() == 1 {
562 return spaces[0].namespace.clone();
563 }
564 let mut unique = HashSet::new();
565 for space in spaces {
566 if let Some(namespace) = space.namespace.as_ref() {
567 unique.insert(namespace);
568 }
569 }
570 if unique.len() == 1 {
571 unique.into_iter().next().map(|value| value.to_string())
572 } else {
573 None
574 }
575 }
576
577 fn binding_uses_explicit_spaces(binding: &KnowledgeBinding) -> bool {
578 !binding.read_spaces.is_empty() || !binding.promote_spaces.is_empty()
579 }
580
581 fn namespace_matches(space_namespace: Option<&str>, binding_namespace: Option<&str>) -> bool {
582 match (space_namespace, binding_namespace) {
583 (None, None) => true,
584 (Some(space), Some(binding)) => {
585 normalize_knowledge_segment(space) == normalize_knowledge_segment(binding)
586 }
587 _ => false,
588 }
589 }
590
/// Freshness check for a knowledge item.
///
/// - An explicit expiry always decides: the item is fresh only while `now_ms` is
///   strictly before it.
/// - Without an expiry and without a policy window, the item is always fresh.
/// - Otherwise the item is fresh while `now_ms - basis <= freshness_policy_ms`. The
///   basis is the coverage's last promotion time, else the item's creation time, and
///   the subtraction saturates at 0.
fn is_fresh_enough(
    freshness_expires_at_ms: Option<u64>,
    freshness_policy_ms: Option<u64>,
    coverage_last_promoted_at_ms: Option<u64>,
    item_created_at_ms: u64,
    now_ms: u64,
) -> bool {
    match (freshness_expires_at_ms, freshness_policy_ms) {
        (Some(expires_at_ms), _) => now_ms < expires_at_ms,
        (None, None) => true,
        (None, Some(window_ms)) => {
            let reference_ms = coverage_last_promoted_at_ms.unwrap_or(item_created_at_ms);
            now_ms.saturating_sub(reference_ms) <= window_ms
        }
    }
}
607
608 async fn resolve_preflight_spaces(
609 &self,
610 request: &KnowledgePreflightRequest,
611 _coverage_key: &str,
612 tenant_scope: &MemoryTenantScope,
613 ) -> MemoryResult<Vec<KnowledgeSpaceRecord>> {
614 let binding = &request.binding;
615 let mut spaces = Vec::new();
616 let mut seen_space_ids = HashSet::new();
617
618 let push_space = |space: KnowledgeSpaceRecord,
619 spaces: &mut Vec<KnowledgeSpaceRecord>,
620 seen_space_ids: &mut HashSet<String>| {
621 if seen_space_ids.insert(space.id.clone()) {
622 spaces.push(space);
623 }
624 };
625
626 if Self::binding_uses_explicit_spaces(binding) {
627 for space_ref in binding
628 .read_spaces
629 .iter()
630 .chain(binding.promote_spaces.iter())
631 {
632 if let Some(space_id) = space_ref.space_id.as_deref() {
633 if let Some(space) = self
634 .get_knowledge_space_for_tenant(space_id, tenant_scope)
635 .await?
636 {
637 push_space(space, &mut spaces, &mut seen_space_ids);
638 }
639 continue;
640 }
641
642 match space_ref.scope {
643 KnowledgeScope::Run => {}
644 KnowledgeScope::Project => {
645 let candidate_project_id = space_ref
646 .project_id
647 .as_deref()
648 .unwrap_or(&request.project_id);
649 let project_spaces = self
650 .list_knowledge_spaces_for_tenant(
651 Some(candidate_project_id),
652 tenant_scope,
653 )
654 .await?;
655 for space in project_spaces.into_iter().filter(|space| {
656 Self::space_matches_ref(space, space_ref, &request.project_id)
657 }) {
658 push_space(space, &mut spaces, &mut seen_space_ids);
659 }
660 }
661 KnowledgeScope::Global => {
662 let global_spaces = self
663 .list_knowledge_spaces_for_tenant(None, tenant_scope)
664 .await?;
665 for space in global_spaces.into_iter().filter(|space| {
666 Self::space_matches_ref(space, space_ref, &request.project_id)
667 }) {
668 push_space(space, &mut spaces, &mut seen_space_ids);
669 }
670 }
671 }
672 }
673 return Ok(spaces);
674 }
675
676 if request.project_id.trim().is_empty() {
677 return Ok(spaces);
678 }
679
680 let project_spaces = self
681 .list_knowledge_spaces_for_tenant(Some(&request.project_id), tenant_scope)
682 .await?;
683 let requested_namespace = if binding.namespace.is_some() {
684 binding.namespace.clone()
685 } else {
686 Self::select_preflight_namespace(binding, &project_spaces)
687 };
688 let Some(requested_namespace) = requested_namespace else {
689 return Ok(spaces);
690 };
691
692 for space in project_spaces.into_iter().filter(|space| {
693 space.scope == KnowledgeScope::Project
694 && Self::namespace_matches(
695 space.namespace.as_deref(),
696 Some(requested_namespace.as_str()),
697 )
698 }) {
699 push_space(space, &mut spaces, &mut seen_space_ids);
700 }
701 Ok(spaces)
702 }
703
704 pub async fn preflight_knowledge(
705 &self,
706 request: &KnowledgePreflightRequest,
707 ) -> MemoryResult<KnowledgePreflightResult> {
708 self.preflight_knowledge_for_tenant(request, &MemoryTenantScope::local())
709 .await
710 }
711
712 pub async fn preflight_knowledge_for_tenant(
713 &self,
714 request: &KnowledgePreflightRequest,
715 tenant_scope: &MemoryTenantScope,
716 ) -> MemoryResult<KnowledgePreflightResult> {
717 let binding = &request.binding;
718 let project_spaces = if request.project_id.trim().is_empty() {
719 Vec::new()
720 } else {
721 self.list_knowledge_spaces_for_tenant(Some(&request.project_id), tenant_scope)
722 .await?
723 };
724 let namespace = binding
725 .namespace
726 .clone()
727 .or_else(|| Self::select_preflight_namespace(binding, &project_spaces));
728 let coverage_key = build_knowledge_coverage_key(
729 &request.project_id,
730 namespace.as_deref(),
731 &request.task_family,
732 &request.subject,
733 );
734
735 if !binding.enabled || binding.reuse_mode == KnowledgeReuseMode::Disabled {
736 return Ok(KnowledgePreflightResult {
737 project_id: request.project_id.clone(),
738 namespace,
739 task_family: request.task_family.clone(),
740 subject: request.subject.clone(),
741 coverage_key,
742 decision: KnowledgeReuseDecision::Disabled,
743 reuse_reason: None,
744 skip_reason: Some("knowledge reuse is disabled for this binding".to_string()),
745 freshness_reason: None,
746 items: Vec::new(),
747 });
748 }
749
750 let spaces = self
751 .resolve_preflight_spaces(request, &coverage_key, tenant_scope)
752 .await?;
753 if spaces.is_empty() {
754 return Ok(KnowledgePreflightResult {
755 project_id: request.project_id.clone(),
756 namespace,
757 task_family: request.task_family.clone(),
758 subject: request.subject.clone(),
759 coverage_key,
760 decision: KnowledgeReuseDecision::NoPriorKnowledge,
761 reuse_reason: None,
762 skip_reason: Some("no reusable knowledge spaces were found".to_string()),
763 freshness_reason: None,
764 items: Vec::new(),
765 });
766 }
767
768 let now_ms = chrono::Utc::now().timestamp_millis().max(0) as u64;
769 let mut fresh_items = Vec::new();
770 let mut stale_items = Vec::new();
771 let mut freshest_reason = None;
772
773 for space in &spaces {
774 let items = self
775 .list_knowledge_items_for_tenant(&space.id, Some(&coverage_key), tenant_scope)
776 .await?;
777 let coverage = self
778 .get_knowledge_coverage_for_tenant(&coverage_key, &space.id, tenant_scope)
779 .await?;
780 for item in items {
781 if !item.status.is_active() {
782 continue;
783 }
784 let Some(trust_level) = item.status.as_trust_level() else {
785 continue;
786 };
787 if !trust_level.meets_floor(binding.trust_floor) {
788 continue;
789 }
790 let freshness_expires_at_ms = item.freshness_expires_at_ms.or_else(|| {
791 coverage
792 .as_ref()
793 .and_then(|coverage| coverage.freshness_expires_at_ms)
794 });
795 let pack_item = KnowledgePackItem {
796 item_id: item.id.clone(),
797 space_id: space.id.clone(),
798 coverage_key: item.coverage_key.clone(),
799 title: item.title.clone(),
800 summary: item.summary.clone(),
801 trust_level,
802 status: item.status.to_string(),
803 artifact_refs: item.artifact_refs.clone(),
804 source_memory_ids: item.source_memory_ids.clone(),
805 freshness_expires_at_ms,
806 };
807 if Self::is_fresh_enough(
808 freshness_expires_at_ms,
809 binding.freshness_ms,
810 coverage
811 .as_ref()
812 .and_then(|coverage| coverage.last_promoted_at_ms),
813 item.created_at_ms,
814 now_ms,
815 ) {
816 fresh_items.push(pack_item);
817 } else {
818 freshest_reason = Some(match freshness_expires_at_ms {
819 Some(expires_at_ms) => format!(
820 "coverage `{}` in space `{}` expired at {}",
821 coverage_key, space.id, expires_at_ms
822 ),
823 None => format!(
824 "coverage `{}` in space `{}` lacks freshness metadata",
825 coverage_key, space.id
826 ),
827 });
828 stale_items.push(pack_item);
829 }
830 }
831 }
832
833 fresh_items.sort_by(|left, right| {
834 right
835 .trust_level
836 .rank()
837 .cmp(&left.trust_level.rank())
838 .then_with(|| {
839 right
840 .freshness_expires_at_ms
841 .unwrap_or(0)
842 .cmp(&left.freshness_expires_at_ms.unwrap_or(0))
843 })
844 .then_with(|| left.title.cmp(&right.title))
845 });
846 stale_items.sort_by(|left, right| {
847 right
848 .trust_level
849 .rank()
850 .cmp(&left.trust_level.rank())
851 .then_with(|| left.title.cmp(&right.title))
852 });
853
854 if let Some(freshest_trust_level) = fresh_items.first().map(|item| item.trust_level) {
855 let selected = fresh_items
856 .into_iter()
857 .take(MAX_KNOWLEDGE_PACK_ITEMS)
858 .collect::<Vec<_>>();
859 let decision = match freshest_trust_level {
860 KnowledgeTrustLevel::ApprovedDefault => {
861 KnowledgeReuseDecision::ReuseApprovedDefault
862 }
863 _ => KnowledgeReuseDecision::ReusePromoted,
864 };
865 let selected_count = selected.len();
866 return Ok(KnowledgePreflightResult {
867 project_id: request.project_id.clone(),
868 namespace,
869 task_family: request.task_family.clone(),
870 subject: request.subject.clone(),
871 coverage_key,
872 decision,
873 reuse_reason: Some(format!(
874 "reusing {} promoted knowledge item(s) from {} space(s)",
875 selected_count,
876 spaces.len()
877 )),
878 skip_reason: None,
879 freshness_reason: None,
880 items: selected,
881 });
882 }
883
884 if !stale_items.is_empty() {
885 let selected = stale_items
886 .into_iter()
887 .take(MAX_KNOWLEDGE_PACK_ITEMS)
888 .collect::<Vec<_>>();
889 return Ok(KnowledgePreflightResult {
890 project_id: request.project_id.clone(),
891 namespace,
892 task_family: request.task_family.clone(),
893 subject: request.subject.clone(),
894 coverage_key,
895 decision: KnowledgeReuseDecision::RefreshRequired,
896 reuse_reason: None,
897 skip_reason: Some(
898 "prior knowledge exists but is not fresh enough to reuse".to_string(),
899 ),
900 freshness_reason: freshest_reason.or_else(|| {
901 Some("matching knowledge exists but freshness policy rejected it".to_string())
902 }),
903 items: selected,
904 });
905 }
906
907 Ok(KnowledgePreflightResult {
908 project_id: request.project_id.clone(),
909 namespace,
910 task_family: request.task_family.clone(),
911 subject: request.subject.clone(),
912 coverage_key,
913 decision: KnowledgeReuseDecision::NoPriorKnowledge,
914 reuse_reason: None,
915 skip_reason: Some("no active promoted knowledge matched this coverage key".to_string()),
916 freshness_reason: None,
917 items: Vec::new(),
918 })
919 }
920
921 pub async fn retrieve_context(
926 &self,
927 query: &str,
928 project_id: Option<&str>,
929 session_id: Option<&str>,
930 token_budget: Option<i64>,
931 ) -> MemoryResult<MemoryContext> {
932 self.retrieve_context_for_tenant(
933 query,
934 project_id,
935 session_id,
936 &MemoryTenantScope::local(),
937 token_budget,
938 )
939 .await
940 }
941
942 pub async fn retrieve_context_for_tenant(
943 &self,
944 query: &str,
945 project_id: Option<&str>,
946 session_id: Option<&str>,
947 tenant_scope: &MemoryTenantScope,
948 token_budget: Option<i64>,
949 ) -> MemoryResult<MemoryContext> {
950 let (context, _) = self
951 .retrieve_context_with_meta_for_tenant(
952 query,
953 project_id,
954 session_id,
955 tenant_scope,
956 token_budget,
957 )
958 .await?;
959 Ok(context)
960 }
961
962 pub async fn retrieve_context_with_meta(
964 &self,
965 query: &str,
966 project_id: Option<&str>,
967 session_id: Option<&str>,
968 token_budget: Option<i64>,
969 ) -> MemoryResult<(MemoryContext, MemoryRetrievalMeta)> {
970 self.retrieve_context_with_meta_for_tenant(
971 query,
972 project_id,
973 session_id,
974 &MemoryTenantScope::local(),
975 token_budget,
976 )
977 .await
978 }
979
980 pub async fn retrieve_context_with_meta_for_tenant(
981 &self,
982 query: &str,
983 project_id: Option<&str>,
984 session_id: Option<&str>,
985 tenant_scope: &MemoryTenantScope,
986 token_budget: Option<i64>,
987 ) -> MemoryResult<(MemoryContext, MemoryRetrievalMeta)> {
988 self.retrieve_context_with_meta_for_tenant_with_access_filter(
989 query,
990 project_id,
991 session_id,
992 tenant_scope,
993 token_budget,
994 None,
995 )
996 .await
997 }
998
999 pub async fn retrieve_context_with_meta_for_tenant_with_access_filter(
1000 &self,
1001 query: &str,
1002 project_id: Option<&str>,
1003 session_id: Option<&str>,
1004 tenant_scope: &MemoryTenantScope,
1005 token_budget: Option<i64>,
1006 access_filter: Option<&crate::types::MemoryAccessFilter>,
1007 ) -> MemoryResult<(MemoryContext, MemoryRetrievalMeta)> {
1008 let config = if let Some(pid) = project_id {
1009 self.db
1010 .get_or_create_config_for_tenant(pid, tenant_scope)
1011 .await?
1012 } else {
1013 MemoryConfig::default()
1014 };
1015 let budget = token_budget.unwrap_or(config.token_budget);
1016 let retrieval_limit = config.retrieval_k.max(1);
1017
1018 let current_session = if let Some(sid) = session_id {
1020 self.db
1021 .get_session_chunks_for_tenant(sid, tenant_scope)
1022 .await?
1023 .into_iter()
1024 .filter(|chunk| memory_chunk_visible_to_access_filter(chunk, access_filter))
1025 .collect()
1026 } else {
1027 Vec::new()
1028 };
1029
1030 let search_results = self
1032 .search_for_tenant_with_access_filter(
1033 query,
1034 None,
1035 project_id,
1036 session_id,
1037 tenant_scope,
1038 Some(retrieval_limit),
1039 access_filter,
1040 )
1041 .await?;
1042
1043 let mut score_min: Option<f64> = None;
1044 let mut score_max: Option<f64> = None;
1045 for result in &search_results {
1046 score_min = Some(match score_min {
1047 Some(current) => current.min(result.similarity),
1048 None => result.similarity,
1049 });
1050 score_max = Some(match score_max {
1051 Some(current) => current.max(result.similarity),
1052 None => result.similarity,
1053 });
1054 }
1055
1056 let mut current_session = current_session;
1057 let mut relevant_history = Vec::new();
1058 let mut project_facts = Vec::new();
1059
1060 for result in search_results {
1061 match result.chunk.tier {
1062 MemoryTier::Project => {
1063 project_facts.push(result.chunk);
1064 }
1065 MemoryTier::Global => {
1066 project_facts.push(result.chunk);
1067 }
1068 MemoryTier::Session => {
1069 if !current_session.iter().any(|c| c.id == result.chunk.id) {
1071 relevant_history.push(result.chunk);
1072 }
1073 }
1074 }
1075 }
1076
1077 let mut total_tokens: i64 = current_session.iter().map(|c| c.token_count).sum();
1079 total_tokens += relevant_history.iter().map(|c| c.token_count).sum::<i64>();
1080 total_tokens += project_facts.iter().map(|c| c.token_count).sum::<i64>();
1081
1082 if total_tokens > budget {
1084 let excess = total_tokens - budget;
1085 self.trim_context(
1086 &mut current_session,
1087 &mut relevant_history,
1088 &mut project_facts,
1089 excess,
1090 )?;
1091 total_tokens = current_session.iter().map(|c| c.token_count).sum::<i64>()
1092 + relevant_history.iter().map(|c| c.token_count).sum::<i64>()
1093 + project_facts.iter().map(|c| c.token_count).sum::<i64>();
1094 }
1095
1096 let context = MemoryContext {
1097 current_session,
1098 relevant_history,
1099 project_facts,
1100 total_tokens,
1101 };
1102 let chunks_total = context.current_session.len()
1103 + context.relevant_history.len()
1104 + context.project_facts.len();
1105 let meta = MemoryRetrievalMeta {
1106 used: chunks_total > 0,
1107 chunks_total,
1108 session_chunks: context.current_session.len(),
1109 history_chunks: context.relevant_history.len(),
1110 project_fact_chunks: context.project_facts.len(),
1111 score_min,
1112 score_max,
1113 };
1114
1115 Ok((context, meta))
1116 }
1117
1118 fn trim_context(
1120 &self,
1121 current_session: &mut Vec<MemoryChunk>,
1122 relevant_history: &mut Vec<MemoryChunk>,
1123 project_facts: &mut Vec<MemoryChunk>,
1124 excess_tokens: i64,
1125 ) -> MemoryResult<()> {
1126 let mut tokens_to_remove = excess_tokens;
1127
1128 while tokens_to_remove > 0 && !relevant_history.is_empty() {
1130 if let Some(chunk) = relevant_history.pop() {
1131 tokens_to_remove -= chunk.token_count;
1132 }
1133 }
1134
1135 while tokens_to_remove > 0 && !project_facts.is_empty() {
1137 if let Some(chunk) = project_facts.pop() {
1138 tokens_to_remove -= chunk.token_count;
1139 }
1140 }
1141
1142 while tokens_to_remove > 0 && !current_session.is_empty() {
1143 if let Some(chunk) = current_session.pop() {
1144 tokens_to_remove -= chunk.token_count;
1145 }
1146 }
1147
1148 Ok(())
1149 }
1150
1151 pub async fn clear_session(&self, session_id: &str) -> MemoryResult<u64> {
1153 self.clear_session_for_tenant(session_id, &MemoryTenantScope::local())
1154 .await
1155 }
1156
1157 pub async fn clear_session_for_tenant(
1158 &self,
1159 session_id: &str,
1160 tenant_scope: &MemoryTenantScope,
1161 ) -> MemoryResult<u64> {
1162 let count = self
1163 .db
1164 .clear_session_memory_for_tenant(session_id, tenant_scope)
1165 .await?;
1166
1167 self.db
1169 .log_cleanup_for_tenant(
1170 "manual",
1171 MemoryTier::Session,
1172 None,
1173 Some(session_id),
1174 count as i64,
1175 0,
1176 tenant_scope,
1177 )
1178 .await?;
1179
1180 Ok(count)
1181 }
1182
1183 pub async fn clear_project(&self, project_id: &str) -> MemoryResult<u64> {
1185 self.clear_project_for_tenant(project_id, &MemoryTenantScope::local())
1186 .await
1187 }
1188
1189 pub async fn clear_project_for_tenant(
1190 &self,
1191 project_id: &str,
1192 tenant_scope: &MemoryTenantScope,
1193 ) -> MemoryResult<u64> {
1194 let count = self
1195 .db
1196 .clear_project_memory_for_tenant(project_id, tenant_scope)
1197 .await?;
1198
1199 self.db
1201 .log_cleanup_for_tenant(
1202 "manual",
1203 MemoryTier::Project,
1204 Some(project_id),
1205 None,
1206 count as i64,
1207 0,
1208 tenant_scope,
1209 )
1210 .await?;
1211
1212 Ok(count)
1213 }
1214
1215 pub async fn get_stats(&self) -> MemoryResult<MemoryStats> {
1217 self.db.get_stats().await
1218 }
1219
1220 pub async fn get_stats_for_tenant(
1221 &self,
1222 tenant_scope: &MemoryTenantScope,
1223 ) -> MemoryResult<MemoryStats> {
1224 self.db.get_stats_for_tenant(tenant_scope).await
1225 }
1226
1227 pub async fn get_config(&self, project_id: &str) -> MemoryResult<MemoryConfig> {
1229 self.db.get_or_create_config(project_id).await
1230 }
1231
1232 pub async fn get_config_for_tenant(
1233 &self,
1234 project_id: &str,
1235 tenant_scope: &MemoryTenantScope,
1236 ) -> MemoryResult<MemoryConfig> {
1237 self.db
1238 .get_or_create_config_for_tenant(project_id, tenant_scope)
1239 .await
1240 }
1241
1242 pub async fn set_config(&self, project_id: &str, config: &MemoryConfig) -> MemoryResult<()> {
1244 self.db.update_config(project_id, config).await
1245 }
1246
1247 pub async fn set_config_for_tenant(
1248 &self,
1249 project_id: &str,
1250 config: &MemoryConfig,
1251 tenant_scope: &MemoryTenantScope,
1252 ) -> MemoryResult<()> {
1253 self.db
1254 .update_config_for_tenant(project_id, config, tenant_scope)
1255 .await
1256 }
1257
1258 pub async fn resolve_uri(&self, uri: &str) -> MemoryResult<Option<MemoryNode>> {
1259 self.db.get_node_by_uri(uri).await
1260 }
1261
1262 pub async fn list_directory(&self, uri: &str) -> MemoryResult<DirectoryListing> {
1263 let nodes = self.db.list_directory(uri).await?;
1264 let directories: Vec<MemoryNode> = nodes
1265 .iter()
1266 .filter(|n| n.node_type == NodeType::Directory)
1267 .cloned()
1268 .collect();
1269 let files: Vec<MemoryNode> = nodes
1270 .iter()
1271 .filter(|n| n.node_type == NodeType::File)
1272 .cloned()
1273 .collect();
1274
1275 Ok(DirectoryListing {
1276 uri: uri.to_string(),
1277 nodes,
1278 total_children: directories.len() + files.len(),
1279 directories,
1280 files,
1281 })
1282 }
1283
1284 pub async fn tree(&self, uri: &str, max_depth: usize) -> MemoryResult<Vec<TreeNode>> {
1285 self.db.get_children_tree(uri, max_depth).await
1286 }
1287
1288 pub async fn create_context_node(
1289 &self,
1290 uri: &str,
1291 node_type: NodeType,
1292 metadata: Option<serde_json::Value>,
1293 ) -> MemoryResult<String> {
1294 let parsed_uri =
1295 ContextUri::parse(uri).map_err(|e| MemoryError::InvalidConfig(e.message))?;
1296 let parent_uri = parsed_uri.parent().map(|p| p.to_string());
1297 self.db
1298 .create_node(uri, parent_uri.as_deref(), node_type, metadata.as_ref())
1299 .await
1300 }
1301
1302 pub async fn get_context_layer(
1303 &self,
1304 node_id: &str,
1305 layer_type: LayerType,
1306 ) -> MemoryResult<Option<MemoryLayer>> {
1307 self.db.get_layer(node_id, layer_type).await
1308 }
1309
1310 pub async fn store_content_with_layers(
1311 &self,
1312 uri: &str,
1313 content: &str,
1314 metadata: Option<serde_json::Value>,
1315 ) -> MemoryResult<String> {
1316 let parsed_uri =
1317 ContextUri::parse(uri).map_err(|e| MemoryError::InvalidConfig(e.message))?;
1318 let node_type = if parsed_uri
1319 .last_segment()
1320 .map(|s| s.ends_with(".md") || s.ends_with(".txt") || s.contains("."))
1321 .unwrap_or(false)
1322 {
1323 NodeType::File
1324 } else {
1325 NodeType::Directory
1326 };
1327
1328 let parent_uri = parsed_uri.parent().map(|p| p.to_string());
1329 let node_id = self
1330 .db
1331 .create_node(uri, parent_uri.as_deref(), node_type, metadata.as_ref())
1332 .await?;
1333
1334 let token_count = self.tokenizer.count_tokens(content) as i64;
1335 self.db
1336 .create_layer(&node_id, LayerType::L2, content, token_count, None)
1337 .await?;
1338
1339 Ok(node_id)
1340 }
1341
1342 pub async fn generate_layers_for_node(
1343 &self,
1344 node_id: &str,
1345 providers: &ProviderRegistry,
1346 ) -> MemoryResult<()> {
1347 let l2_layer = self.db.get_layer(node_id, LayerType::L2).await?;
1348 let l2_content = match l2_layer {
1349 Some(layer) => layer.content,
1350 None => return Ok(()),
1351 };
1352
1353 let generator = ContextLayerGenerator::new(Arc::new(providers.clone()));
1354
1355 let (l0_content, l1_content) = generator.generate_layers(&l2_content).await?;
1356
1357 let l0_tokens = self.tokenizer.count_tokens(&l0_content) as i64;
1358 let l1_tokens = self.tokenizer.count_tokens(&l1_content) as i64;
1359
1360 if self.db.get_layer(node_id, LayerType::L0).await?.is_none() {
1361 self.db
1362 .create_layer(node_id, LayerType::L0, &l0_content, l0_tokens, None)
1363 .await?;
1364 }
1365
1366 if self.db.get_layer(node_id, LayerType::L1).await?.is_none() {
1367 self.db
1368 .create_layer(node_id, LayerType::L1, &l1_content, l1_tokens, None)
1369 .await?;
1370 }
1371
1372 Ok(())
1373 }
1374
1375 pub async fn get_layer_content(
1376 &self,
1377 node_id: &str,
1378 layer_type: LayerType,
1379 ) -> MemoryResult<Option<String>> {
1380 let layer = self.db.get_layer(node_id, layer_type).await?;
1381 Ok(layer.map(|l| l.content))
1382 }
1383
1384 pub async fn store_content_with_layers_auto(
1385 &self,
1386 uri: &str,
1387 content: &str,
1388 metadata: Option<serde_json::Value>,
1389 providers: Option<&ProviderRegistry>,
1390 ) -> MemoryResult<String> {
1391 let node_id = self
1392 .store_content_with_layers(uri, content, metadata)
1393 .await?;
1394
1395 if let Some(p) = providers {
1396 if let Err(e) = self.generate_layers_for_node(&node_id, p).await {
1397 tracing::warn!("Failed to generate layers for node {}: {}", node_id, e);
1398 }
1399 }
1400
1401 Ok(node_id)
1402 }
1403
1404 pub async fn run_cleanup(&self, project_id: Option<&str>) -> MemoryResult<u64> {
1406 self.run_cleanup_for_tenant(project_id, &MemoryTenantScope::local())
1407 .await
1408 }
1409
1410 pub async fn run_cleanup_for_tenant(
1411 &self,
1412 project_id: Option<&str>,
1413 tenant_scope: &MemoryTenantScope,
1414 ) -> MemoryResult<u64> {
1415 let mut total_cleaned = 0u64;
1416
1417 if let Some(pid) = project_id {
1418 let config = self
1420 .db
1421 .get_or_create_config_for_tenant(pid, tenant_scope)
1422 .await?;
1423
1424 if config.auto_cleanup {
1425 let cleaned = self
1427 .db
1428 .cleanup_old_sessions_for_tenant(config.session_retention_days, tenant_scope)
1429 .await?;
1430 total_cleaned += cleaned;
1431
1432 if cleaned > 0 {
1433 self.db
1434 .log_cleanup_for_tenant(
1435 "auto",
1436 MemoryTier::Session,
1437 Some(pid),
1438 None,
1439 cleaned as i64,
1440 0,
1441 tenant_scope,
1442 )
1443 .await?;
1444 }
1445 }
1446 } else {
1447 let cleaned = self
1451 .db
1452 .cleanup_old_sessions_for_tenant(30, tenant_scope)
1453 .await?;
1454 total_cleaned += cleaned;
1455 }
1456
1457 if total_cleaned > 100 {
1459 self.db.vacuum().await?;
1460 }
1461
1462 Ok(total_cleaned)
1463 }
1464
1465 async fn maybe_cleanup(
1467 &self,
1468 project_id: &Option<String>,
1469 tenant_scope: &MemoryTenantScope,
1470 ) -> MemoryResult<()> {
1471 if let Some(pid) = project_id {
1472 let stats = self.db.get_stats_for_tenant(tenant_scope).await?;
1473 let config = self
1474 .db
1475 .get_or_create_config_for_tenant(pid, tenant_scope)
1476 .await?;
1477
1478 if stats.project_chunks > config.max_chunks {
1480 let excess = stats.project_chunks - config.max_chunks;
1482 tracing::info!("Project {} has {} excess chunks", pid, excess);
1485 }
1486 }
1487
1488 Ok(())
1489 }
1490
1491 pub async fn get_cleanup_log(&self, _limit: i64) -> MemoryResult<Vec<CleanupLogEntry>> {
1493 Ok(Vec::new())
1496 }
1497
1498 pub fn count_tokens(&self, text: &str) -> usize {
1500 self.tokenizer.count_tokens(text)
1501 }
1502
1503 pub async fn embedding_health(&self) -> EmbeddingHealth {
1505 let service = self.embedding_service.lock().await;
1506 if service.is_available() {
1507 EmbeddingHealth {
1508 status: "ok".to_string(),
1509 reason: None,
1510 }
1511 } else {
1512 EmbeddingHealth {
1513 status: "degraded_disabled".to_string(),
1514 reason: service.disabled_reason().map(ToString::to_string),
1515 }
1516 }
1517 }
1518
1519 pub async fn consolidate_session(
1521 &self,
1522 session_id: &str,
1523 project_id: Option<&str>,
1524 providers: &ProviderRegistry,
1525 config: &MemoryConsolidationConfig,
1526 ) -> MemoryResult<Option<String>> {
1527 if !config.enabled {
1528 return Ok(None);
1529 }
1530
1531 let chunks = self.db.get_session_chunks(session_id).await?;
1532 if chunks.is_empty() {
1533 return Ok(None);
1534 }
1535
1536 let mut text_parts = Vec::new();
1538 for chunk in &chunks {
1539 text_parts.push(chunk.content.clone());
1540 }
1541 let full_text = text_parts.join("\n\n---\n\n");
1542
1543 let prompt = format!(
1545 "Please provide a concise but comprehensive summary of the following chat session. \
1546 Focus on the key decisions, technical details, code changes, and unresolved issues. \
1547 Do NOT include conversational filler, greetings, or sign-offs. \
1548 This summary will be used as long-term memory to recall the context of this work.\n\n\
1549 Session transcripts:\n\n{}",
1550 full_text
1551 );
1552
1553 let provider_override = config.provider.as_deref().filter(|s| !s.is_empty());
1554 let model_override = config.model.as_deref().filter(|s| !s.is_empty());
1555
1556 let summary_text = match providers
1557 .complete_cheapest(&prompt, provider_override, model_override)
1558 .await
1559 {
1560 Ok(s) => s,
1561 Err(e) => {
1562 tracing::warn!("Memory consolidation LLM failed for session {session_id}: {e}");
1563 return Ok(None);
1564 }
1565 };
1566
1567 if summary_text.trim().is_empty() {
1568 return Ok(None);
1569 }
1570
1571 let embedding = {
1573 let service = self.embedding_service.lock().await;
1574 service
1575 .embed(&summary_text)
1576 .await
1577 .map_err(|e| crate::types::MemoryError::Embedding(e.to_string()))?
1578 };
1579
1580 let chunk_id = uuid::Uuid::new_v4().to_string();
1582 let chunk = MemoryChunk {
1583 id: chunk_id,
1584 content: summary_text.clone(),
1585 tier: MemoryTier::Project,
1586 session_id: None, project_id: project_id.map(ToString::to_string),
1588 created_at: Utc::now(),
1589 source: "consolidation".to_string(),
1590 token_count: self.count_tokens(&summary_text) as i64,
1591 source_path: None,
1592 source_mtime: None,
1593 source_size: None,
1594 source_hash: None,
1595 tenant_scope: MemoryTenantScope::local(),
1596 metadata: None,
1597 };
1598
1599 self.db.store_chunk(&chunk, &embedding).await?;
1600
1601 self.db.clear_session_memory(session_id).await?;
1603
1604 tracing::info!(
1605 "Session {session_id} consolidated into summary chunk. Original chunks cleared."
1606 );
1607
1608 Ok(Some(summary_text))
1609 }
1610}
1611
1612fn memory_chunk_visible_to_access_filter(
1613 chunk: &MemoryChunk,
1614 access_filter: Option<&crate::types::MemoryAccessFilter>,
1615) -> bool {
1616 if crate::types::MemorySourceAccessTarget::from_chunk(chunk).is_none() {
1617 return true;
1618 }
1619 access_filter
1620 .map(|filter| filter.allows_chunk(chunk))
1621 .unwrap_or(false)
1622}
1623
1624pub async fn create_memory_manager(app_data_dir: &Path) -> MemoryResult<MemoryManager> {
1626 let db_path = app_data_dir.join("tandem_memory.db");
1627 MemoryManager::new(&db_path).await
1628}