#![allow(clippy::large_enum_variant)]
use std::cmp::Ordering;
use std::collections::HashMap;
use std::time::Duration;
use crate::private::Sealed;
use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::BoxStream;
use futures::Stream;
use serde::de::Visitor;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use thiserror::Error;
use tonic::Status;
use uuid::Uuid;
#[derive(Copy, Clone, Debug)]
pub enum Retry {
Indefinitely,
Only(usize),
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct Credentials {
#[serde(
serialize_with = "serialize_creds_bytes",
deserialize_with = "deserialize_creds_bytes"
)]
pub(crate) login: Bytes,
#[serde(
serialize_with = "serialize_creds_bytes",
deserialize_with = "deserialize_creds_bytes"
)]
pub(crate) password: Bytes,
}
impl Credentials {
pub fn new<S>(login: S, password: S) -> Credentials
where
S: Into<Bytes>,
{
Credentials {
login: login.into(),
password: password.into(),
}
}
}
struct CredsVisitor;
impl<'de> Visitor<'de> for CredsVisitor {
type Value = Bytes;
fn expecting(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "ASCII string")
}
fn visit_str<E>(self, v: &str) -> std::result::Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(v.to_string().into())
}
fn visit_borrowed_str<E>(self, v: &'de str) -> std::result::Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(v.to_string().into())
}
fn visit_string<E>(self, v: String) -> std::result::Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(v.into())
}
}
fn serialize_creds_bytes<S>(value: &Bytes, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_bytes(value.as_ref())
}
fn deserialize_creds_bytes<'de, D>(deserializer: D) -> std::result::Result<Bytes, D::Error>
where
D: Deserializer<'de>,
{
deserializer.deserialize_any(CredsVisitor)
}
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum ExpectedRevision {
Any,
StreamExists,
NoStream,
Exact(u64),
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct Position {
pub commit: u64,
pub prepare: u64,
}
impl Position {
pub fn start() -> Self {
Position {
commit: 0,
prepare: 0,
}
}
pub fn end() -> Self {
Position {
commit: u64::MAX,
prepare: u64::MAX,
}
}
}
impl PartialOrd for Position {
fn partial_cmp(&self, other: &Position) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for Position {
fn cmp(&self, other: &Position) -> Ordering {
self.commit
.cmp(&other.commit)
.then(self.prepare.cmp(&other.prepare))
}
}
#[derive(Debug)]
pub struct WriteResult {
pub next_expected_version: u64,
pub position: Position,
}
#[derive(Debug, Clone, Copy)]
pub enum StreamPosition<A> {
Start,
End,
Point(A),
}
#[derive(Debug)]
pub enum ReadEventStatus<A> {
NotFound,
NoStream,
Deleted,
Success(A),
}
#[derive(Debug)]
pub struct ReadEventResult {
pub stream_id: String,
pub event_number: i64,
pub event: ResolvedEvent,
}
#[derive(Debug)]
pub struct RecordedEvent {
pub stream_id: String,
pub id: Uuid,
pub revision: u64,
pub event_type: String,
pub data: Bytes,
pub metadata: HashMap<String, String>,
pub custom_metadata: Bytes,
pub is_json: bool,
pub position: Position,
}
impl RecordedEvent {
pub fn as_json<'a, T>(&'a self) -> serde_json::Result<T>
where
T: Deserialize<'a>,
{
serde_json::from_slice(&self.data[..])
}
}
#[derive(Debug)]
pub struct ResolvedEvent {
pub event: Option<RecordedEvent>,
pub link: Option<RecordedEvent>,
pub commit_position: Option<u64>,
}
impl ResolvedEvent {
pub fn is_resolved(&self) -> bool {
self.event.is_some() && self.link.is_some()
}
pub fn get_original_event(&self) -> &RecordedEvent {
self.link.as_ref().unwrap_or_else(|| {
self.event
.as_ref()
.expect("[get_original_event] Not supposed to happen!")
})
}
pub fn get_original_stream_id(&self) -> &str {
&self.get_original_event().stream_id
}
}
#[derive(Debug, Clone)]
pub enum StreamMetadataResult {
Deleted { stream: String },
NotFound { stream: String },
Success(Box<VersionedMetadata>),
}
#[derive(Debug, Clone)]
pub struct VersionedMetadata {
pub stream: String,
pub version: i64,
pub metadata: StreamMetadata,
}
#[derive(Copy, Clone, Debug)]
pub(crate) enum ReadDirection {
Forward,
Backward,
}
#[derive(Debug, Clone)]
pub enum ReadStreamError {
NoStream(String),
StreamDeleted(String),
NotModified(String),
Error(String),
AccessDenied(String),
}
#[derive(Debug, Clone)]
pub enum ReadStreamStatus<A> {
Success(A),
Error(ReadStreamError),
}
#[derive(Clone)]
pub struct EventData {
pub(crate) payload: Bytes,
pub(crate) id_opt: Option<Uuid>,
pub(crate) metadata: HashMap<String, String>,
pub(crate) custom_metadata: Option<Bytes>,
}
impl EventData {
pub fn json<S, P>(event_type: S, payload: P) -> serde_json::Result<EventData>
where
P: Serialize,
S: AsRef<str>,
{
let payload = Bytes::from(serde_json::to_vec(&payload)?);
let mut metadata = HashMap::new();
metadata.insert("type".to_owned(), event_type.as_ref().to_owned());
metadata.insert("content-type".to_owned(), "application/json".to_owned());
Ok(EventData {
payload,
id_opt: None,
metadata,
custom_metadata: None,
})
}
pub fn binary<S>(event_type: S, payload: Bytes) -> Self
where
S: AsRef<str>,
{
let mut metadata = HashMap::new();
metadata.insert("type".to_owned(), event_type.as_ref().to_owned());
metadata.insert(
"content-type".to_owned(),
"application/octet-stream".to_owned(),
);
EventData {
payload,
id_opt: None,
metadata,
custom_metadata: None,
}
}
pub fn id(self, value: Uuid) -> Self {
EventData {
id_opt: Some(value),
..self
}
}
pub fn metadata_as_json<P>(self, payload: P) -> serde_json::Result<EventData>
where
P: Serialize,
{
let custom_metadata = Some(Bytes::from(serde_json::to_vec(&payload)?));
Ok(EventData {
custom_metadata,
..self
})
}
pub fn metadata(self, payload: Bytes) -> EventData {
EventData {
custom_metadata: Some(payload),
..self
}
}
}
#[derive(Default)]
pub struct StreamMetadataBuilder {
max_count: Option<u64>,
max_age: Option<Duration>,
truncate_before: Option<u64>,
cache_control: Option<Duration>,
acl: Option<StreamAcl>,
properties: HashMap<String, serde_json::Value>,
}
impl StreamMetadataBuilder {
pub fn new() -> StreamMetadataBuilder {
Default::default()
}
pub fn max_count(self, value: u64) -> StreamMetadataBuilder {
StreamMetadataBuilder {
max_count: Some(value),
..self
}
}
pub fn max_age(self, value: Duration) -> StreamMetadataBuilder {
StreamMetadataBuilder {
max_age: Some(value),
..self
}
}
pub fn truncate_before(self, value: u64) -> StreamMetadataBuilder {
StreamMetadataBuilder {
truncate_before: Some(value),
..self
}
}
pub fn cache_control(self, value: Duration) -> StreamMetadataBuilder {
StreamMetadataBuilder {
cache_control: Some(value),
..self
}
}
pub fn acl(self, value: StreamAcl) -> StreamMetadataBuilder {
StreamMetadataBuilder {
acl: Some(value),
..self
}
}
pub fn insert_custom_property<V>(mut self, key: String, value: V) -> StreamMetadataBuilder
where
V: Serialize,
{
let serialized = serde_json::to_value(value).unwrap();
let _ = self.properties.insert(key, serialized);
self
}
pub fn build(self) -> StreamMetadata {
StreamMetadata {
max_count: self.max_count,
max_age: self.max_age,
truncate_before: self.truncate_before,
cache_control: self.cache_control,
acl: self.acl.unwrap_or_default(),
custom_properties: self.properties,
}
}
}
#[derive(Debug, Default, Clone)]
pub struct StreamMetadata {
pub max_count: Option<u64>,
pub max_age: Option<Duration>,
pub truncate_before: Option<u64>,
pub cache_control: Option<Duration>,
pub acl: StreamAcl,
pub custom_properties: HashMap<String, serde_json::Value>,
}
impl StreamMetadata {
pub fn builder() -> StreamMetadataBuilder {
StreamMetadataBuilder::new()
}
}
#[derive(Default, Debug, Clone)]
pub struct StreamAcl {
pub read_roles: Option<Vec<String>>,
pub write_roles: Option<Vec<String>>,
pub delete_roles: Option<Vec<String>>,
pub meta_read_roles: Option<Vec<String>>,
pub meta_write_roles: Option<Vec<String>>,
}
pub struct PersistentSubRead {
pub(crate) inner: Box<dyn Stream<Item = PersistentSubEvent> + Send + Unpin>,
}
impl PersistentSubRead {
pub fn into_inner(self) -> Box<dyn Stream<Item = PersistentSubEvent> + Send + Unpin> {
self.inner
}
pub async fn read_next(&mut self) -> Option<PersistentSubEvent> {
use futures::stream::StreamExt;
self.inner.next().await
}
}
#[derive(Debug)]
pub enum SubEvent {
Confirmed(String),
EventAppeared(ResolvedEvent),
Checkpoint(Position),
}
#[derive(Debug)]
pub struct PersistentSubEvent {
pub inner: ResolvedEvent,
pub retry_count: usize,
}
#[derive(Debug, PartialEq, Eq)]
pub enum NakAction {
Unknown,
Park,
Retry,
Skip,
Stop,
}
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
pub enum SystemConsumerStrategy {
DispatchToSingle,
RoundRobin,
Pinned,
}
#[derive(Debug, Clone, Copy)]
pub struct PersistentSubscriptionSettings {
pub resolve_link_tos: bool,
pub revision: u64,
pub extra_stats: bool,
pub message_timeout: Duration,
pub max_retry_count: i32,
pub live_buffer_size: i32,
pub read_batch_size: i32,
pub history_buffer_size: i32,
pub checkpoint_after: Duration,
pub min_checkpoint_count: i32,
pub max_checkpoint_count: i32,
pub max_subscriber_count: i32,
pub named_consumer_strategy: SystemConsumerStrategy,
}
impl PersistentSubscriptionSettings {
pub fn default() -> PersistentSubscriptionSettings {
PersistentSubscriptionSettings {
resolve_link_tos: false,
revision: 0,
extra_stats: false,
message_timeout: Duration::from_secs(30),
max_retry_count: 10,
live_buffer_size: 500,
read_batch_size: 20,
history_buffer_size: 500,
checkpoint_after: Duration::from_secs(2),
min_checkpoint_count: 10,
max_checkpoint_count: 1_000,
max_subscriber_count: 0,
named_consumer_strategy: SystemConsumerStrategy::RoundRobin,
}
}
}
impl Default for PersistentSubscriptionSettings {
fn default() -> PersistentSubscriptionSettings {
PersistentSubscriptionSettings::default()
}
}
#[derive(Debug, Eq, PartialEq)]
pub enum PersistActionResult {
Success,
Failure(PersistActionError),
}
impl PersistActionResult {
pub fn is_success(&self) -> bool {
matches!(*self, PersistActionResult::Success)
}
pub fn is_failure(&self) -> bool {
!self.is_success()
}
}
#[derive(Debug, Eq, PartialEq)]
pub enum PersistActionError {
Fail,
AlreadyExists,
DoesNotExist,
AccessDenied,
}
#[derive(Copy, Clone, Eq, PartialEq, Debug, Serialize, Deserialize)]
pub enum NodePreference {
Leader,
Follower,
Random,
ReadOnlyReplica,
}
impl Default for NodePreference {
fn default() -> Self {
NodePreference::Random
}
}
impl std::fmt::Display for NodePreference {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
use self::NodePreference::*;
match self {
Leader => write!(f, "Leader"),
Follower => write!(f, "Follower"),
Random => write!(f, "Random"),
ReadOnlyReplica => write!(f, "ReadOnlyReplica"),
}
}
}
#[derive(Debug)]
pub(crate) enum Either<A, B> {
Left(A),
Right(B),
}
impl<A, B> Either<A, B> {
pub(crate) fn as_ref(&self) -> Either<&A, &B> {
match self {
Either::Left(a) => Either::Left(&a),
Either::Right(b) => Either::Right(&b),
}
}
}
#[derive(Debug)]
pub(crate) struct DnsClusterSettings {
pub(crate) endpoint: Endpoint,
}
#[derive(PartialEq, Eq, Serialize, Deserialize, Debug, Clone, Copy)]
pub(crate) enum LookupType {
#[serde(rename = "a")]
LookupA,
#[serde(rename = "srv")]
LookupSRV,
}
impl Default for LookupType {
fn default() -> Self {
LookupType::LookupA
}
}
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum CurrentRevision {
Current(u64),
NoStream,
}
#[derive(Clone, Debug, Copy, Eq, PartialEq)]
pub struct WrongExpectedVersion {
pub current: CurrentRevision,
pub expected: ExpectedRevision,
}
impl std::fmt::Display for WrongExpectedVersion {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
f,
"WrongExpectedVersion: expected: {:?}, got: {:?}",
self.expected, self.current
)
}
}
impl std::error::Error for WrongExpectedVersion {}
#[derive(Debug, Clone, Eq, Ord, PartialOrd, PartialEq, Serialize, Deserialize)]
pub struct Endpoint {
pub host: String,
pub port: u32,
}
#[derive(Error, Debug)]
pub enum Error {
#[error("Server-side error.")]
ServerError,
#[error("You tried to execute a command that requires a leader node on a follower node. New leader: ")]
NotLeaderException(Endpoint),
#[error("Connection is closed.")]
ConnectionClosed,
#[error("Unmapped gRPC error: {0}.")]
Grpc(Status),
#[error("gRPC connection error: {0}")]
GrpcConnectionError(GrpcConnectionError),
}
impl Error {
pub fn from_grpc(status: Status) -> Self {
match status.code() {
tonic::Code::Unavailable => Error::ServerError,
_ => {
let metadata = status.metadata();
if let Some("not-leader") = metadata.get("exception").and_then(|e| e.to_str().ok())
{
let endpoint = metadata
.get("leader-endpoint-host")
.zip(metadata.get("leader-endpoint-port"))
.and_then(|(host, port)| {
let host = host.to_str().ok()?;
let port = port.to_str().ok()?;
let host = host.to_string();
let port = port.parse().ok()?;
Some(Endpoint { host, port })
});
if let Some(leader) = endpoint {
return Error::NotLeaderException(leader);
}
}
Error::Grpc(status)
}
}
}
}
#[derive(Error, Debug)]
pub enum GrpcConnectionError {
#[error("Max discovery attempt count reached. count: {0}")]
MaxDiscoveryAttemptReached(usize),
#[error("Unmapped gRPC connection error: {0}.")]
Grpc(Status),
}
pub type Result<A> = std::result::Result<A, Error>;
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ReadResult<A> {
Ok(A),
StreamNotFound(String),
}
impl<A> ReadResult<A> {
pub fn map<B, F>(self, f: F) -> ReadResult<B>
where
F: FnOnce(A) -> B,
{
match self {
ReadResult::Ok(a) => ReadResult::Ok(f(a)),
ReadResult::StreamNotFound(s) => ReadResult::StreamNotFound(s),
}
}
pub fn ok(self) -> Option<A> {
match self {
ReadResult::Ok(a) => Some(a),
ReadResult::StreamNotFound(_) => None,
}
}
pub const fn is_ok(&self) -> bool {
matches!(*self, ReadResult::Ok(_))
}
pub const fn is_not_found(&self) -> bool {
!self.is_ok()
}
#[inline]
#[track_caller]
pub fn unwrap(self) -> A {
match self {
ReadResult::Ok(a) => a,
ReadResult::StreamNotFound(s) => panic!(
"called `ReadResult::unwrap()` on an `StreamNotFound({})` value",
&s
),
}
}
}
#[async_trait]
pub trait ToCount<'a>: Sealed {
type Selection;
fn to_count(&self) -> usize;
async fn select(
self,
stream: BoxStream<'a, crate::Result<ResolvedEvent>>,
) -> crate::Result<Self::Selection>;
}
#[async_trait]
impl<'a> ToCount<'a> for usize {
type Selection = BoxStream<'a, crate::Result<ResolvedEvent>>;
fn to_count(&self) -> usize {
*self
}
async fn select(
self,
stream: BoxStream<'a, crate::Result<ResolvedEvent>>,
) -> crate::Result<Self::Selection> {
Ok(stream)
}
}
pub struct All;
#[async_trait]
impl<'a> ToCount<'a> for All {
type Selection = BoxStream<'a, crate::Result<ResolvedEvent>>;
fn to_count(&self) -> usize {
usize::MAX
}
async fn select(
self,
stream: BoxStream<'a, crate::Result<ResolvedEvent>>,
) -> crate::Result<Self::Selection> {
Ok(stream)
}
}
pub struct Single;
#[async_trait]
impl<'a> ToCount<'a> for Single {
type Selection = Option<ResolvedEvent>;
fn to_count(&self) -> usize {
1
}
async fn select(
self,
mut stream: BoxStream<'a, crate::Result<ResolvedEvent>>,
) -> crate::Result<Self::Selection> {
use futures::stream::TryStreamExt;
stream.try_next().await
}
}
#[derive(Debug, Clone)]
pub struct SubscriptionFilter {
pub(crate) based_on_stream: bool,
pub(crate) max: Option<u32>,
pub(crate) regex: Option<String>,
pub(crate) prefixes: Vec<String>,
}
impl SubscriptionFilter {
pub fn on_stream_name() -> Self {
SubscriptionFilter {
based_on_stream: true,
max: None,
regex: None,
prefixes: Vec::new(),
}
}
pub fn on_event_type() -> Self {
let mut temp = SubscriptionFilter::on_stream_name();
temp.based_on_stream = false;
temp
}
pub fn max(self, max: u32) -> Self {
SubscriptionFilter {
max: Some(max),
..self
}
}
pub fn regex<A: AsRef<str>>(self, regex: A) -> Self {
SubscriptionFilter {
regex: Some(regex.as_ref().to_string()),
..self
}
}
pub fn add_prefix<A: AsRef<str>>(mut self, prefix: A) -> Self {
self.prefixes.push(prefix.as_ref().to_string());
self
}
}