1use proc_macro::TokenStream;
7use proc_macro2::TokenStream as TokenStream2;
8use quote::quote;
9use syn::parse::{Parse, ParseStream};
10use syn::{
11 Attribute, DeriveInput, Expr, ExprCall, ExprLit, ExprPath, ExprStruct, FnArg, Ident, ItemFn,
12 Lit, LitStr, Meta, PatType, Path, ReturnType, Token, Type, TypePath, parenthesized,
13 parse_macro_input,
14};
15
16struct SubscriberArgs {
20 source: Expr,
21 publish: Option<LitStr>,
22}
23
24impl Parse for SubscriberArgs {
25 fn parse(input: ParseStream) -> syn::Result<Self> {
26 let source: Expr = input.parse()?;
27 let mut publish = None;
28 if input.peek(Token![,]) {
29 input.parse::<Token![,]>()?;
30 let keyword: Ident = input.parse()?;
31 if keyword != "publish" {
32 return Err(syn::Error::new(
33 keyword.span(),
34 "expected `publish(\"reply-topic\")`",
35 ));
36 }
37 let content;
38 parenthesized!(content in input);
39 publish = Some(content.parse()?);
40 }
41 Ok(Self { source, publish })
42 }
43}
44
45fn source_tokens(expr: &Expr) -> syn::Result<(TokenStream2, TokenStream2)> {
52 if let Expr::Lit(ExprLit {
53 lit: Lit::Str(name),
54 ..
55 }) = expr
56 {
57 return Ok((
58 quote!(::ruststream::Name),
59 quote!(::ruststream::Name::new(#name)),
60 ));
61 }
62
63 let ty: Type = match expr {
64 Expr::Call(ExprCall { func, .. }) => match &**func {
65 Expr::Path(ExprPath {
66 path, qself: None, ..
67 }) => type_from_constructor_path(path)?,
68 _ => return Err(unsupported_source(expr)),
69 },
70 Expr::Struct(ExprStruct { path, .. }) => Type::Path(TypePath {
71 qself: None,
72 path: path.clone(),
73 }),
74 _ => return Err(unsupported_source(expr)),
75 };
76 Ok((quote!(#ty), quote!(#expr)))
77}
78
79fn type_from_constructor_path(path: &Path) -> syn::Result<Type> {
81 let n = path.segments.len();
82 if n < 2 {
83 return Err(syn::Error::new_spanned(
84 path,
85 "expected `Type::new(..)`: the path must name a type and an associated constructor",
86 ));
87 }
88 let segments = path.segments.iter().take(n - 1).cloned().collect();
89 Ok(Type::Path(TypePath {
90 qself: None,
91 path: Path {
92 leading_colon: path.leading_colon,
93 segments,
94 },
95 }))
96}
97
98fn unsupported_source(expr: &Expr) -> syn::Error {
99 syn::Error::new_spanned(
100 expr,
101 "expected a string literal name, `Type::new(..)`, or `Type { .. }` - \
102 free functions and builder chains do not expose their type to the macro",
103 )
104}
105
106#[proc_macro_attribute]
124pub fn subscriber(attr: TokenStream, item: TokenStream) -> TokenStream {
125 let args = parse_macro_input!(attr as SubscriberArgs);
126 let func = parse_macro_input!(item as ItemFn);
127 expand(&args, &func).unwrap_or_else(|err| err.to_compile_error().into())
128}
129
130#[proc_macro_attribute]
144pub fn app(attr: TokenStream, item: TokenStream) -> TokenStream {
145 let func = parse_macro_input!(item as ItemFn);
146 expand_app(&attr.into(), &func).unwrap_or_else(|err| err.to_compile_error().into())
147}
148
149fn expand_app(attr: &TokenStream2, func: &ItemFn) -> syn::Result<TokenStream> {
150 if !attr.is_empty() {
151 return Err(syn::Error::new_spanned(
152 attr,
153 "#[ruststream::app] takes no arguments",
154 ));
155 }
156 if let Some(asyncness) = func.sig.asyncness {
157 return Err(syn::Error::new_spanned(
158 asyncness,
159 "#[ruststream::app] requires a synchronous builder returning `RustStream`",
160 ));
161 }
162 if !func.sig.inputs.is_empty() {
163 return Err(syn::Error::new_spanned(
164 &func.sig.inputs,
165 "#[ruststream::app] builder must take no arguments",
166 ));
167 }
168 let name = &func.sig.ident;
169 Ok(quote! {
170 #func
171
172 fn main() -> ::std::process::ExitCode {
173 ::ruststream::runtime::cli::run_main(#name)
174 }
175 }
176 .into())
177}
178
179fn expand(args: &SubscriberArgs, func: &ItemFn) -> syn::Result<TokenStream> {
180 let vis = &func.vis;
181 let name = &func.sig.ident;
182 let block = &func.block;
183
184 let first = func.sig.inputs.first().ok_or_else(|| {
185 syn::Error::new_spanned(
186 &func.sig,
187 "a #[subscriber] handler must take exactly one message parameter",
188 )
189 })?;
190 let FnArg::Typed(PatType { pat, ty, .. }) = first else {
191 return Err(syn::Error::new_spanned(
192 first,
193 "a #[subscriber] handler cannot take `self`",
194 ));
195 };
196 let Type::Reference(reference) = &**ty else {
197 return Err(syn::Error::new_spanned(
198 ty,
199 "the message parameter must be a reference `&T`",
200 ));
201 };
202 let input_ty = &reference.elem;
203 let description = doc_description(&func.attrs);
204 let (source_ty, source_expr) = source_tokens(&args.source)?;
205
206 let input_schema = quote! {
210 fn input_schema(&self) -> ::core::option::Option<::std::string::String> {
211 #[allow(unused_imports)]
212 use ::ruststream::__private::NoSchemaProbe as _;
213 ::ruststream::__private::Probe::<#input_ty>::new().schema_json()
214 }
215 };
216
217 let ctx_param = if let Some(FnArg::Typed(PatType { pat, .. })) = func.sig.inputs.get(1) {
220 quote!(#pat)
221 } else {
222 quote!(_ctx)
223 };
224
225 let body = if let Some(reply_topic) = &args.publish {
226 let reply_ty = match &func.sig.output {
227 ReturnType::Type(_, ty) => &**ty,
228 ReturnType::Default => {
229 return Err(syn::Error::new_spanned(
230 &func.sig,
231 "a publishing handler must return the reply value",
232 ));
233 }
234 };
235 quote! {
236 #[allow(non_camel_case_types)]
237 #vis struct #name;
238
239 impl ::ruststream::runtime::PublishingDef for #name {
240 type Input = #input_ty;
241 type Reply = #reply_ty;
242 type Source = #source_ty;
243
244 fn source(&self) -> Self::Source { #source_expr }
245 fn reply_name(&self) -> &str { #reply_topic }
246
247 fn description(&self) -> ::core::option::Option<&str> {
248 #description
249 }
250
251 #input_schema
252
253 fn call(
254 &self,
255 #pat: &#input_ty,
256 ) -> impl ::core::future::Future<Output = #reply_ty> + ::core::marker::Send {
257 async move #block
258 }
259 }
260 }
261 } else {
262 quote! {
263 #[derive(Clone, Copy)]
264 #[allow(non_camel_case_types)]
265 #vis struct #name;
266
267 impl ::ruststream::runtime::Handler<#input_ty> for #name {
268 async fn handle(
269 &self,
270 #pat: &#input_ty,
271 #ctx_param: &mut ::ruststream::runtime::Context<'_>,
272 ) -> ::ruststream::runtime::HandlerResult {
273 ::ruststream::runtime::IntoHandlerResult::into_handler_result(
274 (async move #block).await,
275 )
276 }
277 }
278
279 impl ::ruststream::runtime::SubscriberDef for #name {
280 type Input = #input_ty;
281 type Handler = Self;
282 type Source = #source_ty;
283
284 fn source(&self) -> Self::Source { #source_expr }
285
286 fn description(&self) -> ::core::option::Option<&str> {
287 #description
288 }
289
290 #input_schema
291
292 fn into_handler(self) -> Self { self }
293 }
294 }
295 };
296
297 Ok(body.into())
298}
299
300#[proc_macro_derive(Message)]
310pub fn derive_message(item: TokenStream) -> TokenStream {
311 let input = parse_macro_input!(item as DeriveInput);
312 let name = &input.ident;
313 let name_str = name.to_string();
314 let description = doc_description(&input.attrs);
315 let (impl_generics, ty_generics, where_clause) = input.generics.split_for_impl();
316
317 quote! {
318 impl #impl_generics ::ruststream::Message for #name #ty_generics #where_clause {
319 const NAME: &'static str = #name_str;
320 const DESCRIPTION: ::core::option::Option<&'static str> = #description;
321 }
322 }
323 .into()
324}
325
326fn doc_description(attrs: &[Attribute]) -> TokenStream2 {
328 let lines: Vec<String> = attrs
329 .iter()
330 .filter(|attr| attr.path().is_ident("doc"))
331 .filter_map(|attr| match &attr.meta {
332 Meta::NameValue(nv) => match &nv.value {
333 Expr::Lit(ExprLit {
334 lit: Lit::Str(text),
335 ..
336 }) => Some(text.value().trim().to_owned()),
337 _ => None,
338 },
339 _ => None,
340 })
341 .collect();
342
343 if lines.is_empty() {
344 quote!(::core::option::Option::None)
345 } else {
346 let joined = lines.join("\n");
347 quote!(::core::option::Option::Some(#joined))
348 }
349}