use super::{AppendTicket, BatchAppendTicket, Outbox, WriterCommand};
use crate::coordinate::Coordinate;
use crate::event::{EventKind, EventPayload};
use crate::store::{BatchAppendItem, Open, Store, StoreError};
use flume::TrySendError;
use serde::Serialize;
pub struct VisibilityFence<'a> {
store: &'a Store<Open>,
token: u64,
closed: bool,
}
impl<'a> VisibilityFence<'a> {
pub(crate) fn new(store: &'a Store<Open>, token: u64) -> Self {
Self {
store,
token,
closed: false,
}
}
pub fn submit(
&self,
coord: &Coordinate,
kind: EventKind,
payload: &impl Serialize,
) -> Result<AppendTicket, StoreError> {
self.store
.submit_with_fence(coord, kind, payload, self.token)
}
pub fn submit_reaction(
&self,
coord: &Coordinate,
kind: EventKind,
payload: &impl Serialize,
correlation_id: u128,
causation_id: u128,
) -> Result<AppendTicket, StoreError> {
self.store.submit_reaction_with_fence(
coord,
kind,
payload,
correlation_id,
causation_id,
self.token,
)
}
pub fn submit_typed<T: EventPayload>(
&self,
coord: &Coordinate,
payload: &T,
) -> Result<AppendTicket, StoreError> {
self.submit(coord, T::KIND, payload)
}
pub fn submit_reaction_typed<T: EventPayload>(
&self,
coord: &Coordinate,
payload: &T,
correlation_id: u128,
causation_id: u128,
) -> Result<AppendTicket, StoreError> {
self.submit_reaction(coord, T::KIND, payload, correlation_id, causation_id)
}
pub fn submit_batch(
&self,
items: Vec<BatchAppendItem>,
) -> Result<BatchAppendTicket, StoreError> {
self.store.submit_batch_with_fence(items, self.token)
}
pub fn outbox(&self) -> Outbox<'_> {
Outbox::new(self.store, Some(self.token))
}
pub fn commit(mut self) -> Result<(), StoreError> {
let (tx, rx) = flume::bounded(1);
self.store
.writer_handle()?
.tx
.send(WriterCommand::CommitVisibilityFence {
token: self.token,
respond: tx,
})
.map_err(|_| StoreError::WriterCrashed)?;
self.closed = true;
crate::store::recv_writer_reply(&rx)
}
pub fn cancel(mut self) -> Result<(), StoreError> {
let (tx, rx) = flume::bounded(1);
self.store
.writer_handle()?
.tx
.send(WriterCommand::CancelVisibilityFence {
token: self.token,
respond: tx,
})
.map_err(|_| StoreError::WriterCrashed)?;
self.closed = true;
crate::store::recv_writer_reply(&rx)
}
}
impl<'a> VisibilityFence<'a> {
fn try_cancel_on_drop(&mut self) -> Result<(), String> {
if self.closed {
return Ok(());
}
let Some(writer) = self.store.writer.as_ref() else {
return Ok(());
};
let writer_tx = writer.tx.clone();
let (tx, _rx) = flume::bounded(1);
let command = WriterCommand::CancelVisibilityFence {
token: self.token,
respond: tx,
};
match writer_tx.try_send(command) {
Ok(()) => Ok(()),
Err(TrySendError::Disconnected(_)) => {
Err("writer channel disconnected during fence drop".to_string())
}
Err(TrySendError::Full(command)) => std::thread::Builder::new()
.name("batpak-fence-drop-cancel".to_string())
.spawn(move || {
let _ = writer_tx.send(command);
})
.map(|_| ())
.map_err(|error| format!("failed to spawn drop-cancel helper: {error}")),
}
}
}
impl Drop for VisibilityFence<'_> {
fn drop(&mut self) {
if let Err(e) = self.try_cancel_on_drop() {
tracing::error!(
fence_token = ?self.token,
err = %e,
"visibility-fence cancel enqueue failed on drop; explicit cancel() \
recommended for deterministic cleanup"
);
}
}
}