use std::sync::Arc;
use tokio::sync::broadcast;
use super::traits::EpochStamped;
use super::{Delta, Durability, EpochWatcher, EpochWatermarks, View};
/// Errors surfaced by [`ViewSubscriber`] and [`ViewMonitor`].
///
/// `Clone`, `PartialEq` and `Eq` are derived so callers and tests can copy and
/// compare errors directly (e.g. `assert_eq!(err, SubscribeError::MessageLost)`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SubscribeError {
    /// The broadcast channel was closed, or the epoch watcher reported an error.
    Shutdown,
    /// `recv()` was called while the initial view had not yet been taken via `initialize()`.
    NotInitialized,
    /// The broadcast receiver lagged behind and at least one view was skipped.
    MessageLost,
}
impl std::fmt::Display for SubscribeError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
SubscribeError::Shutdown => write!(f, "coordinator shut down"),
SubscribeError::NotInitialized => {
write!(f, "initialize() must be called before recv()")
}
SubscribeError::MessageLost => {
write!(f, "message was lost, subscriber is lagging behind")
}
}
}
}
// Opts `SubscribeError` into the standard error trait. The body is empty because the
// trait's default methods suffice; `Debug` comes from the derive and `Display` from the
// impl in this file.
impl std::error::Error for SubscribeError {}
/// Receiving side of a view broadcast, paired with a `ViewMonitor` through shared
/// epoch watermarks.
pub struct ViewSubscriber<D: Delta> {
/// Broadcast receiver polled by `recv()` for newly published views.
view_rx: broadcast::Receiver<Arc<View<D>>>,
/// View supplied at construction. `initialize()` takes it (leaving `None`), and
/// `recv()` refuses to run while it is still `Some`.
initial_view: Option<Arc<View<D>>>,
/// Watermarks shared (via `Arc`) with the `ViewMonitor` created in `new()`;
/// the `update_*` methods forward to it.
watermarks: Arc<EpochWatermarks>,
}
impl<D: Delta> ViewSubscriber<D> {
pub fn new(
view_rx: broadcast::Receiver<Arc<View<D>>>,
initial_view: Arc<View<D>>,
) -> (Self, ViewMonitor) {
let (watermarks, watcher) = EpochWatermarks::new();
let watermarks = Arc::new(watermarks);
let subscriber = Self {
view_rx,
initial_view: Some(initial_view),
watermarks: watermarks.clone(),
};
let monitor = ViewMonitor {
watcher,
watermarks,
};
(subscriber, monitor)
}
pub fn initialize(&mut self) -> Arc<View<D>> {
self.initial_view
.take()
.expect("initialize() must be called exactly once")
}
pub async fn recv(&mut self) -> Result<Arc<View<D>>, SubscribeError> {
if self.initial_view.is_some() {
return Err(SubscribeError::NotInitialized);
}
match self.view_rx.recv().await {
Ok(view) => Ok(view),
Err(broadcast::error::RecvError::Lagged(_)) => Err(SubscribeError::MessageLost), Err(broadcast::error::RecvError::Closed) => Err(SubscribeError::Shutdown),
}
}
pub fn update_applied(&self, epoch: u64) {
self.watermarks.update_applied(epoch);
}
pub fn update_written(&self, epoch: u64) {
self.watermarks.update_written(epoch);
}
pub fn update_durable(&self, epoch: u64) {
self.watermarks.update_durable(epoch);
}
}
/// Cloneable handle used to wait until an epoch reaches a requested durability level.
#[derive(Clone)]
pub struct ViewMonitor {
/// Watcher half returned by `EpochWatermarks::new()`; `wait` delegates to it.
watcher: EpochWatcher,
// NOTE(review): this field is never read by `ViewMonitor` (hence `allow(dead_code)`).
// Presumably it is held so the watermark state stays alive while any monitor exists —
// confirm against `EpochWatermarks` / `EpochWatcher`.
#[allow(dead_code)]
watermarks: Arc<EpochWatermarks>,
}
impl ViewMonitor {
    /// Waits on the underlying watcher for `epoch` at the given `durability` level.
    ///
    /// Any value produced by the watcher on success is discarded.
    ///
    /// # Errors
    ///
    /// Every watcher failure is reported as [`SubscribeError::Shutdown`]; the
    /// original error is dropped.
    pub async fn wait(&mut self, epoch: u64, durability: Durability) -> Result<(), SubscribeError> {
        match self.watcher.wait(epoch, durability).await {
            Ok(_) => Ok(()),
            Err(_) => Err(SubscribeError::Shutdown),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a monitor wired to fresh watermarks and also returns the watermark
    /// handle, so a test can advance epochs without going through a `ViewSubscriber`.
    fn create_pair() -> (Arc<EpochWatermarks>, ViewMonitor) {
        let (raw, watcher) = EpochWatermarks::new();
        let shared = Arc::new(raw);
        let monitor = ViewMonitor {
            watcher,
            watermarks: Arc::clone(&shared),
        };
        (shared, monitor)
    }

    #[tokio::test]
    async fn should_update_and_wait_applied() {
        let (marks, mut mon) = create_pair();

        marks.update_applied(5);

        mon.wait(5, Durability::Applied).await.unwrap();
    }

    // "Flushed" in the test name corresponds to the `Written` durability level.
    #[tokio::test]
    async fn should_update_and_wait_flushed() {
        let (marks, mut mon) = create_pair();

        marks.update_written(3);

        mon.wait(3, Durability::Written).await.unwrap();
    }

    #[tokio::test]
    async fn should_update_and_wait_durable() {
        let (marks, mut mon) = create_pair();

        marks.update_durable(7);

        mon.wait(7, Durability::Durable).await.unwrap();
    }

    // Waiting for an epoch below the current watermark must succeed.
    #[tokio::test]
    async fn should_wait_for_epoch_already_reached() {
        let (marks, mut mon) = create_pair();

        marks.update_durable(10);

        mon.wait(5, Durability::Durable).await.unwrap();
    }

    // Each durability level is advanced to a different epoch and then awaited
    // at exactly that epoch.
    #[tokio::test]
    async fn should_track_levels_independently() {
        let (marks, mut mon) = create_pair();

        marks.update_applied(3);
        marks.update_written(2);
        marks.update_durable(1);

        for (epoch, level) in [
            (3, Durability::Applied),
            (2, Durability::Written),
            (1, Durability::Durable),
        ] {
            mon.wait(epoch, level).await.unwrap();
        }
    }
}