use std::{
future::{IntoFuture, Ready},
time::Duration,
};
use zenoh_core::{Resolvable, Wait};
use zenoh_protocol::{core::CongestionControl, network::request::ext::QueryTarget};
use zenoh_result::ZResult;
#[cfg(feature = "unstable")]
use crate::api::cancellation::CancellationTokenBuilderTrait;
#[cfg(feature = "unstable")]
use crate::api::sample::SourceInfo;
use crate::{
api::{
builders::sample::{EncodingBuilderTrait, QoSBuilderTrait, SampleBuilderTrait},
bytes::ZBytes,
encoding::Encoding,
handlers::{locked, Callback, DefaultHandler, IntoHandler},
publisher::Priority,
query::ReplyKeyExpr,
sample::{Locality, QoSBuilder},
selector::{Selector, REPLY_KEY_EXPR_ANY_SEL_PARAM},
session::Session,
},
bytes::OptionZBytes,
query::{QueryConsolidation, Reply},
};
#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
#[derive(Debug)]
pub struct SessionGetBuilder<'a, 'b, Handler> {
pub(crate) session: &'a Session,
pub(crate) selector: ZResult<Selector<'b>>,
pub(crate) target: QueryTarget,
pub(crate) consolidation: QueryConsolidation,
pub(crate) qos: QoSBuilder,
pub(crate) destination: Locality,
pub(crate) timeout: Duration,
pub(crate) handler: Handler,
pub(crate) value: Option<(ZBytes, Encoding)>,
pub(crate) attachment: Option<ZBytes>,
#[cfg(feature = "unstable")]
pub(crate) source_info: Option<SourceInfo>,
#[cfg(feature = "unstable")]
pub(crate) cancellation_token: Option<crate::api::cancellation::CancellationToken>,
}
#[zenoh_macros::internal_trait]
impl<Handler> SampleBuilderTrait for SessionGetBuilder<'_, '_, Handler> {
#[zenoh_macros::unstable]
fn source_info<T: Into<Option<SourceInfo>>>(self, source_info: T) -> Self {
Self {
source_info: source_info.into(),
..self
}
}
fn attachment<T: Into<OptionZBytes>>(self, attachment: T) -> Self {
let attachment: OptionZBytes = attachment.into();
Self {
attachment: attachment.into(),
..self
}
}
}
#[zenoh_macros::internal_trait]
impl<Handler> QoSBuilderTrait for SessionGetBuilder<'_, '_, Handler> {
fn congestion_control(self, congestion_control: CongestionControl) -> Self {
let qos = self.qos.congestion_control(congestion_control);
Self { qos, ..self }
}
fn priority(self, priority: Priority) -> Self {
let qos = self.qos.priority(priority);
Self { qos, ..self }
}
fn express(self, is_express: bool) -> Self {
let qos = self.qos.express(is_express);
Self { qos, ..self }
}
}
#[zenoh_macros::internal_trait]
impl<Handler> EncodingBuilderTrait for SessionGetBuilder<'_, '_, Handler> {
fn encoding<T: Into<Encoding>>(self, encoding: T) -> Self {
let mut value = self.value.unwrap_or_default();
value.1 = encoding.into();
Self {
value: Some(value),
..self
}
}
}
#[cfg(feature = "unstable")]
#[zenoh_macros::internal_trait]
impl<Handler> CancellationTokenBuilderTrait for SessionGetBuilder<'_, '_, Handler> {
#[zenoh_macros::unstable_doc]
fn cancellation_token(
self,
cancellation_token: crate::api::cancellation::CancellationToken,
) -> Self {
Self {
cancellation_token: Some(cancellation_token),
..self
}
}
}
impl<'a, 'b> SessionGetBuilder<'a, 'b, DefaultHandler> {
#[inline]
pub fn callback<F>(self, callback: F) -> SessionGetBuilder<'a, 'b, Callback<Reply>>
where
F: Fn(Reply) + Send + Sync + 'static,
{
self.with(Callback::from(callback))
}
#[inline]
pub fn callback_mut<F>(self, callback: F) -> SessionGetBuilder<'a, 'b, Callback<Reply>>
where
F: FnMut(Reply) + Send + Sync + 'static,
{
self.callback(locked(callback))
}
#[inline]
pub fn with<Handler>(self, handler: Handler) -> SessionGetBuilder<'a, 'b, Handler>
where
Handler: IntoHandler<Reply>,
{
let SessionGetBuilder {
session,
selector,
target,
consolidation,
qos,
destination,
timeout,
value,
attachment,
#[cfg(feature = "unstable")]
source_info,
handler: _,
#[cfg(feature = "unstable")]
cancellation_token,
} = self;
SessionGetBuilder {
session,
selector,
target,
consolidation,
qos,
destination,
timeout,
value,
attachment,
#[cfg(feature = "unstable")]
source_info,
handler,
#[cfg(feature = "unstable")]
cancellation_token,
}
}
}
impl<Handler> SessionGetBuilder<'_, '_, Handler> {
#[inline]
pub fn payload<IntoZBytes>(mut self, payload: IntoZBytes) -> Self
where
IntoZBytes: Into<ZBytes>,
{
let mut value = self.value.unwrap_or_default();
value.0 = payload.into();
self.value = Some(value);
self
}
#[inline]
pub fn target(self, target: QueryTarget) -> Self {
Self { target, ..self }
}
#[inline]
pub fn consolidation<QC: Into<QueryConsolidation>>(self, consolidation: QC) -> Self {
Self {
consolidation: consolidation.into(),
..self
}
}
#[inline]
pub fn allowed_destination(self, destination: Locality) -> Self {
Self {
destination,
..self
}
}
#[inline]
pub fn timeout(self, timeout: Duration) -> Self {
Self { timeout, ..self }
}
pub fn accept_replies(self, accept: ReplyKeyExpr) -> Self {
if accept == ReplyKeyExpr::Any {
if let Ok(Selector {
key_expr,
mut parameters,
}) = self.selector
{
parameters.to_mut().insert(REPLY_KEY_EXPR_ANY_SEL_PARAM, "");
let selector = Ok(Selector {
key_expr,
parameters,
});
return Self { selector, ..self };
}
}
self
}
}
impl<Handler> Resolvable for SessionGetBuilder<'_, '_, Handler>
where
Handler: IntoHandler<Reply> + Send,
Handler::Handler: Send,
{
type To = ZResult<Handler::Handler>;
}
impl<Handler> Wait for SessionGetBuilder<'_, '_, Handler>
where
Handler: IntoHandler<Reply> + Send,
Handler::Handler: Send,
{
fn wait(self) -> <Self as Resolvable>::To {
let (callback, receiver) = self.handler.into_handler();
let Selector {
key_expr,
parameters,
} = self.selector?;
self.session.query(
&key_expr,
¶meters,
self.target,
self.consolidation,
self.qos.into(),
self.destination,
self.timeout,
self.value,
self.attachment,
#[cfg(feature = "unstable")]
self.source_info,
callback,
#[cfg(feature = "unstable")]
self.cancellation_token,
None,
None,
)?;
Ok(receiver)
}
}
impl<Handler> IntoFuture for SessionGetBuilder<'_, '_, Handler>
where
Handler: IntoHandler<Reply> + Send,
Handler::Handler: Send,
{
type Output = <Self as Resolvable>::To;
type IntoFuture = Ready<<Self as Resolvable>::To>;
fn into_future(self) -> Self::IntoFuture {
std::future::ready(self.wait())
}
}