1use std::collections::HashMap;
2use std::sync::Arc;
3
4use chrono::Utc;
5use serde_json::{json, Value};
6use tracing::{debug, warn};
7
8use fakecloud_aws::arn::Arn;
9use fakecloud_core::delivery::DeliveryBus;
10use fakecloud_dynamodb::SharedDynamoDbState;
11
12use crate::choice::evaluate_choice;
13use crate::error_handling::{find_catcher, should_retry};
14use crate::io_processing::{apply_input_path, apply_output_path, apply_result_path};
15use crate::service::SharedServiceRegistry;
16use crate::state::{ExecutionStatus, HistoryEvent, SharedStepFunctionsState};
17
18#[allow(clippy::too_many_arguments)]
21pub async fn execute_state_machine(
22 state: SharedStepFunctionsState,
23 execution_arn: String,
24 definition: String,
25 input: Option<String>,
26 delivery: Option<Arc<DeliveryBus>>,
27 dynamodb_state: Option<SharedDynamoDbState>,
28 registry: Option<SharedServiceRegistry>,
29 logging_configuration: Option<Value>,
30) {
31 let def: Value = match serde_json::from_str(&definition) {
32 Ok(v) => v,
33 Err(e) => {
34 fail_execution(
35 &state,
36 &execution_arn,
37 "States.Runtime",
38 &format!("Failed to parse definition: {e}"),
39 );
40 return;
41 }
42 };
43
44 let raw_input: Value = input
45 .as_deref()
46 .and_then(|s| serde_json::from_str(s).ok())
47 .unwrap_or(json!({}));
48
49 add_event(
51 &state,
52 &execution_arn,
53 "ExecutionStarted",
54 0,
55 json!({
56 "input": serde_json::to_string(&raw_input).expect("serde_json::Value serialization is infallible"),
57 "roleArn": "arn:aws:iam::123456789012:role/execution-role"
58 }),
59 );
60
61 let def_owned = def;
67 let state_clone = state.clone();
68 let execution_arn_clone = execution_arn.clone();
69 let delivery_clone = delivery.clone();
70 let dynamodb_state_clone = dynamodb_state.clone();
71 let registry_clone = registry.clone();
72 let handle = tokio::spawn(async move {
73 run_states(
74 &def_owned,
75 raw_input,
76 &delivery_clone,
77 &dynamodb_state_clone,
78 ®istry_clone,
79 &state_clone,
80 &execution_arn_clone,
81 )
82 .await
83 });
84
85 match handle.await {
86 Ok(Ok(output)) => {
87 succeed_execution(&state, &execution_arn, &output);
88 }
89 Ok(Err((error, cause))) => {
90 fail_execution(&state, &execution_arn, &error, &cause);
91 }
92 Err(join_err) => {
93 let msg = if join_err.is_panic() {
94 let payload = join_err.into_panic();
95 if let Some(s) = payload.downcast_ref::<String>() {
96 s.clone()
97 } else if let Some(s) = payload.downcast_ref::<&'static str>() {
98 (*s).to_string()
99 } else {
100 "execution task panicked".to_string()
101 }
102 } else {
103 format!("execution task cancelled: {join_err}")
104 };
105 tracing::error!(
106 execution_arn = %execution_arn,
107 panic = %msg,
108 "Step Functions execution panicked"
109 );
110 fail_execution(&state, &execution_arn, "States.Runtime", &msg);
111 }
112 }
113
114 deliver_execution_logs(
116 &state,
117 &execution_arn,
118 delivery.as_ref(),
119 logging_configuration.as_ref(),
120 );
121}
122
123type StatesResult<'a> = std::pin::Pin<
124 Box<dyn std::future::Future<Output = Result<Value, (String, String)>> + Send + 'a>,
125>;
126
127pub(crate) enum Advance {
129 Next(String, Value),
131 End(Value),
133 Fail(String, String),
135}
136
137async fn run_wait_state(
138 name: &str,
139 state_def: &Value,
140 input: Value,
141 shared_state: &SharedStepFunctionsState,
142 execution_arn: &str,
143) -> Advance {
144 let entered_event_id = add_event(
145 shared_state,
146 execution_arn,
147 "WaitStateEntered",
148 0,
149 json!({
150 "name": name,
151 "input": serde_json::to_string(&input).expect("serde_json::Value serialization is infallible"),
152 }),
153 );
154
155 execute_wait_state(state_def, &input).await;
156
157 add_event(
158 shared_state,
159 execution_arn,
160 "WaitStateExited",
161 entered_event_id,
162 json!({
163 "name": name,
164 "output": serde_json::to_string(&input).expect("serde_json::Value serialization is infallible"),
165 }),
166 );
167
168 advance_from_next(state_def, input)
169}
170
171#[allow(clippy::too_many_arguments)]
172async fn run_task_state(
173 name: &str,
174 state_def: &Value,
175 input: Value,
176 delivery: &Option<Arc<DeliveryBus>>,
177 dynamodb_state: &Option<SharedDynamoDbState>,
178 registry: &Option<SharedServiceRegistry>,
179 shared_state: &SharedStepFunctionsState,
180 execution_arn: &str,
181) -> Advance {
182 let entered_event_id = add_event(
183 shared_state,
184 execution_arn,
185 "TaskStateEntered",
186 0,
187 json!({
188 "name": name,
189 "input": serde_json::to_string(&input).expect("serde_json::Value serialization is infallible"),
190 }),
191 );
192
193 let result = execute_task_state(
194 name,
195 state_def,
196 &input,
197 delivery,
198 dynamodb_state,
199 registry,
200 shared_state,
201 execution_arn,
202 entered_event_id,
203 )
204 .await;
205
206 match result {
207 Ok(output) => {
208 add_event(
209 shared_state,
210 execution_arn,
211 "TaskStateExited",
212 entered_event_id,
213 json!({
214 "name": name,
215 "output": serde_json::to_string(&output).expect("serde_json::Value serialization is infallible"),
216 }),
217 );
218 advance_from_next(state_def, output)
219 }
220 Err((error, cause)) => advance_from_error(state_def, &input, error, cause),
221 }
222}
223
224#[allow(clippy::too_many_arguments)]
225async fn run_parallel_state(
226 name: &str,
227 state_def: &Value,
228 input: Value,
229 delivery: &Option<Arc<DeliveryBus>>,
230 dynamodb_state: &Option<SharedDynamoDbState>,
231 registry: &Option<SharedServiceRegistry>,
232 shared_state: &SharedStepFunctionsState,
233 execution_arn: &str,
234) -> Advance {
235 let entered_event_id = add_event(
236 shared_state,
237 execution_arn,
238 "ParallelStateEntered",
239 0,
240 json!({
241 "name": name,
242 "input": serde_json::to_string(&input).expect("serde_json::Value serialization is infallible"),
243 }),
244 );
245
246 let result = execute_parallel_state(
247 state_def,
248 &input,
249 delivery,
250 dynamodb_state,
251 registry,
252 shared_state,
253 execution_arn,
254 )
255 .await;
256
257 match result {
258 Ok(output) => {
259 add_event(
260 shared_state,
261 execution_arn,
262 "ParallelStateExited",
263 entered_event_id,
264 json!({
265 "name": name,
266 "output": serde_json::to_string(&output).expect("serde_json::Value serialization is infallible"),
267 }),
268 );
269 advance_from_next(state_def, output)
270 }
271 Err((error, cause)) => advance_from_error(state_def, &input, error, cause),
272 }
273}
274
275#[allow(clippy::too_many_arguments)]
276async fn run_map_state(
277 name: &str,
278 state_def: &Value,
279 input: Value,
280 delivery: &Option<Arc<DeliveryBus>>,
281 dynamodb_state: &Option<SharedDynamoDbState>,
282 registry: &Option<SharedServiceRegistry>,
283 shared_state: &SharedStepFunctionsState,
284 execution_arn: &str,
285) -> Advance {
286 let entered_event_id = add_event(
287 shared_state,
288 execution_arn,
289 "MapStateEntered",
290 0,
291 json!({
292 "name": name,
293 "input": serde_json::to_string(&input).expect("serde_json::Value serialization is infallible"),
294 }),
295 );
296
297 let result = execute_map_state(
298 state_def,
299 &input,
300 delivery,
301 dynamodb_state,
302 registry,
303 shared_state,
304 execution_arn,
305 )
306 .await;
307
308 match result {
309 Ok(output) => {
310 add_event(
311 shared_state,
312 execution_arn,
313 "MapStateExited",
314 entered_event_id,
315 json!({
316 "name": name,
317 "output": serde_json::to_string(&output).expect("serde_json::Value serialization is infallible"),
318 }),
319 );
320 advance_from_next(state_def, output)
321 }
322 Err((error, cause)) => advance_from_error(state_def, &input, error, cause),
323 }
324}
325
326async fn execute_wait_state(state_def: &Value, input: &Value) {
328 if let Some(seconds) = state_def["Seconds"].as_u64() {
329 tokio::time::sleep(tokio::time::Duration::from_secs(seconds)).await;
330 return;
331 }
332
333 if let Some(path) = state_def["SecondsPath"].as_str() {
334 let val = crate::io_processing::resolve_path(input, path);
335 if let Some(seconds) = val.as_u64() {
336 tokio::time::sleep(tokio::time::Duration::from_secs(seconds)).await;
337 }
338 return;
339 }
340
341 if let Some(ts_str) = state_def["Timestamp"].as_str() {
342 if let Ok(target) = chrono::DateTime::parse_from_rfc3339(ts_str) {
343 let now = Utc::now();
344 let target_utc = target.with_timezone(&chrono::Utc);
345 if target_utc > now {
346 let duration = (target_utc - now).to_std().unwrap_or_default();
347 tokio::time::sleep(duration).await;
348 }
349 }
350 return;
351 }
352
353 if let Some(path) = state_def["TimestampPath"].as_str() {
354 let val = crate::io_processing::resolve_path(input, path);
355 if let Some(ts_str) = val.as_str() {
356 if let Ok(target) = chrono::DateTime::parse_from_rfc3339(ts_str) {
357 let now = Utc::now();
358 let target_utc = target.with_timezone(&chrono::Utc);
359 if target_utc > now {
360 let duration = (target_utc - now).to_std().unwrap_or_default();
361 tokio::time::sleep(duration).await;
362 }
363 }
364 }
365 return;
366 }
367
368 warn!(
369 "Wait state has no valid Seconds, SecondsPath, Timestamp, or TimestampPath — skipping wait"
370 );
371}
372
373#[allow(clippy::too_many_arguments)]
376async fn execute_task_state(
377 name: &str,
378 state_def: &Value,
379 input: &Value,
380 delivery: &Option<Arc<DeliveryBus>>,
381 dynamodb_state: &Option<SharedDynamoDbState>,
382 registry: &Option<SharedServiceRegistry>,
383 shared_state: &SharedStepFunctionsState,
384 execution_arn: &str,
385 entered_event_id: i64,
386) -> Result<Value, (String, String)> {
387 let resource = state_def["Resource"].as_str().unwrap_or("").to_string();
388
389 let input_path = state_def["InputPath"].as_str();
390 let result_path = state_def["ResultPath"].as_str();
391 let output_path = state_def["OutputPath"].as_str();
392
393 let effective_input = if input_path == Some("null") {
394 json!({})
395 } else {
396 apply_input_path(input, input_path)
397 };
398
399 let retriers = state_def["Retry"].as_array().cloned().unwrap_or_default();
400 let timeout_seconds = state_def["TimeoutSeconds"].as_u64();
401 let heartbeat_seconds = state_def["HeartbeatSeconds"].as_u64();
402 let mut attempt = 0u32;
403
404 let is_wait_for_task_token = resource.contains(".waitForTaskToken");
405 let task_token = if is_wait_for_task_token {
406 let token = format!(
407 "FCToken-{}-{}",
408 chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0),
409 uuid::Uuid::new_v4().simple(),
410 );
411 let account_id = account_id_from_arn(execution_arn);
412 let context = json!({
413 "Task": { "Token": token.clone() },
414 "Execution": { "Id": execution_arn },
415 "State": { "Name": name },
416 });
417 {
418 let mut accounts = shared_state.write();
419 let state = accounts.get_or_create(account_id);
420 state.task_tokens.insert(
421 token.clone(),
422 crate::state::TaskTokenState {
423 activity_arn: String::new(),
424 status: "PENDING".to_string(),
425 output: None,
426 error: None,
427 cause: None,
428 input: None,
429 created_at: chrono::Utc::now(),
430 last_heartbeat_at: None,
431 heartbeat_seconds: heartbeat_seconds.map(|s| s as i64),
432 timeout_seconds: timeout_seconds.map(|s| s as i64),
433 },
434 );
435 }
436 Some((token, context))
437 } else {
438 None
439 };
440
441 let task_input = if let Some(params) = state_def.get("Parameters") {
442 if let Some((_, ctx)) = &task_token {
443 apply_parameters(params, &effective_input, Some(ctx))
444 } else {
445 apply_parameters(params, &effective_input, None)
446 }
447 } else {
448 effective_input
449 };
450
451 loop {
452 add_event(
453 shared_state,
454 execution_arn,
455 "TaskScheduled",
456 entered_event_id,
457 json!({
458 "resource": resource,
459 "region": "us-east-1",
460 "parameters": serde_json::to_string(&task_input).expect("serde_json::Value serialization is infallible"),
461 }),
462 );
463
464 add_event(
465 shared_state,
466 execution_arn,
467 "TaskStarted",
468 entered_event_id,
469 json!({ "resource": resource }),
470 );
471
472 let invoke_result = invoke_resource(
473 &resource,
474 &task_input,
475 delivery,
476 dynamodb_state,
477 registry,
478 execution_arn,
479 timeout_seconds,
480 heartbeat_seconds,
481 shared_state,
482 )
483 .await;
484
485 match invoke_result {
486 Ok(result) => {
487 if let Some((token, _)) = &task_token {
488 let account_id = account_id_from_arn(execution_arn);
489 match poll_task_token(
490 shared_state,
491 account_id,
492 token,
493 timeout_seconds,
494 heartbeat_seconds,
495 )
496 .await
497 {
498 Ok(output) => {
499 add_event(
500 shared_state,
501 execution_arn,
502 "TaskSucceeded",
503 entered_event_id,
504 json!({
505 "resource": resource,
506 "output": serde_json::to_string(&output).expect("serde_json::Value serialization is infallible"),
507 }),
508 );
509
510 let selected = if let Some(selector) = state_def.get("ResultSelector") {
511 apply_parameters(selector, &output, None)
512 } else {
513 output
514 };
515
516 let after_result = if result_path == Some("null") {
517 input.clone()
518 } else {
519 apply_result_path(input, &selected, result_path)
520 };
521
522 let output = if output_path == Some("null") {
523 json!({})
524 } else {
525 apply_output_path(&after_result, output_path)
526 };
527
528 return Ok(output);
529 }
530 Err((error, cause)) => {
531 add_event(
532 shared_state,
533 execution_arn,
534 "TaskFailed",
535 entered_event_id,
536 json!({ "error": error, "cause": cause }),
537 );
538
539 if let Some(delay_ms) = should_retry(&retriers, &error, attempt) {
540 attempt += 1;
541 let actual_delay = delay_ms.min(5000);
542 tokio::time::sleep(tokio::time::Duration::from_millis(
543 actual_delay,
544 ))
545 .await;
546 continue;
547 }
548
549 return Err((error, cause));
550 }
551 }
552 }
553
554 add_event(
555 shared_state,
556 execution_arn,
557 "TaskSucceeded",
558 entered_event_id,
559 json!({
560 "resource": resource,
561 "output": serde_json::to_string(&result).expect("serde_json::Value serialization is infallible"),
562 }),
563 );
564
565 let selected = if let Some(selector) = state_def.get("ResultSelector") {
566 apply_parameters(selector, &result, None)
567 } else {
568 result
569 };
570
571 let after_result = if result_path == Some("null") {
572 input.clone()
573 } else {
574 apply_result_path(input, &selected, result_path)
575 };
576
577 let output = if output_path == Some("null") {
578 json!({})
579 } else {
580 apply_output_path(&after_result, output_path)
581 };
582
583 return Ok(output);
584 }
585 Err((error, cause)) => {
586 add_event(
587 shared_state,
588 execution_arn,
589 "TaskFailed",
590 entered_event_id,
591 json!({ "error": error, "cause": cause }),
592 );
593
594 if let Some(delay_ms) = should_retry(&retriers, &error, attempt) {
595 attempt += 1;
596 let actual_delay = delay_ms.min(5000);
597 tokio::time::sleep(tokio::time::Duration::from_millis(actual_delay)).await;
598 continue;
599 }
600
601 return Err((error, cause));
602 }
603 }
604 }
605}
606
607async fn execute_parallel_state(
609 state_def: &Value,
610 input: &Value,
611 delivery: &Option<Arc<DeliveryBus>>,
612 dynamodb_state: &Option<SharedDynamoDbState>,
613 registry: &Option<SharedServiceRegistry>,
614 shared_state: &SharedStepFunctionsState,
615 execution_arn: &str,
616) -> Result<Value, (String, String)> {
617 let input_path = state_def["InputPath"].as_str();
618 let result_path = state_def["ResultPath"].as_str();
619 let output_path = state_def["OutputPath"].as_str();
620
621 let effective_input = if input_path == Some("null") {
622 json!({})
623 } else {
624 apply_input_path(input, input_path)
625 };
626
627 let branches = state_def["Branches"]
628 .as_array()
629 .cloned()
630 .unwrap_or_default();
631
632 if branches.is_empty() {
633 return Err((
634 "States.Runtime".to_string(),
635 "Parallel state has no Branches".to_string(),
636 ));
637 }
638
639 let mut handles = Vec::new();
641 for branch_def in &branches {
642 let branch = branch_def.clone();
643 let branch_input = effective_input.clone();
644 let delivery = delivery.clone();
645 let ddb = dynamodb_state.clone();
646 let reg = registry.clone();
647 let state = shared_state.clone();
648 let arn = execution_arn.to_string();
649
650 handles.push(tokio::spawn(async move {
651 run_states(&branch, branch_input, &delivery, &ddb, ®, &state, &arn).await
652 }));
653 }
654
655 let mut results = Vec::with_capacity(handles.len());
657 for handle in handles {
658 let result = handle.await.map_err(|e| {
659 (
660 "States.Runtime".to_string(),
661 format!("Branch execution panicked: {e}"),
662 )
663 })??;
664 results.push(result);
665 }
666
667 let branch_output = Value::Array(results);
668
669 let selected = if let Some(selector) = state_def.get("ResultSelector") {
671 apply_parameters(selector, &branch_output, None)
672 } else {
673 branch_output
674 };
675
676 let after_result = if result_path == Some("null") {
678 input.clone()
679 } else {
680 apply_result_path(input, &selected, result_path)
681 };
682
683 let output = if output_path == Some("null") {
685 json!({})
686 } else {
687 apply_output_path(&after_result, output_path)
688 };
689
690 Ok(output)
691}
692
693#[allow(clippy::too_many_arguments)]
697async fn execute_map_state(
698 state_def: &Value,
699 input: &Value,
700 delivery: &Option<Arc<DeliveryBus>>,
701 dynamodb_state: &Option<SharedDynamoDbState>,
702 registry: &Option<SharedServiceRegistry>,
703 shared_state: &SharedStepFunctionsState,
704 execution_arn: &str,
705) -> Result<Value, (String, String)> {
706 let input_path = state_def["InputPath"].as_str();
707 let result_path = state_def["ResultPath"].as_str();
708 let output_path = state_def["OutputPath"].as_str();
709
710 let effective_input = if input_path == Some("null") {
711 json!({})
712 } else {
713 apply_input_path(input, input_path)
714 };
715
716 let max_concurrency = if let Some(path) = state_def["MaxConcurrencyPath"].as_str() {
718 crate::io_processing::resolve_path(&effective_input, path)
719 .as_u64()
720 .unwrap_or(0)
721 } else {
722 state_def["MaxConcurrency"].as_u64().unwrap_or(0)
723 };
724 let effective_concurrency = if max_concurrency == 0 {
725 40
726 } else {
727 max_concurrency as usize
728 };
729
730 let items = if let Some(item_reader) = state_def.get("ItemReader") {
732 read_items_from_s3(item_reader, registry, execution_arn).await?
733 } else {
734 let items_path = state_def["ItemsPath"].as_str().unwrap_or("$");
735 let items_value = crate::io_processing::resolve_path(&effective_input, items_path);
736 items_value.as_array().cloned().unwrap_or_default()
737 };
738
739 let batch_config = state_def.get("ItemBatcher").cloned();
741 let batched_items = if let Some(ref batcher) = batch_config {
742 apply_item_batcher(&items, batcher, &effective_input)
743 } else {
744 items
745 };
746
747 let iterator_def = state_def
749 .get("ItemProcessor")
750 .or_else(|| state_def.get("Iterator"))
751 .cloned()
752 .ok_or_else(|| {
753 (
754 "States.Runtime".to_string(),
755 "Map state has no ItemProcessor or Iterator".to_string(),
756 )
757 })?;
758
759 let tolerated_failure_percentage = state_def["ToleratedFailurePercentage"]
760 .as_f64()
761 .unwrap_or(0.0);
762 let total_items = batched_items.len() as f64;
763 let mut failure_count = 0usize;
764
765 let semaphore = Arc::new(tokio::sync::Semaphore::new(effective_concurrency));
766
767 let mut handles = Vec::new();
769 for (index, batch_item) in batched_items.into_iter().enumerate() {
770 let iter_def = iterator_def.clone();
771 let delivery = delivery.clone();
772 let ddb = dynamodb_state.clone();
773 let reg = registry.clone();
774 let state = shared_state.clone();
775 let arn = execution_arn.to_string();
776 let sem = semaphore.clone();
777
778 let item_input = if let Some(selector) = state_def.get("ItemSelector") {
780 let mut ctx = serde_json::Map::new();
781 ctx.insert("value".to_string(), batch_item.clone());
782 ctx.insert("index".to_string(), json!(index));
783 apply_parameters(selector, &Value::Object(ctx), None)
784 } else {
785 batch_item
786 };
787
788 add_event(
789 shared_state,
790 execution_arn,
791 "MapIterationStarted",
792 0,
793 json!({ "index": index }),
794 );
795
796 handles.push(tokio::spawn(async move {
797 let _permit = sem
798 .acquire()
799 .await
800 .map_err(|_| ("States.Runtime".to_string(), "Semaphore closed".to_string()))?;
801 let result =
802 run_states(&iter_def, item_input, &delivery, &ddb, ®, &state, &arn).await;
803 Ok::<(usize, Result<Value, (String, String)>), (String, String)>((index, result))
804 }));
805 }
806
807 let mut results: Vec<(usize, Value)> = Vec::with_capacity(handles.len());
809 for handle in handles {
810 let (index, result) = handle.await.map_err(|e| {
811 (
812 "States.Runtime".to_string(),
813 format!("Map iteration panicked: {e}"),
814 )
815 })??;
816
817 match result {
818 Ok(output) => {
819 add_event(
820 shared_state,
821 execution_arn,
822 "MapIterationSucceeded",
823 0,
824 json!({ "index": index }),
825 );
826 results.push((index, output));
827 }
828 Err((error, cause)) => {
829 add_event(
830 shared_state,
831 execution_arn,
832 "MapIterationFailed",
833 0,
834 json!({ "index": index, "error": error }),
835 );
836 failure_count += 1;
837 let failure_percentage = (failure_count as f64 / total_items) * 100.0;
838 if failure_percentage > tolerated_failure_percentage {
839 return Err((error, cause));
840 }
841 }
842 }
843 }
844
845 results.sort_by_key(|(i, _)| *i);
847 let map_output = Value::Array(results.into_iter().map(|(_, v)| v).collect());
848
849 if let Some(result_writer) = state_def.get("ResultWriter") {
851 write_map_results_to_s3(result_writer, registry, execution_arn, &map_output).await?;
852 }
853
854 let selected = if let Some(selector) = state_def.get("ResultSelector") {
856 apply_parameters(selector, &map_output, None)
857 } else {
858 map_output
859 };
860
861 let after_result = if result_path == Some("null") {
863 input.clone()
864 } else {
865 apply_result_path(input, &selected, result_path)
866 };
867
868 let output = if output_path == Some("null") {
870 json!({})
871 } else {
872 apply_output_path(&after_result, output_path)
873 };
874
875 Ok(output)
876}
877
878async fn read_items_from_s3(
880 item_reader: &Value,
881 registry: &Option<SharedServiceRegistry>,
882 execution_arn: &str,
883) -> Result<Vec<Value>, (String, String)> {
884 let resource = item_reader["Resource"]
885 .as_str()
886 .unwrap_or("arn:aws:states:::s3:getObject");
887 if !resource.contains("s3:getObject") {
888 return Err((
889 "States.Runtime".to_string(),
890 format!("ItemReader unsupported resource: {resource}"),
891 ));
892 }
893
894 let params = item_reader
895 .get("Parameters")
896 .cloned()
897 .unwrap_or_else(|| json!({}));
898 let bucket = params["Bucket"].as_str().ok_or_else(|| {
899 (
900 "States.Runtime".to_string(),
901 "ItemReader missing Bucket".to_string(),
902 )
903 })?;
904 let key = params["Key"].as_str().ok_or_else(|| {
905 (
906 "States.Runtime".to_string(),
907 "ItemReader missing Key".to_string(),
908 )
909 })?;
910
911 let registry_arc = resolve_registry(registry)?;
912 let account_id = account_from_execution_arn(execution_arn);
913
914 let body = call_sdk_action_raw_bytes(
915 ®istry_arc,
916 "s3",
917 "GetObject",
918 &json!({ "Bucket": bucket, "Key": key }),
919 &account_id,
920 )
921 .await?;
922
923 let parsed: Value = serde_json::from_slice(&body).map_err(|e| {
925 (
926 "States.Runtime".to_string(),
927 format!("ItemReader failed to parse S3 object as JSON: {e}"),
928 )
929 })?;
930
931 parsed.as_array().cloned().ok_or_else(|| {
932 (
933 "States.Runtime".to_string(),
934 "ItemReader S3 object is not a JSON array".to_string(),
935 )
936 })
937}
938
939async fn write_map_results_to_s3(
941 result_writer: &Value,
942 registry: &Option<SharedServiceRegistry>,
943 execution_arn: &str,
944 results: &Value,
945) -> Result<(), (String, String)> {
946 let resource = result_writer["Resource"]
947 .as_str()
948 .unwrap_or("arn:aws:states:::s3:putObject");
949 if !resource.contains("s3:putObject") {
950 return Err((
951 "States.Runtime".to_string(),
952 format!("ResultWriter unsupported resource: {resource}"),
953 ));
954 }
955
956 let params = result_writer
957 .get("Parameters")
958 .cloned()
959 .unwrap_or_else(|| json!({}));
960 let bucket = params["Bucket"].as_str().ok_or_else(|| {
961 (
962 "States.Runtime".to_string(),
963 "ResultWriter missing Bucket".to_string(),
964 )
965 })?;
966 let prefix = params["Prefix"].as_str().unwrap_or("map-results/");
967
968 let registry_arc = resolve_registry(registry)?;
969 let account_id = account_from_execution_arn(execution_arn);
970
971 use bytes::Bytes;
972 let body = Bytes::from(
973 serde_json::to_vec(results).expect("serde_json::Value serialization is infallible"),
974 );
975
976 use fakecloud_core::service::AwsRequest;
978 use http::{HeaderMap, Method};
979 let service = registry_arc.get("s3").ok_or_else(|| {
980 (
981 "States.TaskFailed".to_string(),
982 "S3 service not available for ResultWriter".to_string(),
983 )
984 })?;
985
986 let req = AwsRequest {
987 service: "s3".to_string(),
988 action: "PutObject".to_string(),
989 region: "us-east-1".to_string(),
990 account_id: account_id.to_string(),
991 request_id: uuid::Uuid::new_v4().to_string(),
992 headers: HeaderMap::new(),
993 query_params: std::collections::HashMap::new(),
994 body,
995 body_stream: parking_lot::Mutex::new(None),
996 path_segments: vec![bucket.to_string(), format!("{prefix}result.json")],
997 raw_path: format!("/{bucket}/{prefix}result.json"),
998 raw_query: String::new(),
999 method: Method::PUT,
1000 is_query_protocol: false,
1001 access_key_id: None,
1002 principal: None,
1003 };
1004
1005 service.handle(req).await.map_err(|err| {
1006 let code = err.code().to_string();
1007 let msg = err.message();
1008 (
1009 format!("S3.{code}"),
1010 format!("ResultWriter PutObject failed: {msg}"),
1011 )
1012 })?;
1013
1014 Ok(())
1015}
1016
1017fn apply_item_batcher(items: &[Value], batcher: &Value, _effective_input: &Value) -> Vec<Value> {
1019 let max_per_batch = batcher["MaxItemsPerBatch"].as_u64().unwrap_or(u64::MAX) as usize;
1020 let max_bytes = batcher["MaxInputBytesPerBatch"].as_u64().unwrap_or(0) as usize;
1021 let batch_input = batcher.get("BatchInput").cloned();
1022
1023 let mut batches: Vec<Vec<Value>> = Vec::new();
1024 let mut current_batch: Vec<Value> = Vec::new();
1025 let mut current_bytes = 0usize;
1026
1027 for item in items.iter().cloned() {
1028 let item_bytes = serde_json::to_vec(&item).unwrap_or_default().len();
1029 if !current_batch.is_empty()
1030 && (current_batch.len() >= max_per_batch
1031 || (max_bytes > 0 && current_bytes + item_bytes > max_bytes))
1032 {
1033 batches.push(current_batch);
1034 current_batch = Vec::new();
1035 current_bytes = 0;
1036 }
1037 current_bytes += item_bytes;
1038 current_batch.push(item);
1039 }
1040 if !current_batch.is_empty() {
1041 batches.push(current_batch);
1042 }
1043
1044 batches
1045 .into_iter()
1046 .enumerate()
1047 .map(|(index, batch)| {
1048 let mut map = serde_json::Map::new();
1049 map.insert("index".to_string(), json!(index));
1050 map.insert("items".to_string(), Value::Array(batch));
1051 if let Some(Value::Object(ref obj)) = batch_input {
1052 for (k, v) in obj {
1053 map.insert(k.clone(), v.clone());
1054 }
1055 }
1056 Value::Object(map)
1057 })
1058 .collect()
1059}
1060
1061#[allow(clippy::too_many_arguments)]
1063async fn invoke_resource(
1064 resource: &str,
1065 input: &Value,
1066 delivery: &Option<Arc<DeliveryBus>>,
1067 dynamodb_state: &Option<SharedDynamoDbState>,
1068 registry: &Option<SharedServiceRegistry>,
1069 execution_arn: &str,
1070 timeout_seconds: Option<u64>,
1071 heartbeat_seconds: Option<u64>,
1072 shared_state: &SharedStepFunctionsState,
1073) -> Result<Value, (String, String)> {
1074 if resource.contains(":states:") && resource.contains(":activity:") {
1076 return invoke_activity(
1077 resource,
1078 input,
1079 shared_state,
1080 timeout_seconds,
1081 heartbeat_seconds,
1082 )
1083 .await;
1084 }
1085
1086 if resource.contains(":lambda:") && resource.contains(":function:") {
1088 return invoke_lambda_direct(resource, input, delivery, timeout_seconds).await;
1089 }
1090
1091 if resource.starts_with("arn:aws:states:::lambda:invoke") {
1093 let function_name = input["FunctionName"].as_str().unwrap_or("");
1094 let payload = if let Some(p) = input.get("Payload") {
1095 p.clone()
1096 } else {
1097 input.clone()
1098 };
1099 return invoke_lambda_direct(function_name, &payload, delivery, timeout_seconds).await;
1100 }
1101
1102 if resource.starts_with("arn:aws:states:::sqs:sendMessage") {
1103 return invoke_sqs_send_message(input, delivery);
1104 }
1105
1106 if resource.starts_with("arn:aws:states:::sns:publish") {
1107 return invoke_sns_publish(input, delivery);
1108 }
1109
1110 if resource.starts_with("arn:aws:states:::events:putEvents") {
1111 return invoke_eventbridge_put_events(input, delivery);
1112 }
1113
1114 if resource.starts_with("arn:aws:states:::dynamodb:getItem") {
1115 return invoke_dynamodb_get_item(input, dynamodb_state);
1116 }
1117
1118 if resource.starts_with("arn:aws:states:::dynamodb:putItem") {
1119 return invoke_dynamodb_put_item(input, dynamodb_state);
1120 }
1121
1122 if resource.starts_with("arn:aws:states:::dynamodb:deleteItem") {
1123 return invoke_dynamodb_delete_item(input, dynamodb_state);
1124 }
1125
1126 if resource.starts_with("arn:aws:states:::dynamodb:updateItem") {
1127 return invoke_dynamodb_update_item(input, dynamodb_state);
1128 }
1129
1130 if let Some(tail) = resource.strip_prefix("arn:aws:states:::") {
1138 if tail.starts_with("states:startExecution") {
1139 let account_id = account_from_execution_arn(execution_arn);
1140 let result =
1141 invoke_aws_sdk_integration(tail, input, registry, &account_id, timeout_seconds)
1142 .await;
1143 if let Ok(ref value) = result {
1149 if let Some(inner_arn) = value
1150 .get("executionArn")
1151 .or_else(|| value.get("ExecutionArn"))
1152 .and_then(Value::as_str)
1153 {
1154 let mut accounts = shared_state.write();
1155 if let Some(state) = accounts.get_mut(&account_id) {
1156 if let Some(exec) = state.executions.get_mut(inner_arn) {
1157 exec.parent_execution_arn = Some(execution_arn.to_string());
1158 }
1159 }
1160 }
1161 }
1162 return result;
1163 }
1164 }
1165
1166 if let Some(rest) = resource.strip_prefix("arn:aws:states:::aws-sdk:") {
1172 let account_id = account_from_execution_arn(execution_arn);
1173 return invoke_aws_sdk_integration(rest, input, registry, &account_id, timeout_seconds)
1174 .await;
1175 }
1176
1177 if let Some(tail) = resource.strip_prefix("arn:aws:states:::") {
1181 if tail.contains(".sync") {
1182 let account_id = account_from_execution_arn(execution_arn);
1183 return invoke_aws_sdk_integration(tail, input, registry, &account_id, timeout_seconds)
1184 .await;
1185 }
1186 }
1187
1188 Err((
1189 "States.TaskFailed".to_string(),
1190 format!("Unsupported resource: {resource}"),
1191 ))
1192}
1193
1194fn camel_to_pascal(action: &str) -> String {
1199 let mut chars = action.chars();
1200 match chars.next() {
1201 None => String::new(),
1202 Some(first) => first.to_ascii_uppercase().to_string() + chars.as_str(),
1203 }
1204}
1205
1206fn map_sdk_service_id(service_id: &str) -> &str {
1211 match service_id {
1212 "sfn" => "states",
1213 "cloudwatchlogs" => "logs",
1214 other => other,
1216 }
1217}
1218
1219fn account_from_execution_arn(execution_arn: &str) -> String {
1223 execution_arn
1224 .split(':')
1225 .nth(4)
1226 .filter(|s| !s.is_empty())
1227 .unwrap_or("123456789012")
1228 .to_string()
1229}
1230
1231async fn invoke_aws_sdk_integration(
1236 tail: &str,
1237 input: &Value,
1238 registry: &Option<SharedServiceRegistry>,
1239 account_id: &str,
1240 timeout_seconds: Option<u64>,
1241) -> Result<Value, (String, String)> {
1242 let registry_arc = resolve_registry(registry)?;
1243
1244 let mut parts = tail.splitn(2, ':');
1250 let service_id = parts.next().filter(|s| !s.is_empty()).ok_or_else(|| {
1251 (
1252 "States.TaskFailed".to_string(),
1253 format!("Invalid aws-sdk Resource ARN: missing service in '{tail}'"),
1254 )
1255 })?;
1256 let action_with_mod = parts.next().filter(|s| !s.is_empty()).ok_or_else(|| {
1257 (
1258 "States.TaskFailed".to_string(),
1259 format!("Invalid aws-sdk Resource ARN: missing action in '{tail}'"),
1260 )
1261 })?;
1262 let action_camel = action_with_mod
1263 .split('.')
1264 .next()
1265 .filter(|s| !s.is_empty())
1266 .ok_or_else(|| {
1267 (
1268 "States.TaskFailed".to_string(),
1269 format!("Invalid aws-sdk Resource ARN: empty action in '{tail}'"),
1270 )
1271 })?;
1272 let modifiers: Vec<&str> = action_with_mod.split('.').skip(1).collect();
1273 let is_sync = modifiers.iter().any(|m| *m == "sync" || *m == "sync:2");
1274
1275 let action_pascal = camel_to_pascal(action_camel);
1276 let service_name = map_sdk_service_id(service_id).to_string();
1277
1278 let translated_input = match service_name.as_str() {
1283 "ecs" => translate_ecs_keys_to_camel(input),
1284 _ => input.clone(),
1285 };
1286
1287 let initial = call_sdk_action(
1288 ®istry_arc,
1289 &service_name,
1290 &action_pascal,
1291 &translated_input,
1292 account_id,
1293 )
1294 .await?;
1295
1296 if !is_sync {
1297 return Ok(initial);
1298 }
1299
1300 sync_wait(
1305 ®istry_arc,
1306 &service_name,
1307 &action_pascal,
1308 &initial,
1309 &translated_input,
1310 account_id,
1311 timeout_seconds,
1312 )
1313 .await
1314}
1315
1316fn translate_ecs_keys_to_camel(input: &Value) -> Value {
1323 let Some(obj) = input.as_object() else {
1324 return input.clone();
1325 };
1326 let mut out = serde_json::Map::with_capacity(obj.len());
1327 for (k, v) in obj.iter() {
1328 let camel = match k.as_str() {
1329 "Cluster" => "cluster",
1330 "TaskDefinition" => "taskDefinition",
1331 "LaunchType" => "launchType",
1332 "Group" => "group",
1333 "Overrides" => "overrides",
1334 "PlatformVersion" => "platformVersion",
1335 "NetworkConfiguration" => "networkConfiguration",
1336 "Tags" => "tags",
1337 "EnableExecuteCommand" => "enableExecuteCommand",
1338 "PropagateTags" => "propagateTags",
1339 "ReferenceId" => "referenceId",
1340 "StartedBy" => "startedBy",
1341 "Count" => "count",
1342 "CapacityProviderStrategy" => "capacityProviderStrategy",
1343 "PlacementConstraints" => "placementConstraints",
1344 "PlacementStrategy" => "placementStrategy",
1345 other => other,
1346 };
1347 out.insert(camel.to_string(), v.clone());
1348 }
1349 Value::Object(out)
1350}
1351
1352fn resolve_registry(
1353 registry: &Option<SharedServiceRegistry>,
1354) -> Result<Arc<fakecloud_core::registry::ServiceRegistry>, (String, String)> {
1355 let registry_handle = registry.as_ref().ok_or_else(|| {
1356 (
1357 "States.TaskFailed".to_string(),
1358 "No service registry configured for aws-sdk integration".to_string(),
1359 )
1360 })?;
1361 registry_handle.get().cloned().ok_or_else(|| {
1362 (
1363 "States.TaskFailed".to_string(),
1364 "Service registry not yet initialised; aws-sdk integration unavailable".to_string(),
1365 )
1366 })
1367}
1368
1369async fn call_sdk_action(
1371 registry: &Arc<fakecloud_core::registry::ServiceRegistry>,
1372 service_name: &str,
1373 action_pascal: &str,
1374 input: &Value,
1375 account_id: &str,
1376) -> Result<Value, (String, String)> {
1377 use bytes::Bytes;
1378 use fakecloud_core::service::AwsRequest;
1379 use http::{HeaderMap, Method};
1380
1381 let service = registry.get(service_name).ok_or_else(|| {
1382 (
1383 "States.TaskFailed".to_string(),
1384 format!("Unknown aws-sdk service '{service_name}'"),
1385 )
1386 })?;
1387
1388 let body_bytes = Bytes::from(
1389 serde_json::to_vec(input).expect("serde_json::Value serialization is infallible"),
1390 );
1391
1392 let req = AwsRequest {
1393 service: service_name.to_string(),
1394 action: action_pascal.to_string(),
1395 region: "us-east-1".to_string(),
1396 account_id: account_id.to_string(),
1397 request_id: uuid::Uuid::new_v4().to_string(),
1398 headers: HeaderMap::new(),
1399 query_params: std::collections::HashMap::new(),
1400 body: body_bytes,
1401 body_stream: parking_lot::Mutex::new(None),
1402 path_segments: vec![],
1403 raw_path: "/".to_string(),
1404 raw_query: String::new(),
1405 method: Method::POST,
1406 is_query_protocol: false,
1407 access_key_id: None,
1408 principal: None,
1409 };
1410
1411 let response = service.handle(req).await.map_err(|err| {
1412 let code = err.code().to_string();
1413 let msg = err.message();
1414 let prefix_service = match service_name {
1415 "dynamodb" => "DynamoDb".to_string(),
1416 "states" => "Sfn".to_string(),
1417 other => camel_to_pascal(other),
1418 };
1419 (
1420 format!("{prefix_service}.{code}"),
1421 format!("{action_pascal} failed: {msg}"),
1422 )
1423 })?;
1424
1425 let response_bytes = match response.body {
1426 fakecloud_core::service::ResponseBody::Bytes(b) => b,
1427 fakecloud_core::service::ResponseBody::File { .. } => {
1428 return Err((
1429 "States.TaskFailed".to_string(),
1430 "aws-sdk integration: file-backed response not supported".to_string(),
1431 ));
1432 }
1433 };
1434
1435 if response_bytes.is_empty() {
1436 return Ok(json!({}));
1437 }
1438 serde_json::from_slice(&response_bytes).map_err(|e| {
1439 (
1440 "States.TaskFailed".to_string(),
1441 format!("aws-sdk integration: failed to parse response JSON: {e}"),
1442 )
1443 })
1444}
1445
1446async fn call_sdk_action_raw_bytes(
1449 registry: &Arc<fakecloud_core::registry::ServiceRegistry>,
1450 service_name: &str,
1451 action_pascal: &str,
1452 input: &Value,
1453 account_id: &str,
1454) -> Result<bytes::Bytes, (String, String)> {
1455 use bytes::Bytes;
1456 use fakecloud_core::service::AwsRequest;
1457 use http::{HeaderMap, Method};
1458
1459 let service = registry.get(service_name).ok_or_else(|| {
1460 (
1461 "States.TaskFailed".to_string(),
1462 format!("Unknown aws-sdk service '{service_name}'"),
1463 )
1464 })?;
1465
1466 let body_bytes = Bytes::from(
1467 serde_json::to_vec(input).expect("serde_json::Value serialization is infallible"),
1468 );
1469
1470 let req = AwsRequest {
1471 service: service_name.to_string(),
1472 action: action_pascal.to_string(),
1473 region: "us-east-1".to_string(),
1474 account_id: account_id.to_string(),
1475 request_id: uuid::Uuid::new_v4().to_string(),
1476 headers: HeaderMap::new(),
1477 query_params: std::collections::HashMap::new(),
1478 body: body_bytes,
1479 body_stream: parking_lot::Mutex::new(None),
1480 path_segments: vec![],
1481 raw_path: "/".to_string(),
1482 raw_query: String::new(),
1483 method: Method::POST,
1484 is_query_protocol: false,
1485 access_key_id: None,
1486 principal: None,
1487 };
1488
1489 let response = service.handle(req).await.map_err(|err| {
1490 let code = err.code().to_string();
1491 let msg = err.message();
1492 let prefix_service = match service_name {
1493 "dynamodb" => "DynamoDb".to_string(),
1494 "states" => "Sfn".to_string(),
1495 other => camel_to_pascal(other),
1496 };
1497 (
1498 format!("{prefix_service}.{code}"),
1499 format!("{action_pascal} failed: {msg}"),
1500 )
1501 })?;
1502
1503 match response.body {
1504 fakecloud_core::service::ResponseBody::Bytes(b) => Ok(b),
1505 fakecloud_core::service::ResponseBody::File { .. } => Err((
1506 "States.TaskFailed".to_string(),
1507 "aws-sdk integration: file-backed response not supported".to_string(),
1508 )),
1509 }
1510}
1511
1512const SYNC_DEFAULT_TIMEOUT_SECS: u64 = 300;
1518const SYNC_POLL_INTERVAL_MS: u64 = 200;
1519
1520async fn sync_wait(
1524 registry: &Arc<fakecloud_core::registry::ServiceRegistry>,
1525 service_name: &str,
1526 action_pascal: &str,
1527 initial: &Value,
1528 input: &Value,
1529 account_id: &str,
1530 timeout_seconds: Option<u64>,
1531) -> Result<Value, (String, String)> {
1532 match (service_name, action_pascal) {
1533 ("ecs", "RunTask") => {
1534 sync_wait_ecs_run_task(registry, initial, input, account_id, timeout_seconds).await
1535 }
1536 ("athena", "StartQueryExecution") => {
1537 sync_wait_athena_query(registry, initial, account_id, timeout_seconds).await
1538 }
1539 ("states", "StartExecution") => {
1540 sync_wait_states_start_execution(registry, initial, account_id, timeout_seconds).await
1541 }
1542 ("glue", "StartJobRun") => {
1543 let job_run_id = initial
1549 .get("JobRunId")
1550 .and_then(Value::as_str)
1551 .unwrap_or("synthetic")
1552 .to_string();
1553 let job_name = input
1554 .get("JobName")
1555 .and_then(Value::as_str)
1556 .unwrap_or("")
1557 .to_string();
1558 Ok(json!({
1559 "JobRun": {
1560 "Id": job_run_id,
1561 "JobName": job_name,
1562 "JobRunState": "SUCCEEDED",
1563 }
1564 }))
1565 }
1566 _ => Err((
1567 "States.TaskFailed".to_string(),
1568 format!(
1569 "`.sync` is not supported for {service_name}:{action_pascal} yet — \
1570 supported: ecs:RunTask, athena:StartQueryExecution, glue:StartJobRun, states:StartExecution"
1571 ),
1572 )),
1573 }
1574}
1575
1576async fn sync_wait_ecs_run_task(
1577 registry: &Arc<fakecloud_core::registry::ServiceRegistry>,
1578 initial: &Value,
1579 input: &Value,
1580 account_id: &str,
1581 timeout_seconds: Option<u64>,
1582) -> Result<Value, (String, String)> {
1583 let tasks = initial
1584 .get("tasks")
1585 .and_then(Value::as_array)
1586 .ok_or_else(|| {
1587 (
1588 "States.TaskFailed".to_string(),
1589 "ecs:RunTask.sync: response missing 'tasks' array".to_string(),
1590 )
1591 })?;
1592 if tasks.is_empty() {
1593 return Err((
1594 "States.TaskFailed".to_string(),
1595 "ecs:RunTask.sync: no tasks were started".to_string(),
1596 ));
1597 }
1598 let task_arns: Vec<String> = tasks
1599 .iter()
1600 .filter_map(|t| t.get("taskArn").and_then(Value::as_str).map(String::from))
1601 .collect();
1602 let cluster = input
1603 .get("cluster")
1604 .or_else(|| input.get("Cluster"))
1605 .and_then(Value::as_str)
1606 .map(String::from);
1607
1608 let deadline = sync_deadline(timeout_seconds);
1609 loop {
1610 let mut describe_input = json!({ "tasks": task_arns });
1611 if let Some(c) = &cluster {
1612 describe_input["cluster"] = json!(c);
1613 }
1614 let described = call_sdk_action(
1615 registry,
1616 "ecs",
1617 "DescribeTasks",
1618 &describe_input,
1619 account_id,
1620 )
1621 .await?;
1622 let described_tasks = described
1623 .get("tasks")
1624 .and_then(Value::as_array)
1625 .cloned()
1626 .unwrap_or_default();
1627 let all_stopped = !described_tasks.is_empty()
1628 && described_tasks
1629 .iter()
1630 .all(|t| t.get("lastStatus").and_then(Value::as_str) == Some("STOPPED"));
1631 if all_stopped {
1632 let any_failed = described_tasks.iter().any(|t| {
1637 let stop_code = t.get("stopCode").and_then(Value::as_str);
1638 let bad_stop = matches!(
1639 stop_code,
1640 Some(
1641 "TaskFailedToStart"
1642 | "EssentialContainerExited"
1643 | "ServiceSchedulerInitiated"
1644 )
1645 );
1646 let bad_exit = t
1647 .get("containers")
1648 .and_then(Value::as_array)
1649 .map(|cs| {
1650 cs.iter().any(|c| {
1651 c.get("exitCode")
1652 .and_then(Value::as_i64)
1653 .map(|n| n != 0)
1654 .unwrap_or(false)
1655 })
1656 })
1657 .unwrap_or(false);
1658 bad_stop || bad_exit
1659 });
1660 if any_failed {
1661 let cause = described_tasks
1662 .iter()
1663 .find_map(|t| {
1664 t.get("stoppedReason")
1665 .and_then(Value::as_str)
1666 .map(String::from)
1667 })
1668 .unwrap_or_else(|| "ECS task failed".to_string());
1669 return Err(("States.TaskFailed".to_string(), cause));
1670 }
1671 return Ok(described);
1672 }
1673 if std::time::Instant::now() >= deadline {
1674 return Err((
1675 "States.Timeout".to_string(),
1676 format!(
1677 "ecs:RunTask.sync timed out after {}s waiting for {} task(s) to STOP",
1678 sync_timeout_secs(timeout_seconds),
1679 task_arns.len()
1680 ),
1681 ));
1682 }
1683 tokio::time::sleep(std::time::Duration::from_millis(SYNC_POLL_INTERVAL_MS)).await;
1684 }
1685}
1686
1687async fn sync_wait_athena_query(
1688 registry: &Arc<fakecloud_core::registry::ServiceRegistry>,
1689 initial: &Value,
1690 account_id: &str,
1691 timeout_seconds: Option<u64>,
1692) -> Result<Value, (String, String)> {
1693 let qid = initial
1694 .get("QueryExecutionId")
1695 .and_then(Value::as_str)
1696 .ok_or_else(|| {
1697 (
1698 "States.TaskFailed".to_string(),
1699 "athena:StartQueryExecution.sync: response missing QueryExecutionId".to_string(),
1700 )
1701 })?
1702 .to_string();
1703
1704 let deadline = sync_deadline(timeout_seconds);
1705 loop {
1706 let described = call_sdk_action(
1707 registry,
1708 "athena",
1709 "GetQueryExecution",
1710 &json!({ "QueryExecutionId": qid }),
1711 account_id,
1712 )
1713 .await?;
1714 let state = described
1715 .get("QueryExecution")
1716 .and_then(|qe| qe.get("Status"))
1717 .and_then(|s| s.get("State"))
1718 .and_then(Value::as_str)
1719 .unwrap_or("");
1720 match state {
1721 "SUCCEEDED" => return Ok(described),
1722 "FAILED" | "CANCELLED" => {
1723 let cause = described
1724 .get("QueryExecution")
1725 .and_then(|qe| qe.get("Status"))
1726 .and_then(|s| s.get("StateChangeReason"))
1727 .and_then(Value::as_str)
1728 .unwrap_or("Athena query reached terminal failure state")
1729 .to_string();
1730 return Err(("States.TaskFailed".to_string(), cause));
1731 }
1732 _ => {}
1733 }
1734 if std::time::Instant::now() >= deadline {
1735 return Err((
1736 "States.Timeout".to_string(),
1737 format!(
1738 "athena:StartQueryExecution.sync timed out after {}s for query {qid}",
1739 sync_timeout_secs(timeout_seconds)
1740 ),
1741 ));
1742 }
1743 tokio::time::sleep(std::time::Duration::from_millis(SYNC_POLL_INTERVAL_MS)).await;
1744 }
1745}
1746
1747async fn sync_wait_states_start_execution(
1753 registry: &Arc<fakecloud_core::registry::ServiceRegistry>,
1754 initial: &Value,
1755 account_id: &str,
1756 timeout_seconds: Option<u64>,
1757) -> Result<Value, (String, String)> {
1758 let exec_arn = initial
1759 .get("executionArn")
1760 .or_else(|| initial.get("ExecutionArn"))
1761 .and_then(Value::as_str)
1762 .ok_or_else(|| {
1763 (
1764 "States.TaskFailed".to_string(),
1765 "states:startExecution.sync: response missing executionArn".to_string(),
1766 )
1767 })?
1768 .to_string();
1769
1770 let deadline = sync_deadline(timeout_seconds);
1771 loop {
1772 let described = call_sdk_action(
1773 registry,
1774 "states",
1775 "DescribeExecution",
1776 &json!({ "executionArn": exec_arn }),
1777 account_id,
1778 )
1779 .await?;
1780 let status = described
1781 .get("status")
1782 .or_else(|| described.get("Status"))
1783 .and_then(Value::as_str)
1784 .unwrap_or("");
1785 match status {
1786 "SUCCEEDED" => return Ok(described),
1787 "FAILED" | "TIMED_OUT" | "ABORTED" => {
1788 let cause = described
1789 .get("cause")
1790 .or_else(|| described.get("Cause"))
1791 .and_then(Value::as_str)
1792 .unwrap_or("Nested execution reached terminal failure state")
1793 .to_string();
1794 return Err(("States.TaskFailed".to_string(), cause));
1795 }
1796 _ => {}
1797 }
1798 if std::time::Instant::now() >= deadline {
1799 return Err((
1800 "States.Timeout".to_string(),
1801 format!(
1802 "states:startExecution.sync timed out after {}s for {exec_arn}",
1803 sync_timeout_secs(timeout_seconds)
1804 ),
1805 ));
1806 }
1807 tokio::time::sleep(std::time::Duration::from_millis(SYNC_POLL_INTERVAL_MS)).await;
1808 }
1809}
1810
1811fn sync_timeout_secs(timeout_seconds: Option<u64>) -> u64 {
1812 timeout_seconds.unwrap_or(SYNC_DEFAULT_TIMEOUT_SECS)
1813}
1814
1815fn sync_deadline(timeout_seconds: Option<u64>) -> std::time::Instant {
1816 std::time::Instant::now() + std::time::Duration::from_secs(sync_timeout_secs(timeout_seconds))
1817}
1818
1819#[derive(Clone, Copy)]
1820pub(crate) enum UpdateClause {
1821 Set,
1822 Remove,
1823 Add,
1824 Delete,
1825}
1826
1827async fn invoke_lambda_direct(
1829 function_arn: &str,
1830 input: &Value,
1831 delivery: &Option<Arc<DeliveryBus>>,
1832 timeout_seconds: Option<u64>,
1833) -> Result<Value, (String, String)> {
1834 let delivery = delivery.as_ref().ok_or_else(|| {
1835 (
1836 "States.TaskFailed".to_string(),
1837 "No delivery bus configured for Lambda invocation".to_string(),
1838 )
1839 })?;
1840
1841 let payload =
1842 serde_json::to_string(input).expect("serde_json::Value serialization is infallible");
1843
1844 let invoke_future = delivery.invoke_lambda(function_arn, &payload);
1845
1846 let result = if let Some(timeout) = timeout_seconds {
1847 match tokio::time::timeout(tokio::time::Duration::from_secs(timeout), invoke_future).await {
1848 Ok(r) => r,
1849 Err(_) => {
1850 return Err((
1851 "States.Timeout".to_string(),
1852 format!("Task timed out after {timeout} seconds"),
1853 ));
1854 }
1855 }
1856 } else {
1857 invoke_future.await
1858 };
1859
1860 match result {
1861 Some(Ok(bytes)) => {
1862 let response_str = String::from_utf8_lossy(&bytes);
1863 let value: Value =
1864 serde_json::from_str(&response_str).unwrap_or(json!(response_str.to_string()));
1865 Ok(value)
1866 }
1867 Some(Err(e)) => Err(("States.TaskFailed".to_string(), e)),
1868 None => {
1869 Ok(json!({}))
1871 }
1872 }
1873}
1874
1875async fn invoke_activity(
1880 activity_arn: &str,
1881 input: &Value,
1882 shared_state: &SharedStepFunctionsState,
1883 timeout_seconds: Option<u64>,
1884 heartbeat_seconds: Option<u64>,
1885) -> Result<Value, (String, String)> {
1886 use crate::state::TaskTokenState;
1887
1888 let activity_account = activity_arn.split(':').nth(4).unwrap_or("").to_string();
1890 {
1891 let accounts = shared_state.read();
1892 let exists = accounts
1893 .get(&activity_account)
1894 .map(|s| s.activities.contains_key(activity_arn))
1895 .unwrap_or(false);
1896 if !exists {
1897 return Err((
1898 "States.TaskFailed".to_string(),
1899 format!("Activity does not exist: {activity_arn}"),
1900 ));
1901 }
1902 }
1903
1904 let token = format!(
1905 "FCToken-{}-{}",
1906 chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0),
1907 uuid::Uuid::new_v4().simple(),
1908 );
1909 let now = chrono::Utc::now();
1910 let input_str =
1911 serde_json::to_string(input).expect("serde_json::Value serialization is infallible");
1912 {
1913 let mut accounts = shared_state.write();
1914 let state = accounts.get_or_create(&activity_account);
1915 state.task_tokens.insert(
1916 token.clone(),
1917 TaskTokenState {
1918 activity_arn: activity_arn.to_string(),
1919 status: "PENDING".to_string(),
1920 output: None,
1921 error: None,
1922 cause: None,
1923 input: Some(input_str),
1924 created_at: now,
1925 last_heartbeat_at: None,
1926 heartbeat_seconds: heartbeat_seconds.map(|s| s as i64),
1927 timeout_seconds: timeout_seconds.map(|s| s as i64),
1928 },
1929 );
1930 }
1931
1932 poll_task_token(
1933 shared_state,
1934 &activity_account,
1935 &token,
1936 timeout_seconds,
1937 heartbeat_seconds,
1938 )
1939 .await
1940}
1941
1942pub(crate) enum NextState {
1943 Name(String),
1944 End,
1945 Error(String),
1946}
1947
1948#[path = "interpreter_helpers.rs"]
1949mod interpreter_helpers;
1950pub(crate) use interpreter_helpers::*;
1951
1952#[cfg(test)]
1953#[path = "interpreter_tests.rs"]
1954mod tests;