#![cfg(feature = "scheduler")]
use async_trait::async_trait;
use cano::prelude::*;
use chrono::Utc;
use tokio::time::{Duration, sleep};
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum WorkflowAction {
Start,
Complete,
Error,
}
#[derive(Clone)]
struct ReportNode {
report_type: String,
}
impl ReportNode {
fn new(report_type: &str) -> Self {
Self {
report_type: report_type.to_string(),
}
}
}
#[async_trait]
impl Node<WorkflowAction> for ReportNode {
type PrepResult = String;
type ExecResult = String;
fn config(&self) -> TaskConfig {
TaskConfig::minimal()
}
async fn prep(&self, store: &MemoryStore) -> Result<Self::PrepResult, CanoError> {
println!("đ Preparing {} report...", self.report_type);
store.put("report_start_time", Utc::now().to_rfc3339())?;
Ok(format!("Preparing {} report", self.report_type))
}
async fn exec(&self, _prep_result: Self::PrepResult) -> Self::ExecResult {
println!("đ Generating {} report...", self.report_type);
sleep(Duration::from_millis(3000)).await;
format!("{} report generated successfully", self.report_type)
}
async fn post(
&self,
store: &MemoryStore,
exec_result: Self::ExecResult,
) -> Result<WorkflowAction, CanoError> {
println!("đ Report completed: {}", exec_result);
store.put("report_result", exec_result)?;
Ok(WorkflowAction::Complete)
}
}
#[derive(Clone)]
struct CleanupNode {
cleanup_type: String,
}
impl CleanupNode {
fn new(cleanup_type: &str) -> Self {
Self {
cleanup_type: cleanup_type.to_string(),
}
}
}
#[async_trait]
impl Node<WorkflowAction> for CleanupNode {
type PrepResult = Vec<String>;
type ExecResult = usize;
fn config(&self) -> TaskConfig {
TaskConfig::minimal()
}
async fn prep(&self, store: &MemoryStore) -> Result<Self::PrepResult, CanoError> {
println!("đ§š Scanning for {} cleanup...", self.cleanup_type);
store.put("cleanup_start", Utc::now().to_rfc3339())?;
Ok(vec![
"temp_file_1".to_string(),
"temp_file_2".to_string(),
"old_log".to_string(),
])
}
async fn exec(&self, prep_result: Self::PrepResult) -> Self::ExecResult {
println!("đ§š Cleaning up {} items...", prep_result.len());
sleep(Duration::from_millis(2000)).await;
prep_result.len()
}
async fn post(
&self,
store: &MemoryStore,
exec_result: Self::ExecResult,
) -> Result<WorkflowAction, CanoError> {
println!("đ§š Cleanup completed: {} items removed", exec_result);
store.put("cleanup_count", exec_result.to_string())?;
Ok(WorkflowAction::Complete)
}
}
#[derive(Clone)]
struct ManualTaskNode {
task_name: String,
}
impl ManualTaskNode {
fn new(task_name: &str) -> Self {
Self {
task_name: task_name.to_string(),
}
}
}
#[async_trait]
impl Node<WorkflowAction> for ManualTaskNode {
type PrepResult = String;
type ExecResult = String;
fn config(&self) -> TaskConfig {
TaskConfig::minimal()
}
async fn prep(&self, store: &MemoryStore) -> Result<Self::PrepResult, CanoError> {
println!("⥠Starting manual task: {}", self.task_name);
store.put("manual_task_start", Utc::now().to_rfc3339())?;
Ok(format!("Manual task: {}", self.task_name))
}
async fn exec(&self, prep_result: Self::PrepResult) -> Self::ExecResult {
println!("⥠Executing: {}", prep_result);
sleep(Duration::from_millis(200)).await;
format!("{} completed", prep_result)
}
async fn post(
&self,
store: &MemoryStore,
exec_result: Self::ExecResult,
) -> Result<WorkflowAction, CanoError> {
println!("⥠Manual task finished: {}", exec_result);
store.put("manual_task_result", exec_result)?;
Ok(WorkflowAction::Complete)
}
}
#[derive(Clone)]
struct SetupNode {
setup_type: String,
}
impl SetupNode {
fn new(setup_type: &str) -> Self {
Self {
setup_type: setup_type.to_string(),
}
}
}
#[async_trait]
impl Node<WorkflowAction> for SetupNode {
type PrepResult = Vec<String>;
type ExecResult = bool;
fn config(&self) -> TaskConfig {
TaskConfig::minimal()
}
async fn prep(&self, store: &MemoryStore) -> Result<Self::PrepResult, CanoError> {
println!("đ§ Preparing {} setup...", self.setup_type);
store.put("setup_start", Utc::now().to_rfc3339())?;
Ok(vec![
"configure_database".to_string(),
"setup_cache".to_string(),
"initialize_logging".to_string(),
])
}
async fn exec(&self, prep_result: Self::PrepResult) -> Self::ExecResult {
println!("đ§ Running setup tasks: {:?}", prep_result);
sleep(Duration::from_millis(300)).await;
true
}
async fn post(
&self,
store: &MemoryStore,
exec_result: Self::ExecResult,
) -> Result<WorkflowAction, CanoError> {
println!("đ§ Setup completed successfully: {}", exec_result);
store.put("setup_complete", exec_result.to_string())?;
Ok(WorkflowAction::Complete)
}
}
#[tokio::main]
async fn main() -> CanoResult<()> {
println!("đ Starting Scheduler Scheduling Example");
println!("=====================================");
let store = MemoryStore::new();
let hourly_report_flow = Workflow::new(store.clone())
.register(WorkflowAction::Start, ReportNode::new("Hourly"))
.add_exit_states(vec![WorkflowAction::Complete, WorkflowAction::Error]);
let cleanup_flow = Workflow::new(store.clone())
.register(WorkflowAction::Start, CleanupNode::new("Temporary"))
.add_exit_states(vec![WorkflowAction::Complete, WorkflowAction::Error]);
let manual_flow = Workflow::new(store.clone())
.register(WorkflowAction::Start, ManualTaskNode::new("Data Migration"))
.add_exit_states(vec![WorkflowAction::Complete, WorkflowAction::Error]);
let setup_flow = Workflow::new(store.clone())
.register(WorkflowAction::Start, SetupNode::new("System"))
.add_exit_states(vec![WorkflowAction::Complete, WorkflowAction::Error]);
let mut scheduler = Scheduler::new();
scheduler.every_seconds(
"hourly_report",
hourly_report_flow,
WorkflowAction::Start,
5,
)?;
scheduler.every_seconds("data_cleanup", cleanup_flow, WorkflowAction::Start, 3)?;
scheduler.manual("manual_migration", manual_flow, WorkflowAction::Start)?;
scheduler.manual("system_setup", setup_flow, WorkflowAction::Start)?;
println!("đ
Configured flows:");
println!(" âĸ Hourly Report: Every 5 seconds");
println!(" âĸ Data Cleanup: Every 3 seconds");
println!(" âĸ Manual Migration: Manual trigger only");
println!(" âĸ System Setup: Manual trigger only");
println!();
println!("âļī¸ Starting scheduler system...");
let mut scheduler_handle = scheduler.clone();
let scheduler_task = tokio::spawn(async move { scheduler_handle.start().await });
sleep(Duration::from_secs(2)).await;
println!("đ Current workflow status:");
let flows_info = scheduler.list().await;
for info in &flows_info {
println!(
" âĸ {}: {:?} (runs: {})",
info.id, info.status, info.run_count
);
}
println!();
println!("âŗ Waiting a bit then triggering system setup...");
sleep(Duration::from_secs(4)).await;
println!("đ§ Manually triggering system setup...");
scheduler.trigger("system_setup").await?;
println!("đ§ Manually triggering data migration...");
scheduler.trigger("manual_migration").await?;
println!("âŗ Running scheduler for 20 seconds to see concurrent executions...");
sleep(Duration::from_secs(20)).await;
println!("\nđ Final workflow status:");
let final_flows_info = scheduler.list().await;
for info in &final_flows_info {
println!(
" âĸ {}: {:?} (runs: {})",
info.id, info.status, info.run_count
);
}
println!("\nâšī¸ Stopping scheduler...");
scheduler.stop().await?;
let _ = scheduler_task
.await
.map_err(|e| CanoError::task_execution(format!("Scheduler task failed: {}", e)))?;
println!("â
Scheduler scheduling example completed!");
Ok(())
}