1use proc_macro::TokenStream;
7use proc_macro2::TokenStream as TokenStream2;
8use quote::quote;
9use syn::parse::{Parse, ParseStream};
10use syn::{
11 Attribute, DeriveInput, Expr, ExprCall, ExprLit, ExprMethodCall, ExprPath, ExprStruct, FnArg,
12 Ident, ItemFn, Lit, LitStr, Meta, PatType, Path, ReturnType, Token, Type, TypePath,
13 parenthesized, parse_macro_input,
14};
15
16struct SubscriberArgs {
20 source: Expr,
21 publish: Option<LitStr>,
22}
23
24impl Parse for SubscriberArgs {
25 fn parse(input: ParseStream) -> syn::Result<Self> {
26 let source: Expr = input.parse()?;
27 let mut publish = None;
28 if input.peek(Token![,]) {
29 input.parse::<Token![,]>()?;
30 let keyword: Ident = input.parse()?;
31 if keyword != "publish" {
32 return Err(syn::Error::new(
33 keyword.span(),
34 "expected `publish(\"reply-topic\")`",
35 ));
36 }
37 let content;
38 parenthesized!(content in input);
39 publish = Some(content.parse()?);
40 }
41 Ok(Self { source, publish })
42 }
43}
44
45fn source_tokens(expr: &Expr) -> syn::Result<(TokenStream2, TokenStream2)> {
54 if let Expr::Lit(ExprLit {
55 lit: Lit::Str(name),
56 ..
57 }) = expr
58 {
59 return Ok((
60 quote!(::ruststream::Name),
61 quote!(::ruststream::Name::new(#name)),
62 ));
63 }
64
65 let ty = source_type(expr)?;
66 Ok((quote!(#ty), quote!(#expr)))
67}
68
69fn source_type(expr: &Expr) -> syn::Result<Type> {
74 match expr {
75 Expr::Call(ExprCall { func, .. }) => match &**func {
76 Expr::Path(ExprPath {
77 path, qself: None, ..
78 }) => type_from_constructor_path(path),
79 _ => Err(unsupported_source(expr)),
80 },
81 Expr::Struct(ExprStruct { path, .. }) => Ok(Type::Path(TypePath {
82 qself: None,
83 path: path.clone(),
84 })),
85 Expr::MethodCall(ExprMethodCall { receiver, .. }) => source_type(receiver),
86 _ => Err(unsupported_source(expr)),
87 }
88}
89
90fn type_from_constructor_path(path: &Path) -> syn::Result<Type> {
92 let n = path.segments.len();
93 if n < 2 {
94 return Err(syn::Error::new_spanned(
95 path,
96 "expected `Type::new(..)`: the path must name a type and an associated constructor",
97 ));
98 }
99 let segments = path.segments.iter().take(n - 1).cloned().collect();
100 Ok(Type::Path(TypePath {
101 qself: None,
102 path: Path {
103 leading_colon: path.leading_colon,
104 segments,
105 },
106 }))
107}
108
109fn publish_result_reply(ty: &Type) -> Option<&Type> {
115 let Type::Path(TypePath { qself: None, path }) = ty else {
116 return None;
117 };
118 let last = path.segments.last()?;
119 if last.ident != "Result" {
120 return None;
121 }
122 let syn::PathArguments::AngleBracketed(args) = &last.arguments else {
123 return None;
124 };
125 let mut args = args.args.iter();
126 let (Some(syn::GenericArgument::Type(ok)), Some(syn::GenericArgument::Type(err)), None) =
127 (args.next(), args.next(), args.next())
128 else {
129 return None;
130 };
131 let Type::Path(TypePath {
132 qself: None,
133 path: err_path,
134 }) = err
135 else {
136 return None;
137 };
138 (err_path.segments.last()?.ident == "HandlerResult").then_some(ok)
139}
140
141fn unsupported_source(expr: &Expr) -> syn::Error {
142 syn::Error::new_spanned(
143 expr,
144 "expected a string literal name, `Type::new(..)`, `Type { .. }`, or a builder chain on \
145 one of those - a free function does not expose its type to the macro",
146 )
147}
148
149#[proc_macro_attribute]
178pub fn subscriber(attr: TokenStream, item: TokenStream) -> TokenStream {
179 let args = parse_macro_input!(attr as SubscriberArgs);
180 let func = parse_macro_input!(item as ItemFn);
181 expand(&args, &func).unwrap_or_else(|err| err.to_compile_error().into())
182}
183
184#[proc_macro_attribute]
198pub fn app(attr: TokenStream, item: TokenStream) -> TokenStream {
199 let func = parse_macro_input!(item as ItemFn);
200 expand_app(&attr.into(), &func).unwrap_or_else(|err| err.to_compile_error().into())
201}
202
203fn expand_app(attr: &TokenStream2, func: &ItemFn) -> syn::Result<TokenStream> {
204 if !attr.is_empty() {
205 return Err(syn::Error::new_spanned(
206 attr,
207 "#[ruststream::app] takes no arguments",
208 ));
209 }
210 if let Some(asyncness) = func.sig.asyncness {
211 return Err(syn::Error::new_spanned(
212 asyncness,
213 "#[ruststream::app] requires a synchronous builder returning `RustStream`",
214 ));
215 }
216 if !func.sig.inputs.is_empty() {
217 return Err(syn::Error::new_spanned(
218 &func.sig.inputs,
219 "#[ruststream::app] builder must take no arguments",
220 ));
221 }
222 let name = &func.sig.ident;
223 Ok(quote! {
224 #func
225
226 fn main() -> ::std::process::ExitCode {
227 ::ruststream::runtime::cli::run_main(#name)
228 }
229 }
230 .into())
231}
232
233struct HandlerParts<'a> {
235 vis: &'a syn::Visibility,
236 name: &'a Ident,
237 block: &'a syn::Block,
238 pat: &'a syn::Pat,
239 input_ty: &'a Type,
240 description: TokenStream2,
241 source_ty: TokenStream2,
242 source_expr: TokenStream2,
243 input_schema: TokenStream2,
244 message_meta: TokenStream2,
245 ctx_param: TokenStream2,
246}
247
248fn handler_parts<'a>(args: &SubscriberArgs, func: &'a ItemFn) -> syn::Result<HandlerParts<'a>> {
249 let first = func.sig.inputs.first().ok_or_else(|| {
250 syn::Error::new_spanned(
251 &func.sig,
252 "a #[subscriber] handler must take exactly one message parameter",
253 )
254 })?;
255 let FnArg::Typed(PatType { pat, ty, .. }) = first else {
256 return Err(syn::Error::new_spanned(
257 first,
258 "a #[subscriber] handler cannot take `self`",
259 ));
260 };
261 let Type::Reference(reference) = &**ty else {
262 return Err(syn::Error::new_spanned(
263 ty,
264 "the message parameter must be a reference `&T`",
265 ));
266 };
267 let input_ty = &*reference.elem;
268 let description = doc_description(&func.attrs);
269 let (source_ty, source_expr) = source_tokens(&args.source)?;
270
271 let input_schema = quote! {
275 fn input_schema(&self) -> ::core::option::Option<::std::string::String> {
276 #[allow(unused_imports)]
277 use ::ruststream::__private::NoSchemaProbe as _;
278 ::ruststream::__private::Probe::<#input_ty>::new().schema_json()
279 }
280 };
281
282 let message_meta = quote! {
285 fn message_name(&self) -> ::core::option::Option<&'static str> {
286 #[allow(unused_imports)]
287 use ::ruststream::__private::NoMessageProbe as _;
288 ::ruststream::__private::Probe::<#input_ty>::new().message_name()
289 }
290
291 fn message_description(&self) -> ::core::option::Option<&'static str> {
292 #[allow(unused_imports)]
293 use ::ruststream::__private::NoMessageProbe as _;
294 ::ruststream::__private::Probe::<#input_ty>::new().message_description()
295 }
296 };
297
298 let ctx_param = if let Some(FnArg::Typed(PatType { pat, .. })) = func.sig.inputs.get(1) {
301 quote!(#pat)
302 } else {
303 quote!(_ctx)
304 };
305
306 Ok(HandlerParts {
307 vis: &func.vis,
308 name: &func.sig.ident,
309 block: &func.block,
310 pat,
311 input_ty,
312 description,
313 source_ty,
314 source_expr,
315 input_schema,
316 message_meta,
317 ctx_param,
318 })
319}
320
321fn expand(args: &SubscriberArgs, func: &ItemFn) -> syn::Result<TokenStream> {
322 let parts = handler_parts(args, func)?;
323 let body = if let Some(reply_topic) = &args.publish {
324 expand_publishing(&parts, func, reply_topic)?
325 } else {
326 expand_subscribing(&parts)
327 };
328 Ok(body.into())
329}
330
331fn expand_publishing(
332 parts: &HandlerParts<'_>,
333 func: &ItemFn,
334 reply_topic: &LitStr,
335) -> syn::Result<TokenStream2> {
336 let HandlerParts {
337 vis,
338 name,
339 block,
340 pat,
341 input_ty,
342 description,
343 source_ty,
344 source_expr,
345 input_schema,
346 message_meta,
347 ctx_param,
348 } = parts;
349
350 let declared_ty = match &func.sig.output {
351 ReturnType::Type(_, ty) => &**ty,
352 ReturnType::Default => {
353 return Err(syn::Error::new_spanned(
354 &func.sig,
355 "a publishing handler must return the reply value",
356 ));
357 }
358 };
359 let (reply_ty, call_body) = match publish_result_reply(declared_ty) {
363 Some(reply_ty) => (reply_ty, quote!((async move #block).await)),
364 None => (
365 declared_ty,
366 quote!(::core::result::Result::Ok((async move #block).await)),
367 ),
368 };
369 Ok(quote! {
370 #[allow(non_camel_case_types)]
371 #vis struct #name;
372
373 impl ::ruststream::runtime::PublishingDef for #name {
374 type Input = #input_ty;
375 type Reply = #reply_ty;
376 type Source = #source_ty;
377
378 fn source(&self) -> Self::Source { #source_expr }
379 fn reply_name(&self) -> &str { #reply_topic }
380
381 fn description(&self) -> ::core::option::Option<&str> {
382 #description
383 }
384
385 #input_schema
386
387 #message_meta
388
389 async fn call(
390 &self,
391 #pat: &#input_ty,
392 #ctx_param: &mut ::ruststream::runtime::Context<'_>,
393 ) -> ::core::result::Result<#reply_ty, ::ruststream::runtime::HandlerResult> {
394 #call_body
395 }
396 }
397 })
398}
399
400fn expand_subscribing(parts: &HandlerParts<'_>) -> TokenStream2 {
401 let HandlerParts {
402 vis,
403 name,
404 block,
405 pat,
406 input_ty,
407 description,
408 source_ty,
409 source_expr,
410 input_schema,
411 message_meta,
412 ctx_param,
413 } = parts;
414
415 quote! {
416 #[derive(Clone, Copy)]
417 #[allow(non_camel_case_types)]
418 #vis struct #name;
419
420 impl ::ruststream::runtime::Handler<#input_ty> for #name {
421 async fn handle(
422 &self,
423 #pat: &#input_ty,
424 #ctx_param: &mut ::ruststream::runtime::Context<'_>,
425 ) -> ::ruststream::runtime::HandlerResult {
426 ::ruststream::runtime::IntoHandlerResult::into_handler_result(
427 (async move #block).await,
428 )
429 }
430 }
431
432 impl ::ruststream::runtime::SubscriberDef for #name {
433 type Input = #input_ty;
434 type Handler = Self;
435 type Source = #source_ty;
436
437 fn source(&self) -> Self::Source { #source_expr }
438
439 fn description(&self) -> ::core::option::Option<&str> {
440 #description
441 }
442
443 #input_schema
444
445 #message_meta
446
447 fn into_handler(self) -> Self { self }
448 }
449 }
450}
451
452#[proc_macro_derive(Message)]
462pub fn derive_message(item: TokenStream) -> TokenStream {
463 let input = parse_macro_input!(item as DeriveInput);
464 let name = &input.ident;
465 let name_str = name.to_string();
466 let description = doc_description(&input.attrs);
467 let (impl_generics, ty_generics, where_clause) = input.generics.split_for_impl();
468
469 quote! {
470 impl #impl_generics ::ruststream::Message for #name #ty_generics #where_clause {
471 const NAME: &'static str = #name_str;
472 const DESCRIPTION: ::core::option::Option<&'static str> = #description;
473 }
474 }
475 .into()
476}
477
478fn doc_description(attrs: &[Attribute]) -> TokenStream2 {
480 let lines: Vec<String> = attrs
481 .iter()
482 .filter(|attr| attr.path().is_ident("doc"))
483 .filter_map(|attr| match &attr.meta {
484 Meta::NameValue(nv) => match &nv.value {
485 Expr::Lit(ExprLit {
486 lit: Lit::Str(text),
487 ..
488 }) => Some(text.value().trim().to_owned()),
489 _ => None,
490 },
491 _ => None,
492 })
493 .collect();
494
495 if lines.is_empty() {
496 quote!(::core::option::Option::None)
497 } else {
498 let joined = lines.join("\n");
499 quote!(::core::option::Option::Some(#joined))
500 }
501}