Skip to main content

stormchaser_engine/handler/workflow/
direct.rs

1use anyhow::{Context, Result};
2use chrono::Utc;
3use serde_json::Value;
4use sqlx::PgPool;
5use std::sync::Arc;
6use stormchaser_dsl::StormchaserParser;
7use stormchaser_model::auth::{EngineOpaContext, OpaClient};
8use stormchaser_model::events::WorkflowStartPendingEvent;
9use stormchaser_model::workflow::{RunStatus, WorkflowRun};
10use stormchaser_model::RunId;
11use tracing::{debug, error, info};
12
13#[tracing::instrument(skip(payload, pool, opa_client, nats_client), fields(run_id = tracing::field::Empty))]
14/// Handle workflow direct.
15pub async fn handle_workflow_direct(
16    payload: Value,
17    pool: PgPool,
18    opa_client: Arc<OpaClient>,
19    nats_client: async_nats::Client,
20) -> Result<()> {
21    let run_id_str = payload["run_id"].as_str().context("Missing run_id")?;
22    let run_id = uuid::Uuid::parse_str(run_id_str).map(RunId::new)?;
23    tracing::Span::current().record("run_id", tracing::field::display(run_id));
24    let workflow_content = payload["dsl"].as_str().context("Missing dsl content")?;
25    let initiating_user = payload["initiating_user"]
26        .as_str()
27        .unwrap_or("system")
28        .to_string();
29    let inputs = payload["inputs"].clone();
30
31    info!("Handling direct one-off workflow run: {}", run_id);
32
33    // 1. Parse the DSL content directly
34    let parser = StormchaserParser::new();
35    let parsed_workflow = match parser.parse(workflow_content) {
36        Ok(w) => w,
37        Err(e) => {
38            error!("Direct workflow parsing failed for {}: {}", run_id, e);
39            return Err(e);
40        }
41    };
42
43    // 2. OPA Policy Check
44    let opa_context = EngineOpaContext {
45        run_id,
46        initiating_user: initiating_user.clone(),
47        workflow_ast: serde_json::to_value(&parsed_workflow)?,
48        inputs: inputs.clone(),
49    };
50
51    match opa_client.check_context(opa_context).await {
52        Ok(true) => debug!("OPA allowed execution for direct run {}", run_id),
53        Ok(false) => {
54            let err_msg = "Execution denied by OPA policy".to_string();
55            info!("Direct run {}: {}", run_id, err_msg);
56            return Err(anyhow::anyhow!(err_msg));
57        }
58        Err(e) => {
59            let err_msg = format!("OPA check failed: {}", e);
60            error!("Direct run {}: {}", run_id, err_msg);
61            return Err(anyhow::anyhow!(err_msg));
62        }
63    }
64
65    // 3. Create WorkflowRun and RunContext in DB
66    // Direct runs have no repo_url or workflow_path in the traditional sense
67    let run = WorkflowRun {
68        id: run_id,
69        workflow_name: parsed_workflow.name.clone(),
70        initiating_user: initiating_user.clone(),
71        repo_url: "direct://".to_string(),
72        workflow_path: "inline.storm".to_string(),
73        git_ref: "HEAD".to_string(),
74        status: RunStatus::StartPending,
75        version: 1,
76        fencing_token: Utc::now().timestamp_nanos_opt().unwrap_or(0),
77        created_at: Utc::now(),
78        updated_at: Utc::now(),
79        started_resolving_at: Some(Utc::now()),
80        started_at: None,
81        finished_at: None,
82        error: None,
83    };
84
85    let mut tx = pool.begin().await?;
86
87    crate::db::insert_full_workflow_run(
88        &mut *tx,
89        &run,
90        &parsed_workflow.dsl_version,
91        serde_json::to_value(&parsed_workflow)?,
92        Some(workflow_content),
93        inputs,
94        10,
95        "1",
96        "4Gi",
97        "10Gi",
98        "1h",
99    )
100    .await?;
101
102    tx.commit().await?;
103
104    // 4. Emit event for transition to StartPending
105    let event = WorkflowStartPendingEvent {
106        run_id,
107        event_type: "workflow_start_pending".to_string(),
108        timestamp: Utc::now(),
109    };
110    let js = async_nats::jetstream::new(nats_client);
111    stormchaser_model::nats::publish_cloudevent(
112        &js,
113        "stormchaser.v1.run.start_pending",
114        "stormchaser.v1.run.start_pending",
115        "/stormchaser",
116        serde_json::to_value(event).unwrap(),
117        Some("1.0"),
118        None,
119    )
120    .await
121    .with_context(|| {
122        format!(
123            "Failed to publish start_pending event for direct run {}",
124            run_id
125        )
126    })?;
127
128    info!(
129        "Successfully initialized direct one-off workflow run {}",
130        run_id
131    );
132
133    Ok(())
134}