use crate::store::index::IndexEntry;
use crate::store::write::fanout::Notification;
use crate::store::StoreError;
use parking_lot::Mutex;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::JoinHandle;
use std::time::Duration;
/// Result of a single batch pull: nothing, exactly one item, or a vector of items.
///
/// This is the success type returned by `Canal::pull_batch`.
#[derive(Debug)]
pub enum CanalBatch<I> {
    /// The batch carries no items.
    Empty,
    /// The batch carries a single item, stored directly in the variant.
    One(I),
    /// The batch carries its items in a `Vec`.
    Many(Vec<I>),
}

impl<I> CanalBatch<I> {
    /// Returns `true` only when this batch is the [`CanalBatch::Empty`] variant.
    ///
    /// The check looks at the variant alone: a [`CanalBatch::Many`] wrapping an
    /// empty vector is still reported as non-empty.
    #[must_use]
    pub const fn is_empty(&self) -> bool {
        match self {
            Self::Empty => true,
            Self::One(_) | Self::Many(_) => false,
        }
    }
}
/// An item that can travel through a canal and be identified by an event id.
///
/// `Canal::Item` is bounded by this trait, so every item a canal yields can
/// report its id.
pub trait CanalItem {
/// Returns the 128-bit event id associated with this item.
fn event_id(&self) -> u128;
}
/// Exposes an [`IndexEntry`]'s event id through the [`CanalItem`] trait.
impl CanalItem for IndexEntry {
// NOTE(review): this call is expected to resolve to an inherent
// `IndexEntry::event_id(&self)` method, which takes precedence over the trait
// method of the same name. If no such inherent method exists, the call would
// resolve back to this trait method and recurse forever — confirm against the
// definition of `IndexEntry`.
fn event_id(&self) -> u128 {
self.event_id()
}
}
/// Exposes a [`Notification`]'s event id through the [`CanalItem`] trait.
impl CanalItem for Notification {
    /// Returns a copy of the notification's `event_id` field.
    fn event_id(&self) -> u128 {
        let Self { event_id, .. } = self;
        *event_id
    }
}
/// Zero-sized error value whose display text is `canal closed`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct CanalClosed;

impl std::fmt::Display for CanalClosed {
    /// Writes the fixed message `canal closed`; width and padding flags are
    /// not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("canal closed")
    }
}

/// `CanalClosed` carries no underlying cause, so the default `Error` methods suffice.
impl std::error::Error for CanalClosed {}
/// A source of [`CanalItem`]s that is pulled in batches.
///
/// Implementors must be `Send`, and so must the items they yield.
pub trait Canal: Send {
/// The item type produced by this canal.
type Item: CanalItem + Send;
/// The error type returned when a pull fails.
type Error: std::error::Error + Send + Sync + 'static;
/// Pulls the next batch of items from the canal.
///
/// NOTE(review): `max` is presumably an upper bound on the number of items in
/// the returned batch, and `deadline` presumably bounds how long the call may
/// wait for items — neither is enforced by this trait; confirm against the
/// implementors.
///
/// # Errors
///
/// Returns `Self::Error` when the implementor fails to produce a batch.
fn pull_batch(
&mut self,
max: usize,
deadline: Duration,
) -> Result<CanalBatch<Self::Item>, Self::Error>;
}
/// Handle used to stop and join the work running behind a canal.
///
/// The implementation visible in this file (`SubscriptionWorkerHandle`) wraps a
/// worker thread; other implementors may exist elsewhere.
pub trait CanalHandle: Send {
/// Requests that the underlying work stop. Takes `&self`, so the handle
/// remains usable afterwards.
fn stop(&self);
/// Consumes the boxed handle and reports the outcome of the underlying work.
///
/// # Errors
///
/// Returns a [`StoreError`] when the implementor reports a failure.
fn join(self: Box<Self>) -> Result<(), StoreError>;
/// Consumes the boxed handle, combining [`CanalHandle::stop`] with
/// [`CanalHandle::join`].
///
/// # Errors
///
/// Returns a [`StoreError`] when the implementor reports a failure.
fn stop_and_join(self: Box<Self>) -> Result<(), StoreError>;
}
/// Handle to a spawned worker thread: a stop flag, the thread's join handle,
/// and a slot from which a reported error is collected after joining.
pub(crate) struct SubscriptionWorkerHandle {
/// Flag set to `true` when a stop is requested or the handle is dropped.
/// NOTE(review): presumably polled by the worker thread — confirm at the spawn site.
stop: Arc<AtomicBool>,
/// Join handle of the worker thread; `Some` until the thread has been joined.
join: Option<JoinHandle<()>>,
/// Slot holding at most one error; it is drained once the thread has been joined.
/// NOTE(review): presumably written by the worker thread — confirm at the spawn site.
error_slot: Arc<Mutex<Option<StoreError>>>,
}
impl SubscriptionWorkerHandle {
    /// Builds a handle from the stop flag, the worker's join handle and the
    /// error slot. The join handle is stored as `Some` until it is joined.
    pub(crate) fn new(
        stop: Arc<AtomicBool>,
        join: JoinHandle<()>,
        error_slot: Arc<Mutex<Option<StoreError>>>,
    ) -> Self {
        let join = Some(join);
        Self {
            stop,
            join,
            error_slot,
        }
    }

    /// Joins the worker thread (if it has not been joined yet) and then drains
    /// the error slot.
    ///
    /// # Errors
    ///
    /// * `StoreError::WriterCrashed` when the worker thread panicked; the error
    ///   slot is not inspected in that case.
    /// * Otherwise, whatever error was left in the error slot, if any.
    fn finish_join(&mut self) -> Result<(), StoreError> {
        if let Some(worker) = self.join.take() {
            // A panicking worker is reported as a crash; the panic payload is discarded.
            if worker.join().is_err() {
                return Err(StoreError::WriterCrashed);
            }
        }

        // Taking the value leaves the slot empty, so a stored error is reported once.
        let pending = self.error_slot.lock().take();
        match pending {
            Some(err) => Err(err),
            None => Ok(()),
        }
    }
}
impl CanalHandle for SubscriptionWorkerHandle {
    /// Raises the stop flag with `Release` ordering. Does not block or join.
    fn stop(&self) {
        let requested = true;
        self.stop.store(requested, Ordering::Release);
    }

    /// Joins the worker thread and returns any error left in the error slot.
    ///
    /// The stop flag is not raised before joining, so this waits until the
    /// thread finishes.
    fn join(mut self: Box<Self>) -> Result<(), StoreError> {
        Self::finish_join(&mut self)
    }

    /// Raises the stop flag, then joins exactly as [`CanalHandle::join`] does.
    fn stop_and_join(self: Box<Self>) -> Result<(), StoreError> {
        self.stop();
        self.join()
    }
}
impl Drop for SubscriptionWorkerHandle {
fn drop(&mut self) {
self.stop.store(true, Ordering::Release);
}
}
/// Selects which kind of canal a reactor uses. Defaults to
/// [`ReactorCanal::CursorGuaranteed`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum ReactorCanal {
/// The default choice.
/// NOTE(review): presumably a cursor-driven canal that does not drop items —
/// confirm against the code that consumes this enum.
#[default]
CursorGuaranteed,
/// NOTE(review): presumably a subscription-driven canal that may drop items —
/// confirm against the code that consumes this enum.
LossySubscription,
}