arcon_macros 0.2.2

Macros for Arcon
Documentation
use proc_macro::TokenStream;
use syn::DeriveInput;

pub fn derive_arrow(input: TokenStream) -> TokenStream {
    let input: DeriveInput = syn::parse(input).unwrap();
    let name = &input.ident;

    if let syn::Data::Struct(ref s) = input.data {
        let mut arrow_types = Vec::new();
        let mut builders = Vec::new();

        if let syn::Fields::Named(ref fields_named) = s.fields {
            for (field_pos, field) in fields_named.named.iter().enumerate() {
                let ident = field.ident.clone();
                let ty = &field.ty;
                let arrow_quote = quote! { ::arcon::Field::new(stringify!(#ident), <#ty as ToArrow>::arrow_type(), false), };
                arrow_types.push(arrow_quote);

                let builder_quote = quote! {
                    {
                        let value = self.#ident;
                        match builder.field_builder::<<#ty as ToArrow>::Builder>(#field_pos) {
                            Some(b) => b.append_value(value)?,
                            None => return Err(::arcon::ArrowError::SchemaError(format!("Failed to downcast Arrow Builder"))),
                        }
                    }
                };
                builders.push(builder_quote);
            }

            let timestamp_pos = builders.len();

            // Add nullable timestamp field
            let timestamp_quote =
                quote! { ::arcon::Field::new("_timestamp", ::arcon::DataType::UInt64, true), };
            arrow_types.push(timestamp_quote);

            // builder quote for the last timestamp column.
            // assumes there is an timestamp: Option<u64> in scope
            let builder_quote = quote! {
                match builder.field_builder::<::arcon::UInt64Builder>(#timestamp_pos) {
                    Some(b) =>  {
                        match timestamp {
                            Some(ts) => b.append_value(ts)?,
                            None => b.append_null()?,
                        }
                    }
                    None => return Err(::arcon::ArrowError::SchemaError(format!("Failed to downcast Arrow Builder"))),
                }
            };
            builders.push(builder_quote);
        } else {
            panic!("#[derive(Arrow)] requires named fields");
        }

        let generics = &input.generics;
        let (impl_generics, ty_generics, where_clause) = generics.split_for_impl();

        let fields: proc_macro2::TokenStream = {
            quote! {
                vec![#(#arrow_types)*]
            }
        };

        let output: proc_macro2::TokenStream = {
            quote! {
                impl #impl_generics ::arcon::ToArrow for #name #ty_generics #where_clause {
                    type Builder = ::arcon::StructBuilder;

                    fn arrow_type() -> ::arcon::DataType {
                        ::arcon::DataType::Struct(#fields)
                    }
                    fn schema() -> ::arcon::Schema {
                        ::arcon::Schema::new(#fields)
                    }
                    fn append(self, builder: &mut ::arcon::StructBuilder, timestamp: Option<u64>) -> Result<(), ::arcon::ArrowError> {
                        #(#builders)*
                        Ok(())
                    }
                    fn table() -> ::arcon::MutableTable {
                        let builder = ::arcon::StructBuilder::from_fields(#fields, ::arcon::RECORD_BATCH_SIZE);
                        let table_name = stringify!(#name).to_lowercase();
                        ::arcon::MutableTable::new(::arcon::RecordBatchBuilder::new(table_name, Self::schema(), builder))
                    }
                }
            }
        };

        proc_macro::TokenStream::from(output)
    } else {
        panic!("#[derive(Arrow)] only works for structs");
    }
}