Skip to main content

stormchaser_engine/handler/step/
scheduling.rs

1use serde_json::Value;
2use stormchaser_dsl::ast;
3use stormchaser_model::dsl;
4use stormchaser_model::step::StepStatus;
5use stormchaser_model::{EventId, RunId, StepInstanceId};
6
7use anyhow::Result;
8use sqlx::PgPool;
9use uuid::Uuid;
10
11use crate::handler::{
12    fetch_outputs, fetch_quotas, fetch_run_context, handle_approval_notification,
13};
14
15/// Schedules a step for execution, creating a new instance and managing its initial state transitions based on dependencies and quotas.
16pub async fn schedule_step(
17    run_id: RunId,
18    step_dsl: &ast::Step,
19    executor: &mut sqlx::PgConnection,
20    nats_client: async_nats::Client,
21    hcl_ctx: &hcl::eval::Context<'_>,
22    pool: PgPool,
23    workflow: &ast::Workflow,
24) -> Result<()> {
25    let mut resolved_type = step_dsl.r#type.clone();
26    let mut resolved_spec = step_dsl.spec.clone();
27    let mut resolved_params = serde_json::to_value(&step_dsl.params).unwrap_or(Value::Null);
28
29    // Merge Step Library if it exists
30    if let Some(library) = workflow
31        .step_libraries
32        .iter()
33        .find(|l| l.name == resolved_type)
34    {
35        resolved_type = library.r#type.clone();
36
37        // Merge specs
38        if let (Value::Object(mut lib_spec), Value::Object(step_spec)) =
39            (library.spec.clone(), resolved_spec.clone())
40        {
41            for (k, v) in step_spec {
42                lib_spec.insert(k, v);
43            }
44            resolved_spec = Value::Object(lib_spec);
45        } else if resolved_spec.is_null() {
46            resolved_spec = library.spec.clone();
47        }
48
49        // Merge params
50        if let (Value::Object(mut lib_params), Value::Object(step_params)) = (
51            serde_json::to_value(&library.params).unwrap_or(Value::Null),
52            resolved_params.clone(),
53        ) {
54            for (k, v) in step_params {
55                lib_params.insert(k, v);
56            }
57            resolved_params = Value::Object(lib_params);
58        } else if resolved_params.is_null() {
59            resolved_params = serde_json::to_value(&library.params).unwrap_or(Value::Null);
60        }
61    }
62
63    if let Some(condition_expr) = &step_dsl.condition {
64        match crate::hcl_eval::evaluate_raw_expr(condition_expr, hcl_ctx) {
65            Ok(Value::Bool(true)) => {}
66            Ok(Value::Bool(false)) => {
67                crate::db::insert_step_instance(
68                    executor,
69                    StepInstanceId::new_v4(),
70                    run_id,
71                    &step_dsl.name,
72                    &step_dsl.r#type,
73                    StepStatus::Skipped,
74                    chrono::Utc::now(),
75                )
76                .await?;
77                return Ok(());
78            }
79            Ok(_) => return Err(anyhow::anyhow!("Condition must evaluate to a boolean")),
80            Err(e) => return Err(e),
81        }
82    }
83
84    if let Some(iterate_expr) = &step_dsl.iterate {
85        let items = match crate::hcl_eval::evaluate_raw_expr(iterate_expr, hcl_ctx) {
86            Ok(Value::Array(arr)) => arr,
87            Ok(_) => return Err(anyhow::anyhow!("Iterate must evaluate to an array")),
88            Err(e) => return Err(e),
89        };
90
91        if items.is_empty() {
92            crate::db::insert_step_instance(
93                executor,
94                StepInstanceId::new_v4(),
95                run_id,
96                &step_dsl.name,
97                &step_dsl.r#type,
98                StepStatus::Skipped,
99                chrono::Utc::now(),
100            )
101            .await?;
102            return Ok(());
103        }
104
105        let max_parallel = step_dsl
106            .strategy
107            .as_ref()
108            .and_then(|s| s.max_parallel)
109            .unwrap_or(u32::MAX);
110
111        let run_context = fetch_run_context(run_id, &mut *executor).await?;
112        let steps_outputs = fetch_outputs(run_id, &mut *executor).await?;
113        let quotas = fetch_quotas(run_id, &mut *executor).await?;
114        let current_running_count: i64 =
115            crate::db::count_running_steps_for_run(&mut *executor, run_id).await?;
116
117        for (idx, item) in items.into_iter().enumerate() {
118            let step_instance_id = StepInstanceId::new(Uuid::new_v4());
119            let status = match step_dsl.r#type.as_str() {
120                "Approval" | "Wait" => StepStatus::WaitingForEvent,
121                _ => {
122                    if (idx as u32) < max_parallel
123                        && current_running_count < quotas.max_concurrency as i64
124                    {
125                        StepStatus::Pending
126                    } else {
127                        StepStatus::WaitingForEvent
128                    }
129                }
130            };
131
132            let mut iteration_ctx = crate::hcl_eval::create_context(
133                run_context.inputs.clone(),
134                run_id,
135                steps_outputs.clone(),
136            );
137            let iter_var_name = step_dsl.iterate_as.as_deref().unwrap_or("item");
138            iteration_ctx.declare_var(iter_var_name, crate::hcl_eval::json_to_hcl(item));
139
140            let mut resolved_spec_iter = resolved_spec.clone();
141            let _ = crate::hcl_eval::resolve_expressions(&mut resolved_spec_iter, &iteration_ctx);
142
143            let mut resolved_params_iter = resolved_params.clone();
144            let _ = crate::hcl_eval::resolve_expressions(&mut resolved_params_iter, &iteration_ctx);
145
146            crate::db::insert_step_instance_with_spec(
147                &mut *executor,
148                step_instance_id,
149                run_id,
150                &step_dsl.name,
151                &resolved_type,
152                status.clone(),
153                Some(idx as i32),
154                resolved_spec_iter.clone(),
155                resolved_params_iter.clone(),
156                chrono::Utc::now(),
157            )
158            .await?;
159
160            if status == StepStatus::WaitingForEvent && resolved_type == "Wait" {
161                if let Ok(wait_spec) =
162                    serde_json::from_value::<dsl::WaitEventSpec>(resolved_spec_iter.clone())
163                {
164                    let _ = crate::db::insert_event_correlation(
165                        &mut *executor,
166                        EventId::new_v4(),
167                        step_instance_id,
168                        run_id,
169                        &wait_spec.correlation_key,
170                        &wait_spec.correlation_value,
171                    )
172                    .await;
173                }
174            }
175        }
176    } else {
177        let _ = crate::hcl_eval::resolve_expressions(&mut resolved_spec, hcl_ctx);
178        let _ = crate::hcl_eval::resolve_expressions(&mut resolved_params, hcl_ctx);
179
180        let step_instance_id = StepInstanceId::new(Uuid::new_v4());
181        let initial_status = match resolved_type.as_str() {
182            "Approval" | "Wait" => StepStatus::WaitingForEvent,
183            _ => StepStatus::Pending,
184        };
185
186        println!(
187            "inserting step {} with type {}",
188            step_dsl.name, resolved_type
189        );
190        let insert_result = crate::db::insert_step_instance_with_spec_on_conflict_do_nothing(
191            &mut *executor,
192            step_instance_id,
193            run_id,
194            &step_dsl.name,
195            &resolved_type,
196            initial_status.clone(),
197            None::<i32>,
198            resolved_spec.clone(),
199            resolved_params.clone(),
200            chrono::Utc::now(),
201        )
202        .await?;
203
204        if insert_result.rows_affected() > 0 && initial_status == StepStatus::WaitingForEvent {
205            if resolved_type == "Wait" {
206                if let Ok(wait_spec) =
207                    serde_json::from_value::<dsl::WaitEventSpec>(resolved_spec.clone())
208                {
209                    let _ = crate::db::insert_event_correlation(
210                        &mut *executor,
211                        EventId::new_v4(),
212                        step_instance_id,
213                        run_id,
214                        &wait_spec.correlation_key,
215                        &wait_spec.correlation_value,
216                    )
217                    .await;
218                }
219            } else if step_dsl.r#type == "Approval" {
220                if let Ok(approval_spec) =
221                    serde_json::from_value::<dsl::ApprovalSpec>(resolved_spec.clone())
222                {
223                    if let Some(notify_spec) = approval_spec.notify {
224                        let pool = pool.clone();
225                        let nats_client = nats_client.clone();
226                        let spec_val = serde_json::to_value(notify_spec)?;
227                        tokio::spawn(async move {
228                            let _ = handle_approval_notification(
229                                run_id,
230                                step_instance_id,
231                                spec_val,
232                                pool,
233                                nats_client,
234                            )
235                            .await;
236                        });
237                    }
238                }
239            }
240        }
241    }
242
243    Ok(())
244}