zenoh/api/queryable.rs
1//
2// Copyright (c) 2023 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use std::{
15 fmt,
16 future::{IntoFuture, Ready},
17 ops::{Deref, DerefMut},
18 sync::Arc,
19};
20
21use tracing::error;
22use zenoh_core::{Resolvable, Resolve, Wait};
23use zenoh_protocol::{
24 core::{EntityId, Parameters, WireExpr, ZenohIdProto},
25 network::{response, Mapping, RequestId, Response, ResponseFinal},
26 zenoh::{self, reply::ReplyBody, Del, Put, ResponseBody},
27};
28use zenoh_result::ZResult;
29#[zenoh_macros::unstable]
30use {
31 crate::api::query::ReplyKeyExpr, zenoh_config::wrappers::EntityGlobalId,
32 zenoh_protocol::core::EntityGlobalIdProto,
33};
34
35#[zenoh_macros::unstable]
36use crate::api::sample::SourceInfo;
37#[zenoh_macros::unstable]
38use crate::api::selector::ZenohParameters;
39#[zenoh_macros::internal]
40use crate::net::primitives::DummyPrimitives;
41use crate::{
42 api::{
43 builders::reply::{ReplyBuilder, ReplyBuilderDelete, ReplyBuilderPut, ReplyErrBuilder},
44 bytes::ZBytes,
45 encoding::Encoding,
46 handlers::CallbackParameter,
47 key_expr::KeyExpr,
48 sample::{Locality, Sample, SampleKind},
49 selector::Selector,
50 session::{UndeclarableSealed, WeakSession},
51 Id,
52 },
53 handlers::Callback,
54 net::primitives::Primitives,
55};
56
57pub(crate) struct QueryInner {
58 pub(crate) key_expr: KeyExpr<'static>,
59 pub(crate) parameters: Parameters<'static>,
60 pub(crate) qid: RequestId,
61 pub(crate) zid: ZenohIdProto,
62 #[cfg(feature = "unstable")]
63 pub(crate) source_info: Option<SourceInfo>,
64 pub(crate) primitives: Arc<dyn Primitives>,
65}
66
67impl QueryInner {
68 #[zenoh_macros::internal]
69 fn empty() -> Self {
70 QueryInner {
71 key_expr: KeyExpr::dummy(),
72 parameters: Parameters::empty(),
73 qid: 0,
74 zid: ZenohIdProto::default(),
75 #[cfg(feature = "unstable")]
76 source_info: None,
77 primitives: Arc::new(DummyPrimitives),
78 }
79 }
80}
81
82impl Drop for QueryInner {
83 fn drop(&mut self) {
84 self.primitives.send_response_final(&mut ResponseFinal {
85 rid: self.qid,
86 ext_qos: response::ext::QoSType::RESPONSE_FINAL,
87 ext_tstamp: None,
88 });
89 }
90}
91
92/// The request received by a [`Queryable`].
93///
94/// The `Query` provides all data sent by [`Querier::get`](crate::query::Querier::get)
95/// or [`Session::get`](crate::Session::get): the key expression, the
96/// parameters, the payload, and the attachment, if any.
97///
98/// The reply to the query should be made with one of its methods:
99/// - [`Query::reply`](crate::query::Query::reply) to reply with a data [`Sample`](crate::sample::Sample) of kind [`Put`](crate::sample::SampleKind::Put),
100/// - [`Query::reply_del`](crate::query::Query::reply_del) to reply with a data [`Sample`](crate::sample::Sample) of kind [`Delete`](crate::sample::SampleKind::Delete),
101/// - [`Query::reply_err`](crate::query::Query::reply_err) to send an error reply.
102///
103/// The important detail: the [`Query::key_expr`] is **not** the key expression
104/// which should be used as the parameter of [`reply`](Query::reply), because it may contain globs.
105/// The [`Queryable`]'s key expression is the one that should be used.
106/// For example, the `Query` may contain the key expression `foo/*` and the reply
107/// should be sent with `foo/bar` or `foo/baz`, depending on the concrete querier.
108#[derive(Clone)]
109pub struct Query {
110 pub(crate) inner: Arc<QueryInner>,
111 pub(crate) eid: EntityId,
112 pub(crate) value: Option<(ZBytes, Encoding)>,
113 pub(crate) attachment: Option<ZBytes>,
114}
115
116impl Query {
117 /// The full [`Selector`] of this Query.
118 ///
119 /// # Examples
120 /// ```
121 /// # #[tokio::main]
122 /// # async fn main() {
123 /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
124 /// let queryable = session
125 /// .declare_queryable("key/expression")
126 /// .callback(move |query| { println!("{}", query.selector()); })
127 /// .await
128 /// .unwrap();
129 /// # session.get("key/expression").await.unwrap();
130 /// # }
131 #[inline(always)]
132 pub fn selector(&self) -> Selector<'_> {
133 Selector::borrowed(&self.inner.key_expr, &self.inner.parameters)
134 }
135
136 /// The key selector part of this Query.
137 ///
138 /// # Examples
139 /// ```
140 /// # #[tokio::main]
141 /// # async fn main() {
142 /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
143 /// let queryable = session
144 /// .declare_queryable("key/expression")
145 /// .callback(move |query| { println!("{}", query.key_expr()); })
146 /// .await
147 /// .unwrap();
148 /// # session.get("key/expression").await.unwrap();
149 /// # }
150 #[inline(always)]
151 pub fn key_expr(&self) -> &KeyExpr<'static> {
152 &self.inner.key_expr
153 }
154
155 /// This Query's selector parameters.
156 ///
157 /// # Examples
158 /// ```
159 /// # #[tokio::main]
160 /// # async fn main() {
161 /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
162 /// let queryable = session
163 /// .declare_queryable("key/expression")
164 /// .callback(move |query| { println!("{}", query.parameters()); })
165 /// .await
166 /// .unwrap();
167 /// # session.get("key/expression").await.unwrap();
168 /// # }
169 #[inline(always)]
170 pub fn parameters(&self) -> &Parameters<'static> {
171 &self.inner.parameters
172 }
173
174 /// This Query's payload.
175 ///
176 /// # Examples
177 /// ```
178 /// # use zenoh::bytes::ZBytes;
179 /// # #[tokio::main]
180 /// # async fn main() {
181 /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
182 /// let queryable = session
183 /// .declare_queryable("key/expression")
184 /// .callback(move |query| {
185 /// let payload: Option<&ZBytes> = query.payload();
186 /// })
187 /// .await
188 /// .unwrap();
189 /// # session.get("key/expression").await.unwrap();
190 /// # }
191 #[inline(always)]
192 pub fn payload(&self) -> Option<&ZBytes> {
193 self.value.as_ref().map(|v| &v.0)
194 }
195
196 /// This Query's payload (mutable).
197 ///
198 /// # Examples
199 /// ```
200 /// # use zenoh::bytes::ZBytes;
201 /// # #[tokio::main]
202 /// # async fn main() {
203 /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
204 /// let queryable = session
205 /// .declare_queryable("key/expression")
206 /// .callback(move |mut query| {
207 /// let payload: Option<&mut ZBytes> = query.payload_mut();
208 /// })
209 /// .await
210 /// .unwrap();
211 /// # session.get("key/expression").await.unwrap();
212 /// # }
213 #[inline(always)]
214 pub fn payload_mut(&mut self) -> Option<&mut ZBytes> {
215 self.value.as_mut().map(|v| &mut v.0)
216 }
217
218 /// This Query's encoding.
219 ///
220 /// # Examples
221 /// ```
222 /// # #[tokio::main]
223 /// # async fn main() {
224 /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
225 /// let queryable = session
226 /// .declare_queryable("key/expression")
227 /// .callback(move |query| { println!("{:?}", query.encoding()); })
228 /// .await
229 /// .unwrap();
230 /// # session.get("key/expression").await.unwrap();
231 /// # }
232 #[inline(always)]
233 pub fn encoding(&self) -> Option<&Encoding> {
234 self.value.as_ref().map(|v| &v.1)
235 }
236
237 /// This Query's attachment.
238 ///
239 /// # Examples
240 /// ```
241 /// # use zenoh::bytes::ZBytes;
242 /// # #[tokio::main]
243 /// # async fn main() {
244 /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
245 /// let queryable = session
246 /// .declare_queryable("key/expression")
247 /// .callback(move |query| {
248 /// let attachment: Option<&ZBytes> = query.attachment();
249 /// })
250 /// .await
251 /// .unwrap();
252 /// # session.get("key/expression").await.unwrap();
253 /// # }
254 pub fn attachment(&self) -> Option<&ZBytes> {
255 self.attachment.as_ref()
256 }
257
258 /// This Query's attachment (mutable).
259 ///
260 /// # Examples
261 /// ```
262 /// # use zenoh::bytes::ZBytes;
263 /// # #[tokio::main]
264 /// # async fn main() {
265 /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
266 /// let queryable = session
267 /// .declare_queryable("key/expression")
268 /// .callback(move |mut query| {
269 /// let attachment: Option<&mut ZBytes> = query.attachment_mut();
270 /// })
271 /// .await
272 /// .unwrap();
273 /// # session.get("key/expression").await.unwrap();
274 /// # }
275 pub fn attachment_mut(&mut self) -> Option<&mut ZBytes> {
276 self.attachment.as_mut()
277 }
278
279 /// Gets info on the source of this Query.
280 #[zenoh_macros::unstable]
281 #[inline]
282 pub fn source_info(&self) -> Option<&SourceInfo> {
283 self.inner.source_info.as_ref()
284 }
285
286 /// Sends a reply in the form of [`Sample`] to this Query.
287 ///
288 /// This api is for internal use only.
289 ///
290 /// # Examples
291 /// ```
292 /// # use zenoh::sample::Sample;
293 /// # #[tokio::main]
294 /// # async fn main() {
295 /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
296 /// let queryable = session
297 /// .declare_queryable("key/expression")
298 /// .callback(move |query| { query.reply_sample(Sample::empty()); })
299 /// .await
300 /// .unwrap();
301 /// # session.get("key/expression").await.unwrap();
302 /// # }
303 #[inline(always)]
304 #[zenoh_macros::internal]
305 pub fn reply_sample(&self, sample: Sample) -> ReplySample<'_> {
306 ReplySample {
307 query: self,
308 sample,
309 }
310 }
311
312 /// Sends a [`Sample`](crate::sample::Sample) of kind [`Put`](crate::sample::SampleKind::Put)
313 /// as a reply to this Query.
314 ///
315 /// By default, queries only accept replies whose key expression intersects with the query's.
316 /// Unless the query has enabled disjoint replies (you can check this through [`Query::accepts_replies`]),
317 /// replying on a disjoint key expression will result in an error when resolving the reply.
318 #[inline(always)]
319 pub fn reply<'b, TryIntoKeyExpr, IntoZBytes>(
320 &self,
321 key_expr: TryIntoKeyExpr,
322 payload: IntoZBytes,
323 ) -> ReplyBuilder<'_, 'b, ReplyBuilderPut>
324 where
325 TryIntoKeyExpr: TryInto<KeyExpr<'b>>,
326 <TryIntoKeyExpr as TryInto<KeyExpr<'b>>>::Error: Into<zenoh_result::Error>,
327 IntoZBytes: Into<ZBytes>,
328 {
329 ReplyBuilder::<'_, 'b, ReplyBuilderPut>::new(self, key_expr, payload)
330 }
331
332 /// Sends a [`ReplyError`](crate::query::ReplyError) as a reply to this Query.
333 #[inline(always)]
334 pub fn reply_err<IntoZBytes>(&self, payload: IntoZBytes) -> ReplyErrBuilder<'_>
335 where
336 IntoZBytes: Into<ZBytes>,
337 {
338 ReplyErrBuilder::new(self, payload)
339 }
340
341 /// Sends a [`Sample`](crate::sample::Sample) of kind [`Delete`](crate::sample::SampleKind::Delete)
342 /// as a reply to this Query.
343 ///
344 /// By default, queries only accept replies whose key expression intersects with the query's.
345 /// Unless the query has enabled disjoint replies (you can check this through [`Query::accepts_replies`]),
346 /// replying on a disjoint key expression will result in an error when resolving the reply.
347 #[inline(always)]
348 pub fn reply_del<'b, TryIntoKeyExpr>(
349 &self,
350 key_expr: TryIntoKeyExpr,
351 ) -> ReplyBuilder<'_, 'b, ReplyBuilderDelete>
352 where
353 TryIntoKeyExpr: TryInto<KeyExpr<'b>>,
354 <TryIntoKeyExpr as TryInto<KeyExpr<'b>>>::Error: Into<zenoh_result::Error>,
355 {
356 ReplyBuilder::<'_, 'b, ReplyBuilderDelete>::new(self, key_expr)
357 }
358
359 /// See details in [`ReplyKeyExpr`](crate::query::ReplyKeyExpr) documentation.
360 /// Queries may or may not accept replies on key expressions that do not intersect with their own key expression.
361 /// This getter allows you to check whether or not a specific query does so.
362 ///
363 /// # Examples
364 /// ```
365 /// # #[tokio::main]
366 /// # async fn main() {
367 /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
368 /// let queryable = session
369 /// .declare_queryable("key/expression")
370 /// .callback(move |query| { query.accepts_replies(); })
371 /// .await
372 /// .unwrap();
373 /// # session.get("key/expression").await.unwrap();
374 /// # }
375 #[zenoh_macros::unstable]
376 pub fn accepts_replies(&self) -> ZResult<ReplyKeyExpr> {
377 self._accepts_any_replies().map(|any| {
378 if any {
379 ReplyKeyExpr::Any
380 } else {
381 ReplyKeyExpr::MatchingQuery
382 }
383 })
384 }
385 #[cfg(feature = "unstable")]
386 fn _accepts_any_replies(&self) -> ZResult<bool> {
387 Ok(self.parameters().reply_key_expr_any())
388 }
389
390 /// Constructs an empty Query without payload or attachment, referencing the same inner query.
391 ///
392 /// # Examples
393 /// ```
394 /// # fn main() {
395 /// let query = unsafe { zenoh::query::Query::empty() };
396 /// # }
397 #[zenoh_macros::internal]
398 pub unsafe fn empty() -> Self {
399 Query {
400 inner: Arc::new(QueryInner::empty()),
401 eid: 0,
402 value: None,
403 attachment: None,
404 }
405 }
406}
407
408impl fmt::Debug for Query {
409 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
410 f.debug_struct("Query")
411 .field("key_selector", &self.inner.key_expr)
412 .field("parameters", &self.inner.parameters)
413 .finish()
414 }
415}
416
417impl fmt::Display for Query {
418 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
419 f.debug_struct("Query")
420 .field(
421 "selector",
422 &format!("{}{}", &self.inner.key_expr, &self.inner.parameters),
423 )
424 .finish()
425 }
426}
427
428impl CallbackParameter for Query {
429 type Message<'a> = Self;
430
431 fn from_message(msg: Self::Message<'_>) -> Self {
432 msg
433 }
434}
435
436#[zenoh_macros::internal]
437pub struct ReplySample<'a> {
438 query: &'a Query,
439 sample: Sample,
440}
441
442#[zenoh_macros::internal]
443impl Resolvable for ReplySample<'_> {
444 type To = ZResult<()>;
445}
446
447#[zenoh_macros::internal]
448impl Wait for ReplySample<'_> {
449 fn wait(self) -> <Self as Resolvable>::To {
450 self.query._reply_sample(self.sample)
451 }
452}
453
454#[zenoh_macros::internal]
455impl IntoFuture for ReplySample<'_> {
456 type Output = <Self as Resolvable>::To;
457 type IntoFuture = Ready<<Self as Resolvable>::To>;
458
459 fn into_future(self) -> Self::IntoFuture {
460 std::future::ready(self.wait())
461 }
462}
463
464impl Query {
465 pub(crate) fn _reply_sample(&self, sample: Sample) -> ZResult<()> {
466 let c = zcondfeat!(
467 "unstable",
468 !self._accepts_any_replies().unwrap_or(false),
469 true
470 );
471 if c && !self.key_expr().intersects(&sample.key_expr) {
472 bail!("Attempted to reply on `{}`, which does not intersect with query `{}`, despite query only allowing replies on matching key expressions", sample.key_expr, self.key_expr())
473 }
474 #[cfg(not(feature = "unstable"))]
475 let ext_sinfo = None;
476 #[cfg(feature = "unstable")]
477 let ext_sinfo = sample.source_info.map(Into::into);
478 self.inner.primitives.send_response(&mut Response {
479 rid: self.inner.qid,
480 wire_expr: WireExpr {
481 scope: 0,
482 suffix: std::borrow::Cow::Owned(sample.key_expr.into()),
483 mapping: Mapping::Sender,
484 },
485 payload: ResponseBody::Reply(zenoh::Reply {
486 consolidation: zenoh::ConsolidationMode::DEFAULT,
487 ext_unknown: vec![],
488 payload: match sample.kind {
489 SampleKind::Put => ReplyBody::Put(Put {
490 timestamp: sample.timestamp,
491 encoding: sample.encoding.into(),
492 ext_sinfo,
493 #[cfg(feature = "shared-memory")]
494 ext_shm: None,
495 ext_attachment: sample.attachment.map(|a| a.into()),
496 ext_unknown: vec![],
497 payload: sample.payload.into(),
498 }),
499 SampleKind::Delete => ReplyBody::Del(Del {
500 timestamp: sample.timestamp,
501 ext_sinfo,
502 ext_attachment: sample.attachment.map(|a| a.into()),
503 ext_unknown: vec![],
504 }),
505 },
506 }),
507 ext_qos: sample.qos.into(),
508 ext_tstamp: None,
509 ext_respid: Some(response::ext::ResponderIdType {
510 zid: self.inner.zid,
511 eid: self.eid,
512 }),
513 });
514 Ok(())
515 }
516}
517pub(crate) struct QueryableState {
518 pub(crate) id: Id,
519 pub(crate) key_expr: KeyExpr<'static>,
520 pub(crate) complete: bool,
521 pub(crate) origin: Locality,
522 pub(crate) callback: Callback<Query>,
523}
524
525impl fmt::Debug for QueryableState {
526 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
527 f.debug_struct("Queryable")
528 .field("id", &self.id)
529 .field("key_expr", &self.key_expr)
530 .field("complete", &self.complete)
531 .finish()
532 }
533}
534
535#[derive(Debug)]
536pub(crate) struct QueryableInner {
537 pub(crate) session: WeakSession,
538 pub(crate) id: Id,
539 pub(crate) undeclare_on_drop: bool,
540 pub(crate) key_expr: KeyExpr<'static>,
541}
542
543/// A [`Resolvable`] returned when undeclaring a [`Queryable`].
544///
545/// # Examples
546/// ```
547/// # #[tokio::main]
548/// # async fn main() {
549///
550/// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
551/// let queryable = session.declare_queryable("key/expression").await.unwrap();
552/// queryable.undeclare().await.unwrap();
553/// # }
554/// ```
555#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
556pub struct QueryableUndeclaration<Handler>(Queryable<Handler>);
557
558impl<Handler> Resolvable for QueryableUndeclaration<Handler> {
559 type To = ZResult<()>;
560}
561
562impl<Handler> Wait for QueryableUndeclaration<Handler> {
563 fn wait(mut self) -> <Self as Resolvable>::To {
564 self.0.undeclare_impl()
565 }
566}
567
568impl<Handler> IntoFuture for QueryableUndeclaration<Handler> {
569 type Output = <Self as Resolvable>::To;
570 type IntoFuture = Ready<<Self as Resolvable>::To>;
571
572 fn into_future(self) -> Self::IntoFuture {
573 std::future::ready(self.wait())
574 }
575}
576/// A `Queryable` is an entity that implements the query/reply pattern.
577///
578/// A `Queryable` is declared by the
579/// [`Session::declare_queryable`](crate::Session::declare_queryable) method
580/// and serves [`Query`](crate::query::Query) using callback
581/// or channel (see [handlers](crate::handlers) module documentation for details).
582///
583/// The `Queryable` receives [`Query`](crate::query::Query) requests from
584/// [`Querier::get`](crate::query::Querier::get) or from [`Session::get`](crate::Session::get)
585/// and sends back replies with the methods of the [`Query`](crate::query::Query): [`reply`](crate::query::Query::reply),
586/// [`reply_err`](crate::query::Query::reply_err) or [`reply_del`](crate::query::Query::reply_del).
587///
588/// # Examples
589///
590/// Using callback:
591/// ```
592/// # #[tokio::main]
593/// # async fn main() {
594/// use futures::prelude::*;
595///
596/// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
597/// let queryable = session
598/// .declare_queryable("key/expression")
599/// .callback(move |query| {
600/// use crate::zenoh::Wait;
601/// println!(">> Handling query '{}'", query.selector());
602/// query.reply("key/expression", "value").wait().unwrap();
603/// # format!("{query}");
604/// # format!("{query:?}");
605/// })
606/// .await
607/// .unwrap();
608/// # format!("{queryable:?}");
609/// # session.get("key/expression").await.unwrap();
610/// # }
611/// ```
612///
613/// Using channel handler:
614/// ```no_run
615/// # #[tokio::main]
616/// # async fn main() {
617///
618/// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
619/// let queryable = session
620/// .declare_queryable("key/expression")
621/// .await
622/// .unwrap();
623/// while let Ok(query) = queryable.recv_async().await {
624/// println!(">> Handling query '{}'", query.selector());
625/// query.reply("key/expression", "value").await.unwrap();
626/// }
627/// // queryable is undeclared at the end of the scope
628/// # }
629/// ```
630#[non_exhaustive]
631#[derive(Debug)]
632pub struct Queryable<Handler> {
633 pub(crate) inner: QueryableInner,
634 pub(crate) handler: Handler,
635}
636
637impl<Handler> Queryable<Handler> {
638 /// Returns the [`EntityGlobalId`] of this Queryable.
639 ///
640 /// # Examples
641 /// ```
642 /// # #[tokio::main]
643 /// # async fn main() {
644 ///
645 /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
646 /// let queryable = session.declare_queryable("key/expression").await.unwrap();
647 /// let queryable_id = queryable.id();
648 /// # }
649 /// ```
650 #[zenoh_macros::unstable]
651 pub fn id(&self) -> EntityGlobalId {
652 EntityGlobalIdProto {
653 zid: self.inner.session.zid().into(),
654 eid: self.inner.id,
655 }
656 .into()
657 }
658
659 /// Returns a reference to this queryable's handler.
660 /// A handler is anything that implements [`IntoHandler`](crate::handlers::IntoHandler).
661 /// The default handler is [`DefaultHandler`](crate::handlers::DefaultHandler).
662 ///
663 /// # Examples
664 /// ```
665 /// # #[tokio::main]
666 /// # async fn main() {
667 ///
668 /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
669 /// let queryable = session.declare_queryable("key/expression").await.unwrap();
670 /// let handler = queryable.handler();
671 /// # }
672 /// ```
673 pub fn handler(&self) -> &Handler {
674 &self.handler
675 }
676
677 /// Returns a mutable reference to this queryable's handler.
678 /// A handler is anything that implements [`IntoHandler`](crate::handlers::IntoHandler).
679 /// The default handler is [`DefaultHandler`](crate::handlers::DefaultHandler).
680 ///
681 /// # Examples
682 /// ```
683 /// # #[tokio::main]
684 /// # async fn main() {
685 ///
686 /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
687 /// let mut queryable = session.declare_queryable("key/expression").await.unwrap();
688 /// let handler = queryable.handler_mut();
689 /// # }
690 /// ```
691 pub fn handler_mut(&mut self) -> &mut Handler {
692 &mut self.handler
693 }
694
695 /// Undeclare the [`Queryable`].
696 ///
697 /// # Examples
698 /// ```
699 /// # #[tokio::main]
700 /// # async fn main() {
701 ///
702 /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
703 /// let queryable = session.declare_queryable("key/expression").await.unwrap();
704 /// queryable.undeclare().await.unwrap();
705 /// # }
706 /// ```
707 #[inline]
708 pub fn undeclare(self) -> impl Resolve<ZResult<()>>
709 where
710 Handler: Send,
711 {
712 UndeclarableSealed::undeclare_inner(self, ())
713 }
714
715 fn undeclare_impl(&mut self) -> ZResult<()> {
716 // set the flag first to avoid double panic if this function panic
717 self.inner.undeclare_on_drop = false;
718 self.inner.session.close_queryable(self.inner.id)
719 }
720
721 /// Make queryable run in background until the session is closed.
722 ///
723 /// # Examples
724 /// ```
725 /// # #[tokio::main]
726 /// # async fn main() {
727 ///
728 /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
729 /// let mut queryable = session.declare_queryable("key/expression").await.unwrap();
730 /// queryable.set_background(true);
731 /// # }
732 /// ```
733 #[zenoh_macros::internal]
734 pub fn set_background(&mut self, background: bool) {
735 self.inner.undeclare_on_drop = !background;
736 }
737
738 /// Returns the [`KeyExpr`] this queryable responds to.
739 ///
740 /// # Examples
741 /// ```
742 /// # #[tokio::main]
743 /// # async fn main() {
744 ///
745 /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
746 /// let queryable = session.declare_queryable("key/expression")
747 /// .await
748 /// .unwrap();
749 /// let key_expr = queryable.key_expr();
750 /// # }
751 /// ```
752 #[inline]
753 pub fn key_expr(&self) -> &KeyExpr<'static> {
754 &self.inner.key_expr
755 }
756}
757
758impl<Handler> Drop for Queryable<Handler> {
759 fn drop(&mut self) {
760 if self.inner.undeclare_on_drop {
761 if let Err(error) = self.undeclare_impl() {
762 error!(error);
763 }
764 }
765 }
766}
767
768impl<Handler: Send> UndeclarableSealed<()> for Queryable<Handler> {
769 type Undeclaration = QueryableUndeclaration<Handler>;
770
771 fn undeclare_inner(self, _: ()) -> Self::Undeclaration {
772 QueryableUndeclaration(self)
773 }
774}
775
776impl<Handler> Deref for Queryable<Handler> {
777 type Target = Handler;
778
779 fn deref(&self) -> &Self::Target {
780 self.handler()
781 }
782}
783
784impl<Handler> DerefMut for Queryable<Handler> {
785 fn deref_mut(&mut self) -> &mut Self::Target {
786 self.handler_mut()
787 }
788}