azure_functions/
durable.rs1use crate::rpc::{
3 status_result::Status, typed_data::Data, InvocationResponse, StatusResult, TypedData,
4};
5use serde_json::Value;
6use std::{
7 cell::RefCell,
8 future::Future,
9 ptr::null,
10 rc::Rc,
11 task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
12};
13
14mod action_future;
15mod actions;
16mod activity_output;
17mod history;
18mod join_all;
19mod orchestration_output;
20mod orchestration_state;
21mod select_all;
22
23pub use self::action_future::*;
24pub use self::actions::*;
25pub use self::activity_output::*;
26pub(crate) use self::history::*;
27pub use self::join_all::*;
28pub use self::orchestration_output::*;
29pub use self::orchestration_state::*;
30pub use self::select_all::*;
31
/// A [`Future`] that additionally exposes the hooks declared below.
///
/// Both methods are hidden from the generated documentation; they look like
/// internal plumbing rather than user-facing API.
/// NOTE(review): presumably implemented by the futures re-exported from this
/// module (`action_future`, `join_all`, `select_all`) — confirm against those
/// modules.
pub trait OrchestrationFuture: Future {
    /// Returns an optional index associated with this future.
    ///
    /// NOTE(review): the name suggests this is the position of the matching
    /// history event, with `None` meaning "no event yet" — verify against the
    /// implementations.
    #[doc(hidden)]
    fn event_index(&self) -> Option<usize>;

    /// Notification hook that may mutate the future.
    ///
    /// NOTE(review): the exact meaning of "inner" is not visible from this
    /// trait definition — verify against the implementations.
    #[doc(hidden)]
    fn notify_inner(&mut self);
}
40
/// `clone` entry of the orchestration waker's vtable.
///
/// Always panics: the data pointer is ignored and never dereferenced.
///
/// # Panics
///
/// Unconditionally, with a message stating that orchestration functions
/// cannot perform asynchronous operations.
unsafe fn waker_clone(_data: *const ()) -> RawWaker {
    panic!("orchestration functions cannot perform asynchronous operations")
}
44
/// `wake` entry of the orchestration waker's vtable.
///
/// Always panics: the data pointer is ignored and never dereferenced.
///
/// # Panics
///
/// Unconditionally, with a message stating that orchestration functions
/// cannot perform asynchronous operations.
unsafe fn waker_wake(_data: *const ()) {
    panic!("orchestration functions cannot perform asynchronous operations")
}
48
/// `wake_by_ref` entry of the orchestration waker's vtable.
///
/// Always panics: the data pointer is ignored and never dereferenced.
///
/// # Panics
///
/// Unconditionally, with a message stating that orchestration functions
/// cannot perform asynchronous operations.
unsafe fn waker_wake_by_ref(_data: *const ()) {
    panic!("orchestration functions cannot perform asynchronous operations")
}
52
/// `drop` entry of the orchestration waker's vtable.
///
/// Does nothing: the data pointer is ignored, so there is nothing to release.
unsafe fn waker_drop(_data: *const ()) {
    // Intentionally empty.
}
54
55#[doc(hidden)]
56pub trait IntoValue {
57 fn into_value(self) -> Value;
58}
59
60impl IntoValue for () {
61 fn into_value(self) -> Value {
62 Value::Null
63 }
64}
65
66#[doc(hidden)]
68pub fn orchestrate<T>(
69 id: String,
70 func: impl Future<Output = T>,
71 state: Rc<RefCell<OrchestrationState>>,
72) -> InvocationResponse
73where
74 T: IntoValue,
75{
76 let waker = unsafe {
77 Waker::from_raw(RawWaker::new(
78 null(),
79 &RawWakerVTable::new(waker_clone, waker_wake, waker_wake_by_ref, waker_drop),
80 ))
81 };
82
83 match Future::poll(Box::pin(func).as_mut(), &mut Context::from_waker(&waker)) {
84 Poll::Ready(output) => {
85 state.borrow_mut().set_output(output.into_value());
86 }
87 Poll::Pending => {}
88 };
89
90 InvocationResponse {
91 invocation_id: id,
92 return_value: Some(TypedData {
93 data: Some(Data::Json(state.borrow().result())),
94 }),
95 result: Some(StatusResult {
96 status: Status::Success as i32,
97 ..Default::default()
98 }),
99 ..Default::default()
100 }
101}
102
/// Shared helpers for the unit tests of the durable modules.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::durable::{EventType, HistoryEvent};
    use chrono::Utc;
    use futures::future::FutureExt;
    use std::{
        future::Future,
        ptr::null,
        sync::atomic::{AtomicI64, Ordering},
        task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
    };

    /// Polls `future` exactly once and returns the resulting [`Poll`].
    ///
    /// The waker handed to the future panics when cloned or woken, matching
    /// the waker built by `orchestrate`.
    pub(crate) fn poll<F, T>(mut future: F) -> Poll<T>
    where
        F: Future<Output = T> + Unpin,
    {
        static VTABLE: RawWakerVTable =
            RawWakerVTable::new(waker_clone, waker_wake, waker_wake_by_ref, waker_drop);

        // SAFETY: the data pointer is null and none of the vtable functions
        // dereference it: clone/wake/wake_by_ref panic unconditionally and
        // drop is a no-op.
        let waker = unsafe { Waker::from_raw(RawWaker::new(null(), &VTABLE)) };

        future.poll_unpin(&mut Context::from_waker(&waker))
    }

    /// Number of events created so far; used to offset event timestamps.
    ///
    /// An atomic rather than a `static mut`: the test harness runs tests on
    /// multiple threads, and unsynchronised mutation of a `static mut` from
    /// several threads is a data race.
    static TIMESTAMP_COUNTER: AtomicI64 = AtomicI64::new(0);

    /// Builds a [`HistoryEvent`] for use in tests.
    ///
    /// The event is marked as played and not processed. Its timestamp is the
    /// current UTC time plus an offset in nanoseconds equal to the number of
    /// events created so far (including this one); the growing offset is
    /// intended to keep events created back-to-back apart in time. Every
    /// field not covered by a parameter is `None`.
    pub(crate) fn create_event(
        event_type: EventType,
        event_id: i32,
        name: Option<String>,
        result: Option<String>,
        task_scheduled_id: Option<i32>,
    ) -> HistoryEvent {
        // `fetch_add` returns the previous value; add one so the first event
        // gets an offset of 1ns, as before.
        let tick = TIMESTAMP_COUNTER.fetch_add(1, Ordering::Relaxed) + 1;
        let offset = chrono::Duration::nanoseconds(tick);

        HistoryEvent {
            event_type,
            event_id,
            is_played: true,
            timestamp: Utc::now() + offset,
            is_processed: false,
            name,
            input: None,
            result,
            task_scheduled_id,
            instance_id: None,
            reason: None,
            details: None,
            fire_at: None,
            timer_id: None,
        }
    }
}