1use crate::error::AwaError;
2use crate::job::{InsertOpts, InsertParams, JobRow, JobState};
3use crate::unique::compute_unique_key;
4use crate::JobArgs;
5use sqlx::PgExecutor;
6
7pub async fn insert<'e, E>(executor: E, args: &impl JobArgs) -> Result<JobRow, AwaError>
9where
10 E: PgExecutor<'e>,
11{
12 insert_with(executor, args, InsertOpts::default()).await
13}
14
15#[tracing::instrument(skip(executor, args), fields(job.kind = args.kind_str(), job.queue = %opts.queue))]
17pub async fn insert_with<'e, E>(
18 executor: E,
19 args: &impl JobArgs,
20 opts: InsertOpts,
21) -> Result<JobRow, AwaError>
22where
23 E: PgExecutor<'e>,
24{
25 let kind = args.kind_str();
26 let args_json = args.to_args()?;
27
28 let state = if opts.run_at.is_some() {
29 JobState::Scheduled
30 } else {
31 JobState::Available
32 };
33
34 let unique_key = opts.unique.as_ref().map(|u| {
35 compute_unique_key(
36 kind,
37 if u.by_queue { Some(&opts.queue) } else { None },
38 if u.by_args { Some(&args_json) } else { None },
39 u.by_period,
40 )
41 });
42
43 let unique_states_bits: Option<String> = opts.unique.as_ref().map(|u| {
44 let mut bit_string = String::with_capacity(8);
48 for bit_position in 0..8 {
49 if u.states & (1 << bit_position) != 0 {
50 bit_string.push('1');
51 } else {
52 bit_string.push('0');
53 }
54 }
55 bit_string
56 });
57
58 let row = sqlx::query_as::<_, JobRow>(
59 r#"
60 INSERT INTO awa.jobs (kind, queue, args, state, priority, max_attempts, run_at, metadata, tags, unique_key, unique_states)
61 VALUES ($1, $2, $3, $4, $5, $6, COALESCE($7, now()), $8, $9, $10, $11::bit(8))
62 RETURNING *
63 "#,
64 )
65 .bind(kind)
66 .bind(&opts.queue)
67 .bind(&args_json)
68 .bind(state)
69 .bind(opts.priority)
70 .bind(opts.max_attempts)
71 .bind(opts.run_at)
72 .bind(&opts.metadata)
73 .bind(&opts.tags)
74 .bind(&unique_key)
75 .bind(&unique_states_bits)
76 .fetch_one(executor)
77 .await
78 .map_err(|err| {
79 if let sqlx::Error::Database(ref db_err) = err {
80 if db_err.code().as_deref() == Some("23505") {
81 return AwaError::UniqueConflict {
85 existing_id: db_err
86 .constraint()
87 .map(|c| c.to_string()),
88 };
89 }
90 }
91 AwaError::Database(err)
92 })?;
93
94 Ok(row)
95}
96
97#[tracing::instrument(skip(executor, jobs), fields(job.count = jobs.len()))]
102pub async fn insert_many<'e, E>(executor: E, jobs: &[InsertParams]) -> Result<Vec<JobRow>, AwaError>
103where
104 E: PgExecutor<'e>,
105{
106 if jobs.is_empty() {
107 return Ok(Vec::new());
108 }
109
110 let count = jobs.len();
111
112 struct RowValues {
114 kind: String,
115 queue: String,
116 args: serde_json::Value,
117 state: JobState,
118 priority: i16,
119 max_attempts: i16,
120 run_at: Option<chrono::DateTime<chrono::Utc>>,
121 metadata: serde_json::Value,
122 tags: Vec<String>,
123 unique_key: Option<Vec<u8>>,
124 unique_states: Option<String>,
125 }
126
127 let rows: Vec<RowValues> = jobs
128 .iter()
129 .map(|job| {
130 let unique_key = job.opts.unique.as_ref().map(|u| {
131 compute_unique_key(
132 &job.kind,
133 if u.by_queue {
134 Some(job.opts.queue.as_str())
135 } else {
136 None
137 },
138 if u.by_args { Some(&job.args) } else { None },
139 u.by_period,
140 )
141 });
142
143 let unique_states = job.opts.unique.as_ref().map(|u| {
144 let mut bit_string = String::with_capacity(8);
145 for bit_position in 0..8 {
146 if u.states & (1 << bit_position) != 0 {
147 bit_string.push('1');
148 } else {
149 bit_string.push('0');
150 }
151 }
152 bit_string
153 });
154
155 RowValues {
156 kind: job.kind.clone(),
157 queue: job.opts.queue.clone(),
158 args: job.args.clone(),
159 state: if job.opts.run_at.is_some() {
160 JobState::Scheduled
161 } else {
162 JobState::Available
163 },
164 priority: job.opts.priority,
165 max_attempts: job.opts.max_attempts,
166 run_at: job.opts.run_at,
167 metadata: job.opts.metadata.clone(),
168 tags: job.opts.tags.clone(),
169 unique_key,
170 unique_states,
171 }
172 })
173 .collect();
174
175 let mut query = String::from(
177 "INSERT INTO awa.jobs (kind, queue, args, state, priority, max_attempts, run_at, metadata, tags, unique_key, unique_states) VALUES ",
178 );
179
180 let params_per_row = 11u32;
181 let mut param_index = 1u32;
182 for i in 0..count {
183 if i > 0 {
184 query.push_str(", ");
185 }
186 query.push_str(&format!(
187 "(${}, ${}, ${}, ${}, ${}, ${}, COALESCE(${}, now()), ${}, ${}, ${}, ${}::bit(8))",
188 param_index,
189 param_index + 1,
190 param_index + 2,
191 param_index + 3,
192 param_index + 4,
193 param_index + 5,
194 param_index + 6,
195 param_index + 7,
196 param_index + 8,
197 param_index + 9,
198 param_index + 10,
199 ));
200 param_index += params_per_row;
201 }
202 query.push_str(" RETURNING *");
203
204 let mut sql_query = sqlx::query_as::<_, JobRow>(&query);
205
206 for row in &rows {
207 sql_query = sql_query
208 .bind(&row.kind)
209 .bind(&row.queue)
210 .bind(&row.args)
211 .bind(row.state)
212 .bind(row.priority)
213 .bind(row.max_attempts)
214 .bind(row.run_at)
215 .bind(&row.metadata)
216 .bind(&row.tags)
217 .bind(&row.unique_key)
218 .bind(&row.unique_states);
219 }
220
221 let results = sql_query.fetch_all(executor).await?;
222
223 Ok(results)
224}
225
226pub fn params(args: &impl JobArgs) -> Result<InsertParams, AwaError> {
228 params_with(args, InsertOpts::default())
229}
230
231pub fn params_with(args: &impl JobArgs, opts: InsertOpts) -> Result<InsertParams, AwaError> {
233 Ok(InsertParams {
234 kind: args.kind_str().to_string(),
235 args: args.to_args()?,
236 opts,
237 })
238}