Skip to main content

ruststream_macros/
lib.rs

1//! Procedural macros for [RustStream](https://github.com/powersemmi/ruststream).
2//!
3//! Re-exported from the `ruststream` crate under the `macros` feature; depend on that rather than
4//! on this crate directly.
5
6use proc_macro::TokenStream;
7use proc_macro2::TokenStream as TokenStream2;
8use quote::quote;
9use syn::parse::{Parse, ParseStream};
10use syn::{
11    Attribute, DeriveInput, Expr, ExprCall, ExprLit, ExprPath, ExprStruct, FnArg, Ident, ItemFn,
12    Lit, LitStr, Meta, PatType, Path, ReturnType, Token, Type, TypePath, parenthesized,
13    parse_macro_input,
14};
15
16/// Arguments to `#[subscriber(..)]`: the subscription source (a string literal name, or a
17/// descriptor constructor `Type::new(..)` / `Type { .. }`) and an optional `publish("topic")`
18/// clause naming the reply destination.
19struct SubscriberArgs {
20    source: Expr,
21    publish: Option<LitStr>,
22}
23
24impl Parse for SubscriberArgs {
25    fn parse(input: ParseStream) -> syn::Result<Self> {
26        let source: Expr = input.parse()?;
27        let mut publish = None;
28        if input.peek(Token![,]) {
29            input.parse::<Token![,]>()?;
30            let keyword: Ident = input.parse()?;
31            if keyword != "publish" {
32                return Err(syn::Error::new(
33                    keyword.span(),
34                    "expected `publish(\"reply-topic\")`",
35                ));
36            }
37            let content;
38            parenthesized!(content in input);
39            publish = Some(content.parse()?);
40        }
41        Ok(Self { source, publish })
42    }
43}
44
45/// Derives the subscription `Source` type and a constructor expression from the macro argument.
46///
47/// A string literal `"orders"` becomes `(Name, Name::new("orders"))`; a constructor expression
48/// `RedisStream::new(..)` or `RedisStream { .. }` becomes `(RedisStream, <the expr verbatim>)` by
49/// pulling the type out of the call/struct path. Free functions (`redis::stream(..)`) and builder
50/// chains are rejected - their result type is not visible in the tokens.
51fn source_tokens(expr: &Expr) -> syn::Result<(TokenStream2, TokenStream2)> {
52    if let Expr::Lit(ExprLit {
53        lit: Lit::Str(name),
54        ..
55    }) = expr
56    {
57        return Ok((
58            quote!(::ruststream::Name),
59            quote!(::ruststream::Name::new(#name)),
60        ));
61    }
62
63    let ty: Type = match expr {
64        Expr::Call(ExprCall { func, .. }) => match &**func {
65            Expr::Path(ExprPath {
66                path, qself: None, ..
67            }) => type_from_constructor_path(path)?,
68            _ => return Err(unsupported_source(expr)),
69        },
70        Expr::Struct(ExprStruct { path, .. }) => Type::Path(TypePath {
71            qself: None,
72            path: path.clone(),
73        }),
74        _ => return Err(unsupported_source(expr)),
75    };
76    Ok((quote!(#ty), quote!(#expr)))
77}
78
79/// Builds the type from a constructor path by dropping the final segment (`Type::new` -> `Type`).
80fn type_from_constructor_path(path: &Path) -> syn::Result<Type> {
81    let n = path.segments.len();
82    if n < 2 {
83        return Err(syn::Error::new_spanned(
84            path,
85            "expected `Type::new(..)`: the path must name a type and an associated constructor",
86        ));
87    }
88    let segments = path.segments.iter().take(n - 1).cloned().collect();
89    Ok(Type::Path(TypePath {
90        qself: None,
91        path: Path {
92            leading_colon: path.leading_colon,
93            segments,
94        },
95    }))
96}
97
98fn unsupported_source(expr: &Expr) -> syn::Error {
99    syn::Error::new_spanned(
100        expr,
101        "expected a string literal name, `Type::new(..)`, or `Type { .. }` - \
102         free functions and builder chains do not expose their type to the macro",
103    )
104}
105
106/// Turns an `async fn` handler into a mountable subscriber definition.
107///
108/// ```ignore
109/// /// Processes incoming orders.
110/// #[subscriber("orders")]
111/// async fn handle(order: &Order) -> HandlerResult { HandlerResult::Ack }
112/// // later: broker_scope.include(handle, JsonCodec);
113///
114/// // reply form: the return value is encoded and published to "responses" through the
115/// // TypedPublisher (broker + reply codec) passed at wiring time.
116/// #[subscriber("requests", publish("responses"))]
117/// async fn reply(req: &Request) -> Response { /* ... */ }
118/// // later: broker_scope.include_publishing(reply, JsonCodec, typed_publisher);
119/// ```
120///
121/// Without `publish(..)` the handler returns any `IntoHandlerResult` (a `HandlerResult`, `()`, or
122/// `Result<_, E>`). With `publish(..)` it returns the reply value to publish.
123#[proc_macro_attribute]
124pub fn subscriber(attr: TokenStream, item: TokenStream) -> TokenStream {
125    let args = parse_macro_input!(attr as SubscriberArgs);
126    let func = parse_macro_input!(item as ItemFn);
127    expand(&args, &func).unwrap_or_else(|err| err.to_compile_error().into())
128}
129
130/// Generates a `main` entry point for a `RustStream` service.
131///
132/// Place it on a synchronous, argument-free function that builds and returns a `RustStream`
133/// application. The expansion keeps the function and adds a `main` that hands it to
134/// `ruststream::runtime::cli::run_main`, producing a binary that understands the `run` and
135/// `asyncapi gen` commands with no hand-written runtime boilerplate.
136///
137/// ```ignore
138/// #[ruststream::app]
139/// fn app() -> RustStream {
140///     RustStream::new(AppInfo::new("svc", "0.1.0")).register_broker(MemoryBroker::new())
141/// }
142/// ```
143#[proc_macro_attribute]
144pub fn app(attr: TokenStream, item: TokenStream) -> TokenStream {
145    let func = parse_macro_input!(item as ItemFn);
146    expand_app(&attr.into(), &func).unwrap_or_else(|err| err.to_compile_error().into())
147}
148
149fn expand_app(attr: &TokenStream2, func: &ItemFn) -> syn::Result<TokenStream> {
150    if !attr.is_empty() {
151        return Err(syn::Error::new_spanned(
152            attr,
153            "#[ruststream::app] takes no arguments",
154        ));
155    }
156    if let Some(asyncness) = func.sig.asyncness {
157        return Err(syn::Error::new_spanned(
158            asyncness,
159            "#[ruststream::app] requires a synchronous builder returning `RustStream`",
160        ));
161    }
162    if !func.sig.inputs.is_empty() {
163        return Err(syn::Error::new_spanned(
164            &func.sig.inputs,
165            "#[ruststream::app] builder must take no arguments",
166        ));
167    }
168    let name = &func.sig.ident;
169    Ok(quote! {
170        #func
171
172        fn main() -> ::std::process::ExitCode {
173            ::ruststream::runtime::cli::run_main(#name)
174        }
175    }
176    .into())
177}
178
179fn expand(args: &SubscriberArgs, func: &ItemFn) -> syn::Result<TokenStream> {
180    let vis = &func.vis;
181    let name = &func.sig.ident;
182    let block = &func.block;
183
184    let first = func.sig.inputs.first().ok_or_else(|| {
185        syn::Error::new_spanned(
186            &func.sig,
187            "a #[subscriber] handler must take exactly one message parameter",
188        )
189    })?;
190    let FnArg::Typed(PatType { pat, ty, .. }) = first else {
191        return Err(syn::Error::new_spanned(
192            first,
193            "a #[subscriber] handler cannot take `self`",
194        ));
195    };
196    let Type::Reference(reference) = &**ty else {
197        return Err(syn::Error::new_spanned(
198            ty,
199            "the message parameter must be a reference `&T`",
200        ));
201    };
202    let input_ty = &reference.elem;
203    let description = doc_description(&func.attrs);
204    let (source_ty, source_expr) = source_tokens(&args.source)?;
205
206    // Captures the input type's JSON Schema for AsyncAPI when it implements `JsonSchema` (and the
207    // `asyncapi` feature is on), via the autoref-specialization probe; `None` otherwise. The
208    // concrete input type makes the trait selection resolve at the call site.
209    let input_schema = quote! {
210        fn input_schema(&self) -> ::core::option::Option<::std::string::String> {
211            #[allow(unused_imports)]
212            use ::ruststream::__private::NoSchemaProbe as _;
213            ::ruststream::__private::Probe::<#input_ty>::new().schema_json()
214        }
215    };
216
217    // Optional second handler parameter: the per-delivery `&mut Context`. If the user declares it,
218    // bind it to their name; otherwise generate an ignored binding.
219    let ctx_param = if let Some(FnArg::Typed(PatType { pat, .. })) = func.sig.inputs.get(1) {
220        quote!(#pat)
221    } else {
222        quote!(_ctx)
223    };
224
225    let body = if let Some(reply_topic) = &args.publish {
226        let reply_ty = match &func.sig.output {
227            ReturnType::Type(_, ty) => &**ty,
228            ReturnType::Default => {
229                return Err(syn::Error::new_spanned(
230                    &func.sig,
231                    "a publishing handler must return the reply value",
232                ));
233            }
234        };
235        quote! {
236            #[allow(non_camel_case_types)]
237            #vis struct #name;
238
239            impl ::ruststream::runtime::PublishingDef for #name {
240                type Input = #input_ty;
241                type Reply = #reply_ty;
242                type Source = #source_ty;
243
244                fn source(&self) -> Self::Source { #source_expr }
245                fn reply_name(&self) -> &str { #reply_topic }
246
247                fn description(&self) -> ::core::option::Option<&str> {
248                    #description
249                }
250
251                #input_schema
252
253                fn call(
254                    &self,
255                    #pat: &#input_ty,
256                ) -> impl ::core::future::Future<Output = #reply_ty> + ::core::marker::Send {
257                    async move #block
258                }
259            }
260        }
261    } else {
262        quote! {
263            #[derive(Clone, Copy)]
264            #[allow(non_camel_case_types)]
265            #vis struct #name;
266
267            impl ::ruststream::runtime::Handler<#input_ty> for #name {
268                async fn handle(
269                    &self,
270                    #pat: &#input_ty,
271                    #ctx_param: &mut ::ruststream::runtime::Context<'_>,
272                ) -> ::ruststream::runtime::HandlerResult {
273                    ::ruststream::runtime::IntoHandlerResult::into_handler_result(
274                        (async move #block).await,
275                    )
276                }
277            }
278
279            impl ::ruststream::runtime::SubscriberDef for #name {
280                type Input = #input_ty;
281                type Handler = Self;
282                type Source = #source_ty;
283
284                fn source(&self) -> Self::Source { #source_expr }
285
286                fn description(&self) -> ::core::option::Option<&str> {
287                    #description
288                }
289
290                #input_schema
291
292                fn into_handler(self) -> Self { self }
293            }
294        }
295    };
296
297    Ok(body.into())
298}
299
300/// Derives [`Message`](../ruststream/trait.Message.html) metadata: the type name and its doc
301/// comment.
302///
303/// ```ignore
304/// /// An order placed by a customer.
305/// #[derive(Message)]
306/// struct Order { id: u32 }
307/// // Order::NAME == "Order", Order::DESCRIPTION == Some("An order placed by a customer.")
308/// ```
309#[proc_macro_derive(Message)]
310pub fn derive_message(item: TokenStream) -> TokenStream {
311    let input = parse_macro_input!(item as DeriveInput);
312    let name = &input.ident;
313    let name_str = name.to_string();
314    let description = doc_description(&input.attrs);
315    let (impl_generics, ty_generics, where_clause) = input.generics.split_for_impl();
316
317    quote! {
318        impl #impl_generics ::ruststream::Message for #name #ty_generics #where_clause {
319            const NAME: &'static str = #name_str;
320            const DESCRIPTION: ::core::option::Option<&'static str> = #description;
321        }
322    }
323    .into()
324}
325
326/// Collects doc-comment lines from `attrs` into a single description literal, or `None`.
327fn doc_description(attrs: &[Attribute]) -> TokenStream2 {
328    let lines: Vec<String> = attrs
329        .iter()
330        .filter(|attr| attr.path().is_ident("doc"))
331        .filter_map(|attr| match &attr.meta {
332            Meta::NameValue(nv) => match &nv.value {
333                Expr::Lit(ExprLit {
334                    lit: Lit::Str(text),
335                    ..
336                }) => Some(text.value().trim().to_owned()),
337                _ => None,
338            },
339            _ => None,
340        })
341        .collect();
342
343    if lines.is_empty() {
344        quote!(::core::option::Option::None)
345    } else {
346        let joined = lines.join("\n");
347        quote!(::core::option::Option::Some(#joined))
348    }
349}