use {
super::ActiveChannelsMap,
crate::{
discovery::PeerEntry,
primitives::{IntoIterOrSingle, Tag},
},
core::{
fmt,
pin::Pin,
task::{Context, Poll},
},
futures::FutureExt,
std::{collections::BTreeSet, sync::Arc},
tokio::sync::watch,
tokio_util::sync::ReusableBoxFuture,
};
/// Entry point for waiting on state changes, backed by two `tokio` watch
/// channels: a boolean online flag and the map of active channels.
///
/// The futures and conditions handed out by its methods work on their own
/// clones of these receivers, so they do not borrow from this value.
pub struct When {
    /// Receiver for the online flag; `true` is treated as "online".
    pub(crate) online: watch::Receiver<bool>,
    /// Receiver for the active channels map that [`ChannelConditions`]
    /// evaluates its peer checks against.
    pub(crate) active: watch::Receiver<ActiveChannelsMap>,
}
impl Clone for When {
    /// Clones both receivers and hands them to [`When::new`], so the copy
    /// starts out with both channels flagged as changed.
    fn clone(&self) -> Self {
        let active = self.active.clone();
        let online = self.online.clone();
        Self::new(active, online)
    }
}
impl When {
    /// Builds a `When` from the two watch receivers.
    ///
    /// Both receivers are flagged as changed, so whatever value they
    /// currently hold counts as unseen by anything that later waits on them.
    pub(crate) fn new(
        active: watch::Receiver<ActiveChannelsMap>,
        online: watch::Receiver<bool>,
    ) -> Self {
        let mut when = Self { online, active };
        when.active.mark_changed();
        when.online.mark_changed();
        when
    }
}
impl When {
    /// Returns a future that completes once the online flag is `true`.
    ///
    /// If the sending half of the channel goes away before that happens, the
    /// future never completes.
    pub fn online(&self) -> impl Future<Output = ()> + Send + Sync + 'static {
        let mut status = self.online.clone();
        async move {
            // Evaluate to a plain `bool` first so the borrowed value returned
            // by `wait_for` is released before the await below.
            let sender_gone = status.wait_for(|is_online| *is_online).await.is_err();
            if sender_gone {
                core::future::pending::<()>().await;
            }
        }
    }

    /// Reads the current value of the online flag without waiting.
    pub fn is_online(&self) -> bool {
        let current = self.online.borrow();
        *current
    }

    /// Returns a future that completes once the online flag is `false`.
    ///
    /// Unlike [`When::online`], this future also completes when the sending
    /// half of the channel has gone away.
    pub fn offline(&self) -> impl Future<Output = ()> + Send + Sync + 'static {
        let mut status = self.online.clone();
        async move {
            // The outcome is irrelevant: both "flag is false" and "channel
            // closed" end the wait.
            let _ = status.wait_for(|is_online| !*is_online).await;
        }
    }

    /// Creates a condition that is met when at least one connected peer is
    /// present in the active channels map.
    ///
    /// The threshold and peer filters can be adjusted through the builder
    /// methods on [`ChannelConditions`].
    pub fn subscribed(&self) -> ChannelConditions {
        let mut watcher = self.active.clone();
        watcher.mark_changed();

        // The condition keeps one receiver for reading the map; the other
        // moves into the future that signals a change.
        let active = watcher.clone();
        let changed_fut = ReusableBoxFuture::new(Box::pin(async move {
            let _ = watcher.changed().await;
        }));

        ChannelConditions {
            active,
            min_peers: 1,
            was_met: false,
            is_inverse: false,
            predicates: Vec::new(),
            changed_fut,
        }
    }

    /// Creates the inverse of [`When::subscribed`]: a condition that is met
    /// while the peer requirement is *not* satisfied.
    pub fn unsubscribed(&self) -> ChannelConditions {
        let conditions = self.subscribed();
        conditions.unmet()
    }
}
/// A condition over the active channels map that can be checked
/// synchronously via `is_condition_met` or awaited as a `Future`.
///
/// A peer counts towards the condition when its channel handle reports it as
/// connected and every registered predicate accepts it.
pub struct ChannelConditions {
    /// Receiver used to read the current active channels map.
    active: watch::Receiver<ActiveChannelsMap>,
    /// Number of matching peers required (compared with `>=`).
    min_peers: usize,
    /// Filters that a peer must all pass in order to be counted.
    predicates: Vec<Arc<PeerPredicate>>,
    /// Outcome of the latest evaluation made while polling; lets the future
    /// resolve only on a transition from "not met" to "met".
    was_met: bool,
    /// When `true`, the result of the peer-count check is negated.
    is_inverse: bool,
    /// Future awaiting the next change notification of the watch channel;
    /// replaced by `poll` each time it completes.
    changed_fut: ReusableBoxFuture<'static, ()>,
}
impl ChannelConditions {
#[must_use]
pub const fn minimum_of(mut self, min: usize) -> Self {
self.min_peers = min;
self
}
#[must_use]
pub fn with_tags<V>(self, tags: impl IntoIterOrSingle<Tag, V>) -> Self {
let tags: BTreeSet<Tag> = tags.iterator().into_iter().collect();
self.with_predicate(move |peer: &PeerEntry| tags.is_subset(peer.tags()))
}
#[must_use]
pub fn with_predicate<F>(mut self, predicate: F) -> Self
where
F: Fn(&PeerEntry) -> bool + Send + Sync + 'static,
{
self.predicates.push(Arc::new(predicate));
self
}
pub fn is_condition_met(&self) -> bool {
let matching_peers = self
.active
.borrow()
.values()
.filter(|handle| {
handle.is_connected()
&& self.predicates.iter().all(|pred| pred(&handle.peer))
})
.count();
(matching_peers >= self.min_peers) != self.is_inverse
}
#[must_use]
pub const fn unmet(self) -> Self {
let mut cloned = self;
cloned.is_inverse = true;
cloned
}
}
impl Clone for ChannelConditions {
    /// Copies the configuration (threshold, predicates, inversion) but gives
    /// the clone fresh wait state: `was_met` is reset and a new change
    /// future is created from a receiver flagged as changed.
    fn clone(&self) -> Self {
        let mut watcher = self.active.clone();
        watcher.mark_changed();

        let active = watcher.clone();
        let predicates = self.predicates.clone();
        let changed_fut = ReusableBoxFuture::new(Box::pin(async move {
            let _ = watcher.changed().await;
        }));

        Self {
            active,
            min_peers: self.min_peers,
            predicates,
            was_met: false,
            is_inverse: self.is_inverse,
            changed_fut,
        }
    }
}
impl fmt::Debug for ChannelConditions {
    /// Prints the threshold, the number of registered predicates and the
    /// current evaluation result. Remaining fields are omitted, which the
    /// non-exhaustive output (`..`) signals to the reader.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Use the actual type name so debug output can be traced back to
        // this type.
        f.debug_struct("ChannelConditions")
            .field("min_peers", &self.min_peers)
            .field("predicates", &self.predicates.len())
            .field("is_condition_met", &self.is_condition_met())
            .finish_non_exhaustive()
    }
}
impl Future for ChannelConditions {
    type Output = ();

    /// Resolves when the condition goes from "not met" to "met".
    ///
    /// `was_met` remembers the previous evaluation, so polling again after
    /// completion does not resolve a second time until the condition has
    /// been observed as unmet in between.
    ///
    /// If the sending half of the watch channel is gone, the condition is
    /// evaluated against the last value and the future then stays pending.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        loop {
            let condition_met = this.is_condition_met();
            if condition_met && !this.was_met {
                this.was_met = true;
                return Poll::Ready(());
            }
            this.was_met = condition_met;

            if this.changed_fut.poll_unpin(cx).is_pending() {
                return Poll::Pending;
            }

            // The completed future only advanced its own receiver. Mark the
            // current value as seen on `this.active` too; otherwise every
            // clone taken from it inherits a stale version, `changed()`
            // completes immediately and this loop spins instead of waiting.
            drop(this.active.borrow_and_update());

            let mut receiver = this.active.clone();
            this.changed_fut.set(async move {
                // A closed channel can never change again. Park instead of
                // completing, so the loop does not re-arm forever.
                if receiver.changed().await.is_err() {
                    core::future::pending::<()>().await;
                }
            });
            // Loop around: the map is re-read after the version was recorded,
            // so an update arriving in between is either visible to the next
            // evaluation or wakes the re-armed future.
        }
    }
}
impl PartialEq<bool> for ChannelConditions {
    /// Compares the current evaluation of the condition with `other`, which
    /// allows writing `conditions == true`.
    fn eq(&self, other: &bool) -> bool {
        let expected = *other;
        self.is_condition_met() == expected
    }
}
impl PartialEq<ChannelConditions> for bool {
    /// Mirror of the comparison above with the operands swapped, which
    /// allows writing `true == conditions`.
    fn eq(&self, other: &ChannelConditions) -> bool {
        let expected = *self;
        expected == other.is_condition_met()
    }
}
/// Type-erased peer filter stored by `ChannelConditions`: returns `true` for
/// peers that should be counted. `Send + Sync` so it can be shared behind an
/// `Arc`.
type PeerPredicate = dyn Fn(&PeerEntry) -> bool + Send + Sync;