use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use super::watcher::{AppliedIndexWatcher, WaitOutcome};
#[derive(Debug, Default)]
pub struct GroupAppliedWatchers {
inner: RwLock<HashMap<u64, Arc<AppliedIndexWatcher>>>,
}
impl GroupAppliedWatchers {
pub fn new() -> Self {
Self::default()
}
pub fn get_or_create(&self, group_id: u64) -> Arc<AppliedIndexWatcher> {
{
let r = self.inner.read().unwrap_or_else(|p| p.into_inner());
if let Some(w) = r.get(&group_id) {
return Arc::clone(w);
}
}
let mut w = self.inner.write().unwrap_or_else(|p| p.into_inner());
Arc::clone(
w.entry(group_id)
.or_insert_with(|| Arc::new(AppliedIndexWatcher::new())),
)
}
pub fn get(&self, group_id: u64) -> Option<Arc<AppliedIndexWatcher>> {
let r = self.inner.read().unwrap_or_else(|p| p.into_inner());
r.get(&group_id).map(Arc::clone)
}
pub fn bump(&self, group_id: u64, applied_index: u64) {
self.get_or_create(group_id).bump(applied_index);
}
pub fn remove(&self, group_id: u64) {
let removed = {
let mut w = self.inner.write().unwrap_or_else(|p| p.into_inner());
w.remove(&group_id)
};
if let Some(w) = removed {
w.close();
}
}
pub fn wait_for(&self, group_id: u64, target: u64, timeout: Duration) -> WaitOutcome {
self.get_or_create(group_id).wait_for(target, timeout)
}
pub fn len(&self) -> usize {
self.inner.read().unwrap_or_else(|p| p.into_inner()).len()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn snapshot(&self) -> Vec<(u64, u64)> {
let r = self.inner.read().unwrap_or_else(|p| p.into_inner());
let mut v: Vec<(u64, u64)> = r.iter().map(|(gid, w)| (*gid, w.current())).collect();
v.sort_by_key(|(gid, _)| *gid);
v
}
pub fn group_ids(&self) -> Vec<u64> {
let r = self.inner.read().unwrap_or_else(|p| p.into_inner());
let mut v: Vec<u64> = r.keys().copied().collect();
v.sort();
v
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::thread;
#[test]
fn separate_groups_have_independent_watermarks() {
let r = GroupAppliedWatchers::new();
r.bump(1, 5);
r.bump(2, 10);
assert_eq!(r.get(1).unwrap().current(), 5);
assert_eq!(r.get(2).unwrap().current(), 10);
}
#[test]
fn wait_returns_reached_after_bump() {
let r = Arc::new(GroupAppliedWatchers::new());
let r2 = r.clone();
let h = thread::spawn(move || r2.wait_for(7, 5, Duration::from_secs(2)));
thread::sleep(Duration::from_millis(20));
r.bump(7, 5);
assert_eq!(h.join().unwrap(), WaitOutcome::Reached);
}
#[test]
fn remove_wakes_waiter_with_group_gone() {
let r = Arc::new(GroupAppliedWatchers::new());
let r2 = r.clone();
r.get_or_create(3);
let h = thread::spawn(move || r2.wait_for(3, 99, Duration::from_secs(2)));
thread::sleep(Duration::from_millis(20));
r.remove(3);
assert_eq!(h.join().unwrap(), WaitOutcome::GroupGone);
}
#[test]
fn missing_group_creates_watcher_lazily() {
let r = GroupAppliedWatchers::new();
let outcome = r.wait_for(42, 1, Duration::from_millis(20));
assert_eq!(outcome, WaitOutcome::TimedOut);
assert!(r.get(42).is_some());
}
}