Skip to main content

stormchaser_api/db/
runs.rs

1use crate::{ListRunsQuery, WorkflowRunDetail};
2use serde_json::Value;
3use sqlx::{PgPool, Postgres, Transaction};
4use stormchaser_model::workflow::RunStatus;
5use stormchaser_model::RunId;
6
7/// Inserts a new workflow run into the database.
8#[allow(clippy::too_many_arguments)]
9/// Insert workflow run.
10pub async fn insert_workflow_run(
11    tx: &mut Transaction<'_, Postgres>,
12    run_id: RunId,
13    workflow_name: &str,
14    initiating_user: &str,
15    repo_url: &str,
16    workflow_path: &str,
17    git_ref: &str,
18    status: RunStatus,
19    fencing_token: i64,
20) -> Result<(), sqlx::Error> {
21    sqlx::query(
22        r#"
23        INSERT INTO workflow_runs (id, workflow_name, initiating_user, repo_url, workflow_path, git_ref, status, fencing_token)
24        VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
25        "#,
26    )
27    .bind(run_id)
28    .bind(workflow_name)
29    .bind(initiating_user)
30    .bind(repo_url)
31    .bind(workflow_path)
32    .bind(git_ref)
33    .bind(status)
34    .bind(fencing_token)
35    .execute(&mut **tx)
36    .await?;
37    Ok(())
38}
39
40/// Inserts the context details of a workflow run.
41/// Insert run context.
42pub async fn insert_run_context(
43    tx: &mut Transaction<'_, Postgres>,
44    run_id: RunId,
45    dsl_version: &str,
46    workflow_definition: Value,
47    source_code: &str,
48    inputs: &Value,
49) -> Result<(), sqlx::Error> {
50    sqlx::query(
51        r#"
52        INSERT INTO run_contexts (run_id, dsl_version, workflow_definition, source_code, inputs)
53        VALUES ($1, $2, $3, $4, $5)
54        "#,
55    )
56    .bind(run_id)
57    .bind(dsl_version)
58    .bind(workflow_definition)
59    .bind(source_code)
60    .bind(inputs)
61    .execute(&mut **tx)
62    .await?;
63    Ok(())
64}
65
66/// Inserts the resource quotas for a workflow run.
67/// Insert run quotas.
68pub async fn insert_run_quotas(
69    tx: &mut Transaction<'_, Postgres>,
70    run_id: RunId,
71    max_concurrency: i32,
72    max_cpu: &str,
73    max_memory: &str,
74    max_storage: &str,
75    timeout: &str,
76) -> Result<(), sqlx::Error> {
77    sqlx::query(
78        r#"
79        INSERT INTO run_quotas (run_id, max_concurrency, max_cpu, max_memory, max_storage, timeout)
80        VALUES ($1, $2, $3, $4, $5, $6)
81        "#,
82    )
83    .bind(run_id)
84    .bind(max_concurrency)
85    .bind(max_cpu)
86    .bind(max_memory)
87    .bind(max_storage)
88    .bind(timeout)
89    .execute(&mut **tx)
90    .await?;
91    Ok(())
92}
93
94/// Retrieves a list of workflow runs, optionally filtered by the query parameters.
95pub async fn list_workflow_runs(
96    pool: &PgPool,
97    params: &ListRunsQuery,
98    limit: i64,
99    offset: i64,
100) -> Result<Vec<WorkflowRunDetail>, sqlx::Error> {
101    let mut query = sqlx::QueryBuilder::new(
102        r#"
103        WITH combined_runs AS (
104            SELECT
105                wr.id, wr.workflow_name, wr.initiating_user, wr.repo_url, wr.workflow_path, wr.git_ref,
106                wr.status::run_status as "status", wr.version, wr.created_at, wr.updated_at, wr.started_resolving_at, wr.started_at, wr.finished_at, wr.error,
107                rc.inputs, rc.secrets, rc.source_code, rc.dsl_version
108            FROM workflow_runs wr
109            JOIN run_contexts rc ON wr.id = rc.run_id
110            UNION ALL
111            SELECT
112                wr.id, wr.workflow_name, wr.initiating_user, wr.repo_url, wr.workflow_path, wr.git_ref,
113                wr.status::run_status as "status", wr.version, wr.created_at, wr.updated_at, wr.started_resolving_at, wr.started_at, wr.finished_at, wr.error,
114                rc.inputs, rc.secrets, rc.source_code, rc.dsl_version
115            FROM archived_workflow_runs wr
116            JOIN archived_run_contexts rc ON wr.id = rc.run_id
117        )
118        SELECT * FROM combined_runs wr WHERE 1=1
119        "#,
120    );
121
122    if let Some(name) = &params.workflow_name {
123        query.push(" AND wr.workflow_name LIKE ");
124        query.push_bind(format!("%{}%", name));
125    }
126
127    if let Some(status) = &params.status {
128        query.push(" AND wr.status = ");
129        query.push_bind(status);
130    }
131
132    if let Some(user) = &params.initiating_user {
133        query.push(" AND wr.initiating_user = ");
134        query.push_bind(user);
135    }
136
137    if let Some(repo) = &params.repo_url {
138        query.push(" AND wr.repo_url = ");
139        query.push_bind(repo);
140    }
141
142    if let Some(path) = &params.workflow_path {
143        query.push(" AND wr.workflow_path = ");
144        query.push_bind(path);
145    }
146
147    if let Some(after) = &params.created_after {
148        query.push(" AND wr.created_at >= ");
149        query.push_bind(after);
150    }
151
152    if let Some(before) = &params.created_before {
153        query.push(" AND wr.created_at <= ");
154        query.push_bind(before);
155    }
156
157    query.push(" ORDER BY wr.created_at DESC LIMIT ");
158    query.push_bind(limit);
159    query.push(" OFFSET ");
160    query.push_bind(offset);
161
162    query.build_query_as().fetch_all(pool).await
163}
164
165/// Retrieves full details for a workflow run.
166/// Get workflow run detail.
167pub async fn get_workflow_run_detail(
168    pool: &PgPool,
169    run_id: RunId,
170) -> Result<Option<WorkflowRunDetail>, sqlx::Error> {
171    sqlx::query_as("SELECT * FROM combined_run_details WHERE id = $1")
172        .bind(run_id)
173        .fetch_optional(pool)
174        .await
175}
176
177/// Retrieves the status of a workflow run.
178/// Get workflow run status.
179pub async fn get_workflow_run_status(
180    pool: &PgPool,
181    run_id: RunId,
182) -> Result<Option<RunStatus>, sqlx::Error> {
183    sqlx::query_scalar(
184        r#"SELECT status as "status: RunStatus" FROM combined_workflow_runs WHERE id = $1"#,
185    )
186    .bind(run_id)
187    .fetch_optional(pool)
188    .await
189}
190
191/// Retrieves the combined status for a workflow run.
192/// Get combined run status.
193pub async fn get_combined_run_status(
194    pool: &PgPool,
195    run_id: RunId,
196) -> Result<Option<RunStatus>, sqlx::Error> {
197    sqlx::query_scalar(
198        r#"SELECT status as "status: RunStatus" FROM combined_workflow_runs WHERE id = $1"#,
199    )
200    .bind(run_id)
201    .fetch_optional(pool)
202    .await
203}
204
205/// Deletes a workflow run completely from the system (active and archived).
206pub async fn delete_workflow_run(pool: &PgPool, id: RunId) -> Result<(), sqlx::Error> {
207    // Delete from active table
208    sqlx::query("DELETE FROM workflow_runs WHERE id = $1")
209        .bind(id)
210        .execute(pool)
211        .await?;
212
213    // Delete from archived table
214    sqlx::query("DELETE FROM archived_workflow_runs WHERE id = $1")
215        .bind(id)
216        .execute(pool)
217        .await?;
218
219    Ok(())
220}