// tandem_memory/manager_parts/part01.rs
1// Memory Manager Module
2// High-level memory operations (store, retrieve, cleanup)
3
4use crate::chunking::{chunk_text_semantic, ChunkingConfig, Tokenizer};
5use crate::context_layers::ContextLayerGenerator;
6use crate::context_uri::ContextUri;
7use crate::db::MemoryDatabase;
8use crate::embeddings::EmbeddingService;
9use crate::types::{
10    CleanupLogEntry, DirectoryListing, EmbeddingHealth, KnowledgeCoverageRecord,
11    KnowledgeItemRecord, KnowledgePromotionRequest, KnowledgePromotionResult, KnowledgeSpaceRecord,
12    LayerType, MemoryChunk, MemoryConfig, MemoryContext, MemoryError, MemoryLayer, MemoryNode,
13    MemoryResult, MemoryRetrievalMeta, MemorySearchResult, MemoryStats, MemoryTier, NodeType,
14    StoreMessageRequest, TreeNode,
15};
16use chrono::Utc;
17use std::collections::HashSet;
18use std::path::Path;
19use std::sync::Arc;
20use tandem_orchestrator::{
21    build_knowledge_coverage_key, normalize_knowledge_segment, KnowledgeBinding, KnowledgePackItem,
22    KnowledgePreflightRequest, KnowledgePreflightResult, KnowledgeReuseDecision,
23    KnowledgeReuseMode, KnowledgeScope, KnowledgeTrustLevel,
24};
25use tandem_providers::{MemoryConsolidationConfig, ProviderRegistry};
26use tokio::sync::Mutex;
27
/// High-level memory manager that coordinates database, embeddings, and chunking
pub struct MemoryManager {
    /// Shared handle to the SQLite-backed memory store (chunks, vectors, knowledge tables).
    db: Arc<MemoryDatabase>,
    /// Embedding backend; the async mutex serializes embed calls across tasks.
    embedding_service: Arc<Mutex<EmbeddingService>>,
    /// Tokenizer constructed eagerly at init; its use is not visible in this chunk
    /// (presumably for chunk token accounting — confirm in the rest of the file).
    tokenizer: Tokenizer,
}
34
/// Maximum number of knowledge items returned in a single preflight pack.
const MAX_KNOWLEDGE_PACK_ITEMS: usize = 3;
/// Chunk-source prefix that marks guide-documentation entries.
const GUIDE_DOC_SOURCE_PREFIX: &str = "guide_docs:";
/// Recency half-life for guide docs: 30 days expressed in milliseconds
/// (the recency score in `guide_doc_similarity` reaches 0.5 at this age).
const GUIDE_DOC_RECENCY_HALFLIFE_MS: f64 = 30.0 * 24.0 * 60.0 * 60.0 * 1000.0;
/// Weight of the recency term when blended into guide-doc similarity.
const GUIDE_DOC_RECENCY_WEIGHT: f64 = 0.12;
39
40impl MemoryManager {
41    fn guide_doc_similarity(similarity: f64, chunk: &MemoryChunk, now_ms: i64) -> f64 {
42        if !chunk.source.starts_with(GUIDE_DOC_SOURCE_PREFIX) {
43            return similarity.clamp(0.0, 1.0);
44        }
45
46        let normalized_similarity = similarity.clamp(0.0, 1.0);
47        let source_mtime = chunk
48            .source_mtime
49            .filter(|value| *value > 0)
50            .unwrap_or_else(|| chunk.created_at.timestamp_millis());
51        let age_ms = (now_ms - source_mtime).max(0) as f64;
52        let recency_score = 1.0 / (1.0 + (age_ms / GUIDE_DOC_RECENCY_HALFLIFE_MS));
53        ((1.0 - GUIDE_DOC_RECENCY_WEIGHT) * normalized_similarity
54            + (GUIDE_DOC_RECENCY_WEIGHT * recency_score))
55            .clamp(0.0, 1.0)
56    }
57
58    fn is_malformed_database_error(err: &crate::types::MemoryError) -> bool {
59        err.to_string()
60            .to_lowercase()
61            .contains("database disk image is malformed")
62    }
63
    /// Borrow the shared handle to the underlying memory database.
    pub fn db(&self) -> &Arc<MemoryDatabase> {
        &self.db
    }
67
    /// Initialize the memory manager
    ///
    /// Uses the default `EmbeddingService`; see
    /// [`Self::new_with_embedding_service`] to inject a custom one (e.g. in tests).
    pub async fn new(db_path: &Path) -> MemoryResult<Self> {
        Self::new_with_embedding_service(db_path, EmbeddingService::new()).await
    }
72
73    /// Initialize the memory manager with a caller-provided embedding
74    /// service. Tests use this to avoid depending on local model assets.
75    pub async fn new_with_embedding_service(
76        db_path: &Path,
77        embedding_service: EmbeddingService,
78    ) -> MemoryResult<Self> {
79        let db = Arc::new(MemoryDatabase::new(db_path).await?);
80        let embedding_service = Arc::new(Mutex::new(embedding_service));
81        let tokenizer = Tokenizer::new()?;
82
83        Ok(Self {
84            db,
85            embedding_service,
86            tokenizer,
87        })
88    }
89
    /// Store a message in memory
    ///
    /// This will:
    /// 1. Chunk the message content
    /// 2. Generate embeddings for each chunk
    /// 3. Store chunks and embeddings in the database
    ///
    /// Returns the ids of the stored chunks (empty if the content produced no
    /// chunks). A failed insert triggers one vector-table repair and retry; if
    /// the database is still reported malformed after that, all memory tables
    /// are reset and the insert is attempted one final time.
    pub async fn store_message(&self, request: StoreMessageRequest) -> MemoryResult<Vec<String>> {
        // Proactive health check; `true` means a repair was performed.
        // Errors are swallowed (`unwrap_or(false)`) — this is best-effort.
        if self
            .db
            .ensure_vector_tables_healthy()
            .await
            .unwrap_or(false)
        {
            tracing::warn!("Memory vector tables were repaired before storing message chunks");
        }

        // Per-project config when a project id is given; defaults otherwise.
        let config = if let Some(ref pid) = request.project_id {
            self.db.get_or_create_config(pid).await?
        } else {
            MemoryConfig::default()
        };

        // Chunk the content
        let chunking_config = ChunkingConfig {
            chunk_size: config.chunk_size as usize,
            chunk_overlap: config.chunk_overlap as usize,
            separator: None,
        };

        let text_chunks = chunk_text_semantic(&request.content, &chunking_config)?;

        if text_chunks.is_empty() {
            return Ok(Vec::new());
        }

        let mut chunk_ids = Vec::with_capacity(text_chunks.len());
        // Hold the embedding service lock for the whole loop so the chunks of
        // one message are embedded back-to-back.
        let embedding_service = self.embedding_service.lock().await;

        for text_chunk in text_chunks {
            let chunk_id = uuid::Uuid::new_v4().to_string();

            // Generate embedding
            let embedding = embedding_service.embed(&text_chunk.content).await?;

            // Create memory chunk
            let chunk = MemoryChunk {
                id: chunk_id.clone(),
                content: text_chunk.content,
                tier: request.tier,
                session_id: request.session_id.clone(),
                project_id: request.project_id.clone(),
                source: request.source.clone(),
                source_path: request.source_path.clone(),
                source_mtime: request.source_mtime,
                source_size: request.source_size,
                source_hash: request.source_hash.clone(),
                created_at: Utc::now(),
                token_count: text_chunk.token_count as i64,
                metadata: request.metadata.clone(),
            };

            // Store in database (retry once after vector-table self-heal).
            if let Err(err) = self.db.store_chunk(&chunk, &embedding).await {
                tracing::warn!("Failed to store memory chunk {}: {}", chunk.id, err);
                // Try the targeted repair first; only fall back to the full
                // health check if that did not report a repair (|| short-circuits).
                let repaired = {
                    let repaired_after_error =
                        self.db.try_repair_after_error(&err).await.unwrap_or(false);
                    repaired_after_error
                        || self
                            .db
                            .ensure_vector_tables_healthy()
                            .await
                            .unwrap_or(false)
                };
                if repaired {
                    tracing::warn!(
                        "Retrying memory chunk insert after vector table repair: {}",
                        chunk.id
                    );
                    if let Err(retry_err) = self.db.store_chunk(&chunk, &embedding).await {
                        if Self::is_malformed_database_error(&retry_err) {
                            // Last resort: wipe and recreate the memory tables,
                            // then let any further failure propagate.
                            tracing::warn!(
                                "Memory DB still malformed after vector repair. Resetting memory tables and retrying chunk insert: {}",
                                chunk.id
                            );
                            self.db.reset_all_memory_tables().await?;
                            self.db.store_chunk(&chunk, &embedding).await?;
                        } else {
                            return Err(retry_err);
                        }
                    }
                } else {
                    return Err(err);
                }
            }
            chunk_ids.push(chunk_id);
        }

        // Check if cleanup is needed
        if config.auto_cleanup {
            self.maybe_cleanup(&request.project_id).await?;
        }

        Ok(chunk_ids)
    }
195
196    /// Search memory for relevant chunks
197    pub async fn search(
198        &self,
199        query: &str,
200        tier: Option<MemoryTier>,
201        project_id: Option<&str>,
202        session_id: Option<&str>,
203        limit: Option<i64>,
204    ) -> MemoryResult<Vec<MemorySearchResult>> {
205        let effective_limit = limit.unwrap_or(5);
206
207        // Generate query embedding
208        let embedding_service = self.embedding_service.lock().await;
209        let query_embedding = embedding_service.embed(query).await?;
210        drop(embedding_service);
211
212        let mut results = Vec::new();
213
214        // Search in specified tier or all tiers
215        let tiers_to_search = match tier {
216            Some(t) => vec![t],
217            None => {
218                if project_id.is_some() {
219                    vec![MemoryTier::Session, MemoryTier::Project, MemoryTier::Global]
220                } else {
221                    vec![MemoryTier::Session, MemoryTier::Global]
222                }
223            }
224        };
225
226        let now_ms = Utc::now().timestamp_millis();
227        for search_tier in tiers_to_search {
228            let tier_results = match self
229                .db
230                .search_similar(
231                    &query_embedding,
232                    search_tier,
233                    project_id,
234                    session_id,
235                    effective_limit,
236                )
237                .await
238            {
239                Ok(results) => results,
240                Err(err) => {
241                    tracing::warn!(
242                        "Memory tier search failed for {:?}: {}. Attempting vector repair.",
243                        search_tier,
244                        err
245                    );
246                    let repaired = {
247                        let repaired_after_error =
248                            self.db.try_repair_after_error(&err).await.unwrap_or(false);
249                        repaired_after_error
250                            || self
251                                .db
252                                .ensure_vector_tables_healthy()
253                                .await
254                                .unwrap_or(false)
255                    };
256                    if repaired {
257                        match self
258                            .db
259                            .search_similar(
260                                &query_embedding,
261                                search_tier,
262                                project_id,
263                                session_id,
264                                effective_limit,
265                            )
266                            .await
267                        {
268                            Ok(results) => results,
269                            Err(retry_err) => {
270                                tracing::warn!(
271                                    "Memory tier search still failing for {:?} after repair: {}",
272                                    search_tier,
273                                    retry_err
274                                );
275                                continue;
276                            }
277                        }
278                    } else {
279                        continue;
280                    }
281                }
282            };
283
284            for (chunk, distance) in tier_results {
285                // Convert distance to similarity (cosine similarity)
286                // sqlite-vec returns distance, where lower is more similar
287                // Cosine similarity ranges from -1 to 1, but for normalized vectors it's 0 to 1
288                let similarity = 1.0 - distance.clamp(0.0, 1.0);
289                let similarity = if search_tier == MemoryTier::Global {
290                    Self::guide_doc_similarity(similarity, &chunk, now_ms)
291                } else {
292                    similarity
293                };
294
295                results.push(MemorySearchResult { chunk, similarity });
296            }
297        }
298
299        // Sort by similarity (highest first) and limit results
300        results.sort_by(|a, b| b.similarity.partial_cmp(&a.similarity).unwrap());
301        results.truncate(effective_limit as usize);
302
303        Ok(results)
304    }
305
    /// Insert or update a knowledge space record; thin pass-through to the database layer.
    pub async fn upsert_knowledge_space(&self, space: &KnowledgeSpaceRecord) -> MemoryResult<()> {
        self.db.upsert_knowledge_space(space).await
    }
309
    /// Fetch a knowledge space by id, or `None` if it does not exist.
    pub async fn get_knowledge_space(
        &self,
        id: &str,
    ) -> MemoryResult<Option<KnowledgeSpaceRecord>> {
        self.db.get_knowledge_space(id).await
    }
316
    /// List knowledge spaces, optionally scoped by project
    /// (filter semantics are implemented in the database layer).
    pub async fn list_knowledge_spaces(
        &self,
        project_id: Option<&str>,
    ) -> MemoryResult<Vec<KnowledgeSpaceRecord>> {
        self.db.list_knowledge_spaces(project_id).await
    }
323
    /// Insert or update a knowledge item record; thin pass-through to the database layer.
    pub async fn upsert_knowledge_item(&self, item: &KnowledgeItemRecord) -> MemoryResult<()> {
        self.db.upsert_knowledge_item(item).await
    }
327
    /// Fetch a knowledge item by id, or `None` if it does not exist.
    pub async fn get_knowledge_item(&self, id: &str) -> MemoryResult<Option<KnowledgeItemRecord>> {
        self.db.get_knowledge_item(id).await
    }
331
    /// List knowledge items in a space, optionally filtered by coverage key
    /// (filter semantics are implemented in the database layer).
    pub async fn list_knowledge_items(
        &self,
        space_id: &str,
        coverage_key: Option<&str>,
    ) -> MemoryResult<Vec<KnowledgeItemRecord>> {
        self.db.list_knowledge_items(space_id, coverage_key).await
    }
339
    /// Insert or update a coverage record; thin pass-through to the database layer.
    pub async fn upsert_knowledge_coverage(
        &self,
        coverage: &KnowledgeCoverageRecord,
    ) -> MemoryResult<()> {
        self.db.upsert_knowledge_coverage(coverage).await
    }
346
    /// Fetch the coverage record for a (coverage_key, space) pair, if any.
    pub async fn get_knowledge_coverage(
        &self,
        coverage_key: &str,
        space_id: &str,
    ) -> MemoryResult<Option<KnowledgeCoverageRecord>> {
        self.db.get_knowledge_coverage(coverage_key, space_id).await
    }
354
    /// Promote a knowledge item; returns `None` when the database layer
    /// declines the promotion (exact conditions live in the db implementation).
    pub async fn promote_knowledge_item(
        &self,
        request: &KnowledgePromotionRequest,
    ) -> MemoryResult<Option<KnowledgePromotionResult>> {
        self.db.promote_knowledge_item(request).await
    }
361
362    fn space_matches_ref(
363        space: &KnowledgeSpaceRecord,
364        space_ref: &tandem_orchestrator::KnowledgeSpaceRef,
365        project_id: &str,
366    ) -> bool {
367        if space.scope != space_ref.scope {
368            return false;
369        }
370        match space_ref.scope {
371            KnowledgeScope::Project | KnowledgeScope::Run => {
372                let target_project = space_ref.project_id.as_deref().unwrap_or(project_id);
373                if space.project_id.as_deref() != Some(target_project) {
374                    return false;
375                }
376            }
377            KnowledgeScope::Global => {}
378        }
379        if let Some(namespace) = space_ref.namespace.as_deref() {
380            if space.namespace.as_deref() != Some(namespace) {
381                return false;
382            }
383        }
384        true
385    }
386
387    fn select_preflight_namespace(
388        binding: &KnowledgeBinding,
389        spaces: &[KnowledgeSpaceRecord],
390    ) -> Option<String> {
391        if let Some(namespace) = binding.namespace.clone() {
392            return Some(namespace);
393        }
394        if binding.read_spaces.len() == 1 {
395            if let Some(namespace) = binding.read_spaces[0].namespace.clone() {
396                return Some(namespace);
397            }
398        }
399        if spaces.len() == 1 {
400            return spaces[0].namespace.clone();
401        }
402        let mut unique = HashSet::new();
403        for space in spaces {
404            if let Some(namespace) = space.namespace.as_ref() {
405                unique.insert(namespace);
406            }
407        }
408        if unique.len() == 1 {
409            unique.into_iter().next().map(|value| value.to_string())
410        } else {
411            None
412        }
413    }
414
415    fn binding_uses_explicit_spaces(binding: &KnowledgeBinding) -> bool {
416        !binding.read_spaces.is_empty() || !binding.promote_spaces.is_empty()
417    }
418
419    fn namespace_matches(space_namespace: Option<&str>, binding_namespace: Option<&str>) -> bool {
420        match (space_namespace, binding_namespace) {
421            (None, None) => true,
422            (Some(space), Some(binding)) => {
423                normalize_knowledge_segment(space) == normalize_knowledge_segment(binding)
424            }
425            _ => false,
426        }
427    }
428
429    fn is_fresh_enough(
430        freshness_expires_at_ms: Option<u64>,
431        freshness_policy_ms: Option<u64>,
432        coverage_last_promoted_at_ms: Option<u64>,
433        item_created_at_ms: u64,
434        now_ms: u64,
435    ) -> bool {
436        if let Some(expires_at_ms) = freshness_expires_at_ms {
437            return expires_at_ms > now_ms;
438        }
439        let Some(policy_ms) = freshness_policy_ms else {
440            return true;
441        };
442        let basis_ms = coverage_last_promoted_at_ms.unwrap_or(item_created_at_ms);
443        now_ms.saturating_sub(basis_ms) <= policy_ms
444    }
445
    /// Resolve the set of knowledge spaces a preflight request may read from.
    ///
    /// With explicit space refs on the binding, each ref is resolved by id or
    /// by scope (Project/Global; Run refs without an id resolve to nothing).
    /// Without explicit refs, falls back to the request's project spaces whose
    /// namespace matches the inferred preflight namespace. Results are
    /// de-duplicated by space id, preserving first-seen order.
    async fn resolve_preflight_spaces(
        &self,
        request: &KnowledgePreflightRequest,
        _coverage_key: &str,
    ) -> MemoryResult<Vec<KnowledgeSpaceRecord>> {
        let binding = &request.binding;
        let mut spaces = Vec::new();
        let mut seen_space_ids = HashSet::new();

        // De-duplicating push: a space id already seen is silently dropped.
        let push_space = |space: KnowledgeSpaceRecord,
                          spaces: &mut Vec<KnowledgeSpaceRecord>,
                          seen_space_ids: &mut HashSet<String>| {
            if seen_space_ids.insert(space.id.clone()) {
                spaces.push(space);
            }
        };

        if Self::binding_uses_explicit_spaces(binding) {
            // Read refs first, then promote refs.
            for space_ref in binding
                .read_spaces
                .iter()
                .chain(binding.promote_spaces.iter())
            {
                // A concrete space id bypasses scope-based resolution entirely.
                if let Some(space_id) = space_ref.space_id.as_deref() {
                    if let Some(space) = self.get_knowledge_space(space_id).await? {
                        push_space(space, &mut spaces, &mut seen_space_ids);
                    }
                    continue;
                }

                match space_ref.scope {
                    // Run-scoped refs without an explicit id resolve to nothing.
                    KnowledgeScope::Run => {}
                    KnowledgeScope::Project => {
                        // The ref's project id overrides the request's.
                        let candidate_project_id = space_ref
                            .project_id
                            .as_deref()
                            .unwrap_or(&request.project_id);
                        let project_spaces = self
                            .list_knowledge_spaces(Some(candidate_project_id))
                            .await?;
                        for space in project_spaces.into_iter().filter(|space| {
                            Self::space_matches_ref(space, space_ref, &request.project_id)
                        }) {
                            push_space(space, &mut spaces, &mut seen_space_ids);
                        }
                    }
                    KnowledgeScope::Global => {
                        let global_spaces = self.list_knowledge_spaces(None).await?;
                        for space in global_spaces.into_iter().filter(|space| {
                            Self::space_matches_ref(space, space_ref, &request.project_id)
                        }) {
                            push_space(space, &mut spaces, &mut seen_space_ids);
                        }
                    }
                }
            }
            return Ok(spaces);
        }

        // Implicit resolution needs a project to anchor on.
        if request.project_id.trim().is_empty() {
            return Ok(spaces);
        }

        let project_spaces = self
            .list_knowledge_spaces(Some(&request.project_id))
            .await?;
        let requested_namespace = if binding.namespace.is_some() {
            binding.namespace.clone()
        } else {
            Self::select_preflight_namespace(binding, &project_spaces)
        };
        // No unambiguous namespace means no implicit spaces at all.
        let Some(requested_namespace) = requested_namespace else {
            return Ok(spaces);
        };

        for space in project_spaces.into_iter().filter(|space| {
            space.scope == KnowledgeScope::Project
                && Self::namespace_matches(
                    space.namespace.as_deref(),
                    Some(requested_namespace.as_str()),
                )
        }) {
            push_space(space, &mut spaces, &mut seen_space_ids);
        }
        Ok(spaces)
    }
532
    /// Run the knowledge-reuse preflight for a request and produce a decision.
    ///
    /// Pipeline: infer the namespace and coverage key; short-circuit when the
    /// binding disables reuse or no spaces resolve; then partition matching
    /// items into fresh and stale per `is_fresh_enough`. Fresh items yield a
    /// Reuse* decision (capped at `MAX_KNOWLEDGE_PACK_ITEMS`), stale-only
    /// yields `RefreshRequired`, and no matches yields `NoPriorKnowledge`.
    pub async fn preflight_knowledge(
        &self,
        request: &KnowledgePreflightRequest,
    ) -> MemoryResult<KnowledgePreflightResult> {
        let binding = &request.binding;
        let project_spaces = if request.project_id.trim().is_empty() {
            Vec::new()
        } else {
            self.list_knowledge_spaces(Some(&request.project_id))
                .await?
        };
        // Namespace: explicit on the binding, else inferred from the spaces.
        let namespace = binding
            .namespace
            .clone()
            .or_else(|| Self::select_preflight_namespace(binding, &project_spaces));
        let coverage_key = build_knowledge_coverage_key(
            &request.project_id,
            namespace.as_deref(),
            &request.task_family,
            &request.subject,
        );

        // Reuse disabled — report and stop before touching any items.
        if !binding.enabled || binding.reuse_mode == KnowledgeReuseMode::Disabled {
            return Ok(KnowledgePreflightResult {
                project_id: request.project_id.clone(),
                namespace,
                task_family: request.task_family.clone(),
                subject: request.subject.clone(),
                coverage_key,
                decision: KnowledgeReuseDecision::Disabled,
                reuse_reason: None,
                skip_reason: Some("knowledge reuse is disabled for this binding".to_string()),
                freshness_reason: None,
                items: Vec::new(),
            });
        }

        let spaces = self
            .resolve_preflight_spaces(request, &coverage_key)
            .await?;
        if spaces.is_empty() {
            return Ok(KnowledgePreflightResult {
                project_id: request.project_id.clone(),
                namespace,
                task_family: request.task_family.clone(),
                subject: request.subject.clone(),
                coverage_key,
                decision: KnowledgeReuseDecision::NoPriorKnowledge,
                reuse_reason: None,
                skip_reason: Some("no reusable knowledge spaces were found".to_string()),
                freshness_reason: None,
                items: Vec::new(),
            });
        }

        // Negative timestamps clamp to zero so the u64 cast is safe.
        let now_ms = chrono::Utc::now().timestamp_millis().max(0) as u64;
        let mut fresh_items = Vec::new();
        let mut stale_items = Vec::new();
        let mut freshest_reason = None;

        // Collect candidate items per space, filtering on status and trust.
        for space in &spaces {
            let items = self
                .list_knowledge_items(&space.id, Some(&coverage_key))
                .await?;
            let coverage = self
                .get_knowledge_coverage(&coverage_key, &space.id)
                .await?;
            for item in items {
                if !item.status.is_active() {
                    continue;
                }
                let Some(trust_level) = item.status.as_trust_level() else {
                    continue;
                };
                if !trust_level.meets_floor(binding.trust_floor) {
                    continue;
                }
                // The item's own expiry wins; otherwise fall back to coverage.
                let freshness_expires_at_ms = item.freshness_expires_at_ms.or_else(|| {
                    coverage
                        .as_ref()
                        .and_then(|coverage| coverage.freshness_expires_at_ms)
                });
                let pack_item = KnowledgePackItem {
                    item_id: item.id.clone(),
                    space_id: space.id.clone(),
                    coverage_key: item.coverage_key.clone(),
                    title: item.title.clone(),
                    summary: item.summary.clone(),
                    trust_level,
                    status: item.status.to_string(),
                    artifact_refs: item.artifact_refs.clone(),
                    source_memory_ids: item.source_memory_ids.clone(),
                    freshness_expires_at_ms,
                };
                if Self::is_fresh_enough(
                    freshness_expires_at_ms,
                    binding.freshness_ms,
                    coverage
                        .as_ref()
                        .and_then(|coverage| coverage.last_promoted_at_ms),
                    item.created_at_ms,
                    now_ms,
                ) {
                    fresh_items.push(pack_item);
                } else {
                    // Overwritten on each stale item: keeps the last reason seen.
                    freshest_reason = Some(match freshness_expires_at_ms {
                        Some(expires_at_ms) => format!(
                            "coverage `{}` in space `{}` expired at {}",
                            coverage_key, space.id, expires_at_ms
                        ),
                        None => format!(
                            "coverage `{}` in space `{}` lacks freshness metadata",
                            coverage_key, space.id
                        ),
                    });
                    stale_items.push(pack_item);
                }
            }
        }

        // Rank: trust level descending, then later expiry, then title ascending.
        fresh_items.sort_by(|left, right| {
            right
                .trust_level
                .rank()
                .cmp(&left.trust_level.rank())
                .then_with(|| {
                    right
                        .freshness_expires_at_ms
                        .unwrap_or(0)
                        .cmp(&left.freshness_expires_at_ms.unwrap_or(0))
                })
                .then_with(|| left.title.cmp(&right.title))
        });
        stale_items.sort_by(|left, right| {
            right
                .trust_level
                .rank()
                .cmp(&left.trust_level.rank())
                .then_with(|| left.title.cmp(&right.title))
        });

        // Fresh items available: reuse, with the decision keyed off the
        // highest-ranked item's trust level.
        if let Some(freshest_trust_level) = fresh_items.first().map(|item| item.trust_level) {
            let selected = fresh_items
                .into_iter()
                .take(MAX_KNOWLEDGE_PACK_ITEMS)
                .collect::<Vec<_>>();
            let decision = match freshest_trust_level {
                KnowledgeTrustLevel::ApprovedDefault => {
                    KnowledgeReuseDecision::ReuseApprovedDefault
                }
                _ => KnowledgeReuseDecision::ReusePromoted,
            };
            let selected_count = selected.len();
            return Ok(KnowledgePreflightResult {
                project_id: request.project_id.clone(),
                namespace,
                task_family: request.task_family.clone(),
                subject: request.subject.clone(),
                coverage_key,
                decision,
                reuse_reason: Some(format!(
                    "reusing {} promoted knowledge item(s) from {} space(s)",
                    selected_count,
                    spaces.len()
                )),
                skip_reason: None,
                freshness_reason: None,
                items: selected,
            });
        }

        // Only stale items: surface them but require a refresh.
        if !stale_items.is_empty() {
            let selected = stale_items
                .into_iter()
                .take(MAX_KNOWLEDGE_PACK_ITEMS)
                .collect::<Vec<_>>();
            return Ok(KnowledgePreflightResult {
                project_id: request.project_id.clone(),
                namespace,
                task_family: request.task_family.clone(),
                subject: request.subject.clone(),
                coverage_key,
                decision: KnowledgeReuseDecision::RefreshRequired,
                reuse_reason: None,
                skip_reason: Some(
                    "prior knowledge exists but is not fresh enough to reuse".to_string(),
                ),
                freshness_reason: freshest_reason.or_else(|| {
                    Some("matching knowledge exists but freshness policy rejected it".to_string())
                }),
                items: selected,
            });
        }

        // Spaces existed but nothing matched the coverage key and filters.
        Ok(KnowledgePreflightResult {
            project_id: request.project_id.clone(),
            namespace,
            task_family: request.task_family.clone(),
            subject: request.subject.clone(),
            coverage_key,
            decision: KnowledgeReuseDecision::NoPriorKnowledge,
            reuse_reason: None,
            skip_reason: Some("no active promoted knowledge matched this coverage key".to_string()),
            freshness_reason: None,
            items: Vec::new(),
        })
    }
740
741    /// Retrieve context for a message
742    ///
743    /// This retrieves relevant chunks from all tiers and formats them
744    /// for injection into the prompt
745    pub async fn retrieve_context(
746        &self,
747        query: &str,
748        project_id: Option<&str>,
749        session_id: Option<&str>,
750        token_budget: Option<i64>,
751    ) -> MemoryResult<MemoryContext> {
752        let (context, _) = self
753            .retrieve_context_with_meta(query, project_id, session_id, token_budget)
754            .await?;
755        Ok(context)
756    }
757
758    /// Retrieve context plus retrieval metadata for observability.
759    pub async fn retrieve_context_with_meta(
760        &self,
761        query: &str,
762        project_id: Option<&str>,
763        session_id: Option<&str>,
764        token_budget: Option<i64>,
765    ) -> MemoryResult<(MemoryContext, MemoryRetrievalMeta)> {
766        let config = if let Some(pid) = project_id {
767            self.db.get_or_create_config(pid).await?
768        } else {
769            MemoryConfig::default()
770        };
771        let budget = token_budget.unwrap_or(config.token_budget);
772        let retrieval_limit = config.retrieval_k.max(1);
773
774        // Get recent session chunks
775        let current_session = if let Some(sid) = session_id {
776            self.db.get_session_chunks(sid).await?
777        } else {
778            Vec::new()
779        };
780
781        // Search for relevant history
782        let search_results = self
783            .search(query, None, project_id, session_id, Some(retrieval_limit))
784            .await?;
785
786        let mut score_min: Option<f64> = None;
787        let mut score_max: Option<f64> = None;
788        for result in &search_results {
789            score_min = Some(match score_min {
790                Some(current) => current.min(result.similarity),
791                None => result.similarity,
792            });
793            score_max = Some(match score_max {
794                Some(current) => current.max(result.similarity),
795                None => result.similarity,
796            });
797        }
798
799        let mut current_session = current_session;
800        let mut relevant_history = Vec::new();
801        let mut project_facts = Vec::new();
802
803        for result in search_results {
804            match result.chunk.tier {
805                MemoryTier::Project => {
806                    project_facts.push(result.chunk);
807                }
808                MemoryTier::Global => {
809                    project_facts.push(result.chunk);
810                }
811                MemoryTier::Session => {
812                    // Only add to relevant_history if not in current_session
813                    if !current_session.iter().any(|c| c.id == result.chunk.id) {
814                        relevant_history.push(result.chunk);
815                    }
816                }
817            }
818        }
819
820        // Calculate total tokens and trim if necessary
821        let mut total_tokens: i64 = current_session.iter().map(|c| c.token_count).sum();
822        total_tokens += relevant_history.iter().map(|c| c.token_count).sum::<i64>();
823        total_tokens += project_facts.iter().map(|c| c.token_count).sum::<i64>();
824
825        // Trim to fit budget if necessary
826        if total_tokens > budget {
827            let excess = total_tokens - budget;
828            self.trim_context(
829                &mut current_session,
830                &mut relevant_history,
831                &mut project_facts,
832                excess,
833            )?;
834            total_tokens = current_session.iter().map(|c| c.token_count).sum::<i64>()
835                + relevant_history.iter().map(|c| c.token_count).sum::<i64>()
836                + project_facts.iter().map(|c| c.token_count).sum::<i64>();
837        }
838
839        let context = MemoryContext {
840            current_session,
841            relevant_history,
842            project_facts,
843            total_tokens,
844        };
845        let chunks_total = context.current_session.len()
846            + context.relevant_history.len()
847            + context.project_facts.len();
848        let meta = MemoryRetrievalMeta {
849            used: chunks_total > 0,
850            chunks_total,
851            session_chunks: context.current_session.len(),
852            history_chunks: context.relevant_history.len(),
853            project_fact_chunks: context.project_facts.len(),
854            score_min,
855            score_max,
856        };
857
858        Ok((context, meta))
859    }
860
861    /// Trim context to fit within token budget
862    fn trim_context(
863        &self,
864        current_session: &mut Vec<MemoryChunk>,
865        relevant_history: &mut Vec<MemoryChunk>,
866        project_facts: &mut Vec<MemoryChunk>,
867        excess_tokens: i64,
868    ) -> MemoryResult<()> {
869        let mut tokens_to_remove = excess_tokens;
870
871        // First, trim relevant_history (less important than project_facts)
872        while tokens_to_remove > 0 && !relevant_history.is_empty() {
873            if let Some(chunk) = relevant_history.pop() {
874                tokens_to_remove -= chunk.token_count;
875            }
876        }
877
878        // If still over budget, trim project_facts
879        while tokens_to_remove > 0 && !project_facts.is_empty() {
880            if let Some(chunk) = project_facts.pop() {
881                tokens_to_remove -= chunk.token_count;
882            }
883        }
884
885        while tokens_to_remove > 0 && !current_session.is_empty() {
886            if let Some(chunk) = current_session.pop() {
887                tokens_to_remove -= chunk.token_count;
888            }
889        }
890
891        Ok(())
892    }
893
894    /// Clear session memory
895    pub async fn clear_session(&self, session_id: &str) -> MemoryResult<u64> {
896        let count = self.db.clear_session_memory(session_id).await?;
897
898        // Log cleanup
899        self.db
900            .log_cleanup(
901                "manual",
902                MemoryTier::Session,
903                None,
904                Some(session_id),
905                count as i64,
906                0,
907            )
908            .await?;
909
910        Ok(count)
911    }
912
913    /// Clear project memory
914    pub async fn clear_project(&self, project_id: &str) -> MemoryResult<u64> {
915        let count = self.db.clear_project_memory(project_id).await?;
916
917        // Log cleanup
918        self.db
919            .log_cleanup(
920                "manual",
921                MemoryTier::Project,
922                Some(project_id),
923                None,
924                count as i64,
925                0,
926            )
927            .await?;
928
929        Ok(count)
930    }
931
    /// Get memory statistics
    ///
    /// Thin delegate to the database layer's aggregate statistics query.
    pub async fn get_stats(&self) -> MemoryResult<MemoryStats> {
        self.db.get_stats().await
    }
936
    /// Get memory configuration for a project
    ///
    /// A default config is created and persisted on first access
    /// (see the DB layer's `get_or_create_config`).
    pub async fn get_config(&self, project_id: &str) -> MemoryResult<MemoryConfig> {
        self.db.get_or_create_config(project_id).await
    }
941
    /// Update memory configuration for a project
    ///
    /// Persists `config` for `project_id`, replacing any stored values.
    pub async fn set_config(&self, project_id: &str, config: &MemoryConfig) -> MemoryResult<()> {
        self.db.update_config(project_id, config).await
    }
946
    /// Look up the context node stored at `uri`, if one exists.
    pub async fn resolve_uri(&self, uri: &str) -> MemoryResult<Option<MemoryNode>> {
        self.db.get_node_by_uri(uri).await
    }
950
951    pub async fn list_directory(&self, uri: &str) -> MemoryResult<DirectoryListing> {
952        let nodes = self.db.list_directory(uri).await?;
953        let directories: Vec<MemoryNode> = nodes
954            .iter()
955            .filter(|n| n.node_type == NodeType::Directory)
956            .cloned()
957            .collect();
958        let files: Vec<MemoryNode> = nodes
959            .iter()
960            .filter(|n| n.node_type == NodeType::File)
961            .cloned()
962            .collect();
963
964        Ok(DirectoryListing {
965            uri: uri.to_string(),
966            nodes,
967            total_children: directories.len() + files.len(),
968            directories,
969            files,
970        })
971    }
972
    /// Return the subtree rooted at `uri`, descending at most `max_depth` levels.
    pub async fn tree(&self, uri: &str, max_depth: usize) -> MemoryResult<Vec<TreeNode>> {
        self.db.get_children_tree(uri, max_depth).await
    }
976
977    pub async fn create_context_node(
978        &self,
979        uri: &str,
980        node_type: NodeType,
981        metadata: Option<serde_json::Value>,
982    ) -> MemoryResult<String> {
983        let parsed_uri =
984            ContextUri::parse(uri).map_err(|e| MemoryError::InvalidConfig(e.message))?;
985        let parent_uri = parsed_uri.parent().map(|p| p.to_string());
986        self.db
987            .create_node(uri, parent_uri.as_deref(), node_type, metadata.as_ref())
988            .await
989    }
990
    /// Fetch a node's layer record of the given type, if present.
    pub async fn get_context_layer(
        &self,
        node_id: &str,
        layer_type: LayerType,
    ) -> MemoryResult<Option<MemoryLayer>> {
        self.db.get_layer(node_id, layer_type).await
    }
998
999    pub async fn store_content_with_layers(
1000        &self,
1001        uri: &str,
1002        content: &str,
1003        metadata: Option<serde_json::Value>,
1004    ) -> MemoryResult<String> {
1005        let parsed_uri =
1006            ContextUri::parse(uri).map_err(|e| MemoryError::InvalidConfig(e.message))?;
1007        let node_type = if parsed_uri
1008            .last_segment()
1009            .map(|s| s.ends_with(".md") || s.ends_with(".txt") || s.contains("."))
1010            .unwrap_or(false)
1011        {
1012            NodeType::File
1013        } else {
1014            NodeType::Directory
1015        };
1016
1017        let parent_uri = parsed_uri.parent().map(|p| p.to_string());
1018        let node_id = self
1019            .db
1020            .create_node(uri, parent_uri.as_deref(), node_type, metadata.as_ref())
1021            .await?;
1022
1023        let token_count = self.tokenizer.count_tokens(content) as i64;
1024        self.db
1025            .create_layer(&node_id, LayerType::L2, content, token_count, None)
1026            .await?;
1027
1028        Ok(node_id)
1029    }
1030
1031    pub async fn generate_layers_for_node(
1032        &self,
1033        node_id: &str,
1034        providers: &ProviderRegistry,
1035    ) -> MemoryResult<()> {
1036        let l2_layer = self.db.get_layer(node_id, LayerType::L2).await?;
1037        let l2_content = match l2_layer {
1038            Some(layer) => layer.content,
1039            None => return Ok(()),
1040        };
1041
1042        let generator = ContextLayerGenerator::new(Arc::new(providers.clone()));
1043
1044        let (l0_content, l1_content) = generator.generate_layers(&l2_content).await?;
1045
1046        let l0_tokens = self.tokenizer.count_tokens(&l0_content) as i64;
1047        let l1_tokens = self.tokenizer.count_tokens(&l1_content) as i64;
1048
1049        if self.db.get_layer(node_id, LayerType::L0).await?.is_none() {
1050            self.db
1051                .create_layer(node_id, LayerType::L0, &l0_content, l0_tokens, None)
1052                .await?;
1053        }
1054
1055        if self.db.get_layer(node_id, LayerType::L1).await?.is_none() {
1056            self.db
1057                .create_layer(node_id, LayerType::L1, &l1_content, l1_tokens, None)
1058                .await?;
1059        }
1060
1061        Ok(())
1062    }
1063
1064    pub async fn get_layer_content(
1065        &self,
1066        node_id: &str,
1067        layer_type: LayerType,
1068    ) -> MemoryResult<Option<String>> {
1069        let layer = self.db.get_layer(node_id, layer_type).await?;
1070        Ok(layer.map(|l| l.content))
1071    }
1072
1073    pub async fn store_content_with_layers_auto(
1074        &self,
1075        uri: &str,
1076        content: &str,
1077        metadata: Option<serde_json::Value>,
1078        providers: Option<&ProviderRegistry>,
1079    ) -> MemoryResult<String> {
1080        let node_id = self
1081            .store_content_with_layers(uri, content, metadata)
1082            .await?;
1083
1084        if let Some(p) = providers {
1085            if let Err(e) = self.generate_layers_for_node(&node_id, p).await {
1086                tracing::warn!("Failed to generate layers for node {}: {}", node_id, e);
1087            }
1088        }
1089
1090        Ok(node_id)
1091    }
1092
1093    /// Run cleanup based on retention policies
1094    pub async fn run_cleanup(&self, project_id: Option<&str>) -> MemoryResult<u64> {
1095        let mut total_cleaned = 0u64;
1096
1097        if let Some(pid) = project_id {
1098            // Get config for this project
1099            let config = self.db.get_or_create_config(pid).await?;
1100
1101            if config.auto_cleanup {
1102                // Clean up old session memory
1103                let cleaned = self
1104                    .db
1105                    .cleanup_old_sessions(config.session_retention_days)
1106                    .await?;
1107                total_cleaned += cleaned;
1108
1109                if cleaned > 0 {
1110                    self.db
1111                        .log_cleanup(
1112                            "auto",
1113                            MemoryTier::Session,
1114                            Some(pid),
1115                            None,
1116                            cleaned as i64,
1117                            0,
1118                        )
1119                        .await?;
1120                }
1121            }
1122        } else {
1123            // Clean up all projects with auto_cleanup enabled
1124            // This would require listing all projects, for now just clean session memory
1125            // with a default retention period
1126            let cleaned = self.db.cleanup_old_sessions(30).await?;
1127            total_cleaned += cleaned;
1128        }
1129
1130        // Vacuum if significant cleanup occurred
1131        if total_cleaned > 100 {
1132            self.db.vacuum().await?;
1133        }
1134
1135        Ok(total_cleaned)
1136    }
1137
1138    /// Check if cleanup is needed and run it
1139    async fn maybe_cleanup(&self, project_id: &Option<String>) -> MemoryResult<()> {
1140        if let Some(pid) = project_id {
1141            let stats = self.db.get_stats().await?;
1142            let config = self.db.get_or_create_config(pid).await?;
1143
1144            // Check if we're over the chunk limit
1145            if stats.project_chunks > config.max_chunks {
1146                // Remove oldest chunks
1147                let excess = stats.project_chunks - config.max_chunks;
1148                // This would require a new DB method to delete oldest chunks
1149                // For now, just log
1150                tracing::info!("Project {} has {} excess chunks", pid, excess);
1151            }
1152        }
1153
1154        Ok(())
1155    }
1156
    /// Get cleanup log entries
    ///
    /// Stub: always returns an empty list. Reading the persisted cleanup log
    /// still needs a DB-layer query; `_limit` is accepted now so callers
    /// won't break once it is implemented.
    pub async fn get_cleanup_log(&self, _limit: i64) -> MemoryResult<Vec<CleanupLogEntry>> {
        // This would be implemented in the DB layer
        // For now, return empty
        Ok(Vec::new())
    }
1163
    /// Count tokens in text
    ///
    /// Delegates to the manager's tokenizer so all subsystems share one
    /// consistent token accounting.
    pub fn count_tokens(&self, text: &str) -> usize {
        self.tokenizer.count_tokens(text)
    }
1168
1169    /// Report embedding backend health for UI/telemetry.
1170    pub async fn embedding_health(&self) -> EmbeddingHealth {
1171        let service = self.embedding_service.lock().await;
1172        if service.is_available() {
1173            EmbeddingHealth {
1174                status: "ok".to_string(),
1175                reason: None,
1176            }
1177        } else {
1178            EmbeddingHealth {
1179                status: "degraded_disabled".to_string(),
1180                reason: service.disabled_reason().map(ToString::to_string),
1181            }
1182        }
1183    }
1184
1185    /// Consolidate a session's memory into a summary chunk using the cheapest available provider.
1186    pub async fn consolidate_session(
1187        &self,
1188        session_id: &str,
1189        project_id: Option<&str>,
1190        providers: &ProviderRegistry,
1191        config: &MemoryConsolidationConfig,
1192    ) -> MemoryResult<Option<String>> {
1193        if !config.enabled {
1194            return Ok(None);
1195        }
1196
1197        let chunks = self.db.get_session_chunks(session_id).await?;
1198        if chunks.is_empty() {
1199            return Ok(None);
1200        }
1201
1202        // Assemble text
1203        let mut text_parts = Vec::new();
1204        for chunk in &chunks {
1205            text_parts.push(chunk.content.clone());
1206        }
1207        let full_text = text_parts.join("\n\n---\n\n");
1208
1209        // Build prompt
1210        let prompt = format!(
1211            "Please provide a concise but comprehensive summary of the following chat session. \
1212            Focus on the key decisions, technical details, code changes, and unresolved issues. \
1213            Do NOT include conversational filler, greetings, or sign-offs. \
1214            This summary will be used as long-term memory to recall the context of this work.\n\n\
1215            Session transcripts:\n\n{}",
1216            full_text
1217        );
1218
1219        let provider_override = config.provider.as_deref().filter(|s| !s.is_empty());
1220        let model_override = config.model.as_deref().filter(|s| !s.is_empty());
1221
1222        let summary_text = match providers
1223            .complete_cheapest(&prompt, provider_override, model_override)
1224            .await
1225        {
1226            Ok(s) => s,
1227            Err(e) => {
1228                tracing::warn!("Memory consolidation LLM failed for session {session_id}: {e}");
1229                return Ok(None);
1230            }
1231        };
1232
1233        if summary_text.trim().is_empty() {
1234            return Ok(None);
1235        }
1236
1237        // Generate embedding for the summary
1238        let embedding = {
1239            let service = self.embedding_service.lock().await;
1240            service
1241                .embed(&summary_text)
1242                .await
1243                .map_err(|e| crate::types::MemoryError::Embedding(e.to_string()))?
1244        };
1245
1246        // Store the summary chunk
1247        let chunk_id = uuid::Uuid::new_v4().to_string();
1248        let chunk = MemoryChunk {
1249            id: chunk_id,
1250            content: summary_text.clone(),
1251            tier: MemoryTier::Project,
1252            session_id: None, // The summary belongs to the project, not the ephemeral session
1253            project_id: project_id.map(ToString::to_string),
1254            created_at: Utc::now(),
1255            source: "consolidation".to_string(),
1256            token_count: self.count_tokens(&summary_text) as i64,
1257            source_path: None,
1258            source_mtime: None,
1259            source_size: None,
1260            source_hash: None,
1261            metadata: None,
1262        };
1263
1264        self.db.store_chunk(&chunk, &embedding).await?;
1265
1266        // Clear original chunks now that they are consolidated
1267        self.db.clear_session_memory(session_id).await?;
1268
1269        tracing::info!(
1270            "Session {session_id} consolidated into summary chunk. Original chunks cleared."
1271        );
1272
1273        Ok(Some(summary_text))
1274    }
1275}
1276
1277/// Create memory manager with default database path
1278pub async fn create_memory_manager(app_data_dir: &Path) -> MemoryResult<MemoryManager> {
1279    let db_path = app_data_dir.join("tandem_memory.db");
1280    MemoryManager::new(&db_path).await
1281}