zenoh/api/builders/
queryable.rs

1//
2// Copyright (c) 2024 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use std::{
15    future::{IntoFuture, Ready},
16    sync::Arc,
17};
18
19use zenoh_core::{Resolvable, Wait};
20use zenoh_result::ZResult;
21
22use crate::{
23    api::{
24        handlers::{locked, DefaultHandler, IntoHandler},
25        key_expr::KeyExpr,
26        queryable::{Query, Queryable, QueryableInner},
27        sample::Locality,
28    },
29    handlers::Callback,
30    Session,
31};
32
33/// A builder for initializing a [`Queryable`].
34///
35/// # Examples
36/// ```
37/// # #[tokio::main]
38/// # async fn main() {
39///
40/// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
41/// let queryable = session.declare_queryable("key/expression").await.unwrap();
42/// # }
43/// ```
44#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
45#[derive(Debug)]
46pub struct QueryableBuilder<'a, 'b, Handler, const BACKGROUND: bool = false> {
47    pub(crate) session: &'a Session,
48    pub(crate) key_expr: ZResult<KeyExpr<'b>>,
49    pub(crate) complete: bool,
50    pub(crate) origin: Locality,
51    pub(crate) handler: Handler,
52}
53
54impl<'a, 'b> QueryableBuilder<'a, 'b, DefaultHandler> {
55    /// Receive the queries for this queryable with a callback.
56    ///
57    /// # Examples
58    /// ```
59    /// # #[tokio::main]
60    /// # async fn main() {
61    ///
62    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
63    /// let queryable = session
64    ///     .declare_queryable("key/expression")
65    ///     .callback(|query| {println!(">> Handling query '{}'", query.selector());})
66    ///     .await
67    ///     .unwrap();
68    /// # }
69    /// ```
70    #[inline]
71    pub fn callback<F>(self, callback: F) -> QueryableBuilder<'a, 'b, Callback<Query>>
72    where
73        F: Fn(Query) + Send + Sync + 'static,
74    {
75        self.with(Callback::new(Arc::new(callback)))
76    }
77
78    /// Receive the queries for this Queryable with a mutable callback.
79    ///
80    /// Using this guarantees that your callback will never be called concurrently.
81    /// If your callback is also accepted by the [`callback`](QueryableBuilder::callback) method, we suggest you use it instead of `callback_mut`.
82    ///
83    /// # Examples
84    /// ```
85    /// # #[tokio::main]
86    /// # async fn main() {
87    ///
88    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
89    /// let mut n = 0;
90    /// let queryable = session
91    ///     .declare_queryable("key/expression")
92    ///     .callback_mut(move |query| {n += 1;})
93    ///     .await
94    ///     .unwrap();
95    /// # }
96    /// ```
97    #[inline]
98    pub fn callback_mut<F>(self, callback: F) -> QueryableBuilder<'a, 'b, Callback<Query>>
99    where
100        F: FnMut(Query) + Send + Sync + 'static,
101    {
102        self.callback(locked(callback))
103    }
104
105    /// Receive the queries for this Queryable with a [`Handler`](crate::handlers::IntoHandler).
106    ///
107    /// # Examples
108    /// ```no_run
109    /// # #[tokio::main]
110    /// # async fn main() {
111    ///
112    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
113    /// let queryable = session
114    ///     .declare_queryable("key/expression")
115    ///     .with(flume::bounded(32))
116    ///     .await
117    ///     .unwrap();
118    /// while let Ok(query) = queryable.recv_async().await {
119    ///     println!(">> Handling query '{}'", query.selector());
120    /// }
121    /// # }
122    /// ```
123    #[inline]
124    pub fn with<Handler>(self, handler: Handler) -> QueryableBuilder<'a, 'b, Handler>
125    where
126        Handler: IntoHandler<Query>,
127    {
128        let QueryableBuilder {
129            session,
130            key_expr,
131            complete,
132            origin,
133            handler: _,
134        } = self;
135        QueryableBuilder {
136            session,
137            key_expr,
138            complete,
139            origin,
140            handler,
141        }
142    }
143}
144
145impl<'a, 'b> QueryableBuilder<'a, 'b, Callback<Query>> {
146    /// Register the queryable callback to be run in background until the session is closed.
147    ///
148    /// Background builder doesn't return a `Queryable` object anymore.
149    ///
150    /// # Examples
151    /// ```
152    /// # #[tokio::main]
153    /// # async fn main() {
154    ///
155    /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
156    /// // no need to assign and keep a variable with a background queryable
157    /// session
158    ///     .declare_queryable("key/expression")
159    ///     .callback(|query| {println!(">> Handling query '{}'", query.selector());})
160    ///     .background()
161    ///     .await
162    ///     .unwrap();
163    /// # }
164    /// ```
165    pub fn background(self) -> QueryableBuilder<'a, 'b, Callback<Query>, true> {
166        QueryableBuilder {
167            session: self.session,
168            key_expr: self.key_expr,
169            complete: self.complete,
170            origin: self.origin,
171            handler: self.handler,
172        }
173    }
174}
175
176impl<Handler, const BACKGROUND: bool> QueryableBuilder<'_, '_, Handler, BACKGROUND> {
177    /// Change queryable completeness.
178    #[inline]
179    pub fn complete(mut self, complete: bool) -> Self {
180        self.complete = complete;
181        self
182    }
183
184    ///
185    ///
186    /// Restrict the matching queries that will be receive by this [`Queryable`]
187    /// to the ones that have the given [`Locality`](Locality).
188    #[inline]
189    #[zenoh_macros::unstable]
190    pub fn allowed_origin(mut self, origin: Locality) -> Self {
191        self.origin = origin;
192        self
193    }
194}
195
196impl<Handler> Resolvable for QueryableBuilder<'_, '_, Handler>
197where
198    Handler: IntoHandler<Query> + Send,
199    Handler::Handler: Send,
200{
201    type To = ZResult<Queryable<Handler::Handler>>;
202}
203
204impl<Handler> Wait for QueryableBuilder<'_, '_, Handler>
205where
206    Handler: IntoHandler<Query> + Send,
207    Handler::Handler: Send,
208{
209    fn wait(self) -> <Self as Resolvable>::To {
210        let session = self.session;
211        let (callback, receiver) = self.handler.into_handler();
212        session
213            .0
214            .declare_queryable_inner(&self.key_expr?, self.complete, self.origin, callback)
215            .map(|qable_state| Queryable {
216                inner: QueryableInner {
217                    session: self.session.downgrade(),
218                    id: qable_state.id,
219                    undeclare_on_drop: true,
220                },
221                handler: receiver,
222            })
223    }
224}
225
226impl<Handler> IntoFuture for QueryableBuilder<'_, '_, Handler>
227where
228    Handler: IntoHandler<Query> + Send,
229    Handler::Handler: Send,
230{
231    type Output = <Self as Resolvable>::To;
232    type IntoFuture = Ready<<Self as Resolvable>::To>;
233
234    fn into_future(self) -> Self::IntoFuture {
235        std::future::ready(self.wait())
236    }
237}
238
239impl Resolvable for QueryableBuilder<'_, '_, Callback<Query>, true> {
240    type To = ZResult<()>;
241}
242
243impl Wait for QueryableBuilder<'_, '_, Callback<Query>, true> {
244    fn wait(self) -> <Self as Resolvable>::To {
245        self.session.0.declare_queryable_inner(
246            &self.key_expr?,
247            self.complete,
248            self.origin,
249            self.handler,
250        )?;
251        Ok(())
252    }
253}
254
255impl IntoFuture for QueryableBuilder<'_, '_, Callback<Query>, true> {
256    type Output = <Self as Resolvable>::To;
257    type IntoFuture = Ready<<Self as Resolvable>::To>;
258
259    fn into_future(self) -> Self::IntoFuture {
260        std::future::ready(self.wait())
261    }
262}