zenoh/api/builders/querier.rs
1//
2// Copyright (c) 2024 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use std::{
15 future::{IntoFuture, Ready},
16 time::Duration,
17};
18
19use zenoh_core::{Resolvable, Wait};
20use zenoh_protocol::{
21 core::{CongestionControl, Parameters},
22 network::request::ext::QueryTarget,
23};
24use zenoh_result::ZResult;
25
26use super::sample::QoSBuilderTrait;
27#[cfg(feature = "unstable")]
28use crate::api::cancellation::CancellationTokenBuilderTrait;
29#[cfg(feature = "unstable")]
30use crate::api::sample::SourceInfo;
31use crate::{
32 api::{
33 builders::sample::{EncodingBuilderTrait, SampleBuilderTrait},
34 bytes::ZBytes,
35 cancellation::SyncGroup,
36 encoding::Encoding,
37 handlers::{locked, Callback, DefaultHandler, IntoHandler},
38 querier::Querier,
39 sample::{Locality, QoSBuilder},
40 selector::REPLY_KEY_EXPR_ANY_SEL_PARAM,
41 },
42 bytes::OptionZBytes,
43 key_expr::KeyExpr,
44 qos::Priority,
45 query::{QueryConsolidation, Reply, ReplyKeyExpr},
46 Session,
47};
48
49/// A builder for initializing a [`Querier`](crate::query::Querier).
50/// Returned by the
51/// [`Session::declare_querier`](crate::Session::declare_querier) method.
52///
53/// # Examples
54/// ```
55/// # #[tokio::main]
56/// # async fn main() {
57/// use zenoh::{query::{ConsolidationMode, QueryTarget}};
58///
59/// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
60/// let querier = session.declare_querier("key/expression")
61/// .target(QueryTarget::All)
62/// .consolidation(ConsolidationMode::None)
63/// .await
64/// .unwrap();
65/// let replies = querier.get()
66/// .parameters("value>1")
67/// .await
68/// .unwrap();
69/// while let Ok(reply) = replies.recv_async().await {
70/// println!("Received {:?}", reply.result())
71/// }
72/// # }
73/// ```
74#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
75#[derive(Debug)]
76pub struct QuerierBuilder<'a, 'b> {
77 pub(crate) session: &'a Session,
78 pub(crate) key_expr: ZResult<KeyExpr<'b>>,
79 pub(crate) target: QueryTarget,
80 pub(crate) consolidation: QueryConsolidation,
81 pub(crate) qos: QoSBuilder,
82 pub(crate) destination: Locality,
83 pub(crate) timeout: Duration,
84 pub(crate) accept_replies: ReplyKeyExpr,
85}
86
87#[zenoh_macros::internal_trait]
88impl QoSBuilderTrait for QuerierBuilder<'_, '_> {
89 fn congestion_control(self, congestion_control: CongestionControl) -> Self {
90 let qos = self.qos.congestion_control(congestion_control);
91 Self { qos, ..self }
92 }
93
94 fn priority(self, priority: Priority) -> Self {
95 let qos = self.qos.priority(priority);
96 Self { qos, ..self }
97 }
98
99 fn express(self, is_express: bool) -> Self {
100 let qos = self.qos.express(is_express);
101 Self { qos, ..self }
102 }
103}
104
105impl QuerierBuilder<'_, '_> {
106 /// Change the target(s) of the querier's queries.
107 ///
108 /// This method allows to specify whether the request should just return the
109 /// data available in the network which matches the key expression
110 /// ([QueryTarget::BestMatching], default) or if it should arrive to
111 /// all queryables matching the key expression ([QueryTarget::All],
112 /// [QueryTarget::AllComplete]).
113 ///
114 /// See also the [`complete`](crate::query::QueryableBuilder::complete) setting
115 /// of the [`Queryable`](crate::query::Queryable)
116 #[inline]
117 pub fn target(self, target: QueryTarget) -> Self {
118 Self { target, ..self }
119 }
120
121 /// Change the consolidation mode of the querier's queries.
122 ///
123 /// The multiple replies to a query may arrive from the network. The
124 /// [`ConsolidationMode`](crate::query::ConsolidationMode) enum defines
125 /// the strategies of filtering and reordering these replies.
126 /// The wrapper struct [`QueryConsolidation`](crate::query::QueryConsolidation)
127 /// allows to set an [`ConsolidationMode::AUTO`](crate::query::QueryConsolidation::AUTO)
128 /// mode, which lets the implementation choose the best strategy.
129 #[inline]
130 pub fn consolidation<QC: Into<QueryConsolidation>>(self, consolidation: QC) -> Self {
131 Self {
132 consolidation: consolidation.into(),
133 ..self
134 }
135 }
136
137 /// Restrict the matching queryables that will receive the queries
138 /// to the ones that have the given [`Locality`](Locality).
139 #[inline]
140 pub fn allowed_destination(self, destination: Locality) -> Self {
141 Self {
142 destination,
143 ..self
144 }
145 }
146
147 /// Set the query timeout.
148 #[inline]
149 pub fn timeout(self, timeout: Duration) -> Self {
150 Self { timeout, ..self }
151 }
152
153 /// See details in the [`ReplyKeyExpr`](crate::query::ReplyKeyExpr) documentation.
154 ///
155 /// Queries may or may not accept replies on key expressions that do not intersect with their own key expression.
156 /// This setter allows you to define whether this querier accepts such disjoint replies.
157 pub fn accept_replies(self, accept: ReplyKeyExpr) -> Self {
158 Self {
159 accept_replies: accept,
160 ..self
161 }
162 }
163}
164
165impl<'b> Resolvable for QuerierBuilder<'_, 'b> {
166 type To = ZResult<Querier<'b>>;
167}
168
169impl Wait for QuerierBuilder<'_, '_> {
170 fn wait(self) -> <Self as Resolvable>::To {
171 let mut key_expr = self.key_expr?;
172 key_expr = self.session.declare_keyexpr(key_expr).wait()?;
173 let id = self
174 .session
175 .declare_querier_inner(key_expr.clone(), self.destination)?;
176 Ok(Querier {
177 session: self.session.downgrade(),
178 id,
179 key_expr,
180 qos: self.qos.into(),
181 destination: self.destination,
182 undeclare_on_drop: true,
183 target: self.target,
184 consolidation: self.consolidation,
185 timeout: self.timeout,
186 accept_replies: self.accept_replies,
187 matching_listeners: Default::default(),
188 callback_sync_group: SyncGroup::default(),
189 })
190 }
191}
192
193impl IntoFuture for QuerierBuilder<'_, '_> {
194 type Output = <Self as Resolvable>::To;
195 type IntoFuture = Ready<<Self as Resolvable>::To>;
196
197 fn into_future(self) -> Self::IntoFuture {
198 std::future::ready(self.wait())
199 }
200}
201
202/// A builder for configuring a [`get`](crate::query::Querier::get)
203/// operation from a [`Querier`](crate::query::Querier).
204/// The builder resolves to a [`handler`](crate::handlers) generating a series of
205/// [`Reply`](crate::api::query::Reply) for each response received.
206///
207/// # Examples
208/// ```
209/// # #[tokio::main]
210/// # async fn main() {
211/// use zenoh::{query::{ConsolidationMode, QueryTarget}};
212///
213/// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
214/// let querier = session.declare_querier("key/expression")
215/// .target(QueryTarget::All)
216/// .consolidation(ConsolidationMode::None)
217/// .await
218/// .unwrap();
219/// let replies = querier
220/// .get()
221/// .parameters("value>1")
222/// .await
223/// .unwrap();
224/// while let Ok(reply) = replies.recv_async().await {
225/// println!("Received {:?}", reply.result())
226/// }
227/// # }
228/// ```
229#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
230#[derive(Debug)]
231pub struct QuerierGetBuilder<'a, 'b, Handler> {
232 pub(crate) querier: &'a Querier<'a>,
233 pub(crate) parameters: Parameters<'b>,
234 pub(crate) handler: Handler,
235 pub(crate) value: Option<(ZBytes, Encoding)>,
236 pub(crate) attachment: Option<ZBytes>,
237 #[cfg(feature = "unstable")]
238 pub(crate) source_info: Option<SourceInfo>,
239 #[cfg(feature = "unstable")]
240 pub(crate) cancellation_token: Option<crate::api::cancellation::CancellationToken>,
241}
242
243#[cfg(feature = "unstable")]
244#[zenoh_macros::internal_trait]
245impl<Handler> CancellationTokenBuilderTrait for QuerierGetBuilder<'_, '_, Handler> {
246 /// Provide a cancellation token that can be used later to interrupt GET operation.
247 ///
248 /// # Examples
249 /// ```
250 /// # #[tokio::main]
251 /// # async fn main() {
252 /// use zenoh::{query::{ConsolidationMode, QueryTarget}};
253 ///
254 /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
255 /// let querier = session.declare_querier("key/expression")
256 /// .target(QueryTarget::All)
257 /// .consolidation(ConsolidationMode::None)
258 /// .await
259 /// .unwrap();
260 /// let ct = zenoh::cancellation::CancellationToken::default();
261 /// let _ = querier
262 /// .get()
263 /// .callback(|reply| {println!("Received {:?}", reply.result());})
264 /// .cancellation_token(ct.clone())
265 /// .await
266 /// .unwrap();
267 ///
268 /// tokio::task::spawn(async move {
269 /// tokio::time::sleep(std::time::Duration::from_secs(10)).await;
270 /// ct.cancel().await.unwrap();
271 /// });
272 /// # }
273 /// ```
274 #[zenoh_macros::unstable_doc]
275 fn cancellation_token(
276 self,
277 cancellation_token: crate::api::cancellation::CancellationToken,
278 ) -> Self {
279 Self {
280 cancellation_token: Some(cancellation_token),
281 ..self
282 }
283 }
284}
285
286#[zenoh_macros::internal_trait]
287impl<Handler> SampleBuilderTrait for QuerierGetBuilder<'_, '_, Handler> {
288 #[zenoh_macros::unstable]
289 fn source_info<T: Into<Option<SourceInfo>>>(self, source_info: T) -> Self {
290 Self {
291 source_info: source_info.into(),
292 ..self
293 }
294 }
295
296 fn attachment<T: Into<OptionZBytes>>(self, attachment: T) -> Self {
297 let attachment: OptionZBytes = attachment.into();
298 Self {
299 attachment: attachment.into(),
300 ..self
301 }
302 }
303}
304
305#[zenoh_macros::internal_trait]
306impl<Handler> EncodingBuilderTrait for QuerierGetBuilder<'_, '_, Handler> {
307 fn encoding<T: Into<Encoding>>(self, encoding: T) -> Self {
308 let mut value = self.value.unwrap_or_default();
309 value.1 = encoding.into();
310 Self {
311 value: Some(value),
312 ..self
313 }
314 }
315}
316
317impl<'a, 'b> QuerierGetBuilder<'a, 'b, DefaultHandler> {
318 /// Receive the replies for this query with a callback.
319 ///
320 /// # Examples
321 /// ```
322 /// # #[tokio::main]
323 /// # async fn main() {
324 /// use zenoh::{query::{ConsolidationMode, QueryTarget}};
325 ///
326 /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
327 /// let querier = session.declare_querier("key/expression")
328 /// .target(QueryTarget::All)
329 /// .consolidation(ConsolidationMode::None)
330 /// .await
331 /// .unwrap();
332 /// let _ = querier
333 /// .get()
334 /// .callback(|reply| {println!("Received {:?}", reply.result());})
335 /// .await
336 /// .unwrap();
337 /// # }
338 /// ```
339 #[inline]
340 pub fn callback<F>(self, callback: F) -> QuerierGetBuilder<'a, 'b, Callback<Reply>>
341 where
342 F: Fn(Reply) + Send + Sync + 'static,
343 {
344 self.with(Callback::from(callback))
345 }
346
347 /// Receive the replies for this query with a mutable callback.
348 ///
349 /// Using this guarantees that your callback will never be called concurrently.
350 /// If your callback is also accepted by the [`callback`](crate::query::QuerierGetBuilder::callback) method, we suggest you use it instead of `callback_mut`.
351 ///
352 /// # Examples
353 /// ```
354 /// # #[tokio::main]
355 /// # async fn main() {
356 /// use zenoh::{query::{ConsolidationMode, QueryTarget}};
357 ///
358 /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
359 /// let querier = session.declare_querier("key/expression")
360 /// .target(QueryTarget::All)
361 /// .consolidation(ConsolidationMode::None)
362 /// .await
363 /// .unwrap();
364 /// let mut n = 0;
365 /// let _ = querier
366 /// .get()
367 /// .callback_mut(move |reply| {n += 1;})
368 /// .await
369 /// .unwrap();
370 /// # }
371 /// ```
372 #[inline]
373 pub fn callback_mut<F>(self, callback: F) -> QuerierGetBuilder<'a, 'b, Callback<Reply>>
374 where
375 F: FnMut(Reply) + Send + Sync + 'static,
376 {
377 self.callback(locked(callback))
378 }
379
380 /// Receive the replies for this query with a [`Handler`](crate::handlers::IntoHandler).
381 ///
382 /// # Examples
383 /// ```
384 /// # #[tokio::main]
385 /// # async fn main() {
386 /// use zenoh::{query::{ConsolidationMode, QueryTarget}};
387 ///
388 /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
389 /// let querier = session.declare_querier("key/expression")
390 /// .target(QueryTarget::All)
391 /// .consolidation(ConsolidationMode::None)
392 /// .await
393 /// .unwrap();
394 /// let replies = querier
395 /// .get()
396 /// .with(flume::bounded(32))
397 /// .await
398 /// .unwrap();
399 /// while let Ok(reply) = replies.recv_async().await {
400 /// println!("Received {:?}", reply.result());
401 /// }
402 /// # }
403 /// ```
404 #[inline]
405 pub fn with<Handler>(self, handler: Handler) -> QuerierGetBuilder<'a, 'b, Handler>
406 where
407 Handler: IntoHandler<Reply>,
408 {
409 let QuerierGetBuilder {
410 querier,
411 parameters,
412 value,
413 attachment,
414 #[cfg(feature = "unstable")]
415 source_info,
416 handler: _,
417 #[cfg(feature = "unstable")]
418 cancellation_token,
419 } = self;
420 QuerierGetBuilder {
421 querier,
422 parameters,
423 value,
424 attachment,
425 #[cfg(feature = "unstable")]
426 source_info,
427 handler,
428 #[cfg(feature = "unstable")]
429 cancellation_token,
430 }
431 }
432}
433impl<'b, Handler> QuerierGetBuilder<'_, 'b, Handler> {
434 /// Set the query payload.
435 #[inline]
436 pub fn payload<IntoZBytes>(mut self, payload: IntoZBytes) -> Self
437 where
438 IntoZBytes: Into<ZBytes>,
439 {
440 let mut value = self.value.unwrap_or_default();
441 value.0 = payload.into();
442 self.value = Some(value);
443 self
444 }
445
446 /// Set the query selector parameters.
447 #[inline]
448 pub fn parameters<P>(mut self, parameters: P) -> Self
449 where
450 P: Into<Parameters<'b>>,
451 {
452 self.parameters = parameters.into();
453 self
454 }
455}
456
457impl<Handler> Resolvable for QuerierGetBuilder<'_, '_, Handler>
458where
459 Handler: IntoHandler<Reply> + Send,
460 Handler::Handler: Send,
461{
462 type To = ZResult<Handler::Handler>;
463}
464
465impl<Handler> Wait for QuerierGetBuilder<'_, '_, Handler>
466where
467 Handler: IntoHandler<Reply> + Send,
468 Handler::Handler: Send,
469{
470 fn wait(self) -> <Self as Resolvable>::To {
471 let (callback, receiver) = self.handler.into_handler();
472 #[allow(unused_mut)]
473 // mut is only needed when building with "unstable" feature, which might add extra internal parameters on top of the user-provided ones
474 let mut parameters = self.parameters.clone();
475 if self.querier.accept_replies() == ReplyKeyExpr::Any {
476 parameters.insert(REPLY_KEY_EXPR_ANY_SEL_PARAM, "");
477 }
478 self.querier.session.query(
479 &self.querier.key_expr,
480 ¶meters,
481 self.querier.target,
482 self.querier.consolidation,
483 self.querier.qos,
484 self.querier.destination,
485 self.querier.timeout,
486 self.value,
487 self.attachment,
488 #[cfg(feature = "unstable")]
489 self.source_info,
490 callback,
491 #[cfg(feature = "unstable")]
492 self.cancellation_token,
493 Some(self.querier.id),
494 self.querier.callback_sync_group.notifier(),
495 )?;
496 Ok(receiver)
497 }
498}
499
500impl<Handler> IntoFuture for QuerierGetBuilder<'_, '_, Handler>
501where
502 Handler: IntoHandler<Reply> + Send,
503 Handler::Handler: Send,
504{
505 type Output = <Self as Resolvable>::To;
506 type IntoFuture = Ready<<Self as Resolvable>::To>;
507
508 fn into_future(self) -> Self::IntoFuture {
509 std::future::ready(self.wait())
510 }
511}