Skip to main content

ruststream_macros/
lib.rs

1//! Procedural macros for [RustStream](https://github.com/powersemmi/ruststream).
2//!
3//! Re-exported from the `ruststream` crate under the `macros` feature; depend on that rather than
4//! on this crate directly.
5
6mod expand;
7mod parse;
8
9use proc_macro::TokenStream;
10use proc_macro2::TokenStream as TokenStream2;
11use quote::quote;
12use syn::{DeriveInput, ItemFn, parse_macro_input};
13
14use parse::{SubscriberArgs, doc_description};
15
16/// Turns an `async fn` handler into a mountable subscriber definition.
17///
18/// ```ignore
19/// /// Processes incoming orders.
20/// #[subscriber("orders")]
21/// async fn handle(order: &Order) -> HandlerResult { HandlerResult::Ack }
22/// // later: broker_scope.include(handle);
23///
24/// // reply form: the return value is encoded and published to "responses" through the
25/// // TypedPublisher (broker + reply codec) passed at wiring time.
26/// #[subscriber("requests", publish("responses"))]
27/// async fn reply(req: &Request) -> Response { /* ... */ }
28/// // later: broker_scope.include_publishing(reply, typed_publisher);
29///
30/// // reply form with explicit ack control: `Ok` publishes the reply, `Err` skips it and the
31/// // dispatcher acts on the returned HandlerResult.
32/// #[subscriber("requests", publish("responses"))]
33/// async fn confirm(req: &Request) -> Result<Response, HandlerResult> { /* ... */ }
34///
35/// // batch form: the handler takes the whole decoded batch as a slice; the source's
36/// // subscriber must implement BatchSubscriber. Mounted with include_batch.
37/// #[subscriber(batch("orders"))]
38/// async fn bill(orders: &[Order]) -> HandlerResult { /* settles the whole batch */ }
39/// ```
40///
41/// Without `publish(..)` the handler returns any `IntoHandlerResult` (a `HandlerResult`, `()`, or
42/// `Result<_, E>`). With `publish(..)` it returns the reply value to publish, or
43/// `Result<Reply, HandlerResult>` to control acknowledgement: `Err(result)` publishes nothing and
44/// returns `result` to the dispatcher. The `Result` form is detected syntactically, so spell it
45/// out in the signature (a type alias is treated as a plain reply type).
46///
47/// Wrapping the source in `batch(..)` switches the definition to a `BatchDef`: the handler takes
48/// `&[T]` and runs once per batch pulled from the broker's `BatchSubscriber` (use the `Buffered`
49/// adapter for brokers without native batching). It returns any `IntoBatchResult` - one outcome
50/// for the whole batch (`HandlerResult`, `()`, `Result<_, E>`), or `Vec<HandlerResult>` to settle
51/// element `i` of the slice with outcome `i`. The source type is recovered from the constructor
52/// path, so a generic source spells its parameters:
53/// `batch(Buffered::<Name>::new(Name::new("orders")))`.
54///
55/// Combining `batch(..)` with `publish(..)` produces a `BatchPublishingDef` (mounted with
56/// `include_batch_publishing`): the handler returns `Vec<Reply>` (or
57/// `Result<Vec<Reply>, HandlerResult>` for explicit ack control, all-or-nothing - selective
58/// outcomes do not compose with a transaction), every reply is published to the reply name, and
59/// the whole batch is acked after. Hand the mount a `TypedPublisher` for independent reply
60/// publishes, or `.transactional()` for one transaction per batch.
61///
62/// A `workers(n)` clause processes up to `n` deliveries (or batches) of this subscriber
63/// concurrently, each in its own task; global processing order is lost by design, and
64/// back-pressure holds at `n` in-flight deliveries. `workers(n, by_key)` switches to `n`
65/// sequential lanes keyed by the message's partition key, preserving per-key ordering
66/// (single-message forms only). The default is the sequential loop.
67///
68/// In both forms the handler may declare an optional second parameter, the per-delivery
69/// `&mut Context`, to read app state or publish manually.
70#[proc_macro_attribute]
71pub fn subscriber(attr: TokenStream, item: TokenStream) -> TokenStream {
72    let args = parse_macro_input!(attr as SubscriberArgs);
73    let func = parse_macro_input!(item as ItemFn);
74    expand::subscriber(&args, &func).unwrap_or_else(|err| err.to_compile_error().into())
75}
76
77/// Generates a `main` entry point for a `RustStream` service.
78///
79/// Place it on a synchronous, argument-free function that builds and returns a `RustStream`
80/// application. The expansion keeps the function and adds a `main` that hands it to
81/// `ruststream::runtime::cli::run_main`, producing a binary that understands the `run` and
82/// `asyncapi gen` commands with no hand-written runtime boilerplate.
83///
84/// ```ignore
85/// #[ruststream::app]
86/// fn app() -> RustStream {
87///     RustStream::new(AppInfo::new("svc", "0.1.0")).register_broker(MemoryBroker::new())
88/// }
89/// ```
90#[proc_macro_attribute]
91pub fn app(attr: TokenStream, item: TokenStream) -> TokenStream {
92    let func = parse_macro_input!(item as ItemFn);
93    expand_app(&attr.into(), &func).unwrap_or_else(|err| err.to_compile_error().into())
94}
95
96fn expand_app(attr: &TokenStream2, func: &ItemFn) -> syn::Result<TokenStream> {
97    if !attr.is_empty() {
98        return Err(syn::Error::new_spanned(
99            attr,
100            "#[ruststream::app] takes no arguments",
101        ));
102    }
103    if let Some(asyncness) = func.sig.asyncness {
104        return Err(syn::Error::new_spanned(
105            asyncness,
106            "#[ruststream::app] requires a synchronous builder returning `RustStream`",
107        ));
108    }
109    if !func.sig.inputs.is_empty() {
110        return Err(syn::Error::new_spanned(
111            &func.sig.inputs,
112            "#[ruststream::app] builder must take no arguments",
113        ));
114    }
115    let name = &func.sig.ident;
116    Ok(quote! {
117        #func
118
119        fn main() -> ::std::process::ExitCode {
120            ::ruststream::runtime::cli::run_main(#name)
121        }
122    }
123    .into())
124}
125
126/// Derives [`Message`](../ruststream/trait.Message.html) metadata: the type name and its doc
127/// comment.
128///
129/// ```ignore
130/// /// An order placed by a customer.
131/// #[derive(Message)]
132/// struct Order { id: u32 }
133/// // Order::NAME == "Order", Order::DESCRIPTION == Some("An order placed by a customer.")
134/// ```
135#[proc_macro_derive(Message)]
136pub fn derive_message(item: TokenStream) -> TokenStream {
137    let input = parse_macro_input!(item as DeriveInput);
138    let name = &input.ident;
139    let name_str = name.to_string();
140    let description = doc_description(&input.attrs);
141    let (impl_generics, ty_generics, where_clause) = input.generics.split_for_impl();
142
143    quote! {
144        impl #impl_generics ::ruststream::Message for #name #ty_generics #where_clause {
145            const NAME: &'static str = #name_str;
146            const DESCRIPTION: ::core::option::Option<&'static str> = #description;
147        }
148    }
149    .into()
150}