1use std::collections::HashMap;
2use std::sync::Arc;
3
4use chrono::Utc;
5use serde_json::{json, Value};
6use tracing::{debug, warn};
7
8use fakecloud_aws::arn::Arn;
9use fakecloud_core::delivery::DeliveryBus;
10use fakecloud_dynamodb::state::SharedDynamoDbState;
11
12use crate::choice::evaluate_choice;
13use crate::error_handling::{find_catcher, should_retry};
14use crate::io_processing::{apply_input_path, apply_output_path, apply_result_path};
15use crate::state::{ExecutionStatus, HistoryEvent, SharedStepFunctionsState};
16
17pub async fn execute_state_machine(
20 state: SharedStepFunctionsState,
21 execution_arn: String,
22 definition: String,
23 input: Option<String>,
24 delivery: Option<Arc<DeliveryBus>>,
25 dynamodb_state: Option<SharedDynamoDbState>,
26) {
27 let def: Value = match serde_json::from_str(&definition) {
28 Ok(v) => v,
29 Err(e) => {
30 fail_execution(
31 &state,
32 &execution_arn,
33 "States.Runtime",
34 &format!("Failed to parse definition: {e}"),
35 );
36 return;
37 }
38 };
39
40 let raw_input: Value = input
41 .as_deref()
42 .and_then(|s| serde_json::from_str(s).ok())
43 .unwrap_or(json!({}));
44
45 add_event(
47 &state,
48 &execution_arn,
49 "ExecutionStarted",
50 0,
51 json!({
52 "input": serde_json::to_string(&raw_input).unwrap_or_default(),
53 "roleArn": "arn:aws:iam::123456789012:role/execution-role"
54 }),
55 );
56
57 match run_states(
58 &def,
59 raw_input,
60 &delivery,
61 &dynamodb_state,
62 &state,
63 &execution_arn,
64 )
65 .await
66 {
67 Ok(output) => {
68 succeed_execution(&state, &execution_arn, &output);
69 }
70 Err((error, cause)) => {
71 fail_execution(&state, &execution_arn, &error, &cause);
72 }
73 }
74}
75
/// Boxed, `Send` future returned by [`run_states`].
///
/// It resolves to the final output value on success or to an `(error, cause)` pair
/// of strings on failure. The future is boxed because `run_states` is re-entered
/// from the Parallel and Map executors, which run nested definitions.
type StatesResult<'a> = std::pin::Pin<
    Box<dyn std::future::Future<Output = Result<Value, (String, String)>> + Send + 'a>,
>;
79
80fn run_states<'a>(
83 def: &'a Value,
84 input: Value,
85 delivery: &'a Option<Arc<DeliveryBus>>,
86 dynamodb_state: &'a Option<SharedDynamoDbState>,
87 shared_state: &'a SharedStepFunctionsState,
88 execution_arn: &'a str,
89) -> StatesResult<'a> {
90 Box::pin(async move {
91 let start_at = def["StartAt"]
92 .as_str()
93 .ok_or_else(|| {
94 (
95 "States.Runtime".to_string(),
96 "Missing StartAt in definition".to_string(),
97 )
98 })?
99 .to_string();
100
101 let states = def.get("States").ok_or_else(|| {
102 (
103 "States.Runtime".to_string(),
104 "Missing States in definition".to_string(),
105 )
106 })?;
107
108 let mut current_state = start_at;
109 let mut effective_input = input;
110 let mut iteration = 0;
111 let max_iterations = 500;
112
113 loop {
114 iteration += 1;
115 if iteration > max_iterations {
116 return Err((
117 "States.Runtime".to_string(),
118 "Maximum number of state transitions exceeded".to_string(),
119 ));
120 }
121
122 let state_def = states.get(¤t_state).cloned().ok_or_else(|| {
123 (
124 "States.Runtime".to_string(),
125 format!("State '{current_state}' not found in definition"),
126 )
127 })?;
128
129 let state_type = state_def["Type"]
130 .as_str()
131 .ok_or_else(|| {
132 (
133 "States.Runtime".to_string(),
134 format!("State '{current_state}' missing Type field"),
135 )
136 })?
137 .to_string();
138
139 debug!(
140 execution_arn = %execution_arn,
141 state = %current_state,
142 state_type = %state_type,
143 "Executing state"
144 );
145
146 let advance = match state_type.as_str() {
147 "Pass" => run_pass_state(
148 ¤t_state,
149 &state_def,
150 effective_input,
151 shared_state,
152 execution_arn,
153 ),
154 "Succeed" => run_succeed_state(
155 ¤t_state,
156 &state_def,
157 effective_input,
158 shared_state,
159 execution_arn,
160 ),
161 "Fail" => run_fail_state(
162 ¤t_state,
163 &state_def,
164 effective_input,
165 shared_state,
166 execution_arn,
167 ),
168 "Choice" => run_choice_state(
169 ¤t_state,
170 &state_def,
171 effective_input,
172 shared_state,
173 execution_arn,
174 ),
175 "Wait" => {
176 run_wait_state(
177 ¤t_state,
178 &state_def,
179 effective_input,
180 shared_state,
181 execution_arn,
182 )
183 .await
184 }
185 "Task" => {
186 run_task_state(
187 ¤t_state,
188 &state_def,
189 effective_input,
190 delivery,
191 dynamodb_state,
192 shared_state,
193 execution_arn,
194 )
195 .await
196 }
197 "Parallel" => {
198 run_parallel_state(
199 ¤t_state,
200 &state_def,
201 effective_input,
202 delivery,
203 dynamodb_state,
204 shared_state,
205 execution_arn,
206 )
207 .await
208 }
209 "Map" => {
210 run_map_state(
211 ¤t_state,
212 &state_def,
213 effective_input,
214 delivery,
215 dynamodb_state,
216 shared_state,
217 execution_arn,
218 )
219 .await
220 }
221 other => Advance::Fail(
222 "States.Runtime".to_string(),
223 format!("Unsupported state type: '{other}'"),
224 ),
225 };
226
227 match advance {
228 Advance::Next(next, new_input) => {
229 effective_input = new_input;
230 current_state = next;
231 }
232 Advance::End(output) => return Ok(output),
233 Advance::Fail(error, cause) => return Err((error, cause)),
234 }
235 }
236 })
237}
238
/// Outcome of executing a single state, consumed by the `run_states` loop.
enum Advance {
    /// Continue with the named state, feeding it the given value as input.
    Next(String, Value),
    /// Stop the walk successfully with the given output value.
    End(Value),
    /// Stop the walk with an `(error, cause)` failure.
    Fail(String, String),
}
248
249fn advance_from_next(state_def: &Value, input: Value) -> Advance {
250 match next_state(state_def) {
251 NextState::Name(next) => Advance::Next(next, input),
252 NextState::End => Advance::End(input),
253 NextState::Error(msg) => Advance::Fail("States.Runtime".to_string(), msg),
254 }
255}
256
257fn advance_from_error(state_def: &Value, input: &Value, error: String, cause: String) -> Advance {
258 match apply_state_catcher(state_def, input, &error, &cause) {
259 Some((next, new_input)) => Advance::Next(next, new_input),
260 None => Advance::Fail(error, cause),
261 }
262}
263
264fn run_pass_state(
265 name: &str,
266 state_def: &Value,
267 input: Value,
268 shared_state: &SharedStepFunctionsState,
269 execution_arn: &str,
270) -> Advance {
271 let entered_event_id = add_event(
272 shared_state,
273 execution_arn,
274 "PassStateEntered",
275 0,
276 json!({
277 "name": name,
278 "input": serde_json::to_string(&input).unwrap_or_default(),
279 }),
280 );
281
282 let result = execute_pass_state(state_def, &input);
283
284 add_event(
285 shared_state,
286 execution_arn,
287 "PassStateExited",
288 entered_event_id,
289 json!({
290 "name": name,
291 "output": serde_json::to_string(&result).unwrap_or_default(),
292 }),
293 );
294
295 advance_from_next(state_def, result)
296}
297
298fn run_succeed_state(
299 name: &str,
300 state_def: &Value,
301 input: Value,
302 shared_state: &SharedStepFunctionsState,
303 execution_arn: &str,
304) -> Advance {
305 add_event(
306 shared_state,
307 execution_arn,
308 "SucceedStateEntered",
309 0,
310 json!({
311 "name": name,
312 "input": serde_json::to_string(&input).unwrap_or_default(),
313 }),
314 );
315
316 let input_path = state_def["InputPath"].as_str();
317 let output_path = state_def["OutputPath"].as_str();
318
319 let processed = if input_path == Some("null") {
320 json!({})
321 } else {
322 apply_input_path(&input, input_path)
323 };
324
325 let output = if output_path == Some("null") {
326 json!({})
327 } else {
328 apply_output_path(&processed, output_path)
329 };
330
331 add_event(
332 shared_state,
333 execution_arn,
334 "SucceedStateExited",
335 0,
336 json!({
337 "name": name,
338 "output": serde_json::to_string(&output).unwrap_or_default(),
339 }),
340 );
341
342 Advance::End(output)
343}
344
345fn run_fail_state(
346 name: &str,
347 state_def: &Value,
348 input: Value,
349 shared_state: &SharedStepFunctionsState,
350 execution_arn: &str,
351) -> Advance {
352 let error = state_def["Error"]
353 .as_str()
354 .unwrap_or("States.Fail")
355 .to_string();
356 let cause = state_def["Cause"].as_str().unwrap_or("").to_string();
357
358 add_event(
359 shared_state,
360 execution_arn,
361 "FailStateEntered",
362 0,
363 json!({
364 "name": name,
365 "input": serde_json::to_string(&input).unwrap_or_default(),
366 }),
367 );
368
369 Advance::Fail(error, cause)
370}
371
372fn run_choice_state(
373 name: &str,
374 state_def: &Value,
375 input: Value,
376 shared_state: &SharedStepFunctionsState,
377 execution_arn: &str,
378) -> Advance {
379 let entered_event_id = add_event(
380 shared_state,
381 execution_arn,
382 "ChoiceStateEntered",
383 0,
384 json!({
385 "name": name,
386 "input": serde_json::to_string(&input).unwrap_or_default(),
387 }),
388 );
389
390 let input_path = state_def["InputPath"].as_str();
391 let processed_input = if input_path == Some("null") {
392 json!({})
393 } else {
394 apply_input_path(&input, input_path)
395 };
396
397 match evaluate_choice(state_def, &processed_input) {
398 Some(next) => {
399 add_event(
400 shared_state,
401 execution_arn,
402 "ChoiceStateExited",
403 entered_event_id,
404 json!({
405 "name": name,
406 "output": serde_json::to_string(&input).unwrap_or_default(),
407 }),
408 );
409 Advance::Next(next, input)
410 }
411 None => Advance::Fail(
412 "States.NoChoiceMatched".to_string(),
413 format!("No choice rule matched and no Default in state '{name}'"),
414 ),
415 }
416}
417
418async fn run_wait_state(
419 name: &str,
420 state_def: &Value,
421 input: Value,
422 shared_state: &SharedStepFunctionsState,
423 execution_arn: &str,
424) -> Advance {
425 let entered_event_id = add_event(
426 shared_state,
427 execution_arn,
428 "WaitStateEntered",
429 0,
430 json!({
431 "name": name,
432 "input": serde_json::to_string(&input).unwrap_or_default(),
433 }),
434 );
435
436 execute_wait_state(state_def, &input).await;
437
438 add_event(
439 shared_state,
440 execution_arn,
441 "WaitStateExited",
442 entered_event_id,
443 json!({
444 "name": name,
445 "output": serde_json::to_string(&input).unwrap_or_default(),
446 }),
447 );
448
449 advance_from_next(state_def, input)
450}
451
452#[allow(clippy::too_many_arguments)]
453async fn run_task_state(
454 name: &str,
455 state_def: &Value,
456 input: Value,
457 delivery: &Option<Arc<DeliveryBus>>,
458 dynamodb_state: &Option<SharedDynamoDbState>,
459 shared_state: &SharedStepFunctionsState,
460 execution_arn: &str,
461) -> Advance {
462 let entered_event_id = add_event(
463 shared_state,
464 execution_arn,
465 "TaskStateEntered",
466 0,
467 json!({
468 "name": name,
469 "input": serde_json::to_string(&input).unwrap_or_default(),
470 }),
471 );
472
473 let result = execute_task_state(
474 state_def,
475 &input,
476 delivery,
477 dynamodb_state,
478 shared_state,
479 execution_arn,
480 entered_event_id,
481 )
482 .await;
483
484 match result {
485 Ok(output) => {
486 add_event(
487 shared_state,
488 execution_arn,
489 "TaskStateExited",
490 entered_event_id,
491 json!({
492 "name": name,
493 "output": serde_json::to_string(&output).unwrap_or_default(),
494 }),
495 );
496 advance_from_next(state_def, output)
497 }
498 Err((error, cause)) => advance_from_error(state_def, &input, error, cause),
499 }
500}
501
502#[allow(clippy::too_many_arguments)]
503async fn run_parallel_state(
504 name: &str,
505 state_def: &Value,
506 input: Value,
507 delivery: &Option<Arc<DeliveryBus>>,
508 dynamodb_state: &Option<SharedDynamoDbState>,
509 shared_state: &SharedStepFunctionsState,
510 execution_arn: &str,
511) -> Advance {
512 let entered_event_id = add_event(
513 shared_state,
514 execution_arn,
515 "ParallelStateEntered",
516 0,
517 json!({
518 "name": name,
519 "input": serde_json::to_string(&input).unwrap_or_default(),
520 }),
521 );
522
523 let result = execute_parallel_state(
524 state_def,
525 &input,
526 delivery,
527 dynamodb_state,
528 shared_state,
529 execution_arn,
530 )
531 .await;
532
533 match result {
534 Ok(output) => {
535 add_event(
536 shared_state,
537 execution_arn,
538 "ParallelStateExited",
539 entered_event_id,
540 json!({
541 "name": name,
542 "output": serde_json::to_string(&output).unwrap_or_default(),
543 }),
544 );
545 advance_from_next(state_def, output)
546 }
547 Err((error, cause)) => advance_from_error(state_def, &input, error, cause),
548 }
549}
550
551#[allow(clippy::too_many_arguments)]
552async fn run_map_state(
553 name: &str,
554 state_def: &Value,
555 input: Value,
556 delivery: &Option<Arc<DeliveryBus>>,
557 dynamodb_state: &Option<SharedDynamoDbState>,
558 shared_state: &SharedStepFunctionsState,
559 execution_arn: &str,
560) -> Advance {
561 let entered_event_id = add_event(
562 shared_state,
563 execution_arn,
564 "MapStateEntered",
565 0,
566 json!({
567 "name": name,
568 "input": serde_json::to_string(&input).unwrap_or_default(),
569 }),
570 );
571
572 let result = execute_map_state(
573 state_def,
574 &input,
575 delivery,
576 dynamodb_state,
577 shared_state,
578 execution_arn,
579 )
580 .await;
581
582 match result {
583 Ok(output) => {
584 add_event(
585 shared_state,
586 execution_arn,
587 "MapStateExited",
588 entered_event_id,
589 json!({
590 "name": name,
591 "output": serde_json::to_string(&output).unwrap_or_default(),
592 }),
593 );
594 advance_from_next(state_def, output)
595 }
596 Err((error, cause)) => advance_from_error(state_def, &input, error, cause),
597 }
598}
599
600async fn execute_wait_state(state_def: &Value, input: &Value) {
602 if let Some(seconds) = state_def["Seconds"].as_u64() {
603 tokio::time::sleep(tokio::time::Duration::from_secs(seconds)).await;
604 return;
605 }
606
607 if let Some(path) = state_def["SecondsPath"].as_str() {
608 let val = crate::io_processing::resolve_path(input, path);
609 if let Some(seconds) = val.as_u64() {
610 tokio::time::sleep(tokio::time::Duration::from_secs(seconds)).await;
611 }
612 return;
613 }
614
615 if let Some(ts_str) = state_def["Timestamp"].as_str() {
616 if let Ok(target) = chrono::DateTime::parse_from_rfc3339(ts_str) {
617 let now = Utc::now();
618 let target_utc = target.with_timezone(&chrono::Utc);
619 if target_utc > now {
620 let duration = (target_utc - now).to_std().unwrap_or_default();
621 tokio::time::sleep(duration).await;
622 }
623 }
624 return;
625 }
626
627 if let Some(path) = state_def["TimestampPath"].as_str() {
628 let val = crate::io_processing::resolve_path(input, path);
629 if let Some(ts_str) = val.as_str() {
630 if let Ok(target) = chrono::DateTime::parse_from_rfc3339(ts_str) {
631 let now = Utc::now();
632 let target_utc = target.with_timezone(&chrono::Utc);
633 if target_utc > now {
634 let duration = (target_utc - now).to_std().unwrap_or_default();
635 tokio::time::sleep(duration).await;
636 }
637 }
638 }
639 return;
640 }
641
642 warn!(
643 "Wait state has no valid Seconds, SecondsPath, Timestamp, or TimestampPath — skipping wait"
644 );
645}
646
647fn execute_pass_state(state_def: &Value, input: &Value) -> Value {
649 let input_path = state_def["InputPath"].as_str();
650 let result_path = state_def["ResultPath"].as_str();
651 let output_path = state_def["OutputPath"].as_str();
652
653 let effective_input = if input_path == Some("null") {
654 json!({})
655 } else {
656 apply_input_path(input, input_path)
657 };
658
659 let result = if let Some(r) = state_def.get("Result") {
660 r.clone()
661 } else {
662 effective_input.clone()
663 };
664
665 let after_result = if result_path == Some("null") {
666 input.clone()
667 } else {
668 apply_result_path(input, &result, result_path)
669 };
670
671 if output_path == Some("null") {
672 json!({})
673 } else {
674 apply_output_path(&after_result, output_path)
675 }
676}
677
678async fn execute_task_state(
681 state_def: &Value,
682 input: &Value,
683 delivery: &Option<Arc<DeliveryBus>>,
684 dynamodb_state: &Option<SharedDynamoDbState>,
685 shared_state: &SharedStepFunctionsState,
686 execution_arn: &str,
687 entered_event_id: i64,
688) -> Result<Value, (String, String)> {
689 let resource = state_def["Resource"].as_str().unwrap_or("").to_string();
690
691 let input_path = state_def["InputPath"].as_str();
692 let result_path = state_def["ResultPath"].as_str();
693 let output_path = state_def["OutputPath"].as_str();
694
695 let effective_input = if input_path == Some("null") {
696 json!({})
697 } else {
698 apply_input_path(input, input_path)
699 };
700
701 let task_input = if let Some(params) = state_def.get("Parameters") {
702 apply_parameters(params, &effective_input)
703 } else {
704 effective_input
705 };
706
707 let retriers = state_def["Retry"].as_array().cloned().unwrap_or_default();
708 let timeout_seconds = state_def["TimeoutSeconds"].as_u64();
709 let mut attempt = 0u32;
710
711 loop {
712 add_event(
713 shared_state,
714 execution_arn,
715 "TaskScheduled",
716 entered_event_id,
717 json!({
718 "resource": resource,
719 "region": "us-east-1",
720 "parameters": serde_json::to_string(&task_input).unwrap_or_default(),
721 }),
722 );
723
724 add_event(
725 shared_state,
726 execution_arn,
727 "TaskStarted",
728 entered_event_id,
729 json!({ "resource": resource }),
730 );
731
732 let invoke_result = invoke_resource(
733 &resource,
734 &task_input,
735 delivery,
736 dynamodb_state,
737 timeout_seconds,
738 )
739 .await;
740
741 match invoke_result {
742 Ok(result) => {
743 add_event(
744 shared_state,
745 execution_arn,
746 "TaskSucceeded",
747 entered_event_id,
748 json!({
749 "resource": resource,
750 "output": serde_json::to_string(&result).unwrap_or_default(),
751 }),
752 );
753
754 let selected = if let Some(selector) = state_def.get("ResultSelector") {
755 apply_parameters(selector, &result)
756 } else {
757 result
758 };
759
760 let after_result = if result_path == Some("null") {
761 input.clone()
762 } else {
763 apply_result_path(input, &selected, result_path)
764 };
765
766 let output = if output_path == Some("null") {
767 json!({})
768 } else {
769 apply_output_path(&after_result, output_path)
770 };
771
772 return Ok(output);
773 }
774 Err((error, cause)) => {
775 add_event(
776 shared_state,
777 execution_arn,
778 "TaskFailed",
779 entered_event_id,
780 json!({ "error": error, "cause": cause }),
781 );
782
783 if let Some(delay_ms) = should_retry(&retriers, &error, attempt) {
784 attempt += 1;
785 let actual_delay = delay_ms.min(5000);
786 tokio::time::sleep(tokio::time::Duration::from_millis(actual_delay)).await;
787 continue;
788 }
789
790 return Err((error, cause));
791 }
792 }
793 }
794}
795
796async fn execute_parallel_state(
798 state_def: &Value,
799 input: &Value,
800 delivery: &Option<Arc<DeliveryBus>>,
801 dynamodb_state: &Option<SharedDynamoDbState>,
802 shared_state: &SharedStepFunctionsState,
803 execution_arn: &str,
804) -> Result<Value, (String, String)> {
805 let input_path = state_def["InputPath"].as_str();
806 let result_path = state_def["ResultPath"].as_str();
807 let output_path = state_def["OutputPath"].as_str();
808
809 let effective_input = if input_path == Some("null") {
810 json!({})
811 } else {
812 apply_input_path(input, input_path)
813 };
814
815 let branches = state_def["Branches"]
816 .as_array()
817 .cloned()
818 .unwrap_or_default();
819
820 if branches.is_empty() {
821 return Err((
822 "States.Runtime".to_string(),
823 "Parallel state has no Branches".to_string(),
824 ));
825 }
826
827 let mut handles = Vec::new();
829 for branch_def in &branches {
830 let branch = branch_def.clone();
831 let branch_input = effective_input.clone();
832 let delivery = delivery.clone();
833 let ddb = dynamodb_state.clone();
834 let state = shared_state.clone();
835 let arn = execution_arn.to_string();
836
837 handles.push(tokio::spawn(async move {
838 run_states(&branch, branch_input, &delivery, &ddb, &state, &arn).await
839 }));
840 }
841
842 let mut results = Vec::with_capacity(handles.len());
844 for handle in handles {
845 let result = handle.await.map_err(|e| {
846 (
847 "States.Runtime".to_string(),
848 format!("Branch execution panicked: {e}"),
849 )
850 })??;
851 results.push(result);
852 }
853
854 let branch_output = Value::Array(results);
855
856 let selected = if let Some(selector) = state_def.get("ResultSelector") {
858 apply_parameters(selector, &branch_output)
859 } else {
860 branch_output
861 };
862
863 let after_result = if result_path == Some("null") {
865 input.clone()
866 } else {
867 apply_result_path(input, &selected, result_path)
868 };
869
870 let output = if output_path == Some("null") {
872 json!({})
873 } else {
874 apply_output_path(&after_result, output_path)
875 };
876
877 Ok(output)
878}
879
880async fn execute_map_state(
882 state_def: &Value,
883 input: &Value,
884 delivery: &Option<Arc<DeliveryBus>>,
885 dynamodb_state: &Option<SharedDynamoDbState>,
886 shared_state: &SharedStepFunctionsState,
887 execution_arn: &str,
888) -> Result<Value, (String, String)> {
889 let input_path = state_def["InputPath"].as_str();
890 let result_path = state_def["ResultPath"].as_str();
891 let output_path = state_def["OutputPath"].as_str();
892
893 let effective_input = if input_path == Some("null") {
894 json!({})
895 } else {
896 apply_input_path(input, input_path)
897 };
898
899 let items_path = state_def["ItemsPath"].as_str().unwrap_or("$");
901 let items_value = crate::io_processing::resolve_path(&effective_input, items_path);
902 let items = items_value.as_array().cloned().unwrap_or_default();
903
904 let iterator_def = state_def
906 .get("ItemProcessor")
907 .or_else(|| state_def.get("Iterator"))
908 .cloned()
909 .ok_or_else(|| {
910 (
911 "States.Runtime".to_string(),
912 "Map state has no ItemProcessor or Iterator".to_string(),
913 )
914 })?;
915
916 let max_concurrency = state_def["MaxConcurrency"].as_u64().unwrap_or(0);
917 let effective_concurrency = if max_concurrency == 0 {
918 40
919 } else {
920 max_concurrency as usize
921 };
922
923 let semaphore = Arc::new(tokio::sync::Semaphore::new(effective_concurrency));
924
925 let mut handles = Vec::new();
927 for (index, item) in items.into_iter().enumerate() {
928 let iter_def = iterator_def.clone();
929 let delivery = delivery.clone();
930 let ddb = dynamodb_state.clone();
931 let state = shared_state.clone();
932 let arn = execution_arn.to_string();
933 let sem = semaphore.clone();
934
935 let item_input = if let Some(selector) = state_def.get("ItemSelector") {
937 let mut ctx = serde_json::Map::new();
938 ctx.insert("value".to_string(), item.clone());
939 ctx.insert("index".to_string(), json!(index));
940 apply_parameters(selector, &Value::Object(ctx))
941 } else {
942 item
943 };
944
945 add_event(
946 shared_state,
947 execution_arn,
948 "MapIterationStarted",
949 0,
950 json!({ "index": index }),
951 );
952
953 handles.push(tokio::spawn(async move {
954 let _permit = sem
955 .acquire()
956 .await
957 .map_err(|_| ("States.Runtime".to_string(), "Semaphore closed".to_string()))?;
958 let result = run_states(&iter_def, item_input, &delivery, &ddb, &state, &arn).await;
959 Ok::<(usize, Result<Value, (String, String)>), (String, String)>((index, result))
960 }));
961 }
962
963 let mut results: Vec<(usize, Value)> = Vec::with_capacity(handles.len());
965 for handle in handles {
966 let (index, result) = handle.await.map_err(|e| {
967 (
968 "States.Runtime".to_string(),
969 format!("Map iteration panicked: {e}"),
970 )
971 })??;
972
973 match result {
974 Ok(output) => {
975 add_event(
976 shared_state,
977 execution_arn,
978 "MapIterationSucceeded",
979 0,
980 json!({ "index": index }),
981 );
982 results.push((index, output));
983 }
984 Err((error, cause)) => {
985 add_event(
986 shared_state,
987 execution_arn,
988 "MapIterationFailed",
989 0,
990 json!({ "index": index, "error": error }),
991 );
992 return Err((error, cause));
993 }
994 }
995 }
996
997 results.sort_by_key(|(i, _)| *i);
999 let map_output = Value::Array(results.into_iter().map(|(_, v)| v).collect());
1000
1001 let selected = if let Some(selector) = state_def.get("ResultSelector") {
1003 apply_parameters(selector, &map_output)
1004 } else {
1005 map_output
1006 };
1007
1008 let after_result = if result_path == Some("null") {
1010 input.clone()
1011 } else {
1012 apply_result_path(input, &selected, result_path)
1013 };
1014
1015 let output = if output_path == Some("null") {
1017 json!({})
1018 } else {
1019 apply_output_path(&after_result, output_path)
1020 };
1021
1022 Ok(output)
1023}
1024
1025async fn invoke_resource(
1027 resource: &str,
1028 input: &Value,
1029 delivery: &Option<Arc<DeliveryBus>>,
1030 dynamodb_state: &Option<SharedDynamoDbState>,
1031 timeout_seconds: Option<u64>,
1032) -> Result<Value, (String, String)> {
1033 if resource.contains(":lambda:") && resource.contains(":function:") {
1035 return invoke_lambda_direct(resource, input, delivery, timeout_seconds).await;
1036 }
1037
1038 if resource.starts_with("arn:aws:states:::lambda:invoke") {
1040 let function_name = input["FunctionName"].as_str().unwrap_or("");
1041 let payload = if let Some(p) = input.get("Payload") {
1042 p.clone()
1043 } else {
1044 input.clone()
1045 };
1046 return invoke_lambda_direct(function_name, &payload, delivery, timeout_seconds).await;
1047 }
1048
1049 if resource.starts_with("arn:aws:states:::sqs:sendMessage") {
1050 return invoke_sqs_send_message(input, delivery);
1051 }
1052
1053 if resource.starts_with("arn:aws:states:::sns:publish") {
1054 return invoke_sns_publish(input, delivery);
1055 }
1056
1057 if resource.starts_with("arn:aws:states:::events:putEvents") {
1058 return invoke_eventbridge_put_events(input, delivery);
1059 }
1060
1061 if resource.starts_with("arn:aws:states:::dynamodb:getItem") {
1062 return invoke_dynamodb_get_item(input, dynamodb_state);
1063 }
1064
1065 if resource.starts_with("arn:aws:states:::dynamodb:putItem") {
1066 return invoke_dynamodb_put_item(input, dynamodb_state);
1067 }
1068
1069 if resource.starts_with("arn:aws:states:::dynamodb:deleteItem") {
1070 return invoke_dynamodb_delete_item(input, dynamodb_state);
1071 }
1072
1073 if resource.starts_with("arn:aws:states:::dynamodb:updateItem") {
1074 return invoke_dynamodb_update_item(input, dynamodb_state);
1075 }
1076
1077 Err((
1078 "States.TaskFailed".to_string(),
1079 format!("Unsupported resource: {resource}"),
1080 ))
1081}
1082
1083fn invoke_sqs_send_message(
1085 input: &Value,
1086 delivery: &Option<Arc<DeliveryBus>>,
1087) -> Result<Value, (String, String)> {
1088 let delivery = delivery.as_ref().ok_or_else(|| {
1089 (
1090 "States.TaskFailed".to_string(),
1091 "No delivery bus configured for SQS".to_string(),
1092 )
1093 })?;
1094
1095 let queue_url = input["QueueUrl"].as_str().ok_or_else(|| {
1096 (
1097 "States.TaskFailed".to_string(),
1098 "Missing QueueUrl in SQS sendMessage parameters".to_string(),
1099 )
1100 })?;
1101
1102 let message_body = input["MessageBody"]
1103 .as_str()
1104 .map(|s| s.to_string())
1105 .unwrap_or_else(|| {
1106 serde_json::to_string(&input["MessageBody"]).unwrap_or_default()
1108 });
1109
1110 let queue_arn = queue_url_to_arn(queue_url);
1114
1115 delivery.send_to_sqs(&queue_arn, &message_body, &HashMap::new());
1116
1117 Ok(json!({
1118 "MessageId": uuid::Uuid::new_v4().to_string(),
1119 "MD5OfMessageBody": md5_hex(&message_body),
1120 }))
1121}
1122
1123fn invoke_sns_publish(
1125 input: &Value,
1126 delivery: &Option<Arc<DeliveryBus>>,
1127) -> Result<Value, (String, String)> {
1128 let delivery = delivery.as_ref().ok_or_else(|| {
1129 (
1130 "States.TaskFailed".to_string(),
1131 "No delivery bus configured for SNS".to_string(),
1132 )
1133 })?;
1134
1135 let topic_arn = input["TopicArn"].as_str().ok_or_else(|| {
1136 (
1137 "States.TaskFailed".to_string(),
1138 "Missing TopicArn in SNS publish parameters".to_string(),
1139 )
1140 })?;
1141
1142 let message = input["Message"]
1143 .as_str()
1144 .map(|s| s.to_string())
1145 .unwrap_or_else(|| serde_json::to_string(&input["Message"]).unwrap_or_default());
1146
1147 let subject = input["Subject"].as_str();
1148
1149 delivery.publish_to_sns(topic_arn, &message, subject);
1150
1151 Ok(json!({
1152 "MessageId": uuid::Uuid::new_v4().to_string(),
1153 }))
1154}
1155
1156fn invoke_eventbridge_put_events(
1158 input: &Value,
1159 delivery: &Option<Arc<DeliveryBus>>,
1160) -> Result<Value, (String, String)> {
1161 let delivery = delivery.as_ref().ok_or_else(|| {
1162 (
1163 "States.TaskFailed".to_string(),
1164 "No delivery bus configured for EventBridge".to_string(),
1165 )
1166 })?;
1167
1168 let entries = input["Entries"]
1169 .as_array()
1170 .ok_or_else(|| {
1171 (
1172 "States.TaskFailed".to_string(),
1173 "Missing Entries in EventBridge putEvents parameters".to_string(),
1174 )
1175 })?
1176 .clone();
1177
1178 let mut event_ids = Vec::new();
1179 for entry in &entries {
1180 let source = entry["Source"].as_str().unwrap_or("aws.stepfunctions");
1181 let detail_type = entry["DetailType"].as_str().unwrap_or("StepFunctionsEvent");
1182 let detail = entry["Detail"]
1183 .as_str()
1184 .map(|s| s.to_string())
1185 .unwrap_or_else(|| serde_json::to_string(&entry["Detail"]).unwrap_or("{}".to_string()));
1186 let bus_name = entry["EventBusName"].as_str().unwrap_or("default");
1187
1188 delivery.put_event_to_eventbridge(source, detail_type, &detail, bus_name);
1189 event_ids.push(uuid::Uuid::new_v4().to_string());
1190 }
1191
1192 Ok(json!({
1193 "Entries": event_ids.iter().map(|id| json!({"EventId": id})).collect::<Vec<_>>(),
1194 "FailedEntryCount": 0,
1195 }))
1196}
1197
1198fn invoke_dynamodb_get_item(
1200 input: &Value,
1201 dynamodb_state: &Option<SharedDynamoDbState>,
1202) -> Result<Value, (String, String)> {
1203 let ddb = dynamodb_state.as_ref().ok_or_else(|| {
1204 (
1205 "States.TaskFailed".to_string(),
1206 "No DynamoDB state configured".to_string(),
1207 )
1208 })?;
1209
1210 let table_name = input["TableName"].as_str().ok_or_else(|| {
1211 (
1212 "States.TaskFailed".to_string(),
1213 "Missing TableName in DynamoDB getItem parameters".to_string(),
1214 )
1215 })?;
1216
1217 let key = input
1218 .get("Key")
1219 .and_then(|k| k.as_object())
1220 .ok_or_else(|| {
1221 (
1222 "States.TaskFailed".to_string(),
1223 "Missing Key in DynamoDB getItem parameters".to_string(),
1224 )
1225 })?;
1226
1227 let key_map: HashMap<String, Value> = key.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1228
1229 let state = ddb.read();
1230 let table = state.tables.get(table_name).ok_or_else(|| {
1231 (
1232 "States.TaskFailed".to_string(),
1233 format!("Table '{table_name}' not found"),
1234 )
1235 })?;
1236
1237 let item = table
1238 .find_item_index(&key_map)
1239 .map(|idx| table.items[idx].clone());
1240
1241 match item {
1242 Some(item_map) => {
1243 let item_value: serde_json::Map<String, Value> = item_map.into_iter().collect();
1244 Ok(json!({ "Item": item_value }))
1245 }
1246 None => Ok(json!({})),
1247 }
1248}
1249
1250fn invoke_dynamodb_put_item(
1252 input: &Value,
1253 dynamodb_state: &Option<SharedDynamoDbState>,
1254) -> Result<Value, (String, String)> {
1255 let ddb = dynamodb_state.as_ref().ok_or_else(|| {
1256 (
1257 "States.TaskFailed".to_string(),
1258 "No DynamoDB state configured".to_string(),
1259 )
1260 })?;
1261
1262 let table_name = input["TableName"].as_str().ok_or_else(|| {
1263 (
1264 "States.TaskFailed".to_string(),
1265 "Missing TableName in DynamoDB putItem parameters".to_string(),
1266 )
1267 })?;
1268
1269 let item = input
1270 .get("Item")
1271 .and_then(|i| i.as_object())
1272 .ok_or_else(|| {
1273 (
1274 "States.TaskFailed".to_string(),
1275 "Missing Item in DynamoDB putItem parameters".to_string(),
1276 )
1277 })?;
1278
1279 let item_map: HashMap<String, Value> =
1280 item.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1281
1282 let mut state = ddb.write();
1283 let table = state.tables.get_mut(table_name).ok_or_else(|| {
1284 (
1285 "States.TaskFailed".to_string(),
1286 format!("Table '{table_name}' not found"),
1287 )
1288 })?;
1289
1290 if let Some(idx) = table.find_item_index(&item_map) {
1292 table.items[idx] = item_map;
1293 } else {
1294 table.items.push(item_map);
1295 }
1296
1297 Ok(json!({}))
1298}
1299
1300fn invoke_dynamodb_delete_item(
1302 input: &Value,
1303 dynamodb_state: &Option<SharedDynamoDbState>,
1304) -> Result<Value, (String, String)> {
1305 let ddb = dynamodb_state.as_ref().ok_or_else(|| {
1306 (
1307 "States.TaskFailed".to_string(),
1308 "No DynamoDB state configured".to_string(),
1309 )
1310 })?;
1311
1312 let table_name = input["TableName"].as_str().ok_or_else(|| {
1313 (
1314 "States.TaskFailed".to_string(),
1315 "Missing TableName in DynamoDB deleteItem parameters".to_string(),
1316 )
1317 })?;
1318
1319 let key = input
1320 .get("Key")
1321 .and_then(|k| k.as_object())
1322 .ok_or_else(|| {
1323 (
1324 "States.TaskFailed".to_string(),
1325 "Missing Key in DynamoDB deleteItem parameters".to_string(),
1326 )
1327 })?;
1328
1329 let key_map: HashMap<String, Value> = key.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1330
1331 let mut state = ddb.write();
1332 let table = state.tables.get_mut(table_name).ok_or_else(|| {
1333 (
1334 "States.TaskFailed".to_string(),
1335 format!("Table '{table_name}' not found"),
1336 )
1337 })?;
1338
1339 if let Some(idx) = table.find_item_index(&key_map) {
1340 table.items.remove(idx);
1341 }
1342
1343 Ok(json!({}))
1344}
1345
1346fn invoke_dynamodb_update_item(
1349 input: &Value,
1350 dynamodb_state: &Option<SharedDynamoDbState>,
1351) -> Result<Value, (String, String)> {
1352 let ddb = dynamodb_state.as_ref().ok_or_else(|| {
1353 (
1354 "States.TaskFailed".to_string(),
1355 "No DynamoDB state configured".to_string(),
1356 )
1357 })?;
1358
1359 let table_name = input["TableName"].as_str().ok_or_else(|| {
1360 (
1361 "States.TaskFailed".to_string(),
1362 "Missing TableName in DynamoDB updateItem parameters".to_string(),
1363 )
1364 })?;
1365
1366 let key = input
1367 .get("Key")
1368 .and_then(|k| k.as_object())
1369 .ok_or_else(|| {
1370 (
1371 "States.TaskFailed".to_string(),
1372 "Missing Key in DynamoDB updateItem parameters".to_string(),
1373 )
1374 })?;
1375
1376 let key_map: HashMap<String, Value> = key.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1377
1378 let mut state = ddb.write();
1379 let table = state.tables.get_mut(table_name).ok_or_else(|| {
1380 (
1381 "States.TaskFailed".to_string(),
1382 format!("Table '{table_name}' not found"),
1383 )
1384 })?;
1385
1386 if let Some(update_expr) = input["UpdateExpression"].as_str() {
1388 let attr_values = input
1389 .get("ExpressionAttributeValues")
1390 .and_then(|v| v.as_object())
1391 .cloned()
1392 .unwrap_or_default();
1393 let attr_names = input
1394 .get("ExpressionAttributeNames")
1395 .and_then(|v| v.as_object())
1396 .cloned()
1397 .unwrap_or_default();
1398
1399 if let Some(idx) = table.find_item_index(&key_map) {
1400 apply_update_expression(
1401 &mut table.items[idx],
1402 update_expr,
1403 &attr_values,
1404 &attr_names,
1405 );
1406 } else {
1407 let mut new_item = key_map;
1409 apply_update_expression(&mut new_item, update_expr, &attr_values, &attr_names);
1410 table.items.push(new_item);
1411 }
1412 }
1413
1414 Ok(json!({}))
1415}
1416
1417fn apply_update_expression(
1419 item: &mut HashMap<String, Value>,
1420 expr: &str,
1421 attr_values: &serde_json::Map<String, Value>,
1422 attr_names: &serde_json::Map<String, Value>,
1423) {
1424 let trimmed = expr.trim();
1427 let set_part = if trimmed.len() >= 4 && trimmed[..4].eq_ignore_ascii_case("SET ") {
1428 &trimmed[4..]
1429 } else {
1430 trimmed
1431 };
1432
1433 for assignment in set_part.split(',') {
1434 let parts: Vec<&str> = assignment.splitn(2, '=').collect();
1435 if parts.len() == 2 {
1436 let attr_ref = parts[0].trim();
1437 let val_ref = parts[1].trim();
1438
1439 let attr_name = if attr_ref.starts_with('#') {
1441 attr_names
1442 .get(attr_ref)
1443 .and_then(|v| v.as_str())
1444 .unwrap_or(attr_ref)
1445 .to_string()
1446 } else {
1447 attr_ref.to_string()
1448 };
1449
1450 if val_ref.starts_with(':') {
1452 if let Some(val) = attr_values.get(val_ref) {
1453 item.insert(attr_name, val.clone());
1454 }
1455 }
1456 }
1457 }
1458}
1459
1460fn queue_url_to_arn(url: &str) -> String {
1463 let parts: Vec<&str> = url.rsplitn(3, '/').collect();
1464 if parts.len() >= 2 {
1465 let queue_name = parts[0];
1466 let account_id = parts[1];
1467 Arn::new("sqs", "us-east-1", account_id, queue_name).to_string()
1468 } else {
1469 url.to_string()
1470 }
1471}
1472
1473fn md5_hex(data: &str) -> String {
1475 use md5::Digest;
1476 let result = md5::Md5::digest(data.as_bytes());
1477 format!("{result:032x}")
1478}
1479
1480async fn invoke_lambda_direct(
1482 function_arn: &str,
1483 input: &Value,
1484 delivery: &Option<Arc<DeliveryBus>>,
1485 timeout_seconds: Option<u64>,
1486) -> Result<Value, (String, String)> {
1487 let delivery = delivery.as_ref().ok_or_else(|| {
1488 (
1489 "States.TaskFailed".to_string(),
1490 "No delivery bus configured for Lambda invocation".to_string(),
1491 )
1492 })?;
1493
1494 let payload = serde_json::to_string(input).unwrap_or_default();
1495
1496 let invoke_future = delivery.invoke_lambda(function_arn, &payload);
1497
1498 let result = if let Some(timeout) = timeout_seconds {
1499 match tokio::time::timeout(tokio::time::Duration::from_secs(timeout), invoke_future).await {
1500 Ok(r) => r,
1501 Err(_) => {
1502 return Err((
1503 "States.Timeout".to_string(),
1504 format!("Task timed out after {timeout} seconds"),
1505 ));
1506 }
1507 }
1508 } else {
1509 invoke_future.await
1510 };
1511
1512 match result {
1513 Some(Ok(bytes)) => {
1514 let response_str = String::from_utf8_lossy(&bytes);
1515 let value: Value =
1516 serde_json::from_str(&response_str).unwrap_or(json!(response_str.to_string()));
1517 Ok(value)
1518 }
1519 Some(Err(e)) => Err(("States.TaskFailed".to_string(), e)),
1520 None => {
1521 Ok(json!({}))
1523 }
1524 }
1525}
1526
1527fn apply_parameters(template: &Value, input: &Value) -> Value {
1529 match template {
1530 Value::Object(map) => {
1531 let mut result = serde_json::Map::new();
1532 for (key, value) in map {
1533 if let Some(stripped) = key.strip_suffix(".$") {
1534 if let Some(path) = value.as_str() {
1535 result.insert(
1536 stripped.to_string(),
1537 crate::io_processing::resolve_path(input, path),
1538 );
1539 }
1540 } else {
1541 result.insert(key.clone(), apply_parameters(value, input));
1542 }
1543 }
1544 Value::Object(result)
1545 }
1546 Value::Array(arr) => Value::Array(arr.iter().map(|v| apply_parameters(v, input)).collect()),
1547 other => other.clone(),
1548 }
1549}
1550
/// Outcome of resolving where execution goes after a state, as produced by
/// `next_state`.
enum NextState {
    /// Continue with the state of this name (taken from the `Next` field).
    Name(String),
    /// The state is terminal (`"End": true`).
    End,
    /// The definition is invalid; the payload is a human-readable message.
    Error(String),
}
1556
1557fn next_state(state_def: &Value) -> NextState {
1558 if state_def["End"].as_bool() == Some(true) {
1559 return NextState::End;
1560 }
1561 match state_def["Next"].as_str() {
1562 Some(next) => NextState::Name(next.to_string()),
1563 None => NextState::Error("State has neither 'End' nor 'Next' field".to_string()),
1564 }
1565}
1566
1567fn apply_state_catcher(
1572 state_def: &Value,
1573 effective_input: &Value,
1574 error: &str,
1575 cause: &str,
1576) -> Option<(String, Value)> {
1577 let catchers = state_def["Catch"].as_array().cloned().unwrap_or_default();
1578 let (next, result_path) = find_catcher(&catchers, error)?;
1579 let error_output = json!({
1580 "Error": error,
1581 "Cause": cause,
1582 });
1583 let new_input = apply_result_path(effective_input, &error_output, result_path.as_deref());
1584 Some((next, new_input))
1585}
1586
1587fn add_event(
1588 state: &SharedStepFunctionsState,
1589 execution_arn: &str,
1590 event_type: &str,
1591 previous_event_id: i64,
1592 details: Value,
1593) -> i64 {
1594 let mut s = state.write();
1595 if let Some(exec) = s.executions.get_mut(execution_arn) {
1596 let id = exec.history_events.len() as i64 + 1;
1597 exec.history_events.push(HistoryEvent {
1598 id,
1599 event_type: event_type.to_string(),
1600 timestamp: Utc::now(),
1601 previous_event_id,
1602 details,
1603 });
1604 id
1605 } else {
1606 0
1607 }
1608}
1609
1610fn succeed_execution(state: &SharedStepFunctionsState, execution_arn: &str, output: &Value) {
1611 {
1613 let s = state.read();
1614 if let Some(exec) = s.executions.get(execution_arn) {
1615 if exec.status != ExecutionStatus::Running {
1616 return;
1617 }
1618 }
1619 }
1620
1621 let output_str = serde_json::to_string(output).unwrap_or_default();
1622
1623 add_event(
1624 state,
1625 execution_arn,
1626 "ExecutionSucceeded",
1627 0,
1628 json!({ "output": output_str }),
1629 );
1630
1631 let mut s = state.write();
1632 if let Some(exec) = s.executions.get_mut(execution_arn) {
1633 exec.status = ExecutionStatus::Succeeded;
1634 exec.output = Some(output_str);
1635 exec.stop_date = Some(Utc::now());
1636 }
1637}
1638
1639fn fail_execution(state: &SharedStepFunctionsState, execution_arn: &str, error: &str, cause: &str) {
1640 {
1642 let s = state.read();
1643 if let Some(exec) = s.executions.get(execution_arn) {
1644 if exec.status != ExecutionStatus::Running {
1645 return;
1646 }
1647 }
1648 }
1649
1650 add_event(
1651 state,
1652 execution_arn,
1653 "ExecutionFailed",
1654 0,
1655 json!({ "error": error, "cause": cause }),
1656 );
1657
1658 let mut s = state.write();
1659 if let Some(exec) = s.executions.get_mut(execution_arn) {
1660 exec.status = ExecutionStatus::Failed;
1661 exec.error = Some(error.to_string());
1662 exec.cause = Some(cause.to_string());
1663 exec.stop_date = Some(Utc::now());
1664 }
1665}
1666
#[cfg(test)]
mod tests {
    use super::*;
    use crate::state::{Execution, StepFunctionsState};
    use parking_lot::RwLock;
    use std::sync::Arc;

    /// Execution ARN used by every test; each test owns a fresh state, so the
    /// shared value cannot collide.
    const EXECUTION_ARN: &str = "arn:aws:states:us-east-1:123456789012:execution:test:exec-1";

    /// Creates an empty shared Step Functions state.
    fn make_state() -> SharedStepFunctionsState {
        let inner = StepFunctionsState::new("123456789012", "us-east-1");
        Arc::new(RwLock::new(inner))
    }

    /// Registers a `Running` execution under `arn` with an empty history.
    fn create_execution(state: &SharedStepFunctionsState, arn: &str, input: Option<String>) {
        let execution = Execution {
            execution_arn: arn.to_string(),
            state_machine_arn: "arn:aws:states:us-east-1:123456789012:stateMachine:test"
                .to_string(),
            state_machine_name: "test".to_string(),
            name: "exec-1".to_string(),
            status: ExecutionStatus::Running,
            input,
            output: None,
            start_date: Utc::now(),
            stop_date: None,
            error: None,
            cause: None,
            history_events: vec![],
        };
        state.write().executions.insert(arn.to_string(), execution);
    }

    /// Registers an execution with `input`, runs `definition` to completion
    /// with the same input and no delivery bus or DynamoDB state, and returns
    /// the state for inspection.
    async fn run_definition(definition: Value, input: Option<&str>) -> SharedStepFunctionsState {
        let state = make_state();
        create_execution(&state, EXECUTION_ARN, input.map(str::to_string));

        execute_state_machine(
            state.clone(),
            EXECUTION_ARN.to_string(),
            definition.to_string(),
            input.map(str::to_string),
            None,
            None,
        )
        .await;

        state
    }

    /// A single Pass state with a `Result` replaces the input with that result.
    #[tokio::test]
    async fn test_simple_pass_state() {
        let definition = json!({
            "StartAt": "PassState",
            "States": {
                "PassState": {
                    "Type": "Pass",
                    "Result": {"processed": true},
                    "End": true
                }
            }
        });

        let state = run_definition(definition, Some(r#"{"hello":"world"}"#)).await;

        let guard = state.read();
        let exec = guard.executions.get(EXECUTION_ARN).unwrap();
        assert_eq!(exec.status, ExecutionStatus::Succeeded);
        assert!(exec.output.is_some());
        let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
        assert_eq!(output, json!({"processed": true}));
    }

    /// Two chained Pass states each contribute their `ResultPath` field.
    #[tokio::test]
    async fn test_pass_chain() {
        let definition = json!({
            "StartAt": "First",
            "States": {
                "First": {
                    "Type": "Pass",
                    "Result": "step1",
                    "ResultPath": "$.first",
                    "Next": "Second"
                },
                "Second": {
                    "Type": "Pass",
                    "Result": "step2",
                    "ResultPath": "$.second",
                    "End": true
                }
            }
        });

        let state = run_definition(definition, Some("{}")).await;

        let guard = state.read();
        let exec = guard.executions.get(EXECUTION_ARN).unwrap();
        assert_eq!(exec.status, ExecutionStatus::Succeeded);
        let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
        assert_eq!(output["first"], json!("step1"));
        assert_eq!(output["second"], json!("step2"));
    }

    /// A Succeed state ends the execution successfully.
    #[tokio::test]
    async fn test_succeed_state() {
        let definition = json!({
            "StartAt": "Done",
            "States": {
                "Done": {
                    "Type": "Succeed"
                }
            }
        });

        let state = run_definition(definition, Some(r#"{"data": "value"}"#)).await;

        let guard = state.read();
        let exec = guard.executions.get(EXECUTION_ARN).unwrap();
        assert_eq!(exec.status, ExecutionStatus::Succeeded);
    }

    /// A Fail state fails the execution with its declared error and cause.
    #[tokio::test]
    async fn test_fail_state() {
        let definition = json!({
            "StartAt": "FailState",
            "States": {
                "FailState": {
                    "Type": "Fail",
                    "Error": "CustomError",
                    "Cause": "Something went wrong"
                }
            }
        });

        let state = run_definition(definition, None).await;

        let guard = state.read();
        let exec = guard.executions.get(EXECUTION_ARN).unwrap();
        assert_eq!(exec.status, ExecutionStatus::Failed);
        assert_eq!(exec.error.as_deref(), Some("CustomError"));
        assert_eq!(exec.cause.as_deref(), Some("Something went wrong"));
    }

    /// A one-state execution records start, state entry/exit and success
    /// events in order.
    #[tokio::test]
    async fn test_history_events_recorded() {
        let definition = json!({
            "StartAt": "PassState",
            "States": {
                "PassState": {
                    "Type": "Pass",
                    "End": true
                }
            }
        });

        let state = run_definition(definition, Some("{}")).await;

        let guard = state.read();
        let exec = guard.executions.get(EXECUTION_ARN).unwrap();
        let event_types: Vec<&str> = exec
            .history_events
            .iter()
            .map(|event| event.event_type.as_str())
            .collect();
        assert_eq!(
            event_types,
            vec![
                "ExecutionStarted",
                "PassStateEntered",
                "PassStateExited",
                "ExecutionSucceeded"
            ]
        );
    }
}