1use std::collections::HashMap;
2use std::sync::Arc;
3
4use chrono::Utc;
5use serde_json::{json, Value};
6use tracing::{debug, warn};
7
8use fakecloud_aws::arn::Arn;
9use fakecloud_core::delivery::DeliveryBus;
10use fakecloud_dynamodb::state::SharedDynamoDbState;
11
12use crate::choice::evaluate_choice;
13use crate::error_handling::{find_catcher, should_retry};
14use crate::io_processing::{apply_input_path, apply_output_path, apply_result_path};
15use crate::state::{ExecutionStatus, HistoryEvent, SharedStepFunctionsState};
16
17pub async fn execute_state_machine(
20 state: SharedStepFunctionsState,
21 execution_arn: String,
22 definition: String,
23 input: Option<String>,
24 delivery: Option<Arc<DeliveryBus>>,
25 dynamodb_state: Option<SharedDynamoDbState>,
26) {
27 let def: Value = match serde_json::from_str(&definition) {
28 Ok(v) => v,
29 Err(e) => {
30 fail_execution(
31 &state,
32 &execution_arn,
33 "States.Runtime",
34 &format!("Failed to parse definition: {e}"),
35 );
36 return;
37 }
38 };
39
40 let raw_input: Value = input
41 .as_deref()
42 .and_then(|s| serde_json::from_str(s).ok())
43 .unwrap_or(json!({}));
44
45 add_event(
47 &state,
48 &execution_arn,
49 "ExecutionStarted",
50 0,
51 json!({
52 "input": serde_json::to_string(&raw_input).unwrap_or_default(),
53 "roleArn": "arn:aws:iam::123456789012:role/execution-role"
54 }),
55 );
56
57 let def_owned = def;
63 let state_clone = state.clone();
64 let execution_arn_clone = execution_arn.clone();
65 let delivery_clone = delivery.clone();
66 let dynamodb_state_clone = dynamodb_state.clone();
67 let handle = tokio::spawn(async move {
68 run_states(
69 &def_owned,
70 raw_input,
71 &delivery_clone,
72 &dynamodb_state_clone,
73 &state_clone,
74 &execution_arn_clone,
75 )
76 .await
77 });
78
79 match handle.await {
80 Ok(Ok(output)) => {
81 succeed_execution(&state, &execution_arn, &output);
82 }
83 Ok(Err((error, cause))) => {
84 fail_execution(&state, &execution_arn, &error, &cause);
85 }
86 Err(join_err) => {
87 let msg = if join_err.is_panic() {
88 let payload = join_err.into_panic();
89 if let Some(s) = payload.downcast_ref::<String>() {
90 s.clone()
91 } else if let Some(s) = payload.downcast_ref::<&'static str>() {
92 (*s).to_string()
93 } else {
94 "execution task panicked".to_string()
95 }
96 } else {
97 format!("execution task cancelled: {join_err}")
98 };
99 tracing::error!(
100 execution_arn = %execution_arn,
101 panic = %msg,
102 "Step Functions execution panicked"
103 );
104 fail_execution(&state, &execution_arn, "States.Runtime", &msg);
105 }
106 }
107}
108
/// Boxed, `Send` future returned by `run_states`.
///
/// It resolves to the final output `Value`, or to an `(error, cause)` pair of strings when
/// the run fails. The lifetime `'a` bounds the borrows captured by the future.
type StatesResult<'a> = std::pin::Pin<
    Box<dyn std::future::Future<Output = Result<Value, (String, String)>> + Send + 'a>,
>;
112
113fn run_states<'a>(
116 def: &'a Value,
117 input: Value,
118 delivery: &'a Option<Arc<DeliveryBus>>,
119 dynamodb_state: &'a Option<SharedDynamoDbState>,
120 shared_state: &'a SharedStepFunctionsState,
121 execution_arn: &'a str,
122) -> StatesResult<'a> {
123 Box::pin(async move {
124 let start_at = def["StartAt"]
125 .as_str()
126 .ok_or_else(|| {
127 (
128 "States.Runtime".to_string(),
129 "Missing StartAt in definition".to_string(),
130 )
131 })?
132 .to_string();
133
134 let states = def.get("States").ok_or_else(|| {
135 (
136 "States.Runtime".to_string(),
137 "Missing States in definition".to_string(),
138 )
139 })?;
140
141 let mut current_state = start_at;
142 let mut effective_input = input;
143 let mut iteration = 0;
144 let max_iterations = 500;
145
146 loop {
147 iteration += 1;
148 if iteration > max_iterations {
149 return Err((
150 "States.Runtime".to_string(),
151 "Maximum number of state transitions exceeded".to_string(),
152 ));
153 }
154
155 let state_def = states.get(¤t_state).cloned().ok_or_else(|| {
156 (
157 "States.Runtime".to_string(),
158 format!("State '{current_state}' not found in definition"),
159 )
160 })?;
161
162 let state_type = state_def["Type"]
163 .as_str()
164 .ok_or_else(|| {
165 (
166 "States.Runtime".to_string(),
167 format!("State '{current_state}' missing Type field"),
168 )
169 })?
170 .to_string();
171
172 debug!(
173 execution_arn = %execution_arn,
174 state = %current_state,
175 state_type = %state_type,
176 "Executing state"
177 );
178
179 let advance = match state_type.as_str() {
180 "Pass" => run_pass_state(
181 ¤t_state,
182 &state_def,
183 effective_input,
184 shared_state,
185 execution_arn,
186 ),
187 "Succeed" => run_succeed_state(
188 ¤t_state,
189 &state_def,
190 effective_input,
191 shared_state,
192 execution_arn,
193 ),
194 "Fail" => run_fail_state(
195 ¤t_state,
196 &state_def,
197 effective_input,
198 shared_state,
199 execution_arn,
200 ),
201 "Choice" => run_choice_state(
202 ¤t_state,
203 &state_def,
204 effective_input,
205 shared_state,
206 execution_arn,
207 ),
208 "Wait" => {
209 run_wait_state(
210 ¤t_state,
211 &state_def,
212 effective_input,
213 shared_state,
214 execution_arn,
215 )
216 .await
217 }
218 "Task" => {
219 run_task_state(
220 ¤t_state,
221 &state_def,
222 effective_input,
223 delivery,
224 dynamodb_state,
225 shared_state,
226 execution_arn,
227 )
228 .await
229 }
230 "Parallel" => {
231 run_parallel_state(
232 ¤t_state,
233 &state_def,
234 effective_input,
235 delivery,
236 dynamodb_state,
237 shared_state,
238 execution_arn,
239 )
240 .await
241 }
242 "Map" => {
243 run_map_state(
244 ¤t_state,
245 &state_def,
246 effective_input,
247 delivery,
248 dynamodb_state,
249 shared_state,
250 execution_arn,
251 )
252 .await
253 }
254 other => Advance::Fail(
255 "States.Runtime".to_string(),
256 format!("Unsupported state type: '{other}'"),
257 ),
258 };
259
260 match advance {
261 Advance::Next(next, new_input) => {
262 effective_input = new_input;
263 current_state = next;
264 }
265 Advance::End(output) => return Ok(output),
266 Advance::Fail(error, cause) => return Err((error, cause)),
267 }
268 }
269 })
270}
271
/// Outcome of running a single state, consumed by the loop in `run_states`.
enum Advance {
    /// Continue with the named state, handing it the given value as its input.
    Next(String, Value),
    /// Stop the state loop successfully; the value becomes the output of the run.
    End(Value),
    /// Stop the state loop with an `(error, cause)` failure.
    Fail(String, String),
}
281
282fn advance_from_next(state_def: &Value, input: Value) -> Advance {
283 match next_state(state_def) {
284 NextState::Name(next) => Advance::Next(next, input),
285 NextState::End => Advance::End(input),
286 NextState::Error(msg) => Advance::Fail("States.Runtime".to_string(), msg),
287 }
288}
289
290fn advance_from_error(state_def: &Value, input: &Value, error: String, cause: String) -> Advance {
291 match apply_state_catcher(state_def, input, &error, &cause) {
292 Some((next, new_input)) => Advance::Next(next, new_input),
293 None => Advance::Fail(error, cause),
294 }
295}
296
297fn run_pass_state(
298 name: &str,
299 state_def: &Value,
300 input: Value,
301 shared_state: &SharedStepFunctionsState,
302 execution_arn: &str,
303) -> Advance {
304 let entered_event_id = add_event(
305 shared_state,
306 execution_arn,
307 "PassStateEntered",
308 0,
309 json!({
310 "name": name,
311 "input": serde_json::to_string(&input).unwrap_or_default(),
312 }),
313 );
314
315 let result = execute_pass_state(state_def, &input);
316
317 add_event(
318 shared_state,
319 execution_arn,
320 "PassStateExited",
321 entered_event_id,
322 json!({
323 "name": name,
324 "output": serde_json::to_string(&result).unwrap_or_default(),
325 }),
326 );
327
328 advance_from_next(state_def, result)
329}
330
331fn run_succeed_state(
332 name: &str,
333 state_def: &Value,
334 input: Value,
335 shared_state: &SharedStepFunctionsState,
336 execution_arn: &str,
337) -> Advance {
338 add_event(
339 shared_state,
340 execution_arn,
341 "SucceedStateEntered",
342 0,
343 json!({
344 "name": name,
345 "input": serde_json::to_string(&input).unwrap_or_default(),
346 }),
347 );
348
349 let input_path = state_def["InputPath"].as_str();
350 let output_path = state_def["OutputPath"].as_str();
351
352 let processed = if input_path == Some("null") {
353 json!({})
354 } else {
355 apply_input_path(&input, input_path)
356 };
357
358 let output = if output_path == Some("null") {
359 json!({})
360 } else {
361 apply_output_path(&processed, output_path)
362 };
363
364 add_event(
365 shared_state,
366 execution_arn,
367 "SucceedStateExited",
368 0,
369 json!({
370 "name": name,
371 "output": serde_json::to_string(&output).unwrap_or_default(),
372 }),
373 );
374
375 Advance::End(output)
376}
377
378fn run_fail_state(
379 name: &str,
380 state_def: &Value,
381 input: Value,
382 shared_state: &SharedStepFunctionsState,
383 execution_arn: &str,
384) -> Advance {
385 let error = state_def["Error"]
386 .as_str()
387 .unwrap_or("States.Fail")
388 .to_string();
389 let cause = state_def["Cause"].as_str().unwrap_or("").to_string();
390
391 add_event(
392 shared_state,
393 execution_arn,
394 "FailStateEntered",
395 0,
396 json!({
397 "name": name,
398 "input": serde_json::to_string(&input).unwrap_or_default(),
399 }),
400 );
401
402 Advance::Fail(error, cause)
403}
404
405fn run_choice_state(
406 name: &str,
407 state_def: &Value,
408 input: Value,
409 shared_state: &SharedStepFunctionsState,
410 execution_arn: &str,
411) -> Advance {
412 let entered_event_id = add_event(
413 shared_state,
414 execution_arn,
415 "ChoiceStateEntered",
416 0,
417 json!({
418 "name": name,
419 "input": serde_json::to_string(&input).unwrap_or_default(),
420 }),
421 );
422
423 let input_path = state_def["InputPath"].as_str();
424 let processed_input = if input_path == Some("null") {
425 json!({})
426 } else {
427 apply_input_path(&input, input_path)
428 };
429
430 match evaluate_choice(state_def, &processed_input) {
431 Some(next) => {
432 add_event(
433 shared_state,
434 execution_arn,
435 "ChoiceStateExited",
436 entered_event_id,
437 json!({
438 "name": name,
439 "output": serde_json::to_string(&input).unwrap_or_default(),
440 }),
441 );
442 Advance::Next(next, input)
443 }
444 None => Advance::Fail(
445 "States.NoChoiceMatched".to_string(),
446 format!("No choice rule matched and no Default in state '{name}'"),
447 ),
448 }
449}
450
451async fn run_wait_state(
452 name: &str,
453 state_def: &Value,
454 input: Value,
455 shared_state: &SharedStepFunctionsState,
456 execution_arn: &str,
457) -> Advance {
458 let entered_event_id = add_event(
459 shared_state,
460 execution_arn,
461 "WaitStateEntered",
462 0,
463 json!({
464 "name": name,
465 "input": serde_json::to_string(&input).unwrap_or_default(),
466 }),
467 );
468
469 execute_wait_state(state_def, &input).await;
470
471 add_event(
472 shared_state,
473 execution_arn,
474 "WaitStateExited",
475 entered_event_id,
476 json!({
477 "name": name,
478 "output": serde_json::to_string(&input).unwrap_or_default(),
479 }),
480 );
481
482 advance_from_next(state_def, input)
483}
484
485#[allow(clippy::too_many_arguments)]
486async fn run_task_state(
487 name: &str,
488 state_def: &Value,
489 input: Value,
490 delivery: &Option<Arc<DeliveryBus>>,
491 dynamodb_state: &Option<SharedDynamoDbState>,
492 shared_state: &SharedStepFunctionsState,
493 execution_arn: &str,
494) -> Advance {
495 let entered_event_id = add_event(
496 shared_state,
497 execution_arn,
498 "TaskStateEntered",
499 0,
500 json!({
501 "name": name,
502 "input": serde_json::to_string(&input).unwrap_or_default(),
503 }),
504 );
505
506 let result = execute_task_state(
507 state_def,
508 &input,
509 delivery,
510 dynamodb_state,
511 shared_state,
512 execution_arn,
513 entered_event_id,
514 )
515 .await;
516
517 match result {
518 Ok(output) => {
519 add_event(
520 shared_state,
521 execution_arn,
522 "TaskStateExited",
523 entered_event_id,
524 json!({
525 "name": name,
526 "output": serde_json::to_string(&output).unwrap_or_default(),
527 }),
528 );
529 advance_from_next(state_def, output)
530 }
531 Err((error, cause)) => advance_from_error(state_def, &input, error, cause),
532 }
533}
534
535#[allow(clippy::too_many_arguments)]
536async fn run_parallel_state(
537 name: &str,
538 state_def: &Value,
539 input: Value,
540 delivery: &Option<Arc<DeliveryBus>>,
541 dynamodb_state: &Option<SharedDynamoDbState>,
542 shared_state: &SharedStepFunctionsState,
543 execution_arn: &str,
544) -> Advance {
545 let entered_event_id = add_event(
546 shared_state,
547 execution_arn,
548 "ParallelStateEntered",
549 0,
550 json!({
551 "name": name,
552 "input": serde_json::to_string(&input).unwrap_or_default(),
553 }),
554 );
555
556 let result = execute_parallel_state(
557 state_def,
558 &input,
559 delivery,
560 dynamodb_state,
561 shared_state,
562 execution_arn,
563 )
564 .await;
565
566 match result {
567 Ok(output) => {
568 add_event(
569 shared_state,
570 execution_arn,
571 "ParallelStateExited",
572 entered_event_id,
573 json!({
574 "name": name,
575 "output": serde_json::to_string(&output).unwrap_or_default(),
576 }),
577 );
578 advance_from_next(state_def, output)
579 }
580 Err((error, cause)) => advance_from_error(state_def, &input, error, cause),
581 }
582}
583
584#[allow(clippy::too_many_arguments)]
585async fn run_map_state(
586 name: &str,
587 state_def: &Value,
588 input: Value,
589 delivery: &Option<Arc<DeliveryBus>>,
590 dynamodb_state: &Option<SharedDynamoDbState>,
591 shared_state: &SharedStepFunctionsState,
592 execution_arn: &str,
593) -> Advance {
594 let entered_event_id = add_event(
595 shared_state,
596 execution_arn,
597 "MapStateEntered",
598 0,
599 json!({
600 "name": name,
601 "input": serde_json::to_string(&input).unwrap_or_default(),
602 }),
603 );
604
605 let result = execute_map_state(
606 state_def,
607 &input,
608 delivery,
609 dynamodb_state,
610 shared_state,
611 execution_arn,
612 )
613 .await;
614
615 match result {
616 Ok(output) => {
617 add_event(
618 shared_state,
619 execution_arn,
620 "MapStateExited",
621 entered_event_id,
622 json!({
623 "name": name,
624 "output": serde_json::to_string(&output).unwrap_or_default(),
625 }),
626 );
627 advance_from_next(state_def, output)
628 }
629 Err((error, cause)) => advance_from_error(state_def, &input, error, cause),
630 }
631}
632
633async fn execute_wait_state(state_def: &Value, input: &Value) {
635 if let Some(seconds) = state_def["Seconds"].as_u64() {
636 tokio::time::sleep(tokio::time::Duration::from_secs(seconds)).await;
637 return;
638 }
639
640 if let Some(path) = state_def["SecondsPath"].as_str() {
641 let val = crate::io_processing::resolve_path(input, path);
642 if let Some(seconds) = val.as_u64() {
643 tokio::time::sleep(tokio::time::Duration::from_secs(seconds)).await;
644 }
645 return;
646 }
647
648 if let Some(ts_str) = state_def["Timestamp"].as_str() {
649 if let Ok(target) = chrono::DateTime::parse_from_rfc3339(ts_str) {
650 let now = Utc::now();
651 let target_utc = target.with_timezone(&chrono::Utc);
652 if target_utc > now {
653 let duration = (target_utc - now).to_std().unwrap_or_default();
654 tokio::time::sleep(duration).await;
655 }
656 }
657 return;
658 }
659
660 if let Some(path) = state_def["TimestampPath"].as_str() {
661 let val = crate::io_processing::resolve_path(input, path);
662 if let Some(ts_str) = val.as_str() {
663 if let Ok(target) = chrono::DateTime::parse_from_rfc3339(ts_str) {
664 let now = Utc::now();
665 let target_utc = target.with_timezone(&chrono::Utc);
666 if target_utc > now {
667 let duration = (target_utc - now).to_std().unwrap_or_default();
668 tokio::time::sleep(duration).await;
669 }
670 }
671 }
672 return;
673 }
674
675 warn!(
676 "Wait state has no valid Seconds, SecondsPath, Timestamp, or TimestampPath — skipping wait"
677 );
678}
679
680fn execute_pass_state(state_def: &Value, input: &Value) -> Value {
682 let input_path = state_def["InputPath"].as_str();
683 let result_path = state_def["ResultPath"].as_str();
684 let output_path = state_def["OutputPath"].as_str();
685
686 let effective_input = if input_path == Some("null") {
687 json!({})
688 } else {
689 apply_input_path(input, input_path)
690 };
691
692 let result = if let Some(r) = state_def.get("Result") {
693 r.clone()
694 } else {
695 effective_input.clone()
696 };
697
698 let after_result = if result_path == Some("null") {
699 input.clone()
700 } else {
701 apply_result_path(input, &result, result_path)
702 };
703
704 if output_path == Some("null") {
705 json!({})
706 } else {
707 apply_output_path(&after_result, output_path)
708 }
709}
710
711async fn execute_task_state(
714 state_def: &Value,
715 input: &Value,
716 delivery: &Option<Arc<DeliveryBus>>,
717 dynamodb_state: &Option<SharedDynamoDbState>,
718 shared_state: &SharedStepFunctionsState,
719 execution_arn: &str,
720 entered_event_id: i64,
721) -> Result<Value, (String, String)> {
722 let resource = state_def["Resource"].as_str().unwrap_or("").to_string();
723
724 let input_path = state_def["InputPath"].as_str();
725 let result_path = state_def["ResultPath"].as_str();
726 let output_path = state_def["OutputPath"].as_str();
727
728 let effective_input = if input_path == Some("null") {
729 json!({})
730 } else {
731 apply_input_path(input, input_path)
732 };
733
734 let task_input = if let Some(params) = state_def.get("Parameters") {
735 apply_parameters(params, &effective_input)
736 } else {
737 effective_input
738 };
739
740 let retriers = state_def["Retry"].as_array().cloned().unwrap_or_default();
741 let timeout_seconds = state_def["TimeoutSeconds"].as_u64();
742 let mut attempt = 0u32;
743
744 loop {
745 add_event(
746 shared_state,
747 execution_arn,
748 "TaskScheduled",
749 entered_event_id,
750 json!({
751 "resource": resource,
752 "region": "us-east-1",
753 "parameters": serde_json::to_string(&task_input).unwrap_or_default(),
754 }),
755 );
756
757 add_event(
758 shared_state,
759 execution_arn,
760 "TaskStarted",
761 entered_event_id,
762 json!({ "resource": resource }),
763 );
764
765 let invoke_result = invoke_resource(
766 &resource,
767 &task_input,
768 delivery,
769 dynamodb_state,
770 timeout_seconds,
771 )
772 .await;
773
774 match invoke_result {
775 Ok(result) => {
776 add_event(
777 shared_state,
778 execution_arn,
779 "TaskSucceeded",
780 entered_event_id,
781 json!({
782 "resource": resource,
783 "output": serde_json::to_string(&result).unwrap_or_default(),
784 }),
785 );
786
787 let selected = if let Some(selector) = state_def.get("ResultSelector") {
788 apply_parameters(selector, &result)
789 } else {
790 result
791 };
792
793 let after_result = if result_path == Some("null") {
794 input.clone()
795 } else {
796 apply_result_path(input, &selected, result_path)
797 };
798
799 let output = if output_path == Some("null") {
800 json!({})
801 } else {
802 apply_output_path(&after_result, output_path)
803 };
804
805 return Ok(output);
806 }
807 Err((error, cause)) => {
808 add_event(
809 shared_state,
810 execution_arn,
811 "TaskFailed",
812 entered_event_id,
813 json!({ "error": error, "cause": cause }),
814 );
815
816 if let Some(delay_ms) = should_retry(&retriers, &error, attempt) {
817 attempt += 1;
818 let actual_delay = delay_ms.min(5000);
819 tokio::time::sleep(tokio::time::Duration::from_millis(actual_delay)).await;
820 continue;
821 }
822
823 return Err((error, cause));
824 }
825 }
826 }
827}
828
829async fn execute_parallel_state(
831 state_def: &Value,
832 input: &Value,
833 delivery: &Option<Arc<DeliveryBus>>,
834 dynamodb_state: &Option<SharedDynamoDbState>,
835 shared_state: &SharedStepFunctionsState,
836 execution_arn: &str,
837) -> Result<Value, (String, String)> {
838 let input_path = state_def["InputPath"].as_str();
839 let result_path = state_def["ResultPath"].as_str();
840 let output_path = state_def["OutputPath"].as_str();
841
842 let effective_input = if input_path == Some("null") {
843 json!({})
844 } else {
845 apply_input_path(input, input_path)
846 };
847
848 let branches = state_def["Branches"]
849 .as_array()
850 .cloned()
851 .unwrap_or_default();
852
853 if branches.is_empty() {
854 return Err((
855 "States.Runtime".to_string(),
856 "Parallel state has no Branches".to_string(),
857 ));
858 }
859
860 let mut handles = Vec::new();
862 for branch_def in &branches {
863 let branch = branch_def.clone();
864 let branch_input = effective_input.clone();
865 let delivery = delivery.clone();
866 let ddb = dynamodb_state.clone();
867 let state = shared_state.clone();
868 let arn = execution_arn.to_string();
869
870 handles.push(tokio::spawn(async move {
871 run_states(&branch, branch_input, &delivery, &ddb, &state, &arn).await
872 }));
873 }
874
875 let mut results = Vec::with_capacity(handles.len());
877 for handle in handles {
878 let result = handle.await.map_err(|e| {
879 (
880 "States.Runtime".to_string(),
881 format!("Branch execution panicked: {e}"),
882 )
883 })??;
884 results.push(result);
885 }
886
887 let branch_output = Value::Array(results);
888
889 let selected = if let Some(selector) = state_def.get("ResultSelector") {
891 apply_parameters(selector, &branch_output)
892 } else {
893 branch_output
894 };
895
896 let after_result = if result_path == Some("null") {
898 input.clone()
899 } else {
900 apply_result_path(input, &selected, result_path)
901 };
902
903 let output = if output_path == Some("null") {
905 json!({})
906 } else {
907 apply_output_path(&after_result, output_path)
908 };
909
910 Ok(output)
911}
912
913async fn execute_map_state(
915 state_def: &Value,
916 input: &Value,
917 delivery: &Option<Arc<DeliveryBus>>,
918 dynamodb_state: &Option<SharedDynamoDbState>,
919 shared_state: &SharedStepFunctionsState,
920 execution_arn: &str,
921) -> Result<Value, (String, String)> {
922 let input_path = state_def["InputPath"].as_str();
923 let result_path = state_def["ResultPath"].as_str();
924 let output_path = state_def["OutputPath"].as_str();
925
926 let effective_input = if input_path == Some("null") {
927 json!({})
928 } else {
929 apply_input_path(input, input_path)
930 };
931
932 let items_path = state_def["ItemsPath"].as_str().unwrap_or("$");
934 let items_value = crate::io_processing::resolve_path(&effective_input, items_path);
935 let items = items_value.as_array().cloned().unwrap_or_default();
936
937 let iterator_def = state_def
939 .get("ItemProcessor")
940 .or_else(|| state_def.get("Iterator"))
941 .cloned()
942 .ok_or_else(|| {
943 (
944 "States.Runtime".to_string(),
945 "Map state has no ItemProcessor or Iterator".to_string(),
946 )
947 })?;
948
949 let max_concurrency = state_def["MaxConcurrency"].as_u64().unwrap_or(0);
950 let effective_concurrency = if max_concurrency == 0 {
951 40
952 } else {
953 max_concurrency as usize
954 };
955
956 let semaphore = Arc::new(tokio::sync::Semaphore::new(effective_concurrency));
957
958 let mut handles = Vec::new();
960 for (index, item) in items.into_iter().enumerate() {
961 let iter_def = iterator_def.clone();
962 let delivery = delivery.clone();
963 let ddb = dynamodb_state.clone();
964 let state = shared_state.clone();
965 let arn = execution_arn.to_string();
966 let sem = semaphore.clone();
967
968 let item_input = if let Some(selector) = state_def.get("ItemSelector") {
970 let mut ctx = serde_json::Map::new();
971 ctx.insert("value".to_string(), item.clone());
972 ctx.insert("index".to_string(), json!(index));
973 apply_parameters(selector, &Value::Object(ctx))
974 } else {
975 item
976 };
977
978 add_event(
979 shared_state,
980 execution_arn,
981 "MapIterationStarted",
982 0,
983 json!({ "index": index }),
984 );
985
986 handles.push(tokio::spawn(async move {
987 let _permit = sem
988 .acquire()
989 .await
990 .map_err(|_| ("States.Runtime".to_string(), "Semaphore closed".to_string()))?;
991 let result = run_states(&iter_def, item_input, &delivery, &ddb, &state, &arn).await;
992 Ok::<(usize, Result<Value, (String, String)>), (String, String)>((index, result))
993 }));
994 }
995
996 let mut results: Vec<(usize, Value)> = Vec::with_capacity(handles.len());
998 for handle in handles {
999 let (index, result) = handle.await.map_err(|e| {
1000 (
1001 "States.Runtime".to_string(),
1002 format!("Map iteration panicked: {e}"),
1003 )
1004 })??;
1005
1006 match result {
1007 Ok(output) => {
1008 add_event(
1009 shared_state,
1010 execution_arn,
1011 "MapIterationSucceeded",
1012 0,
1013 json!({ "index": index }),
1014 );
1015 results.push((index, output));
1016 }
1017 Err((error, cause)) => {
1018 add_event(
1019 shared_state,
1020 execution_arn,
1021 "MapIterationFailed",
1022 0,
1023 json!({ "index": index, "error": error }),
1024 );
1025 return Err((error, cause));
1026 }
1027 }
1028 }
1029
1030 results.sort_by_key(|(i, _)| *i);
1032 let map_output = Value::Array(results.into_iter().map(|(_, v)| v).collect());
1033
1034 let selected = if let Some(selector) = state_def.get("ResultSelector") {
1036 apply_parameters(selector, &map_output)
1037 } else {
1038 map_output
1039 };
1040
1041 let after_result = if result_path == Some("null") {
1043 input.clone()
1044 } else {
1045 apply_result_path(input, &selected, result_path)
1046 };
1047
1048 let output = if output_path == Some("null") {
1050 json!({})
1051 } else {
1052 apply_output_path(&after_result, output_path)
1053 };
1054
1055 Ok(output)
1056}
1057
1058async fn invoke_resource(
1060 resource: &str,
1061 input: &Value,
1062 delivery: &Option<Arc<DeliveryBus>>,
1063 dynamodb_state: &Option<SharedDynamoDbState>,
1064 timeout_seconds: Option<u64>,
1065) -> Result<Value, (String, String)> {
1066 if resource.contains(":lambda:") && resource.contains(":function:") {
1068 return invoke_lambda_direct(resource, input, delivery, timeout_seconds).await;
1069 }
1070
1071 if resource.starts_with("arn:aws:states:::lambda:invoke") {
1073 let function_name = input["FunctionName"].as_str().unwrap_or("");
1074 let payload = if let Some(p) = input.get("Payload") {
1075 p.clone()
1076 } else {
1077 input.clone()
1078 };
1079 return invoke_lambda_direct(function_name, &payload, delivery, timeout_seconds).await;
1080 }
1081
1082 if resource.starts_with("arn:aws:states:::sqs:sendMessage") {
1083 return invoke_sqs_send_message(input, delivery);
1084 }
1085
1086 if resource.starts_with("arn:aws:states:::sns:publish") {
1087 return invoke_sns_publish(input, delivery);
1088 }
1089
1090 if resource.starts_with("arn:aws:states:::events:putEvents") {
1091 return invoke_eventbridge_put_events(input, delivery);
1092 }
1093
1094 if resource.starts_with("arn:aws:states:::dynamodb:getItem") {
1095 return invoke_dynamodb_get_item(input, dynamodb_state);
1096 }
1097
1098 if resource.starts_with("arn:aws:states:::dynamodb:putItem") {
1099 return invoke_dynamodb_put_item(input, dynamodb_state);
1100 }
1101
1102 if resource.starts_with("arn:aws:states:::dynamodb:deleteItem") {
1103 return invoke_dynamodb_delete_item(input, dynamodb_state);
1104 }
1105
1106 if resource.starts_with("arn:aws:states:::dynamodb:updateItem") {
1107 return invoke_dynamodb_update_item(input, dynamodb_state);
1108 }
1109
1110 Err((
1111 "States.TaskFailed".to_string(),
1112 format!("Unsupported resource: {resource}"),
1113 ))
1114}
1115
1116fn invoke_sqs_send_message(
1118 input: &Value,
1119 delivery: &Option<Arc<DeliveryBus>>,
1120) -> Result<Value, (String, String)> {
1121 let delivery = delivery.as_ref().ok_or_else(|| {
1122 (
1123 "States.TaskFailed".to_string(),
1124 "No delivery bus configured for SQS".to_string(),
1125 )
1126 })?;
1127
1128 let queue_url = input["QueueUrl"].as_str().ok_or_else(|| {
1129 (
1130 "States.TaskFailed".to_string(),
1131 "Missing QueueUrl in SQS sendMessage parameters".to_string(),
1132 )
1133 })?;
1134
1135 let message_body = input["MessageBody"]
1136 .as_str()
1137 .map(|s| s.to_string())
1138 .unwrap_or_else(|| {
1139 serde_json::to_string(&input["MessageBody"]).unwrap_or_default()
1141 });
1142
1143 let queue_arn = queue_url_to_arn(queue_url);
1147
1148 delivery.send_to_sqs(&queue_arn, &message_body, &HashMap::new());
1149
1150 Ok(json!({
1151 "MessageId": uuid::Uuid::new_v4().to_string(),
1152 "MD5OfMessageBody": md5_hex(&message_body),
1153 }))
1154}
1155
1156fn invoke_sns_publish(
1158 input: &Value,
1159 delivery: &Option<Arc<DeliveryBus>>,
1160) -> Result<Value, (String, String)> {
1161 let delivery = delivery.as_ref().ok_or_else(|| {
1162 (
1163 "States.TaskFailed".to_string(),
1164 "No delivery bus configured for SNS".to_string(),
1165 )
1166 })?;
1167
1168 let topic_arn = input["TopicArn"].as_str().ok_or_else(|| {
1169 (
1170 "States.TaskFailed".to_string(),
1171 "Missing TopicArn in SNS publish parameters".to_string(),
1172 )
1173 })?;
1174
1175 let message = input["Message"]
1176 .as_str()
1177 .map(|s| s.to_string())
1178 .unwrap_or_else(|| serde_json::to_string(&input["Message"]).unwrap_or_default());
1179
1180 let subject = input["Subject"].as_str();
1181
1182 delivery.publish_to_sns(topic_arn, &message, subject);
1183
1184 Ok(json!({
1185 "MessageId": uuid::Uuid::new_v4().to_string(),
1186 }))
1187}
1188
1189fn invoke_eventbridge_put_events(
1191 input: &Value,
1192 delivery: &Option<Arc<DeliveryBus>>,
1193) -> Result<Value, (String, String)> {
1194 let delivery = delivery.as_ref().ok_or_else(|| {
1195 (
1196 "States.TaskFailed".to_string(),
1197 "No delivery bus configured for EventBridge".to_string(),
1198 )
1199 })?;
1200
1201 let entries = input["Entries"]
1202 .as_array()
1203 .ok_or_else(|| {
1204 (
1205 "States.TaskFailed".to_string(),
1206 "Missing Entries in EventBridge putEvents parameters".to_string(),
1207 )
1208 })?
1209 .clone();
1210
1211 let mut event_ids = Vec::new();
1212 for entry in &entries {
1213 let source = entry["Source"].as_str().unwrap_or("aws.stepfunctions");
1214 let detail_type = entry["DetailType"].as_str().unwrap_or("StepFunctionsEvent");
1215 let detail = entry["Detail"]
1216 .as_str()
1217 .map(|s| s.to_string())
1218 .unwrap_or_else(|| serde_json::to_string(&entry["Detail"]).unwrap_or("{}".to_string()));
1219 let bus_name = entry["EventBusName"].as_str().unwrap_or("default");
1220
1221 delivery.put_event_to_eventbridge(source, detail_type, &detail, bus_name);
1222 event_ids.push(uuid::Uuid::new_v4().to_string());
1223 }
1224
1225 Ok(json!({
1226 "Entries": event_ids.iter().map(|id| json!({"EventId": id})).collect::<Vec<_>>(),
1227 "FailedEntryCount": 0,
1228 }))
1229}
1230
1231fn invoke_dynamodb_get_item(
1233 input: &Value,
1234 dynamodb_state: &Option<SharedDynamoDbState>,
1235) -> Result<Value, (String, String)> {
1236 let ddb = dynamodb_state.as_ref().ok_or_else(|| {
1237 (
1238 "States.TaskFailed".to_string(),
1239 "No DynamoDB state configured".to_string(),
1240 )
1241 })?;
1242
1243 let table_name = input["TableName"].as_str().ok_or_else(|| {
1244 (
1245 "States.TaskFailed".to_string(),
1246 "Missing TableName in DynamoDB getItem parameters".to_string(),
1247 )
1248 })?;
1249
1250 let key = input
1251 .get("Key")
1252 .and_then(|k| k.as_object())
1253 .ok_or_else(|| {
1254 (
1255 "States.TaskFailed".to_string(),
1256 "Missing Key in DynamoDB getItem parameters".to_string(),
1257 )
1258 })?;
1259
1260 let key_map: HashMap<String, Value> = key.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1261
1262 let __mas = ddb.read();
1263 let state = __mas.default_ref();
1264 let table = state.tables.get(table_name).ok_or_else(|| {
1265 (
1266 "States.TaskFailed".to_string(),
1267 format!("Table '{table_name}' not found"),
1268 )
1269 })?;
1270
1271 let item = table
1272 .find_item_index(&key_map)
1273 .map(|idx| table.items[idx].clone());
1274
1275 match item {
1276 Some(item_map) => {
1277 let item_value: serde_json::Map<String, Value> = item_map.into_iter().collect();
1278 Ok(json!({ "Item": item_value }))
1279 }
1280 None => Ok(json!({})),
1281 }
1282}
1283
1284fn invoke_dynamodb_put_item(
1286 input: &Value,
1287 dynamodb_state: &Option<SharedDynamoDbState>,
1288) -> Result<Value, (String, String)> {
1289 let ddb = dynamodb_state.as_ref().ok_or_else(|| {
1290 (
1291 "States.TaskFailed".to_string(),
1292 "No DynamoDB state configured".to_string(),
1293 )
1294 })?;
1295
1296 let table_name = input["TableName"].as_str().ok_or_else(|| {
1297 (
1298 "States.TaskFailed".to_string(),
1299 "Missing TableName in DynamoDB putItem parameters".to_string(),
1300 )
1301 })?;
1302
1303 let item = input
1304 .get("Item")
1305 .and_then(|i| i.as_object())
1306 .ok_or_else(|| {
1307 (
1308 "States.TaskFailed".to_string(),
1309 "Missing Item in DynamoDB putItem parameters".to_string(),
1310 )
1311 })?;
1312
1313 let item_map: HashMap<String, Value> =
1314 item.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1315
1316 let mut __mas = ddb.write();
1317 let state = __mas.default_mut();
1318 let table = state.tables.get_mut(table_name).ok_or_else(|| {
1319 (
1320 "States.TaskFailed".to_string(),
1321 format!("Table '{table_name}' not found"),
1322 )
1323 })?;
1324
1325 if let Some(idx) = table.find_item_index(&item_map) {
1327 table.items[idx] = item_map;
1328 } else {
1329 table.items.push(item_map);
1330 }
1331
1332 Ok(json!({}))
1333}
1334
1335fn invoke_dynamodb_delete_item(
1337 input: &Value,
1338 dynamodb_state: &Option<SharedDynamoDbState>,
1339) -> Result<Value, (String, String)> {
1340 let ddb = dynamodb_state.as_ref().ok_or_else(|| {
1341 (
1342 "States.TaskFailed".to_string(),
1343 "No DynamoDB state configured".to_string(),
1344 )
1345 })?;
1346
1347 let table_name = input["TableName"].as_str().ok_or_else(|| {
1348 (
1349 "States.TaskFailed".to_string(),
1350 "Missing TableName in DynamoDB deleteItem parameters".to_string(),
1351 )
1352 })?;
1353
1354 let key = input
1355 .get("Key")
1356 .and_then(|k| k.as_object())
1357 .ok_or_else(|| {
1358 (
1359 "States.TaskFailed".to_string(),
1360 "Missing Key in DynamoDB deleteItem parameters".to_string(),
1361 )
1362 })?;
1363
1364 let key_map: HashMap<String, Value> = key.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1365
1366 let mut __mas = ddb.write();
1367 let state = __mas.default_mut();
1368 let table = state.tables.get_mut(table_name).ok_or_else(|| {
1369 (
1370 "States.TaskFailed".to_string(),
1371 format!("Table '{table_name}' not found"),
1372 )
1373 })?;
1374
1375 if let Some(idx) = table.find_item_index(&key_map) {
1376 table.items.remove(idx);
1377 }
1378
1379 Ok(json!({}))
1380}
1381
1382fn invoke_dynamodb_update_item(
1385 input: &Value,
1386 dynamodb_state: &Option<SharedDynamoDbState>,
1387) -> Result<Value, (String, String)> {
1388 let ddb = dynamodb_state.as_ref().ok_or_else(|| {
1389 (
1390 "States.TaskFailed".to_string(),
1391 "No DynamoDB state configured".to_string(),
1392 )
1393 })?;
1394
1395 let table_name = input["TableName"].as_str().ok_or_else(|| {
1396 (
1397 "States.TaskFailed".to_string(),
1398 "Missing TableName in DynamoDB updateItem parameters".to_string(),
1399 )
1400 })?;
1401
1402 let key = input
1403 .get("Key")
1404 .and_then(|k| k.as_object())
1405 .ok_or_else(|| {
1406 (
1407 "States.TaskFailed".to_string(),
1408 "Missing Key in DynamoDB updateItem parameters".to_string(),
1409 )
1410 })?;
1411
1412 let key_map: HashMap<String, Value> = key.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1413
1414 let mut __mas = ddb.write();
1415 let state = __mas.default_mut();
1416 let table = state.tables.get_mut(table_name).ok_or_else(|| {
1417 (
1418 "States.TaskFailed".to_string(),
1419 format!("Table '{table_name}' not found"),
1420 )
1421 })?;
1422
1423 if let Some(update_expr) = input["UpdateExpression"].as_str() {
1425 let attr_values = input
1426 .get("ExpressionAttributeValues")
1427 .and_then(|v| v.as_object())
1428 .cloned()
1429 .unwrap_or_default();
1430 let attr_names = input
1431 .get("ExpressionAttributeNames")
1432 .and_then(|v| v.as_object())
1433 .cloned()
1434 .unwrap_or_default();
1435
1436 if let Some(idx) = table.find_item_index(&key_map) {
1437 apply_update_expression(
1438 &mut table.items[idx],
1439 update_expr,
1440 &attr_values,
1441 &attr_names,
1442 );
1443 } else {
1444 let mut new_item = key_map;
1446 apply_update_expression(&mut new_item, update_expr, &attr_values, &attr_names);
1447 table.items.push(new_item);
1448 }
1449 }
1450
1451 Ok(json!({}))
1452}
1453
1454fn apply_update_expression(
1456 item: &mut HashMap<String, Value>,
1457 expr: &str,
1458 attr_values: &serde_json::Map<String, Value>,
1459 attr_names: &serde_json::Map<String, Value>,
1460) {
1461 let trimmed = expr.trim();
1464 let set_part = if trimmed.len() >= 4 && trimmed[..4].eq_ignore_ascii_case("SET ") {
1465 &trimmed[4..]
1466 } else {
1467 trimmed
1468 };
1469
1470 for assignment in set_part.split(',') {
1471 let parts: Vec<&str> = assignment.splitn(2, '=').collect();
1472 if parts.len() == 2 {
1473 let attr_ref = parts[0].trim();
1474 let val_ref = parts[1].trim();
1475
1476 let attr_name = if attr_ref.starts_with('#') {
1478 attr_names
1479 .get(attr_ref)
1480 .and_then(|v| v.as_str())
1481 .unwrap_or(attr_ref)
1482 .to_string()
1483 } else {
1484 attr_ref.to_string()
1485 };
1486
1487 if val_ref.starts_with(':') {
1489 if let Some(val) = attr_values.get(val_ref) {
1490 item.insert(attr_name, val.clone());
1491 }
1492 }
1493 }
1494 }
1495}
1496
1497fn queue_url_to_arn(url: &str) -> String {
1500 let parts: Vec<&str> = url.rsplitn(3, '/').collect();
1501 if parts.len() >= 2 {
1502 let queue_name = parts[0];
1503 let account_id = parts[1];
1504 Arn::new("sqs", "us-east-1", account_id, queue_name).to_string()
1505 } else {
1506 url.to_string()
1507 }
1508}
1509
1510fn md5_hex(data: &str) -> String {
1512 use md5::Digest;
1513 let result = md5::Md5::digest(data.as_bytes());
1514 format!("{result:032x}")
1515}
1516
1517async fn invoke_lambda_direct(
1519 function_arn: &str,
1520 input: &Value,
1521 delivery: &Option<Arc<DeliveryBus>>,
1522 timeout_seconds: Option<u64>,
1523) -> Result<Value, (String, String)> {
1524 let delivery = delivery.as_ref().ok_or_else(|| {
1525 (
1526 "States.TaskFailed".to_string(),
1527 "No delivery bus configured for Lambda invocation".to_string(),
1528 )
1529 })?;
1530
1531 let payload = serde_json::to_string(input).unwrap_or_default();
1532
1533 let invoke_future = delivery.invoke_lambda(function_arn, &payload);
1534
1535 let result = if let Some(timeout) = timeout_seconds {
1536 match tokio::time::timeout(tokio::time::Duration::from_secs(timeout), invoke_future).await {
1537 Ok(r) => r,
1538 Err(_) => {
1539 return Err((
1540 "States.Timeout".to_string(),
1541 format!("Task timed out after {timeout} seconds"),
1542 ));
1543 }
1544 }
1545 } else {
1546 invoke_future.await
1547 };
1548
1549 match result {
1550 Some(Ok(bytes)) => {
1551 let response_str = String::from_utf8_lossy(&bytes);
1552 let value: Value =
1553 serde_json::from_str(&response_str).unwrap_or(json!(response_str.to_string()));
1554 Ok(value)
1555 }
1556 Some(Err(e)) => Err(("States.TaskFailed".to_string(), e)),
1557 None => {
1558 Ok(json!({}))
1560 }
1561 }
1562}
1563
1564fn apply_parameters(template: &Value, input: &Value) -> Value {
1566 match template {
1567 Value::Object(map) => {
1568 let mut result = serde_json::Map::new();
1569 for (key, value) in map {
1570 if let Some(stripped) = key.strip_suffix(".$") {
1571 if let Some(path) = value.as_str() {
1572 result.insert(
1573 stripped.to_string(),
1574 crate::io_processing::resolve_path(input, path),
1575 );
1576 }
1577 } else {
1578 result.insert(key.clone(), apply_parameters(value, input));
1579 }
1580 }
1581 Value::Object(result)
1582 }
1583 Value::Array(arr) => Value::Array(arr.iter().map(|v| apply_parameters(v, input)).collect()),
1584 other => other.clone(),
1585 }
1586}
1587
/// Where execution goes after a state completes, as computed by `next_state`.
enum NextState {
    /// Continue with the state whose name was given in the `Next` field.
    Name(String),
    /// The state declared `"End": true`.
    End,
    /// The state had neither a true `End` nor a string `Next`; carries the message.
    Error(String),
}
1593
1594fn next_state(state_def: &Value) -> NextState {
1595 if state_def["End"].as_bool() == Some(true) {
1596 return NextState::End;
1597 }
1598 match state_def["Next"].as_str() {
1599 Some(next) => NextState::Name(next.to_string()),
1600 None => NextState::Error("State has neither 'End' nor 'Next' field".to_string()),
1601 }
1602}
1603
1604fn apply_state_catcher(
1609 state_def: &Value,
1610 effective_input: &Value,
1611 error: &str,
1612 cause: &str,
1613) -> Option<(String, Value)> {
1614 let catchers = state_def["Catch"].as_array().cloned().unwrap_or_default();
1615 let (next, result_path) = find_catcher(&catchers, error)?;
1616 let error_output = json!({
1617 "Error": error,
1618 "Cause": cause,
1619 });
1620 let new_input = apply_result_path(effective_input, &error_output, result_path.as_deref());
1621 Some((next, new_input))
1622}
1623
/// Returns the fifth `:`-separated field of `arn` (the account ID slot), or
/// `"000000000000"` when the string has fewer than five fields.
fn account_id_from_arn(arn: &str) -> &str {
    const FALLBACK_ACCOUNT_ID: &str = "000000000000";

    // Capping at six pieces leaves the first five fields identical to a full split.
    match arn.splitn(6, ':').nth(4) {
        Some(account_id) => account_id,
        None => FALLBACK_ACCOUNT_ID,
    }
}
1628
1629fn add_event(
1630 state: &SharedStepFunctionsState,
1631 execution_arn: &str,
1632 event_type: &str,
1633 previous_event_id: i64,
1634 details: Value,
1635) -> i64 {
1636 let account_id = account_id_from_arn(execution_arn).to_string();
1637 let mut accounts = state.write();
1638 let s = accounts.get_or_create(&account_id);
1639 if let Some(exec) = s.executions.get_mut(execution_arn) {
1640 let id = exec.history_events.len() as i64 + 1;
1641 exec.history_events.push(HistoryEvent {
1642 id,
1643 event_type: event_type.to_string(),
1644 timestamp: Utc::now(),
1645 previous_event_id,
1646 details,
1647 });
1648 id
1649 } else {
1650 0
1651 }
1652}
1653
1654fn succeed_execution(state: &SharedStepFunctionsState, execution_arn: &str, output: &Value) {
1655 let account_id = account_id_from_arn(execution_arn).to_string();
1656 {
1658 let accounts = state.read();
1659 if let Some(s) = accounts.get(&account_id) {
1660 if let Some(exec) = s.executions.get(execution_arn) {
1661 if exec.status != ExecutionStatus::Running {
1662 return;
1663 }
1664 }
1665 }
1666 }
1667
1668 let output_str = serde_json::to_string(output).unwrap_or_default();
1669
1670 add_event(
1671 state,
1672 execution_arn,
1673 "ExecutionSucceeded",
1674 0,
1675 json!({ "output": output_str }),
1676 );
1677
1678 let mut accounts = state.write();
1679 let s = accounts.get_or_create(&account_id);
1680 if let Some(exec) = s.executions.get_mut(execution_arn) {
1681 exec.status = ExecutionStatus::Succeeded;
1682 exec.output = Some(output_str);
1683 exec.stop_date = Some(Utc::now());
1684 }
1685}
1686
1687fn fail_execution(state: &SharedStepFunctionsState, execution_arn: &str, error: &str, cause: &str) {
1688 let account_id = account_id_from_arn(execution_arn).to_string();
1689 {
1691 let accounts = state.read();
1692 if let Some(s) = accounts.get(&account_id) {
1693 if let Some(exec) = s.executions.get(execution_arn) {
1694 if exec.status != ExecutionStatus::Running {
1695 return;
1696 }
1697 }
1698 }
1699 }
1700
1701 add_event(
1702 state,
1703 execution_arn,
1704 "ExecutionFailed",
1705 0,
1706 json!({ "error": error, "cause": cause }),
1707 );
1708
1709 let mut accounts = state.write();
1710 let s = accounts.get_or_create(&account_id);
1711 if let Some(exec) = s.executions.get_mut(execution_arn) {
1712 exec.status = ExecutionStatus::Failed;
1713 exec.error = Some(error.to_string());
1714 exec.cause = Some(cause.to_string());
1715 exec.stop_date = Some(Utc::now());
1716 }
1717}
1718
1719#[cfg(test)]
1720mod tests {
1721 use super::*;
1722 use crate::state::Execution;
1723 use parking_lot::RwLock;
1724 use std::sync::Arc;
1725
1726 fn make_state() -> SharedStepFunctionsState {
1727 Arc::new(RwLock::new(
1728 fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
1729 ))
1730 }
1731
1732 fn create_execution(state: &SharedStepFunctionsState, arn: &str, input: Option<String>) {
1733 let mut accounts = state.write();
1734 let s = accounts.get_or_create("123456789012");
1735 s.executions.insert(
1736 arn.to_string(),
1737 Execution {
1738 execution_arn: arn.to_string(),
1739 state_machine_arn: "arn:aws:states:us-east-1:123456789012:stateMachine:test"
1740 .to_string(),
1741 state_machine_name: "test".to_string(),
1742 name: "exec-1".to_string(),
1743 status: ExecutionStatus::Running,
1744 input,
1745 output: None,
1746 start_date: Utc::now(),
1747 stop_date: None,
1748 error: None,
1749 cause: None,
1750 history_events: vec![],
1751 },
1752 );
1753 }
1754
1755 #[tokio::test]
1756 async fn test_simple_pass_state() {
1757 let state = make_state();
1758 let arn = "arn:aws:states:us-east-1:123456789012:execution:test:exec-1";
1759 create_execution(&state, arn, Some(r#"{"hello":"world"}"#.to_string()));
1760
1761 let definition = json!({
1762 "StartAt": "PassState",
1763 "States": {
1764 "PassState": {
1765 "Type": "Pass",
1766 "Result": {"processed": true},
1767 "End": true
1768 }
1769 }
1770 })
1771 .to_string();
1772
1773 execute_state_machine(
1774 state.clone(),
1775 arn.to_string(),
1776 definition,
1777 Some(r#"{"hello":"world"}"#.to_string()),
1778 None,
1779 None,
1780 )
1781 .await;
1782
1783 let __a = state.read();
1784 let s = __a.default_ref();
1785 let exec = s.executions.get(arn).unwrap();
1786 assert_eq!(exec.status, ExecutionStatus::Succeeded);
1787 assert!(exec.output.is_some());
1788 let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
1789 assert_eq!(output, json!({"processed": true}));
1790 }
1791
1792 #[tokio::test]
1793 async fn test_pass_chain() {
1794 let state = make_state();
1795 let arn = "arn:aws:states:us-east-1:123456789012:execution:test:exec-1";
1796 create_execution(&state, arn, Some(r#"{}"#.to_string()));
1797
1798 let definition = json!({
1799 "StartAt": "First",
1800 "States": {
1801 "First": {
1802 "Type": "Pass",
1803 "Result": "step1",
1804 "ResultPath": "$.first",
1805 "Next": "Second"
1806 },
1807 "Second": {
1808 "Type": "Pass",
1809 "Result": "step2",
1810 "ResultPath": "$.second",
1811 "End": true
1812 }
1813 }
1814 })
1815 .to_string();
1816
1817 execute_state_machine(
1818 state.clone(),
1819 arn.to_string(),
1820 definition,
1821 Some("{}".to_string()),
1822 None,
1823 None,
1824 )
1825 .await;
1826
1827 let __a = state.read();
1828 let s = __a.default_ref();
1829 let exec = s.executions.get(arn).unwrap();
1830 assert_eq!(exec.status, ExecutionStatus::Succeeded);
1831 let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
1832 assert_eq!(output["first"], json!("step1"));
1833 assert_eq!(output["second"], json!("step2"));
1834 }
1835
1836 #[tokio::test]
1837 async fn test_succeed_state() {
1838 let state = make_state();
1839 let arn = "arn:aws:states:us-east-1:123456789012:execution:test:exec-1";
1840 create_execution(&state, arn, Some(r#"{"data": "value"}"#.to_string()));
1841
1842 let definition = json!({
1843 "StartAt": "Done",
1844 "States": {
1845 "Done": {
1846 "Type": "Succeed"
1847 }
1848 }
1849 })
1850 .to_string();
1851
1852 execute_state_machine(
1853 state.clone(),
1854 arn.to_string(),
1855 definition,
1856 Some(r#"{"data": "value"}"#.to_string()),
1857 None,
1858 None,
1859 )
1860 .await;
1861
1862 let __a = state.read();
1863 let s = __a.default_ref();
1864 let exec = s.executions.get(arn).unwrap();
1865 assert_eq!(exec.status, ExecutionStatus::Succeeded);
1866 }
1867
1868 #[tokio::test]
1869 async fn test_fail_state() {
1870 let state = make_state();
1871 let arn = "arn:aws:states:us-east-1:123456789012:execution:test:exec-1";
1872 create_execution(&state, arn, None);
1873
1874 let definition = json!({
1875 "StartAt": "FailState",
1876 "States": {
1877 "FailState": {
1878 "Type": "Fail",
1879 "Error": "CustomError",
1880 "Cause": "Something went wrong"
1881 }
1882 }
1883 })
1884 .to_string();
1885
1886 execute_state_machine(state.clone(), arn.to_string(), definition, None, None, None).await;
1887
1888 let __a = state.read();
1889 let s = __a.default_ref();
1890 let exec = s.executions.get(arn).unwrap();
1891 assert_eq!(exec.status, ExecutionStatus::Failed);
1892 assert_eq!(exec.error.as_deref(), Some("CustomError"));
1893 assert_eq!(exec.cause.as_deref(), Some("Something went wrong"));
1894 }
1895
1896 #[tokio::test]
1897 async fn test_history_events_recorded() {
1898 let state = make_state();
1899 let arn = "arn:aws:states:us-east-1:123456789012:execution:test:exec-1";
1900 create_execution(&state, arn, Some("{}".to_string()));
1901
1902 let definition = json!({
1903 "StartAt": "PassState",
1904 "States": {
1905 "PassState": {
1906 "Type": "Pass",
1907 "End": true
1908 }
1909 }
1910 })
1911 .to_string();
1912
1913 execute_state_machine(
1914 state.clone(),
1915 arn.to_string(),
1916 definition,
1917 Some("{}".to_string()),
1918 None,
1919 None,
1920 )
1921 .await;
1922
1923 let __a = state.read();
1924 let s = __a.default_ref();
1925 let exec = s.executions.get(arn).unwrap();
1926 let event_types: Vec<&str> = exec
1927 .history_events
1928 .iter()
1929 .map(|e| e.event_type.as_str())
1930 .collect();
1931 assert_eq!(
1932 event_types,
1933 vec![
1934 "ExecutionStarted",
1935 "PassStateEntered",
1936 "PassStateExited",
1937 "ExecutionSucceeded"
1938 ]
1939 );
1940 }
1941
1942 fn drive(state: &SharedStepFunctionsState, arn: &str, def: Value, input: Option<&str>) {
1943 create_execution(state, arn, input.map(|s| s.to_string()));
1944 let fut = execute_state_machine(
1945 state.clone(),
1946 arn.to_string(),
1947 def.to_string(),
1948 input.map(|s| s.to_string()),
1949 None,
1950 None,
1951 );
1952 let rt = tokio::runtime::Builder::new_current_thread()
1953 .enable_time()
1954 .build()
1955 .unwrap();
1956 rt.block_on(fut);
1957 }
1958
1959 fn read_exec<R>(
1960 state: &SharedStepFunctionsState,
1961 arn: &str,
1962 f: impl FnOnce(&Execution) -> R,
1963 ) -> R {
1964 let __a = state.read();
1965 let s = __a.default_ref();
1966 f(s.executions.get(arn).expect("execution missing"))
1967 }
1968
1969 fn arn_for(name: &str) -> String {
1970 format!("arn:aws:states:us-east-1:123456789012:execution:test:{name}")
1971 }
1972
1973 #[test]
1976 fn pass_state_input_output_path_select_fields() {
1977 let state = make_state();
1978 let arn = arn_for("pass-paths");
1979 let def = json!({
1980 "StartAt": "P",
1981 "States": {
1982 "P": {
1983 "Type": "Pass",
1984 "InputPath": "$.inner",
1985 "OutputPath": "$.kept",
1986 "End": true
1987 }
1988 }
1989 });
1990 drive(
1991 &state,
1992 &arn,
1993 def,
1994 Some(r#"{"inner":{"kept":"yes","dropped":true},"sibling":1}"#),
1995 );
1996
1997 read_exec(&state, &arn, |exec| {
1998 assert_eq!(exec.status, ExecutionStatus::Succeeded);
1999 let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
2000 assert_eq!(output, json!("yes"));
2001 });
2002 }
2003
2004 #[test]
2007 fn succeed_state_honors_input_path_null() {
2008 let state = make_state();
2009 let arn = arn_for("succeed-null");
2010 let def = json!({
2011 "StartAt": "S",
2012 "States": { "S": { "Type": "Succeed", "InputPath": "null" } }
2013 });
2014 drive(&state, &arn, def, Some(r#"{"a":1}"#));
2015
2016 read_exec(&state, &arn, |exec| {
2017 assert_eq!(exec.status, ExecutionStatus::Succeeded);
2018 let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
2019 assert_eq!(output, json!({}));
2020 });
2021 }
2022
2023 #[test]
2024 fn fail_state_defaults_when_fields_missing() {
2025 let state = make_state();
2026 let arn = arn_for("fail-default");
2027 let def = json!({
2028 "StartAt": "F",
2029 "States": { "F": { "Type": "Fail" } }
2030 });
2031 drive(&state, &arn, def, None);
2032
2033 read_exec(&state, &arn, |exec| {
2034 assert_eq!(exec.status, ExecutionStatus::Failed);
2035 assert_eq!(exec.error.as_deref(), Some("States.Fail"));
2036 assert_eq!(exec.cause.as_deref(), Some(""));
2037 });
2038 }
2039
2040 fn choice_def() -> Value {
2043 json!({
2044 "StartAt": "C",
2045 "States": {
2046 "C": {
2047 "Type": "Choice",
2048 "Choices": [
2049 {
2050 "Variable": "$.n",
2051 "NumericGreaterThan": 10,
2052 "Next": "Big"
2053 }
2054 ],
2055 "Default": "Small"
2056 },
2057 "Big": { "Type": "Pass", "Result": "big", "End": true },
2058 "Small": { "Type": "Pass", "Result": "small", "End": true }
2059 }
2060 })
2061 }
2062
2063 #[test]
2064 fn choice_routes_to_matching_branch() {
2065 let state = make_state();
2066 let arn = arn_for("choice-big");
2067 drive(&state, &arn, choice_def(), Some(r#"{"n":42}"#));
2068
2069 read_exec(&state, &arn, |exec| {
2070 assert_eq!(exec.status, ExecutionStatus::Succeeded);
2071 assert_eq!(
2072 serde_json::from_str::<Value>(exec.output.as_ref().unwrap()).unwrap(),
2073 json!("big")
2074 );
2075 });
2076 }
2077
2078 #[test]
2079 fn choice_falls_through_to_default() {
2080 let state = make_state();
2081 let arn = arn_for("choice-default");
2082 drive(&state, &arn, choice_def(), Some(r#"{"n":3}"#));
2083
2084 read_exec(&state, &arn, |exec| {
2085 assert_eq!(exec.status, ExecutionStatus::Succeeded);
2086 assert_eq!(
2087 serde_json::from_str::<Value>(exec.output.as_ref().unwrap()).unwrap(),
2088 json!("small")
2089 );
2090 });
2091 }
2092
2093 #[test]
2094 fn choice_no_match_and_no_default_fails() {
2095 let state = make_state();
2096 let arn = arn_for("choice-nomatch");
2097 let def = json!({
2098 "StartAt": "C",
2099 "States": {
2100 "C": {
2101 "Type": "Choice",
2102 "Choices": [
2103 { "Variable": "$.n", "NumericEquals": 1, "Next": "End1" }
2104 ]
2105 },
2106 "End1": { "Type": "Pass", "End": true }
2107 }
2108 });
2109 drive(&state, &arn, def, Some(r#"{"n":99}"#));
2110
2111 read_exec(&state, &arn, |exec| {
2112 assert_eq!(exec.status, ExecutionStatus::Failed);
2113 assert_eq!(exec.error.as_deref(), Some("States.NoChoiceMatched"));
2114 });
2115 }
2116
2117 #[test]
2120 fn wait_seconds_then_advances() {
2121 let state = make_state();
2122 let arn = arn_for("wait-secs");
2123 let def = json!({
2124 "StartAt": "W",
2125 "States": {
2126 "W": { "Type": "Wait", "Seconds": 0, "Next": "Done" },
2127 "Done": { "Type": "Succeed" }
2128 }
2129 });
2130 drive(&state, &arn, def, Some(r#"{"k":1}"#));
2131
2132 read_exec(&state, &arn, |exec| {
2133 assert_eq!(exec.status, ExecutionStatus::Succeeded);
2134 });
2135 }
2136
2137 #[test]
2138 fn wait_timestamp_in_past_is_noop() {
2139 let state = make_state();
2140 let arn = arn_for("wait-past");
2141 let def = json!({
2142 "StartAt": "W",
2143 "States": {
2144 "W": {
2145 "Type": "Wait",
2146 "Timestamp": "2000-01-01T00:00:00Z",
2147 "End": true
2148 }
2149 }
2150 });
2151 drive(&state, &arn, def, Some(r#"{"k":1}"#));
2152
2153 read_exec(&state, &arn, |exec| {
2154 assert_eq!(exec.status, ExecutionStatus::Succeeded);
2155 });
2156 }
2157
2158 #[test]
2159 fn wait_without_any_duration_falls_through() {
2160 let state = make_state();
2161 let arn = arn_for("wait-none");
2162 let def = json!({
2163 "StartAt": "W",
2164 "States": { "W": { "Type": "Wait", "End": true } }
2165 });
2166 drive(&state, &arn, def, None);
2167
2168 read_exec(&state, &arn, |exec| {
2169 assert_eq!(exec.status, ExecutionStatus::Succeeded);
2170 });
2171 }
2172
2173 #[test]
2176 fn parallel_runs_two_pass_branches_and_collects_results() {
2177 let state = make_state();
2178 let arn = arn_for("parallel-ok");
2179 let def = json!({
2180 "StartAt": "P",
2181 "States": {
2182 "P": {
2183 "Type": "Parallel",
2184 "End": true,
2185 "Branches": [
2186 {
2187 "StartAt": "A",
2188 "States": { "A": { "Type": "Pass", "Result": "a", "End": true } }
2189 },
2190 {
2191 "StartAt": "B",
2192 "States": { "B": { "Type": "Pass", "Result": "b", "End": true } }
2193 }
2194 ]
2195 }
2196 }
2197 });
2198 drive(&state, &arn, def, Some(r#"{}"#));
2199
2200 read_exec(&state, &arn, |exec| {
2201 assert_eq!(exec.status, ExecutionStatus::Succeeded);
2202 let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
2203 assert_eq!(output, json!(["a", "b"]));
2204 });
2205 }
2206
2207 #[test]
2208 fn parallel_empty_branches_fails() {
2209 let state = make_state();
2210 let arn = arn_for("parallel-empty");
2211 let def = json!({
2212 "StartAt": "P",
2213 "States": { "P": { "Type": "Parallel", "Branches": [], "End": true } }
2214 });
2215 drive(&state, &arn, def, None);
2216
2217 read_exec(&state, &arn, |exec| {
2218 assert_eq!(exec.status, ExecutionStatus::Failed);
2219 assert_eq!(exec.error.as_deref(), Some("States.Runtime"));
2220 });
2221 }
2222
2223 #[test]
2226 fn map_iterates_pass_state_over_items_path() {
2227 let state = make_state();
2228 let arn = arn_for("map-ok");
2229 let def = json!({
2230 "StartAt": "M",
2231 "States": {
2232 "M": {
2233 "Type": "Map",
2234 "ItemsPath": "$.items",
2235 "Iterator": {
2236 "StartAt": "Item",
2237 "States": { "Item": { "Type": "Pass", "End": true } }
2238 },
2239 "End": true
2240 }
2241 }
2242 });
2243 drive(&state, &arn, def, Some(r#"{"items":[1,2,3]}"#));
2244
2245 read_exec(&state, &arn, |exec| {
2246 assert_eq!(exec.status, ExecutionStatus::Succeeded);
2247 let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
2248 assert_eq!(output, json!([1, 2, 3]));
2249 });
2250 }
2251
2252 #[test]
2255 fn task_unsupported_resource_propagates_failure() {
2256 let state = make_state();
2257 let arn = arn_for("task-unsupported");
2258 let def = json!({
2259 "StartAt": "T",
2260 "States": {
2261 "T": {
2262 "Type": "Task",
2263 "Resource": "arn:aws:states:::nothing:here",
2264 "End": true
2265 }
2266 }
2267 });
2268 drive(&state, &arn, def, None);
2269
2270 read_exec(&state, &arn, |exec| {
2271 assert_eq!(exec.status, ExecutionStatus::Failed);
2272 assert_eq!(exec.error.as_deref(), Some("States.TaskFailed"));
2273 assert!(exec.cause.as_deref().unwrap().contains("Unsupported"));
2274 });
2275 }
2276
2277 #[test]
2278 fn task_sqs_send_without_delivery_fails() {
2279 let state = make_state();
2280 let arn = arn_for("task-sqs-nodelivery");
2281 let def = json!({
2282 "StartAt": "T",
2283 "States": {
2284 "T": {
2285 "Type": "Task",
2286 "Resource": "arn:aws:states:::sqs:sendMessage",
2287 "Parameters": { "QueueUrl": "http://localhost/123/q", "MessageBody": "hi" },
2288 "End": true
2289 }
2290 }
2291 });
2292 drive(&state, &arn, def, Some("{}"));
2293
2294 read_exec(&state, &arn, |exec| {
2295 assert_eq!(exec.status, ExecutionStatus::Failed);
2296 assert!(exec.cause.as_deref().unwrap().contains("delivery bus"));
2297 });
2298 }
2299
2300 #[test]
2303 fn task_catch_routes_error_into_handler() {
2304 let state = make_state();
2305 let arn = arn_for("task-catch");
2306 let def = json!({
2307 "StartAt": "T",
2308 "States": {
2309 "T": {
2310 "Type": "Task",
2311 "Resource": "arn:aws:states:::nothing:here",
2312 "Catch": [
2313 { "ErrorEquals": ["States.ALL"], "Next": "Handler", "ResultPath": "$.err" }
2314 ],
2315 "End": true
2316 },
2317 "Handler": { "Type": "Pass", "End": true }
2318 }
2319 });
2320 drive(&state, &arn, def, Some(r#"{"orig":"v"}"#));
2321
2322 read_exec(&state, &arn, |exec| {
2323 assert_eq!(exec.status, ExecutionStatus::Succeeded);
2324 let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
2325 assert_eq!(output["orig"], json!("v"));
2327 assert_eq!(output["err"]["Error"], json!("States.TaskFailed"));
2328 });
2329 }
2330
2331 #[test]
2334 fn invalid_definition_json_fails_execution() {
2335 let state = make_state();
2336 let arn = arn_for("bad-json");
2337 create_execution(&state, &arn, None);
2338 let rt = tokio::runtime::Builder::new_current_thread()
2339 .enable_time()
2340 .build()
2341 .unwrap();
2342 rt.block_on(execute_state_machine(
2343 state.clone(),
2344 arn.clone(),
2345 "not json{".to_string(),
2346 None,
2347 None,
2348 None,
2349 ));
2350
2351 read_exec(&state, &arn, |exec| {
2352 assert_eq!(exec.status, ExecutionStatus::Failed);
2353 assert_eq!(exec.error.as_deref(), Some("States.Runtime"));
2354 assert!(exec.cause.as_deref().unwrap().contains("Failed to parse"));
2355 });
2356 }
2357
2358 #[test]
2359 fn missing_start_at_fails_execution() {
2360 let state = make_state();
2361 let arn = arn_for("no-startat");
2362 let def = json!({ "States": { "X": { "Type": "Succeed" } } });
2363 drive(&state, &arn, def, None);
2364
2365 read_exec(&state, &arn, |exec| {
2366 assert_eq!(exec.status, ExecutionStatus::Failed);
2367 assert!(exec.cause.as_deref().unwrap().contains("StartAt"));
2368 });
2369 }
2370
2371 #[test]
2372 fn next_state_not_found_fails_execution() {
2373 let state = make_state();
2374 let arn = arn_for("dangling-next");
2375 let def = json!({
2376 "StartAt": "A",
2377 "States": { "A": { "Type": "Pass", "Next": "DoesNotExist" } }
2378 });
2379 drive(&state, &arn, def, None);
2380
2381 read_exec(&state, &arn, |exec| {
2382 assert_eq!(exec.status, ExecutionStatus::Failed);
2383 assert!(exec.cause.as_deref().unwrap().contains("not found"));
2384 });
2385 }
2386
2387 #[test]
2388 fn unsupported_state_type_fails_execution() {
2389 let state = make_state();
2390 let arn = arn_for("bad-type");
2391 let def = json!({
2392 "StartAt": "X",
2393 "States": { "X": { "Type": "WatChoo", "End": true } }
2394 });
2395 drive(&state, &arn, def, None);
2396
2397 read_exec(&state, &arn, |exec| {
2398 assert_eq!(exec.status, ExecutionStatus::Failed);
2399 assert!(exec
2400 .cause
2401 .as_deref()
2402 .unwrap()
2403 .contains("Unsupported state type"));
2404 });
2405 }
2406
2407 #[test]
2410 fn apply_parameters_substitutes_json_path_refs() {
2411 let template = json!({
2412 "literal": "constant",
2413 "ref.$": "$.user.id",
2414 "nested": { "inner.$": "$.user.name" },
2415 "list": [ { "x.$": "$.user.id" } ]
2416 });
2417 let input = json!({ "user": { "id": 42, "name": "zoe" } });
2418 let out = apply_parameters(&template, &input);
2419 assert_eq!(out["literal"], json!("constant"));
2420 assert_eq!(out["ref"], json!(42));
2421 assert_eq!(out["nested"]["inner"], json!("zoe"));
2422 assert_eq!(out["list"][0]["x"], json!(42));
2423 }
2424
2425 #[test]
2426 fn next_state_returns_end_name_or_error() {
2427 match next_state(&json!({ "End": true })) {
2428 NextState::End => {}
2429 _ => panic!("expected End"),
2430 }
2431 match next_state(&json!({ "Next": "A" })) {
2432 NextState::Name(n) => assert_eq!(n, "A"),
2433 _ => panic!("expected Name"),
2434 }
2435 match next_state(&json!({})) {
2436 NextState::Error(_) => {}
2437 _ => panic!("expected Error"),
2438 }
2439 }
2440
2441 #[test]
2442 fn apply_state_catcher_matches_wildcard_and_stashes_error() {
2443 let state_def = json!({
2444 "Catch": [
2445 { "ErrorEquals": ["States.ALL"], "Next": "H", "ResultPath": "$.caught" }
2446 ]
2447 });
2448 let input = json!({ "a": 1 });
2449 let (next, new_input) =
2450 apply_state_catcher(&state_def, &input, "Boom", "it exploded").unwrap();
2451 assert_eq!(next, "H");
2452 assert_eq!(new_input["a"], json!(1));
2453 assert_eq!(new_input["caught"]["Error"], json!("Boom"));
2454 assert_eq!(new_input["caught"]["Cause"], json!("it exploded"));
2455 }
2456
2457 #[test]
2458 fn apply_state_catcher_returns_none_without_match() {
2459 let state_def = json!({
2460 "Catch": [
2461 { "ErrorEquals": ["Specific.Error"], "Next": "H" }
2462 ]
2463 });
2464 let input = json!({});
2465 assert!(apply_state_catcher(&state_def, &input, "Other", "why").is_none());
2466 }
2467
2468 #[test]
2469 fn queue_url_to_arn_parses_account_and_name() {
2470 assert_eq!(
2471 queue_url_to_arn("http://sqs.local:4566/123456789012/my-queue"),
2472 "arn:aws:sqs:us-east-1:123456789012:my-queue"
2473 );
2474 }
2475
2476 #[test]
2477 fn queue_url_to_arn_falls_back_for_unparseable_input() {
2478 assert_eq!(queue_url_to_arn("bad"), "bad");
2479 }
2480
2481 #[test]
2482 fn md5_hex_is_deterministic_and_32_chars() {
2483 let a = md5_hex("hello");
2484 let b = md5_hex("hello");
2485 assert_eq!(a, b);
2486 assert_eq!(a.len(), 32);
2487 assert_ne!(a, md5_hex("world"));
2488 }
2489
2490 #[test]
2491 fn apply_update_expression_sets_direct_and_aliased_attrs() {
2492 let mut item: HashMap<String, Value> = HashMap::new();
2493 item.insert("id".to_string(), json!({"S": "1"}));
2494
2495 let mut attr_values = serde_json::Map::new();
2496 attr_values.insert(":n".to_string(), json!({"S": "Alice"}));
2497 attr_values.insert(":c".to_string(), json!({"N": "5"}));
2498
2499 let mut attr_names = serde_json::Map::new();
2500 attr_names.insert("#name".to_string(), json!("name"));
2501
2502 apply_update_expression(
2503 &mut item,
2504 "SET #name = :n, count = :c",
2505 &attr_values,
2506 &attr_names,
2507 );
2508
2509 assert_eq!(item.get("name").unwrap(), &json!({"S": "Alice"}));
2510 assert_eq!(item.get("count").unwrap(), &json!({"N": "5"}));
2511 assert_eq!(item.get("id").unwrap(), &json!({"S": "1"}));
2512 }
2513
2514 #[test]
2515 fn apply_update_expression_accepts_lowercase_set_keyword() {
2516 let mut item: HashMap<String, Value> = HashMap::new();
2517 let mut attr_values = serde_json::Map::new();
2518 attr_values.insert(":v".to_string(), json!({"S": "x"}));
2519 apply_update_expression(
2520 &mut item,
2521 "set field = :v",
2522 &attr_values,
2523 &serde_json::Map::new(),
2524 );
2525 assert_eq!(item.get("field").unwrap(), &json!({"S": "x"}));
2526 }
2527
2528 #[test]
2531 fn task_dynamodb_get_item_without_state_fails() {
2532 let state = make_state();
2533 let arn = arn_for("ddb-get-nostate");
2534 let def = json!({
2535 "StartAt": "T",
2536 "States": {
2537 "T": {
2538 "Type": "Task",
2539 "Resource": "arn:aws:states:::dynamodb:getItem",
2540 "Parameters": { "TableName": "t", "Key": { "id": { "S": "1" } } },
2541 "End": true
2542 }
2543 }
2544 });
2545 drive(&state, &arn, def, Some("{}"));
2546 read_exec(&state, &arn, |exec| {
2547 assert_eq!(exec.status, ExecutionStatus::Failed);
2548 assert!(exec.cause.as_deref().unwrap().contains("DynamoDB"));
2549 });
2550 }
2551
2552 #[test]
2555 fn succeed_execution_is_noop_when_already_terminal() {
2556 let state = make_state();
2557 let arn = "arn:aws:states:us-east-1:123456789012:execution:test:already";
2558 create_execution(&state, arn, None);
2559 {
2560 let mut __a = state.write();
2561 let s = __a.default_mut();
2562 s.executions.get_mut(arn).unwrap().status = ExecutionStatus::Failed;
2563 }
2564 succeed_execution(&state, arn, &json!({"x":1}));
2565 let __a = state.read();
2566 let s = __a.default_ref();
2567 let exec = s.executions.get(arn).unwrap();
2568 assert_eq!(exec.status, ExecutionStatus::Failed);
2569 assert!(exec.output.is_none());
2570 }
2571
2572 #[test]
2573 fn fail_execution_is_noop_when_already_terminal() {
2574 let state = make_state();
2575 let arn = "arn:aws:states:us-east-1:123456789012:execution:test:already2";
2576 create_execution(&state, arn, None);
2577 {
2578 let mut __a = state.write();
2579 let s = __a.default_mut();
2580 s.executions.get_mut(arn).unwrap().status = ExecutionStatus::Succeeded;
2581 }
2582 fail_execution(&state, arn, "Oops", "nope");
2583 let __a = state.read();
2584 let s = __a.default_ref();
2585 let exec = s.executions.get(arn).unwrap();
2586 assert_eq!(exec.status, ExecutionStatus::Succeeded);
2587 assert!(exec.error.is_none());
2588 }
2589
2590 #[test]
2593 fn pass_state_result_path_merges_into_input() {
2594 let state = make_state();
2595 let arn = arn_for("result-path");
2596 let def = json!({
2597 "StartAt": "P",
2598 "States": {
2599 "P": {"Type": "Pass", "Result": {"x": 2}, "ResultPath": "$.data", "End": true}
2600 }
2601 });
2602 drive(&state, &arn, def, Some(r#"{"a":1}"#));
2603 let output = read_exec(&state, &arn, |e| e.output.clone().unwrap_or_default());
2604 let v: Value = serde_json::from_str(&output).unwrap();
2605 assert_eq!(v["a"], 1);
2606 assert_eq!(v["data"]["x"], 2);
2607 }
2608
2609 #[test]
2612 fn choice_string_greater_than_equals() {
2613 let state = make_state();
2614 let arn = arn_for("choice-sgte");
2615 let def = json!({
2616 "StartAt": "C",
2617 "States": {
2618 "C": {
2619 "Type": "Choice",
2620 "Choices": [
2621 {"Variable": "$.val", "StringGreaterThanEquals": "apple", "Next": "End"}
2622 ],
2623 "Default": "Fail"
2624 },
2625 "End": {"Type": "Pass", "End": true},
2626 "Fail": {"Type": "Fail"}
2627 }
2628 });
2629 drive(&state, &arn, def, Some(r#"{"val":"banana"}"#));
2630 let status = read_exec(&state, &arn, |e| e.status);
2631 assert_eq!(status, ExecutionStatus::Succeeded);
2632 }
2633
2634 #[test]
2635 fn choice_is_present_and_is_null() {
2636 let state = make_state();
2637 let arn = arn_for("choice-ispres");
2638 let def = json!({
2639 "StartAt": "C",
2640 "States": {
2641 "C": {
2642 "Type": "Choice",
2643 "Choices": [
2644 {"Variable": "$.foo", "IsPresent": true, "Next": "End"}
2645 ],
2646 "Default": "Fail"
2647 },
2648 "End": {"Type": "Pass", "End": true},
2649 "Fail": {"Type": "Fail"}
2650 }
2651 });
2652 drive(&state, &arn, def, Some(r#"{"foo":null}"#));
2653 assert_eq!(
2654 read_exec(&state, &arn, |e| e.status),
2655 ExecutionStatus::Succeeded
2656 );
2657 }
2658
2659 #[test]
2660 fn choice_or_short_circuits() {
2661 let state = make_state();
2662 let arn = arn_for("choice-or");
2663 let def = json!({
2664 "StartAt": "C",
2665 "States": {
2666 "C": {
2667 "Type": "Choice",
2668 "Choices": [{
2669 "Or": [
2670 {"Variable": "$.x", "NumericEquals": 99},
2671 {"Variable": "$.y", "StringEquals": "b"}
2672 ],
2673 "Next": "End"
2674 }],
2675 "Default": "Fail"
2676 },
2677 "End": {"Type": "Pass", "End": true},
2678 "Fail": {"Type": "Fail"}
2679 }
2680 });
2681 drive(&state, &arn, def, Some(r#"{"x":1,"y":"b"}"#));
2682 assert_eq!(
2683 read_exec(&state, &arn, |e| e.status),
2684 ExecutionStatus::Succeeded
2685 );
2686 }
2687
2688 #[test]
2689 fn choice_not_negates() {
2690 let state = make_state();
2691 let arn = arn_for("choice-not");
2692 let def = json!({
2693 "StartAt": "C",
2694 "States": {
2695 "C": {
2696 "Type": "Choice",
2697 "Choices": [{
2698 "Not": {"Variable": "$.x", "NumericEquals": 99},
2699 "Next": "End"
2700 }],
2701 "Default": "Fail"
2702 },
2703 "End": {"Type": "Pass", "End": true},
2704 "Fail": {"Type": "Fail"}
2705 }
2706 });
2707 drive(&state, &arn, def, Some(r#"{"x":1}"#));
2708 assert_eq!(
2709 read_exec(&state, &arn, |e| e.status),
2710 ExecutionStatus::Succeeded
2711 );
2712 }
2713
2714 #[test]
2715 fn choice_boolean_equals() {
2716 let state = make_state();
2717 let arn = arn_for("choice-bool");
2718 let def = json!({
2719 "StartAt": "C",
2720 "States": {
2721 "C": {
2722 "Type": "Choice",
2723 "Choices": [
2724 {"Variable": "$.ok", "BooleanEquals": true, "Next": "End"}
2725 ],
2726 "Default": "Fail"
2727 },
2728 "End": {"Type": "Pass", "End": true},
2729 "Fail": {"Type": "Fail"}
2730 }
2731 });
2732 drive(&state, &arn, def, Some(r#"{"ok":true}"#));
2733 assert_eq!(
2734 read_exec(&state, &arn, |e| e.status),
2735 ExecutionStatus::Succeeded
2736 );
2737 }
2738
2739 #[test]
2742 fn wait_seconds_path_uses_input_value() {
2743 let state = make_state();
2744 let arn = arn_for("wait-sp");
2745 let def = json!({
2746 "StartAt": "W",
2747 "States": {
2748 "W": {"Type": "Wait", "SecondsPath": "$.wait", "End": true}
2749 }
2750 });
2751 drive(&state, &arn, def, Some(r#"{"wait":0}"#));
2752 assert_eq!(
2753 read_exec(&state, &arn, |e| e.status),
2754 ExecutionStatus::Succeeded
2755 );
2756 }
2757
2758 #[test]
2761 fn map_state_empty_array_succeeds() {
2762 let state = make_state();
2763 let arn = arn_for("map-empty");
2764 let def = json!({
2765 "StartAt": "M",
2766 "States": {
2767 "M": {
2768 "Type": "Map",
2769 "ItemsPath": "$.items",
2770 "ItemProcessor": {
2771 "StartAt": "P",
2772 "States": {
2773 "P": {"Type": "Pass", "End": true}
2774 }
2775 },
2776 "End": true
2777 }
2778 }
2779 });
2780 drive(&state, &arn, def, Some(r#"{"items":[]}"#));
2781 assert_eq!(
2782 read_exec(&state, &arn, |e| e.status),
2783 ExecutionStatus::Succeeded
2784 );
2785 }
2786
2787 #[test]
2790 fn fail_state_with_explicit_error_and_cause() {
2791 let state = make_state();
2792 let arn = arn_for("fail-fields");
2793 create_execution(&state, &arn, None);
2794 let def = json!({
2795 "StartAt": "F",
2796 "States": {
2797 "F": {"Type": "Fail", "Error": "MyError", "Cause": "my cause"}
2798 }
2799 });
2800 drive(&state, &arn, def, None);
2801 let status = read_exec(&state, &arn, |e| e.status);
2802 assert_eq!(status, ExecutionStatus::Failed);
2803 let err = read_exec(&state, &arn, |e| e.error.clone().unwrap_or_default());
2804 assert_eq!(err, "MyError");
2805 }
2806}