use core::fmt;
use std::{
collections::HashSet,
future::{IntoFuture, Ready},
sync::{Arc, Mutex},
time::Duration,
};
use tracing::error;
use zenoh_core::{Resolvable, Resolve, Wait};
use zenoh_protocol::{
core::{CongestionControl, Parameters},
network::request::ext::QueryTarget,
};
use zenoh_result::ZResult;
#[cfg(feature = "unstable")]
use {zenoh_config::wrappers::EntityGlobalId, zenoh_protocol::core::EntityGlobalIdProto};
use super::{
builders::querier::QuerierGetBuilder,
key_expr::KeyExpr,
query::QueryConsolidation,
sample::{Locality, QoS},
session::{UndeclarableSealed, WeakSession},
Id,
};
use crate::{
api::{
builders::matching_listener::MatchingListenerBuilder,
cancellation::SyncGroup,
handlers::DefaultHandler,
matching::{MatchingStatus, MatchingStatusType},
query::ReplyKeyExpr,
},
qos::Priority,
};
pub(crate) struct QuerierState {
pub(crate) id: Id,
pub(crate) remote_id: Id,
pub(crate) key_expr: KeyExpr<'static>,
pub(crate) destination: Locality,
}
#[derive(Debug)]
pub struct Querier<'a> {
pub(crate) session: WeakSession,
pub(crate) id: Id,
pub(crate) key_expr: KeyExpr<'a>,
pub(crate) qos: QoS,
pub(crate) destination: Locality,
pub(crate) target: QueryTarget,
pub(crate) consolidation: QueryConsolidation,
pub(crate) timeout: Duration,
pub(crate) accept_replies: ReplyKeyExpr,
pub(crate) undeclare_on_drop: bool,
pub(crate) matching_listeners: Arc<Mutex<HashSet<Id>>>,
pub(crate) callback_sync_group: SyncGroup,
}
impl fmt::Debug for QuerierState {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Querier")
.field("id", &self.id)
.field("key_expr", &self.key_expr)
.finish()
}
}
impl<'a> Querier<'a> {
#[zenoh_macros::unstable]
pub fn id(&self) -> EntityGlobalId {
EntityGlobalIdProto {
zid: self.session.zid().into(),
eid: self.id,
}
.into()
}
#[inline]
pub fn key_expr(&self) -> &KeyExpr<'a> {
&self.key_expr
}
#[inline]
pub fn congestion_control(&self) -> CongestionControl {
self.qos.congestion_control()
}
#[inline]
pub fn priority(&self) -> Priority {
self.qos.priority()
}
#[inline]
pub fn accept_replies(&self) -> ReplyKeyExpr {
self.accept_replies
}
#[inline]
pub fn get(&self) -> QuerierGetBuilder<'_, '_, DefaultHandler> {
QuerierGetBuilder {
querier: self,
#[cfg(feature = "unstable")]
source_info: None,
value: None,
attachment: None,
parameters: Parameters::empty(),
handler: DefaultHandler::default(),
#[cfg(feature = "unstable")]
cancellation_token: None,
}
}
pub fn undeclare(self) -> QuerierUndeclaration<'a> {
UndeclarableSealed::undeclare_inner(self, ())
}
fn undeclare_impl(&mut self) -> ZResult<()> {
self.undeclare_on_drop = false;
let ids: Vec<Id> = zlock!(self.matching_listeners).drain().collect();
for id in ids {
self.session.undeclare_matches_listener_inner(id)?
}
self.session.undeclare_querier_inner(self.id)
}
pub fn matching_status(&self) -> impl Resolve<ZResult<MatchingStatus>> + '_ {
zenoh_core::ResolveFuture::new(async move {
self.session.matching_status(
self.key_expr(),
self.destination,
MatchingStatusType::Queryables(self.target == QueryTarget::AllComplete),
)
})
}
pub fn matching_listener(&self) -> MatchingListenerBuilder<'_, DefaultHandler> {
MatchingListenerBuilder {
session: &self.session,
key_expr: &self.key_expr,
destination: self.destination,
matching_listeners: &self.matching_listeners,
matching_status_type: MatchingStatusType::Queryables(
self.target == QueryTarget::AllComplete,
),
handler: DefaultHandler::default(),
parent_callback_sync_group_notifier: self.callback_sync_group.notifier(),
}
}
#[zenoh_macros::internal]
pub fn session(&self) -> &WeakSession {
&self.session
}
}
impl<'a> UndeclarableSealed<()> for Querier<'a> {
type Undeclaration = QuerierUndeclaration<'a>;
fn undeclare_inner(self, _: ()) -> Self::Undeclaration {
QuerierUndeclaration {
querier: self,
wait_callbacks: false,
}
}
}
#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
pub struct QuerierUndeclaration<'a> {
querier: Querier<'a>,
wait_callbacks: bool,
}
impl<'a> QuerierUndeclaration<'a> {
#[zenoh_macros::internal_or_unstable]
pub fn wait_callbacks(mut self) -> Self {
self.wait_callbacks = true;
self
}
}
impl Resolvable for QuerierUndeclaration<'_> {
type To = ZResult<()>;
}
impl Wait for QuerierUndeclaration<'_> {
fn wait(mut self) -> <Self as Resolvable>::To {
self.querier.undeclare_impl()?;
if self.wait_callbacks {
self.querier.callback_sync_group.wait();
}
Ok(())
}
}
impl IntoFuture for QuerierUndeclaration<'_> {
type Output = <Self as Resolvable>::To;
type IntoFuture = Ready<<Self as Resolvable>::To>;
fn into_future(self) -> Self::IntoFuture {
std::future::ready(self.wait())
}
}
impl Drop for Querier<'_> {
fn drop(&mut self) {
if self.undeclare_on_drop {
if let Err(error) = self.undeclare_impl() {
error!(error);
}
}
}
}