Skip to main content

durable_macros/
lib.rs

1use proc_macro::TokenStream;
2use quote::quote;
3use syn::{ItemFn, parse_macro_input};
4
5/// Marks a function as a durable workflow.
6///
7/// If the function has a single parameter `ctx: Ctx`, it is automatically
8/// registered for crash recovery via `inventory`. When `durable::init()`
9/// recovers a stale task whose name matches this function, it will be
10/// re-spawned automatically.
11///
12/// ```ignore
13/// #[durable::workflow]
14/// async fn daily_etl(ctx: Ctx) -> Result<EtlReport, DurableError> {
15///     let data = ctx.step("extract", || async { fetch().await }).await?;
16///     ctx.complete(&data).await?;
17///     Ok(data)
18/// }
19/// ```
20#[proc_macro_attribute]
21pub fn workflow(_attr: TokenStream, item: TokenStream) -> TokenStream {
22    let input_fn = parse_macro_input!(item as ItemFn);
23    let fn_name = &input_fn.sig.ident;
24    let fn_name_str = fn_name.to_string();
25    let vis = &input_fn.vis;
26    let sig = &input_fn.sig;
27    let body = &input_fn.block;
28    let attrs = &input_fn.attrs;
29
30    // Only emit auto-registration for functions with exactly 1 parameter
31    // whose type is `Ctx`. Multi-param or non-Ctx functions can't be
32    // auto-resumed since recovery only provides a Ctx.
33    let has_ctx_param = input_fn.sig.inputs.len() == 1
34        && matches!(&input_fn.sig.inputs[0], syn::FnArg::Typed(pat) if {
35            let ty = &*pat.ty;
36            matches!(ty, syn::Type::Path(tp) if tp.path.segments.last().is_some_and(|s| s.ident == "Ctx"))
37        });
38    let registration = if has_ctx_param {
39        quote! {
40            ::durable::inventory::submit! {
41                ::durable::WorkflowRegistration {
42                    name: #fn_name_str,
43                    resume_fn: |ctx| ::std::boxed::Box::pin(async move {
44                        let _ = #fn_name(ctx).await?;
45                        Ok(())
46                    }),
47                }
48            }
49        }
50    } else {
51        quote! {}
52    };
53
54    let expanded = quote! {
55        #(#attrs)*
56        #vis #sig {
57            let _workflow_name = #fn_name_str;
58            tracing::info!(workflow = _workflow_name, "workflow started");
59            let _result: Result<_, _> = async { #body }.await;
60            match &_result {
61                Ok(_) => tracing::info!(workflow = _workflow_name, "workflow completed"),
62                Err(e) => tracing::error!(workflow = _workflow_name, error = %e, "workflow failed"),
63            }
64            _result
65        }
66
67        #registration
68    };
69
70    TokenStream::from(expanded)
71}
72
73/// Marks a function as a durable step.
74///
75/// The first parameter must be `ctx: &Ctx`. The macro wraps the function
76/// to check for saved output, execute if needed, and save the result.
77///
78/// ```ignore
79/// #[durable::step]
80/// async fn fetch_data(ctx: &Ctx, url: &str) -> Result<Data> {
81///     // ...
82/// }
83/// ```
84#[proc_macro_attribute]
85pub fn step(_attr: TokenStream, item: TokenStream) -> TokenStream {
86    let input_fn = parse_macro_input!(item as ItemFn);
87    let fn_name = &input_fn.sig.ident;
88    let fn_name_str = fn_name.to_string();
89    let vis = &input_fn.vis;
90    let sig = &input_fn.sig;
91    let body = &input_fn.block;
92    let attrs = &input_fn.attrs;
93
94    let expanded = quote! {
95        #(#attrs)*
96        #vis #sig {
97            let _step_name = #fn_name_str;
98            tracing::debug!(step = _step_name, "step executing");
99            let _result: Result<_, _> = async { #body }.await;
100            match &_result {
101                Ok(_) => tracing::debug!(step = _step_name, "step completed"),
102                Err(e) => tracing::warn!(step = _step_name, error = %e, "step failed"),
103            }
104            _result
105        }
106    };
107
108    TokenStream::from(expanded)
109}