1use crate::chunking::{chunk_text_semantic, ChunkingConfig, Tokenizer};
5use crate::context_layers::ContextLayerGenerator;
6use crate::context_uri::ContextUri;
7use crate::db::MemoryDatabase;
8use crate::embeddings::EmbeddingService;
9use crate::types::{
10 CleanupLogEntry, DirectoryListing, EmbeddingHealth, KnowledgeCoverageRecord,
11 KnowledgeItemRecord, KnowledgePromotionRequest, KnowledgePromotionResult, KnowledgeSpaceRecord,
12 LayerType, MemoryChunk, MemoryConfig, MemoryContext, MemoryError, MemoryLayer, MemoryNode,
13 MemoryResult, MemoryRetrievalMeta, MemorySearchResult, MemoryStats, MemoryTier, NodeType,
14 StoreMessageRequest, TreeNode,
15};
16use chrono::Utc;
17use std::collections::HashSet;
18use std::path::Path;
19use std::sync::Arc;
20use tandem_orchestrator::{
21 build_knowledge_coverage_key, normalize_knowledge_segment, KnowledgeBinding, KnowledgePackItem,
22 KnowledgePreflightRequest, KnowledgePreflightResult, KnowledgeReuseDecision,
23 KnowledgeReuseMode, KnowledgeScope, KnowledgeTrustLevel,
24};
25use tandem_providers::{MemoryConsolidationConfig, ProviderRegistry};
26use tokio::sync::Mutex;
27
/// Coordinates persistent memory storage, embedding generation, and token
/// accounting for the tiered (session/project/global) memory system.
pub struct MemoryManager {
    // Shared handle to the SQLite-backed memory database.
    db: Arc<MemoryDatabase>,
    // Embedding backend; an async mutex because embed() is awaited while
    // the lock is held.
    embedding_service: Arc<Mutex<EmbeddingService>>,
    // Tokenizer used for token-budget accounting.
    tokenizer: Tokenizer,
}
34
35const MAX_KNOWLEDGE_PACK_ITEMS: usize = 3;
36
37impl MemoryManager {
38 fn is_malformed_database_error(err: &crate::types::MemoryError) -> bool {
39 err.to_string()
40 .to_lowercase()
41 .contains("database disk image is malformed")
42 }
43
    /// Borrow the underlying database handle.
    pub fn db(&self) -> &Arc<MemoryDatabase> {
        &self.db
    }
47
48 pub async fn new(db_path: &Path) -> MemoryResult<Self> {
50 let db = Arc::new(MemoryDatabase::new(db_path).await?);
51 let embedding_service = Arc::new(Mutex::new(EmbeddingService::new()));
52 let tokenizer = Tokenizer::new()?;
53
54 Ok(Self {
55 db,
56 embedding_service,
57 tokenizer,
58 })
59 }
60
    /// Chunk `request.content`, embed each chunk, and persist the chunks
    /// under the requested memory tier.
    ///
    /// Returns the ids of the stored chunks (empty when chunking produced
    /// nothing). Corrupted vector tables are repaired and the insert
    /// retried — including a full memory-table reset as a last resort —
    /// before an error is surfaced.
    pub async fn store_message(&self, request: StoreMessageRequest) -> MemoryResult<Vec<String>> {
        // Proactive health pass; a `true` result means a repair actually
        // happened, which is worth surfacing in the logs.
        if self
            .db
            .ensure_vector_tables_healthy()
            .await
            .unwrap_or(false)
        {
            tracing::warn!("Memory vector tables were repaired before storing message chunks");
        }

        // Project-scoped requests use the project's persisted config;
        // otherwise fall back to the defaults.
        let config = if let Some(ref pid) = request.project_id {
            self.db.get_or_create_config(pid).await?
        } else {
            MemoryConfig::default()
        };

        let chunking_config = ChunkingConfig {
            chunk_size: config.chunk_size as usize,
            chunk_overlap: config.chunk_overlap as usize,
            separator: None,
        };

        let text_chunks = chunk_text_semantic(&request.content, &chunking_config)?;

        if text_chunks.is_empty() {
            return Ok(Vec::new());
        }

        let mut chunk_ids = Vec::with_capacity(text_chunks.len());
        // NOTE: the embedding-service lock is held across the whole loop,
        // serializing embedding work for this message.
        let embedding_service = self.embedding_service.lock().await;

        for text_chunk in text_chunks {
            let chunk_id = uuid::Uuid::new_v4().to_string();

            let embedding = embedding_service.embed(&text_chunk.content).await?;

            let chunk = MemoryChunk {
                id: chunk_id.clone(),
                content: text_chunk.content,
                tier: request.tier,
                session_id: request.session_id.clone(),
                project_id: request.project_id.clone(),
                source: request.source.clone(),
                source_path: request.source_path.clone(),
                source_mtime: request.source_mtime,
                source_size: request.source_size,
                source_hash: request.source_hash.clone(),
                created_at: Utc::now(),
                token_count: text_chunk.token_count as i64,
                metadata: request.metadata.clone(),
            };

            if let Err(err) = self.db.store_chunk(&chunk, &embedding).await {
                tracing::warn!("Failed to store memory chunk {}: {}", chunk.id, err);
                // Targeted repair first, then a general vector-table
                // health pass; either succeeding counts as "repaired".
                let repaired = {
                    let repaired_after_error =
                        self.db.try_repair_after_error(&err).await.unwrap_or(false);
                    repaired_after_error
                        || self
                            .db
                            .ensure_vector_tables_healthy()
                            .await
                            .unwrap_or(false)
                };
                if repaired {
                    tracing::warn!(
                        "Retrying memory chunk insert after vector table repair: {}",
                        chunk.id
                    );
                    if let Err(retry_err) = self.db.store_chunk(&chunk, &embedding).await {
                        if Self::is_malformed_database_error(&retry_err) {
                            // Last resort: rebuild all memory tables and
                            // attempt the insert one final time.
                            tracing::warn!(
                                "Memory DB still malformed after vector repair. Resetting memory tables and retrying chunk insert: {}",
                                chunk.id
                            );
                            self.db.reset_all_memory_tables().await?;
                            self.db.store_chunk(&chunk, &embedding).await?;
                        } else {
                            return Err(retry_err);
                        }
                    }
                } else {
                    return Err(err);
                }
            }
            chunk_ids.push(chunk_id);
        }

        if config.auto_cleanup {
            self.maybe_cleanup(&request.project_id).await?;
        }

        Ok(chunk_ids)
    }
166
167 pub async fn search(
169 &self,
170 query: &str,
171 tier: Option<MemoryTier>,
172 project_id: Option<&str>,
173 session_id: Option<&str>,
174 limit: Option<i64>,
175 ) -> MemoryResult<Vec<MemorySearchResult>> {
176 let effective_limit = limit.unwrap_or(5);
177
178 let embedding_service = self.embedding_service.lock().await;
180 let query_embedding = embedding_service.embed(query).await?;
181 drop(embedding_service);
182
183 let mut results = Vec::new();
184
185 let tiers_to_search = match tier {
187 Some(t) => vec![t],
188 None => {
189 if project_id.is_some() {
190 vec![MemoryTier::Session, MemoryTier::Project, MemoryTier::Global]
191 } else {
192 vec![MemoryTier::Session, MemoryTier::Global]
193 }
194 }
195 };
196
197 for search_tier in tiers_to_search {
198 let tier_results = match self
199 .db
200 .search_similar(
201 &query_embedding,
202 search_tier,
203 project_id,
204 session_id,
205 effective_limit,
206 )
207 .await
208 {
209 Ok(results) => results,
210 Err(err) => {
211 tracing::warn!(
212 "Memory tier search failed for {:?}: {}. Attempting vector repair.",
213 search_tier,
214 err
215 );
216 let repaired = {
217 let repaired_after_error =
218 self.db.try_repair_after_error(&err).await.unwrap_or(false);
219 repaired_after_error
220 || self
221 .db
222 .ensure_vector_tables_healthy()
223 .await
224 .unwrap_or(false)
225 };
226 if repaired {
227 match self
228 .db
229 .search_similar(
230 &query_embedding,
231 search_tier,
232 project_id,
233 session_id,
234 effective_limit,
235 )
236 .await
237 {
238 Ok(results) => results,
239 Err(retry_err) => {
240 tracing::warn!(
241 "Memory tier search still failing for {:?} after repair: {}",
242 search_tier,
243 retry_err
244 );
245 continue;
246 }
247 }
248 } else {
249 continue;
250 }
251 }
252 };
253
254 for (chunk, distance) in tier_results {
255 let similarity = 1.0 - distance.clamp(0.0, 1.0);
259
260 results.push(MemorySearchResult { chunk, similarity });
261 }
262 }
263
264 results.sort_by(|a, b| b.similarity.partial_cmp(&a.similarity).unwrap());
266 results.truncate(effective_limit as usize);
267
268 Ok(results)
269 }
270
    /// Insert or update a knowledge space record.
    pub async fn upsert_knowledge_space(&self, space: &KnowledgeSpaceRecord) -> MemoryResult<()> {
        self.db.upsert_knowledge_space(space).await
    }
274
    /// Fetch a knowledge space by id, if it exists.
    pub async fn get_knowledge_space(
        &self,
        id: &str,
    ) -> MemoryResult<Option<KnowledgeSpaceRecord>> {
        self.db.get_knowledge_space(id).await
    }
281
    /// List knowledge spaces, optionally filtered to one project.
    pub async fn list_knowledge_spaces(
        &self,
        project_id: Option<&str>,
    ) -> MemoryResult<Vec<KnowledgeSpaceRecord>> {
        self.db.list_knowledge_spaces(project_id).await
    }
288
    /// Insert or update a knowledge item record.
    pub async fn upsert_knowledge_item(&self, item: &KnowledgeItemRecord) -> MemoryResult<()> {
        self.db.upsert_knowledge_item(item).await
    }
292
    /// Fetch a knowledge item by id, if it exists.
    pub async fn get_knowledge_item(&self, id: &str) -> MemoryResult<Option<KnowledgeItemRecord>> {
        self.db.get_knowledge_item(id).await
    }
296
    /// List a space's knowledge items, optionally filtered by coverage key.
    pub async fn list_knowledge_items(
        &self,
        space_id: &str,
        coverage_key: Option<&str>,
    ) -> MemoryResult<Vec<KnowledgeItemRecord>> {
        self.db.list_knowledge_items(space_id, coverage_key).await
    }
304
    /// Insert or update a knowledge coverage record.
    pub async fn upsert_knowledge_coverage(
        &self,
        coverage: &KnowledgeCoverageRecord,
    ) -> MemoryResult<()> {
        self.db.upsert_knowledge_coverage(coverage).await
    }
311
    /// Fetch the coverage record for a (coverage key, space) pair, if any.
    pub async fn get_knowledge_coverage(
        &self,
        coverage_key: &str,
        space_id: &str,
    ) -> MemoryResult<Option<KnowledgeCoverageRecord>> {
        self.db.get_knowledge_coverage(coverage_key, space_id).await
    }
319
    /// Promote a knowledge item per `request`; `None` when the DB layer
    /// produced no promotion result.
    pub async fn promote_knowledge_item(
        &self,
        request: &KnowledgePromotionRequest,
    ) -> MemoryResult<Option<KnowledgePromotionResult>> {
        self.db.promote_knowledge_item(request).await
    }
326
327 fn space_matches_ref(
328 space: &KnowledgeSpaceRecord,
329 space_ref: &tandem_orchestrator::KnowledgeSpaceRef,
330 project_id: &str,
331 ) -> bool {
332 if space.scope != space_ref.scope {
333 return false;
334 }
335 match space_ref.scope {
336 KnowledgeScope::Project | KnowledgeScope::Run => {
337 let target_project = space_ref.project_id.as_deref().unwrap_or(project_id);
338 if space.project_id.as_deref() != Some(target_project) {
339 return false;
340 }
341 }
342 KnowledgeScope::Global => {}
343 }
344 if let Some(namespace) = space_ref.namespace.as_deref() {
345 if space.namespace.as_deref() != Some(namespace) {
346 return false;
347 }
348 }
349 true
350 }
351
352 fn select_preflight_namespace(
353 binding: &KnowledgeBinding,
354 spaces: &[KnowledgeSpaceRecord],
355 ) -> Option<String> {
356 if let Some(namespace) = binding.namespace.clone() {
357 return Some(namespace);
358 }
359 if binding.read_spaces.len() == 1 {
360 if let Some(namespace) = binding.read_spaces[0].namespace.clone() {
361 return Some(namespace);
362 }
363 }
364 if spaces.len() == 1 {
365 return spaces[0].namespace.clone();
366 }
367 let mut unique = HashSet::new();
368 for space in spaces {
369 if let Some(namespace) = space.namespace.as_ref() {
370 unique.insert(namespace);
371 }
372 }
373 if unique.len() == 1 {
374 unique.into_iter().next().map(|value| value.to_string())
375 } else {
376 None
377 }
378 }
379
380 fn binding_uses_explicit_spaces(binding: &KnowledgeBinding) -> bool {
381 !binding.read_spaces.is_empty() || !binding.promote_spaces.is_empty()
382 }
383
384 fn namespace_matches(space_namespace: Option<&str>, binding_namespace: Option<&str>) -> bool {
385 match (space_namespace, binding_namespace) {
386 (None, None) => true,
387 (Some(space), Some(binding)) => {
388 normalize_knowledge_segment(space) == normalize_knowledge_segment(binding)
389 }
390 _ => false,
391 }
392 }
393
394 fn is_fresh_enough(
395 freshness_expires_at_ms: Option<u64>,
396 freshness_policy_ms: Option<u64>,
397 coverage_last_promoted_at_ms: Option<u64>,
398 item_created_at_ms: u64,
399 now_ms: u64,
400 ) -> bool {
401 if let Some(expires_at_ms) = freshness_expires_at_ms {
402 return expires_at_ms > now_ms;
403 }
404 let Some(policy_ms) = freshness_policy_ms else {
405 return true;
406 };
407 let basis_ms = coverage_last_promoted_at_ms.unwrap_or(item_created_at_ms);
408 now_ms.saturating_sub(basis_ms) <= policy_ms
409 }
410
    /// Resolve the knowledge spaces a preflight request may read from.
    ///
    /// When the binding names explicit read/promote spaces, only those
    /// refs are resolved (by id, or by scope + project/namespace match).
    /// Otherwise the project's spaces are filtered by the namespace
    /// selected for the binding; no resolvable namespace means no spaces.
    /// Results are deduplicated by space id.
    async fn resolve_preflight_spaces(
        &self,
        request: &KnowledgePreflightRequest,
        _coverage_key: &str,
    ) -> MemoryResult<Vec<KnowledgeSpaceRecord>> {
        let binding = &request.binding;
        let mut spaces = Vec::new();
        let mut seen_space_ids = HashSet::new();

        // Dedup helper: a space is added at most once, keyed by id.
        let push_space = |space: KnowledgeSpaceRecord,
                          spaces: &mut Vec<KnowledgeSpaceRecord>,
                          seen_space_ids: &mut HashSet<String>| {
            if seen_space_ids.insert(space.id.clone()) {
                spaces.push(space);
            }
        };

        if Self::binding_uses_explicit_spaces(binding) {
            for space_ref in binding
                .read_spaces
                .iter()
                .chain(binding.promote_spaces.iter())
            {
                // An explicit space id wins over scope-based matching.
                if let Some(space_id) = space_ref.space_id.as_deref() {
                    if let Some(space) = self.get_knowledge_space(space_id).await? {
                        push_space(space, &mut spaces, &mut seen_space_ids);
                    }
                    continue;
                }

                match space_ref.scope {
                    // A run-scoped ref without an id resolves to nothing.
                    KnowledgeScope::Run => {}
                    KnowledgeScope::Project => {
                        let candidate_project_id = space_ref
                            .project_id
                            .as_deref()
                            .unwrap_or(&request.project_id);
                        let project_spaces = self
                            .list_knowledge_spaces(Some(candidate_project_id))
                            .await?;
                        for space in project_spaces.into_iter().filter(|space| {
                            Self::space_matches_ref(space, space_ref, &request.project_id)
                        }) {
                            push_space(space, &mut spaces, &mut seen_space_ids);
                        }
                    }
                    KnowledgeScope::Global => {
                        let global_spaces = self.list_knowledge_spaces(None).await?;
                        for space in global_spaces.into_iter().filter(|space| {
                            Self::space_matches_ref(space, space_ref, &request.project_id)
                        }) {
                            push_space(space, &mut spaces, &mut seen_space_ids);
                        }
                    }
                }
            }
            return Ok(spaces);
        }

        // Implicit resolution needs a concrete project id.
        if request.project_id.trim().is_empty() {
            return Ok(spaces);
        }

        let project_spaces = self
            .list_knowledge_spaces(Some(&request.project_id))
            .await?;
        let requested_namespace = if binding.namespace.is_some() {
            binding.namespace.clone()
        } else {
            Self::select_preflight_namespace(binding, &project_spaces)
        };
        // Without a resolvable namespace there is nothing to match.
        let Some(requested_namespace) = requested_namespace else {
            return Ok(spaces);
        };

        for space in project_spaces.into_iter().filter(|space| {
            space.scope == KnowledgeScope::Project
                && Self::namespace_matches(
                    space.namespace.as_deref(),
                    Some(requested_namespace.as_str()),
                )
        }) {
            push_space(space, &mut spaces, &mut seen_space_ids);
        }
        Ok(spaces)
    }
497
    /// Decide whether prior knowledge can be reused for a task.
    ///
    /// Builds a coverage key from project/namespace/task-family/subject,
    /// resolves candidate spaces, then classifies matching active items
    /// as fresh or stale. Outcomes, in order: `Disabled` (binding off),
    /// `NoPriorKnowledge` (no spaces or no matching items),
    /// `ReuseApprovedDefault`/`ReusePromoted` (fresh items exist, capped
    /// at `MAX_KNOWLEDGE_PACK_ITEMS`), or `RefreshRequired` (only stale
    /// items exist).
    pub async fn preflight_knowledge(
        &self,
        request: &KnowledgePreflightRequest,
    ) -> MemoryResult<KnowledgePreflightResult> {
        let binding = &request.binding;
        let project_spaces = if request.project_id.trim().is_empty() {
            Vec::new()
        } else {
            self.list_knowledge_spaces(Some(&request.project_id))
                .await?
        };
        // Prefer an explicit binding namespace; otherwise infer one from
        // the binding's refs or the project's spaces.
        let namespace = binding
            .namespace
            .clone()
            .or_else(|| Self::select_preflight_namespace(binding, &project_spaces));
        let coverage_key = build_knowledge_coverage_key(
            &request.project_id,
            namespace.as_deref(),
            &request.task_family,
            &request.subject,
        );

        if !binding.enabled || binding.reuse_mode == KnowledgeReuseMode::Disabled {
            return Ok(KnowledgePreflightResult {
                project_id: request.project_id.clone(),
                namespace,
                task_family: request.task_family.clone(),
                subject: request.subject.clone(),
                coverage_key,
                decision: KnowledgeReuseDecision::Disabled,
                reuse_reason: None,
                skip_reason: Some("knowledge reuse is disabled for this binding".to_string()),
                freshness_reason: None,
                items: Vec::new(),
            });
        }

        let spaces = self
            .resolve_preflight_spaces(request, &coverage_key)
            .await?;
        if spaces.is_empty() {
            return Ok(KnowledgePreflightResult {
                project_id: request.project_id.clone(),
                namespace,
                task_family: request.task_family.clone(),
                subject: request.subject.clone(),
                coverage_key,
                decision: KnowledgeReuseDecision::NoPriorKnowledge,
                reuse_reason: None,
                skip_reason: Some("no reusable knowledge spaces were found".to_string()),
                freshness_reason: None,
                items: Vec::new(),
            });
        }

        let now_ms = chrono::Utc::now().timestamp_millis().max(0) as u64;
        let mut fresh_items = Vec::new();
        let mut stale_items = Vec::new();
        let mut freshest_reason = None;

        for space in &spaces {
            let items = self
                .list_knowledge_items(&space.id, Some(&coverage_key))
                .await?;
            let coverage = self
                .get_knowledge_coverage(&coverage_key, &space.id)
                .await?;
            for item in items {
                // Only active items with a trust level meeting the
                // binding's floor are considered.
                if !item.status.is_active() {
                    continue;
                }
                let Some(trust_level) = item.status.as_trust_level() else {
                    continue;
                };
                if !trust_level.meets_floor(binding.trust_floor) {
                    continue;
                }
                // Item-level expiry wins; fall back to coverage expiry.
                let freshness_expires_at_ms = item.freshness_expires_at_ms.or_else(|| {
                    coverage
                        .as_ref()
                        .and_then(|coverage| coverage.freshness_expires_at_ms)
                });
                let pack_item = KnowledgePackItem {
                    item_id: item.id.clone(),
                    space_id: space.id.clone(),
                    coverage_key: item.coverage_key.clone(),
                    title: item.title.clone(),
                    summary: item.summary.clone(),
                    trust_level,
                    status: item.status.to_string(),
                    artifact_refs: item.artifact_refs.clone(),
                    source_memory_ids: item.source_memory_ids.clone(),
                    freshness_expires_at_ms,
                };
                if Self::is_fresh_enough(
                    freshness_expires_at_ms,
                    binding.freshness_ms,
                    coverage
                        .as_ref()
                        .and_then(|coverage| coverage.last_promoted_at_ms),
                    item.created_at_ms,
                    now_ms,
                ) {
                    fresh_items.push(pack_item);
                } else {
                    // Keep the most recently seen staleness explanation.
                    freshest_reason = Some(match freshness_expires_at_ms {
                        Some(expires_at_ms) => format!(
                            "coverage `{}` in space `{}` expired at {}",
                            coverage_key, space.id, expires_at_ms
                        ),
                        None => format!(
                            "coverage `{}` in space `{}` lacks freshness metadata",
                            coverage_key, space.id
                        ),
                    });
                    stale_items.push(pack_item);
                }
            }
        }

        // Rank fresh items: trust desc, later expiry first, then title.
        fresh_items.sort_by(|left, right| {
            right
                .trust_level
                .rank()
                .cmp(&left.trust_level.rank())
                .then_with(|| {
                    right
                        .freshness_expires_at_ms
                        .unwrap_or(0)
                        .cmp(&left.freshness_expires_at_ms.unwrap_or(0))
                })
                .then_with(|| left.title.cmp(&right.title))
        });
        // Stale items: trust desc, then title.
        stale_items.sort_by(|left, right| {
            right
                .trust_level
                .rank()
                .cmp(&left.trust_level.rank())
                .then_with(|| left.title.cmp(&right.title))
        });

        if let Some(freshest_trust_level) = fresh_items.first().map(|item| item.trust_level) {
            let selected = fresh_items
                .into_iter()
                .take(MAX_KNOWLEDGE_PACK_ITEMS)
                .collect::<Vec<_>>();
            // The top-ranked item's trust level determines the decision.
            let decision = match freshest_trust_level {
                KnowledgeTrustLevel::ApprovedDefault => {
                    KnowledgeReuseDecision::ReuseApprovedDefault
                }
                _ => KnowledgeReuseDecision::ReusePromoted,
            };
            let selected_count = selected.len();
            return Ok(KnowledgePreflightResult {
                project_id: request.project_id.clone(),
                namespace,
                task_family: request.task_family.clone(),
                subject: request.subject.clone(),
                coverage_key,
                decision,
                reuse_reason: Some(format!(
                    "reusing {} promoted knowledge item(s) from {} space(s)",
                    selected_count,
                    spaces.len()
                )),
                skip_reason: None,
                freshness_reason: None,
                items: selected,
            });
        }

        if !stale_items.is_empty() {
            let selected = stale_items
                .into_iter()
                .take(MAX_KNOWLEDGE_PACK_ITEMS)
                .collect::<Vec<_>>();
            return Ok(KnowledgePreflightResult {
                project_id: request.project_id.clone(),
                namespace,
                task_family: request.task_family.clone(),
                subject: request.subject.clone(),
                coverage_key,
                decision: KnowledgeReuseDecision::RefreshRequired,
                reuse_reason: None,
                skip_reason: Some(
                    "prior knowledge exists but is not fresh enough to reuse".to_string(),
                ),
                freshness_reason: freshest_reason.or_else(|| {
                    Some("matching knowledge exists but freshness policy rejected it".to_string())
                }),
                items: selected,
            });
        }

        Ok(KnowledgePreflightResult {
            project_id: request.project_id.clone(),
            namespace,
            task_family: request.task_family.clone(),
            subject: request.subject.clone(),
            coverage_key,
            decision: KnowledgeReuseDecision::NoPriorKnowledge,
            reuse_reason: None,
            skip_reason: Some("no active promoted knowledge matched this coverage key".to_string()),
            freshness_reason: None,
            items: Vec::new(),
        })
    }
705
    /// Retrieve a token-budgeted memory context for `query`, discarding
    /// the retrieval metadata produced alongside it.
    pub async fn retrieve_context(
        &self,
        query: &str,
        project_id: Option<&str>,
        session_id: Option<&str>,
        token_budget: Option<i64>,
    ) -> MemoryResult<MemoryContext> {
        let (context, _) = self
            .retrieve_context_with_meta(query, project_id, session_id, token_budget)
            .await?;
        Ok(context)
    }
722
    /// Assemble a memory context for `query` and report retrieval
    /// metadata (chunk counts plus the min/max similarity observed).
    ///
    /// The context combines the full current-session transcript with
    /// semantically relevant history and project/global facts, trimmed
    /// to `token_budget` (falling back to the configured budget).
    pub async fn retrieve_context_with_meta(
        &self,
        query: &str,
        project_id: Option<&str>,
        session_id: Option<&str>,
        token_budget: Option<i64>,
    ) -> MemoryResult<(MemoryContext, MemoryRetrievalMeta)> {
        let config = if let Some(pid) = project_id {
            self.db.get_or_create_config(pid).await?
        } else {
            MemoryConfig::default()
        };
        let budget = token_budget.unwrap_or(config.token_budget);
        // Always request at least one result.
        let retrieval_limit = config.retrieval_k.max(1);

        // The current session is included wholesale, not by similarity.
        let current_session = if let Some(sid) = session_id {
            self.db.get_session_chunks(sid).await?
        } else {
            Vec::new()
        };

        let search_results = self
            .search(query, None, project_id, session_id, Some(retrieval_limit))
            .await?;

        // Track the similarity range for the retrieval metadata.
        let mut score_min: Option<f64> = None;
        let mut score_max: Option<f64> = None;
        for result in &search_results {
            score_min = Some(match score_min {
                Some(current) => current.min(result.similarity),
                None => result.similarity,
            });
            score_max = Some(match score_max {
                Some(current) => current.max(result.similarity),
                None => result.similarity,
            });
        }

        let mut current_session = current_session;
        let mut relevant_history = Vec::new();
        let mut project_facts = Vec::new();

        // Route each hit into a context bucket by tier; project and
        // global chunks both land in `project_facts`.
        for result in search_results {
            match result.chunk.tier {
                MemoryTier::Project => {
                    project_facts.push(result.chunk);
                }
                MemoryTier::Global => {
                    project_facts.push(result.chunk);
                }
                MemoryTier::Session => {
                    // Skip chunks already present in the transcript.
                    if !current_session.iter().any(|c| c.id == result.chunk.id) {
                        relevant_history.push(result.chunk);
                    }
                }
            }
        }

        let mut total_tokens: i64 = current_session.iter().map(|c| c.token_count).sum();
        total_tokens += relevant_history.iter().map(|c| c.token_count).sum::<i64>();
        total_tokens += project_facts.iter().map(|c| c.token_count).sum::<i64>();

        if total_tokens > budget {
            let excess = total_tokens - budget;
            self.trim_context(
                &mut current_session,
                &mut relevant_history,
                &mut project_facts,
                excess,
            )?;
            // Recompute after trimming.
            total_tokens = current_session.iter().map(|c| c.token_count).sum::<i64>()
                + relevant_history.iter().map(|c| c.token_count).sum::<i64>()
                + project_facts.iter().map(|c| c.token_count).sum::<i64>();
        }

        let context = MemoryContext {
            current_session,
            relevant_history,
            project_facts,
            total_tokens,
        };
        let chunks_total = context.current_session.len()
            + context.relevant_history.len()
            + context.project_facts.len();
        let meta = MemoryRetrievalMeta {
            used: chunks_total > 0,
            chunks_total,
            session_chunks: context.current_session.len(),
            history_chunks: context.relevant_history.len(),
            project_fact_chunks: context.project_facts.len(),
            score_min,
            score_max,
        };

        Ok((context, meta))
    }
825
826 fn trim_context(
828 &self,
829 current_session: &mut Vec<MemoryChunk>,
830 relevant_history: &mut Vec<MemoryChunk>,
831 project_facts: &mut Vec<MemoryChunk>,
832 excess_tokens: i64,
833 ) -> MemoryResult<()> {
834 let mut tokens_to_remove = excess_tokens;
835
836 while tokens_to_remove > 0 && !relevant_history.is_empty() {
838 if let Some(chunk) = relevant_history.pop() {
839 tokens_to_remove -= chunk.token_count;
840 }
841 }
842
843 while tokens_to_remove > 0 && !project_facts.is_empty() {
845 if let Some(chunk) = project_facts.pop() {
846 tokens_to_remove -= chunk.token_count;
847 }
848 }
849
850 while tokens_to_remove > 0 && !current_session.is_empty() {
851 if let Some(chunk) = current_session.pop() {
852 tokens_to_remove -= chunk.token_count;
853 }
854 }
855
856 Ok(())
857 }
858
    /// Delete all memory chunks for a session and record a "manual"
    /// cleanup log entry. Returns the number of chunks removed.
    pub async fn clear_session(&self, session_id: &str) -> MemoryResult<u64> {
        let count = self.db.clear_session_memory(session_id).await?;

        self.db
            .log_cleanup(
                "manual",
                MemoryTier::Session,
                None,
                Some(session_id),
                count as i64,
                0,
            )
            .await?;

        Ok(count)
    }
877
    /// Delete all memory chunks for a project and record a "manual"
    /// cleanup log entry. Returns the number of chunks removed.
    pub async fn clear_project(&self, project_id: &str) -> MemoryResult<u64> {
        let count = self.db.clear_project_memory(project_id).await?;

        self.db
            .log_cleanup(
                "manual",
                MemoryTier::Project,
                Some(project_id),
                None,
                count as i64,
                0,
            )
            .await?;

        Ok(count)
    }
896
    /// Aggregate memory statistics from the database.
    pub async fn get_stats(&self) -> MemoryResult<MemoryStats> {
        self.db.get_stats().await
    }
901
    /// Fetch the project's memory config, creating defaults if absent.
    pub async fn get_config(&self, project_id: &str) -> MemoryResult<MemoryConfig> {
        self.db.get_or_create_config(project_id).await
    }
906
    /// Persist an updated memory config for the project.
    pub async fn set_config(&self, project_id: &str, config: &MemoryConfig) -> MemoryResult<()> {
        self.db.update_config(project_id, config).await
    }
911
    /// Look up a context-tree node by its URI, if one exists.
    pub async fn resolve_uri(&self, uri: &str) -> MemoryResult<Option<MemoryNode>> {
        self.db.get_node_by_uri(uri).await
    }
915
916 pub async fn list_directory(&self, uri: &str) -> MemoryResult<DirectoryListing> {
917 let nodes = self.db.list_directory(uri).await?;
918 let directories: Vec<MemoryNode> = nodes
919 .iter()
920 .filter(|n| n.node_type == NodeType::Directory)
921 .cloned()
922 .collect();
923 let files: Vec<MemoryNode> = nodes
924 .iter()
925 .filter(|n| n.node_type == NodeType::File)
926 .cloned()
927 .collect();
928
929 Ok(DirectoryListing {
930 uri: uri.to_string(),
931 nodes,
932 total_children: directories.len() + files.len(),
933 directories,
934 files,
935 })
936 }
937
    /// Fetch the subtree rooted at `uri`, limited to `max_depth` levels.
    pub async fn tree(&self, uri: &str, max_depth: usize) -> MemoryResult<Vec<TreeNode>> {
        self.db.get_children_tree(uri, max_depth).await
    }
941
    /// Create a context-tree node at `uri` (validating the URI first) and
    /// link it to its parent. Returns the new node's id.
    pub async fn create_context_node(
        &self,
        uri: &str,
        node_type: NodeType,
        metadata: Option<serde_json::Value>,
    ) -> MemoryResult<String> {
        // An unparsable URI is reported as an InvalidConfig error.
        let parsed_uri =
            ContextUri::parse(uri).map_err(|e| MemoryError::InvalidConfig(e.message))?;
        let parent_uri = parsed_uri.parent().map(|p| p.to_string());
        self.db
            .create_node(uri, parent_uri.as_deref(), node_type, metadata.as_ref())
            .await
    }
955
    /// Fetch a specific layer of a context node, if present.
    pub async fn get_context_layer(
        &self,
        node_id: &str,
        layer_type: LayerType,
    ) -> MemoryResult<Option<MemoryLayer>> {
        self.db.get_layer(node_id, layer_type).await
    }
963
964 pub async fn store_content_with_layers(
965 &self,
966 uri: &str,
967 content: &str,
968 metadata: Option<serde_json::Value>,
969 ) -> MemoryResult<String> {
970 let parsed_uri =
971 ContextUri::parse(uri).map_err(|e| MemoryError::InvalidConfig(e.message))?;
972 let node_type = if parsed_uri
973 .last_segment()
974 .map(|s| s.ends_with(".md") || s.ends_with(".txt") || s.contains("."))
975 .unwrap_or(false)
976 {
977 NodeType::File
978 } else {
979 NodeType::Directory
980 };
981
982 let parent_uri = parsed_uri.parent().map(|p| p.to_string());
983 let node_id = self
984 .db
985 .create_node(uri, parent_uri.as_deref(), node_type, metadata.as_ref())
986 .await?;
987
988 let token_count = self.tokenizer.count_tokens(content) as i64;
989 self.db
990 .create_layer(&node_id, LayerType::L2, content, token_count, None)
991 .await?;
992
993 Ok(node_id)
994 }
995
    /// Generate summary layers (L0 and L1) for a node from its stored L2
    /// content via the provider registry, creating each layer only when
    /// it does not already exist. A node without an L2 layer is a no-op.
    pub async fn generate_layers_for_node(
        &self,
        node_id: &str,
        providers: &ProviderRegistry,
    ) -> MemoryResult<()> {
        let l2_layer = self.db.get_layer(node_id, LayerType::L2).await?;
        let l2_content = match l2_layer {
            Some(layer) => layer.content,
            // Nothing to summarize.
            None => return Ok(()),
        };

        let generator = ContextLayerGenerator::new(Arc::new(providers.clone()));

        // Both summaries are produced in one call.
        let (l0_content, l1_content) = generator.generate_layers(&l2_content).await?;

        let l0_tokens = self.tokenizer.count_tokens(&l0_content) as i64;
        let l1_tokens = self.tokenizer.count_tokens(&l1_content) as i64;

        // Existing layers are never overwritten.
        if self.db.get_layer(node_id, LayerType::L0).await?.is_none() {
            self.db
                .create_layer(node_id, LayerType::L0, &l0_content, l0_tokens, None)
                .await?;
        }

        if self.db.get_layer(node_id, LayerType::L1).await?.is_none() {
            self.db
                .create_layer(node_id, LayerType::L1, &l1_content, l1_tokens, None)
                .await?;
        }

        Ok(())
    }
1028
1029 pub async fn get_layer_content(
1030 &self,
1031 node_id: &str,
1032 layer_type: LayerType,
1033 ) -> MemoryResult<Option<String>> {
1034 let layer = self.db.get_layer(node_id, layer_type).await?;
1035 Ok(layer.map(|l| l.content))
1036 }
1037
    /// Store content as an L2 layer, then best-effort generate L0/L1
    /// layers when a provider registry is supplied. Layer-generation
    /// failures are logged but do not fail the store.
    pub async fn store_content_with_layers_auto(
        &self,
        uri: &str,
        content: &str,
        metadata: Option<serde_json::Value>,
        providers: Option<&ProviderRegistry>,
    ) -> MemoryResult<String> {
        let node_id = self
            .store_content_with_layers(uri, content, metadata)
            .await?;

        if let Some(p) = providers {
            if let Err(e) = self.generate_layers_for_node(&node_id, p).await {
                tracing::warn!("Failed to generate layers for node {}: {}", node_id, e);
            }
        }

        Ok(node_id)
    }
1057
    /// Remove expired session memory.
    ///
    /// With a project id, cleanup runs only when that project's config
    /// enables `auto_cleanup`, using its retention window and logging the
    /// result; without one, a fixed 30-day retention is applied with no
    /// log entry. The database is vacuumed when more than 100 chunks were
    /// removed. Returns the total chunks cleaned.
    pub async fn run_cleanup(&self, project_id: Option<&str>) -> MemoryResult<u64> {
        let mut total_cleaned = 0u64;

        if let Some(pid) = project_id {
            let config = self.db.get_or_create_config(pid).await?;

            if config.auto_cleanup {
                let cleaned = self
                    .db
                    .cleanup_old_sessions(config.session_retention_days)
                    .await?;
                total_cleaned += cleaned;

                if cleaned > 0 {
                    self.db
                        .log_cleanup(
                            "auto",
                            MemoryTier::Session,
                            Some(pid),
                            None,
                            cleaned as i64,
                            0,
                        )
                        .await?;
                }
            }
        } else {
            // Global pass: fixed 30-day retention.
            let cleaned = self.db.cleanup_old_sessions(30).await?;
            total_cleaned += cleaned;
        }

        // Reclaim disk space only after a substantial cleanup.
        if total_cleaned > 100 {
            self.db.vacuum().await?;
        }

        Ok(total_cleaned)
    }
1102
    /// Post-store check against the project's chunk cap.
    ///
    /// Currently observational only: when the project exceeds its
    /// configured `max_chunks`, the excess is logged but no chunks are
    /// deleted here.
    async fn maybe_cleanup(&self, project_id: &Option<String>) -> MemoryResult<()> {
        if let Some(pid) = project_id {
            let stats = self.db.get_stats().await?;
            let config = self.db.get_or_create_config(pid).await?;

            if stats.project_chunks > config.max_chunks {
                let excess = stats.project_chunks - config.max_chunks;
                tracing::info!("Project {} has {} excess chunks", pid, excess);
            }
        }

        Ok(())
    }
1121
    /// Stub: cleanup-log retrieval is not implemented; always returns an
    /// empty list and ignores `_limit`.
    pub async fn get_cleanup_log(&self, _limit: i64) -> MemoryResult<Vec<CleanupLogEntry>> {
        Ok(Vec::new())
    }
1128
    /// Count tokens in `text` using the manager's tokenizer.
    pub fn count_tokens(&self, text: &str) -> usize {
        self.tokenizer.count_tokens(text)
    }
1133
1134 pub async fn embedding_health(&self) -> EmbeddingHealth {
1136 let service = self.embedding_service.lock().await;
1137 if service.is_available() {
1138 EmbeddingHealth {
1139 status: "ok".to_string(),
1140 reason: None,
1141 }
1142 } else {
1143 EmbeddingHealth {
1144 status: "degraded_disabled".to_string(),
1145 reason: service.disabled_reason().map(ToString::to_string),
1146 }
1147 }
1148 }
1149
1150 pub async fn consolidate_session(
1152 &self,
1153 session_id: &str,
1154 project_id: Option<&str>,
1155 providers: &ProviderRegistry,
1156 config: &MemoryConsolidationConfig,
1157 ) -> MemoryResult<Option<String>> {
1158 if !config.enabled {
1159 return Ok(None);
1160 }
1161
1162 let chunks = self.db.get_session_chunks(session_id).await?;
1163 if chunks.is_empty() {
1164 return Ok(None);
1165 }
1166
1167 let mut text_parts = Vec::new();
1169 for chunk in &chunks {
1170 text_parts.push(chunk.content.clone());
1171 }
1172 let full_text = text_parts.join("\n\n---\n\n");
1173
1174 let prompt = format!(
1176 "Please provide a concise but comprehensive summary of the following chat session. \
1177 Focus on the key decisions, technical details, code changes, and unresolved issues. \
1178 Do NOT include conversational filler, greetings, or sign-offs. \
1179 This summary will be used as long-term memory to recall the context of this work.\n\n\
1180 Session transcripts:\n\n{}",
1181 full_text
1182 );
1183
1184 let provider_override = config.provider.as_deref().filter(|s| !s.is_empty());
1185 let model_override = config.model.as_deref().filter(|s| !s.is_empty());
1186
1187 let summary_text = match providers
1188 .complete_cheapest(&prompt, provider_override, model_override)
1189 .await
1190 {
1191 Ok(s) => s,
1192 Err(e) => {
1193 tracing::warn!("Memory consolidation LLM failed for session {session_id}: {e}");
1194 return Ok(None);
1195 }
1196 };
1197
1198 if summary_text.trim().is_empty() {
1199 return Ok(None);
1200 }
1201
1202 let embedding = {
1204 let service = self.embedding_service.lock().await;
1205 service
1206 .embed(&summary_text)
1207 .await
1208 .map_err(|e| crate::types::MemoryError::Embedding(e.to_string()))?
1209 };
1210
1211 let chunk_id = uuid::Uuid::new_v4().to_string();
1213 let chunk = MemoryChunk {
1214 id: chunk_id,
1215 content: summary_text.clone(),
1216 tier: MemoryTier::Project,
1217 session_id: None, project_id: project_id.map(ToString::to_string),
1219 created_at: Utc::now(),
1220 source: "consolidation".to_string(),
1221 token_count: self.count_tokens(&summary_text) as i64,
1222 source_path: None,
1223 source_mtime: None,
1224 source_size: None,
1225 source_hash: None,
1226 metadata: None,
1227 };
1228
1229 self.db.store_chunk(&chunk, &embedding).await?;
1230
1231 self.db.clear_session_memory(session_id).await?;
1233
1234 tracing::info!(
1235 "Session {session_id} consolidated into summary chunk. Original chunks cleared."
1236 );
1237
1238 Ok(Some(summary_text))
1239 }
1240}
1241
1242pub async fn create_memory_manager(app_data_dir: &Path) -> MemoryResult<MemoryManager> {
1244 let db_path = app_data_dir.join("tandem_memory.db");
1245 MemoryManager::new(&db_path).await
1246}
1247
#[cfg(test)]
mod tests {
    use super::*;
    use tandem_orchestrator::{
        KnowledgeBinding, KnowledgePreflightRequest, KnowledgeReuseDecision, KnowledgeReuseMode,
        KnowledgeScope, KnowledgeTrustLevel,
    };
    use tempfile::TempDir;

    // True when `err` is the "embeddings disabled" embedding error. The
    // embedding-dependent tests use this to return early (skip) on machines
    // without a working embedding backend instead of failing.
    fn is_embeddings_disabled(err: &crate::types::MemoryError) -> bool {
        matches!(err, crate::types::MemoryError::Embedding(msg) if msg.to_ascii_lowercase().contains("embeddings disabled"))
    }

    // Creates a MemoryManager over a fresh SQLite db in a temp directory.
    // The TempDir is returned so the directory lives for the test's duration.
    async fn setup_test_manager() -> (MemoryManager, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("test_memory.db");
        let manager = MemoryManager::new(&db_path).await.unwrap();
        (manager, temp_dir)
    }

    // Stores a message and verifies semantic search finds it with a
    // non-negative similarity score.
    #[tokio::test]
    async fn test_store_and_search() {
        let (manager, _temp) = setup_test_manager().await;

        let request = StoreMessageRequest {
            content: "This is a test message about artificial intelligence and machine learning."
                .to_string(),
            tier: MemoryTier::Project,
            session_id: Some("session-1".to_string()),
            project_id: Some("project-1".to_string()),
            source: "user_message".to_string(),
            source_path: None,
            source_mtime: None,
            source_size: None,
            source_hash: None,
            metadata: None,
        };

        let chunk_ids = match manager.store_message(request).await {
            Ok(ids) => ids,
            // Skip when the embedding backend is unavailable on this host.
            Err(err) if is_embeddings_disabled(&err) => return,
            Err(err) => panic!("store_message failed: {err}"),
        };
        assert!(!chunk_ids.is_empty());

        let results = manager
            .search(
                "artificial intelligence",
                None,
                Some("project-1"),
                None,
                None,
            )
            .await;
        let results = match results {
            Ok(results) => results,
            Err(err) if is_embeddings_disabled(&err) => return,
            Err(err) => panic!("search failed: {err}"),
        };

        assert!(!results.is_empty());
        assert!(results[0].similarity >= 0.0);
    }

    // Stores a project-tier fact and verifies retrieve_context surfaces it
    // under project_facts.
    #[tokio::test]
    async fn test_retrieve_context() {
        let (manager, _temp) = setup_test_manager().await;

        let request = StoreMessageRequest {
            content: "The project uses React and TypeScript for the frontend.".to_string(),
            tier: MemoryTier::Project,
            session_id: None,
            project_id: Some("project-1".to_string()),
            source: "assistant_response".to_string(),
            source_path: None,
            source_mtime: None,
            source_size: None,
            source_hash: None,
            metadata: None,
        };
        match manager.store_message(request).await {
            Ok(_) => {}
            Err(err) if is_embeddings_disabled(&err) => return,
            Err(err) => panic!("store_message failed: {err}"),
        }

        let context = manager
            .retrieve_context("What technologies are used?", Some("project-1"), None, None)
            .await;
        let context = match context {
            Ok(context) => context,
            Err(err) if is_embeddings_disabled(&err) => return,
            Err(err) => panic!("retrieve_context failed: {err}"),
        };

        assert!(!context.project_facts.is_empty());
    }

    // Verifies retrieval metadata: chunk totals, usage flag, and that the
    // counted chunks equal the sum across all context sections.
    #[tokio::test]
    async fn test_retrieve_context_with_meta() {
        let (manager, _temp) = setup_test_manager().await;

        let request = StoreMessageRequest {
            content: "The backend uses Rust and sqlite-vec for retrieval.".to_string(),
            tier: MemoryTier::Project,
            session_id: None,
            project_id: Some("project-1".to_string()),
            source: "assistant_response".to_string(),
            source_path: None,
            source_mtime: None,
            source_size: None,
            source_hash: None,
            metadata: None,
        };
        match manager.store_message(request).await {
            Ok(_) => {}
            Err(err) if is_embeddings_disabled(&err) => return,
            Err(err) => panic!("store_message failed: {err}"),
        }

        let result = manager
            .retrieve_context_with_meta("What does the backend use?", Some("project-1"), None, None)
            .await;
        let (context, meta) = match result {
            Ok(v) => v,
            Err(err) if is_embeddings_disabled(&err) => return,
            Err(err) => panic!("retrieve_context_with_meta failed: {err}"),
        };

        assert!(meta.chunks_total > 0);
        assert!(meta.used);
        // chunks_total must account for every section of the context.
        assert_eq!(
            meta.chunks_total,
            context.current_session.len()
                + context.relevant_history.len()
                + context.project_facts.len()
        );
        assert!(meta.score_min.is_some());
        assert!(meta.score_max.is_some());
    }

    // Reads the default config, overwrites it, and reads the update back.
    #[tokio::test]
    async fn test_config_management() {
        let (manager, _temp) = setup_test_manager().await;

        // Default configuration for an unknown project.
        let config = manager.get_config("project-1").await.unwrap();
        assert_eq!(config.max_chunks, 10000);

        let new_config = MemoryConfig {
            max_chunks: 5000,
            retrieval_k: 10,
            ..Default::default()
        };

        manager.set_config("project-1", &new_config).await.unwrap();

        let updated = manager.get_config("project-1").await.unwrap();
        assert_eq!(updated.max_chunks, 5000);
        assert_eq!(updated.retrieval_k, 10);
    }

    // Round-trips a knowledge space, item, and coverage record through the
    // manager's upsert/get/list wrappers.
    #[tokio::test]
    async fn test_knowledge_registry_roundtrip_via_manager() {
        let (manager, _temp) = setup_test_manager().await;
        let now = chrono::Utc::now().timestamp_millis() as u64;

        let space = KnowledgeSpaceRecord {
            id: "space-1".to_string(),
            scope: tandem_orchestrator::KnowledgeScope::Project,
            project_id: Some("project-1".to_string()),
            namespace: Some("engineering/debugging".to_string()),
            title: Some("Engineering debugging".to_string()),
            description: Some("Reusable debugging guidance".to_string()),
            trust_level: tandem_orchestrator::KnowledgeTrustLevel::Promoted,
            metadata: Some(serde_json::json!({"owner":"eng"})),
            created_at_ms: now,
            updated_at_ms: now,
        };
        manager.upsert_knowledge_space(&space).await.unwrap();

        let item = KnowledgeItemRecord {
            id: "item-1".to_string(),
            space_id: "space-1".to_string(),
            coverage_key: "project-1::engineering/debugging::startup::race".to_string(),
            dedupe_key: "item-1-dedupe".to_string(),
            item_type: "decision".to_string(),
            title: "Delay startup-dependent retries".to_string(),
            summary: Some("Retry only after startup has completed.".to_string()),
            payload: serde_json::json!({"action":"delay_retry"}),
            trust_level: tandem_orchestrator::KnowledgeTrustLevel::Promoted,
            status: crate::types::KnowledgeItemStatus::Promoted,
            run_id: Some("run-1".to_string()),
            artifact_refs: vec!["artifact://run-1/startup-note".to_string()],
            source_memory_ids: vec!["memory-1".to_string()],
            freshness_expires_at_ms: Some(now + 86_400_000),
            metadata: Some(serde_json::json!({"source_kind":"run"})),
            created_at_ms: now,
            updated_at_ms: now,
        };
        manager.upsert_knowledge_item(&item).await.unwrap();

        let loaded_space = manager
            .get_knowledge_space("space-1")
            .await
            .unwrap()
            .unwrap();
        assert_eq!(
            loaded_space.namespace.as_deref(),
            Some("engineering/debugging")
        );

        let loaded_item = manager.get_knowledge_item("item-1").await.unwrap().unwrap();
        assert_eq!(loaded_item.space_id, "space-1");
        assert_eq!(
            loaded_item.coverage_key,
            "project-1::engineering/debugging::startup::race"
        );

        // Listing with the coverage-key filter must return exactly the one
        // item stored above.
        let items = manager
            .list_knowledge_items(
                "space-1",
                Some("project-1::engineering/debugging::startup::race"),
            )
            .await
            .unwrap();
        assert_eq!(items.len(), 1);

        let coverage = KnowledgeCoverageRecord {
            coverage_key: "project-1::engineering/debugging::startup::race".to_string(),
            space_id: "space-1".to_string(),
            latest_item_id: Some("item-1".to_string()),
            latest_dedupe_key: Some("item-1-dedupe".to_string()),
            last_seen_at_ms: now,
            last_promoted_at_ms: Some(now),
            freshness_expires_at_ms: Some(now + 86_400_000),
            metadata: Some(serde_json::json!({"reuse_reason":"same issue"})),
        };
        manager.upsert_knowledge_coverage(&coverage).await.unwrap();

        let loaded_coverage = manager
            .get_knowledge_coverage("project-1::engineering/debugging::startup::race", "space-1")
            .await
            .unwrap()
            .unwrap();
        assert_eq!(loaded_coverage.space_id, "space-1");
        assert_eq!(loaded_coverage.latest_item_id.as_deref(), Some("item-1"));
    }

    // Promotes a Working item to Promoted via the manager wrapper and checks
    // the returned item status and coverage record.
    #[tokio::test]
    async fn test_knowledge_promotion_via_manager() {
        let (manager, _temp) = setup_test_manager().await;
        let now = chrono::Utc::now().timestamp_millis() as u64;

        let space = KnowledgeSpaceRecord {
            id: "space-2".to_string(),
            scope: tandem_orchestrator::KnowledgeScope::Project,
            project_id: Some("project-2".to_string()),
            namespace: Some("ops/runbooks".to_string()),
            title: Some("Ops runbooks".to_string()),
            description: Some("Reusable operational guidance".to_string()),
            trust_level: tandem_orchestrator::KnowledgeTrustLevel::Promoted,
            metadata: None,
            created_at_ms: now,
            updated_at_ms: now,
        };
        manager.upsert_knowledge_space(&space).await.unwrap();

        // Item starts in Working status so the promotion has an effect.
        let item = KnowledgeItemRecord {
            id: "item-2".to_string(),
            space_id: space.id.clone(),
            coverage_key: "project-2::ops/runbooks::restarts::stale-service".to_string(),
            dedupe_key: "dedupe-2".to_string(),
            item_type: "runbook".to_string(),
            title: "Restart stale service".to_string(),
            summary: Some("Restart the service before retrying.".to_string()),
            payload: serde_json::json!({"action":"restart"}),
            trust_level: tandem_orchestrator::KnowledgeTrustLevel::Working,
            status: crate::types::KnowledgeItemStatus::Working,
            run_id: Some("run-2".to_string()),
            artifact_refs: vec!["artifact://run-2/runbook".to_string()],
            source_memory_ids: vec!["memory-2".to_string()],
            freshness_expires_at_ms: None,
            metadata: None,
            created_at_ms: now,
            updated_at_ms: now,
        };
        manager.upsert_knowledge_item(&item).await.unwrap();

        let result = manager
            .promote_knowledge_item(&crate::types::KnowledgePromotionRequest {
                item_id: item.id.clone(),
                target_status: crate::types::KnowledgeItemStatus::Promoted,
                promoted_at_ms: now + 5,
                freshness_expires_at_ms: Some(now + 86_400_000),
                reviewer_id: None,
                approval_id: None,
                reason: Some("manager wrapper".to_string()),
            })
            .await
            .unwrap()
            .expect("promotion result");
        assert_eq!(
            result.item.status,
            crate::types::KnowledgeItemStatus::Promoted
        );
        assert_eq!(result.coverage.latest_item_id.as_deref(), Some("item-2"));
    }

    // A binding with enabled=false must short-circuit to Disabled with a
    // skip reason.
    #[tokio::test]
    async fn test_preflight_knowledge_disabled() {
        let (manager, _temp) = setup_test_manager().await;

        let request = KnowledgePreflightRequest {
            project_id: "project-1".to_string(),
            task_family: "debugging".to_string(),
            subject: "startup race".to_string(),
            binding: KnowledgeBinding {
                enabled: false,
                ..Default::default()
            },
        };

        let result = manager.preflight_knowledge(&request).await.unwrap();
        assert_eq!(result.decision, KnowledgeReuseDecision::Disabled);
        assert!(result.skip_reason.is_some());
    }

    // A fresh promoted item matching the coverage key must yield
    // ReusePromoted with the item attached and a reuse reason.
    #[tokio::test]
    async fn test_preflight_knowledge_reuses_promoted_item() {
        let (manager, _temp) = setup_test_manager().await;
        let now = chrono::Utc::now().timestamp_millis() as u64;

        let space = KnowledgeSpaceRecord {
            id: "space-preflight-1".to_string(),
            scope: KnowledgeScope::Project,
            project_id: Some("project-1".to_string()),
            namespace: Some("engineering/debugging".to_string()),
            title: Some("Engineering debugging".to_string()),
            description: Some("Reusable debugging guidance".to_string()),
            trust_level: KnowledgeTrustLevel::Promoted,
            metadata: None,
            created_at_ms: now,
            updated_at_ms: now,
        };
        manager.upsert_knowledge_space(&space).await.unwrap();

        let item = KnowledgeItemRecord {
            id: "item-preflight-1".to_string(),
            space_id: space.id.clone(),
            // Built with the same helper the preflight path uses, so the
            // request below resolves to this item.
            coverage_key: tandem_orchestrator::build_knowledge_coverage_key(
                "project-1",
                Some("engineering/debugging"),
                "startup",
                "race",
            ),
            dedupe_key: "dedupe-preflight-1".to_string(),
            item_type: "decision".to_string(),
            title: "Delay startup-dependent retries".to_string(),
            summary: Some("Retry after startup completes.".to_string()),
            payload: serde_json::json!({"action":"delay_retry"}),
            trust_level: KnowledgeTrustLevel::Promoted,
            status: crate::types::KnowledgeItemStatus::Promoted,
            run_id: Some("run-1".to_string()),
            artifact_refs: vec!["artifact://run-1/debug".to_string()],
            source_memory_ids: vec![],
            freshness_expires_at_ms: Some(now + 86_400_000),
            metadata: None,
            created_at_ms: now,
            updated_at_ms: now,
        };
        manager.upsert_knowledge_item(&item).await.unwrap();

        let request = KnowledgePreflightRequest {
            project_id: "project-1".to_string(),
            task_family: "startup".to_string(),
            subject: "race".to_string(),
            binding: KnowledgeBinding {
                namespace: Some("engineering/debugging".to_string()),
                freshness_ms: Some(10_000),
                ..Default::default()
            },
        };

        let result = manager.preflight_knowledge(&request).await.unwrap();
        assert_eq!(result.decision, KnowledgeReuseDecision::ReusePromoted);
        assert_eq!(result.items.len(), 1);
        assert!(result.reuse_reason.is_some());
    }

    // An item whose freshness already expired must produce RefreshRequired
    // (not reusable) while still reporting the stale items.
    #[tokio::test]
    async fn test_preflight_knowledge_stale_requires_refresh() {
        let (manager, _temp) = setup_test_manager().await;
        let now = chrono::Utc::now().timestamp_millis() as u64;

        let space = KnowledgeSpaceRecord {
            id: "space-preflight-2".to_string(),
            scope: KnowledgeScope::Project,
            project_id: Some("project-1".to_string()),
            namespace: Some("ops/runbooks".to_string()),
            title: Some("Ops runbooks".to_string()),
            description: Some("Reusable ops guidance".to_string()),
            trust_level: KnowledgeTrustLevel::Promoted,
            metadata: None,
            created_at_ms: now,
            updated_at_ms: now,
        };
        manager.upsert_knowledge_space(&space).await.unwrap();

        let item = KnowledgeItemRecord {
            id: "item-preflight-2".to_string(),
            space_id: space.id.clone(),
            coverage_key: tandem_orchestrator::build_knowledge_coverage_key(
                "project-1",
                Some("ops/runbooks"),
                "restart",
                "stale service",
            ),
            dedupe_key: "dedupe-preflight-2".to_string(),
            item_type: "runbook".to_string(),
            title: "Restart stale service".to_string(),
            summary: Some("Restart and verify health.".to_string()),
            payload: serde_json::json!({"action":"restart"}),
            trust_level: KnowledgeTrustLevel::Promoted,
            status: crate::types::KnowledgeItemStatus::Promoted,
            run_id: Some("run-2".to_string()),
            artifact_refs: vec![],
            source_memory_ids: vec![],
            // Expired one second ago — forces the refresh decision.
            freshness_expires_at_ms: Some(now - 1000),
            metadata: None,
            created_at_ms: now,
            updated_at_ms: now,
        };
        manager.upsert_knowledge_item(&item).await.unwrap();

        let request = KnowledgePreflightRequest {
            project_id: "project-1".to_string(),
            task_family: "restart".to_string(),
            subject: "stale service".to_string(),
            binding: KnowledgeBinding {
                namespace: Some("ops/runbooks".to_string()),
                freshness_ms: Some(10_000),
                ..Default::default()
            },
        };

        let result = manager.preflight_knowledge(&request).await.unwrap();
        assert_eq!(result.decision, KnowledgeReuseDecision::RefreshRequired);
        assert!(result.freshness_reason.is_some());
        assert!(!result.items.is_empty());
        assert!(!result.is_reusable());
    }

    // With nothing stored, preflight must return NoPriorKnowledge and a
    // skip reason.
    #[tokio::test]
    async fn test_preflight_knowledge_no_prior_knowledge() {
        let (manager, _temp) = setup_test_manager().await;

        let request = KnowledgePreflightRequest {
            project_id: "project-1".to_string(),
            task_family: "support".to_string(),
            subject: "triage".to_string(),
            binding: KnowledgeBinding {
                reuse_mode: KnowledgeReuseMode::Preflight,
                ..Default::default()
            },
        };

        let result = manager.preflight_knowledge(&request).await.unwrap();
        assert_eq!(result.decision, KnowledgeReuseDecision::NoPriorKnowledge);
        assert!(result.skip_reason.is_some());
    }

    // When a project has multiple knowledge spaces and the binding does not
    // name one, preflight must refuse to guess a namespace.
    #[tokio::test]
    async fn test_preflight_knowledge_requires_explicit_namespace_when_project_has_many() {
        let (manager, _temp) = setup_test_manager().await;
        let now = chrono::Utc::now().timestamp_millis() as u64;

        let spaces = [
            ("space-alpha", "engineering/debugging", "Delay retries"),
            ("space-beta", "ops/runbooks", "Restart safely"),
        ];
        for (id, namespace, title) in spaces {
            manager
                .upsert_knowledge_space(&KnowledgeSpaceRecord {
                    id: id.to_string(),
                    scope: KnowledgeScope::Project,
                    project_id: Some("project-1".to_string()),
                    namespace: Some(namespace.to_string()),
                    title: Some(title.to_string()),
                    description: None,
                    trust_level: KnowledgeTrustLevel::Promoted,
                    metadata: None,
                    created_at_ms: now,
                    updated_at_ms: now,
                })
                .await
                .unwrap();
        }

        let result = manager
            .preflight_knowledge(&KnowledgePreflightRequest {
                project_id: "project-1".to_string(),
                task_family: "debugging".to_string(),
                subject: "startup race".to_string(),
                binding: KnowledgeBinding::default(),
            })
            .await
            .unwrap();

        assert_eq!(result.decision, KnowledgeReuseDecision::NoPriorKnowledge);
        assert!(result.items.is_empty());
        assert!(result
            .skip_reason
            .as_deref()
            .is_some_and(|reason| reason.contains("no reusable knowledge spaces")));
    }

    // With exactly one project space, preflight infers its namespace even
    // though the binding leaves namespace unset.
    #[tokio::test]
    async fn test_preflight_knowledge_infers_single_project_namespace() {
        let (manager, _temp) = setup_test_manager().await;
        let now = chrono::Utc::now().timestamp_millis() as u64;

        let space = KnowledgeSpaceRecord {
            id: "space-single-namespace".to_string(),
            scope: KnowledgeScope::Project,
            project_id: Some("project-1".to_string()),
            namespace: Some("engineering/debugging".to_string()),
            title: Some("Engineering debugging".to_string()),
            description: None,
            trust_level: KnowledgeTrustLevel::Promoted,
            metadata: None,
            created_at_ms: now,
            updated_at_ms: now,
        };
        manager.upsert_knowledge_space(&space).await.unwrap();

        let item = KnowledgeItemRecord {
            id: "item-single-namespace".to_string(),
            space_id: space.id.clone(),
            coverage_key: tandem_orchestrator::build_knowledge_coverage_key(
                "project-1",
                Some("engineering/debugging"),
                "debugging",
                "startup race",
            ),
            dedupe_key: "dedupe-single-namespace".to_string(),
            item_type: "decision".to_string(),
            title: "Delay startup retries".to_string(),
            summary: Some("Wait for startup completion.".to_string()),
            payload: serde_json::json!({"action":"delay_retry"}),
            trust_level: KnowledgeTrustLevel::Promoted,
            status: crate::types::KnowledgeItemStatus::Promoted,
            run_id: Some("run-single-namespace".to_string()),
            artifact_refs: vec![],
            source_memory_ids: vec![],
            freshness_expires_at_ms: Some(now + 86_400_000),
            metadata: None,
            created_at_ms: now,
            updated_at_ms: now,
        };
        manager.upsert_knowledge_item(&item).await.unwrap();

        let result = manager
            .preflight_knowledge(&KnowledgePreflightRequest {
                project_id: "project-1".to_string(),
                task_family: "debugging".to_string(),
                subject: "startup race".to_string(),
                binding: KnowledgeBinding::default(),
            })
            .await
            .unwrap();

        assert_eq!(result.decision, KnowledgeReuseDecision::ReusePromoted);
        assert_eq!(result.namespace.as_deref(), Some("engineering/debugging"));
        assert_eq!(result.items.len(), 1);
    }

    // Same disabled-binding scenario as above, exercised through the
    // fully-qualified tandem_orchestrator paths.
    #[tokio::test]
    async fn test_knowledge_preflight_disabled_binding_returns_disabled() {
        let (manager, _temp) = setup_test_manager().await;
        let result = manager
            .preflight_knowledge(&KnowledgePreflightRequest {
                project_id: "project-1".to_string(),
                task_family: "debugging".to_string(),
                subject: "startup race".to_string(),
                binding: tandem_orchestrator::KnowledgeBinding {
                    enabled: false,
                    ..Default::default()
                },
            })
            .await
            .unwrap();
        assert_eq!(
            result.decision,
            tandem_orchestrator::KnowledgeReuseDecision::Disabled
        );
        assert!(result.items.is_empty());
        assert!(result
            .skip_reason
            .as_deref()
            .is_some_and(|reason| reason.contains("disabled")));
    }

    // A fresh promoted item plus a matching coverage record must be judged
    // reusable, with a "reusing" reuse reason.
    #[tokio::test]
    async fn test_knowledge_preflight_fresh_item_is_reusable() {
        let (manager, _temp) = setup_test_manager().await;
        let now = chrono::Utc::now().timestamp_millis() as u64;

        let space = KnowledgeSpaceRecord {
            id: "space-preflight-1".to_string(),
            scope: tandem_orchestrator::KnowledgeScope::Project,
            project_id: Some("project-1".to_string()),
            namespace: Some("engineering/debugging".to_string()),
            title: Some("Engineering debugging".to_string()),
            description: None,
            trust_level: tandem_orchestrator::KnowledgeTrustLevel::Promoted,
            metadata: None,
            created_at_ms: now,
            updated_at_ms: now,
        };
        manager.upsert_knowledge_space(&space).await.unwrap();

        let item = KnowledgeItemRecord {
            id: "item-preflight-1".to_string(),
            space_id: space.id.clone(),
            coverage_key: tandem_orchestrator::build_knowledge_coverage_key(
                "project-1",
                Some("engineering/debugging"),
                "debugging",
                "startup race",
            ),
            dedupe_key: "dedupe-preflight-1".to_string(),
            item_type: "decision".to_string(),
            title: "Delay startup retries".to_string(),
            summary: Some("Wait for startup completion before retrying.".to_string()),
            payload: serde_json::json!({"action":"delay_retry"}),
            trust_level: tandem_orchestrator::KnowledgeTrustLevel::Promoted,
            status: crate::types::KnowledgeItemStatus::Promoted,
            run_id: Some("run-preflight-1".to_string()),
            artifact_refs: vec!["artifact://run-preflight-1/report".to_string()],
            source_memory_ids: vec!["memory-preflight-1".to_string()],
            freshness_expires_at_ms: Some(now + 86_400_000),
            metadata: None,
            created_at_ms: now,
            updated_at_ms: now,
        };
        manager.upsert_knowledge_item(&item).await.unwrap();

        let coverage = KnowledgeCoverageRecord {
            coverage_key: item.coverage_key.clone(),
            space_id: space.id.clone(),
            latest_item_id: Some(item.id.clone()),
            latest_dedupe_key: Some(item.dedupe_key.clone()),
            last_seen_at_ms: now,
            last_promoted_at_ms: Some(now),
            freshness_expires_at_ms: Some(now + 86_400_000),
            metadata: None,
        };
        manager.upsert_knowledge_coverage(&coverage).await.unwrap();

        let result = manager
            .preflight_knowledge(&KnowledgePreflightRequest {
                project_id: "project-1".to_string(),
                task_family: "debugging".to_string(),
                subject: "startup race".to_string(),
                binding: tandem_orchestrator::KnowledgeBinding {
                    namespace: Some("engineering/debugging".to_string()),
                    ..Default::default()
                },
            })
            .await
            .unwrap();
        assert_eq!(
            result.decision,
            tandem_orchestrator::KnowledgeReuseDecision::ReusePromoted
        );
        assert!(result.is_reusable());
        assert!(!result.items.is_empty());
        assert!(result
            .reuse_reason
            .as_deref()
            .is_some_and(|reason| reason.contains("reusing")));
    }

    // A stale item and stale coverage must request a refresh and carry an
    // explanatory freshness reason.
    #[tokio::test]
    async fn test_knowledge_preflight_stale_item_requests_refresh() {
        let (manager, _temp) = setup_test_manager().await;
        let now = chrono::Utc::now().timestamp_millis() as u64;

        let space = KnowledgeSpaceRecord {
            id: "space-preflight-2".to_string(),
            scope: tandem_orchestrator::KnowledgeScope::Project,
            project_id: Some("project-2".to_string()),
            namespace: Some("support/runbooks".to_string()),
            title: Some("Support runbooks".to_string()),
            description: None,
            trust_level: tandem_orchestrator::KnowledgeTrustLevel::Promoted,
            metadata: None,
            created_at_ms: now,
            updated_at_ms: now,
        };
        manager.upsert_knowledge_space(&space).await.unwrap();

        let item = KnowledgeItemRecord {
            id: "item-preflight-2".to_string(),
            space_id: space.id.clone(),
            coverage_key: tandem_orchestrator::build_knowledge_coverage_key(
                "project-2",
                Some("support/runbooks"),
                "runbooks",
                "restart service",
            ),
            dedupe_key: "dedupe-preflight-2".to_string(),
            item_type: "runbook".to_string(),
            title: "Restart stale service".to_string(),
            summary: Some("Restart before retrying.".to_string()),
            payload: serde_json::json!({"action":"restart"}),
            trust_level: tandem_orchestrator::KnowledgeTrustLevel::Promoted,
            status: crate::types::KnowledgeItemStatus::Promoted,
            run_id: Some("run-preflight-2".to_string()),
            artifact_refs: vec![],
            source_memory_ids: vec![],
            // saturating_sub keeps the timestamps well-formed even near 0.
            freshness_expires_at_ms: Some(now.saturating_sub(1)),
            metadata: None,
            created_at_ms: now.saturating_sub(86_400_000),
            updated_at_ms: now,
        };
        manager.upsert_knowledge_item(&item).await.unwrap();

        let coverage = KnowledgeCoverageRecord {
            coverage_key: item.coverage_key.clone(),
            space_id: space.id.clone(),
            latest_item_id: Some(item.id.clone()),
            latest_dedupe_key: Some(item.dedupe_key.clone()),
            last_seen_at_ms: now,
            last_promoted_at_ms: Some(now.saturating_sub(1)),
            freshness_expires_at_ms: Some(now.saturating_sub(1)),
            metadata: None,
        };
        manager.upsert_knowledge_coverage(&coverage).await.unwrap();

        let result = manager
            .preflight_knowledge(&KnowledgePreflightRequest {
                project_id: "project-2".to_string(),
                task_family: "runbooks".to_string(),
                subject: "restart service".to_string(),
                binding: tandem_orchestrator::KnowledgeBinding {
                    namespace: Some("support/runbooks".to_string()),
                    freshness_ms: Some(86_400_000),
                    ..Default::default()
                },
            })
            .await
            .unwrap();
        assert_eq!(
            result.decision,
            tandem_orchestrator::KnowledgeReuseDecision::RefreshRequired
        );
        assert!(!result.is_reusable());
        assert!(result.items.is_empty() || result.freshness_reason.is_some());
        assert!(result
            .freshness_reason
            .as_deref()
            .is_some_and(|reason| reason.contains("expired") || reason.contains("freshness")));
    }

    // Default binding on an empty project must report NoPriorKnowledge with
    // the expected skip-reason wording.
    #[tokio::test]
    async fn test_knowledge_preflight_no_prior_knowledge_returns_no_prior() {
        let (manager, _temp) = setup_test_manager().await;
        let result = manager
            .preflight_knowledge(&KnowledgePreflightRequest {
                project_id: "project-3".to_string(),
                task_family: "ops".to_string(),
                subject: "incident triage".to_string(),
                binding: Default::default(),
            })
            .await
            .unwrap();
        assert_eq!(
            result.decision,
            tandem_orchestrator::KnowledgeReuseDecision::NoPriorKnowledge
        );
        assert!(result.items.is_empty());
        assert!(result
            .skip_reason
            .as_deref()
            .is_some_and(|reason| reason.contains("no active promoted knowledge")));
    }
}