use std::{
collections::HashSet,
fmt,
future::{IntoFuture, Ready},
sync::{Arc, Mutex},
};
use tracing::error;
use zenoh_core::{Resolvable, Wait};
use zenoh_result::ZResult;
use super::{
handlers::Callback,
key_expr::KeyExpr,
sample::Locality,
session::{UndeclarableSealed, WeakSession},
Id,
};
use crate::api::{cancellation::SyncGroup, handlers::CallbackParameter};
/// A matching state, carried as a single boolean.
///
/// This is the value type handed to the `Callback<MatchingStatus>` stored in
/// `MatchingListenerState`.
#[derive(Copy, Clone, Debug)]
pub struct MatchingStatus {
// Crate-private flag; read from outside through `MatchingStatus::matching`.
pub(crate) matching: bool,
}
/// Allows `MatchingStatus` to be used as a callback argument.
///
/// The message representation is the status itself, so the conversion is the
/// identity function.
impl CallbackParameter for MatchingStatus {
    type Message<'a> = Self;

    /// Returns the received message unchanged.
    fn from_message(status: Self::Message<'_>) -> Self {
        status
    }
}
/// Kind of entity a matching listener tracks.
///
/// Equality is total (the only payload is a `bool`), so the type derives `Eq`
/// in addition to `PartialEq`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum MatchingStatusType {
    /// Match against subscribers.
    Subscribers,
    /// Match against queryables.
    ///
    /// The flag selects which rule `MatchingListenerState::is_matching` applies:
    /// a `Queryables(true)` listener is only satisfied by a key expression that
    /// includes its own, while a `Queryables(false)` listener is satisfied by
    /// any intersecting one.
    /// NOTE(review): the flag presumably stands for "complete" queryables — confirm.
    Queryables(bool),
}
impl MatchingStatus {
    /// Returns the boolean carried by this status.
    pub fn matching(&self) -> bool {
        let Self { matching } = *self;
        matching
    }
}
/// Crate-internal record describing one declared matching listener.
///
/// `is_matching` uses `key_expr` and `match_type` to decide whether an entity
/// is relevant to this listener.
pub(crate) struct MatchingListenerState {
/// Identifier of the listener.
pub(crate) id: Id,
/// NOTE(review): presumably the last matching status reported to `callback` — confirm against the session code.
pub(crate) current: Mutex<bool>,
/// Key expression the listener was declared on.
pub(crate) key_expr: KeyExpr<'static>,
/// NOTE(review): presumably restricts which entities (local/remote) are considered — confirm.
pub(crate) destination: Locality,
/// Kind of entity this listener tracks.
pub(crate) match_type: MatchingStatusType,
/// Callback receiving `MatchingStatus` values.
pub(crate) callback: Callback<MatchingStatus>,
}
impl MatchingListenerState {
    /// Tells whether an entity of kind `match_type` declared on `key_expr` is
    /// relevant to this listener.
    ///
    /// Rules, by incoming `match_type`:
    /// - `Subscribers`: the listener tracks subscribers and the key expressions
    ///   intersect.
    /// - `Queryables(false)`: the listener tracks `Queryables(false)` and the key
    ///   expressions intersect.
    /// - `Queryables(true)`: either the listener tracks `Queryables(false)` and
    ///   the key expressions intersect, or it tracks `Queryables(true)` and
    ///   `key_expr` includes the listener's key expression.
    pub(crate) fn is_matching(&self, key_expr: &KeyExpr, match_type: MatchingStatusType) -> bool {
        // Shared test: the listener tracks `wanted` and its key expression
        // intersects the incoming one. The kind comparison comes first so the
        // key-expression check is skipped when the kinds differ.
        let tracks_and_intersects = |wanted: MatchingStatusType| -> bool {
            self.match_type == wanted && self.key_expr.intersects(key_expr)
        };

        match match_type {
            MatchingStatusType::Subscribers => {
                tracks_and_intersects(MatchingStatusType::Subscribers)
            }
            MatchingStatusType::Queryables(false) => {
                tracks_and_intersects(MatchingStatusType::Queryables(false))
            }
            MatchingStatusType::Queryables(true) => {
                tracks_and_intersects(MatchingStatusType::Queryables(false))
                    || (self.match_type == MatchingStatusType::Queryables(true)
                        && key_expr.includes(&self.key_expr))
            }
        }
    }
}
/// Manual `Debug` that prints only `id`, `key_expr` and `match_type`, under
/// the struct name `"MatchingListener"`.
impl fmt::Debug for MatchingListenerState {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut out = f.debug_struct("MatchingListener");
        out.field("id", &self.id);
        out.field("key_expr", &self.key_expr);
        out.field("match_type", &self.match_type);
        out.finish()
    }
}
/// Handler-independent part of a `MatchingListener`: everything needed to
/// undeclare it.
#[derive(Debug)]
pub(crate) struct MatchingListenerInner {
/// Session used to perform the undeclaration.
pub(crate) session: WeakSession,
/// Shared set of listener ids; this listener's `id` is removed from it on undeclaration.
/// NOTE(review): presumably owned jointly with the entity the listener was declared on — confirm.
pub(crate) matching_listeners: Arc<Mutex<HashSet<Id>>>,
/// Identifier of this listener.
pub(crate) id: Id,
/// When `true`, dropping the owning `MatchingListener` undeclares it.
pub(crate) undeclare_on_drop: bool,
}
/// A declared matching listener paired with a user-supplied `Handler`.
///
/// The listener dereferences to its handler, and is undeclared on drop unless
/// `undeclare_on_drop` has been cleared.
#[derive(Debug)]
pub struct MatchingListener<Handler> {
/// Undeclaration state (session, id, drop behavior).
pub(crate) inner: MatchingListenerInner,
/// User-facing handler, reachable through `handler()`, `handler_mut()` and `Deref`.
pub(crate) handler: Handler,
/// Waited on after undeclaration when the caller asked to wait for callbacks.
pub(crate) callback_sync_group: SyncGroup,
}
impl<Handler> MatchingListener<Handler> {
#[inline]
pub fn undeclare(self) -> MatchingListenerUndeclaration<Handler>
where
Handler: Send,
{
self.undeclare_inner(())
}
fn undeclare_impl(&mut self) -> ZResult<()> {
self.inner.undeclare_on_drop = false;
zlock!(self.inner.matching_listeners).remove(&self.inner.id);
self.inner
.session
.undeclare_matches_listener_inner(self.inner.id)
}
pub fn handler(&self) -> &Handler {
&self.handler
}
pub fn handler_mut(&mut self) -> &mut Handler {
&mut self.handler
}
#[zenoh_macros::internal]
pub fn set_background(&mut self, background: bool) {
self.inner.undeclare_on_drop = !background;
}
}
/// Undeclares the listener on drop when `undeclare_on_drop` is set.
impl<Handler> Drop for MatchingListener<Handler> {
    fn drop(&mut self) {
        // Already undeclared explicitly, or configured to outlive this handle.
        if !self.inner.undeclare_on_drop {
            return;
        }
        // `drop` cannot return an error, so a failure is logged instead.
        if let Err(error) = self.undeclare_impl() {
            error!(error);
        }
    }
}
/// Wraps the listener in a `MatchingListenerUndeclaration`, to be resolved
/// later.
impl<Handler: Send> UndeclarableSealed<()> for MatchingListener<Handler> {
    type Undeclaration = MatchingListenerUndeclaration<Handler>;

    /// Builds the pending undeclaration; waiting for callbacks is off by
    /// default.
    fn undeclare_inner(self, _: ()) -> Self::Undeclaration {
        let listener = self;
        MatchingListenerUndeclaration {
            listener,
            wait_callbacks: false,
        }
    }
}
/// Shared dereference to the handler, so handler methods can be called
/// directly on the listener.
impl<Handler> std::ops::Deref for MatchingListener<Handler> {
    type Target = Handler;

    fn deref(&self) -> &Self::Target {
        Self::handler(self)
    }
}
/// Exclusive dereference to the handler.
impl<Handler> std::ops::DerefMut for MatchingListener<Handler> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        Self::handler_mut(self)
    }
}
/// Pending undeclaration of a `MatchingListener`.
///
/// Nothing is undeclared until this value is resolved through `Wait::wait` or
/// by awaiting it.
pub struct MatchingListenerUndeclaration<Handler> {
// Listener that will be undeclared when this value is resolved.
listener: MatchingListener<Handler>,
// When `true`, resolution also waits on the listener's callback sync group.
wait_callbacks: bool,
}
impl<Handler> MatchingListenerUndeclaration<Handler> {
    /// Requests that resolution also waits on the listener's callback sync
    /// group after the undeclaration succeeds.
    #[zenoh_macros::internal_or_unstable]
    pub fn wait_callbacks(self) -> Self {
        Self {
            wait_callbacks: true,
            ..self
        }
    }
}
/// Resolving a `MatchingListenerUndeclaration` yields a `ZResult<()>`.
impl<Handler> Resolvable for MatchingListenerUndeclaration<Handler> {
// Result type shared by the `Wait` and `IntoFuture` implementations.
type To = ZResult<()>;
}
/// Synchronous resolution of the undeclaration.
impl<Handler> Wait for MatchingListenerUndeclaration<Handler> {
    /// Undeclares the listener, then waits on its callback sync group if that
    /// was requested.
    ///
    /// # Errors
    /// Returns the undeclaration error immediately; in that case the callback
    /// sync group is not waited on.
    fn wait(self) -> <Self as Resolvable>::To {
        let Self {
            mut listener,
            wait_callbacks,
        } = self;
        listener.undeclare_impl()?;
        if wait_callbacks {
            listener.callback_sync_group.wait();
        }
        Ok(())
    }
}
/// Awaiting the undeclaration runs the synchronous `Wait::wait` path when the
/// future is created and yields an already-completed future holding its
/// result.
impl<Handler> IntoFuture for MatchingListenerUndeclaration<Handler> {
    type Output = <Self as Resolvable>::To;
    type IntoFuture = Ready<<Self as Resolvable>::To>;

    fn into_future(self) -> Self::IntoFuture {
        let outcome = self.wait();
        std::future::ready(outcome)
    }
}