zenoh/api/
queryable.rs

1//
2// Copyright (c) 2023 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use std::{
15    fmt,
16    future::{IntoFuture, Ready},
17    ops::{Deref, DerefMut},
18    sync::Arc,
19};
20
21use tracing::error;
22use zenoh_core::{Resolvable, Resolve, Wait};
23use zenoh_protocol::{
24    core::{EntityId, Parameters, WireExpr, ZenohIdProto},
25    network::{response, Mapping, RequestId, Response, ResponseFinal},
26    zenoh::{self, reply::ReplyBody, Del, Put, ResponseBody},
27};
28use zenoh_result::ZResult;
29#[zenoh_macros::unstable]
30use {
31    crate::api::query::ReplyKeyExpr, zenoh_config::wrappers::EntityGlobalId,
32    zenoh_protocol::core::EntityGlobalIdProto,
33};
34
35#[zenoh_macros::unstable]
36use crate::api::sample::SourceInfo;
37#[zenoh_macros::unstable]
38use crate::api::selector::ZenohParameters;
39#[zenoh_macros::internal]
40use crate::net::primitives::DummyPrimitives;
41use crate::{
42    api::{
43        builders::reply::{ReplyBuilder, ReplyBuilderDelete, ReplyBuilderPut, ReplyErrBuilder},
44        bytes::ZBytes,
45        encoding::Encoding,
46        handlers::CallbackParameter,
47        key_expr::KeyExpr,
48        sample::{Locality, Sample, SampleKind},
49        selector::Selector,
50        session::{UndeclarableSealed, WeakSession},
51        Id,
52    },
53    handlers::Callback,
54    net::primitives::Primitives,
55};
56
/// State of a received query, shared by every clone of a [`Query`] through an `Arc`.
///
/// Dropping a `QueryInner` sends a `ResponseFinal` for `qid` through `primitives`
/// (see its `Drop` implementation).
pub(crate) struct QueryInner {
    /// Key expression of the query.
    pub(crate) key_expr: KeyExpr<'static>,
    /// Selector parameters of the query.
    pub(crate) parameters: Parameters<'static>,
    /// Request id; used as the `rid` of every response sent for this query.
    pub(crate) qid: RequestId,
    /// Zenoh id reported as the responder `zid` in replies.
    pub(crate) zid: ZenohIdProto,
    /// Source information attached to the query, if any (`unstable` feature only).
    #[cfg(feature = "unstable")]
    pub(crate) source_info: Option<SourceInfo>,
    /// Object through which responses and the final response are sent.
    pub(crate) primitives: Arc<dyn Primitives>,
}
66
67impl QueryInner {
68    #[zenoh_macros::internal]
69    fn empty() -> Self {
70        QueryInner {
71            key_expr: KeyExpr::dummy(),
72            parameters: Parameters::empty(),
73            qid: 0,
74            zid: ZenohIdProto::default(),
75            #[cfg(feature = "unstable")]
76            source_info: None,
77            primitives: Arc::new(DummyPrimitives),
78        }
79    }
80}
81
82impl Drop for QueryInner {
83    fn drop(&mut self) {
84        self.primitives.send_response_final(&mut ResponseFinal {
85            rid: self.qid,
86            ext_qos: response::ext::QoSType::RESPONSE_FINAL,
87            ext_tstamp: None,
88        });
89    }
90}
91
/// The request received by a [`Queryable`].
///
/// The `Query` provides all data sent by [`Querier::get`](crate::query::Querier::get)
/// or [`Session::get`](crate::Session::get): the key expression, the
/// parameters, the payload, and the attachment, if any.
///
/// The reply to the query should be made with one of its methods:
/// - [`Query::reply`](crate::query::Query::reply) to reply with a data [`Sample`](crate::sample::Sample) of kind [`Put`](crate::sample::SampleKind::Put),
/// - [`Query::reply_del`](crate::query::Query::reply_del) to reply with a data [`Sample`](crate::sample::Sample) of kind [`Delete`](crate::sample::SampleKind::Delete),
/// - [`Query::reply_err`](crate::query::Query::reply_err) to send an error reply.
///
/// An important detail: the [`Query::key_expr`] is **not** the key expression
/// which should be used as the parameter of [`reply`](Query::reply), because it may contain globs.
/// The [`Queryable`]'s key expression is the one that should be used.
/// For example, the `Query` may contain the key expression `foo/*` and the reply
/// should be sent with `foo/bar` or `foo/baz`, depending on the concrete queryable.
#[derive(Clone)]
pub struct Query {
    /// State shared between all clones of this query.
    pub(crate) inner: Arc<QueryInner>,
    /// Entity id reported as the responder `eid` in replies.
    pub(crate) eid: EntityId,
    /// Optional payload of the query together with its encoding.
    pub(crate) value: Option<(ZBytes, Encoding)>,
    /// Optional attachment of the query.
    pub(crate) attachment: Option<ZBytes>,
}
115
116impl Query {
117    /// The full [`Selector`] of this Query.
118    ///
119    /// # Examples
120    /// ```
121    /// # #[tokio::main]
122    /// # async fn main() {
123    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
124    /// let queryable = session
125    ///     .declare_queryable("key/expression")
126    ///     .callback(move |query| { println!("{}", query.selector()); })
127    ///     .await
128    ///     .unwrap();
129    /// # session.get("key/expression").await.unwrap();
130    /// # }
131    #[inline(always)]
132    pub fn selector(&self) -> Selector<'_> {
133        Selector::borrowed(&self.inner.key_expr, &self.inner.parameters)
134    }
135
136    /// The key selector part of this Query.
137    ///
138    /// # Examples
139    /// ```
140    /// # #[tokio::main]
141    /// # async fn main() {
142    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
143    /// let queryable = session
144    ///     .declare_queryable("key/expression")
145    ///     .callback(move |query| { println!("{}", query.key_expr()); })
146    ///     .await
147    ///     .unwrap();
148    /// # session.get("key/expression").await.unwrap();
149    /// # }
150    #[inline(always)]
151    pub fn key_expr(&self) -> &KeyExpr<'static> {
152        &self.inner.key_expr
153    }
154
155    /// This Query's selector parameters.
156    ///
157    /// # Examples
158    /// ```
159    /// # #[tokio::main]
160    /// # async fn main() {
161    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
162    /// let queryable = session
163    ///     .declare_queryable("key/expression")
164    ///     .callback(move |query| { println!("{}", query.parameters()); })
165    ///     .await
166    ///     .unwrap();
167    /// # session.get("key/expression").await.unwrap();
168    /// # }
169    #[inline(always)]
170    pub fn parameters(&self) -> &Parameters<'static> {
171        &self.inner.parameters
172    }
173
174    /// This Query's payload.
175    ///
176    /// # Examples
177    /// ```
178    /// # use zenoh::bytes::ZBytes;
179    /// # #[tokio::main]
180    /// # async fn main() {
181    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
182    /// let queryable = session
183    ///     .declare_queryable("key/expression")
184    ///     .callback(move |query| {
185    ///         let payload: Option<&ZBytes> = query.payload();
186    ///     })
187    ///     .await
188    ///     .unwrap();
189    /// # session.get("key/expression").await.unwrap();
190    /// # }
191    #[inline(always)]
192    pub fn payload(&self) -> Option<&ZBytes> {
193        self.value.as_ref().map(|v| &v.0)
194    }
195
196    /// This Query's payload (mutable).
197    ///
198    /// # Examples
199    /// ```
200    /// # use zenoh::bytes::ZBytes;
201    /// # #[tokio::main]
202    /// # async fn main() {
203    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
204    /// let queryable = session
205    ///     .declare_queryable("key/expression")
206    ///     .callback(move |mut query| {
207    ///         let payload: Option<&mut ZBytes> = query.payload_mut();
208    ///     })
209    ///     .await
210    ///     .unwrap();
211    /// # session.get("key/expression").await.unwrap();
212    /// # }
213    #[inline(always)]
214    pub fn payload_mut(&mut self) -> Option<&mut ZBytes> {
215        self.value.as_mut().map(|v| &mut v.0)
216    }
217
218    /// This Query's encoding.
219    ///
220    /// # Examples
221    /// ```
222    /// # #[tokio::main]
223    /// # async fn main() {
224    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
225    /// let queryable = session
226    ///     .declare_queryable("key/expression")
227    ///     .callback(move |query| { println!("{:?}", query.encoding()); })
228    ///     .await
229    ///     .unwrap();
230    /// # session.get("key/expression").await.unwrap();
231    /// # }
232    #[inline(always)]
233    pub fn encoding(&self) -> Option<&Encoding> {
234        self.value.as_ref().map(|v| &v.1)
235    }
236
237    /// This Query's attachment.
238    ///
239    /// # Examples
240    /// ```
241    /// # use zenoh::bytes::ZBytes;
242    /// # #[tokio::main]
243    /// # async fn main() {
244    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
245    /// let queryable = session
246    ///     .declare_queryable("key/expression")
247    ///     .callback(move |query| {
248    ///         let attachment: Option<&ZBytes> = query.attachment();
249    ///     })
250    ///     .await
251    ///     .unwrap();
252    /// # session.get("key/expression").await.unwrap();
253    /// # }
254    pub fn attachment(&self) -> Option<&ZBytes> {
255        self.attachment.as_ref()
256    }
257
258    /// This Query's attachment (mutable).
259    ///
260    /// # Examples
261    /// ```
262    /// # use zenoh::bytes::ZBytes;
263    /// # #[tokio::main]
264    /// # async fn main() {
265    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
266    /// let queryable = session
267    ///     .declare_queryable("key/expression")
268    ///     .callback(move |mut query| {
269    ///         let attachment: Option<&mut ZBytes> = query.attachment_mut();
270    ///     })
271    ///     .await
272    ///     .unwrap();
273    /// # session.get("key/expression").await.unwrap();
274    /// # }
275    pub fn attachment_mut(&mut self) -> Option<&mut ZBytes> {
276        self.attachment.as_mut()
277    }
278
279    /// Gets info on the source of this Query.
280    #[zenoh_macros::unstable]
281    #[inline]
282    pub fn source_info(&self) -> Option<&SourceInfo> {
283        self.inner.source_info.as_ref()
284    }
285
286    /// Sends a reply in the form of [`Sample`] to this Query.
287    ///
288    /// This api is for internal use only.
289    ///
290    /// # Examples
291    /// ```
292    /// # use zenoh::sample::Sample;
293    /// # #[tokio::main]
294    /// # async fn main() {
295    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
296    /// let queryable = session
297    ///     .declare_queryable("key/expression")
298    ///     .callback(move |query| { query.reply_sample(Sample::empty()); })
299    ///     .await
300    ///     .unwrap();
301    /// # session.get("key/expression").await.unwrap();
302    /// # }
303    #[inline(always)]
304    #[zenoh_macros::internal]
305    pub fn reply_sample(&self, sample: Sample) -> ReplySample<'_> {
306        ReplySample {
307            query: self,
308            sample,
309        }
310    }
311
312    /// Sends a [`Sample`](crate::sample::Sample) of kind [`Put`](crate::sample::SampleKind::Put)
313    /// as a reply to this Query.
314    ///
315    /// By default, queries only accept replies whose key expression intersects with the query's.
316    /// Unless the query has enabled disjoint replies (you can check this through [`Query::accepts_replies`]),
317    /// replying on a disjoint key expression will result in an error when resolving the reply.
318    #[inline(always)]
319    pub fn reply<'b, TryIntoKeyExpr, IntoZBytes>(
320        &self,
321        key_expr: TryIntoKeyExpr,
322        payload: IntoZBytes,
323    ) -> ReplyBuilder<'_, 'b, ReplyBuilderPut>
324    where
325        TryIntoKeyExpr: TryInto<KeyExpr<'b>>,
326        <TryIntoKeyExpr as TryInto<KeyExpr<'b>>>::Error: Into<zenoh_result::Error>,
327        IntoZBytes: Into<ZBytes>,
328    {
329        ReplyBuilder::<'_, 'b, ReplyBuilderPut>::new(self, key_expr, payload)
330    }
331
332    /// Sends a [`ReplyError`](crate::query::ReplyError) as a reply to this Query.
333    #[inline(always)]
334    pub fn reply_err<IntoZBytes>(&self, payload: IntoZBytes) -> ReplyErrBuilder<'_>
335    where
336        IntoZBytes: Into<ZBytes>,
337    {
338        ReplyErrBuilder::new(self, payload)
339    }
340
341    /// Sends a [`Sample`](crate::sample::Sample) of kind [`Delete`](crate::sample::SampleKind::Delete)
342    /// as a reply to this Query.
343    ///
344    /// By default, queries only accept replies whose key expression intersects with the query's.
345    /// Unless the query has enabled disjoint replies (you can check this through [`Query::accepts_replies`]),
346    /// replying on a disjoint key expression will result in an error when resolving the reply.
347    #[inline(always)]
348    pub fn reply_del<'b, TryIntoKeyExpr>(
349        &self,
350        key_expr: TryIntoKeyExpr,
351    ) -> ReplyBuilder<'_, 'b, ReplyBuilderDelete>
352    where
353        TryIntoKeyExpr: TryInto<KeyExpr<'b>>,
354        <TryIntoKeyExpr as TryInto<KeyExpr<'b>>>::Error: Into<zenoh_result::Error>,
355    {
356        ReplyBuilder::<'_, 'b, ReplyBuilderDelete>::new(self, key_expr)
357    }
358
359    /// See details in [`ReplyKeyExpr`](crate::query::ReplyKeyExpr) documentation.
360    /// Queries may or may not accept replies on key expressions that do not intersect with their own key expression.
361    /// This getter allows you to check whether or not a specific query does so.
362    ///
363    /// # Examples
364    /// ```
365    /// # #[tokio::main]
366    /// # async fn main() {
367    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
368    /// let queryable = session
369    ///     .declare_queryable("key/expression")
370    ///     .callback(move |query| { query.accepts_replies(); })
371    ///     .await
372    ///     .unwrap();
373    /// # session.get("key/expression").await.unwrap();
374    /// # }
375    #[zenoh_macros::unstable]
376    pub fn accepts_replies(&self) -> ZResult<ReplyKeyExpr> {
377        self._accepts_any_replies().map(|any| {
378            if any {
379                ReplyKeyExpr::Any
380            } else {
381                ReplyKeyExpr::MatchingQuery
382            }
383        })
384    }
385    #[cfg(feature = "unstable")]
386    fn _accepts_any_replies(&self) -> ZResult<bool> {
387        Ok(self.parameters().reply_key_expr_any())
388    }
389
390    /// Constructs an empty Query without payload or attachment, referencing the same inner query.
391    ///
392    /// # Examples
393    /// ```
394    /// # fn main() {
395    /// let query = unsafe { zenoh::query::Query::empty() };
396    /// # }
397    #[zenoh_macros::internal]
398    pub unsafe fn empty() -> Self {
399        Query {
400            inner: Arc::new(QueryInner::empty()),
401            eid: 0,
402            value: None,
403            attachment: None,
404        }
405    }
406}
407
408impl fmt::Debug for Query {
409    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
410        f.debug_struct("Query")
411            .field("key_selector", &self.inner.key_expr)
412            .field("parameters", &self.inner.parameters)
413            .finish()
414    }
415}
416
417impl fmt::Display for Query {
418    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
419        f.debug_struct("Query")
420            .field(
421                "selector",
422                &format!("{}{}", &self.inner.key_expr, &self.inner.parameters),
423            )
424            .finish()
425    }
426}
427
428impl CallbackParameter for Query {
429    type Message<'a> = Self;
430
431    fn from_message(msg: Self::Message<'_>) -> Self {
432        msg
433    }
434}
435
/// Resolvable returned by [`Query::reply_sample`]: resolving it sends the
/// wrapped [`Sample`] as a reply to the borrowed [`Query`].
#[zenoh_macros::internal]
pub struct ReplySample<'a> {
    /// Query being replied to.
    query: &'a Query,
    /// Sample to send as the reply.
    sample: Sample,
}
441
#[zenoh_macros::internal]
impl Resolvable for ReplySample<'_> {
    // Resolving yields the result of `Query::_reply_sample`.
    type To = ZResult<()>;
}
446
447#[zenoh_macros::internal]
448impl Wait for ReplySample<'_> {
449    fn wait(self) -> <Self as Resolvable>::To {
450        self.query._reply_sample(self.sample)
451    }
452}
453
454#[zenoh_macros::internal]
455impl IntoFuture for ReplySample<'_> {
456    type Output = <Self as Resolvable>::To;
457    type IntoFuture = Ready<<Self as Resolvable>::To>;
458
459    fn into_future(self) -> Self::IntoFuture {
460        std::future::ready(self.wait())
461    }
462}
463
464impl Query {
465    pub(crate) fn _reply_sample(&self, sample: Sample) -> ZResult<()> {
466        let c = zcondfeat!(
467            "unstable",
468            !self._accepts_any_replies().unwrap_or(false),
469            true
470        );
471        if c && !self.key_expr().intersects(&sample.key_expr) {
472            bail!("Attempted to reply on `{}`, which does not intersect with query `{}`, despite query only allowing replies on matching key expressions", sample.key_expr, self.key_expr())
473        }
474        #[cfg(not(feature = "unstable"))]
475        let ext_sinfo = None;
476        #[cfg(feature = "unstable")]
477        let ext_sinfo = sample.source_info.map(Into::into);
478        self.inner.primitives.send_response(&mut Response {
479            rid: self.inner.qid,
480            wire_expr: WireExpr {
481                scope: 0,
482                suffix: std::borrow::Cow::Owned(sample.key_expr.into()),
483                mapping: Mapping::Sender,
484            },
485            payload: ResponseBody::Reply(zenoh::Reply {
486                consolidation: zenoh::ConsolidationMode::DEFAULT,
487                ext_unknown: vec![],
488                payload: match sample.kind {
489                    SampleKind::Put => ReplyBody::Put(Put {
490                        timestamp: sample.timestamp,
491                        encoding: sample.encoding.into(),
492                        ext_sinfo,
493                        #[cfg(feature = "shared-memory")]
494                        ext_shm: None,
495                        ext_attachment: sample.attachment.map(|a| a.into()),
496                        ext_unknown: vec![],
497                        payload: sample.payload.into(),
498                    }),
499                    SampleKind::Delete => ReplyBody::Del(Del {
500                        timestamp: sample.timestamp,
501                        ext_sinfo,
502                        ext_attachment: sample.attachment.map(|a| a.into()),
503                        ext_unknown: vec![],
504                    }),
505                },
506            }),
507            ext_qos: sample.qos.into(),
508            ext_tstamp: None,
509            ext_respid: Some(response::ext::ResponderIdType {
510                zid: self.inner.zid,
511                eid: self.eid,
512            }),
513        });
514        Ok(())
515    }
516}
/// Crate-internal description of a declared queryable.
pub(crate) struct QueryableState {
    /// Identifier of the queryable.
    pub(crate) id: Id,
    /// Key expression the queryable is declared on.
    pub(crate) key_expr: KeyExpr<'static>,
    /// Completeness flag of the queryable.
    pub(crate) complete: bool,
    // NOTE(review): presumably restricts the origin (local/remote) of the queries
    // that are delivered; confirm against the session code.
    pub(crate) origin: Locality,
    /// Callback taking the [`Query`] values of this queryable.
    pub(crate) callback: Callback<Query>,
}
524
525impl fmt::Debug for QueryableState {
526    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
527        f.debug_struct("Queryable")
528            .field("id", &self.id)
529            .field("key_expr", &self.key_expr)
530            .field("complete", &self.complete)
531            .finish()
532    }
533}
534
/// Handler-independent part of a [`Queryable`].
#[derive(Debug)]
pub(crate) struct QueryableInner {
    /// Session the queryable belongs to; used to close the queryable.
    pub(crate) session: WeakSession,
    /// Identifier of the queryable within its session.
    pub(crate) id: Id,
    /// When `true`, dropping the [`Queryable`] undeclares it.
    pub(crate) undeclare_on_drop: bool,
    /// Key expression returned by [`Queryable::key_expr`].
    pub(crate) key_expr: KeyExpr<'static>,
}
542
/// A [`Resolvable`] returned when undeclaring a [`Queryable`].
///
/// It wraps the queryable to undeclare; nothing happens until it is resolved.
///
/// # Examples
/// ```
/// # #[tokio::main]
/// # async fn main() {
///
/// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
/// let queryable = session.declare_queryable("key/expression").await.unwrap();
/// queryable.undeclare().await.unwrap();
/// # }
/// ```
#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
pub struct QueryableUndeclaration<Handler>(Queryable<Handler>);
557
impl<Handler> Resolvable for QueryableUndeclaration<Handler> {
    // Resolving yields the result of `Queryable::undeclare_impl`.
    type To = ZResult<()>;
}
561
562impl<Handler> Wait for QueryableUndeclaration<Handler> {
563    fn wait(mut self) -> <Self as Resolvable>::To {
564        self.0.undeclare_impl()
565    }
566}
567
568impl<Handler> IntoFuture for QueryableUndeclaration<Handler> {
569    type Output = <Self as Resolvable>::To;
570    type IntoFuture = Ready<<Self as Resolvable>::To>;
571
572    fn into_future(self) -> Self::IntoFuture {
573        std::future::ready(self.wait())
574    }
575}
/// A `Queryable` is an entity that implements the query/reply pattern.
///
/// A `Queryable` is declared by the
/// [`Session::declare_queryable`](crate::Session::declare_queryable) method
/// and serves [`Query`](crate::query::Query) using a callback
/// or a channel (see the [handlers](crate::handlers) module documentation for details).
///
/// The `Queryable` receives [`Query`](crate::query::Query) requests from
/// [`Querier::get`](crate::query::Querier::get) or from [`Session::get`](crate::Session::get)
/// and sends back replies with the methods of the [`Query`](crate::query::Query): [`reply`](crate::query::Query::reply),
/// [`reply_err`](crate::query::Query::reply_err) or [`reply_del`](crate::query::Query::reply_del).
///
/// # Examples
///
/// Using callback:
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use futures::prelude::*;
///
/// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
/// let queryable = session
///     .declare_queryable("key/expression")
///     .callback(move |query| {
///         use crate::zenoh::Wait;
///         println!(">> Handling query '{}'", query.selector());
///         query.reply("key/expression", "value").wait().unwrap();
/// #       format!("{query}");
/// #       format!("{query:?}");
///     })
///     .await
///     .unwrap();
/// # format!("{queryable:?}");
/// # session.get("key/expression").await.unwrap();
/// # }
/// ```
///
/// Using channel handler:
/// ```no_run
/// # #[tokio::main]
/// # async fn main() {
///
/// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
/// let queryable = session
///     .declare_queryable("key/expression")
///     .await
///     .unwrap();
/// while let Ok(query) = queryable.recv_async().await {
///     println!(">> Handling query '{}'", query.selector());
///     query.reply("key/expression", "value").await.unwrap();
/// }
/// // queryable is undeclared at the end of the scope
/// # }
/// ```
#[non_exhaustive]
#[derive(Debug)]
pub struct Queryable<Handler> {
    /// Session handle, identifier, key expression and undeclare-on-drop flag.
    pub(crate) inner: QueryableInner,
    /// Handler of the queryable; also reachable through `Deref`/`DerefMut`.
    pub(crate) handler: Handler,
}
636
637impl<Handler> Queryable<Handler> {
638    /// Returns the [`EntityGlobalId`] of this Queryable.
639    ///
640    /// # Examples
641    /// ```
642    /// # #[tokio::main]
643    /// # async fn main() {
644    ///
645    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
646    /// let queryable = session.declare_queryable("key/expression").await.unwrap();
647    /// let queryable_id = queryable.id();
648    /// # }
649    /// ```
650    #[zenoh_macros::unstable]
651    pub fn id(&self) -> EntityGlobalId {
652        EntityGlobalIdProto {
653            zid: self.inner.session.zid().into(),
654            eid: self.inner.id,
655        }
656        .into()
657    }
658
659    /// Returns a reference to this queryable's handler.
660    /// A handler is anything that implements [`IntoHandler`](crate::handlers::IntoHandler).
661    /// The default handler is [`DefaultHandler`](crate::handlers::DefaultHandler).
662    ///
663    /// # Examples
664    /// ```
665    /// # #[tokio::main]
666    /// # async fn main() {
667    ///
668    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
669    /// let queryable = session.declare_queryable("key/expression").await.unwrap();
670    /// let handler = queryable.handler();
671    /// # }
672    /// ```
673    pub fn handler(&self) -> &Handler {
674        &self.handler
675    }
676
677    /// Returns a mutable reference to this queryable's handler.
678    /// A handler is anything that implements [`IntoHandler`](crate::handlers::IntoHandler).
679    /// The default handler is [`DefaultHandler`](crate::handlers::DefaultHandler).
680    ///
681    /// # Examples
682    /// ```
683    /// # #[tokio::main]
684    /// # async fn main() {
685    ///
686    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
687    /// let mut queryable = session.declare_queryable("key/expression").await.unwrap();
688    /// let handler = queryable.handler_mut();
689    /// # }
690    /// ```
691    pub fn handler_mut(&mut self) -> &mut Handler {
692        &mut self.handler
693    }
694
695    /// Undeclare the [`Queryable`].
696    ///
697    /// # Examples
698    /// ```
699    /// # #[tokio::main]
700    /// # async fn main() {
701    ///
702    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
703    /// let queryable = session.declare_queryable("key/expression").await.unwrap();
704    /// queryable.undeclare().await.unwrap();
705    /// # }
706    /// ```
707    #[inline]
708    pub fn undeclare(self) -> impl Resolve<ZResult<()>>
709    where
710        Handler: Send,
711    {
712        UndeclarableSealed::undeclare_inner(self, ())
713    }
714
715    fn undeclare_impl(&mut self) -> ZResult<()> {
716        // set the flag first to avoid double panic if this function panic
717        self.inner.undeclare_on_drop = false;
718        self.inner.session.close_queryable(self.inner.id)
719    }
720
721    /// Make queryable run in background until the session is closed.
722    ///
723    /// # Examples
724    /// ```
725    /// # #[tokio::main]
726    /// # async fn main() {
727    ///
728    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
729    /// let mut queryable = session.declare_queryable("key/expression").await.unwrap();
730    /// queryable.set_background(true);
731    /// # }
732    /// ```
733    #[zenoh_macros::internal]
734    pub fn set_background(&mut self, background: bool) {
735        self.inner.undeclare_on_drop = !background;
736    }
737
738    /// Returns the [`KeyExpr`] this queryable responds to.
739    ///
740    /// # Examples
741    /// ```
742    /// # #[tokio::main]
743    /// # async fn main() {
744    ///
745    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
746    /// let queryable = session.declare_queryable("key/expression")
747    ///     .await
748    ///     .unwrap();
749    /// let key_expr = queryable.key_expr();
750    /// # }
751    /// ```
752    #[inline]
753    pub fn key_expr(&self) -> &KeyExpr<'static> {
754        &self.inner.key_expr
755    }
756}
757
758impl<Handler> Drop for Queryable<Handler> {
759    fn drop(&mut self) {
760        if self.inner.undeclare_on_drop {
761            if let Err(error) = self.undeclare_impl() {
762                error!(error);
763            }
764        }
765    }
766}
767
768impl<Handler: Send> UndeclarableSealed<()> for Queryable<Handler> {
769    type Undeclaration = QueryableUndeclaration<Handler>;
770
771    fn undeclare_inner(self, _: ()) -> Self::Undeclaration {
772        QueryableUndeclaration(self)
773    }
774}
775
776impl<Handler> Deref for Queryable<Handler> {
777    type Target = Handler;
778
779    fn deref(&self) -> &Self::Target {
780        self.handler()
781    }
782}
783
784impl<Handler> DerefMut for Queryable<Handler> {
785    fn deref_mut(&mut self) -> &mut Self::Target {
786        self.handler_mut()
787    }
788}