1use std::collections::HashMap;
2use std::sync::Arc;
3
4use chrono::Utc;
5use serde_json::{json, Value};
6use tracing::{debug, warn};
7
8use fakecloud_core::delivery::DeliveryBus;
9use fakecloud_dynamodb::state::SharedDynamoDbState;
10
11use crate::choice::evaluate_choice;
12use crate::error_handling::{find_catcher, should_retry};
13use crate::io_processing::{apply_input_path, apply_output_path, apply_result_path};
14use crate::state::{ExecutionStatus, HistoryEvent, SharedStepFunctionsState};
15
16pub async fn execute_state_machine(
19 state: SharedStepFunctionsState,
20 execution_arn: String,
21 definition: String,
22 input: Option<String>,
23 delivery: Option<Arc<DeliveryBus>>,
24 dynamodb_state: Option<SharedDynamoDbState>,
25) {
26 let def: Value = match serde_json::from_str(&definition) {
27 Ok(v) => v,
28 Err(e) => {
29 fail_execution(
30 &state,
31 &execution_arn,
32 "States.Runtime",
33 &format!("Failed to parse definition: {e}"),
34 );
35 return;
36 }
37 };
38
39 let raw_input: Value = input
40 .as_deref()
41 .and_then(|s| serde_json::from_str(s).ok())
42 .unwrap_or(json!({}));
43
44 add_event(
46 &state,
47 &execution_arn,
48 "ExecutionStarted",
49 0,
50 json!({
51 "input": serde_json::to_string(&raw_input).unwrap_or_default(),
52 "roleArn": "arn:aws:iam::123456789012:role/execution-role"
53 }),
54 );
55
56 match run_states(
57 &def,
58 raw_input,
59 &delivery,
60 &dynamodb_state,
61 &state,
62 &execution_arn,
63 )
64 .await
65 {
66 Ok(output) => {
67 succeed_execution(&state, &execution_arn, &output);
68 }
69 Err((error, cause)) => {
70 fail_execution(&state, &execution_arn, &error, &cause);
71 }
72 }
73}
74
75type StatesResult<'a> = std::pin::Pin<
76 Box<dyn std::future::Future<Output = Result<Value, (String, String)>> + Send + 'a>,
77>;
78
79fn run_states<'a>(
82 def: &'a Value,
83 input: Value,
84 delivery: &'a Option<Arc<DeliveryBus>>,
85 dynamodb_state: &'a Option<SharedDynamoDbState>,
86 shared_state: &'a SharedStepFunctionsState,
87 execution_arn: &'a str,
88) -> StatesResult<'a> {
89 Box::pin(async move {
90 let start_at = def["StartAt"]
91 .as_str()
92 .ok_or_else(|| {
93 (
94 "States.Runtime".to_string(),
95 "Missing StartAt in definition".to_string(),
96 )
97 })?
98 .to_string();
99
100 let states = def.get("States").ok_or_else(|| {
101 (
102 "States.Runtime".to_string(),
103 "Missing States in definition".to_string(),
104 )
105 })?;
106
107 let mut current_state = start_at;
108 let mut effective_input = input;
109 let mut iteration = 0;
110 let max_iterations = 500;
111
112 loop {
113 iteration += 1;
114 if iteration > max_iterations {
115 return Err((
116 "States.Runtime".to_string(),
117 "Maximum number of state transitions exceeded".to_string(),
118 ));
119 }
120
121 let state_def = states.get(¤t_state).cloned().ok_or_else(|| {
122 (
123 "States.Runtime".to_string(),
124 format!("State '{current_state}' not found in definition"),
125 )
126 })?;
127
128 let state_type = state_def["Type"]
129 .as_str()
130 .ok_or_else(|| {
131 (
132 "States.Runtime".to_string(),
133 format!("State '{current_state}' missing Type field"),
134 )
135 })?
136 .to_string();
137
138 debug!(
139 execution_arn = %execution_arn,
140 state = %current_state,
141 state_type = %state_type,
142 "Executing state"
143 );
144
145 match state_type.as_str() {
146 "Pass" => {
147 let entered_event_id = add_event(
148 shared_state,
149 execution_arn,
150 "PassStateEntered",
151 0,
152 json!({
153 "name": current_state,
154 "input": serde_json::to_string(&effective_input).unwrap_or_default(),
155 }),
156 );
157
158 let result = execute_pass_state(&state_def, &effective_input);
159
160 add_event(
161 shared_state,
162 execution_arn,
163 "PassStateExited",
164 entered_event_id,
165 json!({
166 "name": current_state,
167 "output": serde_json::to_string(&result).unwrap_or_default(),
168 }),
169 );
170
171 effective_input = result;
172
173 match next_state(&state_def) {
174 NextState::Name(next) => current_state = next,
175 NextState::End => return Ok(effective_input),
176 NextState::Error(msg) => {
177 return Err(("States.Runtime".to_string(), msg));
178 }
179 }
180 }
181
182 "Succeed" => {
183 add_event(
184 shared_state,
185 execution_arn,
186 "SucceedStateEntered",
187 0,
188 json!({
189 "name": current_state,
190 "input": serde_json::to_string(&effective_input).unwrap_or_default(),
191 }),
192 );
193
194 let input_path = state_def["InputPath"].as_str();
195 let output_path = state_def["OutputPath"].as_str();
196
197 let processed = if input_path == Some("null") {
198 json!({})
199 } else {
200 apply_input_path(&effective_input, input_path)
201 };
202
203 let output = if output_path == Some("null") {
204 json!({})
205 } else {
206 apply_output_path(&processed, output_path)
207 };
208
209 add_event(
210 shared_state,
211 execution_arn,
212 "SucceedStateExited",
213 0,
214 json!({
215 "name": current_state,
216 "output": serde_json::to_string(&output).unwrap_or_default(),
217 }),
218 );
219
220 return Ok(output);
221 }
222
223 "Fail" => {
224 let error = state_def["Error"]
225 .as_str()
226 .unwrap_or("States.Fail")
227 .to_string();
228 let cause = state_def["Cause"].as_str().unwrap_or("").to_string();
229
230 add_event(
231 shared_state,
232 execution_arn,
233 "FailStateEntered",
234 0,
235 json!({
236 "name": current_state,
237 "input": serde_json::to_string(&effective_input).unwrap_or_default(),
238 }),
239 );
240
241 return Err((error, cause));
242 }
243
244 "Task" => {
245 let entered_event_id = add_event(
246 shared_state,
247 execution_arn,
248 "TaskStateEntered",
249 0,
250 json!({
251 "name": current_state,
252 "input": serde_json::to_string(&effective_input).unwrap_or_default(),
253 }),
254 );
255
256 let result = execute_task_state(
257 &state_def,
258 &effective_input,
259 delivery,
260 dynamodb_state,
261 shared_state,
262 execution_arn,
263 entered_event_id,
264 )
265 .await;
266
267 match result {
268 Ok(output) => {
269 add_event(
270 shared_state,
271 execution_arn,
272 "TaskStateExited",
273 entered_event_id,
274 json!({
275 "name": current_state,
276 "output": serde_json::to_string(&output).unwrap_or_default(),
277 }),
278 );
279
280 effective_input = output;
281
282 match next_state(&state_def) {
283 NextState::Name(next) => current_state = next,
284 NextState::End => return Ok(effective_input),
285 NextState::Error(msg) => {
286 return Err(("States.Runtime".to_string(), msg));
287 }
288 }
289 }
290 Err((error, cause)) => {
291 let catchers =
292 state_def["Catch"].as_array().cloned().unwrap_or_default();
293
294 if let Some((next, result_path)) = find_catcher(&catchers, &error) {
295 let error_output = json!({
296 "Error": error,
297 "Cause": cause,
298 });
299 effective_input = apply_result_path(
300 &effective_input,
301 &error_output,
302 result_path.as_deref(),
303 );
304 current_state = next;
305 } else {
306 return Err((error, cause));
307 }
308 }
309 }
310 }
311
312 "Choice" => {
313 let entered_event_id = add_event(
314 shared_state,
315 execution_arn,
316 "ChoiceStateEntered",
317 0,
318 json!({
319 "name": current_state,
320 "input": serde_json::to_string(&effective_input).unwrap_or_default(),
321 }),
322 );
323
324 let input_path = state_def["InputPath"].as_str();
325 let processed_input = if input_path == Some("null") {
326 json!({})
327 } else {
328 apply_input_path(&effective_input, input_path)
329 };
330
331 match evaluate_choice(&state_def, &processed_input) {
332 Some(next) => {
333 add_event(
334 shared_state,
335 execution_arn,
336 "ChoiceStateExited",
337 entered_event_id,
338 json!({
339 "name": current_state,
340 "output": serde_json::to_string(&effective_input).unwrap_or_default(),
341 }),
342 );
343 current_state = next;
344 }
345 None => {
346 return Err((
347 "States.NoChoiceMatched".to_string(),
348 format!(
349 "No choice rule matched and no Default in state '{current_state}'"
350 ),
351 ));
352 }
353 }
354 }
355
356 "Wait" => {
357 let entered_event_id = add_event(
358 shared_state,
359 execution_arn,
360 "WaitStateEntered",
361 0,
362 json!({
363 "name": current_state,
364 "input": serde_json::to_string(&effective_input).unwrap_or_default(),
365 }),
366 );
367
368 execute_wait_state(&state_def, &effective_input).await;
369
370 add_event(
371 shared_state,
372 execution_arn,
373 "WaitStateExited",
374 entered_event_id,
375 json!({
376 "name": current_state,
377 "output": serde_json::to_string(&effective_input).unwrap_or_default(),
378 }),
379 );
380
381 match next_state(&state_def) {
382 NextState::Name(next) => current_state = next,
383 NextState::End => return Ok(effective_input),
384 NextState::Error(msg) => {
385 return Err(("States.Runtime".to_string(), msg));
386 }
387 }
388 }
389
390 "Parallel" => {
391 let entered_event_id = add_event(
392 shared_state,
393 execution_arn,
394 "ParallelStateEntered",
395 0,
396 json!({
397 "name": current_state,
398 "input": serde_json::to_string(&effective_input).unwrap_or_default(),
399 }),
400 );
401
402 let result = execute_parallel_state(
403 &state_def,
404 &effective_input,
405 delivery,
406 dynamodb_state,
407 shared_state,
408 execution_arn,
409 )
410 .await;
411
412 match result {
413 Ok(output) => {
414 add_event(
415 shared_state,
416 execution_arn,
417 "ParallelStateExited",
418 entered_event_id,
419 json!({
420 "name": current_state,
421 "output": serde_json::to_string(&output).unwrap_or_default(),
422 }),
423 );
424
425 effective_input = output;
426
427 match next_state(&state_def) {
428 NextState::Name(next) => current_state = next,
429 NextState::End => return Ok(effective_input),
430 NextState::Error(msg) => {
431 return Err(("States.Runtime".to_string(), msg));
432 }
433 }
434 }
435 Err((error, cause)) => {
436 let catchers =
437 state_def["Catch"].as_array().cloned().unwrap_or_default();
438
439 if let Some((next, result_path)) = find_catcher(&catchers, &error) {
440 let error_output = json!({
441 "Error": error,
442 "Cause": cause,
443 });
444 effective_input = apply_result_path(
445 &effective_input,
446 &error_output,
447 result_path.as_deref(),
448 );
449 current_state = next;
450 } else {
451 return Err((error, cause));
452 }
453 }
454 }
455 }
456
457 "Map" => {
458 let entered_event_id = add_event(
459 shared_state,
460 execution_arn,
461 "MapStateEntered",
462 0,
463 json!({
464 "name": current_state,
465 "input": serde_json::to_string(&effective_input).unwrap_or_default(),
466 }),
467 );
468
469 let result = execute_map_state(
470 &state_def,
471 &effective_input,
472 delivery,
473 dynamodb_state,
474 shared_state,
475 execution_arn,
476 )
477 .await;
478
479 match result {
480 Ok(output) => {
481 add_event(
482 shared_state,
483 execution_arn,
484 "MapStateExited",
485 entered_event_id,
486 json!({
487 "name": current_state,
488 "output": serde_json::to_string(&output).unwrap_or_default(),
489 }),
490 );
491
492 effective_input = output;
493
494 match next_state(&state_def) {
495 NextState::Name(next) => current_state = next,
496 NextState::End => return Ok(effective_input),
497 NextState::Error(msg) => {
498 return Err(("States.Runtime".to_string(), msg));
499 }
500 }
501 }
502 Err((error, cause)) => {
503 let catchers =
504 state_def["Catch"].as_array().cloned().unwrap_or_default();
505
506 if let Some((next, result_path)) = find_catcher(&catchers, &error) {
507 let error_output = json!({
508 "Error": error,
509 "Cause": cause,
510 });
511 effective_input = apply_result_path(
512 &effective_input,
513 &error_output,
514 result_path.as_deref(),
515 );
516 current_state = next;
517 } else {
518 return Err((error, cause));
519 }
520 }
521 }
522 }
523
524 other => {
525 return Err((
526 "States.Runtime".to_string(),
527 format!("Unsupported state type: '{other}'"),
528 ));
529 }
530 }
531 }
532 })
533}
534
535async fn execute_wait_state(state_def: &Value, input: &Value) {
537 if let Some(seconds) = state_def["Seconds"].as_u64() {
538 tokio::time::sleep(tokio::time::Duration::from_secs(seconds)).await;
539 return;
540 }
541
542 if let Some(path) = state_def["SecondsPath"].as_str() {
543 let val = crate::io_processing::resolve_path(input, path);
544 if let Some(seconds) = val.as_u64() {
545 tokio::time::sleep(tokio::time::Duration::from_secs(seconds)).await;
546 }
547 return;
548 }
549
550 if let Some(ts_str) = state_def["Timestamp"].as_str() {
551 if let Ok(target) = chrono::DateTime::parse_from_rfc3339(ts_str) {
552 let now = Utc::now();
553 let target_utc = target.with_timezone(&chrono::Utc);
554 if target_utc > now {
555 let duration = (target_utc - now).to_std().unwrap_or_default();
556 tokio::time::sleep(duration).await;
557 }
558 }
559 return;
560 }
561
562 if let Some(path) = state_def["TimestampPath"].as_str() {
563 let val = crate::io_processing::resolve_path(input, path);
564 if let Some(ts_str) = val.as_str() {
565 if let Ok(target) = chrono::DateTime::parse_from_rfc3339(ts_str) {
566 let now = Utc::now();
567 let target_utc = target.with_timezone(&chrono::Utc);
568 if target_utc > now {
569 let duration = (target_utc - now).to_std().unwrap_or_default();
570 tokio::time::sleep(duration).await;
571 }
572 }
573 }
574 return;
575 }
576
577 warn!(
578 "Wait state has no valid Seconds, SecondsPath, Timestamp, or TimestampPath — skipping wait"
579 );
580}
581
582fn execute_pass_state(state_def: &Value, input: &Value) -> Value {
584 let input_path = state_def["InputPath"].as_str();
585 let result_path = state_def["ResultPath"].as_str();
586 let output_path = state_def["OutputPath"].as_str();
587
588 let effective_input = if input_path == Some("null") {
589 json!({})
590 } else {
591 apply_input_path(input, input_path)
592 };
593
594 let result = if let Some(r) = state_def.get("Result") {
595 r.clone()
596 } else {
597 effective_input.clone()
598 };
599
600 let after_result = if result_path == Some("null") {
601 input.clone()
602 } else {
603 apply_result_path(input, &result, result_path)
604 };
605
606 if output_path == Some("null") {
607 json!({})
608 } else {
609 apply_output_path(&after_result, output_path)
610 }
611}
612
613async fn execute_task_state(
616 state_def: &Value,
617 input: &Value,
618 delivery: &Option<Arc<DeliveryBus>>,
619 dynamodb_state: &Option<SharedDynamoDbState>,
620 shared_state: &SharedStepFunctionsState,
621 execution_arn: &str,
622 entered_event_id: i64,
623) -> Result<Value, (String, String)> {
624 let resource = state_def["Resource"].as_str().unwrap_or("").to_string();
625
626 let input_path = state_def["InputPath"].as_str();
627 let result_path = state_def["ResultPath"].as_str();
628 let output_path = state_def["OutputPath"].as_str();
629
630 let effective_input = if input_path == Some("null") {
631 json!({})
632 } else {
633 apply_input_path(input, input_path)
634 };
635
636 let task_input = if let Some(params) = state_def.get("Parameters") {
637 apply_parameters(params, &effective_input)
638 } else {
639 effective_input
640 };
641
642 let retriers = state_def["Retry"].as_array().cloned().unwrap_or_default();
643 let timeout_seconds = state_def["TimeoutSeconds"].as_u64();
644 let mut attempt = 0u32;
645
646 loop {
647 add_event(
648 shared_state,
649 execution_arn,
650 "TaskScheduled",
651 entered_event_id,
652 json!({
653 "resource": resource,
654 "region": "us-east-1",
655 "parameters": serde_json::to_string(&task_input).unwrap_or_default(),
656 }),
657 );
658
659 add_event(
660 shared_state,
661 execution_arn,
662 "TaskStarted",
663 entered_event_id,
664 json!({ "resource": resource }),
665 );
666
667 let invoke_result = invoke_resource(
668 &resource,
669 &task_input,
670 delivery,
671 dynamodb_state,
672 timeout_seconds,
673 )
674 .await;
675
676 match invoke_result {
677 Ok(result) => {
678 add_event(
679 shared_state,
680 execution_arn,
681 "TaskSucceeded",
682 entered_event_id,
683 json!({
684 "resource": resource,
685 "output": serde_json::to_string(&result).unwrap_or_default(),
686 }),
687 );
688
689 let selected = if let Some(selector) = state_def.get("ResultSelector") {
690 apply_parameters(selector, &result)
691 } else {
692 result
693 };
694
695 let after_result = if result_path == Some("null") {
696 input.clone()
697 } else {
698 apply_result_path(input, &selected, result_path)
699 };
700
701 let output = if output_path == Some("null") {
702 json!({})
703 } else {
704 apply_output_path(&after_result, output_path)
705 };
706
707 return Ok(output);
708 }
709 Err((error, cause)) => {
710 add_event(
711 shared_state,
712 execution_arn,
713 "TaskFailed",
714 entered_event_id,
715 json!({ "error": error, "cause": cause }),
716 );
717
718 if let Some(delay_ms) = should_retry(&retriers, &error, attempt) {
719 attempt += 1;
720 let actual_delay = delay_ms.min(5000);
721 tokio::time::sleep(tokio::time::Duration::from_millis(actual_delay)).await;
722 continue;
723 }
724
725 return Err((error, cause));
726 }
727 }
728 }
729}
730
731async fn execute_parallel_state(
733 state_def: &Value,
734 input: &Value,
735 delivery: &Option<Arc<DeliveryBus>>,
736 dynamodb_state: &Option<SharedDynamoDbState>,
737 shared_state: &SharedStepFunctionsState,
738 execution_arn: &str,
739) -> Result<Value, (String, String)> {
740 let input_path = state_def["InputPath"].as_str();
741 let result_path = state_def["ResultPath"].as_str();
742 let output_path = state_def["OutputPath"].as_str();
743
744 let effective_input = if input_path == Some("null") {
745 json!({})
746 } else {
747 apply_input_path(input, input_path)
748 };
749
750 let branches = state_def["Branches"]
751 .as_array()
752 .cloned()
753 .unwrap_or_default();
754
755 if branches.is_empty() {
756 return Err((
757 "States.Runtime".to_string(),
758 "Parallel state has no Branches".to_string(),
759 ));
760 }
761
762 let mut handles = Vec::new();
764 for branch_def in &branches {
765 let branch = branch_def.clone();
766 let branch_input = effective_input.clone();
767 let delivery = delivery.clone();
768 let ddb = dynamodb_state.clone();
769 let state = shared_state.clone();
770 let arn = execution_arn.to_string();
771
772 handles.push(tokio::spawn(async move {
773 run_states(&branch, branch_input, &delivery, &ddb, &state, &arn).await
774 }));
775 }
776
777 let mut results = Vec::with_capacity(handles.len());
779 for handle in handles {
780 let result = handle.await.map_err(|e| {
781 (
782 "States.Runtime".to_string(),
783 format!("Branch execution panicked: {e}"),
784 )
785 })??;
786 results.push(result);
787 }
788
789 let branch_output = Value::Array(results);
790
791 let selected = if let Some(selector) = state_def.get("ResultSelector") {
793 apply_parameters(selector, &branch_output)
794 } else {
795 branch_output
796 };
797
798 let after_result = if result_path == Some("null") {
800 input.clone()
801 } else {
802 apply_result_path(input, &selected, result_path)
803 };
804
805 let output = if output_path == Some("null") {
807 json!({})
808 } else {
809 apply_output_path(&after_result, output_path)
810 };
811
812 Ok(output)
813}
814
815async fn execute_map_state(
817 state_def: &Value,
818 input: &Value,
819 delivery: &Option<Arc<DeliveryBus>>,
820 dynamodb_state: &Option<SharedDynamoDbState>,
821 shared_state: &SharedStepFunctionsState,
822 execution_arn: &str,
823) -> Result<Value, (String, String)> {
824 let input_path = state_def["InputPath"].as_str();
825 let result_path = state_def["ResultPath"].as_str();
826 let output_path = state_def["OutputPath"].as_str();
827
828 let effective_input = if input_path == Some("null") {
829 json!({})
830 } else {
831 apply_input_path(input, input_path)
832 };
833
834 let items_path = state_def["ItemsPath"].as_str().unwrap_or("$");
836 let items_value = crate::io_processing::resolve_path(&effective_input, items_path);
837 let items = items_value.as_array().cloned().unwrap_or_default();
838
839 let iterator_def = state_def
841 .get("ItemProcessor")
842 .or_else(|| state_def.get("Iterator"))
843 .cloned()
844 .ok_or_else(|| {
845 (
846 "States.Runtime".to_string(),
847 "Map state has no ItemProcessor or Iterator".to_string(),
848 )
849 })?;
850
851 let max_concurrency = state_def["MaxConcurrency"].as_u64().unwrap_or(0);
852 let effective_concurrency = if max_concurrency == 0 {
853 40
854 } else {
855 max_concurrency as usize
856 };
857
858 let semaphore = Arc::new(tokio::sync::Semaphore::new(effective_concurrency));
859
860 let mut handles = Vec::new();
862 for (index, item) in items.into_iter().enumerate() {
863 let iter_def = iterator_def.clone();
864 let delivery = delivery.clone();
865 let ddb = dynamodb_state.clone();
866 let state = shared_state.clone();
867 let arn = execution_arn.to_string();
868 let sem = semaphore.clone();
869
870 let item_input = if let Some(selector) = state_def.get("ItemSelector") {
872 let mut ctx = serde_json::Map::new();
873 ctx.insert("value".to_string(), item.clone());
874 ctx.insert("index".to_string(), json!(index));
875 apply_parameters(selector, &Value::Object(ctx))
876 } else {
877 item
878 };
879
880 add_event(
881 shared_state,
882 execution_arn,
883 "MapIterationStarted",
884 0,
885 json!({ "index": index }),
886 );
887
888 handles.push(tokio::spawn(async move {
889 let _permit = sem
890 .acquire()
891 .await
892 .map_err(|_| ("States.Runtime".to_string(), "Semaphore closed".to_string()))?;
893 let result = run_states(&iter_def, item_input, &delivery, &ddb, &state, &arn).await;
894 Ok::<(usize, Result<Value, (String, String)>), (String, String)>((index, result))
895 }));
896 }
897
898 let mut results: Vec<(usize, Value)> = Vec::with_capacity(handles.len());
900 for handle in handles {
901 let (index, result) = handle.await.map_err(|e| {
902 (
903 "States.Runtime".to_string(),
904 format!("Map iteration panicked: {e}"),
905 )
906 })??;
907
908 match result {
909 Ok(output) => {
910 add_event(
911 shared_state,
912 execution_arn,
913 "MapIterationSucceeded",
914 0,
915 json!({ "index": index }),
916 );
917 results.push((index, output));
918 }
919 Err((error, cause)) => {
920 add_event(
921 shared_state,
922 execution_arn,
923 "MapIterationFailed",
924 0,
925 json!({ "index": index, "error": error }),
926 );
927 return Err((error, cause));
928 }
929 }
930 }
931
932 results.sort_by_key(|(i, _)| *i);
934 let map_output = Value::Array(results.into_iter().map(|(_, v)| v).collect());
935
936 let selected = if let Some(selector) = state_def.get("ResultSelector") {
938 apply_parameters(selector, &map_output)
939 } else {
940 map_output
941 };
942
943 let after_result = if result_path == Some("null") {
945 input.clone()
946 } else {
947 apply_result_path(input, &selected, result_path)
948 };
949
950 let output = if output_path == Some("null") {
952 json!({})
953 } else {
954 apply_output_path(&after_result, output_path)
955 };
956
957 Ok(output)
958}
959
960async fn invoke_resource(
962 resource: &str,
963 input: &Value,
964 delivery: &Option<Arc<DeliveryBus>>,
965 dynamodb_state: &Option<SharedDynamoDbState>,
966 timeout_seconds: Option<u64>,
967) -> Result<Value, (String, String)> {
968 if resource.contains(":lambda:") && resource.contains(":function:") {
970 return invoke_lambda_direct(resource, input, delivery, timeout_seconds).await;
971 }
972
973 if resource.starts_with("arn:aws:states:::lambda:invoke") {
975 let function_name = input["FunctionName"].as_str().unwrap_or("");
976 let payload = if let Some(p) = input.get("Payload") {
977 p.clone()
978 } else {
979 input.clone()
980 };
981 return invoke_lambda_direct(function_name, &payload, delivery, timeout_seconds).await;
982 }
983
984 if resource.starts_with("arn:aws:states:::sqs:sendMessage") {
985 return invoke_sqs_send_message(input, delivery);
986 }
987
988 if resource.starts_with("arn:aws:states:::sns:publish") {
989 return invoke_sns_publish(input, delivery);
990 }
991
992 if resource.starts_with("arn:aws:states:::events:putEvents") {
993 return invoke_eventbridge_put_events(input, delivery);
994 }
995
996 if resource.starts_with("arn:aws:states:::dynamodb:getItem") {
997 return invoke_dynamodb_get_item(input, dynamodb_state);
998 }
999
1000 if resource.starts_with("arn:aws:states:::dynamodb:putItem") {
1001 return invoke_dynamodb_put_item(input, dynamodb_state);
1002 }
1003
1004 if resource.starts_with("arn:aws:states:::dynamodb:deleteItem") {
1005 return invoke_dynamodb_delete_item(input, dynamodb_state);
1006 }
1007
1008 if resource.starts_with("arn:aws:states:::dynamodb:updateItem") {
1009 return invoke_dynamodb_update_item(input, dynamodb_state);
1010 }
1011
1012 Err((
1013 "States.TaskFailed".to_string(),
1014 format!("Unsupported resource: {resource}"),
1015 ))
1016}
1017
1018fn invoke_sqs_send_message(
1020 input: &Value,
1021 delivery: &Option<Arc<DeliveryBus>>,
1022) -> Result<Value, (String, String)> {
1023 let delivery = delivery.as_ref().ok_or_else(|| {
1024 (
1025 "States.TaskFailed".to_string(),
1026 "No delivery bus configured for SQS".to_string(),
1027 )
1028 })?;
1029
1030 let queue_url = input["QueueUrl"].as_str().ok_or_else(|| {
1031 (
1032 "States.TaskFailed".to_string(),
1033 "Missing QueueUrl in SQS sendMessage parameters".to_string(),
1034 )
1035 })?;
1036
1037 let message_body = input["MessageBody"]
1038 .as_str()
1039 .map(|s| s.to_string())
1040 .unwrap_or_else(|| {
1041 serde_json::to_string(&input["MessageBody"]).unwrap_or_default()
1043 });
1044
1045 let queue_arn = queue_url_to_arn(queue_url);
1049
1050 delivery.send_to_sqs(&queue_arn, &message_body, &HashMap::new());
1051
1052 Ok(json!({
1053 "MessageId": uuid::Uuid::new_v4().to_string(),
1054 "MD5OfMessageBody": md5_hex(&message_body),
1055 }))
1056}
1057
1058fn invoke_sns_publish(
1060 input: &Value,
1061 delivery: &Option<Arc<DeliveryBus>>,
1062) -> Result<Value, (String, String)> {
1063 let delivery = delivery.as_ref().ok_or_else(|| {
1064 (
1065 "States.TaskFailed".to_string(),
1066 "No delivery bus configured for SNS".to_string(),
1067 )
1068 })?;
1069
1070 let topic_arn = input["TopicArn"].as_str().ok_or_else(|| {
1071 (
1072 "States.TaskFailed".to_string(),
1073 "Missing TopicArn in SNS publish parameters".to_string(),
1074 )
1075 })?;
1076
1077 let message = input["Message"]
1078 .as_str()
1079 .map(|s| s.to_string())
1080 .unwrap_or_else(|| serde_json::to_string(&input["Message"]).unwrap_or_default());
1081
1082 let subject = input["Subject"].as_str();
1083
1084 delivery.publish_to_sns(topic_arn, &message, subject);
1085
1086 Ok(json!({
1087 "MessageId": uuid::Uuid::new_v4().to_string(),
1088 }))
1089}
1090
1091fn invoke_eventbridge_put_events(
1093 input: &Value,
1094 delivery: &Option<Arc<DeliveryBus>>,
1095) -> Result<Value, (String, String)> {
1096 let delivery = delivery.as_ref().ok_or_else(|| {
1097 (
1098 "States.TaskFailed".to_string(),
1099 "No delivery bus configured for EventBridge".to_string(),
1100 )
1101 })?;
1102
1103 let entries = input["Entries"]
1104 .as_array()
1105 .ok_or_else(|| {
1106 (
1107 "States.TaskFailed".to_string(),
1108 "Missing Entries in EventBridge putEvents parameters".to_string(),
1109 )
1110 })?
1111 .clone();
1112
1113 let mut event_ids = Vec::new();
1114 for entry in &entries {
1115 let source = entry["Source"].as_str().unwrap_or("aws.stepfunctions");
1116 let detail_type = entry["DetailType"].as_str().unwrap_or("StepFunctionsEvent");
1117 let detail = entry["Detail"]
1118 .as_str()
1119 .map(|s| s.to_string())
1120 .unwrap_or_else(|| serde_json::to_string(&entry["Detail"]).unwrap_or("{}".to_string()));
1121 let bus_name = entry["EventBusName"].as_str().unwrap_or("default");
1122
1123 delivery.put_event_to_eventbridge(source, detail_type, &detail, bus_name);
1124 event_ids.push(uuid::Uuid::new_v4().to_string());
1125 }
1126
1127 Ok(json!({
1128 "Entries": event_ids.iter().map(|id| json!({"EventId": id})).collect::<Vec<_>>(),
1129 "FailedEntryCount": 0,
1130 }))
1131}
1132
1133fn invoke_dynamodb_get_item(
1135 input: &Value,
1136 dynamodb_state: &Option<SharedDynamoDbState>,
1137) -> Result<Value, (String, String)> {
1138 let ddb = dynamodb_state.as_ref().ok_or_else(|| {
1139 (
1140 "States.TaskFailed".to_string(),
1141 "No DynamoDB state configured".to_string(),
1142 )
1143 })?;
1144
1145 let table_name = input["TableName"].as_str().ok_or_else(|| {
1146 (
1147 "States.TaskFailed".to_string(),
1148 "Missing TableName in DynamoDB getItem parameters".to_string(),
1149 )
1150 })?;
1151
1152 let key = input
1153 .get("Key")
1154 .and_then(|k| k.as_object())
1155 .ok_or_else(|| {
1156 (
1157 "States.TaskFailed".to_string(),
1158 "Missing Key in DynamoDB getItem parameters".to_string(),
1159 )
1160 })?;
1161
1162 let key_map: HashMap<String, Value> = key.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1163
1164 let state = ddb.read();
1165 let table = state.tables.get(table_name).ok_or_else(|| {
1166 (
1167 "States.TaskFailed".to_string(),
1168 format!("Table '{table_name}' not found"),
1169 )
1170 })?;
1171
1172 let item = table
1173 .find_item_index(&key_map)
1174 .map(|idx| table.items[idx].clone());
1175
1176 match item {
1177 Some(item_map) => {
1178 let item_value: serde_json::Map<String, Value> = item_map.into_iter().collect();
1179 Ok(json!({ "Item": item_value }))
1180 }
1181 None => Ok(json!({})),
1182 }
1183}
1184
1185fn invoke_dynamodb_put_item(
1187 input: &Value,
1188 dynamodb_state: &Option<SharedDynamoDbState>,
1189) -> Result<Value, (String, String)> {
1190 let ddb = dynamodb_state.as_ref().ok_or_else(|| {
1191 (
1192 "States.TaskFailed".to_string(),
1193 "No DynamoDB state configured".to_string(),
1194 )
1195 })?;
1196
1197 let table_name = input["TableName"].as_str().ok_or_else(|| {
1198 (
1199 "States.TaskFailed".to_string(),
1200 "Missing TableName in DynamoDB putItem parameters".to_string(),
1201 )
1202 })?;
1203
1204 let item = input
1205 .get("Item")
1206 .and_then(|i| i.as_object())
1207 .ok_or_else(|| {
1208 (
1209 "States.TaskFailed".to_string(),
1210 "Missing Item in DynamoDB putItem parameters".to_string(),
1211 )
1212 })?;
1213
1214 let item_map: HashMap<String, Value> =
1215 item.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1216
1217 let mut state = ddb.write();
1218 let table = state.tables.get_mut(table_name).ok_or_else(|| {
1219 (
1220 "States.TaskFailed".to_string(),
1221 format!("Table '{table_name}' not found"),
1222 )
1223 })?;
1224
1225 if let Some(idx) = table.find_item_index(&item_map) {
1227 table.items[idx] = item_map;
1228 } else {
1229 table.items.push(item_map);
1230 }
1231
1232 Ok(json!({}))
1233}
1234
1235fn invoke_dynamodb_delete_item(
1237 input: &Value,
1238 dynamodb_state: &Option<SharedDynamoDbState>,
1239) -> Result<Value, (String, String)> {
1240 let ddb = dynamodb_state.as_ref().ok_or_else(|| {
1241 (
1242 "States.TaskFailed".to_string(),
1243 "No DynamoDB state configured".to_string(),
1244 )
1245 })?;
1246
1247 let table_name = input["TableName"].as_str().ok_or_else(|| {
1248 (
1249 "States.TaskFailed".to_string(),
1250 "Missing TableName in DynamoDB deleteItem parameters".to_string(),
1251 )
1252 })?;
1253
1254 let key = input
1255 .get("Key")
1256 .and_then(|k| k.as_object())
1257 .ok_or_else(|| {
1258 (
1259 "States.TaskFailed".to_string(),
1260 "Missing Key in DynamoDB deleteItem parameters".to_string(),
1261 )
1262 })?;
1263
1264 let key_map: HashMap<String, Value> = key.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1265
1266 let mut state = ddb.write();
1267 let table = state.tables.get_mut(table_name).ok_or_else(|| {
1268 (
1269 "States.TaskFailed".to_string(),
1270 format!("Table '{table_name}' not found"),
1271 )
1272 })?;
1273
1274 if let Some(idx) = table.find_item_index(&key_map) {
1275 table.items.remove(idx);
1276 }
1277
1278 Ok(json!({}))
1279}
1280
1281fn invoke_dynamodb_update_item(
1284 input: &Value,
1285 dynamodb_state: &Option<SharedDynamoDbState>,
1286) -> Result<Value, (String, String)> {
1287 let ddb = dynamodb_state.as_ref().ok_or_else(|| {
1288 (
1289 "States.TaskFailed".to_string(),
1290 "No DynamoDB state configured".to_string(),
1291 )
1292 })?;
1293
1294 let table_name = input["TableName"].as_str().ok_or_else(|| {
1295 (
1296 "States.TaskFailed".to_string(),
1297 "Missing TableName in DynamoDB updateItem parameters".to_string(),
1298 )
1299 })?;
1300
1301 let key = input
1302 .get("Key")
1303 .and_then(|k| k.as_object())
1304 .ok_or_else(|| {
1305 (
1306 "States.TaskFailed".to_string(),
1307 "Missing Key in DynamoDB updateItem parameters".to_string(),
1308 )
1309 })?;
1310
1311 let key_map: HashMap<String, Value> = key.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1312
1313 let mut state = ddb.write();
1314 let table = state.tables.get_mut(table_name).ok_or_else(|| {
1315 (
1316 "States.TaskFailed".to_string(),
1317 format!("Table '{table_name}' not found"),
1318 )
1319 })?;
1320
1321 if let Some(update_expr) = input["UpdateExpression"].as_str() {
1323 let attr_values = input
1324 .get("ExpressionAttributeValues")
1325 .and_then(|v| v.as_object())
1326 .cloned()
1327 .unwrap_or_default();
1328 let attr_names = input
1329 .get("ExpressionAttributeNames")
1330 .and_then(|v| v.as_object())
1331 .cloned()
1332 .unwrap_or_default();
1333
1334 if let Some(idx) = table.find_item_index(&key_map) {
1335 apply_update_expression(
1336 &mut table.items[idx],
1337 update_expr,
1338 &attr_values,
1339 &attr_names,
1340 );
1341 } else {
1342 let mut new_item = key_map;
1344 apply_update_expression(&mut new_item, update_expr, &attr_values, &attr_names);
1345 table.items.push(new_item);
1346 }
1347 }
1348
1349 Ok(json!({}))
1350}
1351
1352fn apply_update_expression(
1354 item: &mut HashMap<String, Value>,
1355 expr: &str,
1356 attr_values: &serde_json::Map<String, Value>,
1357 attr_names: &serde_json::Map<String, Value>,
1358) {
1359 let trimmed = expr.trim();
1362 let set_part = if trimmed.len() >= 4 && trimmed[..4].eq_ignore_ascii_case("SET ") {
1363 &trimmed[4..]
1364 } else {
1365 trimmed
1366 };
1367
1368 for assignment in set_part.split(',') {
1369 let parts: Vec<&str> = assignment.splitn(2, '=').collect();
1370 if parts.len() == 2 {
1371 let attr_ref = parts[0].trim();
1372 let val_ref = parts[1].trim();
1373
1374 let attr_name = if attr_ref.starts_with('#') {
1376 attr_names
1377 .get(attr_ref)
1378 .and_then(|v| v.as_str())
1379 .unwrap_or(attr_ref)
1380 .to_string()
1381 } else {
1382 attr_ref.to_string()
1383 };
1384
1385 if val_ref.starts_with(':') {
1387 if let Some(val) = attr_values.get(val_ref) {
1388 item.insert(attr_name, val.clone());
1389 }
1390 }
1391 }
1392 }
1393}
1394
/// Derives an SQS queue ARN from a queue URL.
///
/// The last two `/`-separated segments of `url` are taken as the account id
/// and the queue name respectively, and the region is always `us-east-1`.
/// A `url` that contains no `/` is returned unchanged.
fn queue_url_to_arn(url: &str) -> String {
    // Walk the segments from the right: queue name first, then account id.
    let mut segments = url.rsplit('/');
    match (segments.next(), segments.next()) {
        (Some(queue_name), Some(account_id)) => {
            format!("arn:aws:sqs:us-east-1:{account_id}:{queue_name}")
        }
        _ => url.to_string(),
    }
}
1407
1408fn md5_hex(data: &str) -> String {
1410 use md5::Digest;
1411 let result = md5::Md5::digest(data.as_bytes());
1412 format!("{result:032x}")
1413}
1414
1415async fn invoke_lambda_direct(
1417 function_arn: &str,
1418 input: &Value,
1419 delivery: &Option<Arc<DeliveryBus>>,
1420 timeout_seconds: Option<u64>,
1421) -> Result<Value, (String, String)> {
1422 let delivery = delivery.as_ref().ok_or_else(|| {
1423 (
1424 "States.TaskFailed".to_string(),
1425 "No delivery bus configured for Lambda invocation".to_string(),
1426 )
1427 })?;
1428
1429 let payload = serde_json::to_string(input).unwrap_or_default();
1430
1431 let invoke_future = delivery.invoke_lambda(function_arn, &payload);
1432
1433 let result = if let Some(timeout) = timeout_seconds {
1434 match tokio::time::timeout(tokio::time::Duration::from_secs(timeout), invoke_future).await {
1435 Ok(r) => r,
1436 Err(_) => {
1437 return Err((
1438 "States.Timeout".to_string(),
1439 format!("Task timed out after {timeout} seconds"),
1440 ));
1441 }
1442 }
1443 } else {
1444 invoke_future.await
1445 };
1446
1447 match result {
1448 Some(Ok(bytes)) => {
1449 let response_str = String::from_utf8_lossy(&bytes);
1450 let value: Value =
1451 serde_json::from_str(&response_str).unwrap_or(json!(response_str.to_string()));
1452 Ok(value)
1453 }
1454 Some(Err(e)) => Err(("States.TaskFailed".to_string(), e)),
1455 None => {
1456 Ok(json!({}))
1458 }
1459 }
1460}
1461
1462fn apply_parameters(template: &Value, input: &Value) -> Value {
1464 match template {
1465 Value::Object(map) => {
1466 let mut result = serde_json::Map::new();
1467 for (key, value) in map {
1468 if let Some(stripped) = key.strip_suffix(".$") {
1469 if let Some(path) = value.as_str() {
1470 result.insert(
1471 stripped.to_string(),
1472 crate::io_processing::resolve_path(input, path),
1473 );
1474 }
1475 } else {
1476 result.insert(key.clone(), apply_parameters(value, input));
1477 }
1478 }
1479 Value::Object(result)
1480 }
1481 Value::Array(arr) => Value::Array(arr.iter().map(|v| apply_parameters(v, input)).collect()),
1482 other => other.clone(),
1483 }
1484}
1485
/// Outcome of inspecting a state definition for its transition, as produced
/// by `next_state`.
enum NextState {
    /// Continue with the state of this name (taken from the `Next` field).
    Name(String),
    /// The state is terminal (`End` is `true`).
    End,
    /// The definition specifies no usable transition; carries a description.
    Error(String),
}
1491
1492fn next_state(state_def: &Value) -> NextState {
1493 if state_def["End"].as_bool() == Some(true) {
1494 return NextState::End;
1495 }
1496 match state_def["Next"].as_str() {
1497 Some(next) => NextState::Name(next.to_string()),
1498 None => NextState::Error("State has neither 'End' nor 'Next' field".to_string()),
1499 }
1500}
1501
1502fn add_event(
1503 state: &SharedStepFunctionsState,
1504 execution_arn: &str,
1505 event_type: &str,
1506 previous_event_id: i64,
1507 details: Value,
1508) -> i64 {
1509 let mut s = state.write();
1510 if let Some(exec) = s.executions.get_mut(execution_arn) {
1511 let id = exec.history_events.len() as i64 + 1;
1512 exec.history_events.push(HistoryEvent {
1513 id,
1514 event_type: event_type.to_string(),
1515 timestamp: Utc::now(),
1516 previous_event_id,
1517 details,
1518 });
1519 id
1520 } else {
1521 0
1522 }
1523}
1524
1525fn succeed_execution(state: &SharedStepFunctionsState, execution_arn: &str, output: &Value) {
1526 {
1528 let s = state.read();
1529 if let Some(exec) = s.executions.get(execution_arn) {
1530 if exec.status != ExecutionStatus::Running {
1531 return;
1532 }
1533 }
1534 }
1535
1536 let output_str = serde_json::to_string(output).unwrap_or_default();
1537
1538 add_event(
1539 state,
1540 execution_arn,
1541 "ExecutionSucceeded",
1542 0,
1543 json!({ "output": output_str }),
1544 );
1545
1546 let mut s = state.write();
1547 if let Some(exec) = s.executions.get_mut(execution_arn) {
1548 exec.status = ExecutionStatus::Succeeded;
1549 exec.output = Some(output_str);
1550 exec.stop_date = Some(Utc::now());
1551 }
1552}
1553
1554fn fail_execution(state: &SharedStepFunctionsState, execution_arn: &str, error: &str, cause: &str) {
1555 {
1557 let s = state.read();
1558 if let Some(exec) = s.executions.get(execution_arn) {
1559 if exec.status != ExecutionStatus::Running {
1560 return;
1561 }
1562 }
1563 }
1564
1565 add_event(
1566 state,
1567 execution_arn,
1568 "ExecutionFailed",
1569 0,
1570 json!({ "error": error, "cause": cause }),
1571 );
1572
1573 let mut s = state.write();
1574 if let Some(exec) = s.executions.get_mut(execution_arn) {
1575 exec.status = ExecutionStatus::Failed;
1576 exec.error = Some(error.to_string());
1577 exec.cause = Some(cause.to_string());
1578 exec.stop_date = Some(Utc::now());
1579 }
1580}
1581
#[cfg(test)]
mod tests {
    use super::*;
    use crate::state::{Execution, StepFunctionsState};
    use parking_lot::RwLock;
    use std::sync::Arc;

    /// ARN used by every test; each test builds its own isolated state.
    const EXECUTION_ARN: &str = "arn:aws:states:us-east-1:123456789012:execution:test:exec-1";

    /// Builds an empty shared state for the test account and region.
    fn make_state() -> SharedStepFunctionsState {
        let inner = StepFunctionsState::new("123456789012", "us-east-1");
        Arc::new(RwLock::new(inner))
    }

    /// Registers a `Running` execution with no history under `arn`.
    fn create_execution(state: &SharedStepFunctionsState, arn: &str, input: Option<String>) {
        let execution = Execution {
            execution_arn: arn.to_string(),
            state_machine_arn: "arn:aws:states:us-east-1:123456789012:stateMachine:test"
                .to_string(),
            state_machine_name: "test".to_string(),
            name: "exec-1".to_string(),
            status: ExecutionStatus::Running,
            input,
            output: None,
            start_date: Utc::now(),
            stop_date: None,
            error: None,
            cause: None,
            history_events: vec![],
        };
        state.write().executions.insert(arn.to_string(), execution);
    }

    /// Registers an execution with `input`, runs `definition` to completion
    /// without a delivery bus or DynamoDB state, and returns the shared state.
    async fn run_to_completion(
        definition: Value,
        input: Option<&str>,
    ) -> SharedStepFunctionsState {
        let state = make_state();
        let input = input.map(str::to_string);
        create_execution(&state, EXECUTION_ARN, input.clone());

        execute_state_machine(
            state.clone(),
            EXECUTION_ARN.to_string(),
            definition.to_string(),
            input,
            None,
            None,
        )
        .await;

        state
    }

    #[tokio::test]
    async fn test_simple_pass_state() {
        let definition = json!({
            "StartAt": "PassState",
            "States": {
                "PassState": {
                    "Type": "Pass",
                    "Result": {"processed": true},
                    "End": true
                }
            }
        });

        let state = run_to_completion(definition, Some(r#"{"hello":"world"}"#)).await;

        let guard = state.read();
        let exec = guard.executions.get(EXECUTION_ARN).unwrap();
        assert_eq!(exec.status, ExecutionStatus::Succeeded);
        assert!(exec.output.is_some());
        let output: Value = serde_json::from_str(exec.output.as_deref().unwrap()).unwrap();
        assert_eq!(output, json!({"processed": true}));
    }

    #[tokio::test]
    async fn test_pass_chain() {
        let definition = json!({
            "StartAt": "First",
            "States": {
                "First": {
                    "Type": "Pass",
                    "Result": "step1",
                    "ResultPath": "$.first",
                    "Next": "Second"
                },
                "Second": {
                    "Type": "Pass",
                    "Result": "step2",
                    "ResultPath": "$.second",
                    "End": true
                }
            }
        });

        let state = run_to_completion(definition, Some("{}")).await;

        let guard = state.read();
        let exec = guard.executions.get(EXECUTION_ARN).unwrap();
        assert_eq!(exec.status, ExecutionStatus::Succeeded);
        let output: Value = serde_json::from_str(exec.output.as_deref().unwrap()).unwrap();
        assert_eq!(output["first"], json!("step1"));
        assert_eq!(output["second"], json!("step2"));
    }

    #[tokio::test]
    async fn test_succeed_state() {
        let definition = json!({
            "StartAt": "Done",
            "States": {
                "Done": {
                    "Type": "Succeed"
                }
            }
        });

        let state = run_to_completion(definition, Some(r#"{"data": "value"}"#)).await;

        let guard = state.read();
        let exec = guard.executions.get(EXECUTION_ARN).unwrap();
        assert_eq!(exec.status, ExecutionStatus::Succeeded);
    }

    #[tokio::test]
    async fn test_fail_state() {
        let definition = json!({
            "StartAt": "FailState",
            "States": {
                "FailState": {
                    "Type": "Fail",
                    "Error": "CustomError",
                    "Cause": "Something went wrong"
                }
            }
        });

        let state = run_to_completion(definition, None).await;

        let guard = state.read();
        let exec = guard.executions.get(EXECUTION_ARN).unwrap();
        assert_eq!(exec.status, ExecutionStatus::Failed);
        assert_eq!(exec.error.as_deref(), Some("CustomError"));
        assert_eq!(exec.cause.as_deref(), Some("Something went wrong"));
    }

    #[tokio::test]
    async fn test_history_events_recorded() {
        let definition = json!({
            "StartAt": "PassState",
            "States": {
                "PassState": {
                    "Type": "Pass",
                    "End": true
                }
            }
        });

        let state = run_to_completion(definition, Some("{}")).await;

        let guard = state.read();
        let exec = guard.executions.get(EXECUTION_ARN).unwrap();
        let recorded: Vec<&str> = exec
            .history_events
            .iter()
            .map(|event| event.event_type.as_str())
            .collect();
        assert_eq!(
            recorded,
            vec![
                "ExecutionStarted",
                "PassStateEntered",
                "PassStateExited",
                "ExecutionSucceeded"
            ]
        );
    }
}