bookkeeper-client 0.2.1

Async rust client for Apache BookKeeper
Documentation
use std::collections::HashMap;

use async_trait::async_trait;
use compact_str::CompactString;
use either::Either;
use prost::Message;

use crate::client::errors::{BkError, BkResult, ErrorKind};
use crate::client::metadata::{BookieId, HasLedgerMetadata, LedgerId, LedgerMetadata};
use crate::proto::*;

#[derive(Clone, Debug)]
pub struct Versioned<T> {
    pub value: T,
    pub version: MetaVersion,
}

impl<T> std::ops::Deref for Versioned<T> {
    type Target = T;

    fn deref(&self) -> &T {
        &self.value
    }
}

impl<T> Versioned<T>
where
    T: Clone + std::fmt::Debug,
{
    pub fn new<V: Into<MetaVersion>>(version: V, value: T) -> Versioned<T> {
        Versioned { version: version.into(), value }
    }
}

impl HasLedgerMetadata for Versioned<LedgerMetadata> {
    fn metadata(&self) -> &LedgerMetadata {
        &self.value
    }
}

#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct MetaVersion(pub(crate) i64);

impl From<i64> for MetaVersion {
    fn from(u: i64) -> Self {
        MetaVersion(u)
    }
}

impl From<MetaVersion> for i64 {
    fn from(version: MetaVersion) -> i64 {
        version.0
    }
}

#[derive(Clone)]
pub struct BookieServiceInfo {
    pub bookie_id: BookieId,
    pub properties: HashMap<String, String>,
    pub endpoints: Vec<BookieEndpoint>,
}

impl BookieServiceInfo {
    pub fn rpc_endpoint(&self) -> Option<&BookieEndpoint> {
        self.endpoint_for_protocol("bookie-rpc")
    }

    pub fn endpoint_for_protocol(&self, protocol: &str) -> Option<&BookieEndpoint> {
        return self.endpoints.iter().find(|bookie| bookie.protocol == protocol);
    }

    pub fn from_legacy(bookie_id: BookieId) -> Result<BookieServiceInfo, BkError> {
        let parts: Vec<&str> = bookie_id.as_str().split(':').collect();
        if parts.len() != 2 {
            let err =
                BkError::with_message(ErrorKind::MetaInvalidData, format!("invalid legacy bookie id {}", bookie_id));
            return Err(err);
        }
        let port: u16 = match parts[1].parse() {
            Err(_) => {
                let err = BkError::with_message(
                    ErrorKind::MetaUnexpectedResponse,
                    format!("invalid legacy bookie id {}", bookie_id),
                );
                return Err(err);
            },
            Ok(port) => port,
        };
        let host = parts[0].into();
        Ok(BookieServiceInfo {
            bookie_id,
            endpoints: vec![BookieEndpoint { id: "bookie".into(), host, port, protocol: "bookie-rpc".into() }],
            properties: HashMap::new(),
        })
    }

    pub fn from_protobuf(bookie_id: BookieId, bytes: &[u8]) -> Result<BookieServiceInfo, BkError> {
        let bookie_info_pb = match BookieServiceInfoFormat::decode(bytes) {
            Err(_) => {
                let err = BkError::with_description(ErrorKind::MetaInvalidData, &"invalid bookie info format");
                return Err(err);
            },
            Ok(bookie) => bookie,
        };
        Ok(BookieServiceInfo {
            bookie_id,
            endpoints: bookie_info_pb.endpoints.into_iter().map(|endpoint| endpoint.into()).collect(),
            properties: bookie_info_pb.properties,
        })
    }
}

#[derive(Clone)]
pub struct BookieEndpoint {
    pub id: CompactString,
    pub host: CompactString,
    pub port: u16,
    pub protocol: CompactString,
}

impl From<bookie_service_info_format::Endpoint> for BookieEndpoint {
    fn from(endpoint: bookie_service_info_format::Endpoint) -> BookieEndpoint {
        BookieEndpoint {
            id: endpoint.id.into(),
            host: endpoint.host.into(),
            port: endpoint.port as u16,
            protocol: endpoint.protocol.into(),
        }
    }
}

pub enum BookieUpdate {
    Add(BookieServiceInfo),
    Remove(BookieId),
    Reconstruction(Vec<BookieServiceInfo>),
}

#[async_trait]
pub trait BookieUpdateStream: Send {
    async fn next(&mut self) -> BkResult<BookieUpdate>;
}

#[async_trait]
pub trait BookieRegistrationClient {
    async fn watch_readable_bookies(
        &mut self,
    ) -> Result<(Vec<BookieServiceInfo>, Box<dyn BookieUpdateStream>), BkError>;

    async fn watch_writable_bookies(
        &mut self,
    ) -> Result<(Vec<BookieServiceInfo>, Box<dyn BookieUpdateStream>), BkError>;
}

#[async_trait]
pub trait LedgerIdStoreClient {
    async fn generate_ledger_id(&self) -> Result<LedgerId, BkError>;
}

#[async_trait]
pub trait LedgerMetadataStream: Send {
    async fn cancel(&mut self);

    async fn next(&mut self) -> Result<Versioned<LedgerMetadata>, BkError>;
}

#[async_trait]
pub trait LedgerMetadataStoreClient {
    async fn create_ledger_metadata(&self, metadata: &LedgerMetadata) -> Result<MetaVersion, BkError>;

    async fn remove_ledger_metadata(
        &self,
        ledger_id: LedgerId,
        expected_version: Option<MetaVersion>,
    ) -> Result<(), BkError>;

    async fn read_ledger_metadata(&self, ledger_id: LedgerId) -> Result<Versioned<LedgerMetadata>, BkError>;

    async fn watch_ledger_metadata(
        &self,
        ledger_id: LedgerId,
        start_version: MetaVersion,
    ) -> BkResult<Box<dyn LedgerMetadataStream>>;

    async fn write_ledger_metadata(
        &self,
        metadata: &LedgerMetadata,
        expected_version: MetaVersion,
    ) -> Result<Either<Versioned<LedgerMetadata>, MetaVersion>, BkError>;
}

#[async_trait]
pub trait MetaStore: LedgerMetadataStoreClient + LedgerIdStoreClient + BookieRegistrationClient + Send + Sync {}