use crate::store::index::IndexEntry;
use crate::store::platform::spawn::JobHandle;
use crate::store::write::fanout::Notification;
use crate::store::StoreError;
use parking_lot::Mutex;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
#[derive(Debug)]
pub enum CanalBatch<I> {
Empty,
One(I),
Many(Vec<I>),
}
impl<I> CanalBatch<I> {
#[must_use]
pub const fn is_empty(&self) -> bool {
matches!(self, Self::Empty)
}
}
pub trait CanalItem {
fn event_id(&self) -> crate::id::EventId;
}
impl CanalItem for IndexEntry {
fn event_id(&self) -> crate::id::EventId {
self.event_id()
}
}
impl CanalItem for Notification {
fn event_id(&self) -> crate::id::EventId {
self.event_id
}
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct CanalClosed;
impl std::fmt::Display for CanalClosed {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "canal closed")
}
}
impl std::error::Error for CanalClosed {}
pub trait Canal: Send {
type Item: CanalItem + Send;
type Error: std::error::Error + Send + Sync + 'static;
fn pull_batch(
&mut self,
max: usize,
deadline: Duration,
) -> Result<CanalBatch<Self::Item>, Self::Error>;
}
pub trait CanalHandle: Send {
fn stop(&self);
fn join(self: Box<Self>) -> Result<(), StoreError>;
fn stop_and_join(self: Box<Self>) -> Result<(), StoreError>;
}
pub(crate) struct SubscriptionWorkerHandle {
stop: Arc<AtomicBool>,
join: Option<Box<dyn JobHandle>>,
error_slot: Arc<Mutex<Option<StoreError>>>,
}
impl SubscriptionWorkerHandle {
pub(crate) fn new(
stop: Arc<AtomicBool>,
join: Box<dyn JobHandle>,
error_slot: Arc<Mutex<Option<StoreError>>>,
) -> Self {
Self {
stop,
join: Some(join),
error_slot,
}
}
fn finish_join(&mut self) -> Result<(), StoreError> {
if let Some(join) = self.join.take() {
join.join().map_err(|_| StoreError::WriterCrashed)?;
}
let mut guard = self.error_slot.lock();
guard.take().map_or(Ok(()), Err)
}
}
impl CanalHandle for SubscriptionWorkerHandle {
fn stop(&self) {
self.stop.store(true, Ordering::Release);
}
fn join(mut self: Box<Self>) -> Result<(), StoreError> {
self.finish_join()
}
fn stop_and_join(mut self: Box<Self>) -> Result<(), StoreError> {
self.stop();
self.finish_join()
}
}
impl Drop for SubscriptionWorkerHandle {
fn drop(&mut self) {
self.stop.store(true, Ordering::Release);
}
}
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum ReactorCanal {
#[default]
CursorGuaranteed,
LossySubscription,
}
#[cfg(test)]
mod tests {
use super::{CanalBatch, CanalClosed, CanalHandle, SubscriptionWorkerHandle};
use crate::store::platform::spawn::{Spawn, ThreadSpawn};
use crate::store::StoreError;
use parking_lot::Mutex;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
fn handle_with_seeded_error(
seeded: Option<StoreError>,
) -> (SubscriptionWorkerHandle, Arc<AtomicBool>) {
let stop = Arc::new(AtomicBool::new(false));
let error_slot = Arc::new(Mutex::new(seeded));
let spawner = ThreadSpawn;
let job = spawner
.spawn("canal-finish-join-proof".to_string(), None, Box::new(|| {}))
.expect("spawn a trivial finished job");
let handle = SubscriptionWorkerHandle::new(Arc::clone(&stop), job, Arc::clone(&error_slot));
(handle, stop)
}
#[test]
fn join_surfaces_the_stashed_store_error_not_a_hardwired_ok() {
let (handle, _stop) = handle_with_seeded_error(Some(StoreError::WriterCrashed));
let result = Box::new(handle).join();
assert!(
matches!(result, Err(StoreError::WriterCrashed)),
"PROPERTY: join must return the worker's stashed terminal error; \
the Ok(()) mutants of finish_join/join swallow it, got {result:?}"
);
}
#[test]
fn stop_and_join_surfaces_the_stashed_store_error() {
let (handle, stop) = handle_with_seeded_error(Some(StoreError::WriterCrashed));
let result = Box::new(handle).stop_and_join();
assert!(
matches!(result, Err(StoreError::WriterCrashed)),
"PROPERTY: stop_and_join must return the stashed error; the Ok(()) \
mutant swallows it, got {result:?}"
);
assert!(
stop.load(Ordering::Acquire),
"stop_and_join must also raise the stop flag"
);
}
#[test]
fn a_clean_worker_joins_ok() {
let (handle, _stop) = handle_with_seeded_error(None);
Box::new(handle)
.join()
.expect("a worker with no stashed error joins cleanly");
}
#[test]
fn drop_signals_stop_to_the_background_worker() {
let (handle, stop) = handle_with_seeded_error(None);
assert!(
!stop.load(Ordering::Acquire),
"sanity: the stop flag starts unset"
);
drop(handle);
assert!(
stop.load(Ordering::Acquire),
"PROPERTY: Drop must raise the stop flag; the `()` mutant leaks the worker"
);
}
#[test]
fn canal_closed_displays_its_terminal_text() {
assert_eq!(
CanalClosed.to_string(),
"canal closed",
"PROPERTY: CanalClosed renders exact terminal text; the mutant emits \"\""
);
}
#[test]
fn r4_canal_batch_is_empty_only_for_the_empty_variant() {
assert!(
CanalBatch::<u32>::Empty.is_empty(),
"PROPERTY: an Empty batch reports is_empty() == true; the `false` \
mutant makes every idle poll look like deliverable work"
);
assert!(
!CanalBatch::One(7_u32).is_empty(),
"a One batch is non-empty"
);
assert!(
!CanalBatch::Many(vec![1_u32, 2]).is_empty(),
"a Many batch is non-empty"
);
}
}