zenoh/api/builders/querier.rs
1//
2// Copyright (c) 2024 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use std::{
15 future::{IntoFuture, Ready},
16 time::Duration,
17};
18
19use zenoh_core::{Resolvable, Wait};
20use zenoh_protocol::{
21 core::{CongestionControl, Parameters},
22 network::request::ext::QueryTarget,
23};
24use zenoh_result::ZResult;
25
26use super::sample::QoSBuilderTrait;
27#[cfg(feature = "unstable")]
28use crate::api::query::ReplyKeyExpr;
29#[cfg(feature = "unstable")]
30use crate::api::sample::SourceInfo;
31#[cfg(feature = "unstable")]
32use crate::query::ZenohParameters;
33use crate::{
34 api::{
35 builders::sample::{EncodingBuilderTrait, SampleBuilderTrait},
36 bytes::ZBytes,
37 encoding::Encoding,
38 handlers::{locked, Callback, DefaultHandler, IntoHandler},
39 querier::Querier,
40 sample::{Locality, QoSBuilder},
41 },
42 bytes::OptionZBytes,
43 key_expr::KeyExpr,
44 qos::Priority,
45 query::{QueryConsolidation, Reply},
46 Session,
47};
48
49/// A builder for initializing a [`Querier`](crate::query::Querier).
50/// Returned by the
51/// [`Session::declare_querier`](crate::Session::declare_querier) method.
52///
53/// # Examples
54/// ```
55/// # #[tokio::main]
56/// # async fn main() {
57/// use zenoh::{query::{ConsolidationMode, QueryTarget}};
58///
59/// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
60/// let querier = session.declare_querier("key/expression")
61/// .target(QueryTarget::All)
62/// .consolidation(ConsolidationMode::None)
63/// .await
64/// .unwrap();
65/// let replies = querier.get()
66/// .parameters("value>1")
67/// .await
68/// .unwrap();
69/// while let Ok(reply) = replies.recv_async().await {
70/// println!("Received {:?}", reply.result())
71/// }
72/// # }
73/// ```
74#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
75#[derive(Debug)]
76pub struct QuerierBuilder<'a, 'b> {
77 pub(crate) session: &'a Session,
78 pub(crate) key_expr: ZResult<KeyExpr<'b>>,
79 pub(crate) target: QueryTarget,
80 pub(crate) consolidation: QueryConsolidation,
81 pub(crate) qos: QoSBuilder,
82 pub(crate) destination: Locality,
83 pub(crate) timeout: Duration,
84 #[cfg(feature = "unstable")]
85 pub(crate) accept_replies: ReplyKeyExpr,
86}
87
88#[zenoh_macros::internal_trait]
89impl QoSBuilderTrait for QuerierBuilder<'_, '_> {
90 fn congestion_control(self, congestion_control: CongestionControl) -> Self {
91 let qos = self.qos.congestion_control(congestion_control);
92 Self { qos, ..self }
93 }
94
95 fn priority(self, priority: Priority) -> Self {
96 let qos = self.qos.priority(priority);
97 Self { qos, ..self }
98 }
99
100 fn express(self, is_express: bool) -> Self {
101 let qos = self.qos.express(is_express);
102 Self { qos, ..self }
103 }
104}
105
106impl QuerierBuilder<'_, '_> {
107 /// Change the target(s) of the querier's queries.
108 ///
109 /// This method allows to specify whether the request should just return the
110 /// data available in the network which matches the key expression
111 /// ([QueryTarget::BestMatching], default) or if it should arrive to
112 /// all queryables matching the key expression ([QueryTarget::All],
113 /// [QueryTarget::AllComplete]).
114 ///
115 /// See also the [`complete`](crate::query::QueryableBuilder::complete) setting
116 /// of the [`Queryable`](crate::query::Queryable)
117 #[inline]
118 pub fn target(self, target: QueryTarget) -> Self {
119 Self { target, ..self }
120 }
121
122 /// Change the consolidation mode of the querier's queries.
123 ///
124 /// The multiple replies to a query may arrive from the network. The
125 /// [`ConsolidationMode`](crate::query::ConsolidationMode) enum defines
126 /// the strategies of filtering and reordering these replies.
127 /// The wrapper struct [`QueryConsolidation`](crate::query::QueryConsolidation)
128 /// allows to set an [`ConsolidationMode::AUTO`](crate::query::QueryConsolidation::AUTO)
129 /// mode, which lets the implementation choose the best strategy.
130 #[inline]
131 pub fn consolidation<QC: Into<QueryConsolidation>>(self, consolidation: QC) -> Self {
132 Self {
133 consolidation: consolidation.into(),
134 ..self
135 }
136 }
137
138 /// Restrict the matching queryables that will receive the queries
139 /// to the ones that have the given [`Locality`](Locality).
140 #[inline]
141 pub fn allowed_destination(self, destination: Locality) -> Self {
142 Self {
143 destination,
144 ..self
145 }
146 }
147
148 /// Set the query timeout.
149 #[inline]
150 pub fn timeout(self, timeout: Duration) -> Self {
151 Self { timeout, ..self }
152 }
153
154 /// See details in the [`ReplyKeyExpr`](crate::query::ReplyKeyExpr) documentation.
155 /// Queries may or may not accept replies on key expressions that do not intersect with their own key expression.
156 /// This setter allows you to define whether this querier accepts such disjoint replies.
157 #[zenoh_macros::unstable]
158 pub fn accept_replies(self, accept: ReplyKeyExpr) -> Self {
159 Self {
160 accept_replies: accept,
161 ..self
162 }
163 }
164}
165
166impl<'b> Resolvable for QuerierBuilder<'_, 'b> {
167 type To = ZResult<Querier<'b>>;
168}
169
170impl Wait for QuerierBuilder<'_, '_> {
171 fn wait(self) -> <Self as Resolvable>::To {
172 let mut key_expr = self.key_expr?;
173 if !key_expr.is_fully_optimized(&self.session.0) {
174 key_expr = self.session.declare_keyexpr(key_expr).wait()?;
175 }
176 let id = self
177 .session
178 .0
179 .declare_querier_inner(key_expr.clone(), self.destination)?;
180 Ok(Querier {
181 session: self.session.downgrade(),
182 id,
183 key_expr,
184 qos: self.qos.into(),
185 destination: self.destination,
186 undeclare_on_drop: true,
187 target: self.target,
188 consolidation: self.consolidation,
189 timeout: self.timeout,
190 #[cfg(feature = "unstable")]
191 accept_replies: self.accept_replies,
192 matching_listeners: Default::default(),
193 })
194 }
195}
196
197impl IntoFuture for QuerierBuilder<'_, '_> {
198 type Output = <Self as Resolvable>::To;
199 type IntoFuture = Ready<<Self as Resolvable>::To>;
200
201 fn into_future(self) -> Self::IntoFuture {
202 std::future::ready(self.wait())
203 }
204}
205
206/// A builder for configuring a [`get`](crate::query::Querier::get)
207/// operation from a [`Querier`](crate::query::Querier).
208/// The builder resolves to a [`handler`](crate::handlers) generating a series of
209/// [`Reply`](crate::api::query::Reply) for each response received.
210///
211/// # Examples
212/// ```
213/// # #[tokio::main]
214/// # async fn main() {
215/// use zenoh::{query::{ConsolidationMode, QueryTarget}};
216///
217/// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
218/// let querier = session.declare_querier("key/expression")
219/// .target(QueryTarget::All)
220/// .consolidation(ConsolidationMode::None)
221/// .await
222/// .unwrap();
223/// let replies = querier
224/// .get()
225/// .parameters("value>1")
226/// .await
227/// .unwrap();
228/// while let Ok(reply) = replies.recv_async().await {
229/// println!("Received {:?}", reply.result())
230/// }
231/// # }
232/// ```
233#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
234#[derive(Debug)]
235pub struct QuerierGetBuilder<'a, 'b, Handler> {
236 pub(crate) querier: &'a Querier<'a>,
237 pub(crate) parameters: Parameters<'b>,
238 pub(crate) handler: Handler,
239 pub(crate) value: Option<(ZBytes, Encoding)>,
240 pub(crate) attachment: Option<ZBytes>,
241 #[cfg(feature = "unstable")]
242 pub(crate) source_info: SourceInfo,
243}
244
245#[zenoh_macros::internal_trait]
246impl<Handler> SampleBuilderTrait for QuerierGetBuilder<'_, '_, Handler> {
247 #[zenoh_macros::unstable]
248 fn source_info(self, source_info: SourceInfo) -> Self {
249 Self {
250 source_info,
251 ..self
252 }
253 }
254
255 fn attachment<T: Into<OptionZBytes>>(self, attachment: T) -> Self {
256 let attachment: OptionZBytes = attachment.into();
257 Self {
258 attachment: attachment.into(),
259 ..self
260 }
261 }
262}
263
264#[zenoh_macros::internal_trait]
265impl<Handler> EncodingBuilderTrait for QuerierGetBuilder<'_, '_, Handler> {
266 fn encoding<T: Into<Encoding>>(self, encoding: T) -> Self {
267 let mut value = self.value.unwrap_or_default();
268 value.1 = encoding.into();
269 Self {
270 value: Some(value),
271 ..self
272 }
273 }
274}
275
276impl<'a, 'b> QuerierGetBuilder<'a, 'b, DefaultHandler> {
277 /// Receive the replies for this query with a callback.
278 ///
279 /// # Examples
280 /// ```
281 /// # #[tokio::main]
282 /// # async fn main() {
283 /// use zenoh::{query::{ConsolidationMode, QueryTarget}};
284 ///
285 /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
286 /// let querier = session.declare_querier("key/expression")
287 /// .target(QueryTarget::All)
288 /// .consolidation(ConsolidationMode::None)
289 /// .await
290 /// .unwrap();
291 /// let _ = querier
292 /// .get()
293 /// .callback(|reply| {println!("Received {:?}", reply.result());})
294 /// .await
295 /// .unwrap();
296 /// # }
297 /// ```
298 #[inline]
299 pub fn callback<F>(self, callback: F) -> QuerierGetBuilder<'a, 'b, Callback<Reply>>
300 where
301 F: Fn(Reply) + Send + Sync + 'static,
302 {
303 self.with(Callback::from(callback))
304 }
305
306 /// Receive the replies for this query with a mutable callback.
307 ///
308 /// Using this guarantees that your callback will never be called concurrently.
309 /// If your callback is also accepted by the [`callback`](crate::query::QuerierGetBuilder::callback) method, we suggest you use it instead of `callback_mut`.
310 ///
311 /// # Examples
312 /// ```
313 /// # #[tokio::main]
314 /// # async fn main() {
315 /// use zenoh::{query::{ConsolidationMode, QueryTarget}};
316 ///
317 /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
318 /// let querier = session.declare_querier("key/expression")
319 /// .target(QueryTarget::All)
320 /// .consolidation(ConsolidationMode::None)
321 /// .await
322 /// .unwrap();
323 /// let mut n = 0;
324 /// let _ = querier
325 /// .get()
326 /// .callback_mut(move |reply| {n += 1;})
327 /// .await
328 /// .unwrap();
329 /// # }
330 /// ```
331 #[inline]
332 pub fn callback_mut<F>(self, callback: F) -> QuerierGetBuilder<'a, 'b, Callback<Reply>>
333 where
334 F: FnMut(Reply) + Send + Sync + 'static,
335 {
336 self.callback(locked(callback))
337 }
338
339 /// Receive the replies for this query with a [`Handler`](crate::handlers::IntoHandler).
340 ///
341 /// # Examples
342 /// ```
343 /// # #[tokio::main]
344 /// # async fn main() {
345 /// use zenoh::{query::{ConsolidationMode, QueryTarget}};
346 ///
347 /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
348 /// let querier = session.declare_querier("key/expression")
349 /// .target(QueryTarget::All)
350 /// .consolidation(ConsolidationMode::None)
351 /// .await
352 /// .unwrap();
353 /// let replies = querier
354 /// .get()
355 /// .with(flume::bounded(32))
356 /// .await
357 /// .unwrap();
358 /// while let Ok(reply) = replies.recv_async().await {
359 /// println!("Received {:?}", reply.result());
360 /// }
361 /// # }
362 /// ```
363 #[inline]
364 pub fn with<Handler>(self, handler: Handler) -> QuerierGetBuilder<'a, 'b, Handler>
365 where
366 Handler: IntoHandler<Reply>,
367 {
368 let QuerierGetBuilder {
369 querier,
370 parameters,
371 value,
372 attachment,
373 #[cfg(feature = "unstable")]
374 source_info,
375 handler: _,
376 } = self;
377 QuerierGetBuilder {
378 querier,
379 parameters,
380 value,
381 attachment,
382 #[cfg(feature = "unstable")]
383 source_info,
384 handler,
385 }
386 }
387}
388impl<'b, Handler> QuerierGetBuilder<'_, 'b, Handler> {
389 /// Set the query payload.
390 #[inline]
391 pub fn payload<IntoZBytes>(mut self, payload: IntoZBytes) -> Self
392 where
393 IntoZBytes: Into<ZBytes>,
394 {
395 let mut value = self.value.unwrap_or_default();
396 value.0 = payload.into();
397 self.value = Some(value);
398 self
399 }
400
401 /// Set the query selector parameters.
402 #[inline]
403 pub fn parameters<P>(mut self, parameters: P) -> Self
404 where
405 P: Into<Parameters<'b>>,
406 {
407 self.parameters = parameters.into();
408 self
409 }
410}
411
412impl<Handler> Resolvable for QuerierGetBuilder<'_, '_, Handler>
413where
414 Handler: IntoHandler<Reply> + Send,
415 Handler::Handler: Send,
416{
417 type To = ZResult<Handler::Handler>;
418}
419
420impl<Handler> Wait for QuerierGetBuilder<'_, '_, Handler>
421where
422 Handler: IntoHandler<Reply> + Send,
423 Handler::Handler: Send,
424{
425 fn wait(self) -> <Self as Resolvable>::To {
426 let (callback, receiver) = self.handler.into_handler();
427
428 #[allow(unused_mut)]
429 // mut is only needed when building with "unstable" feature, which might add extra internal parameters on top of the user-provided ones
430 let mut parameters = self.parameters.clone();
431 #[cfg(feature = "unstable")]
432 if self.querier.accept_replies() == ReplyKeyExpr::Any {
433 parameters.set_reply_key_expr_any();
434 }
435 self.querier
436 .session
437 .query(
438 &self.querier.key_expr,
439 ¶meters,
440 self.querier.target,
441 self.querier.consolidation,
442 self.querier.qos,
443 self.querier.destination,
444 self.querier.timeout,
445 self.value,
446 self.attachment,
447 #[cfg(feature = "unstable")]
448 self.source_info,
449 callback,
450 )
451 .map(|_| receiver)
452 }
453}
454
455impl<Handler> IntoFuture for QuerierGetBuilder<'_, '_, Handler>
456where
457 Handler: IntoHandler<Reply> + Send,
458 Handler::Handler: Send,
459{
460 type Output = <Self as Resolvable>::To;
461 type IntoFuture = Ready<<Self as Resolvable>::To>;
462
463 fn into_future(self) -> Self::IntoFuture {
464 std::future::ready(self.wait())
465 }
466}