1use chrono::{DateTime, Utc};
2use serde_json::Value;
3use sqlx::{Executor, Postgres};
4use stormchaser_model::step::StepStatus;
5use stormchaser_model::RunId;
6use stormchaser_model::StepInstanceId;
7
8#[allow(clippy::too_many_arguments)]
9pub async fn complete_step_instance<'a, E>(
11 executor: E,
12 status: &StepStatus,
13 exit_code: Option<i32>,
14 runner_id: Option<&str>,
15 id: StepInstanceId,
16) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
17where
18 E: Executor<'a, Database = Postgres>,
19{
20 sqlx::query(
21 r#"
22 UPDATE step_instances
23 SET status = $1, finished_at = NOW(), exit_code = $2, runner_id = COALESCE($3, runner_id)
24 WHERE id = $4
25 "#,
26 )
27 .bind(status)
28 .bind(exit_code)
29 .bind(runner_id)
30 .bind(id)
31 .execute(executor)
32 .await
33}
34
35#[allow(clippy::too_many_arguments)]
36pub async fn get_step_instances_by_run_id<'a, E, O>(
38 executor: E,
39 run_id: RunId,
40) -> Result<Vec<O>, sqlx::Error>
41where
42 E: Executor<'a, Database = Postgres>,
43 O: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
44{
45 sqlx::query_as::<_, O>(
46 r#"SELECT id, run_id, step_name, step_type, status as "status", iteration_index, runner_id, affinity_context, started_at, finished_at, exit_code, error, spec, params, created_at FROM step_instances WHERE run_id = $1"#,
47 )
48 .bind(run_id)
49 .fetch_all(executor)
50 .await
51}
52
53#[allow(clippy::too_many_arguments)]
54pub async fn update_step_instance_status<'a, E>(
56 executor: E,
57 status: &StepStatus,
58 id: StepInstanceId,
59) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
60where
61 E: Executor<'a, Database = Postgres>,
62{
63 sqlx::query("UPDATE step_instances SET status = $1 WHERE id = $2")
64 .bind(status)
65 .bind(id)
66 .execute(executor)
67 .await
68}
69
70#[allow(clippy::too_many_arguments)]
71pub async fn get_step_spec_and_params<'a, E, O>(
73 executor: E,
74 id: StepInstanceId,
75) -> Result<O, sqlx::Error>
76where
77 E: Executor<'a, Database = Postgres>,
78 O: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
79{
80 sqlx::query_as::<_, O>("SELECT spec, params FROM step_instances WHERE id = $1")
81 .bind(id)
82 .fetch_one(executor)
83 .await
84}
85
86#[allow(clippy::too_many_arguments)]
87pub async fn fail_step_instance_with_error<'a, E>(
89 executor: E,
90 status: StepStatus,
91 error: &str,
92 exit_code: Option<i32>,
93 id: StepInstanceId,
94) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
95where
96 E: Executor<'a, Database = Postgres>,
97{
98 sqlx::query(
99 r#"
100 UPDATE step_instances
101 SET status = $1, finished_at = NOW(), error = $2, exit_code = $3
102 WHERE id = $4
103 "#,
104 )
105 .bind(status)
106 .bind(error)
107 .bind(exit_code)
108 .bind(id)
109 .execute(executor)
110 .await
111}
112
113pub async fn record_step_status_history<'a, E>(
115 executor: E,
116 step_instance_id: StepInstanceId,
117 status: &StepStatus,
118) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
119where
120 E: Executor<'a, Database = Postgres>,
121{
122 sqlx::query("INSERT INTO step_status_history (step_instance_id, status) VALUES ($1, $2)")
123 .bind(step_instance_id)
124 .bind(status)
125 .execute(executor)
126 .await
127}
128
129#[allow(clippy::too_many_arguments)]
130pub async fn insert_step_instance<'a, E>(
132 executor: E,
133 id: StepInstanceId,
134 run_id: RunId,
135 step_name: &str,
136 step_type: &str,
137 status: StepStatus,
138 created_at: DateTime<Utc>,
139) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
140where
141 E: Executor<'a, Database = Postgres>,
142{
143 sqlx::query(
144 r#"
145 WITH inserted AS (
146 INSERT INTO step_instances (id, run_id, step_name, step_type, status, created_at)
147 VALUES ($1, $2, $3, $4, $5, $6)
148 ON CONFLICT DO NOTHING
149 RETURNING id
150 )
151 INSERT INTO step_status_history (step_instance_id, status)
152 SELECT id, $5 FROM inserted
153 "#,
154 )
155 .bind(id)
156 .bind(run_id)
157 .bind(step_name)
158 .bind(step_type)
159 .bind(status)
160 .bind(created_at)
161 .execute(executor)
162 .await
163}
164
165#[allow(clippy::too_many_arguments)]
166pub async fn count_running_steps_for_run<'a, E, O>(
168 executor: E,
169 run_id: RunId,
170) -> Result<O, sqlx::Error>
171where
172 E: Executor<'a, Database = Postgres>,
173 O: Send + Unpin,
174 (O,): for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
175{
176 sqlx::query_scalar::<_, O>(
177 r#"SELECT COUNT(*) FROM step_instances WHERE run_id = $1 AND status = 'running'"#,
178 )
179 .bind(run_id)
180 .fetch_one(executor)
181 .await
182}
183
184#[allow(clippy::too_many_arguments)]
185pub async fn insert_step_instance_with_spec<'a, E>(
187 executor: E,
188 id: StepInstanceId,
189 run_id: RunId,
190 step_name: &str,
191 step_type: &str,
192 status: StepStatus,
193 iteration_index: Option<i32>,
194 spec: Value,
195 params: Value,
196 created_at: DateTime<Utc>,
197) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
198where
199 E: Executor<'a, Database = Postgres>,
200{
201 sqlx::query(
202 r#"
203 WITH inserted AS (
204 INSERT INTO step_instances (id, run_id, step_name, step_type, status, iteration_index, spec, params, created_at)
205 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
206 RETURNING id
207 )
208 INSERT INTO step_status_history (step_instance_id, status)
209 SELECT id, $5 FROM inserted
210 "#,
211 )
212 .bind(id)
213 .bind(run_id)
214 .bind(step_name)
215 .bind(step_type)
216 .bind(status)
217 .bind(iteration_index)
218 .bind(spec)
219 .bind(params)
220 .bind(created_at)
221 .execute(executor)
222 .await
223}
224
225#[allow(clippy::too_many_arguments)]
226pub async fn insert_step_instance_with_spec_on_conflict_do_nothing<'a, E>(
228 executor: E,
229 id: StepInstanceId,
230 run_id: RunId,
231 step_name: &str,
232 step_type: &str,
233 status: StepStatus,
234 iteration_index: Option<i32>,
235 spec: Value,
236 params: Value,
237 created_at: DateTime<Utc>,
238) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
239where
240 E: Executor<'a, Database = Postgres>,
241{
242 sqlx::query(
243 r#"
244 WITH inserted AS (
245 INSERT INTO step_instances (id, run_id, step_name, step_type, status, iteration_index, spec, params, created_at)
246 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
247 ON CONFLICT DO NOTHING
248 RETURNING id
249 )
250 INSERT INTO step_status_history (step_instance_id, status)
251 SELECT id, $5 FROM inserted
252 "#,
253 )
254 .bind(id)
255 .bind(run_id)
256 .bind(step_name)
257 .bind(step_type)
258 .bind(status)
259 .bind(iteration_index)
260 .bind(spec)
261 .bind(params)
262 .bind(created_at)
263 .execute(executor)
264 .await
265}
266
267#[allow(clippy::too_many_arguments)]
268pub async fn get_pending_step_instances_for_run<'a, E, O>(
270 executor: E,
271 run_id: RunId,
272 limit: i64,
273) -> Result<Vec<O>, sqlx::Error>
274where
275 E: Executor<'a, Database = Postgres>,
276 O: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
277{
278 sqlx::query_as::<_, O>(
279 r#"
280 SELECT id, run_id, step_name, step_type, status as "status", iteration_index, runner_id, affinity_context, started_at, finished_at, exit_code, error, spec, params, created_at
281 FROM step_instances
282 WHERE run_id = $1 AND status = 'pending' AND step_type NOT IN ('Approval', 'Wait')
283 ORDER BY created_at ASC
284 LIMIT $2
285 "#
286 )
287 .bind(run_id)
288 .bind(limit)
289 .fetch_all(executor)
290 .await
291}
292
293#[allow(clippy::too_many_arguments)]
294pub async fn get_step_instance_by_id<'a, E, O>(
296 executor: E,
297 id: StepInstanceId,
298) -> Result<Option<O>, sqlx::Error>
299where
300 E: Executor<'a, Database = Postgres>,
301 O: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
302{
303 sqlx::query_as::<_, O>(
304 r#"SELECT id, run_id, step_name, step_type, status as "status", iteration_index, runner_id, affinity_context, started_at, finished_at, exit_code, error, spec, params, created_at FROM step_instances WHERE id = $1"#
305 )
306 .bind(id)
307 .fetch_optional(executor)
308 .await
309}
310
311#[allow(clippy::too_many_arguments)]
312pub async fn fail_pending_steps_for_run_on_timeout<'a, E>(
314 executor: E,
315 run_id: RunId,
316) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
317where
318 E: Executor<'a, Database = Postgres>,
319{
320 sqlx::query(
321 r#"
322 UPDATE step_instances
323 SET status = 'failed', error = 'Workflow timed out', finished_at = NOW()
324 WHERE run_id = $1 AND status IN ('pending', 'running', 'waiting_for_event')
325 "#,
326 )
327 .bind(run_id)
328 .execute(executor)
329 .await
330}
331
332#[allow(clippy::too_many_arguments)]
333pub async fn update_step_instance_running<'a, E>(
335 executor: E,
336 status: &StepStatus,
337 runner_id: Option<&str>,
338 id: StepInstanceId,
339) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
340where
341 E: Executor<'a, Database = Postgres>,
342{
343 sqlx::query(
344 "UPDATE step_instances SET status = $1, started_at = COALESCE(started_at, NOW()), runner_id = COALESCE($2, runner_id) WHERE id = $3"
345 )
346 .bind(status)
347 .bind(runner_id)
348 .bind(id)
349 .execute(executor)
350 .await
351}
352
353#[allow(clippy::too_many_arguments)]
354pub async fn update_step_instance_terminal<'a, E>(
356 executor: E,
357 status: &StepStatus,
358 id: StepInstanceId,
359) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
360where
361 E: Executor<'a, Database = Postgres>,
362{
363 sqlx::query("UPDATE step_instances SET status = $1, finished_at = NOW() WHERE id = $2")
364 .bind(status)
365 .bind(id)
366 .execute(executor)
367 .await
368}
369
370pub async fn get_step_type_and_spec<'a, E, O>(
372 executor: E,
373 id: StepInstanceId,
374) -> Result<O, sqlx::Error>
375where
376 E: Executor<'a, Database = Postgres>,
377 O: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
378{
379 sqlx::query_as::<_, O>("SELECT step_type, spec FROM step_instances WHERE id = $1")
380 .bind(id)
381 .fetch_one(executor)
382 .await
383}