zenoh/api/builders/queryable.rs
1//
2// Copyright (c) 2024 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use std::{
15 future::{IntoFuture, Ready},
16 sync::Arc,
17};
18
19use zenoh_core::{Resolvable, Wait};
20use zenoh_result::ZResult;
21
22use crate::{
23 api::{
24 handlers::{locked, DefaultHandler, IntoHandler},
25 key_expr::KeyExpr,
26 queryable::{Query, Queryable, QueryableInner},
27 sample::Locality,
28 },
29 handlers::Callback,
30 Session,
31};
32
33/// A builder for initializing a [`Queryable`].
34///
35/// # Examples
36/// ```
37/// # #[tokio::main]
38/// # async fn main() {
39///
40/// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
41/// let queryable = session.declare_queryable("key/expression").await.unwrap();
42/// # }
43/// ```
44#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
45#[derive(Debug)]
46pub struct QueryableBuilder<'a, 'b, Handler, const BACKGROUND: bool = false> {
47 pub(crate) session: &'a Session,
48 pub(crate) key_expr: ZResult<KeyExpr<'b>>,
49 pub(crate) complete: bool,
50 pub(crate) origin: Locality,
51 pub(crate) handler: Handler,
52}
53
54impl<'a, 'b> QueryableBuilder<'a, 'b, DefaultHandler> {
55 /// Receive the queries for this queryable with a callback.
56 ///
57 /// # Examples
58 /// ```
59 /// # #[tokio::main]
60 /// # async fn main() {
61 ///
62 /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
63 /// let queryable = session
64 /// .declare_queryable("key/expression")
65 /// .callback(|query| {println!(">> Handling query '{}'", query.selector());})
66 /// .await
67 /// .unwrap();
68 /// # }
69 /// ```
70 #[inline]
71 pub fn callback<F>(self, callback: F) -> QueryableBuilder<'a, 'b, Callback<Query>>
72 where
73 F: Fn(Query) + Send + Sync + 'static,
74 {
75 self.with(Callback::new(Arc::new(callback)))
76 }
77
78 /// Receive the queries for this Queryable with a mutable callback.
79 ///
80 /// Using this guarantees that your callback will never be called concurrently.
81 /// If your callback is also accepted by the [`callback`](QueryableBuilder::callback) method, we suggest you use it instead of `callback_mut`.
82 ///
83 /// # Examples
84 /// ```
85 /// # #[tokio::main]
86 /// # async fn main() {
87 ///
88 /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
89 /// let mut n = 0;
90 /// let queryable = session
91 /// .declare_queryable("key/expression")
92 /// .callback_mut(move |query| {n += 1;})
93 /// .await
94 /// .unwrap();
95 /// # }
96 /// ```
97 #[inline]
98 pub fn callback_mut<F>(self, callback: F) -> QueryableBuilder<'a, 'b, Callback<Query>>
99 where
100 F: FnMut(Query) + Send + Sync + 'static,
101 {
102 self.callback(locked(callback))
103 }
104
105 /// Receive the queries for this Queryable with a [`Handler`](crate::handlers::IntoHandler).
106 ///
107 /// # Examples
108 /// ```no_run
109 /// # #[tokio::main]
110 /// # async fn main() {
111 ///
112 /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
113 /// let queryable = session
114 /// .declare_queryable("key/expression")
115 /// .with(flume::bounded(32))
116 /// .await
117 /// .unwrap();
118 /// while let Ok(query) = queryable.recv_async().await {
119 /// println!(">> Handling query '{}'", query.selector());
120 /// }
121 /// # }
122 /// ```
123 #[inline]
124 pub fn with<Handler>(self, handler: Handler) -> QueryableBuilder<'a, 'b, Handler>
125 where
126 Handler: IntoHandler<Query>,
127 {
128 let QueryableBuilder {
129 session,
130 key_expr,
131 complete,
132 origin,
133 handler: _,
134 } = self;
135 QueryableBuilder {
136 session,
137 key_expr,
138 complete,
139 origin,
140 handler,
141 }
142 }
143}
144
145impl<'a, 'b> QueryableBuilder<'a, 'b, Callback<Query>> {
146 /// Register the queryable callback to be run in background until the session is closed.
147 ///
148 /// Background builder doesn't return a `Queryable` object anymore.
149 ///
150 /// # Examples
151 /// ```
152 /// # #[tokio::main]
153 /// # async fn main() {
154 ///
155 /// let session = zenoh::open(zenoh::Config::default()).await.unwrap();
156 /// // no need to assign and keep a variable with a background queryable
157 /// session
158 /// .declare_queryable("key/expression")
159 /// .callback(|query| {println!(">> Handling query '{}'", query.selector());})
160 /// .background()
161 /// .await
162 /// .unwrap();
163 /// # }
164 /// ```
165 pub fn background(self) -> QueryableBuilder<'a, 'b, Callback<Query>, true> {
166 QueryableBuilder {
167 session: self.session,
168 key_expr: self.key_expr,
169 complete: self.complete,
170 origin: self.origin,
171 handler: self.handler,
172 }
173 }
174}
175
176impl<Handler, const BACKGROUND: bool> QueryableBuilder<'_, '_, Handler, BACKGROUND> {
177 /// Change queryable completeness.
178 #[inline]
179 pub fn complete(mut self, complete: bool) -> Self {
180 self.complete = complete;
181 self
182 }
183
184 ///
185 ///
186 /// Restrict the matching queries that will be receive by this [`Queryable`]
187 /// to the ones that have the given [`Locality`](Locality).
188 #[inline]
189 #[zenoh_macros::unstable]
190 pub fn allowed_origin(mut self, origin: Locality) -> Self {
191 self.origin = origin;
192 self
193 }
194}
195
196impl<Handler> Resolvable for QueryableBuilder<'_, '_, Handler>
197where
198 Handler: IntoHandler<Query> + Send,
199 Handler::Handler: Send,
200{
201 type To = ZResult<Queryable<Handler::Handler>>;
202}
203
204impl<Handler> Wait for QueryableBuilder<'_, '_, Handler>
205where
206 Handler: IntoHandler<Query> + Send,
207 Handler::Handler: Send,
208{
209 fn wait(self) -> <Self as Resolvable>::To {
210 let session = self.session;
211 let (callback, receiver) = self.handler.into_handler();
212 session
213 .0
214 .declare_queryable_inner(&self.key_expr?, self.complete, self.origin, callback)
215 .map(|qable_state| Queryable {
216 inner: QueryableInner {
217 session: self.session.downgrade(),
218 id: qable_state.id,
219 undeclare_on_drop: true,
220 },
221 handler: receiver,
222 })
223 }
224}
225
226impl<Handler> IntoFuture for QueryableBuilder<'_, '_, Handler>
227where
228 Handler: IntoHandler<Query> + Send,
229 Handler::Handler: Send,
230{
231 type Output = <Self as Resolvable>::To;
232 type IntoFuture = Ready<<Self as Resolvable>::To>;
233
234 fn into_future(self) -> Self::IntoFuture {
235 std::future::ready(self.wait())
236 }
237}
238
// A background builder (`BACKGROUND = true`) resolves to `ZResult<()>`:
// no `Queryable` handle is handed back to the caller.
impl Resolvable for QueryableBuilder<'_, '_, Callback<Query>, true> {
    type To = ZResult<()>;
}
242
243impl Wait for QueryableBuilder<'_, '_, Callback<Query>, true> {
244 fn wait(self) -> <Self as Resolvable>::To {
245 self.session.0.declare_queryable_inner(
246 &self.key_expr?,
247 self.complete,
248 self.origin,
249 self.handler,
250 )?;
251 Ok(())
252 }
253}
254
255impl IntoFuture for QueryableBuilder<'_, '_, Callback<Query>, true> {
256 type Output = <Self as Resolvable>::To;
257 type IntoFuture = Ready<<Self as Resolvable>::To>;
258
259 fn into_future(self) -> Self::IntoFuture {
260 std::future::ready(self.wait())
261 }
262}