use super::BatchAppendTicket;
use crate::coordinate::Coordinate;
use crate::event::{EventKind, EventPayload};
use crate::store::{
AppendOptions, AppendReceipt, BatchAppendItem, CausationRef, Open, Store, StoreError,
};
use serde::Serialize;
pub struct Outbox<'a> {
store: &'a Store<Open>,
fence_token: Option<u64>,
items: Vec<BatchAppendItem>,
}
impl<'a> Outbox<'a> {
pub(crate) fn new(store: &'a Store<Open>, fence_token: Option<u64>) -> Self {
Self {
store,
fence_token,
items: Vec::new(),
}
}
pub fn stage(
&mut self,
coord: Coordinate,
kind: EventKind,
payload: &impl Serialize,
) -> Result<&mut Self, StoreError> {
self.stage_with_options_and_causation(
coord,
kind,
payload,
AppendOptions::default(),
CausationRef::None,
)
}
pub fn stage_with_options(
&mut self,
coord: Coordinate,
kind: EventKind,
payload: &impl Serialize,
options: AppendOptions,
) -> Result<&mut Self, StoreError> {
self.stage_with_options_and_causation(coord, kind, payload, options, CausationRef::None)
}
pub fn stage_with_causation(
&mut self,
coord: Coordinate,
kind: EventKind,
payload: &impl Serialize,
causation: CausationRef,
) -> Result<&mut Self, StoreError> {
self.stage_with_options_and_causation(
coord,
kind,
payload,
AppendOptions::default(),
causation,
)
}
pub fn stage_with_options_and_causation(
&mut self,
coord: Coordinate,
kind: EventKind,
payload: &impl Serialize,
options: AppendOptions,
causation: CausationRef,
) -> Result<&mut Self, StoreError> {
let item = BatchAppendItem::new(coord, kind, payload, options, causation)?;
self.items.push(item);
Ok(self)
}
pub fn stage_typed<T: EventPayload>(
&mut self,
coord: Coordinate,
payload: &T,
) -> Result<&mut Self, StoreError> {
self.stage_with_options_and_causation(
coord,
T::KIND,
payload,
AppendOptions::default(),
CausationRef::None,
)
}
pub fn stage_typed_with_options<T: EventPayload>(
&mut self,
coord: Coordinate,
payload: &T,
options: AppendOptions,
) -> Result<&mut Self, StoreError> {
self.stage_with_options_and_causation(coord, T::KIND, payload, options, CausationRef::None)
}
pub fn stage_typed_with_causation<T: EventPayload>(
&mut self,
coord: Coordinate,
payload: &T,
causation: CausationRef,
) -> Result<&mut Self, StoreError> {
self.stage_with_options_and_causation(
coord,
T::KIND,
payload,
AppendOptions::default(),
causation,
)
}
pub fn stage_typed_with_options_and_causation<T: EventPayload>(
&mut self,
coord: Coordinate,
payload: &T,
options: AppendOptions,
causation: CausationRef,
) -> Result<&mut Self, StoreError> {
self.stage_with_options_and_causation(coord, T::KIND, payload, options, causation)
}
pub fn push_item(&mut self, item: BatchAppendItem) -> &mut Self {
self.items.push(item);
self
}
pub fn flush(&mut self) -> Result<Vec<AppendReceipt>, StoreError> {
let items = std::mem::take(&mut self.items);
match self.fence_token {
Some(token) => self.store.submit_batch_with_fence(items, token)?.wait(),
None => self.store.append_batch(items),
}
}
pub fn submit_flush(&mut self) -> Result<BatchAppendTicket, StoreError> {
let items = std::mem::take(&mut self.items);
match self.fence_token {
Some(token) => self.store.submit_batch_with_fence(items, token),
None => self.store.submit_batch(items),
}
}
pub fn len(&self) -> usize {
self.items.len()
}
pub fn is_empty(&self) -> bool {
self.items.is_empty()
}
}