// ruststream-macros 0.3.1
//
// Procedural macros for the RustStream messaging framework.
// Documentation
//! Procedural macros for [RustStream](https://github.com/powersemmi/ruststream).
//!
//! Re-exported from the `ruststream` crate under the `macros` feature; depend on that rather than
//! on this crate directly.

mod expand;
mod parse;

use proc_macro::TokenStream;
use proc_macro2::TokenStream as TokenStream2;
use quote::quote;
use syn::{DeriveInput, ItemFn, parse_macro_input};

use parse::{SubscriberArgs, doc_description};

/// Turns an `async fn` handler into a mountable subscriber definition.
///
/// ```ignore
/// /// Processes incoming orders.
/// #[subscriber("orders")]
/// async fn handle(order: &Order) -> HandlerResult { HandlerResult::Ack }
/// // later: broker_scope.include(handle);
///
/// // reply form: the return value is encoded and published to "responses" through the
/// // TypedPublisher (broker + reply codec) passed at wiring time.
/// #[subscriber("requests", publish("responses"))]
/// async fn reply(req: &Request) -> Response { /* ... */ }
/// // later: broker_scope.include_publishing(reply, typed_publisher);
///
/// // reply form with explicit ack control: `Ok` publishes the reply, `Err` skips it and the
/// // dispatcher acts on the returned HandlerResult.
/// #[subscriber("requests", publish("responses"))]
/// async fn confirm(req: &Request) -> Result<Response, HandlerResult> { /* ... */ }
///
/// // batch form: the handler takes the whole decoded batch as a slice; the source's
/// // subscriber must implement BatchSubscriber. Mounted with include_batch.
/// #[subscriber(batch("orders"))]
/// async fn bill(orders: &[Order]) -> HandlerResult { /* settles the whole batch */ }
/// ```
///
/// Without `publish(..)` the handler returns any `IntoHandlerResult` (a `HandlerResult`, `()`, or
/// `Result<_, E>`). With `publish(..)` it returns the reply value to publish, or
/// `Result<Reply, HandlerResult>` to control acknowledgement: `Err(result)` publishes nothing and
/// returns `result` to the dispatcher. The `Result` form is detected syntactically, so spell it
/// out in the signature (a type alias is treated as a plain reply type).
///
/// Wrapping the source in `batch(..)` switches the definition to a `BatchDef`: the handler takes
/// `&[T]` and runs once per batch pulled from the broker's `BatchSubscriber` (use the `Buffered`
/// adapter for brokers without native batching). It returns any `IntoBatchResult` - one outcome
/// for the whole batch (`HandlerResult`, `()`, `Result<_, E>`), or `Vec<HandlerResult>` to settle
/// element `i` of the slice with outcome `i`. The source type is recovered from the constructor
/// path, so a generic source spells its parameters:
/// `batch(Buffered::<Name>::new(Name::new("orders")))`.
///
/// Combining `batch(..)` with `publish(..)` produces a `BatchPublishingDef` (mounted with
/// `include_batch_publishing`): the handler returns `Vec<Reply>` (or
/// `Result<Vec<Reply>, HandlerResult>` for explicit ack control, all-or-nothing - selective
/// outcomes do not compose with a transaction), every reply is published to the reply name, and
/// the whole batch is acked after. Hand the mount a `TypedPublisher` for independent reply
/// publishes, or `.transactional()` for one transaction per batch.
///
/// A `workers(n)` clause processes up to `n` deliveries (or batches) of this subscriber
/// concurrently, each in its own task; global processing order is lost by design, and
/// back-pressure holds at `n` in-flight deliveries. `workers(n, by_key)` switches to `n`
/// sequential lanes keyed by the message's partition key, preserving per-key ordering
/// (single-message forms only). The default is the sequential loop.
///
/// In both forms the handler may declare an optional second parameter, the per-delivery
/// `&mut Context`, to read app state or publish manually.
#[proc_macro_attribute]
pub fn subscriber(attr: TokenStream, item: TokenStream) -> TokenStream {
    // Either parse step bails out of the macro with the parser's own compile error.
    let subscriber_args = parse_macro_input!(attr as SubscriberArgs);
    let handler = parse_macro_input!(item as ItemFn);

    // An expansion failure is surfaced as a compile error instead of generated code.
    match expand::subscriber(&subscriber_args, &handler) {
        Ok(expanded) => expanded,
        Err(err) => err.to_compile_error().into(),
    }
}

/// Generates a `main` entry point for a `RustStream` service.
///
/// Place it on a synchronous, argument-free function that builds and returns a `RustStream`
/// application. The expansion keeps the function and adds a `main` that hands it to
/// `ruststream::runtime::cli::run_main`, producing a binary that understands the `run` and
/// `asyncapi gen` commands with no hand-written runtime boilerplate.
///
/// ```ignore
/// #[ruststream::app]
/// fn app() -> RustStream {
///     RustStream::new(AppInfo::new("svc", "0.1.0")).register_broker(MemoryBroker::new())
/// }
/// ```
#[proc_macro_attribute]
pub fn app(attr: TokenStream, item: TokenStream) -> TokenStream {
    let func = parse_macro_input!(item as ItemFn);
    expand_app(&attr.into(), &func).unwrap_or_else(|err| err.to_compile_error().into())
}

/// Validates the `#[ruststream::app]` target and emits it together with a generated `main`.
///
/// Returns a spanned error when:
/// - the attribute was given any arguments,
/// - the builder is declared `async`,
/// - the builder declares parameters,
/// - the builder is itself named `main` (the generated entry point would collide with it).
///
/// On success the original function is re-emitted unchanged, followed by
/// `fn main() -> ::std::process::ExitCode` that hands the builder to
/// `::ruststream::runtime::cli::run_main`.
fn expand_app(attr: &TokenStream2, func: &ItemFn) -> syn::Result<TokenStream> {
    if !attr.is_empty() {
        return Err(syn::Error::new_spanned(
            attr,
            "#[ruststream::app] takes no arguments",
        ));
    }
    if let Some(asyncness) = func.sig.asyncness {
        return Err(syn::Error::new_spanned(
            asyncness,
            "#[ruststream::app] requires a synchronous builder returning `RustStream`",
        ));
    }
    if !func.sig.inputs.is_empty() {
        return Err(syn::Error::new_spanned(
            &func.sig.inputs,
            "#[ruststream::app] builder must take no arguments",
        ));
    }
    // The expansion below defines `fn main` next to the builder; a builder that is already
    // called `main` would otherwise fail with a duplicate-definition error pointing at
    // generated code rather than at the user's function.
    if func.sig.ident == "main" {
        return Err(syn::Error::new_spanned(
            &func.sig.ident,
            "#[ruststream::app] generates `main`; give the builder function a different name",
        ));
    }
    let name = &func.sig.ident;
    Ok(quote! {
        #func

        fn main() -> ::std::process::ExitCode {
            ::ruststream::runtime::cli::run_main(#name)
        }
    }
    .into())
}

/// Derives [`Message`](../ruststream/trait.Message.html) metadata: the type name and its doc
/// comment.
///
/// ```ignore
/// /// An order placed by a customer.
/// #[derive(Message)]
/// struct Order { id: u32 }
/// // Order::NAME == "Order", Order::DESCRIPTION == Some("An order placed by a customer.")
/// ```
#[proc_macro_derive(Message)]
pub fn derive_message(item: TokenStream) -> TokenStream {
    let input = parse_macro_input!(item as DeriveInput);
    let name = &input.ident;
    let name_str = name.to_string();
    let description = doc_description(&input.attrs);
    let (impl_generics, ty_generics, where_clause) = input.generics.split_for_impl();

    quote! {
        impl #impl_generics ::ruststream::Message for #name #ty_generics #where_clause {
            const NAME: &'static str = #name_str;
            const DESCRIPTION: ::core::option::Option<&'static str> = #description;
        }
    }
    .into()
}