use super::{AppendTicket, BatchAppendTicket, Outbox, WriterCommand};
use crate::coordinate::Coordinate;
use crate::event::{EventKind, EventPayload};
use crate::store::{BatchAppendItem, Open, Store, StoreError};
use flume::TrySendError;
use serde::Serialize;
pub struct VisibilityFence<'a> {
store: &'a Store<Open>,
token: u64,
closed: bool,
}
impl<'a> VisibilityFence<'a> {
pub(crate) fn new(store: &'a Store<Open>, token: u64) -> Self {
Self {
store,
token,
closed: false,
}
}
pub(crate) fn token(&self) -> u64 {
self.token
}
pub fn submit(
&self,
coord: &Coordinate,
kind: EventKind,
payload: &impl Serialize,
) -> Result<AppendTicket, StoreError> {
self.store
.submit_with_fence(coord, kind, payload, self.token)
}
pub fn submit_reaction(
&self,
coord: &Coordinate,
kind: EventKind,
payload: &impl Serialize,
correlation_id: crate::id::CorrelationId,
causation_id: crate::id::CausationId,
) -> Result<AppendTicket, StoreError> {
use crate::id::EntityIdType;
self.store.submit_reaction_with_fence(
coord,
kind,
payload,
correlation_id.as_u128(),
causation_id.as_u128(),
self.token,
)
}
pub fn submit_typed<T: EventPayload>(
&self,
coord: &Coordinate,
payload: &T,
) -> Result<AppendTicket, StoreError> {
self.submit(coord, T::KIND, payload)
}
pub fn submit_reaction_typed<T: EventPayload>(
&self,
coord: &Coordinate,
payload: &T,
correlation_id: crate::id::CorrelationId,
causation_id: crate::id::CausationId,
) -> Result<AppendTicket, StoreError> {
self.submit_reaction(coord, T::KIND, payload, correlation_id, causation_id)
}
pub fn submit_batch(
&self,
items: Vec<BatchAppendItem>,
) -> Result<BatchAppendTicket, StoreError> {
self.store.submit_batch_with_fence(items, self.token)
}
pub fn outbox(&self) -> Outbox<'_> {
Outbox::new(self.store, Some(self.token))
}
pub fn commit(mut self) -> Result<(), StoreError> {
let (tx, rx) = flume::bounded(1);
self.store
.writer_handle()?
.tx
.send(WriterCommand::CommitVisibilityFence {
token: self.token,
respond: tx,
})
.map_err(|_| StoreError::WriterCrashed)?;
self.closed = true;
crate::store::recv_writer_reply(&rx)
}
pub fn cancel(mut self) -> Result<(), StoreError> {
let (tx, rx) = flume::bounded(1);
self.store
.writer_handle()?
.tx
.send(WriterCommand::CancelVisibilityFence {
token: self.token,
respond: tx,
})
.map_err(|_| StoreError::WriterCrashed)?;
self.closed = true;
crate::store::recv_writer_reply(&rx)
}
}
impl<'a> VisibilityFence<'a> {
fn try_cancel_on_drop(&mut self) -> Result<(), String> {
if self.closed {
return Ok(());
}
let Some(writer) = self.store.writer.as_ref() else {
return Ok(());
};
let writer_tx = writer.tx.clone();
let (tx, _rx) = flume::bounded(1);
let command = WriterCommand::CancelVisibilityFence {
token: self.token,
respond: tx,
};
match writer_tx.try_send(command) {
Ok(()) => Ok(()),
Err(TrySendError::Disconnected(_)) => {
Err("writer channel disconnected during fence drop".to_string())
}
Err(TrySendError::Full(command)) => std::thread::Builder::new()
.name("batpak-fence-drop-cancel".to_string())
.spawn(move || {
drop(writer_tx.send(command));
})
.map(|_| ())
.map_err(|error| format!("failed to spawn drop-cancel helper: {error}")),
}
}
}
impl Drop for VisibilityFence<'_> {
fn drop(&mut self) {
if let Err(e) = self.try_cancel_on_drop() {
tracing::error!(
fence_token = ?self.token,
err = %e,
"visibility-fence cancel enqueue failed on drop; explicit cancel() \
recommended for deterministic cleanup"
);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::store::StoreConfig;
use tempfile::TempDir;
#[test]
fn visibility_fence_token_matches_begin_handle() {
let dir = TempDir::new().expect("tempdir");
let store = Store::open(
StoreConfig::new(dir.path())
.with_segment_max_bytes(4096)
.with_sync_every_n_events(1),
)
.expect("open store");
let fence = store.begin_visibility_fence().expect("begin fence");
assert_ne!(fence.token(), 0);
fence.commit().expect("commit fence");
store.close().expect("close store");
}
}