//! awa_model/insert.rs — single, batched, and COPY-based job insert paths.

1use crate::error::AwaError;
2use crate::job::{InsertOpts, InsertParams, JobRow, JobState};
3use crate::unique::compute_unique_key;
4use crate::JobArgs;
5use sqlx::postgres::PgConnection;
6use sqlx::{PgExecutor, PgPool};
7
/// Sentinel emitted for SQL NULL in CSV COPY payloads; the COPY statement
/// declares it via `NULL '__AWA_NULL__'` so it round-trips as NULL.
const COPY_NULL_SENTINEL: &str = "__AWA_NULL__";
9
10// ── Shared insert preparation ───────────────────────────────────────────
11//
12// Single source of truth for computing all derived insert values:
13// kind, serialized args, null-byte validation, state, unique_key,
14// unique_states. Used by:
15// - insert_with (single sqlx insert)
16// - precompute_row_values (batch insert_many / insert_many_copy)
17// - bridge adapters (tokio-postgres, etc.)
18
19/// Reject JSON values containing null bytes (`\u0000`), which Postgres
20/// JSONB does not support. Produces a clear validation error instead of
21/// an opaque database error.
22pub(crate) fn reject_null_bytes(value: &serde_json::Value) -> Result<(), AwaError> {
23    match value {
24        serde_json::Value::String(s) if s.contains('\0') => Err(AwaError::Validation(
25            "job args/metadata must not contain null bytes (\\u0000): Postgres JSONB does not support them".into(),
26        )),
27        serde_json::Value::Array(arr) => {
28            for v in arr {
29                reject_null_bytes(v)?;
30            }
31            Ok(())
32        }
33        serde_json::Value::Object(map) => {
34            for (k, v) in map {
35                if k.contains('\0') {
36                    return Err(AwaError::Validation(
37                        "job args/metadata keys must not contain null bytes (\\u0000)".into(),
38                    ));
39                }
40                reject_null_bytes(v)?;
41            }
42            Ok(())
43        }
44        _ => Ok(()),
45    }
46}
47
/// Pre-computed values for a single job row, ready to bind into any driver.
///
/// This is the shared internal representation used by all insert paths.
pub(crate) struct PreparedRow {
    pub kind: String,
    pub queue: String,
    pub args: serde_json::Value,
    // Scheduled when run_at was supplied, Available otherwise (see prepare_row_raw).
    pub state: JobState,
    pub priority: i16,
    pub max_attempts: i16,
    // None means "run now" — insert paths COALESCE this to now().
    pub run_at: Option<chrono::DateTime<chrono::Utc>>,
    pub metadata: serde_json::Value,
    pub tags: Vec<String>,
    // Uniqueness hash from compute_unique_key; None when no unique opts were set.
    pub unique_key: Option<Vec<u8>>,
    // 8-char bit string, cast to bit(8) in SQL; set together with unique_key.
    pub unique_states: Option<String>,
}
64
/// Physical table a prepared row is inserted into.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum TargetTable {
    /// Immediately runnable jobs.
    JobsHot,
    /// Future-dated (scheduled/retryable) jobs.
    ScheduledJobs,
}
70
71impl TargetTable {
72    fn as_str(self) -> &'static str {
73        match self {
74            TargetTable::JobsHot => "awa.jobs_hot",
75            TargetTable::ScheduledJobs => "awa.scheduled_jobs",
76        }
77    }
78}
79
80fn target_table_for_state(state: JobState) -> TargetTable {
81    match state {
82        JobState::Scheduled | JobState::Retryable => TargetTable::ScheduledJobs,
83        _ => TargetTable::JobsHot,
84    }
85}
86
87fn homogeneous_target_table(rows: &[PreparedRow]) -> Option<TargetTable> {
88    let first = rows.first().map(|row| target_table_for_state(row.state))?;
89    rows.iter()
90        .all(|row| target_table_for_state(row.state) == first)
91        .then_some(first)
92}
93
94fn map_sqlx_error(err: sqlx::Error) -> AwaError {
95    if let sqlx::Error::Database(ref db_err) = err {
96        if db_err.code().as_deref() == Some("23505") {
97            return AwaError::UniqueConflict {
98                constraint: db_err.constraint().map(|c| c.to_string()),
99            };
100        }
101    }
102    AwaError::Database(err)
103}
104
/// Build a multi-row `INSERT ... VALUES ... RETURNING *` statement with
/// numbered placeholders (`$1`..`$11` per row) for `count` rows.
///
/// The placeholder layout per row matches the bind order used by callers:
/// kind, queue, args, state, priority, max_attempts, run_at, metadata,
/// tags, unique_key, unique_states. `run_at` is wrapped in
/// `COALESCE(.., now())` and `unique_states` is cast to `bit(8)`.
fn build_multi_insert_query(target_table: &str, count: usize) -> String {
    use std::fmt::Write as _;

    const PARAMS_PER_ROW: u32 = 11;

    let mut query = format!(
        "INSERT INTO {} (kind, queue, args, state, priority, max_attempts, run_at, metadata, tags, unique_key, unique_states) VALUES ",
        target_table
    );

    let mut param_index = 1u32;
    for i in 0..count {
        if i > 0 {
            query.push_str(", ");
        }
        // write! appends in place — the original allocated a temporary
        // String per row via format!.
        let _ = write!(
            query,
            "(${}, ${}, ${}, ${}, ${}, ${}, COALESCE(${}, now()), ${}, ${}, ${}, ${}::bit(8))",
            param_index,
            param_index + 1,
            param_index + 2,
            param_index + 3,
            param_index + 4,
            param_index + 5,
            param_index + 6,
            param_index + 7,
            param_index + 8,
            param_index + 9,
            param_index + 10,
        );
        param_index += PARAMS_PER_ROW;
    }
    query.push_str(" RETURNING *");
    query
}
136
137/// Compute unique_key and unique_states from opts.
138fn compute_unique_fields(
139    kind: &str,
140    args: &serde_json::Value,
141    opts: &InsertOpts,
142) -> (Option<Vec<u8>>, Option<String>) {
143    let unique_key = opts.unique.as_ref().map(|u| {
144        compute_unique_key(
145            kind,
146            if u.by_queue { Some(&opts.queue) } else { None },
147            if u.by_args { Some(args) } else { None },
148            u.by_period,
149        )
150    });
151
152    let unique_states = opts.unique.as_ref().map(|u| {
153        // Build a bit string where PG bit position N (leftmost = 0) corresponds
154        // to Rust bit N (least-significant = 0). PostgreSQL's get_bit(bitmask, N)
155        // reads from the left, so we place Rust bit 0 at the leftmost position.
156        let mut bit_string = String::with_capacity(8);
157        for bit_position in 0..8 {
158            if u.states & (1 << bit_position) != 0 {
159                bit_string.push('1');
160            } else {
161                bit_string.push('0');
162            }
163        }
164        bit_string
165    });
166
167    (unique_key, unique_states)
168}
169
170/// Prepare a single row from typed job args and options.
171///
172/// Validates null bytes, determines state, computes unique key.
173pub(crate) fn prepare_row(args: &impl JobArgs, opts: InsertOpts) -> Result<PreparedRow, AwaError> {
174    let kind = args.kind_str().to_string();
175    let args_value = args.to_args()?;
176    prepare_row_raw(kind, args_value, opts)
177}
178
179/// Prepare a single row from raw kind, JSON args, and options.
180pub(crate) fn prepare_row_raw(
181    kind: String,
182    args: serde_json::Value,
183    opts: InsertOpts,
184) -> Result<PreparedRow, AwaError> {
185    reject_null_bytes(&args)?;
186    reject_null_bytes(&opts.metadata)?;
187
188    let state = if opts.run_at.is_some() {
189        JobState::Scheduled
190    } else {
191        JobState::Available
192    };
193
194    let (unique_key, unique_states) = compute_unique_fields(&kind, &args, &opts);
195
196    Ok(PreparedRow {
197        kind,
198        queue: opts.queue,
199        args,
200        state,
201        priority: opts.priority,
202        max_attempts: opts.max_attempts,
203        run_at: opts.run_at,
204        metadata: opts.metadata,
205        tags: opts.tags,
206        unique_key,
207        unique_states,
208    })
209}
210
211// ── sqlx insert functions ───────────────────────────────────────────────
212
213/// Insert a job with default options.
214pub async fn insert<'e, E>(executor: E, args: &impl JobArgs) -> Result<JobRow, AwaError>
215where
216    E: PgExecutor<'e>,
217{
218    insert_with(executor, args, InsertOpts::default()).await
219}
220
221/// Insert a job with custom options.
222#[tracing::instrument(skip(executor, args), fields(job.kind = args.kind_str(), job.queue = %opts.queue))]
223pub async fn insert_with<'e, E>(
224    executor: E,
225    args: &impl JobArgs,
226    opts: InsertOpts,
227) -> Result<JobRow, AwaError>
228where
229    E: PgExecutor<'e>,
230{
231    let row = prepare_row(args, opts)?;
232    let query = format!(
233        r#"
234        INSERT INTO {} (kind, queue, args, state, priority, max_attempts, run_at, metadata, tags, unique_key, unique_states)
235        VALUES ($1, $2, $3, $4, $5, $6, COALESCE($7, now()), $8, $9, $10, $11::bit(8))
236        RETURNING *
237        "#,
238        target_table_for_state(row.state).as_str()
239    );
240
241    sqlx::query_as::<_, JobRow>(&query)
242        .bind(&row.kind)
243        .bind(&row.queue)
244        .bind(&row.args)
245        .bind(row.state)
246        .bind(row.priority)
247        .bind(row.max_attempts)
248        .bind(row.run_at)
249        .bind(&row.metadata)
250        .bind(&row.tags)
251        .bind(&row.unique_key)
252        .bind(&row.unique_states)
253        .fetch_one(executor)
254        .await
255        .map_err(map_sqlx_error)
256}
257
258/// Pre-compute all row values including unique keys from InsertParams.
259fn precompute_rows(jobs: &[InsertParams]) -> Result<Vec<PreparedRow>, AwaError> {
260    jobs.iter()
261        .map(|job| prepare_row_raw(job.kind.clone(), job.args.clone(), job.opts.clone()))
262        .collect()
263}
264
265/// Insert multiple jobs in a single statement.
266///
267/// Supports uniqueness constraints — jobs with `unique` opts will have their
268/// `unique_key` and `unique_states` computed and included.
269#[tracing::instrument(skip(executor, jobs), fields(job.count = jobs.len()))]
270pub async fn insert_many<'e, E>(executor: E, jobs: &[InsertParams]) -> Result<Vec<JobRow>, AwaError>
271where
272    E: PgExecutor<'e>,
273{
274    if jobs.is_empty() {
275        return Ok(Vec::new());
276    }
277
278    let rows = precompute_rows(jobs)?;
279    let target_table = homogeneous_target_table(&rows)
280        .map(TargetTable::as_str)
281        .unwrap_or("awa.jobs");
282    let query = build_multi_insert_query(target_table, rows.len());
283
284    let mut sql_query = sqlx::query_as::<_, JobRow>(&query);
285
286    for row in &rows {
287        sql_query = sql_query
288            .bind(&row.kind)
289            .bind(&row.queue)
290            .bind(&row.args)
291            .bind(row.state)
292            .bind(row.priority)
293            .bind(row.max_attempts)
294            .bind(row.run_at)
295            .bind(&row.metadata)
296            .bind(&row.tags)
297            .bind(&row.unique_key)
298            .bind(&row.unique_states);
299    }
300
301    let results = sql_query.fetch_all(executor).await?;
302
303    Ok(results)
304}
305
/// Insert many jobs using COPY for high throughput.
///
/// Stages rows into a session-local temp table via CSV COPY, then moves
/// them into the target table: a single `INSERT...SELECT` when no job has
/// unique opts, or per-row savepoint-guarded inserts (skipping 23505
/// duplicates) when any job is unique. Accepts `&mut PgConnection` so
/// callers can use pool connections or transactions (Transaction derefs
/// to PgConnection).
///
/// NOTE(review): the SAVEPOINT statements in the unique path require an
/// enclosing transaction — confirm callers not already in one go through
/// `insert_many_copy_from_pool`, which opens one.
#[tracing::instrument(skip(conn, jobs), fields(job.count = jobs.len()))]
pub async fn insert_many_copy(
    conn: &mut PgConnection,
    jobs: &[InsertParams],
) -> Result<Vec<JobRow>, AwaError> {
    if jobs.is_empty() {
        return Ok(Vec::new());
    }

    // Validate and derive all per-row values up front; a mixed hot/scheduled
    // batch falls back to the `awa.jobs` compatibility surface.
    let rows = precompute_rows(jobs)?;
    let target_table = homogeneous_target_table(&rows)
        .map(TargetTable::as_str)
        .unwrap_or("awa.jobs");

    // 1. Create or reuse a session-local staging table.
    //
    // Keeping the temp table structure across transactions avoids repeated
    // catalog churn under concurrent producers while preserving transactional
    // cleanup of staged rows at commit/rollback boundaries.
    sqlx::query(
        r#"
        CREATE TEMP TABLE IF NOT EXISTS pg_temp.awa_copy_staging (
            kind        TEXT NOT NULL,
            queue       TEXT NOT NULL,
            args        JSONB NOT NULL,
            state       awa.job_state NOT NULL,
            priority    SMALLINT NOT NULL,
            max_attempts SMALLINT NOT NULL,
            run_at      TIMESTAMPTZ,
            metadata    JSONB NOT NULL,
            tags        TEXT[] NOT NULL,
            unique_key  BYTEA,
            unique_states BIT(8)
        ) ON COMMIT DELETE ROWS
        "#,
    )
    .execute(&mut *conn)
    .await?;

    // 2. COPY data into staging table via CSV (see write_csv_row for the
    // field encoding; COPY_NULL_SENTINEL marks SQL NULLs).
    let mut csv_buf = Vec::with_capacity(rows.len() * 256);
    for row in &rows {
        write_csv_row(&mut csv_buf, row);
    }

    let mut copy_in = conn
        .copy_in_raw(
            "COPY pg_temp.awa_copy_staging (kind, queue, args, state, priority, max_attempts, run_at, metadata, tags, unique_key, unique_states) FROM STDIN WITH (FORMAT csv, NULL '__AWA_NULL__')",
        )
        .await?;
    copy_in.send(csv_buf).await?;
    copy_in.finish().await?;

    // 3. INSERT...SELECT from staging into real table
    let has_unique = rows.iter().any(|r| r.unique_key.is_some());

    let results = if has_unique {
        // The compatibility `awa.jobs` surface is now a view backed by hot and
        // deferred tables, so the old `ON CONFLICT` path is no longer available
        // here. Keep COPY for staging/parsing, then insert unique rows one at a
        // time and skip duplicates explicitly.
        let staged_rows = sqlx::query_as::<
            _,
            (
                String,                                // kind
                String,                                // queue
                serde_json::Value,                     // args
                String,                                // state (cast to text)
                i16,                                   // priority
                i16,                                   // max_attempts
                Option<chrono::DateTime<chrono::Utc>>, // run_at
                serde_json::Value,                     // metadata
                Vec<String>,                           // tags
                Option<Vec<u8>>,                       // unique_key
                Option<String>,                        // unique_states (cast to text)
            ),
        >(
            r#"
            SELECT
                kind,
                queue,
                args,
                state::text,
                priority,
                max_attempts,
                run_at,
                metadata,
                tags,
                unique_key,
                unique_states::text
            FROM pg_temp.awa_copy_staging
            "#,
        )
        .fetch_all(&mut *conn)
        .await?;

        let mut inserted = Vec::with_capacity(staged_rows.len());
        for (
            kind,
            queue,
            args,
            state,
            priority,
            max_attempts,
            run_at,
            metadata,
            tags,
            unique_key,
            unique_states,
        ) in staged_rows
        {
            // A savepoint per row isolates a duplicate-key failure so the
            // rest of the batch (and the outer transaction) survives it.
            sqlx::query("SAVEPOINT awa_copy_unique_row")
                .execute(&mut *conn)
                .await?;

            let query = format!(
                r#"
                INSERT INTO {} (kind, queue, args, state, priority, max_attempts, run_at, metadata, tags, unique_key, unique_states)
                VALUES ($1, $2, $3, $4::awa.job_state, $5, $6, COALESCE($7, now()), $8, $9, $10, $11::bit(8))
                RETURNING *
                "#,
                target_table
            );
            let result = sqlx::query_as::<_, JobRow>(&query)
                .bind(&kind)
                .bind(&queue)
                .bind(&args)
                .bind(&state)
                .bind(priority)
                .bind(max_attempts)
                .bind(run_at)
                .bind(&metadata)
                .bind(&tags)
                .bind(&unique_key)
                .bind(&unique_states)
                .fetch_one(&mut *conn)
                .await;

            match result {
                Ok(row) => {
                    inserted.push(row);
                    sqlx::query("RELEASE SAVEPOINT awa_copy_unique_row")
                        .execute(&mut *conn)
                        .await?;
                }
                // 23505 = unique_violation: skip the duplicate row silently.
                Err(sqlx::Error::Database(db_err)) if db_err.code().as_deref() == Some("23505") => {
                    sqlx::query("ROLLBACK TO SAVEPOINT awa_copy_unique_row")
                        .execute(&mut *conn)
                        .await?;
                    sqlx::query("RELEASE SAVEPOINT awa_copy_unique_row")
                        .execute(&mut *conn)
                        .await?;
                    continue;
                }
                // Any other error aborts the whole batch, after restoring the
                // connection to a usable (non-aborted) state.
                Err(err) => {
                    sqlx::query("ROLLBACK TO SAVEPOINT awa_copy_unique_row")
                        .execute(&mut *conn)
                        .await?;
                    sqlx::query("RELEASE SAVEPOINT awa_copy_unique_row")
                        .execute(&mut *conn)
                        .await?;
                    return Err(AwaError::Database(err));
                }
            }
        }

        inserted
    } else {
        // No unique jobs: move everything in one set-based statement.
        let insert_sql = format!(
            r#"
            INSERT INTO {} (kind, queue, args, state, priority, max_attempts, run_at, metadata, tags, unique_key, unique_states)
            SELECT
                s.kind,
                s.queue,
                s.args,
                s.state::awa.job_state,
                s.priority,
                s.max_attempts,
                COALESCE(s.run_at, now()),
                s.metadata,
                s.tags,
                s.unique_key,
                s.unique_states
            FROM pg_temp.awa_copy_staging s
            RETURNING *
        "#,
            target_table
        );

        sqlx::query_as::<_, JobRow>(&insert_sql)
            .fetch_all(&mut *conn)
            .await?
    };

    // Keep the session-local staging table reusable across multiple COPY calls
    // within the same outer transaction.
    sqlx::query("DELETE FROM pg_temp.awa_copy_staging")
        .execute(&mut *conn)
        .await?;

    Ok(results)
}
514
515/// Convenience wrapper that acquires a connection from the pool.
516///
517/// Wraps the operation in a transaction so the staging rows are cleaned up at
518/// commit time even if the caller does not reuse the connection afterward.
519#[tracing::instrument(skip(pool, jobs), fields(job.count = jobs.len()))]
520pub async fn insert_many_copy_from_pool(
521    pool: &PgPool,
522    jobs: &[InsertParams],
523) -> Result<Vec<JobRow>, AwaError> {
524    if jobs.is_empty() {
525        return Ok(Vec::new());
526    }
527
528    let mut tx = pool.begin().await?;
529    let results = insert_many_copy(&mut tx, jobs).await?;
530    tx.commit().await?;
531
532    Ok(results)
533}
534
535// ── CSV serialization helpers ────────────────────────────────────────
536
537/// Write one PreparedRow as a CSV line to the buffer.
538fn write_csv_row(buf: &mut Vec<u8>, row: &PreparedRow) {
539    // kind
540    write_csv_field(buf, &row.kind);
541    buf.push(b',');
542    // queue
543    write_csv_field(buf, &row.queue);
544    buf.push(b',');
545    // args (JSONB as text)
546    let args_str = serde_json::to_string(&row.args).expect("JSON serialization should not fail");
547    write_csv_field(buf, &args_str);
548    buf.push(b',');
549    // state
550    write_csv_field(buf, &row.state.to_string());
551    buf.push(b',');
552    // priority
553    buf.extend_from_slice(row.priority.to_string().as_bytes());
554    buf.push(b',');
555    // max_attempts
556    buf.extend_from_slice(row.max_attempts.to_string().as_bytes());
557    buf.push(b',');
558    // run_at (TIMESTAMPTZ as RFC 3339, or the COPY null sentinel)
559    match &row.run_at {
560        Some(dt) => write_csv_field(buf, &dt.to_rfc3339()),
561        None => buf.extend_from_slice(COPY_NULL_SENTINEL.as_bytes()),
562    }
563    buf.push(b',');
564    // metadata (JSONB as text)
565    let metadata_str =
566        serde_json::to_string(&row.metadata).expect("JSON serialization should not fail");
567    write_csv_field(buf, &metadata_str);
568    buf.push(b',');
569    // tags (Postgres text[] literal)
570    write_pg_text_array(buf, &row.tags);
571    buf.push(b',');
572    // unique_key (bytea hex format, or the COPY null sentinel)
573    match &row.unique_key {
574        Some(key) => {
575            let bytea_hex = format!("\\x{}", hex::encode(key));
576            write_csv_field(buf, &bytea_hex);
577        }
578        None => buf.extend_from_slice(COPY_NULL_SENTINEL.as_bytes()),
579    }
580    buf.push(b',');
581    // unique_states (bit string, or the COPY null sentinel)
582    match &row.unique_states {
583        Some(bits) => write_csv_field(buf, bits),
584        None => buf.extend_from_slice(COPY_NULL_SENTINEL.as_bytes()),
585    }
586    buf.push(b'\n');
587}
588
589/// Write a CSV field, quoting if it contains special characters.
590fn write_csv_field(buf: &mut Vec<u8>, value: &str) {
591    if value.contains(',')
592        || value.contains('"')
593        || value.contains('\n')
594        || value.contains('\r')
595        || value.contains('\\')
596        || value == COPY_NULL_SENTINEL
597    {
598        buf.push(b'"');
599        for byte in value.bytes() {
600            if byte == b'"' {
601                buf.push(b'"');
602            }
603            buf.push(byte);
604        }
605        buf.push(b'"');
606    } else {
607        buf.extend_from_slice(value.as_bytes());
608    }
609}
610
/// Append a Postgres `text[]` literal (e.g. `{a,"b c"}`) as one CSV field.
/// The literal always contains braces, so the whole thing is CSV-quoted;
/// inner `"` bytes are doubled for CSV on top of array-literal escaping.
fn write_pg_text_array(buf: &mut Vec<u8>, values: &[String]) {
    // Elements that could confuse the array parser must be quoted inside the
    // literal: empty strings, delimiters, quotes, backslashes, braces,
    // spaces, and the bare word NULL in any case.
    fn element_needs_quoting(val: &str) -> bool {
        val.is_empty()
            || val.eq_ignore_ascii_case("NULL")
            || val
                .chars()
                .any(|c| matches!(c, ',' | '"' | '\\' | '{' | '}' | ' '))
    }

    buf.extend_from_slice(b"\"{");
    for (i, val) in values.iter().enumerate() {
        if i > 0 {
            buf.push(b',');
        }
        if element_needs_quoting(val) {
            // `""` survives CSV unescaping as a single `"`: the element's
            // array-literal quote.
            buf.extend_from_slice(b"\"\"");
            let mut utf8 = [0u8; 4];
            for ch in val.chars() {
                match ch {
                    '"' => buf.extend_from_slice(b"\\\"\""),
                    '\\' => buf.extend_from_slice(b"\\\\"),
                    other => buf.extend_from_slice(other.encode_utf8(&mut utf8).as_bytes()),
                }
            }
            buf.extend_from_slice(b"\"\"");
        } else {
            buf.extend_from_slice(val.as_bytes());
        }
    }
    buf.extend_from_slice(b"}\"");
}
650
651/// Convenience: create InsertParams from a JobArgs impl.
652pub fn params(args: &impl JobArgs) -> Result<InsertParams, AwaError> {
653    params_with(args, InsertOpts::default())
654}
655
656/// Convenience: create InsertParams from a JobArgs impl with options.
657pub fn params_with(args: &impl JobArgs, opts: InsertOpts) -> Result<InsertParams, AwaError> {
658    Ok(InsertParams {
659        kind: args.kind_str().to_string(),
660        args: args.to_args()?,
661        opts,
662    })
663}