use assert_cmd::Command;
use tempfile::TempDir;
/// A dataset name built from `..` path segments must be refused by the CLI.
///
/// Runs the `cqlite` binary with `--dataset ../../../etc` and checks that it
/// exits unsuccessfully and that stderr carries one of the expected
/// validation messages.
#[test]
fn test_dataset_name_rejects_directory_traversal_parent() {
    // One scratch directory serves as CQLITE_DATASETS_ROOT and holds the schema file.
    let root = TempDir::new().unwrap();
    let schema_file = root.path().join("test_schema.cql");
    let schema_ddl = "CREATE TABLE test_table (id int PRIMARY KEY);";
    std::fs::write(&schema_file, schema_ddl).unwrap();

    let output = Command::cargo_bin("cqlite")
        .unwrap()
        .env("CQLITE_DATASETS_ROOT", root.path())
        .args(["--dataset", "../../../etc", "--schema"])
        .arg(&schema_file)
        .args(["query", "SELECT * FROM test_table"])
        .output()
        .unwrap();

    // The command has to fail...
    assert!(!output.status.success());

    // ...and the failure has to be the name validation, not some unrelated error.
    let stderr = String::from_utf8_lossy(&output.stderr);
    let accepted_messages = ["Invalid dataset name", "must not contain"];
    let saw_validation_error = accepted_messages
        .iter()
        .any(|needle| stderr.contains(*needle));
    assert!(
        saw_validation_error,
        "Expected security validation error, got: {stderr}"
    );
}
/// A dataset name containing `/` separators must be refused by the CLI.
///
/// Runs the `cqlite` binary with `--dataset foo/bar/baz` and checks that it
/// exits unsuccessfully and that stderr carries one of the expected
/// validation messages.
#[test]
fn test_dataset_name_rejects_forward_slash() {
    // One scratch directory serves as CQLITE_DATASETS_ROOT and holds the schema file.
    let root = TempDir::new().unwrap();
    let schema_file = root.path().join("test_schema.cql");
    let schema_ddl = "CREATE TABLE test_table (id int PRIMARY KEY);";
    std::fs::write(&schema_file, schema_ddl).unwrap();

    let output = Command::cargo_bin("cqlite")
        .unwrap()
        .env("CQLITE_DATASETS_ROOT", root.path())
        .args(["--dataset", "foo/bar/baz", "--schema"])
        .arg(&schema_file)
        .args(["query", "SELECT * FROM test_table"])
        .output()
        .unwrap();

    // The command has to fail...
    assert!(!output.status.success());

    // ...and stderr has to either name the validation or mention the offending '/'.
    let stderr = String::from_utf8_lossy(&output.stderr);
    let accepted_messages = ["Invalid dataset name", "'/'"];
    let saw_validation_error = accepted_messages
        .iter()
        .any(|needle| stderr.contains(*needle));
    assert!(
        saw_validation_error,
        "Expected security validation error, got: {stderr}"
    );
}
/// A dataset name containing `\` separators must be refused by the CLI.
///
/// Runs the `cqlite` binary with `--dataset ..\..\windows\system32` and checks
/// that it exits unsuccessfully and that stderr carries one of the expected
/// validation messages.
#[test]
fn test_dataset_name_rejects_backslash() {
    // One scratch directory serves as CQLITE_DATASETS_ROOT and holds the schema file.
    let root = TempDir::new().unwrap();
    let schema_file = root.path().join("test_schema.cql");
    let schema_ddl = "CREATE TABLE test_table (id int PRIMARY KEY);";
    std::fs::write(&schema_file, schema_ddl).unwrap();

    let output = Command::cargo_bin("cqlite")
        .unwrap()
        .env("CQLITE_DATASETS_ROOT", root.path())
        .args(["--dataset", "..\\..\\windows\\system32", "--schema"])
        .arg(&schema_file)
        .args(["query", "SELECT * FROM test_table"])
        .output()
        .unwrap();

    // The command has to fail...
    assert!(!output.status.success());

    // ...and stderr has to either name the validation or mention the offending '\'.
    let stderr = String::from_utf8_lossy(&output.stderr);
    let accepted_messages = ["Invalid dataset name", "'\\'"];
    let saw_validation_error = accepted_messages
        .iter()
        .any(|needle| stderr.contains(*needle));
    assert!(
        saw_validation_error,
        "Expected security validation error, got: {stderr}"
    );
}
/// A dataset name that starts with `.` must be refused by the CLI.
///
/// Runs the `cqlite` binary with `--dataset .hidden_config` and checks that it
/// exits unsuccessfully and that stderr carries one of the expected
/// validation messages.
#[test]
fn test_dataset_name_rejects_leading_dot() {
    // One scratch directory serves as CQLITE_DATASETS_ROOT and holds the schema file.
    let root = TempDir::new().unwrap();
    let schema_file = root.path().join("test_schema.cql");
    let schema_ddl = "CREATE TABLE test_table (id int PRIMARY KEY);";
    std::fs::write(&schema_file, schema_ddl).unwrap();

    let output = Command::cargo_bin("cqlite")
        .unwrap()
        .env("CQLITE_DATASETS_ROOT", root.path())
        .args(["--dataset", ".hidden_config", "--schema"])
        .arg(&schema_file)
        .args(["query", "SELECT * FROM test_table"])
        .output()
        .unwrap();

    // The command has to fail...
    assert!(!output.status.success());

    // ...and stderr has to either name the validation or mention the leading dot.
    let stderr = String::from_utf8_lossy(&output.stderr);
    let accepted_messages = ["Invalid dataset name", "start with '.'"];
    let saw_validation_error = accepted_messages
        .iter()
        .any(|needle| stderr.contains(*needle));
    assert!(
        saw_validation_error,
        "Expected security validation error, got: {stderr}"
    );
}
/// A plain dataset name must not trip the dataset-name validation.
///
/// Creates `<root>/sstables/valid_dataset`, runs the `cqlite` binary with
/// `--dataset valid_dataset`, and checks only that stderr does not contain
/// the "Invalid dataset name" message. The exit status is intentionally not
/// asserted: the command may still fail for reasons unrelated to the name.
#[test]
fn test_dataset_name_accepts_valid_names() {
    let root = TempDir::new().unwrap();

    // Presumably the location where the CLI resolves the dataset — confirm against the binary.
    let dataset_dir = root.path().join("sstables").join("valid_dataset");
    std::fs::create_dir_all(&dataset_dir).unwrap();

    let schema_file = root.path().join("test_schema.cql");
    let schema_ddl = "CREATE TABLE test_table (id int PRIMARY KEY);";
    std::fs::write(&schema_file, schema_ddl).unwrap();

    let output = Command::cargo_bin("cqlite")
        .unwrap()
        .env("CQLITE_DATASETS_ROOT", root.path())
        .args(["--dataset", "valid_dataset", "--schema"])
        .arg(&schema_file)
        .args(["query", "SELECT * FROM test_table"])
        .output()
        .unwrap();

    let stderr = String::from_utf8_lossy(&output.stderr);
    let flagged_as_invalid = stderr.contains("Invalid dataset name");
    assert!(
        !flagged_as_invalid,
        "Valid dataset name should not trigger validation error, got: {stderr}"
    );
}
/// A dataset that is a symlink pointing outside the datasets root must be refused.
///
/// Layout built under a fresh temporary directory:
///
/// * `datasets/sstables/malicious` — symlink to `external_data`
/// * `external_data/` — directory outside `datasets/`
///
/// The `cqlite` binary is run with `CQLITE_DATASETS_ROOT` set to `datasets`
/// and `--dataset malicious`; it must exit unsuccessfully and report a
/// security violation on stderr. Unix-only because it relies on
/// `std::os::unix::fs::symlink`.
#[test]
#[cfg(unix)]
fn test_dataset_canonicalization_prevents_symlink_escape() {
    use std::os::unix::fs::symlink;

    let sandbox = TempDir::new().unwrap();

    // The datasets root, containing the `sstables` directory.
    let datasets_root = sandbox.path().join("datasets");
    let sstables_dir = datasets_root.join("sstables");
    std::fs::create_dir_all(&sstables_dir).unwrap();

    // The symlink target is a sibling of the datasets root, i.e. outside it.
    let escape_target = sandbox.path().join("external_data");
    std::fs::create_dir(&escape_target).unwrap();
    let link = sstables_dir.join("malicious");
    symlink(&escape_target, &link).unwrap();

    let schema_file = sandbox.path().join("test_schema.cql");
    let schema_ddl = "CREATE TABLE test_table (id int PRIMARY KEY);";
    std::fs::write(&schema_file, schema_ddl).unwrap();

    let output = Command::cargo_bin("cqlite")
        .unwrap()
        .env("CQLITE_DATASETS_ROOT", &datasets_root)
        .args(["--dataset", "malicious", "--schema"])
        .arg(&schema_file)
        .args(["query", "SELECT * FROM test_table"])
        .output()
        .unwrap();

    assert!(!output.status.success());

    let stderr = String::from_utf8_lossy(&output.stderr);
    let accepted_messages = ["Security violation", "escaped"];
    let saw_security_violation = accepted_messages
        .iter()
        .any(|needle| stderr.contains(*needle));
    assert!(
        saw_security_violation,
        "Expected security violation for symlink escape, got: {stderr}"
    );
}
/// Dataset names containing invisible or direction-changing Unicode code
/// points must be handled without crashing the CLI.
///
/// Each name is passed to the `cqlite` binary via `--dataset`. Whether the
/// CLI accepts or rejects such a name is deliberately left open; the test
/// only requires that the process ends with an ordinary exit code and that
/// stderr shows no panic message.
#[test]
fn test_dataset_name_handles_unicode() {
    let root = TempDir::new().unwrap();
    let schema_file = root.path().join("test_schema.cql");
    let schema_ddl = "CREATE TABLE test_table (id int PRIMARY KEY);";
    std::fs::write(&schema_file, schema_ddl).unwrap();

    // U+202E right-to-left override, U+200B zero-width space,
    // U+FEFF zero-width no-break space (byte order mark).
    let problematic_names = [
        "dataset\u{202E}txt.exe",
        "dataset\u{200B}name",
        "dataset\u{FEFF}name",
    ];

    for name in problematic_names {
        let output = Command::cargo_bin("cqlite")
            .unwrap()
            .env("CQLITE_DATASETS_ROOT", root.path())
            .args(["--dataset", name, "--schema"])
            .arg(&schema_file)
            .args(["query", "SELECT * FROM test_table"])
            .output()
            .unwrap();

        let stderr = String::from_utf8_lossy(&output.stderr);

        // `code()` is `None` when the process was terminated by a signal
        // (e.g. an abort), which is never an acceptable outcome here.
        assert!(
            output.status.code().is_some(),
            "cqlite was terminated by a signal for dataset name {name:?}, stderr: {stderr}"
        );

        // Rust's default panic hook prints "... panicked at ..." to stderr.
        // NOTE(review): a custom panic hook in the binary would evade this check.
        assert!(
            !stderr.contains("panicked at"),
            "cqlite panicked for dataset name {name:?}, stderr: {stderr}"
        );
    }
}