1pub mod del;
15pub mod err;
16pub mod put;
17pub mod query;
18pub mod reply;
19
20use zenoh_buffers::{
21 reader::{DidntRead, Reader},
22 writer::{DidntWrite, Writer},
23 ZBuf,
24};
25#[cfg(feature = "shared-memory")]
26use zenoh_protocol::common::{iext, ZExtUnit};
27use zenoh_protocol::{
28 common::{imsg, ZExtZBufHeader},
29 core::{Encoding, EntityGlobalIdProto, EntityId, ZenohIdProto},
30 zenoh::{ext, id, PushBody, RequestBody, ResponseBody},
31};
32
33#[cfg(not(feature = "shared-memory"))]
34use crate::Zenoh080Bounded;
35#[cfg(feature = "shared-memory")]
36use crate::Zenoh080Sliced;
37use crate::{LCodec, RCodec, WCodec, Zenoh080, Zenoh080Header, Zenoh080Length};
38
39impl<W> WCodec<&PushBody, &mut W> for Zenoh080
41where
42 W: Writer,
43{
44 type Output = Result<(), DidntWrite>;
45
46 fn write(self, writer: &mut W, x: &PushBody) -> Self::Output {
47 match x {
48 PushBody::Put(b) => self.write(&mut *writer, b),
49 PushBody::Del(b) => self.write(&mut *writer, b),
50 }
51 }
52}
53
54impl<R> RCodec<PushBody, &mut R> for Zenoh080
55where
56 R: Reader,
57{
58 type Error = DidntRead;
59
60 fn read(self, reader: &mut R) -> Result<PushBody, Self::Error> {
61 let header: u8 = self.read(&mut *reader)?;
62
63 let codec = Zenoh080Header::new(header);
64 let body = match imsg::mid(codec.header) {
65 id::PUT => PushBody::Put(codec.read(&mut *reader)?),
66 id::DEL => PushBody::Del(codec.read(&mut *reader)?),
67 _ => return Err(DidntRead),
68 };
69
70 Ok(body)
71 }
72}
73
74impl<W> WCodec<&RequestBody, &mut W> for Zenoh080
76where
77 W: Writer,
78{
79 type Output = Result<(), DidntWrite>;
80
81 fn write(self, writer: &mut W, x: &RequestBody) -> Self::Output {
82 match x {
83 RequestBody::Query(b) => self.write(&mut *writer, b),
84 }
85 }
86}
87
88impl<R> RCodec<RequestBody, &mut R> for Zenoh080
89where
90 R: Reader,
91{
92 type Error = DidntRead;
93
94 fn read(self, reader: &mut R) -> Result<RequestBody, Self::Error> {
95 let header: u8 = self.read(&mut *reader)?;
96
97 let codec = Zenoh080Header::new(header);
98 let body = match imsg::mid(codec.header) {
99 id::QUERY => RequestBody::Query(codec.read(&mut *reader)?),
100 _ => return Err(DidntRead),
101 };
102
103 Ok(body)
104 }
105}
106
107impl<W> WCodec<&ResponseBody, &mut W> for Zenoh080
109where
110 W: Writer,
111{
112 type Output = Result<(), DidntWrite>;
113
114 fn write(self, writer: &mut W, x: &ResponseBody) -> Self::Output {
115 match x {
116 ResponseBody::Reply(b) => self.write(&mut *writer, b),
117 ResponseBody::Err(b) => self.write(&mut *writer, b),
118 }
119 }
120}
121
122impl<R> RCodec<ResponseBody, &mut R> for Zenoh080
123where
124 R: Reader,
125{
126 type Error = DidntRead;
127
128 fn read(self, reader: &mut R) -> Result<ResponseBody, Self::Error> {
129 let header: u8 = self.read(&mut *reader)?;
130
131 let codec = Zenoh080Header::new(header);
132 let body = match imsg::mid(codec.header) {
133 id::REPLY => ResponseBody::Reply(codec.read(&mut *reader)?),
134 id::ERR => ResponseBody::Err(codec.read(&mut *reader)?),
135 _ => return Err(DidntRead),
136 };
137
138 Ok(body)
139 }
140}
141
142impl<const ID: u8> LCodec<&ext::SourceInfoType<{ ID }>> for Zenoh080 {
144 fn w_len(self, x: &ext::SourceInfoType<{ ID }>) -> usize {
145 let ext::SourceInfoType { id, sn } = x;
146
147 1 + self.w_len(&id.zid) + self.w_len(id.eid) + self.w_len(*sn)
148 }
149}
150
151impl<W, const ID: u8> WCodec<(&ext::SourceInfoType<{ ID }>, bool), &mut W> for Zenoh080
152where
153 W: Writer,
154{
155 type Output = Result<(), DidntWrite>;
156
157 fn write(self, writer: &mut W, x: (&ext::SourceInfoType<{ ID }>, bool)) -> Self::Output {
158 let (x, more) = x;
159 let ext::SourceInfoType { id, sn } = x;
160
161 let header: ZExtZBufHeader<{ ID }> = ZExtZBufHeader::new(self.w_len(x));
162 self.write(&mut *writer, (&header, more))?;
163
164 let flags: u8 = (id.zid.size() as u8 - 1) << 4;
165 self.write(&mut *writer, flags)?;
166
167 let lodec = Zenoh080Length::new(id.zid.size());
168 lodec.write(&mut *writer, &id.zid)?;
169
170 self.write(&mut *writer, id.eid)?;
171 self.write(&mut *writer, sn)?;
172 Ok(())
173 }
174}
175
176impl<R, const ID: u8> RCodec<(ext::SourceInfoType<{ ID }>, bool), &mut R> for Zenoh080Header
177where
178 R: Reader,
179{
180 type Error = DidntRead;
181
182 fn read(self, reader: &mut R) -> Result<(ext::SourceInfoType<{ ID }>, bool), Self::Error> {
183 let (_, more): (ZExtZBufHeader<{ ID }>, bool) = self.read(&mut *reader)?;
184
185 let flags: u8 = self.codec.read(&mut *reader)?;
186 let length = 1 + ((flags >> 4) as usize);
187
188 let lodec = Zenoh080Length::new(length);
189 let zid: ZenohIdProto = lodec.read(&mut *reader)?;
190
191 let eid: EntityId = self.codec.read(&mut *reader)?;
192 let sn: u32 = self.codec.read(&mut *reader)?;
193
194 Ok((
195 ext::SourceInfoType {
196 id: EntityGlobalIdProto { zid, eid },
197 sn,
198 },
199 more,
200 ))
201 }
202}
203
#[cfg(feature = "shared-memory")]
/// Serializer for the `ShmType` marker extension (shared-memory builds only).
impl<W, const ID: u8> WCodec<(&ext::ShmType<{ ID }>, bool), &mut W> for Zenoh080
where
    W: Writer,
{
    type Output = Result<(), DidntWrite>;

    /// Writes a unit extension header with identifier `ID`; the marker itself
    /// contributes nothing beyond that header.
    fn write(self, writer: &mut W, x: (&ext::ShmType<{ ID }>, bool)) -> Self::Output {
        // Matching on the unit struct keeps this in sync should it ever gain fields.
        let (ext::ShmType, more) = x;

        self.write(&mut *writer, (&ZExtUnit::<{ ID }>::new(), more))?;
        Ok(())
    }
}
221
#[cfg(feature = "shared-memory")]
/// Deserializer for the `ShmType` marker extension (shared-memory builds only).
impl<R, const ID: u8> RCodec<(ext::ShmType<{ ID }>, bool), &mut R> for Zenoh080Header
where
    R: Reader,
{
    type Error = DidntRead;

    /// Reads a unit extension header with identifier `ID` and returns the marker
    /// together with the `more` flag obtained from that header.
    ///
    /// # Errors
    /// Returns `DidntRead` if the unit header cannot be read.
    fn read(self, reader: &mut R) -> Result<(ext::ShmType<{ ID }>, bool), Self::Error> {
        let decoded: (ZExtUnit<{ ID }>, bool) = self.read(&mut *reader)?;
        let (_unit, has_more) = decoded;
        Ok((ext::ShmType, has_more))
    }
}
234
235impl<W, const VID: u8, const SID: u8> WCodec<(&ext::ValueType<{ VID }, { SID }>, bool), &mut W>
237 for Zenoh080
238where
239 W: Writer,
240{
241 type Output = Result<(), DidntWrite>;
242
243 fn write(self, writer: &mut W, x: (&ext::ValueType<{ VID }, { SID }>, bool)) -> Self::Output {
244 let (x, more) = x;
245 let ext::ValueType {
246 encoding,
247 payload,
248 #[cfg(feature = "shared-memory")]
249 ext_shm,
250 } = x;
251
252 #[cfg(feature = "shared-memory")] if let Some(eshm) = ext_shm.as_ref() {
254 self.write(&mut *writer, (eshm, true))?;
255 }
256
257 let mut len = self.w_len(encoding);
259
260 #[cfg(feature = "shared-memory")]
261 {
262 let codec = Zenoh080Sliced::<u32>::new(ext_shm.is_some());
263 len += codec.w_len(payload);
264 }
265
266 #[cfg(not(feature = "shared-memory"))]
267 {
268 let codec = Zenoh080Bounded::<u32>::new();
269 len += codec.w_len(payload);
270 }
271
272 let header: ZExtZBufHeader<{ VID }> = ZExtZBufHeader::new(len);
274 self.write(&mut *writer, (&header, more))?;
275
276 self.write(&mut *writer, encoding)?;
278
279 fn write<W>(writer: &mut W, payload: &ZBuf) -> Result<(), DidntWrite>
281 where
282 W: Writer,
283 {
284 for s in payload.zslices() {
286 writer.write_zslice(s)?;
287 }
288 Ok(())
289 }
290
291 #[cfg(feature = "shared-memory")]
292 {
293 if ext_shm.is_some() {
294 let codec = Zenoh080Sliced::<u32>::new(true);
295 codec.write(&mut *writer, payload)?;
296 } else {
297 write(&mut *writer, payload)?;
298 }
299 }
300
301 #[cfg(not(feature = "shared-memory"))]
302 {
303 write(&mut *writer, payload)?;
304 }
305
306 Ok(())
307 }
308}
309
310impl<R, const VID: u8, const SID: u8> RCodec<(ext::ValueType<{ VID }, { SID }>, bool), &mut R>
311 for Zenoh080Header
312where
313 R: Reader,
314{
315 type Error = DidntRead;
316
317 fn read(
318 #[allow(unused_mut)] mut self,
319 reader: &mut R,
320 ) -> Result<(ext::ValueType<{ VID }, { SID }>, bool), Self::Error> {
321 #[cfg(feature = "shared-memory")]
322 let ext_shm = if iext::eid(self.header) == SID {
323 self.header = self.codec.read(&mut *reader)?;
324 Some(ext::ShmType)
325 } else {
326 None
327 };
328 let (header, more): (ZExtZBufHeader<{ VID }>, bool) = self.read(&mut *reader)?;
329
330 let start = reader.remaining();
332 let encoding: Encoding = self.codec.read(&mut *reader)?;
333 let end = reader.remaining();
334
335 fn read<R>(reader: &mut R, len: usize) -> Result<ZBuf, DidntRead>
337 where
338 R: Reader,
339 {
340 let mut payload = ZBuf::empty();
341 reader.read_zslices(len, |s| payload.push_zslice(s))?;
342 Ok(payload)
343 }
344
345 let len = header.len - (start - end);
347
348 let payload: ZBuf = {
349 #[cfg(feature = "shared-memory")]
350 {
351 if ext_shm.is_some() {
352 let codec = Zenoh080Sliced::<u32>::new(true);
353 let payload: ZBuf = codec.read(&mut *reader)?;
354 payload
355 } else {
356 read(&mut *reader, len)?
357 }
358 }
359
360 #[cfg(not(feature = "shared-memory"))]
361 {
362 read(&mut *reader, len)?
363 }
364 };
365
366 Ok((
367 ext::ValueType {
368 #[cfg(feature = "shared-memory")]
369 ext_shm,
370 encoding,
371 payload,
372 },
373 more,
374 ))
375 }
376}
377
378impl<W, const ID: u8> WCodec<(&ext::AttachmentType<{ ID }>, bool), &mut W> for Zenoh080
380where
381 W: Writer,
382{
383 type Output = Result<(), DidntWrite>;
384
385 fn write(self, writer: &mut W, x: (&ext::AttachmentType<{ ID }>, bool)) -> Self::Output {
386 let (x, more) = x;
387 let ext::AttachmentType { buffer } = x;
388
389 let header: ZExtZBufHeader<{ ID }> = ZExtZBufHeader::new(self.w_len(buffer));
390 self.write(&mut *writer, (&header, more))?;
391 for s in buffer.zslices() {
392 writer.write_zslice(s)?;
393 }
394
395 Ok(())
396 }
397}
398
399impl<R, const ID: u8> RCodec<(ext::AttachmentType<{ ID }>, bool), &mut R> for Zenoh080Header
400where
401 R: Reader,
402{
403 type Error = DidntRead;
404
405 fn read(self, reader: &mut R) -> Result<(ext::AttachmentType<{ ID }>, bool), Self::Error> {
406 let (h, more): (ZExtZBufHeader<{ ID }>, bool) = self.read(&mut *reader)?;
407 let mut buffer = ZBuf::empty();
408 reader.read_zslices(h.len, |s| buffer.push_zslice(s))?;
409
410 Ok((ext::AttachmentType { buffer }, more))
411 }
412}