1use chrono::{DateTime, Utc};
2
3use super::{DurableTaskError, FailureDetails, OrchestrationStatus};
4use crate::internal::from_timestamp;
5use crate::proto;
6
7#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
9pub struct OrchestrationState {
10 pub instance_id: String,
11 pub name: String,
12 pub runtime_status: OrchestrationStatus,
13 pub created_at: Option<DateTime<Utc>>,
14 pub last_updated_at: Option<DateTime<Utc>>,
15 pub serialized_input: Option<String>,
16 pub serialized_output: Option<String>,
17 pub serialized_custom_status: Option<String>,
18 pub failure_details: Option<FailureDetails>,
19}
20
21#[derive(Debug, Clone)]
28pub struct InstanceNotFound;
29
30impl TryFrom<&proto::GetInstanceResponse> for OrchestrationState {
31 type Error = InstanceNotFound;
32
33 fn try_from(response: &proto::GetInstanceResponse) -> std::result::Result<Self, Self::Error> {
38 let state = response.workflow_state.as_ref().ok_or(InstanceNotFound)?;
39 if !response.exists {
40 return Err(InstanceNotFound);
41 }
42
43 Ok(Self {
44 instance_id: state.instance_id.clone(),
45 name: state.name.clone(),
46 runtime_status: OrchestrationStatus::try_from(state.workflow_status)
47 .unwrap_or(OrchestrationStatus::Running),
48 created_at: state.created_timestamp.as_ref().and_then(from_timestamp),
49 last_updated_at: state
50 .last_updated_timestamp
51 .as_ref()
52 .and_then(from_timestamp),
53 serialized_input: state.input.clone(),
54 serialized_output: state.output.clone(),
55 serialized_custom_status: state.custom_status.clone(),
56 failure_details: state.failure_details.as_ref().map(FailureDetails::from),
57 })
58 }
59}
60
61impl OrchestrationState {
62 pub fn raise_if_failed(&self) -> super::Result<()> {
65 if self.runtime_status == OrchestrationStatus::Failed {
66 let message = self
67 .failure_details
68 .as_ref()
69 .map(|d| d.message.clone())
70 .unwrap_or_else(|| "unknown failure".to_string());
71 return Err(DurableTaskError::OrchestrationFailed {
72 instance_id: self.instance_id.clone(),
73 message,
74 failure_details: self.failure_details.clone(),
75 });
76 }
77 Ok(())
78 }
79}
80
81#[cfg(test)]
82fn timestamp_to_datetime(ts: &crate::proto::prost_types::Timestamp) -> Option<DateTime<Utc>> {
83 from_timestamp(ts)
84}
85
86#[cfg(test)]
87mod tests {
88 use super::*;
89 use crate::proto;
90 use crate::proto::prost_types::Timestamp;
91
92 fn make_workflow_state(status: i32) -> proto::WorkflowState {
93 proto::WorkflowState {
94 instance_id: "test-id".into(),
95 name: "TestOrch".into(),
96 version: None,
97 workflow_status: status,
98 scheduled_start_timestamp: None,
99 created_timestamp: Some(Timestamp {
100 seconds: 1_700_000_000,
101 nanos: 500_000_000,
102 }),
103 last_updated_timestamp: Some(Timestamp {
104 seconds: 1_700_000_100,
105 nanos: 0,
106 }),
107 input: Some(r#""hello""#.into()),
108 output: Some(r#""world""#.into()),
109 custom_status: None,
110 failure_details: None,
111 execution_id: None,
112 completed_timestamp: None,
113 parent_instance_id: None,
114 tags: Default::default(),
115 parent_app_id: None,
116 started_at: None,
117 }
118 }
119
120 #[test]
121 fn timestamp_valid_seconds_and_nanos() {
122 let ts = Timestamp {
123 seconds: 1_700_000_000,
124 nanos: 500_000_000,
125 };
126 let dt = timestamp_to_datetime(&ts).expect("valid timestamp");
127 assert_eq!(dt.timestamp(), 1_700_000_000);
128 assert_eq!(dt.timestamp_subsec_nanos(), 500_000_000);
129 }
130
131 #[test]
132 fn timestamp_negative_nanos_returns_none() {
133 let ts = Timestamp {
134 seconds: 1_700_000_000,
135 nanos: -1,
136 };
137 assert!(timestamp_to_datetime(&ts).is_none());
138 }
139
140 #[test]
141 fn timestamp_overflow_nanos_returns_none() {
142 let ts = Timestamp {
143 seconds: 1_700_000_000,
144 nanos: 1_000_000_000,
145 };
146 assert!(timestamp_to_datetime(&ts).is_none());
147 }
148
149 #[test]
150 fn timestamp_negative_seconds() {
151 let ts = Timestamp {
152 seconds: -1,
153 nanos: 0,
154 };
155 let dt = timestamp_to_datetime(&ts).expect("valid");
156 assert_eq!(dt.timestamp(), -1);
157 }
158
159 #[test]
160 fn try_from_valid_response() {
161 let resp = proto::GetInstanceResponse {
162 exists: true,
163 workflow_state: Some(make_workflow_state(1)), };
165 let state = OrchestrationState::try_from(&resp).unwrap();
166 assert_eq!(state.instance_id, "test-id");
167 assert_eq!(state.name, "TestOrch");
168 assert_eq!(state.runtime_status, OrchestrationStatus::Completed);
169 assert!(state.created_at.is_some());
170 assert_eq!(state.created_at.unwrap().timestamp(), 1_700_000_000);
171 assert_eq!(state.serialized_input.as_deref(), Some(r#""hello""#));
172 }
173
174 #[test]
175 fn try_from_not_exists() {
176 let resp = proto::GetInstanceResponse {
177 exists: false,
178 workflow_state: Some(make_workflow_state(0)),
179 };
180 assert!(OrchestrationState::try_from(&resp).is_err());
181 }
182
183 #[test]
184 fn try_from_no_workflow_state() {
185 let resp = proto::GetInstanceResponse {
186 exists: true,
187 workflow_state: None,
188 };
189 assert!(OrchestrationState::try_from(&resp).is_err());
190 }
191
192 #[test]
193 fn raise_if_failed_ok_for_completed() {
194 let state = OrchestrationState {
195 instance_id: "i".into(),
196 name: "n".into(),
197 runtime_status: OrchestrationStatus::Completed,
198 created_at: None,
199 last_updated_at: None,
200 serialized_input: None,
201 serialized_output: None,
202 serialized_custom_status: None,
203 failure_details: None,
204 };
205 assert!(state.raise_if_failed().is_ok());
206 }
207
208 #[test]
209 fn raise_if_failed_returns_error_for_failed() {
210 let state = OrchestrationState {
211 instance_id: "i1".into(),
212 name: "n".into(),
213 runtime_status: OrchestrationStatus::Failed,
214 created_at: None,
215 last_updated_at: None,
216 serialized_input: None,
217 serialized_output: None,
218 serialized_custom_status: None,
219 failure_details: None,
220 };
221 let err = state.raise_if_failed().unwrap_err();
222 match err {
223 DurableTaskError::OrchestrationFailed {
224 instance_id,
225 message,
226 ..
227 } => {
228 assert_eq!(instance_id, "i1");
229 assert_eq!(message, "unknown failure");
230 }
231 other => panic!("unexpected error: {other:?}"),
232 }
233 }
234
235 #[test]
236 fn raise_if_failed_with_failure_details() {
237 let details = FailureDetails {
238 message: "boom".into(),
239 error_type: "TestError".into(),
240 stack_trace: Some("at line 1".into()),
241 };
242 let state = OrchestrationState {
243 instance_id: "i2".into(),
244 name: "n".into(),
245 runtime_status: OrchestrationStatus::Failed,
246 created_at: None,
247 last_updated_at: None,
248 serialized_input: None,
249 serialized_output: None,
250 serialized_custom_status: None,
251 failure_details: Some(details.clone()),
252 };
253 let err = state.raise_if_failed().unwrap_err();
254 match err {
255 DurableTaskError::OrchestrationFailed {
256 message,
257 failure_details,
258 ..
259 } => {
260 assert_eq!(message, "boom");
261 let fd = failure_details.unwrap();
262 assert_eq!(fd.error_type, "TestError");
263 assert_eq!(fd.stack_trace.as_deref(), Some("at line 1"));
264 }
265 other => panic!("unexpected error: {other:?}"),
266 }
267 }
268}