1use crate::error::AwaError;
2use crate::job::{InsertOpts, InsertParams, JobRow, JobState};
3use crate::unique::compute_unique_key;
4use crate::JobArgs;
5use sqlx::postgres::PgConnection;
6use sqlx::{PgExecutor, PgPool};
7
8pub async fn insert<'e, E>(executor: E, args: &impl JobArgs) -> Result<JobRow, AwaError>
10where
11 E: PgExecutor<'e>,
12{
13 insert_with(executor, args, InsertOpts::default()).await
14}
15
16#[tracing::instrument(skip(executor, args), fields(job.kind = args.kind_str(), job.queue = %opts.queue))]
18pub async fn insert_with<'e, E>(
19 executor: E,
20 args: &impl JobArgs,
21 opts: InsertOpts,
22) -> Result<JobRow, AwaError>
23where
24 E: PgExecutor<'e>,
25{
26 let kind = args.kind_str();
27 let args_json = args.to_args()?;
28
29 let state = if opts.run_at.is_some() {
30 JobState::Scheduled
31 } else {
32 JobState::Available
33 };
34
35 let unique_key = opts.unique.as_ref().map(|u| {
36 compute_unique_key(
37 kind,
38 if u.by_queue { Some(&opts.queue) } else { None },
39 if u.by_args { Some(&args_json) } else { None },
40 u.by_period,
41 )
42 });
43
44 let unique_states_bits: Option<String> = opts.unique.as_ref().map(|u| {
45 let mut bit_string = String::with_capacity(8);
49 for bit_position in 0..8 {
50 if u.states & (1 << bit_position) != 0 {
51 bit_string.push('1');
52 } else {
53 bit_string.push('0');
54 }
55 }
56 bit_string
57 });
58
59 let row = sqlx::query_as::<_, JobRow>(
60 r#"
61 INSERT INTO awa.jobs (kind, queue, args, state, priority, max_attempts, run_at, metadata, tags, unique_key, unique_states)
62 VALUES ($1, $2, $3, $4, $5, $6, COALESCE($7, now()), $8, $9, $10, $11::bit(8))
63 RETURNING *
64 "#,
65 )
66 .bind(kind)
67 .bind(&opts.queue)
68 .bind(&args_json)
69 .bind(state)
70 .bind(opts.priority)
71 .bind(opts.max_attempts)
72 .bind(opts.run_at)
73 .bind(&opts.metadata)
74 .bind(&opts.tags)
75 .bind(&unique_key)
76 .bind(&unique_states_bits)
77 .fetch_one(executor)
78 .await
79 .map_err(|err| {
80 if let sqlx::Error::Database(ref db_err) = err {
81 if db_err.code().as_deref() == Some("23505") {
82 return AwaError::UniqueConflict {
86 constraint: db_err
87 .constraint()
88 .map(|c| c.to_string()),
89 };
90 }
91 }
92 AwaError::Database(err)
93 })?;
94
95 Ok(row)
96}
97
98struct RowValues {
100 kind: String,
101 queue: String,
102 args: serde_json::Value,
103 state: JobState,
104 priority: i16,
105 max_attempts: i16,
106 run_at: Option<chrono::DateTime<chrono::Utc>>,
107 metadata: serde_json::Value,
108 tags: Vec<String>,
109 unique_key: Option<Vec<u8>>,
110 unique_states: Option<String>,
111}
112
113fn precompute_row_values(jobs: &[InsertParams]) -> Vec<RowValues> {
115 jobs.iter()
116 .map(|job| {
117 let unique_key = job.opts.unique.as_ref().map(|u| {
118 compute_unique_key(
119 &job.kind,
120 if u.by_queue {
121 Some(job.opts.queue.as_str())
122 } else {
123 None
124 },
125 if u.by_args { Some(&job.args) } else { None },
126 u.by_period,
127 )
128 });
129
130 let unique_states = job.opts.unique.as_ref().map(|u| {
131 let mut bit_string = String::with_capacity(8);
132 for bit_position in 0..8 {
133 if u.states & (1 << bit_position) != 0 {
134 bit_string.push('1');
135 } else {
136 bit_string.push('0');
137 }
138 }
139 bit_string
140 });
141
142 RowValues {
143 kind: job.kind.clone(),
144 queue: job.opts.queue.clone(),
145 args: job.args.clone(),
146 state: if job.opts.run_at.is_some() {
147 JobState::Scheduled
148 } else {
149 JobState::Available
150 },
151 priority: job.opts.priority,
152 max_attempts: job.opts.max_attempts,
153 run_at: job.opts.run_at,
154 metadata: job.opts.metadata.clone(),
155 tags: job.opts.tags.clone(),
156 unique_key,
157 unique_states,
158 }
159 })
160 .collect()
161}
162
163#[tracing::instrument(skip(executor, jobs), fields(job.count = jobs.len()))]
168pub async fn insert_many<'e, E>(executor: E, jobs: &[InsertParams]) -> Result<Vec<JobRow>, AwaError>
169where
170 E: PgExecutor<'e>,
171{
172 if jobs.is_empty() {
173 return Ok(Vec::new());
174 }
175
176 let count = jobs.len();
177 let rows = precompute_row_values(jobs);
178
179 let mut query = String::from(
181 "INSERT INTO awa.jobs (kind, queue, args, state, priority, max_attempts, run_at, metadata, tags, unique_key, unique_states) VALUES ",
182 );
183
184 let params_per_row = 11u32;
185 let mut param_index = 1u32;
186 for i in 0..count {
187 if i > 0 {
188 query.push_str(", ");
189 }
190 query.push_str(&format!(
191 "(${}, ${}, ${}, ${}, ${}, ${}, COALESCE(${}, now()), ${}, ${}, ${}, ${}::bit(8))",
192 param_index,
193 param_index + 1,
194 param_index + 2,
195 param_index + 3,
196 param_index + 4,
197 param_index + 5,
198 param_index + 6,
199 param_index + 7,
200 param_index + 8,
201 param_index + 9,
202 param_index + 10,
203 ));
204 param_index += params_per_row;
205 }
206 query.push_str(" RETURNING *");
207
208 let mut sql_query = sqlx::query_as::<_, JobRow>(&query);
209
210 for row in &rows {
211 sql_query = sql_query
212 .bind(&row.kind)
213 .bind(&row.queue)
214 .bind(&row.args)
215 .bind(row.state)
216 .bind(row.priority)
217 .bind(row.max_attempts)
218 .bind(row.run_at)
219 .bind(&row.metadata)
220 .bind(&row.tags)
221 .bind(&row.unique_key)
222 .bind(&row.unique_states);
223 }
224
225 let results = sql_query.fetch_all(executor).await?;
226
227 Ok(results)
228}
229
230#[tracing::instrument(skip(conn, jobs), fields(job.count = jobs.len()))]
237pub async fn insert_many_copy(
238 conn: &mut PgConnection,
239 jobs: &[InsertParams],
240) -> Result<Vec<JobRow>, AwaError> {
241 if jobs.is_empty() {
242 return Ok(Vec::new());
243 }
244
245 let rows = precompute_row_values(jobs);
246
247 sqlx::query(
249 r#"
250 CREATE TEMP TABLE awa_copy_staging (
251 kind TEXT NOT NULL,
252 queue TEXT NOT NULL,
253 args JSONB NOT NULL,
254 state TEXT NOT NULL,
255 priority SMALLINT NOT NULL,
256 max_attempts SMALLINT NOT NULL,
257 run_at TEXT,
258 metadata JSONB NOT NULL,
259 tags TEXT NOT NULL,
260 unique_key TEXT,
261 unique_states TEXT
262 ) ON COMMIT DROP
263 "#,
264 )
265 .execute(&mut *conn)
266 .await?;
267
268 let mut csv_buf = Vec::with_capacity(rows.len() * 256);
270 for row in &rows {
271 write_csv_row(&mut csv_buf, row);
272 }
273
274 let mut copy_in = conn
275 .copy_in_raw(
276 "COPY awa_copy_staging (kind, queue, args, state, priority, max_attempts, run_at, metadata, tags, unique_key, unique_states) FROM STDIN WITH (FORMAT csv, NULL '\\N')",
277 )
278 .await?;
279 copy_in.send(csv_buf).await?;
280 copy_in.finish().await?;
281
282 let has_unique = rows.iter().any(|r| r.unique_key.is_some());
284
285 let insert_sql = if has_unique {
286 r#"
287 INSERT INTO awa.jobs (kind, queue, args, state, priority, max_attempts, run_at, metadata, tags, unique_key, unique_states)
288 SELECT
289 s.kind,
290 s.queue,
291 s.args,
292 s.state::awa.job_state,
293 s.priority,
294 s.max_attempts,
295 CASE WHEN s.run_at = '\N' OR s.run_at IS NULL THEN now() ELSE s.run_at::timestamptz END,
296 s.metadata,
297 s.tags::text[],
298 CASE WHEN s.unique_key = '\N' OR s.unique_key IS NULL THEN NULL ELSE decode(s.unique_key, 'hex') END,
299 CASE WHEN s.unique_states = '\N' OR s.unique_states IS NULL THEN NULL ELSE s.unique_states::bit(8) END
300 FROM awa_copy_staging s
301 ON CONFLICT (unique_key) WHERE unique_key IS NOT NULL
302 AND unique_states IS NOT NULL
303 AND awa.job_state_in_bitmask(unique_states, state)
304 DO NOTHING
305 RETURNING *
306 "#
307 } else {
308 r#"
309 INSERT INTO awa.jobs (kind, queue, args, state, priority, max_attempts, run_at, metadata, tags, unique_key, unique_states)
310 SELECT
311 s.kind,
312 s.queue,
313 s.args,
314 s.state::awa.job_state,
315 s.priority,
316 s.max_attempts,
317 CASE WHEN s.run_at = '\N' OR s.run_at IS NULL THEN now() ELSE s.run_at::timestamptz END,
318 s.metadata,
319 s.tags::text[],
320 CASE WHEN s.unique_key = '\N' OR s.unique_key IS NULL THEN NULL ELSE decode(s.unique_key, 'hex') END,
321 CASE WHEN s.unique_states = '\N' OR s.unique_states IS NULL THEN NULL ELSE s.unique_states::bit(8) END
322 FROM awa_copy_staging s
323 RETURNING *
324 "#
325 };
326
327 let results = sqlx::query_as::<_, JobRow>(insert_sql)
328 .fetch_all(&mut *conn)
329 .await?;
330
331 Ok(results)
332}
333
334#[tracing::instrument(skip(pool, jobs), fields(job.count = jobs.len()))]
339pub async fn insert_many_copy_from_pool(
340 pool: &PgPool,
341 jobs: &[InsertParams],
342) -> Result<Vec<JobRow>, AwaError> {
343 if jobs.is_empty() {
344 return Ok(Vec::new());
345 }
346
347 let mut tx = pool.begin().await?;
348 let results = insert_many_copy(&mut tx, jobs).await?;
349 tx.commit().await?;
350
351 Ok(results)
352}
353
354fn write_csv_row(buf: &mut Vec<u8>, row: &RowValues) {
358 write_csv_field(buf, &row.kind);
360 buf.push(b',');
361 write_csv_field(buf, &row.queue);
363 buf.push(b',');
364 let args_str = serde_json::to_string(&row.args).expect("JSON serialization should not fail");
366 write_csv_field(buf, &args_str);
367 buf.push(b',');
368 write_csv_field(buf, &row.state.to_string());
370 buf.push(b',');
371 buf.extend_from_slice(row.priority.to_string().as_bytes());
373 buf.push(b',');
374 buf.extend_from_slice(row.max_attempts.to_string().as_bytes());
376 buf.push(b',');
377 match &row.run_at {
379 Some(dt) => write_csv_field(buf, &dt.to_rfc3339()),
380 None => buf.extend_from_slice(b"\\N"),
381 }
382 buf.push(b',');
383 let metadata_str =
385 serde_json::to_string(&row.metadata).expect("JSON serialization should not fail");
386 write_csv_field(buf, &metadata_str);
387 buf.push(b',');
388 write_pg_text_array(buf, &row.tags);
390 buf.push(b',');
391 match &row.unique_key {
393 Some(key) => {
394 let hex = hex::encode(key);
396 write_csv_field(buf, &hex);
397 }
398 None => buf.extend_from_slice(b"\\N"),
399 }
400 buf.push(b',');
401 match &row.unique_states {
403 Some(bits) => write_csv_field(buf, bits),
404 None => buf.extend_from_slice(b"\\N"),
405 }
406 buf.push(b'\n');
407}
408
409fn write_csv_field(buf: &mut Vec<u8>, value: &str) {
411 if value.contains(',')
412 || value.contains('"')
413 || value.contains('\n')
414 || value.contains('\r')
415 || value.contains('\\')
416 {
417 buf.push(b'"');
418 for byte in value.bytes() {
419 if byte == b'"' {
420 buf.push(b'"');
421 }
422 buf.push(byte);
423 }
424 buf.push(b'"');
425 } else {
426 buf.extend_from_slice(value.as_bytes());
427 }
428}
429
430fn write_pg_text_array(buf: &mut Vec<u8>, values: &[String]) {
433 buf.push(b'"');
434 buf.push(b'{');
435 for (i, val) in values.iter().enumerate() {
436 if i > 0 {
437 buf.push(b',');
438 }
439 if val.is_empty()
441 || val.contains(',')
442 || val.contains('"')
443 || val.contains('\\')
444 || val.contains('{')
445 || val.contains('}')
446 || val.contains(' ')
447 || val.eq_ignore_ascii_case("NULL")
448 {
449 buf.push(b'"');
453 buf.push(b'"');
454 for ch in val.chars() {
455 match ch {
456 '"' => {
457 buf.extend_from_slice(b"\\\"\"");
465 }
466 '\\' => {
467 buf.extend_from_slice(b"\\\\");
469 }
470 _ => {
471 let mut utf8_buf = [0u8; 4];
472 buf.extend_from_slice(ch.encode_utf8(&mut utf8_buf).as_bytes());
473 }
474 }
475 }
476 buf.push(b'"');
477 buf.push(b'"');
478 } else {
479 buf.extend_from_slice(val.as_bytes());
480 }
481 }
482 buf.push(b'}');
483 buf.push(b'"');
484}
485
486pub fn params(args: &impl JobArgs) -> Result<InsertParams, AwaError> {
488 params_with(args, InsertOpts::default())
489}
490
491pub fn params_with(args: &impl JobArgs, opts: InsertOpts) -> Result<InsertParams, AwaError> {
493 Ok(InsertParams {
494 kind: args.kind_str().to_string(),
495 args: args.to_args()?,
496 opts,
497 })
498}