//! awa_model/insert.rs — job insertion helpers (single, batched, and COPY-based bulk insert).

1use crate::error::AwaError;
2use crate::job::{InsertOpts, InsertParams, JobRow, JobState};
3use crate::unique::compute_unique_key;
4use crate::JobArgs;
5use sqlx::postgres::PgConnection;
6use sqlx::{PgExecutor, PgPool};
7
8/// Insert a job with default options.
9pub async fn insert<'e, E>(executor: E, args: &impl JobArgs) -> Result<JobRow, AwaError>
10where
11    E: PgExecutor<'e>,
12{
13    insert_with(executor, args, InsertOpts::default()).await
14}
15
/// Insert a job with custom options.
///
/// Derives the initial state from `opts.run_at` (`Scheduled` when set,
/// `Available` otherwise), computes the optional `unique_key` /
/// `unique_states` columns when `opts.unique` is present, inserts a single
/// row into `awa.jobs`, and returns the inserted row.
///
/// # Errors
///
/// Maps a Postgres unique-constraint violation (SQLSTATE 23505) to
/// [`AwaError::UniqueConflict`]; any other database failure becomes
/// [`AwaError::Database`]. Serialization errors from `args.to_args()` are
/// propagated via `?`.
#[tracing::instrument(skip(executor, args), fields(job.kind = args.kind_str(), job.queue = %opts.queue))]
pub async fn insert_with<'e, E>(
    executor: E,
    args: &impl JobArgs,
    opts: InsertOpts,
) -> Result<JobRow, AwaError>
where
    E: PgExecutor<'e>,
{
    let kind = args.kind_str();
    let args_json = args.to_args()?;

    // A job with an explicit run time starts out Scheduled; otherwise it is
    // immediately Available to workers.
    let state = if opts.run_at.is_some() {
        JobState::Scheduled
    } else {
        JobState::Available
    };

    // The uniqueness key covers the kind plus whichever dimensions the
    // caller opted into (queue, args, time period).
    let unique_key = opts.unique.as_ref().map(|u| {
        compute_unique_key(
            kind,
            if u.by_queue { Some(&opts.queue) } else { None },
            if u.by_args { Some(&args_json) } else { None },
            u.by_period,
        )
    });

    let unique_states_bits: Option<String> = opts.unique.as_ref().map(|u| {
        // Build a bit string where PG bit position N (leftmost = 0) corresponds
        // to Rust bit N (least-significant = 0). PostgreSQL's get_bit(bitmask, N)
        // reads from the left, so we place Rust bit 0 at the leftmost position.
        let mut bit_string = String::with_capacity(8);
        for bit_position in 0..8 {
            if u.states & (1 << bit_position) != 0 {
                bit_string.push('1');
            } else {
                bit_string.push('0');
            }
        }
        bit_string
    });

    // COALESCE($7, now()): a NULL run_at means "run immediately".
    // $11::bit(8): unique_states travels as a '0'/'1' text string and is
    // cast to bit(8) server-side.
    let row = sqlx::query_as::<_, JobRow>(
        r#"
        INSERT INTO awa.jobs (kind, queue, args, state, priority, max_attempts, run_at, metadata, tags, unique_key, unique_states)
        VALUES ($1, $2, $3, $4, $5, $6, COALESCE($7, now()), $8, $9, $10, $11::bit(8))
        RETURNING *
        "#,
    )
    .bind(kind)
    .bind(&opts.queue)
    .bind(&args_json)
    .bind(state)
    .bind(opts.priority)
    .bind(opts.max_attempts)
    .bind(opts.run_at)
    .bind(&opts.metadata)
    .bind(&opts.tags)
    .bind(&unique_key)
    .bind(&unique_states_bits)
    .fetch_one(executor)
    .await
    .map_err(|err| {
        if let sqlx::Error::Database(ref db_err) = err {
            // 23505 = unique_violation in Postgres.
            if db_err.code().as_deref() == Some("23505") {
                // Unique constraint violation. The conflicting row ID isn't
                // available from the PG error message directly — callers can
                // query by unique_key if they need it.
                return AwaError::UniqueConflict {
                    constraint: db_err
                        .constraint()
                        .map(|c| c.to_string()),
                };
            }
        }
        AwaError::Database(err)
    })?;

    Ok(row)
}
97
/// Pre-computed values for a single job row, shared by `insert_many` and `insert_many_copy`.
struct RowValues {
    /// Job kind identifier (from `InsertParams::kind`).
    kind: String,
    /// Target queue name.
    queue: String,
    /// Serialized job arguments (stored as JSONB).
    args: serde_json::Value,
    /// Initial state: `Scheduled` when `run_at` was provided, else `Available`.
    state: JobState,
    /// Scheduling priority (maps to a SMALLINT column).
    priority: i16,
    /// Maximum retry attempts (maps to a SMALLINT column).
    max_attempts: i16,
    /// Explicit run time; `None` means "run now" (COALESCEd to now() in SQL).
    run_at: Option<chrono::DateTime<chrono::Utc>>,
    /// Arbitrary caller metadata (stored as JSONB).
    metadata: serde_json::Value,
    /// Job tags (stored as text[]).
    tags: Vec<String>,
    /// Uniqueness hash from `compute_unique_key`, when `unique` opts are set.
    unique_key: Option<Vec<u8>>,
    /// 8-character '0'/'1' string destined for the bit(8) unique_states column.
    unique_states: Option<String>,
}
112
113/// Pre-compute all row values including unique keys from InsertParams.
114fn precompute_row_values(jobs: &[InsertParams]) -> Vec<RowValues> {
115    jobs.iter()
116        .map(|job| {
117            let unique_key = job.opts.unique.as_ref().map(|u| {
118                compute_unique_key(
119                    &job.kind,
120                    if u.by_queue {
121                        Some(job.opts.queue.as_str())
122                    } else {
123                        None
124                    },
125                    if u.by_args { Some(&job.args) } else { None },
126                    u.by_period,
127                )
128            });
129
130            let unique_states = job.opts.unique.as_ref().map(|u| {
131                let mut bit_string = String::with_capacity(8);
132                for bit_position in 0..8 {
133                    if u.states & (1 << bit_position) != 0 {
134                        bit_string.push('1');
135                    } else {
136                        bit_string.push('0');
137                    }
138                }
139                bit_string
140            });
141
142            RowValues {
143                kind: job.kind.clone(),
144                queue: job.opts.queue.clone(),
145                args: job.args.clone(),
146                state: if job.opts.run_at.is_some() {
147                    JobState::Scheduled
148                } else {
149                    JobState::Available
150                },
151                priority: job.opts.priority,
152                max_attempts: job.opts.max_attempts,
153                run_at: job.opts.run_at,
154                metadata: job.opts.metadata.clone(),
155                tags: job.opts.tags.clone(),
156                unique_key,
157                unique_states,
158            }
159        })
160        .collect()
161}
162
163/// Insert multiple jobs in a single statement.
164///
165/// Supports uniqueness constraints — jobs with `unique` opts will have their
166/// `unique_key` and `unique_states` computed and included.
167#[tracing::instrument(skip(executor, jobs), fields(job.count = jobs.len()))]
168pub async fn insert_many<'e, E>(executor: E, jobs: &[InsertParams]) -> Result<Vec<JobRow>, AwaError>
169where
170    E: PgExecutor<'e>,
171{
172    if jobs.is_empty() {
173        return Ok(Vec::new());
174    }
175
176    let count = jobs.len();
177    let rows = precompute_row_values(jobs);
178
179    // Build multi-row INSERT with all columns including unique_key/unique_states
180    let mut query = String::from(
181        "INSERT INTO awa.jobs (kind, queue, args, state, priority, max_attempts, run_at, metadata, tags, unique_key, unique_states) VALUES ",
182    );
183
184    let params_per_row = 11u32;
185    let mut param_index = 1u32;
186    for i in 0..count {
187        if i > 0 {
188            query.push_str(", ");
189        }
190        query.push_str(&format!(
191            "(${}, ${}, ${}, ${}, ${}, ${}, COALESCE(${}, now()), ${}, ${}, ${}, ${}::bit(8))",
192            param_index,
193            param_index + 1,
194            param_index + 2,
195            param_index + 3,
196            param_index + 4,
197            param_index + 5,
198            param_index + 6,
199            param_index + 7,
200            param_index + 8,
201            param_index + 9,
202            param_index + 10,
203        ));
204        param_index += params_per_row;
205    }
206    query.push_str(" RETURNING *");
207
208    let mut sql_query = sqlx::query_as::<_, JobRow>(&query);
209
210    for row in &rows {
211        sql_query = sql_query
212            .bind(&row.kind)
213            .bind(&row.queue)
214            .bind(&row.args)
215            .bind(row.state)
216            .bind(row.priority)
217            .bind(row.max_attempts)
218            .bind(row.run_at)
219            .bind(&row.metadata)
220            .bind(&row.tags)
221            .bind(&row.unique_key)
222            .bind(&row.unique_states);
223    }
224
225    let results = sql_query.fetch_all(executor).await?;
226
227    Ok(results)
228}
229
/// Insert many jobs using COPY for high throughput.
///
/// Uses a temp staging table with no constraints for fast COPY ingestion,
/// then INSERT...SELECT into `awa.jobs` with ON CONFLICT DO NOTHING for
/// unique jobs. Accepts `&mut PgConnection` so callers can use pool
/// connections or transactions (Transaction derefs to PgConnection).
///
/// NOTE(review): the staging table is `ON COMMIT DROP`, so callers should
/// run this inside a transaction for automatic cleanup (see
/// `insert_many_copy_from_pool`) — confirm behavior when called outside one.
#[tracing::instrument(skip(conn, jobs), fields(job.count = jobs.len()))]
pub async fn insert_many_copy(
    conn: &mut PgConnection,
    jobs: &[InsertParams],
) -> Result<Vec<JobRow>, AwaError> {
    if jobs.is_empty() {
        return Ok(Vec::new());
    }

    let rows = precompute_row_values(jobs);

    // 1. Create temp staging table (dropped on transaction commit)
    // Columns whose real types need server-side parsing (timestamptz, bytea,
    // text[], bit(8), the job_state enum) are staged as TEXT and cast in the
    // INSERT...SELECT below.
    sqlx::query(
        r#"
        CREATE TEMP TABLE awa_copy_staging (
            kind        TEXT NOT NULL,
            queue       TEXT NOT NULL,
            args        JSONB NOT NULL,
            state       TEXT NOT NULL,
            priority    SMALLINT NOT NULL,
            max_attempts SMALLINT NOT NULL,
            run_at      TEXT,
            metadata    JSONB NOT NULL,
            tags        TEXT NOT NULL,
            unique_key  TEXT,
            unique_states TEXT
        ) ON COMMIT DROP
        "#,
    )
    .execute(&mut *conn)
    .await?;

    // 2. COPY data into staging table via CSV
    // Each RowValues becomes one CSV line; NULLs are the unquoted \N marker
    // matching the COPY statement's NULL option.
    let mut csv_buf = Vec::with_capacity(rows.len() * 256);
    for row in &rows {
        write_csv_row(&mut csv_buf, row);
    }

    let mut copy_in = conn
        .copy_in_raw(
            "COPY awa_copy_staging (kind, queue, args, state, priority, max_attempts, run_at, metadata, tags, unique_key, unique_states) FROM STDIN WITH (FORMAT csv, NULL '\\N')",
        )
        .await?;
    copy_in.send(csv_buf).await?;
    copy_in.finish().await?;

    // 3. INSERT...SELECT from staging into real table
    // Only take the ON CONFLICT path when at least one row carries a unique
    // key; plain batches skip conflict arbitration entirely.
    let has_unique = rows.iter().any(|r| r.unique_key.is_some());

    // In both branches the '= '\N'' comparisons are defensive: COPY's NULL
    // option already turns unquoted \N fields into SQL NULL, so the literal
    // match only fires if a staged value is the two characters backslash-N.
    // NOTE(review): the ON CONFLICT (unique_key) WHERE ... clause presumably
    // matches a partial unique index on awa.jobs — verify against the schema
    // migration that defines it.
    let insert_sql = if has_unique {
        r#"
        INSERT INTO awa.jobs (kind, queue, args, state, priority, max_attempts, run_at, metadata, tags, unique_key, unique_states)
        SELECT
            s.kind,
            s.queue,
            s.args,
            s.state::awa.job_state,
            s.priority,
            s.max_attempts,
            CASE WHEN s.run_at = '\N' OR s.run_at IS NULL THEN now() ELSE s.run_at::timestamptz END,
            s.metadata,
            s.tags::text[],
            CASE WHEN s.unique_key = '\N' OR s.unique_key IS NULL THEN NULL ELSE decode(s.unique_key, 'hex') END,
            CASE WHEN s.unique_states = '\N' OR s.unique_states IS NULL THEN NULL ELSE s.unique_states::bit(8) END
        FROM awa_copy_staging s
        ON CONFLICT (unique_key) WHERE unique_key IS NOT NULL
            AND unique_states IS NOT NULL
            AND awa.job_state_in_bitmask(unique_states, state)
        DO NOTHING
        RETURNING *
        "#
    } else {
        r#"
        INSERT INTO awa.jobs (kind, queue, args, state, priority, max_attempts, run_at, metadata, tags, unique_key, unique_states)
        SELECT
            s.kind,
            s.queue,
            s.args,
            s.state::awa.job_state,
            s.priority,
            s.max_attempts,
            CASE WHEN s.run_at = '\N' OR s.run_at IS NULL THEN now() ELSE s.run_at::timestamptz END,
            s.metadata,
            s.tags::text[],
            CASE WHEN s.unique_key = '\N' OR s.unique_key IS NULL THEN NULL ELSE decode(s.unique_key, 'hex') END,
            CASE WHEN s.unique_states = '\N' OR s.unique_states IS NULL THEN NULL ELSE s.unique_states::bit(8) END
        FROM awa_copy_staging s
        RETURNING *
        "#
    };

    // RETURNING * excludes rows skipped by DO NOTHING, so the result may be
    // shorter than `jobs` when unique conflicts occurred.
    let results = sqlx::query_as::<_, JobRow>(insert_sql)
        .fetch_all(&mut *conn)
        .await?;

    Ok(results)
}
333
334/// Convenience wrapper that acquires a connection from the pool.
335///
336/// Wraps the operation in a transaction so the ON COMMIT DROP staging table
337/// is cleaned up automatically.
338#[tracing::instrument(skip(pool, jobs), fields(job.count = jobs.len()))]
339pub async fn insert_many_copy_from_pool(
340    pool: &PgPool,
341    jobs: &[InsertParams],
342) -> Result<Vec<JobRow>, AwaError> {
343    if jobs.is_empty() {
344        return Ok(Vec::new());
345    }
346
347    let mut tx = pool.begin().await?;
348    let results = insert_many_copy(&mut tx, jobs).await?;
349    tx.commit().await?;
350
351    Ok(results)
352}
353
354// ── CSV serialization helpers ────────────────────────────────────────
355
356/// Write one RowValues as a CSV line to the buffer.
357fn write_csv_row(buf: &mut Vec<u8>, row: &RowValues) {
358    // kind
359    write_csv_field(buf, &row.kind);
360    buf.push(b',');
361    // queue
362    write_csv_field(buf, &row.queue);
363    buf.push(b',');
364    // args (JSONB as text)
365    let args_str = serde_json::to_string(&row.args).expect("JSON serialization should not fail");
366    write_csv_field(buf, &args_str);
367    buf.push(b',');
368    // state
369    write_csv_field(buf, &row.state.to_string());
370    buf.push(b',');
371    // priority
372    buf.extend_from_slice(row.priority.to_string().as_bytes());
373    buf.push(b',');
374    // max_attempts
375    buf.extend_from_slice(row.max_attempts.to_string().as_bytes());
376    buf.push(b',');
377    // run_at (TIMESTAMPTZ as RFC 3339, or \N for NULL)
378    match &row.run_at {
379        Some(dt) => write_csv_field(buf, &dt.to_rfc3339()),
380        None => buf.extend_from_slice(b"\\N"),
381    }
382    buf.push(b',');
383    // metadata (JSONB as text)
384    let metadata_str =
385        serde_json::to_string(&row.metadata).expect("JSON serialization should not fail");
386    write_csv_field(buf, &metadata_str);
387    buf.push(b',');
388    // tags (Postgres text[] literal)
389    write_pg_text_array(buf, &row.tags);
390    buf.push(b',');
391    // unique_key (hex-encoded bytes, or \N for NULL)
392    match &row.unique_key {
393        Some(key) => {
394            // Encode as hex string (no \x prefix — we use decode(hex) in SQL)
395            let hex = hex::encode(key);
396            write_csv_field(buf, &hex);
397        }
398        None => buf.extend_from_slice(b"\\N"),
399    }
400    buf.push(b',');
401    // unique_states (bit string, or \N for NULL)
402    match &row.unique_states {
403        Some(bits) => write_csv_field(buf, bits),
404        None => buf.extend_from_slice(b"\\N"),
405    }
406    buf.push(b'\n');
407}
408
/// Write a single CSV field, quoting it when it contains special characters.
///
/// Quoting triggers on commas, quotes, CR/LF — and also backslashes, which
/// matches the `NULL '\N'` COPY option in `insert_many_copy`: a quoted CSV
/// field is never treated as NULL, so a literal `\N` value survives.
fn write_csv_field(buf: &mut Vec<u8>, value: &str) {
    let needs_quotes = value
        .bytes()
        .any(|b| matches!(b, b',' | b'"' | b'\n' | b'\r' | b'\\'));

    if !needs_quotes {
        buf.extend_from_slice(value.as_bytes());
        return;
    }

    buf.push(b'"');
    for byte in value.bytes() {
        if byte == b'"' {
            // CSV escapes an embedded quote by doubling it.
            buf.extend_from_slice(b"\"\"");
        } else {
            buf.push(byte);
        }
    }
    buf.push(b'"');
}
429
/// Write a Postgres `text[]` array literal (e.g. `{a,"b c"}`) as one CSV field.
///
/// The whole literal is CSV-quoted because it always contains braces. Two
/// escaping layers apply to each element:
///
/// 1. Postgres array syntax: elements with special characters are wrapped in
///    `"`, with embedded `"` escaped as `\"` and `\` as `\\`.
/// 2. CSV quoting: inside a CSV-quoted field every literal `"` is doubled.
///
/// Combined, a Postgres element quote is emitted as `""` (CSV-doubled `"`)
/// and an embedded quote as `\""` (Postgres `\"` with the `"` CSV-doubled);
/// backslashes are not special inside CSV quotes, so `\\` passes through.
///
/// Fix over the previous version: elements are now quoted on ANY whitespace,
/// not just the space character — Postgres trims or misparses unquoted
/// whitespace (tabs, newlines) in array elements, so tags containing them
/// would previously have round-tripped incorrectly.
fn write_pg_text_array(buf: &mut Vec<u8>, values: &[String]) {
    buf.push(b'"'); // open CSV quote
    buf.push(b'{');
    for (i, val) in values.iter().enumerate() {
        if i > 0 {
            buf.push(b',');
        }
        // Quote anything Postgres could misparse unquoted: empty strings,
        // the word NULL, array delimiters, quotes/backslashes, whitespace.
        let needs_quotes = val.is_empty()
            || val.eq_ignore_ascii_case("NULL")
            || val
                .chars()
                .any(|c| matches!(c, ',' | '"' | '\\' | '{' | '}') || c.is_whitespace());
        if needs_quotes {
            buf.extend_from_slice(b"\"\""); // opening PG element quote, CSV-doubled
            for ch in val.chars() {
                match ch {
                    // PG escape \" with the quote CSV-doubled → \""
                    '"' => buf.extend_from_slice(b"\\\"\""),
                    // PG escape \\ (backslash needs no CSV escaping)
                    '\\' => buf.extend_from_slice(b"\\\\"),
                    _ => {
                        let mut utf8 = [0u8; 4];
                        buf.extend_from_slice(ch.encode_utf8(&mut utf8).as_bytes());
                    }
                }
            }
            buf.extend_from_slice(b"\"\""); // closing PG element quote, CSV-doubled
        } else {
            buf.extend_from_slice(val.as_bytes());
        }
    }
    buf.push(b'}');
    buf.push(b'"'); // close CSV quote
}
485
486/// Convenience: create InsertParams from a JobArgs impl.
487pub fn params(args: &impl JobArgs) -> Result<InsertParams, AwaError> {
488    params_with(args, InsertOpts::default())
489}
490
491/// Convenience: create InsertParams from a JobArgs impl with options.
492pub fn params_with(args: &impl JobArgs, opts: InsertOpts) -> Result<InsertParams, AwaError> {
493    Ok(InsertParams {
494        kind: args.kind_str().to_string(),
495        args: args.to_args()?,
496        opts,
497    })
498}