// Sample Data Processing Workflow in KDL format
// This demonstrates a complete workflow specification with all supported features
// Workflow identity — consumers look the workflow up by this name.
name "sample_data_processing_workflow"
description "A sample workflow that demonstrates data processing with multiple jobs"
// File definitions
// Each `file` node binds a logical name to a filesystem path. Jobs reference
// these names via input_file / output_file and via ${files.input.*} /
// ${files.output.*} placeholders inside `command` strings.
file "raw_data" path="/data/input/raw_data.csv"
file "validated_data" path="/data/processed/validated_data.csv"
file "analysis_results" path="/data/output/results.json"
file "validation_script" path="/scripts/validate_data.py"
file "analysis_script" path="/scripts/analyze_data.py"
// User data - structured metadata passed between jobs
// (referenced by jobs via input_user_data / output_user_data).
// Each `data` payload is a JSON document embedded as an escaped KDL string.
user_data "download_metadata" {
// Ephemeral: presumably discarded once the workflow completes — confirm
// against the consumer's semantics for is_ephemeral.
is_ephemeral #true
data "{\"source_url\":\"https://example.com/data.csv\",\"download_timestamp\":\"2024-01-15T10:30:00Z\",\"file_size_bytes\":1048576}"
}
user_data "validation_results" {
// Retained after the run (non-ephemeral).
is_ephemeral #false
data "{\"validation_rules\":[\"no_nulls\",\"valid_dates\",\"numeric_ranges\"],\"passed\":true,\"row_count\":10000}"
}
user_data "final_analysis" {
is_ephemeral #false
data "{\"analysis_type\":\"statistical_summary\",\"confidence_level\":0.95}"
}
// Resource requirements definitions
// Named resource profiles that jobs reference via resource_requirements.
// `runtime` values are ISO-8601 durations (PT30M = 30 minutes, PT2H = 2 hours).
resource_requirements "small_job" {
num_cpus 1
num_gpus 0
num_nodes 1
memory "2g"
runtime "PT30M"
}
// GPU-capable profile — used by the data_analysis job below.
resource_requirements "large_job" {
num_cpus 4
num_gpus 1
num_nodes 1
memory "16g"
runtime "PT2H"
}
// Slurm scheduler configurations
// Field names mirror common sbatch options (account, mem, nodes,
// ntasks_per_node, partition, qos, tmp, walltime, gres); `extra` carries
// additional raw scheduler flags verbatim.
slurm_scheduler "default_scheduler" {
account "project_account"
mem "8G"
nodes 1
ntasks_per_node 1
partition "general"
qos "normal"
tmp "10G"
walltime "01:00:00"
extra "--constraint=haswell"
}
// GPU variant: requests one GPU via gres and targets the "gpu" partition.
slurm_scheduler "gpu_scheduler" {
account "gpu_project"
gres "gpu:1"
mem "32G"
nodes 1
ntasks_per_node 1
partition "gpu"
qos "high"
tmp "50G"
walltime "04:00:00"
extra "--constraint=v100"
}
// Job definitions
// Job 1: fetch the raw CSV. ${files.output.raw_data} is substituted with the
// path of the "raw_data" file declared above.
job "data_download" {
command "wget https://example.com/data.csv -O ${files.output.raw_data}"
// This job declares no depends_on_job, so there are no blocking jobs for
// this flag to act on.
cancel_on_blocking_job_failure #false
resource_requirements "small_job"
output_file "raw_data"
output_user_data "download_metadata"
scheduler "default_scheduler"
}
// Job 2: validate the downloaded CSV; runs only after data_download succeeds.
job "data_validation" {
command "python ${files.input.validation_script} ${files.input.raw_data}"
// Shell prologue for the job invocation. Fix: use real KDL newline escapes
// (\n). The original "\\n" parsed to the literal two characters backslash+n,
// which would collapse the script to a single shebang line with no commands.
invocation_script "#!/bin/bash\nset -e\nexport PYTHONPATH=/opt/validation:$PYTHONPATH"
// Cancel this job if data_download fails.
cancel_on_blocking_job_failure #true
resource_requirements "small_job"
depends_on_job "data_download"
input_file "raw_data"
input_file "validation_script"
// NOTE(review): the command never references ${files.output.validated_data};
// presumably validate_data.py writes that path by convention — confirm.
output_file "validated_data"
input_user_data "download_metadata"
output_user_data "validation_results"
scheduler "default_scheduler"
}
// Job 3: GPU-backed analysis of the validated data; runs after data_validation
// succeeds, writing JSON results to the "analysis_results" file.
job "data_analysis" {
command "python ${files.input.analysis_script} ${files.input.validated_data} --output ${files.output.analysis_results}"
// Cancel this job if data_validation fails.
cancel_on_blocking_job_failure #true
resource_requirements "large_job"
depends_on_job "data_validation"
input_file "validated_data"
input_file "analysis_script"
output_file "analysis_results"
input_user_data "validation_results"
output_user_data "final_analysis"
scheduler "gpu_scheduler"
}