#![deny(warnings)]
use proc_macro::TokenStream;
use proc_macro2;
use quote::quote;
use syn::{parse, parse_macro_input, Data, DeriveInput, Token, Type};
struct ProcessorArguments
{
pub input_type: Type,
pub output_type: Type,
}
impl parse::Parse for ProcessorArguments
{
fn parse(input: parse::ParseStream) -> syn::Result<Self>
{
let input_type: Type = input.parse()?;
let _: Token![,] = input.parse()?;
let output_type: Type = input.parse()?;
Ok(ProcessorArguments {
input_type,
output_type,
})
}
}
#[proc_macro_derive(Processor, attributes(streams))]
pub fn processor(input: TokenStream) -> TokenStream
{
let input = parse_macro_input!(input as DeriveInput);
let name = input.ident;
let (impl_generics, ty_generics, where_clause) = input.generics.split_for_impl();
let mut processor_arguments: Option<ProcessorArguments> = None;
for attr in input.attrs
{
if attr.path().is_ident("streams")
{
processor_arguments = Some(attr.parse_args().unwrap());
}
}
let processor_arguments = processor_arguments.expect("Missing 'streams' attribute.");
let input_type = processor_arguments.input_type;
let output_type = processor_arguments.output_type;
let out = quote! {
impl #impl_generics processor::ProcessorStreams for #name #ty_generics #where_clause
{
type Inputs = #input_type;
type Outputs = #output_type;
type InputStreams = <#input_type as processor::Inputs>::Streams;
type OutputStreams = <#output_type as processor::Outputs>::Streams;
}
};
out.into()
}
fn prepare_stream_fields(data: &Data) -> (Vec<&proc_macro2::Ident>, Vec<&syn::Type>)
{
match data
{
Data::Struct(s) =>
{
let fields_names_iter = s.fields.iter().map(|field| field.ident.as_ref().unwrap());
let fields_types_iter = s.fields.iter().map(|field| &field.ty);
(fields_names_iter.collect(), fields_types_iter.collect())
}
_ => panic!("Only supported for structures"),
}
}
#[proc_macro_derive(InputStreams)]
pub fn input_streams(input: TokenStream) -> TokenStream
{
let input = parse_macro_input!(input as DeriveInput);
let input_visibility = input.vis;
let name = input.ident;
let (impl_generics, ty_generics, where_clause) = input.generics.split_for_impl();
let name_streams =
proc_macro2::Ident::new(&format!("{}Streams", name), proc_macro2::Span::call_site());
let (fields_names, fields_types) = prepare_stream_fields(&input.data);
let out = quote! {
#input_visibility struct #name_streams #ty_generics #where_clause
{
#(
#fields_names : async_broadcast::Receiver<#fields_types>,
)*
}
impl #impl_generics processor::Inputs for #name #ty_generics #where_clause
{
type Streams = #name_streams #ty_generics;
}
impl #impl_generics processor::InputStreams for #name_streams #ty_generics #where_clause
{
type Inputs = #name #ty_generics;
fn next(&mut self) -> impl std::future::Future<Output = Result<Self::Inputs>>
{
async {
Ok(Self::Inputs {
#(
#fields_names : self.#fields_names.recv().await?,
)*
})
}
}
}
};
out.into()
}
#[proc_macro_derive(OutputStreams)]
pub fn output_streams(input: TokenStream) -> TokenStream
{
let input = parse_macro_input!(input as DeriveInput);
let input_visibility = input.vis;
let name = input.ident;
let (impl_generics, ty_generics, where_clause) = input.generics.split_for_impl();
let name_streams =
proc_macro2::Ident::new(&format!("{}Streams", name), proc_macro2::Span::call_site());
let (fields_names, fields_types) = prepare_stream_fields(&input.data);
let out = quote! {
#input_visibility struct #name_streams #ty_generics #where_clause
{
#(
#fields_names : async_broadcast::Sender<#fields_types>,
)*
}
impl #impl_generics processor::Outputs for #name #ty_generics #where_clause
{
type Streams = #name_streams #ty_generics;
}
impl #impl_generics processor::OutputStreams for #name_streams #ty_generics #where_clause
{
type Outputs = #name #ty_generics;
fn is_full(&self) -> bool
{
#(self. #fields_names .is_full() ) ||*
}
fn next(&mut self, outputs: Self::Outputs)
-> impl std::future::Future<Output = crate::Result<()>>
{
async move {
#(self. #fields_names .broadcast(outputs. #fields_names).await?; )*
Ok(())
}
}
}
};
out.into()
}