#![allow(clippy::all)]
use crate::types::{
ClearFileIndexResult, GlobalMemoryRecord, GlobalMemorySearchHit, GlobalMemoryWriteResult,
KnowledgeCoverageRecord, KnowledgeItemRecord, KnowledgeItemStatus, KnowledgePromotionRequest,
KnowledgePromotionResult, KnowledgeSpaceRecord, MemoryChunk, MemoryConfig, MemoryError,
MemoryResult, MemoryStats, MemoryTenantScope, MemoryTier, ProjectMemoryStats,
SourceObjectLifecycleRecord, SourceObjectLifecycleState, DEFAULT_EMBEDDING_DIMENSION,
};
use chrono::{DateTime, Utc};
use rusqlite::{ffi::sqlite3_auto_extension, params, Connection, OptionalExtension, Row};
use sqlite_vec::sqlite3_vec_init;
use std::collections::HashSet;
use std::path::Path;
use std::sync::{Arc, LazyLock};
use std::time::Duration;
use tokio::sync::Mutex;
type ProjectIndexStatusRow = (
Option<String>,
Option<i64>,
Option<i64>,
Option<i64>,
Option<i64>,
Option<i64>,
);
pub struct MemoryDatabase {
conn: Arc<Mutex<Connection>>,
db_path: std::path::PathBuf,
}
static SCHEMA_INIT_LOCK: LazyLock<Mutex<()>> = LazyLock::new(|| Mutex::new(()));
include!("memory_database_impl_parts/part01.rs");
include!("memory_database_impl_parts/part02.rs");
fn row_to_chunk(row: &Row, tier: MemoryTier) -> Result<MemoryChunk, rusqlite::Error> {
let id: String = row.get(0)?;
let content: String = row.get(1)?;
let (session_id, project_id, source_idx, created_at_idx, token_count_idx, metadata_idx) =
match tier {
MemoryTier::Session => (
Some(row.get(2)?),
row.get(3)?,
4usize,
5usize,
6usize,
7usize,
),
MemoryTier::Project => (
row.get(2)?,
Some(row.get(3)?),
4usize,
5usize,
6usize,
7usize,
),
MemoryTier::Global => (None, None, 2usize, 3usize, 4usize, 5usize),
};
let source: String = row.get(source_idx)?;
let created_at_str: String = row.get(created_at_idx)?;
let token_count: i64 = row.get(token_count_idx)?;
let metadata_str: Option<String> = row.get(metadata_idx)?;
let created_at = DateTime::parse_from_rfc3339(&created_at_str)
.map_err(|e| {
rusqlite::Error::FromSqlConversionFailure(5, rusqlite::types::Type::Text, Box::new(e))
})?
.with_timezone(&Utc);
let metadata = metadata_str
.filter(|s| !s.is_empty())
.and_then(|s| serde_json::from_str(&s).ok());
let source_path = row.get::<_, Option<String>>("source_path").ok().flatten();
let source_mtime = row.get::<_, Option<i64>>("source_mtime").ok().flatten();
let source_size = row.get::<_, Option<i64>>("source_size").ok().flatten();
let source_hash = row.get::<_, Option<String>>("source_hash").ok().flatten();
let tenant_scope = MemoryTenantScope {
org_id: row
.get::<_, Option<String>>("tenant_org_id")
.ok()
.flatten()
.filter(|value| !value.trim().is_empty())
.unwrap_or_else(|| LOCAL_TENANT_ORG_ID.to_string()),
workspace_id: row
.get::<_, Option<String>>("tenant_workspace_id")
.ok()
.flatten()
.filter(|value| !value.trim().is_empty())
.unwrap_or_else(|| LOCAL_TENANT_WORKSPACE_ID.to_string()),
deployment_id: row
.get::<_, Option<String>>("tenant_deployment_id")
.ok()
.flatten()
.filter(|value| !value.trim().is_empty()),
};
Ok(MemoryChunk {
id,
content,
tier,
session_id,
project_id,
source,
source_path,
source_mtime,
source_size,
source_hash,
tenant_scope,
created_at,
token_count,
metadata,
})
}
fn require_scope_id<'a>(tier: MemoryTier, scope: Option<&'a str>) -> MemoryResult<&'a str> {
scope
.filter(|value| !value.trim().is_empty())
.ok_or_else(|| {
crate::types::MemoryError::InvalidConfig(match tier {
MemoryTier::Session => "tier=session requires session_id".to_string(),
MemoryTier::Project => "tier=project requires project_id".to_string(),
MemoryTier::Global => "tier=global does not require a scope id".to_string(),
})
})
}
const LOCAL_TENANT_ORG_ID: &str = "local";
const LOCAL_TENANT_WORKSPACE_ID: &str = "local";
fn tenant_scope_matches_sql_clause(prefix: &str, first_param: usize) -> String {
format!(
"{prefix}.tenant_org_id = ?{first_param} AND {prefix}.tenant_workspace_id = ?{} AND IFNULL({prefix}.tenant_deployment_id, '') = IFNULL(?{}, '')",
first_param + 1,
first_param + 2
)
}
fn global_memory_record_tenant_scope(
record: &GlobalMemoryRecord,
) -> (String, String, Option<String>) {
record
.provenance
.as_ref()
.and_then(|value| value.get("tenant_context"))
.and_then(memory_tenant_scope_from_value)
.unwrap_or_else(|| {
(
LOCAL_TENANT_ORG_ID.to_string(),
LOCAL_TENANT_WORKSPACE_ID.to_string(),
None,
)
})
}
fn memory_tenant_scope_from_value(
value: &serde_json::Value,
) -> Option<(String, String, Option<String>)> {
let org_id = value.get("org_id")?.as_str()?.to_string();
let workspace_id = value.get("workspace_id")?.as_str()?.to_string();
let deployment_id = value
.get("deployment_id")
.and_then(|value| value.as_str())
.map(ToString::to_string);
Some((org_id, workspace_id, deployment_id))
}
fn row_to_global_record(row: &Row) -> Result<GlobalMemoryRecord, rusqlite::Error> {
let metadata_str: Option<String> = row.get(12)?;
let provenance_str: Option<String> = row.get(13)?;
Ok(GlobalMemoryRecord {
id: row.get(0)?,
user_id: row.get(1)?,
source_type: row.get(2)?,
content: row.get(3)?,
content_hash: row.get(4)?,
run_id: row.get(5)?,
session_id: row.get(6)?,
message_id: row.get(7)?,
tool_name: row.get(8)?,
project_tag: row.get(9)?,
channel_tag: row.get(10)?,
host_tag: row.get(11)?,
metadata: metadata_str
.filter(|s| !s.is_empty())
.and_then(|s| serde_json::from_str(&s).ok()),
provenance: provenance_str
.filter(|s| !s.is_empty())
.and_then(|s| serde_json::from_str(&s).ok()),
redaction_status: row.get(14)?,
redaction_count: row.get::<_, i64>(15)? as u32,
visibility: row.get(16)?,
demoted: row.get::<_, i64>(17)? != 0,
score_boost: row.get(18)?,
created_at_ms: row.get::<_, i64>(19)? as u64,
updated_at_ms: row.get::<_, i64>(20)? as u64,
expires_at_ms: row.get::<_, Option<i64>>(21)?.map(|v| v as u64),
})
}
fn row_to_source_object_lifecycle(
row: &Row,
) -> Result<SourceObjectLifecycleRecord, rusqlite::Error> {
let metadata_str: Option<String> = row.get("metadata")?;
let resource_ref_str: String = row.get("resource_ref")?;
let tenant_scope = MemoryTenantScope {
org_id: row.get("tenant_org_id")?,
workspace_id: row.get("tenant_workspace_id")?,
deployment_id: row
.get::<_, Option<String>>("tenant_deployment_id")?
.filter(|value| !value.is_empty()),
};
let tier = match row.get::<_, String>("tier")?.as_str() {
"session" => MemoryTier::Session,
"project" => MemoryTier::Project,
_ => MemoryTier::Global,
};
Ok(SourceObjectLifecycleRecord {
source_object_id: row.get("source_object_id")?,
tenant_scope,
source_binding_id: row.get("source_binding_id")?,
connector_id: row.get("connector_id")?,
state: SourceObjectLifecycleState::parse(&row.get::<_, String>("state")?),
tier,
session_id: row.get("session_id")?,
project_id: row.get("project_id")?,
import_namespace: row.get("import_namespace")?,
indexed_path: row.get("indexed_path")?,
native_object_id: row.get("native_object_id")?,
resource_ref: serde_json::from_str(&resource_ref_str).unwrap_or(serde_json::Value::Null),
data_class: row.get("data_class")?,
content_hash: row.get("content_hash")?,
source_hash: row.get("source_hash")?,
first_seen_at_ms: row.get::<_, i64>("first_seen_at_ms")? as u64,
last_seen_at_ms: row.get::<_, i64>("last_seen_at_ms")? as u64,
tombstoned_at_ms: row
.get::<_, Option<i64>>("tombstoned_at_ms")?
.map(|value| value as u64),
metadata: metadata_str
.filter(|value| !value.is_empty())
.and_then(|value| serde_json::from_str(&value).ok()),
})
}
impl MemoryDatabase {
pub async fn get_node_by_uri(
&self,
uri: &str,
) -> MemoryResult<Option<crate::types::MemoryNode>> {
let conn = self.conn.lock().await;
let mut stmt = conn.prepare(
"SELECT id, uri, parent_uri, node_type, created_at, updated_at, metadata
FROM memory_nodes WHERE uri = ?1",
)?;
let result = stmt.query_row(params![uri], |row| {
let node_type_str: String = row.get(3)?;
let node_type = node_type_str
.parse()
.unwrap_or(crate::types::NodeType::File);
let metadata_str: Option<String> = row.get(6)?;
Ok(crate::types::MemoryNode {
id: row.get(0)?,
uri: row.get(1)?,
parent_uri: row.get(2)?,
node_type,
created_at: row.get::<_, String>(4)?.parse().unwrap_or_default(),
updated_at: row.get::<_, String>(5)?.parse().unwrap_or_default(),
metadata: metadata_str.and_then(|s| serde_json::from_str(&s).ok()),
})
});
match result {
Ok(node) => Ok(Some(node)),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(MemoryError::Database(e)),
}
}
pub async fn create_node(
&self,
uri: &str,
parent_uri: Option<&str>,
node_type: crate::types::NodeType,
metadata: Option<&serde_json::Value>,
) -> MemoryResult<String> {
let id = uuid::Uuid::new_v4().to_string();
let now = Utc::now().to_rfc3339();
let metadata_str = metadata.map(|m| serde_json::to_string(m)).transpose()?;
let conn = self.conn.lock().await;
conn.execute(
"INSERT INTO memory_nodes (id, uri, parent_uri, node_type, created_at, updated_at, metadata)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
params![id, uri, parent_uri, node_type.to_string(), now, now, metadata_str],
)?;
Ok(id)
}
pub async fn list_directory(&self, uri: &str) -> MemoryResult<Vec<crate::types::MemoryNode>> {
let conn = self.conn.lock().await;
let mut stmt = conn.prepare(
"SELECT id, uri, parent_uri, node_type, created_at, updated_at, metadata
FROM memory_nodes WHERE parent_uri = ?1 ORDER BY node_type DESC, uri ASC",
)?;
let rows = stmt.query_map(params![uri], |row| {
let node_type_str: String = row.get(3)?;
let node_type = node_type_str
.parse()
.unwrap_or(crate::types::NodeType::File);
let metadata_str: Option<String> = row.get(6)?;
Ok(crate::types::MemoryNode {
id: row.get(0)?,
uri: row.get(1)?,
parent_uri: row.get(2)?,
node_type,
created_at: row.get::<_, String>(4)?.parse().unwrap_or_default(),
updated_at: row.get::<_, String>(5)?.parse().unwrap_or_default(),
metadata: metadata_str.and_then(|s| serde_json::from_str(&s).ok()),
})
})?;
rows.collect::<Result<Vec<_>, _>>()
.map_err(MemoryError::Database)
}
pub async fn get_layer(
&self,
node_id: &str,
layer_type: crate::types::LayerType,
) -> MemoryResult<Option<crate::types::MemoryLayer>> {
let conn = self.conn.lock().await;
let mut stmt = conn.prepare(
"SELECT id, node_id, layer_type, content, token_count, embedding_id, created_at, source_chunk_id
FROM memory_layers WHERE node_id = ?1 AND layer_type = ?2"
)?;
let result = stmt.query_row(params![node_id, layer_type.to_string()], |row| {
let layer_type_str: String = row.get(2)?;
let layer_type = layer_type_str
.parse()
.unwrap_or(crate::types::LayerType::L2);
Ok(crate::types::MemoryLayer {
id: row.get(0)?,
node_id: row.get(1)?,
layer_type,
content: row.get(3)?,
token_count: row.get(4)?,
embedding_id: row.get(5)?,
created_at: row.get::<_, String>(6)?.parse().unwrap_or_default(),
source_chunk_id: row.get(7)?,
})
});
match result {
Ok(layer) => Ok(Some(layer)),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(MemoryError::Database(e)),
}
}
pub async fn create_layer(
&self,
node_id: &str,
layer_type: crate::types::LayerType,
content: &str,
token_count: i64,
source_chunk_id: Option<&str>,
) -> MemoryResult<String> {
let id = uuid::Uuid::new_v4().to_string();
let now = Utc::now().to_rfc3339();
let conn = self.conn.lock().await;
conn.execute(
"INSERT INTO memory_layers (id, node_id, layer_type, content, token_count, created_at, source_chunk_id)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
params![id, node_id, layer_type.to_string(), content, token_count, now, source_chunk_id],
)?;
Ok(id)
}
pub async fn get_children_tree(
&self,
parent_uri: &str,
max_depth: usize,
) -> MemoryResult<Vec<crate::types::TreeNode>> {
if max_depth == 0 {
return Ok(Vec::new());
}
let children = self.list_directory(parent_uri).await?;
let mut tree_nodes = Vec::new();
for child in children {
let layer_summary = self.get_layer_summary(&child.id).await?;
let grandchildren = if child.node_type == crate::types::NodeType::Directory {
Box::pin(self.get_children_tree(&child.uri, max_depth.saturating_sub(1))).await?
} else {
Vec::new()
};
tree_nodes.push(crate::types::TreeNode {
node: child,
children: grandchildren,
layer_summary,
});
}
Ok(tree_nodes)
}
async fn get_layer_summary(
&self,
node_id: &str,
) -> MemoryResult<Option<crate::types::LayerSummary>> {
let l0 = self.get_layer(node_id, crate::types::LayerType::L0).await?;
let l1 = self.get_layer(node_id, crate::types::LayerType::L1).await?;
let has_l2 = self
.get_layer(node_id, crate::types::LayerType::L2)
.await?
.is_some();
if l0.is_none() && l1.is_none() && !has_l2 {
return Ok(None);
}
Ok(Some(crate::types::LayerSummary {
l0_preview: l0.map(|l| truncate_string(&l.content, 100)),
l1_preview: l1.map(|l| truncate_string(&l.content, 200)),
has_l2,
}))
}
pub async fn node_exists(&self, uri: &str) -> MemoryResult<bool> {
let conn = self.conn.lock().await;
let count: i64 = conn.query_row(
"SELECT COUNT(*) FROM memory_nodes WHERE uri = ?1",
params![uri],
|row| row.get(0),
)?;
Ok(count > 0)
}
}
fn row_to_knowledge_space(row: &Row) -> Result<KnowledgeSpaceRecord, rusqlite::Error> {
let scope = row
.get::<_, String>(1)?
.parse()
.unwrap_or(tandem_orchestrator::KnowledgeScope::Project);
let trust_level = row
.get::<_, String>(6)?
.parse()
.unwrap_or(tandem_orchestrator::KnowledgeTrustLevel::Promoted);
let metadata = row
.get::<_, Option<String>>(7)?
.and_then(|raw| serde_json::from_str(&raw).ok());
Ok(KnowledgeSpaceRecord {
id: row.get(0)?,
scope,
project_id: row.get(2)?,
namespace: row.get(3)?,
title: row.get(4)?,
description: row.get(5)?,
trust_level,
metadata,
created_at_ms: row.get::<_, i64>(8)? as u64,
updated_at_ms: row.get::<_, i64>(9)? as u64,
})
}
fn row_to_knowledge_item(row: &Row) -> Result<KnowledgeItemRecord, rusqlite::Error> {
let trust_level = row
.get::<_, String>(8)?
.parse()
.unwrap_or(tandem_orchestrator::KnowledgeTrustLevel::Promoted);
let status = row
.get::<_, String>(9)?
.parse()
.unwrap_or(KnowledgeItemStatus::Working);
let payload = row
.get::<_, String>(7)
.ok()
.and_then(|raw| serde_json::from_str(&raw).ok())
.unwrap_or(serde_json::Value::Null);
let artifact_refs = row
.get::<_, String>(11)
.ok()
.and_then(|raw| serde_json::from_str(&raw).ok())
.unwrap_or_default();
let source_memory_ids = row
.get::<_, String>(12)
.ok()
.and_then(|raw| serde_json::from_str(&raw).ok())
.unwrap_or_default();
let metadata = row
.get::<_, Option<String>>(14)?
.and_then(|raw| serde_json::from_str(&raw).ok());
Ok(KnowledgeItemRecord {
id: row.get(0)?,
space_id: row.get(1)?,
coverage_key: row.get(2)?,
dedupe_key: row.get(3)?,
item_type: row.get(4)?,
title: row.get(5)?,
summary: row.get(6)?,
payload,
trust_level,
status,
run_id: row.get(10)?,
artifact_refs,
source_memory_ids,
freshness_expires_at_ms: row.get::<_, Option<i64>>(13)?.map(|value| value as u64),
metadata,
created_at_ms: row.get::<_, i64>(15)? as u64,
updated_at_ms: row.get::<_, i64>(16)? as u64,
})
}
fn row_to_knowledge_coverage(row: &Row) -> Result<KnowledgeCoverageRecord, rusqlite::Error> {
let metadata = row
.get::<_, Option<String>>(7)?
.and_then(|raw| serde_json::from_str(&raw).ok());
Ok(KnowledgeCoverageRecord {
coverage_key: row.get(0)?,
space_id: row.get(1)?,
latest_item_id: row.get(2)?,
latest_dedupe_key: row.get(3)?,
last_seen_at_ms: row.get::<_, i64>(4)? as u64,
last_promoted_at_ms: row.get::<_, Option<i64>>(5)?.map(|value| value as u64),
freshness_expires_at_ms: row.get::<_, Option<i64>>(6)?.map(|value| value as u64),
metadata,
})
}
fn truncate_string(s: &str, max_len: usize) -> String {
if s.len() <= max_len {
s.to_string()
} else {
format!("{}...", &s[..max_len.saturating_sub(3)])
}
}
fn build_fts_query(query: &str) -> String {
let tokens = query
.split_whitespace()
.filter_map(|tok| {
let cleaned =
tok.trim_matches(|c: char| !c.is_ascii_alphanumeric() && c != '_' && c != '-');
if cleaned.is_empty() {
None
} else {
Some(format!("\"{}\"", cleaned))
}
})
.collect::<Vec<_>>();
if tokens.is_empty() {
"\"\"".to_string()
} else {
tokens.join(" OR ")
}
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::Value;
use tandem_orchestrator::{KnowledgeScope, KnowledgeTrustLevel};
use tempfile::TempDir;
async fn setup_test_db() -> (MemoryDatabase, TempDir) {
let temp_dir = TempDir::new().unwrap();
let db_path = temp_dir.path().join("test_memory.db");
let db = MemoryDatabase::new(&db_path).await.unwrap();
(db, temp_dir)
}
fn tenant_scope(org_id: &str, workspace_id: &str) -> MemoryTenantScope {
MemoryTenantScope {
org_id: org_id.to_string(),
workspace_id: workspace_id.to_string(),
deployment_id: Some("deployment-1".to_string()),
}
}
fn test_vector_chunk(
id: &str,
tier: MemoryTier,
tenant_scope: MemoryTenantScope,
content: &str,
source_hash: Option<&str>,
) -> MemoryChunk {
MemoryChunk {
id: id.to_string(),
content: content.to_string(),
tier,
session_id: Some("shared-session".to_string()),
project_id: Some("shared-project".to_string()),
source: "test_vector".to_string(),
source_path: None,
source_mtime: None,
source_size: None,
source_hash: source_hash.map(ToString::to_string),
tenant_scope,
created_at: Utc::now(),
token_count: 4,
metadata: None,
}
}
fn embedding(first: f32, second: f32) -> Vec<f32> {
let mut values = vec![0.0f32; DEFAULT_EMBEDDING_DIMENSION];
values[0] = first;
values[1] = second;
values
}
fn source_object_record(
source_object_id: &str,
tenant_scope: MemoryTenantScope,
) -> SourceObjectLifecycleRecord {
SourceObjectLifecycleRecord {
source_object_id: source_object_id.to_string(),
tenant_scope,
source_binding_id: "shared-binding".to_string(),
connector_id: "manual_upload".to_string(),
state: SourceObjectLifecycleState::Active,
tier: MemoryTier::Global,
session_id: None,
project_id: None,
import_namespace: "shared-import".to_string(),
indexed_path: "shared-import/note.md".to_string(),
native_object_id: "shared-import/note.md".to_string(),
resource_ref: serde_json::json!({
"organization_id": "org-a",
"workspace_id": "workspace-a",
"resource_kind": "document_collection",
"resource_id": "shared-docs"
}),
data_class: "internal".to_string(),
content_hash: Some("content-hash".to_string()),
source_hash: Some("source-hash".to_string()),
first_seen_at_ms: 1_000,
last_seen_at_ms: 1_000,
tombstoned_at_ms: None,
metadata: None,
}
}
#[tokio::test]
async fn test_init_schema() {
let (db, _temp) = setup_test_db().await;
let stats = db.get_stats().await.unwrap();
assert_eq!(stats.total_chunks, 0);
}
#[tokio::test]
async fn source_object_lifecycle_native_ids_are_tenant_scoped() {
let (db, _temp) = setup_test_db().await;
let tenant_a = tenant_scope("org-a", "workspace-a");
let tenant_b = tenant_scope("org-b", "workspace-b");
db.upsert_source_object_active_for_tenant(&source_object_record(
"source-object-a",
tenant_a.clone(),
))
.await
.unwrap();
db.upsert_source_object_active_for_tenant(&source_object_record(
"source-object-b",
tenant_b.clone(),
))
.await
.unwrap();
let object_a = db
.get_source_object_lifecycle_by_native_for_tenant(
&tenant_a,
"shared-binding",
"shared-import/note.md",
)
.await
.unwrap()
.expect("tenant A source object");
let object_b = db
.get_source_object_lifecycle_by_native_for_tenant(
&tenant_b,
"shared-binding",
"shared-import/note.md",
)
.await
.unwrap()
.expect("tenant B source object");
assert_eq!(object_a.source_object_id, "source-object-a");
assert_eq!(object_b.source_object_id, "source-object-b");
assert_ne!(object_a.source_object_id, object_b.source_object_id);
}
#[tokio::test]
async fn test_knowledge_registry_roundtrip() {
let (db, _temp) = setup_test_db().await;
let space = KnowledgeSpaceRecord {
id: "space-1".to_string(),
scope: tandem_orchestrator::KnowledgeScope::Project,
project_id: Some("project-1".to_string()),
namespace: Some("support".to_string()),
title: Some("Support Knowledge".to_string()),
description: Some("Reusable support guidance".to_string()),
trust_level: tandem_orchestrator::KnowledgeTrustLevel::Promoted,
metadata: Some(serde_json::json!({"owner": "ops"})),
created_at_ms: 1,
updated_at_ms: 2,
};
db.upsert_knowledge_space(&space).await.unwrap();
let item = KnowledgeItemRecord {
id: "item-1".to_string(),
space_id: space.id.clone(),
coverage_key: "project-1/support/debugging/slow-start".to_string(),
dedupe_key: "dedupe-1".to_string(),
item_type: "decision".to_string(),
title: "Restart service before retry".to_string(),
summary: Some("When the service is stale, restart before retrying.".to_string()),
payload: serde_json::json!({"action": "restart"}),
trust_level: tandem_orchestrator::KnowledgeTrustLevel::Promoted,
status: KnowledgeItemStatus::Promoted,
run_id: Some("run-1".to_string()),
artifact_refs: vec!["artifact://run-1/report".to_string()],
source_memory_ids: vec!["memory-1".to_string()],
freshness_expires_at_ms: Some(10),
metadata: Some(serde_json::json!({"source": "run"})),
created_at_ms: 3,
updated_at_ms: 4,
};
db.upsert_knowledge_item(&item).await.unwrap();
let coverage = KnowledgeCoverageRecord {
coverage_key: item.coverage_key.clone(),
space_id: space.id.clone(),
latest_item_id: Some(item.id.clone()),
latest_dedupe_key: Some(item.dedupe_key.clone()),
last_seen_at_ms: 5,
last_promoted_at_ms: Some(6),
freshness_expires_at_ms: Some(10),
metadata: Some(serde_json::json!({"coverage": true})),
};
db.upsert_knowledge_coverage(&coverage).await.unwrap();
let loaded_space = db.get_knowledge_space(&space.id).await.unwrap().unwrap();
assert_eq!(loaded_space.namespace.as_deref(), Some("support"));
let loaded_items = db
.list_knowledge_items(&space.id, Some(&item.coverage_key))
.await
.unwrap();
assert_eq!(loaded_items.len(), 1);
assert_eq!(loaded_items[0].title, item.title);
let loaded_coverage = db
.get_knowledge_coverage(&item.coverage_key, &space.id)
.await
.unwrap()
.unwrap();
assert_eq!(loaded_coverage.latest_item_id.as_deref(), Some("item-1"));
}
#[tokio::test]
async fn test_knowledge_registry_is_tenant_scoped() {
let (db, _temp) = setup_test_db().await;
let tenant_a = tenant_scope("org-a", "workspace-a");
let tenant_b = tenant_scope("org-b", "workspace-b");
let mut space_a = KnowledgeSpaceRecord {
id: "tenant-a-space".to_string(),
scope: tandem_orchestrator::KnowledgeScope::Project,
project_id: Some("shared-project".to_string()),
namespace: Some("shared-namespace".to_string()),
title: Some("Tenant A Knowledge".to_string()),
description: None,
trust_level: tandem_orchestrator::KnowledgeTrustLevel::Promoted,
metadata: None,
created_at_ms: 1,
updated_at_ms: 2,
};
let mut space_b = space_a.clone();
space_b.id = "tenant-b-space".to_string();
space_b.title = Some("Tenant B Knowledge".to_string());
db.upsert_knowledge_space_for_tenant(&space_a, &tenant_a)
.await
.unwrap();
db.upsert_knowledge_space_for_tenant(&space_b, &tenant_b)
.await
.unwrap();
let spaces_a = db
.list_knowledge_spaces_for_tenant(Some("shared-project"), &tenant_a)
.await
.unwrap();
let spaces_b = db
.list_knowledge_spaces_for_tenant(Some("shared-project"), &tenant_b)
.await
.unwrap();
assert_eq!(spaces_a.len(), 1);
assert_eq!(spaces_a[0].id, "tenant-a-space");
assert_eq!(spaces_b.len(), 1);
assert_eq!(spaces_b[0].id, "tenant-b-space");
assert!(db
.get_knowledge_space_for_tenant("tenant-b-space", &tenant_a)
.await
.unwrap()
.is_none());
let item_b = KnowledgeItemRecord {
id: "tenant-b-item".to_string(),
space_id: space_b.id.clone(),
coverage_key: "shared-project/topic/debugging".to_string(),
dedupe_key: "shared-dedupe".to_string(),
item_type: "decision".to_string(),
title: "Tenant B item".to_string(),
summary: None,
payload: serde_json::json!({"tenant": "b"}),
trust_level: tandem_orchestrator::KnowledgeTrustLevel::Working,
status: KnowledgeItemStatus::Working,
run_id: Some("run-b".to_string()),
artifact_refs: Vec::new(),
source_memory_ids: Vec::new(),
freshness_expires_at_ms: None,
metadata: None,
created_at_ms: 3,
updated_at_ms: 4,
};
db.upsert_knowledge_item_for_tenant(&item_b, &tenant_b)
.await
.unwrap();
assert!(db
.upsert_knowledge_item_for_tenant(&item_b, &tenant_a)
.await
.is_err());
assert!(db
.get_knowledge_item_for_tenant("tenant-b-item", &tenant_a)
.await
.unwrap()
.is_none());
assert!(db
.list_knowledge_items_for_tenant(&space_b.id, None, &tenant_a)
.await
.unwrap()
.is_empty());
assert_eq!(
db.list_knowledge_items_for_tenant(&space_b.id, None, &tenant_b)
.await
.unwrap()
.len(),
1
);
let coverage_b = KnowledgeCoverageRecord {
coverage_key: item_b.coverage_key.clone(),
space_id: space_b.id.clone(),
latest_item_id: Some(item_b.id.clone()),
latest_dedupe_key: Some(item_b.dedupe_key.clone()),
last_seen_at_ms: 5,
last_promoted_at_ms: None,
freshness_expires_at_ms: None,
metadata: None,
};
db.upsert_knowledge_coverage_for_tenant(&coverage_b, &tenant_b)
.await
.unwrap();
assert!(db
.upsert_knowledge_coverage_for_tenant(&coverage_b, &tenant_a)
.await
.is_err());
assert!(db
.get_knowledge_coverage_for_tenant(&coverage_b.coverage_key, &space_b.id, &tenant_a)
.await
.unwrap()
.is_none());
assert!(db
.get_knowledge_coverage_for_tenant(&coverage_b.coverage_key, &space_b.id, &tenant_b)
.await
.unwrap()
.is_some());
let promote = KnowledgePromotionRequest {
item_id: item_b.id.clone(),
target_status: KnowledgeItemStatus::Promoted,
promoted_at_ms: 10,
freshness_expires_at_ms: None,
reviewer_id: None,
approval_id: None,
reason: None,
};
assert!(db
.promote_knowledge_item_for_tenant(&promote, &tenant_a)
.await
.unwrap()
.is_none());
assert!(db
.promote_knowledge_item_for_tenant(&promote, &tenant_b)
.await
.unwrap()
.is_some());
space_a.updated_at_ms = 11;
db.upsert_knowledge_space_for_tenant(&space_a, &tenant_a)
.await
.unwrap();
}
#[tokio::test]
async fn test_store_and_retrieve_chunk() {
let (db, _temp) = setup_test_db().await;
let chunk = MemoryChunk {
id: "test-1".to_string(),
content: "Test content".to_string(),
tier: MemoryTier::Session,
session_id: Some("session-1".to_string()),
project_id: Some("project-1".to_string()),
source: "user_message".to_string(),
source_path: None,
source_mtime: None,
source_size: None,
source_hash: None,
tenant_scope: MemoryTenantScope::local(),
created_at: Utc::now(),
token_count: 10,
metadata: None,
};
let embedding = vec![0.1f32; DEFAULT_EMBEDDING_DIMENSION];
db.store_chunk(&chunk, &embedding).await.unwrap();
let chunks = db.get_session_chunks("session-1").await.unwrap();
assert_eq!(chunks.len(), 1);
assert_eq!(chunks[0].content, "Test content");
}
#[tokio::test]
async fn test_store_and_retrieve_global_chunk() {
let (db, _temp) = setup_test_db().await;
let chunk = MemoryChunk {
id: "global-1".to_string(),
content: "Global note".to_string(),
tier: MemoryTier::Global,
session_id: None,
project_id: None,
source: "agent_note".to_string(),
source_path: None,
source_mtime: None,
source_size: None,
source_hash: None,
tenant_scope: MemoryTenantScope::local(),
created_at: Utc::now(),
token_count: 7,
metadata: Some(serde_json::json!({"kind":"test"})),
};
let embedding = vec![0.2f32; DEFAULT_EMBEDDING_DIMENSION];
db.store_chunk(&chunk, &embedding).await.unwrap();
let chunks = db.get_global_chunks(10).await.unwrap();
assert_eq!(chunks.len(), 1);
assert_eq!(chunks[0].content, "Global note");
assert_eq!(chunks[0].source, "agent_note");
assert_eq!(chunks[0].token_count, 7);
assert_eq!(chunks[0].tier, MemoryTier::Global);
}
#[tokio::test]
async fn test_global_chunk_exists_by_source_hash() {
let (db, _temp) = setup_test_db().await;
let chunk = MemoryChunk {
id: "global-hash".to_string(),
content: "Global hash note".to_string(),
tier: MemoryTier::Global,
session_id: None,
project_id: None,
source: "chat_exchange".to_string(),
source_path: None,
source_mtime: None,
source_size: None,
source_hash: Some("hash-123".to_string()),
tenant_scope: MemoryTenantScope::local(),
created_at: Utc::now(),
token_count: 5,
metadata: None,
};
let embedding = vec![0.3f32; DEFAULT_EMBEDDING_DIMENSION];
db.store_chunk(&chunk, &embedding).await.unwrap();
assert!(db
.global_chunk_exists_by_source_hash("hash-123")
.await
.unwrap());
assert!(!db
.global_chunk_exists_by_source_hash("missing-hash")
.await
.unwrap());
}
#[tokio::test]
async fn test_vector_search_is_tenant_partitioned_before_top_k() {
let (db, _temp) = setup_test_db().await;
let tenant_a = tenant_scope("org-a", "workspace-a");
let tenant_b = tenant_scope("org-b", "workspace-b");
let query = embedding(1.0, 0.0);
db.store_chunk(
&test_vector_chunk(
"tenant-a-vector",
MemoryTier::Project,
tenant_a.clone(),
"tenant a memory",
None,
),
&embedding(0.8, 0.2),
)
.await
.unwrap();
db.store_chunk(
&test_vector_chunk(
"tenant-b-vector",
MemoryTier::Project,
tenant_b.clone(),
"tenant b closer memory",
None,
),
&query,
)
.await
.unwrap();
let results = db
.search_similar_for_tenant(
&query,
MemoryTier::Project,
Some("shared-project"),
None,
&tenant_a,
1,
)
.await
.unwrap();
assert_eq!(results.len(), 1);
assert_eq!(results[0].0.id, "tenant-a-vector");
assert_eq!(results[0].0.tenant_scope, tenant_a);
}
#[tokio::test]
async fn test_identical_vector_content_only_returns_request_tenant() {
let (db, _temp) = setup_test_db().await;
let tenant_a = tenant_scope("org-a", "workspace-a");
let tenant_b = tenant_scope("org-b", "workspace-b");
let vector = embedding(0.4, 0.6);
db.store_chunk(
&test_vector_chunk(
"tenant-a-identical",
MemoryTier::Global,
tenant_a.clone(),
"identical memory body",
Some("same-source-hash"),
),
&vector,
)
.await
.unwrap();
db.store_chunk(
&test_vector_chunk(
"tenant-b-identical",
MemoryTier::Global,
tenant_b,
"identical memory body",
Some("same-source-hash"),
),
&vector,
)
.await
.unwrap();
let results = db
.search_similar_for_tenant(&vector, MemoryTier::Global, None, None, &tenant_a, 10)
.await
.unwrap();
assert_eq!(results.len(), 1);
assert_eq!(results[0].0.id, "tenant-a-identical");
}
#[tokio::test]
async fn test_tenant_delete_does_not_remove_other_tenant_vector_memory() {
let (db, _temp) = setup_test_db().await;
let tenant_a = tenant_scope("org-a", "workspace-a");
let tenant_b = tenant_scope("org-b", "workspace-b");
let vector = embedding(0.2, 0.8);
db.store_chunk(
&test_vector_chunk(
"tenant-a-delete",
MemoryTier::Global,
tenant_a.clone(),
"tenant a delete target",
None,
),
&vector,
)
.await
.unwrap();
db.store_chunk(
&test_vector_chunk(
"tenant-b-delete",
MemoryTier::Global,
tenant_b.clone(),
"tenant b delete target",
None,
),
&vector,
)
.await
.unwrap();
let cross_delete = db
.delete_chunk_for_tenant(MemoryTier::Global, "tenant-b-delete", None, None, &tenant_a)
.await
.unwrap();
assert_eq!(cross_delete, 0);
let tenant_b_results = db
.search_similar_for_tenant(&vector, MemoryTier::Global, None, None, &tenant_b, 10)
.await
.unwrap();
assert_eq!(tenant_b_results.len(), 1);
assert_eq!(tenant_b_results[0].0.id, "tenant-b-delete");
let own_delete = db
.delete_chunk_for_tenant(MemoryTier::Global, "tenant-a-delete", None, None, &tenant_a)
.await
.unwrap();
assert_eq!(own_delete, 1);
assert_eq!(
db.search_similar_for_tenant(&vector, MemoryTier::Global, None, None, &tenant_b, 10)
.await
.unwrap()
.len(),
1
);
}
#[tokio::test]
async fn test_same_source_hash_does_not_dedupe_across_tenants() {
let (db, _temp) = setup_test_db().await;
let tenant_a = tenant_scope("org-a", "workspace-a");
let tenant_b = tenant_scope("org-b", "workspace-b");
let source_hash = "shared-source-hash";
db.store_chunk(
&test_vector_chunk(
"tenant-a-hash",
MemoryTier::Global,
tenant_a.clone(),
"same source hash",
Some(source_hash),
),
&embedding(0.7, 0.1),
)
.await
.unwrap();
db.store_chunk(
&test_vector_chunk(
"tenant-b-hash",
MemoryTier::Global,
tenant_b.clone(),
"same source hash",
Some(source_hash),
),
&embedding(0.7, 0.1),
)
.await
.unwrap();
assert!(db
.global_chunk_exists_by_source_hash_for_tenant(source_hash, &tenant_a)
.await
.unwrap());
assert!(db
.global_chunk_exists_by_source_hash_for_tenant(source_hash, &tenant_b)
.await
.unwrap());
let tenant_a_chunks = db
.get_global_chunks_for_tenant(&tenant_a, 10)
.await
.unwrap();
let tenant_b_chunks = db
.get_global_chunks_for_tenant(&tenant_b, 10)
.await
.unwrap();
assert_eq!(tenant_a_chunks.len(), 1);
assert_eq!(tenant_b_chunks.len(), 1);
assert_ne!(tenant_a_chunks[0].id, tenant_b_chunks[0].id);
}
#[tokio::test]
async fn test_memory_stats_are_tenant_scoped() {
let (db, _temp) = setup_test_db().await;
let tenant_a = tenant_scope("org-a", "workspace-a");
let tenant_b = tenant_scope("org-b", "workspace-b");
db.store_chunk(
&test_vector_chunk(
"tenant-a-session-stat",
MemoryTier::Session,
tenant_a.clone(),
"tenant a session stats",
None,
),
&embedding(0.1, 0.2),
)
.await
.unwrap();
db.store_chunk(
&test_vector_chunk(
"tenant-a-project-stat",
MemoryTier::Project,
tenant_a.clone(),
"tenant a project stats",
None,
),
&embedding(0.2, 0.3),
)
.await
.unwrap();
db.store_chunk(
&test_vector_chunk(
"tenant-a-global-stat",
MemoryTier::Global,
tenant_a.clone(),
"tenant a global stats",
None,
),
&embedding(0.3, 0.4),
)
.await
.unwrap();
db.store_chunk(
&test_vector_chunk(
"tenant-b-project-stat",
MemoryTier::Project,
tenant_b.clone(),
"tenant b project stats should not count",
None,
),
&embedding(0.4, 0.5),
)
.await
.unwrap();
db.log_cleanup_for_tenant(
"test",
MemoryTier::Project,
Some("shared-project"),
None,
1,
10,
&tenant_b,
)
.await
.unwrap();
let tenant_a_stats = db.get_stats_for_tenant(&tenant_a).await.unwrap();
assert_eq!(tenant_a_stats.session_chunks, 1);
assert_eq!(tenant_a_stats.project_chunks, 1);
assert_eq!(tenant_a_stats.global_chunks, 1);
assert_eq!(tenant_a_stats.total_chunks, 3);
assert!(tenant_a_stats.total_bytes > 0);
assert!(tenant_a_stats.last_cleanup.is_none());
let tenant_b_stats = db.get_stats_for_tenant(&tenant_b).await.unwrap();
assert_eq!(tenant_b_stats.session_chunks, 0);
assert_eq!(tenant_b_stats.project_chunks, 1);
assert_eq!(tenant_b_stats.global_chunks, 0);
assert_eq!(tenant_b_stats.total_chunks, 1);
assert!(tenant_b_stats.last_cleanup.is_some());
}
#[tokio::test]
async fn test_project_stats_are_tenant_scoped_for_vector_chunks() {
let (db, _temp) = setup_test_db().await;
let tenant_a = tenant_scope("org-a", "workspace-a");
let tenant_b = tenant_scope("org-b", "workspace-b");
db.store_chunk(
&test_vector_chunk(
"tenant-a-project-stat-1",
MemoryTier::Project,
tenant_a.clone(),
"tenant a project stat one",
None,
),
&embedding(0.5, 0.1),
)
.await
.unwrap();
let mut tenant_a_file = test_vector_chunk(
"tenant-a-project-file-stat",
MemoryTier::Project,
tenant_a.clone(),
"tenant a file stat",
None,
);
tenant_a_file.source = "file".to_string();
db.store_chunk(&tenant_a_file, &embedding(0.6, 0.1))
.await
.unwrap();
db.store_chunk(
&test_vector_chunk(
"tenant-b-project-stat-1",
MemoryTier::Project,
tenant_b,
"tenant b project stat",
None,
),
&embedding(0.7, 0.1),
)
.await
.unwrap();
let stats = db
.get_project_stats_for_tenant("shared-project", &tenant_a)
.await
.unwrap();
assert_eq!(stats.project_chunks, 2);
assert_eq!(stats.file_index_chunks, 1);
assert!(stats.project_bytes > 0);
assert!(stats.file_index_bytes > 0);
}
#[tokio::test]
async fn test_import_index_paths_are_tenant_scoped() {
let (db, _temp) = setup_test_db().await;
let tenant_a = tenant_scope("org-a", "workspace-a");
let tenant_b = tenant_scope("org-b", "workspace-b");
db.upsert_import_index_entry_for_tenant(
MemoryTier::Project,
None,
Some("shared-project"),
"repo/README.md",
10,
100,
"hash-a",
&tenant_a,
)
.await
.unwrap();
db.upsert_import_index_entry_for_tenant(
MemoryTier::Project,
None,
Some("shared-project"),
"repo/README.md",
20,
200,
"hash-b",
&tenant_b,
)
.await
.unwrap();
let tenant_a_paths = db
.list_import_index_paths_for_tenant(
MemoryTier::Project,
None,
Some("shared-project"),
&tenant_a,
)
.await
.unwrap();
assert_eq!(tenant_a_paths, vec!["repo/README.md".to_string()]);
let tenant_a_entry = db
.get_import_index_entry_for_tenant(
MemoryTier::Project,
None,
Some("shared-project"),
"repo/README.md",
&tenant_a,
)
.await
.unwrap()
.unwrap();
let tenant_b_entry = db
.get_import_index_entry_for_tenant(
MemoryTier::Project,
None,
Some("shared-project"),
"repo/README.md",
&tenant_b,
)
.await
.unwrap()
.unwrap();
assert_eq!(tenant_a_entry.2, "hash-a");
assert_eq!(tenant_b_entry.2, "hash-b");
}
#[tokio::test]
async fn test_delete_import_index_entry_is_tenant_scoped() {
let (db, _temp) = setup_test_db().await;
let tenant_a = tenant_scope("org-a", "workspace-a");
let tenant_b = tenant_scope("org-b", "workspace-b");
for (tenant, hash) in [(&tenant_a, "hash-a"), (&tenant_b, "hash-b")] {
db.upsert_import_index_entry_for_tenant(
MemoryTier::Global,
None,
None,
"shared/path.md",
1,
10,
hash,
tenant,
)
.await
.unwrap();
}
db.delete_import_index_entry_for_tenant(
MemoryTier::Global,
None,
None,
"shared/path.md",
&tenant_a,
)
.await
.unwrap();
assert!(db
.get_import_index_entry_for_tenant(
MemoryTier::Global,
None,
None,
"shared/path.md",
&tenant_a
)
.await
.unwrap()
.is_none());
let tenant_b_entry = db
.get_import_index_entry_for_tenant(
MemoryTier::Global,
None,
None,
"shared/path.md",
&tenant_b,
)
.await
.unwrap()
.unwrap();
assert_eq!(tenant_b_entry.2, "hash-b");
}
#[tokio::test]
async fn test_file_chunk_delete_by_path_is_tenant_scoped() {
let (db, _temp) = setup_test_db().await;
let tenant_a = tenant_scope("org-a", "workspace-a");
let tenant_b = tenant_scope("org-b", "workspace-b");
let mut chunk_a = test_vector_chunk(
"tenant-a-file-delete",
MemoryTier::Project,
tenant_a.clone(),
"same file content",
Some("same-hash"),
);
chunk_a.source = "file".to_string();
chunk_a.source_path = Some("repo/file.md".to_string());
let mut chunk_b = test_vector_chunk(
"tenant-b-file-delete",
MemoryTier::Project,
tenant_b.clone(),
"same file content",
Some("same-hash"),
);
chunk_b.source = "file".to_string();
chunk_b.source_path = Some("repo/file.md".to_string());
db.store_chunk(&chunk_a, &embedding(0.1, 0.2))
.await
.unwrap();
db.store_chunk(&chunk_b, &embedding(0.1, 0.2))
.await
.unwrap();
let (deleted, _) = db
.delete_file_chunks_by_path_for_tenant(
MemoryTier::Project,
None,
Some("shared-project"),
"repo/file.md",
&tenant_a,
)
.await
.unwrap();
assert_eq!(deleted, 1);
assert!(db
.get_project_chunks_for_tenant("shared-project", &tenant_a)
.await
.unwrap()
.is_empty());
let tenant_b_chunks = db
.get_project_chunks_for_tenant("shared-project", &tenant_b)
.await
.unwrap();
assert_eq!(tenant_b_chunks.len(), 1);
assert_eq!(tenant_b_chunks[0].id, "tenant-b-file-delete");
}
#[tokio::test]
async fn test_project_file_index_clear_is_tenant_scoped() {
let (db, _temp) = setup_test_db().await;
let tenant_a = tenant_scope("org-a", "workspace-a");
let tenant_b = tenant_scope("org-b", "workspace-b");
for (tenant, id, hash) in [
(&tenant_a, "tenant-a-clear-file-index", "hash-a"),
(&tenant_b, "tenant-b-clear-file-index", "hash-b"),
] {
db.upsert_file_index_entry_for_tenant(
"shared-project",
"repo/file.md",
1,
10,
hash,
tenant,
)
.await
.unwrap();
db.upsert_project_index_status_for_tenant("shared-project", 5, 4, 3, 2, 1, tenant)
.await
.unwrap();
let mut chunk = test_vector_chunk(
id,
MemoryTier::Project,
tenant.clone(),
"file index clear content",
Some(hash),
);
chunk.source = "file".to_string();
chunk.source_path = Some("repo/file.md".to_string());
db.store_chunk(&chunk, &embedding(0.4, 0.5)).await.unwrap();
}
let result = db
.clear_project_file_index_for_tenant("shared-project", false, &tenant_a)
.await
.unwrap();
assert_eq!(result.chunks_deleted, 1);
assert_eq!(
db.project_file_index_count_for_tenant("shared-project", &tenant_a)
.await
.unwrap(),
0
);
assert!(db
.get_project_chunks_for_tenant("shared-project", &tenant_a)
.await
.unwrap()
.is_empty());
assert_eq!(
db.project_file_index_count_for_tenant("shared-project", &tenant_b)
.await
.unwrap(),
1
);
assert_eq!(
db.get_project_chunks_for_tenant("shared-project", &tenant_b)
.await
.unwrap()
.len(),
1
);
let tenant_b_stats = db
.get_project_stats_for_tenant("shared-project", &tenant_b)
.await
.unwrap();
assert_eq!(tenant_b_stats.last_indexed_files, Some(3));
}
#[tokio::test]
async fn test_project_stats_file_index_is_tenant_scoped() {
let (db, _temp) = setup_test_db().await;
let tenant_a = tenant_scope("org-a", "workspace-a");
let tenant_b = tenant_scope("org-b", "workspace-b");
db.upsert_file_index_entry_for_tenant(
"shared-project",
"repo/a.md",
1,
10,
"hash-a",
&tenant_a,
)
.await
.unwrap();
db.upsert_project_index_status_for_tenant("shared-project", 10, 9, 8, 1, 0, &tenant_a)
.await
.unwrap();
db.upsert_file_index_entry_for_tenant(
"shared-project",
"repo/b.md",
2,
20,
"hash-b",
&tenant_b,
)
.await
.unwrap();
db.upsert_project_index_status_for_tenant("shared-project", 3, 2, 1, 1, 0, &tenant_b)
.await
.unwrap();
let stats_a = db
.get_project_stats_for_tenant("shared-project", &tenant_a)
.await
.unwrap();
let stats_b = db
.get_project_stats_for_tenant("shared-project", &tenant_b)
.await
.unwrap();
assert_eq!(stats_a.indexed_files, 1);
assert_eq!(stats_a.last_total_files, Some(10));
assert_eq!(stats_a.last_indexed_files, Some(8));
assert_eq!(stats_b.indexed_files, 1);
assert_eq!(stats_b.last_total_files, Some(3));
assert_eq!(stats_b.last_indexed_files, Some(1));
}
#[tokio::test]
async fn test_clear_session_and_project_memory_are_tenant_scoped() {
let (db, _temp) = setup_test_db().await;
let tenant_a = tenant_scope("org-a", "workspace-a");
let tenant_b = tenant_scope("org-b", "workspace-b");
db.store_chunk(
&test_vector_chunk(
"tenant-a-clear-session",
MemoryTier::Session,
tenant_a.clone(),
"tenant a session clear target",
None,
),
&embedding(0.1, 0.9),
)
.await
.unwrap();
db.store_chunk(
&test_vector_chunk(
"tenant-b-clear-session",
MemoryTier::Session,
tenant_b.clone(),
"tenant b session must remain",
None,
),
&embedding(0.1, 0.9),
)
.await
.unwrap();
db.store_chunk(
&test_vector_chunk(
"tenant-a-clear-project",
MemoryTier::Project,
tenant_a.clone(),
"tenant a project clear target",
None,
),
&embedding(0.2, 0.8),
)
.await
.unwrap();
db.store_chunk(
&test_vector_chunk(
"tenant-b-clear-project",
MemoryTier::Project,
tenant_b.clone(),
"tenant b project must remain",
None,
),
&embedding(0.2, 0.8),
)
.await
.unwrap();
assert_eq!(
db.clear_session_memory_for_tenant("shared-session", &tenant_a)
.await
.unwrap(),
1
);
assert_eq!(
db.clear_project_memory_for_tenant("shared-project", &tenant_a)
.await
.unwrap(),
1
);
let tenant_b_session = db
.get_session_chunks_for_tenant("shared-session", &tenant_b)
.await
.unwrap();
let tenant_b_project = db
.get_project_chunks_for_tenant("shared-project", &tenant_b)
.await
.unwrap();
assert_eq!(tenant_b_session.len(), 1);
assert_eq!(tenant_b_project.len(), 1);
}
#[tokio::test]
async fn test_old_session_cleanup_is_tenant_scoped() {
let (db, _temp) = setup_test_db().await;
let tenant_a = tenant_scope("org-a", "workspace-a");
let tenant_b = tenant_scope("org-b", "workspace-b");
let old = Utc::now() - chrono::Duration::days(90);
let mut tenant_a_old = test_vector_chunk(
"tenant-a-old-session",
MemoryTier::Session,
tenant_a.clone(),
"tenant a old session",
None,
);
tenant_a_old.created_at = old;
db.store_chunk(&tenant_a_old, &embedding(0.3, 0.7))
.await
.unwrap();
let mut tenant_b_old = test_vector_chunk(
"tenant-b-old-session",
MemoryTier::Session,
tenant_b.clone(),
"tenant b old session",
None,
);
tenant_b_old.created_at = old;
db.store_chunk(&tenant_b_old, &embedding(0.3, 0.7))
.await
.unwrap();
assert_eq!(
db.cleanup_old_sessions_for_tenant(30, &tenant_a)
.await
.unwrap(),
1
);
assert!(db
.get_session_chunks_for_tenant("shared-session", &tenant_a)
.await
.unwrap()
.is_empty());
assert_eq!(
db.get_session_chunks_for_tenant("shared-session", &tenant_b)
.await
.unwrap()
.len(),
1
);
}
#[tokio::test]
async fn test_config_crud() {
let (db, _temp) = setup_test_db().await;
let config = db.get_or_create_config("project-1").await.unwrap();
assert_eq!(config.max_chunks, 10000);
let new_config = MemoryConfig {
max_chunks: 5000,
..Default::default()
};
db.update_config("project-1", &new_config).await.unwrap();
let updated = db.get_or_create_config("project-1").await.unwrap();
assert_eq!(updated.max_chunks, 5000);
}
#[tokio::test]
async fn test_config_crud_is_tenant_scoped() {
let (db, _temp) = setup_test_db().await;
let tenant_a = tenant_scope("org-a", "workspace-a");
let tenant_b = tenant_scope("org-b", "workspace-b");
let config_a = MemoryConfig {
max_chunks: 111,
session_retention_days: 7,
..Default::default()
};
let config_b = MemoryConfig {
max_chunks: 222,
session_retention_days: 14,
..Default::default()
};
db.update_config_for_tenant("shared-project", &config_a, &tenant_a)
.await
.unwrap();
db.update_config_for_tenant("shared-project", &config_b, &tenant_b)
.await
.unwrap();
let loaded_a = db
.get_or_create_config_for_tenant("shared-project", &tenant_a)
.await
.unwrap();
let loaded_b = db
.get_or_create_config_for_tenant("shared-project", &tenant_b)
.await
.unwrap();
assert_eq!(loaded_a.max_chunks, 111);
assert_eq!(loaded_a.session_retention_days, 7);
assert_eq!(loaded_b.max_chunks, 222);
assert_eq!(loaded_b.session_retention_days, 14);
}
#[tokio::test]
async fn test_prune_old_session_chunks_is_tenant_scoped() {
let (db, _temp) = setup_test_db().await;
let tenant_a = tenant_scope("org-a", "workspace-a");
let tenant_b = tenant_scope("org-b", "workspace-b");
let old = Utc::now() - chrono::Duration::days(10);
let mut chunk_a = test_vector_chunk(
"tenant-a-old-session-prune",
MemoryTier::Session,
tenant_a.clone(),
"old tenant a session chunk",
None,
);
chunk_a.created_at = old;
let mut chunk_b = test_vector_chunk(
"tenant-b-old-session-prune",
MemoryTier::Session,
tenant_b.clone(),
"old tenant b session chunk",
None,
);
chunk_b.created_at = old;
db.store_chunk(&chunk_a, &embedding(0.2, 0.8))
.await
.unwrap();
db.store_chunk(&chunk_b, &embedding(0.2, 0.8))
.await
.unwrap();
let deleted = db
.prune_old_session_chunks_for_tenant(1, &tenant_a)
.await
.unwrap();
assert_eq!(deleted, 1);
assert!(db
.get_session_chunks_for_tenant("shared-session", &tenant_a)
.await
.unwrap()
.is_empty());
assert_eq!(
db.get_session_chunks_for_tenant("shared-session", &tenant_b)
.await
.unwrap()
.len(),
1
);
}
#[tokio::test]
async fn test_run_hygiene_reads_tenant_scoped_global_config() {
let (db, _temp) = setup_test_db().await;
let tenant_a = tenant_scope("org-a", "workspace-a");
let tenant_b = tenant_scope("org-b", "workspace-b");
let old = Utc::now() - chrono::Duration::days(10);
let config_a = MemoryConfig {
session_retention_days: 1,
..Default::default()
};
let config_b = MemoryConfig {
session_retention_days: 0,
..Default::default()
};
db.update_config_for_tenant("__global__", &config_a, &tenant_a)
.await
.unwrap();
db.update_config_for_tenant("__global__", &config_b, &tenant_b)
.await
.unwrap();
let mut chunk_a = test_vector_chunk(
"tenant-a-hygiene",
MemoryTier::Session,
tenant_a.clone(),
"tenant a old hygiene chunk",
None,
);
chunk_a.created_at = old;
let mut chunk_b = test_vector_chunk(
"tenant-b-hygiene",
MemoryTier::Session,
tenant_b.clone(),
"tenant b old hygiene chunk",
None,
);
chunk_b.created_at = old;
db.store_chunk(&chunk_a, &embedding(0.3, 0.7))
.await
.unwrap();
db.store_chunk(&chunk_b, &embedding(0.3, 0.7))
.await
.unwrap();
let deleted = db.run_hygiene_for_tenant(0, &tenant_a).await.unwrap();
assert_eq!(deleted, 1);
assert!(db
.get_session_chunks_for_tenant("shared-session", &tenant_a)
.await
.unwrap()
.is_empty());
assert_eq!(
db.get_session_chunks_for_tenant("shared-session", &tenant_b)
.await
.unwrap()
.len(),
1
);
}
#[tokio::test]
async fn test_global_memory_put_search_and_dedup() {
let (db, _temp) = setup_test_db().await;
let now = chrono::Utc::now().timestamp_millis() as u64;
let record = GlobalMemoryRecord {
id: "gm-1".to_string(),
user_id: "user-a".to_string(),
source_type: "user_message".to_string(),
content: "remember rust workspace layout".to_string(),
content_hash: "h1".to_string(),
run_id: "run-1".to_string(),
session_id: Some("s1".to_string()),
message_id: Some("m1".to_string()),
tool_name: None,
project_tag: Some("proj-x".to_string()),
channel_tag: None,
host_tag: None,
metadata: None,
provenance: None,
redaction_status: "passed".to_string(),
redaction_count: 0,
visibility: "private".to_string(),
demoted: false,
score_boost: 0.0,
created_at_ms: now,
updated_at_ms: now,
expires_at_ms: None,
};
let first = db.put_global_memory_record(&record).await.unwrap();
assert!(first.stored);
let second = db.put_global_memory_record(&record).await.unwrap();
assert!(second.deduped);
let hits = db
.search_global_memory("user-a", "rust workspace", 5, Some("proj-x"), None, None)
.await
.unwrap();
assert!(!hits.is_empty());
assert_eq!(hits[0].record.id, "gm-1");
}
#[tokio::test]
async fn test_global_memory_tenant_filtered_fts_list_get_and_delete() {
let (db, _temp) = setup_test_db().await;
let now = chrono::Utc::now().timestamp_millis() as u64;
let tenant_a = GlobalMemoryRecord {
id: "gm-tenant-a".to_string(),
user_id: "same-user".to_string(),
source_type: "note".to_string(),
content: "shared tenant phrase".to_string(),
content_hash: "same-hash".to_string(),
run_id: "same-run".to_string(),
session_id: Some("same-session".to_string()),
message_id: Some("same-message".to_string()),
tool_name: None,
project_tag: Some("same-project".to_string()),
channel_tag: None,
host_tag: None,
metadata: None,
provenance: Some(serde_json::json!({
"tenant_context": {
"org_id": "org-a",
"workspace_id": "workspace-a",
"source": "explicit"
}
})),
redaction_status: "passed".to_string(),
redaction_count: 0,
visibility: "private".to_string(),
demoted: false,
score_boost: 0.0,
created_at_ms: now,
updated_at_ms: now,
expires_at_ms: None,
};
let mut tenant_b = tenant_a.clone();
tenant_b.id = "gm-tenant-b".to_string();
tenant_b.provenance = Some(serde_json::json!({
"tenant_context": {
"org_id": "org-b",
"workspace_id": "workspace-b",
"source": "explicit"
}
}));
assert!(db.put_global_memory_record(&tenant_a).await.unwrap().stored);
assert!(db.put_global_memory_record(&tenant_b).await.unwrap().stored);
let hits_a = db
.search_global_memory_for_tenant(
"org-a",
"workspace-a",
None,
"same-user",
"shared tenant phrase",
10,
Some("same-project"),
None,
None,
)
.await
.unwrap();
assert_eq!(hits_a.len(), 1);
assert_eq!(hits_a[0].record.id, "gm-tenant-a");
let rows_b = db
.list_global_memory_for_tenant(
"org-b",
"workspace-b",
None,
"same-user",
Some("shared tenant"),
Some("same-project"),
None,
10,
0,
)
.await
.unwrap();
assert_eq!(rows_b.len(), 1);
assert_eq!(rows_b[0].id, "gm-tenant-b");
assert!(db
.get_global_memory_for_tenant("gm-tenant-b", "org-a", "workspace-a", None)
.await
.unwrap()
.is_none());
assert!(!db
.delete_global_memory_for_tenant("gm-tenant-b", "org-a", "workspace-a", None)
.await
.unwrap());
assert!(db
.delete_global_memory_for_tenant("gm-tenant-b", "org-b", "workspace-b", None)
.await
.unwrap());
}
#[tokio::test]
async fn test_knowledge_registry_round_trip() {
let (db, _temp) = setup_test_db().await;
let now = chrono::Utc::now().timestamp_millis() as u64;
let space = KnowledgeSpaceRecord {
id: "space-1".to_string(),
scope: KnowledgeScope::Project,
project_id: Some("project-1".to_string()),
namespace: Some("marketing/positioning".to_string()),
title: Some("Marketing positioning".to_string()),
description: Some("Reusable positioning guidance".to_string()),
trust_level: KnowledgeTrustLevel::ApprovedDefault,
metadata: Some(serde_json::json!({"owner":"marketing"})),
created_at_ms: now,
updated_at_ms: now,
};
db.upsert_knowledge_space(&space).await.unwrap();
let loaded_space = db.get_knowledge_space("space-1").await.unwrap().unwrap();
assert_eq!(loaded_space.id, "space-1");
assert_eq!(loaded_space.scope, KnowledgeScope::Project);
assert_eq!(loaded_space.project_id.as_deref(), Some("project-1"));
assert_eq!(
loaded_space.namespace.as_deref(),
Some("marketing/positioning")
);
let item = KnowledgeItemRecord {
id: "item-1".to_string(),
space_id: "space-1".to_string(),
coverage_key: "project-1::marketing/positioning::strategy::pricing".to_string(),
dedupe_key: "item-1-dedupe".to_string(),
item_type: "evidence".to_string(),
title: "Pricing sensitivity observation".to_string(),
summary: Some("Customers reacted to annual pricing changes".to_string()),
payload: serde_json::json!({"claim":"Annual pricing changes created friction"}),
trust_level: KnowledgeTrustLevel::Promoted,
status: KnowledgeItemStatus::Promoted,
run_id: Some("run-1".to_string()),
artifact_refs: vec!["artifact://run-1/research-sources".to_string()],
source_memory_ids: vec!["memory-1".to_string()],
freshness_expires_at_ms: Some(now + 86_400_000),
metadata: Some(serde_json::json!({"source_kind":"web"})),
created_at_ms: now,
updated_at_ms: now,
};
db.upsert_knowledge_item(&item).await.unwrap();
let loaded_item = db.get_knowledge_item("item-1").await.unwrap().unwrap();
assert_eq!(loaded_item.id, "item-1");
assert_eq!(loaded_item.space_id, "space-1");
assert_eq!(
loaded_item.coverage_key,
"project-1::marketing/positioning::strategy::pricing"
);
assert_eq!(loaded_item.status, KnowledgeItemStatus::Promoted);
assert_eq!(
loaded_item.artifact_refs,
vec!["artifact://run-1/research-sources".to_string()]
);
let by_space = db.list_knowledge_items("space-1", None).await.unwrap();
assert_eq!(by_space.len(), 1);
let by_coverage = db
.list_knowledge_items(
"space-1",
Some("project-1::marketing/positioning::strategy::pricing"),
)
.await
.unwrap();
assert_eq!(by_coverage.len(), 1);
let coverage = KnowledgeCoverageRecord {
coverage_key: "project-1::marketing/positioning::strategy::pricing".to_string(),
space_id: "space-1".to_string(),
latest_item_id: Some("item-1".to_string()),
latest_dedupe_key: Some("item-1-dedupe".to_string()),
last_seen_at_ms: now,
last_promoted_at_ms: Some(now),
freshness_expires_at_ms: Some(now + 86_400_000),
metadata: Some(serde_json::json!({"reuse_reason":"same topic"})),
};
db.upsert_knowledge_coverage(&coverage).await.unwrap();
let loaded_coverage = db
.get_knowledge_coverage(
"project-1::marketing/positioning::strategy::pricing",
"space-1",
)
.await
.unwrap()
.unwrap();
assert_eq!(loaded_coverage.space_id, "space-1");
assert_eq!(loaded_coverage.latest_item_id.as_deref(), Some("item-1"));
assert_eq!(
loaded_coverage.latest_dedupe_key.as_deref(),
Some("item-1-dedupe")
);
}
#[tokio::test]
async fn test_knowledge_promotion_working_to_promoted_updates_coverage() {
let (db, _temp) = setup_test_db().await;
let now = chrono::Utc::now().timestamp_millis() as u64;
let space = KnowledgeSpaceRecord {
id: "space-promote-1".to_string(),
scope: KnowledgeScope::Project,
project_id: Some("project-1".to_string()),
namespace: Some("engineering/debugging".to_string()),
title: Some("Engineering debugging".to_string()),
description: Some("Reusable debugging guidance".to_string()),
trust_level: KnowledgeTrustLevel::Promoted,
metadata: None,
created_at_ms: now,
updated_at_ms: now,
};
db.upsert_knowledge_space(&space).await.unwrap();
let item = KnowledgeItemRecord {
id: "item-promote-1".to_string(),
space_id: space.id.clone(),
coverage_key: "project-1::engineering/debugging::startup::race".to_string(),
dedupe_key: "dedupe-promote-1".to_string(),
item_type: "decision".to_string(),
title: "Delay startup-dependent retries".to_string(),
summary: Some("Retry only after startup completed.".to_string()),
payload: serde_json::json!({"action":"delay_retry"}),
trust_level: KnowledgeTrustLevel::Working,
status: KnowledgeItemStatus::Working,
run_id: Some("run-1".to_string()),
artifact_refs: vec!["artifact://run-1/debug".to_string()],
source_memory_ids: vec!["memory-1".to_string()],
freshness_expires_at_ms: None,
metadata: None,
created_at_ms: now,
updated_at_ms: now,
};
db.upsert_knowledge_item(&item).await.unwrap();
let promote = KnowledgePromotionRequest {
item_id: item.id.clone(),
target_status: KnowledgeItemStatus::Promoted,
promoted_at_ms: now + 10,
freshness_expires_at_ms: Some(now + 86_400_000),
reviewer_id: None,
approval_id: None,
reason: Some("validated in workflow".to_string()),
};
let result = db.promote_knowledge_item(&promote).await.unwrap().unwrap();
assert!(result.promoted);
assert_eq!(result.item.status, KnowledgeItemStatus::Promoted);
assert_eq!(result.item.trust_level, KnowledgeTrustLevel::Promoted);
assert_eq!(
result.coverage.latest_item_id.as_deref(),
Some("item-promote-1")
);
assert_eq!(
result.coverage.latest_dedupe_key.as_deref(),
Some("dedupe-promote-1")
);
assert_eq!(result.coverage.last_promoted_at_ms, Some(now + 10));
}
#[tokio::test]
async fn test_knowledge_promotion_promoted_to_approved_default_requires_review() {
let (db, _temp) = setup_test_db().await;
let now = chrono::Utc::now().timestamp_millis() as u64;
let space = KnowledgeSpaceRecord {
id: "space-promote-2".to_string(),
scope: KnowledgeScope::Project,
project_id: Some("project-1".to_string()),
namespace: Some("marketing/positioning".to_string()),
title: Some("Marketing positioning".to_string()),
description: Some("Reusable positioning guidance".to_string()),
trust_level: KnowledgeTrustLevel::Promoted,
metadata: None,
created_at_ms: now,
updated_at_ms: now,
};
db.upsert_knowledge_space(&space).await.unwrap();
let item = KnowledgeItemRecord {
id: "item-promote-2".to_string(),
space_id: space.id.clone(),
coverage_key: "project-1::marketing/positioning::strategy::pricing".to_string(),
dedupe_key: "dedupe-promote-2".to_string(),
item_type: "evidence".to_string(),
title: "Pricing observation".to_string(),
summary: Some("Annual pricing changes created friction".to_string()),
payload: serde_json::json!({"claim":"pricing friction"}),
trust_level: KnowledgeTrustLevel::Promoted,
status: KnowledgeItemStatus::Promoted,
run_id: Some("run-2".to_string()),
artifact_refs: vec!["artifact://run-2/research".to_string()],
source_memory_ids: vec!["memory-2".to_string()],
freshness_expires_at_ms: None,
metadata: None,
created_at_ms: now,
updated_at_ms: now,
};
db.upsert_knowledge_item(&item).await.unwrap();
let promote = KnowledgePromotionRequest {
item_id: item.id.clone(),
target_status: KnowledgeItemStatus::ApprovedDefault,
promoted_at_ms: now + 5,
freshness_expires_at_ms: None,
reviewer_id: None,
approval_id: None,
reason: Some("should require review".to_string()),
};
let err = db.promote_knowledge_item(&promote).await.unwrap_err();
match err {
MemoryError::InvalidConfig(_) => {}
other => panic!("unexpected error: {}", other),
}
}
#[tokio::test]
async fn test_knowledge_promotion_promoted_to_approved_default_updates_coverage() {
let (db, _temp) = setup_test_db().await;
let now = chrono::Utc::now().timestamp_millis() as u64;
let space = KnowledgeSpaceRecord {
id: "space-promote-3".to_string(),
scope: KnowledgeScope::Project,
project_id: Some("project-1".to_string()),
namespace: Some("support/runbooks".to_string()),
title: Some("Support runbooks".to_string()),
description: Some("Reusable runbook guidance".to_string()),
trust_level: KnowledgeTrustLevel::Promoted,
metadata: None,
created_at_ms: now,
updated_at_ms: now,
};
db.upsert_knowledge_space(&space).await.unwrap();
let item = KnowledgeItemRecord {
id: "item-promote-3".to_string(),
space_id: space.id.clone(),
coverage_key: "project-1::support/runbooks::oncall::restart".to_string(),
dedupe_key: "dedupe-promote-3".to_string(),
item_type: "runbook".to_string(),
title: "Restart service and verify".to_string(),
summary: Some("Restart then validate health endpoint.".to_string()),
payload: serde_json::json!({"steps":["restart","healthcheck"]}),
trust_level: KnowledgeTrustLevel::Promoted,
status: KnowledgeItemStatus::Promoted,
run_id: Some("run-3".to_string()),
artifact_refs: vec!["artifact://run-3/runbook".to_string()],
source_memory_ids: vec!["memory-3".to_string()],
freshness_expires_at_ms: None,
metadata: None,
created_at_ms: now,
updated_at_ms: now,
};
db.upsert_knowledge_item(&item).await.unwrap();
let promote = KnowledgePromotionRequest {
item_id: item.id.clone(),
target_status: KnowledgeItemStatus::ApprovedDefault,
promoted_at_ms: now + 12,
freshness_expires_at_ms: Some(now + 172_800_000),
reviewer_id: Some("reviewer-1".to_string()),
approval_id: Some("approval-1".to_string()),
reason: Some("reviewed by ops".to_string()),
};
let result = db.promote_knowledge_item(&promote).await.unwrap().unwrap();
assert!(result.promoted);
assert_eq!(result.item.status, KnowledgeItemStatus::ApprovedDefault);
assert_eq!(
result.item.trust_level,
KnowledgeTrustLevel::ApprovedDefault
);
assert_eq!(result.coverage.last_promoted_at_ms, Some(now + 12));
assert_eq!(
result.coverage.latest_item_id.as_deref(),
Some("item-promote-3")
);
}
#[tokio::test]
async fn test_knowledge_promotion_rejects_deprecated() {
let (db, _temp) = setup_test_db().await;
let now = chrono::Utc::now().timestamp_millis() as u64;
let space = KnowledgeSpaceRecord {
id: "space-promote-4".to_string(),
scope: KnowledgeScope::Project,
project_id: Some("project-1".to_string()),
namespace: Some("ops".to_string()),
title: Some("Ops knowledge".to_string()),
description: Some("Reusable ops knowledge".to_string()),
trust_level: KnowledgeTrustLevel::Promoted,
metadata: None,
created_at_ms: now,
updated_at_ms: now,
};
db.upsert_knowledge_space(&space).await.unwrap();
let item = KnowledgeItemRecord {
id: "item-promote-4".to_string(),
space_id: space.id.clone(),
coverage_key: "project-1::ops::incident::latency".to_string(),
dedupe_key: "dedupe-promote-4".to_string(),
item_type: "decision".to_string(),
title: "Ignore deprecated item".to_string(),
summary: None,
payload: serde_json::json!({"decision":"deprecated"}),
trust_level: KnowledgeTrustLevel::Promoted,
status: KnowledgeItemStatus::Deprecated,
run_id: Some("run-4".to_string()),
artifact_refs: vec![],
source_memory_ids: vec![],
freshness_expires_at_ms: None,
metadata: None,
created_at_ms: now,
updated_at_ms: now,
};
db.upsert_knowledge_item(&item).await.unwrap();
let promote = KnowledgePromotionRequest {
item_id: item.id.clone(),
target_status: KnowledgeItemStatus::Promoted,
promoted_at_ms: now + 1,
freshness_expires_at_ms: None,
reviewer_id: None,
approval_id: None,
reason: None,
};
let err = db.promote_knowledge_item(&promote).await.unwrap_err();
match err {
MemoryError::InvalidConfig(_) => {}
other => panic!("unexpected error: {}", other),
}
}
#[tokio::test]
async fn test_knowledge_promotion_idempotent_updates_coverage() {
let (db, _temp) = setup_test_db().await;
let now = chrono::Utc::now().timestamp_millis() as u64;
let space = KnowledgeSpaceRecord {
id: "space-promote-5".to_string(),
scope: KnowledgeScope::Project,
project_id: Some("project-1".to_string()),
namespace: Some("engineering/ops".to_string()),
title: Some("Engineering ops".to_string()),
description: None,
trust_level: KnowledgeTrustLevel::Promoted,
metadata: None,
created_at_ms: now,
updated_at_ms: now,
};
db.upsert_knowledge_space(&space).await.unwrap();
let item = KnowledgeItemRecord {
id: "item-promote-5".to_string(),
space_id: space.id.clone(),
coverage_key: "project-1::engineering/ops::deploy::guardrails".to_string(),
dedupe_key: "dedupe-promote-5".to_string(),
item_type: "pattern".to_string(),
title: "Deploy guardrails".to_string(),
summary: None,
payload: serde_json::json!({"pattern":"guardrails"}),
trust_level: KnowledgeTrustLevel::Promoted,
status: KnowledgeItemStatus::Promoted,
run_id: Some("run-5".to_string()),
artifact_refs: vec![],
source_memory_ids: vec![],
freshness_expires_at_ms: None,
metadata: None,
created_at_ms: now,
updated_at_ms: now,
};
db.upsert_knowledge_item(&item).await.unwrap();
let promote = KnowledgePromotionRequest {
item_id: item.id.clone(),
target_status: KnowledgeItemStatus::Promoted,
promoted_at_ms: now + 20,
freshness_expires_at_ms: None,
reviewer_id: None,
approval_id: None,
reason: None,
};
let result = db.promote_knowledge_item(&promote).await.unwrap().unwrap();
assert!(!result.promoted);
assert_eq!(result.coverage.last_promoted_at_ms, Some(now + 20));
assert_eq!(
result.coverage.latest_item_id.as_deref(),
Some("item-promote-5")
);
}
#[tokio::test]
async fn test_knowledge_item_promotion_updates_coverage() {
let (db, _temp) = setup_test_db().await;
let now = chrono::Utc::now().timestamp_millis() as u64;
let space = KnowledgeSpaceRecord {
id: "space-promote".to_string(),
scope: KnowledgeScope::Project,
project_id: Some("project-1".to_string()),
namespace: Some("engineering/debugging".to_string()),
title: Some("Engineering debugging".to_string()),
description: Some("Reusable debugging guidance".to_string()),
trust_level: KnowledgeTrustLevel::Promoted,
metadata: None,
created_at_ms: now,
updated_at_ms: now,
};
db.upsert_knowledge_space(&space).await.unwrap();
let item = KnowledgeItemRecord {
id: "item-promote".to_string(),
space_id: space.id.clone(),
coverage_key: "project-1::engineering/debugging::startup::race".to_string(),
dedupe_key: "dedupe-promote".to_string(),
item_type: "decision".to_string(),
title: "Delay startup-dependent retries".to_string(),
summary: Some("Retry only after startup completes.".to_string()),
payload: serde_json::json!({"action": "delay_retry"}),
trust_level: KnowledgeTrustLevel::Working,
status: KnowledgeItemStatus::Working,
run_id: Some("run-promote".to_string()),
artifact_refs: vec!["artifact://run-promote/report".to_string()],
source_memory_ids: vec!["memory-promote".to_string()],
freshness_expires_at_ms: None,
metadata: Some(serde_json::json!({"source_kind":"run"})),
created_at_ms: now,
updated_at_ms: now,
};
db.upsert_knowledge_item(&item).await.unwrap();
let request = KnowledgePromotionRequest {
item_id: item.id.clone(),
target_status: KnowledgeItemStatus::Promoted,
promoted_at_ms: now + 10,
freshness_expires_at_ms: Some(now + 86_400_000),
reviewer_id: None,
approval_id: None,
reason: Some("validated".to_string()),
};
let promoted = db
.promote_knowledge_item(&request)
.await
.unwrap()
.expect("promotion result");
assert_eq!(promoted.previous_status, KnowledgeItemStatus::Working);
assert!(promoted.promoted);
assert_eq!(promoted.item.status, KnowledgeItemStatus::Promoted);
assert_eq!(promoted.item.trust_level, KnowledgeTrustLevel::Promoted);
assert_eq!(
promoted.item.freshness_expires_at_ms,
Some(now + 86_400_000)
);
assert_eq!(
promoted
.item
.metadata
.as_ref()
.and_then(|value| value.get("promotion"))
.and_then(|value| value.get("to_status"))
.and_then(Value::as_str),
Some("promoted")
);
assert_eq!(
promoted.coverage.latest_item_id.as_deref(),
Some("item-promote")
);
assert_eq!(
promoted.coverage.latest_dedupe_key.as_deref(),
Some("dedupe-promote")
);
assert_eq!(promoted.coverage.last_promoted_at_ms, Some(now + 10));
assert_eq!(
promoted.coverage.freshness_expires_at_ms,
Some(now + 86_400_000)
);
let loaded = db
.get_knowledge_item("item-promote")
.await
.unwrap()
.unwrap();
assert_eq!(loaded.status, KnowledgeItemStatus::Promoted);
assert_eq!(
loaded
.metadata
.as_ref()
.and_then(|value| value.get("promotion"))
.and_then(|value| value.get("from_status"))
.and_then(Value::as_str),
Some("working")
);
}
#[tokio::test]
async fn test_knowledge_item_approved_default_requires_review() {
let (db, _temp) = setup_test_db().await;
let now = chrono::Utc::now().timestamp_millis() as u64;
let space = KnowledgeSpaceRecord {
id: "space-approved".to_string(),
scope: KnowledgeScope::Project,
project_id: Some("project-1".to_string()),
namespace: Some("marketing/positioning".to_string()),
title: Some("Marketing positioning".to_string()),
description: Some("Reusable positioning guidance".to_string()),
trust_level: KnowledgeTrustLevel::Promoted,
metadata: None,
created_at_ms: now,
updated_at_ms: now,
};
db.upsert_knowledge_space(&space).await.unwrap();
let item = KnowledgeItemRecord {
id: "item-approved".to_string(),
space_id: space.id.clone(),
coverage_key: "project-1::marketing/positioning::strategy::pricing".to_string(),
dedupe_key: "dedupe-approved".to_string(),
item_type: "evidence".to_string(),
title: "Pricing sensitivity observation".to_string(),
summary: Some("Customers reacted to annual pricing changes".to_string()),
payload: serde_json::json!({"claim":"Annual pricing changes created friction"}),
trust_level: KnowledgeTrustLevel::Promoted,
status: KnowledgeItemStatus::Promoted,
run_id: Some("run-approved".to_string()),
artifact_refs: vec!["artifact://run-approved/research".to_string()],
source_memory_ids: vec!["memory-approved".to_string()],
freshness_expires_at_ms: Some(now + 1234),
metadata: None,
created_at_ms: now,
updated_at_ms: now,
};
db.upsert_knowledge_item(&item).await.unwrap();
let request = KnowledgePromotionRequest {
item_id: item.id.clone(),
target_status: KnowledgeItemStatus::ApprovedDefault,
promoted_at_ms: now + 20,
freshness_expires_at_ms: Some(now + 90_000_000),
reviewer_id: Some("reviewer-1".to_string()),
approval_id: Some("approval-1".to_string()),
reason: Some("approved as default guidance".to_string()),
};
let promoted = db
.promote_knowledge_item(&request)
.await
.unwrap()
.expect("promotion result");
assert_eq!(promoted.previous_status, KnowledgeItemStatus::Promoted);
assert_eq!(promoted.item.status, KnowledgeItemStatus::ApprovedDefault);
assert_eq!(
promoted.item.trust_level,
KnowledgeTrustLevel::ApprovedDefault
);
assert_eq!(promoted.coverage.last_promoted_at_ms, Some(now + 20));
assert_eq!(
promoted
.item
.metadata
.as_ref()
.and_then(|value| value.get("promotion"))
.and_then(|value| value.get("approval_id"))
.and_then(Value::as_str),
Some("approval-1")
);
}
#[tokio::test]
async fn test_knowledge_item_promotion_rejects_invalid_transition() {
let (db, _temp) = setup_test_db().await;
let now = chrono::Utc::now().timestamp_millis() as u64;
let space = KnowledgeSpaceRecord {
id: "space-invalid".to_string(),
scope: KnowledgeScope::Project,
project_id: Some("project-1".to_string()),
namespace: Some("support".to_string()),
title: Some("Support".to_string()),
description: Some("Support guidance".to_string()),
trust_level: KnowledgeTrustLevel::Promoted,
metadata: None,
created_at_ms: now,
updated_at_ms: now,
};
db.upsert_knowledge_space(&space).await.unwrap();
let item = KnowledgeItemRecord {
id: "item-invalid".to_string(),
space_id: space.id.clone(),
coverage_key: "project-1::support::workflow::triage".to_string(),
dedupe_key: "dedupe-invalid".to_string(),
item_type: "decision".to_string(),
title: "Triage first".to_string(),
summary: None,
payload: serde_json::json!({"action":"triage"}),
trust_level: KnowledgeTrustLevel::Working,
status: KnowledgeItemStatus::Working,
run_id: Some("run-invalid".to_string()),
artifact_refs: vec![],
source_memory_ids: vec![],
freshness_expires_at_ms: None,
metadata: None,
created_at_ms: now,
updated_at_ms: now,
};
db.upsert_knowledge_item(&item).await.unwrap();
let request = KnowledgePromotionRequest {
item_id: item.id.clone(),
target_status: KnowledgeItemStatus::ApprovedDefault,
promoted_at_ms: now + 1,
freshness_expires_at_ms: None,
reviewer_id: Some("reviewer-1".to_string()),
approval_id: Some("approval-1".to_string()),
reason: Some("should fail".to_string()),
};
let err = db.promote_knowledge_item(&request).await.unwrap_err();
assert!(matches!(err, MemoryError::InvalidConfig(_)));
let loaded = db.get_knowledge_item(&item.id).await.unwrap().unwrap();
assert_eq!(loaded.status, KnowledgeItemStatus::Working);
}
}