use std::ops::Deref;
use futures::future::BoxFuture;
use futures::stream::BoxStream;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
use crate::versioning::Versioned;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Select {
All,
From(u32),
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Expected {
Any,
Exact(u32),
}
pub type EventStream<'a, S> = BoxStream<
'a,
Result<
Persisted<<S as EventStore>::SourceId, <S as EventStore>::Event>,
<S as EventStore>::Error,
>,
>;
pub trait AppendError: std::error::Error {
fn is_conflict_error(&self) -> bool;
}
impl AppendError for std::convert::Infallible {
fn is_conflict_error(&self) -> bool {
false
}
}
pub trait EventStore {
type SourceId: Eq;
type Event;
type Error: AppendError;
fn append(
&mut self,
source_id: Self::SourceId,
version: Expected,
events: Vec<Self::Event>,
) -> BoxFuture<Result<u32, Self::Error>>;
fn stream(
&self,
source_id: Self::SourceId,
select: Select,
) -> BoxFuture<Result<EventStream<Self>, Self::Error>>;
fn stream_all(&self, select: Select) -> BoxFuture<Result<EventStream<Self>, Self::Error>>;
fn remove(&mut self, source_id: Self::SourceId) -> BoxFuture<Result<(), Self::Error>>;
}
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct Persisted<SourceId, T> {
source_id: SourceId,
version: u32,
sequence_number: u32,
#[cfg_attr(feature = "serde", serde(flatten))]
event: T,
}
impl<SourceId, T> Versioned for Persisted<SourceId, T> {
#[inline]
fn version(&self) -> u32 {
self.version
}
}
impl<SourceId, T> Deref for Persisted<SourceId, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.event
}
}
impl<SourceId, T> Persisted<SourceId, T> {
#[inline]
pub fn from(source_id: SourceId, event: T) -> persistent::EventBuilder<SourceId, T> {
persistent::EventBuilder { source_id, event }
}
#[inline]
pub fn sequence_number(&self) -> u32 {
self.sequence_number
}
#[inline]
pub fn source_id(&self) -> &SourceId {
&self.source_id
}
#[inline]
pub fn take(self) -> T {
self.event
}
}
pub mod persistent {
pub struct EventBuilder<SourceId, T> {
pub(super) event: T,
pub(super) source_id: SourceId,
}
impl<SourceId, T> From<(SourceId, T)> for EventBuilder<SourceId, T> {
#[inline]
fn from(value: (SourceId, T)) -> Self {
let (source_id, event) = value;
Self { source_id, event }
}
}
impl<SourceId, T> EventBuilder<SourceId, T> {
#[inline]
pub fn version(self, value: u32) -> EventBuilderWithVersion<SourceId, T> {
EventBuilderWithVersion {
version: value,
event: self.event,
source_id: self.source_id,
}
}
#[inline]
pub fn sequence_number(self, value: u32) -> EventBuilderWithSequenceNumber<SourceId, T> {
EventBuilderWithSequenceNumber {
sequence_number: value,
event: self.event,
source_id: self.source_id,
}
}
}
pub struct EventBuilderWithVersion<SourceId, T> {
version: u32,
event: T,
source_id: SourceId,
}
impl<SourceId, T> EventBuilderWithVersion<SourceId, T> {
#[inline]
pub fn sequence_number(self, value: u32) -> super::Persisted<SourceId, T> {
super::Persisted {
version: self.version,
event: self.event,
source_id: self.source_id,
sequence_number: value,
}
}
}
pub struct EventBuilderWithSequenceNumber<SourceId, T> {
sequence_number: u32,
event: T,
source_id: SourceId,
}
impl<SourceId, T> EventBuilderWithSequenceNumber<SourceId, T> {
#[inline]
pub fn version(self, value: u32) -> super::Persisted<SourceId, T> {
super::Persisted {
version: value,
event: self.event,
source_id: self.source_id,
sequence_number: self.sequence_number,
}
}
}
}