#![cfg(feature = "scheduler")]
use async_trait::async_trait;
use cano::prelude::*;
use tokio::time::Duration;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum TaskState {
Start,
Complete,
}
#[derive(Clone)]
struct DailyTask;
#[async_trait]
impl Node<TaskState> for DailyTask {
type PrepResult = ();
type ExecResult = ();
async fn prep(&self, _store: &MemoryStore) -> Result<Self::PrepResult, CanoError> {
Ok(())
}
async fn exec(&self, _data: Self::PrepResult) -> Self::ExecResult {
println!(
"📅 Daily task executed at {}",
chrono::Utc::now().format("%H:%M:%S")
);
}
async fn post(
&self,
_store: &MemoryStore,
_result: Self::ExecResult,
) -> Result<TaskState, CanoError> {
Ok(TaskState::Complete)
}
}
#[derive(Clone)]
struct HourlyTask;
#[async_trait]
impl Node<TaskState> for HourlyTask {
type PrepResult = ();
type ExecResult = ();
async fn prep(&self, _store: &MemoryStore) -> Result<Self::PrepResult, CanoError> {
Ok(())
}
async fn exec(&self, _data: Self::PrepResult) -> Self::ExecResult {
println!(
"⏰ Hourly task executed at {}",
chrono::Utc::now().format("%H:%M:%S")
);
}
async fn post(
&self,
_store: &MemoryStore,
_result: Self::ExecResult,
) -> Result<TaskState, CanoError> {
Ok(TaskState::Complete)
}
}
#[derive(Clone)]
struct FrequentTask;
#[async_trait]
impl Node<TaskState> for FrequentTask {
type PrepResult = ();
type ExecResult = ();
async fn prep(&self, _store: &MemoryStore) -> Result<Self::PrepResult, CanoError> {
Ok(())
}
async fn exec(&self, _data: Self::PrepResult) -> Self::ExecResult {
println!(
"🔄 Frequent task executed at {}",
chrono::Utc::now().format("%H:%M:%S")
);
}
async fn post(
&self,
_store: &MemoryStore,
_result: Self::ExecResult,
) -> Result<TaskState, CanoError> {
Ok(TaskState::Complete)
}
}
#[tokio::main]
async fn main() -> CanoResult<()> {
println!("⏰ Duration-Based Scheduling Example");
println!("====================================");
let mut scheduler = Scheduler::new();
let store = MemoryStore::new();
let daily_flow = Workflow::new(store.clone())
.register(TaskState::Start, DailyTask)
.add_exit_state(TaskState::Complete);
let hourly_flow = Workflow::new(store.clone())
.register(TaskState::Start, HourlyTask)
.add_exit_state(TaskState::Complete);
let frequent_flow = Workflow::new(store.clone())
.register(TaskState::Start, FrequentTask)
.add_exit_state(TaskState::Complete);
scheduler.every(
"daily_task",
daily_flow,
TaskState::Start,
Duration::from_secs(4),
)?; scheduler.every(
"hourly_task",
hourly_flow,
TaskState::Start,
Duration::from_secs(2),
)?; scheduler.every(
"frequent_task",
frequent_flow,
TaskState::Start,
Duration::from_secs(1),
)?;
println!("📅 Scheduled workflows:");
println!(" • Daily task: Every 4 seconds (simulated)");
println!(" • Hourly task: Every 2 seconds (simulated)");
println!(" • Frequent task: Every 1 second");
println!();
let mut scheduler_handle = scheduler.clone();
let scheduler_task = tokio::spawn(async move {
println!("Scheduler background task started.");
if let Err(e) = scheduler_handle.start().await {
eprintln!("Scheduler failed: {}", e);
}
println!("Scheduler background task finished.");
});
println!("🚀 Scheduler started! Running for 10 seconds...");
tokio::time::sleep(Duration::from_secs(10)).await;
scheduler.stop().await?;
println!("✅ Scheduler stopped gracefully");
scheduler_task
.await
.map_err(|e| CanoError::task_execution(format!("Scheduler task failed: {}", e)))?;
Ok(())
}