frpc-lib 0.1.0

Library for Fluid RPC to dynamically invoke gRPC services
Documentation
use std::str::FromStr;

use anyhow::{bail, Result};
use chrono::Local;
use futures::channel::mpsc::UnboundedSender;
use prost_reflect::{DescriptorPool, DynamicMessage, ReflectMessage};
use regex::Regex;
use serde_json::Deserializer;
use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;
use tonic::codegen::http::uri::PathAndQuery;
use tonic::{client::Grpc, transport::Channel, Request};
use tonic::{Response, Status};

use crate::misc::error_helper::send_invoker_error;
use crate::stream::fluid_message_received::FluidMessageReceived;
use crate::stream::fluid_stream_event::FluidStreamEvent;
use crate::{
    invoker::invoker_error::InvokerError,
    loader::{file_loader::load_from_files, reflection_loader_v1::load_from_server_reflection},
};

use super::dynamic_message_codec::DynamicMessageCodec;

pub async fn invoke(
    mut tx: UnboundedSender<FluidStreamEvent>,
    server_url: String,
    target_method: String,
    data: Option<String>,
    file_paths: Option<Vec<String>>,
    include_paths: Option<Vec<String>>,
    token: CancellationToken,
) -> Result<()> {
    if (file_paths.is_some() && include_paths.is_none())
        || (file_paths.is_none() && include_paths.is_some())
    {
        bail!(send_invoker_error(
            &mut tx,
            InvokerError::InvalidArguments(String::from(
                "Must supply Proto File Paths and Include File Paths to load protos from files.",
            ))
        ));
    }

    let pool = match if file_paths.is_some() && include_paths.is_some() {
        load_from_files(file_paths.unwrap(), include_paths.unwrap())
    } else {
        load_from_server_reflection(server_url.clone()).await
    } {
        Ok(p) => p,
        Err(e) => bail!(send_invoker_error(
            &mut tx,
            InvokerError::DescriptorPoolLoadFailed(e.to_string())
        )),
    };

    invoke_with_pool(tx, pool, server_url, target_method, data, token).await
}

pub async fn invoke_with_pool(
    mut tx: UnboundedSender<FluidStreamEvent>,
    pool: DescriptorPool,
    server_url: String,
    target_method: String,
    data: Option<String>,
    token: CancellationToken,
) -> Result<()> {
    let re = Regex::new(r"^(\/)?(?<servicePath>\w+(\.\w+)+)\/(?<methodName>\w+)$").unwrap();
    let caps = re.captures(&target_method);

    let service_path;
    let method_name;

    match caps {
        Some(cap) => {
            service_path = match cap.name("servicePath") {
                Some(val) => val.as_str().to_string(),
                None => bail!(send_invoker_error(
                    &mut tx,
                    InvokerError::InvalidTargetMethod(target_method)
                )),
            };
            method_name = match cap.name("methodName") {
                Some(val) => val.as_str().to_string(),
                None => bail!(send_invoker_error(
                    &mut tx,
                    InvokerError::InvalidTargetMethod(target_method)
                )),
            }
        }
        None => bail!(send_invoker_error(
            &mut tx,
            InvokerError::InvalidTargetMethod(target_method)
        )),
    }

    let service = match pool.get_service_by_name(&service_path) {
        Some(s) => s,
        None => bail!(send_invoker_error(
            &mut tx,
            InvokerError::ServiceNotFound(service_path)
        )),
    };

    let method = match service.methods().find(|m| m.name().eq(&method_name)) {
        Some(m) => m,
        None => bail!(send_invoker_error(
            &mut tx,
            InvokerError::MethodNotFound(method_name)
        )),
    };

    tx.unbounded_send(FluidStreamEvent::InitiatingConnection(Local::now()))?;

    let endpoint = match Channel::from_shared(server_url.clone()) {
        Ok(c) => c,
        Err(e) => bail!(send_invoker_error(
            &mut tx,
            InvokerError::CreateConnectionFailed(e.to_string())
        )),
    };

    let channel = match endpoint.connect().await {
        Ok(c) => c,
        Err(e) => bail!(send_invoker_error(
            &mut tx,
            InvokerError::CreateConnectionFailed(e.to_string())
        )),
    };

    let mut client = Grpc::new(channel);

    client.ready().await?;

    tx.unbounded_send(FluidStreamEvent::Connected(Local::now()))?;

    let data = &data.unwrap_or(String::from("{}"));

    let mut deserializer = Deserializer::from_str(data);
    let message = match DynamicMessage::deserialize(method.clone().input(), &mut deserializer) {
        Ok(message) => message,
        Err(e) => bail!(send_invoker_error(
            &mut tx,
            InvokerError::DynamicMessageDecodingFailed(e.to_string())
        )),
    };
    deserializer.end()?;

    let request = Request::new(message.clone());
    let path = PathAndQuery::from_str(&format!("/{}/{}", &service_path, &method_name)).unwrap();

    let mut deserializer = Deserializer::from_str("{}");
    let output_message = DynamicMessage::deserialize(method.clone().output(), &mut deserializer)?;
    deserializer.end()?;

    match method.clone().is_server_streaming() {
        true => {
            let stream = client
                .server_streaming(
                    request,
                    path,
                    DynamicMessageCodec::new(output_message.descriptor()),
                )
                .await;

            let mut streaming = stream?.into_inner();

            loop {
                tokio::select! {
                    result = streaming.next() => {
                         match result {
                            Some(Ok(message)) => {
                                let _ = tx.unbounded_send(FluidStreamEvent::StreamingMessageReceived(
                                    FluidMessageReceived::new(message),
                                ));
                            }
                            Some(Err(e)) => bail!(send_invoker_error(
                                &mut tx,
                                InvokerError::StreamingError(e.to_string())
                            )),
                            None => break,
                        };
                    },
                    _ = token.cancelled() => {
                        break;
                    },
                }
            }
        }
        false => {
            let response: Result<Response<DynamicMessage>, Status> = client
                .unary(
                    request,
                    path,
                    DynamicMessageCodec::new(output_message.descriptor()),
                )
                .await;

            match response {
                Ok(response) => {
                    let _ = tx.unbounded_send(FluidStreamEvent::UnaryMessageReceived(
                        FluidMessageReceived::new(response.into_inner()),
                    ));
                }
                Err(e) => bail!(send_invoker_error(
                    &mut tx,
                    InvokerError::UnaryError(e.to_string())
                )),
            }
        }
    };

    Ok(())
}