Skip to main content

stormchaser_engine/db/steps/
instances.rs

1use chrono::{DateTime, Utc};
2use serde_json::Value;
3use sqlx::{Executor, Postgres};
4use stormchaser_model::step::StepStatus;
5use stormchaser_model::RunId;
6use stormchaser_model::StepInstanceId;
7
8#[allow(clippy::too_many_arguments)]
9/// Complete step instance.
10pub async fn complete_step_instance<'a, E>(
11    executor: E,
12    status: &StepStatus,
13    exit_code: Option<i32>,
14    runner_id: Option<&str>,
15    id: StepInstanceId,
16) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
17where
18    E: Executor<'a, Database = Postgres>,
19{
20    sqlx::query(
21        r#"
22        UPDATE step_instances
23        SET status = $1, finished_at = NOW(), exit_code = $2, runner_id = COALESCE($3, runner_id)
24        WHERE id = $4
25        "#,
26    )
27    .bind(status)
28    .bind(exit_code)
29    .bind(runner_id)
30    .bind(id)
31    .execute(executor)
32    .await
33}
34
35#[allow(clippy::too_many_arguments)]
36/// Get step instances by run id.
37pub async fn get_step_instances_by_run_id<'a, E, O>(
38    executor: E,
39    run_id: RunId,
40) -> Result<Vec<O>, sqlx::Error>
41where
42    E: Executor<'a, Database = Postgres>,
43    O: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
44{
45    sqlx::query_as::<_, O>(
46        r#"SELECT id, run_id, step_name, step_type, status as "status", iteration_index, runner_id, affinity_context, started_at, finished_at, exit_code, error, spec, params, created_at FROM step_instances WHERE run_id = $1"#,
47    )
48    .bind(run_id)
49    .fetch_all(executor)
50    .await
51}
52
53#[allow(clippy::too_many_arguments)]
54/// Update step instance status.
55pub async fn update_step_instance_status<'a, E>(
56    executor: E,
57    status: &StepStatus,
58    id: StepInstanceId,
59) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
60where
61    E: Executor<'a, Database = Postgres>,
62{
63    sqlx::query("UPDATE step_instances SET status = $1 WHERE id = $2")
64        .bind(status)
65        .bind(id)
66        .execute(executor)
67        .await
68}
69
70#[allow(clippy::too_many_arguments)]
71/// Get step spec and params.
72pub async fn get_step_spec_and_params<'a, E, O>(
73    executor: E,
74    id: StepInstanceId,
75) -> Result<O, sqlx::Error>
76where
77    E: Executor<'a, Database = Postgres>,
78    O: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
79{
80    sqlx::query_as::<_, O>("SELECT spec, params FROM step_instances WHERE id = $1")
81        .bind(id)
82        .fetch_one(executor)
83        .await
84}
85
86#[allow(clippy::too_many_arguments)]
87/// Fail step instance with error.
88pub async fn fail_step_instance_with_error<'a, E>(
89    executor: E,
90    status: StepStatus,
91    error: &str,
92    exit_code: Option<i32>,
93    id: StepInstanceId,
94) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
95where
96    E: Executor<'a, Database = Postgres>,
97{
98    sqlx::query(
99        r#"
100        UPDATE step_instances
101        SET status = $1, finished_at = NOW(), error = $2, exit_code = $3
102        WHERE id = $4
103        "#,
104    )
105    .bind(status)
106    .bind(error)
107    .bind(exit_code)
108    .bind(id)
109    .execute(executor)
110    .await
111}
112
113/// Record step status history.
114pub async fn record_step_status_history<'a, E>(
115    executor: E,
116    step_instance_id: StepInstanceId,
117    status: &StepStatus,
118) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
119where
120    E: Executor<'a, Database = Postgres>,
121{
122    sqlx::query("INSERT INTO step_status_history (step_instance_id, status) VALUES ($1, $2)")
123        .bind(step_instance_id)
124        .bind(status)
125        .execute(executor)
126        .await
127}
128
129#[allow(clippy::too_many_arguments)]
130/// Insert step instance.
131pub async fn insert_step_instance<'a, E>(
132    executor: E,
133    id: StepInstanceId,
134    run_id: RunId,
135    step_name: &str,
136    step_type: &str,
137    status: StepStatus,
138    created_at: DateTime<Utc>,
139) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
140where
141    E: Executor<'a, Database = Postgres>,
142{
143    sqlx::query(
144        r#"
145                WITH inserted AS (
146                    INSERT INTO step_instances (id, run_id, step_name, step_type, status, created_at)
147                    VALUES ($1, $2, $3, $4, $5, $6)
148                    ON CONFLICT DO NOTHING
149                    RETURNING id
150                )
151                INSERT INTO step_status_history (step_instance_id, status)
152                SELECT id, $5 FROM inserted
153                "#,
154    )
155    .bind(id)
156    .bind(run_id)
157    .bind(step_name)
158    .bind(step_type)
159    .bind(status)
160    .bind(created_at)
161    .execute(executor)
162    .await
163}
164
165#[allow(clippy::too_many_arguments)]
166/// Count running steps for run.
167pub async fn count_running_steps_for_run<'a, E, O>(
168    executor: E,
169    run_id: RunId,
170) -> Result<O, sqlx::Error>
171where
172    E: Executor<'a, Database = Postgres>,
173    O: Send + Unpin,
174    (O,): for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
175{
176    sqlx::query_scalar::<_, O>(
177        r#"SELECT COUNT(*) FROM step_instances WHERE run_id = $1 AND status = 'running'"#,
178    )
179    .bind(run_id)
180    .fetch_one(executor)
181    .await
182}
183
184#[allow(clippy::too_many_arguments)]
185/// Insert step instance with spec.
186pub async fn insert_step_instance_with_spec<'a, E>(
187    executor: E,
188    id: StepInstanceId,
189    run_id: RunId,
190    step_name: &str,
191    step_type: &str,
192    status: StepStatus,
193    iteration_index: Option<i32>,
194    spec: Value,
195    params: Value,
196    created_at: DateTime<Utc>,
197) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
198where
199    E: Executor<'a, Database = Postgres>,
200{
201    sqlx::query(
202        r#"
203                WITH inserted AS (
204                    INSERT INTO step_instances (id, run_id, step_name, step_type, status, iteration_index, spec, params, created_at)
205                    VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
206                    RETURNING id
207                )
208                INSERT INTO step_status_history (step_instance_id, status)
209                SELECT id, $5 FROM inserted
210                "#,
211    )
212    .bind(id)
213    .bind(run_id)
214    .bind(step_name)
215    .bind(step_type)
216    .bind(status)
217    .bind(iteration_index)
218    .bind(spec)
219    .bind(params)
220    .bind(created_at)
221    .execute(executor)
222    .await
223}
224
225#[allow(clippy::too_many_arguments)]
226/// Insert step instance with spec on conflict do nothing.
227pub async fn insert_step_instance_with_spec_on_conflict_do_nothing<'a, E>(
228    executor: E,
229    id: StepInstanceId,
230    run_id: RunId,
231    step_name: &str,
232    step_type: &str,
233    status: StepStatus,
234    iteration_index: Option<i32>,
235    spec: Value,
236    params: Value,
237    created_at: DateTime<Utc>,
238) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
239where
240    E: Executor<'a, Database = Postgres>,
241{
242    sqlx::query(
243        r#"
244            WITH inserted AS (
245                INSERT INTO step_instances (id, run_id, step_name, step_type, status, iteration_index, spec, params, created_at)
246                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
247                ON CONFLICT DO NOTHING
248                RETURNING id
249            )
250            INSERT INTO step_status_history (step_instance_id, status)
251            SELECT id, $5 FROM inserted
252            "#,
253    )
254    .bind(id)
255    .bind(run_id)
256    .bind(step_name)
257    .bind(step_type)
258    .bind(status)
259    .bind(iteration_index)
260    .bind(spec)
261    .bind(params)
262    .bind(created_at)
263    .execute(executor)
264    .await
265}
266
267#[allow(clippy::too_many_arguments)]
268/// Get pending step instances for run.
269pub async fn get_pending_step_instances_for_run<'a, E, O>(
270    executor: E,
271    run_id: RunId,
272    limit: i64,
273) -> Result<Vec<O>, sqlx::Error>
274where
275    E: Executor<'a, Database = Postgres>,
276    O: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
277{
278    sqlx::query_as::<_, O>(
279        r#"
280        SELECT id, run_id, step_name, step_type, status as "status", iteration_index, runner_id, affinity_context, started_at, finished_at, exit_code, error, spec, params, created_at
281        FROM step_instances
282        WHERE run_id = $1 AND status = 'pending' AND step_type NOT IN ('Approval', 'Wait')
283        ORDER BY created_at ASC
284        LIMIT $2
285        "#
286    )
287    .bind(run_id)
288    .bind(limit)
289    .fetch_all(executor)
290    .await
291}
292
293#[allow(clippy::too_many_arguments)]
294/// Get step instance by id.
295pub async fn get_step_instance_by_id<'a, E, O>(
296    executor: E,
297    id: StepInstanceId,
298) -> Result<Option<O>, sqlx::Error>
299where
300    E: Executor<'a, Database = Postgres>,
301    O: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
302{
303    sqlx::query_as::<_, O>(
304        r#"SELECT id, run_id, step_name, step_type, status as "status", iteration_index, runner_id, affinity_context, started_at, finished_at, exit_code, error, spec, params, created_at FROM step_instances WHERE id = $1"#
305    )
306    .bind(id)
307    .fetch_optional(executor)
308    .await
309}
310
311#[allow(clippy::too_many_arguments)]
312/// Fail pending steps for run on timeout.
313pub async fn fail_pending_steps_for_run_on_timeout<'a, E>(
314    executor: E,
315    run_id: RunId,
316) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
317where
318    E: Executor<'a, Database = Postgres>,
319{
320    sqlx::query(
321        r#"
322        UPDATE step_instances
323        SET status = 'failed', error = 'Workflow timed out', finished_at = NOW()
324        WHERE run_id = $1 AND status IN ('pending', 'running', 'waiting_for_event')
325        "#,
326    )
327    .bind(run_id)
328    .execute(executor)
329    .await
330}
331
332#[allow(clippy::too_many_arguments)]
333/// Update step instance running.
334pub async fn update_step_instance_running<'a, E>(
335    executor: E,
336    status: &StepStatus,
337    runner_id: Option<&str>,
338    id: StepInstanceId,
339) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
340where
341    E: Executor<'a, Database = Postgres>,
342{
343    sqlx::query(
344        "UPDATE step_instances SET status = $1, started_at = COALESCE(started_at, NOW()), runner_id = COALESCE($2, runner_id) WHERE id = $3"
345    )
346    .bind(status)
347    .bind(runner_id)
348    .bind(id)
349    .execute(executor)
350    .await
351}
352
353#[allow(clippy::too_many_arguments)]
354/// Update step instance terminal.
355pub async fn update_step_instance_terminal<'a, E>(
356    executor: E,
357    status: &StepStatus,
358    id: StepInstanceId,
359) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
360where
361    E: Executor<'a, Database = Postgres>,
362{
363    sqlx::query("UPDATE step_instances SET status = $1, finished_at = NOW() WHERE id = $2")
364        .bind(status)
365        .bind(id)
366        .execute(executor)
367        .await
368}
369
370/// Get step type and spec.
371pub async fn get_step_type_and_spec<'a, E, O>(
372    executor: E,
373    id: StepInstanceId,
374) -> Result<O, sqlx::Error>
375where
376    E: Executor<'a, Database = Postgres>,
377    O: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
378{
379    sqlx::query_as::<_, O>("SELECT step_type, spec FROM step_instances WHERE id = $1")
380        .bind(id)
381        .fetch_one(executor)
382        .await
383}