use oxigdal_workflow::{
WorkflowDefinition,
dag::{
ResourceRequirements, RetryPolicy, TaskEdge, TaskNode, WorkflowDag, create_execution_plan,
},
monitoring::MonitoringService,
scheduler::{ScheduleType, Scheduler},
};
use std::collections::HashMap;
/// Builds a [`TaskNode`] with the given identifier and display name.
///
/// Every other field gets a fixed value: no description, an empty JSON
/// object as configuration, the default retry policy and resource
/// requirements, a 60-second timeout, and empty metadata.
fn create_task(id: &str, name: &str) -> TaskNode {
    let retry = RetryPolicy::default();
    let resources = ResourceRequirements::default();

    TaskNode {
        id: id.to_owned(),
        name: name.to_owned(),
        description: None,
        config: serde_json::json!({}),
        retry,
        timeout_secs: Some(60),
        resources,
        metadata: HashMap::new(),
    }
}
/// Builds a batch-processing workflow and registers it with a scheduler.
///
/// The DAG has one `list_files` task feeding eight `process_batch_{i}` tasks,
/// which all feed `merge_results`, which in turn feeds `export`. The
/// execution plan is printed level by level, then the workflow is scheduled
/// on a 3600-second interval.
///
/// # Errors
///
/// Propagates any error returned while adding tasks or dependencies,
/// creating the execution plan, or adding the schedule.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Number of `process_batch_{i}` tasks sitting between `list_files` and
    // `merge_results`.
    const BATCH_COUNT: usize = 8;

    println!("Batch Processing Workflow Example");
    println!("==================================\n");

    // Task IDs for the batch tasks, formatted once and reused for both task
    // registration and dependency wiring.
    let batch_ids: Vec<String> = (0..BATCH_COUNT)
        .map(|idx| format!("process_batch_{}", idx))
        .collect();

    // Register tasks: list_files, the batch tasks, merge_results, export.
    let mut dag = WorkflowDag::new();
    dag.add_task(create_task("list_files", "List Input Files"))?;
    for (idx, batch_id) in batch_ids.iter().enumerate() {
        let display_name = format!("Process Batch {}", idx);
        dag.add_task(create_task(batch_id, &display_name))?;
    }
    dag.add_task(create_task("merge_results", "Merge Results"))?;
    dag.add_task(create_task("export", "Export Final Results"))?;

    // Wire dependencies. The two passes are kept separate so edges are
    // inserted in the same order as before: all list_files -> batch edges
    // first, then all batch -> merge_results edges.
    for batch_id in &batch_ids {
        dag.add_dependency("list_files", batch_id, TaskEdge::default())?;
    }
    for batch_id in &batch_ids {
        dag.add_dependency(batch_id, "merge_results", TaskEdge::default())?;
    }
    dag.add_dependency("merge_results", "export", TaskEdge::default())?;

    println!("Created DAG with {} nodes", dag.task_count());

    // The plan is computed before `dag` is moved into the workflow definition.
    let execution_plan = create_execution_plan(&dag)?;
    println!("\nExecution order (by level):");
    for (level_idx, level) in execution_plan.iter().enumerate() {
        let task_list = level.join(", ");
        println!(
            "  Level {}: {} tasks ({})",
            level_idx,
            level.len(),
            task_list
        );
    }

    let workflow = WorkflowDefinition {
        id: "batch-processing".to_string(),
        name: "Batch Processing Workflow".to_string(),
        description: Some("Process multiple files in parallel".to_string()),
        version: "1.0.0".to_string(),
        dag,
    };

    // Bound to a named (underscore-prefixed) variable so the service stays
    // alive until the end of `main` instead of being dropped immediately.
    let _monitoring = MonitoringService::new();
    println!("\nMonitoring service initialized");

    let scheduler = Scheduler::with_defaults();
    let schedule = ScheduleType::Interval {
        interval_secs: 3600,
    };
    let schedule_id = scheduler.add_schedule(workflow, schedule).await?;
    println!("Scheduled workflow with ID: {}", schedule_id);

    let schedules = scheduler.get_schedules();
    println!("\nTotal scheduled workflows: {}", schedules.len());
    println!("\nBatch processing workflow configured successfully!");

    // Size of the plan's second level (index 1); 0 if the plan has fewer
    // than two levels.
    let level_one_width = execution_plan
        .get(1)
        .map_or(0, |tasks: &Vec<String>| tasks.len());
    println!(
        "Expected parallelism: {} concurrent tasks in level 1",
        level_one_width
    );

    Ok(())
}