use assert_cmd::Command;
use predicates::prelude::*;
use std::fs;
use tempfile::TempDir;
fn cli() -> Command {
Command::new(env!("CARGO_BIN_EXE_cortex-mem"))
}
fn config_path() -> String {
match std::env::var("CONFIG_PATH") {
Ok(p) => {
let path = std::path::Path::new(&p);
if path.is_absolute() {
p
} else {
let workspace_root = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
.parent()
.expect("CARGO_MANIFEST_DIR has no parent");
workspace_root.join(path).to_string_lossy().to_string()
}
}
Err(_) => {
let workspace_root = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
.parent()
.expect("CARGO_MANIFEST_DIR has no parent");
workspace_root
.join("config.toml")
.to_string_lossy()
.to_string()
}
}
}
fn tenant_id() -> String {
std::env::var("TENANT_ID").unwrap_or_else(|_| "default".to_string())
}
fn setup_temp_env() -> (TempDir, String) {
let tmp = TempDir::new().expect("Failed to create temp dir");
let data_dir = tmp.path().join("cortex-data");
fs::create_dir_all(&data_dir).expect("Failed to create data dir");
let tenants_dir = data_dir.join("tenants");
fs::create_dir_all(&tenants_dir).expect("Failed to create tenants dir");
fs::create_dir_all(tenants_dir.join("tenant-alpha")).expect("Failed to create tenant dir");
fs::create_dir_all(tenants_dir.join("tenant-beta")).expect("Failed to create tenant dir");
let config_content = format!(
r#"[qdrant]
url = "http://localhost:16334"
collection_name = "test-cortex-mem"
embedding_dim = 256
timeout_secs = 5
api_key = ""
[embedding]
api_base_url = "http://localhost:18080"
api_key = "test-key"
model_name = "test-model"
batch_size = 10
timeout_secs = 5
[llm]
api_base_url = "http://localhost:18080"
api_key = "test-key"
model_efficient = "test-model"
temperature = 0.1
max_tokens = 65536
[server]
host = "localhost"
port = 8085
cors_origins = ["*"]
[cortex]
data_dir = "{data_dir}"
[logging]
enabled = false
log_directory = "logs"
level = "error"
"#,
data_dir = data_dir.display()
);
let config_path = tmp.path().join("config.toml");
fs::write(&config_path, &config_content).expect("Failed to write config file");
let config_str = config_path.to_string_lossy().to_string();
(tmp, config_str)
}
#[test]
fn test_help_command() {
cli()
.arg("--help")
.assert()
.success()
.stdout(predicate::str::contains("cortex-mem"))
.stdout(predicate::str::contains("Usage"));
}
#[test]
fn test_version_command() {
cli()
.arg("--version")
.assert()
.success()
.stdout(predicate::str::contains("cortex-mem"));
}
#[test]
fn test_add_subcommand_help() {
cli()
.args(["add", "--help"])
.assert()
.success()
.stdout(predicate::str::contains("thread").or(predicate::str::contains("content")));
}
#[test]
fn test_search_subcommand_help() {
cli()
.args(["search", "--help"])
.assert()
.success()
.stdout(predicate::str::contains("query").or(predicate::str::contains("limit")));
}
#[test]
fn test_session_subcommand_help() {
cli().args(["session", "--help"]).assert().success().stdout(
predicate::str::contains("list")
.or(predicate::str::contains("create"))
.or(predicate::str::contains("close")),
);
}
#[test]
fn test_layers_subcommand_help() {
cli().args(["layers", "--help"]).assert().success().stdout(
predicate::str::contains("status")
.or(predicate::str::contains("ensure-all"))
.or(predicate::str::contains("regenerate-oversized")),
);
}
#[test]
fn test_tenant_subcommand_help() {
cli()
.args(["tenant", "--help"])
.assert()
.success()
.stdout(predicate::str::contains("list"));
}
#[test]
fn test_get_subcommand_help() {
cli()
.args(["get", "--help"])
.assert()
.success()
.stdout(predicate::str::contains("uri").or(predicate::str::contains("URI")));
}
#[test]
fn test_list_subcommand_help() {
cli()
.args(["list", "--help"])
.assert()
.success()
.stdout(predicate::str::contains("uri").or(predicate::str::contains("URI")));
}
#[test]
fn test_delete_subcommand_help() {
cli()
.args(["delete", "--help"])
.assert()
.success()
.stdout(predicate::str::contains("uri").or(predicate::str::contains("URI")));
}
#[test]
fn test_stats_subcommand_help() {
cli().args(["stats", "--help"]).assert().success();
}
#[test]
fn test_tenant_list_with_tenants() {
let (_tmp, config) = setup_temp_env();
cli()
.args(["-c", &config, "tenant", "list"])
.assert()
.success()
.stdout(predicate::str::contains("tenant-alpha"))
.stdout(predicate::str::contains("tenant-beta"));
}
#[test]
fn test_tenant_list_empty() {
let tmp = TempDir::new().expect("Failed to create temp dir");
let data_dir = tmp.path().join("cortex-data-empty");
fs::create_dir_all(&data_dir).expect("Failed to create data dir");
let config_content = format!(
r#"[qdrant]
url = "http://localhost:16334"
collection_name = "test-cortex-mem"
embedding_dim = 256
timeout_secs = 5
api_key = ""
[embedding]
api_base_url = "http://localhost:18080"
api_key = "test-key"
model_name = "test-model"
batch_size = 10
timeout_secs = 5
[llm]
api_base_url = "http://localhost:18080"
api_key = "test-key"
model_efficient = "test-model"
temperature = 0.1
max_tokens = 65536
[server]
host = "localhost"
port = 8085
cors_origins = ["*"]
[cortex]
data_dir = "{data_dir}"
[logging]
enabled = false
log_directory = "logs"
level = "error"
"#,
data_dir = data_dir.display()
);
let config_path = tmp.path().join("config.toml");
fs::write(&config_path, &config_content).expect("Failed to write config");
cli()
.args(["-c", &config_path.to_string_lossy(), "tenant", "list"])
.assert()
.success()
.stdout(predicate::str::contains("No tenants").or(predicate::str::contains("found")));
}
#[test]
fn test_tenant_list_missing_config() {
cli()
.args(["-c", "/nonexistent/path/config.toml", "tenant", "list"])
.assert()
.failure();
}
#[test]
fn test_add_missing_thread_arg() {
cli().args(["add", "some content"]).assert().failure();
}
#[test]
fn test_add_missing_content_arg() {
cli()
.args(["add", "--thread", "my-thread"])
.assert()
.failure();
}
#[test]
fn test_search_missing_query_arg() {
cli().args(["search"]).assert().failure();
}
#[test]
fn test_get_missing_uri_arg() {
cli().args(["get"]).assert().failure();
}
#[test]
fn test_delete_missing_uri_arg() {
cli().args(["delete"]).assert().failure();
}
#[test]
fn test_session_create_missing_thread() {
cli().args(["session", "create"]).assert().failure();
}
#[test]
fn test_session_close_missing_thread() {
cli().args(["session", "close"]).assert().failure();
}
#[test]
#[ignore = "参数验证位于 MemoryOperations 初始化之后,需要外部服务才能到达验证逻辑"]
fn test_search_invalid_min_score_over_limit() {
let config = config_path();
let tenant = tenant_id();
cli()
.args([
"-c",
&config,
"--tenant",
&tenant,
"search",
"test query",
"-s",
"2.0",
])
.assert()
.failure()
.stderr(
predicate::str::contains("min_score must be between")
.or(predicate::str::contains("between 0.0 and 1.0")),
);
}
#[test]
#[ignore = "URI 验证位于 MemoryOperations 初始化之后,需要外部服务才能到达验证逻辑"]
fn test_get_invalid_uri_scheme() {
let config = config_path();
let tenant = tenant_id();
cli()
.args([
"-c",
&config,
"--tenant",
&tenant,
"get",
"http://invalid-scheme/path",
])
.assert()
.failure()
.stderr(
predicate::str::contains("Invalid URI scheme")
.or(predicate::str::contains("invalid").or(predicate::str::contains("error"))),
);
}
#[test]
fn test_list_no_config_fails() {
cli()
.args(["-c", "/tmp/nonexistent_config_xyzabc.toml", "list"])
.assert()
.failure();
}
#[test]
#[ignore = "需要外部服务 (Qdrant + LLM + Embedding),请配置环境变量后运行"]
fn test_add_user_message() {
let config = config_path();
let tenant = tenant_id();
let thread_id = format!("cli-test-add-{}", uuid_short());
cli()
.args([
"-c",
&config,
"--tenant",
&tenant,
"add",
"--thread",
&thread_id,
"--role",
"user",
"Hello, this is a test message from cli test",
])
.assert()
.success()
.stdout(predicate::str::contains("successfully").or(predicate::str::contains("✓")))
.stdout(predicate::str::contains("cortex://session"));
}
#[test]
#[ignore = "需要外部服务 (Qdrant + LLM + Embedding),请配置环境变量后运行"]
fn test_add_assistant_message() {
let config = config_path();
let tenant = tenant_id();
let thread_id = format!("cli-test-add-asst-{}", uuid_short());
cli()
.args([
"-c",
&config,
"--tenant",
&tenant,
"add",
"--thread",
&thread_id,
"--role",
"assistant",
"This is an assistant response for testing",
])
.assert()
.success()
.stdout(predicate::str::contains("successfully").or(predicate::str::contains("✓")));
}
#[test]
#[ignore = "需要外部服务 (Qdrant + LLM + Embedding),请配置环境变量后运行"]
fn test_list_default_uri() {
let config = config_path();
let tenant = tenant_id();
cli()
.args(["-c", &config, "--tenant", &tenant, "list"])
.assert()
.success();
}
#[test]
#[ignore = "需要外部服务 (Qdrant + LLM + Embedding),请配置环境变量后运行"]
fn test_list_user_dimension() {
let config = config_path();
let tenant = tenant_id();
cli()
.args([
"-c",
&config,
"--tenant",
&tenant,
"list",
"--uri",
"cortex://user",
])
.assert()
.success()
.stdout(predicate::str::contains("Found").or(predicate::str::contains("No memories")));
}
#[test]
#[ignore = "需要外部服务 (Qdrant + LLM + Embedding),请配置环境变量后运行"]
fn test_list_after_add() {
let config = config_path();
let tenant = tenant_id();
let thread_id = format!("cli-test-list-{}", uuid_short());
cli()
.args([
"-c",
&config,
"--tenant",
&tenant,
"add",
"--thread",
&thread_id,
"Test message for list verification",
])
.assert()
.success();
cli()
.args([
"-c",
&config,
"--tenant",
&tenant,
"list",
"--uri",
&format!("cortex://session/{}", thread_id),
])
.assert()
.success()
.stdout(predicate::str::contains("Found").or(predicate::str::contains("item")));
}
#[test]
#[ignore = "需要外部服务 (Qdrant + LLM + Embedding),请配置环境变量后运行"]
fn test_get_after_add() {
let config = config_path();
let tenant = tenant_id();
let thread_id = format!("cli-test-get-{}", uuid_short());
let unique_content = format!("Unique test content {}", uuid_short());
let add_output = cli()
.args([
"-c",
&config,
"--tenant",
&tenant,
"add",
"--thread",
&thread_id,
&unique_content,
])
.assert()
.success()
.get_output()
.stdout
.clone();
let output_str = String::from_utf8_lossy(&add_output);
let uri = extract_uri_from_output(&output_str);
if let Some(uri) = uri {
cli()
.args(["-c", &config, "--tenant", &tenant, "get", &uri])
.assert()
.success()
.stdout(predicate::str::contains(unique_content));
} else {
println!("WARN: Could not extract URI from add output, skipping get check");
}
}
#[test]
#[ignore = "需要外部服务 (Qdrant + LLM + Embedding),请配置环境变量后运行"]
fn test_get_abstract_only() {
let config = config_path();
let tenant = tenant_id();
let thread_id = format!("cli-test-abstract-{}", uuid_short());
cli()
.args([
"-c",
&config,
"--tenant",
&tenant,
"add",
"--thread",
&thread_id,
"Content to test abstract layer retrieval",
])
.assert()
.success();
let list_output = cli()
.args([
"-c",
&config,
"--tenant",
&tenant,
"list",
"--uri",
&format!("cortex://session/{}", thread_id),
])
.assert()
.success()
.get_output()
.stdout
.clone();
let output_str = String::from_utf8_lossy(&list_output);
println!("List output: {}", output_str);
}
#[test]
#[ignore = "需要外部服务 (Qdrant + LLM + Embedding),请配置环境变量后运行"]
fn test_delete_after_add() {
let config = config_path();
let tenant = tenant_id();
let thread_id = format!("cli-test-delete-{}", uuid_short());
let add_output = cli()
.args([
"-c",
&config,
"--tenant",
&tenant,
"add",
"--thread",
&thread_id,
"Message to be deleted",
])
.assert()
.success()
.get_output()
.stdout
.clone();
let output_str = String::from_utf8_lossy(&add_output);
let uri = extract_uri_from_output(&output_str);
if let Some(uri) = uri {
cli()
.args(["-c", &config, "--tenant", &tenant, "delete", &uri])
.assert()
.success()
.stdout(
predicate::str::contains("deleted").or(predicate::str::contains("successfully")),
);
} else {
println!("WARN: Could not extract URI from add output, skipping delete check");
}
}
#[test]
#[ignore = "需要外部服务 (Qdrant + LLM + Embedding),请配置环境变量后运行"]
fn test_search_basic() {
let config = config_path();
let tenant = tenant_id();
let output = cli()
.args(["-c", &config, "--tenant", &tenant, "search", "test query"])
.output()
.expect("Failed to run command");
let stdout = String::from_utf8_lossy(&output.stdout);
let stderr = String::from_utf8_lossy(&output.stderr);
if output.status.success() {
assert!(
stdout.contains("Found") || stdout.contains("results") || stdout.contains("0 results"),
"Expected search result output, got: {}",
stdout
);
} else {
let is_network_or_service_error = stderr.contains("Embedding error")
|| stderr.contains("HTTP request failed")
|| stderr.contains("connection refused")
|| stderr.contains("Vector store error")
|| stderr.contains("tonic::transport");
assert!(
is_network_or_service_error,
"Unexpected failure (not a network/service error): stderr={}",
stderr
);
println!(
"INFO: search failed due to service unavailability (acceptable): {}",
stderr.trim()
);
}
}
#[test]
#[ignore = "需要外部服务 (Qdrant + LLM + Embedding),请配置环境变量后运行"]
fn test_search_with_options() {
let config = config_path();
let tenant = tenant_id();
let output = cli()
.args([
"-c",
&config,
"--tenant",
&tenant,
"search",
"test query",
"--limit",
"5",
"--min-score",
"0.5",
])
.output()
.expect("Failed to run command");
let stderr = String::from_utf8_lossy(&output.stderr);
if !output.status.success() {
let is_service_error = stderr.contains("Embedding error")
|| stderr.contains("HTTP request failed")
|| stderr.contains("connection refused")
|| stderr.contains("Vector store error");
assert!(is_service_error, "Unexpected failure: {}", stderr);
}
}
#[test]
#[ignore = "需要外部服务 (Qdrant + LLM + Embedding),请配置环境变量后运行"]
fn test_search_user_scope() {
let config = config_path();
let tenant = tenant_id();
let output = cli()
.args([
"-c",
&config,
"--tenant",
&tenant,
"search",
"user preference query",
"--scope",
"user",
])
.output()
.expect("Failed to run command");
let stderr = String::from_utf8_lossy(&output.stderr);
if !output.status.success() {
let is_service_error = stderr.contains("Embedding error")
|| stderr.contains("HTTP request failed")
|| stderr.contains("connection refused")
|| stderr.contains("Vector store error");
assert!(is_service_error, "Unexpected failure: {}", stderr);
}
}
#[test]
#[ignore = "需要外部服务 (Qdrant + LLM + Embedding),请配置环境变量后运行"]
fn test_search_in_thread() {
let config = config_path();
let tenant = tenant_id();
let thread_id = format!("cli-test-search-{}", uuid_short());
cli()
.args([
"-c",
&config,
"--tenant",
&tenant,
"add",
"--thread",
&thread_id,
"Rust programming language features",
])
.assert()
.success();
let search_output = cli()
.args([
"-c", &config, "--tenant", &tenant, "search", "Rust", "--thread", &thread_id,
])
.output()
.expect("Failed to run search command");
let search_stderr = String::from_utf8_lossy(&search_output.stderr);
if !search_output.status.success() {
let is_service_error = search_stderr.contains("Embedding error")
|| search_stderr.contains("HTTP request failed")
|| search_stderr.contains("connection refused")
|| search_stderr.contains("Vector store error");
assert!(
is_service_error,
"Unexpected search failure: {}",
search_stderr
);
println!(
"INFO: search in thread failed due to service unavailability: {}",
search_stderr.trim()
);
}
}
#[test]
#[ignore = "需要外部服务 (Qdrant + LLM + Embedding),请配置环境变量后运行"]
fn test_session_list() {
let config = config_path();
let tenant = tenant_id();
cli()
.args(["-c", &config, "--tenant", &tenant, "session", "list"])
.assert()
.success()
.stdout(
predicate::str::contains("sessions")
.or(predicate::str::contains("No sessions"))
.or(predicate::str::contains("Found")),
);
}
#[test]
#[ignore = "需要外部服务 (Qdrant + LLM + Embedding),请配置环境变量后运行"]
fn test_session_create() {
let config = config_path();
let tenant = tenant_id();
let thread_id = format!("cli-test-session-create-{}", uuid_short());
cli()
.args([
"-c", &config, "--tenant", &tenant, "session", "create", &thread_id,
])
.assert()
.success()
.stdout(predicate::str::contains("created").or(predicate::str::contains("✓")));
}
#[test]
#[ignore = "需要外部服务 (Qdrant + LLM + Embedding),请配置环境变量后运行"]
fn test_session_create_with_title() {
let config = config_path();
let tenant = tenant_id();
let thread_id = format!("cli-test-session-titled-{}", uuid_short());
cli()
.args([
"-c",
&config,
"--tenant",
&tenant,
"session",
"create",
&thread_id,
"--title",
"My Test Session",
])
.assert()
.success()
.stdout(predicate::str::contains("created").or(predicate::str::contains("✓")));
}
#[test]
#[ignore = "需要外部服务 (Qdrant + LLM + Embedding),请配置环境变量后运行"]
fn test_session_close_after_create() {
let config = config_path();
let tenant = tenant_id();
let thread_id = format!("cli-test-session-close-{}", uuid_short());
cli()
.args([
"-c",
&config,
"--tenant",
&tenant,
"add",
"--thread",
&thread_id,
"A test message before closing session",
])
.assert()
.success();
cli()
.args([
"-c", &config, "--tenant", &tenant, "session", "close", &thread_id,
])
.assert()
.success()
.stdout(predicate::str::contains("closed").or(predicate::str::contains("completed")));
}
#[test]
#[ignore = "需要外部服务 (Qdrant + LLM + Embedding),请配置环境变量后运行"]
fn test_stats() {
let config = config_path();
let tenant = tenant_id();
cli()
.args(["-c", &config, "--tenant", &tenant, "stats"])
.assert()
.success()
.stdout(predicate::str::contains("Statistics").or(predicate::str::contains("Sessions")));
}
#[test]
#[ignore = "需要外部服务 (Qdrant + LLM + Embedding),请配置环境变量后运行"]
fn test_layers_status() {
let config = config_path();
let tenant = tenant_id();
cli()
.args(["-c", &config, "--tenant", &tenant, "layers", "status"])
.assert()
.success()
.stdout(
predicate::str::contains("Layer file status")
.or(predicate::str::contains("Total directories")),
);
}
#[test]
#[ignore = "需要外部服务 (Qdrant + LLM + Embedding),请配置环境变量后运行"]
fn test_layers_ensure_all() {
let config = config_path();
let tenant = tenant_id();
cli()
.args(["-c", &config, "--tenant", &tenant, "layers", "ensure-all"])
.assert()
.success()
.stdout(predicate::str::contains("Statistics").or(predicate::str::contains("Generated")));
}
#[test]
#[ignore = "需要外部服务 (Qdrant + LLM + Embedding),请配置环境变量后运行"]
fn test_layers_regenerate_oversized() {
let config = config_path();
let tenant = tenant_id();
cli()
.args([
"-c",
&config,
"--tenant",
&tenant,
"layers",
"regenerate-oversized",
])
.assert()
.success()
.stdout(
predicate::str::contains("Statistics")
.or(predicate::str::contains("Oversized"))
.or(predicate::str::contains("All .abstract files")),
);
}
#[test]
#[ignore = "需要外部服务 (Qdrant + LLM + Embedding),请配置环境变量后运行"]
fn test_verbose_mode() {
let config = config_path();
let tenant = tenant_id();
cli()
.args([
"-c",
&config,
"--tenant",
&tenant,
"--verbose",
"session",
"list",
])
.assert()
.success();
}
#[test]
#[ignore = "需要外部服务 (Qdrant + LLM + Embedding),请配置环境变量后运行"]
fn test_full_workflow() {
let config = config_path();
let tenant = tenant_id();
let thread_id = format!("cli-test-workflow-{}", uuid_short());
let content = format!("Workflow test content {}", uuid_short());
let add_output = cli()
.args([
"-c", &config, "--tenant", &tenant, "add", "--thread", &thread_id, &content,
])
.assert()
.success()
.get_output()
.stdout
.clone();
let output_str = String::from_utf8_lossy(&add_output);
println!("Add output: {}", output_str);
cli()
.args([
"-c",
&config,
"--tenant",
&tenant,
"list",
"--uri",
&format!("cortex://session/{}", thread_id),
])
.assert()
.success();
cli()
.args([
"-c",
&config,
"--tenant",
&tenant,
"search",
"Workflow test",
"--thread",
&thread_id,
])
.assert()
.success();
cli()
.args(["-c", &config, "--tenant", &tenant, "stats"])
.assert()
.success();
}
fn uuid_short() -> String {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::time::SystemTime;
let mut hasher = DefaultHasher::new();
SystemTime::now().hash(&mut hasher);
std::thread::current().id().hash(&mut hasher);
format!("{:08x}", hasher.finish())
}
fn extract_uri_from_output(output: &str) -> Option<String> {
output.lines().find_map(|line| {
if let Some(pos) = line.find("cortex://") {
let uri_start = &line[pos..];
let uri_end = uri_start
.find(|c: char| c.is_whitespace())
.unwrap_or(uri_start.len());
Some(uri_start[..uri_end].to_string())
} else {
None
}
})
}