azure_functions/
durable.rs

1//! Module for Durable Functions types.
2use crate::rpc::{
3    status_result::Status, typed_data::Data, InvocationResponse, StatusResult, TypedData,
4};
5use serde_json::Value;
6use std::{
7    cell::RefCell,
8    future::Future,
9    ptr::null,
10    rc::Rc,
11    task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
12};
13
14mod action_future;
15mod actions;
16mod activity_output;
17mod history;
18mod join_all;
19mod orchestration_output;
20mod orchestration_state;
21mod select_all;
22
23pub use self::action_future::*;
24pub use self::actions::*;
25pub use self::activity_output::*;
26pub(crate) use self::history::*;
27pub use self::join_all::*;
28pub use self::orchestration_output::*;
29pub use self::orchestration_state::*;
30pub use self::select_all::*;
31
32/// Represents a Future returned by the orchestration context.
33pub trait OrchestrationFuture: Future {
34    #[doc(hidden)]
35    fn notify_inner(&mut self);
36
37    #[doc(hidden)]
38    fn event_index(&self) -> Option<usize>;
39}
40
41unsafe fn waker_clone(_: *const ()) -> RawWaker {
42    panic!("orchestration functions cannot perform asynchronous operations");
43}
44
45unsafe fn waker_wake(_: *const ()) {
46    panic!("orchestration functions cannot perform asynchronous operations");
47}
48
49unsafe fn waker_wake_by_ref(_: *const ()) {
50    panic!("orchestration functions cannot perform asynchronous operations");
51}
52
53unsafe fn waker_drop(_: *const ()) {}
54
55#[doc(hidden)]
56pub trait IntoValue {
57    fn into_value(self) -> Value;
58}
59
60impl IntoValue for () {
61    fn into_value(self) -> Value {
62        Value::Null
63    }
64}
65
66/// The entrypoint for orchestration functions.
67#[doc(hidden)]
68pub fn orchestrate<T>(
69    id: String,
70    func: impl Future<Output = T>,
71    state: Rc<RefCell<OrchestrationState>>,
72) -> InvocationResponse
73where
74    T: IntoValue,
75{
76    let waker = unsafe {
77        Waker::from_raw(RawWaker::new(
78            null(),
79            &RawWakerVTable::new(waker_clone, waker_wake, waker_wake_by_ref, waker_drop),
80        ))
81    };
82
83    match Future::poll(Box::pin(func).as_mut(), &mut Context::from_waker(&waker)) {
84        Poll::Ready(output) => {
85            state.borrow_mut().set_output(output.into_value());
86        }
87        Poll::Pending => {}
88    };
89
90    InvocationResponse {
91        invocation_id: id,
92        return_value: Some(TypedData {
93            data: Some(Data::Json(state.borrow().result())),
94        }),
95        result: Some(StatusResult {
96            status: Status::Success as i32,
97            ..Default::default()
98        }),
99        ..Default::default()
100    }
101}
102
103#[cfg(test)]
104mod tests {
105    use super::*;
106    use crate::durable::{EventType, HistoryEvent};
107    use chrono::Utc;
108    use futures::future::FutureExt;
109    use std::{
110        future::Future,
111        ptr::null,
112        task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
113    };
114
115    pub(crate) fn poll<F, T>(mut future: F) -> Poll<T>
116    where
117        F: Future<Output = T> + Unpin,
118    {
119        let waker = unsafe {
120            Waker::from_raw(RawWaker::new(
121                null(),
122                &RawWakerVTable::new(waker_clone, waker_wake, waker_wake_by_ref, waker_drop),
123            ))
124        };
125
126        future.poll_unpin(&mut Context::from_waker(&waker))
127    }
128
129    static mut TIMESTAMP_COUNTER: i64 = 0;
130
131    pub(crate) fn create_event(
132        event_type: EventType,
133        event_id: i32,
134        name: Option<String>,
135        result: Option<String>,
136        task_scheduled_id: Option<i32>,
137    ) -> HistoryEvent {
138        unsafe {
139            TIMESTAMP_COUNTER += 1;
140
141            let offset = chrono::Duration::nanoseconds(TIMESTAMP_COUNTER);
142
143            HistoryEvent {
144                event_type,
145                event_id,
146                is_played: true,
147                timestamp: Utc::now() + offset,
148                is_processed: false,
149                name,
150                input: None,
151                result,
152                task_scheduled_id,
153                instance_id: None,
154                reason: None,
155                details: None,
156                fire_at: None,
157                timer_id: None,
158            }
159        }
160    }
161}