Skip to main content

durable_macros/
lib.rs

1use proc_macro::TokenStream;
2use quote::quote;
3use syn::{ItemFn, parse_macro_input};
4
5/// Marks a function as a durable workflow.
6///
7/// If the function has a single parameter `ctx: Ctx`, it is automatically
8/// registered for crash recovery via `inventory`. When `durable::init()`
9/// recovers a stale task whose name matches this function, it will be
10/// re-spawned automatically.
11///
12/// ```ignore
13/// #[durable::workflow]
14/// async fn daily_etl(ctx: Ctx) -> Result<EtlReport, DurableError> {
15///     let data = ctx.step("extract", || async { fetch().await }).await?;
16///     ctx.complete(&data).await?;
17///     Ok(data)
18/// }
19/// ```
20#[proc_macro_attribute]
21pub fn workflow(_attr: TokenStream, item: TokenStream) -> TokenStream {
22    let input_fn = parse_macro_input!(item as ItemFn);
23    let fn_name = &input_fn.sig.ident;
24    let fn_name_str = fn_name.to_string();
25    let vis = &input_fn.vis;
26    let sig = &input_fn.sig;
27    let body = &input_fn.block;
28    let attrs = &input_fn.attrs;
29
30    // Only emit auto-registration for functions with exactly 1 parameter
31    // whose type is `Ctx`. Multi-param or non-Ctx functions can't be
32    // auto-resumed since recovery only provides a Ctx.
33    let has_ctx_param = input_fn.sig.inputs.len() == 1
34        && matches!(&input_fn.sig.inputs[0], syn::FnArg::Typed(pat) if {
35            let ty = &*pat.ty;
36            matches!(ty, syn::Type::Path(tp) if tp.path.segments.last().is_some_and(|s| s.ident == "Ctx"))
37        });
38    let registration = if has_ctx_param {
39        let start_fn_name = syn::Ident::new(
40            &format!("start_{}", fn_name),
41            fn_name.span(),
42        );
43        quote! {
44            ::durable::inventory::submit! {
45                ::durable::WorkflowRegistration {
46                    name: #fn_name_str,
47                    resume_fn: |ctx| ::std::boxed::Box::pin(async move {
48                        let _ = #fn_name(ctx).await?;
49                        Ok(())
50                    }),
51                }
52            }
53
54            /// Auto-generated by `#[durable::workflow]`. Starts (or idempotently
55            /// attaches to) a root workflow, recording the handler name for crash
56            /// recovery.
57            #vis async fn #start_fn_name(
58                db: &::sea_orm::DatabaseConnection,
59                name: &str,
60                input: ::std::option::Option<::serde_json::Value>,
61            ) -> ::std::result::Result<::durable::Ctx, ::durable::DurableError> {
62                ::durable::Ctx::start_with_handler(db, name, input, ::std::option::Option::Some(#fn_name_str)).await
63            }
64        }
65    } else {
66        quote! {}
67    };
68
69    let expanded = quote! {
70        #(#attrs)*
71        #vis #sig {
72            let _workflow_name = #fn_name_str;
73            tracing::info!(workflow = _workflow_name, "workflow started");
74            let _result: Result<_, _> = async { #body }.await;
75            match &_result {
76                Ok(_) => tracing::info!(workflow = _workflow_name, "workflow completed"),
77                Err(e) => tracing::error!(workflow = _workflow_name, error = %e, "workflow failed"),
78            }
79            _result
80        }
81
82        #registration
83    };
84
85    TokenStream::from(expanded)
86}
87
88/// Marks a function as a durable step.
89///
90/// The first parameter must be `ctx: &Ctx`. The macro wraps the function
91/// to check for saved output, execute if needed, and save the result.
92///
93/// ```ignore
94/// #[durable::step]
95/// async fn fetch_data(ctx: &Ctx, url: &str) -> Result<Data> {
96///     // ...
97/// }
98/// ```
99#[proc_macro_attribute]
100pub fn step(_attr: TokenStream, item: TokenStream) -> TokenStream {
101    let input_fn = parse_macro_input!(item as ItemFn);
102    let fn_name = &input_fn.sig.ident;
103    let fn_name_str = fn_name.to_string();
104    let vis = &input_fn.vis;
105    let sig = &input_fn.sig;
106    let body = &input_fn.block;
107    let attrs = &input_fn.attrs;
108
109    let expanded = quote! {
110        #(#attrs)*
111        #vis #sig {
112            let _step_name = #fn_name_str;
113            tracing::debug!(step = _step_name, "step executing");
114            let _result: Result<_, _> = async { #body }.await;
115            match &_result {
116                Ok(_) => tracing::debug!(step = _step_name, "step completed"),
117                Err(e) => tracing::warn!(step = _step_name, error = %e, "step failed"),
118            }
119            _result
120        }
121    };
122
123    TokenStream::from(expanded)
124}