use super::{AppendSubmission, AppendTicket, BatchAppendTicket, WriterCommand, WriterHandle};
use crate::coordinate::Coordinate;
use crate::event::EventKind;
use crate::store::append::checked_append_bytes;
use crate::store::{BatchAppendItem, Open, Store, StoreError};
use serde::Serialize;
impl Store<Open> {
pub(crate) fn submit_batch_with_fence(
&self,
items: Vec<BatchAppendItem>,
token: u64,
) -> Result<BatchAppendTicket, StoreError> {
self.submit_batch_with_fence_impl(items, Some(token))
}
pub(crate) fn submit_batch_with_fence_impl(
&self,
items: Vec<BatchAppendItem>,
token: Option<u64>,
) -> Result<BatchAppendTicket, StoreError> {
let _lifecycle = self.lifecycle_gate.lock();
let (tx, rx) = flume::bounded(1);
let command = match token {
Some(token) => WriterCommand::FenceAppendBatch {
token,
items,
respond: tx,
},
None => WriterCommand::AppendBatch { items, respond: tx },
};
self.writer_handle()?
.tx
.send(command)
.map_err(|_| StoreError::WriterCrashed)?;
Ok(BatchAppendTicket::new(rx))
}
pub(crate) fn submit_prepared(
&self,
coord: &Coordinate,
kind: EventKind,
payload: &impl Serialize,
submission: AppendSubmission,
) -> Result<AppendTicket, StoreError> {
let _lifecycle = self.lifecycle_gate.lock();
submission.validate_route(self)?;
submission.validate_idempotency(self)?;
let event = submission.build_event(payload, kind, self.runtime.now_us())?;
let append_bytes =
checked_append_bytes(event.payload.len(), submission.receipt_extensions())?;
if append_bytes > self.config.single_append_max_bytes as usize {
return Err(StoreError::Configuration(format!(
"single append bytes {} exceeds max {}",
append_bytes, self.config.single_append_max_bytes
)));
}
let (tx, rx) = flume::bounded(1);
let command = submission.into_command(coord.clone(), kind, event, tx);
self.writer_handle()?
.tx
.send(command)
.map_err(|_| StoreError::WriterCrashed)?;
Ok(AppendTicket::new(rx))
}
pub(crate) fn writer_handle(&self) -> Result<&WriterHandle, StoreError> {
let writer = self.writer.as_ref().ok_or(StoreError::WriterCrashed)?;
writer.fail_if_exited()?;
Ok(writer)
}
pub(crate) fn ensure_no_active_public_fence(&self) -> Result<(), StoreError> {
if self.index.active_visibility_fence().is_some() {
return Err(StoreError::VisibilityFenceActive);
}
Ok(())
}
pub(crate) fn submit_with_fence(
&self,
coord: &Coordinate,
kind: EventKind,
payload: &impl Serialize,
token: u64,
) -> Result<AppendTicket, StoreError> {
self.submit_prepared(
coord,
kind,
payload,
AppendSubmission::root_under_fence(token, self.runtime.clock()),
)
}
pub(crate) fn submit_reaction_with_fence(
&self,
coord: &Coordinate,
kind: EventKind,
payload: &impl Serialize,
correlation_id: u128,
causation_id: u128,
token: u64,
) -> Result<AppendTicket, StoreError> {
self.submit_prepared(
coord,
kind,
payload,
AppendSubmission::reaction_under_fence(
token,
self.runtime.clock(),
correlation_id,
causation_id,
),
)
}
pub(crate) fn submit_pressure_gate(&self) -> Option<crate::outcome::Outcome<AppendTicket>> {
let writer = self.writer.as_ref()?;
self.pressure_retry_outcome(writer.tx.len())
}
pub(crate) fn submit_pressure_gate_batch(
&self,
) -> Option<crate::outcome::Outcome<BatchAppendTicket>> {
let writer = self.writer.as_ref()?;
self.pressure_retry_outcome(writer.tx.len())
}
pub(crate) fn pressure_retry_threshold(&self) -> usize {
self.runtime.pressure_retry_threshold
}
fn pressure_retry_outcome<T>(&self, queued: usize) -> Option<crate::outcome::Outcome<T>> {
if queued < self.pressure_retry_threshold() {
return None;
}
Some(crate::outcome::Outcome::retry(
10,
1,
1,
format!(
"writer mailbox at {queued}/{} queued commands",
self.config.writer.channel_capacity
),
))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::store::StoreConfig;
use tempfile::TempDir;
#[test]
fn pressure_retry_threshold_reflects_validated_config() {
let dir = TempDir::new().expect("tempdir");
let store = Store::open(
StoreConfig::new(dir.path())
.with_segment_max_bytes(4096)
.with_writer_channel_capacity(10)
.with_writer_pressure_retry_threshold_pct(60),
)
.expect("open store");
assert_eq!(store.pressure_retry_threshold(), 6);
assert!(store.submit_pressure_gate().is_none());
assert!(
store
.pressure_retry_outcome::<BatchAppendTicket>(5)
.is_none(),
"PROPERTY: queued commands below the retry threshold must pass without retry advice"
);
assert!(
store
.pressure_retry_outcome::<BatchAppendTicket>(6)
.is_some(),
"PROPERTY: queued commands exactly at the retry threshold must produce retry advice; \
a <= comparison waits one command too long"
);
store.close().expect("close store");
}
}