kurrentdb 1.0.0-alpha.4

Official KurrentDB gRPC client
Documentation
use crate::{EventData, Position, StreamState};
use tokio::sync::{
    mpsc::{UnboundedReceiver, UnboundedSender},
    oneshot,
};
use tracing::{debug, error, warn};

#[derive(Debug)]
pub(crate) struct In {
    req: Req,
    sender: oneshot::Sender<crate::Result<BatchWriteResult>>,
}

#[derive(Debug)]
pub(crate) struct Req {
    pub(crate) id: uuid::Uuid,
    pub(crate) stream_name: String,
    pub(crate) events: Vec<EventData>,
    pub(crate) expected_revision: StreamState,
}

#[derive(Debug)]
pub(crate) struct Out {
    pub(crate) correlation_id: uuid::Uuid,
    pub(crate) result: crate::Result<BatchWriteResult>,
}

#[derive(Debug)]
pub(crate) enum BatchMsg {
    In(In),
    Out(Out),
    Error(crate::Error),
}

#[derive(Debug, Clone, Eq, PartialEq)]
pub struct BatchWriteResult {
    stream_name: String,
    current_revision: Option<u64>,
    current_position: Option<Position>,
    stream_state: Option<StreamState>,
}

impl BatchWriteResult {
    pub fn new(
        stream_name: String,
        current_revision: Option<u64>,
        current_position: Option<Position>,
        stream_state: Option<StreamState>,
    ) -> Self {
        Self {
            stream_name,
            current_position,
            current_revision,
            stream_state,
        }
    }

    pub fn stream_name(&self) -> &str {
        self.stream_name.as_str()
    }

    pub fn current_revision(&self) -> Option<u64> {
        self.current_revision
    }

    pub fn current_position(&self) -> Option<Position> {
        self.current_position
    }

    pub fn stream_state(&self) -> Option<StreamState> {
        self.stream_state
    }
}

pub struct BatchAppendClient {
    sender: UnboundedSender<BatchMsg>,
}

impl BatchAppendClient {
    pub(crate) fn new(
        sender: UnboundedSender<BatchMsg>,
        mut receiver: UnboundedReceiver<BatchMsg>,
        forward: UnboundedSender<Req>,
    ) -> Self {
        tokio::spawn(async move {
            let mut reg = std::collections::HashMap::<
                uuid::Uuid,
                oneshot::Sender<crate::Result<BatchWriteResult>>,
            >::new();
            while let Some(msg) = receiver.recv().await {
                match msg {
                    BatchMsg::In(msg) => {
                        let correlation_id = msg.req.id;
                        if forward.send(msg.req).is_ok() {
                            reg.insert(correlation_id, msg.sender);
                            debug!("Send batch-append request {}", correlation_id);

                            continue;
                        }

                        error!("Batch-append session has been closed");
                        break;
                    }

                    BatchMsg::Out(resp) => {
                        if let Some(entry) = reg.remove(&resp.correlation_id) {
                            let failed = resp.result.is_err();
                            let _ = entry.send(resp.result);

                            if failed {
                                break;
                            }

                            continue;
                        }

                        warn!(
                            "Unknown batch-append response correlation id: {}",
                            resp.correlation_id
                        );
                    }

                    BatchMsg::Error(e) => {
                        for (_, resp_sender) in reg {
                            let _ = resp_sender.send(Err(e.clone()));
                        }

                        break;
                    }
                }
            }
        });

        Self { sender }
    }

    pub async fn append_to_stream<S: AsRef<str>>(
        &self,
        stream_name: S,
        stream_state: StreamState,
        events: Vec<EventData>,
    ) -> crate::Result<BatchWriteResult> {
        let (sender, receiver) = oneshot::channel();
        let req = Req {
            id: uuid::Uuid::new_v4(),
            stream_name: stream_name.as_ref().to_string(),
            events,
            expected_revision: stream_state,
        };

        let req = In { sender, req };

        if let Err(e) = self.sender.send(BatchMsg::In(req)) {
            error!("[sending-end] Batch-append stream is closed: {}", e);

            let status = tonic::Status::cancelled("Batch-append stream has been closed");
            return Err(crate::Error::ServerError(status.to_string()));
        }

        receiver.await.unwrap_or_else(|e| {
            error!("[receiving-end] Batch-append stream is closed: {}", e);

            let status = tonic::Status::cancelled("Batch-append stream has been closed");

            Err(crate::Error::ServerError(status.to_string()))
        })
    }
}