//! iotics-grpc-client 4.0.0
//!
//! IOTICS gRPC client
//! Documentation
use anyhow::Context;
use std::sync::Arc;
use tokio::sync::mpsc;
use tonic::transport::Channel;

use crate::client::iotics::api::input_api_client::InputApiClient;
use crate::client::iotics::api::{
    delete_input_request, describe_input_request, receive_input_message_request,
    DeleteInputResponse, DescribeInputResponse,
};

pub use crate::client::iotics::api::{
    DeleteInputRequest, DescribeInputRequest, Headers, InputId, ReceiveInputMessageRequest,
    ReceiveInputMessageResponse,
};

use crate::auth_builder::IntoAuthBuilder;
use crate::channel::create_channel;
use crate::helpers::generate_client_app_id;

/// Opens a fresh channel and subscribes to the messages delivered to a twin's input.
///
/// Convenience wrapper around [`receive_input_messages_with_channel`]: the channel is
/// obtained from `create_channel`, called with `None` for its three remaining arguments.
///
/// # Errors
///
/// Returns an error if the channel cannot be created, or whatever error
/// [`receive_input_messages_with_channel`] reports.
pub async fn receive_input_messages(
    auth_builder: Arc<impl IntoAuthBuilder>,
    twin_id: &str,
    input_id: &str,
) -> Result<mpsc::Receiver<Result<Vec<u8>, anyhow::Error>>, anyhow::Error> {
    // The builder is needed twice: once to create the channel, once for the request token.
    let connection = create_channel(Arc::clone(&auth_builder), None, None, None).await?;

    receive_input_messages_with_channel(auth_builder, connection, twin_id, input_id).await
}

pub async fn receive_input_messages_with_channel(
    auth_builder: Arc<impl IntoAuthBuilder>,
    channel: Channel,
    twin_id: &str,
    input_id: &str,
) -> Result<mpsc::Receiver<Result<Vec<u8>, anyhow::Error>>, anyhow::Error> {
    let mut client = InputApiClient::new(channel);
    let client_app_id = generate_client_app_id();
    let transaction_ref = vec![client_app_id.clone()];

    let headers = Headers {
        client_app_id,
        transaction_ref,
        ..Default::default()
    };

    let mut request = tonic::Request::new(ReceiveInputMessageRequest {
        headers: Some(headers),
        args: Some(receive_input_message_request::Arguments {
            input_id: Some(InputId {
                id: input_id.to_string(),
                twin_id: twin_id.to_string(),
                ..Default::default()
            }),
        }),
    });

    let token = auth_builder.get_token()?;

    request.metadata_mut().append(
        "authorization",
        token.parse().context("parse token failed")?,
    );

    let (tx, rx) = mpsc::channel::<Result<Vec<u8>, anyhow::Error>>(16384);

    let fut = async move {
        let stream = client.receive_input_messages(request).await;

        match stream {
            Ok(mut stream) => {
                let stream = stream.get_mut();

                while let Ok(Some(response)) = stream.message().await {
                    match response.payload {
                        Some(payload) => {
                            match payload.message {
                                Some(message) => {
                                    if tx.send(Ok(message.data)).await.is_err() {
                                        break;
                                    }
                                }
                                None => {
                                    if tx
                                        .send(Err(anyhow::anyhow!("Empty input payload")))
                                        .await
                                        .is_err()
                                    {
                                        break;
                                    }
                                }
                            };
                        }
                        None => {
                            if tx
                                .send(Err(anyhow::anyhow!("Empty input response")))
                                .await
                                .is_err()
                            {
                                break;
                            }
                        }
                    };
                }
            }
            Err(e) => {
                let _ = tx.send(Err(e.into())).await;
            }
        }
    };

    tokio::spawn(fut);
    Ok(rx)
}

/// Opens a fresh channel and describes an input of a twin.
///
/// Convenience wrapper around [`describe_input_with_channel`]: the channel is obtained
/// from `create_channel`, called with `None` for its three remaining arguments.
///
/// # Errors
///
/// Returns an error if the channel cannot be created, or whatever error
/// [`describe_input_with_channel`] reports.
pub async fn describe_input(
    auth_builder: Arc<impl IntoAuthBuilder>,
    twin_id: &str,
    input_id: &str,
    remote_host_id: Option<&str>,
) -> Result<DescribeInputResponse, anyhow::Error> {
    // The builder is needed twice: once to create the channel, once for the request token.
    let connection = create_channel(Arc::clone(&auth_builder), None, None, None).await?;

    describe_input_with_channel(auth_builder, connection, twin_id, input_id, remote_host_id)
        .await
}

/// Describes an input of a twin, using an existing channel.
///
/// The request carries a freshly generated client app id, which is also used as the only
/// entry of the transaction ref, and is authorised with the token obtained from
/// `auth_builder`. When `remote_host_id` is `None` the host id is sent as an empty string.
///
/// # Errors
///
/// Returns an error if the token cannot be obtained, if it cannot be parsed into a
/// metadata value, or if the gRPC call fails (the error context then includes the
/// transaction ref).
pub async fn describe_input_with_channel(
    auth_builder: Arc<impl IntoAuthBuilder>,
    channel: Channel,
    twin_id: &str,
    input_id: &str,
    remote_host_id: Option<&str>,
) -> Result<DescribeInputResponse, anyhow::Error> {
    let mut api = InputApiClient::new(channel);
    let app_id = generate_client_app_id();
    let transaction_ref = vec![app_id.clone()];

    let target = InputId {
        id: input_id.to_owned(),
        twin_id: twin_id.to_owned(),
        host_id: remote_host_id.map(str::to_owned).unwrap_or_default(),
    };
    let body = DescribeInputRequest {
        headers: Some(Headers {
            client_app_id: app_id,
            // The original vector is kept for the error context below.
            transaction_ref: transaction_ref.clone(),
            ..Default::default()
        }),
        args: Some(describe_input_request::Arguments {
            input_id: Some(target),
        }),
    };
    let mut request = tonic::Request::new(body);

    let token = auth_builder.get_token()?;
    request.metadata_mut().append(
        "authorization",
        token.parse().context("parse token failed")?,
    );

    let response = api.describe_input(request).await.with_context(|| {
        format!(
            "Describing input failed, transaction ref [{}]",
            transaction_ref.join(", ")
        )
    })?;

    Ok(response.into_inner())
}

/// Opens a fresh channel and deletes an input of a twin.
///
/// Convenience wrapper around [`delete_input_with_client`]: the channel is obtained from
/// `create_channel`, called with `None` for its three remaining arguments.
///
/// # Errors
///
/// Returns an error if the channel cannot be created, or whatever error
/// [`delete_input_with_client`] reports.
pub async fn delete_input(
    auth_builder: Arc<impl IntoAuthBuilder>,
    twin_id: &str,
    input_id: &str,
) -> Result<DeleteInputResponse, anyhow::Error> {
    // The builder is needed twice: once to create the channel, once for the request token.
    let connection = create_channel(Arc::clone(&auth_builder), None, None, None).await?;

    delete_input_with_client(auth_builder, connection, twin_id, input_id).await
}

/// Deletes an input of a twin, using an existing channel.
///
/// Despite the `_with_client` suffix, this function takes a [`Channel`], just like the
/// `_with_channel` functions of this module.
///
/// The request carries a freshly generated client app id, which is also used as the only
/// entry of the transaction ref, and is authorised with the token obtained from
/// `auth_builder`.
///
/// # Errors
///
/// Returns an error if the token cannot be obtained, if it cannot be parsed into a
/// metadata value, or if the gRPC call fails (the error context then includes the
/// transaction ref).
pub async fn delete_input_with_client(
    auth_builder: Arc<impl IntoAuthBuilder>,
    channel: Channel,
    twin_id: &str,
    input_id: &str,
) -> Result<DeleteInputResponse, anyhow::Error> {
    let mut api = InputApiClient::new(channel);
    let app_id = generate_client_app_id();
    let transaction_ref = vec![app_id.clone()];

    let target = InputId {
        id: input_id.to_owned(),
        twin_id: twin_id.to_owned(),
        ..Default::default()
    };
    let body = DeleteInputRequest {
        headers: Some(Headers {
            client_app_id: app_id,
            // The original vector is kept for the error context below.
            transaction_ref: transaction_ref.clone(),
            ..Default::default()
        }),
        args: Some(delete_input_request::Arguments {
            input_id: Some(target),
        }),
    };
    let mut request = tonic::Request::new(body);

    let token = auth_builder.get_token()?;
    request.metadata_mut().append(
        "authorization",
        token.parse().context("parse token failed")?,
    );

    let response = api.delete_input(request).await.with_context(|| {
        format!(
            "Deleting input failed, transaction ref [{}]",
            transaction_ref.join(", ")
        )
    })?;

    Ok(response.into_inner())
}