1use crate::error::AwaError;
2use crate::job::{InsertOpts, InsertParams, JobRow, JobState};
3use crate::unique::compute_unique_key;
4use crate::JobArgs;
5use sqlx::postgres::PgConnection;
6use sqlx::{PgExecutor, PgPool};
7
8pub async fn insert<'e, E>(executor: E, args: &impl JobArgs) -> Result<JobRow, AwaError>
10where
11 E: PgExecutor<'e>,
12{
13 insert_with(executor, args, InsertOpts::default()).await
14}
15
16#[tracing::instrument(skip(executor, args), fields(job.kind = args.kind_str(), job.queue = %opts.queue))]
18pub async fn insert_with<'e, E>(
19 executor: E,
20 args: &impl JobArgs,
21 opts: InsertOpts,
22) -> Result<JobRow, AwaError>
23where
24 E: PgExecutor<'e>,
25{
26 let kind = args.kind_str();
27 let args_json = args.to_args()?;
28
29 let state = if opts.run_at.is_some() {
30 JobState::Scheduled
31 } else {
32 JobState::Available
33 };
34
35 let unique_key = opts.unique.as_ref().map(|u| {
36 compute_unique_key(
37 kind,
38 if u.by_queue { Some(&opts.queue) } else { None },
39 if u.by_args { Some(&args_json) } else { None },
40 u.by_period,
41 )
42 });
43
44 let unique_states_bits: Option<String> = opts.unique.as_ref().map(|u| {
45 let mut bit_string = String::with_capacity(8);
49 for bit_position in 0..8 {
50 if u.states & (1 << bit_position) != 0 {
51 bit_string.push('1');
52 } else {
53 bit_string.push('0');
54 }
55 }
56 bit_string
57 });
58
59 let row = sqlx::query_as::<_, JobRow>(
60 r#"
61 INSERT INTO awa.jobs (kind, queue, args, state, priority, max_attempts, run_at, metadata, tags, unique_key, unique_states)
62 VALUES ($1, $2, $3, $4, $5, $6, COALESCE($7, now()), $8, $9, $10, $11::bit(8))
63 RETURNING *
64 "#,
65 )
66 .bind(kind)
67 .bind(&opts.queue)
68 .bind(&args_json)
69 .bind(state)
70 .bind(opts.priority)
71 .bind(opts.max_attempts)
72 .bind(opts.run_at)
73 .bind(&opts.metadata)
74 .bind(&opts.tags)
75 .bind(&unique_key)
76 .bind(&unique_states_bits)
77 .fetch_one(executor)
78 .await
79 .map_err(|err| {
80 if let sqlx::Error::Database(ref db_err) = err {
81 if db_err.code().as_deref() == Some("23505") {
82 return AwaError::UniqueConflict {
86 constraint: db_err
87 .constraint()
88 .map(|c| c.to_string()),
89 };
90 }
91 }
92 AwaError::Database(err)
93 })?;
94
95 Ok(row)
96}
97
98struct RowValues {
100 kind: String,
101 queue: String,
102 args: serde_json::Value,
103 state: JobState,
104 priority: i16,
105 max_attempts: i16,
106 run_at: Option<chrono::DateTime<chrono::Utc>>,
107 metadata: serde_json::Value,
108 tags: Vec<String>,
109 unique_key: Option<Vec<u8>>,
110 unique_states: Option<String>,
111}
112
113fn precompute_row_values(jobs: &[InsertParams]) -> Vec<RowValues> {
115 jobs.iter()
116 .map(|job| {
117 let unique_key = job.opts.unique.as_ref().map(|u| {
118 compute_unique_key(
119 &job.kind,
120 if u.by_queue {
121 Some(job.opts.queue.as_str())
122 } else {
123 None
124 },
125 if u.by_args { Some(&job.args) } else { None },
126 u.by_period,
127 )
128 });
129
130 let unique_states = job.opts.unique.as_ref().map(|u| {
131 let mut bit_string = String::with_capacity(8);
132 for bit_position in 0..8 {
133 if u.states & (1 << bit_position) != 0 {
134 bit_string.push('1');
135 } else {
136 bit_string.push('0');
137 }
138 }
139 bit_string
140 });
141
142 RowValues {
143 kind: job.kind.clone(),
144 queue: job.opts.queue.clone(),
145 args: job.args.clone(),
146 state: if job.opts.run_at.is_some() {
147 JobState::Scheduled
148 } else {
149 JobState::Available
150 },
151 priority: job.opts.priority,
152 max_attempts: job.opts.max_attempts,
153 run_at: job.opts.run_at,
154 metadata: job.opts.metadata.clone(),
155 tags: job.opts.tags.clone(),
156 unique_key,
157 unique_states,
158 }
159 })
160 .collect()
161}
162
163#[tracing::instrument(skip(executor, jobs), fields(job.count = jobs.len()))]
168pub async fn insert_many<'e, E>(executor: E, jobs: &[InsertParams]) -> Result<Vec<JobRow>, AwaError>
169where
170 E: PgExecutor<'e>,
171{
172 if jobs.is_empty() {
173 return Ok(Vec::new());
174 }
175
176 let count = jobs.len();
177 let rows = precompute_row_values(jobs);
178
179 let mut query = String::from(
181 "INSERT INTO awa.jobs (kind, queue, args, state, priority, max_attempts, run_at, metadata, tags, unique_key, unique_states) VALUES ",
182 );
183
184 let params_per_row = 11u32;
185 let mut param_index = 1u32;
186 for i in 0..count {
187 if i > 0 {
188 query.push_str(", ");
189 }
190 query.push_str(&format!(
191 "(${}, ${}, ${}, ${}, ${}, ${}, COALESCE(${}, now()), ${}, ${}, ${}, ${}::bit(8))",
192 param_index,
193 param_index + 1,
194 param_index + 2,
195 param_index + 3,
196 param_index + 4,
197 param_index + 5,
198 param_index + 6,
199 param_index + 7,
200 param_index + 8,
201 param_index + 9,
202 param_index + 10,
203 ));
204 param_index += params_per_row;
205 }
206 query.push_str(" RETURNING *");
207
208 let mut sql_query = sqlx::query_as::<_, JobRow>(&query);
209
210 for row in &rows {
211 sql_query = sql_query
212 .bind(&row.kind)
213 .bind(&row.queue)
214 .bind(&row.args)
215 .bind(row.state)
216 .bind(row.priority)
217 .bind(row.max_attempts)
218 .bind(row.run_at)
219 .bind(&row.metadata)
220 .bind(&row.tags)
221 .bind(&row.unique_key)
222 .bind(&row.unique_states);
223 }
224
225 let results = sql_query.fetch_all(executor).await?;
226
227 Ok(results)
228}
229
230#[tracing::instrument(skip(conn, jobs), fields(job.count = jobs.len()))]
237pub async fn insert_many_copy(
238 conn: &mut PgConnection,
239 jobs: &[InsertParams],
240) -> Result<Vec<JobRow>, AwaError> {
241 if jobs.is_empty() {
242 return Ok(Vec::new());
243 }
244
245 let rows = precompute_row_values(jobs);
246
247 sqlx::query(
249 r#"
250 CREATE TEMP TABLE awa_copy_staging (
251 kind TEXT NOT NULL,
252 queue TEXT NOT NULL,
253 args JSONB NOT NULL,
254 state TEXT NOT NULL,
255 priority SMALLINT NOT NULL,
256 max_attempts SMALLINT NOT NULL,
257 run_at TEXT,
258 metadata JSONB NOT NULL,
259 tags TEXT NOT NULL,
260 unique_key TEXT,
261 unique_states TEXT
262 ) ON COMMIT DROP
263 "#,
264 )
265 .execute(&mut *conn)
266 .await?;
267
268 let mut csv_buf = Vec::with_capacity(rows.len() * 256);
270 for row in &rows {
271 write_csv_row(&mut csv_buf, row);
272 }
273
274 let mut copy_in = conn
275 .copy_in_raw(
276 "COPY awa_copy_staging (kind, queue, args, state, priority, max_attempts, run_at, metadata, tags, unique_key, unique_states) FROM STDIN WITH (FORMAT csv, NULL '\\N')",
277 )
278 .await?;
279 copy_in.send(csv_buf).await?;
280 copy_in.finish().await?;
281
282 let has_unique = rows.iter().any(|r| r.unique_key.is_some());
284
285 let results = if has_unique {
286 let staged_rows = sqlx::query_as::<
291 _,
292 (
293 String,
294 String,
295 serde_json::Value,
296 String,
297 i16,
298 i16,
299 Option<chrono::DateTime<chrono::Utc>>,
300 serde_json::Value,
301 Vec<String>,
302 Option<Vec<u8>>,
303 Option<String>,
304 ),
305 >(
306 r#"
307 SELECT
308 kind,
309 queue,
310 args,
311 state,
312 priority,
313 max_attempts,
314 CASE WHEN run_at = '\N' OR run_at IS NULL THEN NULL ELSE run_at::timestamptz END,
315 metadata,
316 tags::text[],
317 CASE WHEN unique_key = '\N' OR unique_key IS NULL THEN NULL ELSE decode(unique_key, 'hex') END,
318 unique_states
319 FROM awa_copy_staging
320 "#,
321 )
322 .fetch_all(&mut *conn)
323 .await?;
324
325 let mut inserted = Vec::with_capacity(staged_rows.len());
326 for (
327 kind,
328 queue,
329 args,
330 state,
331 priority,
332 max_attempts,
333 run_at,
334 metadata,
335 tags,
336 unique_key,
337 unique_states,
338 ) in staged_rows
339 {
340 sqlx::query("SAVEPOINT awa_copy_unique_row")
341 .execute(&mut *conn)
342 .await?;
343
344 let result = sqlx::query_as::<_, JobRow>(
345 r#"
346 INSERT INTO awa.jobs (kind, queue, args, state, priority, max_attempts, run_at, metadata, tags, unique_key, unique_states)
347 VALUES ($1, $2, $3, $4::awa.job_state, $5, $6, COALESCE($7, now()), $8, $9, $10, $11::bit(8))
348 RETURNING *
349 "#,
350 )
351 .bind(&kind)
352 .bind(&queue)
353 .bind(&args)
354 .bind(&state)
355 .bind(priority)
356 .bind(max_attempts)
357 .bind(run_at)
358 .bind(&metadata)
359 .bind(&tags)
360 .bind(&unique_key)
361 .bind(&unique_states)
362 .fetch_one(&mut *conn)
363 .await;
364
365 match result {
366 Ok(row) => {
367 inserted.push(row);
368 sqlx::query("RELEASE SAVEPOINT awa_copy_unique_row")
369 .execute(&mut *conn)
370 .await?;
371 }
372 Err(sqlx::Error::Database(db_err)) if db_err.code().as_deref() == Some("23505") => {
373 sqlx::query("ROLLBACK TO SAVEPOINT awa_copy_unique_row")
374 .execute(&mut *conn)
375 .await?;
376 sqlx::query("RELEASE SAVEPOINT awa_copy_unique_row")
377 .execute(&mut *conn)
378 .await?;
379 continue;
380 }
381 Err(err) => {
382 sqlx::query("ROLLBACK TO SAVEPOINT awa_copy_unique_row")
383 .execute(&mut *conn)
384 .await?;
385 sqlx::query("RELEASE SAVEPOINT awa_copy_unique_row")
386 .execute(&mut *conn)
387 .await?;
388 return Err(AwaError::Database(err));
389 }
390 }
391 }
392
393 inserted
394 } else {
395 let insert_sql = r#"
396 INSERT INTO awa.jobs (kind, queue, args, state, priority, max_attempts, run_at, metadata, tags, unique_key, unique_states)
397 SELECT
398 s.kind,
399 s.queue,
400 s.args,
401 s.state::awa.job_state,
402 s.priority,
403 s.max_attempts,
404 CASE WHEN s.run_at = '\N' OR s.run_at IS NULL THEN now() ELSE s.run_at::timestamptz END,
405 s.metadata,
406 s.tags::text[],
407 CASE WHEN s.unique_key = '\N' OR s.unique_key IS NULL THEN NULL ELSE decode(s.unique_key, 'hex') END,
408 CASE WHEN s.unique_states = '\N' OR s.unique_states IS NULL THEN NULL ELSE s.unique_states::bit(8) END
409 FROM awa_copy_staging s
410 RETURNING *
411 "#;
412
413 sqlx::query_as::<_, JobRow>(insert_sql)
414 .fetch_all(&mut *conn)
415 .await?
416 };
417
418 Ok(results)
419}
420
421#[tracing::instrument(skip(pool, jobs), fields(job.count = jobs.len()))]
426pub async fn insert_many_copy_from_pool(
427 pool: &PgPool,
428 jobs: &[InsertParams],
429) -> Result<Vec<JobRow>, AwaError> {
430 if jobs.is_empty() {
431 return Ok(Vec::new());
432 }
433
434 let mut tx = pool.begin().await?;
435 let results = insert_many_copy(&mut tx, jobs).await?;
436 tx.commit().await?;
437
438 Ok(results)
439}
440
441fn write_csv_row(buf: &mut Vec<u8>, row: &RowValues) {
445 write_csv_field(buf, &row.kind);
447 buf.push(b',');
448 write_csv_field(buf, &row.queue);
450 buf.push(b',');
451 let args_str = serde_json::to_string(&row.args).expect("JSON serialization should not fail");
453 write_csv_field(buf, &args_str);
454 buf.push(b',');
455 write_csv_field(buf, &row.state.to_string());
457 buf.push(b',');
458 buf.extend_from_slice(row.priority.to_string().as_bytes());
460 buf.push(b',');
461 buf.extend_from_slice(row.max_attempts.to_string().as_bytes());
463 buf.push(b',');
464 match &row.run_at {
466 Some(dt) => write_csv_field(buf, &dt.to_rfc3339()),
467 None => buf.extend_from_slice(b"\\N"),
468 }
469 buf.push(b',');
470 let metadata_str =
472 serde_json::to_string(&row.metadata).expect("JSON serialization should not fail");
473 write_csv_field(buf, &metadata_str);
474 buf.push(b',');
475 write_pg_text_array(buf, &row.tags);
477 buf.push(b',');
478 match &row.unique_key {
480 Some(key) => {
481 let hex = hex::encode(key);
483 write_csv_field(buf, &hex);
484 }
485 None => buf.extend_from_slice(b"\\N"),
486 }
487 buf.push(b',');
488 match &row.unique_states {
490 Some(bits) => write_csv_field(buf, bits),
491 None => buf.extend_from_slice(b"\\N"),
492 }
493 buf.push(b'\n');
494}
495
496fn write_csv_field(buf: &mut Vec<u8>, value: &str) {
498 if value.contains(',')
499 || value.contains('"')
500 || value.contains('\n')
501 || value.contains('\r')
502 || value.contains('\\')
503 {
504 buf.push(b'"');
505 for byte in value.bytes() {
506 if byte == b'"' {
507 buf.push(b'"');
508 }
509 buf.push(byte);
510 }
511 buf.push(b'"');
512 } else {
513 buf.extend_from_slice(value.as_bytes());
514 }
515}
516
517fn write_pg_text_array(buf: &mut Vec<u8>, values: &[String]) {
520 buf.push(b'"');
521 buf.push(b'{');
522 for (i, val) in values.iter().enumerate() {
523 if i > 0 {
524 buf.push(b',');
525 }
526 if val.is_empty()
528 || val.contains(',')
529 || val.contains('"')
530 || val.contains('\\')
531 || val.contains('{')
532 || val.contains('}')
533 || val.contains(' ')
534 || val.eq_ignore_ascii_case("NULL")
535 {
536 buf.push(b'"');
540 buf.push(b'"');
541 for ch in val.chars() {
542 match ch {
543 '"' => {
544 buf.extend_from_slice(b"\\\"\"");
552 }
553 '\\' => {
554 buf.extend_from_slice(b"\\\\");
556 }
557 _ => {
558 let mut utf8_buf = [0u8; 4];
559 buf.extend_from_slice(ch.encode_utf8(&mut utf8_buf).as_bytes());
560 }
561 }
562 }
563 buf.push(b'"');
564 buf.push(b'"');
565 } else {
566 buf.extend_from_slice(val.as_bytes());
567 }
568 }
569 buf.push(b'}');
570 buf.push(b'"');
571}
572
573pub fn params(args: &impl JobArgs) -> Result<InsertParams, AwaError> {
575 params_with(args, InsertOpts::default())
576}
577
578pub fn params_with(args: &impl JobArgs, opts: InsertOpts) -> Result<InsertParams, AwaError> {
580 Ok(InsertParams {
581 kind: args.kind_str().to_string(),
582 args: args.to_args()?,
583 opts,
584 })
585}