1use chrono::{DateTime, Utc};
2use serde_json::Value;
3use sqlx::{Executor, Postgres};
4use stormchaser_model::step::StepStatus;
5use uuid::Uuid;
6
7use stormchaser_model::test_report;
8
/// Plain data bundle describing a step definition.
///
/// NOTE(review): the fields mirror the `step_type`, `schema` and
/// `documentation` parameters of `upsert_step_definition`; presumably this is
/// the owned input form for that call — confirm against callers.
pub struct StepDefinitionInput {
    /// Name of the step type.
    pub step_type: String,
    /// JSON value stored as the step's schema.
    pub schema: Value,
    /// Optional free-form documentation text.
    pub documentation: Option<String>,
}
18
19#[allow(clippy::too_many_arguments)]
20pub async fn upsert_step_definition<'a, E>(
22 executor: E,
23 step_type: &str,
24 schema: &Value,
25 documentation: Option<&str>,
26) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
27where
28 E: Executor<'a, Database = Postgres>,
29{
30 sqlx::query(
31 r#"
32 INSERT INTO step_definitions (step_type, schema, documentation, registered_at)
33 VALUES ($1, $2, $3, NOW())
34 ON CONFLICT (step_type) DO UPDATE SET
35 schema = EXCLUDED.schema,
36 documentation = EXCLUDED.documentation
37 "#,
38 )
39 .bind(step_type)
40 .bind(schema)
41 .bind(documentation)
42 .execute(executor)
43 .await
44}
45
46#[allow(clippy::too_many_arguments)]
47pub async fn upsert_step_definition_with_wasm<'a, E>(
49 executor: E,
50 step_type: &str,
51 schema: &Value,
52 documentation: Option<&str>,
53 wasm_module: &str,
54 wasm_function: &str,
55 wasm_config: &Value,
56) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
57where
58 E: Executor<'a, Database = Postgres>,
59{
60 sqlx::query(
61 r#"
62 INSERT INTO step_definitions (step_type, schema, documentation, registered_at, wasm_module, wasm_function, wasm_config)
63 VALUES ($1, $2, $3, NOW(), $4, $5, $6)
64 ON CONFLICT (step_type) DO UPDATE SET
65 schema = EXCLUDED.schema,
66 documentation = EXCLUDED.documentation,
67 wasm_module = EXCLUDED.wasm_module,
68 wasm_function = EXCLUDED.wasm_function,
69 wasm_config = EXCLUDED.wasm_config
70 "#,
71 )
72 .bind(step_type)
73 .bind(schema)
74 .bind(documentation)
75 .bind(wasm_module)
76 .bind(wasm_function)
77 .bind(wasm_config)
78 .execute(executor)
79 .await
80}
81
82#[allow(clippy::too_many_arguments)]
83pub async fn complete_step_instance<'a, E>(
85 executor: E,
86 status: &StepStatus,
87 exit_code: Option<i32>,
88 runner_id: Option<&str>,
89 id: Uuid,
90) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
91where
92 E: Executor<'a, Database = Postgres>,
93{
94 sqlx::query(
95 r#"
96 UPDATE step_instances
97 SET status = $1, finished_at = NOW(), exit_code = $2, runner_id = COALESCE($3, runner_id)
98 WHERE id = $4
99 "#,
100 )
101 .bind(status)
102 .bind(exit_code)
103 .bind(runner_id)
104 .bind(id)
105 .execute(executor)
106 .await
107}
108
109#[allow(clippy::too_many_arguments)]
110pub async fn get_step_instances_by_run_id<'a, E, O>(
112 executor: E,
113 run_id: Uuid,
114) -> Result<Vec<O>, sqlx::Error>
115where
116 E: Executor<'a, Database = Postgres>,
117 O: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
118{
119 sqlx::query_as::<_, O>(
120 r#"SELECT id, run_id, step_name, step_type, status as "status", iteration_index, runner_id, affinity_context, started_at, finished_at, exit_code, error, spec, params, created_at FROM step_instances WHERE run_id = $1"#,
121 )
122 .bind(run_id)
123 .fetch_all(executor)
124 .await
125}
126
127#[allow(clippy::too_many_arguments)]
128pub async fn upsert_step_output<'a, E>(
130 executor: E,
131 step_instance_id: Uuid,
132 key: &str,
133 value: &Value,
134) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
135where
136 E: Executor<'a, Database = Postgres>,
137{
138 sqlx::query(
139 r#"
140 INSERT INTO step_outputs (step_instance_id, key, value)
141 VALUES ($1, $2, $3)
142 ON CONFLICT (step_instance_id, key) DO UPDATE SET value = EXCLUDED.value
143 "#,
144 )
145 .bind(step_instance_id)
146 .bind(key)
147 .bind(value)
148 .execute(executor)
149 .await
150}
151
152#[allow(clippy::too_many_arguments)]
153pub async fn upsert_step_output_with_sensitivity<'a, E>(
155 executor: E,
156 step_instance_id: Uuid,
157 key: &str,
158 value: &Value,
159 is_sensitive: bool,
160) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
161where
162 E: Executor<'a, Database = Postgres>,
163{
164 sqlx::query(
165 r#"
166 INSERT INTO step_outputs (step_instance_id, key, value, is_sensitive)
167 VALUES ($1, $2, $3, $4)
168 ON CONFLICT (step_instance_id, key) DO UPDATE SET value = EXCLUDED.value
169 "#,
170 )
171 .bind(step_instance_id)
172 .bind(key)
173 .bind(value)
174 .bind(is_sensitive)
175 .execute(executor)
176 .await
177}
178
179#[allow(clippy::too_many_arguments)]
180pub async fn update_step_instance_status<'a, E>(
182 executor: E,
183 status: &StepStatus,
184 id: Uuid,
185) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
186where
187 E: Executor<'a, Database = Postgres>,
188{
189 sqlx::query("UPDATE step_instances SET status = $1 WHERE id = $2")
190 .bind(status)
191 .bind(id)
192 .execute(executor)
193 .await
194}
195
196#[allow(clippy::too_many_arguments)]
197pub async fn get_step_spec_and_params<'a, E, O>(executor: E, id: Uuid) -> Result<O, sqlx::Error>
199where
200 E: Executor<'a, Database = Postgres>,
201 O: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
202{
203 sqlx::query_as::<_, O>("SELECT spec, params FROM step_instances WHERE id = $1")
204 .bind(id)
205 .fetch_one(executor)
206 .await
207}
208
209#[allow(clippy::too_many_arguments)]
210pub async fn fail_step_instance_with_error<'a, E>(
212 executor: E,
213 status: StepStatus,
214 error: &str,
215 exit_code: Option<i32>,
216 id: Uuid,
217) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
218where
219 E: Executor<'a, Database = Postgres>,
220{
221 sqlx::query(
222 r#"
223 UPDATE step_instances
224 SET status = $1, finished_at = NOW(), error = $2, exit_code = $3
225 WHERE id = $4
226 "#,
227 )
228 .bind(status)
229 .bind(error)
230 .bind(exit_code)
231 .bind(id)
232 .execute(executor)
233 .await
234}
235
236#[allow(clippy::too_many_arguments)]
237pub async fn get_step_outputs_for_run<'a, E, O>(
239 executor: E,
240 run_id: Uuid,
241) -> Result<Vec<O>, sqlx::Error>
242where
243 E: Executor<'a, Database = Postgres>,
244 O: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
245{
246 sqlx::query_as::<_, O>(
247 r#"
248 SELECT s.step_name, o.key, o.value
249 FROM step_outputs o
250 JOIN step_instances s ON o.step_instance_id = s.id
251 WHERE s.run_id = $1
252 "#,
253 )
254 .bind(run_id)
255 .fetch_all(executor)
256 .await
257}
258
259pub async fn record_step_status_history<'a, E>(
261 executor: E,
262 step_instance_id: Uuid,
263 status: &StepStatus,
264) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
265where
266 E: Executor<'a, Database = Postgres>,
267{
268 sqlx::query("INSERT INTO step_status_history (step_instance_id, status) VALUES ($1, $2)")
269 .bind(step_instance_id)
270 .bind(status)
271 .execute(executor)
272 .await
273}
274
275#[allow(clippy::too_many_arguments)]
276pub async fn insert_step_instance<'a, E>(
278 executor: E,
279 id: Uuid,
280 run_id: Uuid,
281 step_name: &str,
282 step_type: &str,
283 status: StepStatus,
284 created_at: DateTime<Utc>,
285) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
286where
287 E: Executor<'a, Database = Postgres>,
288{
289 sqlx::query(
290 r#"
291 WITH inserted AS (
292 INSERT INTO step_instances (id, run_id, step_name, step_type, status, created_at)
293 VALUES ($1, $2, $3, $4, $5, $6)
294 ON CONFLICT DO NOTHING
295 RETURNING id
296 )
297 INSERT INTO step_status_history (step_instance_id, status)
298 SELECT id, $5 FROM inserted
299 "#,
300 )
301 .bind(id)
302 .bind(run_id)
303 .bind(step_name)
304 .bind(step_type)
305 .bind(status)
306 .bind(created_at)
307 .execute(executor)
308 .await
309}
310
311#[allow(clippy::too_many_arguments)]
312pub async fn count_running_steps_for_run<'a, E, O>(
314 executor: E,
315 run_id: Uuid,
316) -> Result<O, sqlx::Error>
317where
318 E: Executor<'a, Database = Postgres>,
319 O: Send + Unpin,
320 (O,): for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
321{
322 sqlx::query_scalar::<_, O>(
323 r#"SELECT COUNT(*) FROM step_instances WHERE run_id = $1 AND status = 'running'"#,
324 )
325 .bind(run_id)
326 .fetch_one(executor)
327 .await
328}
329
330#[allow(clippy::too_many_arguments)]
331pub async fn insert_step_instance_with_spec<'a, E>(
333 executor: E,
334 id: Uuid,
335 run_id: Uuid,
336 step_name: &str,
337 step_type: &str,
338 status: StepStatus,
339 iteration_index: Option<i32>,
340 spec: Value,
341 params: Value,
342 created_at: DateTime<Utc>,
343) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
344where
345 E: Executor<'a, Database = Postgres>,
346{
347 sqlx::query(
348 r#"
349 WITH inserted AS (
350 INSERT INTO step_instances (id, run_id, step_name, step_type, status, iteration_index, spec, params, created_at)
351 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
352 RETURNING id
353 )
354 INSERT INTO step_status_history (step_instance_id, status)
355 SELECT id, $5 FROM inserted
356 "#,
357 )
358 .bind(id)
359 .bind(run_id)
360 .bind(step_name)
361 .bind(step_type)
362 .bind(status)
363 .bind(iteration_index)
364 .bind(spec)
365 .bind(params)
366 .bind(created_at)
367 .execute(executor)
368 .await
369}
370
371#[allow(clippy::too_many_arguments)]
372pub async fn insert_step_instance_with_spec_on_conflict_do_nothing<'a, E>(
374 executor: E,
375 id: Uuid,
376 run_id: Uuid,
377 step_name: &str,
378 step_type: &str,
379 status: StepStatus,
380 iteration_index: Option<i32>,
381 spec: Value,
382 params: Value,
383 created_at: DateTime<Utc>,
384) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
385where
386 E: Executor<'a, Database = Postgres>,
387{
388 sqlx::query(
389 r#"
390 WITH inserted AS (
391 INSERT INTO step_instances (id, run_id, step_name, step_type, status, iteration_index, spec, params, created_at)
392 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
393 ON CONFLICT DO NOTHING
394 RETURNING id
395 )
396 INSERT INTO step_status_history (step_instance_id, status)
397 SELECT id, $5 FROM inserted
398 "#,
399 )
400 .bind(id)
401 .bind(run_id)
402 .bind(step_name)
403 .bind(step_type)
404 .bind(status)
405 .bind(iteration_index)
406 .bind(spec)
407 .bind(params)
408 .bind(created_at)
409 .execute(executor)
410 .await
411}
412
413#[allow(clippy::too_many_arguments)]
414pub async fn get_wasm_step_definition<'a, E, O>(
416 executor: E,
417 step_type: &str,
418) -> Result<Option<O>, sqlx::Error>
419where
420 E: Executor<'a, Database = Postgres>,
421 O: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
422{
423 sqlx::query_as::<_, O>(
424 "SELECT wasm_module, wasm_function, wasm_config FROM step_definitions WHERE step_type = $1 AND wasm_module IS NOT NULL"
425 )
426 .bind(step_type)
427 .fetch_optional(executor)
428 .await
429}
430
431#[allow(clippy::too_many_arguments)]
432pub async fn get_pending_step_instances_for_run<'a, E, O>(
434 executor: E,
435 run_id: Uuid,
436 limit: i64,
437) -> Result<Vec<O>, sqlx::Error>
438where
439 E: Executor<'a, Database = Postgres>,
440 O: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
441{
442 sqlx::query_as::<_, O>(
443 r#"
444 SELECT id, run_id, step_name, step_type, status as "status", iteration_index, runner_id, affinity_context, started_at, finished_at, exit_code, error, spec, params, created_at
445 FROM step_instances
446 WHERE run_id = $1 AND status = 'pending' AND step_type NOT IN ('Approval', 'Wait')
447 ORDER BY created_at ASC
448 LIMIT $2
449 "#
450 )
451 .bind(run_id)
452 .bind(limit)
453 .fetch_all(executor)
454 .await
455}
456
457#[allow(clippy::too_many_arguments)]
458pub async fn get_step_instance_by_id<'a, E, O>(
460 executor: E,
461 id: Uuid,
462) -> Result<Option<O>, sqlx::Error>
463where
464 E: Executor<'a, Database = Postgres>,
465 O: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
466{
467 sqlx::query_as::<_, O>(
468 r#"SELECT id, run_id, step_name, step_type, status as "status", iteration_index, runner_id, affinity_context, started_at, finished_at, exit_code, error, spec, params, created_at FROM step_instances WHERE id = $1"#
469 )
470 .bind(id)
471 .fetch_optional(executor)
472 .await
473}
474
475#[allow(clippy::too_many_arguments)]
476pub async fn fail_pending_steps_for_run_on_timeout<'a, E>(
478 executor: E,
479 run_id: Uuid,
480) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
481where
482 E: Executor<'a, Database = Postgres>,
483{
484 sqlx::query(
485 r#"
486 UPDATE step_instances
487 SET status = 'failed', error = 'Workflow timed out', finished_at = NOW()
488 WHERE run_id = $1 AND status IN ('pending', 'running', 'waiting_for_event')
489 "#,
490 )
491 .bind(run_id)
492 .execute(executor)
493 .await
494}
495
496#[allow(clippy::too_many_arguments)]
497pub async fn update_step_instance_running<'a, E>(
499 executor: E,
500 status: &StepStatus,
501 runner_id: Option<&str>,
502 id: Uuid,
503) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
504where
505 E: Executor<'a, Database = Postgres>,
506{
507 sqlx::query(
508 "UPDATE step_instances SET status = $1, started_at = COALESCE(started_at, NOW()), runner_id = COALESCE($2, runner_id) WHERE id = $3"
509 )
510 .bind(status)
511 .bind(runner_id)
512 .bind(id)
513 .execute(executor)
514 .await
515}
516
517#[allow(clippy::too_many_arguments)]
518pub async fn update_step_instance_terminal<'a, E>(
520 executor: E,
521 status: &StepStatus,
522 id: Uuid,
523) -> Result<sqlx::postgres::PgQueryResult, sqlx::Error>
524where
525 E: Executor<'a, Database = Postgres>,
526{
527 sqlx::query("UPDATE step_instances SET status = $1, finished_at = NOW() WHERE id = $2")
528 .bind(status)
529 .bind(id)
530 .execute(executor)
531 .await
532}
533
534pub async fn get_test_summaries_for_run<'a, E>(
536 executor: E,
537 run_id: Uuid,
538) -> Result<Vec<test_report::TestSummary>, sqlx::Error>
539where
540 E: Executor<'a, Database = Postgres>,
541{
542 sqlx::query_as(
543 r#"
544 WITH combined AS (
545 SELECT * FROM step_test_summaries
546 UNION ALL
547 SELECT * FROM archived_step_test_summaries
548 )
549 SELECT * FROM combined WHERE run_id = $1 ORDER BY created_at ASC
550 "#,
551 )
552 .bind(run_id)
553 .fetch_all(executor)
554 .await
555}
556
557pub async fn get_test_cases_for_report<'a, E>(
559 executor: E,
560 run_id: Uuid,
561 report_name: &str,
562) -> Result<Vec<test_report::TestCase>, sqlx::Error>
563where
564 E: Executor<'a, Database = Postgres>,
565{
566 sqlx::query_as(
567 r#"
568 WITH combined AS (
569 SELECT id, run_id, step_instance_id, report_name, test_suite, test_case, status::text as status, duration_ms, message, created_at FROM step_test_cases
570 UNION ALL
571 SELECT id, run_id, step_instance_id, report_name, test_suite, test_case, status::text as status, duration_ms, message, created_at FROM archived_step_test_cases
572 )
573 SELECT id, run_id, step_instance_id, report_name, test_suite, test_case, status::test_case_status as status, duration_ms, message, created_at
574 FROM combined WHERE run_id = $1 AND report_name = $2 ORDER BY created_at ASC
575 "#,
576 )
577 .bind(run_id)
578 .bind(report_name)
579 .fetch_all(executor)
580 .await
581}
582
583pub async fn get_step_type_and_spec<'a, E, O>(executor: E, id: Uuid) -> Result<O, sqlx::Error>
585where
586 E: Executor<'a, Database = Postgres>,
587 O: Send + Unpin + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
588{
589 sqlx::query_as::<_, O>("SELECT step_type, spec FROM step_instances WHERE id = $1")
590 .bind(id)
591 .fetch_one(executor)
592 .await
593}