Skip to main content

zenoh/api/builders/
querier.rs

1//
2// Copyright (c) 2024 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use std::{
15    future::{IntoFuture, Ready},
16    time::Duration,
17};
18
19use zenoh_core::{Resolvable, Wait};
20use zenoh_protocol::{
21    core::{CongestionControl, Parameters},
22    network::request::ext::QueryTarget,
23};
24use zenoh_result::ZResult;
25
26use super::sample::QoSBuilderTrait;
27#[cfg(feature = "unstable")]
28use crate::api::cancellation::CancellationTokenBuilderTrait;
29#[cfg(feature = "unstable")]
30use crate::api::sample::SourceInfo;
31use crate::{
32    api::{
33        builders::sample::{EncodingBuilderTrait, SampleBuilderTrait},
34        bytes::ZBytes,
35        cancellation::SyncGroup,
36        encoding::Encoding,
37        handlers::{locked, Callback, DefaultHandler, IntoHandler},
38        querier::Querier,
39        sample::{Locality, QoSBuilder},
40        selector::REPLY_KEY_EXPR_ANY_SEL_PARAM,
41    },
42    bytes::OptionZBytes,
43    key_expr::KeyExpr,
44    qos::Priority,
45    query::{QueryConsolidation, Reply, ReplyKeyExpr},
46    Session,
47};
48
49/// A builder for initializing a [`Querier`](crate::query::Querier).
50/// Returned by the
51/// [`Session::declare_querier`](crate::Session::declare_querier) method.
52///
53/// # Examples
54/// ```
55/// # #[tokio::main]
56/// # async fn main() {
57/// use zenoh::{query::{ConsolidationMode, QueryTarget}};
58///
59/// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
60/// let querier = session.declare_querier("key/expression")
61///     .target(QueryTarget::All)
62///     .consolidation(ConsolidationMode::None)
63///     .await
64///     .unwrap();
65/// let replies = querier.get()
66///     .parameters("value>1")
67///     .await
68///     .unwrap();
69/// while let Ok(reply) = replies.recv_async().await {
70///     println!("Received {:?}", reply.result())
71/// }
72/// # }
73/// ```
74#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
75#[derive(Debug)]
76pub struct QuerierBuilder<'a, 'b> {
77    pub(crate) session: &'a Session,
78    pub(crate) key_expr: ZResult<KeyExpr<'b>>,
79    pub(crate) target: QueryTarget,
80    pub(crate) consolidation: QueryConsolidation,
81    pub(crate) qos: QoSBuilder,
82    pub(crate) destination: Locality,
83    pub(crate) timeout: Duration,
84    pub(crate) accept_replies: ReplyKeyExpr,
85}
86
87#[zenoh_macros::internal_trait]
88impl QoSBuilderTrait for QuerierBuilder<'_, '_> {
89    fn congestion_control(self, congestion_control: CongestionControl) -> Self {
90        let qos = self.qos.congestion_control(congestion_control);
91        Self { qos, ..self }
92    }
93
94    fn priority(self, priority: Priority) -> Self {
95        let qos = self.qos.priority(priority);
96        Self { qos, ..self }
97    }
98
99    fn express(self, is_express: bool) -> Self {
100        let qos = self.qos.express(is_express);
101        Self { qos, ..self }
102    }
103}
104
105impl QuerierBuilder<'_, '_> {
106    /// Change the target(s) of the querier's queries.
107    ///
108    /// This method allows to specify whether the request should just return the
109    /// data available in the network which matches the key expression
110    /// ([QueryTarget::BestMatching], default) or if it should arrive to
111    /// all queryables matching the key expression ([QueryTarget::All],
112    /// [QueryTarget::AllComplete]).
113    ///
114    /// See also the [`complete`](crate::query::QueryableBuilder::complete) setting
115    /// of the [`Queryable`](crate::query::Queryable)
116    #[inline]
117    pub fn target(self, target: QueryTarget) -> Self {
118        Self { target, ..self }
119    }
120
121    /// Change the consolidation mode of the querier's queries.
122    ///
123    /// The multiple replies to a query may arrive from the network. The
124    /// [`ConsolidationMode`](crate::query::ConsolidationMode) enum defines
125    /// the strategies of filtering and reordering these replies.
126    /// The wrapper struct [`QueryConsolidation`](crate::query::QueryConsolidation)
127    /// allows to set an [`ConsolidationMode::AUTO`](crate::query::QueryConsolidation::AUTO)
128    /// mode, which lets the implementation choose the best strategy.
129    #[inline]
130    pub fn consolidation<QC: Into<QueryConsolidation>>(self, consolidation: QC) -> Self {
131        Self {
132            consolidation: consolidation.into(),
133            ..self
134        }
135    }
136
137    /// Restrict the matching queryables that will receive the queries
138    /// to the ones that have the given [`Locality`](Locality).
139    #[inline]
140    pub fn allowed_destination(self, destination: Locality) -> Self {
141        Self {
142            destination,
143            ..self
144        }
145    }
146
147    /// Set the query timeout.
148    #[inline]
149    pub fn timeout(self, timeout: Duration) -> Self {
150        Self { timeout, ..self }
151    }
152
153    /// See details in the [`ReplyKeyExpr`](crate::query::ReplyKeyExpr) documentation.
154    ///
155    /// Queries may or may not accept replies on key expressions that do not intersect with their own key expression.
156    /// This setter allows you to define whether this querier accepts such disjoint replies.
157    pub fn accept_replies(self, accept: ReplyKeyExpr) -> Self {
158        Self {
159            accept_replies: accept,
160            ..self
161        }
162    }
163}
164
165impl<'b> Resolvable for QuerierBuilder<'_, 'b> {
166    type To = ZResult<Querier<'b>>;
167}
168
169impl Wait for QuerierBuilder<'_, '_> {
170    fn wait(self) -> <Self as Resolvable>::To {
171        let mut key_expr = self.key_expr?;
172        key_expr = self.session.declare_keyexpr(key_expr).wait()?;
173        let id = self
174            .session
175            .declare_querier_inner(key_expr.clone(), self.destination)?;
176        Ok(Querier {
177            session: self.session.downgrade(),
178            id,
179            key_expr,
180            qos: self.qos.into(),
181            destination: self.destination,
182            undeclare_on_drop: true,
183            target: self.target,
184            consolidation: self.consolidation,
185            timeout: self.timeout,
186            accept_replies: self.accept_replies,
187            matching_listeners: Default::default(),
188            callback_sync_group: SyncGroup::default(),
189        })
190    }
191}
192
193impl IntoFuture for QuerierBuilder<'_, '_> {
194    type Output = <Self as Resolvable>::To;
195    type IntoFuture = Ready<<Self as Resolvable>::To>;
196
197    fn into_future(self) -> Self::IntoFuture {
198        std::future::ready(self.wait())
199    }
200}
201
202/// A builder for configuring a [`get`](crate::query::Querier::get)
203/// operation from a [`Querier`](crate::query::Querier).
204/// The builder resolves to a [`handler`](crate::handlers) generating a series of
205/// [`Reply`](crate::api::query::Reply) for each response received.
206///
207/// # Examples
208/// ```
209/// # #[tokio::main]
210/// # async fn main() {
211/// use zenoh::{query::{ConsolidationMode, QueryTarget}};
212///
213/// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
214/// let querier = session.declare_querier("key/expression")
215///     .target(QueryTarget::All)
216///     .consolidation(ConsolidationMode::None)
217///     .await
218///     .unwrap();
219/// let replies = querier
220///     .get()
221///     .parameters("value>1")
222///     .await
223///     .unwrap();
224/// while let Ok(reply) = replies.recv_async().await {
225///     println!("Received {:?}", reply.result())
226/// }
227/// # }
228/// ```
229#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
230#[derive(Debug)]
231pub struct QuerierGetBuilder<'a, 'b, Handler> {
232    pub(crate) querier: &'a Querier<'a>,
233    pub(crate) parameters: Parameters<'b>,
234    pub(crate) handler: Handler,
235    pub(crate) value: Option<(ZBytes, Encoding)>,
236    pub(crate) attachment: Option<ZBytes>,
237    #[cfg(feature = "unstable")]
238    pub(crate) source_info: Option<SourceInfo>,
239    #[cfg(feature = "unstable")]
240    pub(crate) cancellation_token: Option<crate::api::cancellation::CancellationToken>,
241}
242
243#[cfg(feature = "unstable")]
244#[zenoh_macros::internal_trait]
245impl<Handler> CancellationTokenBuilderTrait for QuerierGetBuilder<'_, '_, Handler> {
246    /// Provide a cancellation token that can be used later to interrupt GET operation.
247    ///
248    /// # Examples
249    /// ```
250    /// # #[tokio::main]
251    /// # async fn main() {
252    /// use zenoh::{query::{ConsolidationMode, QueryTarget}};
253    ///
254    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
255    /// let querier = session.declare_querier("key/expression")
256    ///     .target(QueryTarget::All)
257    ///     .consolidation(ConsolidationMode::None)
258    ///     .await
259    ///     .unwrap();
260    /// let ct = zenoh::cancellation::CancellationToken::default();
261    /// let _ = querier
262    ///     .get()
263    ///     .callback(|reply| {println!("Received {:?}", reply.result());})
264    ///     .cancellation_token(ct.clone())
265    ///     .await
266    ///     .unwrap();
267    ///
268    /// tokio::task::spawn(async move {
269    ///     tokio::time::sleep(std::time::Duration::from_secs(10)).await;
270    ///     ct.cancel().await.unwrap();
271    /// });
272    /// # }
273    /// ```
274    #[zenoh_macros::unstable_doc]
275    fn cancellation_token(
276        self,
277        cancellation_token: crate::api::cancellation::CancellationToken,
278    ) -> Self {
279        Self {
280            cancellation_token: Some(cancellation_token),
281            ..self
282        }
283    }
284}
285
286#[zenoh_macros::internal_trait]
287impl<Handler> SampleBuilderTrait for QuerierGetBuilder<'_, '_, Handler> {
288    #[zenoh_macros::unstable]
289    fn source_info<T: Into<Option<SourceInfo>>>(self, source_info: T) -> Self {
290        Self {
291            source_info: source_info.into(),
292            ..self
293        }
294    }
295
296    fn attachment<T: Into<OptionZBytes>>(self, attachment: T) -> Self {
297        let attachment: OptionZBytes = attachment.into();
298        Self {
299            attachment: attachment.into(),
300            ..self
301        }
302    }
303}
304
305#[zenoh_macros::internal_trait]
306impl<Handler> EncodingBuilderTrait for QuerierGetBuilder<'_, '_, Handler> {
307    fn encoding<T: Into<Encoding>>(self, encoding: T) -> Self {
308        let mut value = self.value.unwrap_or_default();
309        value.1 = encoding.into();
310        Self {
311            value: Some(value),
312            ..self
313        }
314    }
315}
316
317impl<'a, 'b> QuerierGetBuilder<'a, 'b, DefaultHandler> {
318    /// Receive the replies for this query with a callback.
319    ///
320    /// # Examples
321    /// ```
322    /// # #[tokio::main]
323    /// # async fn main() {
324    /// use zenoh::{query::{ConsolidationMode, QueryTarget}};
325    ///
326    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
327    /// let querier = session.declare_querier("key/expression")
328    ///     .target(QueryTarget::All)
329    ///     .consolidation(ConsolidationMode::None)
330    ///     .await
331    ///     .unwrap();
332    /// let _ = querier
333    ///     .get()
334    ///     .callback(|reply| {println!("Received {:?}", reply.result());})
335    ///     .await
336    ///     .unwrap();
337    /// # }
338    /// ```
339    #[inline]
340    pub fn callback<F>(self, callback: F) -> QuerierGetBuilder<'a, 'b, Callback<Reply>>
341    where
342        F: Fn(Reply) + Send + Sync + 'static,
343    {
344        self.with(Callback::from(callback))
345    }
346
347    /// Receive the replies for this query with a mutable callback.
348    ///
349    /// Using this guarantees that your callback will never be called concurrently.
350    /// If your callback is also accepted by the [`callback`](crate::query::QuerierGetBuilder::callback) method, we suggest you use it instead of `callback_mut`.
351    ///
352    /// # Examples
353    /// ```
354    /// # #[tokio::main]
355    /// # async fn main() {
356    /// use zenoh::{query::{ConsolidationMode, QueryTarget}};
357    ///
358    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
359    /// let querier = session.declare_querier("key/expression")
360    ///     .target(QueryTarget::All)
361    ///     .consolidation(ConsolidationMode::None)
362    ///     .await
363    ///     .unwrap();
364    /// let mut n = 0;
365    /// let _ = querier
366    ///     .get()
367    ///     .callback_mut(move |reply| {n += 1;})
368    ///     .await
369    ///     .unwrap();
370    /// # }
371    /// ```
372    #[inline]
373    pub fn callback_mut<F>(self, callback: F) -> QuerierGetBuilder<'a, 'b, Callback<Reply>>
374    where
375        F: FnMut(Reply) + Send + Sync + 'static,
376    {
377        self.callback(locked(callback))
378    }
379
380    /// Receive the replies for this query with a [`Handler`](crate::handlers::IntoHandler).
381    ///
382    /// # Examples
383    /// ```
384    /// # #[tokio::main]
385    /// # async fn main() {
386    /// use zenoh::{query::{ConsolidationMode, QueryTarget}};
387    ///
388    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
389    /// let querier = session.declare_querier("key/expression")
390    ///     .target(QueryTarget::All)
391    ///     .consolidation(ConsolidationMode::None)
392    ///     .await
393    ///     .unwrap();
394    /// let replies = querier
395    ///     .get()
396    ///     .with(flume::bounded(32))
397    ///     .await
398    ///     .unwrap();
399    /// while let Ok(reply) = replies.recv_async().await {
400    ///     println!("Received {:?}", reply.result());
401    /// }
402    /// # }
403    /// ```
404    #[inline]
405    pub fn with<Handler>(self, handler: Handler) -> QuerierGetBuilder<'a, 'b, Handler>
406    where
407        Handler: IntoHandler<Reply>,
408    {
409        let QuerierGetBuilder {
410            querier,
411            parameters,
412            value,
413            attachment,
414            #[cfg(feature = "unstable")]
415            source_info,
416            handler: _,
417            #[cfg(feature = "unstable")]
418            cancellation_token,
419        } = self;
420        QuerierGetBuilder {
421            querier,
422            parameters,
423            value,
424            attachment,
425            #[cfg(feature = "unstable")]
426            source_info,
427            handler,
428            #[cfg(feature = "unstable")]
429            cancellation_token,
430        }
431    }
432}
433impl<'b, Handler> QuerierGetBuilder<'_, 'b, Handler> {
434    /// Set the query payload.
435    #[inline]
436    pub fn payload<IntoZBytes>(mut self, payload: IntoZBytes) -> Self
437    where
438        IntoZBytes: Into<ZBytes>,
439    {
440        let mut value = self.value.unwrap_or_default();
441        value.0 = payload.into();
442        self.value = Some(value);
443        self
444    }
445
446    /// Set the query selector parameters.
447    #[inline]
448    pub fn parameters<P>(mut self, parameters: P) -> Self
449    where
450        P: Into<Parameters<'b>>,
451    {
452        self.parameters = parameters.into();
453        self
454    }
455}
456
457impl<Handler> Resolvable for QuerierGetBuilder<'_, '_, Handler>
458where
459    Handler: IntoHandler<Reply> + Send,
460    Handler::Handler: Send,
461{
462    type To = ZResult<Handler::Handler>;
463}
464
465impl<Handler> Wait for QuerierGetBuilder<'_, '_, Handler>
466where
467    Handler: IntoHandler<Reply> + Send,
468    Handler::Handler: Send,
469{
470    fn wait(self) -> <Self as Resolvable>::To {
471        let (callback, receiver) = self.handler.into_handler();
472        #[allow(unused_mut)]
473        // mut is only needed when building with "unstable" feature, which might add extra internal parameters on top of the user-provided ones
474        let mut parameters = self.parameters.clone();
475        if self.querier.accept_replies() == ReplyKeyExpr::Any {
476            parameters.insert(REPLY_KEY_EXPR_ANY_SEL_PARAM, "");
477        }
478        self.querier.session.query(
479            &self.querier.key_expr,
480            &parameters,
481            self.querier.target,
482            self.querier.consolidation,
483            self.querier.qos,
484            self.querier.destination,
485            self.querier.timeout,
486            self.value,
487            self.attachment,
488            #[cfg(feature = "unstable")]
489            self.source_info,
490            callback,
491            #[cfg(feature = "unstable")]
492            self.cancellation_token,
493            Some(self.querier.id),
494            self.querier.callback_sync_group.notifier(),
495        )?;
496        Ok(receiver)
497    }
498}
499
500impl<Handler> IntoFuture for QuerierGetBuilder<'_, '_, Handler>
501where
502    Handler: IntoHandler<Reply> + Send,
503    Handler::Handler: Send,
504{
505    type Output = <Self as Resolvable>::To;
506    type IntoFuture = Ready<<Self as Resolvable>::To>;
507
508    fn into_future(self) -> Self::IntoFuture {
509        std::future::ready(self.wait())
510    }
511}