Skip to main content

zenoh/api/builders/
reply.rs

1//
2// Copyright (c) 2024 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use std::future::{IntoFuture, Ready};
15
16use uhlc::Timestamp;
17use zenoh_core::{Resolvable, Wait};
18use zenoh_protocol::{
19    core::{CongestionControl, WireExpr},
20    network::{response, Mapping, Response},
21    zenoh::{self, ResponseBody},
22};
23use zenoh_result::ZResult;
24
25#[zenoh_macros::unstable]
26use crate::api::sample::SourceInfo;
27use crate::api::{
28    builders::sample::{
29        EncodingBuilderTrait, QoSBuilderTrait, SampleBuilder, SampleBuilderTrait,
30        TimestampBuilderTrait,
31    },
32    bytes::{OptionZBytes, ZBytes},
33    encoding::Encoding,
34    key_expr::KeyExpr,
35    publisher::Priority,
36    queryable::Query,
37    sample::QoSBuilder,
38};
39
40/// The type modifier for a [`ReplyBuilder`] to create a reply with a [`Put`](crate::sample::SampleKind::Put) sample.
41#[derive(Debug)]
42pub struct ReplyBuilderPut {
43    payload: ZBytes,
44    encoding: Encoding,
45}
46
47/// The type modifier for a [`ReplyBuilder`] to create a reply with a [`Delete`](crate::sample::SampleKind::Delete) sample.
48#[derive(Debug)]
49pub struct ReplyBuilderDelete;
50
51/// A builder for a [`Reply`](crate::query::Reply)
52/// returned by [`Query::reply()`](Query::reply) and [`Query::reply_del()`](Query::reply_del)
53#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
54#[derive(Debug)]
55pub struct ReplyBuilder<'a, 'b, T> {
56    query: &'a Query,
57    key_expr: ZResult<KeyExpr<'b>>,
58    kind: T,
59    timestamp: Option<Timestamp>,
60    qos: QoSBuilder,
61    #[cfg(feature = "unstable")]
62    source_info: Option<SourceInfo>,
63    attachment: Option<ZBytes>,
64}
65
66impl<'a, 'b> ReplyBuilder<'a, 'b, ReplyBuilderPut> {
67    pub(crate) fn new<TryIntoKeyExpr, IntoZBytes>(
68        query: &'a Query,
69        key_expr: TryIntoKeyExpr,
70        payload: IntoZBytes,
71    ) -> Self
72    where
73        TryIntoKeyExpr: TryInto<KeyExpr<'b>>,
74        <TryIntoKeyExpr as TryInto<KeyExpr<'b>>>::Error: Into<zenoh_result::Error>,
75        IntoZBytes: Into<ZBytes>,
76    {
77        Self {
78            query,
79            key_expr: key_expr.try_into().map_err(Into::into),
80            qos: query.inner.qos.into(),
81            kind: ReplyBuilderPut {
82                payload: payload.into(),
83                encoding: Encoding::default(),
84            },
85            timestamp: None,
86            #[cfg(feature = "unstable")]
87            source_info: None,
88            attachment: None,
89        }
90    }
91}
92
93impl<'a, 'b> ReplyBuilder<'a, 'b, ReplyBuilderDelete> {
94    pub(crate) fn new<TryIntoKeyExpr>(query: &'a Query, key_expr: TryIntoKeyExpr) -> Self
95    where
96        TryIntoKeyExpr: TryInto<KeyExpr<'b>>,
97        <TryIntoKeyExpr as TryInto<KeyExpr<'b>>>::Error: Into<zenoh_result::Error>,
98    {
99        Self {
100            query,
101            key_expr: key_expr.try_into().map_err(Into::into),
102            qos: query.inner.qos.into(),
103            kind: ReplyBuilderDelete,
104            timestamp: None,
105            #[cfg(feature = "unstable")]
106            source_info: None,
107            attachment: None,
108        }
109    }
110}
111
112#[zenoh_macros::internal_trait]
113impl<T> TimestampBuilderTrait for ReplyBuilder<'_, '_, T> {
114    fn timestamp<U: Into<Option<Timestamp>>>(self, timestamp: U) -> Self {
115        Self {
116            timestamp: timestamp.into(),
117            ..self
118        }
119    }
120}
121
122#[zenoh_macros::internal_trait]
123impl<T> SampleBuilderTrait for ReplyBuilder<'_, '_, T> {
124    fn attachment<U: Into<OptionZBytes>>(self, attachment: U) -> Self {
125        let attachment: OptionZBytes = attachment.into();
126        Self {
127            attachment: attachment.into(),
128            ..self
129        }
130    }
131
132    #[cfg(feature = "unstable")]
133    fn source_info<TS: Into<Option<SourceInfo>>>(self, source_info: TS) -> Self {
134        Self {
135            source_info: source_info.into(),
136            ..self
137        }
138    }
139}
140
141#[zenoh_macros::internal_trait]
142impl<T> QoSBuilderTrait for ReplyBuilder<'_, '_, T> {
143    #[deprecated = "calling this function has no impact, replies will use the query congestion control"]
144    fn congestion_control(self, _congestion_control: CongestionControl) -> Self {
145        self
146    }
147
148    #[deprecated = "calling this function has no impact, replies will use the query priority"]
149    fn priority(self, _priority: Priority) -> Self {
150        self
151    }
152
153    fn express(self, is_express: bool) -> Self {
154        let qos = self.qos.express(is_express);
155        Self { qos, ..self }
156    }
157}
158
159#[zenoh_macros::internal_trait]
160impl EncodingBuilderTrait for ReplyBuilder<'_, '_, ReplyBuilderPut> {
161    fn encoding<T: Into<Encoding>>(self, encoding: T) -> Self {
162        Self {
163            kind: ReplyBuilderPut {
164                encoding: encoding.into(),
165                ..self.kind
166            },
167            ..self
168        }
169    }
170}
171
172impl<T> Resolvable for ReplyBuilder<'_, '_, T> {
173    type To = ZResult<()>;
174}
175
176impl Wait for ReplyBuilder<'_, '_, ReplyBuilderPut> {
177    fn wait(self) -> <Self as Resolvable>::To {
178        let key_expr = self.key_expr?.into_owned();
179        let sample = SampleBuilder::put(key_expr, self.kind.payload)
180            .encoding(self.kind.encoding)
181            .timestamp(self.timestamp)
182            .qos(self.qos.into());
183        #[cfg(feature = "unstable")]
184        let sample = sample.source_info(self.source_info);
185        let sample = sample.attachment(self.attachment);
186        self.query._reply_sample(sample.into())
187    }
188}
189
190impl Wait for ReplyBuilder<'_, '_, ReplyBuilderDelete> {
191    fn wait(self) -> <Self as Resolvable>::To {
192        let key_expr = self.key_expr?.into_owned();
193        let sample = SampleBuilder::delete(key_expr)
194            .timestamp(self.timestamp)
195            .qos(self.qos.into());
196        #[cfg(feature = "unstable")]
197        let sample = sample.source_info(self.source_info);
198        let sample = sample.attachment(self.attachment);
199        self.query._reply_sample(sample.into())
200    }
201}
202
203impl IntoFuture for ReplyBuilder<'_, '_, ReplyBuilderPut> {
204    type Output = <Self as Resolvable>::To;
205    type IntoFuture = Ready<<Self as Resolvable>::To>;
206
207    fn into_future(self) -> Self::IntoFuture {
208        std::future::ready(self.wait())
209    }
210}
211
212impl IntoFuture for ReplyBuilder<'_, '_, ReplyBuilderDelete> {
213    type Output = <Self as Resolvable>::To;
214    type IntoFuture = Ready<<Self as Resolvable>::To>;
215
216    fn into_future(self) -> Self::IntoFuture {
217        std::future::ready(self.wait())
218    }
219}
220
221/// A builder for a [`ReplyError`](crate::query::ReplyError)
222/// returned by [`Query::reply_err()`](Query::reply_err).
223#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
224#[derive(Debug)]
225pub struct ReplyErrBuilder<'a> {
226    query: &'a Query,
227    payload: ZBytes,
228    encoding: Encoding,
229}
230
231impl<'a> ReplyErrBuilder<'a> {
232    pub(crate) fn new<IntoZBytes>(query: &'a Query, payload: IntoZBytes) -> ReplyErrBuilder<'a>
233    where
234        IntoZBytes: Into<ZBytes>,
235    {
236        Self {
237            query,
238            payload: payload.into(),
239            encoding: Encoding::default(),
240        }
241    }
242}
243
244#[zenoh_macros::internal_trait]
245impl EncodingBuilderTrait for ReplyErrBuilder<'_> {
246    fn encoding<T: Into<Encoding>>(self, encoding: T) -> Self {
247        Self {
248            encoding: encoding.into(),
249            ..self
250        }
251    }
252}
253
254impl Resolvable for ReplyErrBuilder<'_> {
255    type To = ZResult<()>;
256}
257
258impl Wait for ReplyErrBuilder<'_> {
259    fn wait(self) -> <Self as Resolvable>::To {
260        self.query.inner.primitives.send_response(&mut Response {
261            rid: self.query.inner.qid,
262            wire_expr: WireExpr {
263                scope: 0,
264                suffix: std::borrow::Cow::Owned(self.query.key_expr().as_str().to_owned()),
265                mapping: Mapping::Sender,
266            },
267            payload: ResponseBody::Err(zenoh::Err {
268                encoding: self.encoding.into(),
269                ext_sinfo: None,
270                #[cfg(feature = "shared-memory")]
271                ext_shm: None,
272                ext_unknown: vec![],
273                payload: self.payload.into(),
274            }),
275            ext_qos: self.query.inner.qos.into(),
276            ext_tstamp: None,
277            ext_respid: Some(response::ext::ResponderIdType {
278                zid: self.query.inner.zid,
279                eid: self.query.eid,
280            }),
281        });
282        Ok(())
283    }
284}
285
286impl IntoFuture for ReplyErrBuilder<'_> {
287    type Output = <Self as Resolvable>::To;
288    type IntoFuture = Ready<<Self as Resolvable>::To>;
289
290    fn into_future(self) -> Self::IntoFuture {
291        std::future::ready(self.wait())
292    }
293}