Skip to main content

stormchaser_engine/db/
steps.rs

1use chrono::{DateTime, Utc};
2use serde_json::Value;
3use sqlx::{Executor, Postgres};
4use stormchaser_model::step::StepStatus;
5use uuid::Uuid;
6
7use stormchaser_model::test_report;
8
9/// Stepdefinitioninput.
10pub struct StepDefinitionInput {
11    /// The step type.
12    pub step_type: String,
13    /// The schema.
14    pub schema: Value,
15    /// The documentation.
16    pub documentation: Option<String>,
17}
18
19#[allow(clippy::too_many_arguments)]
20/// Upsert step definition.
21pub async fn upsert_step_definition<'a, E>(
22    executor: E,
23    step_type: &str,
24    schema: &Value,
25    documentation: Option<&str>,
26) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
27where
28    E: Executor<'a, Database = Postgres>,
29{
30    sqlx::query(
31        r#"
32                INSERT INTO step_definitions (step_type, schema, documentation, registered_at)
33                VALUES ($1, $2, $3, NOW())
34                ON CONFLICT (step_type) DO UPDATE SET
35                    schema = EXCLUDED.schema,
36                    documentation = EXCLUDED.documentation
37                "#,
38    )
39    .bind(step_type)
40    .bind(schema)
41    .bind(documentation)
42    .execute(executor)
43    .await
44}
45
46#[allow(clippy::too_many_arguments)]
47/// Upsert step definition with wasm.
48pub async fn upsert_step_definition_with_wasm<'a, E>(
49    executor: E,
50    step_type: &str,
51    schema: &Value,
52    documentation: Option<&str>,
53    wasm_module: &str,
54    wasm_function: &str,
55    wasm_config: &Value,
56) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
57where
58    E: Executor<'a, Database = Postgres>,
59{
60    sqlx::query(
61        r#"
62        INSERT INTO step_definitions (step_type, schema, documentation, registered_at, wasm_module, wasm_function, wasm_config)
63        VALUES ($1, $2, $3, NOW(), $4, $5, $6)
64        ON CONFLICT (step_type) DO UPDATE SET
65            schema = EXCLUDED.schema,
66            documentation = EXCLUDED.documentation,
67            wasm_module = EXCLUDED.wasm_module,
68            wasm_function = EXCLUDED.wasm_function,
69            wasm_config = EXCLUDED.wasm_config
70        "#,
71    )
72    .bind(step_type)
73    .bind(schema)
74    .bind(documentation)
75    .bind(wasm_module)
76    .bind(wasm_function)
77    .bind(wasm_config)
78    .execute(executor)
79    .await
80}
81
82#[allow(clippy::too_many_arguments)]
83/// Complete step instance.
84pub async fn complete_step_instance<'a, E>(
85    executor: E,
86    status: &StepStatus,
87    exit_code: Option<i32>,
88    runner_id: Option<&str>,
89    id: Uuid,
90) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
91where
92    E: Executor<'a, Database = Postgres>,
93{
94    sqlx::query(
95        r#"
96        UPDATE step_instances
97        SET status = $1, finished_at = NOW(), exit_code = $2, runner_id = COALESCE($3, runner_id)
98        WHERE id = $4
99        "#,
100    )
101    .bind(status)
102    .bind(exit_code)
103    .bind(runner_id)
104    .bind(id)
105    .execute(executor)
106    .await
107}
108
109#[allow(clippy::too_many_arguments)]
110/// Get step instances by run id.
111pub async fn get_step_instances_by_run_id<'a, E, O>(
112    executor: E,
113    run_id: Uuid,
114) -> Result<Vec<O>, sqlx::Error>
115where
116    E: Executor<'a, Database = Postgres>,
117    O: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
118{
119    sqlx::query_as::<_, O>(
120        r#"SELECT id, run_id, step_name, step_type, status as "status", iteration_index, runner_id, affinity_context, started_at, finished_at, exit_code, error, spec, params, created_at FROM step_instances WHERE run_id = $1"#,
121    )
122    .bind(run_id)
123    .fetch_all(executor)
124    .await
125}
126
127#[allow(clippy::too_many_arguments)]
128/// Upsert step output.
129pub async fn upsert_step_output<'a, E>(
130    executor: E,
131    step_instance_id: Uuid,
132    key: &str,
133    value: &Value,
134) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
135where
136    E: Executor<'a, Database = Postgres>,
137{
138    sqlx::query(
139        r#"
140                INSERT INTO step_outputs (step_instance_id, key, value)
141                VALUES ($1, $2, $3)
142                ON CONFLICT (step_instance_id, key) DO UPDATE SET value = EXCLUDED.value
143                "#,
144    )
145    .bind(step_instance_id)
146    .bind(key)
147    .bind(value)
148    .execute(executor)
149    .await
150}
151
152#[allow(clippy::too_many_arguments)]
153/// Upsert step output with sensitivity.
154pub async fn upsert_step_output_with_sensitivity<'a, E>(
155    executor: E,
156    step_instance_id: Uuid,
157    key: &str,
158    value: &Value,
159    is_sensitive: bool,
160) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
161where
162    E: Executor<'a, Database = Postgres>,
163{
164    sqlx::query(
165        r#"
166                                            INSERT INTO step_outputs (step_instance_id, key, value, is_sensitive)
167                                            VALUES ($1, $2, $3, $4)
168                                            ON CONFLICT (step_instance_id, key) DO UPDATE SET value = EXCLUDED.value
169                                            "#,
170    )
171    .bind(step_instance_id)
172    .bind(key)
173    .bind(value)
174    .bind(is_sensitive)
175    .execute(executor)
176    .await
177}
178
179#[allow(clippy::too_many_arguments)]
180/// Update step instance status.
181pub async fn update_step_instance_status<'a, E>(
182    executor: E,
183    status: &StepStatus,
184    id: Uuid,
185) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
186where
187    E: Executor<'a, Database = Postgres>,
188{
189    sqlx::query("UPDATE step_instances SET status = $1 WHERE id = $2")
190        .bind(status)
191        .bind(id)
192        .execute(executor)
193        .await
194}
195
196#[allow(clippy::too_many_arguments)]
197/// Get step spec and params.
198pub async fn get_step_spec_and_params<'a, E, O>(executor: E, id: Uuid) -> Result<O, sqlx::Error>
199where
200    E: Executor<'a, Database = Postgres>,
201    O: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
202{
203    sqlx::query_as::<_, O>("SELECT spec, params FROM step_instances WHERE id = $1")
204        .bind(id)
205        .fetch_one(executor)
206        .await
207}
208
209#[allow(clippy::too_many_arguments)]
210/// Fail step instance with error.
211pub async fn fail_step_instance_with_error<'a, E>(
212    executor: E,
213    status: StepStatus,
214    error: &str,
215    exit_code: Option<i32>,
216    id: Uuid,
217) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
218where
219    E: Executor<'a, Database = Postgres>,
220{
221    sqlx::query(
222        r#"
223        UPDATE step_instances
224        SET status = $1, finished_at = NOW(), error = $2, exit_code = $3
225        WHERE id = $4
226        "#,
227    )
228    .bind(status)
229    .bind(error)
230    .bind(exit_code)
231    .bind(id)
232    .execute(executor)
233    .await
234}
235
236#[allow(clippy::too_many_arguments)]
237/// Get step outputs for run.
238pub async fn get_step_outputs_for_run<'a, E, O>(
239    executor: E,
240    run_id: Uuid,
241) -> Result<Vec<O>, sqlx::Error>
242where
243    E: Executor<'a, Database = Postgres>,
244    O: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
245{
246    sqlx::query_as::<_, O>(
247        r#"
248        SELECT s.step_name, o.key, o.value
249        FROM step_outputs o
250        JOIN step_instances s ON o.step_instance_id = s.id
251        WHERE s.run_id = $1
252        "#,
253    )
254    .bind(run_id)
255    .fetch_all(executor)
256    .await
257}
258
259/// Record step status history.
260pub async fn record_step_status_history<'a, E>(
261    executor: E,
262    step_instance_id: Uuid,
263    status: &StepStatus,
264) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
265where
266    E: Executor<'a, Database = Postgres>,
267{
268    sqlx::query("INSERT INTO step_status_history (step_instance_id, status) VALUES ($1, $2)")
269        .bind(step_instance_id)
270        .bind(status)
271        .execute(executor)
272        .await
273}
274
275#[allow(clippy::too_many_arguments)]
276/// Insert step instance.
277pub async fn insert_step_instance<'a, E>(
278    executor: E,
279    id: Uuid,
280    run_id: Uuid,
281    step_name: &str,
282    step_type: &str,
283    status: StepStatus,
284    created_at: DateTime<Utc>,
285) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
286where
287    E: Executor<'a, Database = Postgres>,
288{
289    sqlx::query(
290        r#"
291                WITH inserted AS (
292                    INSERT INTO step_instances (id, run_id, step_name, step_type, status, created_at)
293                    VALUES ($1, $2, $3, $4, $5, $6)
294                    ON CONFLICT DO NOTHING
295                    RETURNING id
296                )
297                INSERT INTO step_status_history (step_instance_id, status)
298                SELECT id, $5 FROM inserted
299                "#,
300    )
301    .bind(id)
302    .bind(run_id)
303    .bind(step_name)
304    .bind(step_type)
305    .bind(status)
306    .bind(created_at)
307    .execute(executor)
308    .await
309}
310
311#[allow(clippy::too_many_arguments)]
312/// Count running steps for run.
313pub async fn count_running_steps_for_run<'a, E, O>(
314    executor: E,
315    run_id: Uuid,
316) -> Result<O, sqlx::Error>
317where
318    E: Executor<'a, Database = Postgres>,
319    O: Send + Unpin,
320    (O,): for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
321{
322    sqlx::query_scalar::<_, O>(
323        r#"SELECT COUNT(*) FROM step_instances WHERE run_id = $1 AND status = 'running'"#,
324    )
325    .bind(run_id)
326    .fetch_one(executor)
327    .await
328}
329
330#[allow(clippy::too_many_arguments)]
331/// Insert step instance with spec.
332pub async fn insert_step_instance_with_spec<'a, E>(
333    executor: E,
334    id: Uuid,
335    run_id: Uuid,
336    step_name: &str,
337    step_type: &str,
338    status: StepStatus,
339    iteration_index: Option<i32>,
340    spec: Value,
341    params: Value,
342    created_at: DateTime<Utc>,
343) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
344where
345    E: Executor<'a, Database = Postgres>,
346{
347    sqlx::query(
348        r#"
349                WITH inserted AS (
350                    INSERT INTO step_instances (id, run_id, step_name, step_type, status, iteration_index, spec, params, created_at)
351                    VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
352                    RETURNING id
353                )
354                INSERT INTO step_status_history (step_instance_id, status)
355                SELECT id, $5 FROM inserted
356                "#,
357    )
358    .bind(id)
359    .bind(run_id)
360    .bind(step_name)
361    .bind(step_type)
362    .bind(status)
363    .bind(iteration_index)
364    .bind(spec)
365    .bind(params)
366    .bind(created_at)
367    .execute(executor)
368    .await
369}
370
371#[allow(clippy::too_many_arguments)]
372/// Insert step instance with spec on conflict do nothing.
373pub async fn insert_step_instance_with_spec_on_conflict_do_nothing<'a, E>(
374    executor: E,
375    id: Uuid,
376    run_id: Uuid,
377    step_name: &str,
378    step_type: &str,
379    status: StepStatus,
380    iteration_index: Option<i32>,
381    spec: Value,
382    params: Value,
383    created_at: DateTime<Utc>,
384) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
385where
386    E: Executor<'a, Database = Postgres>,
387{
388    sqlx::query(
389        r#"
390            WITH inserted AS (
391                INSERT INTO step_instances (id, run_id, step_name, step_type, status, iteration_index, spec, params, created_at)
392                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
393                ON CONFLICT DO NOTHING
394                RETURNING id
395            )
396            INSERT INTO step_status_history (step_instance_id, status)
397            SELECT id, $5 FROM inserted
398            "#,
399    )
400    .bind(id)
401    .bind(run_id)
402    .bind(step_name)
403    .bind(step_type)
404    .bind(status)
405    .bind(iteration_index)
406    .bind(spec)
407    .bind(params)
408    .bind(created_at)
409    .execute(executor)
410    .await
411}
412
413#[allow(clippy::too_many_arguments)]
414/// Get wasm step definition.
415pub async fn get_wasm_step_definition<'a, E, O>(
416    executor: E,
417    step_type: &str,
418) -> Result<Option<O>, sqlx::Error>
419where
420    E: Executor<'a, Database = Postgres>,
421    O: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
422{
423    sqlx::query_as::<_, O>(
424        "SELECT wasm_module, wasm_function, wasm_config FROM step_definitions WHERE step_type = $1 AND wasm_module IS NOT NULL"
425    )
426    .bind(step_type)
427    .fetch_optional(executor)
428    .await
429}
430
431#[allow(clippy::too_many_arguments)]
432/// Get pending step instances for run.
433pub async fn get_pending_step_instances_for_run<'a, E, O>(
434    executor: E,
435    run_id: Uuid,
436    limit: i64,
437) -> Result<Vec<O>, sqlx::Error>
438where
439    E: Executor<'a, Database = Postgres>,
440    O: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
441{
442    sqlx::query_as::<_, O>(
443        r#"
444        SELECT id, run_id, step_name, step_type, status as "status", iteration_index, runner_id, affinity_context, started_at, finished_at, exit_code, error, spec, params, created_at
445        FROM step_instances
446        WHERE run_id = $1 AND status = 'pending' AND step_type NOT IN ('Approval', 'Wait')
447        ORDER BY created_at ASC
448        LIMIT $2
449        "#
450    )
451    .bind(run_id)
452    .bind(limit)
453    .fetch_all(executor)
454    .await
455}
456
457#[allow(clippy::too_many_arguments)]
458/// Get step instance by id.
459pub async fn get_step_instance_by_id<'a, E, O>(
460    executor: E,
461    id: Uuid,
462) -> Result<Option<O>, sqlx::Error>
463where
464    E: Executor<'a, Database = Postgres>,
465    O: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
466{
467    sqlx::query_as::<_, O>(
468        r#"SELECT id, run_id, step_name, step_type, status as "status", iteration_index, runner_id, affinity_context, started_at, finished_at, exit_code, error, spec, params, created_at FROM step_instances WHERE id = $1"#
469    )
470    .bind(id)
471    .fetch_optional(executor)
472    .await
473}
474
475#[allow(clippy::too_many_arguments)]
476/// Fail pending steps for run on timeout.
477pub async fn fail_pending_steps_for_run_on_timeout<'a, E>(
478    executor: E,
479    run_id: Uuid,
480) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
481where
482    E: Executor<'a, Database = Postgres>,
483{
484    sqlx::query(
485        r#"
486        UPDATE step_instances
487        SET status = 'failed', error = 'Workflow timed out', finished_at = NOW()
488        WHERE run_id = $1 AND status IN ('pending', 'running', 'waiting_for_event')
489        "#,
490    )
491    .bind(run_id)
492    .execute(executor)
493    .await
494}
495
496#[allow(clippy::too_many_arguments)]
497/// Update step instance running.
498pub async fn update_step_instance_running<'a, E>(
499    executor: E,
500    status: &StepStatus,
501    runner_id: Option<&str>,
502    id: Uuid,
503) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
504where
505    E: Executor<'a, Database = Postgres>,
506{
507    sqlx::query(
508        "UPDATE step_instances SET status = $1, started_at = COALESCE(started_at, NOW()), runner_id = COALESCE($2, runner_id) WHERE id = $3"
509    )
510    .bind(status)
511    .bind(runner_id)
512    .bind(id)
513    .execute(executor)
514    .await
515}
516
517#[allow(clippy::too_many_arguments)]
518/// Update step instance terminal.
519pub async fn update_step_instance_terminal<'a, E>(
520    executor: E,
521    status: &StepStatus,
522    id: Uuid,
523) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
524where
525    E: Executor<'a, Database = Postgres>,
526{
527    sqlx::query("UPDATE step_instances SET status = $1, finished_at = NOW() WHERE id = $2")
528        .bind(status)
529        .bind(id)
530        .execute(executor)
531        .await
532}
533
534/// Get test summaries for run.
535pub async fn get_test_summaries_for_run<'a, E>(
536    executor: E,
537    run_id: Uuid,
538) -> Result<Vec<test_report::TestSummary>, sqlx::Error>
539where
540    E: Executor<'a, Database = Postgres>,
541{
542    sqlx::query_as(
543        r#"
544        WITH combined AS (
545            SELECT * FROM step_test_summaries
546            UNION ALL
547            SELECT * FROM archived_step_test_summaries
548        )
549        SELECT * FROM combined WHERE run_id = $1 ORDER BY created_at ASC
550        "#,
551    )
552    .bind(run_id)
553    .fetch_all(executor)
554    .await
555}
556
557/// Get test cases for report.
558pub async fn get_test_cases_for_report<'a, E>(
559    executor: E,
560    run_id: Uuid,
561    report_name: &str,
562) -> Result<Vec<test_report::TestCase>, sqlx::Error>
563where
564    E: Executor<'a, Database = Postgres>,
565{
566    sqlx::query_as(
567        r#"
568        WITH combined AS (
569            SELECT id, run_id, step_instance_id, report_name, test_suite, test_case, status::text as status, duration_ms, message, created_at FROM step_test_cases
570            UNION ALL
571            SELECT id, run_id, step_instance_id, report_name, test_suite, test_case, status::text as status, duration_ms, message, created_at FROM archived_step_test_cases
572        )
573        SELECT id, run_id, step_instance_id, report_name, test_suite, test_case, status::test_case_status as status, duration_ms, message, created_at
574        FROM combined WHERE run_id = $1 AND report_name = $2 ORDER BY created_at ASC
575        "#,
576    )
577    .bind(run_id)
578    .bind(report_name)
579    .fetch_all(executor)
580    .await
581}
582
583/// Get step type and spec.
584pub async fn get_step_type_and_spec<'a, E, O>(executor: E, id: Uuid) -> Result<O, sqlx::Error>
585where
586    E: Executor<'a, Database = Postgres>,
587    O: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
588{
589    sqlx::query_as::<_, O>("SELECT step_type, spec FROM step_instances WHERE id = $1")
590        .bind(id)
591        .fetch_one(executor)
592        .await
593}