#![cfg(feature = "scheduler")]
use cano::prelude::*;
use chrono::Utc;
use tokio::time::{Duration, sleep};
/// Workflow states shared by every workflow in this example.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum WorkflowAction {
/// State each task is registered under and the initial state handed to the scheduler.
Start,
/// State returned by every task's `run` on success; registered as an exit state.
Complete,
/// Registered as an exit state; no task visible in this file returns it.
Error,
}
/// Task that simulates producing a report of a given kind.
#[derive(Clone)]
struct ReportTask {
    /// Label interpolated into the progress messages and the stored result text.
    report_type: String,
}

impl ReportTask {
    /// Creates a report task, copying `report_type` into an owned `String`.
    fn new(report_type: &str) -> Self {
        let report_type = String::from(report_type);
        Self { report_type }
    }
}
#[task(state = WorkflowAction)]
impl ReportTask {
    /// Returns the framework's `minimal` task configuration.
    ///
    /// NOTE(review): presumably the preset without retry behavior; confirm in the cano docs.
    fn config(&self) -> TaskConfig {
        TaskConfig::minimal()
    }

    /// Simulates generating a report and records the outcome in the `"store"` resource.
    ///
    /// Writes `report_start_time` (RFC 3339 timestamp taken before the simulated work)
    /// and `report_result` (the summary text), then transitions to
    /// `WorkflowAction::Complete`.
    ///
    /// # Errors
    ///
    /// Propagates any error from looking up the store resource or writing to it.
    async fn run(&self, res: &Resources) -> Result<TaskResult<WorkflowAction>, CanoError> {
        let label = &self.report_type;
        let store = res.get::<MemoryStore, _>("store")?;

        println!("Preparing {} report...", label);
        let started_at = Utc::now().to_rfc3339();
        store.put("report_start_time", started_at)?;

        println!("Generating {} report...", label);
        // Stand-in for the real work: a three second pause.
        sleep(Duration::from_secs(3)).await;

        let summary = format!("{} report generated successfully", label);
        println!("Report completed: {}", summary);
        store.put("report_result", summary)?;

        Ok(TaskResult::Single(WorkflowAction::Complete))
    }
}
/// Task that simulates removing a category of leftover items.
#[derive(Clone)]
struct CleanupTask {
    /// Label interpolated into the "Scanning for ... cleanup" progress message.
    cleanup_type: String,
}

impl CleanupTask {
    /// Creates a cleanup task, storing an owned copy of `cleanup_type`.
    fn new(cleanup_type: &str) -> Self {
        Self {
            cleanup_type: cleanup_type.to_owned(),
        }
    }
}
#[task(state = WorkflowAction)]
impl CleanupTask {
    /// Returns the framework's `minimal` task configuration.
    ///
    /// NOTE(review): presumably the preset without retry behavior; confirm in the cano docs.
    fn config(&self) -> TaskConfig {
        TaskConfig::minimal()
    }

    /// Simulates a cleanup pass over a fixed list of three items.
    ///
    /// Writes `cleanup_start` (RFC 3339 timestamp) and `cleanup_count` (the number of
    /// items, as a string) to the `"store"` resource, then transitions to
    /// `WorkflowAction::Complete`.
    ///
    /// # Errors
    ///
    /// Propagates any error from looking up the store resource or writing to it.
    async fn run(&self, res: &Resources) -> Result<TaskResult<WorkflowAction>, CanoError> {
        let store = res.get::<MemoryStore, _>("store")?;

        println!("Scanning for {} cleanup...", self.cleanup_type);
        let started_at = Utc::now().to_rfc3339();
        store.put("cleanup_start", started_at)?;

        // Hard-coded stand-ins for whatever a real scan would discover.
        let stale_items = ["temp_file_1", "temp_file_2", "old_log"].map(String::from);
        let removed = stale_items.len();

        println!("Cleaning up {} items...", removed);
        // Stand-in for the real work: a two second pause.
        sleep(Duration::from_secs(2)).await;

        println!("Cleanup completed: {} items removed", removed);
        store.put("cleanup_count", removed.to_string())?;

        Ok(TaskResult::Single(WorkflowAction::Complete))
    }
}
/// Task meant to be run on demand rather than on a timer.
#[derive(Clone)]
struct ManualTask {
    /// Name interpolated into the progress messages and the stored result text.
    task_name: String,
}

impl ManualTask {
    /// Creates a manual task, converting `task_name` into an owned `String`.
    fn new(task_name: &str) -> Self {
        let task_name: String = task_name.into();
        Self { task_name }
    }
}
#[task(state = WorkflowAction)]
impl ManualTask {
    /// Returns the framework's `minimal` task configuration.
    ///
    /// NOTE(review): presumably the preset without retry behavior; confirm in the cano docs.
    fn config(&self) -> TaskConfig {
        TaskConfig::minimal()
    }

    /// Simulates a short manually triggered job.
    ///
    /// Writes `manual_task_start` (RFC 3339 timestamp) and `manual_task_result`
    /// (the completion text) to the `"store"` resource, then transitions to
    /// `WorkflowAction::Complete`.
    ///
    /// # Errors
    ///
    /// Propagates any error from looking up the store resource or writing to it.
    async fn run(&self, res: &Resources) -> Result<TaskResult<WorkflowAction>, CanoError> {
        let name = &self.task_name;
        let store = res.get::<MemoryStore, _>("store")?;

        println!("Starting manual task: {}", name);
        let started_at = Utc::now().to_rfc3339();
        store.put("manual_task_start", started_at)?;

        let description = format!("Manual task: {}", name);
        println!("Executing: {}", description);
        // Stand-in for the real work: a 200 ms pause.
        sleep(Duration::from_millis(200)).await;

        // The description is not needed afterwards, so extend it into the outcome text.
        let outcome = description + " completed";
        println!("Manual task finished: {}", outcome);
        store.put("manual_task_result", outcome)?;

        Ok(TaskResult::Single(WorkflowAction::Complete))
    }
}
/// Task that simulates a one-off setup routine.
#[derive(Clone)]
struct SetupTask {
    /// Label interpolated into the "Preparing ... setup" progress message.
    setup_type: String,
}

impl SetupTask {
    /// Creates a setup task that owns a copy of `setup_type`.
    fn new(setup_type: &str) -> Self {
        let setup_type = setup_type.to_owned();
        Self { setup_type }
    }
}
#[task(state = WorkflowAction)]
impl SetupTask {
    /// Returns the framework's `minimal` task configuration.
    ///
    /// NOTE(review): presumably the preset without retry behavior; confirm in the cano docs.
    fn config(&self) -> TaskConfig {
        TaskConfig::minimal()
    }

    /// Simulates a setup routine made of three named steps.
    ///
    /// Writes `setup_start` (RFC 3339 timestamp) and `setup_complete` (the string
    /// `"true"`) to the `"store"` resource, then transitions to
    /// `WorkflowAction::Complete`.
    ///
    /// # Errors
    ///
    /// Propagates any error from looking up the store resource or writing to it.
    async fn run(&self, res: &Resources) -> Result<TaskResult<WorkflowAction>, CanoError> {
        let store = res.get::<MemoryStore, _>("store")?;

        println!("Preparing {} setup...", self.setup_type);
        store.put("setup_start", Utc::now().to_rfc3339())?;

        // The step names are only printed, so a fixed array of string slices is enough;
        // its `Debug` output is identical to that of a `Vec<String>`.
        let steps = ["configure_database", "setup_cache", "initialize_logging"];
        println!("Running setup tasks: {:?}", steps);
        // Stand-in for the real work: a 300 ms pause.
        sleep(Duration::from_millis(300)).await;

        println!("Setup completed successfully");
        store.put("setup_complete", "true".to_string())?;

        Ok(TaskResult::Single(WorkflowAction::Complete))
    }
}
#[tokio::main]
async fn main() -> CanoResult<()> {
println!("Starting Scheduler Scheduling Example");
println!("=====================================");
let store = MemoryStore::new();
let hourly_report_flow = Workflow::new(Resources::new().insert("store", store.clone()))
.register(WorkflowAction::Start, ReportTask::new("Hourly"))
.add_exit_states(vec![WorkflowAction::Complete, WorkflowAction::Error]);
let cleanup_flow = Workflow::new(Resources::new().insert("store", store.clone()))
.register(WorkflowAction::Start, CleanupTask::new("Temporary"))
.add_exit_states(vec![WorkflowAction::Complete, WorkflowAction::Error]);
let manual_flow = Workflow::new(Resources::new().insert("store", store.clone()))
.register(WorkflowAction::Start, ManualTask::new("Data Migration"))
.add_exit_states(vec![WorkflowAction::Complete, WorkflowAction::Error]);
let setup_flow = Workflow::new(Resources::new().insert("store", store.clone()))
.register(WorkflowAction::Start, SetupTask::new("System"))
.add_exit_states(vec![WorkflowAction::Complete, WorkflowAction::Error]);
let mut scheduler = Scheduler::new();
scheduler.every_seconds(
"hourly_report",
hourly_report_flow,
WorkflowAction::Start,
5,
)?;
scheduler.every_seconds("data_cleanup", cleanup_flow, WorkflowAction::Start, 3)?;
scheduler.manual("manual_migration", manual_flow, WorkflowAction::Start)?;
scheduler.manual("system_setup", setup_flow, WorkflowAction::Start)?;
println!("Configured flows:");
println!(" Hourly Report: Every 5 seconds");
println!(" Data Cleanup: Every 3 seconds");
println!(" Manual Migration: Manual trigger only");
println!(" System Setup: Manual trigger only");
println!();
println!("Starting scheduler system...");
let running = scheduler.start().await?;
sleep(Duration::from_secs(2)).await;
println!("Current workflow status:");
let flows_info = running.list().await;
for info in &flows_info {
println!(
" {}: {:?} (runs: {})",
info.id, info.status, info.run_count
);
}
println!();
println!("Waiting a bit then triggering system setup...");
sleep(Duration::from_secs(4)).await;
println!("Manually triggering system setup...");
running.trigger("system_setup").await?;
println!("Manually triggering data migration...");
running.trigger("manual_migration").await?;
println!("Running scheduler for 20 seconds to see concurrent executions...");
sleep(Duration::from_secs(20)).await;
println!("\nFinal workflow status:");
let final_flows_info = running.list().await;
for info in &final_flows_info {
println!(
" {}: {:?} (runs: {})",
info.id, info.status, info.run_count
);
}
println!("\nStopping scheduler...");
running.stop().await?;
println!("Scheduler scheduling example completed!");
Ok(())
}