Skip to main content

stormchaser_api/db/
opa.rs

1use sqlx::PgPool;
2
3/// Retrieves run outputs for OPA evaluation.
4pub async fn get_run_outputs_for_opa(
5    pool: &PgPool,
6    run_id: stormchaser_model::RunId,
7) -> Result<serde_json::Map<String, serde_json::Value>, sqlx::Error> {
8    use sqlx::Row;
9    let outputs_rows = sqlx::query(
10        r#"
11        SELECT i.step_name, o.key as output_key, o.value as output_value
12        FROM combined_step_instances i
13        JOIN combined_step_outputs o ON i.id = o.step_instance_id
14        WHERE i.run_id = $1
15        "#,
16    )
17    .bind(run_id)
18    .fetch_all(pool)
19    .await?;
20
21    let mut run_outputs_map = serde_json::Map::new();
22    for row in outputs_rows {
23        let step_name: String = row.get("step_name");
24        let output_key: String = row.get("output_key");
25        let output_value: serde_json::Value = row.get("output_value");
26
27        if !run_outputs_map.contains_key(&step_name) {
28            run_outputs_map.insert(step_name.clone(), serde_json::json!({"outputs": {}}));
29        }
30        if let Some(step_obj) = run_outputs_map
31            .get_mut(&step_name)
32            .and_then(|v| v.as_object_mut())
33        {
34            if let Some(outputs_obj) = step_obj.get_mut("outputs").and_then(|v| v.as_object_mut()) {
35                outputs_obj.insert(output_key, output_value);
36            }
37        }
38    }
39    Ok(run_outputs_map)
40}
41
42/// Retrieves workflow context for OPA evaluation.
43pub async fn get_workflow_context_for_opa(
44    pool: &PgPool,
45    run_id: stormchaser_model::RunId,
46) -> Result<Option<WorkflowOpaContextData>, sqlx::Error> {
47    let context_row = sqlx::query(
48        r#"
49        SELECT
50            wr.initiating_user,
51            rc.workflow_definition,
52            rc.inputs as run_inputs
53        FROM workflow_runs wr
54        JOIN run_contexts rc ON wr.id = rc.run_id
55        WHERE wr.id = $1
56        "#,
57    )
58    .bind(run_id)
59    .fetch_optional(pool)
60    .await?;
61
62    if let Some(row) = context_row {
63        use sqlx::Row;
64        let initiating_user: String = row.get("initiating_user");
65        let workflow_definition: serde_json::Value = row.get("workflow_definition");
66        let run_inputs: serde_json::Value = row.get("run_inputs");
67        Ok(Some(WorkflowOpaContextData {
68            initiating_user,
69            workflow_definition,
70            run_inputs,
71        }))
72    } else {
73        Ok(None)
74    }
75}
76
77/// Data returned for workflow OPA context
78pub struct WorkflowOpaContextData {
79    pub initiating_user: String,
80    pub workflow_definition: serde_json::Value,
81    pub run_inputs: serde_json::Value,
82}