1use crate::chunking::{chunk_text_semantic, ChunkingConfig, Tokenizer};
5use crate::context_layers::ContextLayerGenerator;
6use crate::context_uri::ContextUri;
7use crate::db::MemoryDatabase;
8use crate::embeddings::EmbeddingService;
9use crate::types::{
10 CleanupLogEntry, DirectoryListing, EmbeddingHealth, KnowledgeCoverageRecord,
11 KnowledgeItemRecord, KnowledgePromotionRequest, KnowledgePromotionResult, KnowledgeSpaceRecord,
12 LayerType, MemoryChunk, MemoryConfig, MemoryContext, MemoryError, MemoryLayer, MemoryNode,
13 MemoryResult, MemoryRetrievalMeta, MemorySearchResult, MemoryStats, MemoryTenantScope,
14 MemoryTier, NodeType, StoreMessageRequest, TreeNode,
15};
16use chrono::Utc;
17use std::collections::HashSet;
18use std::path::Path;
19use std::sync::Arc;
20use tandem_orchestrator::{
21 build_knowledge_coverage_key, normalize_knowledge_segment, KnowledgeBinding, KnowledgePackItem,
22 KnowledgePreflightRequest, KnowledgePreflightResult, KnowledgeReuseDecision,
23 KnowledgeReuseMode, KnowledgeScope, KnowledgeTrustLevel,
24};
25use tandem_providers::{MemoryConsolidationConfig, ProviderRegistry};
26use tokio::sync::Mutex;
27
28pub struct MemoryManager {
30 db: Arc<MemoryDatabase>,
31 embedding_service: Arc<Mutex<EmbeddingService>>,
32 tokenizer: Tokenizer,
33}
34
35const MAX_KNOWLEDGE_PACK_ITEMS: usize = 3;
36const GUIDE_DOC_SOURCE_PREFIX: &str = "guide_docs:";
37const GUIDE_DOC_RECENCY_HALFLIFE_MS: f64 = 30.0 * 24.0 * 60.0 * 60.0 * 1000.0;
38const GUIDE_DOC_RECENCY_WEIGHT: f64 = 0.12;
39const ACCESS_FILTER_CANDIDATE_MULTIPLIER: i64 = 5;
40
41impl MemoryManager {
42 fn guide_doc_similarity(similarity: f64, chunk: &MemoryChunk, now_ms: i64) -> f64 {
43 if !chunk.source.starts_with(GUIDE_DOC_SOURCE_PREFIX) {
44 return similarity.clamp(0.0, 1.0);
45 }
46
47 let normalized_similarity = similarity.clamp(0.0, 1.0);
48 let source_mtime = chunk
49 .source_mtime
50 .filter(|value| *value > 0)
51 .unwrap_or_else(|| chunk.created_at.timestamp_millis());
52 let age_ms = (now_ms - source_mtime).max(0) as f64;
53 let recency_score = 1.0 / (1.0 + (age_ms / GUIDE_DOC_RECENCY_HALFLIFE_MS));
54 ((1.0 - GUIDE_DOC_RECENCY_WEIGHT) * normalized_similarity
55 + (GUIDE_DOC_RECENCY_WEIGHT * recency_score))
56 .clamp(0.0, 1.0)
57 }
58
59 fn is_malformed_database_error(err: &crate::types::MemoryError) -> bool {
60 err.to_string()
61 .to_lowercase()
62 .contains("database disk image is malformed")
63 }
64
65 pub fn db(&self) -> &Arc<MemoryDatabase> {
66 &self.db
67 }
68
69 pub async fn new(db_path: &Path) -> MemoryResult<Self> {
71 Self::new_with_embedding_service(db_path, EmbeddingService::new()).await
72 }
73
74 pub async fn new_with_embedding_service(
77 db_path: &Path,
78 embedding_service: EmbeddingService,
79 ) -> MemoryResult<Self> {
80 let db = Arc::new(MemoryDatabase::new(db_path).await?);
81 let embedding_service = Arc::new(Mutex::new(embedding_service));
82 let tokenizer = Tokenizer::new()?;
83
84 Ok(Self {
85 db,
86 embedding_service,
87 tokenizer,
88 })
89 }
90
91 pub async fn store_message(&self, request: StoreMessageRequest) -> MemoryResult<Vec<String>> {
98 if self
99 .db
100 .ensure_vector_tables_healthy()
101 .await
102 .unwrap_or(false)
103 {
104 tracing::warn!("Memory vector tables were repaired before storing message chunks");
105 }
106
107 let config = if let Some(ref pid) = request.project_id {
108 self.db
109 .get_or_create_config_for_tenant(pid, &request.tenant_scope)
110 .await?
111 } else {
112 MemoryConfig::default()
113 };
114
115 let chunking_config = ChunkingConfig {
117 chunk_size: config.chunk_size as usize,
118 chunk_overlap: config.chunk_overlap as usize,
119 separator: None,
120 };
121
122 let text_chunks = chunk_text_semantic(&request.content, &chunking_config)?;
123
124 if text_chunks.is_empty() {
125 return Ok(Vec::new());
126 }
127
128 let mut chunk_ids = Vec::with_capacity(text_chunks.len());
129 let embedding_service = self.embedding_service.lock().await;
130
131 for text_chunk in text_chunks {
132 let chunk_id = uuid::Uuid::new_v4().to_string();
133
134 let embedding = embedding_service.embed(&text_chunk.content).await?;
136
137 let chunk = MemoryChunk {
139 id: chunk_id.clone(),
140 content: text_chunk.content,
141 tier: request.tier,
142 session_id: request.session_id.clone(),
143 project_id: request.project_id.clone(),
144 source: request.source.clone(),
145 source_path: request.source_path.clone(),
146 source_mtime: request.source_mtime,
147 source_size: request.source_size,
148 source_hash: request.source_hash.clone(),
149 tenant_scope: request.tenant_scope.clone(),
150 created_at: Utc::now(),
151 token_count: text_chunk.token_count as i64,
152 metadata: request.metadata.clone(),
153 };
154
155 if let Err(err) = self.db.store_chunk(&chunk, &embedding).await {
157 tracing::warn!("Failed to store memory chunk {}: {}", chunk.id, err);
158 let repaired = {
159 let repaired_after_error =
160 self.db.try_repair_after_error(&err).await.unwrap_or(false);
161 repaired_after_error
162 || self
163 .db
164 .ensure_vector_tables_healthy()
165 .await
166 .unwrap_or(false)
167 };
168 if repaired {
169 tracing::warn!(
170 "Retrying memory chunk insert after vector table repair: {}",
171 chunk.id
172 );
173 if let Err(retry_err) = self.db.store_chunk(&chunk, &embedding).await {
174 if Self::is_malformed_database_error(&retry_err) {
175 tracing::warn!(
176 "Memory DB still malformed after vector repair. Resetting memory tables and retrying chunk insert: {}",
177 chunk.id
178 );
179 self.db.reset_all_memory_tables().await?;
180 self.db.store_chunk(&chunk, &embedding).await?;
181 } else {
182 return Err(retry_err);
183 }
184 }
185 } else {
186 return Err(err);
187 }
188 }
189 chunk_ids.push(chunk_id);
190 }
191
192 if config.auto_cleanup {
194 self.maybe_cleanup(&request.project_id, &request.tenant_scope)
195 .await?;
196 }
197
198 Ok(chunk_ids)
199 }
200
201 pub async fn search(
203 &self,
204 query: &str,
205 tier: Option<MemoryTier>,
206 project_id: Option<&str>,
207 session_id: Option<&str>,
208 limit: Option<i64>,
209 ) -> MemoryResult<Vec<MemorySearchResult>> {
210 self.search_for_tenant(
211 query,
212 tier,
213 project_id,
214 session_id,
215 &MemoryTenantScope::local(),
216 limit,
217 )
218 .await
219 }
220
221 pub async fn search_for_tenant(
222 &self,
223 query: &str,
224 tier: Option<MemoryTier>,
225 project_id: Option<&str>,
226 session_id: Option<&str>,
227 tenant_scope: &MemoryTenantScope,
228 limit: Option<i64>,
229 ) -> MemoryResult<Vec<MemorySearchResult>> {
230 self.search_for_tenant_with_access_filter(
231 query,
232 tier,
233 project_id,
234 session_id,
235 tenant_scope,
236 limit,
237 None,
238 )
239 .await
240 }
241
242 pub async fn search_for_tenant_with_access_filter(
243 &self,
244 query: &str,
245 tier: Option<MemoryTier>,
246 project_id: Option<&str>,
247 session_id: Option<&str>,
248 tenant_scope: &MemoryTenantScope,
249 limit: Option<i64>,
250 access_filter: Option<&crate::types::MemoryAccessFilter>,
251 ) -> MemoryResult<Vec<MemorySearchResult>> {
252 let effective_limit = limit.unwrap_or(5);
253 let candidate_limit = if access_filter.is_some() {
254 effective_limit.saturating_mul(ACCESS_FILTER_CANDIDATE_MULTIPLIER)
255 } else {
256 effective_limit
257 };
258
259 let embedding_service = self.embedding_service.lock().await;
261 let query_embedding = embedding_service.embed(query).await?;
262 drop(embedding_service);
263
264 let mut results = Vec::new();
265
266 let tiers_to_search = match tier {
268 Some(t) => vec![t],
269 None => {
270 if project_id.is_some() {
271 vec![MemoryTier::Session, MemoryTier::Project, MemoryTier::Global]
272 } else {
273 vec![MemoryTier::Session, MemoryTier::Global]
274 }
275 }
276 };
277
278 let now_ms = Utc::now().timestamp_millis();
279 for search_tier in tiers_to_search {
280 let tier_results = match self
281 .db
282 .search_similar_for_tenant(
283 &query_embedding,
284 search_tier,
285 project_id,
286 session_id,
287 tenant_scope,
288 candidate_limit,
289 )
290 .await
291 {
292 Ok(results) => results,
293 Err(err) => {
294 tracing::warn!(
295 "Memory tier search failed for {:?}: {}. Attempting vector repair.",
296 search_tier,
297 err
298 );
299 let repaired = {
300 let repaired_after_error =
301 self.db.try_repair_after_error(&err).await.unwrap_or(false);
302 repaired_after_error
303 || self
304 .db
305 .ensure_vector_tables_healthy()
306 .await
307 .unwrap_or(false)
308 };
309 if repaired {
310 match self
311 .db
312 .search_similar_for_tenant(
313 &query_embedding,
314 search_tier,
315 project_id,
316 session_id,
317 tenant_scope,
318 candidate_limit,
319 )
320 .await
321 {
322 Ok(results) => results,
323 Err(retry_err) => {
324 tracing::warn!(
325 "Memory tier search still failing for {:?} after repair: {}",
326 search_tier,
327 retry_err
328 );
329 continue;
330 }
331 }
332 } else {
333 continue;
334 }
335 }
336 };
337
338 for (chunk, distance) in tier_results {
339 if !memory_chunk_visible_to_access_filter(&chunk, access_filter) {
340 continue;
341 }
342 let similarity = 1.0 - distance.clamp(0.0, 1.0);
346 let similarity = if search_tier == MemoryTier::Global {
347 Self::guide_doc_similarity(similarity, &chunk, now_ms)
348 } else {
349 similarity
350 };
351
352 results.push(MemorySearchResult { chunk, similarity });
353 }
354 }
355
356 results.sort_by(|a, b| b.similarity.partial_cmp(&a.similarity).unwrap());
358 results.truncate(effective_limit as usize);
359
360 Ok(results)
361 }
362
363 pub async fn upsert_knowledge_space(&self, space: &KnowledgeSpaceRecord) -> MemoryResult<()> {
364 self.db.upsert_knowledge_space(space).await
365 }
366
367 pub async fn upsert_knowledge_space_for_tenant(
368 &self,
369 space: &KnowledgeSpaceRecord,
370 tenant_scope: &MemoryTenantScope,
371 ) -> MemoryResult<()> {
372 self.db
373 .upsert_knowledge_space_for_tenant(space, tenant_scope)
374 .await
375 }
376
377 pub async fn get_knowledge_space(
378 &self,
379 id: &str,
380 ) -> MemoryResult<Option<KnowledgeSpaceRecord>> {
381 self.db.get_knowledge_space(id).await
382 }
383
384 pub async fn get_knowledge_space_for_tenant(
385 &self,
386 id: &str,
387 tenant_scope: &MemoryTenantScope,
388 ) -> MemoryResult<Option<KnowledgeSpaceRecord>> {
389 self.db
390 .get_knowledge_space_for_tenant(id, tenant_scope)
391 .await
392 }
393
394 pub async fn list_knowledge_spaces(
395 &self,
396 project_id: Option<&str>,
397 ) -> MemoryResult<Vec<KnowledgeSpaceRecord>> {
398 self.db.list_knowledge_spaces(project_id).await
399 }
400
401 pub async fn list_knowledge_spaces_for_tenant(
402 &self,
403 project_id: Option<&str>,
404 tenant_scope: &MemoryTenantScope,
405 ) -> MemoryResult<Vec<KnowledgeSpaceRecord>> {
406 self.db
407 .list_knowledge_spaces_for_tenant(project_id, tenant_scope)
408 .await
409 }
410
411 pub async fn upsert_knowledge_item(&self, item: &KnowledgeItemRecord) -> MemoryResult<()> {
412 self.db.upsert_knowledge_item(item).await
413 }
414
415 pub async fn upsert_knowledge_item_for_tenant(
416 &self,
417 item: &KnowledgeItemRecord,
418 tenant_scope: &MemoryTenantScope,
419 ) -> MemoryResult<()> {
420 self.db
421 .upsert_knowledge_item_for_tenant(item, tenant_scope)
422 .await
423 }
424
425 pub async fn get_knowledge_item(&self, id: &str) -> MemoryResult<Option<KnowledgeItemRecord>> {
426 self.db.get_knowledge_item(id).await
427 }
428
429 pub async fn get_knowledge_item_for_tenant(
430 &self,
431 id: &str,
432 tenant_scope: &MemoryTenantScope,
433 ) -> MemoryResult<Option<KnowledgeItemRecord>> {
434 self.db
435 .get_knowledge_item_for_tenant(id, tenant_scope)
436 .await
437 }
438
439 pub async fn list_knowledge_items(
440 &self,
441 space_id: &str,
442 coverage_key: Option<&str>,
443 ) -> MemoryResult<Vec<KnowledgeItemRecord>> {
444 self.db.list_knowledge_items(space_id, coverage_key).await
445 }
446
447 pub async fn list_knowledge_items_for_tenant(
448 &self,
449 space_id: &str,
450 coverage_key: Option<&str>,
451 tenant_scope: &MemoryTenantScope,
452 ) -> MemoryResult<Vec<KnowledgeItemRecord>> {
453 self.db
454 .list_knowledge_items_for_tenant(space_id, coverage_key, tenant_scope)
455 .await
456 }
457
458 pub async fn upsert_knowledge_coverage(
459 &self,
460 coverage: &KnowledgeCoverageRecord,
461 ) -> MemoryResult<()> {
462 self.db.upsert_knowledge_coverage(coverage).await
463 }
464
465 pub async fn upsert_knowledge_coverage_for_tenant(
466 &self,
467 coverage: &KnowledgeCoverageRecord,
468 tenant_scope: &MemoryTenantScope,
469 ) -> MemoryResult<()> {
470 self.db
471 .upsert_knowledge_coverage_for_tenant(coverage, tenant_scope)
472 .await
473 }
474
475 pub async fn get_knowledge_coverage(
476 &self,
477 coverage_key: &str,
478 space_id: &str,
479 ) -> MemoryResult<Option<KnowledgeCoverageRecord>> {
480 self.db.get_knowledge_coverage(coverage_key, space_id).await
481 }
482
483 pub async fn get_knowledge_coverage_for_tenant(
484 &self,
485 coverage_key: &str,
486 space_id: &str,
487 tenant_scope: &MemoryTenantScope,
488 ) -> MemoryResult<Option<KnowledgeCoverageRecord>> {
489 self.db
490 .get_knowledge_coverage_for_tenant(coverage_key, space_id, tenant_scope)
491 .await
492 }
493
494 pub async fn promote_knowledge_item(
495 &self,
496 request: &KnowledgePromotionRequest,
497 ) -> MemoryResult<Option<KnowledgePromotionResult>> {
498 self.db.promote_knowledge_item(request).await
499 }
500
501 pub async fn promote_knowledge_item_for_tenant(
502 &self,
503 request: &KnowledgePromotionRequest,
504 tenant_scope: &MemoryTenantScope,
505 ) -> MemoryResult<Option<KnowledgePromotionResult>> {
506 self.db
507 .promote_knowledge_item_for_tenant(request, tenant_scope)
508 .await
509 }
510
511 fn space_matches_ref(
512 space: &KnowledgeSpaceRecord,
513 space_ref: &tandem_orchestrator::KnowledgeSpaceRef,
514 project_id: &str,
515 ) -> bool {
516 if space.scope != space_ref.scope {
517 return false;
518 }
519 match space_ref.scope {
520 KnowledgeScope::Project | KnowledgeScope::Run => {
521 let target_project = space_ref.project_id.as_deref().unwrap_or(project_id);
522 if space.project_id.as_deref() != Some(target_project) {
523 return false;
524 }
525 }
526 KnowledgeScope::Global => {}
527 }
528 if let Some(namespace) = space_ref.namespace.as_deref() {
529 if space.namespace.as_deref() != Some(namespace) {
530 return false;
531 }
532 }
533 true
534 }
535
536 fn select_preflight_namespace(
537 binding: &KnowledgeBinding,
538 spaces: &[KnowledgeSpaceRecord],
539 ) -> Option<String> {
540 if let Some(namespace) = binding.namespace.clone() {
541 return Some(namespace);
542 }
543 if binding.read_spaces.len() == 1 {
544 if let Some(namespace) = binding.read_spaces[0].namespace.clone() {
545 return Some(namespace);
546 }
547 }
548 if spaces.len() == 1 {
549 return spaces[0].namespace.clone();
550 }
551 let mut unique = HashSet::new();
552 for space in spaces {
553 if let Some(namespace) = space.namespace.as_ref() {
554 unique.insert(namespace);
555 }
556 }
557 if unique.len() == 1 {
558 unique.into_iter().next().map(|value| value.to_string())
559 } else {
560 None
561 }
562 }
563
564 fn binding_uses_explicit_spaces(binding: &KnowledgeBinding) -> bool {
565 !binding.read_spaces.is_empty() || !binding.promote_spaces.is_empty()
566 }
567
568 fn namespace_matches(space_namespace: Option<&str>, binding_namespace: Option<&str>) -> bool {
569 match (space_namespace, binding_namespace) {
570 (None, None) => true,
571 (Some(space), Some(binding)) => {
572 normalize_knowledge_segment(space) == normalize_knowledge_segment(binding)
573 }
574 _ => false,
575 }
576 }
577
578 fn is_fresh_enough(
579 freshness_expires_at_ms: Option<u64>,
580 freshness_policy_ms: Option<u64>,
581 coverage_last_promoted_at_ms: Option<u64>,
582 item_created_at_ms: u64,
583 now_ms: u64,
584 ) -> bool {
585 if let Some(expires_at_ms) = freshness_expires_at_ms {
586 return expires_at_ms > now_ms;
587 }
588 let Some(policy_ms) = freshness_policy_ms else {
589 return true;
590 };
591 let basis_ms = coverage_last_promoted_at_ms.unwrap_or(item_created_at_ms);
592 now_ms.saturating_sub(basis_ms) <= policy_ms
593 }
594
595 async fn resolve_preflight_spaces(
596 &self,
597 request: &KnowledgePreflightRequest,
598 _coverage_key: &str,
599 tenant_scope: &MemoryTenantScope,
600 ) -> MemoryResult<Vec<KnowledgeSpaceRecord>> {
601 let binding = &request.binding;
602 let mut spaces = Vec::new();
603 let mut seen_space_ids = HashSet::new();
604
605 let push_space = |space: KnowledgeSpaceRecord,
606 spaces: &mut Vec<KnowledgeSpaceRecord>,
607 seen_space_ids: &mut HashSet<String>| {
608 if seen_space_ids.insert(space.id.clone()) {
609 spaces.push(space);
610 }
611 };
612
613 if Self::binding_uses_explicit_spaces(binding) {
614 for space_ref in binding
615 .read_spaces
616 .iter()
617 .chain(binding.promote_spaces.iter())
618 {
619 if let Some(space_id) = space_ref.space_id.as_deref() {
620 if let Some(space) = self
621 .get_knowledge_space_for_tenant(space_id, tenant_scope)
622 .await?
623 {
624 push_space(space, &mut spaces, &mut seen_space_ids);
625 }
626 continue;
627 }
628
629 match space_ref.scope {
630 KnowledgeScope::Run => {}
631 KnowledgeScope::Project => {
632 let candidate_project_id = space_ref
633 .project_id
634 .as_deref()
635 .unwrap_or(&request.project_id);
636 let project_spaces = self
637 .list_knowledge_spaces_for_tenant(
638 Some(candidate_project_id),
639 tenant_scope,
640 )
641 .await?;
642 for space in project_spaces.into_iter().filter(|space| {
643 Self::space_matches_ref(space, space_ref, &request.project_id)
644 }) {
645 push_space(space, &mut spaces, &mut seen_space_ids);
646 }
647 }
648 KnowledgeScope::Global => {
649 let global_spaces = self
650 .list_knowledge_spaces_for_tenant(None, tenant_scope)
651 .await?;
652 for space in global_spaces.into_iter().filter(|space| {
653 Self::space_matches_ref(space, space_ref, &request.project_id)
654 }) {
655 push_space(space, &mut spaces, &mut seen_space_ids);
656 }
657 }
658 }
659 }
660 return Ok(spaces);
661 }
662
663 if request.project_id.trim().is_empty() {
664 return Ok(spaces);
665 }
666
667 let project_spaces = self
668 .list_knowledge_spaces_for_tenant(Some(&request.project_id), tenant_scope)
669 .await?;
670 let requested_namespace = if binding.namespace.is_some() {
671 binding.namespace.clone()
672 } else {
673 Self::select_preflight_namespace(binding, &project_spaces)
674 };
675 let Some(requested_namespace) = requested_namespace else {
676 return Ok(spaces);
677 };
678
679 for space in project_spaces.into_iter().filter(|space| {
680 space.scope == KnowledgeScope::Project
681 && Self::namespace_matches(
682 space.namespace.as_deref(),
683 Some(requested_namespace.as_str()),
684 )
685 }) {
686 push_space(space, &mut spaces, &mut seen_space_ids);
687 }
688 Ok(spaces)
689 }
690
691 pub async fn preflight_knowledge(
692 &self,
693 request: &KnowledgePreflightRequest,
694 ) -> MemoryResult<KnowledgePreflightResult> {
695 self.preflight_knowledge_for_tenant(request, &MemoryTenantScope::local())
696 .await
697 }
698
699 pub async fn preflight_knowledge_for_tenant(
700 &self,
701 request: &KnowledgePreflightRequest,
702 tenant_scope: &MemoryTenantScope,
703 ) -> MemoryResult<KnowledgePreflightResult> {
704 let binding = &request.binding;
705 let project_spaces = if request.project_id.trim().is_empty() {
706 Vec::new()
707 } else {
708 self.list_knowledge_spaces_for_tenant(Some(&request.project_id), tenant_scope)
709 .await?
710 };
711 let namespace = binding
712 .namespace
713 .clone()
714 .or_else(|| Self::select_preflight_namespace(binding, &project_spaces));
715 let coverage_key = build_knowledge_coverage_key(
716 &request.project_id,
717 namespace.as_deref(),
718 &request.task_family,
719 &request.subject,
720 );
721
722 if !binding.enabled || binding.reuse_mode == KnowledgeReuseMode::Disabled {
723 return Ok(KnowledgePreflightResult {
724 project_id: request.project_id.clone(),
725 namespace,
726 task_family: request.task_family.clone(),
727 subject: request.subject.clone(),
728 coverage_key,
729 decision: KnowledgeReuseDecision::Disabled,
730 reuse_reason: None,
731 skip_reason: Some("knowledge reuse is disabled for this binding".to_string()),
732 freshness_reason: None,
733 items: Vec::new(),
734 });
735 }
736
737 let spaces = self
738 .resolve_preflight_spaces(request, &coverage_key, tenant_scope)
739 .await?;
740 if spaces.is_empty() {
741 return Ok(KnowledgePreflightResult {
742 project_id: request.project_id.clone(),
743 namespace,
744 task_family: request.task_family.clone(),
745 subject: request.subject.clone(),
746 coverage_key,
747 decision: KnowledgeReuseDecision::NoPriorKnowledge,
748 reuse_reason: None,
749 skip_reason: Some("no reusable knowledge spaces were found".to_string()),
750 freshness_reason: None,
751 items: Vec::new(),
752 });
753 }
754
755 let now_ms = chrono::Utc::now().timestamp_millis().max(0) as u64;
756 let mut fresh_items = Vec::new();
757 let mut stale_items = Vec::new();
758 let mut freshest_reason = None;
759
760 for space in &spaces {
761 let items = self
762 .list_knowledge_items_for_tenant(&space.id, Some(&coverage_key), tenant_scope)
763 .await?;
764 let coverage = self
765 .get_knowledge_coverage_for_tenant(&coverage_key, &space.id, tenant_scope)
766 .await?;
767 for item in items {
768 if !item.status.is_active() {
769 continue;
770 }
771 let Some(trust_level) = item.status.as_trust_level() else {
772 continue;
773 };
774 if !trust_level.meets_floor(binding.trust_floor) {
775 continue;
776 }
777 let freshness_expires_at_ms = item.freshness_expires_at_ms.or_else(|| {
778 coverage
779 .as_ref()
780 .and_then(|coverage| coverage.freshness_expires_at_ms)
781 });
782 let pack_item = KnowledgePackItem {
783 item_id: item.id.clone(),
784 space_id: space.id.clone(),
785 coverage_key: item.coverage_key.clone(),
786 title: item.title.clone(),
787 summary: item.summary.clone(),
788 trust_level,
789 status: item.status.to_string(),
790 artifact_refs: item.artifact_refs.clone(),
791 source_memory_ids: item.source_memory_ids.clone(),
792 freshness_expires_at_ms,
793 };
794 if Self::is_fresh_enough(
795 freshness_expires_at_ms,
796 binding.freshness_ms,
797 coverage
798 .as_ref()
799 .and_then(|coverage| coverage.last_promoted_at_ms),
800 item.created_at_ms,
801 now_ms,
802 ) {
803 fresh_items.push(pack_item);
804 } else {
805 freshest_reason = Some(match freshness_expires_at_ms {
806 Some(expires_at_ms) => format!(
807 "coverage `{}` in space `{}` expired at {}",
808 coverage_key, space.id, expires_at_ms
809 ),
810 None => format!(
811 "coverage `{}` in space `{}` lacks freshness metadata",
812 coverage_key, space.id
813 ),
814 });
815 stale_items.push(pack_item);
816 }
817 }
818 }
819
820 fresh_items.sort_by(|left, right| {
821 right
822 .trust_level
823 .rank()
824 .cmp(&left.trust_level.rank())
825 .then_with(|| {
826 right
827 .freshness_expires_at_ms
828 .unwrap_or(0)
829 .cmp(&left.freshness_expires_at_ms.unwrap_or(0))
830 })
831 .then_with(|| left.title.cmp(&right.title))
832 });
833 stale_items.sort_by(|left, right| {
834 right
835 .trust_level
836 .rank()
837 .cmp(&left.trust_level.rank())
838 .then_with(|| left.title.cmp(&right.title))
839 });
840
841 if let Some(freshest_trust_level) = fresh_items.first().map(|item| item.trust_level) {
842 let selected = fresh_items
843 .into_iter()
844 .take(MAX_KNOWLEDGE_PACK_ITEMS)
845 .collect::<Vec<_>>();
846 let decision = match freshest_trust_level {
847 KnowledgeTrustLevel::ApprovedDefault => {
848 KnowledgeReuseDecision::ReuseApprovedDefault
849 }
850 _ => KnowledgeReuseDecision::ReusePromoted,
851 };
852 let selected_count = selected.len();
853 return Ok(KnowledgePreflightResult {
854 project_id: request.project_id.clone(),
855 namespace,
856 task_family: request.task_family.clone(),
857 subject: request.subject.clone(),
858 coverage_key,
859 decision,
860 reuse_reason: Some(format!(
861 "reusing {} promoted knowledge item(s) from {} space(s)",
862 selected_count,
863 spaces.len()
864 )),
865 skip_reason: None,
866 freshness_reason: None,
867 items: selected,
868 });
869 }
870
871 if !stale_items.is_empty() {
872 let selected = stale_items
873 .into_iter()
874 .take(MAX_KNOWLEDGE_PACK_ITEMS)
875 .collect::<Vec<_>>();
876 return Ok(KnowledgePreflightResult {
877 project_id: request.project_id.clone(),
878 namespace,
879 task_family: request.task_family.clone(),
880 subject: request.subject.clone(),
881 coverage_key,
882 decision: KnowledgeReuseDecision::RefreshRequired,
883 reuse_reason: None,
884 skip_reason: Some(
885 "prior knowledge exists but is not fresh enough to reuse".to_string(),
886 ),
887 freshness_reason: freshest_reason.or_else(|| {
888 Some("matching knowledge exists but freshness policy rejected it".to_string())
889 }),
890 items: selected,
891 });
892 }
893
894 Ok(KnowledgePreflightResult {
895 project_id: request.project_id.clone(),
896 namespace,
897 task_family: request.task_family.clone(),
898 subject: request.subject.clone(),
899 coverage_key,
900 decision: KnowledgeReuseDecision::NoPriorKnowledge,
901 reuse_reason: None,
902 skip_reason: Some("no active promoted knowledge matched this coverage key".to_string()),
903 freshness_reason: None,
904 items: Vec::new(),
905 })
906 }
907
908 pub async fn retrieve_context(
913 &self,
914 query: &str,
915 project_id: Option<&str>,
916 session_id: Option<&str>,
917 token_budget: Option<i64>,
918 ) -> MemoryResult<MemoryContext> {
919 self.retrieve_context_for_tenant(
920 query,
921 project_id,
922 session_id,
923 &MemoryTenantScope::local(),
924 token_budget,
925 )
926 .await
927 }
928
929 pub async fn retrieve_context_for_tenant(
930 &self,
931 query: &str,
932 project_id: Option<&str>,
933 session_id: Option<&str>,
934 tenant_scope: &MemoryTenantScope,
935 token_budget: Option<i64>,
936 ) -> MemoryResult<MemoryContext> {
937 let (context, _) = self
938 .retrieve_context_with_meta_for_tenant(
939 query,
940 project_id,
941 session_id,
942 tenant_scope,
943 token_budget,
944 )
945 .await?;
946 Ok(context)
947 }
948
949 pub async fn retrieve_context_with_meta(
951 &self,
952 query: &str,
953 project_id: Option<&str>,
954 session_id: Option<&str>,
955 token_budget: Option<i64>,
956 ) -> MemoryResult<(MemoryContext, MemoryRetrievalMeta)> {
957 self.retrieve_context_with_meta_for_tenant(
958 query,
959 project_id,
960 session_id,
961 &MemoryTenantScope::local(),
962 token_budget,
963 )
964 .await
965 }
966
967 pub async fn retrieve_context_with_meta_for_tenant(
968 &self,
969 query: &str,
970 project_id: Option<&str>,
971 session_id: Option<&str>,
972 tenant_scope: &MemoryTenantScope,
973 token_budget: Option<i64>,
974 ) -> MemoryResult<(MemoryContext, MemoryRetrievalMeta)> {
975 self.retrieve_context_with_meta_for_tenant_with_access_filter(
976 query,
977 project_id,
978 session_id,
979 tenant_scope,
980 token_budget,
981 None,
982 )
983 .await
984 }
985
986 pub async fn retrieve_context_with_meta_for_tenant_with_access_filter(
987 &self,
988 query: &str,
989 project_id: Option<&str>,
990 session_id: Option<&str>,
991 tenant_scope: &MemoryTenantScope,
992 token_budget: Option<i64>,
993 access_filter: Option<&crate::types::MemoryAccessFilter>,
994 ) -> MemoryResult<(MemoryContext, MemoryRetrievalMeta)> {
995 let config = if let Some(pid) = project_id {
996 self.db
997 .get_or_create_config_for_tenant(pid, tenant_scope)
998 .await?
999 } else {
1000 MemoryConfig::default()
1001 };
1002 let budget = token_budget.unwrap_or(config.token_budget);
1003 let retrieval_limit = config.retrieval_k.max(1);
1004
1005 let current_session = if let Some(sid) = session_id {
1007 self.db
1008 .get_session_chunks_for_tenant(sid, tenant_scope)
1009 .await?
1010 .into_iter()
1011 .filter(|chunk| memory_chunk_visible_to_access_filter(chunk, access_filter))
1012 .collect()
1013 } else {
1014 Vec::new()
1015 };
1016
1017 let search_results = self
1019 .search_for_tenant_with_access_filter(
1020 query,
1021 None,
1022 project_id,
1023 session_id,
1024 tenant_scope,
1025 Some(retrieval_limit),
1026 access_filter,
1027 )
1028 .await?;
1029
1030 let mut score_min: Option<f64> = None;
1031 let mut score_max: Option<f64> = None;
1032 for result in &search_results {
1033 score_min = Some(match score_min {
1034 Some(current) => current.min(result.similarity),
1035 None => result.similarity,
1036 });
1037 score_max = Some(match score_max {
1038 Some(current) => current.max(result.similarity),
1039 None => result.similarity,
1040 });
1041 }
1042
1043 let mut current_session = current_session;
1044 let mut relevant_history = Vec::new();
1045 let mut project_facts = Vec::new();
1046
1047 for result in search_results {
1048 match result.chunk.tier {
1049 MemoryTier::Project => {
1050 project_facts.push(result.chunk);
1051 }
1052 MemoryTier::Global => {
1053 project_facts.push(result.chunk);
1054 }
1055 MemoryTier::Session => {
1056 if !current_session.iter().any(|c| c.id == result.chunk.id) {
1058 relevant_history.push(result.chunk);
1059 }
1060 }
1061 }
1062 }
1063
1064 let mut total_tokens: i64 = current_session.iter().map(|c| c.token_count).sum();
1066 total_tokens += relevant_history.iter().map(|c| c.token_count).sum::<i64>();
1067 total_tokens += project_facts.iter().map(|c| c.token_count).sum::<i64>();
1068
1069 if total_tokens > budget {
1071 let excess = total_tokens - budget;
1072 self.trim_context(
1073 &mut current_session,
1074 &mut relevant_history,
1075 &mut project_facts,
1076 excess,
1077 )?;
1078 total_tokens = current_session.iter().map(|c| c.token_count).sum::<i64>()
1079 + relevant_history.iter().map(|c| c.token_count).sum::<i64>()
1080 + project_facts.iter().map(|c| c.token_count).sum::<i64>();
1081 }
1082
1083 let context = MemoryContext {
1084 current_session,
1085 relevant_history,
1086 project_facts,
1087 total_tokens,
1088 };
1089 let chunks_total = context.current_session.len()
1090 + context.relevant_history.len()
1091 + context.project_facts.len();
1092 let meta = MemoryRetrievalMeta {
1093 used: chunks_total > 0,
1094 chunks_total,
1095 session_chunks: context.current_session.len(),
1096 history_chunks: context.relevant_history.len(),
1097 project_fact_chunks: context.project_facts.len(),
1098 score_min,
1099 score_max,
1100 };
1101
1102 Ok((context, meta))
1103 }
1104
1105 fn trim_context(
1107 &self,
1108 current_session: &mut Vec<MemoryChunk>,
1109 relevant_history: &mut Vec<MemoryChunk>,
1110 project_facts: &mut Vec<MemoryChunk>,
1111 excess_tokens: i64,
1112 ) -> MemoryResult<()> {
1113 let mut tokens_to_remove = excess_tokens;
1114
1115 while tokens_to_remove > 0 && !relevant_history.is_empty() {
1117 if let Some(chunk) = relevant_history.pop() {
1118 tokens_to_remove -= chunk.token_count;
1119 }
1120 }
1121
1122 while tokens_to_remove > 0 && !project_facts.is_empty() {
1124 if let Some(chunk) = project_facts.pop() {
1125 tokens_to_remove -= chunk.token_count;
1126 }
1127 }
1128
1129 while tokens_to_remove > 0 && !current_session.is_empty() {
1130 if let Some(chunk) = current_session.pop() {
1131 tokens_to_remove -= chunk.token_count;
1132 }
1133 }
1134
1135 Ok(())
1136 }
1137
1138 pub async fn clear_session(&self, session_id: &str) -> MemoryResult<u64> {
1140 self.clear_session_for_tenant(session_id, &MemoryTenantScope::local())
1141 .await
1142 }
1143
1144 pub async fn clear_session_for_tenant(
1145 &self,
1146 session_id: &str,
1147 tenant_scope: &MemoryTenantScope,
1148 ) -> MemoryResult<u64> {
1149 let count = self
1150 .db
1151 .clear_session_memory_for_tenant(session_id, tenant_scope)
1152 .await?;
1153
1154 self.db
1156 .log_cleanup_for_tenant(
1157 "manual",
1158 MemoryTier::Session,
1159 None,
1160 Some(session_id),
1161 count as i64,
1162 0,
1163 tenant_scope,
1164 )
1165 .await?;
1166
1167 Ok(count)
1168 }
1169
1170 pub async fn clear_project(&self, project_id: &str) -> MemoryResult<u64> {
1172 self.clear_project_for_tenant(project_id, &MemoryTenantScope::local())
1173 .await
1174 }
1175
1176 pub async fn clear_project_for_tenant(
1177 &self,
1178 project_id: &str,
1179 tenant_scope: &MemoryTenantScope,
1180 ) -> MemoryResult<u64> {
1181 let count = self
1182 .db
1183 .clear_project_memory_for_tenant(project_id, tenant_scope)
1184 .await?;
1185
1186 self.db
1188 .log_cleanup_for_tenant(
1189 "manual",
1190 MemoryTier::Project,
1191 Some(project_id),
1192 None,
1193 count as i64,
1194 0,
1195 tenant_scope,
1196 )
1197 .await?;
1198
1199 Ok(count)
1200 }
1201
1202 pub async fn get_stats(&self) -> MemoryResult<MemoryStats> {
1204 self.db.get_stats().await
1205 }
1206
1207 pub async fn get_stats_for_tenant(
1208 &self,
1209 tenant_scope: &MemoryTenantScope,
1210 ) -> MemoryResult<MemoryStats> {
1211 self.db.get_stats_for_tenant(tenant_scope).await
1212 }
1213
1214 pub async fn get_config(&self, project_id: &str) -> MemoryResult<MemoryConfig> {
1216 self.db.get_or_create_config(project_id).await
1217 }
1218
1219 pub async fn get_config_for_tenant(
1220 &self,
1221 project_id: &str,
1222 tenant_scope: &MemoryTenantScope,
1223 ) -> MemoryResult<MemoryConfig> {
1224 self.db
1225 .get_or_create_config_for_tenant(project_id, tenant_scope)
1226 .await
1227 }
1228
1229 pub async fn set_config(&self, project_id: &str, config: &MemoryConfig) -> MemoryResult<()> {
1231 self.db.update_config(project_id, config).await
1232 }
1233
1234 pub async fn set_config_for_tenant(
1235 &self,
1236 project_id: &str,
1237 config: &MemoryConfig,
1238 tenant_scope: &MemoryTenantScope,
1239 ) -> MemoryResult<()> {
1240 self.db
1241 .update_config_for_tenant(project_id, config, tenant_scope)
1242 .await
1243 }
1244
1245 pub async fn resolve_uri(&self, uri: &str) -> MemoryResult<Option<MemoryNode>> {
1246 self.db.get_node_by_uri(uri).await
1247 }
1248
1249 pub async fn list_directory(&self, uri: &str) -> MemoryResult<DirectoryListing> {
1250 let nodes = self.db.list_directory(uri).await?;
1251 let directories: Vec<MemoryNode> = nodes
1252 .iter()
1253 .filter(|n| n.node_type == NodeType::Directory)
1254 .cloned()
1255 .collect();
1256 let files: Vec<MemoryNode> = nodes
1257 .iter()
1258 .filter(|n| n.node_type == NodeType::File)
1259 .cloned()
1260 .collect();
1261
1262 Ok(DirectoryListing {
1263 uri: uri.to_string(),
1264 nodes,
1265 total_children: directories.len() + files.len(),
1266 directories,
1267 files,
1268 })
1269 }
1270
1271 pub async fn tree(&self, uri: &str, max_depth: usize) -> MemoryResult<Vec<TreeNode>> {
1272 self.db.get_children_tree(uri, max_depth).await
1273 }
1274
1275 pub async fn create_context_node(
1276 &self,
1277 uri: &str,
1278 node_type: NodeType,
1279 metadata: Option<serde_json::Value>,
1280 ) -> MemoryResult<String> {
1281 let parsed_uri =
1282 ContextUri::parse(uri).map_err(|e| MemoryError::InvalidConfig(e.message))?;
1283 let parent_uri = parsed_uri.parent().map(|p| p.to_string());
1284 self.db
1285 .create_node(uri, parent_uri.as_deref(), node_type, metadata.as_ref())
1286 .await
1287 }
1288
1289 pub async fn get_context_layer(
1290 &self,
1291 node_id: &str,
1292 layer_type: LayerType,
1293 ) -> MemoryResult<Option<MemoryLayer>> {
1294 self.db.get_layer(node_id, layer_type).await
1295 }
1296
1297 pub async fn store_content_with_layers(
1298 &self,
1299 uri: &str,
1300 content: &str,
1301 metadata: Option<serde_json::Value>,
1302 ) -> MemoryResult<String> {
1303 let parsed_uri =
1304 ContextUri::parse(uri).map_err(|e| MemoryError::InvalidConfig(e.message))?;
1305 let node_type = if parsed_uri
1306 .last_segment()
1307 .map(|s| s.ends_with(".md") || s.ends_with(".txt") || s.contains("."))
1308 .unwrap_or(false)
1309 {
1310 NodeType::File
1311 } else {
1312 NodeType::Directory
1313 };
1314
1315 let parent_uri = parsed_uri.parent().map(|p| p.to_string());
1316 let node_id = self
1317 .db
1318 .create_node(uri, parent_uri.as_deref(), node_type, metadata.as_ref())
1319 .await?;
1320
1321 let token_count = self.tokenizer.count_tokens(content) as i64;
1322 self.db
1323 .create_layer(&node_id, LayerType::L2, content, token_count, None)
1324 .await?;
1325
1326 Ok(node_id)
1327 }
1328
1329 pub async fn generate_layers_for_node(
1330 &self,
1331 node_id: &str,
1332 providers: &ProviderRegistry,
1333 ) -> MemoryResult<()> {
1334 let l2_layer = self.db.get_layer(node_id, LayerType::L2).await?;
1335 let l2_content = match l2_layer {
1336 Some(layer) => layer.content,
1337 None => return Ok(()),
1338 };
1339
1340 let generator = ContextLayerGenerator::new(Arc::new(providers.clone()));
1341
1342 let (l0_content, l1_content) = generator.generate_layers(&l2_content).await?;
1343
1344 let l0_tokens = self.tokenizer.count_tokens(&l0_content) as i64;
1345 let l1_tokens = self.tokenizer.count_tokens(&l1_content) as i64;
1346
1347 if self.db.get_layer(node_id, LayerType::L0).await?.is_none() {
1348 self.db
1349 .create_layer(node_id, LayerType::L0, &l0_content, l0_tokens, None)
1350 .await?;
1351 }
1352
1353 if self.db.get_layer(node_id, LayerType::L1).await?.is_none() {
1354 self.db
1355 .create_layer(node_id, LayerType::L1, &l1_content, l1_tokens, None)
1356 .await?;
1357 }
1358
1359 Ok(())
1360 }
1361
1362 pub async fn get_layer_content(
1363 &self,
1364 node_id: &str,
1365 layer_type: LayerType,
1366 ) -> MemoryResult<Option<String>> {
1367 let layer = self.db.get_layer(node_id, layer_type).await?;
1368 Ok(layer.map(|l| l.content))
1369 }
1370
1371 pub async fn store_content_with_layers_auto(
1372 &self,
1373 uri: &str,
1374 content: &str,
1375 metadata: Option<serde_json::Value>,
1376 providers: Option<&ProviderRegistry>,
1377 ) -> MemoryResult<String> {
1378 let node_id = self
1379 .store_content_with_layers(uri, content, metadata)
1380 .await?;
1381
1382 if let Some(p) = providers {
1383 if let Err(e) = self.generate_layers_for_node(&node_id, p).await {
1384 tracing::warn!("Failed to generate layers for node {}: {}", node_id, e);
1385 }
1386 }
1387
1388 Ok(node_id)
1389 }
1390
1391 pub async fn run_cleanup(&self, project_id: Option<&str>) -> MemoryResult<u64> {
1393 self.run_cleanup_for_tenant(project_id, &MemoryTenantScope::local())
1394 .await
1395 }
1396
1397 pub async fn run_cleanup_for_tenant(
1398 &self,
1399 project_id: Option<&str>,
1400 tenant_scope: &MemoryTenantScope,
1401 ) -> MemoryResult<u64> {
1402 let mut total_cleaned = 0u64;
1403
1404 if let Some(pid) = project_id {
1405 let config = self
1407 .db
1408 .get_or_create_config_for_tenant(pid, tenant_scope)
1409 .await?;
1410
1411 if config.auto_cleanup {
1412 let cleaned = self
1414 .db
1415 .cleanup_old_sessions_for_tenant(config.session_retention_days, tenant_scope)
1416 .await?;
1417 total_cleaned += cleaned;
1418
1419 if cleaned > 0 {
1420 self.db
1421 .log_cleanup_for_tenant(
1422 "auto",
1423 MemoryTier::Session,
1424 Some(pid),
1425 None,
1426 cleaned as i64,
1427 0,
1428 tenant_scope,
1429 )
1430 .await?;
1431 }
1432 }
1433 } else {
1434 let cleaned = self
1438 .db
1439 .cleanup_old_sessions_for_tenant(30, tenant_scope)
1440 .await?;
1441 total_cleaned += cleaned;
1442 }
1443
1444 if total_cleaned > 100 {
1446 self.db.vacuum().await?;
1447 }
1448
1449 Ok(total_cleaned)
1450 }
1451
1452 async fn maybe_cleanup(
1454 &self,
1455 project_id: &Option<String>,
1456 tenant_scope: &MemoryTenantScope,
1457 ) -> MemoryResult<()> {
1458 if let Some(pid) = project_id {
1459 let stats = self.db.get_stats_for_tenant(tenant_scope).await?;
1460 let config = self
1461 .db
1462 .get_or_create_config_for_tenant(pid, tenant_scope)
1463 .await?;
1464
1465 if stats.project_chunks > config.max_chunks {
1467 let excess = stats.project_chunks - config.max_chunks;
1469 tracing::info!("Project {} has {} excess chunks", pid, excess);
1472 }
1473 }
1474
1475 Ok(())
1476 }
1477
1478 pub async fn get_cleanup_log(&self, _limit: i64) -> MemoryResult<Vec<CleanupLogEntry>> {
1480 Ok(Vec::new())
1483 }
1484
1485 pub fn count_tokens(&self, text: &str) -> usize {
1487 self.tokenizer.count_tokens(text)
1488 }
1489
1490 pub async fn embedding_health(&self) -> EmbeddingHealth {
1492 let service = self.embedding_service.lock().await;
1493 if service.is_available() {
1494 EmbeddingHealth {
1495 status: "ok".to_string(),
1496 reason: None,
1497 }
1498 } else {
1499 EmbeddingHealth {
1500 status: "degraded_disabled".to_string(),
1501 reason: service.disabled_reason().map(ToString::to_string),
1502 }
1503 }
1504 }
1505
1506 pub async fn consolidate_session(
1508 &self,
1509 session_id: &str,
1510 project_id: Option<&str>,
1511 providers: &ProviderRegistry,
1512 config: &MemoryConsolidationConfig,
1513 ) -> MemoryResult<Option<String>> {
1514 if !config.enabled {
1515 return Ok(None);
1516 }
1517
1518 let chunks = self.db.get_session_chunks(session_id).await?;
1519 if chunks.is_empty() {
1520 return Ok(None);
1521 }
1522
1523 let mut text_parts = Vec::new();
1525 for chunk in &chunks {
1526 text_parts.push(chunk.content.clone());
1527 }
1528 let full_text = text_parts.join("\n\n---\n\n");
1529
1530 let prompt = format!(
1532 "Please provide a concise but comprehensive summary of the following chat session. \
1533 Focus on the key decisions, technical details, code changes, and unresolved issues. \
1534 Do NOT include conversational filler, greetings, or sign-offs. \
1535 This summary will be used as long-term memory to recall the context of this work.\n\n\
1536 Session transcripts:\n\n{}",
1537 full_text
1538 );
1539
1540 let provider_override = config.provider.as_deref().filter(|s| !s.is_empty());
1541 let model_override = config.model.as_deref().filter(|s| !s.is_empty());
1542
1543 let summary_text = match providers
1544 .complete_cheapest(&prompt, provider_override, model_override)
1545 .await
1546 {
1547 Ok(s) => s,
1548 Err(e) => {
1549 tracing::warn!("Memory consolidation LLM failed for session {session_id}: {e}");
1550 return Ok(None);
1551 }
1552 };
1553
1554 if summary_text.trim().is_empty() {
1555 return Ok(None);
1556 }
1557
1558 let embedding = {
1560 let service = self.embedding_service.lock().await;
1561 service
1562 .embed(&summary_text)
1563 .await
1564 .map_err(|e| crate::types::MemoryError::Embedding(e.to_string()))?
1565 };
1566
1567 let chunk_id = uuid::Uuid::new_v4().to_string();
1569 let chunk = MemoryChunk {
1570 id: chunk_id,
1571 content: summary_text.clone(),
1572 tier: MemoryTier::Project,
1573 session_id: None, project_id: project_id.map(ToString::to_string),
1575 created_at: Utc::now(),
1576 source: "consolidation".to_string(),
1577 token_count: self.count_tokens(&summary_text) as i64,
1578 source_path: None,
1579 source_mtime: None,
1580 source_size: None,
1581 source_hash: None,
1582 tenant_scope: MemoryTenantScope::local(),
1583 metadata: None,
1584 };
1585
1586 self.db.store_chunk(&chunk, &embedding).await?;
1587
1588 self.db.clear_session_memory(session_id).await?;
1590
1591 tracing::info!(
1592 "Session {session_id} consolidated into summary chunk. Original chunks cleared."
1593 );
1594
1595 Ok(Some(summary_text))
1596 }
1597}
1598
1599fn memory_chunk_visible_to_access_filter(
1600 chunk: &MemoryChunk,
1601 access_filter: Option<&crate::types::MemoryAccessFilter>,
1602) -> bool {
1603 if crate::types::MemorySourceAccessTarget::from_chunk(chunk).is_none() {
1604 return true;
1605 }
1606 access_filter
1607 .map(|filter| filter.allows_chunk(chunk))
1608 .unwrap_or(false)
1609}
1610
1611pub async fn create_memory_manager(app_data_dir: &Path) -> MemoryResult<MemoryManager> {
1613 let db_path = app_data_dir.join("tandem_memory.db");
1614 MemoryManager::new(&db_path).await
1615}