Skip to main content

celestia_grpc_macros/
lib.rs

1#![doc = include_str!("../README.md")]
2
3extern crate proc_macro;
4
5use proc_macro::TokenStream;
6use proc_macro2::TokenStream as TokenStream2;
7use quote::{TokenStreamExt, quote};
8use syn::parse::{Parse, ParseStream};
9use syn::punctuated::Punctuated;
10use syn::{Attribute, FnArg, Ident, Signature, Token, parse_macro_input};
11
12#[derive(Debug)]
13struct GrpcMethod {
14    outer_attrs: Vec<Attribute>,
15    inner_attrs: Vec<Attribute>,
16    signature: Signature,
17    _terminating_semi: Token![;],
18}
19
20impl Parse for GrpcMethod {
21    fn parse(input: ParseStream) -> syn::Result<Self> {
22        Ok(GrpcMethod {
23            outer_attrs: input.call(Attribute::parse_outer)?,
24            inner_attrs: input.call(Attribute::parse_inner)?,
25            signature: input.parse()?,
26            _terminating_semi: input.parse()?,
27        })
28    }
29}
30
31impl GrpcMethod {
32    fn instantiate_method(&self, tonic_method: GrpcMethodAttribute) -> TokenStream2 {
33        let mut tokens = TokenStream2::new();
34
35        tokens.append_all(&self.inner_attrs);
36        tokens.append_all(&self.outer_attrs);
37
38        let grpc_client_struct = tonic_method.client;
39        let grpc_method_name = tonic_method.method;
40
41        let signature = self.signature.clone();
42        let params: Vec<_> = self
43            .signature
44            .inputs
45            .iter()
46            .filter_map(|arg| {
47                let FnArg::Typed(arg) = arg else {
48                    return None;
49                };
50                Some(&arg.pat)
51            })
52            .collect();
53
54        let method = quote! {
55            pub #signature {
56                let transports = self.inner.transports.clone();
57                let param = crate::grpc::IntoGrpcParam::into_parameter(( #( #params ),* ));
58
59                crate::grpc::AsyncGrpcCall::new(move |call_context: crate::grpc::Context| async move {
60                    // 256 mb, future proof as celestia blocks grow
61                    const MAX_MSG_SIZE: usize = 256 * 1024 * 1024;
62
63                    let mut last_error: Option<crate::Error> = None;
64                    let transport_snapshot = transports.load();
65
66                    for idx in 0..transport_snapshot.len() {
67                        let transport_url = transport_snapshot[idx].metadata.url.as_deref();
68                        // Get the transport's endpoint-specific context
69                        let transport_context = &transport_snapshot[idx].metadata.context;
70
71                        let transport = transport_snapshot[idx].clone();
72                        let mut client = #grpc_client_struct::new(transport)
73                            .max_decoding_message_size(MAX_MSG_SIZE)
74                            .max_encoding_message_size(MAX_MSG_SIZE);
75
76                        // Merge transport context with per-call context
77                        let mut merged_context = transport_context.clone();
78                        merged_context.extend(&call_context);
79
80                        let mut request = ::tonic::Request::from_parts(
81                            merged_context.metadata.clone(),
82                            ::tonic::Extensions::new(),
83                            ::std::clone::Clone::clone(&param),
84                        );
85
86                        if let Some(timeout) = merged_context.timeout {
87                            request.set_timeout(timeout);
88                        } else {
89                            request.set_timeout(::std::time::Duration::from_secs(30));
90                        }
91
92                        let fut = client.#grpc_method_name(request);
93
94                        #[cfg(target_arch = "wasm32")]
95                        let fut = ::send_wrapper::SendWrapper::new(fut);
96
97                        match fut.await {
98                            Ok(resp) => {
99                                if idx > 0 {
100                                    let mut new_transports = transport_snapshot.as_ref().clone();
101                                    new_transports.swap(0, idx);
102                                    transports.store(::std::sync::Arc::new(new_transports));
103                                }
104                                return crate::grpc::FromGrpcResponse::try_from_response(resp.into_inner());
105                            }
106                            Err(e) => {
107                                let error: crate::Error = e.into();
108                                if error.is_network_error() {
109                                    ::tracing::warn!(
110                                        "Transport {} failed with network error: {}",
111                                        transport_url.unwrap_or("unknown"),
112                                        error,
113                                    );
114                                    last_error = Some(error);
115                                    continue;
116                                }
117                                return Err(error);
118                            }
119                        }
120                    }
121
122                    Err(last_error.expect("at least one transport should be tried"))
123                })
124            }
125        };
126
127        tokens.extend(method);
128
129        tokens
130    }
131}
132
133#[derive(Debug)]
134struct GrpcMethodAttribute {
135    method: Ident,
136    client: Punctuated<Ident, Token![::]>,
137}
138
139impl Parse for GrpcMethodAttribute {
140    fn parse(input: ParseStream) -> syn::Result<Self> {
141        let mut parsed = Punctuated::<Ident, Token![::]>::parse_separated_nonempty(input)?;
142
143        let method = parsed.pop().expect("expected client method").into_value();
144        parsed.pop_punct();
145        let client = parsed;
146
147        Ok(GrpcMethodAttribute { method, client })
148    }
149}
150
151/// Annotate a function signature passing ServiceClient method to be called
152#[proc_macro_attribute]
153pub fn grpc_method(attr: TokenStream, item: TokenStream) -> TokenStream {
154    let attributes = parse_macro_input!(attr as GrpcMethodAttribute);
155    let method_sig = parse_macro_input!(item as GrpcMethod);
156
157    let method = method_sig.instantiate_method(attributes);
158
159    method.into()
160}