stormchaser_engine/handler/workflow/
direct.rs1use anyhow::{Context, Result};
2use chrono::Utc;
3use serde_json::Value;
4use sqlx::PgPool;
5use std::sync::Arc;
6use stormchaser_dsl::StormchaserParser;
7use stormchaser_model::auth::{EngineOpaContext, OpaClient};
8use stormchaser_model::events::WorkflowStartPendingEvent;
9use stormchaser_model::workflow::{RunStatus, WorkflowRun};
10use stormchaser_model::RunId;
11use tracing::{debug, error, info};
12
13#[tracing::instrument(skip(payload, pool, opa_client, nats_client), fields(run_id = tracing::field::Empty))]
14pub async fn handle_workflow_direct(
16 payload: Value,
17 pool: PgPool,
18 opa_client: Arc<OpaClient>,
19 nats_client: async_nats::Client,
20) -> Result<()> {
21 let run_id_str = payload["run_id"].as_str().context("Missing run_id")?;
22 let run_id = uuid::Uuid::parse_str(run_id_str).map(RunId::new)?;
23 tracing::Span::current().record("run_id", tracing::field::display(run_id));
24 let workflow_content = payload["dsl"].as_str().context("Missing dsl content")?;
25 let initiating_user = payload["initiating_user"]
26 .as_str()
27 .unwrap_or("system")
28 .to_string();
29 let inputs = payload["inputs"].clone();
30
31 info!("Handling direct one-off workflow run: {}", run_id);
32
33 let parser = StormchaserParser::new();
35 let parsed_workflow = match parser.parse(workflow_content) {
36 Ok(w) => w,
37 Err(e) => {
38 error!("Direct workflow parsing failed for {}: {}", run_id, e);
39 return Err(e);
40 }
41 };
42
43 let opa_context = EngineOpaContext {
45 run_id,
46 initiating_user: initiating_user.clone(),
47 workflow_ast: serde_json::to_value(&parsed_workflow)?,
48 inputs: inputs.clone(),
49 };
50
51 match opa_client.check_context(opa_context).await {
52 Ok(true) => debug!("OPA allowed execution for direct run {}", run_id),
53 Ok(false) => {
54 let err_msg = "Execution denied by OPA policy".to_string();
55 info!("Direct run {}: {}", run_id, err_msg);
56 return Err(anyhow::anyhow!(err_msg));
57 }
58 Err(e) => {
59 let err_msg = format!("OPA check failed: {}", e);
60 error!("Direct run {}: {}", run_id, err_msg);
61 return Err(anyhow::anyhow!(err_msg));
62 }
63 }
64
65 let run = WorkflowRun {
68 id: run_id,
69 workflow_name: parsed_workflow.name.clone(),
70 initiating_user: initiating_user.clone(),
71 repo_url: "direct://".to_string(),
72 workflow_path: "inline.storm".to_string(),
73 git_ref: "HEAD".to_string(),
74 status: RunStatus::StartPending,
75 version: 1,
76 fencing_token: Utc::now().timestamp_nanos_opt().unwrap_or(0),
77 created_at: Utc::now(),
78 updated_at: Utc::now(),
79 started_resolving_at: Some(Utc::now()),
80 started_at: None,
81 finished_at: None,
82 error: None,
83 };
84
85 let mut tx = pool.begin().await?;
86
87 crate::db::insert_full_workflow_run(
88 &mut *tx,
89 &run,
90 &parsed_workflow.dsl_version,
91 serde_json::to_value(&parsed_workflow)?,
92 Some(workflow_content),
93 inputs,
94 10,
95 "1",
96 "4Gi",
97 "10Gi",
98 "1h",
99 )
100 .await?;
101
102 tx.commit().await?;
103
104 let event = WorkflowStartPendingEvent {
106 run_id,
107 event_type: "workflow_start_pending".to_string(),
108 timestamp: Utc::now(),
109 };
110 let js = async_nats::jetstream::new(nats_client);
111 stormchaser_model::nats::publish_cloudevent(
112 &js,
113 "stormchaser.v1.run.start_pending",
114 "stormchaser.v1.run.start_pending",
115 "/stormchaser",
116 serde_json::to_value(event).unwrap(),
117 Some("1.0"),
118 None,
119 )
120 .await
121 .with_context(|| {
122 format!(
123 "Failed to publish start_pending event for direct run {}",
124 run_id
125 )
126 })?;
127
128 info!(
129 "Successfully initialized direct one-off workflow run {}",
130 run_id
131 );
132
133 Ok(())
134}