1use async_trait::async_trait;
7use futures_core::Stream;
8use futures_util::StreamExt;
9use std::iter::Iterator;
10use thiserror::Error;
11use uuid::Uuid;
12
13pub trait DcbEventStoreSync {
15 fn read(
23 &self,
24 query: Option<DcbQuery>,
25 start: Option<u64>,
26 backwards: bool,
27 limit: Option<u32>,
28 subscribe: bool, ) -> DcbResult<Box<dyn DcbReadResponseSync + Send + 'static>>;
30
31 fn read_with_head(
33 &self,
34 query: Option<DcbQuery>,
35 start: Option<u64>,
36 backwards: bool,
37 limit: Option<u32>,
38 ) -> DcbResult<(Vec<DcbSequencedEvent>, Option<u64>)> {
39 let mut response = self.read(query, start, backwards, limit, false)?;
40 response.collect_with_head()
41 }
42
43 fn head(&self) -> DcbResult<Option<u64>>;
47
48 fn get_tracking_info(&self, source: &str) -> DcbResult<Option<u64>>;
50
51 fn append(
55 &self,
56 events: Vec<DcbEvent>,
57 condition: Option<DcbAppendCondition>,
58 tracking_info: Option<TrackingInfo>,
59 ) -> DcbResult<u64>;
60}
61
62pub trait DcbReadResponseSync: Iterator<Item = DcbResult<DcbSequencedEvent>> + Send {
64 fn head(&mut self) -> DcbResult<Option<u64>>;
66 fn collect_with_head(&mut self) -> DcbResult<(Vec<DcbSequencedEvent>, Option<u64>)>;
68 fn next_batch(&mut self) -> DcbResult<Vec<DcbSequencedEvent>>;
73}
74
/// Blocking subscription: an iterator of `DcbResult<DcbSequencedEvent>` that
/// can also hand out events in batches.
pub trait DcbSubscriptionSync: Iterator<Item = DcbResult<DcbSequencedEvent>> + Send {
    /// Returns the next batch of events; batch sizing is up to the implementor.
    fn next_batch(&mut self) -> DcbResult<Vec<DcbSequencedEvent>>;
}
82
83#[async_trait]
85pub trait DcbEventStoreAsync: Send + Sync {
86 async fn read<'a>(
94 &'a self,
95 query: Option<DcbQuery>,
96 start: Option<u64>,
97 backwards: bool,
98 limit: Option<u32>,
99 subscribe: bool,
100 ) -> DcbResult<Box<dyn DcbReadResponseAsync + Send + 'static>>;
101
102 async fn read_with_head<'a>(
104 &'a self,
105 query: Option<DcbQuery>,
106 after: Option<u64>,
107 backwards: bool,
108 limit: Option<u32>,
109 ) -> DcbResult<(Vec<DcbSequencedEvent>, Option<u64>)> {
110 let mut response = self.read(query, after, backwards, limit, false).await?;
111 response.collect_with_head().await
112 }
113
114 async fn head(&self) -> DcbResult<Option<u64>>;
118
119 async fn get_tracking_info(&self, source: &str) -> DcbResult<Option<u64>>;
121
122 async fn append(
126 &self,
127 events: Vec<DcbEvent>,
128 condition: Option<DcbAppendCondition>,
129 tracking_info: Option<TrackingInfo>,
130 ) -> DcbResult<u64>;
131}
132
133#[async_trait]
135pub trait DcbReadResponseAsync: Stream<Item = DcbResult<DcbSequencedEvent>> + Send + Unpin {
136 async fn head(&mut self) -> DcbResult<Option<u64>>;
137
138 async fn collect_with_head(&mut self) -> DcbResult<(Vec<DcbSequencedEvent>, Option<u64>)> {
139 let mut events = Vec::new();
140 while let Some(result) = self.next().await {
141 events.push(result?); }
143
144 let head = self.head().await?;
145 Ok((events, head))
146 }
147
148 async fn next_batch(&mut self) -> DcbResult<Vec<DcbSequencedEvent>>;
149}
150
/// Asynchronous subscription: a stream of `DcbResult<DcbSequencedEvent>` that
/// can also hand out events in batches.
#[async_trait]
pub trait DcbSubscriptionAsync: Stream<Item = DcbResult<DcbSequencedEvent>> + Send + Unpin {
    /// Returns the next batch of events; batch sizing is up to the implementor.
    async fn next_batch(&mut self) -> DcbResult<Vec<DcbSequencedEvent>>;
}
156
/// One item of a [`DcbQuery`]: a list of event types and a list of tags.
///
/// NOTE(review): how types and tags are combined when matching is decided by
/// the event store, not by this type — confirm against the store.
#[derive(Debug, Clone, Default)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct DcbQueryItem {
    /// Event type names carried by this item.
    pub types: Vec<String>,
    /// Tags carried by this item.
    pub tags: Vec<String>,
}

impl DcbQueryItem {
    /// Creates an item with no types and no tags.
    pub fn new() -> Self {
        Self::default()
    }

    /// Builder-style setter: replaces the type list with `types`.
    pub fn types<I, S>(self, types: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        Self {
            types: types.into_iter().map(Into::into).collect(),
            ..self
        }
    }

    /// Builder-style setter: replaces the tag list with `tags`.
    pub fn tags<I, S>(self, tags: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        Self {
            tags: tags.into_iter().map(Into::into).collect(),
            ..self
        }
    }
}
196
197#[derive(Debug, Clone, Default)]
199#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
200pub struct DcbQuery {
201 pub items: Vec<DcbQueryItem>,
203}
204
205impl DcbQuery {
206 pub fn new() -> Self {
208 Self { items: Vec::new() }
209 }
210
211 pub fn with_items<I>(items: I) -> Self
213 where
214 I: IntoIterator<Item = DcbQueryItem>,
215 {
216 Self {
217 items: items.into_iter().collect(),
218 }
219 }
220
221 pub fn item(mut self, item: DcbQueryItem) -> Self {
223 self.items.push(item);
224 self
225 }
226
227 pub fn items<I>(mut self, items: I) -> Self
229 where
230 I: IntoIterator<Item = DcbQueryItem>,
231 {
232 self.items.extend(items);
233 self
234 }
235}
236
237#[derive(Debug, Clone, Default)]
239#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
240pub struct DcbAppendCondition {
241 pub fail_if_events_match: DcbQuery,
243 pub after: Option<u64>,
245}
246
247impl DcbAppendCondition {
248 pub fn new(fail_if_events_match: DcbQuery) -> Self {
250 Self {
251 fail_if_events_match,
252 after: None,
253 }
254 }
255
256 pub fn after(mut self, after: Option<u64>) -> Self {
257 self.after = after;
258 self
259 }
260}
261
262#[derive(Debug, Clone)]
264#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
265pub struct DcbEvent {
266 pub event_type: String,
268 pub tags: Vec<String>,
270 pub data: Vec<u8>,
272 pub uuid: Option<Uuid>,
274}
275
276impl Default for DcbEvent {
277 fn default() -> Self {
278 Self::new()
279 }
280}
281
282impl DcbEvent {
283 pub fn new() -> Self {
285 Self {
286 event_type: "".to_string(),
287 data: Vec::new(),
288 tags: Vec::new(),
289 uuid: None,
290 }
291 }
292
293 pub fn event_type<S: Into<String>>(mut self, event_type: S) -> Self {
295 self.event_type = event_type.into();
296 self
297 }
298
299 pub fn data<D: Into<Vec<u8>>>(mut self, data: D) -> Self {
301 self.data = data.into();
302 self
303 }
304
305 pub fn tags<I, S>(mut self, tags: I) -> Self
307 where
308 I: IntoIterator<Item = S>,
309 S: Into<String>,
310 {
311 self.tags = tags.into_iter().map(|s| s.into()).collect();
312 self
313 }
314
315 pub fn uuid(mut self, uuid: Uuid) -> Self {
317 self.uuid = Some(uuid);
318 self
319 }
320}
321
/// A tracking record: a named source paired with a position.
///
/// Accepted by the event stores' `append` methods; `get_tracking_info` takes
/// a source name and returns an optional position.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct TrackingInfo {
    /// Name identifying the tracked source.
    pub source: String,
    /// Position recorded for `source`.
    pub position: u64,
}
328
/// A [`DcbEvent`] together with a position; this is the item type yielded by
/// read responses and subscriptions.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct DcbSequencedEvent {
    /// Position assigned to the event.
    pub position: u64,
    /// The event itself.
    pub event: DcbEvent,
}
338
/// Error type shared by all DCB event-store operations.
///
/// `Display` text comes from the `#[error(...)]` attribute on each variant.
/// With the `serde` feature enabled the enum is (de)serializable; the wrapped
/// `std::io::Error` goes through the `serde_io_error` helper module.
#[derive(Error, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum DcbError {
    /// Wrapped I/O error; `?` converts `std::io::Error` into it via `From`.
    #[error("io error: {0}")]
    #[cfg_attr(feature = "serde", serde(with = "serde_io_error"))]
    Io(#[from] std::io::Error),

    /// An integrity check failed; the payload describes the failed condition.
    #[error("integrity error: condition failed: {0}")]
    IntegrityError(String),
    /// Corruption was detected; the payload describes it.
    #[error("corruption detected: {0}")]
    Corruption(String),
    /// An argument was rejected; the payload explains why.
    #[error("invalid argument: {0}")]
    InvalidArgument(String),

    /// Initialization failed.
    #[error("initialization error: {0}")]
    InitializationError(String),
    /// The page with the given identifier was not found.
    #[error("page not found: {0}")]
    PageNotFound(u64),
    /// The dirty page with the given identifier was not found.
    #[error("dirty page not found: {0}")]
    DirtyPageNotFound(u64),
    /// Root identifiers did not match: (old, new).
    #[error("root ID mismatched: old {0} new {1}")]
    RootIDMismatch(u64, u64),
    /// The database is corrupted; the payload describes the problem.
    #[error("database corrupted: {0}")]
    DatabaseCorrupted(String),
    /// Internal error.
    #[error("internal error: {0}")]
    InternalError(String),
    /// Serialization failed.
    #[error("serialization error: {0}")]
    SerializationError(String),
    /// Deserialization failed.
    #[error("deserialization error: {0}")]
    DeserializationError(String),
    /// The page with the given identifier was already freed.
    #[error("page already freed: {0}")]
    PageAlreadyFreed(u64),
    /// The page with the given identifier was already dirty.
    #[error("page already dirty: {0}")]
    PageAlreadyDirty(u64),
    /// Transport-level failure.
    #[error("transport error: {0}")]
    TransportError(String),
    /// The operation was cancelled by the user.
    #[error("cancelled by user")]
    CancelledByUser(),

    /// Authentication failed.
    #[error("authentication error: {0}")]
    AuthenticationError(String),
}
387
/// Shorthand for a `Result` whose error type is [`DcbError`].
pub type DcbResult<T> = Result<T, DcbError>;
389
/// Serde `with`-module that (de)serializes a `std::io::Error` as an
/// `{ kind, message }` record.
///
/// The error kind travels as its variant name; kinds outside the table below
/// are written as `None` and read back as `ErrorKind::Other`.
#[cfg(feature = "serde")]
mod serde_io_error {
    use std::{borrow::Cow, io};

    use serde::{Deserialize, Serialize};

    /// Single source of truth for the kind <-> name mapping, so that
    /// serialization and deserialization cannot drift apart.
    const KIND_NAMES: &[(io::ErrorKind, &str)] = &[
        (io::ErrorKind::NotFound, "NotFound"),
        (io::ErrorKind::PermissionDenied, "PermissionDenied"),
        (io::ErrorKind::ConnectionRefused, "ConnectionRefused"),
        (io::ErrorKind::ConnectionReset, "ConnectionReset"),
        (io::ErrorKind::HostUnreachable, "HostUnreachable"),
        (io::ErrorKind::NetworkUnreachable, "NetworkUnreachable"),
        (io::ErrorKind::ConnectionAborted, "ConnectionAborted"),
        (io::ErrorKind::NotConnected, "NotConnected"),
        (io::ErrorKind::AddrInUse, "AddrInUse"),
        (io::ErrorKind::AddrNotAvailable, "AddrNotAvailable"),
        (io::ErrorKind::NetworkDown, "NetworkDown"),
        (io::ErrorKind::BrokenPipe, "BrokenPipe"),
        (io::ErrorKind::AlreadyExists, "AlreadyExists"),
        (io::ErrorKind::WouldBlock, "WouldBlock"),
        (io::ErrorKind::NotADirectory, "NotADirectory"),
        (io::ErrorKind::IsADirectory, "IsADirectory"),
        (io::ErrorKind::DirectoryNotEmpty, "DirectoryNotEmpty"),
        (io::ErrorKind::ReadOnlyFilesystem, "ReadOnlyFilesystem"),
        (io::ErrorKind::StaleNetworkFileHandle, "StaleNetworkFileHandle"),
        (io::ErrorKind::InvalidInput, "InvalidInput"),
        (io::ErrorKind::InvalidData, "InvalidData"),
        (io::ErrorKind::TimedOut, "TimedOut"),
        (io::ErrorKind::WriteZero, "WriteZero"),
        (io::ErrorKind::StorageFull, "StorageFull"),
        (io::ErrorKind::NotSeekable, "NotSeekable"),
        (io::ErrorKind::QuotaExceeded, "QuotaExceeded"),
        (io::ErrorKind::FileTooLarge, "FileTooLarge"),
        (io::ErrorKind::ResourceBusy, "ResourceBusy"),
        (io::ErrorKind::ExecutableFileBusy, "ExecutableFileBusy"),
        (io::ErrorKind::Deadlock, "Deadlock"),
        (io::ErrorKind::CrossesDevices, "CrossesDevices"),
        (io::ErrorKind::TooManyLinks, "TooManyLinks"),
        (io::ErrorKind::InvalidFilename, "InvalidFilename"),
        (io::ErrorKind::ArgumentListTooLong, "ArgumentListTooLong"),
        (io::ErrorKind::Interrupted, "Interrupted"),
        (io::ErrorKind::Unsupported, "Unsupported"),
        (io::ErrorKind::UnexpectedEof, "UnexpectedEof"),
        (io::ErrorKind::OutOfMemory, "OutOfMemory"),
        (io::ErrorKind::Other, "Other"),
    ];

    /// Wire representation of an `io::Error`.
    #[derive(Serialize, Deserialize)]
    struct IoError {
        kind: Option<Cow<'static, str>>,
        message: Option<String>,
    }

    /// Maps an error kind to its wire name, or `None` if it is not in the table.
    fn kind_name(kind: io::ErrorKind) -> Option<&'static str> {
        KIND_NAMES
            .iter()
            .find(|(candidate, _)| *candidate == kind)
            .map(|&(_, name)| name)
    }

    /// Maps a wire name back to its error kind, or `None` if it is unknown.
    fn kind_from_name(name: &str) -> Option<io::ErrorKind> {
        KIND_NAMES
            .iter()
            .find(|(_, candidate)| *candidate == name)
            .map(|&(kind, _)| kind)
    }

    /// Serializes `err` as its kind name plus its display message.
    pub fn serialize<S>(err: &io::Error, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        IoError {
            kind: kind_name(err.kind()).map(Cow::Borrowed),
            // `get_ref()` is `None` for OS and simple-kind errors, which would
            // drop their text; `Display` covers those and yields the inner
            // error's text for custom errors.
            message: Some(err.to_string()),
        }
        .serialize(serializer)
    }

    /// Rebuilds an `io::Error` from its wire form.
    ///
    /// A missing or unrecognized kind becomes `ErrorKind::Other`; a missing
    /// message becomes `"unknown error"`.
    pub fn deserialize<'de, D>(deserializer: D) -> Result<io::Error, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let IoError { kind, message } = <IoError as Deserialize>::deserialize(deserializer)?;
        let kind = kind
            .as_deref()
            .and_then(kind_from_name)
            .unwrap_or(io::ErrorKind::Other);

        Ok(io::Error::new(
            kind,
            message.unwrap_or_else(|| "unknown error".to_string()),
        ))
    }
}
512
#[cfg(test)]
mod tests {
    use super::*;

    /// In-memory `DcbReadResponseSync` that replays a fixed list of events
    /// and reports a fixed head position.
    struct TestReadResponse {
        events: Vec<DcbSequencedEvent>,
        current_index: usize,
        head_position: Option<u64>,
    }

    impl TestReadResponse {
        fn new(events: Vec<DcbSequencedEvent>, head_position: Option<u64>) -> Self {
            Self {
                events,
                current_index: 0,
                head_position,
            }
        }
    }

    impl Iterator for TestReadResponse {
        type Item = DcbResult<DcbSequencedEvent>;

        fn next(&mut self) -> Option<Self::Item> {
            // `get` returns `None` once the index runs past the end.
            let event = self.events.get(self.current_index)?.clone();
            self.current_index += 1;
            Some(Ok(event))
        }
    }

    impl DcbReadResponseSync for TestReadResponse {
        fn head(&mut self) -> DcbResult<Option<u64>> {
            Ok(self.head_position)
        }

        fn collect_with_head(&mut self) -> DcbResult<(Vec<DcbSequencedEvent>, Option<u64>)> {
            // Drain what is left and stop at the first error.
            let events = self.by_ref().collect::<DcbResult<Vec<_>>>()?;
            Ok((events, self.head_position))
        }

        fn next_batch(&mut self) -> DcbResult<Vec<DcbSequencedEvent>> {
            // Errors are returned to the caller instead of panicking.
            self.by_ref().collect()
        }
    }

    /// Two sequenced events at positions 1 and 2.
    fn sample_events() -> Vec<DcbSequencedEvent> {
        let event1 = DcbEvent {
            event_type: "test_event".to_string(),
            data: vec![1, 2, 3],
            tags: vec!["tag1".to_string(), "tag2".to_string()],
            uuid: None,
        };

        let event2 = DcbEvent {
            event_type: "another_event".to_string(),
            data: vec![4, 5, 6],
            tags: vec!["tag2".to_string(), "tag3".to_string()],
            uuid: None,
        };

        vec![
            DcbSequencedEvent {
                event: event1,
                position: 1,
            },
            DcbSequencedEvent {
                event: event2,
                position: 2,
            },
        ]
    }

    #[test]
    fn test_dcb_read_response() {
        let mut response = TestReadResponse::new(sample_events(), Some(2));

        assert_eq!(response.head().unwrap(), Some(2));

        assert_eq!(response.next().unwrap().unwrap().position, 1);
        assert_eq!(response.next().unwrap().unwrap().position, 2);
        assert!(response.next().is_none());
    }

    #[test]
    fn test_collect_with_head() {
        let mut response = TestReadResponse::new(sample_events(), Some(2));

        let (events, head) = response.collect_with_head().unwrap();
        let positions: Vec<u64> = events.iter().map(|e| e.position).collect();
        assert_eq!(positions, vec![1, 2]);
        assert_eq!(head, Some(2));

        // The response is exhausted afterwards.
        assert!(response.next().is_none());
    }

    #[test]
    fn test_next_batch() {
        let mut response = TestReadResponse::new(sample_events(), Some(2));

        // Consume one event first; the batch holds only the remainder.
        assert_eq!(response.next().unwrap().unwrap().position, 1);
        let batch = response.next_batch().unwrap();
        assert_eq!(batch.len(), 1);
        assert_eq!(batch[0].position, 2);

        assert!(response.next_batch().unwrap().is_empty());
    }

    #[test]
    fn test_event_new() {
        let event1 = DcbEvent::default()
            .event_type("type1")
            .data(b"data1")
            .tags(["tagX"]);

        assert_eq!(event1.event_type, "type1");
        assert_eq!(event1.data, b"data1".to_vec());
        assert_eq!(event1.tags, vec!["tagX".to_string()]);
        assert_eq!(event1.uuid, None);

        let event2 = DcbEvent::default()
            .event_type("type2")
            .data(b"data2")
            .tags(["tag1", "tag2", "tag3"]);
        assert_eq!(event2.tags.len(), 3);

        let event3 = DcbEvent::default().event_type("type3");
        assert_eq!(event3.data.len(), 0);
        assert_eq!(event3.tags.len(), 0);

        let query_item = DcbQueryItem::new()
            .types(["type1", "type2"])
            .tags(["tagA", "tagB"]);
        assert_eq!(query_item.types.len(), 2);
        assert_eq!(query_item.tags.len(), 2);

        let query = DcbQuery::new().item(query_item);
        assert_eq!(query.items.len(), 1);
    }
}