1use chrono::{DateTime, Utc};
2
3use super::{DurableTaskError, FailureDetails, OrchestrationStatus};
4use crate::proto;
5
6#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
8pub struct OrchestrationState {
9 pub instance_id: String,
10 pub name: String,
11 pub runtime_status: OrchestrationStatus,
12 pub created_at: Option<DateTime<Utc>>,
13 pub last_updated_at: Option<DateTime<Utc>>,
14 pub serialized_input: Option<String>,
15 pub serialized_output: Option<String>,
16 pub serialized_custom_status: Option<String>,
17 pub failure_details: Option<FailureDetails>,
18}
19
/// Error produced when a `proto::GetInstanceResponse` does not describe an existing
/// orchestration instance and therefore cannot be turned into an [`OrchestrationState`].
///
/// Implements [`std::error::Error`] so it can be boxed, propagated with `?` into
/// generic error types, and rendered for users through [`std::fmt::Display`].
#[derive(Debug, Clone)]
pub struct InstanceNotFound;

impl std::fmt::Display for InstanceNotFound {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("orchestration instance not found")
    }
}

impl std::error::Error for InstanceNotFound {}
24
25impl TryFrom<&proto::GetInstanceResponse> for OrchestrationState {
26 type Error = InstanceNotFound;
27
28 fn try_from(response: &proto::GetInstanceResponse) -> std::result::Result<Self, Self::Error> {
33 let state = response.workflow_state.as_ref().ok_or(InstanceNotFound)?;
34 if !response.exists {
35 return Err(InstanceNotFound);
36 }
37
38 Ok(Self {
39 instance_id: state.instance_id.clone(),
40 name: state.name.clone(),
41 runtime_status: OrchestrationStatus::try_from(state.workflow_status)
42 .unwrap_or(OrchestrationStatus::Running),
43 created_at: state.created_timestamp.as_ref().map(timestamp_to_datetime),
44 last_updated_at: state
45 .last_updated_timestamp
46 .as_ref()
47 .map(timestamp_to_datetime),
48 serialized_input: state.input.clone(),
49 serialized_output: state.output.clone(),
50 serialized_custom_status: state.custom_status.clone(),
51 failure_details: state.failure_details.as_ref().map(FailureDetails::from),
52 })
53 }
54}
55
56impl OrchestrationState {
57 pub fn raise_if_failed(&self) -> super::Result<()> {
60 if self.runtime_status == OrchestrationStatus::Failed {
61 let message = self
62 .failure_details
63 .as_ref()
64 .map(|d| d.message.clone())
65 .unwrap_or_else(|| "unknown failure".to_string());
66 return Err(DurableTaskError::OrchestrationFailed {
67 instance_id: self.instance_id.clone(),
68 message,
69 failure_details: self.failure_details.clone(),
70 });
71 }
72 Ok(())
73 }
74}
75
76fn timestamp_to_datetime(ts: &crate::proto::prost_types::Timestamp) -> DateTime<Utc> {
77 let nanos = if ts.nanos >= 0 && ts.nanos <= 999_999_999 {
78 ts.nanos as u32
79 } else {
80 0
81 };
82 DateTime::from_timestamp(ts.seconds, nanos).unwrap_or_default()
83}
84
#[cfg(test)]
mod tests {
    use super::*;
    use crate::proto;
    use crate::proto::prost_types::Timestamp;

    /// Builds a `proto::WorkflowState` fixture carrying the given raw status value.
    fn make_workflow_state(status: i32) -> proto::WorkflowState {
        let created = Timestamp {
            seconds: 1_700_000_000,
            nanos: 500_000_000,
        };
        let updated = Timestamp {
            seconds: 1_700_000_100,
            nanos: 0,
        };

        proto::WorkflowState {
            instance_id: "test-id".into(),
            name: "TestOrch".into(),
            version: None,
            workflow_status: status,
            scheduled_start_timestamp: None,
            created_timestamp: Some(created),
            last_updated_timestamp: Some(updated),
            input: Some(r#""hello""#.into()),
            output: Some(r#""world""#.into()),
            custom_status: None,
            failure_details: None,
            execution_id: None,
            completed_timestamp: None,
            parent_instance_id: None,
            tags: Default::default(),
            parent_app_id: None,
            started_at: None,
        }
    }

    /// Builds a `GetInstanceResponse` from its two fields.
    fn make_response(
        exists: bool,
        workflow_state: Option<proto::WorkflowState>,
    ) -> proto::GetInstanceResponse {
        proto::GetInstanceResponse {
            exists,
            workflow_state,
        }
    }

    /// Builds an `OrchestrationState` named "n" with every optional payload unset
    /// except the supplied failure details.
    fn make_state(
        instance_id: &str,
        runtime_status: OrchestrationStatus,
        failure_details: Option<FailureDetails>,
    ) -> OrchestrationState {
        OrchestrationState {
            instance_id: instance_id.into(),
            name: "n".into(),
            runtime_status,
            created_at: None,
            last_updated_at: None,
            serialized_input: None,
            serialized_output: None,
            serialized_custom_status: None,
            failure_details,
        }
    }

    /// Runs `timestamp_to_datetime` on a timestamp built from the given parts.
    fn convert(seconds: i64, nanos: i32) -> DateTime<Utc> {
        timestamp_to_datetime(&Timestamp { seconds, nanos })
    }

    #[test]
    fn timestamp_valid_seconds_and_nanos() {
        let converted = convert(1_700_000_000, 500_000_000);
        assert_eq!(converted.timestamp(), 1_700_000_000);
        assert_eq!(converted.timestamp_subsec_nanos(), 500_000_000);
    }

    #[test]
    fn timestamp_negative_nanos_clamps_to_zero() {
        let converted = convert(1_700_000_000, -1);
        assert_eq!(converted.timestamp(), 1_700_000_000);
        assert_eq!(converted.timestamp_subsec_nanos(), 0);
    }

    #[test]
    fn timestamp_overflow_nanos_clamps_to_zero() {
        let converted = convert(1_700_000_000, 1_000_000_000);
        assert_eq!(converted.timestamp(), 1_700_000_000);
        assert_eq!(converted.timestamp_subsec_nanos(), 0);
    }

    #[test]
    fn timestamp_negative_seconds() {
        assert_eq!(convert(-1, 0).timestamp(), -1);
    }

    #[test]
    fn try_from_valid_response() {
        let resp = make_response(true, Some(make_workflow_state(1)));
        let state = OrchestrationState::try_from(&resp).unwrap();

        assert_eq!(state.instance_id, "test-id");
        assert_eq!(state.name, "TestOrch");
        assert_eq!(state.runtime_status, OrchestrationStatus::Completed);
        // Checks both that the timestamp is present and that it carries the right second.
        assert_eq!(
            state.created_at.map(|dt| dt.timestamp()),
            Some(1_700_000_000)
        );
        assert_eq!(state.serialized_input.as_deref(), Some(r#""hello""#));
    }

    #[test]
    fn try_from_not_exists() {
        let resp = make_response(false, Some(make_workflow_state(0)));
        assert!(OrchestrationState::try_from(&resp).is_err());
    }

    #[test]
    fn try_from_no_workflow_state() {
        let resp = make_response(true, None);
        assert!(OrchestrationState::try_from(&resp).is_err());
    }

    #[test]
    fn raise_if_failed_ok_for_completed() {
        let state = make_state("i", OrchestrationStatus::Completed, None);
        assert!(state.raise_if_failed().is_ok());
    }

    #[test]
    fn raise_if_failed_returns_error_for_failed() {
        let state = make_state("i1", OrchestrationStatus::Failed, None);

        match state.raise_if_failed().unwrap_err() {
            DurableTaskError::OrchestrationFailed {
                instance_id,
                message,
                ..
            } => {
                assert_eq!(instance_id, "i1");
                assert_eq!(message, "unknown failure");
            }
            other => panic!("unexpected error: {other:?}"),
        }
    }

    #[test]
    fn raise_if_failed_with_failure_details() {
        let details = FailureDetails {
            message: "boom".into(),
            error_type: "TestError".into(),
            stack_trace: Some("at line 1".into()),
        };
        let state = make_state("i2", OrchestrationStatus::Failed, Some(details));

        match state.raise_if_failed().unwrap_err() {
            DurableTaskError::OrchestrationFailed {
                message,
                failure_details,
                ..
            } => {
                assert_eq!(message, "boom");
                let carried = failure_details.unwrap();
                assert_eq!(carried.error_type, "TestError");
                assert_eq!(carried.stack_trace.as_deref(), Some("at line 1"));
            }
            other => panic!("unexpected error: {other:?}"),
        }
    }
}