// hstreamdb 0.2.2
//
// Rust client library for HStreamDB
// Documentation
use hstreamdb_pb::h_stream_api_client::HStreamApiClient;
use hstreamdb_pb::{StreamingFetchRequest, StreamingFetchResponse};
use prost::Message;
use prost_types::Struct;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::UnboundedSender;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::{Request, Streaming};

use crate::client::Client;
use crate::common::{self, Payload};
use crate::utils::decode_received_records;

impl Client {
    /// Opens a streaming fetch on `subscription_id` on behalf of `consumer_name`.
    ///
    /// The method connects to whatever `lookup_subscription` returns for the subscription,
    /// opens the `streaming_fetch` RPC with a request stream backed by an unbounded channel,
    /// and spawns a background task (`fetching`) that forwards decoded records to the
    /// returned stream. Every item is paired with a [`Responder`] used to acknowledge it.
    ///
    /// # Errors
    ///
    /// Propagates failures from the subscription lookup, the connection attempt and the RPC
    /// call. Returns `common::Error::StreamingFetchInitError` if the initial request cannot
    /// be queued on the request channel.
    pub async fn streaming_fetch(
        &self,
        consumer_name: String,
        subscription_id: String,
    ) -> common::Result<UnboundedReceiverStream<(Payload, Responder)>> {
        let endpoint = self.lookup_subscription(subscription_id.clone()).await?;
        let mut api_client = HStreamApiClient::connect(endpoint).await?;

        let (request_sender, request_receiver) =
            tokio::sync::mpsc::unbounded_channel::<StreamingFetchRequest>();

        // Queue the initial (ack-less) request *before* opening the RPC. The channel is
        // unbounded, so the message is simply buffered until the request stream is polled.
        // Queuing it only after the call has returned would leave the request stream empty
        // while the response is being awaited, which can stall if the server waits for the
        // first message before answering.
        request_sender
            .send(StreamingFetchRequest {
                subscription_id: subscription_id.clone(),
                consumer_name: consumer_name.clone(),
                ack_ids: Vec::new(),
            })
            .map_err(common::Error::StreamingFetchInitError)?;

        let response = api_client
            .streaming_fetch(Request::new(UnboundedReceiverStream::new(request_receiver)))
            .await?
            .into_inner();

        let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<(Payload, Responder)>();

        // The task is detached on purpose: it ends on its own when the response stream
        // finishes or fails.
        _ = tokio::spawn(fetching(
            consumer_name,
            subscription_id,
            request_sender,
            response,
            sender,
        ));

        Ok(UnboundedReceiverStream::new(receiver))
    }
}

/// Background loop that drains the server's response stream.
///
/// Each `StreamingFetchResponse` is handed to `process_streaming_fetch_response`, which
/// decodes the records and pushes them onto `fetch_stream`. `ack_sender` is cloned into
/// every call so that the responders created there can send acknowledgements.
///
/// The loop ends when:
/// - the response stream yields an error (logged),
/// - the response stream is exhausted, or
/// - the receiving half of `fetch_stream` has been dropped or closed, since nobody is
///   left to consume the records.
async fn fetching(
    consumer_name: String,
    subscription_id: String,
    ack_sender: UnboundedSender<StreamingFetchRequest>,
    mut init_response: Streaming<StreamingFetchResponse>,
    fetch_stream: UnboundedSender<(Payload, Responder)>,
) {
    // Without this check the task would keep pulling records from the server forever after
    // the consumer went away, logging a send error for every single record.
    while !fetch_stream.is_closed() {
        match init_response.message().await {
            Err(err) => {
                log::error!("streaming fetch error: {err}");
                return;
            }
            // `None` means the server finished the stream.
            Ok(None) => return,
            Ok(Some(message)) => process_streaming_fetch_response(
                consumer_name.clone(),
                subscription_id.clone(),
                message,
                ack_sender.clone(),
                fetch_stream.clone(),
            ),
        }
    }
}

/// Boxed, `Send`, call-once acknowledgement callback.
///
/// On failure it returns a `SendError` wrapping the `StreamingFetchRequest` that could not
/// be sent.
type AckFn = Box<dyn FnOnce() -> Result<(), SendError<StreamingFetchRequest>> + Send>;

/// Acknowledgement handle delivered alongside every fetched payload.
///
/// It owns a one-shot [`AckFn`]; acknowledging consumes the handle, so a given responder
/// can be used at most once.
pub struct Responder(AckFn);

impl Responder {
    /// Wraps `ack_fn` in a responder. Only used inside this module.
    fn new(ack_fn: AckFn) -> Self {
        Self(ack_fn)
    }

    /// Invokes the wrapped acknowledgement callback and returns its result.
    ///
    /// # Errors
    ///
    /// Returns the `SendError` produced by the callback, which carries the request that
    /// could not be sent.
    pub fn ack(self) -> Result<(), SendError<StreamingFetchRequest>> {
        let Self(ack_fn) = self;
        ack_fn()
    }
}

fn process_streaming_fetch_response(
    consumer_name: String,
    subscription_id: String,
    message: StreamingFetchResponse,
    ack_sender: UnboundedSender<StreamingFetchRequest>,
    fetch_stream: UnboundedSender<(Payload, Responder)>,
) {
    match message.received_records {
        None => {
            log::warn!("streaming fetch error: failed to unwrap `received_records`");
        }
        Some(received_records) => match decode_received_records(received_records) {
            Err(err) => {
                log::error!("decode received records error: {err}")
            }
            Ok(records) => {
                for (record_id, record) in records {
                    let record = match record.header {
                        None => {
                            log::error!(
                                "process streaming fetch response error: failed to unwrap record header"
                            );
                            return;
                        }
                        Some(header) => match header.flag() {
                            hstreamdb_pb::h_stream_record_header::Flag::Raw => {
                                Payload::RawRecord(record.payload)
                            }
                            hstreamdb_pb::h_stream_record_header::Flag::Json => {
                                match Struct::decode(record.payload.as_slice()) {
                                    Err(err) => {
                                        log::error!("decode HRecord error: {err}");
                                        return;
                                    }
                                    Ok(payload) => Payload::HRecord(payload),
                                }
                            }
                        },
                    };
                    let ack_sender = ack_sender.clone();
                    let subscription_id = subscription_id.clone();
                    let consumer_name = consumer_name.clone();
                    let ack_fn: AckFn = Box::new(move || {
                        ack_sender.send(StreamingFetchRequest {
                            subscription_id,
                            consumer_name,
                            ack_ids: vec![record_id],
                        })
                    });
                    match fetch_stream.send((record, Responder::new(ack_fn))) {
                        Ok(()) => (),
                        Err(err) => {
                            log::error!("send to fetch stream error: {err}")
                        }
                    }
                }
            }
        },
    }
}