mod test_helpers;
use std::fs;
use std::path::PathBuf;
use std::process::Command;
use tempfile::TempDir;
struct TestEnv {
temp_dir: TempDir,
binary_path: PathBuf,
venv_path_env: String,
venv_root: PathBuf,
}
impl TestEnv {
fn new() -> Option<Self> {
let venv_root = test_helpers::setup_execution_venv()?;
let venv_path_env = test_helpers::setup_venv_environment()?;
let temp_dir = TempDir::new().expect("Failed to create temp directory");
let binary_path = env!("CARGO_BIN_EXE_nb").into();
Some(Self {
temp_dir,
binary_path,
venv_path_env,
venv_root,
})
}
fn notebook_path(&self, name: &str) -> PathBuf {
self.temp_dir.path().join(name)
}
fn copy_fixture(&self, fixture_name: &str, dest_name: &str) -> PathBuf {
let fixture_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("tests")
.join("fixtures")
.join(fixture_name);
let dest_path = self.notebook_path(dest_name);
fs::copy(&fixture_path, &dest_path)
.unwrap_or_else(|_| panic!("Failed to copy fixture {}", fixture_name));
dest_path
}
fn copy_fixture_dir(&self, fixture_subdir: &str, dest_name: &str) -> PathBuf {
let fixture_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("tests")
.join("fixtures")
.join(fixture_subdir);
let dest_path = self.notebook_path(dest_name);
fn copy_dir_recursive(src: &std::path::Path, dst: &std::path::Path) -> std::io::Result<()> {
fs::create_dir_all(dst)?;
for entry in fs::read_dir(src)? {
let entry = entry?;
let ty = entry.file_type()?;
let src_path = entry.path();
let dst_path = dst.join(entry.file_name());
if ty.is_dir() {
copy_dir_recursive(&src_path, &dst_path)?;
} else {
fs::copy(&src_path, &dst_path)?;
}
}
Ok(())
}
copy_dir_recursive(&fixture_path, &dest_path)
.unwrap_or_else(|_| panic!("Failed to copy fixture directory {}", fixture_subdir));
dest_path
}
fn run(&self, args: &[&str]) -> CommandResult {
let output = Command::new(&self.binary_path)
.args(args)
.current_dir(self.temp_dir.path())
.env("PATH", &self.venv_path_env)
.env("VIRTUAL_ENV", &self.venv_root)
.env_remove("PYTHONHOME") .output()
.expect("Failed to execute command");
CommandResult {
stdout: String::from_utf8_lossy(&output.stdout).to_string(),
stderr: String::from_utf8_lossy(&output.stderr).to_string(),
success: output.status.success(),
}
}
}
struct CommandResult {
stdout: String,
stderr: String,
success: bool,
}
impl CommandResult {
fn assert_success(self) -> Self {
if !self.success {
panic!(
"Command failed:\nStderr: {}\nStdout: {}",
self.stderr, self.stdout
);
}
self
}
fn assert_failure(self) -> Self {
if self.success {
panic!(
"Expected command to fail but it succeeded:\nStdout: {}\nStderr: {}",
self.stdout, self.stderr
);
}
self
}
}
#[test]
fn test_execute_single_cell() {
let Some(env) = TestEnv::new() else {
eprintln!("⚠️ Skipping test: execution environment not available");
return;
};
let nb_path = env.copy_fixture("for_execution.ipynb", "test.ipynb");
let result = env
.run(&["execute", nb_path.to_str().unwrap(), "--cell-index", "0"])
.assert_success();
assert!(
test_helpers::parse_notebook_header(&result.stdout).is_some(),
"Execute stdout should contain @@notebook header"
);
}
#[test]
fn test_execute_cell_with_output() {
let Some(env) = TestEnv::new() else {
eprintln!("⚠️ Skipping test: execution environment not available");
return;
};
let nb_path = env.copy_fixture("for_execution.ipynb", "test.ipynb");
let exec_result = env
.run(&["execute", nb_path.to_str().unwrap()])
.assert_success();
let outputs = test_helpers::parse_outputs(&exec_result.stdout);
assert!(
!outputs.is_empty(),
"Execute stdout should contain @@output sentinels"
);
assert!(
exec_result.stdout.contains("Result: 52"),
"Execute stdout should contain the print output"
);
let result = env
.run(&["read", nb_path.to_str().unwrap(), "--cell-index", "2"])
.assert_success();
let cells = test_helpers::parse_cells(&result.stdout);
assert_eq!(cells.len(), 1);
assert_eq!(cells[0].get_str("cell_type"), Some("code"));
let read_outputs = test_helpers::parse_outputs(&result.stdout);
assert!(
!read_outputs.is_empty(),
"Cell should have outputs after execution"
);
assert!(result.stdout.contains("Result: 52"));
}
#[test]
fn test_execute_cell_by_id() {
let Some(env) = TestEnv::new() else {
eprintln!("⚠️ Skipping test: execution environment not available");
return;
};
let nb_path = env.copy_fixture("for_execution.ipynb", "test.ipynb");
let result = env
.run(&["execute", nb_path.to_str().unwrap(), "--cell", "cell-1"])
.assert_success();
assert!(
test_helpers::parse_notebook_header(&result.stdout).is_some(),
"Execute stdout should contain @@notebook header"
);
}
#[test]
fn test_execute_notebook_preserves_state() {
let Some(env) = TestEnv::new() else {
eprintln!("⚠️ Skipping test: execution environment not available");
return;
};
let nb_path = env.copy_fixture("for_execution.ipynb", "test.ipynb");
let exec_result = env
.run(&["execute", nb_path.to_str().unwrap()])
.assert_success();
assert!(
exec_result.stdout.contains("Result: 52"),
"Execute stdout should contain the computed result"
);
let result = env
.run(&["read", nb_path.to_str().unwrap(), "--cell-index", "2"])
.assert_success();
assert!(result.stdout.contains("Result: 52"));
}
#[test]
fn test_execute_entire_notebook() {
let Some(env) = TestEnv::new() else {
eprintln!("⚠️ Skipping test: execution environment not available");
return;
};
let nb_path = env.copy_fixture("for_execution.ipynb", "test.ipynb");
let exec_result = env
.run(&["execute", nb_path.to_str().unwrap()])
.assert_success();
let cells = test_helpers::parse_cells(&exec_result.stdout);
assert!(!cells.is_empty(), "Should have cells in execute stdout");
let code_cells: Vec<_> = cells
.iter()
.filter(|c| c.get_str("cell_type") == Some("code"))
.collect();
assert!(!code_cells.is_empty(), "Should have code cells");
for cell in &code_cells {
assert!(
cell.get_i64("execution_count").is_some(),
"Code cell at index {:?} should have execution_count after full notebook execution",
cell.get_i64("index")
);
}
let read_result = env
.run(&["read", nb_path.to_str().unwrap()])
.assert_success();
let read_cells = test_helpers::parse_cells(&read_result.stdout);
let read_code_cells: Vec<_> = read_cells
.iter()
.filter(|c| c.get_str("cell_type") == Some("code"))
.collect();
for cell in &read_code_cells {
assert!(
cell.get_i64("execution_count").is_some(),
"Persisted code cell should have execution_count"
);
}
}
#[test]
fn test_execute_notebook_with_range() {
let Some(env) = TestEnv::new() else {
eprintln!("⚠️ Skipping test: execution environment not available");
return;
};
let nb_path = env.copy_fixture("for_execution.ipynb", "test.ipynb");
env.run(&[
"execute",
nb_path.to_str().unwrap(),
"--start",
"0",
"--end",
"1",
])
.assert_success();
}
#[test]
fn test_execute_with_error() {
let Some(env) = TestEnv::new() else {
eprintln!("⚠️ Skipping test: execution environment not available");
return;
};
let nb_path = env.copy_fixture("with_error.ipynb", "test.ipynb");
let result = env
.run(&["execute", nb_path.to_str().unwrap()])
.assert_failure();
assert!(
test_helpers::parse_notebook_header(&result.stdout).is_some(),
"Failed execute should still output @@notebook header"
);
let outputs = test_helpers::parse_outputs(&result.stdout);
assert!(
!outputs.is_empty(),
"Failed execute should include outputs (error or successful cells)"
);
}
#[test]
fn test_execute_with_allow_errors() {
let Some(env) = TestEnv::new() else {
eprintln!("⚠️ Skipping test: execution environment not available");
return;
};
let nb_path = env.copy_fixture("with_error.ipynb", "test.ipynb");
let result = env.run(&["execute", nb_path.to_str().unwrap(), "--allow-errors"]);
assert!(
test_helpers::parse_notebook_header(&result.stdout).is_some(),
"Should have @@notebook header in stdout"
);
let cells = test_helpers::parse_cells(&result.stdout);
let code_cells: Vec<_> = cells
.iter()
.filter(|c| c.get_str("cell_type") == Some("code"))
.collect();
assert!(
!code_cells.is_empty(),
"Should have code cells in execute output"
);
assert!(
code_cells[0].get_i64("execution_count").is_some(),
"First code cell should have execution_count"
);
let read_result = env
.run(&["read", nb_path.to_str().unwrap(), "--cell-index", "0"])
.assert_success();
assert!(read_result.stdout.contains("execution_count"));
}
#[test]
fn test_execute_with_timeout() {
let Some(env) = TestEnv::new() else {
eprintln!("⚠️ Skipping test: execution environment not available");
return;
};
let nb_path = env.copy_fixture("for_execution.ipynb", "test.ipynb");
env.run(&[
"execute",
nb_path.to_str().unwrap(),
"--cell-index",
"0",
"--timeout",
"60",
])
.assert_success();
}
#[test]
fn test_execute_last_cell_with_negative_index() {
let Some(env) = TestEnv::new() else {
eprintln!("⚠️ Skipping test: execution environment not available");
return;
};
let nb_path = env.notebook_path("test.ipynb");
env.run(&["create", nb_path.to_str().unwrap()])
.assert_success();
env.run(&[
"cell",
"add",
nb_path.to_str().unwrap(),
"--source",
"x = 10",
])
.assert_success();
env.run(&[
"cell",
"add",
nb_path.to_str().unwrap(),
"--source",
"result = 2 + 2",
])
.assert_success();
let result = env
.run(&["execute", nb_path.to_str().unwrap(), "--cell-index", "-1"])
.assert_success();
assert!(
test_helpers::parse_notebook_header(&result.stdout).is_some(),
"Execute stdout should contain @@notebook header"
);
}
#[test]
#[ignore] fn test_execute_dry_run() {
let Some(env) = TestEnv::new() else {
eprintln!("⚠️ Skipping test: execution environment not available");
return;
};
let nb_path = env.copy_fixture("for_execution.ipynb", "test.ipynb");
env.run(&[
"execute",
nb_path.to_str().unwrap(),
"--cell-index",
"0",
"--dry-run",
])
.assert_success();
let result = env
.run(&["read", nb_path.to_str().unwrap()])
.assert_success();
assert!(result.stdout.contains("\"execution_count\": null"));
}
#[test]
fn test_execute_json_format() {
let Some(env) = TestEnv::new() else {
eprintln!("⚠️ Skipping test: execution environment not available");
return;
};
let nb_path = env.copy_fixture("for_execution.ipynb", "test.ipynb");
let result = env
.run(&[
"execute",
nb_path.to_str().unwrap(),
"--cell-index",
"0",
"--json",
])
.assert_success();
let json: serde_json::Value =
serde_json::from_str(&result.stdout).expect("Should output valid JSON");
assert!(
json.get("success").is_some(),
"JSON should have success field"
);
assert!(json.get("cells").is_some(), "JSON should have cells array");
}
#[test]
fn test_workflow_create_add_execute() {
let Some(env) = TestEnv::new() else {
eprintln!("⚠️ Skipping test: execution environment not available");
return;
};
let nb_path = env.notebook_path("workflow.ipynb");
env.run(&["create", nb_path.to_str().unwrap()])
.assert_success();
env.run(&[
"cell",
"add",
nb_path.to_str().unwrap(),
"--source",
"result = 2 + 2",
])
.assert_success();
env.run(&[
"cell",
"add",
nb_path.to_str().unwrap(),
"--source",
"print(f'Answer: {result}')",
])
.assert_success();
let exec_result = env
.run(&["execute", nb_path.to_str().unwrap()])
.assert_success();
assert!(
exec_result.stdout.contains("Answer: 4"),
"Execute stdout should contain the computed answer"
);
let result = env
.run(&["read", nb_path.to_str().unwrap(), "--cell-index", "2"])
.assert_success();
assert!(result.stdout.contains("Answer: 4"));
}
#[test]
fn test_workflow_modify_and_reexecute() {
let Some(env) = TestEnv::new() else {
eprintln!("⚠️ Skipping test: execution environment not available");
return;
};
let nb_path = env.copy_fixture("for_execution.ipynb", "test.ipynb");
env.run(&["execute", nb_path.to_str().unwrap()])
.assert_success();
env.run(&[
"cell",
"update",
nb_path.to_str().unwrap(),
"--cell-index",
"0",
"--source",
"x = 100",
])
.assert_success();
let exec_result = env
.run(&["execute", nb_path.to_str().unwrap()])
.assert_success();
assert!(
exec_result.stdout.contains("Result: 110"),
"Execute stdout should show Result: 110 after modifying x to 100"
);
let result = env
.run(&["read", nb_path.to_str().unwrap(), "--cell-index", "2"])
.assert_success();
assert!(result.stdout.contains("Result: 110"));
}
#[test]
fn test_execute_with_relative_paths() {
let Some(env) = TestEnv::new() else {
eprintln!("⚠️ Skipping test: execution environment not available");
return;
};
let subdir_path = env.copy_fixture_dir("subdir", "subdir");
let nb_path = subdir_path.join("with_relative_path.ipynb");
env.run(&["execute", nb_path.to_str().unwrap()])
.assert_success();
let result = env
.run(&["read", nb_path.to_str().unwrap(), "--cell-index", "0"])
.assert_success();
assert!(result.stdout.contains("Hello from relative path!"));
}
#[test]
fn test_execute_output_matches_read() {
let Some(env) = TestEnv::new() else {
eprintln!("⚠️ Skipping test: execution environment not available");
return;
};
let nb_path = env.copy_fixture("for_execution.ipynb", "test.ipynb");
let exec_result = env
.run(&["execute", nb_path.to_str().unwrap()])
.assert_success();
let read_result = env
.run(&["read", nb_path.to_str().unwrap()])
.assert_success();
let exec_cells = test_helpers::parse_cells(&exec_result.stdout);
let read_cells = test_helpers::parse_cells(&read_result.stdout);
assert_eq!(
exec_cells.len(),
read_cells.len(),
"Execute and read should produce the same number of cells"
);
let exec_outputs = test_helpers::parse_outputs(&exec_result.stdout);
let read_outputs = test_helpers::parse_outputs(&read_result.stdout);
assert_eq!(
exec_outputs.len(),
read_outputs.len(),
"Execute and read should produce the same number of outputs"
);
for (exec_out, read_out) in exec_outputs.iter().zip(read_outputs.iter()) {
assert_eq!(
exec_out.get_str("output_type"),
read_out.get_str("output_type"),
"Output types should match between execute and read"
);
}
}
#[test]
fn test_execute_error_shows_partial_results_and_error() {
let Some(env) = TestEnv::new() else {
eprintln!("⚠️ Skipping test: execution environment not available");
return;
};
let nb_path = env.copy_fixture("with_error.ipynb", "test.ipynb");
let result = env
.run(&["execute", nb_path.to_str().unwrap()])
.assert_failure();
assert!(
test_helpers::parse_notebook_header(&result.stdout).is_some(),
"Should have @@notebook header despite failure"
);
let cells = test_helpers::parse_cells(&result.stdout);
let code_cells: Vec<_> = cells
.iter()
.filter(|c| c.get_str("cell_type") == Some("code"))
.collect();
assert!(
code_cells.len() >= 2,
"Should have at least 2 code cells in output"
);
assert!(
code_cells[0].get_i64("execution_count").is_some(),
"First code cell should have execution_count (it succeeded)"
);
let outputs = test_helpers::parse_outputs(&result.stdout);
let error_outputs: Vec<_> = outputs
.iter()
.filter(|o| o.get_str("output_type") == Some("error"))
.collect();
assert!(
!error_outputs.is_empty(),
"Should have an error output from the failed cell"
);
assert_eq!(
error_outputs[0].get_str("ename"),
Some("NameError"),
"Error should be a NameError"
);
let read_result = env
.run(&["read", nb_path.to_str().unwrap()])
.assert_success();
let read_cells = test_helpers::parse_cells(&read_result.stdout);
let read_code_cells: Vec<_> = read_cells
.iter()
.filter(|c| c.get_str("cell_type") == Some("code"))
.collect();
assert!(
read_code_cells[0].get_i64("execution_count").is_some(),
"Persisted first cell should have execution_count"
);
}
#[test]
fn test_execute_json_includes_outputs() {
let Some(env) = TestEnv::new() else {
eprintln!("⚠️ Skipping test: execution environment not available");
return;
};
let nb_path = env.copy_fixture("for_execution.ipynb", "test.ipynb");
let result = env
.run(&["execute", nb_path.to_str().unwrap(), "--json"])
.assert_success();
let json: serde_json::Value =
serde_json::from_str(&result.stdout).expect("Should output valid JSON");
assert_eq!(json["success"], true);
assert!(json["executed_cells"].as_u64().unwrap() > 0);
let cells = json["cells"].as_array().expect("Should have cells array");
let code_cells: Vec<_> = cells.iter().filter(|c| c["cell_type"] == "code").collect();
let last_cell = code_cells.last().expect("Should have code cells");
let outputs = last_cell["outputs"]
.as_array()
.expect("Last code cell should have outputs");
assert!(!outputs.is_empty(), "Should have at least one output");
let output_text = serde_json::to_string(outputs).unwrap();
assert!(
output_text.contains("Result: 52"),
"Output should contain 'Result: 52'"
);
}