1use std::future::{IntoFuture, Ready};
15
16use uhlc::Timestamp;
17use zenoh_core::{Resolvable, Wait};
18use zenoh_protocol::{
19 core::{CongestionControl, WireExpr},
20 network::{response, Mapping, Response},
21 zenoh::{self, ResponseBody},
22};
23use zenoh_result::ZResult;
24
25#[zenoh_macros::unstable]
26use crate::api::sample::SourceInfo;
27use crate::api::{
28 builders::sample::{
29 EncodingBuilderTrait, QoSBuilderTrait, SampleBuilder, SampleBuilderTrait,
30 TimestampBuilderTrait,
31 },
32 bytes::{OptionZBytes, ZBytes},
33 encoding::Encoding,
34 key_expr::KeyExpr,
35 publisher::Priority,
36 queryable::Query,
37 sample::QoSBuilder,
38};
39
40#[derive(Debug)]
42pub struct ReplyBuilderPut {
43 payload: ZBytes,
44 encoding: Encoding,
45}
46
47#[derive(Debug)]
49pub struct ReplyBuilderDelete;
50
51#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
54#[derive(Debug)]
55pub struct ReplyBuilder<'a, 'b, T> {
56 query: &'a Query,
57 key_expr: ZResult<KeyExpr<'b>>,
58 kind: T,
59 timestamp: Option<Timestamp>,
60 qos: QoSBuilder,
61 #[cfg(feature = "unstable")]
62 source_info: Option<SourceInfo>,
63 attachment: Option<ZBytes>,
64}
65
66impl<'a, 'b> ReplyBuilder<'a, 'b, ReplyBuilderPut> {
67 pub(crate) fn new<TryIntoKeyExpr, IntoZBytes>(
68 query: &'a Query,
69 key_expr: TryIntoKeyExpr,
70 payload: IntoZBytes,
71 ) -> Self
72 where
73 TryIntoKeyExpr: TryInto<KeyExpr<'b>>,
74 <TryIntoKeyExpr as TryInto<KeyExpr<'b>>>::Error: Into<zenoh_result::Error>,
75 IntoZBytes: Into<ZBytes>,
76 {
77 Self {
78 query,
79 key_expr: key_expr.try_into().map_err(Into::into),
80 qos: query.inner.qos.into(),
81 kind: ReplyBuilderPut {
82 payload: payload.into(),
83 encoding: Encoding::default(),
84 },
85 timestamp: None,
86 #[cfg(feature = "unstable")]
87 source_info: None,
88 attachment: None,
89 }
90 }
91}
92
93impl<'a, 'b> ReplyBuilder<'a, 'b, ReplyBuilderDelete> {
94 pub(crate) fn new<TryIntoKeyExpr>(query: &'a Query, key_expr: TryIntoKeyExpr) -> Self
95 where
96 TryIntoKeyExpr: TryInto<KeyExpr<'b>>,
97 <TryIntoKeyExpr as TryInto<KeyExpr<'b>>>::Error: Into<zenoh_result::Error>,
98 {
99 Self {
100 query,
101 key_expr: key_expr.try_into().map_err(Into::into),
102 qos: query.inner.qos.into(),
103 kind: ReplyBuilderDelete,
104 timestamp: None,
105 #[cfg(feature = "unstable")]
106 source_info: None,
107 attachment: None,
108 }
109 }
110}
111
112#[zenoh_macros::internal_trait]
113impl<T> TimestampBuilderTrait for ReplyBuilder<'_, '_, T> {
114 fn timestamp<U: Into<Option<Timestamp>>>(self, timestamp: U) -> Self {
115 Self {
116 timestamp: timestamp.into(),
117 ..self
118 }
119 }
120}
121
122#[zenoh_macros::internal_trait]
123impl<T> SampleBuilderTrait for ReplyBuilder<'_, '_, T> {
124 fn attachment<U: Into<OptionZBytes>>(self, attachment: U) -> Self {
125 let attachment: OptionZBytes = attachment.into();
126 Self {
127 attachment: attachment.into(),
128 ..self
129 }
130 }
131
132 #[cfg(feature = "unstable")]
133 fn source_info<TS: Into<Option<SourceInfo>>>(self, source_info: TS) -> Self {
134 Self {
135 source_info: source_info.into(),
136 ..self
137 }
138 }
139}
140
141#[zenoh_macros::internal_trait]
142impl<T> QoSBuilderTrait for ReplyBuilder<'_, '_, T> {
143 #[deprecated = "calling this function has no impact, replies will use the query congestion control"]
144 fn congestion_control(self, _congestion_control: CongestionControl) -> Self {
145 self
146 }
147
148 #[deprecated = "calling this function has no impact, replies will use the query priority"]
149 fn priority(self, _priority: Priority) -> Self {
150 self
151 }
152
153 fn express(self, is_express: bool) -> Self {
154 let qos = self.qos.express(is_express);
155 Self { qos, ..self }
156 }
157}
158
159#[zenoh_macros::internal_trait]
160impl EncodingBuilderTrait for ReplyBuilder<'_, '_, ReplyBuilderPut> {
161 fn encoding<T: Into<Encoding>>(self, encoding: T) -> Self {
162 Self {
163 kind: ReplyBuilderPut {
164 encoding: encoding.into(),
165 ..self.kind
166 },
167 ..self
168 }
169 }
170}
171
172impl<T> Resolvable for ReplyBuilder<'_, '_, T> {
173 type To = ZResult<()>;
174}
175
176impl Wait for ReplyBuilder<'_, '_, ReplyBuilderPut> {
177 fn wait(self) -> <Self as Resolvable>::To {
178 let key_expr = self.key_expr?.into_owned();
179 let sample = SampleBuilder::put(key_expr, self.kind.payload)
180 .encoding(self.kind.encoding)
181 .timestamp(self.timestamp)
182 .qos(self.qos.into());
183 #[cfg(feature = "unstable")]
184 let sample = sample.source_info(self.source_info);
185 let sample = sample.attachment(self.attachment);
186 self.query._reply_sample(sample.into())
187 }
188}
189
190impl Wait for ReplyBuilder<'_, '_, ReplyBuilderDelete> {
191 fn wait(self) -> <Self as Resolvable>::To {
192 let key_expr = self.key_expr?.into_owned();
193 let sample = SampleBuilder::delete(key_expr)
194 .timestamp(self.timestamp)
195 .qos(self.qos.into());
196 #[cfg(feature = "unstable")]
197 let sample = sample.source_info(self.source_info);
198 let sample = sample.attachment(self.attachment);
199 self.query._reply_sample(sample.into())
200 }
201}
202
203impl IntoFuture for ReplyBuilder<'_, '_, ReplyBuilderPut> {
204 type Output = <Self as Resolvable>::To;
205 type IntoFuture = Ready<<Self as Resolvable>::To>;
206
207 fn into_future(self) -> Self::IntoFuture {
208 std::future::ready(self.wait())
209 }
210}
211
212impl IntoFuture for ReplyBuilder<'_, '_, ReplyBuilderDelete> {
213 type Output = <Self as Resolvable>::To;
214 type IntoFuture = Ready<<Self as Resolvable>::To>;
215
216 fn into_future(self) -> Self::IntoFuture {
217 std::future::ready(self.wait())
218 }
219}
220
221#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"]
224#[derive(Debug)]
225pub struct ReplyErrBuilder<'a> {
226 query: &'a Query,
227 payload: ZBytes,
228 encoding: Encoding,
229}
230
231impl<'a> ReplyErrBuilder<'a> {
232 pub(crate) fn new<IntoZBytes>(query: &'a Query, payload: IntoZBytes) -> ReplyErrBuilder<'a>
233 where
234 IntoZBytes: Into<ZBytes>,
235 {
236 Self {
237 query,
238 payload: payload.into(),
239 encoding: Encoding::default(),
240 }
241 }
242}
243
244#[zenoh_macros::internal_trait]
245impl EncodingBuilderTrait for ReplyErrBuilder<'_> {
246 fn encoding<T: Into<Encoding>>(self, encoding: T) -> Self {
247 Self {
248 encoding: encoding.into(),
249 ..self
250 }
251 }
252}
253
254impl Resolvable for ReplyErrBuilder<'_> {
255 type To = ZResult<()>;
256}
257
258impl Wait for ReplyErrBuilder<'_> {
259 fn wait(self) -> <Self as Resolvable>::To {
260 self.query.inner.primitives.send_response(&mut Response {
261 rid: self.query.inner.qid,
262 wire_expr: WireExpr {
263 scope: 0,
264 suffix: std::borrow::Cow::Owned(self.query.key_expr().as_str().to_owned()),
265 mapping: Mapping::Sender,
266 },
267 payload: ResponseBody::Err(zenoh::Err {
268 encoding: self.encoding.into(),
269 ext_sinfo: None,
270 #[cfg(feature = "shared-memory")]
271 ext_shm: None,
272 ext_unknown: vec![],
273 payload: self.payload.into(),
274 }),
275 ext_qos: self.query.inner.qos.into(),
276 ext_tstamp: None,
277 ext_respid: Some(response::ext::ResponderIdType {
278 zid: self.query.inner.zid,
279 eid: self.query.eid,
280 }),
281 });
282 Ok(())
283 }
284}
285
286impl IntoFuture for ReplyErrBuilder<'_> {
287 type Output = <Self as Resolvable>::To;
288 type IntoFuture = Ready<<Self as Resolvable>::To>;
289
290 fn into_future(self) -> Self::IntoFuture {
291 std::future::ready(self.wait())
292 }
293}