#[cfg(feature = "dangerous-test-hooks")]
use super::CooperativePump;
use super::{AppendReceipt, AppendReply, BatchAppendReply, StoreError};
/// Crate-private core shared by the public ticket types.
///
/// Wraps the receiving end of a channel that carries a
/// `Result<T, StoreError>` reply. When the `dangerous-test-hooks` feature is
/// enabled it also holds an optional `CooperativePump`, which `wait` drives
/// before it asks for the reply.
struct Ticket<T> {
    /// Channel end on which the reply is delivered.
    rx: flume::Receiver<Result<T, StoreError>>,
    /// Optional test hook; pumped once at the start of `wait`.
    #[cfg(feature = "dangerous-test-hooks")]
    pump: Option<CooperativePump>,
}

impl<T> Ticket<T> {
    /// Builds a ticket from a reply receiver and an optional pump
    /// (`dangerous-test-hooks` builds only).
    #[cfg(feature = "dangerous-test-hooks")]
    fn new(rx: flume::Receiver<Result<T, StoreError>>, pump: Option<CooperativePump>) -> Self {
        Ticket { rx, pump }
    }

    /// Builds a ticket from a reply receiver (builds without
    /// `dangerous-test-hooks`).
    #[cfg(not(feature = "dangerous-test-hooks"))]
    fn new(rx: flume::Receiver<Result<T, StoreError>>) -> Self {
        Ticket { rx }
    }

    /// Consumes the ticket and returns the reply obtained through
    /// `crate::store::recv_writer_reply`.
    ///
    /// With `dangerous-test-hooks` enabled and a pump present, the pump is
    /// invoked exactly once before the receive call.
    fn wait(self) -> Result<T, StoreError> {
        #[cfg(feature = "dangerous-test-hooks")]
        if let Some(hook) = self.pump.as_ref() {
            hook.pump();
        }
        crate::store::recv_writer_reply(&self.rx)
    }

    /// Polls for the reply without consuming the ticket.
    ///
    /// * `None` — the channel is currently empty.
    /// * `Some(reply)` — a reply was available and has been taken off the
    ///   channel.
    /// * `Some(Err(StoreError::WriterCrashed))` — the channel reported that it
    ///   is disconnected.
    fn try_check(&self) -> Option<Result<T, StoreError>> {
        let outcome = match self.rx.try_recv() {
            Err(flume::TryRecvError::Empty) => return None,
            Err(flume::TryRecvError::Disconnected) => Err(StoreError::WriterCrashed),
            Ok(reply) => reply,
        };
        Some(outcome)
    }

    /// Borrows the underlying reply receiver.
    fn receiver(&self) -> &flume::Receiver<Result<T, StoreError>> {
        let Ticket { rx, .. } = self;
        rx
    }
}
/// Handle to the outcome of a single append.
///
/// The outcome is obtained either by consuming the ticket with
/// [`AppendTicket::wait`] or by polling it with [`AppendTicket::try_check`].
#[must_use = "an AppendTicket must be awaited (`.wait()`) or polled; dropping it discards the append outcome and any writer error"]
pub struct AppendTicket {
    /// Generic ticket machinery specialised to a single receipt.
    inner: Ticket<AppendReceipt>,
}

impl AppendTicket {
    /// Wraps a reply receiver and an optional pump
    /// (`dangerous-test-hooks` builds only).
    #[cfg(feature = "dangerous-test-hooks")]
    pub(crate) fn new(rx: flume::Receiver<AppendReply>, pump: Option<CooperativePump>) -> Self {
        let inner = Ticket::new(rx, pump);
        AppendTicket { inner }
    }

    /// Wraps a reply receiver (builds without `dangerous-test-hooks`).
    #[cfg(not(feature = "dangerous-test-hooks"))]
    pub(crate) fn new(rx: flume::Receiver<AppendReply>) -> Self {
        let inner = Ticket::new(rx);
        AppendTicket { inner }
    }

    /// Consumes the ticket and returns the append reply produced by the
    /// inner ticket's `wait`.
    pub fn wait(self) -> AppendReply {
        let AppendTicket { inner } = self;
        inner.wait()
    }

    /// Polls for the append reply without consuming the ticket.
    ///
    /// Returns `None` when no reply is available yet; otherwise `Some` with
    /// the reply (or the error the inner ticket reports).
    pub fn try_check(&self) -> Option<AppendReply> {
        Ticket::try_check(&self.inner)
    }

    /// Borrows the underlying reply receiver.
    pub fn receiver(&self) -> &flume::Receiver<AppendReply> {
        Ticket::receiver(&self.inner)
    }
}
/// Handle to the outcome of a batch append.
///
/// The outcome is obtained either by consuming the ticket with
/// [`BatchAppendTicket::wait`] or by polling it with
/// [`BatchAppendTicket::try_check`].
#[must_use = "a BatchAppendTicket must be awaited (`.wait()`) or polled; dropping it discards the batch outcome and any writer error"]
pub struct BatchAppendTicket {
    /// Generic ticket machinery specialised to a vector of receipts.
    inner: Ticket<Vec<AppendReceipt>>,
}

impl BatchAppendTicket {
    /// Wraps a reply receiver and an optional pump
    /// (`dangerous-test-hooks` builds only).
    #[cfg(feature = "dangerous-test-hooks")]
    pub(crate) fn new(
        rx: flume::Receiver<BatchAppendReply>,
        pump: Option<CooperativePump>,
    ) -> Self {
        let inner = Ticket::new(rx, pump);
        BatchAppendTicket { inner }
    }

    /// Wraps a reply receiver (builds without `dangerous-test-hooks`).
    #[cfg(not(feature = "dangerous-test-hooks"))]
    pub(crate) fn new(rx: flume::Receiver<BatchAppendReply>) -> Self {
        let inner = Ticket::new(rx);
        BatchAppendTicket { inner }
    }

    /// Consumes the ticket and returns the batch reply produced by the
    /// inner ticket's `wait`.
    pub fn wait(self) -> BatchAppendReply {
        let BatchAppendTicket { inner } = self;
        inner.wait()
    }

    /// Polls for the batch reply without consuming the ticket.
    ///
    /// Returns `None` when no reply is available yet; otherwise `Some` with
    /// the reply (or the error the inner ticket reports).
    pub fn try_check(&self) -> Option<BatchAppendReply> {
        Ticket::try_check(&self.inner)
    }

    /// Borrows the underlying reply receiver.
    pub fn receiver(&self) -> &flume::Receiver<BatchAppendReply> {
        Ticket::receiver(&self.inner)
    }
}
// Test-only child module: compiled solely under `cfg(test)`. Its source is
// loaded from `ticket_mutation_kill.rs` via the explicit `#[path]` attribute
// rather than from the default module file location.
// NOTE(review): judging by the name, this presumably holds tests aimed at
// mutation-testing survivors for the ticket types — confirm against that file.
#[cfg(test)]
#[path = "ticket_mutation_kill.rs"]
mod ticket_mutation_kill;