use proc_macro2::{Ident, Span, TokenStream};
use quote::quote;
use rglw::codegen_utils::macro_attribute_arguments::MacroAttributeArguments;
use rglw::codegen_utils::macro_attribute_function_arguments::MacroAttributeFunctionArguments;
use syn::ItemFn;
pub fn implementation(attributes: MacroAttributeArguments, input: ItemFn) -> TokenStream {
let handler_path = attributes.find_string("path").expect("'path' parameter is required!");
let callee_function = input.sig.ident.clone();
let callee_name = input.sig.ident.to_string();
let handler_name = Ident::new(format!("{}_connection_handler", callee_name.clone()).as_str(), Span::call_site());
let function_arguments = MacroAttributeFunctionArguments::parse(&input);
let request_type = function_arguments.get_path_segment_by_type_name("RglwApiInputStream");
let response_type = function_arguments.get_path_segment_by_type_name("RglwApiOutputStream");
if request_type.is_none() {
panic!("You must define an input stream with the type RglwApiInputStream for requests");
}
if response_type.is_none() {
panic!("You must define an input stream with the type RglwApiOutputStream for responses");
}
let request_type = function_arguments.get_stream_of_argument_by_name("RglwApiInputStream");
let response_type = function_arguments.get_stream_of_argument_by_name("RglwApiOutputStream");
if request_type.is_none() {
panic!("You must define an input stream with the type RglwApiInputStream and some request type for requests");
}
if response_type.is_none() {
panic!("You must define an input stream with the type RglwApiOutputStream and some response type for responses");
}
let request_type = request_type.unwrap();
let request_type_without_generics = function_arguments.get_stream_of_argument_without_generics_by_name("RglwApiInputStream").unwrap();
let response_type = response_type.unwrap();
let injectables = function_arguments.get_injectable_arguments();
let expanded = quote! {
let mut complete_path = format!("{}{}", controller_path.clone(), #handler_path);
#input
fn #handler_name(input_pipe: futures::channel::mpsc::UnboundedReceiver<RglwApiRawRequest>, mut output_pipe: futures::channel::mpsc::UnboundedSender<RglwApiRawResponse>) -> futures_util::future::BoxFuture<'static, Result<(), RglwApiError>> {
use futures::FutureExt;
use futures_util::sink::SinkExt;
async move {
let (response_pipe, on_response) = futures::channel::mpsc::unbounded::<RglwApiResponse<#response_type>>();
let (request_pipe, on_request) = futures::channel::mpsc::unbounded::<RglwApiRequest<#request_type>>();
async fn ok_handler() -> Result<(), RglwApiError> {
return Ok(());
}
async fn process_incoming_request(request: RglwApiRawRequest, mut request_pipe: futures::channel::mpsc::UnboundedSender<RglwApiRequest<#request_type>>) -> () {
if request.data.is_none() {
let mapped_request: RglwApiRequest<#request_type> = RglwApiRequest {
user: request.user,
metadata: request.metadata,
data:None,
};
debug!("Processing request without data request on handler...");
request_pipe.send(mapped_request).await.unwrap();
} else {
debug!("Processing request with data request on handler...");
let data: tokio_tungstenite::tungstenite::Message = request.data.unwrap();
let mapped_request: RglwApiRequest<#request_type> = RglwApiRequest {
user: request.user,
metadata: request.metadata,
data: Some(#request_type_without_generics::from_binary_message(data).unwrap()),
};
request_pipe.send(mapped_request).await.unwrap();
}
return ();
}
let incoming_messages = input_pipe.for_each(|request| {
return process_incoming_request(request, request_pipe.clone());
});
async fn process_outgoing_response(response: RglwApiResponse<#response_type>, mut outgoing_pipe: futures::channel::mpsc::UnboundedSender<RglwApiRawResponse>) -> () {
if response.data.is_none() {
debug!("Processing response without data request on handler...");
outgoing_pipe.send(RglwApiRawResponse {
metadata: response.metadata,
data: None,
}).await.unwrap();
} else {
debug!("Processing response with data request on handler...");
let data: #response_type = response.data.unwrap();
outgoing_pipe.send(RglwApiRawResponse {
metadata: response.metadata,
data: Some(data.to_binary_message().unwrap()),
}).await.unwrap();
}
return ();
}
let outgoing_messages = on_response.for_each(|response| {
return process_outgoing_response(response, output_pipe.clone());
});
let call_result = #callee_function(
RglwApiInputStream::new(on_request),
RglwApiOutputStream::<#response_type>::new(response_pipe),
#(#injectables)*
);
futures_util::pin_mut!(incoming_messages, call_result, outgoing_messages);
tokio::select! {
_=incoming_messages=> {},
_=call_result=> {},
_=outgoing_messages=> {},
}
ok_handler().await
}.boxed()
}
let implemented_by = format!("{}::{}", controller_function_name.clone(), #callee_name);
if routes.contains_key(complete_path.clone().as_str()) {
let repeated_route = routes.get(complete_path.clone().as_str()).unwrap().clone();
let repeated_route_implemented_by = repeated_route.implemented_by.clone();
panic!("Duplicated handler path {} implemented by [{}, {}], you must change it or remove one of both!", complete_path.clone(), repeated_route_implemented_by.clone(), implemented_by.clone());
} else {
routes.insert(complete_path.clone(), RglwApiRoute {
path: complete_path.clone(),
handler: #handler_name,
implemented_by: implemented_by.clone()
});
}
};
expanded
}