Skip to main content

dapr_durabletask/api/
orchestration_state.rs

1use chrono::{DateTime, Utc};
2
3use super::{DurableTaskError, FailureDetails, OrchestrationStatus};
4use crate::internal::from_timestamp;
5use crate::proto;
6
7/// Snapshot of an orchestration instance's current state.
8#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
9pub struct OrchestrationState {
10    pub instance_id: String,
11    pub name: String,
12    pub runtime_status: OrchestrationStatus,
13    pub created_at: Option<DateTime<Utc>>,
14    pub last_updated_at: Option<DateTime<Utc>>,
15    pub serialized_input: Option<String>,
16    pub serialized_output: Option<String>,
17    pub serialized_custom_status: Option<String>,
18    pub failure_details: Option<FailureDetails>,
19}
20
/// Error returned when converting a `GetInstanceResponse` to an `OrchestrationState`
/// when the instance does not exist or has no workflow state.
///
/// This is the dedicated `TryFrom` error type. At the client API boundary it is
/// typically wrapped into [`DurableTaskError::InstanceNotFound`] with the
/// caller-known instance ID attached.
///
/// The type implements [`std::fmt::Display`] and [`std::error::Error`], so it
/// can be printed directly and converted into `Box<dyn std::error::Error>`
/// by callers that do not wrap it.
#[derive(Debug, Clone)]
pub struct InstanceNotFound;

impl std::fmt::Display for InstanceNotFound {
    /// Writes a fixed, human-readable description of the error.
    ///
    /// The instance ID is not part of the message because this type does not
    /// carry it.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("orchestration instance not found or has no workflow state")
    }
}

// No underlying cause to expose, so the default `source()` (`None`) is correct.
impl std::error::Error for InstanceNotFound {}
29
30impl TryFrom<&proto::GetInstanceResponse> for OrchestrationState {
31    type Error = InstanceNotFound;
32
33    /// Constructs an `OrchestrationState` from a proto `GetInstanceResponse`.
34    ///
35    /// Returns `Err(InstanceNotFound)` if the response indicates the instance
36    /// does not exist or has no workflow state.
37    fn try_from(response: &proto::GetInstanceResponse) -> std::result::Result<Self, Self::Error> {
38        let state = response.workflow_state.as_ref().ok_or(InstanceNotFound)?;
39        if !response.exists {
40            return Err(InstanceNotFound);
41        }
42
43        Ok(Self {
44            instance_id: state.instance_id.clone(),
45            name: state.name.clone(),
46            runtime_status: OrchestrationStatus::try_from(state.workflow_status)
47                .unwrap_or(OrchestrationStatus::Running),
48            created_at: state.created_timestamp.as_ref().and_then(from_timestamp),
49            last_updated_at: state
50                .last_updated_timestamp
51                .as_ref()
52                .and_then(from_timestamp),
53            serialized_input: state.input.clone(),
54            serialized_output: state.output.clone(),
55            serialized_custom_status: state.custom_status.clone(),
56            failure_details: state.failure_details.as_ref().map(FailureDetails::from),
57        })
58    }
59}
60
61impl OrchestrationState {
62    /// Returns `Ok(())` if the orchestration has not failed, or an error with
63    /// the failure details if it has.
64    pub fn raise_if_failed(&self) -> super::Result<()> {
65        if self.runtime_status == OrchestrationStatus::Failed {
66            let message = self
67                .failure_details
68                .as_ref()
69                .map(|d| d.message.clone())
70                .unwrap_or_else(|| "unknown failure".to_string());
71            return Err(DurableTaskError::OrchestrationFailed {
72                instance_id: self.instance_id.clone(),
73                message,
74                failure_details: self.failure_details.clone(),
75            });
76        }
77        Ok(())
78    }
79}
80
/// Test-only shim that forwards to [`from_timestamp`], so the unit tests can
/// exercise the proto-timestamp conversion under a descriptive local name.
#[cfg(test)]
fn timestamp_to_datetime(ts: &proto::prost_types::Timestamp) -> Option<DateTime<Utc>> {
    from_timestamp(ts)
}
85
#[cfg(test)]
mod tests {
    use super::*;
    use crate::proto;
    use crate::proto::prost_types::Timestamp;

    /// Builds a proto `WorkflowState` fixture with the given raw status value.
    fn make_workflow_state(status: i32) -> proto::WorkflowState {
        let created = Timestamp {
            seconds: 1_700_000_000,
            nanos: 500_000_000,
        };
        let updated = Timestamp {
            seconds: 1_700_000_100,
            nanos: 0,
        };

        proto::WorkflowState {
            instance_id: "test-id".into(),
            name: "TestOrch".into(),
            version: None,
            workflow_status: status,
            scheduled_start_timestamp: None,
            created_timestamp: Some(created),
            last_updated_timestamp: Some(updated),
            input: Some(r#""hello""#.into()),
            output: Some(r#""world""#.into()),
            custom_status: None,
            failure_details: None,
            execution_id: None,
            completed_timestamp: None,
            parent_instance_id: None,
            tags: Default::default(),
            parent_app_id: None,
            started_at: None,
        }
    }

    /// Builds an `OrchestrationState` with only the fields `raise_if_failed`
    /// reads populated from the arguments; every optional field is `None`.
    fn bare_state(
        instance_id: &str,
        runtime_status: OrchestrationStatus,
        failure_details: Option<FailureDetails>,
    ) -> OrchestrationState {
        OrchestrationState {
            instance_id: instance_id.into(),
            name: "n".into(),
            runtime_status,
            created_at: None,
            last_updated_at: None,
            serialized_input: None,
            serialized_output: None,
            serialized_custom_status: None,
            failure_details,
        }
    }

    // --- timestamp conversion -------------------------------------------

    #[test]
    fn timestamp_valid_seconds_and_nanos() {
        let stamp = Timestamp {
            seconds: 1_700_000_000,
            nanos: 500_000_000,
        };
        let converted = timestamp_to_datetime(&stamp).expect("valid timestamp");
        assert_eq!(converted.timestamp(), 1_700_000_000);
        assert_eq!(converted.timestamp_subsec_nanos(), 500_000_000);
    }

    #[test]
    fn timestamp_negative_nanos_returns_none() {
        let stamp = Timestamp {
            seconds: 1_700_000_000,
            nanos: -1,
        };
        assert_eq!(timestamp_to_datetime(&stamp), None);
    }

    #[test]
    fn timestamp_overflow_nanos_returns_none() {
        let stamp = Timestamp {
            seconds: 1_700_000_000,
            nanos: 1_000_000_000,
        };
        assert_eq!(timestamp_to_datetime(&stamp), None);
    }

    #[test]
    fn timestamp_negative_seconds() {
        let stamp = Timestamp {
            seconds: -1,
            nanos: 0,
        };
        let converted = timestamp_to_datetime(&stamp).expect("valid");
        assert_eq!(converted.timestamp(), -1);
    }

    // --- TryFrom<&GetInstanceResponse> ----------------------------------

    #[test]
    fn try_from_valid_response() {
        let response = proto::GetInstanceResponse {
            exists: true,
            workflow_state: Some(make_workflow_state(1)), // Completed
        };
        let snapshot = OrchestrationState::try_from(&response).unwrap();

        assert_eq!(snapshot.instance_id, "test-id");
        assert_eq!(snapshot.name, "TestOrch");
        assert_eq!(snapshot.runtime_status, OrchestrationStatus::Completed);
        assert_eq!(
            snapshot.created_at.map(|dt| dt.timestamp()),
            Some(1_700_000_000)
        );
        assert_eq!(snapshot.serialized_input.as_deref(), Some(r#""hello""#));
    }

    #[test]
    fn try_from_not_exists() {
        let response = proto::GetInstanceResponse {
            exists: false,
            workflow_state: Some(make_workflow_state(0)),
        };
        let outcome = OrchestrationState::try_from(&response);
        assert!(matches!(outcome, Err(InstanceNotFound)));
    }

    #[test]
    fn try_from_no_workflow_state() {
        let response = proto::GetInstanceResponse {
            exists: true,
            workflow_state: None,
        };
        let outcome = OrchestrationState::try_from(&response);
        assert!(matches!(outcome, Err(InstanceNotFound)));
    }

    // --- raise_if_failed -------------------------------------------------

    #[test]
    fn raise_if_failed_ok_for_completed() {
        let snapshot = bare_state("i", OrchestrationStatus::Completed, None);
        assert!(snapshot.raise_if_failed().is_ok());
    }

    #[test]
    fn raise_if_failed_returns_error_for_failed() {
        let snapshot = bare_state("i1", OrchestrationStatus::Failed, None);

        match snapshot.raise_if_failed().unwrap_err() {
            DurableTaskError::OrchestrationFailed {
                instance_id,
                message,
                ..
            } => {
                assert_eq!(instance_id, "i1");
                assert_eq!(message, "unknown failure");
            }
            other => panic!("unexpected error: {other:?}"),
        }
    }

    #[test]
    fn raise_if_failed_with_failure_details() {
        let details = FailureDetails {
            message: "boom".into(),
            error_type: "TestError".into(),
            stack_trace: Some("at line 1".into()),
        };
        let snapshot = bare_state("i2", OrchestrationStatus::Failed, Some(details));

        match snapshot.raise_if_failed().unwrap_err() {
            DurableTaskError::OrchestrationFailed {
                message,
                failure_details,
                ..
            } => {
                assert_eq!(message, "boom");
                let carried = failure_details.unwrap();
                assert_eq!(carried.error_type, "TestError");
                assert_eq!(carried.stack_trace.as_deref(), Some("at line 1"));
            }
            other => panic!("unexpected error: {other:?}"),
        }
    }
}