1use std::collections::HashMap;
2use std::sync::Arc;
3
4use chrono::Utc;
5use serde_json::{json, Value};
6use tracing::{debug, warn};
7
8use fakecloud_aws::arn::Arn;
9use fakecloud_core::delivery::DeliveryBus;
10use fakecloud_dynamodb::state::SharedDynamoDbState;
11
12use crate::choice::evaluate_choice;
13use crate::error_handling::{find_catcher, should_retry};
14use crate::io_processing::{apply_input_path, apply_output_path, apply_result_path};
15use crate::state::{ExecutionStatus, HistoryEvent, SharedStepFunctionsState};
16
17pub async fn execute_state_machine(
20 state: SharedStepFunctionsState,
21 execution_arn: String,
22 definition: String,
23 input: Option<String>,
24 delivery: Option<Arc<DeliveryBus>>,
25 dynamodb_state: Option<SharedDynamoDbState>,
26) {
27 let def: Value = match serde_json::from_str(&definition) {
28 Ok(v) => v,
29 Err(e) => {
30 fail_execution(
31 &state,
32 &execution_arn,
33 "States.Runtime",
34 &format!("Failed to parse definition: {e}"),
35 );
36 return;
37 }
38 };
39
40 let raw_input: Value = input
41 .as_deref()
42 .and_then(|s| serde_json::from_str(s).ok())
43 .unwrap_or(json!({}));
44
45 add_event(
47 &state,
48 &execution_arn,
49 "ExecutionStarted",
50 0,
51 json!({
52 "input": serde_json::to_string(&raw_input).expect("serde_json::Value serialization is infallible"),
53 "roleArn": "arn:aws:iam::123456789012:role/execution-role"
54 }),
55 );
56
57 let def_owned = def;
63 let state_clone = state.clone();
64 let execution_arn_clone = execution_arn.clone();
65 let delivery_clone = delivery.clone();
66 let dynamodb_state_clone = dynamodb_state.clone();
67 let handle = tokio::spawn(async move {
68 run_states(
69 &def_owned,
70 raw_input,
71 &delivery_clone,
72 &dynamodb_state_clone,
73 &state_clone,
74 &execution_arn_clone,
75 )
76 .await
77 });
78
79 match handle.await {
80 Ok(Ok(output)) => {
81 succeed_execution(&state, &execution_arn, &output);
82 }
83 Ok(Err((error, cause))) => {
84 fail_execution(&state, &execution_arn, &error, &cause);
85 }
86 Err(join_err) => {
87 let msg = if join_err.is_panic() {
88 let payload = join_err.into_panic();
89 if let Some(s) = payload.downcast_ref::<String>() {
90 s.clone()
91 } else if let Some(s) = payload.downcast_ref::<&'static str>() {
92 (*s).to_string()
93 } else {
94 "execution task panicked".to_string()
95 }
96 } else {
97 format!("execution task cancelled: {join_err}")
98 };
99 tracing::error!(
100 execution_arn = %execution_arn,
101 panic = %msg,
102 "Step Functions execution panicked"
103 );
104 fail_execution(&state, &execution_arn, "States.Runtime", &msg);
105 }
106 }
107}
108
/// Boxed, `Send` future returned by `run_states`.
///
/// It resolves to the final output value, or to an `(error, cause)` pair of strings.
/// The future is boxed because `run_states` is re-entered (through the Parallel and
/// Map states) from inside its own future.
type StatesResult<'a> = std::pin::Pin<
    Box<dyn std::future::Future<Output = Result<Value, (String, String)>> + Send + 'a>,
>;
112
113fn run_states<'a>(
116 def: &'a Value,
117 input: Value,
118 delivery: &'a Option<Arc<DeliveryBus>>,
119 dynamodb_state: &'a Option<SharedDynamoDbState>,
120 shared_state: &'a SharedStepFunctionsState,
121 execution_arn: &'a str,
122) -> StatesResult<'a> {
123 Box::pin(async move {
124 let start_at = def["StartAt"]
125 .as_str()
126 .ok_or_else(|| {
127 (
128 "States.Runtime".to_string(),
129 "Missing StartAt in definition".to_string(),
130 )
131 })?
132 .to_string();
133
134 let states = def.get("States").ok_or_else(|| {
135 (
136 "States.Runtime".to_string(),
137 "Missing States in definition".to_string(),
138 )
139 })?;
140
141 let mut current_state = start_at;
142 let mut effective_input = input;
143 let mut iteration = 0;
144 let max_iterations = 500;
145
146 loop {
147 iteration += 1;
148 if iteration > max_iterations {
149 return Err((
150 "States.Runtime".to_string(),
151 "Maximum number of state transitions exceeded".to_string(),
152 ));
153 }
154
155 let state_def = states.get(¤t_state).cloned().ok_or_else(|| {
156 (
157 "States.Runtime".to_string(),
158 format!("State '{current_state}' not found in definition"),
159 )
160 })?;
161
162 let state_type = state_def["Type"]
163 .as_str()
164 .ok_or_else(|| {
165 (
166 "States.Runtime".to_string(),
167 format!("State '{current_state}' missing Type field"),
168 )
169 })?
170 .to_string();
171
172 debug!(
173 execution_arn = %execution_arn,
174 state = %current_state,
175 state_type = %state_type,
176 "Executing state"
177 );
178
179 let advance = match state_type.as_str() {
180 "Pass" => run_pass_state(
181 ¤t_state,
182 &state_def,
183 effective_input,
184 shared_state,
185 execution_arn,
186 ),
187 "Succeed" => run_succeed_state(
188 ¤t_state,
189 &state_def,
190 effective_input,
191 shared_state,
192 execution_arn,
193 ),
194 "Fail" => run_fail_state(
195 ¤t_state,
196 &state_def,
197 effective_input,
198 shared_state,
199 execution_arn,
200 ),
201 "Choice" => run_choice_state(
202 ¤t_state,
203 &state_def,
204 effective_input,
205 shared_state,
206 execution_arn,
207 ),
208 "Wait" => {
209 run_wait_state(
210 ¤t_state,
211 &state_def,
212 effective_input,
213 shared_state,
214 execution_arn,
215 )
216 .await
217 }
218 "Task" => {
219 run_task_state(
220 ¤t_state,
221 &state_def,
222 effective_input,
223 delivery,
224 dynamodb_state,
225 shared_state,
226 execution_arn,
227 )
228 .await
229 }
230 "Parallel" => {
231 run_parallel_state(
232 ¤t_state,
233 &state_def,
234 effective_input,
235 delivery,
236 dynamodb_state,
237 shared_state,
238 execution_arn,
239 )
240 .await
241 }
242 "Map" => {
243 run_map_state(
244 ¤t_state,
245 &state_def,
246 effective_input,
247 delivery,
248 dynamodb_state,
249 shared_state,
250 execution_arn,
251 )
252 .await
253 }
254 other => Advance::Fail(
255 "States.Runtime".to_string(),
256 format!("Unsupported state type: '{other}'"),
257 ),
258 };
259
260 match advance {
261 Advance::Next(next, new_input) => {
262 effective_input = new_input;
263 current_state = next;
264 }
265 Advance::End(output) => return Ok(output),
266 Advance::Fail(error, cause) => return Err((error, cause)),
267 }
268 }
269 })
270}
271
/// What the interpreter loop in `run_states` should do after a state has run.
enum Advance {
    /// Continue with the named state, passing it the given value as input.
    Next(String, Value),
    /// Stop successfully; the value is the output of the state machine (or branch).
    End(Value),
    /// Stop with a failure described by an `(error, cause)` pair.
    Fail(String, String),
}
281
282fn advance_from_next(state_def: &Value, input: Value) -> Advance {
283 match next_state(state_def) {
284 NextState::Name(next) => Advance::Next(next, input),
285 NextState::End => Advance::End(input),
286 NextState::Error(msg) => Advance::Fail("States.Runtime".to_string(), msg),
287 }
288}
289
290fn advance_from_error(state_def: &Value, input: &Value, error: String, cause: String) -> Advance {
291 match apply_state_catcher(state_def, input, &error, &cause) {
292 Some((next, new_input)) => Advance::Next(next, new_input),
293 None => Advance::Fail(error, cause),
294 }
295}
296
297fn run_pass_state(
298 name: &str,
299 state_def: &Value,
300 input: Value,
301 shared_state: &SharedStepFunctionsState,
302 execution_arn: &str,
303) -> Advance {
304 let entered_event_id = add_event(
305 shared_state,
306 execution_arn,
307 "PassStateEntered",
308 0,
309 json!({
310 "name": name,
311 "input": serde_json::to_string(&input).expect("serde_json::Value serialization is infallible"),
312 }),
313 );
314
315 let result = execute_pass_state(state_def, &input);
316
317 add_event(
318 shared_state,
319 execution_arn,
320 "PassStateExited",
321 entered_event_id,
322 json!({
323 "name": name,
324 "output": serde_json::to_string(&result).expect("serde_json::Value serialization is infallible"),
325 }),
326 );
327
328 advance_from_next(state_def, result)
329}
330
331fn run_succeed_state(
332 name: &str,
333 state_def: &Value,
334 input: Value,
335 shared_state: &SharedStepFunctionsState,
336 execution_arn: &str,
337) -> Advance {
338 add_event(
339 shared_state,
340 execution_arn,
341 "SucceedStateEntered",
342 0,
343 json!({
344 "name": name,
345 "input": serde_json::to_string(&input).expect("serde_json::Value serialization is infallible"),
346 }),
347 );
348
349 let input_path = state_def["InputPath"].as_str();
350 let output_path = state_def["OutputPath"].as_str();
351
352 let processed = if input_path == Some("null") {
353 json!({})
354 } else {
355 apply_input_path(&input, input_path)
356 };
357
358 let output = if output_path == Some("null") {
359 json!({})
360 } else {
361 apply_output_path(&processed, output_path)
362 };
363
364 add_event(
365 shared_state,
366 execution_arn,
367 "SucceedStateExited",
368 0,
369 json!({
370 "name": name,
371 "output": serde_json::to_string(&output).expect("serde_json::Value serialization is infallible"),
372 }),
373 );
374
375 Advance::End(output)
376}
377
378fn run_fail_state(
379 name: &str,
380 state_def: &Value,
381 input: Value,
382 shared_state: &SharedStepFunctionsState,
383 execution_arn: &str,
384) -> Advance {
385 let error = state_def["Error"]
386 .as_str()
387 .unwrap_or("States.Fail")
388 .to_string();
389 let cause = state_def["Cause"].as_str().unwrap_or("").to_string();
390
391 add_event(
392 shared_state,
393 execution_arn,
394 "FailStateEntered",
395 0,
396 json!({
397 "name": name,
398 "input": serde_json::to_string(&input).expect("serde_json::Value serialization is infallible"),
399 }),
400 );
401
402 Advance::Fail(error, cause)
403}
404
405fn run_choice_state(
406 name: &str,
407 state_def: &Value,
408 input: Value,
409 shared_state: &SharedStepFunctionsState,
410 execution_arn: &str,
411) -> Advance {
412 let entered_event_id = add_event(
413 shared_state,
414 execution_arn,
415 "ChoiceStateEntered",
416 0,
417 json!({
418 "name": name,
419 "input": serde_json::to_string(&input).expect("serde_json::Value serialization is infallible"),
420 }),
421 );
422
423 let input_path = state_def["InputPath"].as_str();
424 let processed_input = if input_path == Some("null") {
425 json!({})
426 } else {
427 apply_input_path(&input, input_path)
428 };
429
430 match evaluate_choice(state_def, &processed_input) {
431 Some(next) => {
432 add_event(
433 shared_state,
434 execution_arn,
435 "ChoiceStateExited",
436 entered_event_id,
437 json!({
438 "name": name,
439 "output": serde_json::to_string(&input).expect("serde_json::Value serialization is infallible"),
440 }),
441 );
442 Advance::Next(next, input)
443 }
444 None => Advance::Fail(
445 "States.NoChoiceMatched".to_string(),
446 format!("No choice rule matched and no Default in state '{name}'"),
447 ),
448 }
449}
450
451async fn run_wait_state(
452 name: &str,
453 state_def: &Value,
454 input: Value,
455 shared_state: &SharedStepFunctionsState,
456 execution_arn: &str,
457) -> Advance {
458 let entered_event_id = add_event(
459 shared_state,
460 execution_arn,
461 "WaitStateEntered",
462 0,
463 json!({
464 "name": name,
465 "input": serde_json::to_string(&input).expect("serde_json::Value serialization is infallible"),
466 }),
467 );
468
469 execute_wait_state(state_def, &input).await;
470
471 add_event(
472 shared_state,
473 execution_arn,
474 "WaitStateExited",
475 entered_event_id,
476 json!({
477 "name": name,
478 "output": serde_json::to_string(&input).expect("serde_json::Value serialization is infallible"),
479 }),
480 );
481
482 advance_from_next(state_def, input)
483}
484
485#[allow(clippy::too_many_arguments)]
486async fn run_task_state(
487 name: &str,
488 state_def: &Value,
489 input: Value,
490 delivery: &Option<Arc<DeliveryBus>>,
491 dynamodb_state: &Option<SharedDynamoDbState>,
492 shared_state: &SharedStepFunctionsState,
493 execution_arn: &str,
494) -> Advance {
495 let entered_event_id = add_event(
496 shared_state,
497 execution_arn,
498 "TaskStateEntered",
499 0,
500 json!({
501 "name": name,
502 "input": serde_json::to_string(&input).expect("serde_json::Value serialization is infallible"),
503 }),
504 );
505
506 let result = execute_task_state(
507 state_def,
508 &input,
509 delivery,
510 dynamodb_state,
511 shared_state,
512 execution_arn,
513 entered_event_id,
514 )
515 .await;
516
517 match result {
518 Ok(output) => {
519 add_event(
520 shared_state,
521 execution_arn,
522 "TaskStateExited",
523 entered_event_id,
524 json!({
525 "name": name,
526 "output": serde_json::to_string(&output).expect("serde_json::Value serialization is infallible"),
527 }),
528 );
529 advance_from_next(state_def, output)
530 }
531 Err((error, cause)) => advance_from_error(state_def, &input, error, cause),
532 }
533}
534
535#[allow(clippy::too_many_arguments)]
536async fn run_parallel_state(
537 name: &str,
538 state_def: &Value,
539 input: Value,
540 delivery: &Option<Arc<DeliveryBus>>,
541 dynamodb_state: &Option<SharedDynamoDbState>,
542 shared_state: &SharedStepFunctionsState,
543 execution_arn: &str,
544) -> Advance {
545 let entered_event_id = add_event(
546 shared_state,
547 execution_arn,
548 "ParallelStateEntered",
549 0,
550 json!({
551 "name": name,
552 "input": serde_json::to_string(&input).expect("serde_json::Value serialization is infallible"),
553 }),
554 );
555
556 let result = execute_parallel_state(
557 state_def,
558 &input,
559 delivery,
560 dynamodb_state,
561 shared_state,
562 execution_arn,
563 )
564 .await;
565
566 match result {
567 Ok(output) => {
568 add_event(
569 shared_state,
570 execution_arn,
571 "ParallelStateExited",
572 entered_event_id,
573 json!({
574 "name": name,
575 "output": serde_json::to_string(&output).expect("serde_json::Value serialization is infallible"),
576 }),
577 );
578 advance_from_next(state_def, output)
579 }
580 Err((error, cause)) => advance_from_error(state_def, &input, error, cause),
581 }
582}
583
584#[allow(clippy::too_many_arguments)]
585async fn run_map_state(
586 name: &str,
587 state_def: &Value,
588 input: Value,
589 delivery: &Option<Arc<DeliveryBus>>,
590 dynamodb_state: &Option<SharedDynamoDbState>,
591 shared_state: &SharedStepFunctionsState,
592 execution_arn: &str,
593) -> Advance {
594 let entered_event_id = add_event(
595 shared_state,
596 execution_arn,
597 "MapStateEntered",
598 0,
599 json!({
600 "name": name,
601 "input": serde_json::to_string(&input).expect("serde_json::Value serialization is infallible"),
602 }),
603 );
604
605 let result = execute_map_state(
606 state_def,
607 &input,
608 delivery,
609 dynamodb_state,
610 shared_state,
611 execution_arn,
612 )
613 .await;
614
615 match result {
616 Ok(output) => {
617 add_event(
618 shared_state,
619 execution_arn,
620 "MapStateExited",
621 entered_event_id,
622 json!({
623 "name": name,
624 "output": serde_json::to_string(&output).expect("serde_json::Value serialization is infallible"),
625 }),
626 );
627 advance_from_next(state_def, output)
628 }
629 Err((error, cause)) => advance_from_error(state_def, &input, error, cause),
630 }
631}
632
633async fn execute_wait_state(state_def: &Value, input: &Value) {
635 if let Some(seconds) = state_def["Seconds"].as_u64() {
636 tokio::time::sleep(tokio::time::Duration::from_secs(seconds)).await;
637 return;
638 }
639
640 if let Some(path) = state_def["SecondsPath"].as_str() {
641 let val = crate::io_processing::resolve_path(input, path);
642 if let Some(seconds) = val.as_u64() {
643 tokio::time::sleep(tokio::time::Duration::from_secs(seconds)).await;
644 }
645 return;
646 }
647
648 if let Some(ts_str) = state_def["Timestamp"].as_str() {
649 if let Ok(target) = chrono::DateTime::parse_from_rfc3339(ts_str) {
650 let now = Utc::now();
651 let target_utc = target.with_timezone(&chrono::Utc);
652 if target_utc > now {
653 let duration = (target_utc - now).to_std().unwrap_or_default();
654 tokio::time::sleep(duration).await;
655 }
656 }
657 return;
658 }
659
660 if let Some(path) = state_def["TimestampPath"].as_str() {
661 let val = crate::io_processing::resolve_path(input, path);
662 if let Some(ts_str) = val.as_str() {
663 if let Ok(target) = chrono::DateTime::parse_from_rfc3339(ts_str) {
664 let now = Utc::now();
665 let target_utc = target.with_timezone(&chrono::Utc);
666 if target_utc > now {
667 let duration = (target_utc - now).to_std().unwrap_or_default();
668 tokio::time::sleep(duration).await;
669 }
670 }
671 }
672 return;
673 }
674
675 warn!(
676 "Wait state has no valid Seconds, SecondsPath, Timestamp, or TimestampPath — skipping wait"
677 );
678}
679
680fn execute_pass_state(state_def: &Value, input: &Value) -> Value {
682 let input_path = state_def["InputPath"].as_str();
683 let result_path = state_def["ResultPath"].as_str();
684 let output_path = state_def["OutputPath"].as_str();
685
686 let effective_input = if input_path == Some("null") {
687 json!({})
688 } else {
689 apply_input_path(input, input_path)
690 };
691
692 let result = if let Some(r) = state_def.get("Result") {
693 r.clone()
694 } else {
695 effective_input.clone()
696 };
697
698 let after_result = if result_path == Some("null") {
699 input.clone()
700 } else {
701 apply_result_path(input, &result, result_path)
702 };
703
704 if output_path == Some("null") {
705 json!({})
706 } else {
707 apply_output_path(&after_result, output_path)
708 }
709}
710
711async fn execute_task_state(
714 state_def: &Value,
715 input: &Value,
716 delivery: &Option<Arc<DeliveryBus>>,
717 dynamodb_state: &Option<SharedDynamoDbState>,
718 shared_state: &SharedStepFunctionsState,
719 execution_arn: &str,
720 entered_event_id: i64,
721) -> Result<Value, (String, String)> {
722 let resource = state_def["Resource"].as_str().unwrap_or("").to_string();
723
724 let input_path = state_def["InputPath"].as_str();
725 let result_path = state_def["ResultPath"].as_str();
726 let output_path = state_def["OutputPath"].as_str();
727
728 let effective_input = if input_path == Some("null") {
729 json!({})
730 } else {
731 apply_input_path(input, input_path)
732 };
733
734 let task_input = if let Some(params) = state_def.get("Parameters") {
735 apply_parameters(params, &effective_input)
736 } else {
737 effective_input
738 };
739
740 let retriers = state_def["Retry"].as_array().cloned().unwrap_or_default();
741 let timeout_seconds = state_def["TimeoutSeconds"].as_u64();
742 let heartbeat_seconds = state_def["HeartbeatSeconds"].as_u64();
743 let mut attempt = 0u32;
744
745 loop {
746 add_event(
747 shared_state,
748 execution_arn,
749 "TaskScheduled",
750 entered_event_id,
751 json!({
752 "resource": resource,
753 "region": "us-east-1",
754 "parameters": serde_json::to_string(&task_input).expect("serde_json::Value serialization is infallible"),
755 }),
756 );
757
758 add_event(
759 shared_state,
760 execution_arn,
761 "TaskStarted",
762 entered_event_id,
763 json!({ "resource": resource }),
764 );
765
766 let invoke_result = invoke_resource(
767 &resource,
768 &task_input,
769 delivery,
770 dynamodb_state,
771 timeout_seconds,
772 heartbeat_seconds,
773 shared_state,
774 )
775 .await;
776
777 match invoke_result {
778 Ok(result) => {
779 add_event(
780 shared_state,
781 execution_arn,
782 "TaskSucceeded",
783 entered_event_id,
784 json!({
785 "resource": resource,
786 "output": serde_json::to_string(&result).expect("serde_json::Value serialization is infallible"),
787 }),
788 );
789
790 let selected = if let Some(selector) = state_def.get("ResultSelector") {
791 apply_parameters(selector, &result)
792 } else {
793 result
794 };
795
796 let after_result = if result_path == Some("null") {
797 input.clone()
798 } else {
799 apply_result_path(input, &selected, result_path)
800 };
801
802 let output = if output_path == Some("null") {
803 json!({})
804 } else {
805 apply_output_path(&after_result, output_path)
806 };
807
808 return Ok(output);
809 }
810 Err((error, cause)) => {
811 add_event(
812 shared_state,
813 execution_arn,
814 "TaskFailed",
815 entered_event_id,
816 json!({ "error": error, "cause": cause }),
817 );
818
819 if let Some(delay_ms) = should_retry(&retriers, &error, attempt) {
820 attempt += 1;
821 let actual_delay = delay_ms.min(5000);
822 tokio::time::sleep(tokio::time::Duration::from_millis(actual_delay)).await;
823 continue;
824 }
825
826 return Err((error, cause));
827 }
828 }
829 }
830}
831
832async fn execute_parallel_state(
834 state_def: &Value,
835 input: &Value,
836 delivery: &Option<Arc<DeliveryBus>>,
837 dynamodb_state: &Option<SharedDynamoDbState>,
838 shared_state: &SharedStepFunctionsState,
839 execution_arn: &str,
840) -> Result<Value, (String, String)> {
841 let input_path = state_def["InputPath"].as_str();
842 let result_path = state_def["ResultPath"].as_str();
843 let output_path = state_def["OutputPath"].as_str();
844
845 let effective_input = if input_path == Some("null") {
846 json!({})
847 } else {
848 apply_input_path(input, input_path)
849 };
850
851 let branches = state_def["Branches"]
852 .as_array()
853 .cloned()
854 .unwrap_or_default();
855
856 if branches.is_empty() {
857 return Err((
858 "States.Runtime".to_string(),
859 "Parallel state has no Branches".to_string(),
860 ));
861 }
862
863 let mut handles = Vec::new();
865 for branch_def in &branches {
866 let branch = branch_def.clone();
867 let branch_input = effective_input.clone();
868 let delivery = delivery.clone();
869 let ddb = dynamodb_state.clone();
870 let state = shared_state.clone();
871 let arn = execution_arn.to_string();
872
873 handles.push(tokio::spawn(async move {
874 run_states(&branch, branch_input, &delivery, &ddb, &state, &arn).await
875 }));
876 }
877
878 let mut results = Vec::with_capacity(handles.len());
880 for handle in handles {
881 let result = handle.await.map_err(|e| {
882 (
883 "States.Runtime".to_string(),
884 format!("Branch execution panicked: {e}"),
885 )
886 })??;
887 results.push(result);
888 }
889
890 let branch_output = Value::Array(results);
891
892 let selected = if let Some(selector) = state_def.get("ResultSelector") {
894 apply_parameters(selector, &branch_output)
895 } else {
896 branch_output
897 };
898
899 let after_result = if result_path == Some("null") {
901 input.clone()
902 } else {
903 apply_result_path(input, &selected, result_path)
904 };
905
906 let output = if output_path == Some("null") {
908 json!({})
909 } else {
910 apply_output_path(&after_result, output_path)
911 };
912
913 Ok(output)
914}
915
916async fn execute_map_state(
918 state_def: &Value,
919 input: &Value,
920 delivery: &Option<Arc<DeliveryBus>>,
921 dynamodb_state: &Option<SharedDynamoDbState>,
922 shared_state: &SharedStepFunctionsState,
923 execution_arn: &str,
924) -> Result<Value, (String, String)> {
925 let input_path = state_def["InputPath"].as_str();
926 let result_path = state_def["ResultPath"].as_str();
927 let output_path = state_def["OutputPath"].as_str();
928
929 let effective_input = if input_path == Some("null") {
930 json!({})
931 } else {
932 apply_input_path(input, input_path)
933 };
934
935 let items_path = state_def["ItemsPath"].as_str().unwrap_or("$");
937 let items_value = crate::io_processing::resolve_path(&effective_input, items_path);
938 let items = items_value.as_array().cloned().unwrap_or_default();
939
940 let iterator_def = state_def
942 .get("ItemProcessor")
943 .or_else(|| state_def.get("Iterator"))
944 .cloned()
945 .ok_or_else(|| {
946 (
947 "States.Runtime".to_string(),
948 "Map state has no ItemProcessor or Iterator".to_string(),
949 )
950 })?;
951
952 let max_concurrency = state_def["MaxConcurrency"].as_u64().unwrap_or(0);
953 let effective_concurrency = if max_concurrency == 0 {
954 40
955 } else {
956 max_concurrency as usize
957 };
958
959 let semaphore = Arc::new(tokio::sync::Semaphore::new(effective_concurrency));
960
961 let mut handles = Vec::new();
963 for (index, item) in items.into_iter().enumerate() {
964 let iter_def = iterator_def.clone();
965 let delivery = delivery.clone();
966 let ddb = dynamodb_state.clone();
967 let state = shared_state.clone();
968 let arn = execution_arn.to_string();
969 let sem = semaphore.clone();
970
971 let item_input = if let Some(selector) = state_def.get("ItemSelector") {
973 let mut ctx = serde_json::Map::new();
974 ctx.insert("value".to_string(), item.clone());
975 ctx.insert("index".to_string(), json!(index));
976 apply_parameters(selector, &Value::Object(ctx))
977 } else {
978 item
979 };
980
981 add_event(
982 shared_state,
983 execution_arn,
984 "MapIterationStarted",
985 0,
986 json!({ "index": index }),
987 );
988
989 handles.push(tokio::spawn(async move {
990 let _permit = sem
991 .acquire()
992 .await
993 .map_err(|_| ("States.Runtime".to_string(), "Semaphore closed".to_string()))?;
994 let result = run_states(&iter_def, item_input, &delivery, &ddb, &state, &arn).await;
995 Ok::<(usize, Result<Value, (String, String)>), (String, String)>((index, result))
996 }));
997 }
998
999 let mut results: Vec<(usize, Value)> = Vec::with_capacity(handles.len());
1001 for handle in handles {
1002 let (index, result) = handle.await.map_err(|e| {
1003 (
1004 "States.Runtime".to_string(),
1005 format!("Map iteration panicked: {e}"),
1006 )
1007 })??;
1008
1009 match result {
1010 Ok(output) => {
1011 add_event(
1012 shared_state,
1013 execution_arn,
1014 "MapIterationSucceeded",
1015 0,
1016 json!({ "index": index }),
1017 );
1018 results.push((index, output));
1019 }
1020 Err((error, cause)) => {
1021 add_event(
1022 shared_state,
1023 execution_arn,
1024 "MapIterationFailed",
1025 0,
1026 json!({ "index": index, "error": error }),
1027 );
1028 return Err((error, cause));
1029 }
1030 }
1031 }
1032
1033 results.sort_by_key(|(i, _)| *i);
1035 let map_output = Value::Array(results.into_iter().map(|(_, v)| v).collect());
1036
1037 let selected = if let Some(selector) = state_def.get("ResultSelector") {
1039 apply_parameters(selector, &map_output)
1040 } else {
1041 map_output
1042 };
1043
1044 let after_result = if result_path == Some("null") {
1046 input.clone()
1047 } else {
1048 apply_result_path(input, &selected, result_path)
1049 };
1050
1051 let output = if output_path == Some("null") {
1053 json!({})
1054 } else {
1055 apply_output_path(&after_result, output_path)
1056 };
1057
1058 Ok(output)
1059}
1060
1061#[allow(clippy::too_many_arguments)]
1063async fn invoke_resource(
1064 resource: &str,
1065 input: &Value,
1066 delivery: &Option<Arc<DeliveryBus>>,
1067 dynamodb_state: &Option<SharedDynamoDbState>,
1068 timeout_seconds: Option<u64>,
1069 heartbeat_seconds: Option<u64>,
1070 shared_state: &SharedStepFunctionsState,
1071) -> Result<Value, (String, String)> {
1072 if resource.contains(":states:") && resource.contains(":activity:") {
1074 return invoke_activity(
1075 resource,
1076 input,
1077 shared_state,
1078 timeout_seconds,
1079 heartbeat_seconds,
1080 )
1081 .await;
1082 }
1083
1084 if resource.contains(":lambda:") && resource.contains(":function:") {
1086 return invoke_lambda_direct(resource, input, delivery, timeout_seconds).await;
1087 }
1088
1089 if resource.starts_with("arn:aws:states:::lambda:invoke") {
1091 let function_name = input["FunctionName"].as_str().unwrap_or("");
1092 let payload = if let Some(p) = input.get("Payload") {
1093 p.clone()
1094 } else {
1095 input.clone()
1096 };
1097 return invoke_lambda_direct(function_name, &payload, delivery, timeout_seconds).await;
1098 }
1099
1100 if resource.starts_with("arn:aws:states:::sqs:sendMessage") {
1101 return invoke_sqs_send_message(input, delivery);
1102 }
1103
1104 if resource.starts_with("arn:aws:states:::sns:publish") {
1105 return invoke_sns_publish(input, delivery);
1106 }
1107
1108 if resource.starts_with("arn:aws:states:::events:putEvents") {
1109 return invoke_eventbridge_put_events(input, delivery);
1110 }
1111
1112 if resource.starts_with("arn:aws:states:::dynamodb:getItem") {
1113 return invoke_dynamodb_get_item(input, dynamodb_state);
1114 }
1115
1116 if resource.starts_with("arn:aws:states:::dynamodb:putItem") {
1117 return invoke_dynamodb_put_item(input, dynamodb_state);
1118 }
1119
1120 if resource.starts_with("arn:aws:states:::dynamodb:deleteItem") {
1121 return invoke_dynamodb_delete_item(input, dynamodb_state);
1122 }
1123
1124 if resource.starts_with("arn:aws:states:::dynamodb:updateItem") {
1125 return invoke_dynamodb_update_item(input, dynamodb_state);
1126 }
1127
1128 Err((
1129 "States.TaskFailed".to_string(),
1130 format!("Unsupported resource: {resource}"),
1131 ))
1132}
1133
1134fn invoke_sqs_send_message(
1136 input: &Value,
1137 delivery: &Option<Arc<DeliveryBus>>,
1138) -> Result<Value, (String, String)> {
1139 let delivery = delivery.as_ref().ok_or_else(|| {
1140 (
1141 "States.TaskFailed".to_string(),
1142 "No delivery bus configured for SQS".to_string(),
1143 )
1144 })?;
1145
1146 let queue_url = input["QueueUrl"].as_str().ok_or_else(|| {
1147 (
1148 "States.TaskFailed".to_string(),
1149 "Missing QueueUrl in SQS sendMessage parameters".to_string(),
1150 )
1151 })?;
1152
1153 let message_body = input["MessageBody"]
1154 .as_str()
1155 .map(|s| s.to_string())
1156 .unwrap_or_else(|| {
1157 serde_json::to_string(&input["MessageBody"])
1159 .expect("serde_json::Value serialization is infallible")
1160 });
1161
1162 let queue_arn = queue_url_to_arn(queue_url);
1166
1167 delivery.send_to_sqs(&queue_arn, &message_body, &HashMap::new());
1168
1169 Ok(json!({
1170 "MessageId": uuid::Uuid::new_v4().to_string(),
1171 "MD5OfMessageBody": md5_hex(&message_body),
1172 }))
1173}
1174
1175fn invoke_sns_publish(
1177 input: &Value,
1178 delivery: &Option<Arc<DeliveryBus>>,
1179) -> Result<Value, (String, String)> {
1180 let delivery = delivery.as_ref().ok_or_else(|| {
1181 (
1182 "States.TaskFailed".to_string(),
1183 "No delivery bus configured for SNS".to_string(),
1184 )
1185 })?;
1186
1187 let topic_arn = input["TopicArn"].as_str().ok_or_else(|| {
1188 (
1189 "States.TaskFailed".to_string(),
1190 "Missing TopicArn in SNS publish parameters".to_string(),
1191 )
1192 })?;
1193
1194 let message = input["Message"]
1195 .as_str()
1196 .map(|s| s.to_string())
1197 .unwrap_or_else(|| {
1198 serde_json::to_string(&input["Message"])
1199 .expect("serde_json::Value serialization is infallible")
1200 });
1201
1202 let subject = input["Subject"].as_str();
1203
1204 delivery.publish_to_sns(topic_arn, &message, subject);
1205
1206 Ok(json!({
1207 "MessageId": uuid::Uuid::new_v4().to_string(),
1208 }))
1209}
1210
1211fn invoke_eventbridge_put_events(
1213 input: &Value,
1214 delivery: &Option<Arc<DeliveryBus>>,
1215) -> Result<Value, (String, String)> {
1216 let delivery = delivery.as_ref().ok_or_else(|| {
1217 (
1218 "States.TaskFailed".to_string(),
1219 "No delivery bus configured for EventBridge".to_string(),
1220 )
1221 })?;
1222
1223 let entries = input["Entries"]
1224 .as_array()
1225 .ok_or_else(|| {
1226 (
1227 "States.TaskFailed".to_string(),
1228 "Missing Entries in EventBridge putEvents parameters".to_string(),
1229 )
1230 })?
1231 .clone();
1232
1233 let mut event_ids = Vec::new();
1234 for entry in &entries {
1235 let source = entry["Source"].as_str().unwrap_or("aws.stepfunctions");
1236 let detail_type = entry["DetailType"].as_str().unwrap_or("StepFunctionsEvent");
1237 let detail = entry["Detail"]
1238 .as_str()
1239 .map(|s| s.to_string())
1240 .unwrap_or_else(|| {
1241 serde_json::to_string(&entry["Detail"])
1242 .expect("serde_json::Value serialization is infallible")
1243 });
1244 let bus_name = entry["EventBusName"].as_str().unwrap_or("default");
1245
1246 delivery.put_event_to_eventbridge(source, detail_type, &detail, bus_name);
1247 event_ids.push(uuid::Uuid::new_v4().to_string());
1248 }
1249
1250 Ok(json!({
1251 "Entries": event_ids.iter().map(|id| json!({"EventId": id})).collect::<Vec<_>>(),
1252 "FailedEntryCount": 0,
1253 }))
1254}
1255
1256fn invoke_dynamodb_get_item(
1258 input: &Value,
1259 dynamodb_state: &Option<SharedDynamoDbState>,
1260) -> Result<Value, (String, String)> {
1261 let ddb = dynamodb_state.as_ref().ok_or_else(|| {
1262 (
1263 "States.TaskFailed".to_string(),
1264 "No DynamoDB state configured".to_string(),
1265 )
1266 })?;
1267
1268 let table_name = input["TableName"].as_str().ok_or_else(|| {
1269 (
1270 "States.TaskFailed".to_string(),
1271 "Missing TableName in DynamoDB getItem parameters".to_string(),
1272 )
1273 })?;
1274
1275 let key = input
1276 .get("Key")
1277 .and_then(|k| k.as_object())
1278 .ok_or_else(|| {
1279 (
1280 "States.TaskFailed".to_string(),
1281 "Missing Key in DynamoDB getItem parameters".to_string(),
1282 )
1283 })?;
1284
1285 let key_map: HashMap<String, Value> = key.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1286
1287 let __mas = ddb.read();
1288 let state = __mas.default_ref();
1289 let table = state.tables.get(table_name).ok_or_else(|| {
1290 (
1291 "States.TaskFailed".to_string(),
1292 format!("Table '{table_name}' not found"),
1293 )
1294 })?;
1295
1296 let item = table
1297 .find_item_index(&key_map)
1298 .map(|idx| table.items[idx].clone());
1299
1300 match item {
1301 Some(item_map) => {
1302 let item_value: serde_json::Map<String, Value> = item_map.into_iter().collect();
1303 Ok(json!({ "Item": item_value }))
1304 }
1305 None => Ok(json!({})),
1306 }
1307}
1308
1309fn invoke_dynamodb_put_item(
1311 input: &Value,
1312 dynamodb_state: &Option<SharedDynamoDbState>,
1313) -> Result<Value, (String, String)> {
1314 let ddb = dynamodb_state.as_ref().ok_or_else(|| {
1315 (
1316 "States.TaskFailed".to_string(),
1317 "No DynamoDB state configured".to_string(),
1318 )
1319 })?;
1320
1321 let table_name = input["TableName"].as_str().ok_or_else(|| {
1322 (
1323 "States.TaskFailed".to_string(),
1324 "Missing TableName in DynamoDB putItem parameters".to_string(),
1325 )
1326 })?;
1327
1328 let item = input
1329 .get("Item")
1330 .and_then(|i| i.as_object())
1331 .ok_or_else(|| {
1332 (
1333 "States.TaskFailed".to_string(),
1334 "Missing Item in DynamoDB putItem parameters".to_string(),
1335 )
1336 })?;
1337
1338 let item_map: HashMap<String, Value> =
1339 item.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1340
1341 let mut __mas = ddb.write();
1342 let state = __mas.default_mut();
1343 let table = state.tables.get_mut(table_name).ok_or_else(|| {
1344 (
1345 "States.TaskFailed".to_string(),
1346 format!("Table '{table_name}' not found"),
1347 )
1348 })?;
1349
1350 if let Some(idx) = table.find_item_index(&item_map) {
1352 table.items[idx] = item_map;
1353 } else {
1354 table.items.push(item_map);
1355 }
1356
1357 Ok(json!({}))
1358}
1359
1360fn invoke_dynamodb_delete_item(
1362 input: &Value,
1363 dynamodb_state: &Option<SharedDynamoDbState>,
1364) -> Result<Value, (String, String)> {
1365 let ddb = dynamodb_state.as_ref().ok_or_else(|| {
1366 (
1367 "States.TaskFailed".to_string(),
1368 "No DynamoDB state configured".to_string(),
1369 )
1370 })?;
1371
1372 let table_name = input["TableName"].as_str().ok_or_else(|| {
1373 (
1374 "States.TaskFailed".to_string(),
1375 "Missing TableName in DynamoDB deleteItem parameters".to_string(),
1376 )
1377 })?;
1378
1379 let key = input
1380 .get("Key")
1381 .and_then(|k| k.as_object())
1382 .ok_or_else(|| {
1383 (
1384 "States.TaskFailed".to_string(),
1385 "Missing Key in DynamoDB deleteItem parameters".to_string(),
1386 )
1387 })?;
1388
1389 let key_map: HashMap<String, Value> = key.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1390
1391 let mut __mas = ddb.write();
1392 let state = __mas.default_mut();
1393 let table = state.tables.get_mut(table_name).ok_or_else(|| {
1394 (
1395 "States.TaskFailed".to_string(),
1396 format!("Table '{table_name}' not found"),
1397 )
1398 })?;
1399
1400 if let Some(idx) = table.find_item_index(&key_map) {
1401 table.items.remove(idx);
1402 }
1403
1404 Ok(json!({}))
1405}
1406
1407fn invoke_dynamodb_update_item(
1412 input: &Value,
1413 dynamodb_state: &Option<SharedDynamoDbState>,
1414) -> Result<Value, (String, String)> {
1415 let ddb = dynamodb_state.as_ref().ok_or_else(|| {
1416 (
1417 "States.TaskFailed".to_string(),
1418 "No DynamoDB state configured".to_string(),
1419 )
1420 })?;
1421
1422 let table_name = input["TableName"].as_str().ok_or_else(|| {
1423 (
1424 "States.TaskFailed".to_string(),
1425 "Missing TableName in DynamoDB updateItem parameters".to_string(),
1426 )
1427 })?;
1428
1429 let key = input
1430 .get("Key")
1431 .and_then(|k| k.as_object())
1432 .ok_or_else(|| {
1433 (
1434 "States.TaskFailed".to_string(),
1435 "Missing Key in DynamoDB updateItem parameters".to_string(),
1436 )
1437 })?;
1438
1439 let key_map: HashMap<String, Value> = key.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1440
1441 let mut __mas = ddb.write();
1442 let state = __mas.default_mut();
1443 let table = state.tables.get_mut(table_name).ok_or_else(|| {
1444 (
1445 "States.TaskFailed".to_string(),
1446 format!("Table '{table_name}' not found"),
1447 )
1448 })?;
1449
1450 if let Some(update_expr) = input["UpdateExpression"].as_str() {
1452 let attr_values = input
1453 .get("ExpressionAttributeValues")
1454 .and_then(|v| v.as_object())
1455 .cloned()
1456 .unwrap_or_default();
1457 let attr_names = input
1458 .get("ExpressionAttributeNames")
1459 .and_then(|v| v.as_object())
1460 .cloned()
1461 .unwrap_or_default();
1462
1463 if let Some(idx) = table.find_item_index(&key_map) {
1464 apply_update_expression(
1465 &mut table.items[idx],
1466 update_expr,
1467 &attr_values,
1468 &attr_names,
1469 );
1470 } else {
1471 let mut new_item = key_map;
1473 apply_update_expression(&mut new_item, update_expr, &attr_values, &attr_names);
1474 table.items.push(new_item);
1475 }
1476 }
1477
1478 Ok(json!({}))
1479}
1480
1481fn apply_update_expression(
1483 item: &mut HashMap<String, Value>,
1484 expr: &str,
1485 attr_values: &serde_json::Map<String, Value>,
1486 attr_names: &serde_json::Map<String, Value>,
1487) {
1488 let clauses = split_update_clauses(expr);
1493 for (clause, body) in clauses {
1494 match clause {
1495 UpdateClause::Set => apply_set(item, &body, attr_values, attr_names),
1496 UpdateClause::Remove => apply_remove(item, &body, attr_names),
1497 UpdateClause::Add => apply_add(item, &body, attr_values, attr_names),
1498 UpdateClause::Delete => apply_delete(item, &body, attr_values, attr_names),
1499 }
1500 }
1501}
1502
/// The four section keywords an update expression can contain.
#[derive(Clone, Copy)]
enum UpdateClause {
    /// `SET` section: attribute assignments.
    Set,
    /// `REMOVE` section: attributes to drop from the item.
    Remove,
    /// `ADD` section: values added to existing attributes.
    Add,
    /// `DELETE` section: elements removed from set attributes.
    Delete,
}
1510
1511fn split_update_clauses(expr: &str) -> Vec<(UpdateClause, String)> {
1512 let mut out = Vec::new();
1513 let mut current: Option<UpdateClause> = None;
1514 let mut buf = String::new();
1515 for token in expr.split_whitespace() {
1516 let upper = token.to_ascii_uppercase();
1517 let next_clause = match upper.as_str() {
1518 "SET" => Some(UpdateClause::Set),
1519 "REMOVE" => Some(UpdateClause::Remove),
1520 "ADD" => Some(UpdateClause::Add),
1521 "DELETE" => Some(UpdateClause::Delete),
1522 _ => None,
1523 };
1524 if let Some(nc) = next_clause {
1525 if let Some(prev) = current.take() {
1526 out.push((prev, buf.trim().to_string()));
1527 buf.clear();
1528 }
1529 current = Some(nc);
1530 } else if current.is_some() {
1531 if !buf.is_empty() {
1532 buf.push(' ');
1533 }
1534 buf.push_str(token);
1535 }
1536 }
1537 if let Some(c) = current {
1538 out.push((c, buf.trim().to_string()));
1539 }
1540 out
1541}
1542
1543fn resolve_attr_name(token: &str, attr_names: &serde_json::Map<String, Value>) -> String {
1544 if token.starts_with('#') {
1545 attr_names
1546 .get(token)
1547 .and_then(|v| v.as_str())
1548 .unwrap_or(token)
1549 .to_string()
1550 } else {
1551 token.to_string()
1552 }
1553}
1554
1555fn apply_set(
1556 item: &mut HashMap<String, Value>,
1557 body: &str,
1558 attr_values: &serde_json::Map<String, Value>,
1559 attr_names: &serde_json::Map<String, Value>,
1560) {
1561 for assignment in split_top_commas(body) {
1562 let Some((lhs, rhs)) = assignment.split_once('=') else {
1563 continue;
1564 };
1565 let attr_name = resolve_attr_name(lhs.trim(), attr_names);
1566 let value = evaluate_set_rhs(rhs.trim(), &attr_name, item, attr_values, attr_names);
1567 if let Some(v) = value {
1568 item.insert(attr_name, v);
1569 }
1570 }
1571}
1572
1573fn evaluate_set_rhs(
1574 rhs: &str,
1575 attr_name: &str,
1576 item: &HashMap<String, Value>,
1577 attr_values: &serde_json::Map<String, Value>,
1578 attr_names: &serde_json::Map<String, Value>,
1579) -> Option<Value> {
1580 if let Some(args) = rhs
1582 .strip_prefix("if_not_exists(")
1583 .and_then(|s| s.strip_suffix(')'))
1584 {
1585 let parts: Vec<&str> = args.splitn(2, ',').collect();
1586 if parts.len() == 2 {
1587 let path = resolve_attr_name(parts[0].trim(), attr_names);
1588 if item.contains_key(&path) {
1589 return item.get(&path).cloned();
1590 }
1591 return resolve_value(parts[1].trim(), attr_values);
1592 }
1593 return None;
1594 }
1595 for op in ['+', '-'] {
1597 if let Some((left, right)) = split_top_op(rhs, op) {
1598 let left = left.trim();
1599 let right = right.trim();
1600 let left_val = if left.starts_with(':') {
1601 resolve_value(left, attr_values)
1602 } else {
1603 let name = resolve_attr_name(left, attr_names);
1604 item.get(&name).cloned()
1605 };
1606 let right_val = if right.starts_with(':') {
1607 resolve_value(right, attr_values)
1608 } else {
1609 let name = resolve_attr_name(right, attr_names);
1610 item.get(&name).cloned()
1611 };
1612 return arithmetic(left_val.as_ref(), op, right_val.as_ref());
1613 }
1614 }
1615 if rhs.starts_with(':') {
1617 return resolve_value(rhs, attr_values);
1618 }
1619 if rhs.starts_with('#') {
1620 let _ = attr_name;
1621 let name = resolve_attr_name(rhs, attr_names);
1622 return item.get(&name).cloned();
1623 }
1624 None
1625}
1626
1627fn arithmetic(left: Option<&Value>, op: char, right: Option<&Value>) -> Option<Value> {
1628 let lf = number_from_dynamo(left?)?;
1629 let rf = number_from_dynamo(right?)?;
1630 let out = match op {
1631 '+' => lf + rf,
1632 '-' => lf - rf,
1633 _ => return None,
1634 };
1635 Some(json!({ "N": format_number(out) }))
1636}
1637
1638fn number_from_dynamo(v: &Value) -> Option<f64> {
1639 v.get("N")?.as_str()?.parse().ok()
1640}
1641
/// Renders an `f64` as a number string.
///
/// Finite whole numbers inside the `i64` range are printed without a
/// fractional part (`3.0` becomes `"3"`); everything else uses the default
/// `f64` display form.
fn format_number(n: f64) -> String {
    let is_whole = n.is_finite() && n.fract() == 0.0;
    let fits_i64 = n >= i64::MIN as f64 && n < i64::MAX as f64;

    if is_whole && fits_i64 {
        // The cast is exact: the value is whole and inside the i64 range.
        (n as i64).to_string()
    } else {
        n.to_string()
    }
}
1652
1653fn resolve_value(token: &str, attr_values: &serde_json::Map<String, Value>) -> Option<Value> {
1654 attr_values.get(token).cloned()
1655}
1656
1657fn apply_remove(
1658 item: &mut HashMap<String, Value>,
1659 body: &str,
1660 attr_names: &serde_json::Map<String, Value>,
1661) {
1662 for path in split_top_commas(body) {
1663 let name = resolve_attr_name(path.trim(), attr_names);
1664 item.remove(&name);
1665 }
1666}
1667
1668fn apply_add(
1669 item: &mut HashMap<String, Value>,
1670 body: &str,
1671 attr_values: &serde_json::Map<String, Value>,
1672 attr_names: &serde_json::Map<String, Value>,
1673) {
1674 for clause in split_top_commas(body) {
1678 let mut parts = clause.split_whitespace();
1679 let Some(path) = parts.next() else { continue };
1680 let Some(value_ref) = parts.next() else {
1681 continue;
1682 };
1683 let attr_name = resolve_attr_name(path, attr_names);
1684 let Some(delta) = resolve_value(value_ref, attr_values) else {
1685 continue;
1686 };
1687 let current = item.get(&attr_name).cloned();
1688 let next = match (current.as_ref(), &delta) {
1689 (None, _) => delta.clone(),
1690 (Some(cur), _) => arithmetic(Some(cur), '+', Some(&delta)).unwrap_or(delta.clone()),
1691 };
1692 item.insert(attr_name, next);
1693 }
1694}
1695
1696fn apply_delete(
1697 item: &mut HashMap<String, Value>,
1698 body: &str,
1699 attr_values: &serde_json::Map<String, Value>,
1700 attr_names: &serde_json::Map<String, Value>,
1701) {
1702 for clause in split_top_commas(body) {
1705 let mut parts = clause.split_whitespace();
1706 let Some(path) = parts.next() else { continue };
1707 let Some(value_ref) = parts.next() else {
1708 continue;
1709 };
1710 let attr_name = resolve_attr_name(path, attr_names);
1711 let Some(elements) = resolve_value(value_ref, attr_values) else {
1712 continue;
1713 };
1714 let Some(current) = item.get_mut(&attr_name) else {
1715 continue;
1716 };
1717 for set_kind in ["SS", "NS", "BS"] {
1718 if let (Some(cur_arr), Some(rem_arr)) = (
1719 current.get_mut(set_kind).and_then(|v| v.as_array_mut()),
1720 elements.get(set_kind).and_then(|v| v.as_array()),
1721 ) {
1722 let to_remove: std::collections::HashSet<String> = rem_arr
1723 .iter()
1724 .filter_map(|v| v.as_str().map(String::from))
1725 .collect();
1726 cur_arr.retain(|v| !v.as_str().is_some_and(|s| to_remove.contains(s)));
1727 if cur_arr.is_empty() {
1728 item.remove(&attr_name);
1729 }
1730 break;
1731 }
1732 }
1733 }
1734}
1735
/// Splits `s` on commas that are not nested inside parentheses.
///
/// Every piece is trimmed. Empty pieces between two commas are kept, but an
/// empty (or whitespace-only) trailing piece is dropped.
fn split_top_commas(s: &str) -> Vec<String> {
    let mut pieces = Vec::new();
    let mut nesting = 0i32;
    let mut piece_start = 0usize;

    for (pos, ch) in s.char_indices() {
        if ch == '(' {
            nesting += 1;
        } else if ch == ')' {
            nesting -= 1;
        } else if ch == ',' && nesting == 0 {
            pieces.push(s[piece_start..pos].trim().to_string());
            // A comma is one byte wide, so the next piece starts right after it.
            piece_start = pos + 1;
        }
    }

    let tail = s[piece_start..].trim();
    if !tail.is_empty() {
        pieces.push(tail.to_string());
    }
    pieces
}
1763
/// Splits `s` around the first occurrence of `op` that sits outside any
/// parentheses and is not the very first character (so a leading sign is not
/// treated as a binary operator).
///
/// Returns the text before and after the operator, untrimmed, or `None` when
/// no such operator exists.
fn split_top_op(s: &str, op: char) -> Option<(&str, &str)> {
    let mut nesting = 0i32;
    let at = s.char_indices().find_map(|(idx, ch)| {
        if ch == '(' {
            nesting += 1;
            None
        } else if ch == ')' {
            nesting -= 1;
            None
        } else if ch == op && nesting == 0 && idx > 0 {
            Some(idx)
        } else {
            None
        }
    })?;

    Some((&s[..at], &s[at + op.len_utf8()..]))
}
1778
1779fn queue_url_to_arn(url: &str) -> String {
1782 let parts: Vec<&str> = url.rsplitn(3, '/').collect();
1783 if parts.len() >= 2 {
1784 let queue_name = parts[0];
1785 let account_id = parts[1];
1786 Arn::new("sqs", "us-east-1", account_id, queue_name).to_string()
1787 } else {
1788 url.to_string()
1789 }
1790}
1791
1792fn md5_hex(data: &str) -> String {
1794 use md5::Digest;
1795 let result = md5::Md5::digest(data.as_bytes());
1796 format!("{result:032x}")
1797}
1798
1799async fn invoke_lambda_direct(
1801 function_arn: &str,
1802 input: &Value,
1803 delivery: &Option<Arc<DeliveryBus>>,
1804 timeout_seconds: Option<u64>,
1805) -> Result<Value, (String, String)> {
1806 let delivery = delivery.as_ref().ok_or_else(|| {
1807 (
1808 "States.TaskFailed".to_string(),
1809 "No delivery bus configured for Lambda invocation".to_string(),
1810 )
1811 })?;
1812
1813 let payload =
1814 serde_json::to_string(input).expect("serde_json::Value serialization is infallible");
1815
1816 let invoke_future = delivery.invoke_lambda(function_arn, &payload);
1817
1818 let result = if let Some(timeout) = timeout_seconds {
1819 match tokio::time::timeout(tokio::time::Duration::from_secs(timeout), invoke_future).await {
1820 Ok(r) => r,
1821 Err(_) => {
1822 return Err((
1823 "States.Timeout".to_string(),
1824 format!("Task timed out after {timeout} seconds"),
1825 ));
1826 }
1827 }
1828 } else {
1829 invoke_future.await
1830 };
1831
1832 match result {
1833 Some(Ok(bytes)) => {
1834 let response_str = String::from_utf8_lossy(&bytes);
1835 let value: Value =
1836 serde_json::from_str(&response_str).unwrap_or(json!(response_str.to_string()));
1837 Ok(value)
1838 }
1839 Some(Err(e)) => Err(("States.TaskFailed".to_string(), e)),
1840 None => {
1841 Ok(json!({}))
1843 }
1844 }
1845}
1846
/// Runs an Activity task by publishing a task token and polling for its result.
///
/// A `PENDING` [`TaskTokenState`](crate::state::TaskTokenState) carrying the
/// serialised `input` is stored under a freshly generated token in the account
/// taken from `activity_arn`. The function then polls that entry every 200 ms
/// until it reaches `SUCCEEDED` or `FAILED`, a heartbeat window is missed, or
/// the overall deadline passes. The token entry is removed before returning in
/// every one of those cases.
///
/// NOTE(review): the status transitions (`IN_PROGRESS`, `SUCCEEDED`, `FAILED`)
/// are not performed here — presumably by the activity-worker API handlers
/// elsewhere; confirm against those handlers.
///
/// # Errors
///
/// * `States.TaskFailed` when the activity is not registered, or when the token
///   entry vanishes while polling.
/// * The worker-reported error/cause when the entry is `FAILED` (defaulting to
///   `States.TaskFailed` with an empty cause).
/// * `States.HeartbeatTimeout` when the entry is `IN_PROGRESS` and more than
///   `heartbeat_seconds` have passed since the last heartbeat (or creation).
/// * `States.Timeout` once `timeout_seconds` (default 3600) have elapsed.
async fn invoke_activity(
    activity_arn: &str,
    input: &Value,
    shared_state: &SharedStepFunctionsState,
    timeout_seconds: Option<u64>,
    heartbeat_seconds: Option<u64>,
) -> Result<Value, (String, String)> {
    use crate::state::TaskTokenState;

    // The account id is the fifth `:`-separated ARN field; empty if the ARN is malformed.
    let activity_account = activity_arn.split(':').nth(4).unwrap_or("").to_string();
    // Fail fast when the activity is not registered. The read lock is scoped so
    // it is released before anything else happens.
    {
        let accounts = shared_state.read();
        let exists = accounts
            .get(&activity_account)
            .map(|s| s.activities.contains_key(activity_arn))
            .unwrap_or(false);
        if !exists {
            return Err((
                "States.TaskFailed".to_string(),
                format!("Activity does not exist: {activity_arn}"),
            ));
        }
    }

    // Token = timestamp in nanoseconds plus a random UUID.
    let token = format!(
        "FCToken-{}-{}",
        chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0),
        uuid::Uuid::new_v4().simple(),
    );
    let now = chrono::Utc::now();
    let input_str =
        serde_json::to_string(input).expect("serde_json::Value serialization is infallible");
    // Publish the pending task under a short-lived write lock.
    {
        let mut accounts = shared_state.write();
        let state = accounts.get_or_create(&activity_account);
        state.task_tokens.insert(
            token.clone(),
            TaskTokenState {
                activity_arn: activity_arn.to_string(),
                status: "PENDING".to_string(),
                output: None,
                error: None,
                cause: None,
                input: Some(input_str),
                created_at: now,
                last_heartbeat_at: None,
                heartbeat_seconds: heartbeat_seconds.map(|s| s as i64),
                timeout_seconds: timeout_seconds.map(|s| s as i64),
            },
        );
    }

    // Overall deadline, measured on the monotonic clock; one hour when no timeout is given.
    let absolute_deadline =
        std::time::Instant::now() + std::time::Duration::from_secs(timeout_seconds.unwrap_or(3600));
    loop {
        let now_ts = chrono::Utc::now();
        // Clone the entry so the read lock is dropped before any `.await`.
        let snapshot = {
            let accounts = shared_state.read();
            accounts
                .get(&activity_account)
                .and_then(|s| s.task_tokens.get(&token).cloned())
        };
        let Some(entry) = snapshot else {
            return Err((
                "States.TaskFailed".to_string(),
                "Activity task token disappeared".to_string(),
            ));
        };
        match entry.status.as_str() {
            "SUCCEEDED" => {
                cleanup_token(shared_state, &activity_account, &token);
                let output = entry.output.unwrap_or_else(|| "{}".to_string());
                // Output that is not valid JSON is passed through as a JSON string.
                let value: Value = serde_json::from_str(&output).unwrap_or(Value::String(output));
                return Ok(value);
            }
            "FAILED" => {
                cleanup_token(shared_state, &activity_account, &token);
                return Err((
                    entry
                        .error
                        .unwrap_or_else(|| "States.TaskFailed".to_string()),
                    entry.cause.unwrap_or_default(),
                ));
            }
            _ => {}
        }
        // Heartbeats are only enforced while the entry is IN_PROGRESS.
        if entry.status == "IN_PROGRESS" {
            if let Some(hb) = entry.heartbeat_seconds {
                let last = entry.last_heartbeat_at.unwrap_or(entry.created_at);
                if (now_ts - last).num_seconds() > hb {
                    cleanup_token(shared_state, &activity_account, &token);
                    return Err((
                        "States.HeartbeatTimeout".to_string(),
                        format!("Activity worker missed heartbeat ({hb}s window)"),
                    ));
                }
            }
        }
        if std::time::Instant::now() >= absolute_deadline {
            cleanup_token(shared_state, &activity_account, &token);
            let secs = timeout_seconds.unwrap_or(3600);
            return Err((
                "States.Timeout".to_string(),
                format!("Activity timed out after {secs} seconds"),
            ));
        }
        // Poll interval.
        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
    }
}
1966
1967fn cleanup_token(shared_state: &SharedStepFunctionsState, account_id: &str, token: &str) {
1968 let mut accounts = shared_state.write();
1969 if let Some(state) = accounts.get_mut(account_id) {
1970 state.task_tokens.remove(token);
1971 }
1972}
1973
1974fn apply_parameters(template: &Value, input: &Value) -> Value {
1976 match template {
1977 Value::Object(map) => {
1978 let mut result = serde_json::Map::new();
1979 for (key, value) in map {
1980 if let Some(stripped) = key.strip_suffix(".$") {
1981 if let Some(path) = value.as_str() {
1982 result.insert(
1983 stripped.to_string(),
1984 crate::io_processing::resolve_path(input, path),
1985 );
1986 }
1987 } else {
1988 result.insert(key.clone(), apply_parameters(value, input));
1989 }
1990 }
1991 Value::Object(result)
1992 }
1993 Value::Array(arr) => Value::Array(arr.iter().map(|v| apply_parameters(v, input)).collect()),
1994 other => other.clone(),
1995 }
1996}
1997
/// Where execution goes after a state finishes, as decided by [`next_state`].
enum NextState {
    /// Continue with the state of this name (taken from the `Next` field).
    Name(String),
    /// The state is terminal (`End` is `true`).
    End,
    /// The state definition is invalid; the payload is a human-readable reason.
    Error(String),
}
2003
2004fn next_state(state_def: &Value) -> NextState {
2005 if state_def["End"].as_bool() == Some(true) {
2006 return NextState::End;
2007 }
2008 match state_def["Next"].as_str() {
2009 Some(next) => NextState::Name(next.to_string()),
2010 None => NextState::Error("State has neither 'End' nor 'Next' field".to_string()),
2011 }
2012}
2013
2014fn apply_state_catcher(
2019 state_def: &Value,
2020 effective_input: &Value,
2021 error: &str,
2022 cause: &str,
2023) -> Option<(String, Value)> {
2024 let catchers = state_def["Catch"].as_array().cloned().unwrap_or_default();
2025 let (next, result_path) = find_catcher(&catchers, error)?;
2026 let error_output = json!({
2027 "Error": error,
2028 "Cause": cause,
2029 });
2030 let new_input = apply_result_path(effective_input, &error_output, result_path.as_deref());
2031 Some((next, new_input))
2032}
2033
/// Returns the account-id field (the fifth `:`-separated segment) of an ARN,
/// or `"000000000000"` when the string has fewer than five segments.
fn account_id_from_arn(arn: &str) -> &str {
    match arn.split(':').nth(4) {
        Some(account) => account,
        None => "000000000000",
    }
}
2038
2039fn add_event(
2040 state: &SharedStepFunctionsState,
2041 execution_arn: &str,
2042 event_type: &str,
2043 previous_event_id: i64,
2044 details: Value,
2045) -> i64 {
2046 let account_id = account_id_from_arn(execution_arn).to_string();
2047 let mut accounts = state.write();
2048 let s = accounts.get_or_create(&account_id);
2049 if let Some(exec) = s.executions.get_mut(execution_arn) {
2050 let id = exec.history_events.len() as i64 + 1;
2051 exec.history_events.push(HistoryEvent {
2052 id,
2053 event_type: event_type.to_string(),
2054 timestamp: Utc::now(),
2055 previous_event_id,
2056 details,
2057 });
2058 id
2059 } else {
2060 0
2061 }
2062}
2063
2064fn succeed_execution(state: &SharedStepFunctionsState, execution_arn: &str, output: &Value) {
2065 let account_id = account_id_from_arn(execution_arn).to_string();
2066 {
2068 let accounts = state.read();
2069 if let Some(s) = accounts.get(&account_id) {
2070 if let Some(exec) = s.executions.get(execution_arn) {
2071 if exec.status != ExecutionStatus::Running {
2072 return;
2073 }
2074 }
2075 }
2076 }
2077
2078 let output_str =
2079 serde_json::to_string(output).expect("serde_json::Value serialization is infallible");
2080
2081 add_event(
2082 state,
2083 execution_arn,
2084 "ExecutionSucceeded",
2085 0,
2086 json!({ "output": output_str }),
2087 );
2088
2089 let mut accounts = state.write();
2090 let s = accounts.get_or_create(&account_id);
2091 if let Some(exec) = s.executions.get_mut(execution_arn) {
2092 exec.status = ExecutionStatus::Succeeded;
2093 exec.output = Some(output_str);
2094 exec.stop_date = Some(Utc::now());
2095 }
2096}
2097
2098fn fail_execution(state: &SharedStepFunctionsState, execution_arn: &str, error: &str, cause: &str) {
2099 let account_id = account_id_from_arn(execution_arn).to_string();
2100 {
2102 let accounts = state.read();
2103 if let Some(s) = accounts.get(&account_id) {
2104 if let Some(exec) = s.executions.get(execution_arn) {
2105 if exec.status != ExecutionStatus::Running {
2106 return;
2107 }
2108 }
2109 }
2110 }
2111
2112 add_event(
2113 state,
2114 execution_arn,
2115 "ExecutionFailed",
2116 0,
2117 json!({ "error": error, "cause": cause }),
2118 );
2119
2120 let mut accounts = state.write();
2121 let s = accounts.get_or_create(&account_id);
2122 if let Some(exec) = s.executions.get_mut(execution_arn) {
2123 exec.status = ExecutionStatus::Failed;
2124 exec.error = Some(error.to_string());
2125 exec.cause = Some(cause.to_string());
2126 exec.stop_date = Some(Utc::now());
2127 }
2128}
2129
2130#[cfg(test)]
2131mod tests {
2132 use super::*;
2133 use crate::state::Execution;
2134 use parking_lot::RwLock;
2135 use std::sync::Arc;
2136
2137 fn make_state() -> SharedStepFunctionsState {
2138 Arc::new(RwLock::new(
2139 fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
2140 ))
2141 }
2142
2143 fn create_execution(state: &SharedStepFunctionsState, arn: &str, input: Option<String>) {
2144 let mut accounts = state.write();
2145 let s = accounts.get_or_create("123456789012");
2146 s.executions.insert(
2147 arn.to_string(),
2148 Execution {
2149 execution_arn: arn.to_string(),
2150 state_machine_arn: "arn:aws:states:us-east-1:123456789012:stateMachine:test"
2151 .to_string(),
2152 state_machine_name: "test".to_string(),
2153 name: "exec-1".to_string(),
2154 status: ExecutionStatus::Running,
2155 input,
2156 output: None,
2157 start_date: Utc::now(),
2158 stop_date: None,
2159 error: None,
2160 cause: None,
2161 history_events: vec![],
2162 },
2163 );
2164 }
2165
2166 #[tokio::test]
2167 async fn test_simple_pass_state() {
2168 let state = make_state();
2169 let arn = "arn:aws:states:us-east-1:123456789012:execution:test:exec-1";
2170 create_execution(&state, arn, Some(r#"{"hello":"world"}"#.to_string()));
2171
2172 let definition = json!({
2173 "StartAt": "PassState",
2174 "States": {
2175 "PassState": {
2176 "Type": "Pass",
2177 "Result": {"processed": true},
2178 "End": true
2179 }
2180 }
2181 })
2182 .to_string();
2183
2184 execute_state_machine(
2185 state.clone(),
2186 arn.to_string(),
2187 definition,
2188 Some(r#"{"hello":"world"}"#.to_string()),
2189 None,
2190 None,
2191 )
2192 .await;
2193
2194 let __a = state.read();
2195 let s = __a.default_ref();
2196 let exec = s.executions.get(arn).unwrap();
2197 assert_eq!(exec.status, ExecutionStatus::Succeeded);
2198 assert!(exec.output.is_some());
2199 let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
2200 assert_eq!(output, json!({"processed": true}));
2201 }
2202
2203 #[tokio::test]
2204 async fn test_pass_chain() {
2205 let state = make_state();
2206 let arn = "arn:aws:states:us-east-1:123456789012:execution:test:exec-1";
2207 create_execution(&state, arn, Some(r#"{}"#.to_string()));
2208
2209 let definition = json!({
2210 "StartAt": "First",
2211 "States": {
2212 "First": {
2213 "Type": "Pass",
2214 "Result": "step1",
2215 "ResultPath": "$.first",
2216 "Next": "Second"
2217 },
2218 "Second": {
2219 "Type": "Pass",
2220 "Result": "step2",
2221 "ResultPath": "$.second",
2222 "End": true
2223 }
2224 }
2225 })
2226 .to_string();
2227
2228 execute_state_machine(
2229 state.clone(),
2230 arn.to_string(),
2231 definition,
2232 Some("{}".to_string()),
2233 None,
2234 None,
2235 )
2236 .await;
2237
2238 let __a = state.read();
2239 let s = __a.default_ref();
2240 let exec = s.executions.get(arn).unwrap();
2241 assert_eq!(exec.status, ExecutionStatus::Succeeded);
2242 let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
2243 assert_eq!(output["first"], json!("step1"));
2244 assert_eq!(output["second"], json!("step2"));
2245 }
2246
2247 #[tokio::test]
2248 async fn test_succeed_state() {
2249 let state = make_state();
2250 let arn = "arn:aws:states:us-east-1:123456789012:execution:test:exec-1";
2251 create_execution(&state, arn, Some(r#"{"data": "value"}"#.to_string()));
2252
2253 let definition = json!({
2254 "StartAt": "Done",
2255 "States": {
2256 "Done": {
2257 "Type": "Succeed"
2258 }
2259 }
2260 })
2261 .to_string();
2262
2263 execute_state_machine(
2264 state.clone(),
2265 arn.to_string(),
2266 definition,
2267 Some(r#"{"data": "value"}"#.to_string()),
2268 None,
2269 None,
2270 )
2271 .await;
2272
2273 let __a = state.read();
2274 let s = __a.default_ref();
2275 let exec = s.executions.get(arn).unwrap();
2276 assert_eq!(exec.status, ExecutionStatus::Succeeded);
2277 }
2278
2279 #[tokio::test]
2280 async fn test_fail_state() {
2281 let state = make_state();
2282 let arn = "arn:aws:states:us-east-1:123456789012:execution:test:exec-1";
2283 create_execution(&state, arn, None);
2284
2285 let definition = json!({
2286 "StartAt": "FailState",
2287 "States": {
2288 "FailState": {
2289 "Type": "Fail",
2290 "Error": "CustomError",
2291 "Cause": "Something went wrong"
2292 }
2293 }
2294 })
2295 .to_string();
2296
2297 execute_state_machine(state.clone(), arn.to_string(), definition, None, None, None).await;
2298
2299 let __a = state.read();
2300 let s = __a.default_ref();
2301 let exec = s.executions.get(arn).unwrap();
2302 assert_eq!(exec.status, ExecutionStatus::Failed);
2303 assert_eq!(exec.error.as_deref(), Some("CustomError"));
2304 assert_eq!(exec.cause.as_deref(), Some("Something went wrong"));
2305 }
2306
2307 #[tokio::test]
2308 async fn test_history_events_recorded() {
2309 let state = make_state();
2310 let arn = "arn:aws:states:us-east-1:123456789012:execution:test:exec-1";
2311 create_execution(&state, arn, Some("{}".to_string()));
2312
2313 let definition = json!({
2314 "StartAt": "PassState",
2315 "States": {
2316 "PassState": {
2317 "Type": "Pass",
2318 "End": true
2319 }
2320 }
2321 })
2322 .to_string();
2323
2324 execute_state_machine(
2325 state.clone(),
2326 arn.to_string(),
2327 definition,
2328 Some("{}".to_string()),
2329 None,
2330 None,
2331 )
2332 .await;
2333
2334 let __a = state.read();
2335 let s = __a.default_ref();
2336 let exec = s.executions.get(arn).unwrap();
2337 let event_types: Vec<&str> = exec
2338 .history_events
2339 .iter()
2340 .map(|e| e.event_type.as_str())
2341 .collect();
2342 assert_eq!(
2343 event_types,
2344 vec![
2345 "ExecutionStarted",
2346 "PassStateEntered",
2347 "PassStateExited",
2348 "ExecutionSucceeded"
2349 ]
2350 );
2351 }
2352
2353 fn drive(state: &SharedStepFunctionsState, arn: &str, def: Value, input: Option<&str>) {
2354 create_execution(state, arn, input.map(|s| s.to_string()));
2355 let fut = execute_state_machine(
2356 state.clone(),
2357 arn.to_string(),
2358 def.to_string(),
2359 input.map(|s| s.to_string()),
2360 None,
2361 None,
2362 );
2363 let rt = tokio::runtime::Builder::new_current_thread()
2364 .enable_time()
2365 .build()
2366 .unwrap();
2367 rt.block_on(fut);
2368 }
2369
2370 fn read_exec<R>(
2371 state: &SharedStepFunctionsState,
2372 arn: &str,
2373 f: impl FnOnce(&Execution) -> R,
2374 ) -> R {
2375 let __a = state.read();
2376 let s = __a.default_ref();
2377 f(s.executions.get(arn).expect("execution missing"))
2378 }
2379
2380 fn arn_for(name: &str) -> String {
2381 format!("arn:aws:states:us-east-1:123456789012:execution:test:{name}")
2382 }
2383
2384 #[test]
2387 fn pass_state_input_output_path_select_fields() {
2388 let state = make_state();
2389 let arn = arn_for("pass-paths");
2390 let def = json!({
2391 "StartAt": "P",
2392 "States": {
2393 "P": {
2394 "Type": "Pass",
2395 "InputPath": "$.inner",
2396 "OutputPath": "$.kept",
2397 "End": true
2398 }
2399 }
2400 });
2401 drive(
2402 &state,
2403 &arn,
2404 def,
2405 Some(r#"{"inner":{"kept":"yes","dropped":true},"sibling":1}"#),
2406 );
2407
2408 read_exec(&state, &arn, |exec| {
2409 assert_eq!(exec.status, ExecutionStatus::Succeeded);
2410 let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
2411 assert_eq!(output, json!("yes"));
2412 });
2413 }
2414
2415 #[test]
2418 fn succeed_state_honors_input_path_null() {
2419 let state = make_state();
2420 let arn = arn_for("succeed-null");
2421 let def = json!({
2422 "StartAt": "S",
2423 "States": { "S": { "Type": "Succeed", "InputPath": "null" } }
2424 });
2425 drive(&state, &arn, def, Some(r#"{"a":1}"#));
2426
2427 read_exec(&state, &arn, |exec| {
2428 assert_eq!(exec.status, ExecutionStatus::Succeeded);
2429 let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
2430 assert_eq!(output, json!({}));
2431 });
2432 }
2433
2434 #[test]
2435 fn fail_state_defaults_when_fields_missing() {
2436 let state = make_state();
2437 let arn = arn_for("fail-default");
2438 let def = json!({
2439 "StartAt": "F",
2440 "States": { "F": { "Type": "Fail" } }
2441 });
2442 drive(&state, &arn, def, None);
2443
2444 read_exec(&state, &arn, |exec| {
2445 assert_eq!(exec.status, ExecutionStatus::Failed);
2446 assert_eq!(exec.error.as_deref(), Some("States.Fail"));
2447 assert_eq!(exec.cause.as_deref(), Some(""));
2448 });
2449 }
2450
2451 fn choice_def() -> Value {
2454 json!({
2455 "StartAt": "C",
2456 "States": {
2457 "C": {
2458 "Type": "Choice",
2459 "Choices": [
2460 {
2461 "Variable": "$.n",
2462 "NumericGreaterThan": 10,
2463 "Next": "Big"
2464 }
2465 ],
2466 "Default": "Small"
2467 },
2468 "Big": { "Type": "Pass", "Result": "big", "End": true },
2469 "Small": { "Type": "Pass", "Result": "small", "End": true }
2470 }
2471 })
2472 }
2473
2474 #[test]
2475 fn choice_routes_to_matching_branch() {
2476 let state = make_state();
2477 let arn = arn_for("choice-big");
2478 drive(&state, &arn, choice_def(), Some(r#"{"n":42}"#));
2479
2480 read_exec(&state, &arn, |exec| {
2481 assert_eq!(exec.status, ExecutionStatus::Succeeded);
2482 assert_eq!(
2483 serde_json::from_str::<Value>(exec.output.as_ref().unwrap()).unwrap(),
2484 json!("big")
2485 );
2486 });
2487 }
2488
2489 #[test]
2490 fn choice_falls_through_to_default() {
2491 let state = make_state();
2492 let arn = arn_for("choice-default");
2493 drive(&state, &arn, choice_def(), Some(r#"{"n":3}"#));
2494
2495 read_exec(&state, &arn, |exec| {
2496 assert_eq!(exec.status, ExecutionStatus::Succeeded);
2497 assert_eq!(
2498 serde_json::from_str::<Value>(exec.output.as_ref().unwrap()).unwrap(),
2499 json!("small")
2500 );
2501 });
2502 }
2503
2504 #[test]
2505 fn choice_no_match_and_no_default_fails() {
2506 let state = make_state();
2507 let arn = arn_for("choice-nomatch");
2508 let def = json!({
2509 "StartAt": "C",
2510 "States": {
2511 "C": {
2512 "Type": "Choice",
2513 "Choices": [
2514 { "Variable": "$.n", "NumericEquals": 1, "Next": "End1" }
2515 ]
2516 },
2517 "End1": { "Type": "Pass", "End": true }
2518 }
2519 });
2520 drive(&state, &arn, def, Some(r#"{"n":99}"#));
2521
2522 read_exec(&state, &arn, |exec| {
2523 assert_eq!(exec.status, ExecutionStatus::Failed);
2524 assert_eq!(exec.error.as_deref(), Some("States.NoChoiceMatched"));
2525 });
2526 }
2527
2528 #[test]
2531 fn wait_seconds_then_advances() {
2532 let state = make_state();
2533 let arn = arn_for("wait-secs");
2534 let def = json!({
2535 "StartAt": "W",
2536 "States": {
2537 "W": { "Type": "Wait", "Seconds": 0, "Next": "Done" },
2538 "Done": { "Type": "Succeed" }
2539 }
2540 });
2541 drive(&state, &arn, def, Some(r#"{"k":1}"#));
2542
2543 read_exec(&state, &arn, |exec| {
2544 assert_eq!(exec.status, ExecutionStatus::Succeeded);
2545 });
2546 }
2547
2548 #[test]
2549 fn wait_timestamp_in_past_is_noop() {
2550 let state = make_state();
2551 let arn = arn_for("wait-past");
2552 let def = json!({
2553 "StartAt": "W",
2554 "States": {
2555 "W": {
2556 "Type": "Wait",
2557 "Timestamp": "2000-01-01T00:00:00Z",
2558 "End": true
2559 }
2560 }
2561 });
2562 drive(&state, &arn, def, Some(r#"{"k":1}"#));
2563
2564 read_exec(&state, &arn, |exec| {
2565 assert_eq!(exec.status, ExecutionStatus::Succeeded);
2566 });
2567 }
2568
2569 #[test]
2570 fn wait_without_any_duration_falls_through() {
2571 let state = make_state();
2572 let arn = arn_for("wait-none");
2573 let def = json!({
2574 "StartAt": "W",
2575 "States": { "W": { "Type": "Wait", "End": true } }
2576 });
2577 drive(&state, &arn, def, None);
2578
2579 read_exec(&state, &arn, |exec| {
2580 assert_eq!(exec.status, ExecutionStatus::Succeeded);
2581 });
2582 }
2583
2584 #[test]
2587 fn parallel_runs_two_pass_branches_and_collects_results() {
2588 let state = make_state();
2589 let arn = arn_for("parallel-ok");
2590 let def = json!({
2591 "StartAt": "P",
2592 "States": {
2593 "P": {
2594 "Type": "Parallel",
2595 "End": true,
2596 "Branches": [
2597 {
2598 "StartAt": "A",
2599 "States": { "A": { "Type": "Pass", "Result": "a", "End": true } }
2600 },
2601 {
2602 "StartAt": "B",
2603 "States": { "B": { "Type": "Pass", "Result": "b", "End": true } }
2604 }
2605 ]
2606 }
2607 }
2608 });
2609 drive(&state, &arn, def, Some(r#"{}"#));
2610
2611 read_exec(&state, &arn, |exec| {
2612 assert_eq!(exec.status, ExecutionStatus::Succeeded);
2613 let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
2614 assert_eq!(output, json!(["a", "b"]));
2615 });
2616 }
2617
2618 #[test]
2619 fn parallel_empty_branches_fails() {
2620 let state = make_state();
2621 let arn = arn_for("parallel-empty");
2622 let def = json!({
2623 "StartAt": "P",
2624 "States": { "P": { "Type": "Parallel", "Branches": [], "End": true } }
2625 });
2626 drive(&state, &arn, def, None);
2627
2628 read_exec(&state, &arn, |exec| {
2629 assert_eq!(exec.status, ExecutionStatus::Failed);
2630 assert_eq!(exec.error.as_deref(), Some("States.Runtime"));
2631 });
2632 }
2633
2634 #[test]
2637 fn map_iterates_pass_state_over_items_path() {
2638 let state = make_state();
2639 let arn = arn_for("map-ok");
2640 let def = json!({
2641 "StartAt": "M",
2642 "States": {
2643 "M": {
2644 "Type": "Map",
2645 "ItemsPath": "$.items",
2646 "Iterator": {
2647 "StartAt": "Item",
2648 "States": { "Item": { "Type": "Pass", "End": true } }
2649 },
2650 "End": true
2651 }
2652 }
2653 });
2654 drive(&state, &arn, def, Some(r#"{"items":[1,2,3]}"#));
2655
2656 read_exec(&state, &arn, |exec| {
2657 assert_eq!(exec.status, ExecutionStatus::Succeeded);
2658 let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
2659 assert_eq!(output, json!([1, 2, 3]));
2660 });
2661 }
2662
2663 #[test]
2666 fn task_unsupported_resource_propagates_failure() {
2667 let state = make_state();
2668 let arn = arn_for("task-unsupported");
2669 let def = json!({
2670 "StartAt": "T",
2671 "States": {
2672 "T": {
2673 "Type": "Task",
2674 "Resource": "arn:aws:states:::nothing:here",
2675 "End": true
2676 }
2677 }
2678 });
2679 drive(&state, &arn, def, None);
2680
2681 read_exec(&state, &arn, |exec| {
2682 assert_eq!(exec.status, ExecutionStatus::Failed);
2683 assert_eq!(exec.error.as_deref(), Some("States.TaskFailed"));
2684 assert!(exec.cause.as_deref().unwrap().contains("Unsupported"));
2685 });
2686 }
2687
2688 #[test]
2689 fn task_sqs_send_without_delivery_fails() {
2690 let state = make_state();
2691 let arn = arn_for("task-sqs-nodelivery");
2692 let def = json!({
2693 "StartAt": "T",
2694 "States": {
2695 "T": {
2696 "Type": "Task",
2697 "Resource": "arn:aws:states:::sqs:sendMessage",
2698 "Parameters": { "QueueUrl": "http://localhost/123/q", "MessageBody": "hi" },
2699 "End": true
2700 }
2701 }
2702 });
2703 drive(&state, &arn, def, Some("{}"));
2704
2705 read_exec(&state, &arn, |exec| {
2706 assert_eq!(exec.status, ExecutionStatus::Failed);
2707 assert!(exec.cause.as_deref().unwrap().contains("delivery bus"));
2708 });
2709 }
2710
2711 #[test]
2714 fn task_catch_routes_error_into_handler() {
2715 let state = make_state();
2716 let arn = arn_for("task-catch");
2717 let def = json!({
2718 "StartAt": "T",
2719 "States": {
2720 "T": {
2721 "Type": "Task",
2722 "Resource": "arn:aws:states:::nothing:here",
2723 "Catch": [
2724 { "ErrorEquals": ["States.ALL"], "Next": "Handler", "ResultPath": "$.err" }
2725 ],
2726 "End": true
2727 },
2728 "Handler": { "Type": "Pass", "End": true }
2729 }
2730 });
2731 drive(&state, &arn, def, Some(r#"{"orig":"v"}"#));
2732
2733 read_exec(&state, &arn, |exec| {
2734 assert_eq!(exec.status, ExecutionStatus::Succeeded);
2735 let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
2736 assert_eq!(output["orig"], json!("v"));
2738 assert_eq!(output["err"]["Error"], json!("States.TaskFailed"));
2739 });
2740 }
2741
2742 #[test]
2745 fn invalid_definition_json_fails_execution() {
2746 let state = make_state();
2747 let arn = arn_for("bad-json");
2748 create_execution(&state, &arn, None);
2749 let rt = tokio::runtime::Builder::new_current_thread()
2750 .enable_time()
2751 .build()
2752 .unwrap();
2753 rt.block_on(execute_state_machine(
2754 state.clone(),
2755 arn.clone(),
2756 "not json{".to_string(),
2757 None,
2758 None,
2759 None,
2760 ));
2761
2762 read_exec(&state, &arn, |exec| {
2763 assert_eq!(exec.status, ExecutionStatus::Failed);
2764 assert_eq!(exec.error.as_deref(), Some("States.Runtime"));
2765 assert!(exec.cause.as_deref().unwrap().contains("Failed to parse"));
2766 });
2767 }
2768
2769 #[test]
2770 fn missing_start_at_fails_execution() {
2771 let state = make_state();
2772 let arn = arn_for("no-startat");
2773 let def = json!({ "States": { "X": { "Type": "Succeed" } } });
2774 drive(&state, &arn, def, None);
2775
2776 read_exec(&state, &arn, |exec| {
2777 assert_eq!(exec.status, ExecutionStatus::Failed);
2778 assert!(exec.cause.as_deref().unwrap().contains("StartAt"));
2779 });
2780 }
2781
2782 #[test]
2783 fn next_state_not_found_fails_execution() {
2784 let state = make_state();
2785 let arn = arn_for("dangling-next");
2786 let def = json!({
2787 "StartAt": "A",
2788 "States": { "A": { "Type": "Pass", "Next": "DoesNotExist" } }
2789 });
2790 drive(&state, &arn, def, None);
2791
2792 read_exec(&state, &arn, |exec| {
2793 assert_eq!(exec.status, ExecutionStatus::Failed);
2794 assert!(exec.cause.as_deref().unwrap().contains("not found"));
2795 });
2796 }
2797
2798 #[test]
2799 fn unsupported_state_type_fails_execution() {
2800 let state = make_state();
2801 let arn = arn_for("bad-type");
2802 let def = json!({
2803 "StartAt": "X",
2804 "States": { "X": { "Type": "WatChoo", "End": true } }
2805 });
2806 drive(&state, &arn, def, None);
2807
2808 read_exec(&state, &arn, |exec| {
2809 assert_eq!(exec.status, ExecutionStatus::Failed);
2810 assert!(exec
2811 .cause
2812 .as_deref()
2813 .unwrap()
2814 .contains("Unsupported state type"));
2815 });
2816 }
2817
2818 #[test]
2821 fn apply_parameters_substitutes_json_path_refs() {
2822 let template = json!({
2823 "literal": "constant",
2824 "ref.$": "$.user.id",
2825 "nested": { "inner.$": "$.user.name" },
2826 "list": [ { "x.$": "$.user.id" } ]
2827 });
2828 let input = json!({ "user": { "id": 42, "name": "zoe" } });
2829 let out = apply_parameters(&template, &input);
2830 assert_eq!(out["literal"], json!("constant"));
2831 assert_eq!(out["ref"], json!(42));
2832 assert_eq!(out["nested"]["inner"], json!("zoe"));
2833 assert_eq!(out["list"][0]["x"], json!(42));
2834 }
2835
2836 #[test]
2837 fn next_state_returns_end_name_or_error() {
2838 match next_state(&json!({ "End": true })) {
2839 NextState::End => {}
2840 _ => panic!("expected End"),
2841 }
2842 match next_state(&json!({ "Next": "A" })) {
2843 NextState::Name(n) => assert_eq!(n, "A"),
2844 _ => panic!("expected Name"),
2845 }
2846 match next_state(&json!({})) {
2847 NextState::Error(_) => {}
2848 _ => panic!("expected Error"),
2849 }
2850 }
2851
2852 #[test]
2853 fn apply_state_catcher_matches_wildcard_and_stashes_error() {
2854 let state_def = json!({
2855 "Catch": [
2856 { "ErrorEquals": ["States.ALL"], "Next": "H", "ResultPath": "$.caught" }
2857 ]
2858 });
2859 let input = json!({ "a": 1 });
2860 let (next, new_input) =
2861 apply_state_catcher(&state_def, &input, "Boom", "it exploded").unwrap();
2862 assert_eq!(next, "H");
2863 assert_eq!(new_input["a"], json!(1));
2864 assert_eq!(new_input["caught"]["Error"], json!("Boom"));
2865 assert_eq!(new_input["caught"]["Cause"], json!("it exploded"));
2866 }
2867
2868 #[test]
2869 fn apply_state_catcher_returns_none_without_match() {
2870 let state_def = json!({
2871 "Catch": [
2872 { "ErrorEquals": ["Specific.Error"], "Next": "H" }
2873 ]
2874 });
2875 let input = json!({});
2876 assert!(apply_state_catcher(&state_def, &input, "Other", "why").is_none());
2877 }
2878
2879 #[test]
2880 fn queue_url_to_arn_parses_account_and_name() {
2881 assert_eq!(
2882 queue_url_to_arn("http://sqs.local:4566/123456789012/my-queue"),
2883 "arn:aws:sqs:us-east-1:123456789012:my-queue"
2884 );
2885 }
2886
2887 #[test]
2888 fn queue_url_to_arn_falls_back_for_unparseable_input() {
2889 assert_eq!(queue_url_to_arn("bad"), "bad");
2890 }
2891
2892 #[test]
2893 fn md5_hex_is_deterministic_and_32_chars() {
2894 let a = md5_hex("hello");
2895 let b = md5_hex("hello");
2896 assert_eq!(a, b);
2897 assert_eq!(a.len(), 32);
2898 assert_ne!(a, md5_hex("world"));
2899 }
2900
2901 #[test]
2902 fn apply_update_expression_sets_direct_and_aliased_attrs() {
2903 let mut item: HashMap<String, Value> = HashMap::new();
2904 item.insert("id".to_string(), json!({"S": "1"}));
2905
2906 let mut attr_values = serde_json::Map::new();
2907 attr_values.insert(":n".to_string(), json!({"S": "Alice"}));
2908 attr_values.insert(":c".to_string(), json!({"N": "5"}));
2909
2910 let mut attr_names = serde_json::Map::new();
2911 attr_names.insert("#name".to_string(), json!("name"));
2912
2913 apply_update_expression(
2914 &mut item,
2915 "SET #name = :n, count = :c",
2916 &attr_values,
2917 &attr_names,
2918 );
2919
2920 assert_eq!(item.get("name").unwrap(), &json!({"S": "Alice"}));
2921 assert_eq!(item.get("count").unwrap(), &json!({"N": "5"}));
2922 assert_eq!(item.get("id").unwrap(), &json!({"S": "1"}));
2923 }
2924
2925 #[test]
2926 fn apply_update_expression_accepts_lowercase_set_keyword() {
2927 let mut item: HashMap<String, Value> = HashMap::new();
2928 let mut attr_values = serde_json::Map::new();
2929 attr_values.insert(":v".to_string(), json!({"S": "x"}));
2930 apply_update_expression(
2931 &mut item,
2932 "set field = :v",
2933 &attr_values,
2934 &serde_json::Map::new(),
2935 );
2936 assert_eq!(item.get("field").unwrap(), &json!({"S": "x"}));
2937 }
2938
2939 #[test]
2940 fn apply_update_expression_set_arithmetic_increments_counter() {
2941 let mut item: HashMap<String, Value> = HashMap::new();
2942 item.insert("count".to_string(), json!({"N": "10"}));
2943 let mut attr_values = serde_json::Map::new();
2944 attr_values.insert(":inc".to_string(), json!({"N": "3"}));
2945 apply_update_expression(
2946 &mut item,
2947 "SET count = count + :inc",
2948 &attr_values,
2949 &serde_json::Map::new(),
2950 );
2951 assert_eq!(item.get("count").unwrap(), &json!({"N": "13"}));
2952 }
2953
2954 #[test]
2955 fn apply_update_expression_set_decrement() {
2956 let mut item: HashMap<String, Value> = HashMap::new();
2957 item.insert("count".to_string(), json!({"N": "10"}));
2958 let mut attr_values = serde_json::Map::new();
2959 attr_values.insert(":d".to_string(), json!({"N": "4"}));
2960 apply_update_expression(
2961 &mut item,
2962 "SET count = count - :d",
2963 &attr_values,
2964 &serde_json::Map::new(),
2965 );
2966 assert_eq!(item.get("count").unwrap(), &json!({"N": "6"}));
2967 }
2968
2969 #[test]
2970 fn apply_update_expression_remove_drops_attributes() {
2971 let mut item: HashMap<String, Value> = HashMap::new();
2972 item.insert("a".to_string(), json!({"S": "x"}));
2973 item.insert("b".to_string(), json!({"S": "y"}));
2974 item.insert("c".to_string(), json!({"S": "z"}));
2975 apply_update_expression(
2976 &mut item,
2977 "REMOVE a, c",
2978 &serde_json::Map::new(),
2979 &serde_json::Map::new(),
2980 );
2981 assert!(!item.contains_key("a"));
2982 assert!(item.contains_key("b"));
2983 assert!(!item.contains_key("c"));
2984 }
2985
2986 #[test]
2987 fn apply_update_expression_add_increments_existing_or_initializes() {
2988 let mut item: HashMap<String, Value> = HashMap::new();
2990 item.insert("count".to_string(), json!({"N": "5"}));
2991 let mut attr_values = serde_json::Map::new();
2992 attr_values.insert(":inc".to_string(), json!({"N": "2"}));
2993 apply_update_expression(
2994 &mut item,
2995 "ADD count :inc",
2996 &attr_values,
2997 &serde_json::Map::new(),
2998 );
2999 assert_eq!(item.get("count").unwrap(), &json!({"N": "7"}));
3000
3001 let mut item2: HashMap<String, Value> = HashMap::new();
3003 apply_update_expression(
3004 &mut item2,
3005 "ADD count :inc",
3006 &attr_values,
3007 &serde_json::Map::new(),
3008 );
3009 assert_eq!(item2.get("count").unwrap(), &json!({"N": "2"}));
3010 }
3011
3012 #[test]
3013 fn apply_update_expression_delete_removes_set_elements() {
3014 let mut item: HashMap<String, Value> = HashMap::new();
3015 item.insert("tags".to_string(), json!({"SS": ["a", "b", "c"]}));
3016 let mut attr_values = serde_json::Map::new();
3017 attr_values.insert(":rm".to_string(), json!({"SS": ["b"]}));
3018 apply_update_expression(
3019 &mut item,
3020 "DELETE tags :rm",
3021 &attr_values,
3022 &serde_json::Map::new(),
3023 );
3024 assert_eq!(item.get("tags").unwrap(), &json!({"SS": ["a", "c"]}));
3025 }
3026
3027 #[test]
3028 fn apply_update_expression_if_not_exists_initializes_only_when_absent() {
3029 let mut item: HashMap<String, Value> = HashMap::new();
3031 let mut attr_values = serde_json::Map::new();
3032 attr_values.insert(":zero".to_string(), json!({"N": "0"}));
3033 apply_update_expression(
3034 &mut item,
3035 "SET count = if_not_exists(count, :zero)",
3036 &attr_values,
3037 &serde_json::Map::new(),
3038 );
3039 assert_eq!(item.get("count").unwrap(), &json!({"N": "0"}));
3040
3041 let mut item2: HashMap<String, Value> = HashMap::new();
3043 item2.insert("count".to_string(), json!({"N": "42"}));
3044 apply_update_expression(
3045 &mut item2,
3046 "SET count = if_not_exists(count, :zero)",
3047 &attr_values,
3048 &serde_json::Map::new(),
3049 );
3050 assert_eq!(item2.get("count").unwrap(), &json!({"N": "42"}));
3051 }
3052
3053 #[test]
3054 fn apply_update_expression_combines_clauses() {
3055 let mut item: HashMap<String, Value> = HashMap::new();
3056 item.insert("a".to_string(), json!({"S": "old"}));
3057 item.insert("b".to_string(), json!({"N": "1"}));
3058 item.insert("c".to_string(), json!({"S": "drop"}));
3059 let mut attr_values = serde_json::Map::new();
3060 attr_values.insert(":new".to_string(), json!({"S": "new"}));
3061 attr_values.insert(":one".to_string(), json!({"N": "1"}));
3062 apply_update_expression(
3063 &mut item,
3064 "SET a = :new ADD b :one REMOVE c",
3065 &attr_values,
3066 &serde_json::Map::new(),
3067 );
3068 assert_eq!(item.get("a").unwrap(), &json!({"S": "new"}));
3069 assert_eq!(item.get("b").unwrap(), &json!({"N": "2"}));
3070 assert!(!item.contains_key("c"));
3071 }
3072
3073 #[test]
3076 fn task_dynamodb_get_item_without_state_fails() {
3077 let state = make_state();
3078 let arn = arn_for("ddb-get-nostate");
3079 let def = json!({
3080 "StartAt": "T",
3081 "States": {
3082 "T": {
3083 "Type": "Task",
3084 "Resource": "arn:aws:states:::dynamodb:getItem",
3085 "Parameters": { "TableName": "t", "Key": { "id": { "S": "1" } } },
3086 "End": true
3087 }
3088 }
3089 });
3090 drive(&state, &arn, def, Some("{}"));
3091 read_exec(&state, &arn, |exec| {
3092 assert_eq!(exec.status, ExecutionStatus::Failed);
3093 assert!(exec.cause.as_deref().unwrap().contains("DynamoDB"));
3094 });
3095 }
3096
3097 #[test]
3100 fn succeed_execution_is_noop_when_already_terminal() {
3101 let state = make_state();
3102 let arn = "arn:aws:states:us-east-1:123456789012:execution:test:already";
3103 create_execution(&state, arn, None);
3104 {
3105 let mut __a = state.write();
3106 let s = __a.default_mut();
3107 s.executions.get_mut(arn).unwrap().status = ExecutionStatus::Failed;
3108 }
3109 succeed_execution(&state, arn, &json!({"x":1}));
3110 let __a = state.read();
3111 let s = __a.default_ref();
3112 let exec = s.executions.get(arn).unwrap();
3113 assert_eq!(exec.status, ExecutionStatus::Failed);
3114 assert!(exec.output.is_none());
3115 }
3116
3117 #[test]
3118 fn fail_execution_is_noop_when_already_terminal() {
3119 let state = make_state();
3120 let arn = "arn:aws:states:us-east-1:123456789012:execution:test:already2";
3121 create_execution(&state, arn, None);
3122 {
3123 let mut __a = state.write();
3124 let s = __a.default_mut();
3125 s.executions.get_mut(arn).unwrap().status = ExecutionStatus::Succeeded;
3126 }
3127 fail_execution(&state, arn, "Oops", "nope");
3128 let __a = state.read();
3129 let s = __a.default_ref();
3130 let exec = s.executions.get(arn).unwrap();
3131 assert_eq!(exec.status, ExecutionStatus::Succeeded);
3132 assert!(exec.error.is_none());
3133 }
3134
3135 #[test]
3138 fn pass_state_result_path_merges_into_input() {
3139 let state = make_state();
3140 let arn = arn_for("result-path");
3141 let def = json!({
3142 "StartAt": "P",
3143 "States": {
3144 "P": {"Type": "Pass", "Result": {"x": 2}, "ResultPath": "$.data", "End": true}
3145 }
3146 });
3147 drive(&state, &arn, def, Some(r#"{"a":1}"#));
3148 let output = read_exec(&state, &arn, |e| e.output.clone().unwrap_or_default());
3149 let v: Value = serde_json::from_str(&output).unwrap();
3150 assert_eq!(v["a"], 1);
3151 assert_eq!(v["data"]["x"], 2);
3152 }
3153
3154 #[test]
3157 fn choice_string_greater_than_equals() {
3158 let state = make_state();
3159 let arn = arn_for("choice-sgte");
3160 let def = json!({
3161 "StartAt": "C",
3162 "States": {
3163 "C": {
3164 "Type": "Choice",
3165 "Choices": [
3166 {"Variable": "$.val", "StringGreaterThanEquals": "apple", "Next": "End"}
3167 ],
3168 "Default": "Fail"
3169 },
3170 "End": {"Type": "Pass", "End": true},
3171 "Fail": {"Type": "Fail"}
3172 }
3173 });
3174 drive(&state, &arn, def, Some(r#"{"val":"banana"}"#));
3175 let status = read_exec(&state, &arn, |e| e.status);
3176 assert_eq!(status, ExecutionStatus::Succeeded);
3177 }
3178
3179 #[test]
3180 fn choice_is_present_and_is_null() {
3181 let state = make_state();
3182 let arn = arn_for("choice-ispres");
3183 let def = json!({
3184 "StartAt": "C",
3185 "States": {
3186 "C": {
3187 "Type": "Choice",
3188 "Choices": [
3189 {"Variable": "$.foo", "IsPresent": true, "Next": "End"}
3190 ],
3191 "Default": "Fail"
3192 },
3193 "End": {"Type": "Pass", "End": true},
3194 "Fail": {"Type": "Fail"}
3195 }
3196 });
3197 drive(&state, &arn, def, Some(r#"{"foo":null}"#));
3198 assert_eq!(
3199 read_exec(&state, &arn, |e| e.status),
3200 ExecutionStatus::Succeeded
3201 );
3202 }
3203
3204 #[test]
3205 fn choice_or_short_circuits() {
3206 let state = make_state();
3207 let arn = arn_for("choice-or");
3208 let def = json!({
3209 "StartAt": "C",
3210 "States": {
3211 "C": {
3212 "Type": "Choice",
3213 "Choices": [{
3214 "Or": [
3215 {"Variable": "$.x", "NumericEquals": 99},
3216 {"Variable": "$.y", "StringEquals": "b"}
3217 ],
3218 "Next": "End"
3219 }],
3220 "Default": "Fail"
3221 },
3222 "End": {"Type": "Pass", "End": true},
3223 "Fail": {"Type": "Fail"}
3224 }
3225 });
3226 drive(&state, &arn, def, Some(r#"{"x":1,"y":"b"}"#));
3227 assert_eq!(
3228 read_exec(&state, &arn, |e| e.status),
3229 ExecutionStatus::Succeeded
3230 );
3231 }
3232
3233 #[test]
3234 fn choice_not_negates() {
3235 let state = make_state();
3236 let arn = arn_for("choice-not");
3237 let def = json!({
3238 "StartAt": "C",
3239 "States": {
3240 "C": {
3241 "Type": "Choice",
3242 "Choices": [{
3243 "Not": {"Variable": "$.x", "NumericEquals": 99},
3244 "Next": "End"
3245 }],
3246 "Default": "Fail"
3247 },
3248 "End": {"Type": "Pass", "End": true},
3249 "Fail": {"Type": "Fail"}
3250 }
3251 });
3252 drive(&state, &arn, def, Some(r#"{"x":1}"#));
3253 assert_eq!(
3254 read_exec(&state, &arn, |e| e.status),
3255 ExecutionStatus::Succeeded
3256 );
3257 }
3258
3259 #[test]
3260 fn choice_boolean_equals() {
3261 let state = make_state();
3262 let arn = arn_for("choice-bool");
3263 let def = json!({
3264 "StartAt": "C",
3265 "States": {
3266 "C": {
3267 "Type": "Choice",
3268 "Choices": [
3269 {"Variable": "$.ok", "BooleanEquals": true, "Next": "End"}
3270 ],
3271 "Default": "Fail"
3272 },
3273 "End": {"Type": "Pass", "End": true},
3274 "Fail": {"Type": "Fail"}
3275 }
3276 });
3277 drive(&state, &arn, def, Some(r#"{"ok":true}"#));
3278 assert_eq!(
3279 read_exec(&state, &arn, |e| e.status),
3280 ExecutionStatus::Succeeded
3281 );
3282 }
3283
3284 #[test]
3287 fn wait_seconds_path_uses_input_value() {
3288 let state = make_state();
3289 let arn = arn_for("wait-sp");
3290 let def = json!({
3291 "StartAt": "W",
3292 "States": {
3293 "W": {"Type": "Wait", "SecondsPath": "$.wait", "End": true}
3294 }
3295 });
3296 drive(&state, &arn, def, Some(r#"{"wait":0}"#));
3297 assert_eq!(
3298 read_exec(&state, &arn, |e| e.status),
3299 ExecutionStatus::Succeeded
3300 );
3301 }
3302
3303 #[test]
3306 fn map_state_empty_array_succeeds() {
3307 let state = make_state();
3308 let arn = arn_for("map-empty");
3309 let def = json!({
3310 "StartAt": "M",
3311 "States": {
3312 "M": {
3313 "Type": "Map",
3314 "ItemsPath": "$.items",
3315 "ItemProcessor": {
3316 "StartAt": "P",
3317 "States": {
3318 "P": {"Type": "Pass", "End": true}
3319 }
3320 },
3321 "End": true
3322 }
3323 }
3324 });
3325 drive(&state, &arn, def, Some(r#"{"items":[]}"#));
3326 assert_eq!(
3327 read_exec(&state, &arn, |e| e.status),
3328 ExecutionStatus::Succeeded
3329 );
3330 }
3331
3332 #[test]
3335 fn fail_state_with_explicit_error_and_cause() {
3336 let state = make_state();
3337 let arn = arn_for("fail-fields");
3338 create_execution(&state, &arn, None);
3339 let def = json!({
3340 "StartAt": "F",
3341 "States": {
3342 "F": {"Type": "Fail", "Error": "MyError", "Cause": "my cause"}
3343 }
3344 });
3345 drive(&state, &arn, def, None);
3346 let status = read_exec(&state, &arn, |e| e.status);
3347 assert_eq!(status, ExecutionStatus::Failed);
3348 let err = read_exec(&state, &arn, |e| e.error.clone().unwrap_or_default());
3349 assert_eq!(err, "MyError");
3350 }
3351}