use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};
pub fn now_millis() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0)
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TaskState {
Pending,
Ready,
Running,
Completed,
Failed,
Retrying,
Cancelled,
Skipped,
DeadLettered,
}
impl TaskState {
pub fn is_terminal(&self) -> bool {
matches!(
self,
TaskState::Completed
| TaskState::Failed
| TaskState::Cancelled
| TaskState::Skipped
| TaskState::DeadLettered
)
}
pub fn is_success(&self) -> bool {
matches!(self, TaskState::Completed)
}
pub fn is_failure(&self) -> bool {
matches!(self, TaskState::Failed | TaskState::DeadLettered)
}
pub fn can_transition_to(&self, next: TaskState) -> bool {
use TaskState::*;
match (self, next) {
(a, b) if a == &b => true,
(Pending, Ready | Skipped | Cancelled) => true,
(Ready, Running | Skipped | Cancelled) => true,
(Running, Completed | Failed | Retrying | Cancelled) => true,
(Retrying, Running | Ready | Failed | DeadLettered | Cancelled) => true,
(Failed, Retrying | DeadLettered) => true,
_ => false,
}
}
}
impl std::fmt::Display for TaskState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let s = match self {
TaskState::Pending => "pending",
TaskState::Ready => "ready",
TaskState::Running => "running",
TaskState::Completed => "completed",
TaskState::Failed => "failed",
TaskState::Retrying => "retrying",
TaskState::Cancelled => "cancelled",
TaskState::Skipped => "skipped",
TaskState::DeadLettered => "dead_lettered",
};
f.write_str(s)
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskRecord {
pub id: String,
pub state: TaskState,
pub attempts: u32,
pub created_at: u64,
pub updated_at: u64,
pub started_at: Option<u64>,
pub finished_at: Option<u64>,
pub output: Option<serde_json::Value>,
pub error: Option<String>,
pub metadata: HashMap<String, String>,
}
impl TaskRecord {
pub fn new(id: impl Into<String>) -> Self {
let now = now_millis();
TaskRecord {
id: id.into(),
state: TaskState::Pending,
attempts: 0,
created_at: now,
updated_at: now,
started_at: None,
finished_at: None,
output: None,
error: None,
metadata: HashMap::new(),
}
}
pub fn transition(&mut self, state: TaskState) -> bool {
if !self.state.can_transition_to(state) {
return false;
}
self.state = state;
self.updated_at = now_millis();
match state {
TaskState::Running => self.started_at = Some(self.updated_at),
s if s.is_terminal() => self.finished_at = Some(self.updated_at),
_ => {}
}
true
}
pub fn duration_millis(&self) -> Option<u64> {
match (self.started_at, self.finished_at) {
(Some(s), Some(f)) if f >= s => Some(f - s),
_ => None,
}
}
}