use siafu::{JobBuilder, Scheduler};
use std::time::Duration;
use std::time::SystemTime;
use std::sync::{Arc, Mutex};
use std::thread;
use std::collections::HashMap;
use rand::Rng;
use siafu::utils::time::ScheduleTime;
use siafu::scheduler::types::RecurringInterval;
struct AppState {
job_results: HashMap<String, bool>,
error_count: HashMap<String, u32>,
}
fn extract_job_handler() {
println!("📥 Starting data extraction process...");
let success = rand::rng().random_bool(0.5);
if success {
println!("✅ Data extraction completed successfully!");
} else {
println!("❌ Error: Data extraction failed!");
}
}
fn transform_job_handler() {
println!("🔄 Starting data transformation...");
println!("✅ Data transformation completed");
}
fn load_job_handler() {
println!("📤 Starting data loading process...");
println!("✅ Data loaded successfully into target systems");
}
fn monitor_job_handler() {
println!("\n📊 Job status monitor running...");
println!();
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
let state = Arc::new(Mutex::new(AppState {
job_results: HashMap::new(),
error_count: HashMap::new(),
}));
let mut scheduler = Scheduler::new();
let extract_job = JobBuilder::new("data-extract")
.once(ScheduleTime::Delay(Duration::from_secs(3)))
.add_handler(extract_job_handler)
.build();
let extract_state = Arc::clone(&state);
let extract_job_id = extract_job.name.clone();
scheduler.add_job(extract_job)?;
let transform_job = JobBuilder::new("transform-data")
.once(ScheduleTime::Delay(Duration::from_secs(8)))
.add_handler(transform_job_handler)
.build();
let transform_state = Arc::clone(&state);
let transform_job_id = transform_job.name.clone();
scheduler.add_job(transform_job)?;
let load_job = JobBuilder::new("load-data")
.once(ScheduleTime::Delay(Duration::from_secs(13)))
.add_handler(load_job_handler)
.build();
let load_state = Arc::clone(&state);
let load_job_id = load_job.name.clone();
scheduler.add_job(load_job)?;
let monitor_job = JobBuilder::new("job-monitor")
.recurring(RecurringInterval::Secondly(5), Some(ScheduleTime::Delay(Duration::from_secs(5))))
.add_handler(monitor_job_handler)
.build();
let monitor_state = Arc::clone(&state);
scheduler.add_job(monitor_job)?;
println!("🚀 Advanced job orchestration system started");
println!("📋 Jobs scheduled with dependencies: extract → transform → load");
println!("🔍 Monitor will check job status every 5 seconds\n");
scheduler.run_non_blocking()?;
println!("✨ Advanced scheduler demo completed!");
println!("\n📑 Final Job Status:");
let state = state.lock().unwrap();
for (job_name, success) in &state.job_results {
let status = if *success { "✅ Succeeded" } else { "❌ Failed" };
println!(" - {}: {}", job_name, status);
}
Ok(())
}