Skip to main content

awa_model/
job.rs

1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3use sqlx::prelude::FromRow;
4use std::fmt;
5
6/// Job states in the lifecycle state machine.
7#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, sqlx::Type)]
8#[sqlx(type_name = "awa.job_state", rename_all = "snake_case")]
9#[serde(rename_all = "snake_case")]
10pub enum JobState {
11    Scheduled,
12    Available,
13    Running,
14    Completed,
15    Retryable,
16    Failed,
17    Cancelled,
18    WaitingExternal,
19}
20
21impl fmt::Display for JobState {
22    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
23        match self {
24            JobState::Scheduled => write!(f, "scheduled"),
25            JobState::Available => write!(f, "available"),
26            JobState::Running => write!(f, "running"),
27            JobState::Completed => write!(f, "completed"),
28            JobState::Retryable => write!(f, "retryable"),
29            JobState::Failed => write!(f, "failed"),
30            JobState::Cancelled => write!(f, "cancelled"),
31            JobState::WaitingExternal => write!(f, "waiting_external"),
32        }
33    }
34}
35
36impl JobState {
37    /// Bit position for unique_states bitmask.
38    pub fn bit_position(&self) -> u8 {
39        match self {
40            JobState::Scheduled => 0,
41            JobState::Available => 1,
42            JobState::Running => 2,
43            JobState::Completed => 3,
44            JobState::Retryable => 4,
45            JobState::Failed => 5,
46            JobState::Cancelled => 6,
47            JobState::WaitingExternal => 7,
48        }
49    }
50
51    /// Check if this state is terminal (no further transitions possible).
52    pub fn is_terminal(&self) -> bool {
53        matches!(
54            self,
55            JobState::Completed | JobState::Failed | JobState::Cancelled
56        )
57    }
58}
59
60/// A row from the `awa.jobs` table.
61#[derive(Debug, Clone, FromRow, Serialize, Deserialize)]
62pub struct JobRow {
63    pub id: i64,
64    pub kind: String,
65    pub queue: String,
66    pub args: serde_json::Value,
67    pub state: JobState,
68    pub priority: i16,
69    pub attempt: i16,
70    pub run_lease: i64,
71    pub max_attempts: i16,
72    pub run_at: DateTime<Utc>,
73    pub heartbeat_at: Option<DateTime<Utc>>,
74    pub deadline_at: Option<DateTime<Utc>>,
75    pub attempted_at: Option<DateTime<Utc>>,
76    pub finalized_at: Option<DateTime<Utc>>,
77    pub created_at: DateTime<Utc>,
78    pub errors: Option<Vec<serde_json::Value>>,
79    pub metadata: serde_json::Value,
80    pub tags: Vec<String>,
81    pub unique_key: Option<Vec<u8>>,
82    /// Unique states bitmask — stored as BIT(8) in Postgres.
83    /// Skipped in FromRow since it's only used by the DB-side unique index.
84    #[sqlx(skip)]
85    pub unique_states: Option<u8>,
86    /// Callback ID for external webhook completion.
87    pub callback_id: Option<uuid::Uuid>,
88    /// Deadline for callback timeout.
89    pub callback_timeout_at: Option<DateTime<Utc>>,
90    /// CEL filter expression for callback resolution.
91    pub callback_filter: Option<String>,
92    /// CEL expression: does the payload indicate completion?
93    pub callback_on_complete: Option<String>,
94    /// CEL expression: does the payload indicate failure?
95    pub callback_on_fail: Option<String>,
96    /// CEL expression to transform the payload before returning.
97    pub callback_transform: Option<String>,
98    /// Structured progress reported by the handler during execution.
99    pub progress: Option<serde_json::Value>,
100}
101
102/// Options for inserting a job.
103#[derive(Debug, Clone)]
104pub struct InsertOpts {
105    pub queue: String,
106    pub priority: i16,
107    pub max_attempts: i16,
108    pub run_at: Option<DateTime<Utc>>,
109    pub deadline_duration: Option<chrono::Duration>,
110    pub metadata: serde_json::Value,
111    pub tags: Vec<String>,
112    pub unique: Option<UniqueOpts>,
113}
114
115impl Default for InsertOpts {
116    fn default() -> Self {
117        Self {
118            queue: "default".to_string(),
119            priority: 2,
120            max_attempts: 25,
121            run_at: None,
122            deadline_duration: None,
123            metadata: serde_json::json!({}),
124            tags: Vec::new(),
125            unique: None,
126        }
127    }
128}
129
130/// Uniqueness constraint options.
131#[derive(Debug, Clone)]
132pub struct UniqueOpts {
133    /// Include queue in uniqueness calculation.
134    pub by_queue: bool,
135    /// Include args in uniqueness calculation.
136    pub by_args: bool,
137    /// Period bucket for time-based uniqueness (epoch seconds / period).
138    pub by_period: Option<i64>,
139    /// States in which uniqueness is enforced.
140    /// Default: scheduled, available, running, completed, retryable (bits 0-4).
141    pub states: u8,
142}
143
144impl Default for UniqueOpts {
145    fn default() -> Self {
146        Self {
147            by_queue: false,
148            by_args: true,
149            by_period: None,
150            // Default: bits 0-4 set (scheduled, available, running, completed, retryable)
151            states: 0b0001_1111,
152        }
153    }
154}
155
156impl UniqueOpts {
157    /// Convert the states bitmask to a BIT(8) representation for Postgres.
158    pub fn states_bits(&self) -> Vec<u8> {
159        vec![self.states]
160    }
161}
162
163/// Parameters for bulk insert.
164#[derive(Debug, Clone)]
165pub struct InsertParams {
166    pub kind: String,
167    pub args: serde_json::Value,
168    pub opts: InsertOpts,
169}