1use std::collections::HashMap;
2use std::sync::Arc;
3
4use chrono::Utc;
5use serde_json::{json, Value};
6use tracing::debug;
7
8use fakecloud_core::delivery::DeliveryBus;
9use fakecloud_dynamodb::state::SharedDynamoDbState;
10
11use crate::choice::evaluate_choice;
12use crate::error_handling::{find_catcher, should_retry};
13use crate::io_processing::{apply_input_path, apply_output_path, apply_result_path};
14use crate::state::{ExecutionStatus, HistoryEvent, SharedStepFunctionsState};
15
16pub async fn execute_state_machine(
19 state: SharedStepFunctionsState,
20 execution_arn: String,
21 definition: String,
22 input: Option<String>,
23 delivery: Option<Arc<DeliveryBus>>,
24 dynamodb_state: Option<SharedDynamoDbState>,
25) {
26 let def: Value = match serde_json::from_str(&definition) {
27 Ok(v) => v,
28 Err(e) => {
29 fail_execution(
30 &state,
31 &execution_arn,
32 "States.Runtime",
33 &format!("Failed to parse definition: {e}"),
34 );
35 return;
36 }
37 };
38
39 let raw_input: Value = input
40 .as_deref()
41 .and_then(|s| serde_json::from_str(s).ok())
42 .unwrap_or(json!({}));
43
44 add_event(
46 &state,
47 &execution_arn,
48 "ExecutionStarted",
49 0,
50 json!({
51 "input": serde_json::to_string(&raw_input).unwrap_or_default(),
52 "roleArn": "arn:aws:iam::123456789012:role/execution-role"
53 }),
54 );
55
56 match run_states(
57 &def,
58 raw_input,
59 &delivery,
60 &dynamodb_state,
61 &state,
62 &execution_arn,
63 )
64 .await
65 {
66 Ok(output) => {
67 succeed_execution(&state, &execution_arn, &output);
68 }
69 Err((error, cause)) => {
70 fail_execution(&state, &execution_arn, &error, &cause);
71 }
72 }
73}
74
/// Boxed, pinned, `Send` future that resolves to a state's output `Value` or an
/// `(error, cause)` pair.
///
/// `run_states` returns this boxed form rather than being an `async fn`; it is
/// re-entered from the Parallel and Map executors, which it itself awaits.
type StatesResult<'a> = std::pin::Pin<
    Box<dyn std::future::Future<Output = Result<Value, (String, String)>> + Send + 'a>,
>;
78
79fn run_states<'a>(
82 def: &'a Value,
83 input: Value,
84 delivery: &'a Option<Arc<DeliveryBus>>,
85 dynamodb_state: &'a Option<SharedDynamoDbState>,
86 shared_state: &'a SharedStepFunctionsState,
87 execution_arn: &'a str,
88) -> StatesResult<'a> {
89 Box::pin(async move {
90 let start_at = def["StartAt"]
91 .as_str()
92 .ok_or_else(|| {
93 (
94 "States.Runtime".to_string(),
95 "Missing StartAt in definition".to_string(),
96 )
97 })?
98 .to_string();
99
100 let states = def.get("States").ok_or_else(|| {
101 (
102 "States.Runtime".to_string(),
103 "Missing States in definition".to_string(),
104 )
105 })?;
106
107 let mut current_state = start_at;
108 let mut effective_input = input;
109 let mut iteration = 0;
110 let max_iterations = 500;
111
112 loop {
113 iteration += 1;
114 if iteration > max_iterations {
115 return Err((
116 "States.Runtime".to_string(),
117 "Maximum number of state transitions exceeded".to_string(),
118 ));
119 }
120
121 let state_def = states.get(¤t_state).cloned().ok_or_else(|| {
122 (
123 "States.Runtime".to_string(),
124 format!("State '{current_state}' not found in definition"),
125 )
126 })?;
127
128 let state_type = state_def["Type"]
129 .as_str()
130 .ok_or_else(|| {
131 (
132 "States.Runtime".to_string(),
133 format!("State '{current_state}' missing Type field"),
134 )
135 })?
136 .to_string();
137
138 debug!(
139 execution_arn = %execution_arn,
140 state = %current_state,
141 state_type = %state_type,
142 "Executing state"
143 );
144
145 match state_type.as_str() {
146 "Pass" => {
147 let entered_event_id = add_event(
148 shared_state,
149 execution_arn,
150 "PassStateEntered",
151 0,
152 json!({
153 "name": current_state,
154 "input": serde_json::to_string(&effective_input).unwrap_or_default(),
155 }),
156 );
157
158 let result = execute_pass_state(&state_def, &effective_input);
159
160 add_event(
161 shared_state,
162 execution_arn,
163 "PassStateExited",
164 entered_event_id,
165 json!({
166 "name": current_state,
167 "output": serde_json::to_string(&result).unwrap_or_default(),
168 }),
169 );
170
171 effective_input = result;
172
173 match next_state(&state_def) {
174 NextState::Name(next) => current_state = next,
175 NextState::End => return Ok(effective_input),
176 NextState::Error(msg) => {
177 return Err(("States.Runtime".to_string(), msg));
178 }
179 }
180 }
181
182 "Succeed" => {
183 add_event(
184 shared_state,
185 execution_arn,
186 "SucceedStateEntered",
187 0,
188 json!({
189 "name": current_state,
190 "input": serde_json::to_string(&effective_input).unwrap_or_default(),
191 }),
192 );
193
194 let input_path = state_def["InputPath"].as_str();
195 let output_path = state_def["OutputPath"].as_str();
196
197 let processed = if input_path == Some("null") {
198 json!({})
199 } else {
200 apply_input_path(&effective_input, input_path)
201 };
202
203 let output = if output_path == Some("null") {
204 json!({})
205 } else {
206 apply_output_path(&processed, output_path)
207 };
208
209 add_event(
210 shared_state,
211 execution_arn,
212 "SucceedStateExited",
213 0,
214 json!({
215 "name": current_state,
216 "output": serde_json::to_string(&output).unwrap_or_default(),
217 }),
218 );
219
220 return Ok(output);
221 }
222
223 "Fail" => {
224 let error = state_def["Error"]
225 .as_str()
226 .unwrap_or("States.Fail")
227 .to_string();
228 let cause = state_def["Cause"].as_str().unwrap_or("").to_string();
229
230 add_event(
231 shared_state,
232 execution_arn,
233 "FailStateEntered",
234 0,
235 json!({
236 "name": current_state,
237 "input": serde_json::to_string(&effective_input).unwrap_or_default(),
238 }),
239 );
240
241 return Err((error, cause));
242 }
243
244 "Task" => {
245 let entered_event_id = add_event(
246 shared_state,
247 execution_arn,
248 "TaskStateEntered",
249 0,
250 json!({
251 "name": current_state,
252 "input": serde_json::to_string(&effective_input).unwrap_or_default(),
253 }),
254 );
255
256 let result = execute_task_state(
257 &state_def,
258 &effective_input,
259 delivery,
260 dynamodb_state,
261 shared_state,
262 execution_arn,
263 entered_event_id,
264 )
265 .await;
266
267 match result {
268 Ok(output) => {
269 add_event(
270 shared_state,
271 execution_arn,
272 "TaskStateExited",
273 entered_event_id,
274 json!({
275 "name": current_state,
276 "output": serde_json::to_string(&output).unwrap_or_default(),
277 }),
278 );
279
280 effective_input = output;
281
282 match next_state(&state_def) {
283 NextState::Name(next) => current_state = next,
284 NextState::End => return Ok(effective_input),
285 NextState::Error(msg) => {
286 return Err(("States.Runtime".to_string(), msg));
287 }
288 }
289 }
290 Err((error, cause)) => {
291 let catchers =
292 state_def["Catch"].as_array().cloned().unwrap_or_default();
293
294 if let Some((next, result_path)) = find_catcher(&catchers, &error) {
295 let error_output = json!({
296 "Error": error,
297 "Cause": cause,
298 });
299 effective_input = apply_result_path(
300 &effective_input,
301 &error_output,
302 result_path.as_deref(),
303 );
304 current_state = next;
305 } else {
306 return Err((error, cause));
307 }
308 }
309 }
310 }
311
312 "Choice" => {
313 let entered_event_id = add_event(
314 shared_state,
315 execution_arn,
316 "ChoiceStateEntered",
317 0,
318 json!({
319 "name": current_state,
320 "input": serde_json::to_string(&effective_input).unwrap_or_default(),
321 }),
322 );
323
324 let input_path = state_def["InputPath"].as_str();
325 let processed_input = if input_path == Some("null") {
326 json!({})
327 } else {
328 apply_input_path(&effective_input, input_path)
329 };
330
331 match evaluate_choice(&state_def, &processed_input) {
332 Some(next) => {
333 add_event(
334 shared_state,
335 execution_arn,
336 "ChoiceStateExited",
337 entered_event_id,
338 json!({
339 "name": current_state,
340 "output": serde_json::to_string(&effective_input).unwrap_or_default(),
341 }),
342 );
343 current_state = next;
344 }
345 None => {
346 return Err((
347 "States.NoChoiceMatched".to_string(),
348 format!(
349 "No choice rule matched and no Default in state '{current_state}'"
350 ),
351 ));
352 }
353 }
354 }
355
356 "Wait" => {
357 let entered_event_id = add_event(
358 shared_state,
359 execution_arn,
360 "WaitStateEntered",
361 0,
362 json!({
363 "name": current_state,
364 "input": serde_json::to_string(&effective_input).unwrap_or_default(),
365 }),
366 );
367
368 execute_wait_state(&state_def, &effective_input).await;
369
370 add_event(
371 shared_state,
372 execution_arn,
373 "WaitStateExited",
374 entered_event_id,
375 json!({
376 "name": current_state,
377 "output": serde_json::to_string(&effective_input).unwrap_or_default(),
378 }),
379 );
380
381 match next_state(&state_def) {
382 NextState::Name(next) => current_state = next,
383 NextState::End => return Ok(effective_input),
384 NextState::Error(msg) => {
385 return Err(("States.Runtime".to_string(), msg));
386 }
387 }
388 }
389
390 "Parallel" => {
391 let entered_event_id = add_event(
392 shared_state,
393 execution_arn,
394 "ParallelStateEntered",
395 0,
396 json!({
397 "name": current_state,
398 "input": serde_json::to_string(&effective_input).unwrap_or_default(),
399 }),
400 );
401
402 let result = execute_parallel_state(
403 &state_def,
404 &effective_input,
405 delivery,
406 dynamodb_state,
407 shared_state,
408 execution_arn,
409 )
410 .await;
411
412 match result {
413 Ok(output) => {
414 add_event(
415 shared_state,
416 execution_arn,
417 "ParallelStateExited",
418 entered_event_id,
419 json!({
420 "name": current_state,
421 "output": serde_json::to_string(&output).unwrap_or_default(),
422 }),
423 );
424
425 effective_input = output;
426
427 match next_state(&state_def) {
428 NextState::Name(next) => current_state = next,
429 NextState::End => return Ok(effective_input),
430 NextState::Error(msg) => {
431 return Err(("States.Runtime".to_string(), msg));
432 }
433 }
434 }
435 Err((error, cause)) => {
436 let catchers =
437 state_def["Catch"].as_array().cloned().unwrap_or_default();
438
439 if let Some((next, result_path)) = find_catcher(&catchers, &error) {
440 let error_output = json!({
441 "Error": error,
442 "Cause": cause,
443 });
444 effective_input = apply_result_path(
445 &effective_input,
446 &error_output,
447 result_path.as_deref(),
448 );
449 current_state = next;
450 } else {
451 return Err((error, cause));
452 }
453 }
454 }
455 }
456
457 "Map" => {
458 let entered_event_id = add_event(
459 shared_state,
460 execution_arn,
461 "MapStateEntered",
462 0,
463 json!({
464 "name": current_state,
465 "input": serde_json::to_string(&effective_input).unwrap_or_default(),
466 }),
467 );
468
469 let result = execute_map_state(
470 &state_def,
471 &effective_input,
472 delivery,
473 dynamodb_state,
474 shared_state,
475 execution_arn,
476 )
477 .await;
478
479 match result {
480 Ok(output) => {
481 add_event(
482 shared_state,
483 execution_arn,
484 "MapStateExited",
485 entered_event_id,
486 json!({
487 "name": current_state,
488 "output": serde_json::to_string(&output).unwrap_or_default(),
489 }),
490 );
491
492 effective_input = output;
493
494 match next_state(&state_def) {
495 NextState::Name(next) => current_state = next,
496 NextState::End => return Ok(effective_input),
497 NextState::Error(msg) => {
498 return Err(("States.Runtime".to_string(), msg));
499 }
500 }
501 }
502 Err((error, cause)) => {
503 let catchers =
504 state_def["Catch"].as_array().cloned().unwrap_or_default();
505
506 if let Some((next, result_path)) = find_catcher(&catchers, &error) {
507 let error_output = json!({
508 "Error": error,
509 "Cause": cause,
510 });
511 effective_input = apply_result_path(
512 &effective_input,
513 &error_output,
514 result_path.as_deref(),
515 );
516 current_state = next;
517 } else {
518 return Err((error, cause));
519 }
520 }
521 }
522 }
523
524 other => {
525 return Err((
526 "States.Runtime".to_string(),
527 format!("Unsupported state type: '{other}'"),
528 ));
529 }
530 }
531 }
532 })
533}
534
535async fn execute_wait_state(state_def: &Value, input: &Value) {
537 if let Some(seconds) = state_def["Seconds"].as_u64() {
538 tokio::time::sleep(tokio::time::Duration::from_secs(seconds)).await;
539 return;
540 }
541
542 if let Some(path) = state_def["SecondsPath"].as_str() {
543 let val = crate::io_processing::resolve_path(input, path);
544 if let Some(seconds) = val.as_u64() {
545 tokio::time::sleep(tokio::time::Duration::from_secs(seconds)).await;
546 }
547 return;
548 }
549
550 if let Some(ts_str) = state_def["Timestamp"].as_str() {
551 if let Ok(target) = chrono::DateTime::parse_from_rfc3339(ts_str) {
552 let now = Utc::now();
553 let target_utc = target.with_timezone(&chrono::Utc);
554 if target_utc > now {
555 let duration = (target_utc - now).to_std().unwrap_or_default();
556 tokio::time::sleep(duration).await;
557 }
558 }
559 return;
560 }
561
562 if let Some(path) = state_def["TimestampPath"].as_str() {
563 let val = crate::io_processing::resolve_path(input, path);
564 if let Some(ts_str) = val.as_str() {
565 if let Ok(target) = chrono::DateTime::parse_from_rfc3339(ts_str) {
566 let now = Utc::now();
567 let target_utc = target.with_timezone(&chrono::Utc);
568 if target_utc > now {
569 let duration = (target_utc - now).to_std().unwrap_or_default();
570 tokio::time::sleep(duration).await;
571 }
572 }
573 }
574 }
575}
576
577fn execute_pass_state(state_def: &Value, input: &Value) -> Value {
579 let input_path = state_def["InputPath"].as_str();
580 let result_path = state_def["ResultPath"].as_str();
581 let output_path = state_def["OutputPath"].as_str();
582
583 let effective_input = if input_path == Some("null") {
584 json!({})
585 } else {
586 apply_input_path(input, input_path)
587 };
588
589 let result = if let Some(r) = state_def.get("Result") {
590 r.clone()
591 } else {
592 effective_input.clone()
593 };
594
595 let after_result = if result_path == Some("null") {
596 input.clone()
597 } else {
598 apply_result_path(input, &result, result_path)
599 };
600
601 if output_path == Some("null") {
602 json!({})
603 } else {
604 apply_output_path(&after_result, output_path)
605 }
606}
607
608async fn execute_task_state(
611 state_def: &Value,
612 input: &Value,
613 delivery: &Option<Arc<DeliveryBus>>,
614 dynamodb_state: &Option<SharedDynamoDbState>,
615 shared_state: &SharedStepFunctionsState,
616 execution_arn: &str,
617 entered_event_id: i64,
618) -> Result<Value, (String, String)> {
619 let resource = state_def["Resource"].as_str().unwrap_or("").to_string();
620
621 let input_path = state_def["InputPath"].as_str();
622 let result_path = state_def["ResultPath"].as_str();
623 let output_path = state_def["OutputPath"].as_str();
624
625 let effective_input = if input_path == Some("null") {
626 json!({})
627 } else {
628 apply_input_path(input, input_path)
629 };
630
631 let task_input = if let Some(params) = state_def.get("Parameters") {
632 apply_parameters(params, &effective_input)
633 } else {
634 effective_input
635 };
636
637 let retriers = state_def["Retry"].as_array().cloned().unwrap_or_default();
638 let timeout_seconds = state_def["TimeoutSeconds"].as_u64();
639 let mut attempt = 0u32;
640
641 loop {
642 add_event(
643 shared_state,
644 execution_arn,
645 "TaskScheduled",
646 entered_event_id,
647 json!({
648 "resource": resource,
649 "region": "us-east-1",
650 "parameters": serde_json::to_string(&task_input).unwrap_or_default(),
651 }),
652 );
653
654 add_event(
655 shared_state,
656 execution_arn,
657 "TaskStarted",
658 entered_event_id,
659 json!({ "resource": resource }),
660 );
661
662 let invoke_result = invoke_resource(
663 &resource,
664 &task_input,
665 delivery,
666 dynamodb_state,
667 timeout_seconds,
668 )
669 .await;
670
671 match invoke_result {
672 Ok(result) => {
673 add_event(
674 shared_state,
675 execution_arn,
676 "TaskSucceeded",
677 entered_event_id,
678 json!({
679 "resource": resource,
680 "output": serde_json::to_string(&result).unwrap_or_default(),
681 }),
682 );
683
684 let selected = if let Some(selector) = state_def.get("ResultSelector") {
685 apply_parameters(selector, &result)
686 } else {
687 result
688 };
689
690 let after_result = if result_path == Some("null") {
691 input.clone()
692 } else {
693 apply_result_path(input, &selected, result_path)
694 };
695
696 let output = if output_path == Some("null") {
697 json!({})
698 } else {
699 apply_output_path(&after_result, output_path)
700 };
701
702 return Ok(output);
703 }
704 Err((error, cause)) => {
705 add_event(
706 shared_state,
707 execution_arn,
708 "TaskFailed",
709 entered_event_id,
710 json!({ "error": error, "cause": cause }),
711 );
712
713 if let Some(delay_ms) = should_retry(&retriers, &error, attempt) {
714 attempt += 1;
715 let actual_delay = delay_ms.min(5000);
716 tokio::time::sleep(tokio::time::Duration::from_millis(actual_delay)).await;
717 continue;
718 }
719
720 return Err((error, cause));
721 }
722 }
723 }
724}
725
726async fn execute_parallel_state(
728 state_def: &Value,
729 input: &Value,
730 delivery: &Option<Arc<DeliveryBus>>,
731 dynamodb_state: &Option<SharedDynamoDbState>,
732 shared_state: &SharedStepFunctionsState,
733 execution_arn: &str,
734) -> Result<Value, (String, String)> {
735 let input_path = state_def["InputPath"].as_str();
736 let result_path = state_def["ResultPath"].as_str();
737 let output_path = state_def["OutputPath"].as_str();
738
739 let effective_input = if input_path == Some("null") {
740 json!({})
741 } else {
742 apply_input_path(input, input_path)
743 };
744
745 let branches = state_def["Branches"]
746 .as_array()
747 .cloned()
748 .unwrap_or_default();
749
750 if branches.is_empty() {
751 return Err((
752 "States.Runtime".to_string(),
753 "Parallel state has no Branches".to_string(),
754 ));
755 }
756
757 let mut handles = Vec::new();
759 for branch_def in &branches {
760 let branch = branch_def.clone();
761 let branch_input = effective_input.clone();
762 let delivery = delivery.clone();
763 let ddb = dynamodb_state.clone();
764 let state = shared_state.clone();
765 let arn = execution_arn.to_string();
766
767 handles.push(tokio::spawn(async move {
768 run_states(&branch, branch_input, &delivery, &ddb, &state, &arn).await
769 }));
770 }
771
772 let mut results = Vec::with_capacity(handles.len());
774 for handle in handles {
775 let result = handle.await.map_err(|e| {
776 (
777 "States.Runtime".to_string(),
778 format!("Branch execution panicked: {e}"),
779 )
780 })??;
781 results.push(result);
782 }
783
784 let branch_output = Value::Array(results);
785
786 let selected = if let Some(selector) = state_def.get("ResultSelector") {
788 apply_parameters(selector, &branch_output)
789 } else {
790 branch_output
791 };
792
793 let after_result = if result_path == Some("null") {
795 input.clone()
796 } else {
797 apply_result_path(input, &selected, result_path)
798 };
799
800 let output = if output_path == Some("null") {
802 json!({})
803 } else {
804 apply_output_path(&after_result, output_path)
805 };
806
807 Ok(output)
808}
809
810async fn execute_map_state(
812 state_def: &Value,
813 input: &Value,
814 delivery: &Option<Arc<DeliveryBus>>,
815 dynamodb_state: &Option<SharedDynamoDbState>,
816 shared_state: &SharedStepFunctionsState,
817 execution_arn: &str,
818) -> Result<Value, (String, String)> {
819 let input_path = state_def["InputPath"].as_str();
820 let result_path = state_def["ResultPath"].as_str();
821 let output_path = state_def["OutputPath"].as_str();
822
823 let effective_input = if input_path == Some("null") {
824 json!({})
825 } else {
826 apply_input_path(input, input_path)
827 };
828
829 let items_path = state_def["ItemsPath"].as_str().unwrap_or("$");
831 let items_value = crate::io_processing::resolve_path(&effective_input, items_path);
832 let items = items_value.as_array().cloned().unwrap_or_default();
833
834 let iterator_def = state_def
836 .get("ItemProcessor")
837 .or_else(|| state_def.get("Iterator"))
838 .cloned()
839 .ok_or_else(|| {
840 (
841 "States.Runtime".to_string(),
842 "Map state has no ItemProcessor or Iterator".to_string(),
843 )
844 })?;
845
846 let max_concurrency = state_def["MaxConcurrency"].as_u64().unwrap_or(0);
847 let effective_concurrency = if max_concurrency == 0 {
848 40
849 } else {
850 max_concurrency as usize
851 };
852
853 let semaphore = Arc::new(tokio::sync::Semaphore::new(effective_concurrency));
854
855 let mut handles = Vec::new();
857 for (index, item) in items.into_iter().enumerate() {
858 let iter_def = iterator_def.clone();
859 let delivery = delivery.clone();
860 let ddb = dynamodb_state.clone();
861 let state = shared_state.clone();
862 let arn = execution_arn.to_string();
863 let sem = semaphore.clone();
864
865 let item_input = if let Some(selector) = state_def.get("ItemSelector") {
867 let mut ctx = serde_json::Map::new();
868 ctx.insert("value".to_string(), item.clone());
869 ctx.insert("index".to_string(), json!(index));
870 apply_parameters(selector, &Value::Object(ctx))
871 } else {
872 item
873 };
874
875 add_event(
876 shared_state,
877 execution_arn,
878 "MapIterationStarted",
879 0,
880 json!({ "index": index }),
881 );
882
883 handles.push(tokio::spawn(async move {
884 let _permit = sem
885 .acquire()
886 .await
887 .map_err(|_| ("States.Runtime".to_string(), "Semaphore closed".to_string()))?;
888 let result = run_states(&iter_def, item_input, &delivery, &ddb, &state, &arn).await;
889 Ok::<(usize, Result<Value, (String, String)>), (String, String)>((index, result))
890 }));
891 }
892
893 let mut results: Vec<(usize, Value)> = Vec::with_capacity(handles.len());
895 for handle in handles {
896 let (index, result) = handle.await.map_err(|e| {
897 (
898 "States.Runtime".to_string(),
899 format!("Map iteration panicked: {e}"),
900 )
901 })??;
902
903 match result {
904 Ok(output) => {
905 add_event(
906 shared_state,
907 execution_arn,
908 "MapIterationSucceeded",
909 0,
910 json!({ "index": index }),
911 );
912 results.push((index, output));
913 }
914 Err((error, cause)) => {
915 add_event(
916 shared_state,
917 execution_arn,
918 "MapIterationFailed",
919 0,
920 json!({ "index": index, "error": error }),
921 );
922 return Err((error, cause));
923 }
924 }
925 }
926
927 results.sort_by_key(|(i, _)| *i);
929 let map_output = Value::Array(results.into_iter().map(|(_, v)| v).collect());
930
931 let selected = if let Some(selector) = state_def.get("ResultSelector") {
933 apply_parameters(selector, &map_output)
934 } else {
935 map_output
936 };
937
938 let after_result = if result_path == Some("null") {
940 input.clone()
941 } else {
942 apply_result_path(input, &selected, result_path)
943 };
944
945 let output = if output_path == Some("null") {
947 json!({})
948 } else {
949 apply_output_path(&after_result, output_path)
950 };
951
952 Ok(output)
953}
954
955async fn invoke_resource(
957 resource: &str,
958 input: &Value,
959 delivery: &Option<Arc<DeliveryBus>>,
960 dynamodb_state: &Option<SharedDynamoDbState>,
961 timeout_seconds: Option<u64>,
962) -> Result<Value, (String, String)> {
963 if resource.contains(":lambda:") && resource.contains(":function:") {
965 return invoke_lambda_direct(resource, input, delivery, timeout_seconds).await;
966 }
967
968 if resource.starts_with("arn:aws:states:::lambda:invoke") {
970 let function_name = input["FunctionName"].as_str().unwrap_or("");
971 let payload = if let Some(p) = input.get("Payload") {
972 p.clone()
973 } else {
974 input.clone()
975 };
976 return invoke_lambda_direct(function_name, &payload, delivery, timeout_seconds).await;
977 }
978
979 if resource.starts_with("arn:aws:states:::sqs:sendMessage") {
980 return invoke_sqs_send_message(input, delivery);
981 }
982
983 if resource.starts_with("arn:aws:states:::sns:publish") {
984 return invoke_sns_publish(input, delivery);
985 }
986
987 if resource.starts_with("arn:aws:states:::events:putEvents") {
988 return invoke_eventbridge_put_events(input, delivery);
989 }
990
991 if resource.starts_with("arn:aws:states:::dynamodb:getItem") {
992 return invoke_dynamodb_get_item(input, dynamodb_state);
993 }
994
995 if resource.starts_with("arn:aws:states:::dynamodb:putItem") {
996 return invoke_dynamodb_put_item(input, dynamodb_state);
997 }
998
999 if resource.starts_with("arn:aws:states:::dynamodb:deleteItem") {
1000 return invoke_dynamodb_delete_item(input, dynamodb_state);
1001 }
1002
1003 if resource.starts_with("arn:aws:states:::dynamodb:updateItem") {
1004 return invoke_dynamodb_update_item(input, dynamodb_state);
1005 }
1006
1007 Err((
1008 "States.TaskFailed".to_string(),
1009 format!("Unsupported resource: {resource}"),
1010 ))
1011}
1012
1013fn invoke_sqs_send_message(
1015 input: &Value,
1016 delivery: &Option<Arc<DeliveryBus>>,
1017) -> Result<Value, (String, String)> {
1018 let delivery = delivery.as_ref().ok_or_else(|| {
1019 (
1020 "States.TaskFailed".to_string(),
1021 "No delivery bus configured for SQS".to_string(),
1022 )
1023 })?;
1024
1025 let queue_url = input["QueueUrl"].as_str().ok_or_else(|| {
1026 (
1027 "States.TaskFailed".to_string(),
1028 "Missing QueueUrl in SQS sendMessage parameters".to_string(),
1029 )
1030 })?;
1031
1032 let message_body = input["MessageBody"]
1033 .as_str()
1034 .map(|s| s.to_string())
1035 .unwrap_or_else(|| {
1036 serde_json::to_string(&input["MessageBody"]).unwrap_or_default()
1038 });
1039
1040 let queue_arn = queue_url_to_arn(queue_url);
1044
1045 delivery.send_to_sqs(&queue_arn, &message_body, &HashMap::new());
1046
1047 Ok(json!({
1048 "MessageId": uuid::Uuid::new_v4().to_string(),
1049 "MD5OfMessageBody": md5_hex(&message_body),
1050 }))
1051}
1052
1053fn invoke_sns_publish(
1055 input: &Value,
1056 delivery: &Option<Arc<DeliveryBus>>,
1057) -> Result<Value, (String, String)> {
1058 let delivery = delivery.as_ref().ok_or_else(|| {
1059 (
1060 "States.TaskFailed".to_string(),
1061 "No delivery bus configured for SNS".to_string(),
1062 )
1063 })?;
1064
1065 let topic_arn = input["TopicArn"].as_str().ok_or_else(|| {
1066 (
1067 "States.TaskFailed".to_string(),
1068 "Missing TopicArn in SNS publish parameters".to_string(),
1069 )
1070 })?;
1071
1072 let message = input["Message"]
1073 .as_str()
1074 .map(|s| s.to_string())
1075 .unwrap_or_else(|| serde_json::to_string(&input["Message"]).unwrap_or_default());
1076
1077 let subject = input["Subject"].as_str();
1078
1079 delivery.publish_to_sns(topic_arn, &message, subject);
1080
1081 Ok(json!({
1082 "MessageId": uuid::Uuid::new_v4().to_string(),
1083 }))
1084}
1085
1086fn invoke_eventbridge_put_events(
1088 input: &Value,
1089 delivery: &Option<Arc<DeliveryBus>>,
1090) -> Result<Value, (String, String)> {
1091 let delivery = delivery.as_ref().ok_or_else(|| {
1092 (
1093 "States.TaskFailed".to_string(),
1094 "No delivery bus configured for EventBridge".to_string(),
1095 )
1096 })?;
1097
1098 let entries = input["Entries"]
1099 .as_array()
1100 .ok_or_else(|| {
1101 (
1102 "States.TaskFailed".to_string(),
1103 "Missing Entries in EventBridge putEvents parameters".to_string(),
1104 )
1105 })?
1106 .clone();
1107
1108 let mut event_ids = Vec::new();
1109 for entry in &entries {
1110 let source = entry["Source"].as_str().unwrap_or("aws.stepfunctions");
1111 let detail_type = entry["DetailType"].as_str().unwrap_or("StepFunctionsEvent");
1112 let detail = entry["Detail"]
1113 .as_str()
1114 .map(|s| s.to_string())
1115 .unwrap_or_else(|| serde_json::to_string(&entry["Detail"]).unwrap_or("{}".to_string()));
1116 let bus_name = entry["EventBusName"].as_str().unwrap_or("default");
1117
1118 delivery.put_event_to_eventbridge(source, detail_type, &detail, bus_name);
1119 event_ids.push(uuid::Uuid::new_v4().to_string());
1120 }
1121
1122 Ok(json!({
1123 "Entries": event_ids.iter().map(|id| json!({"EventId": id})).collect::<Vec<_>>(),
1124 "FailedEntryCount": 0,
1125 }))
1126}
1127
1128fn invoke_dynamodb_get_item(
1130 input: &Value,
1131 dynamodb_state: &Option<SharedDynamoDbState>,
1132) -> Result<Value, (String, String)> {
1133 let ddb = dynamodb_state.as_ref().ok_or_else(|| {
1134 (
1135 "States.TaskFailed".to_string(),
1136 "No DynamoDB state configured".to_string(),
1137 )
1138 })?;
1139
1140 let table_name = input["TableName"].as_str().ok_or_else(|| {
1141 (
1142 "States.TaskFailed".to_string(),
1143 "Missing TableName in DynamoDB getItem parameters".to_string(),
1144 )
1145 })?;
1146
1147 let key = input
1148 .get("Key")
1149 .and_then(|k| k.as_object())
1150 .ok_or_else(|| {
1151 (
1152 "States.TaskFailed".to_string(),
1153 "Missing Key in DynamoDB getItem parameters".to_string(),
1154 )
1155 })?;
1156
1157 let key_map: HashMap<String, Value> = key.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1158
1159 let state = ddb.read();
1160 let table = state.tables.get(table_name).ok_or_else(|| {
1161 (
1162 "States.TaskFailed".to_string(),
1163 format!("Table '{table_name}' not found"),
1164 )
1165 })?;
1166
1167 let item = table
1168 .find_item_index(&key_map)
1169 .map(|idx| table.items[idx].clone());
1170
1171 match item {
1172 Some(item_map) => {
1173 let item_value: serde_json::Map<String, Value> = item_map.into_iter().collect();
1174 Ok(json!({ "Item": item_value }))
1175 }
1176 None => Ok(json!({})),
1177 }
1178}
1179
1180fn invoke_dynamodb_put_item(
1182 input: &Value,
1183 dynamodb_state: &Option<SharedDynamoDbState>,
1184) -> Result<Value, (String, String)> {
1185 let ddb = dynamodb_state.as_ref().ok_or_else(|| {
1186 (
1187 "States.TaskFailed".to_string(),
1188 "No DynamoDB state configured".to_string(),
1189 )
1190 })?;
1191
1192 let table_name = input["TableName"].as_str().ok_or_else(|| {
1193 (
1194 "States.TaskFailed".to_string(),
1195 "Missing TableName in DynamoDB putItem parameters".to_string(),
1196 )
1197 })?;
1198
1199 let item = input
1200 .get("Item")
1201 .and_then(|i| i.as_object())
1202 .ok_or_else(|| {
1203 (
1204 "States.TaskFailed".to_string(),
1205 "Missing Item in DynamoDB putItem parameters".to_string(),
1206 )
1207 })?;
1208
1209 let item_map: HashMap<String, Value> =
1210 item.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1211
1212 let mut state = ddb.write();
1213 let table = state.tables.get_mut(table_name).ok_or_else(|| {
1214 (
1215 "States.TaskFailed".to_string(),
1216 format!("Table '{table_name}' not found"),
1217 )
1218 })?;
1219
1220 if let Some(idx) = table.find_item_index(&item_map) {
1222 table.items[idx] = item_map;
1223 } else {
1224 table.items.push(item_map);
1225 }
1226
1227 Ok(json!({}))
1228}
1229
1230fn invoke_dynamodb_delete_item(
1232 input: &Value,
1233 dynamodb_state: &Option<SharedDynamoDbState>,
1234) -> Result<Value, (String, String)> {
1235 let ddb = dynamodb_state.as_ref().ok_or_else(|| {
1236 (
1237 "States.TaskFailed".to_string(),
1238 "No DynamoDB state configured".to_string(),
1239 )
1240 })?;
1241
1242 let table_name = input["TableName"].as_str().ok_or_else(|| {
1243 (
1244 "States.TaskFailed".to_string(),
1245 "Missing TableName in DynamoDB deleteItem parameters".to_string(),
1246 )
1247 })?;
1248
1249 let key = input
1250 .get("Key")
1251 .and_then(|k| k.as_object())
1252 .ok_or_else(|| {
1253 (
1254 "States.TaskFailed".to_string(),
1255 "Missing Key in DynamoDB deleteItem parameters".to_string(),
1256 )
1257 })?;
1258
1259 let key_map: HashMap<String, Value> = key.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1260
1261 let mut state = ddb.write();
1262 let table = state.tables.get_mut(table_name).ok_or_else(|| {
1263 (
1264 "States.TaskFailed".to_string(),
1265 format!("Table '{table_name}' not found"),
1266 )
1267 })?;
1268
1269 if let Some(idx) = table.find_item_index(&key_map) {
1270 table.items.remove(idx);
1271 }
1272
1273 Ok(json!({}))
1274}
1275
1276fn invoke_dynamodb_update_item(
1279 input: &Value,
1280 dynamodb_state: &Option<SharedDynamoDbState>,
1281) -> Result<Value, (String, String)> {
1282 let ddb = dynamodb_state.as_ref().ok_or_else(|| {
1283 (
1284 "States.TaskFailed".to_string(),
1285 "No DynamoDB state configured".to_string(),
1286 )
1287 })?;
1288
1289 let table_name = input["TableName"].as_str().ok_or_else(|| {
1290 (
1291 "States.TaskFailed".to_string(),
1292 "Missing TableName in DynamoDB updateItem parameters".to_string(),
1293 )
1294 })?;
1295
1296 let key = input
1297 .get("Key")
1298 .and_then(|k| k.as_object())
1299 .ok_or_else(|| {
1300 (
1301 "States.TaskFailed".to_string(),
1302 "Missing Key in DynamoDB updateItem parameters".to_string(),
1303 )
1304 })?;
1305
1306 let key_map: HashMap<String, Value> = key.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1307
1308 let mut state = ddb.write();
1309 let table = state.tables.get_mut(table_name).ok_or_else(|| {
1310 (
1311 "States.TaskFailed".to_string(),
1312 format!("Table '{table_name}' not found"),
1313 )
1314 })?;
1315
1316 if let Some(update_expr) = input["UpdateExpression"].as_str() {
1318 let attr_values = input
1319 .get("ExpressionAttributeValues")
1320 .and_then(|v| v.as_object())
1321 .cloned()
1322 .unwrap_or_default();
1323 let attr_names = input
1324 .get("ExpressionAttributeNames")
1325 .and_then(|v| v.as_object())
1326 .cloned()
1327 .unwrap_or_default();
1328
1329 if let Some(idx) = table.find_item_index(&key_map) {
1330 apply_update_expression(
1331 &mut table.items[idx],
1332 update_expr,
1333 &attr_values,
1334 &attr_names,
1335 );
1336 } else {
1337 let mut new_item = key_map;
1339 apply_update_expression(&mut new_item, update_expr, &attr_values, &attr_names);
1340 table.items.push(new_item);
1341 }
1342 }
1343
1344 Ok(json!({}))
1345}
1346
1347fn apply_update_expression(
1349 item: &mut HashMap<String, Value>,
1350 expr: &str,
1351 attr_values: &serde_json::Map<String, Value>,
1352 attr_names: &serde_json::Map<String, Value>,
1353) {
1354 let trimmed = expr.trim();
1357 let set_part = if trimmed.len() >= 4 && trimmed[..4].eq_ignore_ascii_case("SET ") {
1358 &trimmed[4..]
1359 } else {
1360 trimmed
1361 };
1362
1363 for assignment in set_part.split(',') {
1364 let parts: Vec<&str> = assignment.splitn(2, '=').collect();
1365 if parts.len() == 2 {
1366 let attr_ref = parts[0].trim();
1367 let val_ref = parts[1].trim();
1368
1369 let attr_name = if attr_ref.starts_with('#') {
1371 attr_names
1372 .get(attr_ref)
1373 .and_then(|v| v.as_str())
1374 .unwrap_or(attr_ref)
1375 .to_string()
1376 } else {
1377 attr_ref.to_string()
1378 };
1379
1380 if val_ref.starts_with(':') {
1382 if let Some(val) = attr_values.get(val_ref) {
1383 item.insert(attr_name, val.clone());
1384 }
1385 }
1386 }
1387 }
1388}
1389
/// Converts an SQS queue URL into a queue ARN.
///
/// The last path segment is taken as the queue name and the one before it as
/// the account id; the region is always `us-east-1`. A string containing no
/// `/` is returned unchanged.
fn queue_url_to_arn(url: &str) -> String {
    // Walk the segments from the right: queue name first, then account id.
    let mut segments = url.rsplit('/');
    match (segments.next(), segments.next()) {
        (Some(queue_name), Some(account_id)) => {
            format!("arn:aws:sqs:us-east-1:{account_id}:{queue_name}")
        }
        _ => url.to_string(),
    }
}
1402
1403fn md5_hex(data: &str) -> String {
1405 use md5::Digest;
1406 let result = md5::Md5::digest(data.as_bytes());
1407 format!("{result:032x}")
1408}
1409
1410async fn invoke_lambda_direct(
1412 function_arn: &str,
1413 input: &Value,
1414 delivery: &Option<Arc<DeliveryBus>>,
1415 timeout_seconds: Option<u64>,
1416) -> Result<Value, (String, String)> {
1417 let delivery = delivery.as_ref().ok_or_else(|| {
1418 (
1419 "States.TaskFailed".to_string(),
1420 "No delivery bus configured for Lambda invocation".to_string(),
1421 )
1422 })?;
1423
1424 let payload = serde_json::to_string(input).unwrap_or_default();
1425
1426 let invoke_future = delivery.invoke_lambda(function_arn, &payload);
1427
1428 let result = if let Some(timeout) = timeout_seconds {
1429 match tokio::time::timeout(tokio::time::Duration::from_secs(timeout), invoke_future).await {
1430 Ok(r) => r,
1431 Err(_) => {
1432 return Err((
1433 "States.Timeout".to_string(),
1434 format!("Task timed out after {timeout} seconds"),
1435 ));
1436 }
1437 }
1438 } else {
1439 invoke_future.await
1440 };
1441
1442 match result {
1443 Some(Ok(bytes)) => {
1444 let response_str = String::from_utf8_lossy(&bytes);
1445 let value: Value =
1446 serde_json::from_str(&response_str).unwrap_or(json!(response_str.to_string()));
1447 Ok(value)
1448 }
1449 Some(Err(e)) => Err(("States.TaskFailed".to_string(), e)),
1450 None => {
1451 Ok(json!({}))
1453 }
1454 }
1455}
1456
1457fn apply_parameters(template: &Value, input: &Value) -> Value {
1459 match template {
1460 Value::Object(map) => {
1461 let mut result = serde_json::Map::new();
1462 for (key, value) in map {
1463 if let Some(stripped) = key.strip_suffix(".$") {
1464 if let Some(path) = value.as_str() {
1465 result.insert(
1466 stripped.to_string(),
1467 crate::io_processing::resolve_path(input, path),
1468 );
1469 }
1470 } else {
1471 result.insert(key.clone(), apply_parameters(value, input));
1472 }
1473 }
1474 Value::Object(result)
1475 }
1476 Value::Array(arr) => Value::Array(arr.iter().map(|v| apply_parameters(v, input)).collect()),
1477 other => other.clone(),
1478 }
1479}
1480
/// Outcome of inspecting a state definition's transition fields
/// (produced by `next_state`).
enum NextState {
    /// Continue with the state named by the definition's `Next` field.
    Name(String),
    /// The definition has `End` set to `true`.
    End,
    /// The definition has no usable transition; carries the error message.
    Error(String),
}
1486
1487fn next_state(state_def: &Value) -> NextState {
1488 if state_def["End"].as_bool() == Some(true) {
1489 return NextState::End;
1490 }
1491 match state_def["Next"].as_str() {
1492 Some(next) => NextState::Name(next.to_string()),
1493 None => NextState::Error("State has neither 'End' nor 'Next' field".to_string()),
1494 }
1495}
1496
1497fn add_event(
1498 state: &SharedStepFunctionsState,
1499 execution_arn: &str,
1500 event_type: &str,
1501 previous_event_id: i64,
1502 details: Value,
1503) -> i64 {
1504 let mut s = state.write();
1505 if let Some(exec) = s.executions.get_mut(execution_arn) {
1506 let id = exec.history_events.len() as i64 + 1;
1507 exec.history_events.push(HistoryEvent {
1508 id,
1509 event_type: event_type.to_string(),
1510 timestamp: Utc::now(),
1511 previous_event_id,
1512 details,
1513 });
1514 id
1515 } else {
1516 0
1517 }
1518}
1519
1520fn succeed_execution(state: &SharedStepFunctionsState, execution_arn: &str, output: &Value) {
1521 let output_str = serde_json::to_string(output).unwrap_or_default();
1522
1523 add_event(
1524 state,
1525 execution_arn,
1526 "ExecutionSucceeded",
1527 0,
1528 json!({ "output": output_str }),
1529 );
1530
1531 let mut s = state.write();
1532 if let Some(exec) = s.executions.get_mut(execution_arn) {
1533 exec.status = ExecutionStatus::Succeeded;
1534 exec.output = Some(output_str);
1535 exec.stop_date = Some(Utc::now());
1536 }
1537}
1538
1539fn fail_execution(state: &SharedStepFunctionsState, execution_arn: &str, error: &str, cause: &str) {
1540 add_event(
1541 state,
1542 execution_arn,
1543 "ExecutionFailed",
1544 0,
1545 json!({ "error": error, "cause": cause }),
1546 );
1547
1548 let mut s = state.write();
1549 if let Some(exec) = s.executions.get_mut(execution_arn) {
1550 exec.status = ExecutionStatus::Failed;
1551 exec.error = Some(error.to_string());
1552 exec.cause = Some(cause.to_string());
1553 exec.stop_date = Some(Utc::now());
1554 }
1555}
1556
#[cfg(test)]
mod tests {
    use super::*;
    use crate::state::{Execution, StepFunctionsState};
    use parking_lot::RwLock;
    use std::sync::Arc;

    /// ARN used for the single execution each test registers.
    const EXEC_ARN: &str = "arn:aws:states:us-east-1:123456789012:execution:test:exec-1";

    /// Creates an empty shared Step Functions state.
    fn make_state() -> SharedStepFunctionsState {
        let inner = StepFunctionsState::new("123456789012", "us-east-1");
        Arc::new(RwLock::new(inner))
    }

    /// Registers a running execution under `arn` with the given input.
    fn create_execution(state: &SharedStepFunctionsState, arn: &str, input: Option<String>) {
        let execution = Execution {
            execution_arn: arn.to_string(),
            state_machine_arn: "arn:aws:states:us-east-1:123456789012:stateMachine:test"
                .to_string(),
            state_machine_name: "test".to_string(),
            name: "exec-1".to_string(),
            status: ExecutionStatus::Running,
            input,
            output: None,
            start_date: Utc::now(),
            stop_date: None,
            error: None,
            cause: None,
            history_events: vec![],
        };
        state.write().executions.insert(arn.to_string(), execution);
    }

    /// Registers an execution, runs `definition` against `input` with no
    /// delivery bus or DynamoDB state, and returns the resulting shared state.
    async fn run_definition(definition: Value, input: Option<&str>) -> SharedStepFunctionsState {
        let state = make_state();
        create_execution(&state, EXEC_ARN, input.map(str::to_string));

        execute_state_machine(
            state.clone(),
            EXEC_ARN.to_string(),
            definition.to_string(),
            input.map(str::to_string),
            None,
            None,
        )
        .await;

        state
    }

    #[tokio::test]
    async fn test_simple_pass_state() {
        let definition = json!({
            "StartAt": "PassState",
            "States": {
                "PassState": {
                    "Type": "Pass",
                    "Result": {"processed": true},
                    "End": true
                }
            }
        });

        let state = run_definition(definition, Some(r#"{"hello":"world"}"#)).await;

        let s = state.read();
        let exec = s.executions.get(EXEC_ARN).unwrap();
        assert_eq!(exec.status, ExecutionStatus::Succeeded);
        assert!(exec.output.is_some());
        let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
        assert_eq!(output, json!({"processed": true}));
    }

    #[tokio::test]
    async fn test_pass_chain() {
        let definition = json!({
            "StartAt": "First",
            "States": {
                "First": {
                    "Type": "Pass",
                    "Result": "step1",
                    "ResultPath": "$.first",
                    "Next": "Second"
                },
                "Second": {
                    "Type": "Pass",
                    "Result": "step2",
                    "ResultPath": "$.second",
                    "End": true
                }
            }
        });

        let state = run_definition(definition, Some("{}")).await;

        let s = state.read();
        let exec = s.executions.get(EXEC_ARN).unwrap();
        assert_eq!(exec.status, ExecutionStatus::Succeeded);
        let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
        assert_eq!(output["first"], json!("step1"));
        assert_eq!(output["second"], json!("step2"));
    }

    #[tokio::test]
    async fn test_succeed_state() {
        let definition = json!({
            "StartAt": "Done",
            "States": {
                "Done": {
                    "Type": "Succeed"
                }
            }
        });

        let state = run_definition(definition, Some(r#"{"data": "value"}"#)).await;

        let s = state.read();
        let exec = s.executions.get(EXEC_ARN).unwrap();
        assert_eq!(exec.status, ExecutionStatus::Succeeded);
    }

    #[tokio::test]
    async fn test_fail_state() {
        let definition = json!({
            "StartAt": "FailState",
            "States": {
                "FailState": {
                    "Type": "Fail",
                    "Error": "CustomError",
                    "Cause": "Something went wrong"
                }
            }
        });

        let state = run_definition(definition, None).await;

        let s = state.read();
        let exec = s.executions.get(EXEC_ARN).unwrap();
        assert_eq!(exec.status, ExecutionStatus::Failed);
        assert_eq!(exec.error.as_deref(), Some("CustomError"));
        assert_eq!(exec.cause.as_deref(), Some("Something went wrong"));
    }

    #[tokio::test]
    async fn test_history_events_recorded() {
        let definition = json!({
            "StartAt": "PassState",
            "States": {
                "PassState": {
                    "Type": "Pass",
                    "End": true
                }
            }
        });

        let state = run_definition(definition, Some("{}")).await;

        let s = state.read();
        let exec = s.executions.get(EXEC_ARN).unwrap();
        let event_types: Vec<&str> = exec
            .history_events
            .iter()
            .map(|event| event.event_type.as_str())
            .collect();
        assert_eq!(
            event_types,
            vec![
                "ExecutionStarted",
                "PassStateEntered",
                "PassStateExited",
                "ExecutionSucceeded"
            ]
        );
    }
}