Skip to main content

stormchaser_engine/handler/step/
scheduling.rs

1use serde_json::Value;
2use stormchaser_dsl::ast;
3use stormchaser_model::dsl;
4
5use anyhow::Result;
6use sqlx::PgPool;
7use stormchaser_model::step::StepStatus;
8use uuid::Uuid;
9
10use crate::handler::{
11    fetch_outputs, fetch_quotas, fetch_run_context, handle_approval_notification,
12};
13
14/// Schedules a step for execution, creating a new instance and managing its initial state transitions based on dependencies and quotas.
15pub async fn schedule_step(
16    run_id: Uuid,
17    step_dsl: &ast::Step,
18    executor: &mut sqlx::PgConnection,
19    nats_client: async_nats::Client,
20    hcl_ctx: &hcl::eval::Context<'_>,
21    pool: PgPool,
22    workflow: &ast::Workflow,
23) -> Result<()> {
24    let mut resolved_type = step_dsl.r#type.clone();
25    let mut resolved_spec = step_dsl.spec.clone();
26    let mut resolved_params = serde_json::to_value(&step_dsl.params).unwrap_or(Value::Null);
27
28    // Merge Step Library if it exists
29    if let Some(library) = workflow
30        .step_libraries
31        .iter()
32        .find(|l| l.name == resolved_type)
33    {
34        resolved_type = library.r#type.clone();
35
36        // Merge specs
37        if let (Value::Object(mut lib_spec), Value::Object(step_spec)) =
38            (library.spec.clone(), resolved_spec.clone())
39        {
40            for (k, v) in step_spec {
41                lib_spec.insert(k, v);
42            }
43            resolved_spec = Value::Object(lib_spec);
44        } else if resolved_spec.is_null() {
45            resolved_spec = library.spec.clone();
46        }
47
48        // Merge params
49        if let (Value::Object(mut lib_params), Value::Object(step_params)) = (
50            serde_json::to_value(&library.params).unwrap_or(Value::Null),
51            resolved_params.clone(),
52        ) {
53            for (k, v) in step_params {
54                lib_params.insert(k, v);
55            }
56            resolved_params = Value::Object(lib_params);
57        } else if resolved_params.is_null() {
58            resolved_params = serde_json::to_value(&library.params).unwrap_or(Value::Null);
59        }
60    }
61
62    if let Some(condition_expr) = &step_dsl.condition {
63        match crate::hcl_eval::evaluate_raw_expr(condition_expr, hcl_ctx) {
64            Ok(Value::Bool(true)) => {}
65            Ok(Value::Bool(false)) => {
66                crate::db::insert_step_instance(
67                    executor,
68                    Uuid::new_v4(),
69                    run_id,
70                    &step_dsl.name,
71                    &step_dsl.r#type,
72                    StepStatus::Skipped,
73                    chrono::Utc::now(),
74                )
75                .await?;
76                return Ok(());
77            }
78            Ok(_) => return Err(anyhow::anyhow!("Condition must evaluate to a boolean")),
79            Err(e) => return Err(e),
80        }
81    }
82
83    if let Some(iterate_expr) = &step_dsl.iterate {
84        let items = match crate::hcl_eval::evaluate_raw_expr(iterate_expr, hcl_ctx) {
85            Ok(Value::Array(arr)) => arr,
86            Ok(_) => return Err(anyhow::anyhow!("Iterate must evaluate to an array")),
87            Err(e) => return Err(e),
88        };
89
90        if items.is_empty() {
91            crate::db::insert_step_instance(
92                executor,
93                Uuid::new_v4(),
94                run_id,
95                &step_dsl.name,
96                &step_dsl.r#type,
97                StepStatus::Skipped,
98                chrono::Utc::now(),
99            )
100            .await?;
101            return Ok(());
102        }
103
104        let max_parallel = step_dsl
105            .strategy
106            .as_ref()
107            .and_then(|s| s.max_parallel)
108            .unwrap_or(u32::MAX);
109
110        let run_context = fetch_run_context(run_id, &mut *executor).await?;
111        let steps_outputs = fetch_outputs(run_id, &mut *executor).await?;
112        let quotas = fetch_quotas(run_id, &mut *executor).await?;
113        let current_running_count: i64 =
114            crate::db::count_running_steps_for_run(&mut *executor, run_id).await?;
115
116        for (idx, item) in items.into_iter().enumerate() {
117            let step_instance_id = Uuid::new_v4();
118            let status = match step_dsl.r#type.as_str() {
119                "Approval" | "Wait" => StepStatus::WaitingForEvent,
120                _ => {
121                    if (idx as u32) < max_parallel
122                        && current_running_count < quotas.max_concurrency as i64
123                    {
124                        StepStatus::Pending
125                    } else {
126                        StepStatus::WaitingForEvent
127                    }
128                }
129            };
130
131            let mut iteration_ctx = crate::hcl_eval::create_context(
132                run_context.inputs.clone(),
133                run_id,
134                steps_outputs.clone(),
135            );
136            let iter_var_name = step_dsl.iterate_as.as_deref().unwrap_or("item");
137            iteration_ctx.declare_var(iter_var_name, crate::hcl_eval::json_to_hcl(item));
138
139            let mut resolved_spec_iter = resolved_spec.clone();
140            let _ = crate::hcl_eval::resolve_expressions(&mut resolved_spec_iter, &iteration_ctx);
141
142            let mut resolved_params_iter = resolved_params.clone();
143            let _ = crate::hcl_eval::resolve_expressions(&mut resolved_params_iter, &iteration_ctx);
144
145            crate::db::insert_step_instance_with_spec(
146                &mut *executor,
147                step_instance_id,
148                run_id,
149                &step_dsl.name,
150                &resolved_type,
151                status.clone(),
152                Some(idx as i32),
153                resolved_spec_iter.clone(),
154                resolved_params_iter.clone(),
155                chrono::Utc::now(),
156            )
157            .await?;
158
159            if status == StepStatus::WaitingForEvent && resolved_type == "Wait" {
160                if let Ok(wait_spec) =
161                    serde_json::from_value::<dsl::WaitEventSpec>(resolved_spec_iter.clone())
162                {
163                    let _ = crate::db::insert_event_correlation(
164                        &mut *executor,
165                        Uuid::new_v4(),
166                        step_instance_id,
167                        run_id,
168                        &wait_spec.correlation_key,
169                        &wait_spec.correlation_value,
170                    )
171                    .await;
172                }
173            }
174        }
175    } else {
176        let _ = crate::hcl_eval::resolve_expressions(&mut resolved_spec, hcl_ctx);
177        let _ = crate::hcl_eval::resolve_expressions(&mut resolved_params, hcl_ctx);
178
179        let step_instance_id = Uuid::new_v4();
180        let initial_status = match resolved_type.as_str() {
181            "Approval" | "Wait" => StepStatus::WaitingForEvent,
182            _ => StepStatus::Pending,
183        };
184
185        println!(
186            "inserting step {} with type {}",
187            step_dsl.name, resolved_type
188        );
189        let insert_result = crate::db::insert_step_instance_with_spec_on_conflict_do_nothing(
190            &mut *executor,
191            step_instance_id,
192            run_id,
193            &step_dsl.name,
194            &resolved_type,
195            initial_status.clone(),
196            None::<i32>,
197            resolved_spec.clone(),
198            resolved_params.clone(),
199            chrono::Utc::now(),
200        )
201        .await?;
202
203        if insert_result.rows_affected() > 0 && initial_status == StepStatus::WaitingForEvent {
204            if resolved_type == "Wait" {
205                if let Ok(wait_spec) =
206                    serde_json::from_value::<dsl::WaitEventSpec>(resolved_spec.clone())
207                {
208                    let _ = crate::db::insert_event_correlation(
209                        &mut *executor,
210                        Uuid::new_v4(),
211                        step_instance_id,
212                        run_id,
213                        &wait_spec.correlation_key,
214                        &wait_spec.correlation_value,
215                    )
216                    .await;
217                }
218            } else if step_dsl.r#type == "Approval" {
219                if let Ok(approval_spec) =
220                    serde_json::from_value::<dsl::ApprovalSpec>(resolved_spec.clone())
221                {
222                    if let Some(notify_spec) = approval_spec.notify {
223                        let pool = pool.clone();
224                        let nats_client = nats_client.clone();
225                        let spec_val = serde_json::to_value(notify_spec)?;
226                        tokio::spawn(async move {
227                            let _ = handle_approval_notification(
228                                run_id,
229                                step_instance_id,
230                                spec_val,
231                                pool,
232                                nats_client,
233                            )
234                            .await;
235                        });
236                    }
237                }
238            }
239        }
240    }
241
242    Ok(())
243}