1#![allow(clippy::all)]
2
3use crate::types::{
7 ClearFileIndexResult, GlobalMemoryRecord, GlobalMemorySearchHit, GlobalMemoryWriteResult,
8 KnowledgeCoverageRecord, KnowledgeItemRecord, KnowledgeItemStatus, KnowledgePromotionRequest,
9 KnowledgePromotionResult, KnowledgeSpaceRecord, MemoryChunk, MemoryConfig, MemoryError,
10 MemoryResult, MemoryStats, MemoryTenantScope, MemoryTier, ProjectMemoryStats,
11 DEFAULT_EMBEDDING_DIMENSION,
12};
13use chrono::{DateTime, Utc};
14use rusqlite::{ffi::sqlite3_auto_extension, params, Connection, OptionalExtension, Row};
15use sqlite_vec::sqlite3_vec_init;
16use std::collections::HashSet;
17use std::path::Path;
18use std::sync::Arc;
19use std::time::Duration;
20use tokio::sync::Mutex;
21
/// Tuple shape of a project index status row: one optional text column
/// followed by five optional integer columns.
///
/// NOTE(review): the meaning of each position is fixed by the query that
/// produces this tuple, which is not visible in this part of the file —
/// confirm against that query before relying on the order.
type ProjectIndexStatusRow = (
    Option<String>,
    Option<i64>,
    Option<i64>,
    Option<i64>,
    Option<i64>,
    Option<i64>,
);
30
/// Handle to the memory store's SQLite database.
///
/// The `rusqlite` connection is held behind `Arc<tokio::sync::Mutex<_>>`; the
/// methods in this file acquire that lock before running any statement.
pub struct MemoryDatabase {
    /// Shared connection; lock it before issuing SQL.
    conn: Arc<Mutex<Connection>>,
    /// Filesystem path associated with this database.
    // NOTE(review): presumably the file `conn` was opened from — confirm in the constructor.
    db_path: std::path::PathBuf,
}
36
37include!("memory_database_impl_parts/part01.rs");
38include!("memory_database_impl_parts/part02.rs");
39
40fn row_to_chunk(row: &Row, tier: MemoryTier) -> Result<MemoryChunk, rusqlite::Error> {
42 let id: String = row.get(0)?;
43 let content: String = row.get(1)?;
44 let (session_id, project_id, source_idx, created_at_idx, token_count_idx, metadata_idx) =
45 match tier {
46 MemoryTier::Session => (
47 Some(row.get(2)?),
48 row.get(3)?,
49 4usize,
50 5usize,
51 6usize,
52 7usize,
53 ),
54 MemoryTier::Project => (
55 row.get(2)?,
56 Some(row.get(3)?),
57 4usize,
58 5usize,
59 6usize,
60 7usize,
61 ),
62 MemoryTier::Global => (None, None, 2usize, 3usize, 4usize, 5usize),
63 };
64
65 let source: String = row.get(source_idx)?;
66 let created_at_str: String = row.get(created_at_idx)?;
67 let token_count: i64 = row.get(token_count_idx)?;
68 let metadata_str: Option<String> = row.get(metadata_idx)?;
69
70 let created_at = DateTime::parse_from_rfc3339(&created_at_str)
71 .map_err(|e| {
72 rusqlite::Error::FromSqlConversionFailure(5, rusqlite::types::Type::Text, Box::new(e))
73 })?
74 .with_timezone(&Utc);
75
76 let metadata = metadata_str
77 .filter(|s| !s.is_empty())
78 .and_then(|s| serde_json::from_str(&s).ok());
79
80 let source_path = row.get::<_, Option<String>>("source_path").ok().flatten();
81 let source_mtime = row.get::<_, Option<i64>>("source_mtime").ok().flatten();
82 let source_size = row.get::<_, Option<i64>>("source_size").ok().flatten();
83 let source_hash = row.get::<_, Option<String>>("source_hash").ok().flatten();
84 let tenant_scope = MemoryTenantScope {
85 org_id: row
86 .get::<_, Option<String>>("tenant_org_id")
87 .ok()
88 .flatten()
89 .filter(|value| !value.trim().is_empty())
90 .unwrap_or_else(|| LOCAL_TENANT_ORG_ID.to_string()),
91 workspace_id: row
92 .get::<_, Option<String>>("tenant_workspace_id")
93 .ok()
94 .flatten()
95 .filter(|value| !value.trim().is_empty())
96 .unwrap_or_else(|| LOCAL_TENANT_WORKSPACE_ID.to_string()),
97 deployment_id: row
98 .get::<_, Option<String>>("tenant_deployment_id")
99 .ok()
100 .flatten()
101 .filter(|value| !value.trim().is_empty()),
102 };
103
104 Ok(MemoryChunk {
105 id,
106 content,
107 tier,
108 session_id,
109 project_id,
110 source,
111 source_path,
112 source_mtime,
113 source_size,
114 source_hash,
115 tenant_scope,
116 created_at,
117 token_count,
118 metadata,
119 })
120}
121
122fn require_scope_id<'a>(tier: MemoryTier, scope: Option<&'a str>) -> MemoryResult<&'a str> {
123 scope
124 .filter(|value| !value.trim().is_empty())
125 .ok_or_else(|| {
126 crate::types::MemoryError::InvalidConfig(match tier {
127 MemoryTier::Session => "tier=session requires session_id".to_string(),
128 MemoryTier::Project => "tier=project requires project_id".to_string(),
129 MemoryTier::Global => "tier=global does not require a scope id".to_string(),
130 })
131 })
132}
133
/// Organization id substituted when a row or record carries no usable tenant org id.
const LOCAL_TENANT_ORG_ID: &str = "local";
/// Workspace id substituted when a row or record carries no usable tenant workspace id.
const LOCAL_TENANT_WORKSPACE_ID: &str = "local";
136
/// Renders the SQL predicate that restricts `prefix`'s rows to one tenant.
///
/// Three consecutive numbered placeholders are used, starting at
/// `first_param`: org id, workspace id, deployment id. The deployment
/// comparison wraps both sides in `IFNULL(.., '')` so a NULL column matches a
/// NULL parameter.
fn tenant_scope_matches_sql_clause(prefix: &str, first_param: usize) -> String {
    let org_condition = format!("{prefix}.tenant_org_id = ?{first_param}");
    let workspace_condition = format!("{prefix}.tenant_workspace_id = ?{}", first_param + 1);
    let deployment_condition = format!(
        "IFNULL({prefix}.tenant_deployment_id, '') = IFNULL(?{}, '')",
        first_param + 2
    );
    [org_condition, workspace_condition, deployment_condition].join(" AND ")
}
144
145fn global_memory_record_tenant_scope(
146 record: &GlobalMemoryRecord,
147) -> (String, String, Option<String>) {
148 record
149 .provenance
150 .as_ref()
151 .and_then(|value| value.get("tenant_context"))
152 .and_then(memory_tenant_scope_from_value)
153 .unwrap_or_else(|| {
154 (
155 LOCAL_TENANT_ORG_ID.to_string(),
156 LOCAL_TENANT_WORKSPACE_ID.to_string(),
157 None,
158 )
159 })
160}
161
162fn memory_tenant_scope_from_value(
163 value: &serde_json::Value,
164) -> Option<(String, String, Option<String>)> {
165 let org_id = value.get("org_id")?.as_str()?.to_string();
166 let workspace_id = value.get("workspace_id")?.as_str()?.to_string();
167 let deployment_id = value
168 .get("deployment_id")
169 .and_then(|value| value.as_str())
170 .map(ToString::to_string);
171 Some((org_id, workspace_id, deployment_id))
172}
173
174fn row_to_global_record(row: &Row) -> Result<GlobalMemoryRecord, rusqlite::Error> {
175 let metadata_str: Option<String> = row.get(12)?;
176 let provenance_str: Option<String> = row.get(13)?;
177 Ok(GlobalMemoryRecord {
178 id: row.get(0)?,
179 user_id: row.get(1)?,
180 source_type: row.get(2)?,
181 content: row.get(3)?,
182 content_hash: row.get(4)?,
183 run_id: row.get(5)?,
184 session_id: row.get(6)?,
185 message_id: row.get(7)?,
186 tool_name: row.get(8)?,
187 project_tag: row.get(9)?,
188 channel_tag: row.get(10)?,
189 host_tag: row.get(11)?,
190 metadata: metadata_str
191 .filter(|s| !s.is_empty())
192 .and_then(|s| serde_json::from_str(&s).ok()),
193 provenance: provenance_str
194 .filter(|s| !s.is_empty())
195 .and_then(|s| serde_json::from_str(&s).ok()),
196 redaction_status: row.get(14)?,
197 redaction_count: row.get::<_, i64>(15)? as u32,
198 visibility: row.get(16)?,
199 demoted: row.get::<_, i64>(17)? != 0,
200 score_boost: row.get(18)?,
201 created_at_ms: row.get::<_, i64>(19)? as u64,
202 updated_at_ms: row.get::<_, i64>(20)? as u64,
203 expires_at_ms: row.get::<_, Option<i64>>(21)?.map(|v| v as u64),
204 })
205}
206
207impl MemoryDatabase {
208 pub async fn get_node_by_uri(
209 &self,
210 uri: &str,
211 ) -> MemoryResult<Option<crate::types::MemoryNode>> {
212 let conn = self.conn.lock().await;
213 let mut stmt = conn.prepare(
214 "SELECT id, uri, parent_uri, node_type, created_at, updated_at, metadata
215 FROM memory_nodes WHERE uri = ?1",
216 )?;
217
218 let result = stmt.query_row(params![uri], |row| {
219 let node_type_str: String = row.get(3)?;
220 let node_type = node_type_str
221 .parse()
222 .unwrap_or(crate::types::NodeType::File);
223 let metadata_str: Option<String> = row.get(6)?;
224 Ok(crate::types::MemoryNode {
225 id: row.get(0)?,
226 uri: row.get(1)?,
227 parent_uri: row.get(2)?,
228 node_type,
229 created_at: row.get::<_, String>(4)?.parse().unwrap_or_default(),
230 updated_at: row.get::<_, String>(5)?.parse().unwrap_or_default(),
231 metadata: metadata_str.and_then(|s| serde_json::from_str(&s).ok()),
232 })
233 });
234
235 match result {
236 Ok(node) => Ok(Some(node)),
237 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
238 Err(e) => Err(MemoryError::Database(e)),
239 }
240 }
241
242 pub async fn create_node(
243 &self,
244 uri: &str,
245 parent_uri: Option<&str>,
246 node_type: crate::types::NodeType,
247 metadata: Option<&serde_json::Value>,
248 ) -> MemoryResult<String> {
249 let id = uuid::Uuid::new_v4().to_string();
250 let now = Utc::now().to_rfc3339();
251 let metadata_str = metadata.map(|m| serde_json::to_string(m)).transpose()?;
252
253 let conn = self.conn.lock().await;
254 conn.execute(
255 "INSERT INTO memory_nodes (id, uri, parent_uri, node_type, created_at, updated_at, metadata)
256 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
257 params![id, uri, parent_uri, node_type.to_string(), now, now, metadata_str],
258 )?;
259
260 Ok(id)
261 }
262
263 pub async fn list_directory(&self, uri: &str) -> MemoryResult<Vec<crate::types::MemoryNode>> {
264 let conn = self.conn.lock().await;
265 let mut stmt = conn.prepare(
266 "SELECT id, uri, parent_uri, node_type, created_at, updated_at, metadata
267 FROM memory_nodes WHERE parent_uri = ?1 ORDER BY node_type DESC, uri ASC",
268 )?;
269
270 let rows = stmt.query_map(params![uri], |row| {
271 let node_type_str: String = row.get(3)?;
272 let node_type = node_type_str
273 .parse()
274 .unwrap_or(crate::types::NodeType::File);
275 let metadata_str: Option<String> = row.get(6)?;
276 Ok(crate::types::MemoryNode {
277 id: row.get(0)?,
278 uri: row.get(1)?,
279 parent_uri: row.get(2)?,
280 node_type,
281 created_at: row.get::<_, String>(4)?.parse().unwrap_or_default(),
282 updated_at: row.get::<_, String>(5)?.parse().unwrap_or_default(),
283 metadata: metadata_str.and_then(|s| serde_json::from_str(&s).ok()),
284 })
285 })?;
286
287 rows.collect::<Result<Vec<_>, _>>()
288 .map_err(MemoryError::Database)
289 }
290
291 pub async fn get_layer(
292 &self,
293 node_id: &str,
294 layer_type: crate::types::LayerType,
295 ) -> MemoryResult<Option<crate::types::MemoryLayer>> {
296 let conn = self.conn.lock().await;
297 let mut stmt = conn.prepare(
298 "SELECT id, node_id, layer_type, content, token_count, embedding_id, created_at, source_chunk_id
299 FROM memory_layers WHERE node_id = ?1 AND layer_type = ?2"
300 )?;
301
302 let result = stmt.query_row(params![node_id, layer_type.to_string()], |row| {
303 let layer_type_str: String = row.get(2)?;
304 let layer_type = layer_type_str
305 .parse()
306 .unwrap_or(crate::types::LayerType::L2);
307 Ok(crate::types::MemoryLayer {
308 id: row.get(0)?,
309 node_id: row.get(1)?,
310 layer_type,
311 content: row.get(3)?,
312 token_count: row.get(4)?,
313 embedding_id: row.get(5)?,
314 created_at: row.get::<_, String>(6)?.parse().unwrap_or_default(),
315 source_chunk_id: row.get(7)?,
316 })
317 });
318
319 match result {
320 Ok(layer) => Ok(Some(layer)),
321 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
322 Err(e) => Err(MemoryError::Database(e)),
323 }
324 }
325
326 pub async fn create_layer(
327 &self,
328 node_id: &str,
329 layer_type: crate::types::LayerType,
330 content: &str,
331 token_count: i64,
332 source_chunk_id: Option<&str>,
333 ) -> MemoryResult<String> {
334 let id = uuid::Uuid::new_v4().to_string();
335 let now = Utc::now().to_rfc3339();
336
337 let conn = self.conn.lock().await;
338 conn.execute(
339 "INSERT INTO memory_layers (id, node_id, layer_type, content, token_count, created_at, source_chunk_id)
340 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
341 params![id, node_id, layer_type.to_string(), content, token_count, now, source_chunk_id],
342 )?;
343
344 Ok(id)
345 }
346
347 pub async fn get_children_tree(
348 &self,
349 parent_uri: &str,
350 max_depth: usize,
351 ) -> MemoryResult<Vec<crate::types::TreeNode>> {
352 if max_depth == 0 {
353 return Ok(Vec::new());
354 }
355
356 let children = self.list_directory(parent_uri).await?;
357 let mut tree_nodes = Vec::new();
358
359 for child in children {
360 let layer_summary = self.get_layer_summary(&child.id).await?;
361
362 let grandchildren = if child.node_type == crate::types::NodeType::Directory {
363 Box::pin(self.get_children_tree(&child.uri, max_depth.saturating_sub(1))).await?
364 } else {
365 Vec::new()
366 };
367
368 tree_nodes.push(crate::types::TreeNode {
369 node: child,
370 children: grandchildren,
371 layer_summary,
372 });
373 }
374
375 Ok(tree_nodes)
376 }
377
378 async fn get_layer_summary(
379 &self,
380 node_id: &str,
381 ) -> MemoryResult<Option<crate::types::LayerSummary>> {
382 let l0 = self.get_layer(node_id, crate::types::LayerType::L0).await?;
383 let l1 = self.get_layer(node_id, crate::types::LayerType::L1).await?;
384 let has_l2 = self
385 .get_layer(node_id, crate::types::LayerType::L2)
386 .await?
387 .is_some();
388
389 if l0.is_none() && l1.is_none() && !has_l2 {
390 return Ok(None);
391 }
392
393 Ok(Some(crate::types::LayerSummary {
394 l0_preview: l0.map(|l| truncate_string(&l.content, 100)),
395 l1_preview: l1.map(|l| truncate_string(&l.content, 200)),
396 has_l2,
397 }))
398 }
399
400 pub async fn node_exists(&self, uri: &str) -> MemoryResult<bool> {
401 let conn = self.conn.lock().await;
402 let count: i64 = conn.query_row(
403 "SELECT COUNT(*) FROM memory_nodes WHERE uri = ?1",
404 params![uri],
405 |row| row.get(0),
406 )?;
407 Ok(count > 0)
408 }
409}
410
411fn row_to_knowledge_space(row: &Row) -> Result<KnowledgeSpaceRecord, rusqlite::Error> {
412 let scope = row
413 .get::<_, String>(1)?
414 .parse()
415 .unwrap_or(tandem_orchestrator::KnowledgeScope::Project);
416 let trust_level = row
417 .get::<_, String>(6)?
418 .parse()
419 .unwrap_or(tandem_orchestrator::KnowledgeTrustLevel::Promoted);
420 let metadata = row
421 .get::<_, Option<String>>(7)?
422 .and_then(|raw| serde_json::from_str(&raw).ok());
423 Ok(KnowledgeSpaceRecord {
424 id: row.get(0)?,
425 scope,
426 project_id: row.get(2)?,
427 namespace: row.get(3)?,
428 title: row.get(4)?,
429 description: row.get(5)?,
430 trust_level,
431 metadata,
432 created_at_ms: row.get::<_, i64>(8)? as u64,
433 updated_at_ms: row.get::<_, i64>(9)? as u64,
434 })
435}
436
437fn row_to_knowledge_item(row: &Row) -> Result<KnowledgeItemRecord, rusqlite::Error> {
438 let trust_level = row
439 .get::<_, String>(8)?
440 .parse()
441 .unwrap_or(tandem_orchestrator::KnowledgeTrustLevel::Promoted);
442 let status = row
443 .get::<_, String>(9)?
444 .parse()
445 .unwrap_or(KnowledgeItemStatus::Working);
446 let payload = row
447 .get::<_, String>(7)
448 .ok()
449 .and_then(|raw| serde_json::from_str(&raw).ok())
450 .unwrap_or(serde_json::Value::Null);
451 let artifact_refs = row
452 .get::<_, String>(11)
453 .ok()
454 .and_then(|raw| serde_json::from_str(&raw).ok())
455 .unwrap_or_default();
456 let source_memory_ids = row
457 .get::<_, String>(12)
458 .ok()
459 .and_then(|raw| serde_json::from_str(&raw).ok())
460 .unwrap_or_default();
461 let metadata = row
462 .get::<_, Option<String>>(14)?
463 .and_then(|raw| serde_json::from_str(&raw).ok());
464 Ok(KnowledgeItemRecord {
465 id: row.get(0)?,
466 space_id: row.get(1)?,
467 coverage_key: row.get(2)?,
468 dedupe_key: row.get(3)?,
469 item_type: row.get(4)?,
470 title: row.get(5)?,
471 summary: row.get(6)?,
472 payload,
473 trust_level,
474 status,
475 run_id: row.get(10)?,
476 artifact_refs,
477 source_memory_ids,
478 freshness_expires_at_ms: row.get::<_, Option<i64>>(13)?.map(|value| value as u64),
479 metadata,
480 created_at_ms: row.get::<_, i64>(15)? as u64,
481 updated_at_ms: row.get::<_, i64>(16)? as u64,
482 })
483}
484
485fn row_to_knowledge_coverage(row: &Row) -> Result<KnowledgeCoverageRecord, rusqlite::Error> {
486 let metadata = row
487 .get::<_, Option<String>>(7)?
488 .and_then(|raw| serde_json::from_str(&raw).ok());
489 Ok(KnowledgeCoverageRecord {
490 coverage_key: row.get(0)?,
491 space_id: row.get(1)?,
492 latest_item_id: row.get(2)?,
493 latest_dedupe_key: row.get(3)?,
494 last_seen_at_ms: row.get::<_, i64>(4)? as u64,
495 last_promoted_at_ms: row.get::<_, Option<i64>>(5)?.map(|value| value as u64),
496 freshness_expires_at_ms: row.get::<_, Option<i64>>(6)?.map(|value| value as u64),
497 metadata,
498 })
499}
500
/// Shortens `s` to roughly `max_len` bytes, appending `...` when text is cut.
///
/// Strings of at most `max_len` bytes are returned unchanged. Longer strings
/// keep their first `max_len - 3` bytes (saturating at zero), rounded down to
/// the nearest UTF-8 character boundary, followed by `...`. Rounding down
/// avoids the panic that slicing inside a multi-byte character would cause.
fn truncate_string(s: &str, max_len: usize) -> String {
    if s.len() <= max_len {
        return s.to_string();
    }

    // `cut` is below `s.len()` here, and offset 0 is always a boundary, so
    // the loop terminates without underflow.
    let mut cut = max_len.saturating_sub(3);
    while !s.is_char_boundary(cut) {
        cut -= 1;
    }
    format!("{}...", &s[..cut])
}
508
/// Turns free text into an FTS query of quoted terms joined by `OR`.
///
/// Each whitespace-separated token has leading and trailing characters other
/// than ASCII alphanumerics, `_`, and `-` stripped; tokens left empty are
/// dropped. Every remaining token is wrapped in double quotes, and any double
/// quote left inside a token is doubled so it stays part of the quoted string
/// instead of ending it early. With no usable tokens the result is an empty
/// quoted string (`""`).
fn build_fts_query(query: &str) -> String {
    let tokens = query
        .split_whitespace()
        .filter_map(|tok| {
            let cleaned =
                tok.trim_matches(|c: char| !c.is_ascii_alphanumeric() && c != '_' && c != '-');
            if cleaned.is_empty() {
                None
            } else {
                // Trimming only touches the ends, so interior quotes must be escaped.
                Some(format!("\"{}\"", cleaned.replace('"', "\"\"")))
            }
        })
        .collect::<Vec<_>>();
    if tokens.is_empty() {
        "\"\"".to_string()
    } else {
        tokens.join(" OR ")
    }
}
528
529#[cfg(test)]
530mod tests {
531 use super::*;
532 use serde_json::Value;
533 use tandem_orchestrator::{KnowledgeScope, KnowledgeTrustLevel};
534 use tempfile::TempDir;
535
536 async fn setup_test_db() -> (MemoryDatabase, TempDir) {
537 let temp_dir = TempDir::new().unwrap();
538 let db_path = temp_dir.path().join("test_memory.db");
539 let db = MemoryDatabase::new(&db_path).await.unwrap();
540 (db, temp_dir)
541 }
542
543 fn tenant_scope(org_id: &str, workspace_id: &str) -> MemoryTenantScope {
544 MemoryTenantScope {
545 org_id: org_id.to_string(),
546 workspace_id: workspace_id.to_string(),
547 deployment_id: Some("deployment-1".to_string()),
548 }
549 }
550
551 fn test_vector_chunk(
552 id: &str,
553 tier: MemoryTier,
554 tenant_scope: MemoryTenantScope,
555 content: &str,
556 source_hash: Option<&str>,
557 ) -> MemoryChunk {
558 MemoryChunk {
559 id: id.to_string(),
560 content: content.to_string(),
561 tier,
562 session_id: Some("shared-session".to_string()),
563 project_id: Some("shared-project".to_string()),
564 source: "test_vector".to_string(),
565 source_path: None,
566 source_mtime: None,
567 source_size: None,
568 source_hash: source_hash.map(ToString::to_string),
569 tenant_scope,
570 created_at: Utc::now(),
571 token_count: 4,
572 metadata: None,
573 }
574 }
575
576 fn embedding(first: f32, second: f32) -> Vec<f32> {
577 let mut values = vec![0.0f32; DEFAULT_EMBEDDING_DIMENSION];
578 values[0] = first;
579 values[1] = second;
580 values
581 }
582
583 #[tokio::test]
584 async fn test_init_schema() {
585 let (db, _temp) = setup_test_db().await;
586 let stats = db.get_stats().await.unwrap();
588 assert_eq!(stats.total_chunks, 0);
589 }
590
591 #[tokio::test]
592 async fn test_knowledge_registry_roundtrip() {
593 let (db, _temp) = setup_test_db().await;
594
595 let space = KnowledgeSpaceRecord {
596 id: "space-1".to_string(),
597 scope: tandem_orchestrator::KnowledgeScope::Project,
598 project_id: Some("project-1".to_string()),
599 namespace: Some("support".to_string()),
600 title: Some("Support Knowledge".to_string()),
601 description: Some("Reusable support guidance".to_string()),
602 trust_level: tandem_orchestrator::KnowledgeTrustLevel::Promoted,
603 metadata: Some(serde_json::json!({"owner": "ops"})),
604 created_at_ms: 1,
605 updated_at_ms: 2,
606 };
607 db.upsert_knowledge_space(&space).await.unwrap();
608
609 let item = KnowledgeItemRecord {
610 id: "item-1".to_string(),
611 space_id: space.id.clone(),
612 coverage_key: "project-1/support/debugging/slow-start".to_string(),
613 dedupe_key: "dedupe-1".to_string(),
614 item_type: "decision".to_string(),
615 title: "Restart service before retry".to_string(),
616 summary: Some("When the service is stale, restart before retrying.".to_string()),
617 payload: serde_json::json!({"action": "restart"}),
618 trust_level: tandem_orchestrator::KnowledgeTrustLevel::Promoted,
619 status: KnowledgeItemStatus::Promoted,
620 run_id: Some("run-1".to_string()),
621 artifact_refs: vec!["artifact://run-1/report".to_string()],
622 source_memory_ids: vec!["memory-1".to_string()],
623 freshness_expires_at_ms: Some(10),
624 metadata: Some(serde_json::json!({"source": "run"})),
625 created_at_ms: 3,
626 updated_at_ms: 4,
627 };
628 db.upsert_knowledge_item(&item).await.unwrap();
629
630 let coverage = KnowledgeCoverageRecord {
631 coverage_key: item.coverage_key.clone(),
632 space_id: space.id.clone(),
633 latest_item_id: Some(item.id.clone()),
634 latest_dedupe_key: Some(item.dedupe_key.clone()),
635 last_seen_at_ms: 5,
636 last_promoted_at_ms: Some(6),
637 freshness_expires_at_ms: Some(10),
638 metadata: Some(serde_json::json!({"coverage": true})),
639 };
640 db.upsert_knowledge_coverage(&coverage).await.unwrap();
641
642 let loaded_space = db.get_knowledge_space(&space.id).await.unwrap().unwrap();
643 assert_eq!(loaded_space.namespace.as_deref(), Some("support"));
644
645 let loaded_items = db
646 .list_knowledge_items(&space.id, Some(&item.coverage_key))
647 .await
648 .unwrap();
649 assert_eq!(loaded_items.len(), 1);
650 assert_eq!(loaded_items[0].title, item.title);
651
652 let loaded_coverage = db
653 .get_knowledge_coverage(&item.coverage_key, &space.id)
654 .await
655 .unwrap()
656 .unwrap();
657 assert_eq!(loaded_coverage.latest_item_id.as_deref(), Some("item-1"));
658 }
659
660 #[tokio::test]
661 async fn test_knowledge_registry_is_tenant_scoped() {
662 let (db, _temp) = setup_test_db().await;
663 let tenant_a = tenant_scope("org-a", "workspace-a");
664 let tenant_b = tenant_scope("org-b", "workspace-b");
665
666 let mut space_a = KnowledgeSpaceRecord {
667 id: "tenant-a-space".to_string(),
668 scope: tandem_orchestrator::KnowledgeScope::Project,
669 project_id: Some("shared-project".to_string()),
670 namespace: Some("shared-namespace".to_string()),
671 title: Some("Tenant A Knowledge".to_string()),
672 description: None,
673 trust_level: tandem_orchestrator::KnowledgeTrustLevel::Promoted,
674 metadata: None,
675 created_at_ms: 1,
676 updated_at_ms: 2,
677 };
678 let mut space_b = space_a.clone();
679 space_b.id = "tenant-b-space".to_string();
680 space_b.title = Some("Tenant B Knowledge".to_string());
681
682 db.upsert_knowledge_space_for_tenant(&space_a, &tenant_a)
683 .await
684 .unwrap();
685 db.upsert_knowledge_space_for_tenant(&space_b, &tenant_b)
686 .await
687 .unwrap();
688
689 let spaces_a = db
690 .list_knowledge_spaces_for_tenant(Some("shared-project"), &tenant_a)
691 .await
692 .unwrap();
693 let spaces_b = db
694 .list_knowledge_spaces_for_tenant(Some("shared-project"), &tenant_b)
695 .await
696 .unwrap();
697 assert_eq!(spaces_a.len(), 1);
698 assert_eq!(spaces_a[0].id, "tenant-a-space");
699 assert_eq!(spaces_b.len(), 1);
700 assert_eq!(spaces_b[0].id, "tenant-b-space");
701 assert!(db
702 .get_knowledge_space_for_tenant("tenant-b-space", &tenant_a)
703 .await
704 .unwrap()
705 .is_none());
706
707 let item_b = KnowledgeItemRecord {
708 id: "tenant-b-item".to_string(),
709 space_id: space_b.id.clone(),
710 coverage_key: "shared-project/topic/debugging".to_string(),
711 dedupe_key: "shared-dedupe".to_string(),
712 item_type: "decision".to_string(),
713 title: "Tenant B item".to_string(),
714 summary: None,
715 payload: serde_json::json!({"tenant": "b"}),
716 trust_level: tandem_orchestrator::KnowledgeTrustLevel::Working,
717 status: KnowledgeItemStatus::Working,
718 run_id: Some("run-b".to_string()),
719 artifact_refs: Vec::new(),
720 source_memory_ids: Vec::new(),
721 freshness_expires_at_ms: None,
722 metadata: None,
723 created_at_ms: 3,
724 updated_at_ms: 4,
725 };
726 db.upsert_knowledge_item_for_tenant(&item_b, &tenant_b)
727 .await
728 .unwrap();
729 assert!(db
730 .upsert_knowledge_item_for_tenant(&item_b, &tenant_a)
731 .await
732 .is_err());
733 assert!(db
734 .get_knowledge_item_for_tenant("tenant-b-item", &tenant_a)
735 .await
736 .unwrap()
737 .is_none());
738 assert!(db
739 .list_knowledge_items_for_tenant(&space_b.id, None, &tenant_a)
740 .await
741 .unwrap()
742 .is_empty());
743 assert_eq!(
744 db.list_knowledge_items_for_tenant(&space_b.id, None, &tenant_b)
745 .await
746 .unwrap()
747 .len(),
748 1
749 );
750
751 let coverage_b = KnowledgeCoverageRecord {
752 coverage_key: item_b.coverage_key.clone(),
753 space_id: space_b.id.clone(),
754 latest_item_id: Some(item_b.id.clone()),
755 latest_dedupe_key: Some(item_b.dedupe_key.clone()),
756 last_seen_at_ms: 5,
757 last_promoted_at_ms: None,
758 freshness_expires_at_ms: None,
759 metadata: None,
760 };
761 db.upsert_knowledge_coverage_for_tenant(&coverage_b, &tenant_b)
762 .await
763 .unwrap();
764 assert!(db
765 .upsert_knowledge_coverage_for_tenant(&coverage_b, &tenant_a)
766 .await
767 .is_err());
768 assert!(db
769 .get_knowledge_coverage_for_tenant(&coverage_b.coverage_key, &space_b.id, &tenant_a)
770 .await
771 .unwrap()
772 .is_none());
773 assert!(db
774 .get_knowledge_coverage_for_tenant(&coverage_b.coverage_key, &space_b.id, &tenant_b)
775 .await
776 .unwrap()
777 .is_some());
778
779 let promote = KnowledgePromotionRequest {
780 item_id: item_b.id.clone(),
781 target_status: KnowledgeItemStatus::Promoted,
782 promoted_at_ms: 10,
783 freshness_expires_at_ms: None,
784 reviewer_id: None,
785 approval_id: None,
786 reason: None,
787 };
788 assert!(db
789 .promote_knowledge_item_for_tenant(&promote, &tenant_a)
790 .await
791 .unwrap()
792 .is_none());
793 assert!(db
794 .promote_knowledge_item_for_tenant(&promote, &tenant_b)
795 .await
796 .unwrap()
797 .is_some());
798
799 space_a.updated_at_ms = 11;
800 db.upsert_knowledge_space_for_tenant(&space_a, &tenant_a)
801 .await
802 .unwrap();
803 }
804
805 #[tokio::test]
806 async fn test_store_and_retrieve_chunk() {
807 let (db, _temp) = setup_test_db().await;
808
809 let chunk = MemoryChunk {
810 id: "test-1".to_string(),
811 content: "Test content".to_string(),
812 tier: MemoryTier::Session,
813 session_id: Some("session-1".to_string()),
814 project_id: Some("project-1".to_string()),
815 source: "user_message".to_string(),
816 source_path: None,
817 source_mtime: None,
818 source_size: None,
819 source_hash: None,
820 tenant_scope: MemoryTenantScope::local(),
821 created_at: Utc::now(),
822 token_count: 10,
823 metadata: None,
824 };
825
826 let embedding = vec![0.1f32; DEFAULT_EMBEDDING_DIMENSION];
827 db.store_chunk(&chunk, &embedding).await.unwrap();
828
829 let chunks = db.get_session_chunks("session-1").await.unwrap();
830 assert_eq!(chunks.len(), 1);
831 assert_eq!(chunks[0].content, "Test content");
832 }
833
834 #[tokio::test]
835 async fn test_store_and_retrieve_global_chunk() {
836 let (db, _temp) = setup_test_db().await;
837
838 let chunk = MemoryChunk {
839 id: "global-1".to_string(),
840 content: "Global note".to_string(),
841 tier: MemoryTier::Global,
842 session_id: None,
843 project_id: None,
844 source: "agent_note".to_string(),
845 source_path: None,
846 source_mtime: None,
847 source_size: None,
848 source_hash: None,
849 tenant_scope: MemoryTenantScope::local(),
850 created_at: Utc::now(),
851 token_count: 7,
852 metadata: Some(serde_json::json!({"kind":"test"})),
853 };
854
855 let embedding = vec![0.2f32; DEFAULT_EMBEDDING_DIMENSION];
856 db.store_chunk(&chunk, &embedding).await.unwrap();
857
858 let chunks = db.get_global_chunks(10).await.unwrap();
859 assert_eq!(chunks.len(), 1);
860 assert_eq!(chunks[0].content, "Global note");
861 assert_eq!(chunks[0].source, "agent_note");
862 assert_eq!(chunks[0].token_count, 7);
863 assert_eq!(chunks[0].tier, MemoryTier::Global);
864 }
865
866 #[tokio::test]
867 async fn test_global_chunk_exists_by_source_hash() {
868 let (db, _temp) = setup_test_db().await;
869
870 let chunk = MemoryChunk {
871 id: "global-hash".to_string(),
872 content: "Global hash note".to_string(),
873 tier: MemoryTier::Global,
874 session_id: None,
875 project_id: None,
876 source: "chat_exchange".to_string(),
877 source_path: None,
878 source_mtime: None,
879 source_size: None,
880 source_hash: Some("hash-123".to_string()),
881 tenant_scope: MemoryTenantScope::local(),
882 created_at: Utc::now(),
883 token_count: 5,
884 metadata: None,
885 };
886
887 let embedding = vec![0.3f32; DEFAULT_EMBEDDING_DIMENSION];
888 db.store_chunk(&chunk, &embedding).await.unwrap();
889
890 assert!(db
891 .global_chunk_exists_by_source_hash("hash-123")
892 .await
893 .unwrap());
894 assert!(!db
895 .global_chunk_exists_by_source_hash("missing-hash")
896 .await
897 .unwrap());
898 }
899
900 #[tokio::test]
901 async fn test_vector_search_is_tenant_partitioned_before_top_k() {
902 let (db, _temp) = setup_test_db().await;
903 let tenant_a = tenant_scope("org-a", "workspace-a");
904 let tenant_b = tenant_scope("org-b", "workspace-b");
905 let query = embedding(1.0, 0.0);
906
907 db.store_chunk(
908 &test_vector_chunk(
909 "tenant-a-vector",
910 MemoryTier::Project,
911 tenant_a.clone(),
912 "tenant a memory",
913 None,
914 ),
915 &embedding(0.8, 0.2),
916 )
917 .await
918 .unwrap();
919 db.store_chunk(
920 &test_vector_chunk(
921 "tenant-b-vector",
922 MemoryTier::Project,
923 tenant_b.clone(),
924 "tenant b closer memory",
925 None,
926 ),
927 &query,
928 )
929 .await
930 .unwrap();
931
932 let results = db
933 .search_similar_for_tenant(
934 &query,
935 MemoryTier::Project,
936 Some("shared-project"),
937 None,
938 &tenant_a,
939 1,
940 )
941 .await
942 .unwrap();
943
944 assert_eq!(results.len(), 1);
945 assert_eq!(results[0].0.id, "tenant-a-vector");
946 assert_eq!(results[0].0.tenant_scope, tenant_a);
947 }
948
949 #[tokio::test]
950 async fn test_identical_vector_content_only_returns_request_tenant() {
951 let (db, _temp) = setup_test_db().await;
952 let tenant_a = tenant_scope("org-a", "workspace-a");
953 let tenant_b = tenant_scope("org-b", "workspace-b");
954 let vector = embedding(0.4, 0.6);
955
956 db.store_chunk(
957 &test_vector_chunk(
958 "tenant-a-identical",
959 MemoryTier::Global,
960 tenant_a.clone(),
961 "identical memory body",
962 Some("same-source-hash"),
963 ),
964 &vector,
965 )
966 .await
967 .unwrap();
968 db.store_chunk(
969 &test_vector_chunk(
970 "tenant-b-identical",
971 MemoryTier::Global,
972 tenant_b,
973 "identical memory body",
974 Some("same-source-hash"),
975 ),
976 &vector,
977 )
978 .await
979 .unwrap();
980
981 let results = db
982 .search_similar_for_tenant(&vector, MemoryTier::Global, None, None, &tenant_a, 10)
983 .await
984 .unwrap();
985
986 assert_eq!(results.len(), 1);
987 assert_eq!(results[0].0.id, "tenant-a-identical");
988 }
989
990 #[tokio::test]
991 async fn test_tenant_delete_does_not_remove_other_tenant_vector_memory() {
992 let (db, _temp) = setup_test_db().await;
993 let tenant_a = tenant_scope("org-a", "workspace-a");
994 let tenant_b = tenant_scope("org-b", "workspace-b");
995 let vector = embedding(0.2, 0.8);
996
997 db.store_chunk(
998 &test_vector_chunk(
999 "tenant-a-delete",
1000 MemoryTier::Global,
1001 tenant_a.clone(),
1002 "tenant a delete target",
1003 None,
1004 ),
1005 &vector,
1006 )
1007 .await
1008 .unwrap();
1009 db.store_chunk(
1010 &test_vector_chunk(
1011 "tenant-b-delete",
1012 MemoryTier::Global,
1013 tenant_b.clone(),
1014 "tenant b delete target",
1015 None,
1016 ),
1017 &vector,
1018 )
1019 .await
1020 .unwrap();
1021
1022 let cross_delete = db
1023 .delete_chunk_for_tenant(MemoryTier::Global, "tenant-b-delete", None, None, &tenant_a)
1024 .await
1025 .unwrap();
1026 assert_eq!(cross_delete, 0);
1027
1028 let tenant_b_results = db
1029 .search_similar_for_tenant(&vector, MemoryTier::Global, None, None, &tenant_b, 10)
1030 .await
1031 .unwrap();
1032 assert_eq!(tenant_b_results.len(), 1);
1033 assert_eq!(tenant_b_results[0].0.id, "tenant-b-delete");
1034
1035 let own_delete = db
1036 .delete_chunk_for_tenant(MemoryTier::Global, "tenant-a-delete", None, None, &tenant_a)
1037 .await
1038 .unwrap();
1039 assert_eq!(own_delete, 1);
1040 assert_eq!(
1041 db.search_similar_for_tenant(&vector, MemoryTier::Global, None, None, &tenant_b, 10)
1042 .await
1043 .unwrap()
1044 .len(),
1045 1
1046 );
1047 }
1048
1049 #[tokio::test]
1050 async fn test_same_source_hash_does_not_dedupe_across_tenants() {
1051 let (db, _temp) = setup_test_db().await;
1052 let tenant_a = tenant_scope("org-a", "workspace-a");
1053 let tenant_b = tenant_scope("org-b", "workspace-b");
1054 let source_hash = "shared-source-hash";
1055
1056 db.store_chunk(
1057 &test_vector_chunk(
1058 "tenant-a-hash",
1059 MemoryTier::Global,
1060 tenant_a.clone(),
1061 "same source hash",
1062 Some(source_hash),
1063 ),
1064 &embedding(0.7, 0.1),
1065 )
1066 .await
1067 .unwrap();
1068 db.store_chunk(
1069 &test_vector_chunk(
1070 "tenant-b-hash",
1071 MemoryTier::Global,
1072 tenant_b.clone(),
1073 "same source hash",
1074 Some(source_hash),
1075 ),
1076 &embedding(0.7, 0.1),
1077 )
1078 .await
1079 .unwrap();
1080
1081 assert!(db
1082 .global_chunk_exists_by_source_hash_for_tenant(source_hash, &tenant_a)
1083 .await
1084 .unwrap());
1085 assert!(db
1086 .global_chunk_exists_by_source_hash_for_tenant(source_hash, &tenant_b)
1087 .await
1088 .unwrap());
1089
1090 let tenant_a_chunks = db
1091 .get_global_chunks_for_tenant(&tenant_a, 10)
1092 .await
1093 .unwrap();
1094 let tenant_b_chunks = db
1095 .get_global_chunks_for_tenant(&tenant_b, 10)
1096 .await
1097 .unwrap();
1098 assert_eq!(tenant_a_chunks.len(), 1);
1099 assert_eq!(tenant_b_chunks.len(), 1);
1100 assert_ne!(tenant_a_chunks[0].id, tenant_b_chunks[0].id);
1101 }
1102
1103 #[tokio::test]
1104 async fn test_memory_stats_are_tenant_scoped() {
1105 let (db, _temp) = setup_test_db().await;
1106 let tenant_a = tenant_scope("org-a", "workspace-a");
1107 let tenant_b = tenant_scope("org-b", "workspace-b");
1108
1109 db.store_chunk(
1110 &test_vector_chunk(
1111 "tenant-a-session-stat",
1112 MemoryTier::Session,
1113 tenant_a.clone(),
1114 "tenant a session stats",
1115 None,
1116 ),
1117 &embedding(0.1, 0.2),
1118 )
1119 .await
1120 .unwrap();
1121 db.store_chunk(
1122 &test_vector_chunk(
1123 "tenant-a-project-stat",
1124 MemoryTier::Project,
1125 tenant_a.clone(),
1126 "tenant a project stats",
1127 None,
1128 ),
1129 &embedding(0.2, 0.3),
1130 )
1131 .await
1132 .unwrap();
1133 db.store_chunk(
1134 &test_vector_chunk(
1135 "tenant-a-global-stat",
1136 MemoryTier::Global,
1137 tenant_a.clone(),
1138 "tenant a global stats",
1139 None,
1140 ),
1141 &embedding(0.3, 0.4),
1142 )
1143 .await
1144 .unwrap();
1145 db.store_chunk(
1146 &test_vector_chunk(
1147 "tenant-b-project-stat",
1148 MemoryTier::Project,
1149 tenant_b.clone(),
1150 "tenant b project stats should not count",
1151 None,
1152 ),
1153 &embedding(0.4, 0.5),
1154 )
1155 .await
1156 .unwrap();
1157
1158 db.log_cleanup_for_tenant(
1159 "test",
1160 MemoryTier::Project,
1161 Some("shared-project"),
1162 None,
1163 1,
1164 10,
1165 &tenant_b,
1166 )
1167 .await
1168 .unwrap();
1169
1170 let tenant_a_stats = db.get_stats_for_tenant(&tenant_a).await.unwrap();
1171 assert_eq!(tenant_a_stats.session_chunks, 1);
1172 assert_eq!(tenant_a_stats.project_chunks, 1);
1173 assert_eq!(tenant_a_stats.global_chunks, 1);
1174 assert_eq!(tenant_a_stats.total_chunks, 3);
1175 assert!(tenant_a_stats.total_bytes > 0);
1176 assert!(tenant_a_stats.last_cleanup.is_none());
1177
1178 let tenant_b_stats = db.get_stats_for_tenant(&tenant_b).await.unwrap();
1179 assert_eq!(tenant_b_stats.session_chunks, 0);
1180 assert_eq!(tenant_b_stats.project_chunks, 1);
1181 assert_eq!(tenant_b_stats.global_chunks, 0);
1182 assert_eq!(tenant_b_stats.total_chunks, 1);
1183 assert!(tenant_b_stats.last_cleanup.is_some());
1184 }
1185
1186 #[tokio::test]
1187 async fn test_project_stats_are_tenant_scoped_for_vector_chunks() {
1188 let (db, _temp) = setup_test_db().await;
1189 let tenant_a = tenant_scope("org-a", "workspace-a");
1190 let tenant_b = tenant_scope("org-b", "workspace-b");
1191
1192 db.store_chunk(
1193 &test_vector_chunk(
1194 "tenant-a-project-stat-1",
1195 MemoryTier::Project,
1196 tenant_a.clone(),
1197 "tenant a project stat one",
1198 None,
1199 ),
1200 &embedding(0.5, 0.1),
1201 )
1202 .await
1203 .unwrap();
1204 let mut tenant_a_file = test_vector_chunk(
1205 "tenant-a-project-file-stat",
1206 MemoryTier::Project,
1207 tenant_a.clone(),
1208 "tenant a file stat",
1209 None,
1210 );
1211 tenant_a_file.source = "file".to_string();
1212 db.store_chunk(&tenant_a_file, &embedding(0.6, 0.1))
1213 .await
1214 .unwrap();
1215 db.store_chunk(
1216 &test_vector_chunk(
1217 "tenant-b-project-stat-1",
1218 MemoryTier::Project,
1219 tenant_b,
1220 "tenant b project stat",
1221 None,
1222 ),
1223 &embedding(0.7, 0.1),
1224 )
1225 .await
1226 .unwrap();
1227
1228 let stats = db
1229 .get_project_stats_for_tenant("shared-project", &tenant_a)
1230 .await
1231 .unwrap();
1232 assert_eq!(stats.project_chunks, 2);
1233 assert_eq!(stats.file_index_chunks, 1);
1234 assert!(stats.project_bytes > 0);
1235 assert!(stats.file_index_bytes > 0);
1236 }
1237
1238 #[tokio::test]
1239 async fn test_import_index_paths_are_tenant_scoped() {
1240 let (db, _temp) = setup_test_db().await;
1241 let tenant_a = tenant_scope("org-a", "workspace-a");
1242 let tenant_b = tenant_scope("org-b", "workspace-b");
1243
1244 db.upsert_import_index_entry_for_tenant(
1245 MemoryTier::Project,
1246 None,
1247 Some("shared-project"),
1248 "repo/README.md",
1249 10,
1250 100,
1251 "hash-a",
1252 &tenant_a,
1253 )
1254 .await
1255 .unwrap();
1256 db.upsert_import_index_entry_for_tenant(
1257 MemoryTier::Project,
1258 None,
1259 Some("shared-project"),
1260 "repo/README.md",
1261 20,
1262 200,
1263 "hash-b",
1264 &tenant_b,
1265 )
1266 .await
1267 .unwrap();
1268
1269 let tenant_a_paths = db
1270 .list_import_index_paths_for_tenant(
1271 MemoryTier::Project,
1272 None,
1273 Some("shared-project"),
1274 &tenant_a,
1275 )
1276 .await
1277 .unwrap();
1278 assert_eq!(tenant_a_paths, vec!["repo/README.md".to_string()]);
1279
1280 let tenant_a_entry = db
1281 .get_import_index_entry_for_tenant(
1282 MemoryTier::Project,
1283 None,
1284 Some("shared-project"),
1285 "repo/README.md",
1286 &tenant_a,
1287 )
1288 .await
1289 .unwrap()
1290 .unwrap();
1291 let tenant_b_entry = db
1292 .get_import_index_entry_for_tenant(
1293 MemoryTier::Project,
1294 None,
1295 Some("shared-project"),
1296 "repo/README.md",
1297 &tenant_b,
1298 )
1299 .await
1300 .unwrap()
1301 .unwrap();
1302 assert_eq!(tenant_a_entry.2, "hash-a");
1303 assert_eq!(tenant_b_entry.2, "hash-b");
1304 }
1305
1306 #[tokio::test]
1307 async fn test_delete_import_index_entry_is_tenant_scoped() {
1308 let (db, _temp) = setup_test_db().await;
1309 let tenant_a = tenant_scope("org-a", "workspace-a");
1310 let tenant_b = tenant_scope("org-b", "workspace-b");
1311
1312 for (tenant, hash) in [(&tenant_a, "hash-a"), (&tenant_b, "hash-b")] {
1313 db.upsert_import_index_entry_for_tenant(
1314 MemoryTier::Global,
1315 None,
1316 None,
1317 "shared/path.md",
1318 1,
1319 10,
1320 hash,
1321 tenant,
1322 )
1323 .await
1324 .unwrap();
1325 }
1326
1327 db.delete_import_index_entry_for_tenant(
1328 MemoryTier::Global,
1329 None,
1330 None,
1331 "shared/path.md",
1332 &tenant_a,
1333 )
1334 .await
1335 .unwrap();
1336
1337 assert!(db
1338 .get_import_index_entry_for_tenant(
1339 MemoryTier::Global,
1340 None,
1341 None,
1342 "shared/path.md",
1343 &tenant_a
1344 )
1345 .await
1346 .unwrap()
1347 .is_none());
1348 let tenant_b_entry = db
1349 .get_import_index_entry_for_tenant(
1350 MemoryTier::Global,
1351 None,
1352 None,
1353 "shared/path.md",
1354 &tenant_b,
1355 )
1356 .await
1357 .unwrap()
1358 .unwrap();
1359 assert_eq!(tenant_b_entry.2, "hash-b");
1360 }
1361
1362 #[tokio::test]
1363 async fn test_file_chunk_delete_by_path_is_tenant_scoped() {
1364 let (db, _temp) = setup_test_db().await;
1365 let tenant_a = tenant_scope("org-a", "workspace-a");
1366 let tenant_b = tenant_scope("org-b", "workspace-b");
1367
1368 let mut chunk_a = test_vector_chunk(
1369 "tenant-a-file-delete",
1370 MemoryTier::Project,
1371 tenant_a.clone(),
1372 "same file content",
1373 Some("same-hash"),
1374 );
1375 chunk_a.source = "file".to_string();
1376 chunk_a.source_path = Some("repo/file.md".to_string());
1377 let mut chunk_b = test_vector_chunk(
1378 "tenant-b-file-delete",
1379 MemoryTier::Project,
1380 tenant_b.clone(),
1381 "same file content",
1382 Some("same-hash"),
1383 );
1384 chunk_b.source = "file".to_string();
1385 chunk_b.source_path = Some("repo/file.md".to_string());
1386
1387 db.store_chunk(&chunk_a, &embedding(0.1, 0.2))
1388 .await
1389 .unwrap();
1390 db.store_chunk(&chunk_b, &embedding(0.1, 0.2))
1391 .await
1392 .unwrap();
1393
1394 let (deleted, _) = db
1395 .delete_file_chunks_by_path_for_tenant(
1396 MemoryTier::Project,
1397 None,
1398 Some("shared-project"),
1399 "repo/file.md",
1400 &tenant_a,
1401 )
1402 .await
1403 .unwrap();
1404 assert_eq!(deleted, 1);
1405
1406 assert!(db
1407 .get_project_chunks_for_tenant("shared-project", &tenant_a)
1408 .await
1409 .unwrap()
1410 .is_empty());
1411 let tenant_b_chunks = db
1412 .get_project_chunks_for_tenant("shared-project", &tenant_b)
1413 .await
1414 .unwrap();
1415 assert_eq!(tenant_b_chunks.len(), 1);
1416 assert_eq!(tenant_b_chunks[0].id, "tenant-b-file-delete");
1417 }
1418
1419 #[tokio::test]
1420 async fn test_project_file_index_clear_is_tenant_scoped() {
1421 let (db, _temp) = setup_test_db().await;
1422 let tenant_a = tenant_scope("org-a", "workspace-a");
1423 let tenant_b = tenant_scope("org-b", "workspace-b");
1424
1425 for (tenant, id, hash) in [
1426 (&tenant_a, "tenant-a-clear-file-index", "hash-a"),
1427 (&tenant_b, "tenant-b-clear-file-index", "hash-b"),
1428 ] {
1429 db.upsert_file_index_entry_for_tenant(
1430 "shared-project",
1431 "repo/file.md",
1432 1,
1433 10,
1434 hash,
1435 tenant,
1436 )
1437 .await
1438 .unwrap();
1439 db.upsert_project_index_status_for_tenant("shared-project", 5, 4, 3, 2, 1, tenant)
1440 .await
1441 .unwrap();
1442 let mut chunk = test_vector_chunk(
1443 id,
1444 MemoryTier::Project,
1445 tenant.clone(),
1446 "file index clear content",
1447 Some(hash),
1448 );
1449 chunk.source = "file".to_string();
1450 chunk.source_path = Some("repo/file.md".to_string());
1451 db.store_chunk(&chunk, &embedding(0.4, 0.5)).await.unwrap();
1452 }
1453
1454 let result = db
1455 .clear_project_file_index_for_tenant("shared-project", false, &tenant_a)
1456 .await
1457 .unwrap();
1458 assert_eq!(result.chunks_deleted, 1);
1459
1460 assert_eq!(
1461 db.project_file_index_count_for_tenant("shared-project", &tenant_a)
1462 .await
1463 .unwrap(),
1464 0
1465 );
1466 assert!(db
1467 .get_project_chunks_for_tenant("shared-project", &tenant_a)
1468 .await
1469 .unwrap()
1470 .is_empty());
1471
1472 assert_eq!(
1473 db.project_file_index_count_for_tenant("shared-project", &tenant_b)
1474 .await
1475 .unwrap(),
1476 1
1477 );
1478 assert_eq!(
1479 db.get_project_chunks_for_tenant("shared-project", &tenant_b)
1480 .await
1481 .unwrap()
1482 .len(),
1483 1
1484 );
1485 let tenant_b_stats = db
1486 .get_project_stats_for_tenant("shared-project", &tenant_b)
1487 .await
1488 .unwrap();
1489 assert_eq!(tenant_b_stats.last_indexed_files, Some(3));
1490 }
1491
1492 #[tokio::test]
1493 async fn test_project_stats_file_index_is_tenant_scoped() {
1494 let (db, _temp) = setup_test_db().await;
1495 let tenant_a = tenant_scope("org-a", "workspace-a");
1496 let tenant_b = tenant_scope("org-b", "workspace-b");
1497
1498 db.upsert_file_index_entry_for_tenant(
1499 "shared-project",
1500 "repo/a.md",
1501 1,
1502 10,
1503 "hash-a",
1504 &tenant_a,
1505 )
1506 .await
1507 .unwrap();
1508 db.upsert_project_index_status_for_tenant("shared-project", 10, 9, 8, 1, 0, &tenant_a)
1509 .await
1510 .unwrap();
1511 db.upsert_file_index_entry_for_tenant(
1512 "shared-project",
1513 "repo/b.md",
1514 2,
1515 20,
1516 "hash-b",
1517 &tenant_b,
1518 )
1519 .await
1520 .unwrap();
1521 db.upsert_project_index_status_for_tenant("shared-project", 3, 2, 1, 1, 0, &tenant_b)
1522 .await
1523 .unwrap();
1524
1525 let stats_a = db
1526 .get_project_stats_for_tenant("shared-project", &tenant_a)
1527 .await
1528 .unwrap();
1529 let stats_b = db
1530 .get_project_stats_for_tenant("shared-project", &tenant_b)
1531 .await
1532 .unwrap();
1533
1534 assert_eq!(stats_a.indexed_files, 1);
1535 assert_eq!(stats_a.last_total_files, Some(10));
1536 assert_eq!(stats_a.last_indexed_files, Some(8));
1537 assert_eq!(stats_b.indexed_files, 1);
1538 assert_eq!(stats_b.last_total_files, Some(3));
1539 assert_eq!(stats_b.last_indexed_files, Some(1));
1540 }
1541
1542 #[tokio::test]
1543 async fn test_clear_session_and_project_memory_are_tenant_scoped() {
1544 let (db, _temp) = setup_test_db().await;
1545 let tenant_a = tenant_scope("org-a", "workspace-a");
1546 let tenant_b = tenant_scope("org-b", "workspace-b");
1547
1548 db.store_chunk(
1549 &test_vector_chunk(
1550 "tenant-a-clear-session",
1551 MemoryTier::Session,
1552 tenant_a.clone(),
1553 "tenant a session clear target",
1554 None,
1555 ),
1556 &embedding(0.1, 0.9),
1557 )
1558 .await
1559 .unwrap();
1560 db.store_chunk(
1561 &test_vector_chunk(
1562 "tenant-b-clear-session",
1563 MemoryTier::Session,
1564 tenant_b.clone(),
1565 "tenant b session must remain",
1566 None,
1567 ),
1568 &embedding(0.1, 0.9),
1569 )
1570 .await
1571 .unwrap();
1572 db.store_chunk(
1573 &test_vector_chunk(
1574 "tenant-a-clear-project",
1575 MemoryTier::Project,
1576 tenant_a.clone(),
1577 "tenant a project clear target",
1578 None,
1579 ),
1580 &embedding(0.2, 0.8),
1581 )
1582 .await
1583 .unwrap();
1584 db.store_chunk(
1585 &test_vector_chunk(
1586 "tenant-b-clear-project",
1587 MemoryTier::Project,
1588 tenant_b.clone(),
1589 "tenant b project must remain",
1590 None,
1591 ),
1592 &embedding(0.2, 0.8),
1593 )
1594 .await
1595 .unwrap();
1596
1597 assert_eq!(
1598 db.clear_session_memory_for_tenant("shared-session", &tenant_a)
1599 .await
1600 .unwrap(),
1601 1
1602 );
1603 assert_eq!(
1604 db.clear_project_memory_for_tenant("shared-project", &tenant_a)
1605 .await
1606 .unwrap(),
1607 1
1608 );
1609
1610 let tenant_b_session = db
1611 .get_session_chunks_for_tenant("shared-session", &tenant_b)
1612 .await
1613 .unwrap();
1614 let tenant_b_project = db
1615 .get_project_chunks_for_tenant("shared-project", &tenant_b)
1616 .await
1617 .unwrap();
1618 assert_eq!(tenant_b_session.len(), 1);
1619 assert_eq!(tenant_b_project.len(), 1);
1620 }
1621
1622 #[tokio::test]
1623 async fn test_old_session_cleanup_is_tenant_scoped() {
1624 let (db, _temp) = setup_test_db().await;
1625 let tenant_a = tenant_scope("org-a", "workspace-a");
1626 let tenant_b = tenant_scope("org-b", "workspace-b");
1627 let old = Utc::now() - chrono::Duration::days(90);
1628
1629 let mut tenant_a_old = test_vector_chunk(
1630 "tenant-a-old-session",
1631 MemoryTier::Session,
1632 tenant_a.clone(),
1633 "tenant a old session",
1634 None,
1635 );
1636 tenant_a_old.created_at = old;
1637 db.store_chunk(&tenant_a_old, &embedding(0.3, 0.7))
1638 .await
1639 .unwrap();
1640
1641 let mut tenant_b_old = test_vector_chunk(
1642 "tenant-b-old-session",
1643 MemoryTier::Session,
1644 tenant_b.clone(),
1645 "tenant b old session",
1646 None,
1647 );
1648 tenant_b_old.created_at = old;
1649 db.store_chunk(&tenant_b_old, &embedding(0.3, 0.7))
1650 .await
1651 .unwrap();
1652
1653 assert_eq!(
1654 db.cleanup_old_sessions_for_tenant(30, &tenant_a)
1655 .await
1656 .unwrap(),
1657 1
1658 );
1659 assert!(db
1660 .get_session_chunks_for_tenant("shared-session", &tenant_a)
1661 .await
1662 .unwrap()
1663 .is_empty());
1664 assert_eq!(
1665 db.get_session_chunks_for_tenant("shared-session", &tenant_b)
1666 .await
1667 .unwrap()
1668 .len(),
1669 1
1670 );
1671 }
1672
1673 #[tokio::test]
1674 async fn test_config_crud() {
1675 let (db, _temp) = setup_test_db().await;
1676
1677 let config = db.get_or_create_config("project-1").await.unwrap();
1678 assert_eq!(config.max_chunks, 10000);
1679
1680 let new_config = MemoryConfig {
1681 max_chunks: 5000,
1682 ..Default::default()
1683 };
1684 db.update_config("project-1", &new_config).await.unwrap();
1685
1686 let updated = db.get_or_create_config("project-1").await.unwrap();
1687 assert_eq!(updated.max_chunks, 5000);
1688 }
1689
1690 #[tokio::test]
1691 async fn test_config_crud_is_tenant_scoped() {
1692 let (db, _temp) = setup_test_db().await;
1693 let tenant_a = tenant_scope("org-a", "workspace-a");
1694 let tenant_b = tenant_scope("org-b", "workspace-b");
1695
1696 let config_a = MemoryConfig {
1697 max_chunks: 111,
1698 session_retention_days: 7,
1699 ..Default::default()
1700 };
1701 let config_b = MemoryConfig {
1702 max_chunks: 222,
1703 session_retention_days: 14,
1704 ..Default::default()
1705 };
1706 db.update_config_for_tenant("shared-project", &config_a, &tenant_a)
1707 .await
1708 .unwrap();
1709 db.update_config_for_tenant("shared-project", &config_b, &tenant_b)
1710 .await
1711 .unwrap();
1712
1713 let loaded_a = db
1714 .get_or_create_config_for_tenant("shared-project", &tenant_a)
1715 .await
1716 .unwrap();
1717 let loaded_b = db
1718 .get_or_create_config_for_tenant("shared-project", &tenant_b)
1719 .await
1720 .unwrap();
1721
1722 assert_eq!(loaded_a.max_chunks, 111);
1723 assert_eq!(loaded_a.session_retention_days, 7);
1724 assert_eq!(loaded_b.max_chunks, 222);
1725 assert_eq!(loaded_b.session_retention_days, 14);
1726 }
1727
1728 #[tokio::test]
1729 async fn test_prune_old_session_chunks_is_tenant_scoped() {
1730 let (db, _temp) = setup_test_db().await;
1731 let tenant_a = tenant_scope("org-a", "workspace-a");
1732 let tenant_b = tenant_scope("org-b", "workspace-b");
1733 let old = Utc::now() - chrono::Duration::days(10);
1734
1735 let mut chunk_a = test_vector_chunk(
1736 "tenant-a-old-session-prune",
1737 MemoryTier::Session,
1738 tenant_a.clone(),
1739 "old tenant a session chunk",
1740 None,
1741 );
1742 chunk_a.created_at = old;
1743 let mut chunk_b = test_vector_chunk(
1744 "tenant-b-old-session-prune",
1745 MemoryTier::Session,
1746 tenant_b.clone(),
1747 "old tenant b session chunk",
1748 None,
1749 );
1750 chunk_b.created_at = old;
1751
1752 db.store_chunk(&chunk_a, &embedding(0.2, 0.8))
1753 .await
1754 .unwrap();
1755 db.store_chunk(&chunk_b, &embedding(0.2, 0.8))
1756 .await
1757 .unwrap();
1758
1759 let deleted = db
1760 .prune_old_session_chunks_for_tenant(1, &tenant_a)
1761 .await
1762 .unwrap();
1763 assert_eq!(deleted, 1);
1764 assert!(db
1765 .get_session_chunks_for_tenant("shared-session", &tenant_a)
1766 .await
1767 .unwrap()
1768 .is_empty());
1769 assert_eq!(
1770 db.get_session_chunks_for_tenant("shared-session", &tenant_b)
1771 .await
1772 .unwrap()
1773 .len(),
1774 1
1775 );
1776 }
1777
1778 #[tokio::test]
1779 async fn test_run_hygiene_reads_tenant_scoped_global_config() {
1780 let (db, _temp) = setup_test_db().await;
1781 let tenant_a = tenant_scope("org-a", "workspace-a");
1782 let tenant_b = tenant_scope("org-b", "workspace-b");
1783 let old = Utc::now() - chrono::Duration::days(10);
1784
1785 let config_a = MemoryConfig {
1786 session_retention_days: 1,
1787 ..Default::default()
1788 };
1789 let config_b = MemoryConfig {
1790 session_retention_days: 0,
1791 ..Default::default()
1792 };
1793 db.update_config_for_tenant("__global__", &config_a, &tenant_a)
1794 .await
1795 .unwrap();
1796 db.update_config_for_tenant("__global__", &config_b, &tenant_b)
1797 .await
1798 .unwrap();
1799
1800 let mut chunk_a = test_vector_chunk(
1801 "tenant-a-hygiene",
1802 MemoryTier::Session,
1803 tenant_a.clone(),
1804 "tenant a old hygiene chunk",
1805 None,
1806 );
1807 chunk_a.created_at = old;
1808 let mut chunk_b = test_vector_chunk(
1809 "tenant-b-hygiene",
1810 MemoryTier::Session,
1811 tenant_b.clone(),
1812 "tenant b old hygiene chunk",
1813 None,
1814 );
1815 chunk_b.created_at = old;
1816
1817 db.store_chunk(&chunk_a, &embedding(0.3, 0.7))
1818 .await
1819 .unwrap();
1820 db.store_chunk(&chunk_b, &embedding(0.3, 0.7))
1821 .await
1822 .unwrap();
1823
1824 let deleted = db.run_hygiene_for_tenant(0, &tenant_a).await.unwrap();
1825 assert_eq!(deleted, 1);
1826 assert!(db
1827 .get_session_chunks_for_tenant("shared-session", &tenant_a)
1828 .await
1829 .unwrap()
1830 .is_empty());
1831 assert_eq!(
1832 db.get_session_chunks_for_tenant("shared-session", &tenant_b)
1833 .await
1834 .unwrap()
1835 .len(),
1836 1
1837 );
1838 }
1839
1840 #[tokio::test]
1841 async fn test_global_memory_put_search_and_dedup() {
1842 let (db, _temp) = setup_test_db().await;
1843 let now = chrono::Utc::now().timestamp_millis() as u64;
1844 let record = GlobalMemoryRecord {
1845 id: "gm-1".to_string(),
1846 user_id: "user-a".to_string(),
1847 source_type: "user_message".to_string(),
1848 content: "remember rust workspace layout".to_string(),
1849 content_hash: "h1".to_string(),
1850 run_id: "run-1".to_string(),
1851 session_id: Some("s1".to_string()),
1852 message_id: Some("m1".to_string()),
1853 tool_name: None,
1854 project_tag: Some("proj-x".to_string()),
1855 channel_tag: None,
1856 host_tag: None,
1857 metadata: None,
1858 provenance: None,
1859 redaction_status: "passed".to_string(),
1860 redaction_count: 0,
1861 visibility: "private".to_string(),
1862 demoted: false,
1863 score_boost: 0.0,
1864 created_at_ms: now,
1865 updated_at_ms: now,
1866 expires_at_ms: None,
1867 };
1868 let first = db.put_global_memory_record(&record).await.unwrap();
1869 assert!(first.stored);
1870 let second = db.put_global_memory_record(&record).await.unwrap();
1871 assert!(second.deduped);
1872
1873 let hits = db
1874 .search_global_memory("user-a", "rust workspace", 5, Some("proj-x"), None, None)
1875 .await
1876 .unwrap();
1877 assert!(!hits.is_empty());
1878 assert_eq!(hits[0].record.id, "gm-1");
1879 }
1880
1881 #[tokio::test]
1882 async fn test_global_memory_tenant_filtered_fts_list_get_and_delete() {
1883 let (db, _temp) = setup_test_db().await;
1884 let now = chrono::Utc::now().timestamp_millis() as u64;
1885 let tenant_a = GlobalMemoryRecord {
1886 id: "gm-tenant-a".to_string(),
1887 user_id: "same-user".to_string(),
1888 source_type: "note".to_string(),
1889 content: "shared tenant phrase".to_string(),
1890 content_hash: "same-hash".to_string(),
1891 run_id: "same-run".to_string(),
1892 session_id: Some("same-session".to_string()),
1893 message_id: Some("same-message".to_string()),
1894 tool_name: None,
1895 project_tag: Some("same-project".to_string()),
1896 channel_tag: None,
1897 host_tag: None,
1898 metadata: None,
1899 provenance: Some(serde_json::json!({
1900 "tenant_context": {
1901 "org_id": "org-a",
1902 "workspace_id": "workspace-a",
1903 "source": "explicit"
1904 }
1905 })),
1906 redaction_status: "passed".to_string(),
1907 redaction_count: 0,
1908 visibility: "private".to_string(),
1909 demoted: false,
1910 score_boost: 0.0,
1911 created_at_ms: now,
1912 updated_at_ms: now,
1913 expires_at_ms: None,
1914 };
1915 let mut tenant_b = tenant_a.clone();
1916 tenant_b.id = "gm-tenant-b".to_string();
1917 tenant_b.provenance = Some(serde_json::json!({
1918 "tenant_context": {
1919 "org_id": "org-b",
1920 "workspace_id": "workspace-b",
1921 "source": "explicit"
1922 }
1923 }));
1924
1925 assert!(db.put_global_memory_record(&tenant_a).await.unwrap().stored);
1926 assert!(db.put_global_memory_record(&tenant_b).await.unwrap().stored);
1927
1928 let hits_a = db
1929 .search_global_memory_for_tenant(
1930 "org-a",
1931 "workspace-a",
1932 None,
1933 "same-user",
1934 "shared tenant phrase",
1935 10,
1936 Some("same-project"),
1937 None,
1938 None,
1939 )
1940 .await
1941 .unwrap();
1942 assert_eq!(hits_a.len(), 1);
1943 assert_eq!(hits_a[0].record.id, "gm-tenant-a");
1944
1945 let rows_b = db
1946 .list_global_memory_for_tenant(
1947 "org-b",
1948 "workspace-b",
1949 None,
1950 "same-user",
1951 Some("shared tenant"),
1952 Some("same-project"),
1953 None,
1954 10,
1955 0,
1956 )
1957 .await
1958 .unwrap();
1959 assert_eq!(rows_b.len(), 1);
1960 assert_eq!(rows_b[0].id, "gm-tenant-b");
1961
1962 assert!(db
1963 .get_global_memory_for_tenant("gm-tenant-b", "org-a", "workspace-a", None)
1964 .await
1965 .unwrap()
1966 .is_none());
1967 assert!(!db
1968 .delete_global_memory_for_tenant("gm-tenant-b", "org-a", "workspace-a", None)
1969 .await
1970 .unwrap());
1971 assert!(db
1972 .delete_global_memory_for_tenant("gm-tenant-b", "org-b", "workspace-b", None)
1973 .await
1974 .unwrap());
1975 }
1976
1977 #[tokio::test]
1978 async fn test_knowledge_registry_round_trip() {
1979 let (db, _temp) = setup_test_db().await;
1980 let now = chrono::Utc::now().timestamp_millis() as u64;
1981
1982 let space = KnowledgeSpaceRecord {
1983 id: "space-1".to_string(),
1984 scope: KnowledgeScope::Project,
1985 project_id: Some("project-1".to_string()),
1986 namespace: Some("marketing/positioning".to_string()),
1987 title: Some("Marketing positioning".to_string()),
1988 description: Some("Reusable positioning guidance".to_string()),
1989 trust_level: KnowledgeTrustLevel::ApprovedDefault,
1990 metadata: Some(serde_json::json!({"owner":"marketing"})),
1991 created_at_ms: now,
1992 updated_at_ms: now,
1993 };
1994 db.upsert_knowledge_space(&space).await.unwrap();
1995
1996 let loaded_space = db.get_knowledge_space("space-1").await.unwrap().unwrap();
1997 assert_eq!(loaded_space.id, "space-1");
1998 assert_eq!(loaded_space.scope, KnowledgeScope::Project);
1999 assert_eq!(loaded_space.project_id.as_deref(), Some("project-1"));
2000 assert_eq!(
2001 loaded_space.namespace.as_deref(),
2002 Some("marketing/positioning")
2003 );
2004
2005 let item = KnowledgeItemRecord {
2006 id: "item-1".to_string(),
2007 space_id: "space-1".to_string(),
2008 coverage_key: "project-1::marketing/positioning::strategy::pricing".to_string(),
2009 dedupe_key: "item-1-dedupe".to_string(),
2010 item_type: "evidence".to_string(),
2011 title: "Pricing sensitivity observation".to_string(),
2012 summary: Some("Customers reacted to annual pricing changes".to_string()),
2013 payload: serde_json::json!({"claim":"Annual pricing changes created friction"}),
2014 trust_level: KnowledgeTrustLevel::Promoted,
2015 status: KnowledgeItemStatus::Promoted,
2016 run_id: Some("run-1".to_string()),
2017 artifact_refs: vec!["artifact://run-1/research-sources".to_string()],
2018 source_memory_ids: vec!["memory-1".to_string()],
2019 freshness_expires_at_ms: Some(now + 86_400_000),
2020 metadata: Some(serde_json::json!({"source_kind":"web"})),
2021 created_at_ms: now,
2022 updated_at_ms: now,
2023 };
2024 db.upsert_knowledge_item(&item).await.unwrap();
2025
2026 let loaded_item = db.get_knowledge_item("item-1").await.unwrap().unwrap();
2027 assert_eq!(loaded_item.id, "item-1");
2028 assert_eq!(loaded_item.space_id, "space-1");
2029 assert_eq!(
2030 loaded_item.coverage_key,
2031 "project-1::marketing/positioning::strategy::pricing"
2032 );
2033 assert_eq!(loaded_item.status, KnowledgeItemStatus::Promoted);
2034 assert_eq!(
2035 loaded_item.artifact_refs,
2036 vec!["artifact://run-1/research-sources".to_string()]
2037 );
2038
2039 let by_space = db.list_knowledge_items("space-1", None).await.unwrap();
2040 assert_eq!(by_space.len(), 1);
2041 let by_coverage = db
2042 .list_knowledge_items(
2043 "space-1",
2044 Some("project-1::marketing/positioning::strategy::pricing"),
2045 )
2046 .await
2047 .unwrap();
2048 assert_eq!(by_coverage.len(), 1);
2049
2050 let coverage = KnowledgeCoverageRecord {
2051 coverage_key: "project-1::marketing/positioning::strategy::pricing".to_string(),
2052 space_id: "space-1".to_string(),
2053 latest_item_id: Some("item-1".to_string()),
2054 latest_dedupe_key: Some("item-1-dedupe".to_string()),
2055 last_seen_at_ms: now,
2056 last_promoted_at_ms: Some(now),
2057 freshness_expires_at_ms: Some(now + 86_400_000),
2058 metadata: Some(serde_json::json!({"reuse_reason":"same topic"})),
2059 };
2060 db.upsert_knowledge_coverage(&coverage).await.unwrap();
2061
2062 let loaded_coverage = db
2063 .get_knowledge_coverage(
2064 "project-1::marketing/positioning::strategy::pricing",
2065 "space-1",
2066 )
2067 .await
2068 .unwrap()
2069 .unwrap();
2070 assert_eq!(loaded_coverage.space_id, "space-1");
2071 assert_eq!(loaded_coverage.latest_item_id.as_deref(), Some("item-1"));
2072 assert_eq!(
2073 loaded_coverage.latest_dedupe_key.as_deref(),
2074 Some("item-1-dedupe")
2075 );
2076 }
2077
2078 #[tokio::test]
2079 async fn test_knowledge_promotion_working_to_promoted_updates_coverage() {
2080 let (db, _temp) = setup_test_db().await;
2081 let now = chrono::Utc::now().timestamp_millis() as u64;
2082
2083 let space = KnowledgeSpaceRecord {
2084 id: "space-promote-1".to_string(),
2085 scope: KnowledgeScope::Project,
2086 project_id: Some("project-1".to_string()),
2087 namespace: Some("engineering/debugging".to_string()),
2088 title: Some("Engineering debugging".to_string()),
2089 description: Some("Reusable debugging guidance".to_string()),
2090 trust_level: KnowledgeTrustLevel::Promoted,
2091 metadata: None,
2092 created_at_ms: now,
2093 updated_at_ms: now,
2094 };
2095 db.upsert_knowledge_space(&space).await.unwrap();
2096
2097 let item = KnowledgeItemRecord {
2098 id: "item-promote-1".to_string(),
2099 space_id: space.id.clone(),
2100 coverage_key: "project-1::engineering/debugging::startup::race".to_string(),
2101 dedupe_key: "dedupe-promote-1".to_string(),
2102 item_type: "decision".to_string(),
2103 title: "Delay startup-dependent retries".to_string(),
2104 summary: Some("Retry only after startup completed.".to_string()),
2105 payload: serde_json::json!({"action":"delay_retry"}),
2106 trust_level: KnowledgeTrustLevel::Working,
2107 status: KnowledgeItemStatus::Working,
2108 run_id: Some("run-1".to_string()),
2109 artifact_refs: vec!["artifact://run-1/debug".to_string()],
2110 source_memory_ids: vec!["memory-1".to_string()],
2111 freshness_expires_at_ms: None,
2112 metadata: None,
2113 created_at_ms: now,
2114 updated_at_ms: now,
2115 };
2116 db.upsert_knowledge_item(&item).await.unwrap();
2117
2118 let promote = KnowledgePromotionRequest {
2119 item_id: item.id.clone(),
2120 target_status: KnowledgeItemStatus::Promoted,
2121 promoted_at_ms: now + 10,
2122 freshness_expires_at_ms: Some(now + 86_400_000),
2123 reviewer_id: None,
2124 approval_id: None,
2125 reason: Some("validated in workflow".to_string()),
2126 };
2127
2128 let result = db.promote_knowledge_item(&promote).await.unwrap().unwrap();
2129 assert!(result.promoted);
2130 assert_eq!(result.item.status, KnowledgeItemStatus::Promoted);
2131 assert_eq!(result.item.trust_level, KnowledgeTrustLevel::Promoted);
2132 assert_eq!(
2133 result.coverage.latest_item_id.as_deref(),
2134 Some("item-promote-1")
2135 );
2136 assert_eq!(
2137 result.coverage.latest_dedupe_key.as_deref(),
2138 Some("dedupe-promote-1")
2139 );
2140 assert_eq!(result.coverage.last_promoted_at_ms, Some(now + 10));
2141 }
2142
2143 #[tokio::test]
2144 async fn test_knowledge_promotion_promoted_to_approved_default_requires_review() {
2145 let (db, _temp) = setup_test_db().await;
2146 let now = chrono::Utc::now().timestamp_millis() as u64;
2147
2148 let space = KnowledgeSpaceRecord {
2149 id: "space-promote-2".to_string(),
2150 scope: KnowledgeScope::Project,
2151 project_id: Some("project-1".to_string()),
2152 namespace: Some("marketing/positioning".to_string()),
2153 title: Some("Marketing positioning".to_string()),
2154 description: Some("Reusable positioning guidance".to_string()),
2155 trust_level: KnowledgeTrustLevel::Promoted,
2156 metadata: None,
2157 created_at_ms: now,
2158 updated_at_ms: now,
2159 };
2160 db.upsert_knowledge_space(&space).await.unwrap();
2161
2162 let item = KnowledgeItemRecord {
2163 id: "item-promote-2".to_string(),
2164 space_id: space.id.clone(),
2165 coverage_key: "project-1::marketing/positioning::strategy::pricing".to_string(),
2166 dedupe_key: "dedupe-promote-2".to_string(),
2167 item_type: "evidence".to_string(),
2168 title: "Pricing observation".to_string(),
2169 summary: Some("Annual pricing changes created friction".to_string()),
2170 payload: serde_json::json!({"claim":"pricing friction"}),
2171 trust_level: KnowledgeTrustLevel::Promoted,
2172 status: KnowledgeItemStatus::Promoted,
2173 run_id: Some("run-2".to_string()),
2174 artifact_refs: vec!["artifact://run-2/research".to_string()],
2175 source_memory_ids: vec!["memory-2".to_string()],
2176 freshness_expires_at_ms: None,
2177 metadata: None,
2178 created_at_ms: now,
2179 updated_at_ms: now,
2180 };
2181 db.upsert_knowledge_item(&item).await.unwrap();
2182
2183 let promote = KnowledgePromotionRequest {
2184 item_id: item.id.clone(),
2185 target_status: KnowledgeItemStatus::ApprovedDefault,
2186 promoted_at_ms: now + 5,
2187 freshness_expires_at_ms: None,
2188 reviewer_id: None,
2189 approval_id: None,
2190 reason: Some("should require review".to_string()),
2191 };
2192
2193 let err = db.promote_knowledge_item(&promote).await.unwrap_err();
2194 match err {
2195 MemoryError::InvalidConfig(_) => {}
2196 other => panic!("unexpected error: {}", other),
2197 }
2198 }
2199
2200 #[tokio::test]
2201 async fn test_knowledge_promotion_promoted_to_approved_default_updates_coverage() {
2202 let (db, _temp) = setup_test_db().await;
2203 let now = chrono::Utc::now().timestamp_millis() as u64;
2204
2205 let space = KnowledgeSpaceRecord {
2206 id: "space-promote-3".to_string(),
2207 scope: KnowledgeScope::Project,
2208 project_id: Some("project-1".to_string()),
2209 namespace: Some("support/runbooks".to_string()),
2210 title: Some("Support runbooks".to_string()),
2211 description: Some("Reusable runbook guidance".to_string()),
2212 trust_level: KnowledgeTrustLevel::Promoted,
2213 metadata: None,
2214 created_at_ms: now,
2215 updated_at_ms: now,
2216 };
2217 db.upsert_knowledge_space(&space).await.unwrap();
2218
2219 let item = KnowledgeItemRecord {
2220 id: "item-promote-3".to_string(),
2221 space_id: space.id.clone(),
2222 coverage_key: "project-1::support/runbooks::oncall::restart".to_string(),
2223 dedupe_key: "dedupe-promote-3".to_string(),
2224 item_type: "runbook".to_string(),
2225 title: "Restart service and verify".to_string(),
2226 summary: Some("Restart then validate health endpoint.".to_string()),
2227 payload: serde_json::json!({"steps":["restart","healthcheck"]}),
2228 trust_level: KnowledgeTrustLevel::Promoted,
2229 status: KnowledgeItemStatus::Promoted,
2230 run_id: Some("run-3".to_string()),
2231 artifact_refs: vec!["artifact://run-3/runbook".to_string()],
2232 source_memory_ids: vec!["memory-3".to_string()],
2233 freshness_expires_at_ms: None,
2234 metadata: None,
2235 created_at_ms: now,
2236 updated_at_ms: now,
2237 };
2238 db.upsert_knowledge_item(&item).await.unwrap();
2239
2240 let promote = KnowledgePromotionRequest {
2241 item_id: item.id.clone(),
2242 target_status: KnowledgeItemStatus::ApprovedDefault,
2243 promoted_at_ms: now + 12,
2244 freshness_expires_at_ms: Some(now + 172_800_000),
2245 reviewer_id: Some("reviewer-1".to_string()),
2246 approval_id: Some("approval-1".to_string()),
2247 reason: Some("reviewed by ops".to_string()),
2248 };
2249
2250 let result = db.promote_knowledge_item(&promote).await.unwrap().unwrap();
2251 assert!(result.promoted);
2252 assert_eq!(result.item.status, KnowledgeItemStatus::ApprovedDefault);
2253 assert_eq!(
2254 result.item.trust_level,
2255 KnowledgeTrustLevel::ApprovedDefault
2256 );
2257 assert_eq!(result.coverage.last_promoted_at_ms, Some(now + 12));
2258 assert_eq!(
2259 result.coverage.latest_item_id.as_deref(),
2260 Some("item-promote-3")
2261 );
2262 }
2263
2264 #[tokio::test]
2265 async fn test_knowledge_promotion_rejects_deprecated() {
2266 let (db, _temp) = setup_test_db().await;
2267 let now = chrono::Utc::now().timestamp_millis() as u64;
2268
2269 let space = KnowledgeSpaceRecord {
2270 id: "space-promote-4".to_string(),
2271 scope: KnowledgeScope::Project,
2272 project_id: Some("project-1".to_string()),
2273 namespace: Some("ops".to_string()),
2274 title: Some("Ops knowledge".to_string()),
2275 description: Some("Reusable ops knowledge".to_string()),
2276 trust_level: KnowledgeTrustLevel::Promoted,
2277 metadata: None,
2278 created_at_ms: now,
2279 updated_at_ms: now,
2280 };
2281 db.upsert_knowledge_space(&space).await.unwrap();
2282
2283 let item = KnowledgeItemRecord {
2284 id: "item-promote-4".to_string(),
2285 space_id: space.id.clone(),
2286 coverage_key: "project-1::ops::incident::latency".to_string(),
2287 dedupe_key: "dedupe-promote-4".to_string(),
2288 item_type: "decision".to_string(),
2289 title: "Ignore deprecated item".to_string(),
2290 summary: None,
2291 payload: serde_json::json!({"decision":"deprecated"}),
2292 trust_level: KnowledgeTrustLevel::Promoted,
2293 status: KnowledgeItemStatus::Deprecated,
2294 run_id: Some("run-4".to_string()),
2295 artifact_refs: vec![],
2296 source_memory_ids: vec![],
2297 freshness_expires_at_ms: None,
2298 metadata: None,
2299 created_at_ms: now,
2300 updated_at_ms: now,
2301 };
2302 db.upsert_knowledge_item(&item).await.unwrap();
2303
2304 let promote = KnowledgePromotionRequest {
2305 item_id: item.id.clone(),
2306 target_status: KnowledgeItemStatus::Promoted,
2307 promoted_at_ms: now + 1,
2308 freshness_expires_at_ms: None,
2309 reviewer_id: None,
2310 approval_id: None,
2311 reason: None,
2312 };
2313
2314 let err = db.promote_knowledge_item(&promote).await.unwrap_err();
2315 match err {
2316 MemoryError::InvalidConfig(_) => {}
2317 other => panic!("unexpected error: {}", other),
2318 }
2319 }
2320
2321 #[tokio::test]
2322 async fn test_knowledge_promotion_idempotent_updates_coverage() {
2323 let (db, _temp) = setup_test_db().await;
2324 let now = chrono::Utc::now().timestamp_millis() as u64;
2325
2326 let space = KnowledgeSpaceRecord {
2327 id: "space-promote-5".to_string(),
2328 scope: KnowledgeScope::Project,
2329 project_id: Some("project-1".to_string()),
2330 namespace: Some("engineering/ops".to_string()),
2331 title: Some("Engineering ops".to_string()),
2332 description: None,
2333 trust_level: KnowledgeTrustLevel::Promoted,
2334 metadata: None,
2335 created_at_ms: now,
2336 updated_at_ms: now,
2337 };
2338 db.upsert_knowledge_space(&space).await.unwrap();
2339
2340 let item = KnowledgeItemRecord {
2341 id: "item-promote-5".to_string(),
2342 space_id: space.id.clone(),
2343 coverage_key: "project-1::engineering/ops::deploy::guardrails".to_string(),
2344 dedupe_key: "dedupe-promote-5".to_string(),
2345 item_type: "pattern".to_string(),
2346 title: "Deploy guardrails".to_string(),
2347 summary: None,
2348 payload: serde_json::json!({"pattern":"guardrails"}),
2349 trust_level: KnowledgeTrustLevel::Promoted,
2350 status: KnowledgeItemStatus::Promoted,
2351 run_id: Some("run-5".to_string()),
2352 artifact_refs: vec![],
2353 source_memory_ids: vec![],
2354 freshness_expires_at_ms: None,
2355 metadata: None,
2356 created_at_ms: now,
2357 updated_at_ms: now,
2358 };
2359 db.upsert_knowledge_item(&item).await.unwrap();
2360
2361 let promote = KnowledgePromotionRequest {
2362 item_id: item.id.clone(),
2363 target_status: KnowledgeItemStatus::Promoted,
2364 promoted_at_ms: now + 20,
2365 freshness_expires_at_ms: None,
2366 reviewer_id: None,
2367 approval_id: None,
2368 reason: None,
2369 };
2370
2371 let result = db.promote_knowledge_item(&promote).await.unwrap().unwrap();
2372 assert!(!result.promoted);
2373 assert_eq!(result.coverage.last_promoted_at_ms, Some(now + 20));
2374 assert_eq!(
2375 result.coverage.latest_item_id.as_deref(),
2376 Some("item-promote-5")
2377 );
2378 }
2379
2380 #[tokio::test]
2381 async fn test_knowledge_item_promotion_updates_coverage() {
2382 let (db, _temp) = setup_test_db().await;
2383 let now = chrono::Utc::now().timestamp_millis() as u64;
2384
2385 let space = KnowledgeSpaceRecord {
2386 id: "space-promote".to_string(),
2387 scope: KnowledgeScope::Project,
2388 project_id: Some("project-1".to_string()),
2389 namespace: Some("engineering/debugging".to_string()),
2390 title: Some("Engineering debugging".to_string()),
2391 description: Some("Reusable debugging guidance".to_string()),
2392 trust_level: KnowledgeTrustLevel::Promoted,
2393 metadata: None,
2394 created_at_ms: now,
2395 updated_at_ms: now,
2396 };
2397 db.upsert_knowledge_space(&space).await.unwrap();
2398
2399 let item = KnowledgeItemRecord {
2400 id: "item-promote".to_string(),
2401 space_id: space.id.clone(),
2402 coverage_key: "project-1::engineering/debugging::startup::race".to_string(),
2403 dedupe_key: "dedupe-promote".to_string(),
2404 item_type: "decision".to_string(),
2405 title: "Delay startup-dependent retries".to_string(),
2406 summary: Some("Retry only after startup completes.".to_string()),
2407 payload: serde_json::json!({"action": "delay_retry"}),
2408 trust_level: KnowledgeTrustLevel::Working,
2409 status: KnowledgeItemStatus::Working,
2410 run_id: Some("run-promote".to_string()),
2411 artifact_refs: vec!["artifact://run-promote/report".to_string()],
2412 source_memory_ids: vec!["memory-promote".to_string()],
2413 freshness_expires_at_ms: None,
2414 metadata: Some(serde_json::json!({"source_kind":"run"})),
2415 created_at_ms: now,
2416 updated_at_ms: now,
2417 };
2418 db.upsert_knowledge_item(&item).await.unwrap();
2419
2420 let request = KnowledgePromotionRequest {
2421 item_id: item.id.clone(),
2422 target_status: KnowledgeItemStatus::Promoted,
2423 promoted_at_ms: now + 10,
2424 freshness_expires_at_ms: Some(now + 86_400_000),
2425 reviewer_id: None,
2426 approval_id: None,
2427 reason: Some("validated".to_string()),
2428 };
2429 let promoted = db
2430 .promote_knowledge_item(&request)
2431 .await
2432 .unwrap()
2433 .expect("promotion result");
2434 assert_eq!(promoted.previous_status, KnowledgeItemStatus::Working);
2435 assert!(promoted.promoted);
2436 assert_eq!(promoted.item.status, KnowledgeItemStatus::Promoted);
2437 assert_eq!(promoted.item.trust_level, KnowledgeTrustLevel::Promoted);
2438 assert_eq!(
2439 promoted.item.freshness_expires_at_ms,
2440 Some(now + 86_400_000)
2441 );
2442 assert_eq!(
2443 promoted
2444 .item
2445 .metadata
2446 .as_ref()
2447 .and_then(|value| value.get("promotion"))
2448 .and_then(|value| value.get("to_status"))
2449 .and_then(Value::as_str),
2450 Some("promoted")
2451 );
2452 assert_eq!(
2453 promoted.coverage.latest_item_id.as_deref(),
2454 Some("item-promote")
2455 );
2456 assert_eq!(
2457 promoted.coverage.latest_dedupe_key.as_deref(),
2458 Some("dedupe-promote")
2459 );
2460 assert_eq!(promoted.coverage.last_promoted_at_ms, Some(now + 10));
2461 assert_eq!(
2462 promoted.coverage.freshness_expires_at_ms,
2463 Some(now + 86_400_000)
2464 );
2465
2466 let loaded = db
2467 .get_knowledge_item("item-promote")
2468 .await
2469 .unwrap()
2470 .unwrap();
2471 assert_eq!(loaded.status, KnowledgeItemStatus::Promoted);
2472 assert_eq!(
2473 loaded
2474 .metadata
2475 .as_ref()
2476 .and_then(|value| value.get("promotion"))
2477 .and_then(|value| value.get("from_status"))
2478 .and_then(Value::as_str),
2479 Some("working")
2480 );
2481 }
2482
2483 #[tokio::test]
2484 async fn test_knowledge_item_approved_default_requires_review() {
2485 let (db, _temp) = setup_test_db().await;
2486 let now = chrono::Utc::now().timestamp_millis() as u64;
2487
2488 let space = KnowledgeSpaceRecord {
2489 id: "space-approved".to_string(),
2490 scope: KnowledgeScope::Project,
2491 project_id: Some("project-1".to_string()),
2492 namespace: Some("marketing/positioning".to_string()),
2493 title: Some("Marketing positioning".to_string()),
2494 description: Some("Reusable positioning guidance".to_string()),
2495 trust_level: KnowledgeTrustLevel::Promoted,
2496 metadata: None,
2497 created_at_ms: now,
2498 updated_at_ms: now,
2499 };
2500 db.upsert_knowledge_space(&space).await.unwrap();
2501
2502 let item = KnowledgeItemRecord {
2503 id: "item-approved".to_string(),
2504 space_id: space.id.clone(),
2505 coverage_key: "project-1::marketing/positioning::strategy::pricing".to_string(),
2506 dedupe_key: "dedupe-approved".to_string(),
2507 item_type: "evidence".to_string(),
2508 title: "Pricing sensitivity observation".to_string(),
2509 summary: Some("Customers reacted to annual pricing changes".to_string()),
2510 payload: serde_json::json!({"claim":"Annual pricing changes created friction"}),
2511 trust_level: KnowledgeTrustLevel::Promoted,
2512 status: KnowledgeItemStatus::Promoted,
2513 run_id: Some("run-approved".to_string()),
2514 artifact_refs: vec!["artifact://run-approved/research".to_string()],
2515 source_memory_ids: vec!["memory-approved".to_string()],
2516 freshness_expires_at_ms: Some(now + 1234),
2517 metadata: None,
2518 created_at_ms: now,
2519 updated_at_ms: now,
2520 };
2521 db.upsert_knowledge_item(&item).await.unwrap();
2522
2523 let request = KnowledgePromotionRequest {
2524 item_id: item.id.clone(),
2525 target_status: KnowledgeItemStatus::ApprovedDefault,
2526 promoted_at_ms: now + 20,
2527 freshness_expires_at_ms: Some(now + 90_000_000),
2528 reviewer_id: Some("reviewer-1".to_string()),
2529 approval_id: Some("approval-1".to_string()),
2530 reason: Some("approved as default guidance".to_string()),
2531 };
2532 let promoted = db
2533 .promote_knowledge_item(&request)
2534 .await
2535 .unwrap()
2536 .expect("promotion result");
2537 assert_eq!(promoted.previous_status, KnowledgeItemStatus::Promoted);
2538 assert_eq!(promoted.item.status, KnowledgeItemStatus::ApprovedDefault);
2539 assert_eq!(
2540 promoted.item.trust_level,
2541 KnowledgeTrustLevel::ApprovedDefault
2542 );
2543 assert_eq!(promoted.coverage.last_promoted_at_ms, Some(now + 20));
2544 assert_eq!(
2545 promoted
2546 .item
2547 .metadata
2548 .as_ref()
2549 .and_then(|value| value.get("promotion"))
2550 .and_then(|value| value.get("approval_id"))
2551 .and_then(Value::as_str),
2552 Some("approval-1")
2553 );
2554 }
2555
2556 #[tokio::test]
2557 async fn test_knowledge_item_promotion_rejects_invalid_transition() {
2558 let (db, _temp) = setup_test_db().await;
2559 let now = chrono::Utc::now().timestamp_millis() as u64;
2560
2561 let space = KnowledgeSpaceRecord {
2562 id: "space-invalid".to_string(),
2563 scope: KnowledgeScope::Project,
2564 project_id: Some("project-1".to_string()),
2565 namespace: Some("support".to_string()),
2566 title: Some("Support".to_string()),
2567 description: Some("Support guidance".to_string()),
2568 trust_level: KnowledgeTrustLevel::Promoted,
2569 metadata: None,
2570 created_at_ms: now,
2571 updated_at_ms: now,
2572 };
2573 db.upsert_knowledge_space(&space).await.unwrap();
2574
2575 let item = KnowledgeItemRecord {
2576 id: "item-invalid".to_string(),
2577 space_id: space.id.clone(),
2578 coverage_key: "project-1::support::workflow::triage".to_string(),
2579 dedupe_key: "dedupe-invalid".to_string(),
2580 item_type: "decision".to_string(),
2581 title: "Triage first".to_string(),
2582 summary: None,
2583 payload: serde_json::json!({"action":"triage"}),
2584 trust_level: KnowledgeTrustLevel::Working,
2585 status: KnowledgeItemStatus::Working,
2586 run_id: Some("run-invalid".to_string()),
2587 artifact_refs: vec![],
2588 source_memory_ids: vec![],
2589 freshness_expires_at_ms: None,
2590 metadata: None,
2591 created_at_ms: now,
2592 updated_at_ms: now,
2593 };
2594 db.upsert_knowledge_item(&item).await.unwrap();
2595
2596 let request = KnowledgePromotionRequest {
2597 item_id: item.id.clone(),
2598 target_status: KnowledgeItemStatus::ApprovedDefault,
2599 promoted_at_ms: now + 1,
2600 freshness_expires_at_ms: None,
2601 reviewer_id: Some("reviewer-1".to_string()),
2602 approval_id: Some("approval-1".to_string()),
2603 reason: Some("should fail".to_string()),
2604 };
2605 let err = db.promote_knowledge_item(&request).await.unwrap_err();
2606 assert!(matches!(err, MemoryError::InvalidConfig(_)));
2607 let loaded = db.get_knowledge_item(&item.id).await.unwrap().unwrap();
2608 assert_eq!(loaded.status, KnowledgeItemStatus::Working);
2609 }
2610}