stormchaser_engine/handler/step/
scheduling.rs1use serde_json::Value;
2use stormchaser_dsl::ast;
3use stormchaser_model::dsl;
4
5use anyhow::Result;
6use sqlx::PgPool;
7use stormchaser_model::step::StepStatus;
8use uuid::Uuid;
9
10use crate::handler::{
11 fetch_outputs, fetch_quotas, fetch_run_context, handle_approval_notification,
12};
13
14pub async fn schedule_step(
16 run_id: Uuid,
17 step_dsl: &ast::Step,
18 executor: &mut sqlx::PgConnection,
19 nats_client: async_nats::Client,
20 hcl_ctx: &hcl::eval::Context<'_>,
21 pool: PgPool,
22 workflow: &ast::Workflow,
23) -> Result<()> {
24 let mut resolved_type = step_dsl.r#type.clone();
25 let mut resolved_spec = step_dsl.spec.clone();
26 let mut resolved_params = serde_json::to_value(&step_dsl.params).unwrap_or(Value::Null);
27
28 if let Some(library) = workflow
30 .step_libraries
31 .iter()
32 .find(|l| l.name == resolved_type)
33 {
34 resolved_type = library.r#type.clone();
35
36 if let (Value::Object(mut lib_spec), Value::Object(step_spec)) =
38 (library.spec.clone(), resolved_spec.clone())
39 {
40 for (k, v) in step_spec {
41 lib_spec.insert(k, v);
42 }
43 resolved_spec = Value::Object(lib_spec);
44 } else if resolved_spec.is_null() {
45 resolved_spec = library.spec.clone();
46 }
47
48 if let (Value::Object(mut lib_params), Value::Object(step_params)) = (
50 serde_json::to_value(&library.params).unwrap_or(Value::Null),
51 resolved_params.clone(),
52 ) {
53 for (k, v) in step_params {
54 lib_params.insert(k, v);
55 }
56 resolved_params = Value::Object(lib_params);
57 } else if resolved_params.is_null() {
58 resolved_params = serde_json::to_value(&library.params).unwrap_or(Value::Null);
59 }
60 }
61
62 if let Some(condition_expr) = &step_dsl.condition {
63 match crate::hcl_eval::evaluate_raw_expr(condition_expr, hcl_ctx) {
64 Ok(Value::Bool(true)) => {}
65 Ok(Value::Bool(false)) => {
66 crate::db::insert_step_instance(
67 executor,
68 Uuid::new_v4(),
69 run_id,
70 &step_dsl.name,
71 &step_dsl.r#type,
72 StepStatus::Skipped,
73 chrono::Utc::now(),
74 )
75 .await?;
76 return Ok(());
77 }
78 Ok(_) => return Err(anyhow::anyhow!("Condition must evaluate to a boolean")),
79 Err(e) => return Err(e),
80 }
81 }
82
83 if let Some(iterate_expr) = &step_dsl.iterate {
84 let items = match crate::hcl_eval::evaluate_raw_expr(iterate_expr, hcl_ctx) {
85 Ok(Value::Array(arr)) => arr,
86 Ok(_) => return Err(anyhow::anyhow!("Iterate must evaluate to an array")),
87 Err(e) => return Err(e),
88 };
89
90 if items.is_empty() {
91 crate::db::insert_step_instance(
92 executor,
93 Uuid::new_v4(),
94 run_id,
95 &step_dsl.name,
96 &step_dsl.r#type,
97 StepStatus::Skipped,
98 chrono::Utc::now(),
99 )
100 .await?;
101 return Ok(());
102 }
103
104 let max_parallel = step_dsl
105 .strategy
106 .as_ref()
107 .and_then(|s| s.max_parallel)
108 .unwrap_or(u32::MAX);
109
110 let run_context = fetch_run_context(run_id, &mut *executor).await?;
111 let steps_outputs = fetch_outputs(run_id, &mut *executor).await?;
112 let quotas = fetch_quotas(run_id, &mut *executor).await?;
113 let current_running_count: i64 =
114 crate::db::count_running_steps_for_run(&mut *executor, run_id).await?;
115
116 for (idx, item) in items.into_iter().enumerate() {
117 let step_instance_id = Uuid::new_v4();
118 let status = match step_dsl.r#type.as_str() {
119 "Approval" | "Wait" => StepStatus::WaitingForEvent,
120 _ => {
121 if (idx as u32) < max_parallel
122 && current_running_count < quotas.max_concurrency as i64
123 {
124 StepStatus::Pending
125 } else {
126 StepStatus::WaitingForEvent
127 }
128 }
129 };
130
131 let mut iteration_ctx = crate::hcl_eval::create_context(
132 run_context.inputs.clone(),
133 run_id,
134 steps_outputs.clone(),
135 );
136 let iter_var_name = step_dsl.iterate_as.as_deref().unwrap_or("item");
137 iteration_ctx.declare_var(iter_var_name, crate::hcl_eval::json_to_hcl(item));
138
139 let mut resolved_spec_iter = resolved_spec.clone();
140 let _ = crate::hcl_eval::resolve_expressions(&mut resolved_spec_iter, &iteration_ctx);
141
142 let mut resolved_params_iter = resolved_params.clone();
143 let _ = crate::hcl_eval::resolve_expressions(&mut resolved_params_iter, &iteration_ctx);
144
145 crate::db::insert_step_instance_with_spec(
146 &mut *executor,
147 step_instance_id,
148 run_id,
149 &step_dsl.name,
150 &resolved_type,
151 status.clone(),
152 Some(idx as i32),
153 resolved_spec_iter.clone(),
154 resolved_params_iter.clone(),
155 chrono::Utc::now(),
156 )
157 .await?;
158
159 if status == StepStatus::WaitingForEvent && resolved_type == "Wait" {
160 if let Ok(wait_spec) =
161 serde_json::from_value::<dsl::WaitEventSpec>(resolved_spec_iter.clone())
162 {
163 let _ = crate::db::insert_event_correlation(
164 &mut *executor,
165 Uuid::new_v4(),
166 step_instance_id,
167 run_id,
168 &wait_spec.correlation_key,
169 &wait_spec.correlation_value,
170 )
171 .await;
172 }
173 }
174 }
175 } else {
176 let _ = crate::hcl_eval::resolve_expressions(&mut resolved_spec, hcl_ctx);
177 let _ = crate::hcl_eval::resolve_expressions(&mut resolved_params, hcl_ctx);
178
179 let step_instance_id = Uuid::new_v4();
180 let initial_status = match resolved_type.as_str() {
181 "Approval" | "Wait" => StepStatus::WaitingForEvent,
182 _ => StepStatus::Pending,
183 };
184
185 println!(
186 "inserting step {} with type {}",
187 step_dsl.name, resolved_type
188 );
189 let insert_result = crate::db::insert_step_instance_with_spec_on_conflict_do_nothing(
190 &mut *executor,
191 step_instance_id,
192 run_id,
193 &step_dsl.name,
194 &resolved_type,
195 initial_status.clone(),
196 None::<i32>,
197 resolved_spec.clone(),
198 resolved_params.clone(),
199 chrono::Utc::now(),
200 )
201 .await?;
202
203 if insert_result.rows_affected() > 0 && initial_status == StepStatus::WaitingForEvent {
204 if resolved_type == "Wait" {
205 if let Ok(wait_spec) =
206 serde_json::from_value::<dsl::WaitEventSpec>(resolved_spec.clone())
207 {
208 let _ = crate::db::insert_event_correlation(
209 &mut *executor,
210 Uuid::new_v4(),
211 step_instance_id,
212 run_id,
213 &wait_spec.correlation_key,
214 &wait_spec.correlation_value,
215 )
216 .await;
217 }
218 } else if step_dsl.r#type == "Approval" {
219 if let Ok(approval_spec) =
220 serde_json::from_value::<dsl::ApprovalSpec>(resolved_spec.clone())
221 {
222 if let Some(notify_spec) = approval_spec.notify {
223 let pool = pool.clone();
224 let nats_client = nats_client.clone();
225 let spec_val = serde_json::to_value(notify_spec)?;
226 tokio::spawn(async move {
227 let _ = handle_approval_notification(
228 run_id,
229 step_instance_id,
230 spec_val,
231 pool,
232 nats_client,
233 )
234 .await;
235 });
236 }
237 }
238 }
239 }
240 }
241
242 Ok(())
243}