paladin_opkind_derive/
lib.rs1extern crate proc_macro;
20
21use std::sync::atomic::{AtomicU8, Ordering};
22
23use proc_macro::TokenStream;
24use quote::quote;
25use syn::{parse_macro_input, Attribute, DeriveInput, Error, Ident, Result};
26
/// Counter handing out the `ID` assigned to each `RemoteExecute` derive expansion.
///
/// `operation_derive` bumps it once per expansion with `fetch_add`. Being an
/// `AtomicU8`, the addition wraps around on overflow, so only 256 distinct
/// values exist within a single run of the macro's host process.
static OPERATION_ID_COUNTER: AtomicU8 = AtomicU8::new(0);
28
29fn get_is_internal(attrs: &mut Vec<Attribute>) -> Result<bool> {
34 let mut is_internal = None;
35 let mut errors: Option<Error> = None;
36
37 attrs.retain(|attr| {
38 if !attr.path().is_ident("paladin") {
39 return true;
40 }
41 if let Err(err) = attr.parse_nested_meta(|meta| {
42 if meta.path.is_ident("internal") {
43 if is_internal.is_some() {
44 return Err(meta.error("duplicate paladin crate attribute"));
45 }
46
47 is_internal = Some(true);
48 Ok(())
49 } else {
50 Err(meta.error("unsupported paladin attribute"))
51 }
52 }) {
53 match &mut errors {
54 None => errors = Some(err),
55 Some(errors) => errors.combine(err),
56 }
57 }
58 false
59 });
60
61 match errors {
62 None => Ok(is_internal.unwrap_or(false)),
63 Some(errors) => Err(errors),
64 }
65}
66
67#[proc_macro_derive(RemoteExecute, attributes(paladin))]
69pub fn operation_derive(input: TokenStream) -> TokenStream {
70 let mut input = parse_macro_input!(input as DeriveInput);
71
72 let is_internal = match get_is_internal(&mut input.attrs) {
73 Ok(path) => path,
74 Err(err) => return err.to_compile_error().into(),
75 };
76
77 let paladin_path = if is_internal {
81 quote! { crate }
82 } else {
83 quote! { ::paladin }
84 };
85
86 let linkme_path_override = if is_internal {
92 quote! {}
93 } else {
94 quote! {
95 #[linkme(crate=#paladin_path::__private::linkme)]
96 }
97 };
98
99 let name = &input.ident;
100 let generics = &input.generics;
101 let (impl_generics, ty_generics, where_clause) = generics.split_for_impl();
102 let operation_id = OPERATION_ID_COUNTER.fetch_add(1, Ordering::SeqCst);
103 let function_name = Ident::new(&format!("__execute_{}", operation_id), name.span());
104
105 let expanded = quote! {
106 impl #impl_generics #paladin_path::operation::RemoteExecute for #name #ty_generics #where_clause {
107 const ID: u8 = #operation_id;
108 }
109
110 #[#paladin_path::__private::linkme::distributed_slice(#paladin_path::__private::OPERATIONS)]
111 #linkme_path_override
112 fn #function_name(task: #paladin_path::task::AnyTask) -> #paladin_path::__private::futures::future::BoxFuture<
113 'static,
114 #paladin_path::operation::Result<#paladin_path::task::AnyTaskOutput>
115 > {
116 Box::pin(async move {
117 let get_result = || async {
119 let op = #name::from_bytes(task.serializer, &task.op)?;
121
122 let input = op.input_from_bytes(task.serializer, &task.input)?;
124 #paladin_path::__private::tracing::trace!(operation = %stringify!(#name), input = ?input, "executing operation");
125
126 let output = #paladin_path::__private::tokio::task::spawn_blocking(move || {
128 let typed_output = std::panic::catch_unwind(::std::panic::AssertUnwindSafe(||
130 op.execute(input)
131 ))
132 .map_err(|_| #paladin_path::operation::FatalError::from_str(
134 &format!("operation {} panicked", stringify!(#name)),
135 #paladin_path::operation::FatalStrategy::Terminate
136 ))??;
137
138 #paladin_path::__private::tracing::trace!(operation = %stringify!(#name), output = ?typed_output, "operation executed successfully");
139
140 let serialized_output = op.output_to_bytes(task.serializer, typed_output)?;
142
143 Ok(serialized_output) as #paladin_path::operation::Result<#paladin_path::__private::bytes::Bytes>
144 })
145 .await
146 .map_err(|e| #paladin_path::operation::FatalError::new(
147 e,
148 #paladin_path::operation::FatalStrategy::Terminate
149 ))??;
150
151 Ok(output) as #paladin_path::operation::Result<#paladin_path::__private::bytes::Bytes>
152 };
153
154 let result = match get_result().await {
155 Err(err) => {
156 err.retry_trace(get_result, |e| {
158 #paladin_path::__private::tracing::warn!(operation = %stringify!(#name), error = ?e, "transient operation failure");
159 })
160 .await
161 },
162 result => result
163 };
164
165 result.map(|output| #paladin_path::task::AnyTaskOutput {
167 metadata: task.metadata,
168 output,
169 serializer: task.serializer,
170 })
171 })
172 }
173 };
174
175 TokenStream::from(expanded)
176}