use async_trait::async_trait;
use futures_core::Stream;
use futures_util::StreamExt;
use std::iter::Iterator;
use thiserror::Error;
use uuid::Uuid;
/// Synchronous (blocking) interface to a DCB event store.
///
/// Implementors provide `read`, `head`, `get_tracking_info` and `append`;
/// `read_with_head` ships with a default built on top of `read`.
pub trait DCBEventStoreSync {
    /// Starts a read and returns the response as a boxed [`DCBReadResponseSync`].
    ///
    /// This trait only fixes the argument names and types; their exact meaning
    /// is defined by the implementation.
    /// NOTE(review): presumably `query` selects events, `start` is a sequence
    /// position, `backwards` reverses the order, `limit` caps the number of
    /// events and `subscribe` keeps the response open — confirm with implementors.
    ///
    /// # Errors
    /// Returns a [`DCBError`] when the implementation cannot start the read.
    fn read(
        &self,
        query: Option<DCBQuery>,
        start: Option<u64>,
        backwards: bool,
        limit: Option<u32>,
        subscribe: bool,
    ) -> DCBResult<Box<dyn DCBReadResponseSync + Send + 'static>>;

    /// Reads with `subscribe` set to `false` and gathers the whole response.
    ///
    /// The result is whatever [`DCBReadResponseSync::collect_with_head`] yields
    /// for the response obtained from [`read`](Self::read).
    ///
    /// # Errors
    /// Propagates any error returned by `read` or by `collect_with_head`.
    fn read_with_head(
        &self,
        query: Option<DCBQuery>,
        start: Option<u64>,
        backwards: bool,
        limit: Option<u32>,
    ) -> DCBResult<(Vec<DCBSequencedEvent>, Option<u64>)> {
        // The boxed response is only needed for this one call, so it is used
        // as a temporary instead of being bound to a local.
        self.read(query, start, backwards, limit, false)?
            .collect_with_head()
    }

    /// Returns the store's head as an optional `u64`.
    /// NOTE(review): presumably `None` means nothing has been recorded yet — confirm.
    fn head(&self) -> DCBResult<Option<u64>>;

    /// Looks up the optional `u64` associated with `source`.
    /// NOTE(review): presumably the position last stored through the
    /// `tracking_info` argument of `append` — confirm with implementors.
    fn get_tracking_info(&self, source: &str) -> DCBResult<Option<u64>>;

    /// Appends `events`, optionally guarded by `condition` and accompanied by
    /// `tracking_info`, and returns a `u64` on success.
    /// NOTE(review): the returned value is presumably the position of the last
    /// appended event — confirm with implementors.
    ///
    /// # Errors
    /// Returns a [`DCBError`] when the implementation rejects or fails the append.
    fn append(
        &self,
        events: Vec<DCBEvent>,
        condition: Option<DCBAppendCondition>,
        tracking_info: Option<TrackingInfo>,
    ) -> DCBResult<u64>;
}
/// Blocking response to a read: an iterator of sequenced events that can also
/// report a head position and hand out events in batches.
pub trait DCBReadResponseSync: Iterator<Item = DCBResult<DCBSequencedEvent>> + Send {
    /// Returns the head position reported by this response, if any.
    fn head(&mut self) -> DCBResult<Option<u64>>;

    /// Drains the remaining events and pairs them with [`head`](Self::head).
    ///
    /// The default implementation mirrors the one provided by
    /// `DCBReadResponseAsync::collect_with_head`: it pulls items until the
    /// iterator is exhausted and then asks for the head. Implementors may still
    /// override it.
    ///
    /// # Errors
    /// Returns the first error yielded by the iterator (events after it are
    /// left unconsumed), or the error returned by `head`.
    fn collect_with_head(&mut self) -> DCBResult<(Vec<DCBSequencedEvent>, Option<u64>)> {
        let mut events = Vec::new();
        // `Iterator::by_ref`/`collect` require `Self: Sized`; calling `next`
        // directly keeps this default callable on `Box<dyn DCBReadResponseSync>`.
        while let Some(result) = self.next() {
            events.push(result?);
        }
        let head = self.head()?;
        Ok((events, head))
    }

    /// Returns the next batch of events.
    /// NOTE(review): batch size and end-of-stream signalling are defined by the
    /// implementor — confirm before relying on an empty batch meaning "done".
    fn next_batch(&mut self) -> DCBResult<Vec<DCBSequencedEvent>>;
}
/// Asynchronous interface to a DCB event store.
///
/// Implementors provide `read`, `head`, `get_tracking_info` and `append`;
/// `read_with_head` ships with a default built on top of `read`.
#[async_trait]
pub trait DCBEventStoreAsync: Send + Sync {
    /// Starts a read and resolves to the response as a boxed
    /// [`DCBReadResponseAsync`].
    ///
    /// This trait only fixes the argument names and types; their exact meaning
    /// is defined by the implementation.
    /// NOTE(review): presumably `query` selects events, `start` is a sequence
    /// position, `backwards` reverses the order, `limit` caps the number of
    /// events and `subscribe` keeps the stream open — confirm with implementors.
    ///
    /// # Errors
    /// Returns a [`DCBError`] when the implementation cannot start the read.
    async fn read<'a>(
        &'a self,
        query: Option<DCBQuery>,
        start: Option<u64>,
        backwards: bool,
        limit: Option<u32>,
        subscribe: bool,
    ) -> DCBResult<Box<dyn DCBReadResponseAsync + Send + 'static>>;

    /// Reads with `subscribe` set to `false` and gathers the whole response.
    ///
    /// `after` is forwarded unchanged as the `start` argument of
    /// [`read`](Self::read); the result is whatever
    /// [`DCBReadResponseAsync::collect_with_head`] yields for that response.
    ///
    /// # Errors
    /// Propagates any error returned by `read` or by `collect_with_head`.
    async fn read_with_head<'a>(
        &'a self,
        query: Option<DCBQuery>,
        after: Option<u64>,
        backwards: bool,
        limit: Option<u32>,
    ) -> DCBResult<(Vec<DCBSequencedEvent>, Option<u64>)> {
        let opened = self.read(query, after, backwards, limit, false).await;
        let mut stream = opened?;
        stream.collect_with_head().await
    }

    /// Resolves to the store's head as an optional `u64`.
    /// NOTE(review): presumably `None` means nothing has been recorded yet — confirm.
    async fn head(&self) -> DCBResult<Option<u64>>;

    /// Looks up the optional `u64` associated with `source`.
    /// NOTE(review): presumably the position last stored through the
    /// `tracking_info` argument of `append` — confirm with implementors.
    async fn get_tracking_info(&self, source: &str) -> DCBResult<Option<u64>>;

    /// Appends `events`, optionally guarded by `condition` and accompanied by
    /// `tracking_info`, and resolves to a `u64` on success.
    /// NOTE(review): the returned value is presumably the position of the last
    /// appended event — confirm with implementors.
    ///
    /// # Errors
    /// Returns a [`DCBError`] when the implementation rejects or fails the append.
    async fn append(
        &self,
        events: Vec<DCBEvent>,
        condition: Option<DCBAppendCondition>,
        tracking_info: Option<TrackingInfo>,
    ) -> DCBResult<u64>;
}
/// Asynchronous response to a read: a stream of sequenced events that can also
/// report a head position and hand out events in batches.
#[async_trait]
pub trait DCBReadResponseAsync: Stream<Item = DCBResult<DCBSequencedEvent>> + Send + Unpin {
    /// Resolves to the head position reported by this response, if any.
    async fn head(&mut self) -> DCBResult<Option<u64>>;

    /// Drains the stream and pairs the collected events with
    /// [`head`](Self::head), which is queried only after the stream has ended.
    ///
    /// # Errors
    /// Returns the first error yielded by the stream (later items are left
    /// unconsumed), or the error returned by `head`.
    async fn collect_with_head(&mut self) -> DCBResult<(Vec<DCBSequencedEvent>, Option<u64>)> {
        let mut collected = Vec::new();
        loop {
            let polled = self.next().await;
            match polled {
                // `?` aborts the whole collection on the first failed item.
                Some(item) => collected.push(item?),
                None => break,
            }
        }
        Ok((collected, self.head().await?))
    }

    /// Resolves to the next batch of events.
    /// NOTE(review): batch size and end-of-stream signalling are defined by the
    /// implementor — confirm before relying on an empty batch meaning "done".
    async fn next_batch(&mut self) -> DCBResult<Vec<DCBSequencedEvent>>;
}
/// A single entry of a [`DCBQuery`]: a list of event type names plus a list of tags.
///
/// NOTE(review): how types and tags are combined when matching events is
/// decided by the store implementation — confirm there.
#[derive(Debug, Clone, Default)]
pub struct DCBQueryItem {
    /// Event type names carried by this item.
    pub types: Vec<String>,
    /// Tags carried by this item.
    pub tags: Vec<String>,
}

impl DCBQueryItem {
    /// Creates an item with no types and no tags.
    pub fn new() -> Self {
        Self::default()
    }

    /// Replaces the type list with `types` and returns the updated item.
    pub fn types<I, S>(self, types: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        Self {
            types: types.into_iter().map(Into::into).collect(),
            ..self
        }
    }

    /// Replaces the tag list with `tags` and returns the updated item.
    pub fn tags<I, S>(self, tags: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        Self {
            tags: tags.into_iter().map(Into::into).collect(),
            ..self
        }
    }
}
/// An ordered collection of [`DCBQueryItem`]s.
///
/// NOTE(review): how the items are combined when selecting events is decided
/// by the store implementation — confirm there.
#[derive(Debug, Clone, Default)]
pub struct DCBQuery {
    /// Items in insertion order.
    pub items: Vec<DCBQueryItem>,
}

impl DCBQuery {
    /// Creates a query without any items.
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates a query holding exactly the given items, in iteration order.
    pub fn with_items<I>(items: I) -> Self
    where
        I: IntoIterator<Item = DCBQueryItem>,
    {
        let items = items.into_iter().collect();
        Self { items }
    }

    /// Appends one item to the end of the query and returns the query.
    pub fn item(self, item: DCBQueryItem) -> Self {
        let mut query = self;
        query.items.push(item);
        query
    }

    /// Appends every item from `items` (keeping the existing ones) and returns
    /// the query.
    pub fn items<I>(self, items: I) -> Self
    where
        I: IntoIterator<Item = DCBQueryItem>,
    {
        let mut query = self;
        query.items.extend(items);
        query
    }
}
/// Condition that can accompany an `append` call on the event store traits.
///
/// NOTE(review): judging by the field names, an append is presumably rejected
/// when events matching `fail_if_events_match` exist past position `after` —
/// confirm with the store implementation.
#[derive(Debug, Clone, Default)]
pub struct DCBAppendCondition {
    /// Query attached to this condition.
    pub fail_if_events_match: DCBQuery,
    /// Optional position attached to this condition.
    pub after: Option<u64>,
}

impl DCBAppendCondition {
    /// Creates a condition for the given query with `after` left as `None`.
    pub fn new(fail_if_events_match: DCBQuery) -> Self {
        Self {
            after: None,
            fail_if_events_match,
        }
    }

    /// Overwrites `after` (passing `None` clears it) and returns the condition.
    pub fn after(self, after: Option<u64>) -> Self {
        Self { after, ..self }
    }
}
/// An event as handed to `append`: a type name, tags, an opaque byte payload
/// and an optional UUID.
///
/// `DCBEvent::default()` and [`DCBEvent::new`] produce the same value: empty
/// type name, no tags, no data and no UUID.
#[derive(Debug, Clone, Default)]
pub struct DCBEvent {
    /// Name of the event type.
    pub event_type: String,
    /// Tags attached to the event.
    pub tags: Vec<String>,
    /// Raw payload bytes; this type does not interpret them.
    pub data: Vec<u8>,
    /// Optional identifier; `None` unless set explicitly.
    pub uuid: Option<Uuid>,
}

impl DCBEvent {
    /// Creates an event with every field empty / unset.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the event type name and returns the event.
    pub fn event_type<S: Into<String>>(self, event_type: S) -> Self {
        Self {
            event_type: event_type.into(),
            ..self
        }
    }

    /// Replaces the payload with `data` and returns the event.
    pub fn data<D: Into<Vec<u8>>>(self, data: D) -> Self {
        Self {
            data: data.into(),
            ..self
        }
    }

    /// Replaces the tag list with `tags` and returns the event.
    pub fn tags<I, S>(self, tags: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        Self {
            tags: tags.into_iter().map(Into::into).collect(),
            ..self
        }
    }

    /// Sets the UUID to `Some(uuid)` and returns the event.
    pub fn uuid(self, uuid: Uuid) -> Self {
        Self {
            uuid: Some(uuid),
            ..self
        }
    }
}
/// A named source together with a `u64` position.
///
/// Accepted as the optional `tracking_info` argument of `append` on both
/// event store traits.
/// NOTE(review): presumably `position` is what `get_tracking_info(source)`
/// reports afterwards — confirm with the store implementation.
#[derive(Debug, Clone)]
pub struct TrackingInfo {
/// Name of the source this record refers to.
pub source: String,
/// Position associated with `source`.
pub position: u64,
}
/// A [`DCBEvent`] paired with a `u64` position.
///
/// This is the item type yielded (wrapped in [`DCBResult`]) by both
/// `DCBReadResponseSync` and `DCBReadResponseAsync`.
#[derive(Debug, Clone)]
pub struct DCBSequencedEvent {
/// Position assigned to the event.
/// NOTE(review): presumably its sequence position within the store — confirm.
pub position: u64,
/// The event itself.
pub event: DCBEvent,
}
/// Error type shared by every fallible operation in this module (see `DCBResult`).
///
/// The `Display` text of each variant comes from its `#[error(...)]` attribute.
#[derive(Error, Debug)]
pub enum DCBError {
/// Wraps a `std::io::Error`; `#[from]` lets `?` convert it automatically.
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
/// A condition failed; the payload describes it.
/// NOTE(review): presumably raised when an append condition is violated — confirm.
#[error("Integrity error: condition failed: {0}")]
IntegrityError(String),
/// Corruption was detected; the payload describes it.
#[error("Corruption detected: {0}")]
Corruption(String),
/// An argument was invalid; the payload describes it.
#[error("invalid argument: {0}")]
InvalidArgument(String),
/// Initialization failed; the payload describes it.
/// NOTE(review): `{0:?}` applies `Debug` formatting to the `String`, so the
/// message is rendered quoted and escaped, unlike the other `String` variants —
/// confirm this is intended.
#[error("Initialization error: {0:?}")]
InitializationError(String),
/// The page identified by the payload was not found.
#[error("Page not found: {0:?}")]
PageNotFound(u64),
/// The dirty page identified by the payload was not found.
#[error("Dirty page not found: {0:?}")]
DirtyPageNotFound(u64),
/// Two root IDs disagreed; the fields are rendered as "old" and "new", in that order.
#[error("Root ID mismatched: old {0:?} new {1:?}")]
RootIDMismatch(u64, u64),
/// The database is corrupted; the payload describes it.
#[error("Database corrupted: {0}")]
DatabaseCorrupted(String),
/// An internal error occurred; the payload describes it.
#[error("Internal error: {0}")]
InternalError(String),
/// Serialization failed; the payload describes it.
#[error("Serialization error: {0}")]
SerializationError(String),
/// Deserialization failed; the payload describes it.
#[error("Deserialization error: {0}")]
DeserializationError(String),
/// The page identified by the payload had already been freed.
#[error("Page already freed: {0:?}")]
PageAlreadyFreed(u64),
/// The page identified by the payload was already dirty.
#[error("Page already dirty: {0:?}")]
PageAlreadyDirty(u64),
/// A transport-level failure; the payload describes it.
#[error("Transport error: {0}")]
TransportError(String),
/// The operation was cancelled by the user. This is an empty tuple variant,
/// so it is constructed and matched as `CancelledByUser()`.
#[error("Cancelled by user")]
CancelledByUser(),
/// Authentication failed; the payload describes it.
#[error("Authentication error: {0}")]
AuthenticationError(String),
}
/// Shorthand for `Result<T, DCBError>`, the return type of every fallible
/// operation in this module.
pub type DCBResult<T> = Result<T, DCBError>;
#[cfg(test)]
mod tests {
    use super::*;

    /// In-memory [`DCBReadResponseSync`] backed by a `Vec`, used to exercise the
    /// trait without a real store.
    struct TestReadResponse {
        events: Vec<DCBSequencedEvent>,
        current_index: usize,
        head_position: Option<u64>,
    }

    impl TestReadResponse {
        /// Creates a response that yields `events` in order and reports
        /// `head_position` as its head.
        fn new(events: Vec<DCBSequencedEvent>, head_position: Option<u64>) -> Self {
            Self {
                events,
                current_index: 0,
                head_position,
            }
        }
    }

    impl Iterator for TestReadResponse {
        type Item = DCBResult<DCBSequencedEvent>;

        fn next(&mut self) -> Option<Self::Item> {
            // `get` returns `None` once the cursor has moved past the last event.
            let event = self.events.get(self.current_index)?.clone();
            self.current_index += 1;
            Some(Ok(event))
        }
    }

    impl DCBReadResponseSync for TestReadResponse {
        fn head(&mut self) -> DCBResult<Option<u64>> {
            Ok(self.head_position)
        }

        /// Drains the remaining events and returns them with the head.
        /// Previously a `todo!()` that panicked when called.
        fn collect_with_head(&mut self) -> DCBResult<(Vec<DCBSequencedEvent>, Option<u64>)> {
            let events = self.by_ref().collect::<DCBResult<Vec<_>>>()?;
            Ok((events, self.head_position))
        }

        /// Returns every remaining event as one batch, handing the first error
        /// back to the caller instead of panicking on it.
        fn next_batch(&mut self) -> DCBResult<Vec<DCBSequencedEvent>> {
            self.by_ref().collect()
        }
    }

    /// Two sequenced events at positions 1 and 2.
    fn sample_events() -> Vec<DCBSequencedEvent> {
        let event1 = DCBEvent {
            event_type: "test_event".to_string(),
            data: vec![1, 2, 3],
            tags: vec!["tag1".to_string(), "tag2".to_string()],
            uuid: None,
        };
        let event2 = DCBEvent {
            event_type: "another_event".to_string(),
            data: vec![4, 5, 6],
            tags: vec!["tag2".to_string(), "tag3".to_string()],
            uuid: None,
        };
        vec![
            DCBSequencedEvent {
                event: event1,
                position: 1,
            },
            DCBSequencedEvent {
                event: event2,
                position: 2,
            },
        ]
    }

    #[test]
    fn test_dcb_read_response() {
        let mut response = TestReadResponse::new(sample_events(), Some(2));
        assert_eq!(response.head().unwrap(), Some(2));
        assert_eq!(response.next().unwrap().unwrap().position, 1);
        assert_eq!(response.next().unwrap().unwrap().position, 2);
        assert!(response.next().is_none());
    }

    #[test]
    fn test_collect_with_head() {
        let mut response = TestReadResponse::new(sample_events(), Some(2));
        let (events, head) = response.collect_with_head().unwrap();
        let positions: Vec<u64> = events.iter().map(|event| event.position).collect();
        assert_eq!(positions, vec![1, 2]);
        assert_eq!(head, Some(2));
        assert!(response.next().is_none());

        let mut empty = TestReadResponse::new(Vec::new(), None);
        let (events, head) = empty.collect_with_head().unwrap();
        assert!(events.is_empty());
        assert_eq!(head, None);
    }

    #[test]
    fn test_next_batch() {
        let mut response = TestReadResponse::new(sample_events(), Some(2));
        assert_eq!(response.next().unwrap().unwrap().position, 1);
        let batch = response.next_batch().unwrap();
        assert_eq!(batch.len(), 1);
        assert_eq!(batch[0].position, 2);
        assert!(response.next_batch().unwrap().is_empty());
    }

    #[test]
    fn test_event_new() {
        let event1 = DCBEvent::default()
            .event_type("type1")
            .data(b"data1")
            .tags(["tagX"]);
        assert_eq!(event1.event_type, "type1");
        assert_eq!(event1.data, b"data1".to_vec());
        assert_eq!(event1.tags, vec!["tagX".to_string()]);
        assert_eq!(event1.uuid, None);

        let event2 = DCBEvent::default()
            .event_type("type2")
            .data(b"data2")
            .tags(["tag1", "tag2", "tag3"]);
        assert_eq!(event2.tags.len(), 3);

        let event3 = DCBEvent::default().event_type("type3");
        assert_eq!(event3.data.len(), 0);
        assert_eq!(event3.tags.len(), 0);

        let query_item = DCBQueryItem::new()
            .types(["type1", "type2"])
            .tags(["tagA", "tagB"]);
        assert_eq!(query_item.types.len(), 2);
        assert_eq!(query_item.tags.len(), 2);

        let query = DCBQuery::new().item(query_item);
        assert_eq!(query.items.len(), 1);
        println!("\nAll builder API tests passed!");
    }
}