iotics-grpc-client 4.0.0

IOTICS gRPC client
Documentation
use anyhow::Context;
use std::sync::Arc;
use std::time::SystemTime;
use tonic::transport::Channel;
use tonic::Streaming;

use crate::client::google::protobuf::{BoolValue, Timestamp};
use crate::client::iotics::api::fetch_interest_request::Arguments as FetchInterestArguments;
use crate::client::iotics::api::interest_api_client::InterestApiClient;
use crate::client::iotics::api::{InputInterest, InputMessage, Interest};

pub use crate::client::iotics::api::send_input_message_request::{
    Arguments as SendMessageArguments, Payload,
};
pub use crate::client::iotics::api::{
    FeedId, FetchInterestRequest, FetchInterestResponse, Headers, InputId, SendInputMessageRequest,
    SendInputMessageResponse, TwinId,
};

use crate::auth_builder::IntoAuthBuilder;
use crate::channel::create_channel;
use crate::helpers::generate_client_app_id;

pub async fn follow(
    auth_builder: Arc<impl IntoAuthBuilder>,
    followed_host_id: Option<&str>,
    followed_twin_id: &str,
    followed_feed_id: &str,
    follower_twin_id: &str,
    fetch_last_stored: bool,
) -> Result<Streaming<FetchInterestResponse>, anyhow::Error> {
    let channel = create_channel(auth_builder.clone(), None, None, None).await?;
    follow_with_channel(
        auth_builder,
        channel,
        followed_host_id,
        followed_twin_id,
        followed_feed_id,
        follower_twin_id,
        fetch_last_stored,
    )
    .await
}

pub async fn follow_with_channel(
    auth_builder: Arc<impl IntoAuthBuilder>,
    channel: Channel,
    followed_host_id: Option<&str>,
    followed_twin_id: &str,
    followed_feed_id: &str,
    follower_twin_id: &str,
    fetch_last_stored: bool,
) -> Result<Streaming<FetchInterestResponse>, anyhow::Error> {
    let mut client = InterestApiClient::new(channel);
    let client_app_id = generate_client_app_id();
    let transaction_ref = vec![client_app_id.clone()];

    let headers = Headers {
        client_app_id,
        transaction_ref: transaction_ref.clone(),
        ..Default::default()
    };

    let follower_twin_id_arg = TwinId {
        id: follower_twin_id.to_string(),
        ..Default::default()
    };

    let interest = Interest {
        follower_twin_id: Some(follower_twin_id_arg),
        followed_feed_id: Some(FeedId {
            id: followed_feed_id.to_string(),
            twin_id: followed_twin_id.to_string(),
            host_id: followed_host_id.unwrap_or_default().to_string(),
        }),
    };
    let mut request = tonic::Request::new(FetchInterestRequest {
        headers: Some(headers),
        args: Some(FetchInterestArguments {
            interest: Some(interest),
        }),
        fetch_last_stored: Some(BoolValue {
            value: fetch_last_stored,
        }),
    });

    let token = auth_builder.get_token()?;

    request.metadata_mut().append(
        "authorization",
        token.parse().context("parse token failed")?,
    );

    let stream = client
        .fetch_interests(request)
        .await
        .with_context(|| {
            format!(
                "Fetching interests failed, transaction ref [{}]",
                transaction_ref.join(", ")
            )
        })?
        .into_inner();

    Ok(stream)
}

pub async fn send_input_message<T: Into<Vec<u8>>>(
    auth_builder: Arc<impl IntoAuthBuilder>,
    receiver_host_id: Option<&str>,
    receiver_twin_id: &str,
    input_id: &str,
    sender_twin_id: &str,
    data: T,
) -> Result<(), anyhow::Error> {
    let channel = create_channel(auth_builder.clone(), None, None, None).await?;
    send_input_message_with_channel(
        auth_builder,
        channel,
        receiver_host_id,
        receiver_twin_id,
        input_id,
        sender_twin_id,
        data,
    )
    .await
}

#[allow(clippy::too_many_arguments)]
pub async fn send_input_message_with_channel<T: Into<Vec<u8>>>(
    auth_builder: Arc<impl IntoAuthBuilder>,
    channel: Channel,
    receiver_host_id: Option<&str>,
    receiver_twin_id: &str,
    input_id: &str,
    sender_twin_id: &str,
    data: T,
) -> Result<(), anyhow::Error> {
    let mut client = InterestApiClient::new(channel);
    let client_app_id = generate_client_app_id();
    let transaction_ref = vec![client_app_id.clone()];

    let headers = Headers {
        client_app_id,
        transaction_ref: transaction_ref.clone(),
        ..Default::default()
    };

    let sender_twin_id = TwinId {
        id: sender_twin_id.to_string(),
        ..Default::default()
    };

    let dest_input_id = InputId {
        id: input_id.to_string(),
        twin_id: receiver_twin_id.to_string(),
        host_id: receiver_host_id.unwrap_or_default().to_string(),
    };

    let interest = InputInterest {
        dest_input_id: Some(dest_input_id),
        sender_twin_id: Some(sender_twin_id),
    };

    let args = SendMessageArguments {
        interest: Some(interest),
    };

    let dtm = std::time::SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)?;
    let timestamp = Timestamp {
        seconds: dtm.as_secs() as i64,
        nanos: 0,
    };

    let payload = Payload {
        message: Some(InputMessage {
            occurred_at: Some(timestamp),
            mime: "application/json".to_string(),
            data: data.into(),
        }),
    };

    let mut request = tonic::Request::new(SendInputMessageRequest {
        headers: Some(headers.clone()),
        args: Some(args.clone()),
        payload: Some(payload.clone()),
    });

    let token = auth_builder.get_token()?;

    request.metadata_mut().append(
        "authorization",
        token.parse().context("parse token failed")?,
    );
    if let Err(e) = client.send_input_message(request).await {
        return Err(anyhow::Error::new(e).context(format!(
            "Sending message failed, transaction ref [{}]",
            transaction_ref.join(", ")
        )));
    }

    Ok(())
}