1use proc_macro::TokenStream;
7use proc_macro2::TokenStream as TokenStream2;
8use quote::quote;
9use syn::parse::{Parse, ParseStream};
10use syn::{
11 Attribute, DeriveInput, Expr, ExprCall, ExprLit, ExprMethodCall, ExprPath, ExprStruct, FnArg,
12 Ident, ItemFn, Lit, LitStr, Meta, PatType, Path, ReturnType, Token, Type, TypePath,
13 parenthesized, parse_macro_input,
14};
15
16struct SubscriberArgs {
20 source: Expr,
21 publish: Option<LitStr>,
22}
23
24impl Parse for SubscriberArgs {
25 fn parse(input: ParseStream) -> syn::Result<Self> {
26 let source: Expr = input.parse()?;
27 let mut publish = None;
28 if input.peek(Token![,]) {
29 input.parse::<Token![,]>()?;
30 let keyword: Ident = input.parse()?;
31 if keyword != "publish" {
32 return Err(syn::Error::new(
33 keyword.span(),
34 "expected `publish(\"reply-topic\")`",
35 ));
36 }
37 let content;
38 parenthesized!(content in input);
39 publish = Some(content.parse()?);
40 }
41 Ok(Self { source, publish })
42 }
43}
44
45fn source_tokens(expr: &Expr) -> syn::Result<(TokenStream2, TokenStream2)> {
54 if let Expr::Lit(ExprLit {
55 lit: Lit::Str(name),
56 ..
57 }) = expr
58 {
59 return Ok((
60 quote!(::ruststream::Name),
61 quote!(::ruststream::Name::new(#name)),
62 ));
63 }
64
65 let ty = source_type(expr)?;
66 Ok((quote!(#ty), quote!(#expr)))
67}
68
69fn source_type(expr: &Expr) -> syn::Result<Type> {
74 match expr {
75 Expr::Call(ExprCall { func, .. }) => match &**func {
76 Expr::Path(ExprPath {
77 path, qself: None, ..
78 }) => type_from_constructor_path(path),
79 _ => Err(unsupported_source(expr)),
80 },
81 Expr::Struct(ExprStruct { path, .. }) => Ok(Type::Path(TypePath {
82 qself: None,
83 path: path.clone(),
84 })),
85 Expr::MethodCall(ExprMethodCall { receiver, .. }) => source_type(receiver),
86 _ => Err(unsupported_source(expr)),
87 }
88}
89
90fn type_from_constructor_path(path: &Path) -> syn::Result<Type> {
92 let n = path.segments.len();
93 if n < 2 {
94 return Err(syn::Error::new_spanned(
95 path,
96 "expected `Type::new(..)`: the path must name a type and an associated constructor",
97 ));
98 }
99 let segments = path.segments.iter().take(n - 1).cloned().collect();
100 Ok(Type::Path(TypePath {
101 qself: None,
102 path: Path {
103 leading_colon: path.leading_colon,
104 segments,
105 },
106 }))
107}
108
109fn unsupported_source(expr: &Expr) -> syn::Error {
110 syn::Error::new_spanned(
111 expr,
112 "expected a string literal name, `Type::new(..)`, `Type { .. }`, or a builder chain on \
113 one of those - a free function does not expose its type to the macro",
114 )
115}
116
117#[proc_macro_attribute]
135pub fn subscriber(attr: TokenStream, item: TokenStream) -> TokenStream {
136 let args = parse_macro_input!(attr as SubscriberArgs);
137 let func = parse_macro_input!(item as ItemFn);
138 expand(&args, &func).unwrap_or_else(|err| err.to_compile_error().into())
139}
140
141#[proc_macro_attribute]
155pub fn app(attr: TokenStream, item: TokenStream) -> TokenStream {
156 let func = parse_macro_input!(item as ItemFn);
157 expand_app(&attr.into(), &func).unwrap_or_else(|err| err.to_compile_error().into())
158}
159
160fn expand_app(attr: &TokenStream2, func: &ItemFn) -> syn::Result<TokenStream> {
161 if !attr.is_empty() {
162 return Err(syn::Error::new_spanned(
163 attr,
164 "#[ruststream::app] takes no arguments",
165 ));
166 }
167 if let Some(asyncness) = func.sig.asyncness {
168 return Err(syn::Error::new_spanned(
169 asyncness,
170 "#[ruststream::app] requires a synchronous builder returning `RustStream`",
171 ));
172 }
173 if !func.sig.inputs.is_empty() {
174 return Err(syn::Error::new_spanned(
175 &func.sig.inputs,
176 "#[ruststream::app] builder must take no arguments",
177 ));
178 }
179 let name = &func.sig.ident;
180 Ok(quote! {
181 #func
182
183 fn main() -> ::std::process::ExitCode {
184 ::ruststream::runtime::cli::run_main(#name)
185 }
186 }
187 .into())
188}
189
190fn expand(args: &SubscriberArgs, func: &ItemFn) -> syn::Result<TokenStream> {
191 let vis = &func.vis;
192 let name = &func.sig.ident;
193 let block = &func.block;
194
195 let first = func.sig.inputs.first().ok_or_else(|| {
196 syn::Error::new_spanned(
197 &func.sig,
198 "a #[subscriber] handler must take exactly one message parameter",
199 )
200 })?;
201 let FnArg::Typed(PatType { pat, ty, .. }) = first else {
202 return Err(syn::Error::new_spanned(
203 first,
204 "a #[subscriber] handler cannot take `self`",
205 ));
206 };
207 let Type::Reference(reference) = &**ty else {
208 return Err(syn::Error::new_spanned(
209 ty,
210 "the message parameter must be a reference `&T`",
211 ));
212 };
213 let input_ty = &reference.elem;
214 let description = doc_description(&func.attrs);
215 let (source_ty, source_expr) = source_tokens(&args.source)?;
216
217 let input_schema = quote! {
221 fn input_schema(&self) -> ::core::option::Option<::std::string::String> {
222 #[allow(unused_imports)]
223 use ::ruststream::__private::NoSchemaProbe as _;
224 ::ruststream::__private::Probe::<#input_ty>::new().schema_json()
225 }
226 };
227
228 let ctx_param = if let Some(FnArg::Typed(PatType { pat, .. })) = func.sig.inputs.get(1) {
231 quote!(#pat)
232 } else {
233 quote!(_ctx)
234 };
235
236 let body = if let Some(reply_topic) = &args.publish {
237 let reply_ty = match &func.sig.output {
238 ReturnType::Type(_, ty) => &**ty,
239 ReturnType::Default => {
240 return Err(syn::Error::new_spanned(
241 &func.sig,
242 "a publishing handler must return the reply value",
243 ));
244 }
245 };
246 quote! {
247 #[allow(non_camel_case_types)]
248 #vis struct #name;
249
250 impl ::ruststream::runtime::PublishingDef for #name {
251 type Input = #input_ty;
252 type Reply = #reply_ty;
253 type Source = #source_ty;
254
255 fn source(&self) -> Self::Source { #source_expr }
256 fn reply_name(&self) -> &str { #reply_topic }
257
258 fn description(&self) -> ::core::option::Option<&str> {
259 #description
260 }
261
262 #input_schema
263
264 fn call(
265 &self,
266 #pat: &#input_ty,
267 ) -> impl ::core::future::Future<Output = #reply_ty> + ::core::marker::Send {
268 async move #block
269 }
270 }
271 }
272 } else {
273 quote! {
274 #[derive(Clone, Copy)]
275 #[allow(non_camel_case_types)]
276 #vis struct #name;
277
278 impl ::ruststream::runtime::Handler<#input_ty> for #name {
279 async fn handle(
280 &self,
281 #pat: &#input_ty,
282 #ctx_param: &mut ::ruststream::runtime::Context<'_>,
283 ) -> ::ruststream::runtime::HandlerResult {
284 ::ruststream::runtime::IntoHandlerResult::into_handler_result(
285 (async move #block).await,
286 )
287 }
288 }
289
290 impl ::ruststream::runtime::SubscriberDef for #name {
291 type Input = #input_ty;
292 type Handler = Self;
293 type Source = #source_ty;
294
295 fn source(&self) -> Self::Source { #source_expr }
296
297 fn description(&self) -> ::core::option::Option<&str> {
298 #description
299 }
300
301 #input_schema
302
303 fn into_handler(self) -> Self { self }
304 }
305 }
306 };
307
308 Ok(body.into())
309}
310
311#[proc_macro_derive(Message)]
321pub fn derive_message(item: TokenStream) -> TokenStream {
322 let input = parse_macro_input!(item as DeriveInput);
323 let name = &input.ident;
324 let name_str = name.to_string();
325 let description = doc_description(&input.attrs);
326 let (impl_generics, ty_generics, where_clause) = input.generics.split_for_impl();
327
328 quote! {
329 impl #impl_generics ::ruststream::Message for #name #ty_generics #where_clause {
330 const NAME: &'static str = #name_str;
331 const DESCRIPTION: ::core::option::Option<&'static str> = #description;
332 }
333 }
334 .into()
335}
336
337fn doc_description(attrs: &[Attribute]) -> TokenStream2 {
339 let lines: Vec<String> = attrs
340 .iter()
341 .filter(|attr| attr.path().is_ident("doc"))
342 .filter_map(|attr| match &attr.meta {
343 Meta::NameValue(nv) => match &nv.value {
344 Expr::Lit(ExprLit {
345 lit: Lit::Str(text),
346 ..
347 }) => Some(text.value().trim().to_owned()),
348 _ => None,
349 },
350 _ => None,
351 })
352 .collect();
353
354 if lines.is_empty() {
355 quote!(::core::option::Option::None)
356 } else {
357 let joined = lines.join("\n");
358 quote!(::core::option::Option::Some(#joined))
359 }
360}