hstreamdb 0.2.2

Rust client library for HStreamDB
Documentation
use std::io;

pub use hstreamdb_pb::{CompressionType, Consumer, RecordId, SpecialOffset, Stream};
use hstreamdb_pb::{HStreamRecord, StreamingFetchRequest};
use num_bigint::ParseBigIntError;
use prost::{DecodeError, Message};
pub use prost_types::{ListValue, Struct, Timestamp};
use tonic::transport;

use crate::producer;

#[derive(Debug)]
pub struct Subscription {
    pub subscription_id: String,
    pub stream_name: String,
    pub ack_timeout_seconds: i32,
    pub max_unacked_records: i32,
    pub offset: SpecialOffset,
    pub creation_time: Option<Timestamp>,
}

#[derive(Debug, Clone)]
pub enum StreamShardOffset {
    Special(SpecialOffset),
    Normal(RecordId),
}

impl From<StreamShardOffset> for hstreamdb_pb::ShardOffset {
    fn from(stream_shard_offset: StreamShardOffset) -> Self {
        let offset = match stream_shard_offset {
            StreamShardOffset::Special(offset) => {
                hstreamdb_pb::shard_offset::Offset::SpecialOffset(offset.into())
            }
            StreamShardOffset::Normal(offset) => {
                hstreamdb_pb::shard_offset::Offset::RecordOffset(offset)
            }
        };
        hstreamdb_pb::ShardOffset {
            offset: Some(offset),
        }
    }
}

impl From<Subscription> for hstreamdb_pb::Subscription {
    fn from(subscription: Subscription) -> Self {
        hstreamdb_pb::Subscription {
            subscription_id: subscription.subscription_id,
            stream_name: subscription.stream_name,
            ack_timeout_seconds: subscription.ack_timeout_seconds,
            max_unacked_records: subscription.max_unacked_records,
            offset: subscription.offset as _,
            creation_time: subscription.creation_time,
        }
    }
}

impl From<hstreamdb_pb::Subscription> for Subscription {
    fn from(subscription: hstreamdb_pb::Subscription) -> Self {
        let offset = subscription.offset();
        Subscription {
            subscription_id: subscription.subscription_id,
            stream_name: subscription.stream_name,
            ack_timeout_seconds: subscription.ack_timeout_seconds,
            max_unacked_records: subscription.max_unacked_records,
            offset,
            creation_time: subscription.creation_time,
        }
    }
}

#[derive(Debug, thiserror::Error)]
pub enum Error {
    #[error(transparent)]
    TransportError(transport::Error),
    #[error(transparent)]
    GrpcStatusError(tonic::Status),
    #[error(transparent)]
    CompressError(io::Error),
    #[error(transparent)]
    DecompressError(io::Error),
    #[error(transparent)]
    ParseUrlError(url::ParseError),
    #[error(transparent)]
    PartitionKeyError(PartitionKeyError),
    #[error(transparent)]
    StreamingFetchInitError(tokio::sync::mpsc::error::SendError<StreamingFetchRequest>),
    #[error("failed to unwrap `{0}`")]
    PBUnwrapError(String),
    #[error(transparent)]
    PBDecodeError(DecodeError),
    #[error("no channel is available")]
    NoChannelAvailable,
    #[error("bad argument: {0}")]
    BadArgument(String),
    #[error(transparent)]
    AppenderSendError(producer::SendError),
    #[error("the URL {0} is cannot-be-a-base, or does not have a host, or has the file scheme")]
    InvalidUrl(String),
}

#[derive(Debug, thiserror::Error)]
pub enum PartitionKeyError {
    #[error(transparent)]
    ParseBigIntError(ParseBigIntError),
    #[error("No match for the partition key")]
    NoMatch,
}

impl From<transport::Error> for Error {
    fn from(err: transport::Error) -> Self {
        Error::TransportError(err)
    }
}

impl From<tonic::Status> for Error {
    fn from(err: tonic::Status) -> Self {
        Error::GrpcStatusError(err)
    }
}

impl From<url::ParseError> for Error {
    fn from(err: url::ParseError) -> Self {
        Error::ParseUrlError(err)
    }
}

impl From<producer::SendError> for Error {
    fn from(err: producer::SendError) -> Self {
        Error::AppenderSendError(err)
    }
}

pub type Result<A> = std::result::Result<A, Error>;

#[derive(Debug, Clone)]
pub struct Record {
    pub partition_key: PartitionKey,
    pub payload: Payload,
}

#[derive(Debug, Clone)]
pub enum Payload {
    HRecord(prost_types::Struct),
    RawRecord(Vec<u8>),
}

impl TryFrom<HStreamRecord> for Payload {
    type Error = DecodeError;

    fn try_from(record: HStreamRecord) -> std::result::Result<Self, Self::Error> {
        Ok(match record.header.unwrap().flag() {
            hstreamdb_pb::h_stream_record_header::Flag::Json => {
                Self::HRecord(Struct::decode(record.payload.as_slice())?)
            }
            hstreamdb_pb::h_stream_record_header::Flag::Raw => Self::RawRecord(record.payload),
        })
    }
}

impl Payload {
    pub(crate) fn encoded_len(&self) -> usize {
        match self {
            Payload::HRecord(x) => x.encoded_len(),
            Payload::RawRecord(x) => x.len(),
        }
    }
}

impl Record {
    pub(crate) fn encoded_len(&self) -> usize {
        self.payload.encoded_len()
    }
}

impl From<Vec<u8>> for Payload {
    fn from(payload: Vec<u8>) -> Self {
        Self::RawRecord(payload)
    }
}

impl From<prost_types::Struct> for Payload {
    fn from(payload: prost_types::Struct) -> Self {
        Self::HRecord(payload)
    }
}

pub type ShardId = u64;
pub type PartitionKey = String;