Skip to main content

ruststream_macros/
lib.rs

1//! Procedural macros for [RustStream](https://github.com/powersemmi/ruststream).
2//!
3//! Re-exported from the `ruststream` crate under the `macros` feature; depend on that rather than
4//! on this crate directly.
5
6use proc_macro::TokenStream;
7use proc_macro2::TokenStream as TokenStream2;
8use quote::quote;
9use syn::parse::{Parse, ParseStream};
10use syn::{
11    Attribute, DeriveInput, Expr, ExprCall, ExprLit, ExprMethodCall, ExprPath, ExprStruct, FnArg,
12    Ident, ItemFn, Lit, LitStr, Meta, PatType, Path, ReturnType, Token, Type, TypePath,
13    parenthesized, parse_macro_input,
14};
15
16/// Arguments to `#[subscriber(..)]`: the subscription source (a string literal name, or a
17/// descriptor constructor `Type::new(..)` / `Type { .. }`) and an optional `publish("topic")`
18/// clause naming the reply destination.
19struct SubscriberArgs {
20    source: Expr,
21    publish: Option<LitStr>,
22}
23
24impl Parse for SubscriberArgs {
25    fn parse(input: ParseStream) -> syn::Result<Self> {
26        let source: Expr = input.parse()?;
27        let mut publish = None;
28        if input.peek(Token![,]) {
29            input.parse::<Token![,]>()?;
30            let keyword: Ident = input.parse()?;
31            if keyword != "publish" {
32                return Err(syn::Error::new(
33                    keyword.span(),
34                    "expected `publish(\"reply-topic\")`",
35                ));
36            }
37            let content;
38            parenthesized!(content in input);
39            publish = Some(content.parse()?);
40        }
41        Ok(Self { source, publish })
42    }
43}
44
45/// Derives the subscription `Source` type and a constructor expression from the macro argument.
46///
47/// A string literal `"orders"` becomes `(Name, Name::new("orders"))`; a constructor expression
48/// `RedisStream::new(..)` or `RedisStream { .. }` becomes `(RedisStream, <the expr verbatim>)` by
49/// pulling the type out of the call/struct path. A builder chain
50/// `SubscribeOptions::new(..).jetstream(..)` is followed down its receivers to that base
51/// constructor, so fluent options that return `Self` can be written inline. Free functions
52/// (`redis::stream(..)`) are still rejected - their result type is not visible in the tokens.
53fn source_tokens(expr: &Expr) -> syn::Result<(TokenStream2, TokenStream2)> {
54    if let Expr::Lit(ExprLit {
55        lit: Lit::Str(name),
56        ..
57    }) = expr
58    {
59        return Ok((
60            quote!(::ruststream::Name),
61            quote!(::ruststream::Name::new(#name)),
62        ));
63    }
64
65    let ty = source_type(expr)?;
66    Ok((quote!(#ty), quote!(#expr)))
67}
68
69/// Recovers the source type from a constructor expression, following a builder chain's receivers
70/// down to the base `Type::new(..)` / `Type { .. }`. Methods in the chain are assumed to return
71/// `Self`; a builder that returns a different type produces a type-mismatch the user can see and
72/// fix. Free functions and other shapes are rejected (their type is not visible in the tokens).
73fn source_type(expr: &Expr) -> syn::Result<Type> {
74    match expr {
75        Expr::Call(ExprCall { func, .. }) => match &**func {
76            Expr::Path(ExprPath {
77                path, qself: None, ..
78            }) => type_from_constructor_path(path),
79            _ => Err(unsupported_source(expr)),
80        },
81        Expr::Struct(ExprStruct { path, .. }) => Ok(Type::Path(TypePath {
82            qself: None,
83            path: path.clone(),
84        })),
85        Expr::MethodCall(ExprMethodCall { receiver, .. }) => source_type(receiver),
86        _ => Err(unsupported_source(expr)),
87    }
88}
89
90/// Builds the type from a constructor path by dropping the final segment (`Type::new` -> `Type`).
91fn type_from_constructor_path(path: &Path) -> syn::Result<Type> {
92    let n = path.segments.len();
93    if n < 2 {
94        return Err(syn::Error::new_spanned(
95            path,
96            "expected `Type::new(..)`: the path must name a type and an associated constructor",
97        ));
98    }
99    let segments = path.segments.iter().take(n - 1).cloned().collect();
100    Ok(Type::Path(TypePath {
101        qself: None,
102        path: Path {
103            leading_colon: path.leading_colon,
104            segments,
105        },
106    }))
107}
108
109fn unsupported_source(expr: &Expr) -> syn::Error {
110    syn::Error::new_spanned(
111        expr,
112        "expected a string literal name, `Type::new(..)`, `Type { .. }`, or a builder chain on \
113         one of those - a free function does not expose its type to the macro",
114    )
115}
116
117/// Turns an `async fn` handler into a mountable subscriber definition.
118///
119/// ```ignore
120/// /// Processes incoming orders.
121/// #[subscriber("orders")]
122/// async fn handle(order: &Order) -> HandlerResult { HandlerResult::Ack }
123/// // later: broker_scope.include(handle, JsonCodec);
124///
125/// // reply form: the return value is encoded and published to "responses" through the
126/// // TypedPublisher (broker + reply codec) passed at wiring time.
127/// #[subscriber("requests", publish("responses"))]
128/// async fn reply(req: &Request) -> Response { /* ... */ }
129/// // later: broker_scope.include_publishing(reply, JsonCodec, typed_publisher);
130/// ```
131///
132/// Without `publish(..)` the handler returns any `IntoHandlerResult` (a `HandlerResult`, `()`, or
133/// `Result<_, E>`). With `publish(..)` it returns the reply value to publish.
134#[proc_macro_attribute]
135pub fn subscriber(attr: TokenStream, item: TokenStream) -> TokenStream {
136    let args = parse_macro_input!(attr as SubscriberArgs);
137    let func = parse_macro_input!(item as ItemFn);
138    expand(&args, &func).unwrap_or_else(|err| err.to_compile_error().into())
139}
140
141/// Generates a `main` entry point for a `RustStream` service.
142///
143/// Place it on a synchronous, argument-free function that builds and returns a `RustStream`
144/// application. The expansion keeps the function and adds a `main` that hands it to
145/// `ruststream::runtime::cli::run_main`, producing a binary that understands the `run` and
146/// `asyncapi gen` commands with no hand-written runtime boilerplate.
147///
148/// ```ignore
149/// #[ruststream::app]
150/// fn app() -> RustStream {
151///     RustStream::new(AppInfo::new("svc", "0.1.0")).register_broker(MemoryBroker::new())
152/// }
153/// ```
154#[proc_macro_attribute]
155pub fn app(attr: TokenStream, item: TokenStream) -> TokenStream {
156    let func = parse_macro_input!(item as ItemFn);
157    expand_app(&attr.into(), &func).unwrap_or_else(|err| err.to_compile_error().into())
158}
159
160fn expand_app(attr: &TokenStream2, func: &ItemFn) -> syn::Result<TokenStream> {
161    if !attr.is_empty() {
162        return Err(syn::Error::new_spanned(
163            attr,
164            "#[ruststream::app] takes no arguments",
165        ));
166    }
167    if let Some(asyncness) = func.sig.asyncness {
168        return Err(syn::Error::new_spanned(
169            asyncness,
170            "#[ruststream::app] requires a synchronous builder returning `RustStream`",
171        ));
172    }
173    if !func.sig.inputs.is_empty() {
174        return Err(syn::Error::new_spanned(
175            &func.sig.inputs,
176            "#[ruststream::app] builder must take no arguments",
177        ));
178    }
179    let name = &func.sig.ident;
180    Ok(quote! {
181        #func
182
183        fn main() -> ::std::process::ExitCode {
184            ::ruststream::runtime::cli::run_main(#name)
185        }
186    }
187    .into())
188}
189
190fn expand(args: &SubscriberArgs, func: &ItemFn) -> syn::Result<TokenStream> {
191    let vis = &func.vis;
192    let name = &func.sig.ident;
193    let block = &func.block;
194
195    let first = func.sig.inputs.first().ok_or_else(|| {
196        syn::Error::new_spanned(
197            &func.sig,
198            "a #[subscriber] handler must take exactly one message parameter",
199        )
200    })?;
201    let FnArg::Typed(PatType { pat, ty, .. }) = first else {
202        return Err(syn::Error::new_spanned(
203            first,
204            "a #[subscriber] handler cannot take `self`",
205        ));
206    };
207    let Type::Reference(reference) = &**ty else {
208        return Err(syn::Error::new_spanned(
209            ty,
210            "the message parameter must be a reference `&T`",
211        ));
212    };
213    let input_ty = &reference.elem;
214    let description = doc_description(&func.attrs);
215    let (source_ty, source_expr) = source_tokens(&args.source)?;
216
217    // Captures the input type's JSON Schema for AsyncAPI when it implements `JsonSchema` (and the
218    // `asyncapi` feature is on), via the autoref-specialization probe; `None` otherwise. The
219    // concrete input type makes the trait selection resolve at the call site.
220    let input_schema = quote! {
221        fn input_schema(&self) -> ::core::option::Option<::std::string::String> {
222            #[allow(unused_imports)]
223            use ::ruststream::__private::NoSchemaProbe as _;
224            ::ruststream::__private::Probe::<#input_ty>::new().schema_json()
225        }
226    };
227
228    // Optional second handler parameter: the per-delivery `&mut Context`. If the user declares it,
229    // bind it to their name; otherwise generate an ignored binding.
230    let ctx_param = if let Some(FnArg::Typed(PatType { pat, .. })) = func.sig.inputs.get(1) {
231        quote!(#pat)
232    } else {
233        quote!(_ctx)
234    };
235
236    let body = if let Some(reply_topic) = &args.publish {
237        let reply_ty = match &func.sig.output {
238            ReturnType::Type(_, ty) => &**ty,
239            ReturnType::Default => {
240                return Err(syn::Error::new_spanned(
241                    &func.sig,
242                    "a publishing handler must return the reply value",
243                ));
244            }
245        };
246        quote! {
247            #[allow(non_camel_case_types)]
248            #vis struct #name;
249
250            impl ::ruststream::runtime::PublishingDef for #name {
251                type Input = #input_ty;
252                type Reply = #reply_ty;
253                type Source = #source_ty;
254
255                fn source(&self) -> Self::Source { #source_expr }
256                fn reply_name(&self) -> &str { #reply_topic }
257
258                fn description(&self) -> ::core::option::Option<&str> {
259                    #description
260                }
261
262                #input_schema
263
264                fn call(
265                    &self,
266                    #pat: &#input_ty,
267                ) -> impl ::core::future::Future<Output = #reply_ty> + ::core::marker::Send {
268                    async move #block
269                }
270            }
271        }
272    } else {
273        quote! {
274            #[derive(Clone, Copy)]
275            #[allow(non_camel_case_types)]
276            #vis struct #name;
277
278            impl ::ruststream::runtime::Handler<#input_ty> for #name {
279                async fn handle(
280                    &self,
281                    #pat: &#input_ty,
282                    #ctx_param: &mut ::ruststream::runtime::Context<'_>,
283                ) -> ::ruststream::runtime::HandlerResult {
284                    ::ruststream::runtime::IntoHandlerResult::into_handler_result(
285                        (async move #block).await,
286                    )
287                }
288            }
289
290            impl ::ruststream::runtime::SubscriberDef for #name {
291                type Input = #input_ty;
292                type Handler = Self;
293                type Source = #source_ty;
294
295                fn source(&self) -> Self::Source { #source_expr }
296
297                fn description(&self) -> ::core::option::Option<&str> {
298                    #description
299                }
300
301                #input_schema
302
303                fn into_handler(self) -> Self { self }
304            }
305        }
306    };
307
308    Ok(body.into())
309}
310
311/// Derives [`Message`](../ruststream/trait.Message.html) metadata: the type name and its doc
312/// comment.
313///
314/// ```ignore
315/// /// An order placed by a customer.
316/// #[derive(Message)]
317/// struct Order { id: u32 }
318/// // Order::NAME == "Order", Order::DESCRIPTION == Some("An order placed by a customer.")
319/// ```
320#[proc_macro_derive(Message)]
321pub fn derive_message(item: TokenStream) -> TokenStream {
322    let input = parse_macro_input!(item as DeriveInput);
323    let name = &input.ident;
324    let name_str = name.to_string();
325    let description = doc_description(&input.attrs);
326    let (impl_generics, ty_generics, where_clause) = input.generics.split_for_impl();
327
328    quote! {
329        impl #impl_generics ::ruststream::Message for #name #ty_generics #where_clause {
330            const NAME: &'static str = #name_str;
331            const DESCRIPTION: ::core::option::Option<&'static str> = #description;
332        }
333    }
334    .into()
335}
336
337/// Collects doc-comment lines from `attrs` into a single description literal, or `None`.
338fn doc_description(attrs: &[Attribute]) -> TokenStream2 {
339    let lines: Vec<String> = attrs
340        .iter()
341        .filter(|attr| attr.path().is_ident("doc"))
342        .filter_map(|attr| match &attr.meta {
343            Meta::NameValue(nv) => match &nv.value {
344                Expr::Lit(ExprLit {
345                    lit: Lit::Str(text),
346                    ..
347                }) => Some(text.value().trim().to_owned()),
348                _ => None,
349            },
350            _ => None,
351        })
352        .collect();
353
354    if lines.is_empty() {
355        quote!(::core::option::Option::None)
356    } else {
357        let joined = lines.join("\n");
358        quote!(::core::option::Option::Some(#joined))
359    }
360}