Skip to main content

awa_model/
job.rs

1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3use sqlx::prelude::FromRow;
4use std::fmt;
5
6/// Job states in the lifecycle state machine.
7#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, sqlx::Type)]
8#[sqlx(type_name = "awa.job_state", rename_all = "snake_case")]
9#[serde(rename_all = "snake_case")]
10pub enum JobState {
11    Scheduled,
12    Available,
13    Running,
14    Completed,
15    Retryable,
16    Failed,
17    Cancelled,
18    WaitingExternal,
19}
20
21impl fmt::Display for JobState {
22    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
23        match self {
24            JobState::Scheduled => write!(f, "scheduled"),
25            JobState::Available => write!(f, "available"),
26            JobState::Running => write!(f, "running"),
27            JobState::Completed => write!(f, "completed"),
28            JobState::Retryable => write!(f, "retryable"),
29            JobState::Failed => write!(f, "failed"),
30            JobState::Cancelled => write!(f, "cancelled"),
31            JobState::WaitingExternal => write!(f, "waiting_external"),
32        }
33    }
34}
35
36impl std::str::FromStr for JobState {
37    type Err = String;
38
39    fn from_str(s: &str) -> Result<Self, Self::Err> {
40        match s {
41            "scheduled" => Ok(JobState::Scheduled),
42            "available" => Ok(JobState::Available),
43            "running" => Ok(JobState::Running),
44            "completed" => Ok(JobState::Completed),
45            "retryable" => Ok(JobState::Retryable),
46            "failed" => Ok(JobState::Failed),
47            "cancelled" => Ok(JobState::Cancelled),
48            "waiting_external" => Ok(JobState::WaitingExternal),
49            other => Err(format!("unknown job state: {other}")),
50        }
51    }
52}
53
54impl JobState {
55    /// Bit position for unique_states bitmask.
56    pub fn bit_position(&self) -> u8 {
57        match self {
58            JobState::Scheduled => 0,
59            JobState::Available => 1,
60            JobState::Running => 2,
61            JobState::Completed => 3,
62            JobState::Retryable => 4,
63            JobState::Failed => 5,
64            JobState::Cancelled => 6,
65            JobState::WaitingExternal => 7,
66        }
67    }
68
69    /// Check if this state is terminal (no further transitions possible).
70    pub fn is_terminal(&self) -> bool {
71        matches!(
72            self,
73            JobState::Completed | JobState::Failed | JobState::Cancelled
74        )
75    }
76}
77
78/// A row from the `awa.jobs` table.
79#[derive(Debug, Clone, FromRow, Serialize, Deserialize)]
80pub struct JobRow {
81    pub id: i64,
82    pub kind: String,
83    pub queue: String,
84    pub args: serde_json::Value,
85    pub state: JobState,
86    pub priority: i16,
87    pub attempt: i16,
88    pub run_lease: i64,
89    pub max_attempts: i16,
90    pub run_at: DateTime<Utc>,
91    pub heartbeat_at: Option<DateTime<Utc>>,
92    pub deadline_at: Option<DateTime<Utc>>,
93    pub attempted_at: Option<DateTime<Utc>>,
94    pub finalized_at: Option<DateTime<Utc>>,
95    pub created_at: DateTime<Utc>,
96    pub errors: Option<Vec<serde_json::Value>>,
97    pub metadata: serde_json::Value,
98    pub tags: Vec<String>,
99    pub unique_key: Option<Vec<u8>>,
100    /// Unique states bitmask — stored as BIT(8) in Postgres.
101    /// Skipped in FromRow since it's only used by the DB-side unique index.
102    #[sqlx(skip)]
103    pub unique_states: Option<u8>,
104    /// Callback ID for external webhook completion.
105    pub callback_id: Option<uuid::Uuid>,
106    /// Deadline for callback timeout.
107    pub callback_timeout_at: Option<DateTime<Utc>>,
108    /// CEL filter expression for callback resolution.
109    pub callback_filter: Option<String>,
110    /// CEL expression: does the payload indicate completion?
111    pub callback_on_complete: Option<String>,
112    /// CEL expression: does the payload indicate failure?
113    pub callback_on_fail: Option<String>,
114    /// CEL expression to transform the payload before returning.
115    pub callback_transform: Option<String>,
116    /// Structured progress reported by the handler during execution.
117    pub progress: Option<serde_json::Value>,
118}
119
120/// Options for inserting a job.
121#[derive(Debug, Clone)]
122pub struct InsertOpts {
123    pub queue: String,
124    pub priority: i16,
125    pub max_attempts: i16,
126    pub run_at: Option<DateTime<Utc>>,
127    pub deadline_duration: Option<chrono::Duration>,
128    pub metadata: serde_json::Value,
129    pub tags: Vec<String>,
130    pub unique: Option<UniqueOpts>,
131}
132
133impl Default for InsertOpts {
134    fn default() -> Self {
135        Self {
136            queue: "default".to_string(),
137            priority: 2,
138            max_attempts: 25,
139            run_at: None,
140            deadline_duration: None,
141            metadata: serde_json::json!({}),
142            tags: Vec::new(),
143            unique: None,
144        }
145    }
146}
147
148/// Uniqueness constraint options.
149#[derive(Debug, Clone)]
150pub struct UniqueOpts {
151    /// Include queue in uniqueness calculation.
152    pub by_queue: bool,
153    /// Include args in uniqueness calculation.
154    pub by_args: bool,
155    /// Period bucket for time-based uniqueness (epoch seconds / period).
156    pub by_period: Option<i64>,
157    /// States in which uniqueness is enforced.
158    /// Default: scheduled, available, running, completed, retryable (bits 0-4).
159    pub states: u8,
160}
161
162impl Default for UniqueOpts {
163    fn default() -> Self {
164        Self {
165            by_queue: false,
166            by_args: true,
167            by_period: None,
168            // Default: bits 0-4 set (scheduled, available, running, completed, retryable)
169            states: 0b0001_1111,
170        }
171    }
172}
173
174impl UniqueOpts {
175    /// Convert the states bitmask to a BIT(8) representation for Postgres.
176    pub fn states_bits(&self) -> Vec<u8> {
177        vec![self.states]
178    }
179}
180
181/// Parameters for bulk insert.
182#[derive(Debug, Clone)]
183pub struct InsertParams {
184    pub kind: String,
185    pub args: serde_json::Value,
186    pub opts: InsertOpts,
187}