use crate::coordinate::Region;
use crate::store::delivery::canal::{Canal, CanalBatch, CanalClosed};
use crate::store::write::fanout::{notification_matches_region, Notification};
use flume::{Receiver, RecvTimeoutError, TryRecvError};
use std::time::Duration;
/// A receiving handle that pairs a notification channel with a [`Region`].
///
/// `recv` reads from the channel and only returns notifications for which
/// `notification_matches_region` accepts the stored region.
pub struct Subscription {
/// Channel end from which notifications are read.
rx: Receiver<Notification>,
/// Region handed to `notification_matches_region` when `recv` filters.
region: Region,
}
impl Subscription {
pub(crate) fn new(rx: Receiver<Notification>, region: Region) -> Self {
Self { rx, region }
}
pub fn recv(&self) -> Option<Notification> {
loop {
match self.rx.recv() {
Ok(notif) => {
if notification_matches_region(&self.region, ¬if) {
return Some(notif);
}
}
Err(_) => return None, }
}
}
pub fn filtered_receiver(&self) -> &Receiver<Notification> {
&self.rx
}
#[doc(hidden)]
pub fn receiver(&self) -> &Receiver<Notification> {
&self.rx
}
pub fn ops(self) -> SubscriptionOps {
SubscriptionOps {
sub: self,
filters: Vec::new(),
map_fn: None,
limit: None,
count: 0,
}
}
}
/// Exposes the subscription's channel as a batch-pulling [`Canal`].
// NOTE(review): unlike `Subscription::recv`, this path does not call
// `notification_matches_region` — confirm the channel is region-filtered
// upstream, otherwise batches may contain out-of-region notifications.
impl Canal for Subscription {
    type Item = Notification;
    type Error = CanalClosed;

    /// Pulls up to `max` notifications from the channel.
    ///
    /// Waits at most `deadline` for the first notification, then drains any
    /// further notifications that are already queued without blocking.
    ///
    /// # Returns
    /// * `CanalBatch::Empty` when `max == 0` or nothing arrived within `deadline`.
    /// * `CanalBatch::One` when exactly one notification was obtained.
    /// * `CanalBatch::Many` when two or more (at most `max`) were obtained.
    ///
    /// # Errors
    /// Returns `CanalClosed` when the channel is disconnected and no
    /// notification could be obtained by this call. If the channel disconnects
    /// after at least one notification was taken, the collected batch is
    /// returned and the closure is reported by the following call.
    fn pull_batch(
        &mut self,
        max: usize,
        deadline: Duration,
    ) -> Result<CanalBatch<Self::Item>, Self::Error> {
        if max == 0 {
            return Ok(CanalBatch::Empty);
        }

        // Only the first item is waited for; everything after is opportunistic.
        let first = match self.rx.recv_timeout(deadline) {
            Ok(notification) => notification,
            Err(RecvTimeoutError::Timeout) => return Ok(CanalBatch::Empty),
            Err(RecvTimeoutError::Disconnected) => return Err(CanalClosed),
        };

        // `first` already counts towards `max`, hence the `+ 1`.
        let mut rest = Vec::new();
        while rest.len() + 1 < max {
            match self.rx.try_recv() {
                Ok(notification) => rest.push(notification),
                Err(TryRecvError::Empty) => break,
                // Notifications taken so far have left the channel for good;
                // returning an error here would silently drop them. Hand them
                // out now — the next `recv_timeout` will report the closure.
                Err(TryRecvError::Disconnected) => break,
            }
        }

        if rest.is_empty() {
            return Ok(CanalBatch::One(first));
        }
        let mut items = Vec::with_capacity(rest.len() + 1);
        items.push(first);
        items.extend(rest);
        Ok(CanalBatch::Many(items))
    }
}
/// Boxed, sendable predicate over a borrowed notification.
type NotifFilter = Box<dyn Fn(&Notification) -> bool + Send>;
/// Boxed, sendable transform from a borrowed notification to an optional
/// replacement notification.
type NotifMapper = Box<dyn Fn(&Notification) -> Option<Notification> + Send>;
/// Builder-style wrapper around a [`Subscription`] that layers predicates, an
/// optional mapper and an optional delivery limit on top of `recv`.
pub struct SubscriptionOps {
/// The wrapped subscription that notifications are read from.
sub: Subscription,
/// Predicates that must all accept a notification before it is delivered.
filters: Vec<NotifFilter>,
/// Optional transform applied to notifications that passed every filter.
map_fn: Option<NotifMapper>,
/// Maximum number of notifications `recv` delivers, if set.
limit: Option<usize>,
/// Number of notifications `recv` has delivered so far.
count: usize,
}
impl SubscriptionOps {
pub fn filter<F: Fn(&Notification) -> bool + Send + 'static>(mut self, f: F) -> Self {
self.filters.push(Box::new(f));
self
}
pub fn map<F: Fn(&Notification) -> Option<Notification> + Send + 'static>(
mut self,
f: F,
) -> Self {
self.map_fn = Some(Box::new(f));
self
}
pub fn take(mut self, n: usize) -> Self {
self.limit = Some(n);
self
}
pub fn scan<S, F>(self, initial: S, f: F) -> ScanSubscriptionOps<S, F>
where
S: Clone + Send + 'static,
F: FnMut(&mut S, &Notification) -> Option<S> + Send + 'static,
{
ScanSubscriptionOps {
ops: self,
state: initial,
fold: f,
}
}
pub fn recv(&mut self) -> Option<Notification> {
if let Some(limit) = self.limit {
if self.count >= limit {
return None;
}
}
loop {
let notif = self.sub.recv()?;
if self.filters.iter().all(|f| f(¬if)) {
let result = if let Some(ref map_fn) = self.map_fn {
map_fn(¬if)
} else {
Some(notif)
};
if let Some(n) = result {
self.count += 1;
return Some(n);
}
}
}
}
}
/// A [`SubscriptionOps`] pipeline combined with a running state of type `S`
/// and a fold function of type `F` that updates it per notification.
pub struct ScanSubscriptionOps<S, F> {
/// Upstream pipeline that supplies notifications.
ops: SubscriptionOps,
/// Current accumulated state passed mutably to `fold`.
state: S,
/// Function invoked with the state and each received notification.
fold: F,
}
impl<S, F> ScanSubscriptionOps<S, F>
where
    S: Clone + Send + 'static,
    F: FnMut(&mut S, &Notification) -> Option<S> + Send + 'static,
{
    /// Blocks until the fold function produces a new state, then returns it.
    ///
    /// Each notification from the upstream pipeline is passed to the fold
    /// function together with a mutable reference to the current state. When
    /// the fold returns `Some(next)`, the stored state is overwritten with a
    /// clone of `next` and `next` is returned. When it returns `None`, the
    /// notification is skipped and the wait continues (any in-place changes
    /// the fold made through the mutable reference are kept).
    ///
    /// Returns `None` once the upstream pipeline's `recv` returns `None`.
    pub fn recv(&mut self) -> Option<S> {
        while let Some(notif) = self.ops.recv() {
            match (self.fold)(&mut self.state, &notif) {
                Some(next) => {
                    self.state = next.clone();
                    return Some(next);
                }
                None => continue,
            }
        }
        None
    }
}