paladin_opkind_derive/
lib.rs

1//! A derive macro for the `RemoteExecute` trait.
2//!
3//! This construction enables operations to be serialized and executed by a
4//! remote service in an opaque manner. It uses the [`linkme`](https://docs.rs/linkme)
5//! crate to collect all operation execution pointers into a single slice that
6//! are gathered into a contiguous section of the binary by the linker.
7//!
8//! # Implementation details
9//!
10//! A globally unique identifier is assigned to each operation. Then, a unique
11//! function that handles deserialization, execution, and re-serialization of
12//! the result is generated for each operation. A pointer to this function is
13//! registered with the distributed slice at the index of the operation's
14//! identifier. This allows execution function to be dereferenced by only the
15//! operation identifier, which is serialized with the task.
16//!
17//! This scheme allows the tasks sent to workers to be opaque, while still
18//! enabling efficient lookup and execution of the appropriate operation.
19extern crate proc_macro;
20
21use std::sync::atomic::{AtomicU8, Ordering};
22
23use proc_macro::TokenStream;
24use quote::quote;
25use syn::{parse_macro_input, Attribute, DeriveInput, Error, Ident, Result};
26
27static OPERATION_ID_COUNTER: AtomicU8 = AtomicU8::new(0);
28
29/// Check if the `internal` attribute is present on the derive macro.
30///
31/// Quoted variables need to be slightly modified if the macro is being
32/// called from the `paladin` crate itself.
33fn get_is_internal(attrs: &mut Vec<Attribute>) -> Result<bool> {
34    let mut is_internal = None;
35    let mut errors: Option<Error> = None;
36
37    attrs.retain(|attr| {
38        if !attr.path().is_ident("paladin") {
39            return true;
40        }
41        if let Err(err) = attr.parse_nested_meta(|meta| {
42            if meta.path.is_ident("internal") {
43                if is_internal.is_some() {
44                    return Err(meta.error("duplicate paladin crate attribute"));
45                }
46
47                is_internal = Some(true);
48                Ok(())
49            } else {
50                Err(meta.error("unsupported paladin attribute"))
51            }
52        }) {
53            match &mut errors {
54                None => errors = Some(err),
55                Some(errors) => errors.combine(err),
56            }
57        }
58        false
59    });
60
61    match errors {
62        None => Ok(is_internal.unwrap_or(false)),
63        Some(errors) => Err(errors),
64    }
65}
66
67/// See the [module level documentation](crate) for more information.
68#[proc_macro_derive(RemoteExecute, attributes(paladin))]
69pub fn operation_derive(input: TokenStream) -> TokenStream {
70    let mut input = parse_macro_input!(input as DeriveInput);
71
72    let is_internal = match get_is_internal(&mut input.attrs) {
73        Ok(path) => path,
74        Err(err) => return err.to_compile_error().into(),
75    };
76
77    // The path to the `paladin` crate.
78    // If the derive macro is being called from the `paladin` crate itself, then
79    // the path is `crate`, otherwise it is `::paladin`.
80    let paladin_path = if is_internal {
81        quote! { crate }
82    } else {
83        quote! { ::paladin }
84    };
85
86    // If the derive macro is being called from the `paladin` crate itself, then
87    // the `linkme` attribute is not needed.
88    // Otherwise, we need to point the `linkme` attribute to the `paladin` so that
89    // consumers of the derive macro do not need to add the `linkme` crate as a
90    // dependency.
91    let linkme_path_override = if is_internal {
92        quote! {}
93    } else {
94        quote! {
95            #[linkme(crate=#paladin_path::__private::linkme)]
96        }
97    };
98
99    let name = &input.ident;
100    let generics = &input.generics;
101    let (impl_generics, ty_generics, where_clause) = generics.split_for_impl();
102    let operation_id = OPERATION_ID_COUNTER.fetch_add(1, Ordering::SeqCst);
103    let function_name = Ident::new(&format!("__execute_{}", operation_id), name.span());
104
105    let expanded = quote! {
106        impl #impl_generics #paladin_path::operation::RemoteExecute for #name #ty_generics #where_clause {
107            const ID: u8 = #operation_id;
108        }
109
110        #[#paladin_path::__private::linkme::distributed_slice(#paladin_path::__private::OPERATIONS)]
111        #linkme_path_override
112        fn #function_name(task: #paladin_path::task::AnyTask) -> #paladin_path::__private::futures::future::BoxFuture<
113            'static,
114            #paladin_path::operation::Result<#paladin_path::task::AnyTaskOutput>
115        > {
116            Box::pin(async move {
117                // Define the execution logic in a closure so that it can be retried if it fails.
118                let get_result = || async {
119                    // Deserialize the operation.
120                    let op = #name::from_bytes(task.serializer, &task.op)?;
121
122                    // Deserialize the input.
123                    let input = op.input_from_bytes(task.serializer, &task.input)?;
124                    #paladin_path::__private::tracing::trace!(operation = %stringify!(#name), input = ?input, "executing operation");
125
126                    // Spawn a blocking task to execute the operation.
127                    let output = #paladin_path::__private::tokio::task::spawn_blocking(move || {
128                        // Execute the operation, catching panics.
129                        let typed_output = std::panic::catch_unwind(::std::panic::AssertUnwindSafe(||
130                            op.execute(input)
131                        ))
132                        // Convert panics to fatal operation errors.
133                        .map_err(|_| #paladin_path::operation::FatalError::from_str(
134                            &format!("operation {} panicked", stringify!(#name)),
135                            #paladin_path::operation::FatalStrategy::Terminate
136                        ))??;
137
138                        #paladin_path::__private::tracing::trace!(operation = %stringify!(#name), output = ?typed_output, "operation executed successfully");
139
140                        // Serialize the output.
141                        let serialized_output = op.output_to_bytes(task.serializer, typed_output)?;
142
143                        Ok(serialized_output) as #paladin_path::operation::Result<#paladin_path::__private::bytes::Bytes>
144                    })
145                    .await
146                    .map_err(|e| #paladin_path::operation::FatalError::new(
147                        e,
148                        #paladin_path::operation::FatalStrategy::Terminate
149                    ))??;
150
151                    Ok(output) as #paladin_path::operation::Result<#paladin_path::__private::bytes::Bytes>
152                };
153
154                let result = match get_result().await {
155                    Err(err) => {
156                        // If the operation failed, it according to the error's retry strategy.
157                        err.retry_trace(get_result, |e| {
158                            #paladin_path::__private::tracing::warn!(operation = %stringify!(#name), error = ?e, "transient operation failure");
159                        })
160                        .await
161                    },
162                    result => result
163                };
164
165                // Convert the typed result into an opaque task output.
166                result.map(|output| #paladin_path::task::AnyTaskOutput {
167                    metadata: task.metadata,
168                    output,
169                    serializer: task.serializer,
170                })
171            })
172        }
173    };
174
175    TokenStream::from(expanded)
176}