ruststream_macros/lib.rs
1//! Procedural macros for [RustStream](https://github.com/powersemmi/ruststream).
2//!
3//! Re-exported from the `ruststream` crate under the `macros` feature; depend on that rather than
4//! on this crate directly.
5
6mod expand;
7mod parse;
8
9use proc_macro::TokenStream;
10use proc_macro2::TokenStream as TokenStream2;
11use quote::quote;
12use syn::{DeriveInput, ItemFn, parse_macro_input};
13
14use parse::{SubscriberArgs, doc_description};
15
16/// Turns an `async fn` handler into a mountable subscriber definition.
17///
18/// ```ignore
19/// /// Processes incoming orders.
20/// #[subscriber("orders")]
21/// async fn handle(order: &Order) -> HandlerResult { HandlerResult::Ack }
22/// // later: broker_scope.include(handle);
23///
24/// // reply form: the return value is encoded and published to "responses" through the
25/// // TypedPublisher (broker + reply codec) passed at wiring time.
26/// #[subscriber("requests", publish("responses"))]
27/// async fn reply(req: &Request) -> Response { /* ... */ }
28/// // later: broker_scope.include_publishing(reply, typed_publisher);
29///
30/// // reply form with explicit ack control: `Ok` publishes the reply, `Err` skips it and the
31/// // dispatcher acts on the returned HandlerResult.
32/// #[subscriber("requests", publish("responses"))]
33/// async fn confirm(req: &Request) -> Result<Response, HandlerResult> { /* ... */ }
34///
35/// // batch form: the handler takes the whole decoded batch as a slice; the source's
36/// // subscriber must implement BatchSubscriber. Mounted with include_batch.
37/// #[subscriber(batch("orders"))]
38/// async fn bill(orders: &[Order]) -> HandlerResult { /* settles the whole batch */ }
39/// ```
40///
41/// Without `publish(..)` the handler returns any `Into<Settle>` (a `Settle`, a `HandlerResult`,
42/// `()`, or `Result<_, E>`). Attach a post-settle continuation with `HandlerResult::ack().and_after`
43/// (any outcome works), which runs after the message is settled. With `publish(..)` it returns the
44/// reply value to publish, or `Result<Reply, HandlerResult>` to control acknowledgement:
45/// `Err(result)` publishes nothing and returns `result` to the dispatcher. The `Result` form is
46/// detected syntactically, so spell it out in the signature (a type alias is treated as a plain
47/// reply type).
48///
49/// Wrapping the source in `batch(..)` switches the definition to a `BatchDef`: the handler takes
50/// `&[T]` and runs once per batch pulled from the broker's `BatchSubscriber` (use the `Buffered`
51/// adapter for brokers without native batching). It returns any `IntoBatchResult` - one outcome
52/// for the whole batch (`HandlerResult`, `()`, `Result<_, E>`), or a per-element vector
/// (`Vec<Settle>` or `Vec<HandlerResult>`) to settle element `i` of the slice with outcome `i`,
54/// each element carrying its own optional `and_after` continuation. The source type is recovered
55/// from the constructor path, so a generic source spells its parameters:
56/// `batch(Buffered::<Name>::new(Name::new("orders")))`.
57///
58/// Combining `batch(..)` with `publish(..)` produces a `BatchPublishingDef` (mounted with
59/// `include_batch_publishing`): the handler returns `Vec<Reply>` (or
60/// `Result<Vec<Reply>, HandlerResult>` for explicit ack control, all-or-nothing - selective
61/// outcomes do not compose with a transaction), every reply is published to the reply name, and
62/// the whole batch is acked after. Hand the mount a `TypedPublisher` for independent reply
63/// publishes, or `.transactional()` for one transaction per batch.
64///
65/// A `workers(n)` clause processes up to `n` deliveries (or batches) of this subscriber
66/// concurrently, each in its own task; global processing order is lost by design, and
67/// back-pressure holds at `n` in-flight deliveries. `workers(n, by_key)` switches to `n`
68/// sequential lanes keyed by the message's partition key, preserving per-key ordering
69/// (single-message forms only). The default is the sequential loop.
70///
71/// In both forms the handler may declare an optional second parameter, the per-delivery
72/// `&mut Context`, to read app state or publish manually.
73#[proc_macro_attribute]
74pub fn subscriber(attr: TokenStream, item: TokenStream) -> TokenStream {
75 let args = parse_macro_input!(attr as SubscriberArgs);
76 let func = parse_macro_input!(item as ItemFn);
77 expand::subscriber(&args, &func).unwrap_or_else(|err| err.to_compile_error().into())
78}
79
80/// Generates a `main` entry point for a `RustStream` service.
81///
82/// Place it on a synchronous, argument-free function that builds and returns a `RustStream`
83/// application. The expansion keeps the function and adds a `main` that hands it to
84/// `ruststream::runtime::cli::run_main`, producing a binary that understands the `run` and
85/// `asyncapi gen` commands with no hand-written runtime boilerplate.
86///
87/// ```ignore
88/// #[ruststream::app]
89/// fn app() -> RustStream {
90/// RustStream::new(AppInfo::new("svc", "0.1.0")).register_broker(MemoryBroker::new())
91/// }
92/// ```
93#[proc_macro_attribute]
94pub fn app(attr: TokenStream, item: TokenStream) -> TokenStream {
95 let func = parse_macro_input!(item as ItemFn);
96 expand_app(&attr.into(), &func).unwrap_or_else(|err| err.to_compile_error().into())
97}
98
99fn expand_app(attr: &TokenStream2, func: &ItemFn) -> syn::Result<TokenStream> {
100 if !attr.is_empty() {
101 return Err(syn::Error::new_spanned(
102 attr,
103 "#[ruststream::app] takes no arguments",
104 ));
105 }
106 if let Some(asyncness) = func.sig.asyncness {
107 return Err(syn::Error::new_spanned(
108 asyncness,
109 "#[ruststream::app] requires a synchronous builder returning `RustStream`",
110 ));
111 }
112 if !func.sig.inputs.is_empty() {
113 return Err(syn::Error::new_spanned(
114 &func.sig.inputs,
115 "#[ruststream::app] builder must take no arguments",
116 ));
117 }
118 let name = &func.sig.ident;
119 Ok(quote! {
120 #func
121
122 fn main() -> ::std::process::ExitCode {
123 ::ruststream::runtime::cli::run_main(#name)
124 }
125 }
126 .into())
127}
128
129/// Derives [`Message`](../ruststream/trait.Message.html) metadata: the type name and its doc
130/// comment.
131///
132/// ```ignore
133/// /// An order placed by a customer.
134/// #[derive(Message)]
135/// struct Order { id: u32 }
136/// // Order::NAME == "Order", Order::DESCRIPTION == Some("An order placed by a customer.")
137/// ```
138#[proc_macro_derive(Message)]
139pub fn derive_message(item: TokenStream) -> TokenStream {
140 let input = parse_macro_input!(item as DeriveInput);
141 let name = &input.ident;
142 let name_str = name.to_string();
143 let description = doc_description(&input.attrs);
144 let (impl_generics, ty_generics, where_clause) = input.generics.split_for_impl();
145
146 quote! {
147 impl #impl_generics ::ruststream::Message for #name #ty_generics #where_clause {
148 const NAME: &'static str = #name_str;
149 const DESCRIPTION: ::core::option::Option<&'static str> = #description;
150 }
151 }
152 .into()
153}