stormchaser_engine/handler/workflow/
direct.rs1use anyhow::{Context, Result};
2use chrono::Utc;
3use serde_json::Value;
4use sqlx::PgPool;
5use std::sync::Arc;
6use stormchaser_dsl::StormchaserParser;
7use stormchaser_model::auth::{EngineOpaContext, OpaClient};
8use stormchaser_model::events::WorkflowStartPendingEvent;
9use stormchaser_model::events::{EventSource, EventType, SchemaVersion, WorkflowEventType};
10use stormchaser_model::workflow::{RunStatus, WorkflowRun};
11use stormchaser_model::RunId;
12use tracing::{debug, error, info};
13
14#[tracing::instrument(skip(payload, pool, opa_client, nats_client), fields(run_id = tracing::field::Empty))]
15pub async fn handle_workflow_direct(
17 payload: Value,
18 pool: PgPool,
19 opa_client: Arc<OpaClient>,
20 nats_client: async_nats::Client,
21) -> Result<()> {
22 let run_id_str = payload["run_id"].as_str().context("Missing run_id")?;
23 let run_id = uuid::Uuid::parse_str(run_id_str).map(RunId::new)?;
24 tracing::Span::current().record("run_id", tracing::field::display(run_id));
25 let workflow_content = payload["dsl"].as_str().context("Missing dsl content")?;
26 let initiating_user = payload["initiating_user"]
27 .as_str()
28 .unwrap_or("system")
29 .to_string();
30 let inputs = payload["inputs"].clone();
31
32 info!("Handling direct one-off workflow run: {}", run_id);
33
34 let parser = StormchaserParser::new();
36 let parsed_workflow = match parser.parse(workflow_content) {
37 Ok(w) => w,
38 Err(e) => {
39 error!("Direct workflow parsing failed for {}: {}", run_id, e);
40 return Err(e);
41 }
42 };
43
44 let opa_context = EngineOpaContext {
46 run_id,
47 initiating_user: initiating_user.clone(),
48 workflow_ast: serde_json::to_value(&parsed_workflow)?,
49 inputs: inputs.clone(),
50 };
51
52 match opa_client.check_context(opa_context).await {
53 Ok(true) => debug!("OPA allowed execution for direct run {}", run_id),
54 Ok(false) => {
55 let err_msg = "Execution denied by OPA policy".to_string();
56 info!("Direct run {}: {}", run_id, err_msg);
57 return Err(anyhow::anyhow!(err_msg));
58 }
59 Err(e) => {
60 let err_msg = format!("OPA check failed: {}", e);
61 error!("Direct run {}: {}", run_id, err_msg);
62 return Err(anyhow::anyhow!(err_msg));
63 }
64 }
65
66 let run = WorkflowRun {
69 id: run_id,
70 workflow_name: parsed_workflow.name.clone(),
71 initiating_user: initiating_user.clone(),
72 repo_url: "direct://".to_string(),
73 workflow_path: "inline.storm".to_string(),
74 git_ref: "HEAD".to_string(),
75 status: RunStatus::StartPending,
76 version: 1,
77 fencing_token: Utc::now().timestamp_nanos_opt().unwrap_or(0),
78 created_at: Utc::now(),
79 updated_at: Utc::now(),
80 started_resolving_at: Some(Utc::now()),
81 started_at: None,
82 finished_at: None,
83 error: None,
84 };
85
86 let mut tx = pool.begin().await?;
87
88 crate::db::insert_full_workflow_run(
89 &mut *tx,
90 &run,
91 &parsed_workflow.dsl_version,
92 serde_json::to_value(&parsed_workflow)?,
93 Some(workflow_content),
94 inputs,
95 10,
96 "1",
97 "4Gi",
98 "10Gi",
99 "1h",
100 )
101 .await?;
102
103 tx.commit().await?;
104
105 let event = WorkflowStartPendingEvent {
107 run_id,
108 event_type: EventType::Workflow(WorkflowEventType::StartPending),
109 timestamp: Utc::now(),
110 };
111 let js = async_nats::jetstream::new(nats_client);
112 use stormchaser_model::nats::NatsSubject;
113 stormchaser_model::nats::publish_cloudevent(
114 &js,
115 NatsSubject::RunStartPending,
116 EventType::Workflow(WorkflowEventType::StartPending),
117 EventSource::System,
118 serde_json::to_value(event).unwrap(),
119 Some(SchemaVersion::new("1.0".to_string())),
120 None,
121 )
122 .await
123 .with_context(|| {
124 format!(
125 "Failed to publish start_pending event for direct run {}",
126 run_id
127 )
128 })?;
129
130 info!(
131 "Successfully initialized direct one-off workflow run {}",
132 run_id
133 );
134
135 Ok(())
136}