arcon_codegen 0.1.4

Code generation for Arcon
use crate::types::to_token_stream;
use proc_macro2::{Ident, Span, TokenStream};
use spec::{SocketKind, Source, SourceKind};
use crate::common::*;

pub fn source(name: &str, target: &str, source: &Source, spec_id: &String, ts_extractor: u32) -> TokenStream {
    let source_name = Ident::new(&name, Span::call_site());
    let target = Ident::new(&target, Span::call_site());
    let input_type = to_token_stream(&source.source_type, spec_id);

    let source_stream = match &source.kind {
        SourceKind::Socket { addr, kind } => socket_source(
            &source_name,
            &target,
            &input_type,
            &addr,
            &kind,
            *&source.rate,
            ts_extractor,
        ),
        SourceKind::LocalFile { path } => {
            local_file_source(&source_name, &target, &input_type, &path, *&source.rate)
        }
    };

    source_stream
}

fn socket_source(
    source_name: &Ident,
    target: &Ident,
    input_type: &TokenStream,
    addr: &str,
    kind: &SocketKind,
    rate: u64,
    ts_extraction: u32,
) -> TokenStream {
    let verify = verify_and_start(source_name, "system");

    let sock_kind = {
        match kind {
            SocketKind::Tcp => quote! { SocketKind::Tcp },
            SocketKind::Udp => quote! { SocketKind::Udp },
        }
    };

    let ts_quote = quote! { Some(#ts_extraction) };

    quote! {
        let channel = Channel::Local(#target.actor_ref());
        let channel_strategy: Box<ChannelStrategy<#input_type>> = Box::new(Forward::new(channel));
        let (#source_name, reg) = system.create_and_register(move || {
            let sock_addr = #addr.parse().expect("Failed to parse SocketAddr");
            let source: SocketSource<#input_type> = SocketSource::new(sock_addr, #sock_kind, channel_strategy, #rate, #ts_quote);
            source
        });

        #verify
    }
}

fn local_file_source(
    source_name: &Ident,
    target: &Ident,
    input_type: &TokenStream,
    file_path: &str,
    rate: u64,
) -> TokenStream {
    let verify = verify_and_start(source_name, "system");

    quote! {
        let channel = Channel::Local(#target.actor_ref());
        let channel_strategy: Box<ChannelStrategy<#input_type>> = Box::new(Forward::new(channel));
        let (#source_name, reg) = system.create_and_register(move || {
            let source: LocalFileSource<#input_type> = LocalFileSource::new(
                String::from(#file_path),
                channel_strategy,
                #rate,
            );
            source
        });

        #verify
    }
}