zenoh/api/builders/
querier.rs

1//
2// Copyright (c) 2024 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use std::{
15    future::{IntoFuture, Ready},
16    time::Duration,
17};
18
19use zenoh_core::{Resolvable, Wait};
20use zenoh_protocol::{
21    core::{CongestionControl, Parameters},
22    network::request::ext::QueryTarget,
23};
24use zenoh_result::ZResult;
25
26use super::sample::QoSBuilderTrait;
27#[cfg(feature = "unstable")]
28use crate::api::query::ReplyKeyExpr;
29#[cfg(feature = "unstable")]
30use crate::api::sample::SourceInfo;
31#[cfg(feature = "unstable")]
32use crate::query::ZenohParameters;
33use crate::{
34    api::{
35        builders::sample::{EncodingBuilderTrait, SampleBuilderTrait},
36        bytes::ZBytes,
37        encoding::Encoding,
38        handlers::{locked, Callback, DefaultHandler, IntoHandler},
39        querier::Querier,
40        sample::{Locality, QoSBuilder},
41    },
42    bytes::OptionZBytes,
43    key_expr::KeyExpr,
44    qos::Priority,
45    query::{QueryConsolidation, Reply},
46    Session,
47};
48
49/// A builder for initializing a [`Querier`](crate::query::Querier).
50/// Returned by the
51/// [`Session::declare_querier`](crate::Session::declare_querier) method.
52///
53/// # Examples
54/// ```
55/// # #[tokio::main]
56/// # async fn main() {
57/// use zenoh::{query::{ConsolidationMode, QueryTarget}};
58///
59/// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
60/// let querier = session.declare_querier("key/expression")
61///     .target(QueryTarget::All)
62///     .consolidation(ConsolidationMode::None)
63///     .await
64///     .unwrap();
65/// let replies = querier.get()
66///     .parameters("value>1")
67///     .await
68///     .unwrap();
69/// while let Ok(reply) = replies.recv_async().await {
70///     println!("Received {:?}", reply.result())
71/// }
72/// # }
73/// ```
74#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
75#[derive(Debug)]
76pub struct QuerierBuilder<'a, 'b> {
77    pub(crate) session: &'a Session,
78    pub(crate) key_expr: ZResult<KeyExpr<'b>>,
79    pub(crate) target: QueryTarget,
80    pub(crate) consolidation: QueryConsolidation,
81    pub(crate) qos: QoSBuilder,
82    pub(crate) destination: Locality,
83    pub(crate) timeout: Duration,
84    #[cfg(feature = "unstable")]
85    pub(crate) accept_replies: ReplyKeyExpr,
86}
87
88#[zenoh_macros::internal_trait]
89impl QoSBuilderTrait for QuerierBuilder<'_, '_> {
90    fn congestion_control(self, congestion_control: CongestionControl) -> Self {
91        let qos = self.qos.congestion_control(congestion_control);
92        Self { qos, ..self }
93    }
94
95    fn priority(self, priority: Priority) -> Self {
96        let qos = self.qos.priority(priority);
97        Self { qos, ..self }
98    }
99
100    fn express(self, is_express: bool) -> Self {
101        let qos = self.qos.express(is_express);
102        Self { qos, ..self }
103    }
104}
105
106impl QuerierBuilder<'_, '_> {
107    /// Change the target(s) of the querier's queries.
108    ///
109    /// This method allows to specify whether the request should just return the
110    /// data available in the network which matches the key expression
111    /// ([QueryTarget::BestMatching], default) or if it should arrive to
112    /// all queryables matching the key expression ([QueryTarget::All],
113    /// [QueryTarget::AllComplete]).
114    ///
115    /// See also the [`complete`](crate::query::QueryableBuilder::complete) setting
116    /// of the [`Queryable`](crate::query::Queryable)
117    #[inline]
118    pub fn target(self, target: QueryTarget) -> Self {
119        Self { target, ..self }
120    }
121
122    /// Change the consolidation mode of the querier's queries.
123    ///
124    /// The multiple replies to a query may arrive from the network. The
125    /// [`ConsolidationMode`](crate::query::ConsolidationMode) enum defines
126    /// the strategies of filtering and reordering these replies.
127    /// The wrapper struct [`QueryConsolidation`](crate::query::QueryConsolidation)
128    /// allows to set an [`ConsolidationMode::AUTO`](crate::query::QueryConsolidation::AUTO)
129    /// mode, which lets the implementation choose the best strategy.
130    #[inline]
131    pub fn consolidation<QC: Into<QueryConsolidation>>(self, consolidation: QC) -> Self {
132        Self {
133            consolidation: consolidation.into(),
134            ..self
135        }
136    }
137
138    /// Restrict the matching queryables that will receive the queries
139    /// to the ones that have the given [`Locality`](Locality).
140    #[inline]
141    pub fn allowed_destination(self, destination: Locality) -> Self {
142        Self {
143            destination,
144            ..self
145        }
146    }
147
148    /// Set the query timeout.
149    #[inline]
150    pub fn timeout(self, timeout: Duration) -> Self {
151        Self { timeout, ..self }
152    }
153
154    /// See details in the [`ReplyKeyExpr`](crate::query::ReplyKeyExpr) documentation.
155    /// Queries may or may not accept replies on key expressions that do not intersect with their own key expression.
156    /// This setter allows you to define whether this querier accepts such disjoint replies.
157    #[zenoh_macros::unstable]
158    pub fn accept_replies(self, accept: ReplyKeyExpr) -> Self {
159        Self {
160            accept_replies: accept,
161            ..self
162        }
163    }
164}
165
166impl<'b> Resolvable for QuerierBuilder<'_, 'b> {
167    type To = ZResult<Querier<'b>>;
168}
169
170impl Wait for QuerierBuilder<'_, '_> {
171    fn wait(self) -> <Self as Resolvable>::To {
172        let mut key_expr = self.key_expr?;
173        if !key_expr.is_fully_optimized(&self.session.0) {
174            key_expr = self.session.declare_keyexpr(key_expr).wait()?;
175        }
176        let id = self
177            .session
178            .0
179            .declare_querier_inner(key_expr.clone(), self.destination)?;
180        Ok(Querier {
181            session: self.session.downgrade(),
182            id,
183            key_expr,
184            qos: self.qos.into(),
185            destination: self.destination,
186            undeclare_on_drop: true,
187            target: self.target,
188            consolidation: self.consolidation,
189            timeout: self.timeout,
190            #[cfg(feature = "unstable")]
191            accept_replies: self.accept_replies,
192            matching_listeners: Default::default(),
193        })
194    }
195}
196
197impl IntoFuture for QuerierBuilder<'_, '_> {
198    type Output = <Self as Resolvable>::To;
199    type IntoFuture = Ready<<Self as Resolvable>::To>;
200
201    fn into_future(self) -> Self::IntoFuture {
202        std::future::ready(self.wait())
203    }
204}
205
206/// A builder for configuring a [`get`](crate::query::Querier::get)
207/// operation from a [`Querier`](crate::query::Querier).
208/// The builder resolves to a [`handler`](crate::handlers) generating a series of
209/// [`Reply`](crate::api::query::Reply) for each response received.
210///
211/// # Examples
212/// ```
213/// # #[tokio::main]
214/// # async fn main() {
215/// use zenoh::{query::{ConsolidationMode, QueryTarget}};
216///
217/// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
218/// let querier = session.declare_querier("key/expression")
219///     .target(QueryTarget::All)
220///     .consolidation(ConsolidationMode::None)
221///     .await
222///     .unwrap();
223/// let replies = querier
224///     .get()
225///     .parameters("value>1")
226///     .await
227///     .unwrap();
228/// while let Ok(reply) = replies.recv_async().await {
229///     println!("Received {:?}", reply.result())
230/// }
231/// # }
232/// ```
233#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
234#[derive(Debug)]
235pub struct QuerierGetBuilder<'a, 'b, Handler> {
236    pub(crate) querier: &'a Querier<'a>,
237    pub(crate) parameters: Parameters<'b>,
238    pub(crate) handler: Handler,
239    pub(crate) value: Option<(ZBytes, Encoding)>,
240    pub(crate) attachment: Option<ZBytes>,
241    #[cfg(feature = "unstable")]
242    pub(crate) source_info: SourceInfo,
243}
244
245#[zenoh_macros::internal_trait]
246impl<Handler> SampleBuilderTrait for QuerierGetBuilder<'_, '_, Handler> {
247    #[zenoh_macros::unstable]
248    fn source_info(self, source_info: SourceInfo) -> Self {
249        Self {
250            source_info,
251            ..self
252        }
253    }
254
255    fn attachment<T: Into<OptionZBytes>>(self, attachment: T) -> Self {
256        let attachment: OptionZBytes = attachment.into();
257        Self {
258            attachment: attachment.into(),
259            ..self
260        }
261    }
262}
263
264#[zenoh_macros::internal_trait]
265impl<Handler> EncodingBuilderTrait for QuerierGetBuilder<'_, '_, Handler> {
266    fn encoding<T: Into<Encoding>>(self, encoding: T) -> Self {
267        let mut value = self.value.unwrap_or_default();
268        value.1 = encoding.into();
269        Self {
270            value: Some(value),
271            ..self
272        }
273    }
274}
275
276impl<'a, 'b> QuerierGetBuilder<'a, 'b, DefaultHandler> {
277    /// Receive the replies for this query with a callback.
278    ///
279    /// # Examples
280    /// ```
281    /// # #[tokio::main]
282    /// # async fn main() {
283    /// use zenoh::{query::{ConsolidationMode, QueryTarget}};
284    ///
285    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
286    /// let querier = session.declare_querier("key/expression")
287    ///     .target(QueryTarget::All)
288    ///     .consolidation(ConsolidationMode::None)
289    ///     .await
290    ///     .unwrap();
291    /// let _ = querier
292    ///     .get()
293    ///     .callback(|reply| {println!("Received {:?}", reply.result());})
294    ///     .await
295    ///     .unwrap();
296    /// # }
297    /// ```
298    #[inline]
299    pub fn callback<F>(self, callback: F) -> QuerierGetBuilder<'a, 'b, Callback<Reply>>
300    where
301        F: Fn(Reply) + Send + Sync + 'static,
302    {
303        self.with(Callback::from(callback))
304    }
305
306    /// Receive the replies for this query with a mutable callback.
307    ///
308    /// Using this guarantees that your callback will never be called concurrently.
309    /// If your callback is also accepted by the [`callback`](crate::query::QuerierGetBuilder::callback) method, we suggest you use it instead of `callback_mut`.
310    ///
311    /// # Examples
312    /// ```
313    /// # #[tokio::main]
314    /// # async fn main() {
315    /// use zenoh::{query::{ConsolidationMode, QueryTarget}};
316    ///
317    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
318    /// let querier = session.declare_querier("key/expression")
319    ///     .target(QueryTarget::All)
320    ///     .consolidation(ConsolidationMode::None)
321    ///     .await
322    ///     .unwrap();
323    /// let mut n = 0;
324    /// let _ = querier
325    ///     .get()
326    ///     .callback_mut(move |reply| {n += 1;})
327    ///     .await
328    ///     .unwrap();
329    /// # }
330    /// ```
331    #[inline]
332    pub fn callback_mut<F>(self, callback: F) -> QuerierGetBuilder<'a, 'b, Callback<Reply>>
333    where
334        F: FnMut(Reply) + Send + Sync + 'static,
335    {
336        self.callback(locked(callback))
337    }
338
339    /// Receive the replies for this query with a [`Handler`](crate::handlers::IntoHandler).
340    ///
341    /// # Examples
342    /// ```
343    /// # #[tokio::main]
344    /// # async fn main() {
345    /// use zenoh::{query::{ConsolidationMode, QueryTarget}};
346    ///
347    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
348    /// let querier = session.declare_querier("key/expression")
349    ///     .target(QueryTarget::All)
350    ///     .consolidation(ConsolidationMode::None)
351    ///     .await
352    ///     .unwrap();
353    /// let replies = querier
354    ///     .get()
355    ///     .with(flume::bounded(32))
356    ///     .await
357    ///     .unwrap();
358    /// while let Ok(reply) = replies.recv_async().await {
359    ///     println!("Received {:?}", reply.result());
360    /// }
361    /// # }
362    /// ```
363    #[inline]
364    pub fn with<Handler>(self, handler: Handler) -> QuerierGetBuilder<'a, 'b, Handler>
365    where
366        Handler: IntoHandler<Reply>,
367    {
368        let QuerierGetBuilder {
369            querier,
370            parameters,
371            value,
372            attachment,
373            #[cfg(feature = "unstable")]
374            source_info,
375            handler: _,
376        } = self;
377        QuerierGetBuilder {
378            querier,
379            parameters,
380            value,
381            attachment,
382            #[cfg(feature = "unstable")]
383            source_info,
384            handler,
385        }
386    }
387}
388impl<'b, Handler> QuerierGetBuilder<'_, 'b, Handler> {
389    /// Set the query payload.
390    #[inline]
391    pub fn payload<IntoZBytes>(mut self, payload: IntoZBytes) -> Self
392    where
393        IntoZBytes: Into<ZBytes>,
394    {
395        let mut value = self.value.unwrap_or_default();
396        value.0 = payload.into();
397        self.value = Some(value);
398        self
399    }
400
401    /// Set the query selector parameters.
402    #[inline]
403    pub fn parameters<P>(mut self, parameters: P) -> Self
404    where
405        P: Into<Parameters<'b>>,
406    {
407        self.parameters = parameters.into();
408        self
409    }
410}
411
412impl<Handler> Resolvable for QuerierGetBuilder<'_, '_, Handler>
413where
414    Handler: IntoHandler<Reply> + Send,
415    Handler::Handler: Send,
416{
417    type To = ZResult<Handler::Handler>;
418}
419
420impl<Handler> Wait for QuerierGetBuilder<'_, '_, Handler>
421where
422    Handler: IntoHandler<Reply> + Send,
423    Handler::Handler: Send,
424{
425    fn wait(self) -> <Self as Resolvable>::To {
426        let (callback, receiver) = self.handler.into_handler();
427
428        #[allow(unused_mut)]
429        // mut is only needed when building with "unstable" feature, which might add extra internal parameters on top of the user-provided ones
430        let mut parameters = self.parameters.clone();
431        #[cfg(feature = "unstable")]
432        if self.querier.accept_replies() == ReplyKeyExpr::Any {
433            parameters.set_reply_key_expr_any();
434        }
435        self.querier
436            .session
437            .query(
438                &self.querier.key_expr,
439                &parameters,
440                self.querier.target,
441                self.querier.consolidation,
442                self.querier.qos,
443                self.querier.destination,
444                self.querier.timeout,
445                self.value,
446                self.attachment,
447                #[cfg(feature = "unstable")]
448                self.source_info,
449                callback,
450            )
451            .map(|_| receiver)
452    }
453}
454
455impl<Handler> IntoFuture for QuerierGetBuilder<'_, '_, Handler>
456where
457    Handler: IntoHandler<Reply> + Send,
458    Handler::Handler: Send,
459{
460    type Output = <Self as Resolvable>::To;
461    type IntoFuture = Ready<<Self as Resolvable>::To>;
462
463    fn into_future(self) -> Self::IntoFuture {
464        std::future::ready(self.wait())
465    }
466}