1use crate::error::AwaError;
2use crate::job::{InsertOpts, InsertParams, JobRow, JobState};
3use crate::unique::compute_unique_key;
4use crate::JobArgs;
5use sqlx::postgres::PgConnection;
6use sqlx::{PgExecutor, PgPool};
7
/// Placeholder written into COPY CSV payloads to represent SQL NULL; must
/// stay in sync with the `NULL '__AWA_NULL__'` option of the COPY statement
/// issued by `insert_many_copy`.
const COPY_NULL_SENTINEL: &str = "__AWA_NULL__";
9
10pub(crate) fn reject_null_bytes(value: &serde_json::Value) -> Result<(), AwaError> {
23 match value {
24 serde_json::Value::String(s) if s.contains('\0') => Err(AwaError::Validation(
25 "job args/metadata must not contain null bytes (\\u0000): Postgres JSONB does not support them".into(),
26 )),
27 serde_json::Value::Array(arr) => {
28 for v in arr {
29 reject_null_bytes(v)?;
30 }
31 Ok(())
32 }
33 serde_json::Value::Object(map) => {
34 for (k, v) in map {
35 if k.contains('\0') {
36 return Err(AwaError::Validation(
37 "job args/metadata keys must not contain null bytes (\\u0000)".into(),
38 ));
39 }
40 reject_null_bytes(v)?;
41 }
42 Ok(())
43 }
44 _ => Ok(()),
45 }
46}
47
/// A validated, insert-ready job row.
///
/// Built by `prepare_row` / `prepare_row_raw`; the fields line up with the
/// column lists used by the INSERT and COPY statements in this module.
pub(crate) struct PreparedRow {
    /// Job kind (dispatch key, taken from `JobArgs::kind_str`).
    pub kind: String,
    /// Destination queue name.
    pub queue: String,
    /// Serialized job arguments (already checked for NUL bytes).
    pub args: serde_json::Value,
    /// Initial state: `Scheduled` when a `run_at` was supplied, else `Available`.
    pub state: JobState,
    /// Job priority, copied from `InsertOpts`.
    pub priority: i16,
    /// Maximum attempt count, copied from `InsertOpts`.
    pub max_attempts: i16,
    /// Optional explicit run time; `None` lets SQL default it to `now()`.
    pub run_at: Option<chrono::DateTime<chrono::Utc>>,
    /// Arbitrary job metadata (already checked for NUL bytes).
    pub metadata: serde_json::Value,
    /// Free-form tags, stored as a Postgres text array.
    pub tags: Vec<String>,
    /// Hashed uniqueness key; present only when uniqueness was requested.
    pub unique_key: Option<Vec<u8>>,
    /// 8-character bit string for the `bit(8)` uniqueness-states column.
    pub unique_states: Option<String>,
}
64
/// Physical table an insert is routed to, chosen from the job's initial state.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum TargetTable {
    /// `awa.jobs_hot` — destination for all non-deferred states.
    JobsHot,
    /// `awa.scheduled_jobs` — destination for `Scheduled` and `Retryable` jobs.
    ScheduledJobs,
}
70
71impl TargetTable {
72 fn as_str(self) -> &'static str {
73 match self {
74 TargetTable::JobsHot => "awa.jobs_hot",
75 TargetTable::ScheduledJobs => "awa.scheduled_jobs",
76 }
77 }
78}
79
80fn target_table_for_state(state: JobState) -> TargetTable {
81 match state {
82 JobState::Scheduled | JobState::Retryable => TargetTable::ScheduledJobs,
83 _ => TargetTable::JobsHot,
84 }
85}
86
87fn homogeneous_target_table(rows: &[PreparedRow]) -> Option<TargetTable> {
88 let first = rows.first().map(|row| target_table_for_state(row.state))?;
89 rows.iter()
90 .all(|row| target_table_for_state(row.state) == first)
91 .then_some(first)
92}
93
94fn map_sqlx_error(err: sqlx::Error) -> AwaError {
95 if let sqlx::Error::Database(ref db_err) = err {
96 if db_err.code().as_deref() == Some("23505") {
97 return AwaError::UniqueConflict {
98 constraint: db_err.constraint().map(|c| c.to_string()),
99 };
100 }
101 }
102 AwaError::Database(err)
103}
104
/// Builds a multi-row `INSERT ... VALUES ... RETURNING *` statement for
/// `count` rows of 11 placeholders each.
///
/// The placeholder layout per row mirrors the single-row insert path:
/// `$k..$k+10`, with `COALESCE($k+6, now())` defaulting `run_at` and the last
/// placeholder cast to `bit(8)` for `unique_states`.
///
/// Callers must pass `count >= 1`: with `count == 0` the generated statement
/// has an empty VALUES list and is not valid SQL (all callers already guard
/// against empty batches).
fn build_multi_insert_query(target_table: &str, count: usize) -> String {
    use std::fmt::Write;

    let mut query = format!(
        "INSERT INTO {} (kind, queue, args, state, priority, max_attempts, run_at, metadata, tags, unique_key, unique_states) VALUES ",
        target_table
    );
    // Reserve roughly enough for all value tuples up front to avoid
    // repeated reallocation while appending.
    query.reserve(count * 80);

    const PARAMS_PER_ROW: u32 = 11;
    for i in 0..count {
        if i > 0 {
            query.push_str(", ");
        }
        let p = i as u32 * PARAMS_PER_ROW + 1;
        // write! appends in place (no per-row temporary String);
        // writing to a String cannot fail, so the fmt::Result is ignored.
        let _ = write!(
            query,
            "(${}, ${}, ${}, ${}, ${}, ${}, COALESCE(${}, now()), ${}, ${}, ${}, ${}::bit(8))",
            p,
            p + 1,
            p + 2,
            p + 3,
            p + 4,
            p + 5,
            p + 6,
            p + 7,
            p + 8,
            p + 9,
            p + 10,
        );
    }
    query.push_str(" RETURNING *");
    query
}
136
137fn compute_unique_fields(
139 kind: &str,
140 args: &serde_json::Value,
141 opts: &InsertOpts,
142) -> (Option<Vec<u8>>, Option<String>) {
143 let unique_key = opts.unique.as_ref().map(|u| {
144 compute_unique_key(
145 kind,
146 if u.by_queue { Some(&opts.queue) } else { None },
147 if u.by_args { Some(args) } else { None },
148 u.by_period,
149 )
150 });
151
152 let unique_states = opts.unique.as_ref().map(|u| {
153 let mut bit_string = String::with_capacity(8);
157 for bit_position in 0..8 {
158 if u.states & (1 << bit_position) != 0 {
159 bit_string.push('1');
160 } else {
161 bit_string.push('0');
162 }
163 }
164 bit_string
165 });
166
167 (unique_key, unique_states)
168}
169
170pub(crate) fn prepare_row(args: &impl JobArgs, opts: InsertOpts) -> Result<PreparedRow, AwaError> {
174 let kind = args.kind_str().to_string();
175 let args_value = args.to_args()?;
176 prepare_row_raw(kind, args_value, opts)
177}
178
179pub(crate) fn prepare_row_raw(
181 kind: String,
182 args: serde_json::Value,
183 opts: InsertOpts,
184) -> Result<PreparedRow, AwaError> {
185 reject_null_bytes(&args)?;
186 reject_null_bytes(&opts.metadata)?;
187
188 let state = if opts.run_at.is_some() {
189 JobState::Scheduled
190 } else {
191 JobState::Available
192 };
193
194 let (unique_key, unique_states) = compute_unique_fields(&kind, &args, &opts);
195
196 Ok(PreparedRow {
197 kind,
198 queue: opts.queue,
199 args,
200 state,
201 priority: opts.priority,
202 max_attempts: opts.max_attempts,
203 run_at: opts.run_at,
204 metadata: opts.metadata,
205 tags: opts.tags,
206 unique_key,
207 unique_states,
208 })
209}
210
/// Inserts a single job with default [`InsertOpts`].
///
/// Convenience wrapper around [`insert_with`]; see it for table routing and
/// error behavior.
pub async fn insert<'e, E>(executor: E, args: &impl JobArgs) -> Result<JobRow, AwaError>
where
    E: PgExecutor<'e>,
{
    insert_with(executor, args, InsertOpts::default()).await
}
220
221#[tracing::instrument(skip(executor, args), fields(job.kind = args.kind_str(), job.queue = %opts.queue))]
223pub async fn insert_with<'e, E>(
224 executor: E,
225 args: &impl JobArgs,
226 opts: InsertOpts,
227) -> Result<JobRow, AwaError>
228where
229 E: PgExecutor<'e>,
230{
231 let row = prepare_row(args, opts)?;
232 let query = format!(
233 r#"
234 INSERT INTO {} (kind, queue, args, state, priority, max_attempts, run_at, metadata, tags, unique_key, unique_states)
235 VALUES ($1, $2, $3, $4, $5, $6, COALESCE($7, now()), $8, $9, $10, $11::bit(8))
236 RETURNING *
237 "#,
238 target_table_for_state(row.state).as_str()
239 );
240
241 sqlx::query_as::<_, JobRow>(&query)
242 .bind(&row.kind)
243 .bind(&row.queue)
244 .bind(&row.args)
245 .bind(row.state)
246 .bind(row.priority)
247 .bind(row.max_attempts)
248 .bind(row.run_at)
249 .bind(&row.metadata)
250 .bind(&row.tags)
251 .bind(&row.unique_key)
252 .bind(&row.unique_states)
253 .fetch_one(executor)
254 .await
255 .map_err(map_sqlx_error)
256}
257
258fn precompute_rows(jobs: &[InsertParams]) -> Result<Vec<PreparedRow>, AwaError> {
260 jobs.iter()
261 .map(|job| prepare_row_raw(job.kind.clone(), job.args.clone(), job.opts.clone()))
262 .collect()
263}
264
265#[tracing::instrument(skip(executor, jobs), fields(job.count = jobs.len()))]
270pub async fn insert_many<'e, E>(executor: E, jobs: &[InsertParams]) -> Result<Vec<JobRow>, AwaError>
271where
272 E: PgExecutor<'e>,
273{
274 if jobs.is_empty() {
275 return Ok(Vec::new());
276 }
277
278 let rows = precompute_rows(jobs)?;
279 let target_table = homogeneous_target_table(&rows)
280 .map(TargetTable::as_str)
281 .unwrap_or("awa.jobs");
282 let query = build_multi_insert_query(target_table, rows.len());
283
284 let mut sql_query = sqlx::query_as::<_, JobRow>(&query);
285
286 for row in &rows {
287 sql_query = sql_query
288 .bind(&row.kind)
289 .bind(&row.queue)
290 .bind(&row.args)
291 .bind(row.state)
292 .bind(row.priority)
293 .bind(row.max_attempts)
294 .bind(row.run_at)
295 .bind(&row.metadata)
296 .bind(&row.tags)
297 .bind(&row.unique_key)
298 .bind(&row.unique_states);
299 }
300
301 let results = sql_query.fetch_all(executor).await?;
302
303 Ok(results)
304}
305
/// Bulk-inserts jobs using Postgres `COPY` through a session-local staging
/// table, then moves the staged rows into the destination table.
///
/// Flow: (1) create-or-reuse a temp staging table, (2) stream all rows into
/// it as one CSV payload via `COPY ... FROM STDIN`, (3) move them into the
/// target table — row by row under savepoints when any row carries a
/// `unique_key` (so a unique violation skips just that row), otherwise with
/// one set-based `INSERT ... SELECT`.
///
/// NOTE(review): the `SAVEPOINT` statements and `ON COMMIT DELETE ROWS`
/// presume an open transaction on `conn` (see [`insert_many_copy_from_pool`]);
/// confirm all other callers also run this inside a transaction.
///
/// Returns the inserted rows; rows skipped due to unique conflicts are
/// omitted from the result.
#[tracing::instrument(skip(conn, jobs), fields(job.count = jobs.len()))]
pub async fn insert_many_copy(
    conn: &mut PgConnection,
    jobs: &[InsertParams],
) -> Result<Vec<JobRow>, AwaError> {
    if jobs.is_empty() {
        return Ok(Vec::new());
    }

    let rows = precompute_rows(jobs)?;
    // NOTE(review): heterogeneous batches fall back to "awa.jobs" —
    // presumably a routing parent table/view; verify it accepts these inserts.
    let target_table = homogeneous_target_table(&rows)
        .map(TargetTable::as_str)
        .unwrap_or("awa.jobs");

    // Staging table mirrors the destination columns. IF NOT EXISTS lets a
    // pooled session reuse the table across calls; ON COMMIT DELETE ROWS
    // clears it at transaction commit.
    sqlx::query(
        r#"
        CREATE TEMP TABLE IF NOT EXISTS pg_temp.awa_copy_staging (
            kind TEXT NOT NULL,
            queue TEXT NOT NULL,
            args JSONB NOT NULL,
            state awa.job_state NOT NULL,
            priority SMALLINT NOT NULL,
            max_attempts SMALLINT NOT NULL,
            run_at TIMESTAMPTZ,
            metadata JSONB NOT NULL,
            tags TEXT[] NOT NULL,
            unique_key BYTEA,
            unique_states BIT(8)
        ) ON COMMIT DELETE ROWS
        "#,
    )
    .execute(&mut *conn)
    .await?;

    // Serialize every row into a single CSV payload (~256 bytes/row guess
    // for the initial capacity).
    let mut csv_buf = Vec::with_capacity(rows.len() * 256);
    for row in &rows {
        write_csv_row(&mut csv_buf, row);
    }

    // NULL sentinel here must match COPY_NULL_SENTINEL used by the CSV writer.
    let mut copy_in = conn
        .copy_in_raw(
            "COPY pg_temp.awa_copy_staging (kind, queue, args, state, priority, max_attempts, run_at, metadata, tags, unique_key, unique_states) FROM STDIN WITH (FORMAT csv, NULL '__AWA_NULL__')",
        )
        .await?;
    copy_in.send(csv_buf).await?;
    copy_in.finish().await?;

    // Any uniquely-keyed row forces the slower per-row path below so that
    // conflicts can be skipped individually instead of failing the batch.
    let has_unique = rows.iter().any(|r| r.unique_key.is_some());

    let results = if has_unique {
        // Read staged rows back; enum/bit columns are cast to text so the
        // tuple decodes with plain String fields.
        let staged_rows = sqlx::query_as::<
            _,
            (
                String,
                String,
                serde_json::Value,
                String,
                i16,
                i16,
                Option<chrono::DateTime<chrono::Utc>>,
                serde_json::Value,
                Vec<String>,
                Option<Vec<u8>>,
                Option<String>,
            ),
        >(
            r#"
            SELECT
                kind,
                queue,
                args,
                state::text,
                priority,
                max_attempts,
                run_at,
                metadata,
                tags,
                unique_key,
                unique_states::text
            FROM pg_temp.awa_copy_staging
            "#,
        )
        .fetch_all(&mut *conn)
        .await?;

        let mut inserted = Vec::with_capacity(staged_rows.len());
        for (
            kind,
            queue,
            args,
            state,
            priority,
            max_attempts,
            run_at,
            metadata,
            tags,
            unique_key,
            unique_states,
        ) in staged_rows
        {
            // Savepoint per row: a unique violation aborts only this row's
            // sub-transaction, keeping the enclosing transaction usable.
            sqlx::query("SAVEPOINT awa_copy_unique_row")
                .execute(&mut *conn)
                .await?;

            let query = format!(
                r#"
                INSERT INTO {} (kind, queue, args, state, priority, max_attempts, run_at, metadata, tags, unique_key, unique_states)
                VALUES ($1, $2, $3, $4::awa.job_state, $5, $6, COALESCE($7, now()), $8, $9, $10, $11::bit(8))
                RETURNING *
                "#,
                target_table
            );
            let result = sqlx::query_as::<_, JobRow>(&query)
                .bind(&kind)
                .bind(&queue)
                .bind(&args)
                .bind(&state)
                .bind(priority)
                .bind(max_attempts)
                .bind(run_at)
                .bind(&metadata)
                .bind(&tags)
                .bind(&unique_key)
                .bind(&unique_states)
                .fetch_one(&mut *conn)
                .await;

            match result {
                Ok(row) => {
                    inserted.push(row);
                    sqlx::query("RELEASE SAVEPOINT awa_copy_unique_row")
                        .execute(&mut *conn)
                        .await?;
                }
                // SQLSTATE 23505: unique violation — skip this row only.
                Err(sqlx::Error::Database(db_err)) if db_err.code().as_deref() == Some("23505") => {
                    sqlx::query("ROLLBACK TO SAVEPOINT awa_copy_unique_row")
                        .execute(&mut *conn)
                        .await?;
                    sqlx::query("RELEASE SAVEPOINT awa_copy_unique_row")
                        .execute(&mut *conn)
                        .await?;
                    continue;
                }
                // Any other error: roll back this row's savepoint, then
                // propagate the failure to the caller.
                Err(err) => {
                    sqlx::query("ROLLBACK TO SAVEPOINT awa_copy_unique_row")
                        .execute(&mut *conn)
                        .await?;
                    sqlx::query("RELEASE SAVEPOINT awa_copy_unique_row")
                        .execute(&mut *conn)
                        .await?;
                    return Err(AwaError::Database(err));
                }
            }
        }

        inserted
    } else {
        // No unique keys: one set-based move of the whole staging table.
        // COALESCE defaults a NULL run_at to now(), matching the other
        // insert paths.
        let insert_sql = format!(
            r#"
            INSERT INTO {} (kind, queue, args, state, priority, max_attempts, run_at, metadata, tags, unique_key, unique_states)
            SELECT
                s.kind,
                s.queue,
                s.args,
                s.state::awa.job_state,
                s.priority,
                s.max_attempts,
                COALESCE(s.run_at, now()),
                s.metadata,
                s.tags,
                s.unique_key,
                s.unique_states
            FROM pg_temp.awa_copy_staging s
            RETURNING *
            "#,
            target_table
        );

        sqlx::query_as::<_, JobRow>(&insert_sql)
            .fetch_all(&mut *conn)
            .await?
    };

    // Clear the staging table explicitly so a later call on the same
    // connection/transaction does not re-process these rows (ON COMMIT
    // DELETE ROWS only fires at commit).
    sqlx::query("DELETE FROM pg_temp.awa_copy_staging")
        .execute(&mut *conn)
        .await?;

    Ok(results)
}
514
515#[tracing::instrument(skip(pool, jobs), fields(job.count = jobs.len()))]
520pub async fn insert_many_copy_from_pool(
521 pool: &PgPool,
522 jobs: &[InsertParams],
523) -> Result<Vec<JobRow>, AwaError> {
524 if jobs.is_empty() {
525 return Ok(Vec::new());
526 }
527
528 let mut tx = pool.begin().await?;
529 let results = insert_many_copy(&mut tx, jobs).await?;
530 tx.commit().await?;
531
532 Ok(results)
533}
534
535fn write_csv_row(buf: &mut Vec<u8>, row: &PreparedRow) {
539 write_csv_field(buf, &row.kind);
541 buf.push(b',');
542 write_csv_field(buf, &row.queue);
544 buf.push(b',');
545 let args_str = serde_json::to_string(&row.args).expect("JSON serialization should not fail");
547 write_csv_field(buf, &args_str);
548 buf.push(b',');
549 write_csv_field(buf, &row.state.to_string());
551 buf.push(b',');
552 buf.extend_from_slice(row.priority.to_string().as_bytes());
554 buf.push(b',');
555 buf.extend_from_slice(row.max_attempts.to_string().as_bytes());
557 buf.push(b',');
558 match &row.run_at {
560 Some(dt) => write_csv_field(buf, &dt.to_rfc3339()),
561 None => buf.extend_from_slice(COPY_NULL_SENTINEL.as_bytes()),
562 }
563 buf.push(b',');
564 let metadata_str =
566 serde_json::to_string(&row.metadata).expect("JSON serialization should not fail");
567 write_csv_field(buf, &metadata_str);
568 buf.push(b',');
569 write_pg_text_array(buf, &row.tags);
571 buf.push(b',');
572 match &row.unique_key {
574 Some(key) => {
575 let bytea_hex = format!("\\x{}", hex::encode(key));
576 write_csv_field(buf, &bytea_hex);
577 }
578 None => buf.extend_from_slice(COPY_NULL_SENTINEL.as_bytes()),
579 }
580 buf.push(b',');
581 match &row.unique_states {
583 Some(bits) => write_csv_field(buf, bits),
584 None => buf.extend_from_slice(COPY_NULL_SENTINEL.as_bytes()),
585 }
586 buf.push(b'\n');
587}
588
589fn write_csv_field(buf: &mut Vec<u8>, value: &str) {
591 if value.contains(',')
592 || value.contains('"')
593 || value.contains('\n')
594 || value.contains('\r')
595 || value.contains('\\')
596 || value == COPY_NULL_SENTINEL
597 {
598 buf.push(b'"');
599 for byte in value.bytes() {
600 if byte == b'"' {
601 buf.push(b'"');
602 }
603 buf.push(byte);
604 }
605 buf.push(b'"');
606 } else {
607 buf.extend_from_slice(value.as_bytes());
608 }
609}
610
/// Appends `values` as a CSV-quoted Postgres text-array literal
/// (e.g. `"{a,""b c""}"`).
///
/// The whole array is wrapped in CSV quotes, so element-level array quotes
/// are written doubled (`""`); characters special inside a quoted array
/// element (`"` and `\`) are backslash-escaped per array-literal syntax and
/// then CSV-escaped.
fn write_pg_text_array(buf: &mut Vec<u8>, values: &[String]) {
    buf.push(b'"');
    buf.push(b'{');
    for (i, val) in values.iter().enumerate() {
        if i > 0 {
            buf.push(b',');
        }
        // Quote any element Postgres could misparse when unquoted: empty
        // strings, the literal NULL, array delimiters/braces, quotes,
        // backslashes, and any whitespace.
        // Fix: the check previously tested only ' ' — Postgres trims
        // whitespace around unquoted array elements, so a tab/newline/CR in
        // a tag would be silently corrupted without quoting.
        let needs_quoting = val.is_empty()
            || val.eq_ignore_ascii_case("NULL")
            || val
                .chars()
                .any(|c| c.is_whitespace() || matches!(c, ',' | '"' | '\\' | '{' | '}'));
        if needs_quoting {
            // Opening array-element quote, doubled for the outer CSV quoting.
            buf.extend_from_slice(b"\"\"");
            for ch in val.chars() {
                match ch {
                    // `\"` in array syntax; the quote is then CSV-doubled.
                    '"' => buf.extend_from_slice(b"\\\"\""),
                    // `\\` in array syntax; backslash needs no CSV escaping.
                    '\\' => buf.extend_from_slice(b"\\\\"),
                    _ => {
                        let mut utf8_buf = [0u8; 4];
                        buf.extend_from_slice(ch.encode_utf8(&mut utf8_buf).as_bytes());
                    }
                }
            }
            // Closing array-element quote, doubled for CSV.
            buf.extend_from_slice(b"\"\"");
        } else {
            buf.extend_from_slice(val.as_bytes());
        }
    }
    buf.push(b'}');
    buf.push(b'"');
}
650
/// Builds [`InsertParams`] for `args` with default [`InsertOpts`].
pub fn params(args: &impl JobArgs) -> Result<InsertParams, AwaError> {
    params_with(args, InsertOpts::default())
}
655
/// Builds [`InsertParams`] from typed job arguments plus explicit options.
///
/// Fails only if `args.to_args()` cannot serialize the arguments.
pub fn params_with(args: &impl JobArgs, opts: InsertOpts) -> Result<InsertParams, AwaError> {
    Ok(InsertParams {
        kind: args.kind_str().to_string(),
        args: args.to_args()?,
        opts,
    })
}