Skip to main content

awa_model/
insert.rs

1use crate::error::AwaError;
2use crate::job::{InsertOpts, InsertParams, JobRow, JobState};
3use crate::unique::compute_unique_key;
4use crate::JobArgs;
5use sqlx::PgExecutor;
6
7/// Insert a job with default options.
8pub async fn insert<'e, E>(executor: E, args: &impl JobArgs) -> Result<JobRow, AwaError>
9where
10    E: PgExecutor<'e>,
11{
12    insert_with(executor, args, InsertOpts::default()).await
13}
14
15/// Insert a job with custom options.
16#[tracing::instrument(skip(executor, args), fields(job.kind = args.kind_str(), job.queue = %opts.queue))]
17pub async fn insert_with<'e, E>(
18    executor: E,
19    args: &impl JobArgs,
20    opts: InsertOpts,
21) -> Result<JobRow, AwaError>
22where
23    E: PgExecutor<'e>,
24{
25    let kind = args.kind_str();
26    let args_json = args.to_args()?;
27
28    let state = if opts.run_at.is_some() {
29        JobState::Scheduled
30    } else {
31        JobState::Available
32    };
33
34    let unique_key = opts.unique.as_ref().map(|u| {
35        compute_unique_key(
36            kind,
37            if u.by_queue { Some(&opts.queue) } else { None },
38            if u.by_args { Some(&args_json) } else { None },
39            u.by_period,
40        )
41    });
42
43    let unique_states_bits: Option<String> = opts.unique.as_ref().map(|u| {
44        // Build a bit string where PG bit position N (leftmost = 0) corresponds
45        // to Rust bit N (least-significant = 0). PostgreSQL's get_bit(bitmask, N)
46        // reads from the left, so we place Rust bit 0 at the leftmost position.
47        let mut bit_string = String::with_capacity(8);
48        for bit_position in 0..8 {
49            if u.states & (1 << bit_position) != 0 {
50                bit_string.push('1');
51            } else {
52                bit_string.push('0');
53            }
54        }
55        bit_string
56    });
57
58    let row = sqlx::query_as::<_, JobRow>(
59        r#"
60        INSERT INTO awa.jobs (kind, queue, args, state, priority, max_attempts, run_at, metadata, tags, unique_key, unique_states)
61        VALUES ($1, $2, $3, $4, $5, $6, COALESCE($7, now()), $8, $9, $10, $11::bit(8))
62        RETURNING *
63        "#,
64    )
65    .bind(kind)
66    .bind(&opts.queue)
67    .bind(&args_json)
68    .bind(state)
69    .bind(opts.priority)
70    .bind(opts.max_attempts)
71    .bind(opts.run_at)
72    .bind(&opts.metadata)
73    .bind(&opts.tags)
74    .bind(&unique_key)
75    .bind(&unique_states_bits)
76    .fetch_one(executor)
77    .await
78    .map_err(|err| {
79        if let sqlx::Error::Database(ref db_err) = err {
80            if db_err.code().as_deref() == Some("23505") {
81                // Unique constraint violation. The conflicting row ID isn't
82                // available from the PG error message directly — callers can
83                // query by unique_key if they need it.
84                return AwaError::UniqueConflict {
85                    existing_id: db_err
86                        .constraint()
87                        .map(|c| c.to_string()),
88                };
89            }
90        }
91        AwaError::Database(err)
92    })?;
93
94    Ok(row)
95}
96
97/// Insert multiple jobs in a single statement.
98///
99/// Supports uniqueness constraints — jobs with `unique` opts will have their
100/// `unique_key` and `unique_states` computed and included.
101#[tracing::instrument(skip(executor, jobs), fields(job.count = jobs.len()))]
102pub async fn insert_many<'e, E>(executor: E, jobs: &[InsertParams]) -> Result<Vec<JobRow>, AwaError>
103where
104    E: PgExecutor<'e>,
105{
106    if jobs.is_empty() {
107        return Ok(Vec::new());
108    }
109
110    let count = jobs.len();
111
112    // Pre-compute all values including unique keys
113    struct RowValues {
114        kind: String,
115        queue: String,
116        args: serde_json::Value,
117        state: JobState,
118        priority: i16,
119        max_attempts: i16,
120        run_at: Option<chrono::DateTime<chrono::Utc>>,
121        metadata: serde_json::Value,
122        tags: Vec<String>,
123        unique_key: Option<Vec<u8>>,
124        unique_states: Option<String>,
125    }
126
127    let rows: Vec<RowValues> = jobs
128        .iter()
129        .map(|job| {
130            let unique_key = job.opts.unique.as_ref().map(|u| {
131                compute_unique_key(
132                    &job.kind,
133                    if u.by_queue {
134                        Some(job.opts.queue.as_str())
135                    } else {
136                        None
137                    },
138                    if u.by_args { Some(&job.args) } else { None },
139                    u.by_period,
140                )
141            });
142
143            let unique_states = job.opts.unique.as_ref().map(|u| {
144                let mut bit_string = String::with_capacity(8);
145                for bit_position in 0..8 {
146                    if u.states & (1 << bit_position) != 0 {
147                        bit_string.push('1');
148                    } else {
149                        bit_string.push('0');
150                    }
151                }
152                bit_string
153            });
154
155            RowValues {
156                kind: job.kind.clone(),
157                queue: job.opts.queue.clone(),
158                args: job.args.clone(),
159                state: if job.opts.run_at.is_some() {
160                    JobState::Scheduled
161                } else {
162                    JobState::Available
163                },
164                priority: job.opts.priority,
165                max_attempts: job.opts.max_attempts,
166                run_at: job.opts.run_at,
167                metadata: job.opts.metadata.clone(),
168                tags: job.opts.tags.clone(),
169                unique_key,
170                unique_states,
171            }
172        })
173        .collect();
174
175    // Build multi-row INSERT with all columns including unique_key/unique_states
176    let mut query = String::from(
177        "INSERT INTO awa.jobs (kind, queue, args, state, priority, max_attempts, run_at, metadata, tags, unique_key, unique_states) VALUES ",
178    );
179
180    let params_per_row = 11u32;
181    let mut param_index = 1u32;
182    for i in 0..count {
183        if i > 0 {
184            query.push_str(", ");
185        }
186        query.push_str(&format!(
187            "(${}, ${}, ${}, ${}, ${}, ${}, COALESCE(${}, now()), ${}, ${}, ${}, ${}::bit(8))",
188            param_index,
189            param_index + 1,
190            param_index + 2,
191            param_index + 3,
192            param_index + 4,
193            param_index + 5,
194            param_index + 6,
195            param_index + 7,
196            param_index + 8,
197            param_index + 9,
198            param_index + 10,
199        ));
200        param_index += params_per_row;
201    }
202    query.push_str(" RETURNING *");
203
204    let mut sql_query = sqlx::query_as::<_, JobRow>(&query);
205
206    for row in &rows {
207        sql_query = sql_query
208            .bind(&row.kind)
209            .bind(&row.queue)
210            .bind(&row.args)
211            .bind(row.state)
212            .bind(row.priority)
213            .bind(row.max_attempts)
214            .bind(row.run_at)
215            .bind(&row.metadata)
216            .bind(&row.tags)
217            .bind(&row.unique_key)
218            .bind(&row.unique_states);
219    }
220
221    let results = sql_query.fetch_all(executor).await?;
222
223    Ok(results)
224}
225
226/// Convenience: create InsertParams from a JobArgs impl.
227pub fn params(args: &impl JobArgs) -> Result<InsertParams, AwaError> {
228    params_with(args, InsertOpts::default())
229}
230
231/// Convenience: create InsertParams from a JobArgs impl with options.
232pub fn params_with(args: &impl JobArgs, opts: InsertOpts) -> Result<InsertParams, AwaError> {
233    Ok(InsertParams {
234        kind: args.kind_str().to_string(),
235        args: args.to_args()?,
236        opts,
237    })
238}