Skip to main content

stormchaser_api/db/
steps.rs

1use sqlx::PgPool;
2use stormchaser_model::step::StepStatus;
3use stormchaser_model::RunId;
4use stormchaser_model::StepInstanceId;
5
6use stormchaser_model::step;
7
8/// Retrieves step instances for a specific run.
9/// Get step instances.
10pub async fn get_step_instances(
11    pool: &PgPool,
12    run_id: RunId,
13) -> Result<Vec<step::StepInstance>, sqlx::Error> {
14    sqlx::query_as(
15        "SELECT * FROM combined_step_instances WHERE run_id = $1 ORDER BY created_at ASC",
16    )
17    .bind(run_id)
18    .fetch_all(pool)
19    .await
20}
21
22/// Retrieves a single step instance by its UUID and run ID.
23pub async fn get_step_instance_by_id(
24    pool: &PgPool,
25    run_id: RunId,
26    step_id: StepInstanceId,
27) -> Result<Option<step::StepInstance>, sqlx::Error> {
28    sqlx::query_as("SELECT * FROM combined_step_instances WHERE run_id = $1 AND id = $2")
29        .bind(run_id)
30        .bind(step_id)
31        .fetch_optional(pool)
32        .await
33}
34
35/// Retrieves the outputs for a specific step instance.
36/// Get step outputs.
37pub async fn get_step_outputs(
38    pool: &PgPool,
39    step_instance_id: StepInstanceId,
40) -> Result<Vec<step::StepOutput>, sqlx::Error> {
41    sqlx::query_as("SELECT * FROM combined_step_outputs WHERE step_instance_id = $1")
42        .bind(step_instance_id)
43        .fetch_all(pool)
44        .await
45}
46
47/// Retrieves the status history for a specific step instance.
48/// Get step status history.
49pub async fn get_step_status_history(
50    pool: &PgPool,
51    step_instance_id: StepInstanceId,
52) -> Result<Vec<step::StepStatusHistory>, sqlx::Error> {
53    sqlx::query_as("SELECT * FROM combined_step_status_history WHERE step_instance_id = $1 ORDER BY created_at ASC")
54        .bind(step_instance_id)
55        .fetch_all(pool)
56        .await
57}
58
59/// Retrieves a step ID by its name and run ID.
60/// Get step id by name.
61pub async fn get_step_id_by_name(
62    pool: &PgPool,
63    run_id: RunId,
64    step_name: &str,
65) -> Result<Option<stormchaser_model::StepInstanceId>, sqlx::Error> {
66    sqlx::query_scalar("SELECT id FROM step_instances WHERE run_id = $1 AND step_name = $2 LIMIT 1")
67        .bind(run_id)
68        .bind(step_name)
69        .fetch_optional(pool)
70        .await
71}
72
73/// Retrieves step names and their IDs for a workflow run.
74/// Get step names.
75pub async fn get_step_names(
76    pool: &PgPool,
77    run_id: RunId,
78) -> Result<Vec<(stormchaser_model::StepInstanceId, String)>, sqlx::Error> {
79    sqlx::query_as("SELECT id, step_name FROM combined_step_instances WHERE run_id = $1")
80        .bind(run_id)
81        .fetch_all(pool)
82        .await
83}
84
85/// Retrieves combined step statuses for a workflow run.
86/// Get combined step statuses.
87pub async fn get_combined_step_statuses(
88    pool: &PgPool,
89    run_id: RunId,
90) -> Result<Vec<(stormchaser_model::StepInstanceId, String, StepStatus)>, sqlx::Error> {
91    sqlx::query_as(
92        r#"SELECT id, step_name, status as "status: StepStatus" FROM combined_step_instances WHERE run_id = $1"#,
93    )
94    .bind(run_id)
95    .fetch_all(pool)
96    .await
97}
98
99/// Retrieves a step instance for human approval.
100/// Get step instance for approval.
101pub async fn get_step_instance_for_approval(
102    pool: &PgPool,
103    step_id: StepInstanceId,
104    run_id: RunId,
105) -> Result<Option<step::StepInstance>, sqlx::Error> {
106    sqlx::query_as(
107        r#"SELECT id, run_id, step_name, step_type, status as "status", iteration_index, runner_id, affinity_context, started_at, finished_at, exit_code, error, spec, params, created_at FROM step_instances WHERE id = $1 AND run_id = $2"#,
108    )
109    .bind(step_id)
110    .bind(run_id)
111    .fetch_optional(pool)
112    .await
113}