1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#[macro_use]
extern crate quote;
#[macro_use]
extern crate syn;

use syn::DeriveInput;

use proc_macro::TokenStream;

/// Derive macro that implements `rlink::api::function::NamedFunction` for the
/// annotated type.
///
/// The generated `name` method returns the type's identifier as produced by
/// `stringify!`. Generic parameters and `where` clauses of the annotated type
/// are carried over onto the generated `impl`.
#[proc_macro_derive(NamedFunction)]
pub fn derive_named_function(input: TokenStream) -> TokenStream {
    // `parse_macro_input!` returns early from this function if the item
    // cannot be parsed as a `DeriveInput`.
    let ast = parse_macro_input!(input as DeriveInput);

    // The three pieces needed to write `impl<..> Trait for Type<..> where ..`.
    let (impl_generics, type_generics, where_clause) = ast.generics.split_for_impl();
    let target = &ast.ident;

    TokenStream::from(quote! {
        impl #impl_generics rlink::api::function::NamedFunction for #target #type_generics #where_clause {
            fn name(&self) -> &str {
                stringify!(#target)
            }
        }
    })
}

/// Derive macro that implements `rlink::api::checkpoint::CheckpointFunction`
/// for the annotated type.
///
/// The generated `impl` block is empty, so the expansion only compiles when
/// every item of that trait has a default. Generic parameters and `where`
/// clauses of the annotated type are carried over onto the generated `impl`.
#[proc_macro_derive(CheckpointFunction)]
pub fn derive_checkpoint_function(input: TokenStream) -> TokenStream {
    // `parse_macro_input!` returns early from this function if the item
    // cannot be parsed as a `DeriveInput`.
    let ast = parse_macro_input!(input as DeriveInput);

    // The three pieces needed to write `impl<..> Trait for Type<..> where ..`.
    let (impl_generics, type_generics, where_clause) = ast.generics.split_for_impl();
    let target = &ast.ident;

    TokenStream::from(quote! {
        impl #impl_generics rlink::api::checkpoint::CheckpointFunction for #target #type_generics #where_clause {}
    })
}

/// Derive macro that implements both `rlink::api::function::NamedFunction`
/// and `rlink::api::checkpoint::CheckpointFunction` for the annotated type.
///
/// The expansion is the `NamedFunction` impl (whose `name` returns the type's
/// identifier via `stringify!`) followed by an empty `CheckpointFunction`
/// impl. Generic parameters and `where` clauses of the annotated type are
/// carried over onto both impls.
#[proc_macro_derive(Function)]
pub fn derive_function(input: TokenStream) -> TokenStream {
    // `parse_macro_input!` returns early from this function if the item
    // cannot be parsed as a `DeriveInput`.
    let ast = parse_macro_input!(input as DeriveInput);

    // The three pieces needed to write `impl<..> Trait for Type<..> where ..`.
    let (impl_generics, type_generics, where_clause) = ast.generics.split_for_impl();
    let target = &ast.ident;

    let named_impl = quote! {
        impl #impl_generics rlink::api::function::NamedFunction for #target #type_generics #where_clause {
            fn name(&self) -> &str {
                stringify!(#target)
            }
        }
    };

    let checkpoint_impl = quote! {
        impl #impl_generics rlink::api::checkpoint::CheckpointFunction for #target #type_generics #where_clause {}
    };

    // Emit the two impls back to back, in the same order as they were built.
    TokenStream::from(quote! {
        #named_impl

        #checkpoint_impl
    })
}

#[proc_macro_attribute]
#[cfg(not(test))]
pub fn main(args: TokenStream, item: TokenStream) -> TokenStream {
    let mut input = syn::parse_macro_input!(item as syn::ItemFn);
    let args = syn::parse_macro_input!(args as syn::AttributeArgs);

    let sig = &mut input.sig; // eg: struct Name, function sig
    let name = &sig.ident;
    let inputs = &sig.inputs;
    let body = &input.block;
    let _attrs = &input.attrs;
    let _vis = input.vis; // eg: pub, pub(crate), etc...

    if sig.asyncness.is_some() {
        let msg = "the async keyword is unsupported from the function declaration";
        return syn::Error::new_spanned(sig.fn_token, msg)
            .to_compile_error()
            .into();
    } else if name != "main" {
        let msg = "the function name must be `main`";
        return syn::Error::new_spanned(&sig.inputs, msg)
            .to_compile_error()
            .into();
    } else if inputs.is_empty() {
        let msg = "the main function accept arguments";
        return syn::Error::new_spanned(&sig.inputs, msg)
            .to_compile_error()
            .into();
    }

    // if args.len() != 1 {
    //     let msg = "the main function accept arguments";
    //     return syn::Error::new_spanned(&args., msg)
    //         .to_compile_error()
    //         .into();
    // }

    let arg0 = &args[0];
    let stream_fn = if let syn::NestedMeta::Meta(syn::Meta::Path(path)) = arg0 {
        let ident = path.get_ident();
        if ident.is_none() {
            let msg = "Must have specified ident";
            return syn::Error::new_spanned(path, msg).to_compile_error().into();
        }

        ident.unwrap()
    } else {
        let msg = "Must have specified ident..";
        return syn::Error::new_spanned(arg0, msg).to_compile_error().into();
    };

    let result = quote! {
        #[derive(Clone, Debug)]
        pub struct GenStreamJob {}

        impl rlink::api::env::StreamJob for GenStreamJob {
            fn prepare_properties(&self, properties: &mut Properties) {
                #body
            }

            fn build_stream(
                &self,
                properties: &Properties,
                env: &StreamExecutionEnvironment,
            ) -> SinkStream {
                #stream_fn(properties, env)
            }
        }

        fn main() {
            rlink::api::env::execute("job_name", GenStreamJob{});
        }
    };

    // println!("{}", result.to_string());
    result.into()
}