1use alloc::vec::Vec;
15
16use zenoh_buffers::{
17 reader::{DidntRead, Reader},
18 writer::{DidntWrite, Writer},
19 ZBuf,
20};
21use zenoh_protocol::{
22 common::{iext, imsg},
23 core::Encoding,
24 zenoh::{
25 id,
26 put::{ext, flag, Put},
27 },
28};
29
30#[cfg(not(feature = "shared-memory"))]
31use crate::Zenoh080Bounded;
32#[cfg(feature = "shared-memory")]
33use crate::Zenoh080Sliced;
34use crate::{common::extension, RCodec, WCodec, Zenoh080, Zenoh080Header};
35
36impl<W> WCodec<&Put, &mut W> for Zenoh080
37where
38 W: Writer,
39{
40 type Output = Result<(), DidntWrite>;
41
42 fn write(self, writer: &mut W, x: &Put) -> Self::Output {
43 let Put {
44 timestamp,
45 encoding,
46 ext_sinfo,
47 ext_attachment,
48 #[cfg(feature = "shared-memory")]
49 ext_shm,
50 ext_unknown,
51 payload,
52 }: &Put = x;
53
54 let mut header = id::PUT;
56 if timestamp.is_some() {
57 header |= flag::T;
58 }
59 if encoding != &Encoding::empty() {
60 header |= flag::E;
61 }
62 let mut n_exts = (ext_sinfo.is_some()) as u8
63 + (ext_attachment.is_some()) as u8
64 + (ext_unknown.len() as u8);
65 #[cfg(feature = "shared-memory")]
66 {
67 n_exts += ext_shm.is_some() as u8;
68 }
69 if n_exts != 0 {
70 header |= flag::Z;
71 }
72 self.write(&mut *writer, header)?;
73
74 if let Some(ts) = timestamp.as_ref() {
76 self.write(&mut *writer, ts)?;
77 }
78 if encoding != &Encoding::empty() {
79 self.write(&mut *writer, encoding)?;
80 }
81
82 if let Some(sinfo) = ext_sinfo.as_ref() {
84 n_exts -= 1;
85 self.write(&mut *writer, (sinfo, n_exts != 0))?;
86 }
87 #[cfg(feature = "shared-memory")]
88 if let Some(eshm) = ext_shm.as_ref() {
89 n_exts -= 1;
90 self.write(&mut *writer, (eshm, n_exts != 0))?;
91 }
92 if let Some(att) = ext_attachment.as_ref() {
93 n_exts -= 1;
94 self.write(&mut *writer, (att, n_exts != 0))?;
95 }
96 for u in ext_unknown.iter() {
97 n_exts -= 1;
98 self.write(&mut *writer, (u, n_exts != 0))?;
99 }
100
101 #[cfg(feature = "shared-memory")]
103 {
104 let codec = Zenoh080Sliced::<u32>::new(ext_shm.is_some());
105 codec.write(&mut *writer, payload)?;
106 }
107
108 #[cfg(not(feature = "shared-memory"))]
109 {
110 let bodec = Zenoh080Bounded::<u32>::new();
111 bodec.write(&mut *writer, payload)?;
112 }
113
114 Ok(())
115 }
116}
117
118impl<R> RCodec<Put, &mut R> for Zenoh080
119where
120 R: Reader,
121{
122 type Error = DidntRead;
123
124 fn read(self, reader: &mut R) -> Result<Put, Self::Error> {
125 let header: u8 = self.read(&mut *reader)?;
126 let codec = Zenoh080Header::new(header);
127 codec.read(reader)
128 }
129}
130
131impl<R> RCodec<Put, &mut R> for Zenoh080Header
132where
133 R: Reader,
134{
135 type Error = DidntRead;
136
137 fn read(self, reader: &mut R) -> Result<Put, Self::Error> {
138 if imsg::mid(self.header) != id::PUT {
139 return Err(DidntRead);
140 }
141
142 let mut timestamp: Option<uhlc::Timestamp> = None;
144 if imsg::has_flag(self.header, flag::T) {
145 timestamp = Some(self.codec.read(&mut *reader)?);
146 }
147
148 let mut encoding = Encoding::empty();
149 if imsg::has_flag(self.header, flag::E) {
150 encoding = self.codec.read(&mut *reader)?;
151 }
152
153 let mut ext_sinfo: Option<ext::SourceInfoType> = None;
155 #[cfg(feature = "shared-memory")]
156 let mut ext_shm: Option<ext::ShmType> = None;
157 let mut ext_attachment: Option<ext::AttachmentType> = None;
158 let mut ext_unknown = Vec::new();
159
160 let mut has_ext = imsg::has_flag(self.header, flag::Z);
161 while has_ext {
162 let ext: u8 = self.codec.read(&mut *reader)?;
163 let eodec = Zenoh080Header::new(ext);
164 match iext::eid(ext) {
165 ext::SourceInfo::ID => {
166 let (s, ext): (ext::SourceInfoType, bool) = eodec.read(&mut *reader)?;
167 ext_sinfo = Some(s);
168 has_ext = ext;
169 }
170 #[cfg(feature = "shared-memory")]
171 ext::Shm::ID => {
172 let (s, ext): (ext::ShmType, bool) = eodec.read(&mut *reader)?;
173 ext_shm = Some(s);
174 has_ext = ext;
175 }
176 ext::Attachment::ID => {
177 let (a, ext): (ext::AttachmentType, bool) = eodec.read(&mut *reader)?;
178 ext_attachment = Some(a);
179 has_ext = ext;
180 }
181 _ => {
182 let (u, ext) = extension::read(reader, "Put", ext)?;
183 ext_unknown.push(u);
184 has_ext = ext;
185 }
186 }
187 }
188
189 let payload: ZBuf = {
191 #[cfg(feature = "shared-memory")]
192 {
193 let codec = Zenoh080Sliced::<u32>::new(ext_shm.is_some());
194 codec.read(&mut *reader)?
195 }
196
197 #[cfg(not(feature = "shared-memory"))]
198 {
199 let bodec = Zenoh080Bounded::<u32>::new();
200 bodec.read(&mut *reader)?
201 }
202 };
203
204 Ok(Put {
205 timestamp,
206 encoding,
207 ext_sinfo,
208 #[cfg(feature = "shared-memory")]
209 ext_shm,
210 ext_attachment,
211 ext_unknown,
212 payload,
213 })
214 }
215}