rglw_api_codegen 0.0.1

Rustyglware websocket api.
Documentation
use proc_macro2::{Ident, Span, TokenStream};
use quote::quote;
use rglw::codegen_utils::macro_attribute_arguments::MacroAttributeArguments;
use rglw::codegen_utils::macro_attribute_function_arguments::MacroAttributeFunctionArguments;
use syn::ItemFn;

pub fn implementation(attributes: MacroAttributeArguments, input: ItemFn) -> TokenStream {

    let handler_path = attributes.find_string("path").expect("'path' parameter is required!");

    let callee_function = input.sig.ident.clone();

    let callee_name = input.sig.ident.to_string();

    let handler_name = Ident::new(format!("{}_connection_handler", callee_name.clone()).as_str(), Span::call_site());

    let function_arguments = MacroAttributeFunctionArguments::parse(&input);

    let request_type = function_arguments.get_path_segment_by_type_name("RglwApiInputStream");
    let response_type = function_arguments.get_path_segment_by_type_name("RglwApiOutputStream");

    if request_type.is_none() {
        panic!("You must define an input stream with the type RglwApiInputStream for requests");
    }

    if response_type.is_none() {
        panic!("You must define an input stream with the type RglwApiOutputStream for responses");
    }

    let request_type = function_arguments.get_stream_of_argument_by_name("RglwApiInputStream");
    let response_type = function_arguments.get_stream_of_argument_by_name("RglwApiOutputStream");

    if request_type.is_none() {
        panic!("You must define an input stream with the type RglwApiInputStream and some request type for requests");
    }

    if response_type.is_none() {
        panic!("You must define an input stream with the type RglwApiOutputStream and some response type for responses");
    }

    let request_type = request_type.unwrap();
    let request_type_without_generics = function_arguments.get_stream_of_argument_without_generics_by_name("RglwApiInputStream").unwrap();
    let response_type = response_type.unwrap();

    let injectables = function_arguments.get_injectable_arguments();
    
    let expanded = quote! {
        let mut complete_path = format!("{}{}", controller_path.clone(), #handler_path);
        #input
        fn #handler_name(input_pipe: futures::channel::mpsc::UnboundedReceiver<RglwApiRawRequest>, mut output_pipe: futures::channel::mpsc::UnboundedSender<RglwApiRawResponse>) -> futures_util::future::BoxFuture<'static, Result<(), RglwApiError>> {
            use futures::FutureExt;
            use futures_util::sink::SinkExt;
            async move {
                let (response_pipe, on_response) = futures::channel::mpsc::unbounded::<RglwApiResponse<#response_type>>();
                let (request_pipe, on_request) = futures::channel::mpsc::unbounded::<RglwApiRequest<#request_type>>();

                async fn ok_handler() -> Result<(), RglwApiError> {
                    return Ok(());
                }

                async fn process_incoming_request(request: RglwApiRawRequest, mut request_pipe: futures::channel::mpsc::UnboundedSender<RglwApiRequest<#request_type>>) -> () {
                    if request.data.is_none() {
                        let mapped_request: RglwApiRequest<#request_type> = RglwApiRequest {
                            user: request.user,
                            metadata: request.metadata,
                            data:None,
                        };
                        debug!("Processing request without data request on handler...");
                        request_pipe.send(mapped_request).await.unwrap();
                    } else {
                        debug!("Processing request with data request on handler...");
                        let data: tokio_tungstenite::tungstenite::Message = request.data.unwrap();
                        let mapped_request: RglwApiRequest<#request_type> = RglwApiRequest {
                            user: request.user,
                            metadata: request.metadata,
                            data: Some(#request_type_without_generics::from_binary_message(data).unwrap()),
                        };
                        request_pipe.send(mapped_request).await.unwrap();
                    }
                    return ();
                }

                let incoming_messages = input_pipe.for_each(|request| {
                    return process_incoming_request(request, request_pipe.clone());
                });

                async fn process_outgoing_response(response: RglwApiResponse<#response_type>, mut outgoing_pipe: futures::channel::mpsc::UnboundedSender<RglwApiRawResponse>) -> () {
                    if response.data.is_none() {
                        debug!("Processing response without data request on handler...");
                        outgoing_pipe.send(RglwApiRawResponse {
                            metadata: response.metadata,
                            data: None,
                        }).await.unwrap();
                    } else {
                        debug!("Processing response with data request on handler...");
                        let data: #response_type = response.data.unwrap();
                        outgoing_pipe.send(RglwApiRawResponse {
                            metadata: response.metadata,
                            data: Some(data.to_binary_message().unwrap()),
                        }).await.unwrap();
                    }
                    return ();
                }

                let outgoing_messages = on_response.for_each(|response| {
                    return process_outgoing_response(response, output_pipe.clone());
                });
                
                let call_result = #callee_function(
                    RglwApiInputStream::new(on_request),
                    RglwApiOutputStream::<#response_type>::new(response_pipe),
                    #(#injectables)*
                );
                
                futures_util::pin_mut!(incoming_messages, call_result, outgoing_messages);
                tokio::select! {
                    _=incoming_messages=> {},
                    _=call_result=> {},
                    _=outgoing_messages=> {},
                }
                ok_handler().await
            }.boxed()
        }
        let implemented_by = format!("{}::{}", controller_function_name.clone(), #callee_name);
        if routes.contains_key(complete_path.clone().as_str()) {
            let repeated_route = routes.get(complete_path.clone().as_str()).unwrap().clone();
            let repeated_route_implemented_by = repeated_route.implemented_by.clone();
            panic!("Duplicated handler path {} implemented by [{}, {}], you must change it or remove one of both!", complete_path.clone(), repeated_route_implemented_by.clone(), implemented_by.clone());
        } else {
            routes.insert(complete_path.clone(), RglwApiRoute {
                path: complete_path.clone(),
                handler: #handler_name,
                implemented_by: implemented_by.clone()
            });
        }
    };
    expanded
}