stormchaser_engine/handler/step/
scheduling.rs1use serde_json::Value;
2use stormchaser_dsl::ast;
3use stormchaser_model::dsl;
4use stormchaser_model::step::StepStatus;
5use stormchaser_model::{EventId, RunId, StepInstanceId};
6
7use anyhow::Result;
8use sqlx::PgPool;
9use uuid::Uuid;
10
11use crate::handler::{
12 fetch_outputs, fetch_quotas, fetch_run_context, handle_approval_notification,
13};
14
15pub async fn schedule_step(
17 run_id: RunId,
18 step_dsl: &ast::Step,
19 executor: &mut sqlx::PgConnection,
20 nats_client: async_nats::Client,
21 hcl_ctx: &hcl::eval::Context<'_>,
22 pool: PgPool,
23 workflow: &ast::Workflow,
24) -> Result<()> {
25 let mut resolved_type = step_dsl.r#type.clone();
26 let mut resolved_spec = step_dsl.spec.clone();
27 let mut resolved_params = serde_json::to_value(&step_dsl.params).unwrap_or(Value::Null);
28
29 if let Some(library) = workflow
31 .step_libraries
32 .iter()
33 .find(|l| l.name == resolved_type)
34 {
35 resolved_type = library.r#type.clone();
36
37 if let (Value::Object(mut lib_spec), Value::Object(step_spec)) =
39 (library.spec.clone(), resolved_spec.clone())
40 {
41 for (k, v) in step_spec {
42 lib_spec.insert(k, v);
43 }
44 resolved_spec = Value::Object(lib_spec);
45 } else if resolved_spec.is_null() {
46 resolved_spec = library.spec.clone();
47 }
48
49 if let (Value::Object(mut lib_params), Value::Object(step_params)) = (
51 serde_json::to_value(&library.params).unwrap_or(Value::Null),
52 resolved_params.clone(),
53 ) {
54 for (k, v) in step_params {
55 lib_params.insert(k, v);
56 }
57 resolved_params = Value::Object(lib_params);
58 } else if resolved_params.is_null() {
59 resolved_params = serde_json::to_value(&library.params).unwrap_or(Value::Null);
60 }
61 }
62
63 if let Some(condition_expr) = &step_dsl.condition {
64 match crate::hcl_eval::evaluate_raw_expr(condition_expr, hcl_ctx) {
65 Ok(Value::Bool(true)) => {}
66 Ok(Value::Bool(false)) => {
67 crate::db::insert_step_instance(
68 executor,
69 StepInstanceId::new_v4(),
70 run_id,
71 &step_dsl.name,
72 &step_dsl.r#type,
73 StepStatus::Skipped,
74 chrono::Utc::now(),
75 )
76 .await?;
77 return Ok(());
78 }
79 Ok(_) => return Err(anyhow::anyhow!("Condition must evaluate to a boolean")),
80 Err(e) => return Err(e),
81 }
82 }
83
84 if let Some(iterate_expr) = &step_dsl.iterate {
85 let items = match crate::hcl_eval::evaluate_raw_expr(iterate_expr, hcl_ctx) {
86 Ok(Value::Array(arr)) => arr,
87 Ok(_) => return Err(anyhow::anyhow!("Iterate must evaluate to an array")),
88 Err(e) => return Err(e),
89 };
90
91 if items.is_empty() {
92 crate::db::insert_step_instance(
93 executor,
94 StepInstanceId::new_v4(),
95 run_id,
96 &step_dsl.name,
97 &step_dsl.r#type,
98 StepStatus::Skipped,
99 chrono::Utc::now(),
100 )
101 .await?;
102 return Ok(());
103 }
104
105 let max_parallel = step_dsl
106 .strategy
107 .as_ref()
108 .and_then(|s| s.max_parallel)
109 .unwrap_or(u32::MAX);
110
111 let run_context = fetch_run_context(run_id, &mut *executor).await?;
112 let steps_outputs = fetch_outputs(run_id, &mut *executor).await?;
113 let quotas = fetch_quotas(run_id, &mut *executor).await?;
114 let current_running_count: i64 =
115 crate::db::count_running_steps_for_run(&mut *executor, run_id).await?;
116
117 for (idx, item) in items.into_iter().enumerate() {
118 let step_instance_id = StepInstanceId::new(Uuid::new_v4());
119 let status = match step_dsl.r#type.as_str() {
120 "Approval" | "Wait" => StepStatus::WaitingForEvent,
121 _ => {
122 if (idx as u32) < max_parallel
123 && current_running_count < quotas.max_concurrency as i64
124 {
125 StepStatus::Pending
126 } else {
127 StepStatus::WaitingForEvent
128 }
129 }
130 };
131
132 let mut iteration_ctx = crate::hcl_eval::create_context(
133 run_context.inputs.clone(),
134 run_id,
135 steps_outputs.clone(),
136 );
137 let iter_var_name = step_dsl.iterate_as.as_deref().unwrap_or("item");
138 iteration_ctx.declare_var(iter_var_name, crate::hcl_eval::json_to_hcl(item));
139
140 let mut resolved_spec_iter = resolved_spec.clone();
141 let _ = crate::hcl_eval::resolve_expressions(&mut resolved_spec_iter, &iteration_ctx);
142
143 let mut resolved_params_iter = resolved_params.clone();
144 let _ = crate::hcl_eval::resolve_expressions(&mut resolved_params_iter, &iteration_ctx);
145
146 crate::db::insert_step_instance_with_spec(
147 &mut *executor,
148 step_instance_id,
149 run_id,
150 &step_dsl.name,
151 &resolved_type,
152 status.clone(),
153 Some(idx as i32),
154 resolved_spec_iter.clone(),
155 resolved_params_iter.clone(),
156 chrono::Utc::now(),
157 )
158 .await?;
159
160 if status == StepStatus::WaitingForEvent && resolved_type == "Wait" {
161 if let Ok(wait_spec) =
162 serde_json::from_value::<dsl::WaitEventSpec>(resolved_spec_iter.clone())
163 {
164 let _ = crate::db::insert_event_correlation(
165 &mut *executor,
166 EventId::new_v4(),
167 step_instance_id,
168 run_id,
169 &wait_spec.correlation_key,
170 &wait_spec.correlation_value,
171 )
172 .await;
173 }
174 }
175 }
176 } else {
177 let _ = crate::hcl_eval::resolve_expressions(&mut resolved_spec, hcl_ctx);
178 let _ = crate::hcl_eval::resolve_expressions(&mut resolved_params, hcl_ctx);
179
180 let step_instance_id = StepInstanceId::new(Uuid::new_v4());
181 let initial_status = match resolved_type.as_str() {
182 "Approval" | "Wait" => StepStatus::WaitingForEvent,
183 _ => StepStatus::Pending,
184 };
185
186 println!(
187 "inserting step {} with type {}",
188 step_dsl.name, resolved_type
189 );
190 let insert_result = crate::db::insert_step_instance_with_spec_on_conflict_do_nothing(
191 &mut *executor,
192 step_instance_id,
193 run_id,
194 &step_dsl.name,
195 &resolved_type,
196 initial_status.clone(),
197 None::<i32>,
198 resolved_spec.clone(),
199 resolved_params.clone(),
200 chrono::Utc::now(),
201 )
202 .await?;
203
204 if insert_result.rows_affected() > 0 && initial_status == StepStatus::WaitingForEvent {
205 if resolved_type == "Wait" {
206 if let Ok(wait_spec) =
207 serde_json::from_value::<dsl::WaitEventSpec>(resolved_spec.clone())
208 {
209 let _ = crate::db::insert_event_correlation(
210 &mut *executor,
211 EventId::new_v4(),
212 step_instance_id,
213 run_id,
214 &wait_spec.correlation_key,
215 &wait_spec.correlation_value,
216 )
217 .await;
218 }
219 } else if step_dsl.r#type == "Approval" {
220 if let Ok(approval_spec) =
221 serde_json::from_value::<dsl::ApprovalSpec>(resolved_spec.clone())
222 {
223 if let Some(notify_spec) = approval_spec.notify {
224 let pool = pool.clone();
225 let nats_client = nats_client.clone();
226 let spec_val = serde_json::to_value(notify_spec)?;
227 tokio::spawn(async move {
228 let _ = handle_approval_notification(
229 run_id,
230 step_instance_id,
231 spec_val,
232 pool,
233 nats_client,
234 )
235 .await;
236 });
237 }
238 }
239 }
240 }
241 }
242
243 Ok(())
244}