1use std::collections::HashMap;
2use std::sync::Arc;
3
4use chrono::Utc;
5use serde_json::{json, Value};
6use tracing::{debug, warn};
7
8use fakecloud_aws::arn::Arn;
9use fakecloud_core::delivery::DeliveryBus;
10use fakecloud_dynamodb::state::SharedDynamoDbState;
11
12use crate::choice::evaluate_choice;
13use crate::error_handling::{find_catcher, should_retry};
14use crate::io_processing::{apply_input_path, apply_output_path, apply_result_path};
15use crate::state::{ExecutionStatus, HistoryEvent, SharedStepFunctionsState};
16
17pub async fn execute_state_machine(
20 state: SharedStepFunctionsState,
21 execution_arn: String,
22 definition: String,
23 input: Option<String>,
24 delivery: Option<Arc<DeliveryBus>>,
25 dynamodb_state: Option<SharedDynamoDbState>,
26) {
27 let def: Value = match serde_json::from_str(&definition) {
28 Ok(v) => v,
29 Err(e) => {
30 fail_execution(
31 &state,
32 &execution_arn,
33 "States.Runtime",
34 &format!("Failed to parse definition: {e}"),
35 );
36 return;
37 }
38 };
39
40 let raw_input: Value = input
41 .as_deref()
42 .and_then(|s| serde_json::from_str(s).ok())
43 .unwrap_or(json!({}));
44
45 add_event(
47 &state,
48 &execution_arn,
49 "ExecutionStarted",
50 0,
51 json!({
52 "input": serde_json::to_string(&raw_input).unwrap_or_default(),
53 "roleArn": "arn:aws:iam::123456789012:role/execution-role"
54 }),
55 );
56
57 let def_owned = def;
63 let state_clone = state.clone();
64 let execution_arn_clone = execution_arn.clone();
65 let delivery_clone = delivery.clone();
66 let dynamodb_state_clone = dynamodb_state.clone();
67 let handle = tokio::spawn(async move {
68 run_states(
69 &def_owned,
70 raw_input,
71 &delivery_clone,
72 &dynamodb_state_clone,
73 &state_clone,
74 &execution_arn_clone,
75 )
76 .await
77 });
78
79 match handle.await {
80 Ok(Ok(output)) => {
81 succeed_execution(&state, &execution_arn, &output);
82 }
83 Ok(Err((error, cause))) => {
84 fail_execution(&state, &execution_arn, &error, &cause);
85 }
86 Err(join_err) => {
87 let msg = if join_err.is_panic() {
88 let payload = join_err.into_panic();
89 if let Some(s) = payload.downcast_ref::<String>() {
90 s.clone()
91 } else if let Some(s) = payload.downcast_ref::<&'static str>() {
92 (*s).to_string()
93 } else {
94 "execution task panicked".to_string()
95 }
96 } else {
97 format!("execution task cancelled: {join_err}")
98 };
99 tracing::error!(
100 execution_arn = %execution_arn,
101 panic = %msg,
102 "Step Functions execution panicked"
103 );
104 fail_execution(&state, &execution_arn, "States.Runtime", &msg);
105 }
106 }
107}
108
/// Boxed, pinned, `Send` future returned by `run_states`.
///
/// It resolves to the state machine output on success, or to an `(error, cause)` pair on
/// failure. `run_states` returns a boxed future rather than being an `async fn`.
type StatesResult<'a> = std::pin::Pin<
    Box<dyn std::future::Future<Output = Result<Value, (String, String)>> + Send + 'a>,
>;
112
113fn run_states<'a>(
116 def: &'a Value,
117 input: Value,
118 delivery: &'a Option<Arc<DeliveryBus>>,
119 dynamodb_state: &'a Option<SharedDynamoDbState>,
120 shared_state: &'a SharedStepFunctionsState,
121 execution_arn: &'a str,
122) -> StatesResult<'a> {
123 Box::pin(async move {
124 let start_at = def["StartAt"]
125 .as_str()
126 .ok_or_else(|| {
127 (
128 "States.Runtime".to_string(),
129 "Missing StartAt in definition".to_string(),
130 )
131 })?
132 .to_string();
133
134 let states = def.get("States").ok_or_else(|| {
135 (
136 "States.Runtime".to_string(),
137 "Missing States in definition".to_string(),
138 )
139 })?;
140
141 let mut current_state = start_at;
142 let mut effective_input = input;
143 let mut iteration = 0;
144 let max_iterations = 500;
145
146 loop {
147 iteration += 1;
148 if iteration > max_iterations {
149 return Err((
150 "States.Runtime".to_string(),
151 "Maximum number of state transitions exceeded".to_string(),
152 ));
153 }
154
155 let state_def = states.get(¤t_state).cloned().ok_or_else(|| {
156 (
157 "States.Runtime".to_string(),
158 format!("State '{current_state}' not found in definition"),
159 )
160 })?;
161
162 let state_type = state_def["Type"]
163 .as_str()
164 .ok_or_else(|| {
165 (
166 "States.Runtime".to_string(),
167 format!("State '{current_state}' missing Type field"),
168 )
169 })?
170 .to_string();
171
172 debug!(
173 execution_arn = %execution_arn,
174 state = %current_state,
175 state_type = %state_type,
176 "Executing state"
177 );
178
179 let advance = match state_type.as_str() {
180 "Pass" => run_pass_state(
181 ¤t_state,
182 &state_def,
183 effective_input,
184 shared_state,
185 execution_arn,
186 ),
187 "Succeed" => run_succeed_state(
188 ¤t_state,
189 &state_def,
190 effective_input,
191 shared_state,
192 execution_arn,
193 ),
194 "Fail" => run_fail_state(
195 ¤t_state,
196 &state_def,
197 effective_input,
198 shared_state,
199 execution_arn,
200 ),
201 "Choice" => run_choice_state(
202 ¤t_state,
203 &state_def,
204 effective_input,
205 shared_state,
206 execution_arn,
207 ),
208 "Wait" => {
209 run_wait_state(
210 ¤t_state,
211 &state_def,
212 effective_input,
213 shared_state,
214 execution_arn,
215 )
216 .await
217 }
218 "Task" => {
219 run_task_state(
220 ¤t_state,
221 &state_def,
222 effective_input,
223 delivery,
224 dynamodb_state,
225 shared_state,
226 execution_arn,
227 )
228 .await
229 }
230 "Parallel" => {
231 run_parallel_state(
232 ¤t_state,
233 &state_def,
234 effective_input,
235 delivery,
236 dynamodb_state,
237 shared_state,
238 execution_arn,
239 )
240 .await
241 }
242 "Map" => {
243 run_map_state(
244 ¤t_state,
245 &state_def,
246 effective_input,
247 delivery,
248 dynamodb_state,
249 shared_state,
250 execution_arn,
251 )
252 .await
253 }
254 other => Advance::Fail(
255 "States.Runtime".to_string(),
256 format!("Unsupported state type: '{other}'"),
257 ),
258 };
259
260 match advance {
261 Advance::Next(next, new_input) => {
262 effective_input = new_input;
263 current_state = next;
264 }
265 Advance::End(output) => return Ok(output),
266 Advance::Fail(error, cause) => return Err((error, cause)),
267 }
268 }
269 })
270}
271
/// Outcome of running a single state, telling the interpreter loop how to proceed.
enum Advance {
    /// Continue with the named state, feeding it the given value as input.
    Next(String, Value),
    /// Stop successfully; the value is the output of the state machine (or branch).
    End(Value),
    /// Stop with a failure described by an `(error, cause)` pair.
    Fail(String, String),
}
281
282fn advance_from_next(state_def: &Value, input: Value) -> Advance {
283 match next_state(state_def) {
284 NextState::Name(next) => Advance::Next(next, input),
285 NextState::End => Advance::End(input),
286 NextState::Error(msg) => Advance::Fail("States.Runtime".to_string(), msg),
287 }
288}
289
290fn advance_from_error(state_def: &Value, input: &Value, error: String, cause: String) -> Advance {
291 match apply_state_catcher(state_def, input, &error, &cause) {
292 Some((next, new_input)) => Advance::Next(next, new_input),
293 None => Advance::Fail(error, cause),
294 }
295}
296
297fn run_pass_state(
298 name: &str,
299 state_def: &Value,
300 input: Value,
301 shared_state: &SharedStepFunctionsState,
302 execution_arn: &str,
303) -> Advance {
304 let entered_event_id = add_event(
305 shared_state,
306 execution_arn,
307 "PassStateEntered",
308 0,
309 json!({
310 "name": name,
311 "input": serde_json::to_string(&input).unwrap_or_default(),
312 }),
313 );
314
315 let result = execute_pass_state(state_def, &input);
316
317 add_event(
318 shared_state,
319 execution_arn,
320 "PassStateExited",
321 entered_event_id,
322 json!({
323 "name": name,
324 "output": serde_json::to_string(&result).unwrap_or_default(),
325 }),
326 );
327
328 advance_from_next(state_def, result)
329}
330
331fn run_succeed_state(
332 name: &str,
333 state_def: &Value,
334 input: Value,
335 shared_state: &SharedStepFunctionsState,
336 execution_arn: &str,
337) -> Advance {
338 add_event(
339 shared_state,
340 execution_arn,
341 "SucceedStateEntered",
342 0,
343 json!({
344 "name": name,
345 "input": serde_json::to_string(&input).unwrap_or_default(),
346 }),
347 );
348
349 let input_path = state_def["InputPath"].as_str();
350 let output_path = state_def["OutputPath"].as_str();
351
352 let processed = if input_path == Some("null") {
353 json!({})
354 } else {
355 apply_input_path(&input, input_path)
356 };
357
358 let output = if output_path == Some("null") {
359 json!({})
360 } else {
361 apply_output_path(&processed, output_path)
362 };
363
364 add_event(
365 shared_state,
366 execution_arn,
367 "SucceedStateExited",
368 0,
369 json!({
370 "name": name,
371 "output": serde_json::to_string(&output).unwrap_or_default(),
372 }),
373 );
374
375 Advance::End(output)
376}
377
378fn run_fail_state(
379 name: &str,
380 state_def: &Value,
381 input: Value,
382 shared_state: &SharedStepFunctionsState,
383 execution_arn: &str,
384) -> Advance {
385 let error = state_def["Error"]
386 .as_str()
387 .unwrap_or("States.Fail")
388 .to_string();
389 let cause = state_def["Cause"].as_str().unwrap_or("").to_string();
390
391 add_event(
392 shared_state,
393 execution_arn,
394 "FailStateEntered",
395 0,
396 json!({
397 "name": name,
398 "input": serde_json::to_string(&input).unwrap_or_default(),
399 }),
400 );
401
402 Advance::Fail(error, cause)
403}
404
405fn run_choice_state(
406 name: &str,
407 state_def: &Value,
408 input: Value,
409 shared_state: &SharedStepFunctionsState,
410 execution_arn: &str,
411) -> Advance {
412 let entered_event_id = add_event(
413 shared_state,
414 execution_arn,
415 "ChoiceStateEntered",
416 0,
417 json!({
418 "name": name,
419 "input": serde_json::to_string(&input).unwrap_or_default(),
420 }),
421 );
422
423 let input_path = state_def["InputPath"].as_str();
424 let processed_input = if input_path == Some("null") {
425 json!({})
426 } else {
427 apply_input_path(&input, input_path)
428 };
429
430 match evaluate_choice(state_def, &processed_input) {
431 Some(next) => {
432 add_event(
433 shared_state,
434 execution_arn,
435 "ChoiceStateExited",
436 entered_event_id,
437 json!({
438 "name": name,
439 "output": serde_json::to_string(&input).unwrap_or_default(),
440 }),
441 );
442 Advance::Next(next, input)
443 }
444 None => Advance::Fail(
445 "States.NoChoiceMatched".to_string(),
446 format!("No choice rule matched and no Default in state '{name}'"),
447 ),
448 }
449}
450
451async fn run_wait_state(
452 name: &str,
453 state_def: &Value,
454 input: Value,
455 shared_state: &SharedStepFunctionsState,
456 execution_arn: &str,
457) -> Advance {
458 let entered_event_id = add_event(
459 shared_state,
460 execution_arn,
461 "WaitStateEntered",
462 0,
463 json!({
464 "name": name,
465 "input": serde_json::to_string(&input).unwrap_or_default(),
466 }),
467 );
468
469 execute_wait_state(state_def, &input).await;
470
471 add_event(
472 shared_state,
473 execution_arn,
474 "WaitStateExited",
475 entered_event_id,
476 json!({
477 "name": name,
478 "output": serde_json::to_string(&input).unwrap_or_default(),
479 }),
480 );
481
482 advance_from_next(state_def, input)
483}
484
485#[allow(clippy::too_many_arguments)]
486async fn run_task_state(
487 name: &str,
488 state_def: &Value,
489 input: Value,
490 delivery: &Option<Arc<DeliveryBus>>,
491 dynamodb_state: &Option<SharedDynamoDbState>,
492 shared_state: &SharedStepFunctionsState,
493 execution_arn: &str,
494) -> Advance {
495 let entered_event_id = add_event(
496 shared_state,
497 execution_arn,
498 "TaskStateEntered",
499 0,
500 json!({
501 "name": name,
502 "input": serde_json::to_string(&input).unwrap_or_default(),
503 }),
504 );
505
506 let result = execute_task_state(
507 state_def,
508 &input,
509 delivery,
510 dynamodb_state,
511 shared_state,
512 execution_arn,
513 entered_event_id,
514 )
515 .await;
516
517 match result {
518 Ok(output) => {
519 add_event(
520 shared_state,
521 execution_arn,
522 "TaskStateExited",
523 entered_event_id,
524 json!({
525 "name": name,
526 "output": serde_json::to_string(&output).unwrap_or_default(),
527 }),
528 );
529 advance_from_next(state_def, output)
530 }
531 Err((error, cause)) => advance_from_error(state_def, &input, error, cause),
532 }
533}
534
535#[allow(clippy::too_many_arguments)]
536async fn run_parallel_state(
537 name: &str,
538 state_def: &Value,
539 input: Value,
540 delivery: &Option<Arc<DeliveryBus>>,
541 dynamodb_state: &Option<SharedDynamoDbState>,
542 shared_state: &SharedStepFunctionsState,
543 execution_arn: &str,
544) -> Advance {
545 let entered_event_id = add_event(
546 shared_state,
547 execution_arn,
548 "ParallelStateEntered",
549 0,
550 json!({
551 "name": name,
552 "input": serde_json::to_string(&input).unwrap_or_default(),
553 }),
554 );
555
556 let result = execute_parallel_state(
557 state_def,
558 &input,
559 delivery,
560 dynamodb_state,
561 shared_state,
562 execution_arn,
563 )
564 .await;
565
566 match result {
567 Ok(output) => {
568 add_event(
569 shared_state,
570 execution_arn,
571 "ParallelStateExited",
572 entered_event_id,
573 json!({
574 "name": name,
575 "output": serde_json::to_string(&output).unwrap_or_default(),
576 }),
577 );
578 advance_from_next(state_def, output)
579 }
580 Err((error, cause)) => advance_from_error(state_def, &input, error, cause),
581 }
582}
583
584#[allow(clippy::too_many_arguments)]
585async fn run_map_state(
586 name: &str,
587 state_def: &Value,
588 input: Value,
589 delivery: &Option<Arc<DeliveryBus>>,
590 dynamodb_state: &Option<SharedDynamoDbState>,
591 shared_state: &SharedStepFunctionsState,
592 execution_arn: &str,
593) -> Advance {
594 let entered_event_id = add_event(
595 shared_state,
596 execution_arn,
597 "MapStateEntered",
598 0,
599 json!({
600 "name": name,
601 "input": serde_json::to_string(&input).unwrap_or_default(),
602 }),
603 );
604
605 let result = execute_map_state(
606 state_def,
607 &input,
608 delivery,
609 dynamodb_state,
610 shared_state,
611 execution_arn,
612 )
613 .await;
614
615 match result {
616 Ok(output) => {
617 add_event(
618 shared_state,
619 execution_arn,
620 "MapStateExited",
621 entered_event_id,
622 json!({
623 "name": name,
624 "output": serde_json::to_string(&output).unwrap_or_default(),
625 }),
626 );
627 advance_from_next(state_def, output)
628 }
629 Err((error, cause)) => advance_from_error(state_def, &input, error, cause),
630 }
631}
632
633async fn execute_wait_state(state_def: &Value, input: &Value) {
635 if let Some(seconds) = state_def["Seconds"].as_u64() {
636 tokio::time::sleep(tokio::time::Duration::from_secs(seconds)).await;
637 return;
638 }
639
640 if let Some(path) = state_def["SecondsPath"].as_str() {
641 let val = crate::io_processing::resolve_path(input, path);
642 if let Some(seconds) = val.as_u64() {
643 tokio::time::sleep(tokio::time::Duration::from_secs(seconds)).await;
644 }
645 return;
646 }
647
648 if let Some(ts_str) = state_def["Timestamp"].as_str() {
649 if let Ok(target) = chrono::DateTime::parse_from_rfc3339(ts_str) {
650 let now = Utc::now();
651 let target_utc = target.with_timezone(&chrono::Utc);
652 if target_utc > now {
653 let duration = (target_utc - now).to_std().unwrap_or_default();
654 tokio::time::sleep(duration).await;
655 }
656 }
657 return;
658 }
659
660 if let Some(path) = state_def["TimestampPath"].as_str() {
661 let val = crate::io_processing::resolve_path(input, path);
662 if let Some(ts_str) = val.as_str() {
663 if let Ok(target) = chrono::DateTime::parse_from_rfc3339(ts_str) {
664 let now = Utc::now();
665 let target_utc = target.with_timezone(&chrono::Utc);
666 if target_utc > now {
667 let duration = (target_utc - now).to_std().unwrap_or_default();
668 tokio::time::sleep(duration).await;
669 }
670 }
671 }
672 return;
673 }
674
675 warn!(
676 "Wait state has no valid Seconds, SecondsPath, Timestamp, or TimestampPath — skipping wait"
677 );
678}
679
680fn execute_pass_state(state_def: &Value, input: &Value) -> Value {
682 let input_path = state_def["InputPath"].as_str();
683 let result_path = state_def["ResultPath"].as_str();
684 let output_path = state_def["OutputPath"].as_str();
685
686 let effective_input = if input_path == Some("null") {
687 json!({})
688 } else {
689 apply_input_path(input, input_path)
690 };
691
692 let result = if let Some(r) = state_def.get("Result") {
693 r.clone()
694 } else {
695 effective_input.clone()
696 };
697
698 let after_result = if result_path == Some("null") {
699 input.clone()
700 } else {
701 apply_result_path(input, &result, result_path)
702 };
703
704 if output_path == Some("null") {
705 json!({})
706 } else {
707 apply_output_path(&after_result, output_path)
708 }
709}
710
711async fn execute_task_state(
714 state_def: &Value,
715 input: &Value,
716 delivery: &Option<Arc<DeliveryBus>>,
717 dynamodb_state: &Option<SharedDynamoDbState>,
718 shared_state: &SharedStepFunctionsState,
719 execution_arn: &str,
720 entered_event_id: i64,
721) -> Result<Value, (String, String)> {
722 let resource = state_def["Resource"].as_str().unwrap_or("").to_string();
723
724 let input_path = state_def["InputPath"].as_str();
725 let result_path = state_def["ResultPath"].as_str();
726 let output_path = state_def["OutputPath"].as_str();
727
728 let effective_input = if input_path == Some("null") {
729 json!({})
730 } else {
731 apply_input_path(input, input_path)
732 };
733
734 let task_input = if let Some(params) = state_def.get("Parameters") {
735 apply_parameters(params, &effective_input)
736 } else {
737 effective_input
738 };
739
740 let retriers = state_def["Retry"].as_array().cloned().unwrap_or_default();
741 let timeout_seconds = state_def["TimeoutSeconds"].as_u64();
742 let heartbeat_seconds = state_def["HeartbeatSeconds"].as_u64();
743 let mut attempt = 0u32;
744
745 loop {
746 add_event(
747 shared_state,
748 execution_arn,
749 "TaskScheduled",
750 entered_event_id,
751 json!({
752 "resource": resource,
753 "region": "us-east-1",
754 "parameters": serde_json::to_string(&task_input).unwrap_or_default(),
755 }),
756 );
757
758 add_event(
759 shared_state,
760 execution_arn,
761 "TaskStarted",
762 entered_event_id,
763 json!({ "resource": resource }),
764 );
765
766 let invoke_result = invoke_resource(
767 &resource,
768 &task_input,
769 delivery,
770 dynamodb_state,
771 timeout_seconds,
772 heartbeat_seconds,
773 shared_state,
774 )
775 .await;
776
777 match invoke_result {
778 Ok(result) => {
779 add_event(
780 shared_state,
781 execution_arn,
782 "TaskSucceeded",
783 entered_event_id,
784 json!({
785 "resource": resource,
786 "output": serde_json::to_string(&result).unwrap_or_default(),
787 }),
788 );
789
790 let selected = if let Some(selector) = state_def.get("ResultSelector") {
791 apply_parameters(selector, &result)
792 } else {
793 result
794 };
795
796 let after_result = if result_path == Some("null") {
797 input.clone()
798 } else {
799 apply_result_path(input, &selected, result_path)
800 };
801
802 let output = if output_path == Some("null") {
803 json!({})
804 } else {
805 apply_output_path(&after_result, output_path)
806 };
807
808 return Ok(output);
809 }
810 Err((error, cause)) => {
811 add_event(
812 shared_state,
813 execution_arn,
814 "TaskFailed",
815 entered_event_id,
816 json!({ "error": error, "cause": cause }),
817 );
818
819 if let Some(delay_ms) = should_retry(&retriers, &error, attempt) {
820 attempt += 1;
821 let actual_delay = delay_ms.min(5000);
822 tokio::time::sleep(tokio::time::Duration::from_millis(actual_delay)).await;
823 continue;
824 }
825
826 return Err((error, cause));
827 }
828 }
829 }
830}
831
832async fn execute_parallel_state(
834 state_def: &Value,
835 input: &Value,
836 delivery: &Option<Arc<DeliveryBus>>,
837 dynamodb_state: &Option<SharedDynamoDbState>,
838 shared_state: &SharedStepFunctionsState,
839 execution_arn: &str,
840) -> Result<Value, (String, String)> {
841 let input_path = state_def["InputPath"].as_str();
842 let result_path = state_def["ResultPath"].as_str();
843 let output_path = state_def["OutputPath"].as_str();
844
845 let effective_input = if input_path == Some("null") {
846 json!({})
847 } else {
848 apply_input_path(input, input_path)
849 };
850
851 let branches = state_def["Branches"]
852 .as_array()
853 .cloned()
854 .unwrap_or_default();
855
856 if branches.is_empty() {
857 return Err((
858 "States.Runtime".to_string(),
859 "Parallel state has no Branches".to_string(),
860 ));
861 }
862
863 let mut handles = Vec::new();
865 for branch_def in &branches {
866 let branch = branch_def.clone();
867 let branch_input = effective_input.clone();
868 let delivery = delivery.clone();
869 let ddb = dynamodb_state.clone();
870 let state = shared_state.clone();
871 let arn = execution_arn.to_string();
872
873 handles.push(tokio::spawn(async move {
874 run_states(&branch, branch_input, &delivery, &ddb, &state, &arn).await
875 }));
876 }
877
878 let mut results = Vec::with_capacity(handles.len());
880 for handle in handles {
881 let result = handle.await.map_err(|e| {
882 (
883 "States.Runtime".to_string(),
884 format!("Branch execution panicked: {e}"),
885 )
886 })??;
887 results.push(result);
888 }
889
890 let branch_output = Value::Array(results);
891
892 let selected = if let Some(selector) = state_def.get("ResultSelector") {
894 apply_parameters(selector, &branch_output)
895 } else {
896 branch_output
897 };
898
899 let after_result = if result_path == Some("null") {
901 input.clone()
902 } else {
903 apply_result_path(input, &selected, result_path)
904 };
905
906 let output = if output_path == Some("null") {
908 json!({})
909 } else {
910 apply_output_path(&after_result, output_path)
911 };
912
913 Ok(output)
914}
915
916async fn execute_map_state(
918 state_def: &Value,
919 input: &Value,
920 delivery: &Option<Arc<DeliveryBus>>,
921 dynamodb_state: &Option<SharedDynamoDbState>,
922 shared_state: &SharedStepFunctionsState,
923 execution_arn: &str,
924) -> Result<Value, (String, String)> {
925 let input_path = state_def["InputPath"].as_str();
926 let result_path = state_def["ResultPath"].as_str();
927 let output_path = state_def["OutputPath"].as_str();
928
929 let effective_input = if input_path == Some("null") {
930 json!({})
931 } else {
932 apply_input_path(input, input_path)
933 };
934
935 let items_path = state_def["ItemsPath"].as_str().unwrap_or("$");
937 let items_value = crate::io_processing::resolve_path(&effective_input, items_path);
938 let items = items_value.as_array().cloned().unwrap_or_default();
939
940 let iterator_def = state_def
942 .get("ItemProcessor")
943 .or_else(|| state_def.get("Iterator"))
944 .cloned()
945 .ok_or_else(|| {
946 (
947 "States.Runtime".to_string(),
948 "Map state has no ItemProcessor or Iterator".to_string(),
949 )
950 })?;
951
952 let max_concurrency = state_def["MaxConcurrency"].as_u64().unwrap_or(0);
953 let effective_concurrency = if max_concurrency == 0 {
954 40
955 } else {
956 max_concurrency as usize
957 };
958
959 let semaphore = Arc::new(tokio::sync::Semaphore::new(effective_concurrency));
960
961 let mut handles = Vec::new();
963 for (index, item) in items.into_iter().enumerate() {
964 let iter_def = iterator_def.clone();
965 let delivery = delivery.clone();
966 let ddb = dynamodb_state.clone();
967 let state = shared_state.clone();
968 let arn = execution_arn.to_string();
969 let sem = semaphore.clone();
970
971 let item_input = if let Some(selector) = state_def.get("ItemSelector") {
973 let mut ctx = serde_json::Map::new();
974 ctx.insert("value".to_string(), item.clone());
975 ctx.insert("index".to_string(), json!(index));
976 apply_parameters(selector, &Value::Object(ctx))
977 } else {
978 item
979 };
980
981 add_event(
982 shared_state,
983 execution_arn,
984 "MapIterationStarted",
985 0,
986 json!({ "index": index }),
987 );
988
989 handles.push(tokio::spawn(async move {
990 let _permit = sem
991 .acquire()
992 .await
993 .map_err(|_| ("States.Runtime".to_string(), "Semaphore closed".to_string()))?;
994 let result = run_states(&iter_def, item_input, &delivery, &ddb, &state, &arn).await;
995 Ok::<(usize, Result<Value, (String, String)>), (String, String)>((index, result))
996 }));
997 }
998
999 let mut results: Vec<(usize, Value)> = Vec::with_capacity(handles.len());
1001 for handle in handles {
1002 let (index, result) = handle.await.map_err(|e| {
1003 (
1004 "States.Runtime".to_string(),
1005 format!("Map iteration panicked: {e}"),
1006 )
1007 })??;
1008
1009 match result {
1010 Ok(output) => {
1011 add_event(
1012 shared_state,
1013 execution_arn,
1014 "MapIterationSucceeded",
1015 0,
1016 json!({ "index": index }),
1017 );
1018 results.push((index, output));
1019 }
1020 Err((error, cause)) => {
1021 add_event(
1022 shared_state,
1023 execution_arn,
1024 "MapIterationFailed",
1025 0,
1026 json!({ "index": index, "error": error }),
1027 );
1028 return Err((error, cause));
1029 }
1030 }
1031 }
1032
1033 results.sort_by_key(|(i, _)| *i);
1035 let map_output = Value::Array(results.into_iter().map(|(_, v)| v).collect());
1036
1037 let selected = if let Some(selector) = state_def.get("ResultSelector") {
1039 apply_parameters(selector, &map_output)
1040 } else {
1041 map_output
1042 };
1043
1044 let after_result = if result_path == Some("null") {
1046 input.clone()
1047 } else {
1048 apply_result_path(input, &selected, result_path)
1049 };
1050
1051 let output = if output_path == Some("null") {
1053 json!({})
1054 } else {
1055 apply_output_path(&after_result, output_path)
1056 };
1057
1058 Ok(output)
1059}
1060
1061#[allow(clippy::too_many_arguments)]
1063async fn invoke_resource(
1064 resource: &str,
1065 input: &Value,
1066 delivery: &Option<Arc<DeliveryBus>>,
1067 dynamodb_state: &Option<SharedDynamoDbState>,
1068 timeout_seconds: Option<u64>,
1069 heartbeat_seconds: Option<u64>,
1070 shared_state: &SharedStepFunctionsState,
1071) -> Result<Value, (String, String)> {
1072 if resource.contains(":states:") && resource.contains(":activity:") {
1074 return invoke_activity(
1075 resource,
1076 input,
1077 shared_state,
1078 timeout_seconds,
1079 heartbeat_seconds,
1080 )
1081 .await;
1082 }
1083
1084 if resource.contains(":lambda:") && resource.contains(":function:") {
1086 return invoke_lambda_direct(resource, input, delivery, timeout_seconds).await;
1087 }
1088
1089 if resource.starts_with("arn:aws:states:::lambda:invoke") {
1091 let function_name = input["FunctionName"].as_str().unwrap_or("");
1092 let payload = if let Some(p) = input.get("Payload") {
1093 p.clone()
1094 } else {
1095 input.clone()
1096 };
1097 return invoke_lambda_direct(function_name, &payload, delivery, timeout_seconds).await;
1098 }
1099
1100 if resource.starts_with("arn:aws:states:::sqs:sendMessage") {
1101 return invoke_sqs_send_message(input, delivery);
1102 }
1103
1104 if resource.starts_with("arn:aws:states:::sns:publish") {
1105 return invoke_sns_publish(input, delivery);
1106 }
1107
1108 if resource.starts_with("arn:aws:states:::events:putEvents") {
1109 return invoke_eventbridge_put_events(input, delivery);
1110 }
1111
1112 if resource.starts_with("arn:aws:states:::dynamodb:getItem") {
1113 return invoke_dynamodb_get_item(input, dynamodb_state);
1114 }
1115
1116 if resource.starts_with("arn:aws:states:::dynamodb:putItem") {
1117 return invoke_dynamodb_put_item(input, dynamodb_state);
1118 }
1119
1120 if resource.starts_with("arn:aws:states:::dynamodb:deleteItem") {
1121 return invoke_dynamodb_delete_item(input, dynamodb_state);
1122 }
1123
1124 if resource.starts_with("arn:aws:states:::dynamodb:updateItem") {
1125 return invoke_dynamodb_update_item(input, dynamodb_state);
1126 }
1127
1128 Err((
1129 "States.TaskFailed".to_string(),
1130 format!("Unsupported resource: {resource}"),
1131 ))
1132}
1133
1134fn invoke_sqs_send_message(
1136 input: &Value,
1137 delivery: &Option<Arc<DeliveryBus>>,
1138) -> Result<Value, (String, String)> {
1139 let delivery = delivery.as_ref().ok_or_else(|| {
1140 (
1141 "States.TaskFailed".to_string(),
1142 "No delivery bus configured for SQS".to_string(),
1143 )
1144 })?;
1145
1146 let queue_url = input["QueueUrl"].as_str().ok_or_else(|| {
1147 (
1148 "States.TaskFailed".to_string(),
1149 "Missing QueueUrl in SQS sendMessage parameters".to_string(),
1150 )
1151 })?;
1152
1153 let message_body = input["MessageBody"]
1154 .as_str()
1155 .map(|s| s.to_string())
1156 .unwrap_or_else(|| {
1157 serde_json::to_string(&input["MessageBody"]).unwrap_or_default()
1159 });
1160
1161 let queue_arn = queue_url_to_arn(queue_url);
1165
1166 delivery.send_to_sqs(&queue_arn, &message_body, &HashMap::new());
1167
1168 Ok(json!({
1169 "MessageId": uuid::Uuid::new_v4().to_string(),
1170 "MD5OfMessageBody": md5_hex(&message_body),
1171 }))
1172}
1173
1174fn invoke_sns_publish(
1176 input: &Value,
1177 delivery: &Option<Arc<DeliveryBus>>,
1178) -> Result<Value, (String, String)> {
1179 let delivery = delivery.as_ref().ok_or_else(|| {
1180 (
1181 "States.TaskFailed".to_string(),
1182 "No delivery bus configured for SNS".to_string(),
1183 )
1184 })?;
1185
1186 let topic_arn = input["TopicArn"].as_str().ok_or_else(|| {
1187 (
1188 "States.TaskFailed".to_string(),
1189 "Missing TopicArn in SNS publish parameters".to_string(),
1190 )
1191 })?;
1192
1193 let message = input["Message"]
1194 .as_str()
1195 .map(|s| s.to_string())
1196 .unwrap_or_else(|| serde_json::to_string(&input["Message"]).unwrap_or_default());
1197
1198 let subject = input["Subject"].as_str();
1199
1200 delivery.publish_to_sns(topic_arn, &message, subject);
1201
1202 Ok(json!({
1203 "MessageId": uuid::Uuid::new_v4().to_string(),
1204 }))
1205}
1206
1207fn invoke_eventbridge_put_events(
1209 input: &Value,
1210 delivery: &Option<Arc<DeliveryBus>>,
1211) -> Result<Value, (String, String)> {
1212 let delivery = delivery.as_ref().ok_or_else(|| {
1213 (
1214 "States.TaskFailed".to_string(),
1215 "No delivery bus configured for EventBridge".to_string(),
1216 )
1217 })?;
1218
1219 let entries = input["Entries"]
1220 .as_array()
1221 .ok_or_else(|| {
1222 (
1223 "States.TaskFailed".to_string(),
1224 "Missing Entries in EventBridge putEvents parameters".to_string(),
1225 )
1226 })?
1227 .clone();
1228
1229 let mut event_ids = Vec::new();
1230 for entry in &entries {
1231 let source = entry["Source"].as_str().unwrap_or("aws.stepfunctions");
1232 let detail_type = entry["DetailType"].as_str().unwrap_or("StepFunctionsEvent");
1233 let detail = entry["Detail"]
1234 .as_str()
1235 .map(|s| s.to_string())
1236 .unwrap_or_else(|| serde_json::to_string(&entry["Detail"]).unwrap_or("{}".to_string()));
1237 let bus_name = entry["EventBusName"].as_str().unwrap_or("default");
1238
1239 delivery.put_event_to_eventbridge(source, detail_type, &detail, bus_name);
1240 event_ids.push(uuid::Uuid::new_v4().to_string());
1241 }
1242
1243 Ok(json!({
1244 "Entries": event_ids.iter().map(|id| json!({"EventId": id})).collect::<Vec<_>>(),
1245 "FailedEntryCount": 0,
1246 }))
1247}
1248
1249fn invoke_dynamodb_get_item(
1251 input: &Value,
1252 dynamodb_state: &Option<SharedDynamoDbState>,
1253) -> Result<Value, (String, String)> {
1254 let ddb = dynamodb_state.as_ref().ok_or_else(|| {
1255 (
1256 "States.TaskFailed".to_string(),
1257 "No DynamoDB state configured".to_string(),
1258 )
1259 })?;
1260
1261 let table_name = input["TableName"].as_str().ok_or_else(|| {
1262 (
1263 "States.TaskFailed".to_string(),
1264 "Missing TableName in DynamoDB getItem parameters".to_string(),
1265 )
1266 })?;
1267
1268 let key = input
1269 .get("Key")
1270 .and_then(|k| k.as_object())
1271 .ok_or_else(|| {
1272 (
1273 "States.TaskFailed".to_string(),
1274 "Missing Key in DynamoDB getItem parameters".to_string(),
1275 )
1276 })?;
1277
1278 let key_map: HashMap<String, Value> = key.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1279
1280 let __mas = ddb.read();
1281 let state = __mas.default_ref();
1282 let table = state.tables.get(table_name).ok_or_else(|| {
1283 (
1284 "States.TaskFailed".to_string(),
1285 format!("Table '{table_name}' not found"),
1286 )
1287 })?;
1288
1289 let item = table
1290 .find_item_index(&key_map)
1291 .map(|idx| table.items[idx].clone());
1292
1293 match item {
1294 Some(item_map) => {
1295 let item_value: serde_json::Map<String, Value> = item_map.into_iter().collect();
1296 Ok(json!({ "Item": item_value }))
1297 }
1298 None => Ok(json!({})),
1299 }
1300}
1301
1302fn invoke_dynamodb_put_item(
1304 input: &Value,
1305 dynamodb_state: &Option<SharedDynamoDbState>,
1306) -> Result<Value, (String, String)> {
1307 let ddb = dynamodb_state.as_ref().ok_or_else(|| {
1308 (
1309 "States.TaskFailed".to_string(),
1310 "No DynamoDB state configured".to_string(),
1311 )
1312 })?;
1313
1314 let table_name = input["TableName"].as_str().ok_or_else(|| {
1315 (
1316 "States.TaskFailed".to_string(),
1317 "Missing TableName in DynamoDB putItem parameters".to_string(),
1318 )
1319 })?;
1320
1321 let item = input
1322 .get("Item")
1323 .and_then(|i| i.as_object())
1324 .ok_or_else(|| {
1325 (
1326 "States.TaskFailed".to_string(),
1327 "Missing Item in DynamoDB putItem parameters".to_string(),
1328 )
1329 })?;
1330
1331 let item_map: HashMap<String, Value> =
1332 item.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1333
1334 let mut __mas = ddb.write();
1335 let state = __mas.default_mut();
1336 let table = state.tables.get_mut(table_name).ok_or_else(|| {
1337 (
1338 "States.TaskFailed".to_string(),
1339 format!("Table '{table_name}' not found"),
1340 )
1341 })?;
1342
1343 if let Some(idx) = table.find_item_index(&item_map) {
1345 table.items[idx] = item_map;
1346 } else {
1347 table.items.push(item_map);
1348 }
1349
1350 Ok(json!({}))
1351}
1352
1353fn invoke_dynamodb_delete_item(
1355 input: &Value,
1356 dynamodb_state: &Option<SharedDynamoDbState>,
1357) -> Result<Value, (String, String)> {
1358 let ddb = dynamodb_state.as_ref().ok_or_else(|| {
1359 (
1360 "States.TaskFailed".to_string(),
1361 "No DynamoDB state configured".to_string(),
1362 )
1363 })?;
1364
1365 let table_name = input["TableName"].as_str().ok_or_else(|| {
1366 (
1367 "States.TaskFailed".to_string(),
1368 "Missing TableName in DynamoDB deleteItem parameters".to_string(),
1369 )
1370 })?;
1371
1372 let key = input
1373 .get("Key")
1374 .and_then(|k| k.as_object())
1375 .ok_or_else(|| {
1376 (
1377 "States.TaskFailed".to_string(),
1378 "Missing Key in DynamoDB deleteItem parameters".to_string(),
1379 )
1380 })?;
1381
1382 let key_map: HashMap<String, Value> = key.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1383
1384 let mut __mas = ddb.write();
1385 let state = __mas.default_mut();
1386 let table = state.tables.get_mut(table_name).ok_or_else(|| {
1387 (
1388 "States.TaskFailed".to_string(),
1389 format!("Table '{table_name}' not found"),
1390 )
1391 })?;
1392
1393 if let Some(idx) = table.find_item_index(&key_map) {
1394 table.items.remove(idx);
1395 }
1396
1397 Ok(json!({}))
1398}
1399
1400fn invoke_dynamodb_update_item(
1405 input: &Value,
1406 dynamodb_state: &Option<SharedDynamoDbState>,
1407) -> Result<Value, (String, String)> {
1408 let ddb = dynamodb_state.as_ref().ok_or_else(|| {
1409 (
1410 "States.TaskFailed".to_string(),
1411 "No DynamoDB state configured".to_string(),
1412 )
1413 })?;
1414
1415 let table_name = input["TableName"].as_str().ok_or_else(|| {
1416 (
1417 "States.TaskFailed".to_string(),
1418 "Missing TableName in DynamoDB updateItem parameters".to_string(),
1419 )
1420 })?;
1421
1422 let key = input
1423 .get("Key")
1424 .and_then(|k| k.as_object())
1425 .ok_or_else(|| {
1426 (
1427 "States.TaskFailed".to_string(),
1428 "Missing Key in DynamoDB updateItem parameters".to_string(),
1429 )
1430 })?;
1431
1432 let key_map: HashMap<String, Value> = key.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1433
1434 let mut __mas = ddb.write();
1435 let state = __mas.default_mut();
1436 let table = state.tables.get_mut(table_name).ok_or_else(|| {
1437 (
1438 "States.TaskFailed".to_string(),
1439 format!("Table '{table_name}' not found"),
1440 )
1441 })?;
1442
1443 if let Some(update_expr) = input["UpdateExpression"].as_str() {
1445 let attr_values = input
1446 .get("ExpressionAttributeValues")
1447 .and_then(|v| v.as_object())
1448 .cloned()
1449 .unwrap_or_default();
1450 let attr_names = input
1451 .get("ExpressionAttributeNames")
1452 .and_then(|v| v.as_object())
1453 .cloned()
1454 .unwrap_or_default();
1455
1456 if let Some(idx) = table.find_item_index(&key_map) {
1457 apply_update_expression(
1458 &mut table.items[idx],
1459 update_expr,
1460 &attr_values,
1461 &attr_names,
1462 );
1463 } else {
1464 let mut new_item = key_map;
1466 apply_update_expression(&mut new_item, update_expr, &attr_values, &attr_names);
1467 table.items.push(new_item);
1468 }
1469 }
1470
1471 Ok(json!({}))
1472}
1473
1474fn apply_update_expression(
1476 item: &mut HashMap<String, Value>,
1477 expr: &str,
1478 attr_values: &serde_json::Map<String, Value>,
1479 attr_names: &serde_json::Map<String, Value>,
1480) {
1481 let clauses = split_update_clauses(expr);
1486 for (clause, body) in clauses {
1487 match clause {
1488 UpdateClause::Set => apply_set(item, &body, attr_values, attr_names),
1489 UpdateClause::Remove => apply_remove(item, &body, attr_names),
1490 UpdateClause::Add => apply_add(item, &body, attr_values, attr_names),
1491 UpdateClause::Delete => apply_delete(item, &body, attr_values, attr_names),
1492 }
1493 }
1494}
1495
/// The four clause kinds an update expression can contain.
///
/// `Debug`/`PartialEq`/`Eq` are derived so parsed clause lists can be logged
/// and compared directly (e.g. in assertions).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum UpdateClause {
    Set,
    Remove,
    Add,
    Delete,
}

/// Splits an update expression into `(clause kind, clause body)` pairs.
///
/// The expression is tokenised on whitespace. A token equal to `SET`,
/// `REMOVE`, `ADD` or `DELETE` (case-insensitively) opens a new clause; every
/// other token is appended to the body of the clause currently open, with
/// tokens joined by a single space. Tokens that appear before the first
/// keyword are discarded, and a keyword with nothing after it yields an empty
/// body. Clauses are returned in the order they appear.
fn split_update_clauses(expr: &str) -> Vec<(UpdateClause, String)> {
    let mut clauses: Vec<(UpdateClause, String)> = Vec::new();
    for token in expr.split_whitespace() {
        let keyword = match token.to_ascii_uppercase().as_str() {
            "SET" => Some(UpdateClause::Set),
            "REMOVE" => Some(UpdateClause::Remove),
            "ADD" => Some(UpdateClause::Add),
            "DELETE" => Some(UpdateClause::Delete),
            _ => None,
        };
        if let Some(kind) = keyword {
            clauses.push((kind, String::new()));
        } else if let Some((_, body)) = clauses.last_mut() {
            // Grow the body of the most recently opened clause in place.
            if !body.is_empty() {
                body.push(' ');
            }
            body.push_str(token);
        }
    }
    clauses
}
1535
1536fn resolve_attr_name(token: &str, attr_names: &serde_json::Map<String, Value>) -> String {
1537 if token.starts_with('#') {
1538 attr_names
1539 .get(token)
1540 .and_then(|v| v.as_str())
1541 .unwrap_or(token)
1542 .to_string()
1543 } else {
1544 token.to_string()
1545 }
1546}
1547
1548fn apply_set(
1549 item: &mut HashMap<String, Value>,
1550 body: &str,
1551 attr_values: &serde_json::Map<String, Value>,
1552 attr_names: &serde_json::Map<String, Value>,
1553) {
1554 for assignment in split_top_commas(body) {
1555 let Some((lhs, rhs)) = assignment.split_once('=') else {
1556 continue;
1557 };
1558 let attr_name = resolve_attr_name(lhs.trim(), attr_names);
1559 let value = evaluate_set_rhs(rhs.trim(), &attr_name, item, attr_values, attr_names);
1560 if let Some(v) = value {
1561 item.insert(attr_name, v);
1562 }
1563 }
1564}
1565
1566fn evaluate_set_rhs(
1567 rhs: &str,
1568 attr_name: &str,
1569 item: &HashMap<String, Value>,
1570 attr_values: &serde_json::Map<String, Value>,
1571 attr_names: &serde_json::Map<String, Value>,
1572) -> Option<Value> {
1573 if let Some(args) = rhs
1575 .strip_prefix("if_not_exists(")
1576 .and_then(|s| s.strip_suffix(')'))
1577 {
1578 let parts: Vec<&str> = args.splitn(2, ',').collect();
1579 if parts.len() == 2 {
1580 let path = resolve_attr_name(parts[0].trim(), attr_names);
1581 if item.contains_key(&path) {
1582 return item.get(&path).cloned();
1583 }
1584 return resolve_value(parts[1].trim(), attr_values);
1585 }
1586 return None;
1587 }
1588 for op in ['+', '-'] {
1590 if let Some((left, right)) = split_top_op(rhs, op) {
1591 let left = left.trim();
1592 let right = right.trim();
1593 let left_val = if left.starts_with(':') {
1594 resolve_value(left, attr_values)
1595 } else {
1596 let name = resolve_attr_name(left, attr_names);
1597 item.get(&name).cloned()
1598 };
1599 let right_val = if right.starts_with(':') {
1600 resolve_value(right, attr_values)
1601 } else {
1602 let name = resolve_attr_name(right, attr_names);
1603 item.get(&name).cloned()
1604 };
1605 return arithmetic(left_val.as_ref(), op, right_val.as_ref());
1606 }
1607 }
1608 if rhs.starts_with(':') {
1610 return resolve_value(rhs, attr_values);
1611 }
1612 if rhs.starts_with('#') {
1613 let _ = attr_name;
1614 let name = resolve_attr_name(rhs, attr_names);
1615 return item.get(&name).cloned();
1616 }
1617 None
1618}
1619
1620fn arithmetic(left: Option<&Value>, op: char, right: Option<&Value>) -> Option<Value> {
1621 let lf = number_from_dynamo(left?)?;
1622 let rf = number_from_dynamo(right?)?;
1623 let out = match op {
1624 '+' => lf + rf,
1625 '-' => lf - rf,
1626 _ => return None,
1627 };
1628 Some(json!({ "N": format_number(out) }))
1629}
1630
1631fn number_from_dynamo(v: &Value) -> Option<f64> {
1632 v.get("N")?.as_str()?.parse().ok()
1633}
1634
1635fn format_number(n: f64) -> String {
1636 if n.fract() == 0.0 && n.is_finite() && n >= i64::MIN as f64 && n < i64::MAX as f64 {
1640 format!("{}", n as i64)
1641 } else {
1642 format!("{n}")
1643 }
1644}
1645
1646fn resolve_value(token: &str, attr_values: &serde_json::Map<String, Value>) -> Option<Value> {
1647 attr_values.get(token).cloned()
1648}
1649
1650fn apply_remove(
1651 item: &mut HashMap<String, Value>,
1652 body: &str,
1653 attr_names: &serde_json::Map<String, Value>,
1654) {
1655 for path in split_top_commas(body) {
1656 let name = resolve_attr_name(path.trim(), attr_names);
1657 item.remove(&name);
1658 }
1659}
1660
1661fn apply_add(
1662 item: &mut HashMap<String, Value>,
1663 body: &str,
1664 attr_values: &serde_json::Map<String, Value>,
1665 attr_names: &serde_json::Map<String, Value>,
1666) {
1667 for clause in split_top_commas(body) {
1671 let mut parts = clause.split_whitespace();
1672 let Some(path) = parts.next() else { continue };
1673 let Some(value_ref) = parts.next() else {
1674 continue;
1675 };
1676 let attr_name = resolve_attr_name(path, attr_names);
1677 let Some(delta) = resolve_value(value_ref, attr_values) else {
1678 continue;
1679 };
1680 let current = item.get(&attr_name).cloned();
1681 let next = match (current.as_ref(), &delta) {
1682 (None, _) => delta.clone(),
1683 (Some(cur), _) => arithmetic(Some(cur), '+', Some(&delta)).unwrap_or(delta.clone()),
1684 };
1685 item.insert(attr_name, next);
1686 }
1687}
1688
1689fn apply_delete(
1690 item: &mut HashMap<String, Value>,
1691 body: &str,
1692 attr_values: &serde_json::Map<String, Value>,
1693 attr_names: &serde_json::Map<String, Value>,
1694) {
1695 for clause in split_top_commas(body) {
1698 let mut parts = clause.split_whitespace();
1699 let Some(path) = parts.next() else { continue };
1700 let Some(value_ref) = parts.next() else {
1701 continue;
1702 };
1703 let attr_name = resolve_attr_name(path, attr_names);
1704 let Some(elements) = resolve_value(value_ref, attr_values) else {
1705 continue;
1706 };
1707 let Some(current) = item.get_mut(&attr_name) else {
1708 continue;
1709 };
1710 for set_kind in ["SS", "NS", "BS"] {
1711 if let (Some(cur_arr), Some(rem_arr)) = (
1712 current.get_mut(set_kind).and_then(|v| v.as_array_mut()),
1713 elements.get(set_kind).and_then(|v| v.as_array()),
1714 ) {
1715 let to_remove: std::collections::HashSet<String> = rem_arr
1716 .iter()
1717 .filter_map(|v| v.as_str().map(String::from))
1718 .collect();
1719 cur_arr.retain(|v| !v.as_str().is_some_and(|s| to_remove.contains(s)));
1720 if cur_arr.is_empty() {
1721 item.remove(&attr_name);
1722 }
1723 break;
1724 }
1725 }
1726 }
1727}
1728
/// Splits `s` on commas that are not nested inside parentheses.
///
/// Every piece is trimmed. A comma always emits the piece before it, even when
/// that piece is empty, whereas a trailing piece is emitted only if it is
/// non-empty after trimming.
fn split_top_commas(s: &str) -> Vec<String> {
    let mut pieces = Vec::new();
    let mut nesting = 0i32;
    let mut piece_start = 0usize;
    for (pos, ch) in s.char_indices() {
        if ch == '(' {
            nesting += 1;
        } else if ch == ')' {
            nesting -= 1;
        } else if ch == ',' && nesting == 0 {
            pieces.push(s[piece_start..pos].trim().to_string());
            // ',' is a single byte, so the next piece starts right after it.
            piece_start = pos + 1;
        }
    }
    let tail = s[piece_start..].trim();
    if !tail.is_empty() {
        pieces.push(tail.to_string());
    }
    pieces
}
1756
/// Splits `s` at the first occurrence of `op` that is outside parentheses and
/// not at the very start of the string, returning the text on either side
/// (untrimmed). Returns `None` when there is no such occurrence.
fn split_top_op(s: &str, op: char) -> Option<(&str, &str)> {
    let mut nesting = 0i32;
    let at = s.char_indices().find_map(|(pos, ch)| {
        if ch == '(' {
            nesting += 1;
            None
        } else if ch == ')' {
            nesting -= 1;
            None
        } else if ch == op && nesting == 0 && pos > 0 {
            Some(pos)
        } else {
            None
        }
    })?;
    let (head, tail) = s.split_at(at);
    Some((head, &tail[op.len_utf8()..]))
}
1771
1772fn queue_url_to_arn(url: &str) -> String {
1775 let parts: Vec<&str> = url.rsplitn(3, '/').collect();
1776 if parts.len() >= 2 {
1777 let queue_name = parts[0];
1778 let account_id = parts[1];
1779 Arn::new("sqs", "us-east-1", account_id, queue_name).to_string()
1780 } else {
1781 url.to_string()
1782 }
1783}
1784
1785fn md5_hex(data: &str) -> String {
1787 use md5::Digest;
1788 let result = md5::Md5::digest(data.as_bytes());
1789 format!("{result:032x}")
1790}
1791
1792async fn invoke_lambda_direct(
1794 function_arn: &str,
1795 input: &Value,
1796 delivery: &Option<Arc<DeliveryBus>>,
1797 timeout_seconds: Option<u64>,
1798) -> Result<Value, (String, String)> {
1799 let delivery = delivery.as_ref().ok_or_else(|| {
1800 (
1801 "States.TaskFailed".to_string(),
1802 "No delivery bus configured for Lambda invocation".to_string(),
1803 )
1804 })?;
1805
1806 let payload = serde_json::to_string(input).unwrap_or_default();
1807
1808 let invoke_future = delivery.invoke_lambda(function_arn, &payload);
1809
1810 let result = if let Some(timeout) = timeout_seconds {
1811 match tokio::time::timeout(tokio::time::Duration::from_secs(timeout), invoke_future).await {
1812 Ok(r) => r,
1813 Err(_) => {
1814 return Err((
1815 "States.Timeout".to_string(),
1816 format!("Task timed out after {timeout} seconds"),
1817 ));
1818 }
1819 }
1820 } else {
1821 invoke_future.await
1822 };
1823
1824 match result {
1825 Some(Ok(bytes)) => {
1826 let response_str = String::from_utf8_lossy(&bytes);
1827 let value: Value =
1828 serde_json::from_str(&response_str).unwrap_or(json!(response_str.to_string()));
1829 Ok(value)
1830 }
1831 Some(Err(e)) => Err(("States.TaskFailed".to_string(), e)),
1832 None => {
1833 Ok(json!({}))
1835 }
1836 }
1837}
1838
1839async fn invoke_activity(
1844 activity_arn: &str,
1845 input: &Value,
1846 shared_state: &SharedStepFunctionsState,
1847 timeout_seconds: Option<u64>,
1848 heartbeat_seconds: Option<u64>,
1849) -> Result<Value, (String, String)> {
1850 use crate::state::TaskTokenState;
1851
1852 let activity_account = activity_arn.split(':').nth(4).unwrap_or("").to_string();
1854 {
1855 let accounts = shared_state.read();
1856 let exists = accounts
1857 .get(&activity_account)
1858 .map(|s| s.activities.contains_key(activity_arn))
1859 .unwrap_or(false);
1860 if !exists {
1861 return Err((
1862 "States.TaskFailed".to_string(),
1863 format!("Activity does not exist: {activity_arn}"),
1864 ));
1865 }
1866 }
1867
1868 let token = format!(
1869 "FCToken-{}-{}",
1870 chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0),
1871 uuid::Uuid::new_v4().simple(),
1872 );
1873 let now = chrono::Utc::now();
1874 let input_str = serde_json::to_string(input).unwrap_or_else(|_| "{}".to_string());
1875 {
1876 let mut accounts = shared_state.write();
1877 let state = accounts.get_or_create(&activity_account);
1878 state.task_tokens.insert(
1879 token.clone(),
1880 TaskTokenState {
1881 activity_arn: activity_arn.to_string(),
1882 status: "PENDING".to_string(),
1883 output: None,
1884 error: None,
1885 cause: None,
1886 input: Some(input_str),
1887 created_at: now,
1888 last_heartbeat_at: None,
1889 heartbeat_seconds: heartbeat_seconds.map(|s| s as i64),
1890 timeout_seconds: timeout_seconds.map(|s| s as i64),
1891 },
1892 );
1893 }
1894
1895 let absolute_deadline =
1899 std::time::Instant::now() + std::time::Duration::from_secs(timeout_seconds.unwrap_or(3600));
1900 loop {
1901 let now_ts = chrono::Utc::now();
1902 let snapshot = {
1903 let accounts = shared_state.read();
1904 accounts
1905 .get(&activity_account)
1906 .and_then(|s| s.task_tokens.get(&token).cloned())
1907 };
1908 let Some(entry) = snapshot else {
1909 return Err((
1910 "States.TaskFailed".to_string(),
1911 "Activity task token disappeared".to_string(),
1912 ));
1913 };
1914 match entry.status.as_str() {
1915 "SUCCEEDED" => {
1916 cleanup_token(shared_state, &activity_account, &token);
1917 let output = entry.output.unwrap_or_else(|| "{}".to_string());
1918 let value: Value = serde_json::from_str(&output).unwrap_or(Value::String(output));
1919 return Ok(value);
1920 }
1921 "FAILED" => {
1922 cleanup_token(shared_state, &activity_account, &token);
1923 return Err((
1924 entry
1925 .error
1926 .unwrap_or_else(|| "States.TaskFailed".to_string()),
1927 entry.cause.unwrap_or_default(),
1928 ));
1929 }
1930 _ => {}
1931 }
1932 if entry.status == "IN_PROGRESS" {
1935 if let Some(hb) = entry.heartbeat_seconds {
1936 let last = entry.last_heartbeat_at.unwrap_or(entry.created_at);
1937 if (now_ts - last).num_seconds() > hb {
1938 cleanup_token(shared_state, &activity_account, &token);
1939 return Err((
1940 "States.HeartbeatTimeout".to_string(),
1941 format!("Activity worker missed heartbeat ({hb}s window)"),
1942 ));
1943 }
1944 }
1945 }
1946 if std::time::Instant::now() >= absolute_deadline {
1947 cleanup_token(shared_state, &activity_account, &token);
1948 let secs = timeout_seconds.unwrap_or(3600);
1949 return Err((
1950 "States.Timeout".to_string(),
1951 format!("Activity timed out after {secs} seconds"),
1952 ));
1953 }
1954 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1955 }
1956}
1957
1958fn cleanup_token(shared_state: &SharedStepFunctionsState, account_id: &str, token: &str) {
1959 let mut accounts = shared_state.write();
1960 if let Some(state) = accounts.get_mut(account_id) {
1961 state.task_tokens.remove(token);
1962 }
1963}
1964
1965fn apply_parameters(template: &Value, input: &Value) -> Value {
1967 match template {
1968 Value::Object(map) => {
1969 let mut result = serde_json::Map::new();
1970 for (key, value) in map {
1971 if let Some(stripped) = key.strip_suffix(".$") {
1972 if let Some(path) = value.as_str() {
1973 result.insert(
1974 stripped.to_string(),
1975 crate::io_processing::resolve_path(input, path),
1976 );
1977 }
1978 } else {
1979 result.insert(key.clone(), apply_parameters(value, input));
1980 }
1981 }
1982 Value::Object(result)
1983 }
1984 Value::Array(arr) => Value::Array(arr.iter().map(|v| apply_parameters(v, input)).collect()),
1985 other => other.clone(),
1986 }
1987}
1988
/// Where execution goes after a state finishes, as derived from its definition.
///
/// The standard traits are derived so transitions can be logged and compared
/// directly.
#[derive(Debug, Clone, PartialEq, Eq)]
enum NextState {
    /// Continue with the state of this name (the definition's `Next` field).
    Name(String),
    /// The state is terminal (`"End": true`).
    End,
    /// The definition is malformed; the payload is a human-readable reason.
    Error(String),
}
1994
1995fn next_state(state_def: &Value) -> NextState {
1996 if state_def["End"].as_bool() == Some(true) {
1997 return NextState::End;
1998 }
1999 match state_def["Next"].as_str() {
2000 Some(next) => NextState::Name(next.to_string()),
2001 None => NextState::Error("State has neither 'End' nor 'Next' field".to_string()),
2002 }
2003}
2004
2005fn apply_state_catcher(
2010 state_def: &Value,
2011 effective_input: &Value,
2012 error: &str,
2013 cause: &str,
2014) -> Option<(String, Value)> {
2015 let catchers = state_def["Catch"].as_array().cloned().unwrap_or_default();
2016 let (next, result_path) = find_catcher(&catchers, error)?;
2017 let error_output = json!({
2018 "Error": error,
2019 "Cause": cause,
2020 });
2021 let new_input = apply_result_path(effective_input, &error_output, result_path.as_deref());
2022 Some((next, new_input))
2023}
2024
/// Returns the account-id field of an ARN (the fifth `:`-separated field).
///
/// Falls back to `"000000000000"` when the string has fewer than five fields.
/// An empty fifth field is returned as an empty string.
fn account_id_from_arn(arn: &str) -> &str {
    const FALLBACK_ACCOUNT: &str = "000000000000";
    match arn.split(':').nth(4) {
        Some(account) => account,
        None => FALLBACK_ACCOUNT,
    }
}
2029
2030fn add_event(
2031 state: &SharedStepFunctionsState,
2032 execution_arn: &str,
2033 event_type: &str,
2034 previous_event_id: i64,
2035 details: Value,
2036) -> i64 {
2037 let account_id = account_id_from_arn(execution_arn).to_string();
2038 let mut accounts = state.write();
2039 let s = accounts.get_or_create(&account_id);
2040 if let Some(exec) = s.executions.get_mut(execution_arn) {
2041 let id = exec.history_events.len() as i64 + 1;
2042 exec.history_events.push(HistoryEvent {
2043 id,
2044 event_type: event_type.to_string(),
2045 timestamp: Utc::now(),
2046 previous_event_id,
2047 details,
2048 });
2049 id
2050 } else {
2051 0
2052 }
2053}
2054
2055fn succeed_execution(state: &SharedStepFunctionsState, execution_arn: &str, output: &Value) {
2056 let account_id = account_id_from_arn(execution_arn).to_string();
2057 {
2059 let accounts = state.read();
2060 if let Some(s) = accounts.get(&account_id) {
2061 if let Some(exec) = s.executions.get(execution_arn) {
2062 if exec.status != ExecutionStatus::Running {
2063 return;
2064 }
2065 }
2066 }
2067 }
2068
2069 let output_str = serde_json::to_string(output).unwrap_or_default();
2070
2071 add_event(
2072 state,
2073 execution_arn,
2074 "ExecutionSucceeded",
2075 0,
2076 json!({ "output": output_str }),
2077 );
2078
2079 let mut accounts = state.write();
2080 let s = accounts.get_or_create(&account_id);
2081 if let Some(exec) = s.executions.get_mut(execution_arn) {
2082 exec.status = ExecutionStatus::Succeeded;
2083 exec.output = Some(output_str);
2084 exec.stop_date = Some(Utc::now());
2085 }
2086}
2087
2088fn fail_execution(state: &SharedStepFunctionsState, execution_arn: &str, error: &str, cause: &str) {
2089 let account_id = account_id_from_arn(execution_arn).to_string();
2090 {
2092 let accounts = state.read();
2093 if let Some(s) = accounts.get(&account_id) {
2094 if let Some(exec) = s.executions.get(execution_arn) {
2095 if exec.status != ExecutionStatus::Running {
2096 return;
2097 }
2098 }
2099 }
2100 }
2101
2102 add_event(
2103 state,
2104 execution_arn,
2105 "ExecutionFailed",
2106 0,
2107 json!({ "error": error, "cause": cause }),
2108 );
2109
2110 let mut accounts = state.write();
2111 let s = accounts.get_or_create(&account_id);
2112 if let Some(exec) = s.executions.get_mut(execution_arn) {
2113 exec.status = ExecutionStatus::Failed;
2114 exec.error = Some(error.to_string());
2115 exec.cause = Some(cause.to_string());
2116 exec.stop_date = Some(Utc::now());
2117 }
2118}
2119
2120#[cfg(test)]
2121mod tests {
2122 use super::*;
2123 use crate::state::Execution;
2124 use parking_lot::RwLock;
2125 use std::sync::Arc;
2126
2127 fn make_state() -> SharedStepFunctionsState {
2128 Arc::new(RwLock::new(
2129 fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
2130 ))
2131 }
2132
2133 fn create_execution(state: &SharedStepFunctionsState, arn: &str, input: Option<String>) {
2134 let mut accounts = state.write();
2135 let s = accounts.get_or_create("123456789012");
2136 s.executions.insert(
2137 arn.to_string(),
2138 Execution {
2139 execution_arn: arn.to_string(),
2140 state_machine_arn: "arn:aws:states:us-east-1:123456789012:stateMachine:test"
2141 .to_string(),
2142 state_machine_name: "test".to_string(),
2143 name: "exec-1".to_string(),
2144 status: ExecutionStatus::Running,
2145 input,
2146 output: None,
2147 start_date: Utc::now(),
2148 stop_date: None,
2149 error: None,
2150 cause: None,
2151 history_events: vec![],
2152 },
2153 );
2154 }
2155
2156 #[tokio::test]
2157 async fn test_simple_pass_state() {
2158 let state = make_state();
2159 let arn = "arn:aws:states:us-east-1:123456789012:execution:test:exec-1";
2160 create_execution(&state, arn, Some(r#"{"hello":"world"}"#.to_string()));
2161
2162 let definition = json!({
2163 "StartAt": "PassState",
2164 "States": {
2165 "PassState": {
2166 "Type": "Pass",
2167 "Result": {"processed": true},
2168 "End": true
2169 }
2170 }
2171 })
2172 .to_string();
2173
2174 execute_state_machine(
2175 state.clone(),
2176 arn.to_string(),
2177 definition,
2178 Some(r#"{"hello":"world"}"#.to_string()),
2179 None,
2180 None,
2181 )
2182 .await;
2183
2184 let __a = state.read();
2185 let s = __a.default_ref();
2186 let exec = s.executions.get(arn).unwrap();
2187 assert_eq!(exec.status, ExecutionStatus::Succeeded);
2188 assert!(exec.output.is_some());
2189 let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
2190 assert_eq!(output, json!({"processed": true}));
2191 }
2192
2193 #[tokio::test]
2194 async fn test_pass_chain() {
2195 let state = make_state();
2196 let arn = "arn:aws:states:us-east-1:123456789012:execution:test:exec-1";
2197 create_execution(&state, arn, Some(r#"{}"#.to_string()));
2198
2199 let definition = json!({
2200 "StartAt": "First",
2201 "States": {
2202 "First": {
2203 "Type": "Pass",
2204 "Result": "step1",
2205 "ResultPath": "$.first",
2206 "Next": "Second"
2207 },
2208 "Second": {
2209 "Type": "Pass",
2210 "Result": "step2",
2211 "ResultPath": "$.second",
2212 "End": true
2213 }
2214 }
2215 })
2216 .to_string();
2217
2218 execute_state_machine(
2219 state.clone(),
2220 arn.to_string(),
2221 definition,
2222 Some("{}".to_string()),
2223 None,
2224 None,
2225 )
2226 .await;
2227
2228 let __a = state.read();
2229 let s = __a.default_ref();
2230 let exec = s.executions.get(arn).unwrap();
2231 assert_eq!(exec.status, ExecutionStatus::Succeeded);
2232 let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
2233 assert_eq!(output["first"], json!("step1"));
2234 assert_eq!(output["second"], json!("step2"));
2235 }
2236
2237 #[tokio::test]
2238 async fn test_succeed_state() {
2239 let state = make_state();
2240 let arn = "arn:aws:states:us-east-1:123456789012:execution:test:exec-1";
2241 create_execution(&state, arn, Some(r#"{"data": "value"}"#.to_string()));
2242
2243 let definition = json!({
2244 "StartAt": "Done",
2245 "States": {
2246 "Done": {
2247 "Type": "Succeed"
2248 }
2249 }
2250 })
2251 .to_string();
2252
2253 execute_state_machine(
2254 state.clone(),
2255 arn.to_string(),
2256 definition,
2257 Some(r#"{"data": "value"}"#.to_string()),
2258 None,
2259 None,
2260 )
2261 .await;
2262
2263 let __a = state.read();
2264 let s = __a.default_ref();
2265 let exec = s.executions.get(arn).unwrap();
2266 assert_eq!(exec.status, ExecutionStatus::Succeeded);
2267 }
2268
2269 #[tokio::test]
2270 async fn test_fail_state() {
2271 let state = make_state();
2272 let arn = "arn:aws:states:us-east-1:123456789012:execution:test:exec-1";
2273 create_execution(&state, arn, None);
2274
2275 let definition = json!({
2276 "StartAt": "FailState",
2277 "States": {
2278 "FailState": {
2279 "Type": "Fail",
2280 "Error": "CustomError",
2281 "Cause": "Something went wrong"
2282 }
2283 }
2284 })
2285 .to_string();
2286
2287 execute_state_machine(state.clone(), arn.to_string(), definition, None, None, None).await;
2288
2289 let __a = state.read();
2290 let s = __a.default_ref();
2291 let exec = s.executions.get(arn).unwrap();
2292 assert_eq!(exec.status, ExecutionStatus::Failed);
2293 assert_eq!(exec.error.as_deref(), Some("CustomError"));
2294 assert_eq!(exec.cause.as_deref(), Some("Something went wrong"));
2295 }
2296
2297 #[tokio::test]
2298 async fn test_history_events_recorded() {
2299 let state = make_state();
2300 let arn = "arn:aws:states:us-east-1:123456789012:execution:test:exec-1";
2301 create_execution(&state, arn, Some("{}".to_string()));
2302
2303 let definition = json!({
2304 "StartAt": "PassState",
2305 "States": {
2306 "PassState": {
2307 "Type": "Pass",
2308 "End": true
2309 }
2310 }
2311 })
2312 .to_string();
2313
2314 execute_state_machine(
2315 state.clone(),
2316 arn.to_string(),
2317 definition,
2318 Some("{}".to_string()),
2319 None,
2320 None,
2321 )
2322 .await;
2323
2324 let __a = state.read();
2325 let s = __a.default_ref();
2326 let exec = s.executions.get(arn).unwrap();
2327 let event_types: Vec<&str> = exec
2328 .history_events
2329 .iter()
2330 .map(|e| e.event_type.as_str())
2331 .collect();
2332 assert_eq!(
2333 event_types,
2334 vec![
2335 "ExecutionStarted",
2336 "PassStateEntered",
2337 "PassStateExited",
2338 "ExecutionSucceeded"
2339 ]
2340 );
2341 }
2342
2343 fn drive(state: &SharedStepFunctionsState, arn: &str, def: Value, input: Option<&str>) {
2344 create_execution(state, arn, input.map(|s| s.to_string()));
2345 let fut = execute_state_machine(
2346 state.clone(),
2347 arn.to_string(),
2348 def.to_string(),
2349 input.map(|s| s.to_string()),
2350 None,
2351 None,
2352 );
2353 let rt = tokio::runtime::Builder::new_current_thread()
2354 .enable_time()
2355 .build()
2356 .unwrap();
2357 rt.block_on(fut);
2358 }
2359
2360 fn read_exec<R>(
2361 state: &SharedStepFunctionsState,
2362 arn: &str,
2363 f: impl FnOnce(&Execution) -> R,
2364 ) -> R {
2365 let __a = state.read();
2366 let s = __a.default_ref();
2367 f(s.executions.get(arn).expect("execution missing"))
2368 }
2369
2370 fn arn_for(name: &str) -> String {
2371 format!("arn:aws:states:us-east-1:123456789012:execution:test:{name}")
2372 }
2373
2374 #[test]
2377 fn pass_state_input_output_path_select_fields() {
2378 let state = make_state();
2379 let arn = arn_for("pass-paths");
2380 let def = json!({
2381 "StartAt": "P",
2382 "States": {
2383 "P": {
2384 "Type": "Pass",
2385 "InputPath": "$.inner",
2386 "OutputPath": "$.kept",
2387 "End": true
2388 }
2389 }
2390 });
2391 drive(
2392 &state,
2393 &arn,
2394 def,
2395 Some(r#"{"inner":{"kept":"yes","dropped":true},"sibling":1}"#),
2396 );
2397
2398 read_exec(&state, &arn, |exec| {
2399 assert_eq!(exec.status, ExecutionStatus::Succeeded);
2400 let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
2401 assert_eq!(output, json!("yes"));
2402 });
2403 }
2404
2405 #[test]
2408 fn succeed_state_honors_input_path_null() {
2409 let state = make_state();
2410 let arn = arn_for("succeed-null");
2411 let def = json!({
2412 "StartAt": "S",
2413 "States": { "S": { "Type": "Succeed", "InputPath": "null" } }
2414 });
2415 drive(&state, &arn, def, Some(r#"{"a":1}"#));
2416
2417 read_exec(&state, &arn, |exec| {
2418 assert_eq!(exec.status, ExecutionStatus::Succeeded);
2419 let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
2420 assert_eq!(output, json!({}));
2421 });
2422 }
2423
2424 #[test]
2425 fn fail_state_defaults_when_fields_missing() {
2426 let state = make_state();
2427 let arn = arn_for("fail-default");
2428 let def = json!({
2429 "StartAt": "F",
2430 "States": { "F": { "Type": "Fail" } }
2431 });
2432 drive(&state, &arn, def, None);
2433
2434 read_exec(&state, &arn, |exec| {
2435 assert_eq!(exec.status, ExecutionStatus::Failed);
2436 assert_eq!(exec.error.as_deref(), Some("States.Fail"));
2437 assert_eq!(exec.cause.as_deref(), Some(""));
2438 });
2439 }
2440
2441 fn choice_def() -> Value {
2444 json!({
2445 "StartAt": "C",
2446 "States": {
2447 "C": {
2448 "Type": "Choice",
2449 "Choices": [
2450 {
2451 "Variable": "$.n",
2452 "NumericGreaterThan": 10,
2453 "Next": "Big"
2454 }
2455 ],
2456 "Default": "Small"
2457 },
2458 "Big": { "Type": "Pass", "Result": "big", "End": true },
2459 "Small": { "Type": "Pass", "Result": "small", "End": true }
2460 }
2461 })
2462 }
2463
2464 #[test]
2465 fn choice_routes_to_matching_branch() {
2466 let state = make_state();
2467 let arn = arn_for("choice-big");
2468 drive(&state, &arn, choice_def(), Some(r#"{"n":42}"#));
2469
2470 read_exec(&state, &arn, |exec| {
2471 assert_eq!(exec.status, ExecutionStatus::Succeeded);
2472 assert_eq!(
2473 serde_json::from_str::<Value>(exec.output.as_ref().unwrap()).unwrap(),
2474 json!("big")
2475 );
2476 });
2477 }
2478
2479 #[test]
2480 fn choice_falls_through_to_default() {
2481 let state = make_state();
2482 let arn = arn_for("choice-default");
2483 drive(&state, &arn, choice_def(), Some(r#"{"n":3}"#));
2484
2485 read_exec(&state, &arn, |exec| {
2486 assert_eq!(exec.status, ExecutionStatus::Succeeded);
2487 assert_eq!(
2488 serde_json::from_str::<Value>(exec.output.as_ref().unwrap()).unwrap(),
2489 json!("small")
2490 );
2491 });
2492 }
2493
2494 #[test]
2495 fn choice_no_match_and_no_default_fails() {
2496 let state = make_state();
2497 let arn = arn_for("choice-nomatch");
2498 let def = json!({
2499 "StartAt": "C",
2500 "States": {
2501 "C": {
2502 "Type": "Choice",
2503 "Choices": [
2504 { "Variable": "$.n", "NumericEquals": 1, "Next": "End1" }
2505 ]
2506 },
2507 "End1": { "Type": "Pass", "End": true }
2508 }
2509 });
2510 drive(&state, &arn, def, Some(r#"{"n":99}"#));
2511
2512 read_exec(&state, &arn, |exec| {
2513 assert_eq!(exec.status, ExecutionStatus::Failed);
2514 assert_eq!(exec.error.as_deref(), Some("States.NoChoiceMatched"));
2515 });
2516 }
2517
/// A Wait state with `Seconds: 0` followed by a Succeed state is expected to
/// leave the execution in the `Succeeded` status.
#[test]
fn wait_seconds_then_advances() {
    let sfn = make_state();
    let exec_arn = arn_for("wait-secs");
    let machine = json!({
        "StartAt": "W",
        "States": {
            "W": {
                "Type": "Wait",
                "Seconds": 0,
                "Next": "Done"
            },
            "Done": { "Type": "Succeed" }
        }
    });
    drive(&sfn, &exec_arn, machine, Some(r#"{"k":1}"#));

    let status = read_exec(&sfn, &exec_arn, |e| e.status);
    assert_eq!(status, ExecutionStatus::Succeeded);
}
2537
/// A terminal Wait state whose `Timestamp` is in the year 2000 is expected
/// to let the execution succeed.
#[test]
fn wait_timestamp_in_past_is_noop() {
    let sfn = make_state();
    let exec_arn = arn_for("wait-past");
    let wait_state = json!({
        "Type": "Wait",
        "Timestamp": "2000-01-01T00:00:00Z",
        "End": true
    });
    let machine = json!({
        "StartAt": "W",
        "States": { "W": wait_state }
    });
    drive(&sfn, &exec_arn, machine, Some(r#"{"k":1}"#));

    let status = read_exec(&sfn, &exec_arn, |e| e.status);
    assert_eq!(status, ExecutionStatus::Succeeded);
}
2558
/// A terminal Wait state that specifies no duration field at all, driven
/// with no input, is expected to let the execution succeed.
#[test]
fn wait_without_any_duration_falls_through() {
    let sfn = make_state();
    let exec_arn = arn_for("wait-none");
    let machine = json!({
        "StartAt": "W",
        "States": {
            "W": { "Type": "Wait", "End": true }
        }
    });
    drive(&sfn, &exec_arn, machine, None);

    let status = read_exec(&sfn, &exec_arn, |e| e.status);
    assert_eq!(status, ExecutionStatus::Succeeded);
}
2573
/// A Parallel state with two single-Pass branches (`"a"` and `"b"`) is
/// expected to succeed with the branch results collected as `["a", "b"]`.
#[test]
fn parallel_runs_two_pass_branches_and_collects_results() {
    let sfn = make_state();
    let exec_arn = arn_for("parallel-ok");
    let branch_a = json!({
        "StartAt": "A",
        "States": { "A": { "Type": "Pass", "Result": "a", "End": true } }
    });
    let branch_b = json!({
        "StartAt": "B",
        "States": { "B": { "Type": "Pass", "Result": "b", "End": true } }
    });
    let machine = json!({
        "StartAt": "P",
        "States": {
            "P": {
                "Type": "Parallel",
                "End": true,
                "Branches": [branch_a, branch_b]
            }
        }
    });
    drive(&sfn, &exec_arn, machine, Some(r#"{}"#));

    let (status, output) = read_exec(&sfn, &exec_arn, |e| (e.status, e.output.clone()));
    assert_eq!(status, ExecutionStatus::Succeeded);
    let collected: Value = serde_json::from_str(output.as_deref().unwrap()).unwrap();
    assert_eq!(collected, json!(["a", "b"]));
}
2607
/// A Parallel state with an empty `Branches` array is expected to fail the
/// execution with the `States.Runtime` error.
#[test]
fn parallel_empty_branches_fails() {
    let sfn = make_state();
    let exec_arn = arn_for("parallel-empty");
    let machine = json!({
        "StartAt": "P",
        "States": {
            "P": { "Type": "Parallel", "Branches": [], "End": true }
        }
    });
    drive(&sfn, &exec_arn, machine, None);

    let (status, error) = read_exec(&sfn, &exec_arn, |e| (e.status, e.error.clone()));
    assert_eq!(status, ExecutionStatus::Failed);
    assert_eq!(error.as_deref(), Some("States.Runtime"));
}
2623
/// A Map state using the `Iterator` key with a pass-through Pass state,
/// fed `[1, 2, 3]` via `ItemsPath`, is expected to output `[1, 2, 3]`.
#[test]
fn map_iterates_pass_state_over_items_path() {
    let sfn = make_state();
    let exec_arn = arn_for("map-ok");
    let per_item = json!({
        "StartAt": "Item",
        "States": { "Item": { "Type": "Pass", "End": true } }
    });
    let machine = json!({
        "StartAt": "M",
        "States": {
            "M": {
                "Type": "Map",
                "ItemsPath": "$.items",
                "Iterator": per_item,
                "End": true
            }
        }
    });
    drive(&sfn, &exec_arn, machine, Some(r#"{"items":[1,2,3]}"#));

    let (status, output) = read_exec(&sfn, &exec_arn, |e| (e.status, e.output.clone()));
    assert_eq!(status, ExecutionStatus::Succeeded);
    let mapped: Value = serde_json::from_str(output.as_deref().unwrap()).unwrap();
    assert_eq!(mapped, json!([1, 2, 3]));
}
2652
/// A Task whose `Resource` is not a recognised integration is expected to
/// fail with `States.TaskFailed` and a cause mentioning "Unsupported".
#[test]
fn task_unsupported_resource_propagates_failure() {
    let sfn = make_state();
    let exec_arn = arn_for("task-unsupported");
    let task = json!({
        "Type": "Task",
        "Resource": "arn:aws:states:::nothing:here",
        "End": true
    });
    let machine = json!({ "StartAt": "T", "States": { "T": task } });
    drive(&sfn, &exec_arn, machine, None);

    let (status, error, cause) = read_exec(&sfn, &exec_arn, |e| {
        (e.status, e.error.clone(), e.cause.clone())
    });
    assert_eq!(status, ExecutionStatus::Failed);
    assert_eq!(error.as_deref(), Some("States.TaskFailed"));
    assert!(cause.as_deref().unwrap().contains("Unsupported"));
}
2677
/// An `sqs:sendMessage` Task driven through the test harness is expected to
/// fail with a cause that mentions the missing "delivery bus".
#[test]
fn task_sqs_send_without_delivery_fails() {
    let sfn = make_state();
    let exec_arn = arn_for("task-sqs-nodelivery");
    let send = json!({
        "Type": "Task",
        "Resource": "arn:aws:states:::sqs:sendMessage",
        "Parameters": { "QueueUrl": "http://localhost/123/q", "MessageBody": "hi" },
        "End": true
    });
    let machine = json!({ "StartAt": "T", "States": { "T": send } });
    drive(&sfn, &exec_arn, machine, Some("{}"));

    let (status, cause) = read_exec(&sfn, &exec_arn, |e| (e.status, e.cause.clone()));
    assert_eq!(status, ExecutionStatus::Failed);
    assert!(cause.as_deref().unwrap().contains("delivery bus"));
}
2700
/// A failing Task with a `States.ALL` catcher is expected to hand control to
/// `Handler`, keeping the original input and storing the error details
/// under the catcher's `ResultPath` (`$.err`).
#[test]
fn task_catch_routes_error_into_handler() {
    let sfn = make_state();
    let exec_arn = arn_for("task-catch");
    let catcher = json!({
        "ErrorEquals": ["States.ALL"],
        "Next": "Handler",
        "ResultPath": "$.err"
    });
    let machine = json!({
        "StartAt": "T",
        "States": {
            "T": {
                "Type": "Task",
                "Resource": "arn:aws:states:::nothing:here",
                "Catch": [catcher],
                "End": true
            },
            "Handler": { "Type": "Pass", "End": true }
        }
    });
    drive(&sfn, &exec_arn, machine, Some(r#"{"orig":"v"}"#));

    let (status, output) = read_exec(&sfn, &exec_arn, |e| (e.status, e.output.clone()));
    assert_eq!(status, ExecutionStatus::Succeeded);
    let doc: Value = serde_json::from_str(output.as_deref().unwrap()).unwrap();
    assert_eq!(doc["orig"], json!("v"));
    assert_eq!(doc["err"]["Error"], json!("States.TaskFailed"));
}
2731
/// Calls `execute_state_machine` directly with a definition string that is
/// not valid JSON and expects the execution to fail with `States.Runtime`
/// and a cause containing "Failed to parse".
#[test]
fn invalid_definition_json_fails_execution() {
    let sfn = make_state();
    let exec_arn = arn_for("bad-json");
    create_execution(&sfn, &exec_arn, None);

    let malformed = "not json{".to_string();
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_time()
        .build()
        .unwrap();
    // The definition is passed as a raw string here, so the JSON parse
    // step inside `execute_state_machine` is what gets exercised.
    let run = execute_state_machine(sfn.clone(), exec_arn.clone(), malformed, None, None, None);
    runtime.block_on(run);

    let (status, error, cause) = read_exec(&sfn, &exec_arn, |e| {
        (e.status, e.error.clone(), e.cause.clone())
    });
    assert_eq!(status, ExecutionStatus::Failed);
    assert_eq!(error.as_deref(), Some("States.Runtime"));
    assert!(cause.as_deref().unwrap().contains("Failed to parse"));
}
2758
/// A definition without a `StartAt` field is expected to fail with a cause
/// that mentions "StartAt".
#[test]
fn missing_start_at_fails_execution() {
    let sfn = make_state();
    let exec_arn = arn_for("no-startat");
    let machine = json!({
        "States": {
            "X": { "Type": "Succeed" }
        }
    });
    drive(&sfn, &exec_arn, machine, None);

    let (status, cause) = read_exec(&sfn, &exec_arn, |e| (e.status, e.cause.clone()));
    assert_eq!(status, ExecutionStatus::Failed);
    assert!(cause.as_deref().unwrap().contains("StartAt"));
}
2771
/// A `Next` pointing at a state that is not defined is expected to fail the
/// execution with a cause containing "not found".
#[test]
fn next_state_not_found_fails_execution() {
    let sfn = make_state();
    let exec_arn = arn_for("dangling-next");
    let machine = json!({
        "StartAt": "A",
        "States": {
            "A": { "Type": "Pass", "Next": "DoesNotExist" }
        }
    });
    drive(&sfn, &exec_arn, machine, None);

    let (status, cause) = read_exec(&sfn, &exec_arn, |e| (e.status, e.cause.clone()));
    assert_eq!(status, ExecutionStatus::Failed);
    assert!(cause.as_deref().unwrap().contains("not found"));
}
2787
/// A state with an unknown `Type` (`"WatChoo"`) is expected to fail the
/// execution with a cause containing "Unsupported state type".
#[test]
fn unsupported_state_type_fails_execution() {
    let sfn = make_state();
    let exec_arn = arn_for("bad-type");
    let machine = json!({
        "StartAt": "X",
        "States": {
            "X": { "Type": "WatChoo", "End": true }
        }
    });
    drive(&sfn, &exec_arn, machine, None);

    let (status, cause) = read_exec(&sfn, &exec_arn, |e| (e.status, e.cause.clone()));
    assert_eq!(status, ExecutionStatus::Failed);
    let message = cause.as_deref().unwrap();
    assert!(message.contains("Unsupported state type"));
}
2807
/// Checks `apply_parameters` on a template mixing a literal with `.$` keys
/// at the top level, inside a nested object, and inside an array element.
/// Each `.$` key is expected to appear without the suffix, holding the
/// value its path selects from the input.
#[test]
fn apply_parameters_substitutes_json_path_refs() {
    let source = json!({ "user": { "id": 42, "name": "zoe" } });
    let params = json!({
        "literal": "constant",
        "ref.$": "$.user.id",
        "nested": { "inner.$": "$.user.name" },
        "list": [ { "x.$": "$.user.id" } ]
    });

    let resolved = apply_parameters(&params, &source);

    assert_eq!(resolved["literal"], json!("constant"));
    assert_eq!(resolved["ref"], json!(42));
    assert_eq!(resolved["nested"]["inner"], json!("zoe"));
    assert_eq!(resolved["list"][0]["x"], json!(42));
}
2825
/// Checks the three outcomes of `next_state`: `End` for `"End": true`,
/// `Name` carrying the target for `"Next"`, and `Error` when neither field
/// is present.
#[test]
fn next_state_returns_end_name_or_error() {
    let terminal = next_state(&json!({ "End": true }));
    if !matches!(terminal, NextState::End) {
        panic!("expected End");
    }

    if let NextState::Name(target) = next_state(&json!({ "Next": "A" })) {
        assert_eq!(target, "A");
    } else {
        panic!("expected Name");
    }

    let neither = next_state(&json!({}));
    if !matches!(neither, NextState::Error(_)) {
        panic!("expected Error");
    }
}
2841
/// A `States.ALL` catcher is expected to match an arbitrary error name,
/// return its `Next` target, and place `Error` / `Cause` under the
/// catcher's `ResultPath` while keeping the existing input fields.
#[test]
fn apply_state_catcher_matches_wildcard_and_stashes_error() {
    let catch_all = json!({
        "ErrorEquals": ["States.ALL"],
        "Next": "H",
        "ResultPath": "$.caught"
    });
    let state_def = json!({ "Catch": [catch_all] });
    let original = json!({ "a": 1 });

    let outcome = apply_state_catcher(&state_def, &original, "Boom", "it exploded");
    let (target, merged) = outcome.unwrap();

    assert_eq!(target, "H");
    assert_eq!(merged["a"], json!(1));
    assert_eq!(merged["caught"]["Error"], json!("Boom"));
    assert_eq!(merged["caught"]["Cause"], json!("it exploded"));
}
2857
/// A catcher listing only `Specific.Error` is expected not to match the
/// error `Other`, so `apply_state_catcher` returns `None`.
#[test]
fn apply_state_catcher_returns_none_without_match() {
    let narrow = json!({ "ErrorEquals": ["Specific.Error"], "Next": "H" });
    let state_def = json!({ "Catch": [narrow] });
    let empty_input = json!({});

    let outcome = apply_state_catcher(&state_def, &empty_input, "Other", "why");
    assert!(outcome.is_none());
}
2868
/// A queue URL of the form `<host>/<account>/<queue>` is expected to map to
/// an SQS ARN in `us-east-1` carrying the same account id and queue name.
#[test]
fn queue_url_to_arn_parses_account_and_name() {
    let url = "http://sqs.local:4566/123456789012/my-queue";
    let arn = queue_url_to_arn(url);
    assert_eq!(arn, "arn:aws:sqs:us-east-1:123456789012:my-queue");
}
2876
/// Input that does not look like a queue URL is expected to be returned
/// unchanged.
#[test]
fn queue_url_to_arn_falls_back_for_unparseable_input() {
    let unparseable = "bad";
    let result = queue_url_to_arn(unparseable);
    assert_eq!(result, "bad");
}
2881
/// `md5_hex` is expected to return the same 32-character string for the
/// same input and a different string for a different input.
#[test]
fn md5_hex_is_deterministic_and_32_chars() {
    let first = md5_hex("hello");
    let second = md5_hex("hello");
    let other = md5_hex("world");

    assert_eq!(first, second);
    assert_eq!(first.len(), 32);
    assert_ne!(first, other);
}
2890
/// A `SET` clause assigning one attribute through a `#name` alias and one by
/// its plain name is expected to write both, leaving the existing `id`
/// attribute untouched.
#[test]
fn apply_update_expression_sets_direct_and_aliased_attrs() {
    let mut item: HashMap<String, Value> =
        HashMap::from([("id".to_string(), json!({"S": "1"}))]);

    let mut names = serde_json::Map::new();
    names.insert("#name".to_string(), json!("name"));

    let mut values = serde_json::Map::new();
    values.insert(":n".to_string(), json!({"S": "Alice"}));
    values.insert(":c".to_string(), json!({"N": "5"}));

    apply_update_expression(&mut item, "SET #name = :n, count = :c", &values, &names);

    assert_eq!(item["name"], json!({"S": "Alice"}));
    assert_eq!(item["count"], json!({"N": "5"}));
    assert_eq!(item["id"], json!({"S": "1"}));
}
2914
/// The `set` keyword written in lowercase is expected to behave like `SET`.
#[test]
fn apply_update_expression_accepts_lowercase_set_keyword() {
    let mut item: HashMap<String, Value> = HashMap::new();
    let no_aliases = serde_json::Map::new();
    let mut values = serde_json::Map::new();
    values.insert(":v".to_string(), json!({"S": "x"}));

    apply_update_expression(&mut item, "set field = :v", &values, &no_aliases);

    assert_eq!(item["field"], json!({"S": "x"}));
}
2928
/// `SET count = count + :inc` with `count = 10` and `:inc = 3` is expected
/// to store `{"N": "13"}`.
#[test]
fn apply_update_expression_set_arithmetic_increments_counter() {
    let mut item: HashMap<String, Value> =
        HashMap::from([("count".to_string(), json!({"N": "10"}))]);
    let no_aliases = serde_json::Map::new();
    let mut values = serde_json::Map::new();
    values.insert(":inc".to_string(), json!({"N": "3"}));

    apply_update_expression(&mut item, "SET count = count + :inc", &values, &no_aliases);

    assert_eq!(item["count"], json!({"N": "13"}));
}
2943
/// `SET count = count - :d` with `count = 10` and `:d = 4` is expected to
/// store `{"N": "6"}`.
#[test]
fn apply_update_expression_set_decrement() {
    let mut item: HashMap<String, Value> =
        HashMap::from([("count".to_string(), json!({"N": "10"}))]);
    let no_aliases = serde_json::Map::new();
    let mut values = serde_json::Map::new();
    values.insert(":d".to_string(), json!({"N": "4"}));

    apply_update_expression(&mut item, "SET count = count - :d", &values, &no_aliases);

    assert_eq!(item["count"], json!({"N": "6"}));
}
2958
/// `REMOVE a, c` on an item holding `a`, `b` and `c` is expected to delete
/// `a` and `c` and keep `b`.
#[test]
fn apply_update_expression_remove_drops_attributes() {
    let mut item: HashMap<String, Value> = HashMap::from([
        ("a".to_string(), json!({"S": "x"})),
        ("b".to_string(), json!({"S": "y"})),
        ("c".to_string(), json!({"S": "z"})),
    ]);
    let empty = serde_json::Map::new();

    apply_update_expression(&mut item, "REMOVE a, c", &empty, &empty);

    let a_present = item.contains_key("a");
    let b_present = item.contains_key("b");
    let c_present = item.contains_key("c");
    assert!(!a_present);
    assert!(b_present);
    assert!(!c_present);
}
2975
/// `ADD count :inc` is expected to add `:inc` to an existing numeric
/// attribute (5 + 2 = 7) and to create the attribute with the value of
/// `:inc` when the item does not have it yet.
#[test]
fn apply_update_expression_add_increments_existing_or_initializes() {
    let no_aliases = serde_json::Map::new();
    let mut values = serde_json::Map::new();
    values.insert(":inc".to_string(), json!({"N": "2"}));

    // Case 1: the attribute already exists.
    let mut existing: HashMap<String, Value> =
        HashMap::from([("count".to_string(), json!({"N": "5"}))]);
    apply_update_expression(&mut existing, "ADD count :inc", &values, &no_aliases);
    assert_eq!(existing["count"], json!({"N": "7"}));

    // Case 2: the item starts out empty.
    let mut fresh: HashMap<String, Value> = HashMap::new();
    apply_update_expression(&mut fresh, "ADD count :inc", &values, &no_aliases);
    assert_eq!(fresh["count"], json!({"N": "2"}));
}
3001
/// `DELETE tags :rm` with `:rm = {"SS": ["b"]}` is expected to remove `"b"`
/// from the string set `["a", "b", "c"]`, leaving `["a", "c"]`.
#[test]
fn apply_update_expression_delete_removes_set_elements() {
    let mut item: HashMap<String, Value> =
        HashMap::from([("tags".to_string(), json!({"SS": ["a", "b", "c"]}))]);
    let no_aliases = serde_json::Map::new();
    let mut values = serde_json::Map::new();
    values.insert(":rm".to_string(), json!({"SS": ["b"]}));

    apply_update_expression(&mut item, "DELETE tags :rm", &values, &no_aliases);

    assert_eq!(item["tags"], json!({"SS": ["a", "c"]}));
}
3016
/// `SET count = if_not_exists(count, :zero)` is expected to write `:zero`
/// when `count` is missing and to keep the current value when it exists.
#[test]
fn apply_update_expression_if_not_exists_initializes_only_when_absent() {
    let expression = "SET count = if_not_exists(count, :zero)";
    let no_aliases = serde_json::Map::new();
    let mut values = serde_json::Map::new();
    values.insert(":zero".to_string(), json!({"N": "0"}));

    // Case 1: the attribute is absent, so the fallback is stored.
    let mut absent: HashMap<String, Value> = HashMap::new();
    apply_update_expression(&mut absent, expression, &values, &no_aliases);
    assert_eq!(absent["count"], json!({"N": "0"}));

    // Case 2: the attribute is present, so it must be left alone.
    let mut present: HashMap<String, Value> =
        HashMap::from([("count".to_string(), json!({"N": "42"}))]);
    apply_update_expression(&mut present, expression, &values, &no_aliases);
    assert_eq!(present["count"], json!({"N": "42"}));
}
3042
/// A single expression containing `SET`, `ADD` and `REMOVE` clauses is
/// expected to apply all three: overwrite `a`, increment `b`, drop `c`.
#[test]
fn apply_update_expression_combines_clauses() {
    let mut item: HashMap<String, Value> = HashMap::from([
        ("a".to_string(), json!({"S": "old"})),
        ("b".to_string(), json!({"N": "1"})),
        ("c".to_string(), json!({"S": "drop"})),
    ]);
    let no_aliases = serde_json::Map::new();
    let mut values = serde_json::Map::new();
    values.insert(":new".to_string(), json!({"S": "new"}));
    values.insert(":one".to_string(), json!({"N": "1"}));

    apply_update_expression(
        &mut item,
        "SET a = :new ADD b :one REMOVE c",
        &values,
        &no_aliases,
    );

    assert_eq!(item["a"], json!({"S": "new"}));
    assert_eq!(item["b"], json!({"N": "2"}));
    assert!(!item.contains_key("c"));
}
3062
/// A `dynamodb:getItem` Task driven through the test harness is expected to
/// fail with a cause that mentions "DynamoDB".
#[test]
fn task_dynamodb_get_item_without_state_fails() {
    let sfn = make_state();
    let exec_arn = arn_for("ddb-get-nostate");
    let get_item = json!({
        "Type": "Task",
        "Resource": "arn:aws:states:::dynamodb:getItem",
        "Parameters": { "TableName": "t", "Key": { "id": { "S": "1" } } },
        "End": true
    });
    let machine = json!({ "StartAt": "T", "States": { "T": get_item } });
    drive(&sfn, &exec_arn, machine, Some("{}"));

    let (status, cause) = read_exec(&sfn, &exec_arn, |e| (e.status, e.cause.clone()));
    assert_eq!(status, ExecutionStatus::Failed);
    assert!(cause.as_deref().unwrap().contains("DynamoDB"));
}
3086
/// Forces an execution into `Failed`, then calls `succeed_execution` and
/// expects the status to stay `Failed` with no output recorded.
#[test]
fn succeed_execution_is_noop_when_already_terminal() {
    let sfn = make_state();
    let exec_arn = "arn:aws:states:us-east-1:123456789012:execution:test:already";
    create_execution(&sfn, exec_arn, None);

    // The inner block drops the write guard before `succeed_execution` is called.
    {
        let mut write_guard = sfn.write();
        let inner = write_guard.default_mut();
        let record = inner.executions.get_mut(exec_arn).unwrap();
        record.status = ExecutionStatus::Failed;
    }

    succeed_execution(&sfn, exec_arn, &json!({"x":1}));

    let read_guard = sfn.read();
    let inner = read_guard.default_ref();
    let record = inner.executions.get(exec_arn).unwrap();
    assert_eq!(record.status, ExecutionStatus::Failed);
    assert!(record.output.is_none());
}
3106
/// Forces an execution into `Succeeded`, then calls `fail_execution` and
/// expects the status to stay `Succeeded` with no error recorded.
#[test]
fn fail_execution_is_noop_when_already_terminal() {
    let sfn = make_state();
    let exec_arn = "arn:aws:states:us-east-1:123456789012:execution:test:already2";
    create_execution(&sfn, exec_arn, None);

    // The inner block drops the write guard before `fail_execution` is called.
    {
        let mut write_guard = sfn.write();
        let inner = write_guard.default_mut();
        let record = inner.executions.get_mut(exec_arn).unwrap();
        record.status = ExecutionStatus::Succeeded;
    }

    fail_execution(&sfn, exec_arn, "Oops", "nope");

    let read_guard = sfn.read();
    let inner = read_guard.default_ref();
    let record = inner.executions.get(exec_arn).unwrap();
    assert_eq!(record.status, ExecutionStatus::Succeeded);
    assert!(record.error.is_none());
}
3124
/// A Pass state with `Result: {"x": 2}` and `ResultPath: "$.data"` is
/// expected to keep the input field `a` and add the result under `data`.
#[test]
fn pass_state_result_path_merges_into_input() {
    let sfn = make_state();
    let exec_arn = arn_for("result-path");
    let pass = json!({
        "Type": "Pass",
        "Result": {"x": 2},
        "ResultPath": "$.data",
        "End": true
    });
    let machine = json!({ "StartAt": "P", "States": { "P": pass } });
    drive(&sfn, &exec_arn, machine, Some(r#"{"a":1}"#));

    read_exec(&sfn, &exec_arn, |e| {
        let raw = e.output.clone().unwrap_or_default();
        let doc: Value = serde_json::from_str(&raw).unwrap();
        assert_eq!(doc["a"], 1);
        assert_eq!(doc["data"]["x"], 2);
    });
}
3143
/// `StringGreaterThanEquals: "apple"` evaluated against `"banana"` is
/// expected to match and route to the terminal Pass state rather than the
/// `Fail` default.
#[test]
fn choice_string_greater_than_equals() {
    let sfn = make_state();
    let exec_arn = arn_for("choice-sgte");
    let rule = json!({
        "Variable": "$.val",
        "StringGreaterThanEquals": "apple",
        "Next": "End"
    });
    let machine = json!({
        "StartAt": "C",
        "States": {
            "C": { "Type": "Choice", "Choices": [rule], "Default": "Fail" },
            "End": {"Type": "Pass", "End": true},
            "Fail": {"Type": "Fail"}
        }
    });
    drive(&sfn, &exec_arn, machine, Some(r#"{"val":"banana"}"#));

    read_exec(&sfn, &exec_arn, |e| {
        assert_eq!(e.status, ExecutionStatus::Succeeded);
    });
}
3168
/// `IsPresent: true` evaluated against a field that exists with a JSON
/// `null` value is expected to match and route to the terminal Pass state.
///
/// Only the `IsPresent` operator appears in this definition; the null value
/// comes from the input, not from an `IsNull` rule.
#[test]
fn choice_is_present_and_is_null() {
    let sfn = make_state();
    let exec_arn = arn_for("choice-ispres");
    let rule = json!({"Variable": "$.foo", "IsPresent": true, "Next": "End"});
    let machine = json!({
        "StartAt": "C",
        "States": {
            "C": { "Type": "Choice", "Choices": [rule], "Default": "Fail" },
            "End": {"Type": "Pass", "End": true},
            "Fail": {"Type": "Fail"}
        }
    });
    drive(&sfn, &exec_arn, machine, Some(r#"{"foo":null}"#));

    read_exec(&sfn, &exec_arn, |e| {
        assert_eq!(e.status, ExecutionStatus::Succeeded);
    });
}
3193
/// An `Or` rule whose first operand is false (`x != 99`) and whose second is
/// true (`y == "b"`) is expected to match and route to the terminal Pass
/// state.
#[test]
fn choice_or_short_circuits() {
    let sfn = make_state();
    let exec_arn = arn_for("choice-or");
    let either = json!({
        "Or": [
            {"Variable": "$.x", "NumericEquals": 99},
            {"Variable": "$.y", "StringEquals": "b"}
        ],
        "Next": "End"
    });
    let machine = json!({
        "StartAt": "C",
        "States": {
            "C": { "Type": "Choice", "Choices": [either], "Default": "Fail" },
            "End": {"Type": "Pass", "End": true},
            "Fail": {"Type": "Fail"}
        }
    });
    drive(&sfn, &exec_arn, machine, Some(r#"{"x":1,"y":"b"}"#));

    read_exec(&sfn, &exec_arn, |e| {
        assert_eq!(e.status, ExecutionStatus::Succeeded);
    });
}
3222
/// A `Not` rule wrapping a false comparison (`x == 99` with `x = 1`) is
/// expected to match and route to the terminal Pass state.
#[test]
fn choice_not_negates() {
    let sfn = make_state();
    let exec_arn = arn_for("choice-not");
    let negated = json!({
        "Not": {"Variable": "$.x", "NumericEquals": 99},
        "Next": "End"
    });
    let machine = json!({
        "StartAt": "C",
        "States": {
            "C": { "Type": "Choice", "Choices": [negated], "Default": "Fail" },
            "End": {"Type": "Pass", "End": true},
            "Fail": {"Type": "Fail"}
        }
    });
    drive(&sfn, &exec_arn, machine, Some(r#"{"x":1}"#));

    read_exec(&sfn, &exec_arn, |e| {
        assert_eq!(e.status, ExecutionStatus::Succeeded);
    });
}
3248
/// `BooleanEquals: true` evaluated against `ok = true` is expected to match
/// and route to the terminal Pass state.
#[test]
fn choice_boolean_equals() {
    let sfn = make_state();
    let exec_arn = arn_for("choice-bool");
    let rule = json!({"Variable": "$.ok", "BooleanEquals": true, "Next": "End"});
    let machine = json!({
        "StartAt": "C",
        "States": {
            "C": { "Type": "Choice", "Choices": [rule], "Default": "Fail" },
            "End": {"Type": "Pass", "End": true},
            "Fail": {"Type": "Fail"}
        }
    });
    drive(&sfn, &exec_arn, machine, Some(r#"{"ok":true}"#));

    read_exec(&sfn, &exec_arn, |e| {
        assert_eq!(e.status, ExecutionStatus::Succeeded);
    });
}
3273
/// A terminal Wait state with `SecondsPath: "$.wait"` and an input of
/// `wait = 0` is expected to let the execution succeed.
#[test]
fn wait_seconds_path_uses_input_value() {
    let sfn = make_state();
    let exec_arn = arn_for("wait-sp");
    let wait_state = json!({"Type": "Wait", "SecondsPath": "$.wait", "End": true});
    let machine = json!({ "StartAt": "W", "States": { "W": wait_state } });
    drive(&sfn, &exec_arn, machine, Some(r#"{"wait":0}"#));

    read_exec(&sfn, &exec_arn, |e| {
        assert_eq!(e.status, ExecutionStatus::Succeeded);
    });
}
3292
/// A Map state using the `ItemProcessor` key, fed an empty `items` array,
/// is expected to let the execution succeed.
#[test]
fn map_state_empty_array_succeeds() {
    let sfn = make_state();
    let exec_arn = arn_for("map-empty");
    let processor = json!({
        "StartAt": "P",
        "States": {
            "P": {"Type": "Pass", "End": true}
        }
    });
    let machine = json!({
        "StartAt": "M",
        "States": {
            "M": {
                "Type": "Map",
                "ItemsPath": "$.items",
                "ItemProcessor": processor,
                "End": true
            }
        }
    });
    drive(&sfn, &exec_arn, machine, Some(r#"{"items":[]}"#));

    read_exec(&sfn, &exec_arn, |e| {
        assert_eq!(e.status, ExecutionStatus::Succeeded);
    });
}
3321
/// A Fail state that declares both `Error` and `Cause` is expected to fail
/// the execution and surface both strings on the execution record.
///
/// The test name promises a check on the cause as well as the error, so
/// both fields are asserted here. The execution record is set up through
/// `drive` alone, matching the other `drive`-based tests in this module.
#[test]
fn fail_state_with_explicit_error_and_cause() {
    let state = make_state();
    let arn = arn_for("fail-fields");
    let def = json!({
        "StartAt": "F",
        "States": {
            "F": {"Type": "Fail", "Error": "MyError", "Cause": "my cause"}
        }
    });
    drive(&state, &arn, def, None);

    read_exec(&state, &arn, |exec| {
        assert_eq!(exec.status, ExecutionStatus::Failed);
        assert_eq!(exec.error.as_deref(), Some("MyError"));
        assert_eq!(exec.cause.as_deref(), Some("my cause"));
    });
}
3341}