zenoh_codec/network/
interest.rs1use zenoh_buffers::{
15 reader::{DidntRead, Reader},
16 writer::{DidntWrite, Writer},
17};
18use zenoh_protocol::{
19 common::{
20 iext,
21 imsg::{self, HEADER_BITS},
22 },
23 core::WireExpr,
24 network::{
25 declare, id,
26 interest::{self, Interest, InterestMode, InterestOptions},
27 Mapping,
28 },
29};
30
31use crate::{common::extension, RCodec, WCodec, Zenoh080, Zenoh080Condition, Zenoh080Header};
32
33impl<W> WCodec<&Interest, &mut W> for Zenoh080
35where
36 W: Writer,
37{
38 type Output = Result<(), DidntWrite>;
39
40 fn write(self, writer: &mut W, x: &Interest) -> Self::Output {
41 let Interest {
42 id,
43 mode,
44 options: _, wire_expr,
46 ext_qos,
47 ext_tstamp,
48 ext_nodeid,
49 } = x;
50
51 let mut header = id::INTEREST;
53 header |= match mode {
54 InterestMode::Final => 0b00,
55 InterestMode::Current => 0b01,
56 InterestMode::Future => 0b10,
57 InterestMode::CurrentFuture => 0b11,
58 } << HEADER_BITS;
59 let mut n_exts = ((ext_qos != &declare::ext::QoSType::DEFAULT) as u8)
60 + (ext_tstamp.is_some() as u8)
61 + ((ext_nodeid != &declare::ext::NodeIdType::DEFAULT) as u8);
62 if n_exts != 0 {
63 header |= declare::flag::Z;
64 }
65 self.write(&mut *writer, header)?;
66
67 self.write(&mut *writer, id)?;
68
69 if *mode != InterestMode::Final {
70 self.write(&mut *writer, x.options())?;
71 if let Some(we) = wire_expr.as_ref() {
72 self.write(&mut *writer, we)?;
73 }
74 }
75
76 if ext_qos != &declare::ext::QoSType::DEFAULT {
78 n_exts -= 1;
79 self.write(&mut *writer, (*ext_qos, n_exts != 0))?;
80 }
81 if let Some(ts) = ext_tstamp.as_ref() {
82 n_exts -= 1;
83 self.write(&mut *writer, (ts, n_exts != 0))?;
84 }
85 if ext_nodeid != &declare::ext::NodeIdType::DEFAULT {
86 n_exts -= 1;
87 self.write(&mut *writer, (*ext_nodeid, n_exts != 0))?;
88 }
89
90 Ok(())
91 }
92}
93
94impl<R> RCodec<Interest, &mut R> for Zenoh080
95where
96 R: Reader,
97{
98 type Error = DidntRead;
99
100 fn read(self, reader: &mut R) -> Result<Interest, Self::Error> {
101 let header: u8 = self.read(&mut *reader)?;
102 let codec = Zenoh080Header::new(header);
103
104 codec.read(reader)
105 }
106}
107
108impl<R> RCodec<Interest, &mut R> for Zenoh080Header
109where
110 R: Reader,
111{
112 type Error = DidntRead;
113
114 fn read(self, reader: &mut R) -> Result<Interest, Self::Error> {
115 if imsg::mid(self.header) != id::INTEREST {
116 return Err(DidntRead);
117 }
118
119 let id = self.codec.read(&mut *reader)?;
120 let mode = match (self.header >> HEADER_BITS) & 0b11 {
121 0b00 => InterestMode::Final,
122 0b01 => InterestMode::Current,
123 0b10 => InterestMode::Future,
124 0b11 => InterestMode::CurrentFuture,
125 _ => return Err(DidntRead),
126 };
127
128 let mut options = InterestOptions::empty();
129 let mut wire_expr = None;
130 if mode != InterestMode::Final {
131 let options_byte: u8 = self.codec.read(&mut *reader)?;
132 options = InterestOptions::from(options_byte);
133 if options.restricted() {
134 let ccond = Zenoh080Condition::new(options.named());
135 let mut we: WireExpr<'static> = ccond.read(&mut *reader)?;
136 we.mapping = if options.mapping() {
137 Mapping::Sender
138 } else {
139 Mapping::Receiver
140 };
141 wire_expr = Some(we);
142 }
143 }
144
145 let mut ext_qos = declare::ext::QoSType::DEFAULT;
147 let mut ext_tstamp = None;
148 let mut ext_nodeid = declare::ext::NodeIdType::DEFAULT;
149
150 let mut has_ext = imsg::has_flag(self.header, declare::flag::Z);
151 while has_ext {
152 let ext: u8 = self.codec.read(&mut *reader)?;
153 let eodec = Zenoh080Header::new(ext);
154 match iext::eid(ext) {
155 declare::ext::QoS::ID => {
156 let (q, ext): (interest::ext::QoSType, bool) = eodec.read(&mut *reader)?;
157 ext_qos = q;
158 has_ext = ext;
159 }
160 declare::ext::Timestamp::ID => {
161 let (t, ext): (interest::ext::TimestampType, bool) =
162 eodec.read(&mut *reader)?;
163 ext_tstamp = Some(t);
164 has_ext = ext;
165 }
166 declare::ext::NodeId::ID => {
167 let (nid, ext): (interest::ext::NodeIdType, bool) = eodec.read(&mut *reader)?;
168 ext_nodeid = nid;
169 has_ext = ext;
170 }
171 _ => {
172 has_ext = extension::skip(reader, "Declare", ext)?;
173 }
174 }
175 }
176
177 Ok(Interest {
178 id,
179 mode,
180 options,
181 wire_expr,
182 ext_qos,
183 ext_tstamp,
184 ext_nodeid,
185 })
186 }
187}