stormchaser_api/db/
opa.rs1use sqlx::PgPool;
2
3pub async fn get_run_outputs_for_opa(
5 pool: &PgPool,
6 run_id: stormchaser_model::RunId,
7) -> Result<serde_json::Map<String, serde_json::Value>, sqlx::Error> {
8 use sqlx::Row;
9 let outputs_rows = sqlx::query(
10 r#"
11 SELECT i.step_name, o.key as output_key, o.value as output_value
12 FROM combined_step_instances i
13 JOIN combined_step_outputs o ON i.id = o.step_instance_id
14 WHERE i.run_id = $1
15 "#,
16 )
17 .bind(run_id)
18 .fetch_all(pool)
19 .await?;
20
21 let mut run_outputs_map = serde_json::Map::new();
22 for row in outputs_rows {
23 let step_name: String = row.get("step_name");
24 let output_key: String = row.get("output_key");
25 let output_value: serde_json::Value = row.get("output_value");
26
27 if !run_outputs_map.contains_key(&step_name) {
28 run_outputs_map.insert(step_name.clone(), serde_json::json!({"outputs": {}}));
29 }
30 if let Some(step_obj) = run_outputs_map
31 .get_mut(&step_name)
32 .and_then(|v| v.as_object_mut())
33 {
34 if let Some(outputs_obj) = step_obj.get_mut("outputs").and_then(|v| v.as_object_mut()) {
35 outputs_obj.insert(output_key, output_value);
36 }
37 }
38 }
39 Ok(run_outputs_map)
40}
41
42pub async fn get_workflow_context_for_opa(
44 pool: &PgPool,
45 run_id: stormchaser_model::RunId,
46) -> Result<Option<WorkflowOpaContextData>, sqlx::Error> {
47 let context_row = sqlx::query(
48 r#"
49 SELECT
50 wr.initiating_user,
51 rc.workflow_definition,
52 rc.inputs as run_inputs
53 FROM workflow_runs wr
54 JOIN run_contexts rc ON wr.id = rc.run_id
55 WHERE wr.id = $1
56 "#,
57 )
58 .bind(run_id)
59 .fetch_optional(pool)
60 .await?;
61
62 if let Some(row) = context_row {
63 use sqlx::Row;
64 let initiating_user: String = row.get("initiating_user");
65 let workflow_definition: serde_json::Value = row.get("workflow_definition");
66 let run_inputs: serde_json::Value = row.get("run_inputs");
67 Ok(Some(WorkflowOpaContextData {
68 initiating_user,
69 workflow_definition,
70 run_inputs,
71 }))
72 } else {
73 Ok(None)
74 }
75}
76
77pub struct WorkflowOpaContextData {
79 pub initiating_user: String,
80 pub workflow_definition: serde_json::Value,
81 pub run_inputs: serde_json::Value,
82}