celestia_grpc_macros/
lib.rs1#![doc = include_str!("../README.md")]
2
3extern crate proc_macro;
4
5use proc_macro::TokenStream;
6use proc_macro2::TokenStream as TokenStream2;
7use quote::{TokenStreamExt, quote};
8use syn::parse::{Parse, ParseStream};
9use syn::punctuated::Punctuated;
10use syn::{Attribute, FnArg, Ident, Signature, Token, parse_macro_input};
11
12#[derive(Debug)]
13struct GrpcMethod {
14 outer_attrs: Vec<Attribute>,
15 inner_attrs: Vec<Attribute>,
16 signature: Signature,
17 _terminating_semi: Token![;],
18}
19
20impl Parse for GrpcMethod {
21 fn parse(input: ParseStream) -> syn::Result<Self> {
22 Ok(GrpcMethod {
23 outer_attrs: input.call(Attribute::parse_outer)?,
24 inner_attrs: input.call(Attribute::parse_inner)?,
25 signature: input.parse()?,
26 _terminating_semi: input.parse()?,
27 })
28 }
29}
30
31impl GrpcMethod {
32 fn instantiate_method(&self, tonic_method: GrpcMethodAttribute) -> TokenStream2 {
33 let mut tokens = TokenStream2::new();
34
35 tokens.append_all(&self.inner_attrs);
36 tokens.append_all(&self.outer_attrs);
37
38 let grpc_client_struct = tonic_method.client;
39 let grpc_method_name = tonic_method.method;
40
41 let signature = self.signature.clone();
42 let params: Vec<_> = self
43 .signature
44 .inputs
45 .iter()
46 .filter_map(|arg| {
47 let FnArg::Typed(arg) = arg else {
48 return None;
49 };
50 Some(&arg.pat)
51 })
52 .collect();
53
54 let method = quote! {
55 pub #signature {
56 let transports = self.inner.transports.clone();
57 let param = crate::grpc::IntoGrpcParam::into_parameter(( #( #params ),* ));
58
59 crate::grpc::AsyncGrpcCall::new(move |call_context: crate::grpc::Context| async move {
60 const MAX_MSG_SIZE: usize = 256 * 1024 * 1024;
62
63 let mut last_error: Option<crate::Error> = None;
64 let transport_snapshot = transports.load();
65
66 for idx in 0..transport_snapshot.len() {
67 let transport_url = transport_snapshot[idx].metadata.url.as_deref();
68 let transport_context = &transport_snapshot[idx].metadata.context;
70
71 let transport = transport_snapshot[idx].clone();
72 let mut client = #grpc_client_struct::new(transport)
73 .max_decoding_message_size(MAX_MSG_SIZE)
74 .max_encoding_message_size(MAX_MSG_SIZE);
75
76 let mut merged_context = transport_context.clone();
78 merged_context.extend(&call_context);
79
80 let mut request = ::tonic::Request::from_parts(
81 merged_context.metadata.clone(),
82 ::tonic::Extensions::new(),
83 ::std::clone::Clone::clone(¶m),
84 );
85
86 if let Some(timeout) = merged_context.timeout {
87 request.set_timeout(timeout);
88 } else {
89 request.set_timeout(::std::time::Duration::from_secs(30));
90 }
91
92 let fut = client.#grpc_method_name(request);
93
94 #[cfg(target_arch = "wasm32")]
95 let fut = ::send_wrapper::SendWrapper::new(fut);
96
97 match fut.await {
98 Ok(resp) => {
99 if idx > 0 {
100 let mut new_transports = transport_snapshot.as_ref().clone();
101 new_transports.swap(0, idx);
102 transports.store(::std::sync::Arc::new(new_transports));
103 }
104 return crate::grpc::FromGrpcResponse::try_from_response(resp.into_inner());
105 }
106 Err(e) => {
107 let error: crate::Error = e.into();
108 if error.is_network_error() {
109 ::tracing::warn!(
110 "Transport {} failed with network error: {}",
111 transport_url.unwrap_or("unknown"),
112 error,
113 );
114 last_error = Some(error);
115 continue;
116 }
117 return Err(error);
118 }
119 }
120 }
121
122 Err(last_error.expect("at least one transport should be tried"))
123 })
124 }
125 };
126
127 tokens.extend(method);
128
129 tokens
130 }
131}
132
133#[derive(Debug)]
134struct GrpcMethodAttribute {
135 method: Ident,
136 client: Punctuated<Ident, Token![::]>,
137}
138
139impl Parse for GrpcMethodAttribute {
140 fn parse(input: ParseStream) -> syn::Result<Self> {
141 let mut parsed = Punctuated::<Ident, Token![::]>::parse_separated_nonempty(input)?;
142
143 let method = parsed.pop().expect("expected client method").into_value();
144 parsed.pop_punct();
145 let client = parsed;
146
147 Ok(GrpcMethodAttribute { method, client })
148 }
149}
150
151#[proc_macro_attribute]
153pub fn grpc_method(attr: TokenStream, item: TokenStream) -> TokenStream {
154 let attributes = parse_macro_input!(attr as GrpcMethodAttribute);
155 let method_sig = parse_macro_input!(item as GrpcMethod);
156
157 let method = method_sig.instantiate_method(attributes);
158
159 method.into()
160}