mod common;
use std::collections::HashSet;
use nexus_core::config::AgentConfig;
use nexus_core::{CognitiveLevel, CognitiveMetadata, MemoryCategory};
use nexus_memory_agent::{RuntimeController, RuntimeMode, RuntimeShutdownReason};
use nexus_storage::models::EnqueueJobParams;
use nexus_storage::repository::{MemoryRepository, NamespaceRepository, StoreMemoryParams};
use nexus_storage::StorageManager;
use common::{env_lock, runtime_cognition_config, SoakFixture};
/// Runs three session-scoped runtimes one after another for the same agent and checks
/// that the memories listed under each session key belong to that session only.
///
/// For every session the test stores that session's facts between `ensure_started` and
/// `flush_and_shutdown`. Afterwards, listing by session key must return every fact stored
/// for that session and none of the facts stored for the other two sessions.
#[tokio::test(flavor = "current_thread")]
async fn three_consecutive_sessions_isolate_cognition_state() {
    // Held for the whole test; presumably serialises tests that touch shared env state.
    let _guard = env_lock().lock().await;

    let sessions = ["soak-alpha", "soak-beta", "soak-gamma"];
    let agent = "claude-code";
    // `session_contents[i]` holds the facts stored while `sessions[i]` is running.
    let session_contents: &[&[&str]] = &[
        &["Alpha: project uses PostgreSQL for persistence"],
        &["Beta: project uses Redis for caching"],
        &["Gamma: project uses gRPC for communication"],
    ];

    let fixture = SoakFixture::new(agent).await;
    let cognition = runtime_cognition_config();

    // Phase 1: one full start -> store -> shutdown cycle per session, strictly sequential.
    for (&session_key, &facts) in sessions.iter().zip(session_contents) {
        let controller = RuntimeController::new(cognition.clone(), AgentConfig::default(), None);
        let cwd = format!("/tmp/soak-isolation-{session_key}");

        controller
            .ensure_started(
                agent,
                Some(session_key),
                Some(&cwd),
                RuntimeMode::SessionScoped,
            )
            .await
            .unwrap();

        for &fact in facts {
            fixture
                .store_cognitive(
                    fact,
                    agent,
                    session_key,
                    CognitiveLevel::Explicit,
                    &MemoryCategory::Facts,
                )
                .await;
        }

        controller
            .flush_and_shutdown(
                agent,
                Some(session_key),
                Some(&cwd),
                RuntimeShutdownReason::SessionEnded,
            )
            .await
            .unwrap();
    }

    // Phase 2: each session must list all of its own facts and none of the others'.
    for (i, &session_key) in sessions.iter().enumerate() {
        let memories = fixture
            .repo
            .list_by_session_key(fixture.namespace_id, session_key, 100, true)
            .await
            .unwrap();
        let listed = |needle: &str| memories.iter().any(|m| m.content.contains(needle));

        // Match on the stored fact text rather than the session key: the facts never
        // embed their key ("Alpha: ..." does not contain "soak-alpha"), so a key-based
        // search would not demonstrate that the stored content came back.
        for &own in session_contents[i] {
            assert!(
                listed(own),
                "Session {session_key} should contain its own content"
            );
        }

        for (j, &other_key) in sessions.iter().enumerate() {
            if i == j {
                continue;
            }
            // Check every fact of the other session, not just the first one.
            for &foreign in session_contents[j] {
                assert!(
                    !listed(foreign),
                    "Session {session_key} should NOT contain content from {other_key}"
                );
            }
        }
    }
}
/// Stores one memory per agent (`claude-code`, `gemini`, `qwen`) in that agent's own
/// namespace of a single temporary SQLite database, then asserts that listing each
/// agent's session yields its own memory and none of the other agents' memories.
///
/// `HOME`, `XDG_STATE_HOME` and `NEXUS_DATABASE_PATH` are pointed into a temp directory
/// through `common::EnvGuard` for the duration of the test.
#[tokio::test(flavor = "current_thread")]
async fn gemini_and_qwen_namespace_isolation_from_claude() {
    let _guard = env_lock().lock().await;
    let cognition = runtime_cognition_config();

    // One row per agent under test: (agent, session key, memory content).
    let cases: [(&str, &str, &str); 3] = [
        (
            "claude-code",
            "claude-soak",
            "Claude-specific memory about anthropic tooling",
        ),
        (
            "gemini",
            "gemini-soak",
            "Gemini-specific memory about google tooling",
        ),
        (
            "qwen",
            "qwen-soak",
            "Qwen-specific memory about alibaba tooling",
        ),
    ];

    // Throw-away filesystem layout: home dir, state dir and the database file.
    let temp = tempfile::tempdir().unwrap();
    let home_dir = temp.path().join("home");
    let state_dir = temp.path().join("state");
    let db_path = temp.path().join("nexus.db");
    for dir in [&home_dir, &state_dir] {
        std::fs::create_dir_all(dir).unwrap();
    }

    let _env = common::EnvGuard::set(&[
        ("HOME", home_dir.display().to_string()),
        ("XDG_STATE_HOME", state_dir.display().to_string()),
        ("NEXUS_DATABASE_PATH", db_path.display().to_string()),
    ]);

    let url = format!("sqlite:{}", db_path.display());
    let mut storage = StorageManager::from_url(&url).await.unwrap();
    storage.initialize().await.unwrap();
    let namespace_repo = NamespaceRepository::new(storage.pool().clone());
    let repo = MemoryRepository::new(storage.pool().clone());

    // Phase 1: run one session per agent, storing that agent's memory while it is live.
    for &(agent, session_key, content) in &cases {
        let controller = RuntimeController::new(cognition.clone(), AgentConfig::default(), None);
        let cwd = format!("/tmp/soak-multiagent-{agent}");

        controller
            .ensure_started(
                agent,
                Some(session_key),
                Some(&cwd),
                RuntimeMode::SessionScoped,
            )
            .await
            .unwrap();

        let namespace = namespace_repo.get_or_create(agent, agent).await.unwrap();

        let mut cognitive = CognitiveMetadata::new(
            CognitiveLevel::Explicit,
            agent,
            agent,
            Some(session_key.to_string()),
            "soak_test",
        );
        cognitive.confidence = Some(0.9);
        let metadata = cognitive.merge_into(&serde_json::json!({}));

        repo.store(StoreMemoryParams {
            namespace_id: namespace.id,
            content,
            category: &MemoryCategory::Facts,
            memory_lane_type: None,
            labels: &[],
            metadata: &metadata,
            embedding: None,
            embedding_model: None,
        })
        .await
        .unwrap();

        controller
            .flush_and_shutdown(
                agent,
                Some(session_key),
                Some(&cwd),
                RuntimeShutdownReason::SessionEnded,
            )
            .await
            .unwrap();
    }

    // Phase 2: every namespace must hold its own memory and nobody else's.
    for (i, &(agent, session_key, own)) in cases.iter().enumerate() {
        let namespace = namespace_repo.get_or_create(agent, agent).await.unwrap();
        let memories = repo
            .list_by_session_key(namespace.id, session_key, 100, true)
            .await
            .unwrap();
        let holds = |needle: &str| memories.iter().any(|m| m.content == needle);

        assert!(holds(own), "{agent} namespace should contain its own memory");

        for (j, &(other_agent, _, foreign)) in cases.iter().enumerate() {
            if i == j {
                continue;
            }
            assert!(
                !holds(foreign),
                "{agent} namespace should NOT contain memory from {}",
                other_agent
            );
        }
    }
}
/// Seeds three raw-activity memories and one `activity_distill` job before any runtime
/// is started, then runs a single session-scoped start/shutdown cycle (storing a
/// contradiction pair in between) and asserts that the session listing contains both a
/// `raw-activity`-labelled memory and a memory whose content mentions "cache system".
///
/// NOTE(review): the "cache system" text is presumably written by
/// `store_contradiction_pair` — confirm against the fixture.
#[tokio::test(flavor = "current_thread")]
async fn buffered_ingestion_drains_on_session_start_and_shutdown() {
    let _guard = env_lock().lock().await;

    let agent = "claude-code";
    let session_key = "soak-buffered-drain";
    let fixture = SoakFixture::new(agent).await;
    let cognition = runtime_cognition_config();

    // Raw activity rows written directly through the repository, before the runtime runs.
    let raw_labels = ["raw-activity".to_string()];
    for i in 0..3 {
        let mut cognitive = CognitiveMetadata::new(
            CognitiveLevel::Raw,
            agent,
            agent,
            Some(session_key.to_string()),
            "soak_test",
        );
        cognitive.confidence = Some(0.7);

        let mut metadata = cognitive.merge_into(&serde_json::json!({}));
        metadata["raw_activity"] = serde_json::json!({
            "event": "tool_use",
            "tool": "cargo_test",
            "ordinal": i,
        });

        let body =
            format!(r#"{{"event":"tool_use","tool":"cargo test","exit_code":0,"ordinal":{i}}}"#);

        fixture
            .repo
            .store(StoreMemoryParams {
                namespace_id: fixture.namespace_id,
                content: &body,
                category: &MemoryCategory::Session,
                memory_lane_type: None,
                labels: &raw_labels,
                metadata: &metadata,
                embedding: None,
                embedding_model: None,
            })
            .await
            .unwrap();
    }

    // Queue a distillation job scoped to this session.
    let perspective = serde_json::json!({
        "observer": agent,
        "subject": agent,
        "session_key": session_key,
    });
    let payload = serde_json::json!({"session_key": session_key});
    let job_id = fixture
        .repo
        .enqueue_job(EnqueueJobParams {
            namespace_id: fixture.namespace_id,
            job_type: "activity_distill",
            priority: 10,
            perspective: Some(&perspective),
            payload: &payload,
        })
        .await
        .unwrap();
    assert!(job_id > 0, "Enqueued job should have a valid ID");

    // One full runtime cycle with explicit memories stored while the session is live.
    let cwd = "/tmp/soak-buffered-drain";
    let controller = RuntimeController::new(cognition.clone(), AgentConfig::default(), None);
    controller
        .ensure_started(
            agent,
            Some(session_key),
            Some(cwd),
            RuntimeMode::SessionScoped,
        )
        .await
        .unwrap();

    fixture.store_contradiction_pair(agent, session_key).await;

    controller
        .flush_and_shutdown(
            agent,
            Some(session_key),
            Some(cwd),
            RuntimeShutdownReason::SessionEnded,
        )
        .await
        .unwrap();

    // Both the pre-seeded raw rows and the in-session memories must be listed.
    let session_memories = fixture
        .repo
        .list_by_session_key(fixture.namespace_id, session_key, 100, true)
        .await
        .unwrap();

    let has_raw_activity = session_memories
        .iter()
        .any(|m| m.labels.iter().any(|l| l == "raw-activity"));
    assert!(
        has_raw_activity,
        "Raw activity memories should exist for the session"
    );

    let has_explicit = session_memories
        .iter()
        .any(|m| m.content.contains("cache system"));
    assert!(has_explicit, "Explicit session memories should exist");
}
/// Runs three sessions in sequence, each storing a contradiction pair, and asserts that
/// every session has a "short" digest after shutdown, that the three digest ids are
/// pairwise distinct, and — when any contradiction-level memories exist — that at least
/// one of them mentions one of the three session keys in its metadata.
#[tokio::test(flavor = "current_thread")]
async fn repeated_session_end_dreaming_safely_accumulates() {
    let _guard = env_lock().lock().await;

    let agent = "claude-code";
    let session_keys = ["soak-dream-1", "soak-dream-2", "soak-dream-3"];
    let cognition = runtime_cognition_config();
    let fixture = SoakFixture::new(agent).await;

    // Sequential start -> store -> shutdown cycles, one fresh controller per session.
    for &session_key in &session_keys {
        let controller = RuntimeController::new(cognition.clone(), AgentConfig::default(), None);
        let cwd = format!("/tmp/soak-dream-{session_key}");

        controller
            .ensure_started(
                agent,
                Some(session_key),
                Some(&cwd),
                RuntimeMode::SessionScoped,
            )
            .await
            .unwrap();

        fixture.store_contradiction_pair(agent, session_key).await;

        controller
            .flush_and_shutdown(
                agent,
                Some(session_key),
                Some(&cwd),
                RuntimeShutdownReason::SessionEnded,
            )
            .await
            .unwrap();
    }

    // First pass: every session must have produced a short digest.
    for &session_key in &session_keys {
        let maybe_digest = fixture
            .repo
            .latest_digest_for_session(fixture.namespace_id, session_key, "short")
            .await
            .unwrap();
        assert!(
            maybe_digest.is_some(),
            "Session {session_key} should have a short digest after shutdown"
        );
    }

    // Second pass: no two sessions may resolve to the same digest row.
    let mut seen_digest_ids: HashSet<i64> = HashSet::new();
    for &session_key in &session_keys {
        let digest = fixture
            .repo
            .latest_digest_for_session(fixture.namespace_id, session_key, "short")
            .await
            .unwrap()
            .unwrap();
        let newly_seen = seen_digest_ids.insert(digest.id);
        assert!(
            newly_seen,
            "Session {session_key} should have a unique digest, not a duplicate of another session"
        );
    }

    // Contradiction memories are optional; if present, one must point back at a session.
    let contradictions = fixture
        .repo
        .get_by_cognitive_level(fixture.namespace_id, CognitiveLevel::Contradiction, 50)
        .await
        .unwrap();
    let has_session_reference = contradictions
        .iter()
        .map(|m| m.metadata.to_string())
        .any(|meta| session_keys.iter().any(|key| meta.contains(key)));
    assert!(
        contradictions.is_empty() || has_session_reference,
        "Contradiction memories should reference at least one of the soak sessions"
    );
}
/// Confirms that running a second session does not disturb the first session's digest.
///
/// Session A stores one fact and shuts down; its "short" digest is captured. Session B
/// then runs with a contradiction pair. Afterwards A's digest must have the same id and
/// content as before, and B's digest must have a different id.
#[tokio::test(flavor = "current_thread")]
async fn dream_outputs_dont_corrupt_earlier_session_digests() {
    let _guard = env_lock().lock().await;

    let agent = "claude-code";
    let session_a = "soak-digest-a";
    let session_b = "soak-digest-b";
    let cognition = runtime_cognition_config();
    let fixture = SoakFixture::new(agent).await;

    // --- Session A: one explicit fact, then shutdown. ---
    let controller_a = RuntimeController::new(cognition.clone(), AgentConfig::default(), None);
    let cwd_a = "/tmp/soak-digest-a";
    controller_a
        .ensure_started(
            agent,
            Some(session_a),
            Some(cwd_a),
            RuntimeMode::SessionScoped,
        )
        .await
        .unwrap();
    fixture
        .store_cognitive(
            "Session A unique fact about feature X implementation",
            agent,
            session_a,
            CognitiveLevel::Explicit,
            &MemoryCategory::Facts,
        )
        .await;
    controller_a
        .flush_and_shutdown(
            agent,
            Some(session_a),
            Some(cwd_a),
            RuntimeShutdownReason::SessionEnded,
        )
        .await
        .unwrap();

    // Snapshot of A's digest before B has done anything.
    let digest_a_before = fixture
        .repo
        .latest_digest_for_session(fixture.namespace_id, session_a, "short")
        .await
        .unwrap()
        .expect("Session A should have a digest");

    // --- Session B: contradiction pair, then shutdown. ---
    let controller_b = RuntimeController::new(cognition.clone(), AgentConfig::default(), None);
    let cwd_b = "/tmp/soak-digest-b";
    controller_b
        .ensure_started(
            agent,
            Some(session_b),
            Some(cwd_b),
            RuntimeMode::SessionScoped,
        )
        .await
        .unwrap();
    fixture.store_contradiction_pair(agent, session_b).await;
    controller_b
        .flush_and_shutdown(
            agent,
            Some(session_b),
            Some(cwd_b),
            RuntimeShutdownReason::SessionEnded,
        )
        .await
        .unwrap();

    // A's digest, re-read after B finished, must be unchanged.
    let digest_a_after = fixture
        .repo
        .latest_digest_for_session(fixture.namespace_id, session_a, "short")
        .await
        .unwrap()
        .expect("Session A digest should still exist");
    assert_eq!(
        digest_a_before.id, digest_a_after.id,
        "Session A digest ID should not change after session B runs"
    );
    assert_eq!(
        digest_a_before.content, digest_a_after.content,
        "Session A digest content should not be modified by session B"
    );

    // B must own a separate digest row.
    let digest_b = fixture
        .repo
        .latest_digest_for_session(fixture.namespace_id, session_b, "short")
        .await
        .unwrap()
        .expect("Session B should have a digest");
    assert_ne!(
        digest_a_before.id, digest_b.id,
        "Session A and B digests should be distinct"
    );
}
/// Verifies that each session gets its own runtime state file and that shutting one
/// session down removes only that session's file.
///
/// Two sessions are started (their starting controllers are dropped without shutdown),
/// both state files are asserted to exist, and then each session is shut down through a
/// freshly constructed controller while the state files are checked after every step.
/// Finally both sessions are expected to have a "short" digest.
#[tokio::test(flavor = "current_thread")]
async fn runtime_state_files_isolated_per_session() {
    let _guard = env_lock().lock().await;

    let agent = "claude-code";
    let sessions = ["soak-state-x", "soak-state-y"];
    let cognition = runtime_cognition_config();
    let fixture = SoakFixture::new(agent).await;

    let state_paths: Vec<std::path::PathBuf> = sessions
        .iter()
        .map(|sk| common::session_state_file(agent, sk))
        .collect();

    // Best-effort cleanup of leftovers from earlier runs; a missing file is fine.
    state_paths.iter().for_each(|stale| {
        let _ = std::fs::remove_file(stale);
    });

    // Start both sessions and store one fact in each; neither is shut down here.
    for (i, &session_key) in sessions.iter().enumerate() {
        let controller = RuntimeController::new(cognition.clone(), AgentConfig::default(), None);
        let cwd = format!("/tmp/soak-state-{session_key}");
        controller
            .ensure_started(
                agent,
                Some(session_key),
                Some(&cwd),
                RuntimeMode::SessionScoped,
            )
            .await
            .unwrap();

        let fact = format!("Session {i} unique content for state isolation test");
        fixture
            .store_cognitive(
                &fact,
                agent,
                session_key,
                CognitiveLevel::Explicit,
                &MemoryCategory::Facts,
            )
            .await;
    }

    for (session_key, path) in sessions.iter().zip(&state_paths) {
        assert!(
            path.exists(),
            "State file for session {} should exist after ensure_started",
            session_key
        );
    }

    let [key_x, key_y] = sessions;
    let (path_x, path_y) = (&state_paths[0], &state_paths[1]);

    // Shut down X via a new controller: only X's state file may disappear.
    let controller_x = RuntimeController::new(cognition.clone(), AgentConfig::default(), None);
    controller_x
        .flush_and_shutdown(
            agent,
            Some(key_x),
            Some("/tmp/soak-state-soak-state-x"),
            RuntimeShutdownReason::SessionEnded,
        )
        .await
        .unwrap();
    assert!(
        !path_x.exists(),
        "Session {} state file should be removed after shutdown",
        key_x
    );
    assert!(
        path_y.exists(),
        "Session {} state file should still exist (not yet shut down)",
        key_y
    );

    // Now shut down Y the same way.
    let controller_y = RuntimeController::new(cognition.clone(), AgentConfig::default(), None);
    controller_y
        .flush_and_shutdown(
            agent,
            Some(key_y),
            Some("/tmp/soak-state-soak-state-y"),
            RuntimeShutdownReason::SessionEnded,
        )
        .await
        .unwrap();
    assert!(
        !path_y.exists(),
        "Session {} state file should be removed after shutdown",
        key_y
    );

    // Each session should have ended up with its own digest.
    for &session_key in &sessions {
        let digest = fixture
            .repo
            .latest_digest_for_session(fixture.namespace_id, session_key, "short")
            .await
            .unwrap();
        assert!(
            digest.is_some(),
            "Session {session_key} should have its own digest"
        );
    }
}
/// Runs the same session key through two complete start/shutdown cycles, storing a
/// different fact in each, and asserts that both facts are listed under that key and
/// that a "short" digest exists for it afterwards.
#[tokio::test(flavor = "current_thread")]
async fn reentering_session_preserves_key_no_state_bleed() {
    let _guard = env_lock().lock().await;

    let agent = "claude-code";
    let session_key = "soak-reenter";
    let cognition = runtime_cognition_config();
    let fixture = SoakFixture::new(agent).await;
    let cwd = "/tmp/soak-reenter";

    // First entry into the session.
    let first_run = RuntimeController::new(cognition.clone(), AgentConfig::default(), None);
    first_run
        .ensure_started(
            agent,
            Some(session_key),
            Some(cwd),
            RuntimeMode::SessionScoped,
        )
        .await
        .unwrap();
    fixture
        .store_cognitive(
            "First run: feature X is implemented",
            agent,
            session_key,
            CognitiveLevel::Explicit,
            &MemoryCategory::Facts,
        )
        .await;
    first_run
        .flush_and_shutdown(
            agent,
            Some(session_key),
            Some(cwd),
            RuntimeShutdownReason::SessionEnded,
        )
        .await
        .unwrap();

    // Re-entry with a brand-new controller but the same key and working directory.
    let second_run = RuntimeController::new(cognition.clone(), AgentConfig::default(), None);
    second_run
        .ensure_started(
            agent,
            Some(session_key),
            Some(cwd),
            RuntimeMode::SessionScoped,
        )
        .await
        .unwrap();
    fixture
        .store_cognitive(
            "Second run: feature Y is implemented",
            agent,
            session_key,
            CognitiveLevel::Explicit,
            &MemoryCategory::Facts,
        )
        .await;
    second_run
        .flush_and_shutdown(
            agent,
            Some(session_key),
            Some(cwd),
            RuntimeShutdownReason::SessionEnded,
        )
        .await
        .unwrap();

    // Memories from both runs must be visible under the shared key.
    let all_memories = fixture
        .repo
        .list_by_session_key(fixture.namespace_id, session_key, 100, true)
        .await
        .unwrap();
    let mentions = |needle: &str| all_memories.iter().any(|m| m.content.contains(needle));

    assert!(
        mentions("feature X"),
        "First run memory should persist under the same session key"
    );
    assert!(
        mentions("feature Y"),
        "Second run memory should also be stored under the same session key"
    );

    let digest = fixture
        .repo
        .latest_digest_for_session(fixture.namespace_id, session_key, "short")
        .await
        .unwrap();
    assert!(digest.is_some(), "Re-entered session should have a digest");
}
/// Runs one session that stores a fact carrying a unique marker plus a contradiction
/// pair, then asserts that the session's "short" digest metadata (serialised to a
/// string) contains the session key and that the session listing still contains the
/// marker-bearing memory.
#[tokio::test(flavor = "current_thread")]
async fn dream_digest_content_references_session_memories() {
    let _guard = env_lock().lock().await;

    let agent = "claude-code";
    let session_key = "soak-digest-ref";
    let unique_marker = "UNIQUE_DIGEST_MARKER_7f3a";
    let cognition = runtime_cognition_config();
    let fixture = SoakFixture::new(agent).await;

    let controller = RuntimeController::new(cognition.clone(), AgentConfig::default(), None);
    let cwd = "/tmp/soak-digest-ref";
    controller
        .ensure_started(
            agent,
            Some(session_key),
            Some(cwd),
            RuntimeMode::SessionScoped,
        )
        .await
        .unwrap();

    // The marker makes this memory unambiguous to find in the listing later on.
    let marked_fact = format!("The project uses {unique_marker} for advanced caching");
    fixture
        .store_cognitive(
            &marked_fact,
            agent,
            session_key,
            CognitiveLevel::Explicit,
            &MemoryCategory::Facts,
        )
        .await;
    fixture.store_contradiction_pair(agent, session_key).await;

    controller
        .flush_and_shutdown(
            agent,
            Some(session_key),
            Some(cwd),
            RuntimeShutdownReason::SessionEnded,
        )
        .await
        .unwrap();

    // The digest's metadata must point back at the session it summarises.
    let digest = fixture
        .repo
        .latest_digest_for_session(fixture.namespace_id, session_key, "short")
        .await
        .unwrap()
        .expect("Digest should exist");
    let digest_meta_str = digest.metadata.to_string();
    assert!(
        digest_meta_str.contains(session_key),
        "Digest metadata should reference the session key {session_key}, got: {digest_meta_str}"
    );

    // The original marked memory must still be listed for the session.
    let session_memories = fixture
        .repo
        .list_by_session_key(fixture.namespace_id, session_key, 100, true)
        .await
        .unwrap();
    let marker_listed = session_memories
        .iter()
        .any(|m| m.content.contains(unique_marker));
    assert!(
        marker_listed,
        "Session memories should contain the unique marker {unique_marker}"
    );
}
/// Runs two start/shutdown cycles under one session key and asserts that the latest
/// "short" digest keeps the same id across both cycles, that at least five memories are
/// listed for the key, and that the second cycle's "refactoring" memory is among them.
#[tokio::test(flavor = "current_thread")]
async fn repeated_dream_same_session_no_orphan_digests() {
    let _guard = env_lock().lock().await;

    let agent = "claude-code";
    let session_key = "soak-dup-digest";
    let cognition = runtime_cognition_config();
    let fixture = SoakFixture::new(agent).await;
    let cwd = "/tmp/soak-dup-digest";

    // Cycle 1: contradiction pair only.
    let first_cycle = RuntimeController::new(cognition.clone(), AgentConfig::default(), None);
    first_cycle
        .ensure_started(
            agent,
            Some(session_key),
            Some(cwd),
            RuntimeMode::SessionScoped,
        )
        .await
        .unwrap();
    fixture.store_contradiction_pair(agent, session_key).await;
    first_cycle
        .flush_and_shutdown(
            agent,
            Some(session_key),
            Some(cwd),
            RuntimeShutdownReason::SessionEnded,
        )
        .await
        .unwrap();

    // Remember which digest row the first cycle produced.
    let first_id = fixture
        .repo
        .latest_digest_for_session(fixture.namespace_id, session_key, "short")
        .await
        .unwrap()
        .expect("First cycle should produce a digest")
        .id;

    // Cycle 2: same key, one additional explicit fact.
    let second_cycle = RuntimeController::new(cognition.clone(), AgentConfig::default(), None);
    second_cycle
        .ensure_started(
            agent,
            Some(session_key),
            Some(cwd),
            RuntimeMode::SessionScoped,
        )
        .await
        .unwrap();
    fixture
        .store_cognitive(
            "Second cycle: additional observation about refactoring",
            agent,
            session_key,
            CognitiveLevel::Explicit,
            &MemoryCategory::Facts,
        )
        .await;
    second_cycle
        .flush_and_shutdown(
            agent,
            Some(session_key),
            Some(cwd),
            RuntimeShutdownReason::SessionEnded,
        )
        .await
        .unwrap();

    // The latest digest must still be the row created by the first cycle.
    let digest_after_second = fixture
        .repo
        .latest_digest_for_session(fixture.namespace_id, session_key, "short")
        .await
        .unwrap()
        .expect("Second cycle should still have a digest");
    assert_eq!(
        first_id, digest_after_second.id,
        "Same session key should reuse the same digest row, not create an orphan"
    );

    // Memories from both cycles accumulate under the one key.
    let all_memories = fixture
        .repo
        .list_by_session_key(fixture.namespace_id, session_key, 200, true)
        .await
        .unwrap();
    assert!(
        all_memories.len() >= 5,
        "Both cycles' memories should be present under the same session key (got {})",
        all_memories.len()
    );

    let second_cycle_listed = all_memories
        .iter()
        .any(|m| m.content.contains("refactoring"));
    assert!(
        second_cycle_listed,
        "Second cycle's memory should be stored under the same session key"
    );
}