use chrono::Utc;
use flate2::write::GzEncoder;
use flate2::Compression;
use tar::Builder;
use tempfile::TempDir;
use cloacina::packaging::{
ManifestV2, ManifestValidationError, PackageInfoV2, PackageLanguage, PythonRuntime,
RustRuntime, TaskDefinitionV2,
};
use cloacina::registry::loader::{
detect_package_kind, extract_python_package, peek_manifest, PackageKind,
};
/// Builds an in-memory `.tar.gz` package archive for tests.
///
/// Layout of the produced archive, in order:
/// 1. `manifest.json` — `manifest` serialized with `serde_json::to_vec_pretty`, mode `0o644`;
/// 2. every `(path, bytes)` pair from `workflow_files`, mode `0o644`;
/// 3. an empty `vendor/` directory entry, mode `0o755`.
///
/// Panics (via `unwrap`) on any serialization or archive-writing failure; acceptable in a
/// test helper.
fn build_archive(manifest: &ManifestV2, workflow_files: &[(&str, &[u8])]) -> Vec<u8> {
    // Fresh GNU-format header carrying only a size and permission bits.
    fn gnu_header(size: u64, mode: u32) -> tar::Header {
        let mut hdr = tar::Header::new_gnu();
        hdr.set_size(size);
        hdr.set_mode(mode);
        hdr.set_cksum();
        hdr
    }

    let mut tarball = Builder::new(GzEncoder::new(Vec::new(), Compression::fast()));

    // The manifest is written first, ahead of the workflow files.
    let manifest_bytes = serde_json::to_vec_pretty(manifest).unwrap();
    let mut manifest_hdr = gnu_header(manifest_bytes.len() as u64, 0o644);
    tarball
        .append_data(&mut manifest_hdr, "manifest.json", manifest_bytes.as_slice())
        .unwrap();

    for &(path, bytes) in workflow_files {
        let mut file_hdr = gnu_header(bytes.len() as u64, 0o644);
        tarball.append_data(&mut file_hdr, path, bytes).unwrap();
    }

    // Empty directory entry for `vendor/`.
    let mut vendor_hdr = gnu_header(0, 0o755);
    vendor_hdr.set_entry_type(tar::EntryType::Directory);
    vendor_hdr.set_cksum();
    tarball
        .append_data(&mut vendor_hdr, "vendor/", &[] as &[u8])
        .unwrap();

    // Unwrap the tar builder, then finalize the gzip stream to get the raw bytes.
    tarball.into_inner().unwrap().finish().unwrap()
}
fn data_pipeline_manifest() -> ManifestV2 {
ManifestV2 {
format_version: "2".to_string(),
package: PackageInfoV2 {
name: "data-pipeline-example".to_string(),
version: "1.0.0".to_string(),
description: Some("Example Python workflow for Cloacina".to_string()),
fingerprint: "sha256:abc123".to_string(),
targets: vec!["linux-x86_64".to_string(), "macos-arm64".to_string()],
},
language: PackageLanguage::Python,
python: Some(PythonRuntime {
requires_python: ">=3.11".to_string(),
entry_module: "data_pipeline.tasks".to_string(),
}),
rust: None,
tasks: vec![
TaskDefinitionV2 {
id: "fetch-data".to_string(),
function: "data_pipeline.tasks:fetch_data".to_string(),
dependencies: vec![],
description: Some("Fetch data from external source".to_string()),
retries: 0,
timeout_seconds: None,
},
TaskDefinitionV2 {
id: "validate-data".to_string(),
function: "data_pipeline.tasks:validate_data".to_string(),
dependencies: vec!["fetch-data".to_string()],
description: Some("Validate raw data".to_string()),
retries: 0,
timeout_seconds: None,
},
TaskDefinitionV2 {
id: "aggregate-data".to_string(),
function: "data_pipeline.tasks:aggregate_data".to_string(),
dependencies: vec!["validate-data".to_string()],
description: Some("Compute summary statistics".to_string()),
retries: 0,
timeout_seconds: None,
},
TaskDefinitionV2 {
id: "generate-report".to_string(),
function: "data_pipeline.tasks:generate_report".to_string(),
dependencies: vec!["aggregate-data".to_string()],
description: Some("Produce summary report".to_string()),
retries: 0,
timeout_seconds: None,
},
],
created_at: Utc::now(),
signature: None,
}
}
/// Returns the workflow source files packed alongside the manifest, as
/// `(archive path, file contents)` pairs under `workflow/data_pipeline/`.
///
/// NOTE(review): `tasks.py` only defines `fetch_data`, while `data_pipeline_manifest()` also
/// references `validate_data`, `aggregate_data` and `generate_report`. The tests in this
/// file only check that the files exist. Extend the stub if a test ever imports it.
fn data_pipeline_files() -> Vec<(&'static str, &'static [u8])> {
    const INIT_PY: &[u8] = b"# Data Pipeline Example\n";
    const TASKS_PY: &[u8] = b"from cloaca import task\n\n@task(id=\"fetch-data\", dependencies=[])\ndef fetch_data(context):\n return context\n";

    [
        ("workflow/data_pipeline/__init__.py", INIT_PY),
        ("workflow/data_pipeline/tasks.py", TASKS_PY),
    ]
    .to_vec()
}
/// `peek_manifest` reads the package name, version, language and task list back out of the
/// archive built from the fixture manifest.
#[test]
fn peek_manifest_returns_correct_metadata() {
    let files = data_pipeline_files();
    let archive = build_archive(&data_pipeline_manifest(), &files);

    let peeked = peek_manifest(&archive).unwrap();
    let package = &peeked.package;

    assert_eq!(package.name, "data-pipeline-example");
    assert_eq!(package.version, "1.0.0");
    assert_eq!(peeked.language, PackageLanguage::Python);
    assert_eq!(peeked.tasks.len(), 4);
}
/// An archive whose manifest declares `PackageLanguage::Python` is classified as
/// `PackageKind::Python`.
#[test]
fn detect_package_kind_identifies_python() {
    let archive = build_archive(&data_pipeline_manifest(), &data_pipeline_files());

    let detected = detect_package_kind(&archive).unwrap();

    assert!(matches!(detected, PackageKind::Python(_)));
}
/// An archive whose manifest declares `PackageLanguage::Rust` is classified as
/// `PackageKind::Rust`.
///
/// Only the manifest is switched to Rust; the archive still carries the Python workflow files
/// from `data_pipeline_files()`. The test therefore asserts that detection follows the
/// manifest's declared language.
#[test]
fn detect_package_kind_identifies_rust() {
    let mut manifest = data_pipeline_manifest();
    manifest.language = PackageLanguage::Rust;
    manifest.python = None;
    manifest.rust = Some(RustRuntime {
        library_path: "lib/libworkflow.so".to_string(),
    });
    // Replace the Python `module:function` paths on every task. Looping (rather than
    // indexing tasks[0..=3]) keeps this correct if the fixture's task list changes size.
    for task in &mut manifest.tasks {
        task.function = "execute_task".to_string();
    }

    let archive = build_archive(&manifest, &data_pipeline_files());
    let kind = detect_package_kind(&archive).unwrap();
    assert!(matches!(kind, PackageKind::Rust(_)));
}
/// Extracting the fixture archive into a temp staging dir yields the root, workflow and
/// vendor paths, the parsed manifest, the entry module, and the workflow sources on disk.
#[test]
fn extract_python_package_full_roundtrip() {
    let archive = build_archive(&data_pipeline_manifest(), &data_pipeline_files());
    let staging = TempDir::new().unwrap();

    let extracted = extract_python_package(&archive, staging.path()).unwrap();

    // Directory layout.
    assert!(extracted.root_dir.exists());
    assert!(extracted.workflow_dir.exists());
    // NOTE(review): this only checks that `vendor_dir` has a parent path component, not that
    // the directory exists, even though `build_archive` writes a `vendor/` entry. Consider
    // asserting `extracted.vendor_dir.exists()` once the loader's contract is confirmed.
    assert!(extracted.vendor_dir.parent().is_some());

    // Manifest metadata.
    assert_eq!(extracted.manifest.package.name, "data-pipeline-example");
    assert_eq!(extracted.manifest.tasks.len(), 4);
    assert_eq!(extracted.entry_module, "data_pipeline.tasks");

    // Workflow sources, relative to workflow_dir (the archive stores them under `workflow/`).
    for relative in ["data_pipeline/__init__.py", "data_pipeline/tasks.py"] {
        assert!(
            extracted.workflow_dir.join(relative).exists(),
            "missing extracted file: {relative}"
        );
    }
}
/// `extract_python_package` refuses an archive whose manifest declares the Rust language.
///
/// The concrete error variant is matched loosely, by its `Display` text or its `Debug` name.
#[test]
fn extract_rejects_rust_archive() {
    let mut manifest = data_pipeline_manifest();
    manifest.language = PackageLanguage::Rust;
    manifest.python = None;
    manifest.rust = Some(RustRuntime {
        library_path: "lib/libworkflow.so".to_string(),
    });
    for task in manifest.tasks.iter_mut() {
        task.function = "ffi_entry".to_string();
    }
    let archive = build_archive(&manifest, &data_pipeline_files());

    let staging = TempDir::new().unwrap();
    let err = extract_python_package(&archive, staging.path()).unwrap_err();

    let message = err.to_string();
    let is_wrong_language = message.contains("Wrong language")
        || message.contains("wrong language")
        || format!("{:?}", err).contains("WrongLanguage");
    assert!(is_wrong_language, "Expected WrongLanguage error, got: {err:?}");
}
/// A task that depends on an id not present in the manifest fails validation with
/// `ManifestValidationError::InvalidDependency`.
#[test]
fn manifest_validates_task_dependency_references() {
    let mut manifest = data_pipeline_manifest();
    // `validate-data` now depends on a task id that no task defines.
    let validate_task = &mut manifest.tasks[1];
    validate_task.dependencies = vec!["nonexistent-task".to_string()];

    let err = manifest.validate().unwrap_err();
    let is_invalid_dependency = matches!(err, ManifestValidationError::InvalidDependency { .. });
    assert!(
        is_invalid_dependency,
        "Expected InvalidDependency error, got: {err:?}"
    );
}
/// Two tasks sharing an id fail validation with `ManifestValidationError::DuplicateTaskId`.
///
/// The duplicate is appended as a new, dependency-free task rather than created by renaming
/// an existing one. Renaming `tasks[1]` ("validate-data") to "fetch-data" would also:
/// - make it depend on itself;
/// - leave `aggregate-data` depending on a now-missing "validate-data".
///
/// That would produce several violations at once, and the reported variant would depend on
/// the order in which `validate()` runs its checks.
#[test]
fn manifest_validates_duplicate_task_ids() {
    let mut manifest = data_pipeline_manifest();
    // Same id as tasks[0]; no dependencies, and nothing new refers to it, so the id
    // clash is the only problem this mutation introduces.
    manifest.tasks.push(TaskDefinitionV2 {
        id: "fetch-data".to_string(),
        function: "data_pipeline.tasks:fetch_data".to_string(),
        dependencies: vec![],
        description: Some("Duplicate of fetch-data".to_string()),
        retries: 0,
        timeout_seconds: None,
    });

    let err = manifest.validate().unwrap_err();
    assert!(
        matches!(err, ManifestValidationError::DuplicateTaskId { .. }),
        "Expected DuplicateTaskId error, got: {err:?}"
    );
}
/// A Python task whose `function` lacks the `module:function` colon separator fails
/// validation with `ManifestValidationError::InvalidFunctionPath`.
#[test]
fn manifest_validates_python_function_path_format() {
    let mut manifest = data_pipeline_manifest();
    let first_task = manifest.tasks.first_mut().unwrap();
    first_task.function = "missing_colon_separator".to_string();

    let outcome = manifest.validate().unwrap_err();
    let rejected_path = matches!(outcome, ManifestValidationError::InvalidFunctionPath { .. });
    assert!(
        rejected_path,
        "Expected InvalidFunctionPath error, got: {outcome:?}"
    );
}