Skip to main content

stormchaser_engine/handler/workflow/
direct.rs

1use anyhow::{Context, Result};
2use chrono::Utc;
3use serde_json::Value;
4use sqlx::PgPool;
5use std::sync::Arc;
6use stormchaser_dsl::StormchaserParser;
7use stormchaser_model::auth::{EngineOpaContext, OpaClient};
8use stormchaser_model::events::WorkflowStartPendingEvent;
9use stormchaser_model::events::{EventSource, EventType, SchemaVersion, WorkflowEventType};
10use stormchaser_model::workflow::{RunStatus, WorkflowRun};
11use stormchaser_model::RunId;
12use tracing::{debug, error, info};
13
14#[tracing::instrument(skip(payload, pool, opa_client, nats_client), fields(run_id = tracing::field::Empty))]
15/// Handle workflow direct.
16pub async fn handle_workflow_direct(
17    payload: Value,
18    pool: PgPool,
19    opa_client: Arc<OpaClient>,
20    nats_client: async_nats::Client,
21) -> Result<()> {
22    let run_id_str = payload["run_id"].as_str().context("Missing run_id")?;
23    let run_id = uuid::Uuid::parse_str(run_id_str).map(RunId::new)?;
24    tracing::Span::current().record("run_id", tracing::field::display(run_id));
25    let workflow_content = payload["dsl"].as_str().context("Missing dsl content")?;
26    let initiating_user = payload["initiating_user"]
27        .as_str()
28        .unwrap_or("system")
29        .to_string();
30    let inputs = payload["inputs"].clone();
31
32    info!("Handling direct one-off workflow run: {}", run_id);
33
34    // 1. Parse the DSL content directly
35    let parser = StormchaserParser::new();
36    let parsed_workflow = match parser.parse(workflow_content) {
37        Ok(w) => w,
38        Err(e) => {
39            error!("Direct workflow parsing failed for {}: {}", run_id, e);
40            return Err(e);
41        }
42    };
43
44    // 2. OPA Policy Check
45    let opa_context = EngineOpaContext {
46        run_id,
47        initiating_user: initiating_user.clone(),
48        workflow_ast: serde_json::to_value(&parsed_workflow)?,
49        inputs: inputs.clone(),
50    };
51
52    match opa_client.check_context(opa_context).await {
53        Ok(true) => debug!("OPA allowed execution for direct run {}", run_id),
54        Ok(false) => {
55            let err_msg = "Execution denied by OPA policy".to_string();
56            info!("Direct run {}: {}", run_id, err_msg);
57            return Err(anyhow::anyhow!(err_msg));
58        }
59        Err(e) => {
60            let err_msg = format!("OPA check failed: {}", e);
61            error!("Direct run {}: {}", run_id, err_msg);
62            return Err(anyhow::anyhow!(err_msg));
63        }
64    }
65
66    // 3. Create WorkflowRun and RunContext in DB
67    // Direct runs have no repo_url or workflow_path in the traditional sense
68    let run = WorkflowRun {
69        id: run_id,
70        workflow_name: parsed_workflow.name.clone(),
71        initiating_user: initiating_user.clone(),
72        repo_url: "direct://".to_string(),
73        workflow_path: "inline.storm".to_string(),
74        git_ref: "HEAD".to_string(),
75        status: RunStatus::StartPending,
76        version: 1,
77        fencing_token: Utc::now().timestamp_nanos_opt().unwrap_or(0),
78        created_at: Utc::now(),
79        updated_at: Utc::now(),
80        started_resolving_at: Some(Utc::now()),
81        started_at: None,
82        finished_at: None,
83        error: None,
84    };
85
86    let mut tx = pool.begin().await?;
87
88    crate::db::insert_full_workflow_run(
89        &mut *tx,
90        &run,
91        &parsed_workflow.dsl_version,
92        serde_json::to_value(&parsed_workflow)?,
93        Some(workflow_content),
94        inputs,
95        10,
96        "1",
97        "4Gi",
98        "10Gi",
99        "1h",
100    )
101    .await?;
102
103    tx.commit().await?;
104
105    // 4. Emit event for transition to StartPending
106    let event = WorkflowStartPendingEvent {
107        run_id,
108        event_type: EventType::Workflow(WorkflowEventType::StartPending),
109        timestamp: Utc::now(),
110    };
111    let js = async_nats::jetstream::new(nats_client);
112    use stormchaser_model::nats::NatsSubject;
113    stormchaser_model::nats::publish_cloudevent(
114        &js,
115        NatsSubject::RunStartPending,
116        EventType::Workflow(WorkflowEventType::StartPending),
117        EventSource::System,
118        serde_json::to_value(event).unwrap(),
119        Some(SchemaVersion::new("1.0".to_string())),
120        None,
121    )
122    .await
123    .with_context(|| {
124        format!(
125            "Failed to publish start_pending event for direct run {}",
126            run_id
127        )
128    })?;
129
130    info!(
131        "Successfully initialized direct one-off workflow run {}",
132        run_id
133    );
134
135    Ok(())
136}