1use std::collections::HashMap;
2use std::sync::Arc;
3
4use chrono::Utc;
5use serde_json::{json, Value};
6use tracing::{debug, warn};
7
8use fakecloud_aws::arn::Arn;
9use fakecloud_core::delivery::DeliveryBus;
10use fakecloud_dynamodb::SharedDynamoDbState;
11
12use crate::choice::evaluate_choice;
13use crate::error_handling::{find_catcher, should_retry};
14use crate::io_processing::{apply_input_path, apply_output_path, apply_result_path};
15use crate::service::SharedServiceRegistry;
16use crate::state::{ExecutionStatus, HistoryEvent, SharedStepFunctionsState};
17
18#[allow(clippy::too_many_arguments)]
21pub async fn execute_state_machine(
22 state: SharedStepFunctionsState,
23 execution_arn: String,
24 definition: String,
25 input: Option<String>,
26 delivery: Option<Arc<DeliveryBus>>,
27 dynamodb_state: Option<SharedDynamoDbState>,
28 registry: Option<SharedServiceRegistry>,
29 logging_configuration: Option<Value>,
30) {
31 let def: Value = match serde_json::from_str(&definition) {
32 Ok(v) => v,
33 Err(e) => {
34 fail_execution(
35 &state,
36 &execution_arn,
37 "States.Runtime",
38 &format!("Failed to parse definition: {e}"),
39 );
40 return;
41 }
42 };
43
44 let raw_input: Value = input
45 .as_deref()
46 .and_then(|s| serde_json::from_str(s).ok())
47 .unwrap_or(json!({}));
48
49 add_event(
51 &state,
52 &execution_arn,
53 "ExecutionStarted",
54 0,
55 json!({
56 "input": serde_json::to_string(&raw_input).expect("serde_json::Value serialization is infallible"),
57 "roleArn": "arn:aws:iam::123456789012:role/execution-role"
58 }),
59 );
60
61 let def_owned = def;
67 let state_clone = state.clone();
68 let execution_arn_clone = execution_arn.clone();
69 let delivery_clone = delivery.clone();
70 let dynamodb_state_clone = dynamodb_state.clone();
71 let registry_clone = registry.clone();
72 let handle = tokio::spawn(async move {
73 run_states(
74 &def_owned,
75 raw_input,
76 &delivery_clone,
77 &dynamodb_state_clone,
78 ®istry_clone,
79 &state_clone,
80 &execution_arn_clone,
81 )
82 .await
83 });
84
85 match handle.await {
86 Ok(Ok(output)) => {
87 succeed_execution(&state, &execution_arn, &output);
88 }
89 Ok(Err((error, cause))) => {
90 fail_execution(&state, &execution_arn, &error, &cause);
91 }
92 Err(join_err) => {
93 let msg = if join_err.is_panic() {
94 let payload = join_err.into_panic();
95 if let Some(s) = payload.downcast_ref::<String>() {
96 s.clone()
97 } else if let Some(s) = payload.downcast_ref::<&'static str>() {
98 (*s).to_string()
99 } else {
100 "execution task panicked".to_string()
101 }
102 } else {
103 format!("execution task cancelled: {join_err}")
104 };
105 tracing::error!(
106 execution_arn = %execution_arn,
107 panic = %msg,
108 "Step Functions execution panicked"
109 );
110 fail_execution(&state, &execution_arn, "States.Runtime", &msg);
111 }
112 }
113
114 deliver_execution_logs(
116 &state,
117 &execution_arn,
118 delivery.as_ref(),
119 logging_configuration.as_ref(),
120 );
121}
122
/// Boxed, pinned, `Send` future resolving to `Result<Value, (String, String)>`;
/// the future may borrow data for the lifetime `'a`.
///
/// NOTE(review): presumably the return type of the recursive state runner
/// (`run_states`), with the error tuple being `(error, cause)` — confirm at
/// the use site, which is outside this part of the file.
type StatesResult<'a> = std::pin::Pin<
    Box<dyn std::future::Future<Output = Result<Value, (String, String)>> + Send + 'a>,
>;
126
/// Outcome of running a single state, as returned by the `run_*_state`
/// helpers in this file.
pub(crate) enum Advance {
    // NOTE(review): presumably (name of the next state, value handed to it) —
    // confirm against `advance_from_next`, which is defined elsewhere.
    Next(String, Value),
    // NOTE(review): presumably the final output of a terminal state — confirm.
    End(Value),
    // NOTE(review): presumably (error, cause), matching the error tuples used
    // throughout this file — confirm against `advance_from_error`.
    Fail(String, String),
}
136
137async fn run_wait_state(
138 name: &str,
139 state_def: &Value,
140 input: Value,
141 shared_state: &SharedStepFunctionsState,
142 execution_arn: &str,
143) -> Advance {
144 let entered_event_id = add_event(
145 shared_state,
146 execution_arn,
147 "WaitStateEntered",
148 0,
149 json!({
150 "name": name,
151 "input": serde_json::to_string(&input).expect("serde_json::Value serialization is infallible"),
152 }),
153 );
154
155 execute_wait_state(state_def, &input).await;
156
157 add_event(
158 shared_state,
159 execution_arn,
160 "WaitStateExited",
161 entered_event_id,
162 json!({
163 "name": name,
164 "output": serde_json::to_string(&input).expect("serde_json::Value serialization is infallible"),
165 }),
166 );
167
168 advance_from_next(state_def, input)
169}
170
171#[allow(clippy::too_many_arguments)]
172async fn run_task_state(
173 name: &str,
174 state_def: &Value,
175 input: Value,
176 delivery: &Option<Arc<DeliveryBus>>,
177 dynamodb_state: &Option<SharedDynamoDbState>,
178 registry: &Option<SharedServiceRegistry>,
179 shared_state: &SharedStepFunctionsState,
180 execution_arn: &str,
181) -> Advance {
182 let entered_event_id = add_event(
183 shared_state,
184 execution_arn,
185 "TaskStateEntered",
186 0,
187 json!({
188 "name": name,
189 "input": serde_json::to_string(&input).expect("serde_json::Value serialization is infallible"),
190 }),
191 );
192
193 let result = execute_task_state(
194 name,
195 state_def,
196 &input,
197 delivery,
198 dynamodb_state,
199 registry,
200 shared_state,
201 execution_arn,
202 entered_event_id,
203 )
204 .await;
205
206 match result {
207 Ok(output) => {
208 add_event(
209 shared_state,
210 execution_arn,
211 "TaskStateExited",
212 entered_event_id,
213 json!({
214 "name": name,
215 "output": serde_json::to_string(&output).expect("serde_json::Value serialization is infallible"),
216 }),
217 );
218 advance_from_next(state_def, output)
219 }
220 Err((error, cause)) => advance_from_error(state_def, &input, error, cause),
221 }
222}
223
224#[allow(clippy::too_many_arguments)]
225async fn run_parallel_state(
226 name: &str,
227 state_def: &Value,
228 input: Value,
229 delivery: &Option<Arc<DeliveryBus>>,
230 dynamodb_state: &Option<SharedDynamoDbState>,
231 registry: &Option<SharedServiceRegistry>,
232 shared_state: &SharedStepFunctionsState,
233 execution_arn: &str,
234) -> Advance {
235 let entered_event_id = add_event(
236 shared_state,
237 execution_arn,
238 "ParallelStateEntered",
239 0,
240 json!({
241 "name": name,
242 "input": serde_json::to_string(&input).expect("serde_json::Value serialization is infallible"),
243 }),
244 );
245
246 let result = execute_parallel_state(
247 state_def,
248 &input,
249 delivery,
250 dynamodb_state,
251 registry,
252 shared_state,
253 execution_arn,
254 )
255 .await;
256
257 match result {
258 Ok(output) => {
259 add_event(
260 shared_state,
261 execution_arn,
262 "ParallelStateExited",
263 entered_event_id,
264 json!({
265 "name": name,
266 "output": serde_json::to_string(&output).expect("serde_json::Value serialization is infallible"),
267 }),
268 );
269 advance_from_next(state_def, output)
270 }
271 Err((error, cause)) => advance_from_error(state_def, &input, error, cause),
272 }
273}
274
275#[allow(clippy::too_many_arguments)]
276async fn run_map_state(
277 name: &str,
278 state_def: &Value,
279 input: Value,
280 delivery: &Option<Arc<DeliveryBus>>,
281 dynamodb_state: &Option<SharedDynamoDbState>,
282 registry: &Option<SharedServiceRegistry>,
283 shared_state: &SharedStepFunctionsState,
284 execution_arn: &str,
285) -> Advance {
286 let entered_event_id = add_event(
287 shared_state,
288 execution_arn,
289 "MapStateEntered",
290 0,
291 json!({
292 "name": name,
293 "input": serde_json::to_string(&input).expect("serde_json::Value serialization is infallible"),
294 }),
295 );
296
297 let result = execute_map_state(
298 state_def,
299 &input,
300 delivery,
301 dynamodb_state,
302 registry,
303 shared_state,
304 execution_arn,
305 )
306 .await;
307
308 match result {
309 Ok(output) => {
310 add_event(
311 shared_state,
312 execution_arn,
313 "MapStateExited",
314 entered_event_id,
315 json!({
316 "name": name,
317 "output": serde_json::to_string(&output).expect("serde_json::Value serialization is infallible"),
318 }),
319 );
320 advance_from_next(state_def, output)
321 }
322 Err((error, cause)) => advance_from_error(state_def, &input, error, cause),
323 }
324}
325
326async fn execute_wait_state(state_def: &Value, input: &Value) {
328 if let Some(seconds) = state_def["Seconds"].as_u64() {
329 tokio::time::sleep(tokio::time::Duration::from_secs(seconds)).await;
330 return;
331 }
332
333 if let Some(path) = state_def["SecondsPath"].as_str() {
334 let val = crate::io_processing::resolve_path(input, path);
335 if let Some(seconds) = val.as_u64() {
336 tokio::time::sleep(tokio::time::Duration::from_secs(seconds)).await;
337 }
338 return;
339 }
340
341 if let Some(ts_str) = state_def["Timestamp"].as_str() {
342 if let Ok(target) = chrono::DateTime::parse_from_rfc3339(ts_str) {
343 let now = Utc::now();
344 let target_utc = target.with_timezone(&chrono::Utc);
345 if target_utc > now {
346 let duration = (target_utc - now).to_std().unwrap_or_default();
347 tokio::time::sleep(duration).await;
348 }
349 }
350 return;
351 }
352
353 if let Some(path) = state_def["TimestampPath"].as_str() {
354 let val = crate::io_processing::resolve_path(input, path);
355 if let Some(ts_str) = val.as_str() {
356 if let Ok(target) = chrono::DateTime::parse_from_rfc3339(ts_str) {
357 let now = Utc::now();
358 let target_utc = target.with_timezone(&chrono::Utc);
359 if target_utc > now {
360 let duration = (target_utc - now).to_std().unwrap_or_default();
361 tokio::time::sleep(duration).await;
362 }
363 }
364 }
365 return;
366 }
367
368 warn!(
369 "Wait state has no valid Seconds, SecondsPath, Timestamp, or TimestampPath — skipping wait"
370 );
371}
372
373#[allow(clippy::too_many_arguments)]
376async fn execute_task_state(
377 name: &str,
378 state_def: &Value,
379 input: &Value,
380 delivery: &Option<Arc<DeliveryBus>>,
381 dynamodb_state: &Option<SharedDynamoDbState>,
382 registry: &Option<SharedServiceRegistry>,
383 shared_state: &SharedStepFunctionsState,
384 execution_arn: &str,
385 entered_event_id: i64,
386) -> Result<Value, (String, String)> {
387 let resource = state_def["Resource"].as_str().unwrap_or("").to_string();
388
389 let input_path = state_def["InputPath"].as_str();
390 let result_path = state_def["ResultPath"].as_str();
391 let output_path = state_def["OutputPath"].as_str();
392
393 let effective_input = if input_path == Some("null") {
394 json!({})
395 } else {
396 apply_input_path(input, input_path)
397 };
398
399 let retriers = state_def["Retry"].as_array().cloned().unwrap_or_default();
400 let timeout_seconds = state_def["TimeoutSeconds"].as_u64();
401 let heartbeat_seconds = state_def["HeartbeatSeconds"].as_u64();
402 let mut attempt = 0u32;
403
404 let is_wait_for_task_token = resource.contains(".waitForTaskToken");
405 let task_token = if is_wait_for_task_token {
406 let token = format!(
407 "FCToken-{}-{}",
408 chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0),
409 uuid::Uuid::new_v4().simple(),
410 );
411 let account_id = account_id_from_arn(execution_arn);
412 let context = json!({
413 "Task": { "Token": token.clone() },
414 "Execution": { "Id": execution_arn },
415 "State": { "Name": name },
416 });
417 {
418 let mut accounts = shared_state.write();
419 let state = accounts.get_or_create(account_id);
420 state.task_tokens.insert(
421 token.clone(),
422 crate::state::TaskTokenState {
423 activity_arn: String::new(),
424 status: "PENDING".to_string(),
425 output: None,
426 error: None,
427 cause: None,
428 input: None,
429 created_at: chrono::Utc::now(),
430 last_heartbeat_at: None,
431 heartbeat_seconds: heartbeat_seconds.map(|s| s as i64),
432 timeout_seconds: timeout_seconds.map(|s| s as i64),
433 },
434 );
435 }
436 Some((token, context))
437 } else {
438 None
439 };
440
441 let task_input = if let Some(params) = state_def.get("Parameters") {
442 if let Some((_, ctx)) = &task_token {
443 apply_parameters(params, &effective_input, Some(ctx))
444 } else {
445 apply_parameters(params, &effective_input, None)
446 }
447 } else {
448 effective_input
449 };
450
451 loop {
452 add_event(
453 shared_state,
454 execution_arn,
455 "TaskScheduled",
456 entered_event_id,
457 json!({
458 "resource": resource,
459 "region": "us-east-1",
460 "parameters": serde_json::to_string(&task_input).expect("serde_json::Value serialization is infallible"),
461 }),
462 );
463
464 add_event(
465 shared_state,
466 execution_arn,
467 "TaskStarted",
468 entered_event_id,
469 json!({ "resource": resource }),
470 );
471
472 let invoke_result = invoke_resource(
473 &resource,
474 &task_input,
475 delivery,
476 dynamodb_state,
477 registry,
478 execution_arn,
479 timeout_seconds,
480 heartbeat_seconds,
481 shared_state,
482 )
483 .await;
484
485 match invoke_result {
486 Ok(result) => {
487 if let Some((token, _)) = &task_token {
488 let account_id = account_id_from_arn(execution_arn);
489 match poll_task_token(
490 shared_state,
491 account_id,
492 token,
493 timeout_seconds,
494 heartbeat_seconds,
495 )
496 .await
497 {
498 Ok(output) => {
499 add_event(
500 shared_state,
501 execution_arn,
502 "TaskSucceeded",
503 entered_event_id,
504 json!({
505 "resource": resource,
506 "output": serde_json::to_string(&output).expect("serde_json::Value serialization is infallible"),
507 }),
508 );
509
510 let selected = if let Some(selector) = state_def.get("ResultSelector") {
511 apply_parameters(selector, &output, None)
512 } else {
513 output
514 };
515
516 let after_result = if result_path == Some("null") {
517 input.clone()
518 } else {
519 apply_result_path(input, &selected, result_path)
520 };
521
522 let output = if output_path == Some("null") {
523 json!({})
524 } else {
525 apply_output_path(&after_result, output_path)
526 };
527
528 return Ok(output);
529 }
530 Err((error, cause)) => {
531 add_event(
532 shared_state,
533 execution_arn,
534 "TaskFailed",
535 entered_event_id,
536 json!({ "error": error, "cause": cause }),
537 );
538
539 if let Some(delay_ms) = should_retry(&retriers, &error, attempt) {
540 attempt += 1;
541 let actual_delay = delay_ms.min(5000);
542 tokio::time::sleep(tokio::time::Duration::from_millis(
543 actual_delay,
544 ))
545 .await;
546 continue;
547 }
548
549 return Err((error, cause));
550 }
551 }
552 }
553
554 add_event(
555 shared_state,
556 execution_arn,
557 "TaskSucceeded",
558 entered_event_id,
559 json!({
560 "resource": resource,
561 "output": serde_json::to_string(&result).expect("serde_json::Value serialization is infallible"),
562 }),
563 );
564
565 let selected = if let Some(selector) = state_def.get("ResultSelector") {
566 apply_parameters(selector, &result, None)
567 } else {
568 result
569 };
570
571 let after_result = if result_path == Some("null") {
572 input.clone()
573 } else {
574 apply_result_path(input, &selected, result_path)
575 };
576
577 let output = if output_path == Some("null") {
578 json!({})
579 } else {
580 apply_output_path(&after_result, output_path)
581 };
582
583 return Ok(output);
584 }
585 Err((error, cause)) => {
586 add_event(
587 shared_state,
588 execution_arn,
589 "TaskFailed",
590 entered_event_id,
591 json!({ "error": error, "cause": cause }),
592 );
593
594 if let Some(delay_ms) = should_retry(&retriers, &error, attempt) {
595 attempt += 1;
596 let actual_delay = delay_ms.min(5000);
597 tokio::time::sleep(tokio::time::Duration::from_millis(actual_delay)).await;
598 continue;
599 }
600
601 return Err((error, cause));
602 }
603 }
604 }
605}
606
607async fn execute_parallel_state(
609 state_def: &Value,
610 input: &Value,
611 delivery: &Option<Arc<DeliveryBus>>,
612 dynamodb_state: &Option<SharedDynamoDbState>,
613 registry: &Option<SharedServiceRegistry>,
614 shared_state: &SharedStepFunctionsState,
615 execution_arn: &str,
616) -> Result<Value, (String, String)> {
617 let input_path = state_def["InputPath"].as_str();
618 let result_path = state_def["ResultPath"].as_str();
619 let output_path = state_def["OutputPath"].as_str();
620
621 let effective_input = if input_path == Some("null") {
622 json!({})
623 } else {
624 apply_input_path(input, input_path)
625 };
626
627 let branches = state_def["Branches"]
628 .as_array()
629 .cloned()
630 .unwrap_or_default();
631
632 if branches.is_empty() {
633 return Err((
634 "States.Runtime".to_string(),
635 "Parallel state has no Branches".to_string(),
636 ));
637 }
638
639 let mut handles = Vec::new();
641 for branch_def in &branches {
642 let branch = branch_def.clone();
643 let branch_input = effective_input.clone();
644 let delivery = delivery.clone();
645 let ddb = dynamodb_state.clone();
646 let reg = registry.clone();
647 let state = shared_state.clone();
648 let arn = execution_arn.to_string();
649
650 handles.push(tokio::spawn(async move {
651 run_states(&branch, branch_input, &delivery, &ddb, ®, &state, &arn).await
652 }));
653 }
654
655 let mut results = Vec::with_capacity(handles.len());
657 for handle in handles {
658 let result = handle.await.map_err(|e| {
659 (
660 "States.Runtime".to_string(),
661 format!("Branch execution panicked: {e}"),
662 )
663 })??;
664 results.push(result);
665 }
666
667 let branch_output = Value::Array(results);
668
669 let selected = if let Some(selector) = state_def.get("ResultSelector") {
671 apply_parameters(selector, &branch_output, None)
672 } else {
673 branch_output
674 };
675
676 let after_result = if result_path == Some("null") {
678 input.clone()
679 } else {
680 apply_result_path(input, &selected, result_path)
681 };
682
683 let output = if output_path == Some("null") {
685 json!({})
686 } else {
687 apply_output_path(&after_result, output_path)
688 };
689
690 Ok(output)
691}
692
693#[allow(clippy::too_many_arguments)]
697async fn execute_map_state(
698 state_def: &Value,
699 input: &Value,
700 delivery: &Option<Arc<DeliveryBus>>,
701 dynamodb_state: &Option<SharedDynamoDbState>,
702 registry: &Option<SharedServiceRegistry>,
703 shared_state: &SharedStepFunctionsState,
704 execution_arn: &str,
705) -> Result<Value, (String, String)> {
706 let input_path = state_def["InputPath"].as_str();
707 let result_path = state_def["ResultPath"].as_str();
708 let output_path = state_def["OutputPath"].as_str();
709
710 let effective_input = if input_path == Some("null") {
711 json!({})
712 } else {
713 apply_input_path(input, input_path)
714 };
715
716 let max_concurrency = if let Some(path) = state_def["MaxConcurrencyPath"].as_str() {
718 crate::io_processing::resolve_path(&effective_input, path)
719 .as_u64()
720 .unwrap_or(0)
721 } else {
722 state_def["MaxConcurrency"].as_u64().unwrap_or(0)
723 };
724 let effective_concurrency = if max_concurrency == 0 {
725 40
726 } else {
727 max_concurrency as usize
728 };
729
730 let items = if let Some(item_reader) = state_def.get("ItemReader") {
732 read_items_from_s3(item_reader, registry, execution_arn).await?
733 } else {
734 let items_path = state_def["ItemsPath"].as_str().unwrap_or("$");
735 let items_value = crate::io_processing::resolve_path(&effective_input, items_path);
736 items_value.as_array().cloned().unwrap_or_default()
737 };
738
739 let batch_config = state_def.get("ItemBatcher").cloned();
741 let batched_items = if let Some(ref batcher) = batch_config {
742 apply_item_batcher(&items, batcher, &effective_input)
743 } else {
744 items
745 };
746
747 let iterator_def = state_def
749 .get("ItemProcessor")
750 .or_else(|| state_def.get("Iterator"))
751 .cloned()
752 .ok_or_else(|| {
753 (
754 "States.Runtime".to_string(),
755 "Map state has no ItemProcessor or Iterator".to_string(),
756 )
757 })?;
758
759 let tolerated_failure_percentage = state_def["ToleratedFailurePercentage"]
760 .as_f64()
761 .unwrap_or(0.0);
762 let total_items = batched_items.len() as f64;
763 let mut failure_count = 0usize;
764
765 let semaphore = Arc::new(tokio::sync::Semaphore::new(effective_concurrency));
766
767 let mut handles = Vec::new();
769 for (index, batch_item) in batched_items.into_iter().enumerate() {
770 let iter_def = iterator_def.clone();
771 let delivery = delivery.clone();
772 let ddb = dynamodb_state.clone();
773 let reg = registry.clone();
774 let state = shared_state.clone();
775 let arn = execution_arn.to_string();
776 let sem = semaphore.clone();
777
778 let item_input = if let Some(selector) = state_def.get("ItemSelector") {
780 let mut ctx = serde_json::Map::new();
781 ctx.insert("value".to_string(), batch_item.clone());
782 ctx.insert("index".to_string(), json!(index));
783 apply_parameters(selector, &Value::Object(ctx), None)
784 } else {
785 batch_item
786 };
787
788 add_event(
789 shared_state,
790 execution_arn,
791 "MapIterationStarted",
792 0,
793 json!({ "index": index }),
794 );
795
796 handles.push(tokio::spawn(async move {
797 let _permit = sem
798 .acquire()
799 .await
800 .map_err(|_| ("States.Runtime".to_string(), "Semaphore closed".to_string()))?;
801 let result =
802 run_states(&iter_def, item_input, &delivery, &ddb, ®, &state, &arn).await;
803 Ok::<(usize, Result<Value, (String, String)>), (String, String)>((index, result))
804 }));
805 }
806
807 let mut results: Vec<(usize, Value)> = Vec::with_capacity(handles.len());
809 for handle in handles {
810 let (index, result) = handle.await.map_err(|e| {
811 (
812 "States.Runtime".to_string(),
813 format!("Map iteration panicked: {e}"),
814 )
815 })??;
816
817 match result {
818 Ok(output) => {
819 add_event(
820 shared_state,
821 execution_arn,
822 "MapIterationSucceeded",
823 0,
824 json!({ "index": index }),
825 );
826 results.push((index, output));
827 }
828 Err((error, cause)) => {
829 add_event(
830 shared_state,
831 execution_arn,
832 "MapIterationFailed",
833 0,
834 json!({ "index": index, "error": error }),
835 );
836 failure_count += 1;
837 let failure_percentage = (failure_count as f64 / total_items) * 100.0;
838 if failure_percentage > tolerated_failure_percentage {
839 return Err((error, cause));
840 }
841 }
842 }
843 }
844
845 results.sort_by_key(|(i, _)| *i);
847 let map_output = Value::Array(results.into_iter().map(|(_, v)| v).collect());
848
849 if let Some(result_writer) = state_def.get("ResultWriter") {
851 write_map_results_to_s3(result_writer, registry, execution_arn, &map_output).await?;
852 }
853
854 let selected = if let Some(selector) = state_def.get("ResultSelector") {
856 apply_parameters(selector, &map_output, None)
857 } else {
858 map_output
859 };
860
861 let after_result = if result_path == Some("null") {
863 input.clone()
864 } else {
865 apply_result_path(input, &selected, result_path)
866 };
867
868 let output = if output_path == Some("null") {
870 json!({})
871 } else {
872 apply_output_path(&after_result, output_path)
873 };
874
875 Ok(output)
876}
877
878async fn read_items_from_s3(
880 item_reader: &Value,
881 registry: &Option<SharedServiceRegistry>,
882 execution_arn: &str,
883) -> Result<Vec<Value>, (String, String)> {
884 let resource = item_reader["Resource"]
885 .as_str()
886 .unwrap_or("arn:aws:states:::s3:getObject");
887 if !resource.contains("s3:getObject") {
888 return Err((
889 "States.Runtime".to_string(),
890 format!("ItemReader unsupported resource: {resource}"),
891 ));
892 }
893
894 let params = item_reader
895 .get("Parameters")
896 .cloned()
897 .unwrap_or_else(|| json!({}));
898 let bucket = params["Bucket"].as_str().ok_or_else(|| {
899 (
900 "States.Runtime".to_string(),
901 "ItemReader missing Bucket".to_string(),
902 )
903 })?;
904 let key = params["Key"].as_str().ok_or_else(|| {
905 (
906 "States.Runtime".to_string(),
907 "ItemReader missing Key".to_string(),
908 )
909 })?;
910
911 let registry_arc = resolve_registry(registry)?;
912 let account_id = account_from_execution_arn(execution_arn);
913
914 let body = call_sdk_action_raw_bytes(
915 ®istry_arc,
916 "s3",
917 "GetObject",
918 &json!({ "Bucket": bucket, "Key": key }),
919 &account_id,
920 )
921 .await?;
922
923 let parsed: Value = serde_json::from_slice(&body).map_err(|e| {
925 (
926 "States.Runtime".to_string(),
927 format!("ItemReader failed to parse S3 object as JSON: {e}"),
928 )
929 })?;
930
931 parsed.as_array().cloned().ok_or_else(|| {
932 (
933 "States.Runtime".to_string(),
934 "ItemReader S3 object is not a JSON array".to_string(),
935 )
936 })
937}
938
939async fn write_map_results_to_s3(
941 result_writer: &Value,
942 registry: &Option<SharedServiceRegistry>,
943 execution_arn: &str,
944 results: &Value,
945) -> Result<(), (String, String)> {
946 let resource = result_writer["Resource"]
947 .as_str()
948 .unwrap_or("arn:aws:states:::s3:putObject");
949 if !resource.contains("s3:putObject") {
950 return Err((
951 "States.Runtime".to_string(),
952 format!("ResultWriter unsupported resource: {resource}"),
953 ));
954 }
955
956 let params = result_writer
957 .get("Parameters")
958 .cloned()
959 .unwrap_or_else(|| json!({}));
960 let bucket = params["Bucket"].as_str().ok_or_else(|| {
961 (
962 "States.Runtime".to_string(),
963 "ResultWriter missing Bucket".to_string(),
964 )
965 })?;
966 let prefix = params["Prefix"].as_str().unwrap_or("map-results/");
967
968 let registry_arc = resolve_registry(registry)?;
969 let account_id = account_from_execution_arn(execution_arn);
970
971 use bytes::Bytes;
972 let body = Bytes::from(
973 serde_json::to_vec(results).expect("serde_json::Value serialization is infallible"),
974 );
975
976 use fakecloud_core::service::AwsRequest;
978 use http::{HeaderMap, Method};
979 let service = registry_arc.get("s3").ok_or_else(|| {
980 (
981 "States.TaskFailed".to_string(),
982 "S3 service not available for ResultWriter".to_string(),
983 )
984 })?;
985
986 let req = AwsRequest {
987 service: "s3".to_string(),
988 action: "PutObject".to_string(),
989 region: "us-east-1".to_string(),
990 account_id: account_id.to_string(),
991 request_id: uuid::Uuid::new_v4().to_string(),
992 headers: HeaderMap::new(),
993 query_params: std::collections::HashMap::new(),
994 body,
995 body_stream: parking_lot::Mutex::new(None),
996 path_segments: vec![bucket.to_string(), format!("{prefix}result.json")],
997 raw_path: format!("/{bucket}/{prefix}result.json"),
998 raw_query: String::new(),
999 method: Method::PUT,
1000 is_query_protocol: false,
1001 access_key_id: None,
1002 principal: None,
1003 };
1004
1005 service.handle(req).await.map_err(|err| {
1006 let code = err.code().to_string();
1007 let msg = err.message();
1008 (
1009 format!("S3.{code}"),
1010 format!("ResultWriter PutObject failed: {msg}"),
1011 )
1012 })?;
1013
1014 Ok(())
1015}
1016
1017fn apply_item_batcher(items: &[Value], batcher: &Value, _effective_input: &Value) -> Vec<Value> {
1019 let max_per_batch = batcher["MaxItemsPerBatch"].as_u64().unwrap_or(u64::MAX) as usize;
1020 let max_bytes = batcher["MaxInputBytesPerBatch"].as_u64().unwrap_or(0) as usize;
1021 let batch_input = batcher.get("BatchInput").cloned();
1022
1023 let mut batches: Vec<Vec<Value>> = Vec::new();
1024 let mut current_batch: Vec<Value> = Vec::new();
1025 let mut current_bytes = 0usize;
1026
1027 for item in items.iter().cloned() {
1028 let item_bytes = serde_json::to_vec(&item).unwrap_or_default().len();
1029 if !current_batch.is_empty()
1030 && (current_batch.len() >= max_per_batch
1031 || (max_bytes > 0 && current_bytes + item_bytes > max_bytes))
1032 {
1033 batches.push(current_batch);
1034 current_batch = Vec::new();
1035 current_bytes = 0;
1036 }
1037 current_bytes += item_bytes;
1038 current_batch.push(item);
1039 }
1040 if !current_batch.is_empty() {
1041 batches.push(current_batch);
1042 }
1043
1044 batches
1045 .into_iter()
1046 .enumerate()
1047 .map(|(index, batch)| {
1048 let mut map = serde_json::Map::new();
1049 map.insert("index".to_string(), json!(index));
1050 map.insert("items".to_string(), Value::Array(batch));
1051 if let Some(Value::Object(ref obj)) = batch_input {
1052 for (k, v) in obj {
1053 map.insert(k.clone(), v.clone());
1054 }
1055 }
1056 Value::Object(map)
1057 })
1058 .collect()
1059}
1060
1061#[allow(clippy::too_many_arguments)]
1063async fn invoke_resource(
1064 resource: &str,
1065 input: &Value,
1066 delivery: &Option<Arc<DeliveryBus>>,
1067 dynamodb_state: &Option<SharedDynamoDbState>,
1068 registry: &Option<SharedServiceRegistry>,
1069 execution_arn: &str,
1070 timeout_seconds: Option<u64>,
1071 heartbeat_seconds: Option<u64>,
1072 shared_state: &SharedStepFunctionsState,
1073) -> Result<Value, (String, String)> {
1074 if resource.contains(":states:") && resource.contains(":activity:") {
1076 return invoke_activity(
1077 resource,
1078 input,
1079 shared_state,
1080 timeout_seconds,
1081 heartbeat_seconds,
1082 )
1083 .await;
1084 }
1085
1086 if resource.contains(":lambda:") && resource.contains(":function:") {
1088 return invoke_lambda_direct(resource, input, delivery, timeout_seconds).await;
1089 }
1090
1091 if resource.starts_with("arn:aws:states:::lambda:invoke") {
1093 let function_name = input["FunctionName"].as_str().unwrap_or("");
1094 let payload = if let Some(p) = input.get("Payload") {
1095 p.clone()
1096 } else {
1097 input.clone()
1098 };
1099 return invoke_lambda_direct(function_name, &payload, delivery, timeout_seconds)
1109 .await
1110 .map(|payload| {
1111 json!({
1112 "ExecutedVersion": "$LATEST",
1113 "Payload": payload,
1114 "StatusCode": 200,
1115 })
1116 });
1117 }
1118
1119 if resource.starts_with("arn:aws:states:::sqs:sendMessage") {
1120 return invoke_sqs_send_message(input, delivery);
1121 }
1122
1123 if resource.starts_with("arn:aws:states:::sns:publish") {
1124 return invoke_sns_publish(input, delivery);
1125 }
1126
1127 if resource.starts_with("arn:aws:states:::events:putEvents") {
1128 return invoke_eventbridge_put_events(input, delivery);
1129 }
1130
1131 if resource.starts_with("arn:aws:states:::dynamodb:getItem") {
1132 return invoke_dynamodb_get_item(input, dynamodb_state);
1133 }
1134
1135 if resource.starts_with("arn:aws:states:::dynamodb:putItem") {
1136 return invoke_dynamodb_put_item(input, dynamodb_state);
1137 }
1138
1139 if resource.starts_with("arn:aws:states:::dynamodb:deleteItem") {
1140 return invoke_dynamodb_delete_item(input, dynamodb_state);
1141 }
1142
1143 if resource.starts_with("arn:aws:states:::dynamodb:updateItem") {
1144 return invoke_dynamodb_update_item(input, dynamodb_state);
1145 }
1146
1147 if let Some(tail) = resource.strip_prefix("arn:aws:states:::") {
1155 if tail.starts_with("states:startExecution") {
1156 let account_id = account_from_execution_arn(execution_arn);
1157 let result =
1158 invoke_aws_sdk_integration(tail, input, registry, &account_id, timeout_seconds)
1159 .await;
1160 if let Ok(ref value) = result {
1166 if let Some(inner_arn) = value
1167 .get("executionArn")
1168 .or_else(|| value.get("ExecutionArn"))
1169 .and_then(Value::as_str)
1170 {
1171 let mut accounts = shared_state.write();
1172 if let Some(state) = accounts.get_mut(&account_id) {
1173 if let Some(exec) = state.executions.get_mut(inner_arn) {
1174 exec.parent_execution_arn = Some(execution_arn.to_string());
1175 }
1176 }
1177 }
1178 }
1179 return result;
1180 }
1181 }
1182
1183 if let Some(rest) = resource.strip_prefix("arn:aws:states:::aws-sdk:") {
1189 let account_id = account_from_execution_arn(execution_arn);
1190 return invoke_aws_sdk_integration(rest, input, registry, &account_id, timeout_seconds)
1191 .await;
1192 }
1193
1194 if let Some(tail) = resource.strip_prefix("arn:aws:states:::") {
1198 if tail.contains(".sync") {
1199 let account_id = account_from_execution_arn(execution_arn);
1200 return invoke_aws_sdk_integration(tail, input, registry, &account_id, timeout_seconds)
1201 .await;
1202 }
1203 }
1204
1205 Err((
1206 "States.TaskFailed".to_string(),
1207 format!("Unsupported resource: {resource}"),
1208 ))
1209}
1210
/// Returns `action` with its first character converted to ASCII upper case.
///
/// Only ASCII letters are affected; an empty string yields an empty string.
fn camel_to_pascal(action: &str) -> String {
    let mut pascal = String::with_capacity(action.len());
    let mut rest = action.chars();
    if let Some(initial) = rest.next() {
        pascal.push(initial.to_ascii_uppercase());
        pascal.push_str(rest.as_str());
    }
    pascal
}
1222
/// Translates an SDK service identifier into the name this function's callers
/// use to look the service up.
///
/// `sfn` becomes `states` and `cloudwatchlogs` becomes `logs`; every other
/// identifier is returned unchanged. Matching is case-sensitive.
fn map_sdk_service_id(service_id: &str) -> &str {
    const ALIASES: [(&str, &str); 2] = [("sfn", "states"), ("cloudwatchlogs", "logs")];

    ALIASES
        .iter()
        .find(|(sdk_id, _)| *sdk_id == service_id)
        .map_or(service_id, |(_, internal)| *internal)
}
1235
/// Extracts the account id (the fifth `:`-separated field) from an ARN.
///
/// Falls back to `123456789012` when the field is missing or empty.
fn account_from_execution_arn(execution_arn: &str) -> String {
    const DEFAULT_ACCOUNT_ID: &str = "123456789012";

    match execution_arn.split(':').nth(4) {
        Some(account) if !account.is_empty() => account.to_string(),
        _ => DEFAULT_ACCOUNT_ID.to_string(),
    }
}
1247
1248async fn invoke_aws_sdk_integration(
1253 tail: &str,
1254 input: &Value,
1255 registry: &Option<SharedServiceRegistry>,
1256 account_id: &str,
1257 timeout_seconds: Option<u64>,
1258) -> Result<Value, (String, String)> {
1259 let registry_arc = resolve_registry(registry)?;
1260
1261 let mut parts = tail.splitn(2, ':');
1267 let service_id = parts.next().filter(|s| !s.is_empty()).ok_or_else(|| {
1268 (
1269 "States.TaskFailed".to_string(),
1270 format!("Invalid aws-sdk Resource ARN: missing service in '{tail}'"),
1271 )
1272 })?;
1273 let action_with_mod = parts.next().filter(|s| !s.is_empty()).ok_or_else(|| {
1274 (
1275 "States.TaskFailed".to_string(),
1276 format!("Invalid aws-sdk Resource ARN: missing action in '{tail}'"),
1277 )
1278 })?;
1279 let action_camel = action_with_mod
1280 .split('.')
1281 .next()
1282 .filter(|s| !s.is_empty())
1283 .ok_or_else(|| {
1284 (
1285 "States.TaskFailed".to_string(),
1286 format!("Invalid aws-sdk Resource ARN: empty action in '{tail}'"),
1287 )
1288 })?;
1289 let modifiers: Vec<&str> = action_with_mod.split('.').skip(1).collect();
1290 let is_sync = modifiers.iter().any(|m| *m == "sync" || *m == "sync:2");
1291
1292 let action_pascal = camel_to_pascal(action_camel);
1293 let service_name = map_sdk_service_id(service_id).to_string();
1294
1295 let translated_input = match service_name.as_str() {
1300 "ecs" => translate_ecs_keys_to_camel(input),
1301 _ => input.clone(),
1302 };
1303
1304 let initial = call_sdk_action(
1305 ®istry_arc,
1306 &service_name,
1307 &action_pascal,
1308 &translated_input,
1309 account_id,
1310 )
1311 .await?;
1312
1313 if !is_sync {
1314 return Ok(initial);
1315 }
1316
1317 sync_wait(
1322 ®istry_arc,
1323 &service_name,
1324 &action_pascal,
1325 &initial,
1326 &translated_input,
1327 account_id,
1328 timeout_seconds,
1329 )
1330 .await
1331}
1332
1333fn translate_ecs_keys_to_camel(input: &Value) -> Value {
1340 let Some(obj) = input.as_object() else {
1341 return input.clone();
1342 };
1343 let mut out = serde_json::Map::with_capacity(obj.len());
1344 for (k, v) in obj.iter() {
1345 let camel = match k.as_str() {
1346 "Cluster" => "cluster",
1347 "TaskDefinition" => "taskDefinition",
1348 "LaunchType" => "launchType",
1349 "Group" => "group",
1350 "Overrides" => "overrides",
1351 "PlatformVersion" => "platformVersion",
1352 "NetworkConfiguration" => "networkConfiguration",
1353 "Tags" => "tags",
1354 "EnableExecuteCommand" => "enableExecuteCommand",
1355 "PropagateTags" => "propagateTags",
1356 "ReferenceId" => "referenceId",
1357 "StartedBy" => "startedBy",
1358 "Count" => "count",
1359 "CapacityProviderStrategy" => "capacityProviderStrategy",
1360 "PlacementConstraints" => "placementConstraints",
1361 "PlacementStrategy" => "placementStrategy",
1362 other => other,
1363 };
1364 out.insert(camel.to_string(), v.clone());
1365 }
1366 Value::Object(out)
1367}
1368
1369fn resolve_registry(
1370 registry: &Option<SharedServiceRegistry>,
1371) -> Result<Arc<fakecloud_core::registry::ServiceRegistry>, (String, String)> {
1372 let registry_handle = registry.as_ref().ok_or_else(|| {
1373 (
1374 "States.TaskFailed".to_string(),
1375 "No service registry configured for aws-sdk integration".to_string(),
1376 )
1377 })?;
1378 registry_handle.get().cloned().ok_or_else(|| {
1379 (
1380 "States.TaskFailed".to_string(),
1381 "Service registry not yet initialised; aws-sdk integration unavailable".to_string(),
1382 )
1383 })
1384}
1385
1386async fn call_sdk_action(
1388 registry: &Arc<fakecloud_core::registry::ServiceRegistry>,
1389 service_name: &str,
1390 action_pascal: &str,
1391 input: &Value,
1392 account_id: &str,
1393) -> Result<Value, (String, String)> {
1394 use bytes::Bytes;
1395 use fakecloud_core::service::AwsRequest;
1396 use http::{HeaderMap, Method};
1397
1398 let service = registry.get(service_name).ok_or_else(|| {
1399 (
1400 "States.TaskFailed".to_string(),
1401 format!("Unknown aws-sdk service '{service_name}'"),
1402 )
1403 })?;
1404
1405 let body_bytes = Bytes::from(
1406 serde_json::to_vec(input).expect("serde_json::Value serialization is infallible"),
1407 );
1408
1409 let req = AwsRequest {
1410 service: service_name.to_string(),
1411 action: action_pascal.to_string(),
1412 region: "us-east-1".to_string(),
1413 account_id: account_id.to_string(),
1414 request_id: uuid::Uuid::new_v4().to_string(),
1415 headers: HeaderMap::new(),
1416 query_params: std::collections::HashMap::new(),
1417 body: body_bytes,
1418 body_stream: parking_lot::Mutex::new(None),
1419 path_segments: vec![],
1420 raw_path: "/".to_string(),
1421 raw_query: String::new(),
1422 method: Method::POST,
1423 is_query_protocol: false,
1424 access_key_id: None,
1425 principal: None,
1426 };
1427
1428 let response = service.handle(req).await.map_err(|err| {
1429 let code = err.code().to_string();
1430 let msg = err.message();
1431 let prefix_service = match service_name {
1432 "dynamodb" => "DynamoDb".to_string(),
1433 "states" => "Sfn".to_string(),
1434 other => camel_to_pascal(other),
1435 };
1436 (
1437 format!("{prefix_service}.{code}"),
1438 format!("{action_pascal} failed: {msg}"),
1439 )
1440 })?;
1441
1442 let response_bytes = match response.body {
1443 fakecloud_core::service::ResponseBody::Bytes(b) => b,
1444 fakecloud_core::service::ResponseBody::File { .. } => {
1445 return Err((
1446 "States.TaskFailed".to_string(),
1447 "aws-sdk integration: file-backed response not supported".to_string(),
1448 ));
1449 }
1450 };
1451
1452 if response_bytes.is_empty() {
1453 return Ok(json!({}));
1454 }
1455 serde_json::from_slice(&response_bytes).map_err(|e| {
1456 (
1457 "States.TaskFailed".to_string(),
1458 format!("aws-sdk integration: failed to parse response JSON: {e}"),
1459 )
1460 })
1461}
1462
1463async fn call_sdk_action_raw_bytes(
1466 registry: &Arc<fakecloud_core::registry::ServiceRegistry>,
1467 service_name: &str,
1468 action_pascal: &str,
1469 input: &Value,
1470 account_id: &str,
1471) -> Result<bytes::Bytes, (String, String)> {
1472 use bytes::Bytes;
1473 use fakecloud_core::service::AwsRequest;
1474 use http::{HeaderMap, Method};
1475
1476 let service = registry.get(service_name).ok_or_else(|| {
1477 (
1478 "States.TaskFailed".to_string(),
1479 format!("Unknown aws-sdk service '{service_name}'"),
1480 )
1481 })?;
1482
1483 let body_bytes = Bytes::from(
1484 serde_json::to_vec(input).expect("serde_json::Value serialization is infallible"),
1485 );
1486
1487 let req = AwsRequest {
1488 service: service_name.to_string(),
1489 action: action_pascal.to_string(),
1490 region: "us-east-1".to_string(),
1491 account_id: account_id.to_string(),
1492 request_id: uuid::Uuid::new_v4().to_string(),
1493 headers: HeaderMap::new(),
1494 query_params: std::collections::HashMap::new(),
1495 body: body_bytes,
1496 body_stream: parking_lot::Mutex::new(None),
1497 path_segments: vec![],
1498 raw_path: "/".to_string(),
1499 raw_query: String::new(),
1500 method: Method::POST,
1501 is_query_protocol: false,
1502 access_key_id: None,
1503 principal: None,
1504 };
1505
1506 let response = service.handle(req).await.map_err(|err| {
1507 let code = err.code().to_string();
1508 let msg = err.message();
1509 let prefix_service = match service_name {
1510 "dynamodb" => "DynamoDb".to_string(),
1511 "states" => "Sfn".to_string(),
1512 other => camel_to_pascal(other),
1513 };
1514 (
1515 format!("{prefix_service}.{code}"),
1516 format!("{action_pascal} failed: {msg}"),
1517 )
1518 })?;
1519
1520 match response.body {
1521 fakecloud_core::service::ResponseBody::Bytes(b) => Ok(b),
1522 fakecloud_core::service::ResponseBody::File { .. } => Err((
1523 "States.TaskFailed".to_string(),
1524 "aws-sdk integration: file-backed response not supported".to_string(),
1525 )),
1526 }
1527}
1528
/// Timeout, in seconds, used for `.sync` polling when the caller supplies no
/// explicit timeout (see `sync_timeout_secs`).
const SYNC_DEFAULT_TIMEOUT_SECS: u64 = 300;
/// Delay, in milliseconds, between two consecutive status polls in the `.sync`
/// wait loops.
const SYNC_POLL_INTERVAL_MS: u64 = 200;
1536
1537async fn sync_wait(
1541 registry: &Arc<fakecloud_core::registry::ServiceRegistry>,
1542 service_name: &str,
1543 action_pascal: &str,
1544 initial: &Value,
1545 input: &Value,
1546 account_id: &str,
1547 timeout_seconds: Option<u64>,
1548) -> Result<Value, (String, String)> {
1549 match (service_name, action_pascal) {
1550 ("ecs", "RunTask") => {
1551 sync_wait_ecs_run_task(registry, initial, input, account_id, timeout_seconds).await
1552 }
1553 ("athena", "StartQueryExecution") => {
1554 sync_wait_athena_query(registry, initial, account_id, timeout_seconds).await
1555 }
1556 ("states", "StartExecution") => {
1557 sync_wait_states_start_execution(registry, initial, account_id, timeout_seconds).await
1558 }
1559 ("glue", "StartJobRun") => {
1560 let job_run_id = initial
1566 .get("JobRunId")
1567 .and_then(Value::as_str)
1568 .unwrap_or_default()
1569 .to_string();
1570 let job_name = input
1571 .get("JobName")
1572 .and_then(Value::as_str)
1573 .unwrap_or_default()
1574 .to_string();
1575 let deadline = sync_deadline(timeout_seconds);
1576 loop {
1577 let described = call_sdk_action(
1578 registry,
1579 "glue",
1580 "GetJobRun",
1581 &json!({ "JobName": job_name, "RunId": job_run_id }),
1582 account_id,
1583 )
1584 .await?;
1585 let state = described
1586 .get("JobRun")
1587 .and_then(|r| r.get("JobRunState"))
1588 .and_then(Value::as_str)
1589 .unwrap_or("");
1590 match state {
1591 "SUCCEEDED" => return Ok(described),
1592 "FAILED" | "STOPPED" | "TIMEOUT" | "ERROR" => {
1593 return Err((
1594 "States.TaskFailed".to_string(),
1595 format!("Glue job run {job_run_id} ended in state {state}"),
1596 ));
1597 }
1598 _ => {}
1599 }
1600 if std::time::Instant::now() >= deadline {
1601 return Err((
1602 "States.Timeout".to_string(),
1603 format!(
1604 "glue:startJobRun.sync timed out after {}s for run {job_run_id}",
1605 sync_timeout_secs(timeout_seconds)
1606 ),
1607 ));
1608 }
1609 tokio::time::sleep(std::time::Duration::from_millis(SYNC_POLL_INTERVAL_MS)).await;
1610 }
1611 }
1612 _ => Err((
1613 "States.TaskFailed".to_string(),
1614 format!(
1615 "`.sync` is not supported for {service_name}:{action_pascal} yet — \
1616 supported: ecs:RunTask, athena:StartQueryExecution, glue:StartJobRun, states:StartExecution"
1617 ),
1618 )),
1619 }
1620}
1621
1622async fn sync_wait_ecs_run_task(
1623 registry: &Arc<fakecloud_core::registry::ServiceRegistry>,
1624 initial: &Value,
1625 input: &Value,
1626 account_id: &str,
1627 timeout_seconds: Option<u64>,
1628) -> Result<Value, (String, String)> {
1629 let tasks = initial
1630 .get("tasks")
1631 .and_then(Value::as_array)
1632 .ok_or_else(|| {
1633 (
1634 "States.TaskFailed".to_string(),
1635 "ecs:RunTask.sync: response missing 'tasks' array".to_string(),
1636 )
1637 })?;
1638 if tasks.is_empty() {
1639 return Err((
1640 "States.TaskFailed".to_string(),
1641 "ecs:RunTask.sync: no tasks were started".to_string(),
1642 ));
1643 }
1644 let task_arns: Vec<String> = tasks
1645 .iter()
1646 .filter_map(|t| t.get("taskArn").and_then(Value::as_str).map(String::from))
1647 .collect();
1648 let cluster = input
1649 .get("cluster")
1650 .or_else(|| input.get("Cluster"))
1651 .and_then(Value::as_str)
1652 .map(String::from);
1653
1654 let deadline = sync_deadline(timeout_seconds);
1655 loop {
1656 let mut describe_input = json!({ "tasks": task_arns });
1657 if let Some(c) = &cluster {
1658 describe_input["cluster"] = json!(c);
1659 }
1660 let described = call_sdk_action(
1661 registry,
1662 "ecs",
1663 "DescribeTasks",
1664 &describe_input,
1665 account_id,
1666 )
1667 .await?;
1668 let described_tasks = described
1669 .get("tasks")
1670 .and_then(Value::as_array)
1671 .cloned()
1672 .unwrap_or_default();
1673 let all_stopped = !described_tasks.is_empty()
1674 && described_tasks
1675 .iter()
1676 .all(|t| t.get("lastStatus").and_then(Value::as_str) == Some("STOPPED"));
1677 if all_stopped {
1678 let any_failed = described_tasks.iter().any(|t| {
1683 let stop_code = t.get("stopCode").and_then(Value::as_str);
1684 let bad_stop = matches!(
1685 stop_code,
1686 Some(
1687 "TaskFailedToStart"
1688 | "EssentialContainerExited"
1689 | "ServiceSchedulerInitiated"
1690 )
1691 );
1692 let bad_exit = t
1693 .get("containers")
1694 .and_then(Value::as_array)
1695 .map(|cs| {
1696 cs.iter().any(|c| {
1697 c.get("exitCode")
1698 .and_then(Value::as_i64)
1699 .map(|n| n != 0)
1700 .unwrap_or(false)
1701 })
1702 })
1703 .unwrap_or(false);
1704 bad_stop || bad_exit
1705 });
1706 if any_failed {
1707 let cause = described_tasks
1708 .iter()
1709 .find_map(|t| {
1710 t.get("stoppedReason")
1711 .and_then(Value::as_str)
1712 .map(String::from)
1713 })
1714 .unwrap_or_else(|| "ECS task failed".to_string());
1715 return Err(("States.TaskFailed".to_string(), cause));
1716 }
1717 return Ok(described);
1718 }
1719 if std::time::Instant::now() >= deadline {
1720 return Err((
1721 "States.Timeout".to_string(),
1722 format!(
1723 "ecs:RunTask.sync timed out after {}s waiting for {} task(s) to STOP",
1724 sync_timeout_secs(timeout_seconds),
1725 task_arns.len()
1726 ),
1727 ));
1728 }
1729 tokio::time::sleep(std::time::Duration::from_millis(SYNC_POLL_INTERVAL_MS)).await;
1730 }
1731}
1732
1733async fn sync_wait_athena_query(
1734 registry: &Arc<fakecloud_core::registry::ServiceRegistry>,
1735 initial: &Value,
1736 account_id: &str,
1737 timeout_seconds: Option<u64>,
1738) -> Result<Value, (String, String)> {
1739 let qid = initial
1740 .get("QueryExecutionId")
1741 .and_then(Value::as_str)
1742 .ok_or_else(|| {
1743 (
1744 "States.TaskFailed".to_string(),
1745 "athena:StartQueryExecution.sync: response missing QueryExecutionId".to_string(),
1746 )
1747 })?
1748 .to_string();
1749
1750 let deadline = sync_deadline(timeout_seconds);
1751 loop {
1752 let described = call_sdk_action(
1753 registry,
1754 "athena",
1755 "GetQueryExecution",
1756 &json!({ "QueryExecutionId": qid }),
1757 account_id,
1758 )
1759 .await?;
1760 let state = described
1761 .get("QueryExecution")
1762 .and_then(|qe| qe.get("Status"))
1763 .and_then(|s| s.get("State"))
1764 .and_then(Value::as_str)
1765 .unwrap_or("");
1766 match state {
1767 "SUCCEEDED" => return Ok(described),
1768 "FAILED" | "CANCELLED" => {
1769 let cause = described
1770 .get("QueryExecution")
1771 .and_then(|qe| qe.get("Status"))
1772 .and_then(|s| s.get("StateChangeReason"))
1773 .and_then(Value::as_str)
1774 .unwrap_or("Athena query reached terminal failure state")
1775 .to_string();
1776 return Err(("States.TaskFailed".to_string(), cause));
1777 }
1778 _ => {}
1779 }
1780 if std::time::Instant::now() >= deadline {
1781 return Err((
1782 "States.Timeout".to_string(),
1783 format!(
1784 "athena:StartQueryExecution.sync timed out after {}s for query {qid}",
1785 sync_timeout_secs(timeout_seconds)
1786 ),
1787 ));
1788 }
1789 tokio::time::sleep(std::time::Duration::from_millis(SYNC_POLL_INTERVAL_MS)).await;
1790 }
1791}
1792
1793async fn sync_wait_states_start_execution(
1799 registry: &Arc<fakecloud_core::registry::ServiceRegistry>,
1800 initial: &Value,
1801 account_id: &str,
1802 timeout_seconds: Option<u64>,
1803) -> Result<Value, (String, String)> {
1804 let exec_arn = initial
1805 .get("executionArn")
1806 .or_else(|| initial.get("ExecutionArn"))
1807 .and_then(Value::as_str)
1808 .ok_or_else(|| {
1809 (
1810 "States.TaskFailed".to_string(),
1811 "states:startExecution.sync: response missing executionArn".to_string(),
1812 )
1813 })?
1814 .to_string();
1815
1816 let deadline = sync_deadline(timeout_seconds);
1817 loop {
1818 let described = call_sdk_action(
1819 registry,
1820 "states",
1821 "DescribeExecution",
1822 &json!({ "executionArn": exec_arn }),
1823 account_id,
1824 )
1825 .await?;
1826 let status = described
1827 .get("status")
1828 .or_else(|| described.get("Status"))
1829 .and_then(Value::as_str)
1830 .unwrap_or("");
1831 match status {
1832 "SUCCEEDED" => return Ok(described),
1833 "FAILED" | "TIMED_OUT" | "ABORTED" => {
1834 let cause = described
1835 .get("cause")
1836 .or_else(|| described.get("Cause"))
1837 .and_then(Value::as_str)
1838 .unwrap_or("Nested execution reached terminal failure state")
1839 .to_string();
1840 return Err(("States.TaskFailed".to_string(), cause));
1841 }
1842 _ => {}
1843 }
1844 if std::time::Instant::now() >= deadline {
1845 return Err((
1846 "States.Timeout".to_string(),
1847 format!(
1848 "states:startExecution.sync timed out after {}s for {exec_arn}",
1849 sync_timeout_secs(timeout_seconds)
1850 ),
1851 ));
1852 }
1853 tokio::time::sleep(std::time::Duration::from_millis(SYNC_POLL_INTERVAL_MS)).await;
1854 }
1855}
1856
/// Returns the effective `.sync` timeout in seconds: the caller-supplied
/// value when present, otherwise `SYNC_DEFAULT_TIMEOUT_SECS`.
fn sync_timeout_secs(timeout_seconds: Option<u64>) -> u64 {
    timeout_seconds.unwrap_or(SYNC_DEFAULT_TIMEOUT_SECS)
}
1860
1861fn sync_deadline(timeout_seconds: Option<u64>) -> std::time::Instant {
1862 std::time::Instant::now() + std::time::Duration::from_secs(sync_timeout_secs(timeout_seconds))
1863}
1864
/// Kind of clause in an update expression.
///
/// NOTE(review): the variant names mirror the DynamoDB `UpdateExpression`
/// keywords (`SET`, `REMOVE`, `ADD`, `DELETE`), but no use site is visible in
/// this part of the file — confirm against the helpers that consume it.
#[derive(Clone, Copy)]
pub(crate) enum UpdateClause {
    Set,
    Remove,
    Add,
    Delete,
}
1872
1873async fn invoke_lambda_direct(
1875 function_arn: &str,
1876 input: &Value,
1877 delivery: &Option<Arc<DeliveryBus>>,
1878 timeout_seconds: Option<u64>,
1879) -> Result<Value, (String, String)> {
1880 let delivery = delivery.as_ref().ok_or_else(|| {
1881 (
1882 "States.TaskFailed".to_string(),
1883 "No delivery bus configured for Lambda invocation".to_string(),
1884 )
1885 })?;
1886
1887 let payload =
1888 serde_json::to_string(input).expect("serde_json::Value serialization is infallible");
1889
1890 let invoke_future = delivery.invoke_lambda(function_arn, &payload);
1891
1892 let result = if let Some(timeout) = timeout_seconds {
1893 match tokio::time::timeout(tokio::time::Duration::from_secs(timeout), invoke_future).await {
1894 Ok(r) => r,
1895 Err(_) => {
1896 return Err((
1897 "States.Timeout".to_string(),
1898 format!("Task timed out after {timeout} seconds"),
1899 ));
1900 }
1901 }
1902 } else {
1903 invoke_future.await
1904 };
1905
1906 match result {
1907 Some(Ok(bytes)) => {
1908 let response_str = String::from_utf8_lossy(&bytes);
1909 let value: Value =
1910 serde_json::from_str(&response_str).unwrap_or(json!(response_str.to_string()));
1911
1912 if let Some(obj) = value.as_object() {
1926 if obj.contains_key("errorType") && obj.contains_key("errorMessage") {
1927 let error_type = obj
1928 .get("errorType")
1929 .and_then(Value::as_str)
1930 .unwrap_or("Exception")
1931 .to_string();
1932 let cause = serde_json::to_string(&value)
1933 .expect("serde_json::Value serialization is infallible");
1934 return Err((error_type, cause));
1935 }
1936 }
1937
1938 Ok(value)
1939 }
1940 Some(Err(e)) => {
1941 if e.starts_with("Function not found") {
1951 Err(("Lambda.ResourceNotFoundException".to_string(), e))
1952 } else {
1953 Err(("Lambda.Unknown".to_string(), e))
1954 }
1955 }
1956 None => {
1957 Ok(json!({}))
1959 }
1960 }
1961}
1962
1963async fn invoke_activity(
1968 activity_arn: &str,
1969 input: &Value,
1970 shared_state: &SharedStepFunctionsState,
1971 timeout_seconds: Option<u64>,
1972 heartbeat_seconds: Option<u64>,
1973) -> Result<Value, (String, String)> {
1974 use crate::state::TaskTokenState;
1975
1976 let activity_account = activity_arn.split(':').nth(4).unwrap_or("").to_string();
1978 {
1979 let accounts = shared_state.read();
1980 let exists = accounts
1981 .get(&activity_account)
1982 .map(|s| s.activities.contains_key(activity_arn))
1983 .unwrap_or(false);
1984 if !exists {
1985 return Err((
1986 "States.TaskFailed".to_string(),
1987 format!("Activity does not exist: {activity_arn}"),
1988 ));
1989 }
1990 }
1991
1992 let token = format!(
1993 "FCToken-{}-{}",
1994 chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0),
1995 uuid::Uuid::new_v4().simple(),
1996 );
1997 let now = chrono::Utc::now();
1998 let input_str =
1999 serde_json::to_string(input).expect("serde_json::Value serialization is infallible");
2000 {
2001 let mut accounts = shared_state.write();
2002 let state = accounts.get_or_create(&activity_account);
2003 state.task_tokens.insert(
2004 token.clone(),
2005 TaskTokenState {
2006 activity_arn: activity_arn.to_string(),
2007 status: "PENDING".to_string(),
2008 output: None,
2009 error: None,
2010 cause: None,
2011 input: Some(input_str),
2012 created_at: now,
2013 last_heartbeat_at: None,
2014 heartbeat_seconds: heartbeat_seconds.map(|s| s as i64),
2015 timeout_seconds: timeout_seconds.map(|s| s as i64),
2016 },
2017 );
2018 }
2019
2020 poll_task_token(
2021 shared_state,
2022 &activity_account,
2023 &token,
2024 timeout_seconds,
2025 heartbeat_seconds,
2026 )
2027 .await
2028}
2029
/// Result of deciding what follows the current state.
///
/// NOTE(review): the meaning of each variant is inferred from its name only;
/// no producer or consumer is visible in this part of the file — confirm
/// against the code that matches on it.
pub(crate) enum NextState {
    /// Presumably the name of the state to run next — TODO confirm.
    Name(String),
    /// Presumably signals that the state machine has reached its end — TODO confirm.
    End,
    /// Presumably carries an error message describing why no next state could
    /// be determined — TODO confirm.
    Error(String),
}
2035
// Helper items live in `interpreter_helpers.rs`; the glob re-export below
// exposes them through this module's path with crate-wide visibility.
#[path = "interpreter_helpers.rs"]
mod interpreter_helpers;
pub(crate) use interpreter_helpers::*;

// Unit tests are kept in `interpreter_tests.rs` and compiled only for test builds.
#[cfg(test)]
#[path = "interpreter_tests.rs"]
mod tests;