use crate::{InstanceId, NodeId, TaskId};
use bytes::Bytes;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::time::Duration;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TaskDef {
pub id: TaskId,
pub name: String,
pub schedule: Schedule,
pub config: TaskConfig,
pub enabled: bool,
pub hlc_timestamp: u64,
pub version: u64,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
impl TaskDef {
pub fn new(name: impl Into<String>, schedule: Schedule) -> Self {
let now = Utc::now();
Self {
id: TaskId::new(),
name: name.into(),
schedule,
config: TaskConfig::default(),
enabled: true,
hlc_timestamp: 0,
version: 0,
created_at: now,
updated_at: now,
}
}
pub fn with_config(mut self, config: TaskConfig) -> Self {
self.config = config;
self
}
pub fn with_retry(mut self, retry: RetryPolicy) -> Self {
self.config.retry = retry;
self
}
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.config.timeout = timeout;
self
}
pub fn with_payload(mut self, payload: Bytes) -> Self {
self.config.payload = Some(payload);
self
}
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum Schedule {
Cron(String),
Interval(Duration),
Once(DateTime<Utc>),
}
impl Schedule {
pub fn cron(expr: impl Into<String>) -> Self {
Schedule::Cron(expr.into())
}
pub fn interval(duration: Duration) -> Self {
Schedule::Interval(duration)
}
pub fn once(at: DateTime<Utc>) -> Self {
Schedule::Once(at)
}
pub fn delay(duration: Duration) -> Self {
Schedule::Once(Utc::now() + chrono::Duration::from_std(duration).unwrap())
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TaskConfig {
pub timeout: Duration,
pub retry: RetryPolicy,
pub payload: Option<Bytes>,
pub handler_id: String,
}
impl Default for TaskConfig {
fn default() -> Self {
Self {
timeout: Duration::from_secs(300), retry: RetryPolicy::default(),
payload: None,
handler_id: String::new(),
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RetryPolicy {
pub max_attempts: u32,
pub initial_delay: Duration,
pub max_delay: Duration,
pub multiplier: f64,
}
impl Default for RetryPolicy {
fn default() -> Self {
Self {
max_attempts: 3,
initial_delay: Duration::from_secs(1),
max_delay: Duration::from_secs(300),
multiplier: 2.0,
}
}
}
impl RetryPolicy {
pub fn none() -> Self {
Self {
max_attempts: 0,
..Default::default()
}
}
pub fn exponential(max_attempts: u32, initial_delay: Duration) -> Self {
Self {
max_attempts,
initial_delay,
multiplier: 2.0,
..Default::default()
}
}
pub fn fixed(max_attempts: u32, delay: Duration) -> Self {
Self {
max_attempts,
initial_delay: delay,
max_delay: delay,
multiplier: 1.0,
}
}
pub fn delay_for_attempt(&self, attempt: u32) -> Duration {
if attempt == 0 {
return Duration::ZERO;
}
let delay = self.initial_delay.as_secs_f64() * self.multiplier.powi(attempt as i32 - 1);
let delay = Duration::from_secs_f64(delay);
std::cmp::min(delay, self.max_delay)
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TaskInstance {
pub id: InstanceId,
pub task_id: TaskId,
pub scheduled_at: DateTime<Utc>,
pub status: TaskStatus,
pub claimed_by: Option<NodeId>,
pub claim_version: u64,
pub started_at: Option<DateTime<Utc>>,
pub completed_at: Option<DateTime<Utc>>,
pub result: Option<Bytes>,
pub error: Option<String>,
pub attempt: u32,
}
impl TaskInstance {
pub fn new(task_id: TaskId, scheduled_at: DateTime<Utc>) -> Self {
Self {
id: InstanceId::new(),
task_id,
scheduled_at,
status: TaskStatus::Pending,
claimed_by: None,
claim_version: 0,
started_at: None,
completed_at: None,
result: None,
error: None,
attempt: 0,
}
}
pub fn is_ready(&self) -> bool {
self.status == TaskStatus::Pending && self.scheduled_at <= Utc::now()
}
pub fn can_retry(&self, policy: &RetryPolicy) -> bool {
self.status == TaskStatus::Failed && self.attempt < policy.max_attempts
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum TaskStatus {
Pending,
Claimed,
Running,
Success,
Failed,
Cancelled,
}
impl TaskStatus {
pub fn is_terminal(&self) -> bool {
matches!(self, TaskStatus::Success | TaskStatus::Cancelled)
}
pub fn as_str(&self) -> &'static str {
match self {
TaskStatus::Pending => "pending",
TaskStatus::Claimed => "claimed",
TaskStatus::Running => "running",
TaskStatus::Success => "success",
TaskStatus::Failed => "failed",
TaskStatus::Cancelled => "cancelled",
}
}
}
impl std::str::FromStr for TaskStatus {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"pending" => Ok(TaskStatus::Pending),
"claimed" => Ok(TaskStatus::Claimed),
"running" => Ok(TaskStatus::Running),
"success" => Ok(TaskStatus::Success),
"failed" => Ok(TaskStatus::Failed),
"cancelled" => Ok(TaskStatus::Cancelled),
_ => Err(format!("unknown task status: {}", s)),
}
}
}
#[derive(Clone, Debug)]
pub struct TaskContext {
pub task_id: TaskId,
pub instance_id: InstanceId,
pub scheduled_at: DateTime<Utc>,
pub attempt: u32,
payload: Bytes,
}
impl TaskContext {
pub fn new(
task_id: TaskId,
instance_id: InstanceId,
scheduled_at: DateTime<Utc>,
attempt: u32,
payload: Bytes,
) -> Self {
Self {
task_id,
instance_id,
scheduled_at,
attempt,
payload,
}
}
pub fn payload_bytes(&self) -> &Bytes {
&self.payload
}
pub fn payload<T: serde::de::DeserializeOwned>(&self) -> Result<T, bincode::Error> {
bincode::deserialize(&self.payload)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_retry_policy_delay() {
let policy = RetryPolicy::exponential(5, Duration::from_secs(1));
assert_eq!(policy.delay_for_attempt(0), Duration::ZERO);
assert_eq!(policy.delay_for_attempt(1), Duration::from_secs(1));
assert_eq!(policy.delay_for_attempt(2), Duration::from_secs(2));
assert_eq!(policy.delay_for_attempt(3), Duration::from_secs(4));
}
#[test]
fn test_task_status_parse() {
assert_eq!("pending".parse::<TaskStatus>().unwrap(), TaskStatus::Pending);
assert_eq!("running".parse::<TaskStatus>().unwrap(), TaskStatus::Running);
}
#[test]
fn test_task_instance_ready() {
let task_id = TaskId::new();
let past = Utc::now() - chrono::Duration::hours(1);
let future = Utc::now() + chrono::Duration::hours(1);
let past_instance = TaskInstance::new(task_id.clone(), past);
let future_instance = TaskInstance::new(task_id, future);
assert!(past_instance.is_ready());
assert!(!future_instance.is_ready());
}
}