//! awa_model/insert.rs — single, batched, and COPY-based bulk job insertion.
1use crate::error::AwaError;
2use crate::job::{InsertOpts, InsertParams, JobRow, JobState};
3use crate::unique::compute_unique_key;
4use crate::JobArgs;
5use sqlx::postgres::PgConnection;
6use sqlx::{PgExecutor, PgPool};
7
8/// Insert a job with default options.
9pub async fn insert<'e, E>(executor: E, args: &impl JobArgs) -> Result<JobRow, AwaError>
10where
11    E: PgExecutor<'e>,
12{
13    insert_with(executor, args, InsertOpts::default()).await
14}
15
16/// Insert a job with custom options.
17#[tracing::instrument(skip(executor, args), fields(job.kind = args.kind_str(), job.queue = %opts.queue))]
18pub async fn insert_with<'e, E>(
19    executor: E,
20    args: &impl JobArgs,
21    opts: InsertOpts,
22) -> Result<JobRow, AwaError>
23where
24    E: PgExecutor<'e>,
25{
26    let kind = args.kind_str();
27    let args_json = args.to_args()?;
28
29    let state = if opts.run_at.is_some() {
30        JobState::Scheduled
31    } else {
32        JobState::Available
33    };
34
35    let unique_key = opts.unique.as_ref().map(|u| {
36        compute_unique_key(
37            kind,
38            if u.by_queue { Some(&opts.queue) } else { None },
39            if u.by_args { Some(&args_json) } else { None },
40            u.by_period,
41        )
42    });
43
44    let unique_states_bits: Option<String> = opts.unique.as_ref().map(|u| {
45        // Build a bit string where PG bit position N (leftmost = 0) corresponds
46        // to Rust bit N (least-significant = 0). PostgreSQL's get_bit(bitmask, N)
47        // reads from the left, so we place Rust bit 0 at the leftmost position.
48        let mut bit_string = String::with_capacity(8);
49        for bit_position in 0..8 {
50            if u.states & (1 << bit_position) != 0 {
51                bit_string.push('1');
52            } else {
53                bit_string.push('0');
54            }
55        }
56        bit_string
57    });
58
59    let row = sqlx::query_as::<_, JobRow>(
60        r#"
61        INSERT INTO awa.jobs (kind, queue, args, state, priority, max_attempts, run_at, metadata, tags, unique_key, unique_states)
62        VALUES ($1, $2, $3, $4, $5, $6, COALESCE($7, now()), $8, $9, $10, $11::bit(8))
63        RETURNING *
64        "#,
65    )
66    .bind(kind)
67    .bind(&opts.queue)
68    .bind(&args_json)
69    .bind(state)
70    .bind(opts.priority)
71    .bind(opts.max_attempts)
72    .bind(opts.run_at)
73    .bind(&opts.metadata)
74    .bind(&opts.tags)
75    .bind(&unique_key)
76    .bind(&unique_states_bits)
77    .fetch_one(executor)
78    .await
79    .map_err(|err| {
80        if let sqlx::Error::Database(ref db_err) = err {
81            if db_err.code().as_deref() == Some("23505") {
82                // Unique constraint violation. The conflicting row ID isn't
83                // available from the PG error message directly — callers can
84                // query by unique_key if they need it.
85                return AwaError::UniqueConflict {
86                    constraint: db_err
87                        .constraint()
88                        .map(|c| c.to_string()),
89                };
90            }
91        }
92        AwaError::Database(err)
93    })?;
94
95    Ok(row)
96}
97
/// Pre-computed values for a single job row, shared by `insert_many` and `insert_many_copy`.
struct RowValues {
    // Job kind discriminator, cloned from `InsertParams::kind`.
    kind: String,
    // Target queue name.
    queue: String,
    // Serialized job arguments (stored as JSONB).
    args: serde_json::Value,
    // Scheduled when run_at is Some, Available otherwise (see precompute_row_values).
    state: JobState,
    priority: i16,
    max_attempts: i16,
    // When to run the job; None means "now" (COALESCE'd to now() in SQL).
    run_at: Option<chrono::DateTime<chrono::Utc>>,
    metadata: serde_json::Value,
    tags: Vec<String>,
    // Output of compute_unique_key; None when no uniqueness opts were given.
    unique_key: Option<Vec<u8>>,
    // 8-char '0'/'1' string cast to bit(8) in SQL; leftmost char is Rust bit 0.
    unique_states: Option<String>,
}
112
113/// Pre-compute all row values including unique keys from InsertParams.
114fn precompute_row_values(jobs: &[InsertParams]) -> Vec<RowValues> {
115    jobs.iter()
116        .map(|job| {
117            let unique_key = job.opts.unique.as_ref().map(|u| {
118                compute_unique_key(
119                    &job.kind,
120                    if u.by_queue {
121                        Some(job.opts.queue.as_str())
122                    } else {
123                        None
124                    },
125                    if u.by_args { Some(&job.args) } else { None },
126                    u.by_period,
127                )
128            });
129
130            let unique_states = job.opts.unique.as_ref().map(|u| {
131                let mut bit_string = String::with_capacity(8);
132                for bit_position in 0..8 {
133                    if u.states & (1 << bit_position) != 0 {
134                        bit_string.push('1');
135                    } else {
136                        bit_string.push('0');
137                    }
138                }
139                bit_string
140            });
141
142            RowValues {
143                kind: job.kind.clone(),
144                queue: job.opts.queue.clone(),
145                args: job.args.clone(),
146                state: if job.opts.run_at.is_some() {
147                    JobState::Scheduled
148                } else {
149                    JobState::Available
150                },
151                priority: job.opts.priority,
152                max_attempts: job.opts.max_attempts,
153                run_at: job.opts.run_at,
154                metadata: job.opts.metadata.clone(),
155                tags: job.opts.tags.clone(),
156                unique_key,
157                unique_states,
158            }
159        })
160        .collect()
161}
162
163/// Insert multiple jobs in a single statement.
164///
165/// Supports uniqueness constraints — jobs with `unique` opts will have their
166/// `unique_key` and `unique_states` computed and included.
167#[tracing::instrument(skip(executor, jobs), fields(job.count = jobs.len()))]
168pub async fn insert_many<'e, E>(executor: E, jobs: &[InsertParams]) -> Result<Vec<JobRow>, AwaError>
169where
170    E: PgExecutor<'e>,
171{
172    if jobs.is_empty() {
173        return Ok(Vec::new());
174    }
175
176    let count = jobs.len();
177    let rows = precompute_row_values(jobs);
178
179    // Build multi-row INSERT with all columns including unique_key/unique_states
180    let mut query = String::from(
181        "INSERT INTO awa.jobs (kind, queue, args, state, priority, max_attempts, run_at, metadata, tags, unique_key, unique_states) VALUES ",
182    );
183
184    let params_per_row = 11u32;
185    let mut param_index = 1u32;
186    for i in 0..count {
187        if i > 0 {
188            query.push_str(", ");
189        }
190        query.push_str(&format!(
191            "(${}, ${}, ${}, ${}, ${}, ${}, COALESCE(${}, now()), ${}, ${}, ${}, ${}::bit(8))",
192            param_index,
193            param_index + 1,
194            param_index + 2,
195            param_index + 3,
196            param_index + 4,
197            param_index + 5,
198            param_index + 6,
199            param_index + 7,
200            param_index + 8,
201            param_index + 9,
202            param_index + 10,
203        ));
204        param_index += params_per_row;
205    }
206    query.push_str(" RETURNING *");
207
208    let mut sql_query = sqlx::query_as::<_, JobRow>(&query);
209
210    for row in &rows {
211        sql_query = sql_query
212            .bind(&row.kind)
213            .bind(&row.queue)
214            .bind(&row.args)
215            .bind(row.state)
216            .bind(row.priority)
217            .bind(row.max_attempts)
218            .bind(row.run_at)
219            .bind(&row.metadata)
220            .bind(&row.tags)
221            .bind(&row.unique_key)
222            .bind(&row.unique_states);
223    }
224
225    let results = sql_query.fetch_all(executor).await?;
226
227    Ok(results)
228}
229
/// Insert many jobs using COPY for high throughput.
///
/// Uses a temp staging table with no constraints for fast COPY ingestion,
/// then moves the staged rows into `awa.jobs`. Batches without uniqueness
/// opts go in a single INSERT...SELECT; batches containing unique jobs fall
/// back to row-at-a-time inserts under savepoints (see the comment on the
/// unique branch below for why). Accepts `&mut PgConnection` so callers can
/// use pool connections or transactions (Transaction derefs to PgConnection).
///
/// NOTE(review): both the SAVEPOINT statements and the `ON COMMIT DROP`
/// staging table assume an enclosing transaction — confirm all callers wrap
/// this call in one, as `insert_many_copy_from_pool` does.
#[tracing::instrument(skip(conn, jobs), fields(job.count = jobs.len()))]
pub async fn insert_many_copy(
    conn: &mut PgConnection,
    jobs: &[InsertParams],
) -> Result<Vec<JobRow>, AwaError> {
    if jobs.is_empty() {
        return Ok(Vec::new());
    }

    let rows = precompute_row_values(jobs);

    // 1. Create temp staging table (dropped on transaction commit).
    // Nullable columns (run_at, unique_key, unique_states) are staged as TEXT
    // so the CSV writer can emit the \N sentinel; casting is deferred to SQL.
    sqlx::query(
        r#"
        CREATE TEMP TABLE awa_copy_staging (
            kind        TEXT NOT NULL,
            queue       TEXT NOT NULL,
            args        JSONB NOT NULL,
            state       TEXT NOT NULL,
            priority    SMALLINT NOT NULL,
            max_attempts SMALLINT NOT NULL,
            run_at      TEXT,
            metadata    JSONB NOT NULL,
            tags        TEXT NOT NULL,
            unique_key  TEXT,
            unique_states TEXT
        ) ON COMMIT DROP
        "#,
    )
    .execute(&mut *conn)
    .await?;

    // 2. COPY data into staging table via CSV (NULL encoded as \N).
    let mut csv_buf = Vec::with_capacity(rows.len() * 256);
    for row in &rows {
        write_csv_row(&mut csv_buf, row);
    }

    let mut copy_in = conn
        .copy_in_raw(
            "COPY awa_copy_staging (kind, queue, args, state, priority, max_attempts, run_at, metadata, tags, unique_key, unique_states) FROM STDIN WITH (FORMAT csv, NULL '\\N')",
        )
        .await?;
    copy_in.send(csv_buf).await?;
    copy_in.finish().await?;

    // 3. INSERT...SELECT from staging into real table
    let has_unique = rows.iter().any(|r| r.unique_key.is_some());

    let results = if has_unique {
        // The compatibility `awa.jobs` surface is now a view backed by hot and
        // deferred tables, so the old `ON CONFLICT` path is no longer available
        // here. Keep COPY for staging/parsing, then insert unique rows one at a
        // time and skip duplicates explicitly.
        //
        // The `= '\N'` comparisons below are defensive: COPY already maps the
        // \N sentinel to SQL NULL, so the IS NULL arm is the one expected to fire.
        let staged_rows = sqlx::query_as::<
            _,
            (
                String,
                String,
                serde_json::Value,
                String,
                i16,
                i16,
                Option<chrono::DateTime<chrono::Utc>>,
                serde_json::Value,
                Vec<String>,
                Option<Vec<u8>>,
                Option<String>,
            ),
        >(
            r#"
            SELECT
                kind,
                queue,
                args,
                state,
                priority,
                max_attempts,
                CASE WHEN run_at = '\N' OR run_at IS NULL THEN NULL ELSE run_at::timestamptz END,
                metadata,
                tags::text[],
                CASE WHEN unique_key = '\N' OR unique_key IS NULL THEN NULL ELSE decode(unique_key, 'hex') END,
                unique_states
            FROM awa_copy_staging
            "#,
        )
        .fetch_all(&mut *conn)
        .await?;

        let mut inserted = Vec::with_capacity(staged_rows.len());
        for (
            kind,
            queue,
            args,
            state,
            priority,
            max_attempts,
            run_at,
            metadata,
            tags,
            unique_key,
            unique_states,
        ) in staged_rows
        {
            // One savepoint per row: a unique violation aborts only this row's
            // sub-transaction, leaving earlier inserts (and the COPY) intact.
            sqlx::query("SAVEPOINT awa_copy_unique_row")
                .execute(&mut *conn)
                .await?;

            let result = sqlx::query_as::<_, JobRow>(
                r#"
                INSERT INTO awa.jobs (kind, queue, args, state, priority, max_attempts, run_at, metadata, tags, unique_key, unique_states)
                VALUES ($1, $2, $3, $4::awa.job_state, $5, $6, COALESCE($7, now()), $8, $9, $10, $11::bit(8))
                RETURNING *
                "#,
            )
            .bind(&kind)
            .bind(&queue)
            .bind(&args)
            .bind(&state)
            .bind(priority)
            .bind(max_attempts)
            .bind(run_at)
            .bind(&metadata)
            .bind(&tags)
            .bind(&unique_key)
            .bind(&unique_states)
            .fetch_one(&mut *conn)
            .await;

            match result {
                Ok(row) => {
                    inserted.push(row);
                    sqlx::query("RELEASE SAVEPOINT awa_copy_unique_row")
                        .execute(&mut *conn)
                        .await?;
                }
                // SQLSTATE 23505 = unique_violation: skip this row, keep going.
                Err(sqlx::Error::Database(db_err)) if db_err.code().as_deref() == Some("23505") => {
                    sqlx::query("ROLLBACK TO SAVEPOINT awa_copy_unique_row")
                        .execute(&mut *conn)
                        .await?;
                    sqlx::query("RELEASE SAVEPOINT awa_copy_unique_row")
                        .execute(&mut *conn)
                        .await?;
                    continue;
                }
                // Any other error aborts the batch after cleaning up the savepoint.
                Err(err) => {
                    sqlx::query("ROLLBACK TO SAVEPOINT awa_copy_unique_row")
                        .execute(&mut *conn)
                        .await?;
                    sqlx::query("RELEASE SAVEPOINT awa_copy_unique_row")
                        .execute(&mut *conn)
                        .await?;
                    return Err(AwaError::Database(err));
                }
            }
        }

        inserted
    } else {
        // Fast path: no uniqueness involved, so a single INSERT...SELECT moves
        // the whole staging table; run_at defaults to now() when NULL.
        let insert_sql = r#"
            INSERT INTO awa.jobs (kind, queue, args, state, priority, max_attempts, run_at, metadata, tags, unique_key, unique_states)
            SELECT
                s.kind,
                s.queue,
                s.args,
                s.state::awa.job_state,
                s.priority,
                s.max_attempts,
                CASE WHEN s.run_at = '\N' OR s.run_at IS NULL THEN now() ELSE s.run_at::timestamptz END,
                s.metadata,
                s.tags::text[],
                CASE WHEN s.unique_key = '\N' OR s.unique_key IS NULL THEN NULL ELSE decode(s.unique_key, 'hex') END,
                CASE WHEN s.unique_states = '\N' OR s.unique_states IS NULL THEN NULL ELSE s.unique_states::bit(8) END
            FROM awa_copy_staging s
            RETURNING *
        "#;

        sqlx::query_as::<_, JobRow>(insert_sql)
            .fetch_all(&mut *conn)
            .await?
    };

    Ok(results)
}
420
421/// Convenience wrapper that acquires a connection from the pool.
422///
423/// Wraps the operation in a transaction so the ON COMMIT DROP staging table
424/// is cleaned up automatically.
425#[tracing::instrument(skip(pool, jobs), fields(job.count = jobs.len()))]
426pub async fn insert_many_copy_from_pool(
427    pool: &PgPool,
428    jobs: &[InsertParams],
429) -> Result<Vec<JobRow>, AwaError> {
430    if jobs.is_empty() {
431        return Ok(Vec::new());
432    }
433
434    let mut tx = pool.begin().await?;
435    let results = insert_many_copy(&mut tx, jobs).await?;
436    tx.commit().await?;
437
438    Ok(results)
439}
440
441// ── CSV serialization helpers ────────────────────────────────────────
442
443/// Write one RowValues as a CSV line to the buffer.
444fn write_csv_row(buf: &mut Vec<u8>, row: &RowValues) {
445    // kind
446    write_csv_field(buf, &row.kind);
447    buf.push(b',');
448    // queue
449    write_csv_field(buf, &row.queue);
450    buf.push(b',');
451    // args (JSONB as text)
452    let args_str = serde_json::to_string(&row.args).expect("JSON serialization should not fail");
453    write_csv_field(buf, &args_str);
454    buf.push(b',');
455    // state
456    write_csv_field(buf, &row.state.to_string());
457    buf.push(b',');
458    // priority
459    buf.extend_from_slice(row.priority.to_string().as_bytes());
460    buf.push(b',');
461    // max_attempts
462    buf.extend_from_slice(row.max_attempts.to_string().as_bytes());
463    buf.push(b',');
464    // run_at (TIMESTAMPTZ as RFC 3339, or \N for NULL)
465    match &row.run_at {
466        Some(dt) => write_csv_field(buf, &dt.to_rfc3339()),
467        None => buf.extend_from_slice(b"\\N"),
468    }
469    buf.push(b',');
470    // metadata (JSONB as text)
471    let metadata_str =
472        serde_json::to_string(&row.metadata).expect("JSON serialization should not fail");
473    write_csv_field(buf, &metadata_str);
474    buf.push(b',');
475    // tags (Postgres text[] literal)
476    write_pg_text_array(buf, &row.tags);
477    buf.push(b',');
478    // unique_key (hex-encoded bytes, or \N for NULL)
479    match &row.unique_key {
480        Some(key) => {
481            // Encode as hex string (no \x prefix — we use decode(hex) in SQL)
482            let hex = hex::encode(key);
483            write_csv_field(buf, &hex);
484        }
485        None => buf.extend_from_slice(b"\\N"),
486    }
487    buf.push(b',');
488    // unique_states (bit string, or \N for NULL)
489    match &row.unique_states {
490        Some(bits) => write_csv_field(buf, bits),
491        None => buf.extend_from_slice(b"\\N"),
492    }
493    buf.push(b'\n');
494}
495
/// Write a CSV field, quoting if it contains special characters.
///
/// Backslash is included in the quoting set so that a field whose text
/// happens to be `\N` cannot be mistaken for the configured NULL sentinel.
fn write_csv_field(buf: &mut Vec<u8>, value: &str) {
    let needs_quoting = value
        .bytes()
        .any(|b| matches!(b, b',' | b'"' | b'\n' | b'\r' | b'\\'));

    if !needs_quoting {
        buf.extend_from_slice(value.as_bytes());
        return;
    }

    buf.push(b'"');
    for byte in value.bytes() {
        // CSV escapes an embedded quote by doubling it.
        if byte == b'"' {
            buf.push(b'"');
        }
        buf.push(byte);
    }
    buf.push(b'"');
}
516
/// Write a Postgres text[] array literal: `{elem1,"elem with , comma"}`.
/// The entire literal is CSV-quoted because it always contains braces.
///
/// Fix over the previous version: Postgres strips unquoted leading/trailing
/// whitespace of ANY kind from array elements, but only `' '` used to trigger
/// quoting — tabs/newlines/CRs slipped through unquoted and could be mangled.
/// Elements containing any whitespace are now quoted (a superset of the old
/// behavior, so previously-quoted inputs serialize identically).
fn write_pg_text_array(buf: &mut Vec<u8>, values: &[String]) {
    buf.push(b'"');
    buf.push(b'{');
    for (i, val) in values.iter().enumerate() {
        if i > 0 {
            buf.push(b',');
        }
        // Postgres array elements must be quoted when empty, equal to NULL
        // (case-insensitive), or containing delimiters, quotes, backslashes,
        // braces, or whitespace.
        let needs_quoting = val.is_empty()
            || val.eq_ignore_ascii_case("NULL")
            || val
                .chars()
                .any(|c| matches!(c, ',' | '"' | '\\' | '{' | '}') || c.is_whitespace());

        if needs_quoting {
            // The element delimiter is a Postgres double-quote; since the whole
            // array literal sits inside a CSV-quoted field, each literal `"`
            // must itself be doubled for CSV — hence `""` on the wire.
            buf.extend_from_slice(b"\"\"");
            for ch in val.chars() {
                match ch {
                    // Postgres escapes an embedded quote as \" ; CSV then
                    // doubles the quote, producing the bytes \"" on the wire.
                    '"' => buf.extend_from_slice(b"\\\"\""),
                    // Postgres escapes a backslash as \\ (CSV leaves it alone
                    // inside a quoted field).
                    '\\' => buf.extend_from_slice(b"\\\\"),
                    _ => {
                        let mut utf8_buf = [0u8; 4];
                        buf.extend_from_slice(ch.encode_utf8(&mut utf8_buf).as_bytes());
                    }
                }
            }
            buf.extend_from_slice(b"\"\"");
        } else {
            buf.extend_from_slice(val.as_bytes());
        }
    }
    buf.push(b'}');
    buf.push(b'"');
}
572
573/// Convenience: create InsertParams from a JobArgs impl.
574pub fn params(args: &impl JobArgs) -> Result<InsertParams, AwaError> {
575    params_with(args, InsertOpts::default())
576}
577
578/// Convenience: create InsertParams from a JobArgs impl with options.
579pub fn params_with(args: &impl JobArgs, opts: InsertOpts) -> Result<InsertParams, AwaError> {
580    Ok(InsertParams {
581        kind: args.kind_str().to_string(),
582        args: args.to_args()?,
583        opts,
584    })
585}