1use crate::{ListRunsQuery, WorkflowRunDetail};
2use serde_json::Value;
3use sqlx::{PgPool, Postgres, Transaction};
4use stormchaser_model::workflow::RunStatus;
5use stormchaser_model::RunId;
6
7#[allow(clippy::too_many_arguments)]
9pub async fn insert_workflow_run(
11 tx: &mut Transaction<'_, Postgres>,
12 run_id: RunId,
13 workflow_name: &str,
14 initiating_user: &str,
15 repo_url: &str,
16 workflow_path: &str,
17 git_ref: &str,
18 status: RunStatus,
19 fencing_token: i64,
20) -> Result<(), sqlx::Error> {
21 sqlx::query(
22 r#"
23 INSERT INTO workflow_runs (id, workflow_name, initiating_user, repo_url, workflow_path, git_ref, status, fencing_token)
24 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
25 "#,
26 )
27 .bind(run_id)
28 .bind(workflow_name)
29 .bind(initiating_user)
30 .bind(repo_url)
31 .bind(workflow_path)
32 .bind(git_ref)
33 .bind(status)
34 .bind(fencing_token)
35 .execute(&mut **tx)
36 .await?;
37 Ok(())
38}
39
40pub async fn insert_run_context(
43 tx: &mut Transaction<'_, Postgres>,
44 run_id: RunId,
45 dsl_version: &str,
46 workflow_definition: Value,
47 source_code: &str,
48 inputs: &Value,
49) -> Result<(), sqlx::Error> {
50 sqlx::query(
51 r#"
52 INSERT INTO run_contexts (run_id, dsl_version, workflow_definition, source_code, inputs)
53 VALUES ($1, $2, $3, $4, $5)
54 "#,
55 )
56 .bind(run_id)
57 .bind(dsl_version)
58 .bind(workflow_definition)
59 .bind(source_code)
60 .bind(inputs)
61 .execute(&mut **tx)
62 .await?;
63 Ok(())
64}
65
66pub async fn insert_run_quotas(
69 tx: &mut Transaction<'_, Postgres>,
70 run_id: RunId,
71 max_concurrency: i32,
72 max_cpu: &str,
73 max_memory: &str,
74 max_storage: &str,
75 timeout: &str,
76) -> Result<(), sqlx::Error> {
77 sqlx::query(
78 r#"
79 INSERT INTO run_quotas (run_id, max_concurrency, max_cpu, max_memory, max_storage, timeout)
80 VALUES ($1, $2, $3, $4, $5, $6)
81 "#,
82 )
83 .bind(run_id)
84 .bind(max_concurrency)
85 .bind(max_cpu)
86 .bind(max_memory)
87 .bind(max_storage)
88 .bind(timeout)
89 .execute(&mut **tx)
90 .await?;
91 Ok(())
92}
93
94pub async fn list_workflow_runs(
96 pool: &PgPool,
97 params: &ListRunsQuery,
98 limit: i64,
99 offset: i64,
100) -> Result<Vec<WorkflowRunDetail>, sqlx::Error> {
101 let mut query = sqlx::QueryBuilder::new(
102 r#"
103 WITH combined_runs AS (
104 SELECT
105 wr.id, wr.workflow_name, wr.initiating_user, wr.repo_url, wr.workflow_path, wr.git_ref,
106 wr.status::run_status as "status", wr.version, wr.created_at, wr.updated_at, wr.started_resolving_at, wr.started_at, wr.finished_at, wr.error,
107 rc.inputs, rc.secrets, rc.source_code, rc.dsl_version
108 FROM workflow_runs wr
109 JOIN run_contexts rc ON wr.id = rc.run_id
110 UNION ALL
111 SELECT
112 wr.id, wr.workflow_name, wr.initiating_user, wr.repo_url, wr.workflow_path, wr.git_ref,
113 wr.status::run_status as "status", wr.version, wr.created_at, wr.updated_at, wr.started_resolving_at, wr.started_at, wr.finished_at, wr.error,
114 rc.inputs, rc.secrets, rc.source_code, rc.dsl_version
115 FROM archived_workflow_runs wr
116 JOIN archived_run_contexts rc ON wr.id = rc.run_id
117 )
118 SELECT * FROM combined_runs wr WHERE 1=1
119 "#,
120 );
121
122 if let Some(name) = ¶ms.workflow_name {
123 query.push(" AND wr.workflow_name LIKE ");
124 query.push_bind(format!("%{}%", name));
125 }
126
127 if let Some(status) = ¶ms.status {
128 query.push(" AND wr.status = ");
129 query.push_bind(status);
130 }
131
132 if let Some(user) = ¶ms.initiating_user {
133 query.push(" AND wr.initiating_user = ");
134 query.push_bind(user);
135 }
136
137 if let Some(repo) = ¶ms.repo_url {
138 query.push(" AND wr.repo_url = ");
139 query.push_bind(repo);
140 }
141
142 if let Some(path) = ¶ms.workflow_path {
143 query.push(" AND wr.workflow_path = ");
144 query.push_bind(path);
145 }
146
147 if let Some(after) = ¶ms.created_after {
148 query.push(" AND wr.created_at >= ");
149 query.push_bind(after);
150 }
151
152 if let Some(before) = ¶ms.created_before {
153 query.push(" AND wr.created_at <= ");
154 query.push_bind(before);
155 }
156
157 query.push(" ORDER BY wr.created_at DESC LIMIT ");
158 query.push_bind(limit);
159 query.push(" OFFSET ");
160 query.push_bind(offset);
161
162 query.build_query_as().fetch_all(pool).await
163}
164
165pub async fn get_workflow_run_detail(
168 pool: &PgPool,
169 run_id: RunId,
170) -> Result<Option<WorkflowRunDetail>, sqlx::Error> {
171 sqlx::query_as("SELECT * FROM combined_run_details WHERE id = $1")
172 .bind(run_id)
173 .fetch_optional(pool)
174 .await
175}
176
177pub async fn get_workflow_run_status(
180 pool: &PgPool,
181 run_id: RunId,
182) -> Result<Option<RunStatus>, sqlx::Error> {
183 sqlx::query_scalar(
184 r#"SELECT status as "status: RunStatus" FROM combined_workflow_runs WHERE id = $1"#,
185 )
186 .bind(run_id)
187 .fetch_optional(pool)
188 .await
189}
190
191pub async fn get_combined_run_status(
194 pool: &PgPool,
195 run_id: RunId,
196) -> Result<Option<RunStatus>, sqlx::Error> {
197 sqlx::query_scalar(
198 r#"SELECT status as "status: RunStatus" FROM combined_workflow_runs WHERE id = $1"#,
199 )
200 .bind(run_id)
201 .fetch_optional(pool)
202 .await
203}
204
205pub async fn delete_workflow_run(pool: &PgPool, id: RunId) -> Result<(), sqlx::Error> {
207 sqlx::query("DELETE FROM workflow_runs WHERE id = $1")
209 .bind(id)
210 .execute(pool)
211 .await?;
212
213 sqlx::query("DELETE FROM archived_workflow_runs WHERE id = $1")
215 .bind(id)
216 .execute(pool)
217 .await?;
218
219 Ok(())
220}