zenoh_codec/network/
oam.rs1use zenoh_buffers::{
15 reader::{DidntRead, Reader},
16 writer::{DidntWrite, Writer},
17 ZBuf,
18};
19use zenoh_protocol::{
20 common::{iext, imsg, ZExtBody},
21 network::{
22 id,
23 oam::{ext, flag, Oam, OamId},
24 },
25};
26
27use crate::{common::extension, RCodec, WCodec, Zenoh080, Zenoh080Header};
28
29impl<W> WCodec<&Oam, &mut W> for Zenoh080
30where
31 W: Writer,
32{
33 type Output = Result<(), DidntWrite>;
34
35 fn write(self, writer: &mut W, x: &Oam) -> Self::Output {
36 let Oam {
37 id,
38 body,
39 ext_qos,
40 ext_tstamp,
41 } = x;
42
43 let mut header = id::OAM;
45 match &body {
46 ZExtBody::Unit => {
47 header |= iext::ENC_UNIT;
48 }
49 ZExtBody::Z64(_) => {
50 header |= iext::ENC_Z64;
51 }
52 ZExtBody::ZBuf(_) => {
53 header |= iext::ENC_ZBUF;
54 }
55 }
56 let mut n_exts = ((ext_qos != &ext::QoSType::DEFAULT) as u8) + (ext_tstamp.is_some() as u8);
57 if n_exts != 0 {
58 header |= flag::Z;
59 }
60 self.write(&mut *writer, header)?;
61
62 self.write(&mut *writer, id)?;
64
65 if ext_qos != &ext::QoSType::DEFAULT {
67 n_exts -= 1;
68 self.write(&mut *writer, (*ext_qos, n_exts != 0))?;
69 }
70 if let Some(ts) = ext_tstamp.as_ref() {
71 n_exts -= 1;
72 self.write(&mut *writer, (ts, n_exts != 0))?;
73 }
74
75 match body {
77 ZExtBody::Unit => {}
78 ZExtBody::Z64(u64) => {
79 self.write(&mut *writer, u64)?;
80 }
81 ZExtBody::ZBuf(zbuf) => {
82 self.write(&mut *writer, zbuf)?;
83 }
84 }
85
86 Ok(())
87 }
88}
89
90impl<R> RCodec<Oam, &mut R> for Zenoh080
91where
92 R: Reader,
93{
94 type Error = DidntRead;
95
96 fn read(self, reader: &mut R) -> Result<Oam, Self::Error> {
97 let header: u8 = self.read(&mut *reader)?;
98 let codec = Zenoh080Header::new(header);
99 codec.read(reader)
100 }
101}
102
103impl<R> RCodec<Oam, &mut R> for Zenoh080Header
104where
105 R: Reader,
106{
107 type Error = DidntRead;
108
109 fn read(self, reader: &mut R) -> Result<Oam, Self::Error> {
110 if imsg::mid(self.header) != id::OAM {
111 return Err(DidntRead);
112 }
113
114 let id: OamId = self.codec.read(&mut *reader)?;
116
117 let mut ext_qos = ext::QoSType::DEFAULT;
119 let mut ext_tstamp = None;
120
121 let mut has_ext = imsg::has_flag(self.header, flag::Z);
122 while has_ext {
123 let ext: u8 = self.codec.read(&mut *reader)?;
124 let eodec = Zenoh080Header::new(ext);
125 match iext::eid(ext) {
126 ext::QoS::ID => {
127 let (q, ext): (ext::QoSType, bool) = eodec.read(&mut *reader)?;
128 ext_qos = q;
129 has_ext = ext;
130 }
131 ext::Timestamp::ID => {
132 let (t, ext): (ext::TimestampType, bool) = eodec.read(&mut *reader)?;
133 ext_tstamp = Some(t);
134 has_ext = ext;
135 }
136 _ => {
137 has_ext = extension::skip(reader, "OAM", ext)?;
138 }
139 }
140 }
141
142 let body = match self.header & iext::ENC_MASK {
144 iext::ENC_UNIT => ZExtBody::Unit,
145 iext::ENC_Z64 => {
146 let u64: u64 = self.codec.read(&mut *reader)?;
147 ZExtBody::Z64(u64)
148 }
149 iext::ENC_ZBUF => {
150 let zbuf: ZBuf = self.codec.read(&mut *reader)?;
151 ZExtBody::ZBuf(zbuf)
152 }
153 _ => return Err(DidntRead),
154 };
155
156 Ok(Oam {
157 id,
158 body,
159 ext_qos,
160 ext_tstamp,
161 })
162 }
163}