1use chrono::{DateTime, Utc};
2use serde::Serialize;
3use sha2::{Digest, Sha256};
4
5use crate::db::{ConnExt, ConnQueryExt, Database};
6use crate::error::{Error, Result};
7
8#[derive(Debug, Clone, PartialEq, Eq)]
13pub enum EnqueueResult {
14 Created(String),
16 Duplicate(String),
19}
20
21#[derive(Clone)]
23pub struct EnqueueOptions {
24 pub queue: String,
26 pub run_at: Option<DateTime<Utc>>,
28}
29
30impl Default for EnqueueOptions {
31 fn default() -> Self {
32 Self {
33 queue: "default".to_string(),
34 run_at: None,
35 }
36 }
37}
38
39#[derive(Clone)]
44pub struct Enqueuer {
45 db: Database,
46}
47
48impl Enqueuer {
49 pub fn new(db: Database) -> Self {
51 Self { db }
52 }
53
54 pub async fn enqueue<T: Serialize>(&self, name: &str, payload: &T) -> Result<String> {
63 self.enqueue_with(name, payload, EnqueueOptions::default())
64 .await
65 }
66
67 pub async fn enqueue_at<T: Serialize>(
76 &self,
77 name: &str,
78 payload: &T,
79 run_at: DateTime<Utc>,
80 ) -> Result<String> {
81 self.enqueue_with(
82 name,
83 payload,
84 EnqueueOptions {
85 run_at: Some(run_at),
86 ..Default::default()
87 },
88 )
89 .await
90 }
91
92 pub async fn enqueue_with<T: Serialize>(
101 &self,
102 name: &str,
103 payload: &T,
104 options: EnqueueOptions,
105 ) -> Result<String> {
106 let id = crate::id::ulid();
107 let payload_json = serde_json::to_string(payload)
108 .map_err(|e| Error::internal(format!("serialize job payload: {e}")))?;
109 let now = Utc::now();
110 let run_at = options.run_at.unwrap_or(now);
111 let now_str = now.to_rfc3339();
112 let run_at_str = run_at.to_rfc3339();
113
114 self.db
115 .conn()
116 .execute_raw(
117 "INSERT INTO jobs (id, name, queue, payload, status, attempt, run_at, created_at, updated_at) \
118 VALUES (?1, ?2, ?3, ?4, 'pending', 0, ?5, ?6, ?7)",
119 libsql::params![id.as_str(), name, options.queue.as_str(), payload_json.as_str(), run_at_str.as_str(), now_str.as_str(), now_str.as_str()],
120 )
121 .await
122 .map_err(|e| Error::internal(format!("enqueue job: {e}")))?;
123
124 Ok(id)
125 }
126
127 pub async fn enqueue_unique<T: Serialize>(
138 &self,
139 name: &str,
140 payload: &T,
141 ) -> Result<EnqueueResult> {
142 self.enqueue_unique_with(name, payload, EnqueueOptions::default())
143 .await
144 }
145
146 pub async fn enqueue_unique_with<T: Serialize>(
157 &self,
158 name: &str,
159 payload: &T,
160 options: EnqueueOptions,
161 ) -> Result<EnqueueResult> {
162 let payload_json = serde_json::to_string(payload)
163 .map_err(|e| Error::internal(format!("serialize job payload: {e}")))?;
164 let hash = compute_payload_hash(name, &payload_json);
165 let id = crate::id::ulid();
166 let now = Utc::now();
167 let run_at = options.run_at.unwrap_or(now);
168 let now_str = now.to_rfc3339();
169 let run_at_str = run_at.to_rfc3339();
170
171 match self
172 .db
173 .conn()
174 .execute_raw(
175 "INSERT INTO jobs (id, name, queue, payload, payload_hash, status, attempt, run_at, created_at, updated_at) \
176 VALUES (?1, ?2, ?3, ?4, ?5, 'pending', 0, ?6, ?7, ?8)",
177 libsql::params![id.as_str(), name, options.queue.as_str(), payload_json.as_str(), hash.as_str(), run_at_str.as_str(), now_str.as_str(), now_str.as_str()],
178 )
179 .await
180 {
181 Ok(_) => Ok(EnqueueResult::Created(id)),
182 Err(ref e) if is_unique_violation(e) => {
183 let existing_id: String = self
184 .db
185 .conn()
186 .query_one_map(
187 "SELECT id FROM jobs WHERE payload_hash = ?1 AND status IN ('pending', 'running') LIMIT 1",
188 libsql::params![hash.as_str()],
189 |row| {
190 use crate::db::FromValue;
191 let val = row.get_value(0).map_err(crate::Error::from)?;
192 String::from_value(val)
193 },
194 )
195 .await
196 .map_err(|e| Error::internal(format!("fetch duplicate job id: {e}")))?;
197
198 Ok(EnqueueResult::Duplicate(existing_id))
199 }
200 Err(e) => Err(Error::internal(format!("enqueue unique job: {e}"))),
201 }
202 }
203
204 pub async fn cancel(&self, id: &str) -> Result<bool> {
213 let now_str = Utc::now().to_rfc3339();
214 let affected = self
215 .db
216 .conn()
217 .execute_raw(
218 "UPDATE jobs SET status = 'cancelled', updated_at = ?1 WHERE id = ?2 AND status = 'pending'",
219 libsql::params![now_str.as_str(), id],
220 )
221 .await
222 .map_err(|e| Error::internal(format!("cancel job: {e}")))?;
223
224 Ok(affected > 0)
225 }
226}
227
228fn is_unique_violation(err: &libsql::Error) -> bool {
230 matches!(err, libsql::Error::SqliteFailure(2067 | 1555, _))
231}
232
233fn compute_payload_hash(name: &str, payload_json: &str) -> String {
234 let mut hasher = Sha256::new();
235 hasher.update(name.as_bytes());
236 hasher.update(b"\0");
237 hasher.update(payload_json.as_bytes());
238 crate::encoding::hex::encode(&hasher.finalize())
239}
240
241#[cfg(test)]
242mod tests {
243 use super::*;
244
245 #[test]
246 fn payload_hash_is_deterministic() {
247 let h1 = compute_payload_hash("test", r#"{"a":1}"#);
248 let h2 = compute_payload_hash("test", r#"{"a":1}"#);
249 assert_eq!(h1, h2);
250 }
251
252 #[test]
253 fn payload_hash_differs_by_name() {
254 let h1 = compute_payload_hash("job_a", r#"{"a":1}"#);
255 let h2 = compute_payload_hash("job_b", r#"{"a":1}"#);
256 assert_ne!(h1, h2);
257 }
258
259 #[test]
260 fn payload_hash_differs_by_payload() {
261 let h1 = compute_payload_hash("test", r#"{"a":1}"#);
262 let h2 = compute_payload_hash("test", r#"{"a":2}"#);
263 assert_ne!(h1, h2);
264 }
265
266 #[test]
267 fn payload_hash_no_boundary_collision() {
268 let h1 = compute_payload_hash("ab", "c");
269 let h2 = compute_payload_hash("a", "bc");
270 assert_ne!(h1, h2);
271 }
272}