use std::{
future::{IntoFuture, Ready},
time::Duration,
};
use zenoh_core::{Resolvable, Wait};
use zenoh_protocol::{
core::{CongestionControl, Parameters},
network::request::ext::QueryTarget,
};
use zenoh_result::ZResult;
use super::sample::QoSBuilderTrait;
#[cfg(feature = "unstable")]
use crate::api::cancellation::CancellationTokenBuilderTrait;
#[cfg(feature = "unstable")]
use crate::api::sample::SourceInfo;
use crate::{
api::{
builders::sample::{EncodingBuilderTrait, SampleBuilderTrait},
bytes::ZBytes,
cancellation::SyncGroup,
encoding::Encoding,
handlers::{locked, Callback, DefaultHandler, IntoHandler},
querier::Querier,
sample::{Locality, QoSBuilder},
selector::REPLY_KEY_EXPR_ANY_SEL_PARAM,
},
bytes::OptionZBytes,
key_expr::KeyExpr,
qos::Priority,
query::{QueryConsolidation, Reply, ReplyKeyExpr},
Session,
};
#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
#[derive(Debug)]
pub struct QuerierBuilder<'a, 'b> {
pub(crate) session: &'a Session,
pub(crate) key_expr: ZResult<KeyExpr<'b>>,
pub(crate) target: QueryTarget,
pub(crate) consolidation: QueryConsolidation,
pub(crate) qos: QoSBuilder,
pub(crate) destination: Locality,
pub(crate) timeout: Duration,
pub(crate) accept_replies: ReplyKeyExpr,
}
#[zenoh_macros::internal_trait]
impl QoSBuilderTrait for QuerierBuilder<'_, '_> {
fn congestion_control(self, congestion_control: CongestionControl) -> Self {
let qos = self.qos.congestion_control(congestion_control);
Self { qos, ..self }
}
fn priority(self, priority: Priority) -> Self {
let qos = self.qos.priority(priority);
Self { qos, ..self }
}
fn express(self, is_express: bool) -> Self {
let qos = self.qos.express(is_express);
Self { qos, ..self }
}
}
impl QuerierBuilder<'_, '_> {
#[inline]
pub fn target(self, target: QueryTarget) -> Self {
Self { target, ..self }
}
#[inline]
pub fn consolidation<QC: Into<QueryConsolidation>>(self, consolidation: QC) -> Self {
Self {
consolidation: consolidation.into(),
..self
}
}
#[inline]
pub fn allowed_destination(self, destination: Locality) -> Self {
Self {
destination,
..self
}
}
#[inline]
pub fn timeout(self, timeout: Duration) -> Self {
Self { timeout, ..self }
}
pub fn accept_replies(self, accept: ReplyKeyExpr) -> Self {
Self {
accept_replies: accept,
..self
}
}
}
impl<'b> Resolvable for QuerierBuilder<'_, 'b> {
type To = ZResult<Querier<'b>>;
}
impl Wait for QuerierBuilder<'_, '_> {
fn wait(self) -> <Self as Resolvable>::To {
let mut key_expr = self.key_expr?;
key_expr = self.session.declare_keyexpr(key_expr).wait()?;
let id = self
.session
.declare_querier_inner(key_expr.clone(), self.destination)?;
Ok(Querier {
session: self.session.downgrade(),
id,
key_expr,
qos: self.qos.into(),
destination: self.destination,
undeclare_on_drop: true,
target: self.target,
consolidation: self.consolidation,
timeout: self.timeout,
accept_replies: self.accept_replies,
matching_listeners: Default::default(),
callback_sync_group: SyncGroup::default(),
})
}
}
impl IntoFuture for QuerierBuilder<'_, '_> {
type Output = <Self as Resolvable>::To;
type IntoFuture = Ready<<Self as Resolvable>::To>;
fn into_future(self) -> Self::IntoFuture {
std::future::ready(self.wait())
}
}
#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
#[derive(Debug)]
pub struct QuerierGetBuilder<'a, 'b, Handler> {
pub(crate) querier: &'a Querier<'a>,
pub(crate) parameters: Parameters<'b>,
pub(crate) handler: Handler,
pub(crate) value: Option<(ZBytes, Encoding)>,
pub(crate) attachment: Option<ZBytes>,
#[cfg(feature = "unstable")]
pub(crate) source_info: Option<SourceInfo>,
#[cfg(feature = "unstable")]
pub(crate) cancellation_token: Option<crate::api::cancellation::CancellationToken>,
}
#[cfg(feature = "unstable")]
#[zenoh_macros::internal_trait]
impl<Handler> CancellationTokenBuilderTrait for QuerierGetBuilder<'_, '_, Handler> {
#[zenoh_macros::unstable_doc]
fn cancellation_token(
self,
cancellation_token: crate::api::cancellation::CancellationToken,
) -> Self {
Self {
cancellation_token: Some(cancellation_token),
..self
}
}
}
#[zenoh_macros::internal_trait]
impl<Handler> SampleBuilderTrait for QuerierGetBuilder<'_, '_, Handler> {
#[zenoh_macros::unstable]
fn source_info<T: Into<Option<SourceInfo>>>(self, source_info: T) -> Self {
Self {
source_info: source_info.into(),
..self
}
}
fn attachment<T: Into<OptionZBytes>>(self, attachment: T) -> Self {
let attachment: OptionZBytes = attachment.into();
Self {
attachment: attachment.into(),
..self
}
}
}
#[zenoh_macros::internal_trait]
impl<Handler> EncodingBuilderTrait for QuerierGetBuilder<'_, '_, Handler> {
fn encoding<T: Into<Encoding>>(self, encoding: T) -> Self {
let mut value = self.value.unwrap_or_default();
value.1 = encoding.into();
Self {
value: Some(value),
..self
}
}
}
impl<'a, 'b> QuerierGetBuilder<'a, 'b, DefaultHandler> {
#[inline]
pub fn callback<F>(self, callback: F) -> QuerierGetBuilder<'a, 'b, Callback<Reply>>
where
F: Fn(Reply) + Send + Sync + 'static,
{
self.with(Callback::from(callback))
}
#[inline]
pub fn callback_mut<F>(self, callback: F) -> QuerierGetBuilder<'a, 'b, Callback<Reply>>
where
F: FnMut(Reply) + Send + Sync + 'static,
{
self.callback(locked(callback))
}
#[inline]
pub fn with<Handler>(self, handler: Handler) -> QuerierGetBuilder<'a, 'b, Handler>
where
Handler: IntoHandler<Reply>,
{
let QuerierGetBuilder {
querier,
parameters,
value,
attachment,
#[cfg(feature = "unstable")]
source_info,
handler: _,
#[cfg(feature = "unstable")]
cancellation_token,
} = self;
QuerierGetBuilder {
querier,
parameters,
value,
attachment,
#[cfg(feature = "unstable")]
source_info,
handler,
#[cfg(feature = "unstable")]
cancellation_token,
}
}
}
impl<'b, Handler> QuerierGetBuilder<'_, 'b, Handler> {
#[inline]
pub fn payload<IntoZBytes>(mut self, payload: IntoZBytes) -> Self
where
IntoZBytes: Into<ZBytes>,
{
let mut value = self.value.unwrap_or_default();
value.0 = payload.into();
self.value = Some(value);
self
}
#[inline]
pub fn parameters<P>(mut self, parameters: P) -> Self
where
P: Into<Parameters<'b>>,
{
self.parameters = parameters.into();
self
}
}
impl<Handler> Resolvable for QuerierGetBuilder<'_, '_, Handler>
where
Handler: IntoHandler<Reply> + Send,
Handler::Handler: Send,
{
type To = ZResult<Handler::Handler>;
}
impl<Handler> Wait for QuerierGetBuilder<'_, '_, Handler>
where
Handler: IntoHandler<Reply> + Send,
Handler::Handler: Send,
{
fn wait(self) -> <Self as Resolvable>::To {
let (callback, receiver) = self.handler.into_handler();
#[allow(unused_mut)]
let mut parameters = self.parameters.clone();
if self.querier.accept_replies() == ReplyKeyExpr::Any {
parameters.insert(REPLY_KEY_EXPR_ANY_SEL_PARAM, "");
}
self.querier.session.query(
&self.querier.key_expr,
¶meters,
self.querier.target,
self.querier.consolidation,
self.querier.qos,
self.querier.destination,
self.querier.timeout,
self.value,
self.attachment,
#[cfg(feature = "unstable")]
self.source_info,
callback,
#[cfg(feature = "unstable")]
self.cancellation_token,
Some(self.querier.id),
self.querier.callback_sync_group.notifier(),
)?;
Ok(receiver)
}
}
impl<Handler> IntoFuture for QuerierGetBuilder<'_, '_, Handler>
where
Handler: IntoHandler<Reply> + Send,
Handler::Handler: Send,
{
type Output = <Self as Resolvable>::To;
type IntoFuture = Ready<<Self as Resolvable>::To>;
fn into_future(self) -> Self::IntoFuture {
std::future::ready(self.wait())
}
}