Skip to main content

ruststream_macros/
lib.rs

1//! Procedural macros for [RustStream](https://github.com/powersemmi/ruststream).
2//!
3//! Re-exported from the `ruststream` crate under the `macros` feature; depend on that rather than
4//! on this crate directly.
5
6use proc_macro::TokenStream;
7use proc_macro2::TokenStream as TokenStream2;
8use quote::quote;
9use syn::parse::{Parse, ParseStream};
10use syn::{
11    Attribute, DeriveInput, Expr, ExprCall, ExprLit, ExprMethodCall, ExprPath, ExprStruct, FnArg,
12    Ident, ItemFn, Lit, LitStr, Meta, PatType, Path, ReturnType, Token, Type, TypePath,
13    parenthesized, parse_macro_input,
14};
15
16/// Arguments to `#[subscriber(..)]`: the subscription source (a string literal name, or a
17/// descriptor constructor `Type::new(..)` / `Type { .. }`) and an optional `publish("topic")`
18/// clause naming the reply destination.
19struct SubscriberArgs {
20    source: Expr,
21    publish: Option<LitStr>,
22}
23
24impl Parse for SubscriberArgs {
25    fn parse(input: ParseStream) -> syn::Result<Self> {
26        let source: Expr = input.parse()?;
27        let mut publish = None;
28        if input.peek(Token![,]) {
29            input.parse::<Token![,]>()?;
30            let keyword: Ident = input.parse()?;
31            if keyword != "publish" {
32                return Err(syn::Error::new(
33                    keyword.span(),
34                    "expected `publish(\"reply-topic\")`",
35                ));
36            }
37            let content;
38            parenthesized!(content in input);
39            publish = Some(content.parse()?);
40        }
41        Ok(Self { source, publish })
42    }
43}
44
45/// Derives the subscription `Source` type and a constructor expression from the macro argument.
46///
47/// A string literal `"orders"` becomes `(Name, Name::new("orders"))`; a constructor expression
48/// `RedisStream::new(..)` or `RedisStream { .. }` becomes `(RedisStream, <the expr verbatim>)` by
49/// pulling the type out of the call/struct path. A builder chain
50/// `SubscribeOptions::new(..).jetstream(..)` is followed down its receivers to that base
51/// constructor, so fluent options that return `Self` can be written inline. Free functions
52/// (`redis::stream(..)`) are still rejected - their result type is not visible in the tokens.
53fn source_tokens(expr: &Expr) -> syn::Result<(TokenStream2, TokenStream2)> {
54    if let Expr::Lit(ExprLit {
55        lit: Lit::Str(name),
56        ..
57    }) = expr
58    {
59        return Ok((
60            quote!(::ruststream::Name),
61            quote!(::ruststream::Name::new(#name)),
62        ));
63    }
64
65    let ty = source_type(expr)?;
66    Ok((quote!(#ty), quote!(#expr)))
67}
68
69/// Recovers the source type from a constructor expression, following a builder chain's receivers
70/// down to the base `Type::new(..)` / `Type { .. }`. Methods in the chain are assumed to return
71/// `Self`; a builder that returns a different type produces a type-mismatch the user can see and
72/// fix. Free functions and other shapes are rejected (their type is not visible in the tokens).
73fn source_type(expr: &Expr) -> syn::Result<Type> {
74    match expr {
75        Expr::Call(ExprCall { func, .. }) => match &**func {
76            Expr::Path(ExprPath {
77                path, qself: None, ..
78            }) => type_from_constructor_path(path),
79            _ => Err(unsupported_source(expr)),
80        },
81        Expr::Struct(ExprStruct { path, .. }) => Ok(Type::Path(TypePath {
82            qself: None,
83            path: path.clone(),
84        })),
85        Expr::MethodCall(ExprMethodCall { receiver, .. }) => source_type(receiver),
86        _ => Err(unsupported_source(expr)),
87    }
88}
89
90/// Builds the type from a constructor path by dropping the final segment (`Type::new` -> `Type`).
91fn type_from_constructor_path(path: &Path) -> syn::Result<Type> {
92    let n = path.segments.len();
93    if n < 2 {
94        return Err(syn::Error::new_spanned(
95            path,
96            "expected `Type::new(..)`: the path must name a type and an associated constructor",
97        ));
98    }
99    let segments = path.segments.iter().take(n - 1).cloned().collect();
100    Ok(Type::Path(TypePath {
101        qself: None,
102        path: Path {
103            leading_colon: path.leading_colon,
104            segments,
105        },
106    }))
107}
108
109/// If `ty` is syntactically `Result<Reply, HandlerResult>` (under any path prefix, e.g.
110/// `std::result::Result` / `ruststream::runtime::HandlerResult`), returns the reply type.
111///
112/// The check is token-based: a type alias hiding the `Result` is not recognized and is treated as
113/// a plain reply type, which then fails to compile with a `Serialize` error the user can act on.
114fn publish_result_reply(ty: &Type) -> Option<&Type> {
115    let Type::Path(TypePath { qself: None, path }) = ty else {
116        return None;
117    };
118    let last = path.segments.last()?;
119    if last.ident != "Result" {
120        return None;
121    }
122    let syn::PathArguments::AngleBracketed(args) = &last.arguments else {
123        return None;
124    };
125    let mut args = args.args.iter();
126    let (Some(syn::GenericArgument::Type(ok)), Some(syn::GenericArgument::Type(err)), None) =
127        (args.next(), args.next(), args.next())
128    else {
129        return None;
130    };
131    let Type::Path(TypePath {
132        qself: None,
133        path: err_path,
134    }) = err
135    else {
136        return None;
137    };
138    (err_path.segments.last()?.ident == "HandlerResult").then_some(ok)
139}
140
141fn unsupported_source(expr: &Expr) -> syn::Error {
142    syn::Error::new_spanned(
143        expr,
144        "expected a string literal name, `Type::new(..)`, `Type { .. }`, or a builder chain on \
145         one of those - a free function does not expose its type to the macro",
146    )
147}
148
149/// Turns an `async fn` handler into a mountable subscriber definition.
150///
151/// ```ignore
152/// /// Processes incoming orders.
153/// #[subscriber("orders")]
154/// async fn handle(order: &Order) -> HandlerResult { HandlerResult::Ack }
155/// // later: broker_scope.include(handle);
156///
157/// // reply form: the return value is encoded and published to "responses" through the
158/// // TypedPublisher (broker + reply codec) passed at wiring time.
159/// #[subscriber("requests", publish("responses"))]
160/// async fn reply(req: &Request) -> Response { /* ... */ }
161/// // later: broker_scope.include_publishing(reply, typed_publisher);
162///
163/// // reply form with explicit ack control: `Ok` publishes the reply, `Err` skips it and the
164/// // dispatcher acts on the returned HandlerResult.
165/// #[subscriber("requests", publish("responses"))]
166/// async fn confirm(req: &Request) -> Result<Response, HandlerResult> { /* ... */ }
167/// ```
168///
169/// Without `publish(..)` the handler returns any `IntoHandlerResult` (a `HandlerResult`, `()`, or
170/// `Result<_, E>`). With `publish(..)` it returns the reply value to publish, or
171/// `Result<Reply, HandlerResult>` to control acknowledgement: `Err(result)` publishes nothing and
172/// returns `result` to the dispatcher. The `Result` form is detected syntactically, so spell it
173/// out in the signature (a type alias is treated as a plain reply type).
174///
175/// In both forms the handler may declare an optional second parameter, the per-delivery
176/// `&mut Context`, to read app state or publish manually.
177#[proc_macro_attribute]
178pub fn subscriber(attr: TokenStream, item: TokenStream) -> TokenStream {
179    let args = parse_macro_input!(attr as SubscriberArgs);
180    let func = parse_macro_input!(item as ItemFn);
181    expand(&args, &func).unwrap_or_else(|err| err.to_compile_error().into())
182}
183
184/// Generates a `main` entry point for a `RustStream` service.
185///
186/// Place it on a synchronous, argument-free function that builds and returns a `RustStream`
187/// application. The expansion keeps the function and adds a `main` that hands it to
188/// `ruststream::runtime::cli::run_main`, producing a binary that understands the `run` and
189/// `asyncapi gen` commands with no hand-written runtime boilerplate.
190///
191/// ```ignore
192/// #[ruststream::app]
193/// fn app() -> RustStream {
194///     RustStream::new(AppInfo::new("svc", "0.1.0")).register_broker(MemoryBroker::new())
195/// }
196/// ```
197#[proc_macro_attribute]
198pub fn app(attr: TokenStream, item: TokenStream) -> TokenStream {
199    let func = parse_macro_input!(item as ItemFn);
200    expand_app(&attr.into(), &func).unwrap_or_else(|err| err.to_compile_error().into())
201}
202
203fn expand_app(attr: &TokenStream2, func: &ItemFn) -> syn::Result<TokenStream> {
204    if !attr.is_empty() {
205        return Err(syn::Error::new_spanned(
206            attr,
207            "#[ruststream::app] takes no arguments",
208        ));
209    }
210    if let Some(asyncness) = func.sig.asyncness {
211        return Err(syn::Error::new_spanned(
212            asyncness,
213            "#[ruststream::app] requires a synchronous builder returning `RustStream`",
214        ));
215    }
216    if !func.sig.inputs.is_empty() {
217        return Err(syn::Error::new_spanned(
218            &func.sig.inputs,
219            "#[ruststream::app] builder must take no arguments",
220        ));
221    }
222    let name = &func.sig.ident;
223    Ok(quote! {
224        #func
225
226        fn main() -> ::std::process::ExitCode {
227            ::ruststream::runtime::cli::run_main(#name)
228        }
229    }
230    .into())
231}
232
233/// The pieces of the handler shared by both expansion forms, extracted from the signature.
234struct HandlerParts<'a> {
235    vis: &'a syn::Visibility,
236    name: &'a Ident,
237    block: &'a syn::Block,
238    pat: &'a syn::Pat,
239    input_ty: &'a Type,
240    description: TokenStream2,
241    source_ty: TokenStream2,
242    source_expr: TokenStream2,
243    input_schema: TokenStream2,
244    message_meta: TokenStream2,
245    ctx_param: TokenStream2,
246}
247
248fn handler_parts<'a>(args: &SubscriberArgs, func: &'a ItemFn) -> syn::Result<HandlerParts<'a>> {
249    let first = func.sig.inputs.first().ok_or_else(|| {
250        syn::Error::new_spanned(
251            &func.sig,
252            "a #[subscriber] handler must take exactly one message parameter",
253        )
254    })?;
255    let FnArg::Typed(PatType { pat, ty, .. }) = first else {
256        return Err(syn::Error::new_spanned(
257            first,
258            "a #[subscriber] handler cannot take `self`",
259        ));
260    };
261    let Type::Reference(reference) = &**ty else {
262        return Err(syn::Error::new_spanned(
263            ty,
264            "the message parameter must be a reference `&T`",
265        ));
266    };
267    let input_ty = &*reference.elem;
268    let description = doc_description(&func.attrs);
269    let (source_ty, source_expr) = source_tokens(&args.source)?;
270
271    // Captures the input type's JSON Schema for AsyncAPI when it implements `JsonSchema` (and the
272    // `asyncapi` feature is on), via the autoref-specialization probe; `None` otherwise. The
273    // concrete input type makes the trait selection resolve at the call site.
274    let input_schema = quote! {
275        fn input_schema(&self) -> ::core::option::Option<::std::string::String> {
276            #[allow(unused_imports)]
277            use ::ruststream::__private::NoSchemaProbe as _;
278            ::ruststream::__private::Probe::<#input_ty>::new().schema_json()
279        }
280    };
281
282    // Captures the input type's `Message` name / description when it implements that trait, via
283    // the same autoref-specialization probe; `None` otherwise.
284    let message_meta = quote! {
285        fn message_name(&self) -> ::core::option::Option<&'static str> {
286            #[allow(unused_imports)]
287            use ::ruststream::__private::NoMessageProbe as _;
288            ::ruststream::__private::Probe::<#input_ty>::new().message_name()
289        }
290
291        fn message_description(&self) -> ::core::option::Option<&'static str> {
292            #[allow(unused_imports)]
293            use ::ruststream::__private::NoMessageProbe as _;
294            ::ruststream::__private::Probe::<#input_ty>::new().message_description()
295        }
296    };
297
298    // Optional second handler parameter: the per-delivery `&mut Context`. If the user declares it,
299    // bind it to their name; otherwise generate an ignored binding.
300    let ctx_param = if let Some(FnArg::Typed(PatType { pat, .. })) = func.sig.inputs.get(1) {
301        quote!(#pat)
302    } else {
303        quote!(_ctx)
304    };
305
306    Ok(HandlerParts {
307        vis: &func.vis,
308        name: &func.sig.ident,
309        block: &func.block,
310        pat,
311        input_ty,
312        description,
313        source_ty,
314        source_expr,
315        input_schema,
316        message_meta,
317        ctx_param,
318    })
319}
320
321fn expand(args: &SubscriberArgs, func: &ItemFn) -> syn::Result<TokenStream> {
322    let parts = handler_parts(args, func)?;
323    let body = if let Some(reply_topic) = &args.publish {
324        expand_publishing(&parts, func, reply_topic)?
325    } else {
326        expand_subscribing(&parts)
327    };
328    Ok(body.into())
329}
330
331fn expand_publishing(
332    parts: &HandlerParts<'_>,
333    func: &ItemFn,
334    reply_topic: &LitStr,
335) -> syn::Result<TokenStream2> {
336    let HandlerParts {
337        vis,
338        name,
339        block,
340        pat,
341        input_ty,
342        description,
343        source_ty,
344        source_expr,
345        input_schema,
346        message_meta,
347        ctx_param,
348    } = parts;
349
350    let declared_ty = match &func.sig.output {
351        ReturnType::Type(_, ty) => &**ty,
352        ReturnType::Default => {
353            return Err(syn::Error::new_spanned(
354                &func.sig,
355                "a publishing handler must return the reply value",
356            ));
357        }
358    };
359    // `-> Result<Reply, HandlerResult>` lets the handler skip the publish: `Err(result)` is
360    // returned to the dispatcher as-is. A plain `-> Reply` is wrapped in `Ok` here. The check
361    // is syntactic, so a type alias hiding the `Result` is treated as a plain reply type.
362    let (reply_ty, call_body) = match publish_result_reply(declared_ty) {
363        Some(reply_ty) => (reply_ty, quote!((async move #block).await)),
364        None => (
365            declared_ty,
366            quote!(::core::result::Result::Ok((async move #block).await)),
367        ),
368    };
369    Ok(quote! {
370        #[allow(non_camel_case_types)]
371        #vis struct #name;
372
373        impl ::ruststream::runtime::PublishingDef for #name {
374            type Input = #input_ty;
375            type Reply = #reply_ty;
376            type Source = #source_ty;
377
378            fn source(&self) -> Self::Source { #source_expr }
379            fn reply_name(&self) -> &str { #reply_topic }
380
381            fn description(&self) -> ::core::option::Option<&str> {
382                #description
383            }
384
385            #input_schema
386
387            #message_meta
388
389            async fn call(
390                &self,
391                #pat: &#input_ty,
392                #ctx_param: &mut ::ruststream::runtime::Context<'_>,
393            ) -> ::core::result::Result<#reply_ty, ::ruststream::runtime::HandlerResult> {
394                #call_body
395            }
396        }
397    })
398}
399
400fn expand_subscribing(parts: &HandlerParts<'_>) -> TokenStream2 {
401    let HandlerParts {
402        vis,
403        name,
404        block,
405        pat,
406        input_ty,
407        description,
408        source_ty,
409        source_expr,
410        input_schema,
411        message_meta,
412        ctx_param,
413    } = parts;
414
415    quote! {
416            #[derive(Clone, Copy)]
417            #[allow(non_camel_case_types)]
418            #vis struct #name;
419
420            impl ::ruststream::runtime::Handler<#input_ty> for #name {
421                async fn handle(
422                    &self,
423                    #pat: &#input_ty,
424                    #ctx_param: &mut ::ruststream::runtime::Context<'_>,
425                ) -> ::ruststream::runtime::HandlerResult {
426                    ::ruststream::runtime::IntoHandlerResult::into_handler_result(
427                        (async move #block).await,
428                    )
429                }
430            }
431
432            impl ::ruststream::runtime::SubscriberDef for #name {
433                type Input = #input_ty;
434                type Handler = Self;
435                type Source = #source_ty;
436
437                fn source(&self) -> Self::Source { #source_expr }
438
439                fn description(&self) -> ::core::option::Option<&str> {
440                    #description
441                }
442
443                #input_schema
444
445                #message_meta
446
447                fn into_handler(self) -> Self { self }
448            }
449    }
450}
451
452/// Derives [`Message`](../ruststream/trait.Message.html) metadata: the type name and its doc
453/// comment.
454///
455/// ```ignore
456/// /// An order placed by a customer.
457/// #[derive(Message)]
458/// struct Order { id: u32 }
459/// // Order::NAME == "Order", Order::DESCRIPTION == Some("An order placed by a customer.")
460/// ```
461#[proc_macro_derive(Message)]
462pub fn derive_message(item: TokenStream) -> TokenStream {
463    let input = parse_macro_input!(item as DeriveInput);
464    let name = &input.ident;
465    let name_str = name.to_string();
466    let description = doc_description(&input.attrs);
467    let (impl_generics, ty_generics, where_clause) = input.generics.split_for_impl();
468
469    quote! {
470        impl #impl_generics ::ruststream::Message for #name #ty_generics #where_clause {
471            const NAME: &'static str = #name_str;
472            const DESCRIPTION: ::core::option::Option<&'static str> = #description;
473        }
474    }
475    .into()
476}
477
478/// Collects doc-comment lines from `attrs` into a single description literal, or `None`.
479fn doc_description(attrs: &[Attribute]) -> TokenStream2 {
480    let lines: Vec<String> = attrs
481        .iter()
482        .filter(|attr| attr.path().is_ident("doc"))
483        .filter_map(|attr| match &attr.meta {
484            Meta::NameValue(nv) => match &nv.value {
485                Expr::Lit(ExprLit {
486                    lit: Lit::Str(text),
487                    ..
488                }) => Some(text.value().trim().to_owned()),
489                _ => None,
490            },
491            _ => None,
492        })
493        .collect();
494
495    if lines.is_empty() {
496        quote!(::core::option::Option::None)
497    } else {
498        let joined = lines.join("\n");
499        quote!(::core::option::Option::Some(#joined))
500    }
501}