azoth-scheduler 0.2.5

Task scheduler for Azoth database with cron support
Documentation
# Azoth Scheduler

A cron-like task scheduling system for Azoth projects, built on event-sourcing principles.

## Features

- **Flexible Scheduling**
  - Cron expressions for complex recurring tasks
  - Simple interval-based scheduling
  - One-time execution at specific timestamps
  - Immediate execution (job queue functionality)

- **Robust Execution**
  - Configurable concurrency limits
  - Automatic retries with exponential backoff
  - Timeout support per task
  - Task cancellation

- **Event-Sourced**
  - All schedule operations are events in the canonical log
  - Task executions produce events for EventHandlers
  - Full audit trail of all task activity
  - Durable state across restarts

## Architecture

```
┌─────────────────────┐
│  Schedule Events    │  TaskScheduled, TaskExecuted, TaskCancelled
│  (Canonical Store)  │  Written via Transaction API
└──────────┬──────────┘
┌─────────────────────┐
│  Scheduler Loop     │  Continuously checks for due tasks
│  (like Projector)   │  - Polls schedule projection
└──────────┬──────────┘  - Executes via TaskHandler
           │              - Writes result events
┌─────────────────────┐
│ Schedule Projection │  SQLite table with next_run_time index
│   (SQLite)          │  Tracks task state, retries, history
└─────────────────────┘
┌─────────────────────┐
│   TaskHandler       │  Produces event for EventHandler
│   (User Code)       │
└──────────┬──────────┘
┌─────────────────────┐
│  EventHandler       │  Processes task result event
│  (User Code)        │  Updates projections, side effects
└─────────────────────┘
```

## Usage

### Basic Example

```rust
use azoth::AzothDb;
use azoth_scheduler::prelude::*;
use std::sync::Arc;
use std::time::Duration;

// Define a task handler
struct ReportHandler;

impl TaskHandler for ReportHandler {
    fn task_type(&self) -> &str {
        "generate_report"
    }

    fn execute(&self, ctx: &TaskContext, payload: &[u8]) -> Result<TaskEvent> {
        // Your task logic here
        println!("Generating report...");

        Ok(TaskEvent {
            event_type: "report_generated".to_string(),
            payload: vec![],
        })
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    let db = Arc::new(AzothDb::open("./data")?);
    let conn = Arc::new(Connection::open("./projection.db")?);

    // Create scheduler
    let scheduler = Scheduler::builder(db.clone())
        .with_task_handler(ReportHandler)
        .with_poll_interval(Duration::from_secs(1))
        .build(conn)?;

    // Schedule tasks
    scheduler.schedule_task(
        ScheduleTaskRequest::builder("daily-report")
            .task_type("generate_report")
            .cron("0 0 0 * * *")  // Daily at midnight (6-field: sec min hour day month dow)
            .payload(vec![])
            .build()?
    )?;

    // Run scheduler
    scheduler.run().await?;
    Ok(())
}
```

### Schedule Types

#### Cron Expression
```rust
scheduler.schedule_task(
    ScheduleTaskRequest::builder("task-id")
        .task_type("my_task")
        .cron("0 0 0 * * *")  // Daily at midnight
        .payload(vec![])
        .build()?
)?;
```

Cron format: `second minute hour day_of_month month day_of_week` (6-field with seconds)
- `0 0 0 * * *` - Daily at midnight
- `0 0 */6 * * *` - Every 6 hours
- `0 0 9 * * 1-5` - Weekdays at 9 AM
- `*/30 * * * * *` - Every 30 seconds

#### Interval
```rust
scheduler.schedule_task(
    ScheduleTaskRequest::builder("task-id")
        .task_type("my_task")
        .interval(300)  // Every 5 minutes (300 seconds)
        .payload(vec![])
        .build()?
)?;
```

#### One-Time
```rust
let run_at = Utc::now().timestamp() + 3600; // 1 hour from now

scheduler.schedule_task(
    ScheduleTaskRequest::builder("task-id")
        .task_type("my_task")
        .one_time(run_at)
        .payload(vec![])
        .build()?
)?;
```

#### Immediate
```rust
scheduler.schedule_task(
    ScheduleTaskRequest::builder("task-id")
        .task_type("my_task")
        .immediate()
        .payload(vec![])
        .build()?
)?;
```

### Task Handlers

Task handlers execute the actual work and produce events:

```rust
struct EmailHandler;

impl TaskHandler for EmailHandler {
    fn task_type(&self) -> &str {
        "send_email"
    }

    fn execute(&self, ctx: &TaskContext, payload: &[u8]) -> Result<TaskEvent> {
        let email: EmailData = serde_json::from_slice(payload)?;

        // Send email
        send_email(&email)?;

        // Return event to be processed by EventHandlers
        Ok(TaskEvent {
            event_type: "email_sent".to_string(),
            payload: serde_json::to_vec(&EmailSentEvent {
                email_id: email.id,
                sent_at: Utc::now(),
            })?,
        })
    }

    fn validate(&self, payload: &[u8]) -> Result<()> {
        serde_json::from_slice::<EmailData>(payload)
            .map_err(|e| SchedulerError::InvalidTask(e.to_string()))?;
        Ok(())
    }
}
```

### Configuration

```rust
let scheduler = Scheduler::builder(db)
    .with_task_handler(MyHandler)
    .with_poll_interval(Duration::from_secs(1))      // How often to check for due tasks
    .with_max_concurrent_tasks(10)                    // Max parallel executions
    .with_default_max_retries(3)                      // Retries on failure
    .with_default_timeout_secs(300)                   // 5 minute timeout
    .build(conn)?;
```

### Task Management

```rust
// Cancel a task
scheduler.cancel_task("task-id", "No longer needed")?;

// Get task info
let task = scheduler.get_task("task-id")?;

// List all tasks
let tasks = scheduler.list_tasks(&TaskFilter::new())?;

// List only enabled tasks
let tasks = scheduler.list_tasks(&TaskFilter::new().enabled(true))?;
```

## Retry Behavior

When a task fails:
1. The failure is recorded with a `TaskExecuted` event (success: false)
2. The task's retry count is incremented
3. If retry count < max retries:
   - Task is rescheduled with exponential backoff (1min, 2min, 4min, etc.)
4. If retry count >= max retries:
   - Task is disabled
   - Check execution history to diagnose

## Examples

Run the included examples:

```bash
# Basic scheduler with immediate and interval tasks
cargo run --example basic_scheduler

# Cron-based scheduling
cargo run --example cron_tasks
```

## Testing

```bash
cargo test --package azoth-scheduler
```

## Design Rationale

**Why event-triggered tasks?**
- Fully event-sourced (all executions in event log)
- Testable and debuggable (inspect events)
- Distributed processing (handlers can run anywhere)
- Consistent with Azoth's architecture

**Why projection-based state?**
- Fast queries by next_run_time
- SQL queries for task management
- Durable across restarts

**Why poll-based timing?**
- Simple implementation
- Works with multiple schedulers (future feature)
- Minimum latency = poll interval (acceptable at 1 second)