// tandem_memory/manager_parts/part01.rs
1// Memory Manager Module
2// High-level memory operations (store, retrieve, cleanup)
3
4use crate::chunking::{chunk_text_semantic, ChunkingConfig, Tokenizer};
5use crate::context_layers::ContextLayerGenerator;
6use crate::context_uri::ContextUri;
7use crate::db::MemoryDatabase;
8use crate::embeddings::EmbeddingService;
9use crate::types::{
10    CleanupLogEntry, DirectoryListing, EmbeddingHealth, KnowledgeCoverageRecord,
11    KnowledgeItemRecord, KnowledgePromotionRequest, KnowledgePromotionResult, KnowledgeSpaceRecord,
12    LayerType, MemoryChunk, MemoryConfig, MemoryContext, MemoryError, MemoryLayer, MemoryNode,
13    MemoryResult, MemoryRetrievalMeta, MemorySearchResult, MemoryStats, MemoryTier, NodeType,
14    StoreMessageRequest, TreeNode,
15};
16use chrono::Utc;
17use std::collections::HashSet;
18use std::path::Path;
19use std::sync::Arc;
20use tandem_orchestrator::{
21    build_knowledge_coverage_key, normalize_knowledge_segment, KnowledgeBinding, KnowledgePackItem,
22    KnowledgePreflightRequest, KnowledgePreflightResult, KnowledgeReuseDecision,
23    KnowledgeReuseMode, KnowledgeScope, KnowledgeTrustLevel,
24};
25use tandem_providers::{MemoryConsolidationConfig, ProviderRegistry};
26use tokio::sync::Mutex;
27
/// High-level memory manager that coordinates database, embeddings, and chunking
pub struct MemoryManager {
    // Persistent store for chunks, embeddings, and knowledge records.
    db: Arc<MemoryDatabase>,
    // Embedding backend; async mutex serializes access across tasks.
    embedding_service: Arc<Mutex<EmbeddingService>>,
    // NOTE(review): not referenced in this part of the file — presumably used
    // for token counting/chunking elsewhere; confirm before removing.
    tokenizer: Tokenizer,
}
34
/// Maximum number of knowledge items returned in a preflight reuse pack.
const MAX_KNOWLEDGE_PACK_ITEMS: usize = 3;
36
37impl MemoryManager {
38    fn is_malformed_database_error(err: &crate::types::MemoryError) -> bool {
39        err.to_string()
40            .to_lowercase()
41            .contains("database disk image is malformed")
42    }
43
    /// Borrow the underlying memory database handle.
    pub fn db(&self) -> &Arc<MemoryDatabase> {
        &self.db
    }
47
48    /// Initialize the memory manager
49    pub async fn new(db_path: &Path) -> MemoryResult<Self> {
50        let db = Arc::new(MemoryDatabase::new(db_path).await?);
51        let embedding_service = Arc::new(Mutex::new(EmbeddingService::new()));
52        let tokenizer = Tokenizer::new()?;
53
54        Ok(Self {
55            db,
56            embedding_service,
57            tokenizer,
58        })
59    }
60
    /// Store a message in memory
    ///
    /// This will:
    /// 1. Chunk the message content
    /// 2. Generate embeddings for each chunk
    /// 3. Store chunks and embeddings in the database
    ///
    /// Returns the ids of the persisted chunks (empty when chunking yields
    /// nothing). On insert failure it attempts a vector-table repair and
    /// retries once; as a last resort on persistent corruption it resets all
    /// memory tables and retries again.
    pub async fn store_message(&self, request: StoreMessageRequest) -> MemoryResult<Vec<String>> {
        // Best-effort pre-flight repair of the vector tables before writing.
        if self
            .db
            .ensure_vector_tables_healthy()
            .await
            .unwrap_or(false)
        {
            tracing::warn!("Memory vector tables were repaired before storing message chunks");
        }

        // Per-project config when available, otherwise defaults.
        let config = if let Some(ref pid) = request.project_id {
            self.db.get_or_create_config(pid).await?
        } else {
            MemoryConfig::default()
        };

        // Chunk the content
        let chunking_config = ChunkingConfig {
            chunk_size: config.chunk_size as usize,
            chunk_overlap: config.chunk_overlap as usize,
            separator: None,
        };

        let text_chunks = chunk_text_semantic(&request.content, &chunking_config)?;

        if text_chunks.is_empty() {
            return Ok(Vec::new());
        }

        let mut chunk_ids = Vec::with_capacity(text_chunks.len());
        // NOTE(review): the embedding lock is held for the whole loop,
        // including the DB writes — serializes concurrent store_message calls.
        let embedding_service = self.embedding_service.lock().await;

        for text_chunk in text_chunks {
            let chunk_id = uuid::Uuid::new_v4().to_string();

            // Generate embedding
            let embedding = embedding_service.embed(&text_chunk.content).await?;

            // Create memory chunk
            let chunk = MemoryChunk {
                id: chunk_id.clone(),
                content: text_chunk.content,
                tier: request.tier,
                session_id: request.session_id.clone(),
                project_id: request.project_id.clone(),
                source: request.source.clone(),
                source_path: request.source_path.clone(),
                source_mtime: request.source_mtime,
                source_size: request.source_size,
                source_hash: request.source_hash.clone(),
                created_at: Utc::now(),
                token_count: text_chunk.token_count as i64,
                metadata: request.metadata.clone(),
            };

            // Store in database (retry once after vector-table self-heal).
            if let Err(err) = self.db.store_chunk(&chunk, &embedding).await {
                tracing::warn!("Failed to store memory chunk {}: {}", chunk.id, err);
                let repaired = {
                    let repaired_after_error =
                        self.db.try_repair_after_error(&err).await.unwrap_or(false);
                    repaired_after_error
                        || self
                            .db
                            .ensure_vector_tables_healthy()
                            .await
                            .unwrap_or(false)
                };
                if repaired {
                    tracing::warn!(
                        "Retrying memory chunk insert after vector table repair: {}",
                        chunk.id
                    );
                    if let Err(retry_err) = self.db.store_chunk(&chunk, &embedding).await {
                        if Self::is_malformed_database_error(&retry_err) {
                            // Last resort: drop and recreate the memory tables,
                            // then retry the insert one final time.
                            tracing::warn!(
                                "Memory DB still malformed after vector repair. Resetting memory tables and retrying chunk insert: {}",
                                chunk.id
                            );
                            self.db.reset_all_memory_tables().await?;
                            self.db.store_chunk(&chunk, &embedding).await?;
                        } else {
                            return Err(retry_err);
                        }
                    }
                } else {
                    return Err(err);
                }
            }
            chunk_ids.push(chunk_id);
        }

        // Check if cleanup is needed
        if config.auto_cleanup {
            self.maybe_cleanup(&request.project_id).await?;
        }

        Ok(chunk_ids)
    }
166
167    /// Search memory for relevant chunks
168    pub async fn search(
169        &self,
170        query: &str,
171        tier: Option<MemoryTier>,
172        project_id: Option<&str>,
173        session_id: Option<&str>,
174        limit: Option<i64>,
175    ) -> MemoryResult<Vec<MemorySearchResult>> {
176        let effective_limit = limit.unwrap_or(5);
177
178        // Generate query embedding
179        let embedding_service = self.embedding_service.lock().await;
180        let query_embedding = embedding_service.embed(query).await?;
181        drop(embedding_service);
182
183        let mut results = Vec::new();
184
185        // Search in specified tier or all tiers
186        let tiers_to_search = match tier {
187            Some(t) => vec![t],
188            None => {
189                if project_id.is_some() {
190                    vec![MemoryTier::Session, MemoryTier::Project, MemoryTier::Global]
191                } else {
192                    vec![MemoryTier::Session, MemoryTier::Global]
193                }
194            }
195        };
196
197        for search_tier in tiers_to_search {
198            let tier_results = match self
199                .db
200                .search_similar(
201                    &query_embedding,
202                    search_tier,
203                    project_id,
204                    session_id,
205                    effective_limit,
206                )
207                .await
208            {
209                Ok(results) => results,
210                Err(err) => {
211                    tracing::warn!(
212                        "Memory tier search failed for {:?}: {}. Attempting vector repair.",
213                        search_tier,
214                        err
215                    );
216                    let repaired = {
217                        let repaired_after_error =
218                            self.db.try_repair_after_error(&err).await.unwrap_or(false);
219                        repaired_after_error
220                            || self
221                                .db
222                                .ensure_vector_tables_healthy()
223                                .await
224                                .unwrap_or(false)
225                    };
226                    if repaired {
227                        match self
228                            .db
229                            .search_similar(
230                                &query_embedding,
231                                search_tier,
232                                project_id,
233                                session_id,
234                                effective_limit,
235                            )
236                            .await
237                        {
238                            Ok(results) => results,
239                            Err(retry_err) => {
240                                tracing::warn!(
241                                    "Memory tier search still failing for {:?} after repair: {}",
242                                    search_tier,
243                                    retry_err
244                                );
245                                continue;
246                            }
247                        }
248                    } else {
249                        continue;
250                    }
251                }
252            };
253
254            for (chunk, distance) in tier_results {
255                // Convert distance to similarity (cosine similarity)
256                // sqlite-vec returns distance, where lower is more similar
257                // Cosine similarity ranges from -1 to 1, but for normalized vectors it's 0 to 1
258                let similarity = 1.0 - distance.clamp(0.0, 1.0);
259
260                results.push(MemorySearchResult { chunk, similarity });
261            }
262        }
263
264        // Sort by similarity (highest first) and limit results
265        results.sort_by(|a, b| b.similarity.partial_cmp(&a.similarity).unwrap());
266        results.truncate(effective_limit as usize);
267
268        Ok(results)
269    }
270
    /// Create or update a knowledge space record (delegates to the database layer).
    pub async fn upsert_knowledge_space(&self, space: &KnowledgeSpaceRecord) -> MemoryResult<()> {
        self.db.upsert_knowledge_space(space).await
    }
274
    /// Look up a knowledge space by id; `None` when it does not exist.
    pub async fn get_knowledge_space(
        &self,
        id: &str,
    ) -> MemoryResult<Option<KnowledgeSpaceRecord>> {
        self.db.get_knowledge_space(id).await
    }
281
    /// List knowledge spaces, optionally filtered to a project.
    pub async fn list_knowledge_spaces(
        &self,
        project_id: Option<&str>,
    ) -> MemoryResult<Vec<KnowledgeSpaceRecord>> {
        self.db.list_knowledge_spaces(project_id).await
    }
288
    /// Create or update a knowledge item record (delegates to the database layer).
    pub async fn upsert_knowledge_item(&self, item: &KnowledgeItemRecord) -> MemoryResult<()> {
        self.db.upsert_knowledge_item(item).await
    }
292
    /// Look up a knowledge item by id; `None` when it does not exist.
    pub async fn get_knowledge_item(&self, id: &str) -> MemoryResult<Option<KnowledgeItemRecord>> {
        self.db.get_knowledge_item(id).await
    }
296
    /// List knowledge items in a space, optionally filtered by coverage key.
    pub async fn list_knowledge_items(
        &self,
        space_id: &str,
        coverage_key: Option<&str>,
    ) -> MemoryResult<Vec<KnowledgeItemRecord>> {
        self.db.list_knowledge_items(space_id, coverage_key).await
    }
304
    /// Create or update a coverage record (delegates to the database layer).
    pub async fn upsert_knowledge_coverage(
        &self,
        coverage: &KnowledgeCoverageRecord,
    ) -> MemoryResult<()> {
        self.db.upsert_knowledge_coverage(coverage).await
    }
311
    /// Fetch the coverage record for a (coverage_key, space) pair, if any.
    pub async fn get_knowledge_coverage(
        &self,
        coverage_key: &str,
        space_id: &str,
    ) -> MemoryResult<Option<KnowledgeCoverageRecord>> {
        self.db.get_knowledge_coverage(coverage_key, space_id).await
    }
319
    /// Promote a knowledge item as described by `request` (delegates to the
    /// database layer); `None` when the promotion target is not found.
    pub async fn promote_knowledge_item(
        &self,
        request: &KnowledgePromotionRequest,
    ) -> MemoryResult<Option<KnowledgePromotionResult>> {
        self.db.promote_knowledge_item(request).await
    }
326
327    fn space_matches_ref(
328        space: &KnowledgeSpaceRecord,
329        space_ref: &tandem_orchestrator::KnowledgeSpaceRef,
330        project_id: &str,
331    ) -> bool {
332        if space.scope != space_ref.scope {
333            return false;
334        }
335        match space_ref.scope {
336            KnowledgeScope::Project | KnowledgeScope::Run => {
337                let target_project = space_ref.project_id.as_deref().unwrap_or(project_id);
338                if space.project_id.as_deref() != Some(target_project) {
339                    return false;
340                }
341            }
342            KnowledgeScope::Global => {}
343        }
344        if let Some(namespace) = space_ref.namespace.as_deref() {
345            if space.namespace.as_deref() != Some(namespace) {
346                return false;
347            }
348        }
349        true
350    }
351
352    fn select_preflight_namespace(
353        binding: &KnowledgeBinding,
354        spaces: &[KnowledgeSpaceRecord],
355    ) -> Option<String> {
356        if let Some(namespace) = binding.namespace.clone() {
357            return Some(namespace);
358        }
359        if binding.read_spaces.len() == 1 {
360            if let Some(namespace) = binding.read_spaces[0].namespace.clone() {
361                return Some(namespace);
362            }
363        }
364        if spaces.len() == 1 {
365            return spaces[0].namespace.clone();
366        }
367        let mut unique = HashSet::new();
368        for space in spaces {
369            if let Some(namespace) = space.namespace.as_ref() {
370                unique.insert(namespace);
371            }
372        }
373        if unique.len() == 1 {
374            unique.into_iter().next().map(|value| value.to_string())
375        } else {
376            None
377        }
378    }
379
380    fn binding_uses_explicit_spaces(binding: &KnowledgeBinding) -> bool {
381        !binding.read_spaces.is_empty() || !binding.promote_spaces.is_empty()
382    }
383
384    fn namespace_matches(space_namespace: Option<&str>, binding_namespace: Option<&str>) -> bool {
385        match (space_namespace, binding_namespace) {
386            (None, None) => true,
387            (Some(space), Some(binding)) => {
388                normalize_knowledge_segment(space) == normalize_knowledge_segment(binding)
389            }
390            _ => false,
391        }
392    }
393
394    fn is_fresh_enough(
395        freshness_expires_at_ms: Option<u64>,
396        freshness_policy_ms: Option<u64>,
397        coverage_last_promoted_at_ms: Option<u64>,
398        item_created_at_ms: u64,
399        now_ms: u64,
400    ) -> bool {
401        if let Some(expires_at_ms) = freshness_expires_at_ms {
402            return expires_at_ms > now_ms;
403        }
404        let Some(policy_ms) = freshness_policy_ms else {
405            return true;
406        };
407        let basis_ms = coverage_last_promoted_at_ms.unwrap_or(item_created_at_ms);
408        now_ms.saturating_sub(basis_ms) <= policy_ms
409    }
410
    /// Resolve which knowledge spaces a preflight request should consult.
    ///
    /// When the binding names explicit read/promote spaces, those are used:
    /// by concrete id when present, otherwise by scope + project + namespace
    /// matching. With no explicit spaces, the project's Project-scoped
    /// spaces are filtered by the inferred namespace. Results are
    /// deduplicated by space id.
    async fn resolve_preflight_spaces(
        &self,
        request: &KnowledgePreflightRequest,
        _coverage_key: &str,
    ) -> MemoryResult<Vec<KnowledgeSpaceRecord>> {
        let binding = &request.binding;
        let mut spaces = Vec::new();
        let mut seen_space_ids = HashSet::new();

        // Dedup helper: only the first occurrence of each space id is kept.
        let push_space = |space: KnowledgeSpaceRecord,
                          spaces: &mut Vec<KnowledgeSpaceRecord>,
                          seen_space_ids: &mut HashSet<String>| {
            if seen_space_ids.insert(space.id.clone()) {
                spaces.push(space);
            }
        };

        if Self::binding_uses_explicit_spaces(binding) {
            for space_ref in binding
                .read_spaces
                .iter()
                .chain(binding.promote_spaces.iter())
            {
                // A concrete space id wins over scope-based matching.
                if let Some(space_id) = space_ref.space_id.as_deref() {
                    if let Some(space) = self.get_knowledge_space(space_id).await? {
                        push_space(space, &mut spaces, &mut seen_space_ids);
                    }
                    continue;
                }

                match space_ref.scope {
                    // Run-scoped refs without an explicit id resolve to nothing.
                    KnowledgeScope::Run => {}
                    KnowledgeScope::Project => {
                        let candidate_project_id = space_ref
                            .project_id
                            .as_deref()
                            .unwrap_or(&request.project_id);
                        let project_spaces = self
                            .list_knowledge_spaces(Some(candidate_project_id))
                            .await?;
                        for space in project_spaces.into_iter().filter(|space| {
                            Self::space_matches_ref(space, space_ref, &request.project_id)
                        }) {
                            push_space(space, &mut spaces, &mut seen_space_ids);
                        }
                    }
                    KnowledgeScope::Global => {
                        let global_spaces = self.list_knowledge_spaces(None).await?;
                        for space in global_spaces.into_iter().filter(|space| {
                            Self::space_matches_ref(space, space_ref, &request.project_id)
                        }) {
                            push_space(space, &mut spaces, &mut seen_space_ids);
                        }
                    }
                }
            }
            return Ok(spaces);
        }

        // Implicit resolution requires a project id.
        if request.project_id.trim().is_empty() {
            return Ok(spaces);
        }

        let project_spaces = self
            .list_knowledge_spaces(Some(&request.project_id))
            .await?;
        let requested_namespace = if binding.namespace.is_some() {
            binding.namespace.clone()
        } else {
            Self::select_preflight_namespace(binding, &project_spaces)
        };
        // Without a namespace we cannot safely match spaces, so return none.
        let Some(requested_namespace) = requested_namespace else {
            return Ok(spaces);
        };

        for space in project_spaces.into_iter().filter(|space| {
            space.scope == KnowledgeScope::Project
                && Self::namespace_matches(
                    space.namespace.as_deref(),
                    Some(requested_namespace.as_str()),
                )
        }) {
            push_space(space, &mut spaces, &mut seen_space_ids);
        }
        Ok(spaces)
    }
497
    /// Decide whether previously promoted knowledge can be reused for a task.
    ///
    /// Builds the coverage key from the request, resolves candidate spaces,
    /// classifies matching items as fresh or stale, and maps the outcome to
    /// a `KnowledgeReuseDecision` (Disabled, NoPriorKnowledge,
    /// ReuseApprovedDefault/ReusePromoted, or RefreshRequired) with up to
    /// `MAX_KNOWLEDGE_PACK_ITEMS` items attached.
    pub async fn preflight_knowledge(
        &self,
        request: &KnowledgePreflightRequest,
    ) -> MemoryResult<KnowledgePreflightResult> {
        let binding = &request.binding;
        // Project spaces are only queried when a project id is present.
        let project_spaces = if request.project_id.trim().is_empty() {
            Vec::new()
        } else {
            self.list_knowledge_spaces(Some(&request.project_id))
                .await?
        };
        // An explicit binding namespace wins; otherwise infer one.
        let namespace = binding
            .namespace
            .clone()
            .or_else(|| Self::select_preflight_namespace(binding, &project_spaces));
        let coverage_key = build_knowledge_coverage_key(
            &request.project_id,
            namespace.as_deref(),
            &request.task_family,
            &request.subject,
        );

        // Short-circuit: reuse disabled for this binding.
        if !binding.enabled || binding.reuse_mode == KnowledgeReuseMode::Disabled {
            return Ok(KnowledgePreflightResult {
                project_id: request.project_id.clone(),
                namespace,
                task_family: request.task_family.clone(),
                subject: request.subject.clone(),
                coverage_key,
                decision: KnowledgeReuseDecision::Disabled,
                reuse_reason: None,
                skip_reason: Some("knowledge reuse is disabled for this binding".to_string()),
                freshness_reason: None,
                items: Vec::new(),
            });
        }

        let spaces = self
            .resolve_preflight_spaces(request, &coverage_key)
            .await?;
        // No candidate spaces means there is nothing to reuse.
        if spaces.is_empty() {
            return Ok(KnowledgePreflightResult {
                project_id: request.project_id.clone(),
                namespace,
                task_family: request.task_family.clone(),
                subject: request.subject.clone(),
                coverage_key,
                decision: KnowledgeReuseDecision::NoPriorKnowledge,
                reuse_reason: None,
                skip_reason: Some("no reusable knowledge spaces were found".to_string()),
                freshness_reason: None,
                items: Vec::new(),
            });
        }

        let now_ms = chrono::Utc::now().timestamp_millis().max(0) as u64;
        let mut fresh_items = Vec::new();
        let mut stale_items = Vec::new();
        let mut freshest_reason = None;

        // Collect candidate items per space, filtering by status and trust floor.
        for space in &spaces {
            let items = self
                .list_knowledge_items(&space.id, Some(&coverage_key))
                .await?;
            let coverage = self
                .get_knowledge_coverage(&coverage_key, &space.id)
                .await?;
            for item in items {
                if !item.status.is_active() {
                    continue;
                }
                let Some(trust_level) = item.status.as_trust_level() else {
                    continue;
                };
                if !trust_level.meets_floor(binding.trust_floor) {
                    continue;
                }
                // Item-level expiry takes precedence over coverage-level expiry.
                let freshness_expires_at_ms = item.freshness_expires_at_ms.or_else(|| {
                    coverage
                        .as_ref()
                        .and_then(|coverage| coverage.freshness_expires_at_ms)
                });
                let pack_item = KnowledgePackItem {
                    item_id: item.id.clone(),
                    space_id: space.id.clone(),
                    coverage_key: item.coverage_key.clone(),
                    title: item.title.clone(),
                    summary: item.summary.clone(),
                    trust_level,
                    status: item.status.to_string(),
                    artifact_refs: item.artifact_refs.clone(),
                    source_memory_ids: item.source_memory_ids.clone(),
                    freshness_expires_at_ms,
                };
                if Self::is_fresh_enough(
                    freshness_expires_at_ms,
                    binding.freshness_ms,
                    coverage
                        .as_ref()
                        .and_then(|coverage| coverage.last_promoted_at_ms),
                    item.created_at_ms,
                    now_ms,
                ) {
                    fresh_items.push(pack_item);
                } else {
                    // NOTE(review): keeps the reason from the *last* stale item
                    // seen, not necessarily the freshest one — confirm intent.
                    freshest_reason = Some(match freshness_expires_at_ms {
                        Some(expires_at_ms) => format!(
                            "coverage `{}` in space `{}` expired at {}",
                            coverage_key, space.id, expires_at_ms
                        ),
                        None => format!(
                            "coverage `{}` in space `{}` lacks freshness metadata",
                            coverage_key, space.id
                        ),
                    });
                    stale_items.push(pack_item);
                }
            }
        }

        // Rank: trust level descending, then later expiry, then title.
        fresh_items.sort_by(|left, right| {
            right
                .trust_level
                .rank()
                .cmp(&left.trust_level.rank())
                .then_with(|| {
                    right
                        .freshness_expires_at_ms
                        .unwrap_or(0)
                        .cmp(&left.freshness_expires_at_ms.unwrap_or(0))
                })
                .then_with(|| left.title.cmp(&right.title))
        });
        stale_items.sort_by(|left, right| {
            right
                .trust_level
                .rank()
                .cmp(&left.trust_level.rank())
                .then_with(|| left.title.cmp(&right.title))
        });

        // Fresh knowledge available: reuse the top-ranked items.
        if let Some(freshest_trust_level) = fresh_items.first().map(|item| item.trust_level) {
            let selected = fresh_items
                .into_iter()
                .take(MAX_KNOWLEDGE_PACK_ITEMS)
                .collect::<Vec<_>>();
            let decision = match freshest_trust_level {
                KnowledgeTrustLevel::ApprovedDefault => {
                    KnowledgeReuseDecision::ReuseApprovedDefault
                }
                _ => KnowledgeReuseDecision::ReusePromoted,
            };
            let selected_count = selected.len();
            return Ok(KnowledgePreflightResult {
                project_id: request.project_id.clone(),
                namespace,
                task_family: request.task_family.clone(),
                subject: request.subject.clone(),
                coverage_key,
                decision,
                reuse_reason: Some(format!(
                    "reusing {} promoted knowledge item(s) from {} space(s)",
                    selected_count,
                    spaces.len()
                )),
                skip_reason: None,
                freshness_reason: None,
                items: selected,
            });
        }

        // Only stale knowledge: the caller must refresh before reuse.
        if !stale_items.is_empty() {
            let selected = stale_items
                .into_iter()
                .take(MAX_KNOWLEDGE_PACK_ITEMS)
                .collect::<Vec<_>>();
            return Ok(KnowledgePreflightResult {
                project_id: request.project_id.clone(),
                namespace,
                task_family: request.task_family.clone(),
                subject: request.subject.clone(),
                coverage_key,
                decision: KnowledgeReuseDecision::RefreshRequired,
                reuse_reason: None,
                skip_reason: Some(
                    "prior knowledge exists but is not fresh enough to reuse".to_string(),
                ),
                freshness_reason: freshest_reason.or_else(|| {
                    Some("matching knowledge exists but freshness policy rejected it".to_string())
                }),
                items: selected,
            });
        }

        // No matching knowledge at all.
        Ok(KnowledgePreflightResult {
            project_id: request.project_id.clone(),
            namespace,
            task_family: request.task_family.clone(),
            subject: request.subject.clone(),
            coverage_key,
            decision: KnowledgeReuseDecision::NoPriorKnowledge,
            reuse_reason: None,
            skip_reason: Some("no active promoted knowledge matched this coverage key".to_string()),
            freshness_reason: None,
            items: Vec::new(),
        })
    }
705
    /// Retrieve context for a message
    ///
    /// This retrieves relevant chunks from all tiers and formats them
    /// for injection into the prompt. Thin wrapper around
    /// `retrieve_context_with_meta` that discards the retrieval metadata.
    pub async fn retrieve_context(
        &self,
        query: &str,
        project_id: Option<&str>,
        session_id: Option<&str>,
        token_budget: Option<i64>,
    ) -> MemoryResult<MemoryContext> {
        let (context, _) = self
            .retrieve_context_with_meta(query, project_id, session_id, token_budget)
            .await?;
        Ok(context)
    }
722
    /// Retrieve context plus retrieval metadata for observability.
    ///
    /// Combines the current session's chunks with semantically relevant
    /// history and project/global facts, trims the combined set to the
    /// token budget (dropping history first, then facts), and reports
    /// aggregate counts and min/max similarity scores.
    pub async fn retrieve_context_with_meta(
        &self,
        query: &str,
        project_id: Option<&str>,
        session_id: Option<&str>,
        token_budget: Option<i64>,
    ) -> MemoryResult<(MemoryContext, MemoryRetrievalMeta)> {
        // Per-project config when available, otherwise defaults.
        let config = if let Some(pid) = project_id {
            self.db.get_or_create_config(pid).await?
        } else {
            MemoryConfig::default()
        };
        let budget = token_budget.unwrap_or(config.token_budget);
        let retrieval_limit = config.retrieval_k.max(1);

        // Get recent session chunks
        let current_session = if let Some(sid) = session_id {
            self.db.get_session_chunks(sid).await?
        } else {
            Vec::new()
        };

        // Search for relevant history
        let search_results = self
            .search(query, None, project_id, session_id, Some(retrieval_limit))
            .await?;

        // Track min/max similarity for the retrieval metadata.
        let mut score_min: Option<f64> = None;
        let mut score_max: Option<f64> = None;
        for result in &search_results {
            score_min = Some(match score_min {
                Some(current) => current.min(result.similarity),
                None => result.similarity,
            });
            score_max = Some(match score_max {
                Some(current) => current.max(result.similarity),
                None => result.similarity,
            });
        }

        let mut current_session = current_session;
        let mut relevant_history = Vec::new();
        let mut project_facts = Vec::new();

        // Route results by tier; Global chunks are grouped with project facts.
        for result in search_results {
            match result.chunk.tier {
                MemoryTier::Project => {
                    project_facts.push(result.chunk);
                }
                MemoryTier::Global => {
                    project_facts.push(result.chunk);
                }
                MemoryTier::Session => {
                    // Only add to relevant_history if not in current_session
                    if !current_session.iter().any(|c| c.id == result.chunk.id) {
                        relevant_history.push(result.chunk);
                    }
                }
            }
        }

        // Calculate total tokens and trim if necessary
        let mut total_tokens: i64 = current_session.iter().map(|c| c.token_count).sum();
        total_tokens += relevant_history.iter().map(|c| c.token_count).sum::<i64>();
        total_tokens += project_facts.iter().map(|c| c.token_count).sum::<i64>();

        // Trim to fit budget if necessary
        if total_tokens > budget {
            let excess = total_tokens - budget;
            self.trim_context(
                &mut current_session,
                &mut relevant_history,
                &mut project_facts,
                excess,
            )?;
            // Recompute after trimming.
            total_tokens = current_session.iter().map(|c| c.token_count).sum::<i64>()
                + relevant_history.iter().map(|c| c.token_count).sum::<i64>()
                + project_facts.iter().map(|c| c.token_count).sum::<i64>();
        }

        let context = MemoryContext {
            current_session,
            relevant_history,
            project_facts,
            total_tokens,
        };
        let chunks_total = context.current_session.len()
            + context.relevant_history.len()
            + context.project_facts.len();
        let meta = MemoryRetrievalMeta {
            used: chunks_total > 0,
            chunks_total,
            session_chunks: context.current_session.len(),
            history_chunks: context.relevant_history.len(),
            project_fact_chunks: context.project_facts.len(),
            score_min,
            score_max,
        };

        Ok((context, meta))
    }
825
826    /// Trim context to fit within token budget
827    fn trim_context(
828        &self,
829        current_session: &mut Vec<MemoryChunk>,
830        relevant_history: &mut Vec<MemoryChunk>,
831        project_facts: &mut Vec<MemoryChunk>,
832        excess_tokens: i64,
833    ) -> MemoryResult<()> {
834        let mut tokens_to_remove = excess_tokens;
835
836        // First, trim relevant_history (less important than project_facts)
837        while tokens_to_remove > 0 && !relevant_history.is_empty() {
838            if let Some(chunk) = relevant_history.pop() {
839                tokens_to_remove -= chunk.token_count;
840            }
841        }
842
843        // If still over budget, trim project_facts
844        while tokens_to_remove > 0 && !project_facts.is_empty() {
845            if let Some(chunk) = project_facts.pop() {
846                tokens_to_remove -= chunk.token_count;
847            }
848        }
849
850        while tokens_to_remove > 0 && !current_session.is_empty() {
851            if let Some(chunk) = current_session.pop() {
852                tokens_to_remove -= chunk.token_count;
853            }
854        }
855
856        Ok(())
857    }
858
859    /// Clear session memory
860    pub async fn clear_session(&self, session_id: &str) -> MemoryResult<u64> {
861        let count = self.db.clear_session_memory(session_id).await?;
862
863        // Log cleanup
864        self.db
865            .log_cleanup(
866                "manual",
867                MemoryTier::Session,
868                None,
869                Some(session_id),
870                count as i64,
871                0,
872            )
873            .await?;
874
875        Ok(count)
876    }
877
878    /// Clear project memory
879    pub async fn clear_project(&self, project_id: &str) -> MemoryResult<u64> {
880        let count = self.db.clear_project_memory(project_id).await?;
881
882        // Log cleanup
883        self.db
884            .log_cleanup(
885                "manual",
886                MemoryTier::Project,
887                Some(project_id),
888                None,
889                count as i64,
890                0,
891            )
892            .await?;
893
894        Ok(count)
895    }
896
    /// Get memory statistics
    ///
    /// Straight delegation to the database layer's aggregate stats query.
    pub async fn get_stats(&self) -> MemoryResult<MemoryStats> {
        self.db.get_stats().await
    }
901
    /// Get memory configuration for a project
    ///
    /// Delegates to `get_or_create_config`, so a project without a stored
    /// config receives a freshly created default.
    pub async fn get_config(&self, project_id: &str) -> MemoryResult<MemoryConfig> {
        self.db.get_or_create_config(project_id).await
    }
906
    /// Update memory configuration for a project
    ///
    /// Persists the provided `config` for `project_id` via the database layer.
    pub async fn set_config(&self, project_id: &str, config: &MemoryConfig) -> MemoryResult<()> {
        self.db.update_config(project_id, config).await
    }
911
    /// Resolve a context URI to its stored memory node, or `None` if no
    /// node exists at that URI.
    pub async fn resolve_uri(&self, uri: &str) -> MemoryResult<Option<MemoryNode>> {
        self.db.get_node_by_uri(uri).await
    }
915
916    pub async fn list_directory(&self, uri: &str) -> MemoryResult<DirectoryListing> {
917        let nodes = self.db.list_directory(uri).await?;
918        let directories: Vec<MemoryNode> = nodes
919            .iter()
920            .filter(|n| n.node_type == NodeType::Directory)
921            .cloned()
922            .collect();
923        let files: Vec<MemoryNode> = nodes
924            .iter()
925            .filter(|n| n.node_type == NodeType::File)
926            .cloned()
927            .collect();
928
929        Ok(DirectoryListing {
930            uri: uri.to_string(),
931            nodes,
932            total_children: directories.len() + files.len(),
933            directories,
934            files,
935        })
936    }
937
    /// Return the subtree of nodes under `uri`, limited to `max_depth` levels.
    pub async fn tree(&self, uri: &str, max_depth: usize) -> MemoryResult<Vec<TreeNode>> {
        self.db.get_children_tree(uri, max_depth).await
    }
941
942    pub async fn create_context_node(
943        &self,
944        uri: &str,
945        node_type: NodeType,
946        metadata: Option<serde_json::Value>,
947    ) -> MemoryResult<String> {
948        let parsed_uri =
949            ContextUri::parse(uri).map_err(|e| MemoryError::InvalidConfig(e.message))?;
950        let parent_uri = parsed_uri.parent().map(|p| p.to_string());
951        self.db
952            .create_node(uri, parent_uri.as_deref(), node_type, metadata.as_ref())
953            .await
954    }
955
    /// Fetch a single stored context layer for a node, or `None` if that
    /// layer type has not been created for the node.
    pub async fn get_context_layer(
        &self,
        node_id: &str,
        layer_type: LayerType,
    ) -> MemoryResult<Option<MemoryLayer>> {
        self.db.get_layer(node_id, layer_type).await
    }
963
964    pub async fn store_content_with_layers(
965        &self,
966        uri: &str,
967        content: &str,
968        metadata: Option<serde_json::Value>,
969    ) -> MemoryResult<String> {
970        let parsed_uri =
971            ContextUri::parse(uri).map_err(|e| MemoryError::InvalidConfig(e.message))?;
972        let node_type = if parsed_uri
973            .last_segment()
974            .map(|s| s.ends_with(".md") || s.ends_with(".txt") || s.contains("."))
975            .unwrap_or(false)
976        {
977            NodeType::File
978        } else {
979            NodeType::Directory
980        };
981
982        let parent_uri = parsed_uri.parent().map(|p| p.to_string());
983        let node_id = self
984            .db
985            .create_node(uri, parent_uri.as_deref(), node_type, metadata.as_ref())
986            .await?;
987
988        let token_count = self.tokenizer.count_tokens(content) as i64;
989        self.db
990            .create_layer(&node_id, LayerType::L2, content, token_count, None)
991            .await?;
992
993        Ok(node_id)
994    }
995
996    pub async fn generate_layers_for_node(
997        &self,
998        node_id: &str,
999        providers: &ProviderRegistry,
1000    ) -> MemoryResult<()> {
1001        let l2_layer = self.db.get_layer(node_id, LayerType::L2).await?;
1002        let l2_content = match l2_layer {
1003            Some(layer) => layer.content,
1004            None => return Ok(()),
1005        };
1006
1007        let generator = ContextLayerGenerator::new(Arc::new(providers.clone()));
1008
1009        let (l0_content, l1_content) = generator.generate_layers(&l2_content).await?;
1010
1011        let l0_tokens = self.tokenizer.count_tokens(&l0_content) as i64;
1012        let l1_tokens = self.tokenizer.count_tokens(&l1_content) as i64;
1013
1014        if self.db.get_layer(node_id, LayerType::L0).await?.is_none() {
1015            self.db
1016                .create_layer(node_id, LayerType::L0, &l0_content, l0_tokens, None)
1017                .await?;
1018        }
1019
1020        if self.db.get_layer(node_id, LayerType::L1).await?.is_none() {
1021            self.db
1022                .create_layer(node_id, LayerType::L1, &l1_content, l1_tokens, None)
1023                .await?;
1024        }
1025
1026        Ok(())
1027    }
1028
1029    pub async fn get_layer_content(
1030        &self,
1031        node_id: &str,
1032        layer_type: LayerType,
1033    ) -> MemoryResult<Option<String>> {
1034        let layer = self.db.get_layer(node_id, layer_type).await?;
1035        Ok(layer.map(|l| l.content))
1036    }
1037
1038    pub async fn store_content_with_layers_auto(
1039        &self,
1040        uri: &str,
1041        content: &str,
1042        metadata: Option<serde_json::Value>,
1043        providers: Option<&ProviderRegistry>,
1044    ) -> MemoryResult<String> {
1045        let node_id = self
1046            .store_content_with_layers(uri, content, metadata)
1047            .await?;
1048
1049        if let Some(p) = providers {
1050            if let Err(e) = self.generate_layers_for_node(&node_id, p).await {
1051                tracing::warn!("Failed to generate layers for node {}: {}", node_id, e);
1052            }
1053        }
1054
1055        Ok(node_id)
1056    }
1057
1058    /// Run cleanup based on retention policies
1059    pub async fn run_cleanup(&self, project_id: Option<&str>) -> MemoryResult<u64> {
1060        let mut total_cleaned = 0u64;
1061
1062        if let Some(pid) = project_id {
1063            // Get config for this project
1064            let config = self.db.get_or_create_config(pid).await?;
1065
1066            if config.auto_cleanup {
1067                // Clean up old session memory
1068                let cleaned = self
1069                    .db
1070                    .cleanup_old_sessions(config.session_retention_days)
1071                    .await?;
1072                total_cleaned += cleaned;
1073
1074                if cleaned > 0 {
1075                    self.db
1076                        .log_cleanup(
1077                            "auto",
1078                            MemoryTier::Session,
1079                            Some(pid),
1080                            None,
1081                            cleaned as i64,
1082                            0,
1083                        )
1084                        .await?;
1085                }
1086            }
1087        } else {
1088            // Clean up all projects with auto_cleanup enabled
1089            // This would require listing all projects, for now just clean session memory
1090            // with a default retention period
1091            let cleaned = self.db.cleanup_old_sessions(30).await?;
1092            total_cleaned += cleaned;
1093        }
1094
1095        // Vacuum if significant cleanup occurred
1096        if total_cleaned > 100 {
1097            self.db.vacuum().await?;
1098        }
1099
1100        Ok(total_cleaned)
1101    }
1102
1103    /// Check if cleanup is needed and run it
1104    async fn maybe_cleanup(&self, project_id: &Option<String>) -> MemoryResult<()> {
1105        if let Some(pid) = project_id {
1106            let stats = self.db.get_stats().await?;
1107            let config = self.db.get_or_create_config(pid).await?;
1108
1109            // Check if we're over the chunk limit
1110            if stats.project_chunks > config.max_chunks {
1111                // Remove oldest chunks
1112                let excess = stats.project_chunks - config.max_chunks;
1113                // This would require a new DB method to delete oldest chunks
1114                // For now, just log
1115                tracing::info!("Project {} has {} excess chunks", pid, excess);
1116            }
1117        }
1118
1119        Ok(())
1120    }
1121
    /// Get cleanup log entries
    ///
    /// Stub: the DB layer does not yet expose the cleanup log, so this
    /// always returns an empty list. `_limit` is reserved for when the
    /// query is implemented.
    pub async fn get_cleanup_log(&self, _limit: i64) -> MemoryResult<Vec<CleanupLogEntry>> {
        // This would be implemented in the DB layer
        // For now, return empty
        Ok(Vec::new())
    }
1128
    /// Count tokens in text
    ///
    /// Uses the manager's own tokenizer, so counts agree with the
    /// `token_count` values stored on chunks and layers.
    pub fn count_tokens(&self, text: &str) -> usize {
        self.tokenizer.count_tokens(text)
    }
1133
1134    /// Report embedding backend health for UI/telemetry.
1135    pub async fn embedding_health(&self) -> EmbeddingHealth {
1136        let service = self.embedding_service.lock().await;
1137        if service.is_available() {
1138            EmbeddingHealth {
1139                status: "ok".to_string(),
1140                reason: None,
1141            }
1142        } else {
1143            EmbeddingHealth {
1144                status: "degraded_disabled".to_string(),
1145                reason: service.disabled_reason().map(ToString::to_string),
1146            }
1147        }
1148    }
1149
1150    /// Consolidate a session's memory into a summary chunk using the cheapest available provider.
1151    pub async fn consolidate_session(
1152        &self,
1153        session_id: &str,
1154        project_id: Option<&str>,
1155        providers: &ProviderRegistry,
1156        config: &MemoryConsolidationConfig,
1157    ) -> MemoryResult<Option<String>> {
1158        if !config.enabled {
1159            return Ok(None);
1160        }
1161
1162        let chunks = self.db.get_session_chunks(session_id).await?;
1163        if chunks.is_empty() {
1164            return Ok(None);
1165        }
1166
1167        // Assemble text
1168        let mut text_parts = Vec::new();
1169        for chunk in &chunks {
1170            text_parts.push(chunk.content.clone());
1171        }
1172        let full_text = text_parts.join("\n\n---\n\n");
1173
1174        // Build prompt
1175        let prompt = format!(
1176            "Please provide a concise but comprehensive summary of the following chat session. \
1177            Focus on the key decisions, technical details, code changes, and unresolved issues. \
1178            Do NOT include conversational filler, greetings, or sign-offs. \
1179            This summary will be used as long-term memory to recall the context of this work.\n\n\
1180            Session transcripts:\n\n{}",
1181            full_text
1182        );
1183
1184        let provider_override = config.provider.as_deref().filter(|s| !s.is_empty());
1185        let model_override = config.model.as_deref().filter(|s| !s.is_empty());
1186
1187        let summary_text = match providers
1188            .complete_cheapest(&prompt, provider_override, model_override)
1189            .await
1190        {
1191            Ok(s) => s,
1192            Err(e) => {
1193                tracing::warn!("Memory consolidation LLM failed for session {session_id}: {e}");
1194                return Ok(None);
1195            }
1196        };
1197
1198        if summary_text.trim().is_empty() {
1199            return Ok(None);
1200        }
1201
1202        // Generate embedding for the summary
1203        let embedding = {
1204            let service = self.embedding_service.lock().await;
1205            service
1206                .embed(&summary_text)
1207                .await
1208                .map_err(|e| crate::types::MemoryError::Embedding(e.to_string()))?
1209        };
1210
1211        // Store the summary chunk
1212        let chunk_id = uuid::Uuid::new_v4().to_string();
1213        let chunk = MemoryChunk {
1214            id: chunk_id,
1215            content: summary_text.clone(),
1216            tier: MemoryTier::Project,
1217            session_id: None, // The summary belongs to the project, not the ephemeral session
1218            project_id: project_id.map(ToString::to_string),
1219            created_at: Utc::now(),
1220            source: "consolidation".to_string(),
1221            token_count: self.count_tokens(&summary_text) as i64,
1222            source_path: None,
1223            source_mtime: None,
1224            source_size: None,
1225            source_hash: None,
1226            metadata: None,
1227        };
1228
1229        self.db.store_chunk(&chunk, &embedding).await?;
1230
1231        // Clear original chunks now that they are consolidated
1232        self.db.clear_session_memory(session_id).await?;
1233
1234        tracing::info!(
1235            "Session {session_id} consolidated into summary chunk. Original chunks cleared."
1236        );
1237
1238        Ok(Some(summary_text))
1239    }
1240}
1241
1242/// Create memory manager with default database path
1243pub async fn create_memory_manager(app_data_dir: &Path) -> MemoryResult<MemoryManager> {
1244    let db_path = app_data_dir.join("tandem_memory.db");
1245    MemoryManager::new(&db_path).await
1246}
1247