Skip to main content

dapr_durabletask/api/
orchestration_state.rs

1use chrono::{DateTime, Utc};
2
3use super::{DurableTaskError, FailureDetails, OrchestrationStatus};
4use crate::proto;
5
/// Snapshot of an orchestration instance's current state.
///
/// Values are usually produced by converting a proto `GetInstanceResponse`
/// with `TryFrom`; every field mirrors a field of the response's workflow state.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct OrchestrationState {
    /// Instance identifier, copied from the proto workflow state.
    pub instance_id: String,
    /// Orchestration name, copied from the proto workflow state.
    pub name: String,
    /// Runtime status decoded from the proto `workflow_status` code.
    pub runtime_status: OrchestrationStatus,
    /// Creation time; `None` when the proto state has no `created_timestamp`.
    pub created_at: Option<DateTime<Utc>>,
    /// Last update time; `None` when the proto state has no `last_updated_timestamp`.
    pub last_updated_at: Option<DateTime<Utc>>,
    /// Input payload exactly as the proto state carries it (`input`), if any.
    pub serialized_input: Option<String>,
    /// Output payload exactly as the proto state carries it (`output`), if any.
    pub serialized_output: Option<String>,
    /// Custom status payload exactly as the proto state carries it (`custom_status`), if any.
    pub serialized_custom_status: Option<String>,
    /// Failure information, if the proto state carries any; `raise_if_failed`
    /// takes its error message from here.
    pub failure_details: Option<FailureDetails>,
}
19
/// Error returned when converting a `GetInstanceResponse` to an `OrchestrationState`
/// when the instance does not exist or has no workflow state.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`] so it can be
/// boxed as `dyn Error`, attached as the source of another error, or
/// propagated with `?` into error types that accept any standard error.
#[derive(Debug, Clone)]
pub struct InstanceNotFound;

impl std::fmt::Display for InstanceNotFound {
    /// Writes a fixed, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("orchestration instance not found or has no workflow state")
    }
}

// No underlying cause to expose, so the default `source()` (always `None`) is correct.
impl std::error::Error for InstanceNotFound {}
24
25impl TryFrom<&proto::GetInstanceResponse> for OrchestrationState {
26    type Error = InstanceNotFound;
27
28    /// Constructs an `OrchestrationState` from a proto `GetInstanceResponse`.
29    ///
30    /// Returns `Err(InstanceNotFound)` if the response indicates the instance
31    /// does not exist or has no workflow state.
32    fn try_from(response: &proto::GetInstanceResponse) -> std::result::Result<Self, Self::Error> {
33        let state = response.workflow_state.as_ref().ok_or(InstanceNotFound)?;
34        if !response.exists {
35            return Err(InstanceNotFound);
36        }
37
38        Ok(Self {
39            instance_id: state.instance_id.clone(),
40            name: state.name.clone(),
41            runtime_status: OrchestrationStatus::try_from(state.workflow_status)
42                .unwrap_or(OrchestrationStatus::Running),
43            created_at: state.created_timestamp.as_ref().map(timestamp_to_datetime),
44            last_updated_at: state
45                .last_updated_timestamp
46                .as_ref()
47                .map(timestamp_to_datetime),
48            serialized_input: state.input.clone(),
49            serialized_output: state.output.clone(),
50            serialized_custom_status: state.custom_status.clone(),
51            failure_details: state.failure_details.as_ref().map(FailureDetails::from),
52        })
53    }
54}
55
56impl OrchestrationState {
57    /// Returns `Ok(())` if the orchestration has not failed, or an error with
58    /// the failure details if it has.
59    pub fn raise_if_failed(&self) -> super::Result<()> {
60        if self.runtime_status == OrchestrationStatus::Failed {
61            let message = self
62                .failure_details
63                .as_ref()
64                .map(|d| d.message.clone())
65                .unwrap_or_else(|| "unknown failure".to_string());
66            return Err(DurableTaskError::OrchestrationFailed {
67                instance_id: self.instance_id.clone(),
68                message,
69                failure_details: self.failure_details.clone(),
70            });
71        }
72        Ok(())
73    }
74}
75
76fn timestamp_to_datetime(ts: &crate::proto::prost_types::Timestamp) -> DateTime<Utc> {
77    let nanos = if ts.nanos >= 0 && ts.nanos <= 999_999_999 {
78        ts.nanos as u32
79    } else {
80        0
81    };
82    DateTime::from_timestamp(ts.seconds, nanos).unwrap_or_default()
83}
84
#[cfg(test)]
mod tests {
    use super::*;
    use crate::proto;
    use crate::proto::prost_types::Timestamp;

    /// Shorthand for a proto `Timestamp` literal.
    fn timestamp(seconds: i64, nanos: i32) -> Timestamp {
        Timestamp { seconds, nanos }
    }

    /// Proto `WorkflowState` fixture with the given raw status code.
    fn make_workflow_state(status: i32) -> proto::WorkflowState {
        proto::WorkflowState {
            instance_id: String::from("test-id"),
            name: String::from("TestOrch"),
            version: None,
            workflow_status: status,
            scheduled_start_timestamp: None,
            created_timestamp: Some(timestamp(1_700_000_000, 500_000_000)),
            last_updated_timestamp: Some(timestamp(1_700_000_100, 0)),
            input: Some(String::from(r#""hello""#)),
            output: Some(String::from(r#""world""#)),
            custom_status: None,
            failure_details: None,
            execution_id: None,
            completed_timestamp: None,
            parent_instance_id: None,
            tags: Default::default(),
            parent_app_id: None,
            started_at: None,
        }
    }

    /// Proto `GetInstanceResponse` fixture.
    fn instance_response(
        exists: bool,
        workflow_state: Option<proto::WorkflowState>,
    ) -> proto::GetInstanceResponse {
        proto::GetInstanceResponse {
            exists,
            workflow_state,
        }
    }

    /// `OrchestrationState` fixture where only the fields relevant to
    /// `raise_if_failed` vary.
    fn orchestration_state(
        instance_id: &str,
        runtime_status: OrchestrationStatus,
        failure_details: Option<FailureDetails>,
    ) -> OrchestrationState {
        OrchestrationState {
            instance_id: instance_id.into(),
            name: "n".into(),
            runtime_status,
            created_at: None,
            last_updated_at: None,
            serialized_input: None,
            serialized_output: None,
            serialized_custom_status: None,
            failure_details,
        }
    }

    /// Asserts that an out-of-range `nanos` keeps the seconds and zeroes the
    /// sub-second part.
    #[track_caller]
    fn assert_nanos_reset_to_zero(bad_nanos: i32) {
        let dt = timestamp_to_datetime(&timestamp(1_700_000_000, bad_nanos));
        assert_eq!(dt.timestamp(), 1_700_000_000);
        assert_eq!(dt.timestamp_subsec_nanos(), 0);
    }

    #[test]
    fn timestamp_valid_seconds_and_nanos() {
        let dt = timestamp_to_datetime(&timestamp(1_700_000_000, 500_000_000));
        assert_eq!(dt.timestamp(), 1_700_000_000);
        assert_eq!(dt.timestamp_subsec_nanos(), 500_000_000);
    }

    #[test]
    fn timestamp_negative_nanos_clamps_to_zero() {
        assert_nanos_reset_to_zero(-1);
    }

    #[test]
    fn timestamp_overflow_nanos_clamps_to_zero() {
        assert_nanos_reset_to_zero(1_000_000_000);
    }

    #[test]
    fn timestamp_negative_seconds() {
        let dt = timestamp_to_datetime(&timestamp(-1, 0));
        assert_eq!(dt.timestamp(), -1);
    }

    #[test]
    fn try_from_valid_response() {
        let resp = instance_response(true, Some(make_workflow_state(1))); // Completed
        let state = OrchestrationState::try_from(&resp)
            .expect("an existing instance with workflow state should convert");

        assert_eq!(state.instance_id, "test-id");
        assert_eq!(state.name, "TestOrch");
        assert_eq!(state.runtime_status, OrchestrationStatus::Completed);
        assert_eq!(
            state.created_at.map(|dt| dt.timestamp()),
            Some(1_700_000_000)
        );
        assert_eq!(state.serialized_input.as_deref(), Some(r#""hello""#));
    }

    #[test]
    fn try_from_not_exists() {
        let resp = instance_response(false, Some(make_workflow_state(0)));
        assert!(matches!(
            OrchestrationState::try_from(&resp),
            Err(InstanceNotFound)
        ));
    }

    #[test]
    fn try_from_no_workflow_state() {
        let resp = instance_response(true, None);
        assert!(matches!(
            OrchestrationState::try_from(&resp),
            Err(InstanceNotFound)
        ));
    }

    #[test]
    fn raise_if_failed_ok_for_completed() {
        let state = orchestration_state("i", OrchestrationStatus::Completed, None);
        assert!(state.raise_if_failed().is_ok());
    }

    #[test]
    fn raise_if_failed_returns_error_for_failed() {
        let state = orchestration_state("i1", OrchestrationStatus::Failed, None);

        match state.raise_if_failed().unwrap_err() {
            DurableTaskError::OrchestrationFailed {
                instance_id,
                message,
                ..
            } => {
                assert_eq!(instance_id, "i1");
                assert_eq!(message, "unknown failure");
            }
            other => panic!("unexpected error: {other:?}"),
        }
    }

    #[test]
    fn raise_if_failed_with_failure_details() {
        let details = FailureDetails {
            message: "boom".into(),
            error_type: "TestError".into(),
            stack_trace: Some("at line 1".into()),
        };
        let state = orchestration_state("i2", OrchestrationStatus::Failed, Some(details));

        match state.raise_if_failed().unwrap_err() {
            DurableTaskError::OrchestrationFailed {
                message,
                failure_details,
                ..
            } => {
                assert_eq!(message, "boom");
                let fd = failure_details.unwrap();
                assert_eq!(fd.error_type, "TestError");
                assert_eq!(fd.stack_trace.as_deref(), Some("at line 1"));
            }
            other => panic!("unexpected error: {other:?}"),
        }
    }
}