1use crate::{
2 durable::{
3 Action, ActionFuture, EventType, HistoryEvent, JoinAll, OrchestrationFuture,
4 OrchestrationState, RetryOptions, SelectAll,
5 },
6 rpc::{typed_data::Data, TypedData},
7};
8use chrono::{DateTime, Utc};
9use serde::Deserialize;
10use serde_json::{from_str, Value};
11use std::{cell::RefCell, collections::HashMap, rc::Rc};
12
13pub struct DurableOrchestrationContext {
60 pub instance_id: String,
62 pub parent_instance_id: Option<String>,
64 pub input: Value,
66 state: Rc<RefCell<OrchestrationState>>,
67}
68
69impl DurableOrchestrationContext {
70 #[doc(hidden)]
71 pub fn new(data: TypedData, _metadata: HashMap<String, TypedData>) -> Self {
72 #[derive(Deserialize)]
73 #[serde(rename_all = "camelCase")]
74 struct BindingData {
75 instance_id: String,
76 parent_instance_id: Option<String>,
77 input: Value,
78 history: Vec<HistoryEvent>,
79 }
80
81 match &data.data {
82 Some(Data::String(s)) => {
83 let data: BindingData =
84 from_str(s).expect("failed to parse orchestration context data");
85
86 DurableOrchestrationContext {
87 instance_id: data.instance_id,
88 parent_instance_id: data.parent_instance_id,
89 input: data.input,
90 state: Rc::new(RefCell::new(OrchestrationState::new(data.history))),
91 }
92 }
93 _ => panic!("expected JSON data for orchestration context data"),
94 }
95 }
96
97 pub fn is_replaying(&self) -> bool {
99 self.state.borrow().is_replaying()
100 }
101
102 pub fn current_time(&self) -> DateTime<Utc> {
104 self.state.borrow().current_time()
105 }
106
107 pub fn set_custom_status<S>(&self, status: S)
109 where
110 S: Into<Value>,
111 {
112 self.state.borrow_mut().set_custom_status(status.into());
113 }
114
115 pub fn new_guid(&self) -> uuid::Uuid {
117 let mut state = self.state.borrow_mut();
118 state.new_guid(&self.instance_id)
119 }
120
121 #[doc(hidden)]
122 pub fn state(&self) -> Rc<RefCell<OrchestrationState>> {
123 self.state.clone()
124 }
125
126 #[must_use = "futures do nothing unless you `.await` or poll them"]
132 pub fn join_all<I>(&self, iter: I) -> JoinAll<I::Item>
133 where
134 I: IntoIterator,
135 I::Item: OrchestrationFuture,
136 {
137 JoinAll::new(self.state.clone(), iter)
138 }
139
140 pub fn select_all<I>(&self, iter: I) -> SelectAll<I::Item>
150 where
151 I: IntoIterator,
152 I::Item: OrchestrationFuture,
153 {
154 SelectAll::new(self.state.clone(), iter)
155 }
156
157 #[must_use = "futures do nothing unless you `.await` or poll them"]
159 pub fn call_activity<D>(
160 &self,
161 activity_name: &str,
162 data: D,
163 ) -> ActionFuture<Result<Value, String>>
164 where
165 D: Into<Value>,
166 {
167 self.perform_call_action(
168 Action::CallActivity {
169 function_name: activity_name.to_string(),
170 input: data.into(),
171 },
172 activity_name,
173 EventType::TaskScheduled,
174 EventType::TaskCompleted,
175 Some(EventType::TaskFailed),
176 )
177 }
178
179 #[must_use = "futures do nothing unless you `.await` or poll them"]
181 pub fn call_activity_with_retry<D>(
182 &self,
183 activity_name: &str,
184 data: D,
185 retry_options: RetryOptions,
186 ) -> ActionFuture<Result<Value, String>>
187 where
188 D: Into<Value>,
189 {
190 self.perform_call_action(
191 Action::CallActivityWithRetry {
192 function_name: activity_name.to_string(),
193 retry_options,
194 input: data.into(),
195 },
196 activity_name,
197 EventType::TaskScheduled,
198 EventType::TaskCompleted,
199 Some(EventType::TaskFailed),
200 )
201 }
202
203 #[must_use = "futures do nothing unless you `.await` or poll them"]
205 pub fn call_sub_orchestrator<D>(
206 &self,
207 function_name: &str,
208 instance_id: Option<String>,
209 data: D,
210 ) -> ActionFuture<Result<Value, String>>
211 where
212 D: Into<Value>,
213 {
214 self.perform_call_action(
215 Action::CallSubOrchestrator {
216 function_name: function_name.to_string(),
217 instance_id,
218 input: data.into(),
219 },
220 function_name,
221 EventType::SubOrchestrationInstanceCreated,
222 EventType::SubOrchestrationInstanceCompleted,
223 Some(EventType::SubOrchestrationInstanceFailed),
224 )
225 }
226
227 #[must_use = "futures do nothing unless you `.await` or poll them"]
229 pub fn call_sub_orchestrator_with_retry<D>(
230 &self,
231 function_name: &str,
232 instance_id: Option<String>,
233 data: D,
234 retry_options: RetryOptions,
235 ) -> ActionFuture<Result<Value, String>>
236 where
237 D: Into<Value>,
238 {
239 self.perform_call_action(
240 Action::CallSubOrchestratorWithRetry {
241 function_name: function_name.to_string(),
242 retry_options,
243 instance_id,
244 input: data.into(),
245 },
246 function_name,
247 EventType::SubOrchestrationInstanceCreated,
248 EventType::SubOrchestrationInstanceCompleted,
249 Some(EventType::SubOrchestrationInstanceFailed),
250 )
251 }
252
253 pub fn continue_as_new<D>(&self, input: D, preserve_unprocessed_events: bool)
255 where
256 D: Into<Value>,
257 {
258 let mut state = self.state.borrow_mut();
259
260 state.push_action(Action::ContinueAsNew {
261 input: input.into(),
262 preserve_unprocessed_events,
263 });
264 }
265
266 #[must_use = "futures do nothing unless you `.await` or poll them"]
268 pub fn create_timer(&self, fire_at: DateTime<Utc>) -> ActionFuture<()> {
269 let mut state = self.state.borrow_mut();
270
271 state.push_action(Action::CreateTimer {
272 fire_at,
273 canceled: false,
274 });
275
276 let mut result = None;
277 let mut event_index = None;
278
279 if let Some((idx, created)) = state.find_timer_created() {
280 created.is_processed = true;
281
282 if let Some((idx, fired)) = state.find_timer_fired(idx) {
283 fired.is_processed = true;
284 event_index = Some(idx);
285 result = Some(());
286 }
287 }
288
289 ActionFuture::new(result, self.state.clone(), event_index)
290 }
291
292 pub fn wait_for_event(&self, name: &str) -> ActionFuture<Result<Value, String>> {
294 let mut state = self.state.borrow_mut();
295
296 state.push_action(Action::WaitForExternalEvent {
297 external_event_name: name.to_string(),
298 });
299
300 let mut input = None;
301 let mut event_index = None;
302
303 if let Some((idx, raised)) = state.find_event_raised(name) {
304 raised.is_processed = true;
305 input = Some(Ok(raised
307 .input
308 .as_ref()
309 .map(|v| {
310 v.as_str()
311 .map(|s| from_str(&s).unwrap_or_default())
312 .unwrap_or_default()
313 })
314 .unwrap_or_default()));
315 event_index = Some(idx);
316 }
317
318 ActionFuture::new(input, self.state.clone(), event_index)
319 }
320
321 fn perform_call_action(
322 &self,
323 action: Action,
324 name: &str,
325 started_type: EventType,
326 completed_type: EventType,
327 failed_type: Option<EventType>,
328 ) -> ActionFuture<Result<Value, String>> {
329 let mut state = self.state.borrow_mut();
330
331 state.push_action(action);
332
333 let mut result: Option<Result<Value, String>> = None;
334 let mut event_index = None;
335
336 if let Some((idx, scheduled)) = state.find_start_event(name, started_type) {
337 scheduled.is_processed = true;
338
339 if let Some((idx, finished)) = state.find_end_event(idx, completed_type, failed_type) {
340 finished.is_processed = true;
341 event_index = Some(idx);
342
343 if finished.event_type == completed_type {
344 result = Some(Ok(finished
345 .result
346 .as_ref()
347 .map(|s| from_str(&s).unwrap_or_default())
348 .unwrap_or_default()));
349 } else if let Some(failed_type) = failed_type {
350 if finished.event_type == failed_type {
351 result = Some(Err(finished.reason.clone().unwrap_or_default()));
352 }
353 } else {
354 panic!("event must be a completion or a failure");
355 }
356 }
357 }
358
359 ActionFuture::new(result, self.state.clone(), event_index)
360 }
361}
362
363#[cfg(test)]
364mod tests {
365 use super::*;
366 use crate::durable::{EventType, HistoryEvent};
367 use crate::rpc::typed_data::Data;
368 use chrono::DateTime;
369
370 #[test]
371 #[should_panic(expected = "expected JSON data for orchestration context data")]
372 fn new_panics_if_no_data_provided() {
373 let data = TypedData { data: None };
374
375 let _ = DurableOrchestrationContext::new(data, HashMap::new());
376 }
377
378 #[test]
379 #[should_panic(expected = "failed to parse orchestration context data")]
380 fn new_panics_if_no_json_provided() {
381 let data = TypedData {
382 data: Some(Data::String(r#"{ }"#.to_owned())),
383 };
384
385 let _ = DurableOrchestrationContext::new(data, HashMap::new());
386 }
387
388 #[test]
389 #[should_panic(expected = "failed to find orchestrator started event")]
390 fn it_panics_if_missing_history() {
391 let data = TypedData {
392 data: Some(Data::String(
393 r#"{
394 "instanceId":"49497890673e4a75ab380e7a956c607b",
395 "isReplaying":false,
396 "parentInstanceId":"1234123412341234123412341234",
397 "input": [],
398 "history": []
399 }"#
400 .to_owned(),
401 )),
402 };
403
404 DurableOrchestrationContext::new(data, HashMap::new());
405 }
406
407 #[test]
408 fn new_constructs_an_orchestration_context_with_history() {
409 let data = TypedData {
410 data: Some(Data::String(
411 r#"{
412 "history":[
413 {
414 "EventType":12,
415 "EventId":-1,
416 "IsPlayed":true,
417 "Timestamp":"2019-07-18T06:22:27.016757Z"
418 },
419 {
420 "OrchestrationInstance":{
421 "InstanceId":"49497890673e4a75ab380e7a956c607b",
422 "ExecutionId":"5d2025984bef476bbaacefaa499a4f5f"
423 },
424 "EventType":0,
425 "ParentInstance":null,
426 "Name":"HelloWorld",
427 "Version":"",
428 "Input":"{}",
429 "Tags":null,
430 "EventId":-1,
431 "IsPlayed":false,
432 "Timestamp":"2019-07-18T06:22:26.626966Z"
433 }
434 ],
435 "instanceId":"49497890673e4a75ab380e7a956c607b",
436 "isReplaying":false,
437 "parentInstanceId":null,
438 "input": []
439 }"#
440 .to_owned(),
441 )),
442 };
443
444 let context = DurableOrchestrationContext::new(data, HashMap::new());
445 assert_eq!(context.instance_id, "49497890673e4a75ab380e7a956c607b");
446 assert_eq!(context.parent_instance_id, None);
447 assert!(!context.is_replaying());
448 assert_eq!(context.input, serde_json::Value::Array(vec![]));
449 assert_eq!(
450 context.state.borrow().history,
451 vec![
452 HistoryEvent {
453 event_type: EventType::OrchestratorStarted,
454 event_id: -1,
455 is_played: true,
456 timestamp: DateTime::<Utc>::from(
457 DateTime::parse_from_rfc3339("2019-07-18T06:22:27.016757Z").unwrap()
458 ),
459 is_processed: false,
460 name: None,
461 input: None,
462 result: None,
463 task_scheduled_id: None,
464 instance_id: None,
465 reason: None,
466 details: None,
467 fire_at: None,
468 timer_id: None,
469 },
470 HistoryEvent {
471 event_type: EventType::ExecutionStarted,
472 event_id: -1,
473 is_played: false,
474 timestamp: DateTime::<Utc>::from(
475 DateTime::parse_from_rfc3339("2019-07-18T06:22:26.626966Z").unwrap()
476 ),
477 is_processed: false,
478 name: Some("HelloWorld".to_owned()),
479 input: Some("{}".into()),
480 result: None,
481 task_scheduled_id: None,
482 instance_id: None,
483 reason: None,
484 details: None,
485 fire_at: None,
486 timer_id: None,
487 }
488 ]
489 );
490 }
491}