Skip to main content

stormchaser_api/db/
steps.rs

1use sqlx::PgPool;
2use stormchaser_model::RunId;
3use stormchaser_model::StepInstanceId;
4
5use stormchaser_model::step;
6
7/// Retrieves step instances for a specific run.
8/// Get step instances.
9pub async fn get_step_instances(
10    pool: &PgPool,
11    run_id: RunId,
12) -> Result<Vec<step::StepInstance>, sqlx::Error> {
13    sqlx::query_as(
14        "SELECT * FROM combined_step_instances WHERE run_id = $1 ORDER BY created_at ASC",
15    )
16    .bind(run_id)
17    .fetch_all(pool)
18    .await
19}
20
21/// Retrieves a single step instance by its UUID and run ID.
22pub async fn get_step_instance_by_id(
23    pool: &PgPool,
24    run_id: RunId,
25    step_id: StepInstanceId,
26) -> Result<Option<step::StepInstance>, sqlx::Error> {
27    sqlx::query_as("SELECT * FROM combined_step_instances WHERE run_id = $1 AND id = $2")
28        .bind(run_id)
29        .bind(step_id)
30        .fetch_optional(pool)
31        .await
32}
33
34/// Retrieves the outputs for a specific step instance.
35/// Get step outputs.
36pub async fn get_step_outputs(
37    pool: &PgPool,
38    step_instance_id: StepInstanceId,
39) -> Result<Vec<step::StepOutput>, sqlx::Error> {
40    sqlx::query_as("SELECT * FROM combined_step_outputs WHERE step_instance_id = $1")
41        .bind(step_instance_id)
42        .fetch_all(pool)
43        .await
44}
45
46/// Retrieves the status history for a specific step instance.
47/// Get step status history.
48pub async fn get_step_status_history(
49    pool: &PgPool,
50    step_instance_id: StepInstanceId,
51) -> Result<Vec<step::StepStatusHistory>, sqlx::Error> {
52    sqlx::query_as("SELECT * FROM combined_step_status_history WHERE step_instance_id = $1 ORDER BY created_at ASC")
53        .bind(step_instance_id)
54        .fetch_all(pool)
55        .await
56}
57
58/// Retrieves a step ID by its name and run ID.
59/// Get step id by name.
60pub async fn get_step_id_by_name(
61    pool: &PgPool,
62    run_id: RunId,
63    step_name: &str,
64) -> Result<Option<stormchaser_model::StepInstanceId>, sqlx::Error> {
65    sqlx::query_scalar("SELECT id FROM step_instances WHERE run_id = $1 AND step_name = $2 LIMIT 1")
66        .bind(run_id)
67        .bind(step_name)
68        .fetch_optional(pool)
69        .await
70}
71
72/// Retrieves step names and their IDs for a workflow run.
73/// Get step names.
74pub async fn get_step_names(
75    pool: &PgPool,
76    run_id: RunId,
77) -> Result<Vec<(stormchaser_model::StepInstanceId, String)>, sqlx::Error> {
78    sqlx::query_as("SELECT id, step_name FROM combined_step_instances WHERE run_id = $1")
79        .bind(run_id)
80        .fetch_all(pool)
81        .await
82}
83
84/// Retrieves combined step statuses for a workflow run.
85/// Get combined step statuses.
86pub async fn get_combined_step_statuses(
87    pool: &PgPool,
88    run_id: RunId,
89) -> Result<Vec<(stormchaser_model::StepInstanceId, String, String)>, sqlx::Error> {
90    sqlx::query_as(
91        "SELECT id, step_name, status::text FROM combined_step_instances WHERE run_id = $1",
92    )
93    .bind(run_id)
94    .fetch_all(pool)
95    .await
96}
97
98/// Retrieves a step instance for human approval.
99/// Get step instance for approval.
100pub async fn get_step_instance_for_approval(
101    pool: &PgPool,
102    step_id: StepInstanceId,
103    run_id: RunId,
104) -> Result<Option<step::StepInstance>, sqlx::Error> {
105    sqlx::query_as(
106        r#"SELECT id, run_id, step_name, step_type, status as "status", iteration_index, runner_id, affinity_context, started_at, finished_at, exit_code, error, spec, params, created_at FROM step_instances WHERE id = $1 AND run_id = $2"#,
107    )
108    .bind(step_id)
109    .bind(run_id)
110    .fetch_optional(pool)
111    .await
112}