1use crate::durable::{OrchestrationFuture, OrchestrationState};
2use std::{
3 cell::RefCell,
4 future::Future,
5 pin::Pin,
6 rc::Rc,
7 task::{Context, Poll},
8};
9
/// A future that resolves to a value supplied at construction time.
///
/// When built with a result, the first `poll` yields that result; when built
/// without one, `poll` reports `Poll::Pending`. Completing a future that has
/// not been flagged through `notify_inner` also calls `update` on the shared
/// orchestration state with the stored event index.
pub struct ActionFuture<T> {
    /// Value handed out by `poll`; taken (left as `None`) once it has been returned.
    result: Option<T>,
    /// Orchestration state shared with the rest of the orchestration via `Rc<RefCell<_>>`.
    state: Rc<RefCell<OrchestrationState>>,
    /// Index passed to `OrchestrationState::update` on completion.
    /// `new` asserts that it is `Some` exactly when `result` is `Some`.
    event_index: Option<usize>,
    /// Set by `notify_inner`; when `true`, completing the future leaves `state` untouched.
    is_inner: bool,
}
17
18impl<T> ActionFuture<T> {
19 pub(crate) fn new(
20 result: Option<T>,
21 state: Rc<RefCell<OrchestrationState>>,
22 event_index: Option<usize>,
23 ) -> Self {
24 assert!(
25 (result.is_none() && event_index.is_none())
26 || (result.is_some() && event_index.is_some())
27 );
28
29 ActionFuture {
30 result,
31 state,
32 event_index,
33 is_inner: false,
34 }
35 }
36}
37
38impl<T> Future for ActionFuture<T>
39where
40 T: Unpin,
41{
42 type Output = T;
43
44 fn poll(mut self: Pin<&mut Self>, _context: &mut Context) -> Poll<T> {
45 if let Some(v) = self.result.take() {
46 if !self.is_inner {
47 self.state.borrow_mut().update(self.event_index.unwrap());
48 }
49 return Poll::Ready(v);
50 }
51
52 Poll::Pending
53 }
54}
55
56impl<T> OrchestrationFuture for ActionFuture<T>
57where
58 T: Unpin,
59{
60 fn notify_inner(&mut self) {
61 self.is_inner = true;
62 }
63
64 fn event_index(&self) -> Option<usize> {
65 self.event_index
66 }
67}
68
#[cfg(test)]
mod tests {
    use super::*;
    use crate::durable::{
        tests::{create_event, poll},
        EventType,
    };
    use serde_json::{from_str, json, Value};
    use std::task::Poll;

    /// Builds orchestration state whose history schedules and completes a
    /// single task named "hello" with the JSON string result `"hello"`.
    ///
    /// The history always opens with `OrchestratorStarted` and `TaskScheduled`
    /// and closes with the matching `TaskCompleted`. When
    /// `with_second_execution` is set, an `OrchestratorCompleted` /
    /// `OrchestratorStarted` pair is placed before the completion event.
    fn hello_task_state(with_second_execution: bool) -> OrchestrationState {
        let mut events = vec![
            create_event(EventType::OrchestratorStarted, -1, None, None, None),
            create_event(
                EventType::TaskScheduled,
                0,
                Some("hello".to_string()),
                None,
                None,
            ),
        ];

        if with_second_execution {
            events.push(create_event(
                EventType::OrchestratorCompleted,
                -1,
                None,
                None,
                None,
            ));
            events.push(create_event(
                EventType::OrchestratorStarted,
                -1,
                None,
                None,
                None,
            ));
        }

        events.push(create_event(
            EventType::TaskCompleted,
            -1,
            Some("hello".to_string()),
            Some(json!("hello").to_string()),
            Some(0),
        ));

        OrchestrationState::new(events)
    }

    /// Marks the "hello" task's start and end events as processed, then wraps
    /// the deserialized end-event result in an `ActionFuture`.
    ///
    /// Returns the future, a handle to the state it shares, and the index of
    /// the end event the future was created with.
    fn completed_hello_future(
        mut state: OrchestrationState,
    ) -> (ActionFuture<Value>, Rc<RefCell<OrchestrationState>>, usize) {
        let (start_index, start_event) = state
            .find_start_event("hello", EventType::TaskScheduled)
            .unwrap();
        start_event.is_processed = true;

        let (end_index, end_event) = state
            .find_end_event(
                start_index,
                EventType::TaskCompleted,
                Some(EventType::TaskFailed),
            )
            .unwrap();
        end_event.is_processed = true;

        let payload = end_event.result.as_ref().unwrap();
        let value: Value = from_str(payload).unwrap();

        let shared = Rc::new(RefCell::new(state));
        let future = ActionFuture::new(Some(value), shared.clone(), Some(end_index));

        (future, shared, end_index)
    }

    #[test]
    fn it_polls_pending_without_a_result() {
        let started = create_event(EventType::OrchestratorStarted, -1, None, None, None);
        let state = Rc::new(RefCell::new(OrchestrationState::new(vec![started])));

        let future = ActionFuture::<()>::new(None, state, None);

        assert_eq!(poll(future), Poll::Pending);
    }

    #[test]
    fn it_polls_ready_given_a_result() {
        let (future, _state, end_index) = completed_hello_future(hello_task_state(false));

        assert_eq!(future.event_index(), Some(end_index));
        assert_eq!(poll(future), Poll::Ready(json!("hello")));
    }

    #[test]
    fn it_updates_state() {
        let state = hello_task_state(true);
        assert!(state.is_replaying());

        let (future, state, end_index) = completed_hello_future(state);

        assert_eq!(future.event_index(), Some(end_index));
        assert_eq!(poll(future), Poll::Ready(json!("hello")));
        // Completing an outer future updates the shared state.
        assert!(!state.borrow().is_replaying());
    }

    #[test]
    fn it_does_not_update_state_when_an_inner_future() {
        let state = hello_task_state(true);
        assert!(state.is_replaying());

        let (mut future, state, end_index) = completed_hello_future(state);
        future.notify_inner();

        assert_eq!(future.event_index(), Some(end_index));
        assert_eq!(poll(future), Poll::Ready(json!("hello")));
        // An inner future yields its result without touching the shared state.
        assert!(state.borrow().is_replaying());
    }
}