Skip to main content

tandem_memory/manager_parts/
part01.rs

1// Memory Manager Module
2// High-level memory operations (store, retrieve, cleanup)
3
4use crate::chunking::{chunk_text_semantic, ChunkingConfig, Tokenizer};
5use crate::context_layers::ContextLayerGenerator;
6use crate::context_uri::ContextUri;
7use crate::db::MemoryDatabase;
8use crate::embeddings::EmbeddingService;
9use crate::types::{
10    CleanupLogEntry, DirectoryListing, EmbeddingHealth, KnowledgeCoverageRecord,
11    KnowledgeItemRecord, KnowledgePromotionRequest, KnowledgePromotionResult, KnowledgeSpaceRecord,
12    LayerType, MemoryChunk, MemoryConfig, MemoryContext, MemoryError, MemoryLayer, MemoryNode,
13    MemoryResult, MemoryRetrievalMeta, MemorySearchResult, MemoryStats, MemoryTier, NodeType,
14    StoreMessageRequest, TreeNode,
15};
16use chrono::Utc;
17use std::collections::HashSet;
18use std::path::Path;
19use std::sync::Arc;
20use tandem_orchestrator::{
21    build_knowledge_coverage_key, normalize_knowledge_segment, KnowledgeBinding, KnowledgePackItem,
22    KnowledgePreflightRequest, KnowledgePreflightResult, KnowledgeReuseDecision,
23    KnowledgeReuseMode, KnowledgeScope, KnowledgeTrustLevel,
24};
25use tandem_providers::{MemoryConsolidationConfig, ProviderRegistry};
26use tokio::sync::Mutex;
27
/// High-level memory manager that coordinates database, embeddings, and chunking
pub struct MemoryManager {
    // Shared handle to the memory database (chunks, vectors, knowledge tables).
    db: Arc<MemoryDatabase>,
    // Embedding backend; serialized behind an async mutex so calls don't interleave.
    embedding_service: Arc<Mutex<EmbeddingService>>,
    // Tokenizer used for chunking and token accounting.
    tokenizer: Tokenizer,
}
34
/// Maximum number of knowledge items returned in a single preflight pack.
const MAX_KNOWLEDGE_PACK_ITEMS: usize = 3;
/// Chunk-source prefix identifying chunks ingested from guide docs.
const GUIDE_DOC_SOURCE_PREFIX: &str = "guide_docs:";
/// Half-life of the guide-doc recency decay, in milliseconds (30 days).
const GUIDE_DOC_RECENCY_HALFLIFE_MS: f64 = 30.0 * 24.0 * 60.0 * 60.0 * 1000.0;
/// Weight of the recency term when blended with raw similarity (rest goes to similarity).
const GUIDE_DOC_RECENCY_WEIGHT: f64 = 0.12;
39
40impl MemoryManager {
41    fn guide_doc_similarity(similarity: f64, chunk: &MemoryChunk, now_ms: i64) -> f64 {
42        if !chunk.source.starts_with(GUIDE_DOC_SOURCE_PREFIX) {
43            return similarity.clamp(0.0, 1.0);
44        }
45
46        let normalized_similarity = similarity.clamp(0.0, 1.0);
47        let source_mtime = chunk
48            .source_mtime
49            .filter(|value| *value > 0)
50            .unwrap_or_else(|| chunk.created_at.timestamp_millis());
51        let age_ms = (now_ms - source_mtime).max(0) as f64;
52        let recency_score = 1.0 / (1.0 + (age_ms / GUIDE_DOC_RECENCY_HALFLIFE_MS));
53        ((1.0 - GUIDE_DOC_RECENCY_WEIGHT) * normalized_similarity
54            + (GUIDE_DOC_RECENCY_WEIGHT * recency_score))
55            .clamp(0.0, 1.0)
56    }
57
58    fn is_malformed_database_error(err: &crate::types::MemoryError) -> bool {
59        err.to_string()
60            .to_lowercase()
61            .contains("database disk image is malformed")
62    }
63
    /// Direct access to the shared memory database handle.
    pub fn db(&self) -> &Arc<MemoryDatabase> {
        &self.db
    }
67
68    /// Initialize the memory manager
69    pub async fn new(db_path: &Path) -> MemoryResult<Self> {
70        let db = Arc::new(MemoryDatabase::new(db_path).await?);
71        let embedding_service = Arc::new(Mutex::new(EmbeddingService::new()));
72        let tokenizer = Tokenizer::new()?;
73
74        Ok(Self {
75            db,
76            embedding_service,
77            tokenizer,
78        })
79    }
80
    /// Store a message in memory
    ///
    /// This will:
    /// 1. Chunk the message content
    /// 2. Generate embeddings for each chunk
    /// 3. Store chunks and embeddings in the database
    ///
    /// Returns the ids of all stored chunks (empty when the content produced
    /// no chunks). Insert failures trigger a vector-table repair and one
    /// retry; a database still reported malformed after repair is reset.
    pub async fn store_message(&self, request: StoreMessageRequest) -> MemoryResult<Vec<String>> {
        // Proactive health check; `true` means a repair actually ran.
        if self
            .db
            .ensure_vector_tables_healthy()
            .await
            .unwrap_or(false)
        {
            tracing::warn!("Memory vector tables were repaired before storing message chunks");
        }

        // Per-project chunking config when a project id is present; defaults otherwise.
        let config = if let Some(ref pid) = request.project_id {
            self.db.get_or_create_config(pid).await?
        } else {
            MemoryConfig::default()
        };

        // Chunk the content
        let chunking_config = ChunkingConfig {
            chunk_size: config.chunk_size as usize,
            chunk_overlap: config.chunk_overlap as usize,
            separator: None,
        };

        let text_chunks = chunk_text_semantic(&request.content, &chunking_config)?;

        if text_chunks.is_empty() {
            return Ok(Vec::new());
        }

        let mut chunk_ids = Vec::with_capacity(text_chunks.len());
        // Lock held across the whole batch so embeddings are generated serially.
        let embedding_service = self.embedding_service.lock().await;

        for text_chunk in text_chunks {
            let chunk_id = uuid::Uuid::new_v4().to_string();

            // Generate embedding
            let embedding = embedding_service.embed(&text_chunk.content).await?;

            // Create memory chunk
            let chunk = MemoryChunk {
                id: chunk_id.clone(),
                content: text_chunk.content,
                tier: request.tier,
                session_id: request.session_id.clone(),
                project_id: request.project_id.clone(),
                source: request.source.clone(),
                source_path: request.source_path.clone(),
                source_mtime: request.source_mtime,
                source_size: request.source_size,
                source_hash: request.source_hash.clone(),
                created_at: Utc::now(),
                token_count: text_chunk.token_count as i64,
                metadata: request.metadata.clone(),
            };

            // Store in database (retry once after vector-table self-heal).
            if let Err(err) = self.db.store_chunk(&chunk, &embedding).await {
                tracing::warn!("Failed to store memory chunk {}: {}", chunk.id, err);
                // Targeted repair first, then the broader health check; either
                // one succeeding makes a retry worthwhile.
                let repaired = {
                    let repaired_after_error =
                        self.db.try_repair_after_error(&err).await.unwrap_or(false);
                    repaired_after_error
                        || self
                            .db
                            .ensure_vector_tables_healthy()
                            .await
                            .unwrap_or(false)
                };
                if repaired {
                    tracing::warn!(
                        "Retrying memory chunk insert after vector table repair: {}",
                        chunk.id
                    );
                    if let Err(retry_err) = self.db.store_chunk(&chunk, &embedding).await {
                        if Self::is_malformed_database_error(&retry_err) {
                            tracing::warn!(
                                "Memory DB still malformed after vector repair. Resetting memory tables and retrying chunk insert: {}",
                                chunk.id
                            );
                            // Last resort: rebuild all memory tables, then retry once more.
                            self.db.reset_all_memory_tables().await?;
                            self.db.store_chunk(&chunk, &embedding).await?;
                        } else {
                            return Err(retry_err);
                        }
                    }
                } else {
                    // Repair didn't help; surface the original error.
                    return Err(err);
                }
            }
            chunk_ids.push(chunk_id);
        }

        // Check if cleanup is needed
        if config.auto_cleanup {
            self.maybe_cleanup(&request.project_id).await?;
        }

        Ok(chunk_ids)
    }
186
187    /// Search memory for relevant chunks
188    pub async fn search(
189        &self,
190        query: &str,
191        tier: Option<MemoryTier>,
192        project_id: Option<&str>,
193        session_id: Option<&str>,
194        limit: Option<i64>,
195    ) -> MemoryResult<Vec<MemorySearchResult>> {
196        let effective_limit = limit.unwrap_or(5);
197
198        // Generate query embedding
199        let embedding_service = self.embedding_service.lock().await;
200        let query_embedding = embedding_service.embed(query).await?;
201        drop(embedding_service);
202
203        let mut results = Vec::new();
204
205        // Search in specified tier or all tiers
206        let tiers_to_search = match tier {
207            Some(t) => vec![t],
208            None => {
209                if project_id.is_some() {
210                    vec![MemoryTier::Session, MemoryTier::Project, MemoryTier::Global]
211                } else {
212                    vec![MemoryTier::Session, MemoryTier::Global]
213                }
214            }
215        };
216
217        let now_ms = Utc::now().timestamp_millis();
218        for search_tier in tiers_to_search {
219            let tier_results = match self
220                .db
221                .search_similar(
222                    &query_embedding,
223                    search_tier,
224                    project_id,
225                    session_id,
226                    effective_limit,
227                )
228                .await
229            {
230                Ok(results) => results,
231                Err(err) => {
232                    tracing::warn!(
233                        "Memory tier search failed for {:?}: {}. Attempting vector repair.",
234                        search_tier,
235                        err
236                    );
237                    let repaired = {
238                        let repaired_after_error =
239                            self.db.try_repair_after_error(&err).await.unwrap_or(false);
240                        repaired_after_error
241                            || self
242                                .db
243                                .ensure_vector_tables_healthy()
244                                .await
245                                .unwrap_or(false)
246                    };
247                    if repaired {
248                        match self
249                            .db
250                            .search_similar(
251                                &query_embedding,
252                                search_tier,
253                                project_id,
254                                session_id,
255                                effective_limit,
256                            )
257                            .await
258                        {
259                            Ok(results) => results,
260                            Err(retry_err) => {
261                                tracing::warn!(
262                                    "Memory tier search still failing for {:?} after repair: {}",
263                                    search_tier,
264                                    retry_err
265                                );
266                                continue;
267                            }
268                        }
269                    } else {
270                        continue;
271                    }
272                }
273            };
274
275            for (chunk, distance) in tier_results {
276                // Convert distance to similarity (cosine similarity)
277                // sqlite-vec returns distance, where lower is more similar
278                // Cosine similarity ranges from -1 to 1, but for normalized vectors it's 0 to 1
279                let similarity = 1.0 - distance.clamp(0.0, 1.0);
280                let similarity = if search_tier == MemoryTier::Global {
281                    Self::guide_doc_similarity(similarity, &chunk, now_ms)
282                } else {
283                    similarity
284                };
285
286                results.push(MemorySearchResult { chunk, similarity });
287            }
288        }
289
290        // Sort by similarity (highest first) and limit results
291        results.sort_by(|a, b| b.similarity.partial_cmp(&a.similarity).unwrap());
292        results.truncate(effective_limit as usize);
293
294        Ok(results)
295    }
296
    /// Insert or update a knowledge space record (delegates to the database).
    pub async fn upsert_knowledge_space(&self, space: &KnowledgeSpaceRecord) -> MemoryResult<()> {
        self.db.upsert_knowledge_space(space).await
    }
300
    /// Fetch a knowledge space by id, if one exists (delegates to the database).
    pub async fn get_knowledge_space(
        &self,
        id: &str,
    ) -> MemoryResult<Option<KnowledgeSpaceRecord>> {
        self.db.get_knowledge_space(id).await
    }
307
    /// List knowledge spaces, optionally filtered by project id (delegates to the database).
    pub async fn list_knowledge_spaces(
        &self,
        project_id: Option<&str>,
    ) -> MemoryResult<Vec<KnowledgeSpaceRecord>> {
        self.db.list_knowledge_spaces(project_id).await
    }
314
    /// Insert or update a knowledge item record (delegates to the database).
    pub async fn upsert_knowledge_item(&self, item: &KnowledgeItemRecord) -> MemoryResult<()> {
        self.db.upsert_knowledge_item(item).await
    }
318
    /// Fetch a knowledge item by id, if one exists (delegates to the database).
    pub async fn get_knowledge_item(&self, id: &str) -> MemoryResult<Option<KnowledgeItemRecord>> {
        self.db.get_knowledge_item(id).await
    }
322
    /// List knowledge items in a space, optionally filtered by coverage key
    /// (delegates to the database).
    pub async fn list_knowledge_items(
        &self,
        space_id: &str,
        coverage_key: Option<&str>,
    ) -> MemoryResult<Vec<KnowledgeItemRecord>> {
        self.db.list_knowledge_items(space_id, coverage_key).await
    }
330
    /// Insert or update a knowledge coverage record (delegates to the database).
    pub async fn upsert_knowledge_coverage(
        &self,
        coverage: &KnowledgeCoverageRecord,
    ) -> MemoryResult<()> {
        self.db.upsert_knowledge_coverage(coverage).await
    }
337
    /// Fetch the coverage record for a (coverage_key, space) pair, if any
    /// (delegates to the database).
    pub async fn get_knowledge_coverage(
        &self,
        coverage_key: &str,
        space_id: &str,
    ) -> MemoryResult<Option<KnowledgeCoverageRecord>> {
        self.db.get_knowledge_coverage(coverage_key, space_id).await
    }
345
    /// Apply a knowledge promotion request (delegates to the database).
    pub async fn promote_knowledge_item(
        &self,
        request: &KnowledgePromotionRequest,
    ) -> MemoryResult<Option<KnowledgePromotionResult>> {
        self.db.promote_knowledge_item(request).await
    }
352
353    fn space_matches_ref(
354        space: &KnowledgeSpaceRecord,
355        space_ref: &tandem_orchestrator::KnowledgeSpaceRef,
356        project_id: &str,
357    ) -> bool {
358        if space.scope != space_ref.scope {
359            return false;
360        }
361        match space_ref.scope {
362            KnowledgeScope::Project | KnowledgeScope::Run => {
363                let target_project = space_ref.project_id.as_deref().unwrap_or(project_id);
364                if space.project_id.as_deref() != Some(target_project) {
365                    return false;
366                }
367            }
368            KnowledgeScope::Global => {}
369        }
370        if let Some(namespace) = space_ref.namespace.as_deref() {
371            if space.namespace.as_deref() != Some(namespace) {
372                return false;
373            }
374        }
375        true
376    }
377
378    fn select_preflight_namespace(
379        binding: &KnowledgeBinding,
380        spaces: &[KnowledgeSpaceRecord],
381    ) -> Option<String> {
382        if let Some(namespace) = binding.namespace.clone() {
383            return Some(namespace);
384        }
385        if binding.read_spaces.len() == 1 {
386            if let Some(namespace) = binding.read_spaces[0].namespace.clone() {
387                return Some(namespace);
388            }
389        }
390        if spaces.len() == 1 {
391            return spaces[0].namespace.clone();
392        }
393        let mut unique = HashSet::new();
394        for space in spaces {
395            if let Some(namespace) = space.namespace.as_ref() {
396                unique.insert(namespace);
397            }
398        }
399        if unique.len() == 1 {
400            unique.into_iter().next().map(|value| value.to_string())
401        } else {
402            None
403        }
404    }
405
406    fn binding_uses_explicit_spaces(binding: &KnowledgeBinding) -> bool {
407        !binding.read_spaces.is_empty() || !binding.promote_spaces.is_empty()
408    }
409
410    fn namespace_matches(space_namespace: Option<&str>, binding_namespace: Option<&str>) -> bool {
411        match (space_namespace, binding_namespace) {
412            (None, None) => true,
413            (Some(space), Some(binding)) => {
414                normalize_knowledge_segment(space) == normalize_knowledge_segment(binding)
415            }
416            _ => false,
417        }
418    }
419
420    fn is_fresh_enough(
421        freshness_expires_at_ms: Option<u64>,
422        freshness_policy_ms: Option<u64>,
423        coverage_last_promoted_at_ms: Option<u64>,
424        item_created_at_ms: u64,
425        now_ms: u64,
426    ) -> bool {
427        if let Some(expires_at_ms) = freshness_expires_at_ms {
428            return expires_at_ms > now_ms;
429        }
430        let Some(policy_ms) = freshness_policy_ms else {
431            return true;
432        };
433        let basis_ms = coverage_last_promoted_at_ms.unwrap_or(item_created_at_ms);
434        now_ms.saturating_sub(basis_ms) <= policy_ms
435    }
436
    /// Resolve the knowledge spaces a preflight request should consult.
    ///
    /// Explicit space refs on the binding (read or promote) take precedence
    /// and are resolved individually; otherwise the project's Project-scoped
    /// spaces whose namespace matches the binding's (possibly inferred)
    /// namespace are used. Results are de-duplicated by space id.
    async fn resolve_preflight_spaces(
        &self,
        request: &KnowledgePreflightRequest,
        _coverage_key: &str,
    ) -> MemoryResult<Vec<KnowledgeSpaceRecord>> {
        let binding = &request.binding;
        let mut spaces = Vec::new();
        let mut seen_space_ids = HashSet::new();

        // De-duplicating push: each space id is only added once.
        let push_space = |space: KnowledgeSpaceRecord,
                          spaces: &mut Vec<KnowledgeSpaceRecord>,
                          seen_space_ids: &mut HashSet<String>| {
            if seen_space_ids.insert(space.id.clone()) {
                spaces.push(space);
            }
        };

        if Self::binding_uses_explicit_spaces(binding) {
            for space_ref in binding
                .read_spaces
                .iter()
                .chain(binding.promote_spaces.iter())
            {
                // A concrete space id short-circuits scope-based matching.
                if let Some(space_id) = space_ref.space_id.as_deref() {
                    if let Some(space) = self.get_knowledge_space(space_id).await? {
                        push_space(space, &mut spaces, &mut seen_space_ids);
                    }
                    continue;
                }

                match space_ref.scope {
                    // Run-scoped refs without a space id resolve to nothing here.
                    KnowledgeScope::Run => {}
                    KnowledgeScope::Project => {
                        // The ref's own project id wins; fall back to the request's.
                        let candidate_project_id = space_ref
                            .project_id
                            .as_deref()
                            .unwrap_or(&request.project_id);
                        let project_spaces = self
                            .list_knowledge_spaces(Some(candidate_project_id))
                            .await?;
                        for space in project_spaces.into_iter().filter(|space| {
                            Self::space_matches_ref(space, space_ref, &request.project_id)
                        }) {
                            push_space(space, &mut spaces, &mut seen_space_ids);
                        }
                    }
                    KnowledgeScope::Global => {
                        let global_spaces = self.list_knowledge_spaces(None).await?;
                        for space in global_spaces.into_iter().filter(|space| {
                            Self::space_matches_ref(space, space_ref, &request.project_id)
                        }) {
                            push_space(space, &mut spaces, &mut seen_space_ids);
                        }
                    }
                }
            }
            return Ok(spaces);
        }

        // Implicit resolution needs a project id to scope the lookup.
        if request.project_id.trim().is_empty() {
            return Ok(spaces);
        }

        let project_spaces = self
            .list_knowledge_spaces(Some(&request.project_id))
            .await?;
        // Use the binding's namespace when present; otherwise try to infer
        // one from the binding's refs or the project's spaces.
        let requested_namespace = if binding.namespace.is_some() {
            binding.namespace.clone()
        } else {
            Self::select_preflight_namespace(binding, &project_spaces)
        };
        let Some(requested_namespace) = requested_namespace else {
            // No namespace could be determined: nothing to match against.
            return Ok(spaces);
        };

        for space in project_spaces.into_iter().filter(|space| {
            space.scope == KnowledgeScope::Project
                && Self::namespace_matches(
                    space.namespace.as_deref(),
                    Some(requested_namespace.as_str()),
                )
        }) {
            push_space(space, &mut spaces, &mut seen_space_ids);
        }
        Ok(spaces)
    }
523
    /// Decide whether previously promoted knowledge can be reused for a task.
    ///
    /// Builds a coverage key from the request, resolves candidate knowledge
    /// spaces, collects active items that meet the binding's trust floor, and
    /// partitions them into fresh vs stale. Decision order: binding disabled,
    /// no spaces found, reuse fresh items, refresh required for stale items,
    /// or no prior knowledge.
    pub async fn preflight_knowledge(
        &self,
        request: &KnowledgePreflightRequest,
    ) -> MemoryResult<KnowledgePreflightResult> {
        let binding = &request.binding;
        let project_spaces = if request.project_id.trim().is_empty() {
            Vec::new()
        } else {
            self.list_knowledge_spaces(Some(&request.project_id))
                .await?
        };
        // Namespace: explicit on the binding wins, else try to infer one.
        let namespace = binding
            .namespace
            .clone()
            .or_else(|| Self::select_preflight_namespace(binding, &project_spaces));
        let coverage_key = build_knowledge_coverage_key(
            &request.project_id,
            namespace.as_deref(),
            &request.task_family,
            &request.subject,
        );

        // Reuse disabled: report the computed key but skip all lookups.
        if !binding.enabled || binding.reuse_mode == KnowledgeReuseMode::Disabled {
            return Ok(KnowledgePreflightResult {
                project_id: request.project_id.clone(),
                namespace,
                task_family: request.task_family.clone(),
                subject: request.subject.clone(),
                coverage_key,
                decision: KnowledgeReuseDecision::Disabled,
                reuse_reason: None,
                skip_reason: Some("knowledge reuse is disabled for this binding".to_string()),
                freshness_reason: None,
                items: Vec::new(),
            });
        }

        let spaces = self
            .resolve_preflight_spaces(request, &coverage_key)
            .await?;
        if spaces.is_empty() {
            return Ok(KnowledgePreflightResult {
                project_id: request.project_id.clone(),
                namespace,
                task_family: request.task_family.clone(),
                subject: request.subject.clone(),
                coverage_key,
                decision: KnowledgeReuseDecision::NoPriorKnowledge,
                reuse_reason: None,
                skip_reason: Some("no reusable knowledge spaces were found".to_string()),
                freshness_reason: None,
                items: Vec::new(),
            });
        }

        let now_ms = chrono::Utc::now().timestamp_millis().max(0) as u64;
        let mut fresh_items = Vec::new();
        let mut stale_items = Vec::new();
        // Explanation recorded from stale matches (last one encountered wins).
        let mut freshest_reason = None;

        for space in &spaces {
            let items = self
                .list_knowledge_items(&space.id, Some(&coverage_key))
                .await?;
            let coverage = self
                .get_knowledge_coverage(&coverage_key, &space.id)
                .await?;
            for item in items {
                // Skip inactive items and items without a usable trust level.
                if !item.status.is_active() {
                    continue;
                }
                let Some(trust_level) = item.status.as_trust_level() else {
                    continue;
                };
                // Enforce the binding's minimum trust requirement.
                if !trust_level.meets_floor(binding.trust_floor) {
                    continue;
                }
                // Item-level expiry wins; fall back to the coverage record's.
                let freshness_expires_at_ms = item.freshness_expires_at_ms.or_else(|| {
                    coverage
                        .as_ref()
                        .and_then(|coverage| coverage.freshness_expires_at_ms)
                });
                let pack_item = KnowledgePackItem {
                    item_id: item.id.clone(),
                    space_id: space.id.clone(),
                    coverage_key: item.coverage_key.clone(),
                    title: item.title.clone(),
                    summary: item.summary.clone(),
                    trust_level,
                    status: item.status.to_string(),
                    artifact_refs: item.artifact_refs.clone(),
                    source_memory_ids: item.source_memory_ids.clone(),
                    freshness_expires_at_ms,
                };
                if Self::is_fresh_enough(
                    freshness_expires_at_ms,
                    binding.freshness_ms,
                    coverage
                        .as_ref()
                        .and_then(|coverage| coverage.last_promoted_at_ms),
                    item.created_at_ms,
                    now_ms,
                ) {
                    fresh_items.push(pack_item);
                } else {
                    freshest_reason = Some(match freshness_expires_at_ms {
                        Some(expires_at_ms) => format!(
                            "coverage `{}` in space `{}` expired at {}",
                            coverage_key, space.id, expires_at_ms
                        ),
                        None => format!(
                            "coverage `{}` in space `{}` lacks freshness metadata",
                            coverage_key, space.id
                        ),
                    });
                    stale_items.push(pack_item);
                }
            }
        }

        // Rank fresh items: trust level desc, expiry desc, then title asc.
        fresh_items.sort_by(|left, right| {
            right
                .trust_level
                .rank()
                .cmp(&left.trust_level.rank())
                .then_with(|| {
                    right
                        .freshness_expires_at_ms
                        .unwrap_or(0)
                        .cmp(&left.freshness_expires_at_ms.unwrap_or(0))
                })
                .then_with(|| left.title.cmp(&right.title))
        });
        // Rank stale items: trust level desc, then title asc.
        stale_items.sort_by(|left, right| {
            right
                .trust_level
                .rank()
                .cmp(&left.trust_level.rank())
                .then_with(|| left.title.cmp(&right.title))
        });

        // Fresh knowledge available: reuse the top-ranked items.
        if let Some(freshest_trust_level) = fresh_items.first().map(|item| item.trust_level) {
            let selected = fresh_items
                .into_iter()
                .take(MAX_KNOWLEDGE_PACK_ITEMS)
                .collect::<Vec<_>>();
            // Decision reflects the trust level of the best fresh item.
            let decision = match freshest_trust_level {
                KnowledgeTrustLevel::ApprovedDefault => {
                    KnowledgeReuseDecision::ReuseApprovedDefault
                }
                _ => KnowledgeReuseDecision::ReusePromoted,
            };
            let selected_count = selected.len();
            return Ok(KnowledgePreflightResult {
                project_id: request.project_id.clone(),
                namespace,
                task_family: request.task_family.clone(),
                subject: request.subject.clone(),
                coverage_key,
                decision,
                reuse_reason: Some(format!(
                    "reusing {} promoted knowledge item(s) from {} space(s)",
                    selected_count,
                    spaces.len()
                )),
                skip_reason: None,
                freshness_reason: None,
                items: selected,
            });
        }

        // Only stale matches exist: caller should refresh before reusing.
        if !stale_items.is_empty() {
            let selected = stale_items
                .into_iter()
                .take(MAX_KNOWLEDGE_PACK_ITEMS)
                .collect::<Vec<_>>();
            return Ok(KnowledgePreflightResult {
                project_id: request.project_id.clone(),
                namespace,
                task_family: request.task_family.clone(),
                subject: request.subject.clone(),
                coverage_key,
                decision: KnowledgeReuseDecision::RefreshRequired,
                reuse_reason: None,
                skip_reason: Some(
                    "prior knowledge exists but is not fresh enough to reuse".to_string(),
                ),
                freshness_reason: freshest_reason.or_else(|| {
                    Some("matching knowledge exists but freshness policy rejected it".to_string())
                }),
                items: selected,
            });
        }

        // Nothing matched the coverage key at all.
        Ok(KnowledgePreflightResult {
            project_id: request.project_id.clone(),
            namespace,
            task_family: request.task_family.clone(),
            subject: request.subject.clone(),
            coverage_key,
            decision: KnowledgeReuseDecision::NoPriorKnowledge,
            reuse_reason: None,
            skip_reason: Some("no active promoted knowledge matched this coverage key".to_string()),
            freshness_reason: None,
            items: Vec::new(),
        })
    }
731
732    /// Retrieve context for a message
733    ///
734    /// This retrieves relevant chunks from all tiers and formats them
735    /// for injection into the prompt
736    pub async fn retrieve_context(
737        &self,
738        query: &str,
739        project_id: Option<&str>,
740        session_id: Option<&str>,
741        token_budget: Option<i64>,
742    ) -> MemoryResult<MemoryContext> {
743        let (context, _) = self
744            .retrieve_context_with_meta(query, project_id, session_id, token_budget)
745            .await?;
746        Ok(context)
747    }
748
749    /// Retrieve context plus retrieval metadata for observability.
750    pub async fn retrieve_context_with_meta(
751        &self,
752        query: &str,
753        project_id: Option<&str>,
754        session_id: Option<&str>,
755        token_budget: Option<i64>,
756    ) -> MemoryResult<(MemoryContext, MemoryRetrievalMeta)> {
757        let config = if let Some(pid) = project_id {
758            self.db.get_or_create_config(pid).await?
759        } else {
760            MemoryConfig::default()
761        };
762        let budget = token_budget.unwrap_or(config.token_budget);
763        let retrieval_limit = config.retrieval_k.max(1);
764
765        // Get recent session chunks
766        let current_session = if let Some(sid) = session_id {
767            self.db.get_session_chunks(sid).await?
768        } else {
769            Vec::new()
770        };
771
772        // Search for relevant history
773        let search_results = self
774            .search(query, None, project_id, session_id, Some(retrieval_limit))
775            .await?;
776
777        let mut score_min: Option<f64> = None;
778        let mut score_max: Option<f64> = None;
779        for result in &search_results {
780            score_min = Some(match score_min {
781                Some(current) => current.min(result.similarity),
782                None => result.similarity,
783            });
784            score_max = Some(match score_max {
785                Some(current) => current.max(result.similarity),
786                None => result.similarity,
787            });
788        }
789
790        let mut current_session = current_session;
791        let mut relevant_history = Vec::new();
792        let mut project_facts = Vec::new();
793
794        for result in search_results {
795            match result.chunk.tier {
796                MemoryTier::Project => {
797                    project_facts.push(result.chunk);
798                }
799                MemoryTier::Global => {
800                    project_facts.push(result.chunk);
801                }
802                MemoryTier::Session => {
803                    // Only add to relevant_history if not in current_session
804                    if !current_session.iter().any(|c| c.id == result.chunk.id) {
805                        relevant_history.push(result.chunk);
806                    }
807                }
808            }
809        }
810
811        // Calculate total tokens and trim if necessary
812        let mut total_tokens: i64 = current_session.iter().map(|c| c.token_count).sum();
813        total_tokens += relevant_history.iter().map(|c| c.token_count).sum::<i64>();
814        total_tokens += project_facts.iter().map(|c| c.token_count).sum::<i64>();
815
816        // Trim to fit budget if necessary
817        if total_tokens > budget {
818            let excess = total_tokens - budget;
819            self.trim_context(
820                &mut current_session,
821                &mut relevant_history,
822                &mut project_facts,
823                excess,
824            )?;
825            total_tokens = current_session.iter().map(|c| c.token_count).sum::<i64>()
826                + relevant_history.iter().map(|c| c.token_count).sum::<i64>()
827                + project_facts.iter().map(|c| c.token_count).sum::<i64>();
828        }
829
830        let context = MemoryContext {
831            current_session,
832            relevant_history,
833            project_facts,
834            total_tokens,
835        };
836        let chunks_total = context.current_session.len()
837            + context.relevant_history.len()
838            + context.project_facts.len();
839        let meta = MemoryRetrievalMeta {
840            used: chunks_total > 0,
841            chunks_total,
842            session_chunks: context.current_session.len(),
843            history_chunks: context.relevant_history.len(),
844            project_fact_chunks: context.project_facts.len(),
845            score_min,
846            score_max,
847        };
848
849        Ok((context, meta))
850    }
851
852    /// Trim context to fit within token budget
853    fn trim_context(
854        &self,
855        current_session: &mut Vec<MemoryChunk>,
856        relevant_history: &mut Vec<MemoryChunk>,
857        project_facts: &mut Vec<MemoryChunk>,
858        excess_tokens: i64,
859    ) -> MemoryResult<()> {
860        let mut tokens_to_remove = excess_tokens;
861
862        // First, trim relevant_history (less important than project_facts)
863        while tokens_to_remove > 0 && !relevant_history.is_empty() {
864            if let Some(chunk) = relevant_history.pop() {
865                tokens_to_remove -= chunk.token_count;
866            }
867        }
868
869        // If still over budget, trim project_facts
870        while tokens_to_remove > 0 && !project_facts.is_empty() {
871            if let Some(chunk) = project_facts.pop() {
872                tokens_to_remove -= chunk.token_count;
873            }
874        }
875
876        while tokens_to_remove > 0 && !current_session.is_empty() {
877            if let Some(chunk) = current_session.pop() {
878                tokens_to_remove -= chunk.token_count;
879            }
880        }
881
882        Ok(())
883    }
884
885    /// Clear session memory
886    pub async fn clear_session(&self, session_id: &str) -> MemoryResult<u64> {
887        let count = self.db.clear_session_memory(session_id).await?;
888
889        // Log cleanup
890        self.db
891            .log_cleanup(
892                "manual",
893                MemoryTier::Session,
894                None,
895                Some(session_id),
896                count as i64,
897                0,
898            )
899            .await?;
900
901        Ok(count)
902    }
903
904    /// Clear project memory
905    pub async fn clear_project(&self, project_id: &str) -> MemoryResult<u64> {
906        let count = self.db.clear_project_memory(project_id).await?;
907
908        // Log cleanup
909        self.db
910            .log_cleanup(
911                "manual",
912                MemoryTier::Project,
913                Some(project_id),
914                None,
915                count as i64,
916                0,
917            )
918            .await?;
919
920        Ok(count)
921    }
922
    /// Get memory statistics
    ///
    /// Thin delegation to the database layer's aggregate stats query.
    pub async fn get_stats(&self) -> MemoryResult<MemoryStats> {
        self.db.get_stats().await
    }
927
    /// Get memory configuration for a project
    ///
    /// Creates and persists a default config if the project has none yet
    /// (delegates to `get_or_create_config`).
    pub async fn get_config(&self, project_id: &str) -> MemoryResult<MemoryConfig> {
        self.db.get_or_create_config(project_id).await
    }
932
    /// Update memory configuration for a project
    ///
    /// Persists the supplied config for `project_id` via the database layer.
    pub async fn set_config(&self, project_id: &str, config: &MemoryConfig) -> MemoryResult<()> {
        self.db.update_config(project_id, config).await
    }
937
    /// Look up the memory node registered under `uri`, if any.
    pub async fn resolve_uri(&self, uri: &str) -> MemoryResult<Option<MemoryNode>> {
        self.db.get_node_by_uri(uri).await
    }
941
942    pub async fn list_directory(&self, uri: &str) -> MemoryResult<DirectoryListing> {
943        let nodes = self.db.list_directory(uri).await?;
944        let directories: Vec<MemoryNode> = nodes
945            .iter()
946            .filter(|n| n.node_type == NodeType::Directory)
947            .cloned()
948            .collect();
949        let files: Vec<MemoryNode> = nodes
950            .iter()
951            .filter(|n| n.node_type == NodeType::File)
952            .cloned()
953            .collect();
954
955        Ok(DirectoryListing {
956            uri: uri.to_string(),
957            nodes,
958            total_children: directories.len() + files.len(),
959            directories,
960            files,
961        })
962    }
963
    /// Return the subtree rooted at `uri`, limited to `max_depth` levels.
    pub async fn tree(&self, uri: &str, max_depth: usize) -> MemoryResult<Vec<TreeNode>> {
        self.db.get_children_tree(uri, max_depth).await
    }
967
968    pub async fn create_context_node(
969        &self,
970        uri: &str,
971        node_type: NodeType,
972        metadata: Option<serde_json::Value>,
973    ) -> MemoryResult<String> {
974        let parsed_uri =
975            ContextUri::parse(uri).map_err(|e| MemoryError::InvalidConfig(e.message))?;
976        let parent_uri = parsed_uri.parent().map(|p| p.to_string());
977        self.db
978            .create_node(uri, parent_uri.as_deref(), node_type, metadata.as_ref())
979            .await
980    }
981
    /// Fetch the stored layer of `layer_type` for a node, if present.
    pub async fn get_context_layer(
        &self,
        node_id: &str,
        layer_type: LayerType,
    ) -> MemoryResult<Option<MemoryLayer>> {
        self.db.get_layer(node_id, layer_type).await
    }
989
990    pub async fn store_content_with_layers(
991        &self,
992        uri: &str,
993        content: &str,
994        metadata: Option<serde_json::Value>,
995    ) -> MemoryResult<String> {
996        let parsed_uri =
997            ContextUri::parse(uri).map_err(|e| MemoryError::InvalidConfig(e.message))?;
998        let node_type = if parsed_uri
999            .last_segment()
1000            .map(|s| s.ends_with(".md") || s.ends_with(".txt") || s.contains("."))
1001            .unwrap_or(false)
1002        {
1003            NodeType::File
1004        } else {
1005            NodeType::Directory
1006        };
1007
1008        let parent_uri = parsed_uri.parent().map(|p| p.to_string());
1009        let node_id = self
1010            .db
1011            .create_node(uri, parent_uri.as_deref(), node_type, metadata.as_ref())
1012            .await?;
1013
1014        let token_count = self.tokenizer.count_tokens(content) as i64;
1015        self.db
1016            .create_layer(&node_id, LayerType::L2, content, token_count, None)
1017            .await?;
1018
1019        Ok(node_id)
1020    }
1021
1022    pub async fn generate_layers_for_node(
1023        &self,
1024        node_id: &str,
1025        providers: &ProviderRegistry,
1026    ) -> MemoryResult<()> {
1027        let l2_layer = self.db.get_layer(node_id, LayerType::L2).await?;
1028        let l2_content = match l2_layer {
1029            Some(layer) => layer.content,
1030            None => return Ok(()),
1031        };
1032
1033        let generator = ContextLayerGenerator::new(Arc::new(providers.clone()));
1034
1035        let (l0_content, l1_content) = generator.generate_layers(&l2_content).await?;
1036
1037        let l0_tokens = self.tokenizer.count_tokens(&l0_content) as i64;
1038        let l1_tokens = self.tokenizer.count_tokens(&l1_content) as i64;
1039
1040        if self.db.get_layer(node_id, LayerType::L0).await?.is_none() {
1041            self.db
1042                .create_layer(node_id, LayerType::L0, &l0_content, l0_tokens, None)
1043                .await?;
1044        }
1045
1046        if self.db.get_layer(node_id, LayerType::L1).await?.is_none() {
1047            self.db
1048                .create_layer(node_id, LayerType::L1, &l1_content, l1_tokens, None)
1049                .await?;
1050        }
1051
1052        Ok(())
1053    }
1054
1055    pub async fn get_layer_content(
1056        &self,
1057        node_id: &str,
1058        layer_type: LayerType,
1059    ) -> MemoryResult<Option<String>> {
1060        let layer = self.db.get_layer(node_id, layer_type).await?;
1061        Ok(layer.map(|l| l.content))
1062    }
1063
1064    pub async fn store_content_with_layers_auto(
1065        &self,
1066        uri: &str,
1067        content: &str,
1068        metadata: Option<serde_json::Value>,
1069        providers: Option<&ProviderRegistry>,
1070    ) -> MemoryResult<String> {
1071        let node_id = self
1072            .store_content_with_layers(uri, content, metadata)
1073            .await?;
1074
1075        if let Some(p) = providers {
1076            if let Err(e) = self.generate_layers_for_node(&node_id, p).await {
1077                tracing::warn!("Failed to generate layers for node {}: {}", node_id, e);
1078            }
1079        }
1080
1081        Ok(node_id)
1082    }
1083
1084    /// Run cleanup based on retention policies
1085    pub async fn run_cleanup(&self, project_id: Option<&str>) -> MemoryResult<u64> {
1086        let mut total_cleaned = 0u64;
1087
1088        if let Some(pid) = project_id {
1089            // Get config for this project
1090            let config = self.db.get_or_create_config(pid).await?;
1091
1092            if config.auto_cleanup {
1093                // Clean up old session memory
1094                let cleaned = self
1095                    .db
1096                    .cleanup_old_sessions(config.session_retention_days)
1097                    .await?;
1098                total_cleaned += cleaned;
1099
1100                if cleaned > 0 {
1101                    self.db
1102                        .log_cleanup(
1103                            "auto",
1104                            MemoryTier::Session,
1105                            Some(pid),
1106                            None,
1107                            cleaned as i64,
1108                            0,
1109                        )
1110                        .await?;
1111                }
1112            }
1113        } else {
1114            // Clean up all projects with auto_cleanup enabled
1115            // This would require listing all projects, for now just clean session memory
1116            // with a default retention period
1117            let cleaned = self.db.cleanup_old_sessions(30).await?;
1118            total_cleaned += cleaned;
1119        }
1120
1121        // Vacuum if significant cleanup occurred
1122        if total_cleaned > 100 {
1123            self.db.vacuum().await?;
1124        }
1125
1126        Ok(total_cleaned)
1127    }
1128
1129    /// Check if cleanup is needed and run it
1130    async fn maybe_cleanup(&self, project_id: &Option<String>) -> MemoryResult<()> {
1131        if let Some(pid) = project_id {
1132            let stats = self.db.get_stats().await?;
1133            let config = self.db.get_or_create_config(pid).await?;
1134
1135            // Check if we're over the chunk limit
1136            if stats.project_chunks > config.max_chunks {
1137                // Remove oldest chunks
1138                let excess = stats.project_chunks - config.max_chunks;
1139                // This would require a new DB method to delete oldest chunks
1140                // For now, just log
1141                tracing::info!("Project {} has {} excess chunks", pid, excess);
1142            }
1143        }
1144
1145        Ok(())
1146    }
1147
    /// Get cleanup log entries
    ///
    /// Stub: the DB layer does not expose the cleanup log yet, so this
    /// always returns an empty list and ignores `_limit`.
    pub async fn get_cleanup_log(&self, _limit: i64) -> MemoryResult<Vec<CleanupLogEntry>> {
        // This would be implemented in the DB layer
        // For now, return empty
        Ok(Vec::new())
    }
1154
    /// Count tokens in text
    ///
    /// Delegates to the manager's tokenizer; used for budgeting and layer
    /// accounting throughout this module.
    pub fn count_tokens(&self, text: &str) -> usize {
        self.tokenizer.count_tokens(text)
    }
1159
1160    /// Report embedding backend health for UI/telemetry.
1161    pub async fn embedding_health(&self) -> EmbeddingHealth {
1162        let service = self.embedding_service.lock().await;
1163        if service.is_available() {
1164            EmbeddingHealth {
1165                status: "ok".to_string(),
1166                reason: None,
1167            }
1168        } else {
1169            EmbeddingHealth {
1170                status: "degraded_disabled".to_string(),
1171                reason: service.disabled_reason().map(ToString::to_string),
1172            }
1173        }
1174    }
1175
1176    /// Consolidate a session's memory into a summary chunk using the cheapest available provider.
1177    pub async fn consolidate_session(
1178        &self,
1179        session_id: &str,
1180        project_id: Option<&str>,
1181        providers: &ProviderRegistry,
1182        config: &MemoryConsolidationConfig,
1183    ) -> MemoryResult<Option<String>> {
1184        if !config.enabled {
1185            return Ok(None);
1186        }
1187
1188        let chunks = self.db.get_session_chunks(session_id).await?;
1189        if chunks.is_empty() {
1190            return Ok(None);
1191        }
1192
1193        // Assemble text
1194        let mut text_parts = Vec::new();
1195        for chunk in &chunks {
1196            text_parts.push(chunk.content.clone());
1197        }
1198        let full_text = text_parts.join("\n\n---\n\n");
1199
1200        // Build prompt
1201        let prompt = format!(
1202            "Please provide a concise but comprehensive summary of the following chat session. \
1203            Focus on the key decisions, technical details, code changes, and unresolved issues. \
1204            Do NOT include conversational filler, greetings, or sign-offs. \
1205            This summary will be used as long-term memory to recall the context of this work.\n\n\
1206            Session transcripts:\n\n{}",
1207            full_text
1208        );
1209
1210        let provider_override = config.provider.as_deref().filter(|s| !s.is_empty());
1211        let model_override = config.model.as_deref().filter(|s| !s.is_empty());
1212
1213        let summary_text = match providers
1214            .complete_cheapest(&prompt, provider_override, model_override)
1215            .await
1216        {
1217            Ok(s) => s,
1218            Err(e) => {
1219                tracing::warn!("Memory consolidation LLM failed for session {session_id}: {e}");
1220                return Ok(None);
1221            }
1222        };
1223
1224        if summary_text.trim().is_empty() {
1225            return Ok(None);
1226        }
1227
1228        // Generate embedding for the summary
1229        let embedding = {
1230            let service = self.embedding_service.lock().await;
1231            service
1232                .embed(&summary_text)
1233                .await
1234                .map_err(|e| crate::types::MemoryError::Embedding(e.to_string()))?
1235        };
1236
1237        // Store the summary chunk
1238        let chunk_id = uuid::Uuid::new_v4().to_string();
1239        let chunk = MemoryChunk {
1240            id: chunk_id,
1241            content: summary_text.clone(),
1242            tier: MemoryTier::Project,
1243            session_id: None, // The summary belongs to the project, not the ephemeral session
1244            project_id: project_id.map(ToString::to_string),
1245            created_at: Utc::now(),
1246            source: "consolidation".to_string(),
1247            token_count: self.count_tokens(&summary_text) as i64,
1248            source_path: None,
1249            source_mtime: None,
1250            source_size: None,
1251            source_hash: None,
1252            metadata: None,
1253        };
1254
1255        self.db.store_chunk(&chunk, &embedding).await?;
1256
1257        // Clear original chunks now that they are consolidated
1258        self.db.clear_session_memory(session_id).await?;
1259
1260        tracing::info!(
1261            "Session {session_id} consolidated into summary chunk. Original chunks cleared."
1262        );
1263
1264        Ok(Some(summary_text))
1265    }
1266}
1267
1268/// Create memory manager with default database path
1269pub async fn create_memory_manager(app_data_dir: &Path) -> MemoryResult<MemoryManager> {
1270    let db_path = app_data_dir.join("tandem_memory.db");
1271    MemoryManager::new(&db_path).await
1272}