zenoh_codec/transport/
oam.rs1use zenoh_buffers::{
15 reader::{DidntRead, Reader},
16 writer::{DidntWrite, Writer},
17 ZBuf,
18};
19use zenoh_protocol::{
20 common::{iext, imsg, ZExtBody},
21 transport::{
22 id,
23 oam::{ext, flag, Oam, OamId},
24 },
25};
26
27use crate::{common::extension, RCodec, WCodec, Zenoh080, Zenoh080Header};
28
29impl<W> WCodec<&Oam, &mut W> for Zenoh080
30where
31 W: Writer,
32{
33 type Output = Result<(), DidntWrite>;
34
35 fn write(self, writer: &mut W, x: &Oam) -> Self::Output {
36 let Oam { id, body, ext_qos } = x;
37
38 let mut header = id::OAM;
40 match &body {
41 ZExtBody::Unit => {
42 header |= iext::ENC_UNIT;
43 }
44 ZExtBody::Z64(_) => {
45 header |= iext::ENC_Z64;
46 }
47 ZExtBody::ZBuf(_) => {
48 header |= iext::ENC_ZBUF;
49 }
50 }
51 let mut n_exts = (ext_qos != &ext::QoSType::DEFAULT) as u8;
52 if n_exts != 0 {
53 header |= flag::Z;
54 }
55 self.write(&mut *writer, header)?;
56
57 self.write(&mut *writer, id)?;
59
60 if ext_qos != &ext::QoSType::DEFAULT {
62 n_exts -= 1;
63 self.write(&mut *writer, (*ext_qos, n_exts != 0))?;
64 }
65
66 match &body {
68 ZExtBody::Unit => {}
69 ZExtBody::Z64(u64) => {
70 self.write(&mut *writer, u64)?;
71 }
72 ZExtBody::ZBuf(zbuf) => {
73 self.write(&mut *writer, zbuf)?;
74 }
75 }
76
77 Ok(())
78 }
79}
80
81impl<R> RCodec<Oam, &mut R> for Zenoh080
82where
83 R: Reader,
84{
85 type Error = DidntRead;
86
87 fn read(self, reader: &mut R) -> Result<Oam, Self::Error> {
88 let header: u8 = self.read(&mut *reader)?;
89 let codec = Zenoh080Header::new(header);
90 codec.read(reader)
91 }
92}
93
94impl<R> RCodec<Oam, &mut R> for Zenoh080Header
95where
96 R: Reader,
97{
98 type Error = DidntRead;
99
100 fn read(self, reader: &mut R) -> Result<Oam, Self::Error> {
101 if imsg::mid(self.header) != id::OAM {
102 return Err(DidntRead);
103 }
104
105 let id: OamId = self.codec.read(&mut *reader)?;
107
108 let mut ext_qos = ext::QoSType::DEFAULT;
110
111 let mut has_ext = imsg::has_flag(self.header, flag::Z);
112 while has_ext {
113 let ext: u8 = self.codec.read(&mut *reader)?;
114 let eodec = Zenoh080Header::new(ext);
115 match iext::eid(ext) {
116 ext::QoS::ID => {
117 let (q, ext): (ext::QoSType, bool) = eodec.read(&mut *reader)?;
118 ext_qos = q;
119 has_ext = ext;
120 }
121 _ => {
122 has_ext = extension::skip(reader, "OAM", ext)?;
123 }
124 }
125 }
126
127 let body = match self.header & iext::ENC_MASK {
129 iext::ENC_UNIT => ZExtBody::Unit,
130 iext::ENC_Z64 => {
131 let u64: u64 = self.codec.read(&mut *reader)?;
132 ZExtBody::Z64(u64)
133 }
134 iext::ENC_ZBUF => {
135 let zbuf: ZBuf = self.codec.read(&mut *reader)?;
136 ZExtBody::ZBuf(zbuf)
137 }
138 _ => return Err(DidntRead),
139 };
140
141 Ok(Oam { id, body, ext_qos })
142 }
143}