Skip to main content

fakecloud_stepfunctions/
state.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use chrono::{DateTime, Utc};
5use parking_lot::RwLock;
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8
9pub type SharedStepFunctionsState =
10    Arc<RwLock<fakecloud_core::multi_account::MultiAccountState<StepFunctionsState>>>;
11
12impl fakecloud_core::multi_account::AccountState for StepFunctionsState {
13    fn new_for_account(account_id: &str, region: &str, _endpoint: &str) -> Self {
14        Self::new(account_id, region)
15    }
16}
17
18pub const STEPFUNCTIONS_SNAPSHOT_SCHEMA_VERSION: u32 = 2;
19
20#[derive(Debug, Serialize, Deserialize)]
21pub struct StepFunctionsSnapshot {
22    pub schema_version: u32,
23    #[serde(default)]
24    pub accounts: Option<fakecloud_core::multi_account::MultiAccountState<StepFunctionsState>>,
25    #[serde(default)]
26    pub state: Option<StepFunctionsState>,
27}
28
29#[derive(Debug, Clone, Serialize, Deserialize)]
30pub struct StepFunctionsState {
31    pub account_id: String,
32    pub region: String,
33    /// State machines keyed by ARN.
34    #[serde(default)]
35    pub state_machines: HashMap<String, StateMachine>,
36    /// Executions keyed by execution ARN.
37    #[serde(default)]
38    pub executions: HashMap<String, Execution>,
39    #[serde(default)]
40    pub activities: HashMap<String, Activity>,
41    #[serde(default)]
42    pub state_machine_versions: HashMap<String, StateMachineVersion>,
43    #[serde(default)]
44    pub state_machine_aliases: HashMap<String, StateMachineAlias>,
45    #[serde(default)]
46    pub map_runs: HashMap<String, MapRun>,
47    /// Pending task tokens issued for sync activities + their outcome.
48    #[serde(default)]
49    pub task_tokens: HashMap<String, TaskTokenState>,
50}
51
52#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct Activity {
54    pub name: String,
55    pub arn: String,
56    pub creation_date: DateTime<Utc>,
57    pub tags: HashMap<String, String>,
58}
59
60#[derive(Debug, Clone, Serialize, Deserialize)]
61pub struct StateMachineVersion {
62    pub state_machine_arn: String,
63    pub version: i64,
64    pub revision_id: String,
65    pub description: String,
66    pub creation_date: DateTime<Utc>,
67}
68
69#[derive(Debug, Clone, Serialize, Deserialize)]
70pub struct StateMachineAlias {
71    pub name: String,
72    pub arn: String,
73    pub description: String,
74    pub routing_configuration: Vec<AliasRoute>,
75    pub creation_date: DateTime<Utc>,
76    pub update_date: DateTime<Utc>,
77}
78
79#[derive(Debug, Clone, Serialize, Deserialize)]
80pub struct AliasRoute {
81    pub state_machine_version_arn: String,
82    pub weight: i32,
83}
84
85#[derive(Debug, Clone, Serialize, Deserialize)]
86pub struct MapRun {
87    pub map_run_arn: String,
88    pub execution_arn: String,
89    pub max_concurrency: i32,
90    pub tolerated_failure_percentage: f64,
91    pub tolerated_failure_count: i64,
92    pub status: String,
93    pub start_date: DateTime<Utc>,
94    pub stop_date: Option<DateTime<Utc>>,
95}
96
97#[derive(Debug, Clone, Serialize, Deserialize)]
98pub struct TaskTokenState {
99    pub activity_arn: String,
100    /// PENDING (waiting for `GetActivityTask` to dequeue) /
101    /// IN_PROGRESS (worker has picked it up) /
102    /// SUCCEEDED / FAILED / TIMED_OUT.
103    pub status: String,
104    pub output: Option<String>,
105    pub error: Option<String>,
106    pub cause: Option<String>,
107    /// Input the state machine wanted the worker to process. `None`
108    /// for tokens minted by external `GetActivityTask` callers without
109    /// any associated activity execution (legacy synthetic path).
110    #[serde(default)]
111    pub input: Option<String>,
112    #[serde(default = "default_now")]
113    pub created_at: DateTime<Utc>,
114    #[serde(default)]
115    pub last_heartbeat_at: Option<DateTime<Utc>>,
116    /// Per AWS docs: state machine fails the task if no heartbeat in
117    /// this many seconds while the worker is running.
118    #[serde(default)]
119    pub heartbeat_seconds: Option<i64>,
120    /// Overall timeout for the task; counted from `created_at`.
121    #[serde(default)]
122    pub timeout_seconds: Option<i64>,
123}
124
125fn default_now() -> DateTime<Utc> {
126    Utc::now()
127}
128
129#[derive(Debug, Clone, Serialize, Deserialize)]
130pub struct StateMachine {
131    pub name: String,
132    pub arn: String,
133    pub definition: String,
134    pub role_arn: String,
135    pub machine_type: StateMachineType,
136    pub status: StateMachineStatus,
137    pub creation_date: DateTime<Utc>,
138    pub update_date: DateTime<Utc>,
139    pub tags: HashMap<String, String>,
140    pub revision_id: String,
141    pub logging_configuration: Option<Value>,
142    pub tracing_configuration: Option<Value>,
143    pub description: String,
144}
145
146#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
147pub enum StateMachineType {
148    Standard,
149    Express,
150}
151
152impl StateMachineType {
153    pub fn as_str(&self) -> &'static str {
154        match self {
155            Self::Standard => "STANDARD",
156            Self::Express => "EXPRESS",
157        }
158    }
159
160    pub fn parse(s: &str) -> Option<Self> {
161        match s {
162            "STANDARD" => Some(Self::Standard),
163            "EXPRESS" => Some(Self::Express),
164            _ => None,
165        }
166    }
167}
168
169#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
170pub enum StateMachineStatus {
171    Active,
172    Deleting,
173}
174
175impl StateMachineStatus {
176    pub fn as_str(&self) -> &'static str {
177        match self {
178            Self::Active => "ACTIVE",
179            Self::Deleting => "DELETING",
180        }
181    }
182}
183
184#[derive(Debug, Clone, Serialize, Deserialize)]
185pub struct Execution {
186    pub execution_arn: String,
187    pub state_machine_arn: String,
188    pub state_machine_name: String,
189    pub name: String,
190    pub status: ExecutionStatus,
191    pub input: Option<String>,
192    pub output: Option<String>,
193    pub start_date: DateTime<Utc>,
194    pub stop_date: Option<DateTime<Utc>>,
195    pub error: Option<String>,
196    pub cause: Option<String>,
197    pub history_events: Vec<HistoryEvent>,
198}
199
200#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
201pub enum ExecutionStatus {
202    Running,
203    Succeeded,
204    Failed,
205    TimedOut,
206    Aborted,
207    PendingRedrive,
208}
209
210impl ExecutionStatus {
211    pub fn as_str(&self) -> &'static str {
212        match self {
213            Self::Running => "RUNNING",
214            Self::Succeeded => "SUCCEEDED",
215            Self::Failed => "FAILED",
216            Self::TimedOut => "TIMED_OUT",
217            Self::Aborted => "ABORTED",
218            Self::PendingRedrive => "PENDING_REDRIVE",
219        }
220    }
221}
222
223#[derive(Debug, Clone, Serialize, Deserialize)]
224pub struct HistoryEvent {
225    pub id: i64,
226    pub event_type: String,
227    pub timestamp: DateTime<Utc>,
228    pub previous_event_id: i64,
229    pub details: Value,
230}
231
232impl StepFunctionsState {
233    pub fn new(account_id: &str, region: &str) -> Self {
234        Self {
235            account_id: account_id.to_string(),
236            region: region.to_string(),
237            state_machines: HashMap::new(),
238            executions: HashMap::new(),
239            activities: HashMap::new(),
240            state_machine_versions: HashMap::new(),
241            state_machine_aliases: HashMap::new(),
242            map_runs: HashMap::new(),
243            task_tokens: HashMap::new(),
244        }
245    }
246
247    pub fn reset(&mut self) {
248        self.state_machines.clear();
249        self.executions.clear();
250        self.activities.clear();
251        self.state_machine_versions.clear();
252        self.state_machine_aliases.clear();
253        self.map_runs.clear();
254        self.task_tokens.clear();
255    }
256
257    pub fn state_machine_arn(&self, name: &str) -> String {
258        format!(
259            "arn:aws:states:{}:{}:stateMachine:{}",
260            self.region, self.account_id, name
261        )
262    }
263
264    pub fn execution_arn(&self, state_machine_name: &str, execution_name: &str) -> String {
265        format!(
266            "arn:aws:states:{}:{}:execution:{}:{}",
267            self.region, self.account_id, state_machine_name, execution_name
268        )
269    }
270}
271
272#[cfg(test)]
273mod tests {
274    use super::*;
275
276    #[test]
277    fn state_machine_type_as_str() {
278        assert_eq!(StateMachineType::Standard.as_str(), "STANDARD");
279        assert_eq!(StateMachineType::Express.as_str(), "EXPRESS");
280    }
281
282    #[test]
283    fn state_machine_type_parse() {
284        assert_eq!(
285            StateMachineType::parse("STANDARD"),
286            Some(StateMachineType::Standard)
287        );
288        assert_eq!(
289            StateMachineType::parse("EXPRESS"),
290            Some(StateMachineType::Express)
291        );
292        assert_eq!(StateMachineType::parse("bogus"), None);
293    }
294
295    #[test]
296    fn state_machine_status_as_str() {
297        assert_eq!(StateMachineStatus::Active.as_str(), "ACTIVE");
298        assert_eq!(StateMachineStatus::Deleting.as_str(), "DELETING");
299    }
300
301    #[test]
302    fn execution_status_as_str() {
303        assert_eq!(ExecutionStatus::Running.as_str(), "RUNNING");
304        assert_eq!(ExecutionStatus::Succeeded.as_str(), "SUCCEEDED");
305        assert_eq!(ExecutionStatus::Failed.as_str(), "FAILED");
306        assert_eq!(ExecutionStatus::TimedOut.as_str(), "TIMED_OUT");
307        assert_eq!(ExecutionStatus::Aborted.as_str(), "ABORTED");
308        assert_eq!(ExecutionStatus::PendingRedrive.as_str(), "PENDING_REDRIVE");
309    }
310
311    #[test]
312    fn state_machine_arn_format() {
313        let state = StepFunctionsState::new("123456789012", "us-east-1");
314        assert_eq!(
315            state.state_machine_arn("my-sm"),
316            "arn:aws:states:us-east-1:123456789012:stateMachine:my-sm"
317        );
318    }
319
320    #[test]
321    fn execution_arn_format() {
322        let state = StepFunctionsState::new("123456789012", "us-east-1");
323        assert_eq!(
324            state.execution_arn("sm", "exec-1"),
325            "arn:aws:states:us-east-1:123456789012:execution:sm:exec-1"
326        );
327    }
328
329    #[test]
330    fn state_reset_clears_all() {
331        let mut state = StepFunctionsState::new("123456789012", "us-east-1");
332        state.state_machines.insert(
333            "x".to_string(),
334            StateMachine {
335                name: "sm".to_string(),
336                arn: "arn:aws:states:us-east-1:123:stateMachine:sm".to_string(),
337                definition: "{}".to_string(),
338                role_arn: "r".to_string(),
339                machine_type: StateMachineType::Standard,
340                status: StateMachineStatus::Active,
341                creation_date: Utc::now(),
342                update_date: Utc::now(),
343                tags: HashMap::new(),
344                revision_id: "v1".to_string(),
345                logging_configuration: None,
346                tracing_configuration: None,
347                description: String::new(),
348            },
349        );
350        state.reset();
351        assert!(state.state_machines.is_empty());
352        assert!(state.executions.is_empty());
353    }
354}