azure_functions/durable/
action_future.rs

1use crate::durable::{OrchestrationFuture, OrchestrationState};
2use std::{
3    cell::RefCell,
4    future::Future,
5    pin::Pin,
6    rc::Rc,
7    task::{Context, Poll},
8};
9
10/// Future returned by various `DurableOrchestrationContext` functions.
11pub struct ActionFuture<T> {
12    result: Option<T>,
13    state: Rc<RefCell<OrchestrationState>>,
14    event_index: Option<usize>,
15    is_inner: bool,
16}
17
18impl<T> ActionFuture<T> {
19    pub(crate) fn new(
20        result: Option<T>,
21        state: Rc<RefCell<OrchestrationState>>,
22        event_index: Option<usize>,
23    ) -> Self {
24        assert!(
25            (result.is_none() && event_index.is_none())
26                || (result.is_some() && event_index.is_some())
27        );
28
29        ActionFuture {
30            result,
31            state,
32            event_index,
33            is_inner: false,
34        }
35    }
36}
37
38impl<T> Future for ActionFuture<T>
39where
40    T: Unpin,
41{
42    type Output = T;
43
44    fn poll(mut self: Pin<&mut Self>, _context: &mut Context) -> Poll<T> {
45        if let Some(v) = self.result.take() {
46            if !self.is_inner {
47                self.state.borrow_mut().update(self.event_index.unwrap());
48            }
49            return Poll::Ready(v);
50        }
51
52        Poll::Pending
53    }
54}
55
56impl<T> OrchestrationFuture for ActionFuture<T>
57where
58    T: Unpin,
59{
60    fn notify_inner(&mut self) {
61        self.is_inner = true;
62    }
63
64    fn event_index(&self) -> Option<usize> {
65        self.event_index
66    }
67}
68
69#[cfg(test)]
70mod tests {
71    use super::*;
72    use crate::durable::{
73        tests::{create_event, poll},
74        EventType,
75    };
76    use serde_json::{from_str, json};
77    use std::task::Poll;
78
79    #[test]
80    fn it_polls_pending_without_a_result() {
81        let history = vec![create_event(
82            EventType::OrchestratorStarted,
83            -1,
84            None,
85            None,
86            None,
87        )];
88
89        let state = Rc::new(RefCell::new(OrchestrationState::new(history)));
90        let future = ActionFuture::<()>::new(None, state, None);
91
92        assert_eq!(poll(future), Poll::Pending);
93    }
94
95    #[test]
96    fn it_polls_ready_given_a_result() {
97        let history = vec![
98            create_event(EventType::OrchestratorStarted, -1, None, None, None),
99            create_event(
100                EventType::TaskScheduled,
101                0,
102                Some("hello".to_string()),
103                None,
104                None,
105            ),
106            create_event(
107                EventType::TaskCompleted,
108                -1,
109                Some("hello".to_string()),
110                Some(json!("hello").to_string()),
111                Some(0),
112            ),
113        ];
114
115        let mut state = OrchestrationState::new(history);
116        let (idx, event) = state
117            .find_start_event("hello", EventType::TaskScheduled)
118            .unwrap();
119        event.is_processed = true;
120
121        let (idx, event) = state
122            .find_end_event(idx, EventType::TaskCompleted, Some(EventType::TaskFailed))
123            .unwrap();
124        event.is_processed = true;
125
126        let result = Some(from_str(&event.result.as_ref().unwrap()).unwrap());
127        let state = Rc::new(RefCell::new(state));
128        let future = ActionFuture::new(result, state, Some(idx));
129
130        assert_eq!(future.event_index(), Some(idx));
131        assert_eq!(poll(future), Poll::Ready(json!("hello")));
132    }
133
134    #[test]
135    fn it_updates_state() {
136        let history = vec![
137            create_event(EventType::OrchestratorStarted, -1, None, None, None),
138            create_event(
139                EventType::TaskScheduled,
140                0,
141                Some("hello".to_string()),
142                None,
143                None,
144            ),
145            create_event(EventType::OrchestratorCompleted, -1, None, None, None),
146            create_event(EventType::OrchestratorStarted, -1, None, None, None),
147            create_event(
148                EventType::TaskCompleted,
149                -1,
150                Some("hello".to_string()),
151                Some(json!("hello").to_string()),
152                Some(0),
153            ),
154        ];
155
156        let mut state = OrchestrationState::new(history);
157        assert!(state.is_replaying());
158
159        let (idx, event) = state
160            .find_start_event("hello", EventType::TaskScheduled)
161            .unwrap();
162        event.is_processed = true;
163
164        let (idx, event) = state
165            .find_end_event(idx, EventType::TaskCompleted, Some(EventType::TaskFailed))
166            .unwrap();
167        event.is_processed = true;
168
169        let result = Some(from_str(&event.result.as_ref().unwrap()).unwrap());
170        let state = Rc::new(RefCell::new(state));
171        let future = ActionFuture::new(result, state.clone(), Some(idx));
172
173        assert_eq!(future.event_index(), Some(idx));
174        assert_eq!(poll(future), Poll::Ready(json!("hello")));
175        assert!(!state.borrow().is_replaying());
176    }
177
178    #[test]
179    fn it_does_not_update_state_when_an_inner_future() {
180        let history = vec![
181            create_event(EventType::OrchestratorStarted, -1, None, None, None),
182            create_event(
183                EventType::TaskScheduled,
184                0,
185                Some("hello".to_string()),
186                None,
187                None,
188            ),
189            create_event(EventType::OrchestratorCompleted, -1, None, None, None),
190            create_event(EventType::OrchestratorStarted, -1, None, None, None),
191            create_event(
192                EventType::TaskCompleted,
193                -1,
194                Some("hello".to_string()),
195                Some(json!("hello").to_string()),
196                Some(0),
197            ),
198        ];
199
200        let mut state = OrchestrationState::new(history);
201        assert!(state.is_replaying());
202
203        let (idx, event) = state
204            .find_start_event("hello", EventType::TaskScheduled)
205            .unwrap();
206        event.is_processed = true;
207
208        let (idx, event) = state
209            .find_end_event(idx, EventType::TaskCompleted, Some(EventType::TaskFailed))
210            .unwrap();
211        event.is_processed = true;
212
213        let result = Some(from_str(&event.result.as_ref().unwrap()).unwrap());
214        let state = Rc::new(RefCell::new(state));
215        let mut future = ActionFuture::new(result, state.clone(), Some(idx));
216        future.notify_inner();
217
218        assert_eq!(future.event_index(), Some(idx));
219        assert_eq!(poll(future), Poll::Ready(json!("hello")));
220        assert!(state.borrow().is_replaying());
221    }
222}