use std::{
collections::{HashSet, VecDeque},
sync::{Arc, Mutex},
};
use tokio::sync::broadcast;
use crate::util::closed;
use super::Path;
/// A single change to the set of announced paths, as delivered to consumers.
#[derive(Clone)]
pub enum Announced {
/// The path has become active (it was inserted into the active set).
Active(Path),
/// The path is no longer active (it was removed, or the consumer
/// synthesized the end while reconciling its own view).
Ended(Path),
}
impl Announced {
pub fn path(&self) -> &Path {
match self {
Announced::Active(path) => path,
Announced::Ended(path) => path,
}
}
}
/// Publishing side of the announcement tracker.
///
/// Cloning yields another handle to the same state: the broadcast sender is
/// cloned and the active set lives behind a shared `Arc<Mutex<..>>`.
#[derive(Clone)]
pub struct AnnouncedProducer {
// Broadcast channel used to push `Announced` updates to every consumer.
updates: broadcast::Sender<Announced>,
// Paths currently announced; the same `Arc` is handed to each consumer.
active: Arc<Mutex<HashSet<Path>>>,
// Handle from `crate::util::closed`; used here to create consumer handles
// and to await `unused()`. NOTE(review): exact semantics live in that
// module and are not visible here.
closed: closed::Producer,
}
impl AnnouncedProducer {
    /// Creates a producer whose broadcast channel retains up to `capacity`
    /// updates for slow consumers.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is rejected by `tokio::sync::broadcast::channel`
    /// (per the tokio documentation: zero, or larger than `usize::MAX / 2`).
    pub fn new(capacity: usize) -> Self {
        // The initial receiver is dropped on purpose; consumers subscribe later.
        let (updates, _) = broadcast::channel(capacity);
        Self {
            updates,
            active: Default::default(),
            closed: Default::default(),
        }
    }

    /// Marks `path` as active and notifies consumers.
    ///
    /// Returns `true` if the path was newly announced, `false` if it was
    /// already active (in which case nothing is broadcast).
    ///
    /// # Panics
    ///
    /// Panics if the mutex guarding the active set is poisoned.
    pub fn announce(&mut self, path: Path) -> bool {
        // Keep the guard alive across the send so that the order of updates in
        // the channel always matches the order of changes to the set, even
        // when several producer clones race with each other.
        let mut active = self.active.lock().unwrap();
        if !active.insert(path.clone()) {
            return false;
        }

        // `send` only fails when there are no receivers; that is fine because
        // late subscribers start from a snapshot of the active set.
        self.updates.send(Announced::Active(path)).ok();
        true
    }

    /// Marks `path` as no longer active and notifies consumers.
    ///
    /// Returns `true` if the path was active, `false` if it was unknown (in
    /// which case nothing is broadcast).
    ///
    /// # Panics
    ///
    /// Panics if the mutex guarding the active set is poisoned.
    pub fn unannounce(&mut self, path: &Path) -> bool {
        // Same locking discipline as `announce`: mutate and publish atomically.
        let mut active = self.active.lock().unwrap();
        if !active.remove(path) {
            return false;
        }

        // Ignoring the error is deliberate: no receivers means nobody to tell.
        self.updates.send(Announced::Ended(path.clone())).ok();
        true
    }

    /// Returns `true` if `path` is currently in the active set.
    ///
    /// # Panics
    ///
    /// Panics if the mutex guarding the active set is poisoned.
    pub fn is_active(&self, path: &Path) -> bool {
        self.active.lock().unwrap().contains(path)
    }

    /// Subscribes to every announcement, using the default (empty) prefix.
    pub fn subscribe(&self) -> AnnouncedConsumer {
        self.subscribe_prefix(Path::default())
    }

    /// Subscribes to announcements whose path starts with `prefix`.
    ///
    /// The broadcast receiver is created before the consumer snapshots the
    /// active set, so no update sent after this call begins is missed.
    pub fn subscribe_prefix(&self, prefix: Path) -> AnnouncedConsumer {
        AnnouncedConsumer::new(
            prefix,
            self.active.clone(),
            self.updates.subscribe(),
            self.closed.subscribe(),
        )
    }

    /// Waits on the `unused()` future of the internal `closed` handle.
    ///
    /// NOTE(review): presumably this resolves once no consumer handles remain;
    /// confirm against `crate::util::closed`.
    pub async fn closed(&self) {
        self.closed.unused().await
    }
}
impl Default for AnnouncedProducer {
    /// Builds a producer with a broadcast channel capacity of 32 updates.
    fn default() -> Self {
        const DEFAULT_CAPACITY: usize = 32;
        Self::new(DEFAULT_CAPACITY)
    }
}
/// Receiving side of the announcement tracker, filtered by a path prefix.
pub struct AnnouncedConsumer {
// The producer's shared active set; read for the initial snapshot and when
// recovering from a lagged broadcast receiver.
active: Arc<Mutex<HashSet<Path>>>,
// Updates queued locally that `next` returns before reading the channel.
pending: VecDeque<Announced>,
// Paths this consumer has reported as `Active` and not yet as `Ended`.
tracked: HashSet<Path>,
// Live updates from the producer.
updates: broadcast::Receiver<Announced>,
// Only paths for which `has_prefix(&prefix)` holds are reported.
prefix: Path,
// Never read; held only for its lifetime. NOTE(review): presumably lets the
// producer's `closed()` observe when consumers go away — confirm in
// `crate::util::closed`.
_closed: closed::Consumer,
}
impl AnnouncedConsumer {
    /// Builds a consumer and queues an `Active` update for every path in the
    /// shared set that currently matches `prefix`.
    ///
    /// # Panics
    ///
    /// Panics if the mutex guarding the active set is poisoned.
    fn new(
        prefix: Path,
        active: Arc<Mutex<HashSet<Path>>>,
        updates: broadcast::Receiver<Announced>,
        closed: closed::Consumer,
    ) -> Self {
        // Snapshot the matching paths; the guard is released at the end of
        // this statement, before `active` is moved into the struct.
        let pending = active
            .lock()
            .unwrap()
            .iter()
            .filter(|path| path.has_prefix(&prefix))
            .cloned()
            .map(Announced::Active)
            .collect();

        Self {
            active,
            pending,
            updates,
            prefix,
            tracked: HashSet::new(),
            _closed: closed,
        }
    }

    /// Returns the next announcement matching the prefix.
    ///
    /// Queued updates (initial snapshot or lag recovery) are returned first,
    /// then live updates from the broadcast channel. Once every producer is
    /// gone, an `Ended` is produced for each path still tracked, followed by
    /// `None`.
    ///
    /// # Panics
    ///
    /// Panics if the mutex guarding the active set is poisoned.
    pub async fn next(&mut self) -> Option<Announced> {
        loop {
            // Drain locally queued updates before touching the channel.
            if let Some(announced) = self.pending.pop_front() {
                match &announced {
                    Announced::Active(path) => self.tracked.insert(path.clone()),
                    Announced::Ended(path) => self.tracked.remove(path),
                };
                return Some(announced);
            }

            match self.updates.recv().await {
                Ok(announced) => {
                    let relevant = match &announced {
                        // Report a path only if it matches the prefix and we
                        // have not already reported it; the same announcement
                        // can appear both in a snapshot and in the channel.
                        Announced::Active(path) => {
                            path.has_prefix(&self.prefix) && self.tracked.insert(path.clone())
                        }
                        // Only report ends for paths we previously reported.
                        Announced::Ended(path) => self.tracked.remove(path),
                    };
                    if relevant {
                        return Some(announced);
                    }
                }
                Err(broadcast::error::RecvError::Closed) => {
                    // No more producers: end the tracked paths one per call,
                    // then return `None` when nothing is left.
                    let path = self.tracked.iter().next().cloned()?;
                    self.tracked.remove(&path);
                    return Some(Announced::Ended(path));
                }
                Err(broadcast::error::RecvError::Lagged(_)) => {
                    // `resubscribe` returns a NEW receiver positioned at the
                    // channel tail; it must replace the lagged one, otherwise
                    // stale retained messages would be replayed on top of the
                    // diff computed below. Resubscribe before snapshotting so
                    // that no update falls between the two.
                    self.updates = self.updates.resubscribe();

                    let active: HashSet<Path> = self
                        .active
                        .lock()
                        .unwrap()
                        .iter()
                        .filter(|path| path.has_prefix(&self.prefix))
                        .cloned()
                        .collect();

                    // Queue the difference between what we told the caller and
                    // what is actually active now: ends first, then new paths.
                    self.pending.clear();
                    self.pending
                        .extend(self.tracked.difference(&active).cloned().map(Announced::Ended));
                    self.pending
                        .extend(active.difference(&self.tracked).cloned().map(Announced::Active));
                }
            }
        }
    }

    /// Returns the prefix this consumer filters on.
    pub fn prefix(&self) -> &Path {
        &self.prefix
    }
}