use std::sync::Arc;
use crate::coordinate::Coordinate;
use crate::event::EventPayload;
use crate::store::append::{AppendOptions, BatchAppendItem, CausationRef};
use crate::store::{AppendReceipt, Open, Store, StoreError};
pub struct ReactionBatch {
items: Vec<BatchAppendItem>,
}
impl ReactionBatch {
pub(crate) fn new() -> Self {
Self { items: Vec::new() }
}
pub fn push_typed<T: EventPayload>(
&mut self,
coord: Coordinate,
payload: &T,
causation: CausationRef,
) -> Result<(), StoreError> {
self.push_typed_with_options(coord, payload, AppendOptions::default(), causation)
}
pub fn push_typed_with_options<T: EventPayload>(
&mut self,
coord: Coordinate,
payload: &T,
options: AppendOptions,
causation: CausationRef,
) -> Result<(), StoreError> {
let item = BatchAppendItem::typed(coord, payload, options, causation)?;
self.items.push(item);
Ok(())
}
pub fn len(&self) -> usize {
self.items.len()
}
pub fn is_empty(&self) -> bool {
self.items.is_empty()
}
pub(crate) fn flush(
self,
store: &Arc<Store<Open>>,
correlation_id: u128,
causation_id: u128,
) -> Result<Vec<AppendReceipt>, StoreError> {
if self.items.is_empty() {
return Ok(Vec::new());
}
store.append_reaction_batch(
crate::id::CorrelationId::from(correlation_id),
crate::id::CausationId::from(causation_id),
self.items,
)
}
}
impl Default for ReactionBatch {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::panic)]
mod tests {
use super::*;
use crate::coordinate::Coordinate;
use crate::store::{Store, StoreConfig};
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
struct InternalA {
n: u64,
}
impl crate::event::EventPayload for InternalA {
const KIND: crate::event::EventKind = crate::event::EventKind::custom(6, 1);
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
struct InternalB {
s: String,
}
impl crate::event::EventPayload for InternalB {
const KIND: crate::event::EventKind = crate::event::EventKind::custom(6, 2);
}
fn open_store() -> (Arc<Store<Open>>, tempfile::TempDir) {
let dir = tempfile::tempdir().expect("tempdir");
let store = Store::open(StoreConfig::new(dir.path())).expect("open");
(Arc::new(store), dir)
}
#[test]
fn flush_returns_empty_receipts_for_empty_batch() {
let (store, _dir) = open_store();
let batch = ReactionBatch::new();
let receipts = batch.flush(&store, 0, 0).expect("flush empty");
assert!(receipts.is_empty());
}
#[test]
fn flush_commits_multi_item_batch_atomically() {
let (store, _dir) = open_store();
let source = store
.append_typed(
&Coordinate::new("entity:reaction-internal-src", "scope:test").unwrap(),
&InternalA { n: 1 },
)
.expect("source append");
let before = store.stats().global_sequence;
let target_coord = Coordinate::new("entity:reaction-internal-tgt", "scope:test").unwrap();
let mut batch = ReactionBatch::new();
batch
.push_typed(
target_coord.clone(),
&InternalA { n: 2 },
CausationRef::None,
)
.unwrap();
batch
.push_typed(
target_coord.clone(),
&InternalB {
s: "chained".into(),
},
CausationRef::PriorItem(0),
)
.unwrap();
assert_eq!(batch.len(), 2);
let receipts = batch
.flush(
&store,
{
use crate::id::EntityIdType;
source.event_id.as_u128()
},
{
use crate::id::EntityIdType;
source.event_id.as_u128()
},
)
.expect("flush");
assert_eq!(
receipts.len(),
2,
"PROPERTY: flush returns one receipt per pushed item"
);
let after = store.stats().global_sequence;
assert_eq!(
after - before,
2,
"PROPERTY: atomic flush advances sequence by exactly item count"
);
assert_eq!(store.by_fact_typed::<InternalA>().len(), 2);
assert_eq!(store.by_fact_typed::<InternalB>().len(), 1);
}
#[test]
fn prior_item_causation_resolves_within_flush() {
let (store, _dir) = open_store();
let source = store
.append_typed(
&Coordinate::new("entity:reaction-chain-src", "scope:test").unwrap(),
&InternalA { n: 10 },
)
.expect("source");
let target = Coordinate::new("entity:reaction-chain-tgt", "scope:test").unwrap();
let mut batch = ReactionBatch::new();
batch
.push_typed(target.clone(), &InternalA { n: 11 }, CausationRef::None)
.unwrap();
batch
.push_typed(
target.clone(),
&InternalB {
s: "after-0".into(),
},
CausationRef::PriorItem(0),
)
.unwrap();
let receipts = batch
.flush(
&store,
{
use crate::id::EntityIdType;
source.event_id.as_u128()
},
{
use crate::id::EntityIdType;
source.event_id.as_u128()
},
)
.expect("flush");
assert_eq!(receipts.len(), 2);
let second = store.get(receipts[1].event_id).expect("get second");
assert_eq!(
second.event.header.causation_id,
Some({
use crate::id::EntityIdType;
crate::id::CausationId::from(receipts[0].event_id.as_u128())
}),
"PROPERTY: PriorItem causation resolves to first item's event_id"
);
}
}