zenoh_codec/network/
response.rs

1//
2// Copyright (c) 2022 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use zenoh_buffers::{
15    reader::{DidntRead, Reader},
16    writer::{DidntWrite, Writer},
17};
18use zenoh_protocol::{
19    common::{iext, imsg},
20    core::WireExpr,
21    network::{
22        id,
23        response::{ext, flag},
24        Mapping, RequestId, Response, ResponseFinal,
25    },
26    zenoh::ResponseBody,
27};
28
29use crate::{
30    common::extension, RCodec, WCodec, Zenoh080, Zenoh080Bounded, Zenoh080Condition, Zenoh080Header,
31};
32
33// Response
34impl<W> WCodec<&Response, &mut W> for Zenoh080
35where
36    W: Writer,
37{
38    type Output = Result<(), DidntWrite>;
39
40    fn write(self, writer: &mut W, x: &Response) -> Self::Output {
41        let Response {
42            rid,
43            wire_expr,
44            payload,
45            ext_qos,
46            ext_tstamp,
47            ext_respid,
48        } = x;
49
50        // Header
51        let mut header = id::RESPONSE;
52        let mut n_exts = ((ext_qos != &ext::QoSType::DEFAULT) as u8)
53            + (ext_tstamp.is_some() as u8)
54            + (ext_respid.is_some() as u8);
55        if n_exts != 0 {
56            header |= flag::Z;
57        }
58        if wire_expr.mapping != Mapping::DEFAULT {
59            header |= flag::M;
60        }
61        if wire_expr.has_suffix() {
62            header |= flag::N;
63        }
64        self.write(&mut *writer, header)?;
65
66        // Body
67        self.write(&mut *writer, rid)?;
68        self.write(&mut *writer, wire_expr)?;
69
70        // Extensions
71        if ext_qos != &ext::QoSType::DEFAULT {
72            n_exts -= 1;
73            self.write(&mut *writer, (*ext_qos, n_exts != 0))?;
74        }
75        if let Some(ts) = ext_tstamp.as_ref() {
76            n_exts -= 1;
77            self.write(&mut *writer, (ts, n_exts != 0))?;
78        }
79        if let Some(ri) = ext_respid.as_ref() {
80            n_exts -= 1;
81            self.write(&mut *writer, (ri, n_exts != 0))?;
82        }
83
84        // Payload
85        self.write(&mut *writer, payload)?;
86
87        Ok(())
88    }
89}
90
91impl<R> RCodec<Response, &mut R> for Zenoh080
92where
93    R: Reader,
94{
95    type Error = DidntRead;
96
97    fn read(self, reader: &mut R) -> Result<Response, Self::Error> {
98        let header: u8 = self.read(&mut *reader)?;
99        let codec = Zenoh080Header::new(header);
100        codec.read(reader)
101    }
102}
103
104impl<R> RCodec<Response, &mut R> for Zenoh080Header
105where
106    R: Reader,
107{
108    type Error = DidntRead;
109
110    fn read(self, reader: &mut R) -> Result<Response, Self::Error> {
111        if imsg::mid(self.header) != id::RESPONSE {
112            return Err(DidntRead);
113        }
114
115        // Body
116        let bodec = Zenoh080Bounded::<RequestId>::new();
117        let rid: RequestId = bodec.read(&mut *reader)?;
118        let ccond = Zenoh080Condition::new(imsg::has_flag(self.header, flag::N));
119        let mut wire_expr: WireExpr<'static> = ccond.read(&mut *reader)?;
120        wire_expr.mapping = if imsg::has_flag(self.header, flag::M) {
121            Mapping::Sender
122        } else {
123            Mapping::Receiver
124        };
125
126        // Extensions
127        let mut ext_qos = ext::QoSType::DEFAULT;
128        let mut ext_tstamp = None;
129        let mut ext_respid = None;
130
131        let mut has_ext = imsg::has_flag(self.header, flag::Z);
132        while has_ext {
133            let ext: u8 = self.codec.read(&mut *reader)?;
134            let eodec = Zenoh080Header::new(ext);
135            match iext::eid(ext) {
136                ext::QoS::ID => {
137                    let (q, ext): (ext::QoSType, bool) = eodec.read(&mut *reader)?;
138                    ext_qos = q;
139                    has_ext = ext;
140                }
141                ext::Timestamp::ID => {
142                    let (t, ext): (ext::TimestampType, bool) = eodec.read(&mut *reader)?;
143                    ext_tstamp = Some(t);
144                    has_ext = ext;
145                }
146                ext::ResponderId::ID => {
147                    let (t, ext): (ext::ResponderIdType, bool) = eodec.read(&mut *reader)?;
148                    ext_respid = Some(t);
149                    has_ext = ext;
150                }
151                _ => {
152                    has_ext = extension::skip(reader, "Response", ext)?;
153                }
154            }
155        }
156
157        // Payload
158        let payload: ResponseBody = self.codec.read(&mut *reader)?;
159
160        Ok(Response {
161            rid,
162            wire_expr,
163            payload,
164            ext_qos,
165            ext_tstamp,
166            ext_respid,
167        })
168    }
169}
170
171// ResponseFinal
172impl<W> WCodec<&ResponseFinal, &mut W> for Zenoh080
173where
174    W: Writer,
175{
176    type Output = Result<(), DidntWrite>;
177
178    fn write(self, writer: &mut W, x: &ResponseFinal) -> Self::Output {
179        let ResponseFinal {
180            rid,
181            ext_qos,
182            ext_tstamp,
183        } = x;
184
185        // Header
186        let mut header = id::RESPONSE_FINAL;
187        let mut n_exts = ((ext_qos != &ext::QoSType::DEFAULT) as u8) + (ext_tstamp.is_some() as u8);
188        if n_exts != 0 {
189            header |= flag::Z;
190        }
191        self.write(&mut *writer, header)?;
192
193        // Body
194        self.write(&mut *writer, rid)?;
195
196        // Extensions
197        if ext_qos != &ext::QoSType::DEFAULT {
198            n_exts -= 1;
199            self.write(&mut *writer, (*ext_qos, n_exts != 0))?;
200        }
201        if let Some(ts) = ext_tstamp.as_ref() {
202            n_exts -= 1;
203            self.write(&mut *writer, (ts, n_exts != 0))?;
204        }
205
206        Ok(())
207    }
208}
209
210impl<R> RCodec<ResponseFinal, &mut R> for Zenoh080
211where
212    R: Reader,
213{
214    type Error = DidntRead;
215
216    fn read(self, reader: &mut R) -> Result<ResponseFinal, Self::Error> {
217        let header: u8 = self.read(&mut *reader)?;
218        let codec = Zenoh080Header::new(header);
219        codec.read(reader)
220    }
221}
222
223impl<R> RCodec<ResponseFinal, &mut R> for Zenoh080Header
224where
225    R: Reader,
226{
227    type Error = DidntRead;
228
229    fn read(self, reader: &mut R) -> Result<ResponseFinal, Self::Error> {
230        if imsg::mid(self.header) != id::RESPONSE_FINAL {
231            return Err(DidntRead);
232        }
233
234        // Body
235        let bodec = Zenoh080Bounded::<RequestId>::new();
236        let rid: RequestId = bodec.read(&mut *reader)?;
237
238        // Extensions
239        let mut ext_qos = ext::QoSType::DEFAULT;
240        let mut ext_tstamp = None;
241
242        let mut has_ext = imsg::has_flag(self.header, flag::Z);
243        while has_ext {
244            let ext: u8 = self.codec.read(&mut *reader)?;
245            let eodec = Zenoh080Header::new(ext);
246            match iext::eid(ext) {
247                ext::QoS::ID => {
248                    let (q, ext): (ext::QoSType, bool) = eodec.read(&mut *reader)?;
249                    ext_qos = q;
250                    has_ext = ext;
251                }
252                ext::Timestamp::ID => {
253                    let (t, ext): (ext::TimestampType, bool) = eodec.read(&mut *reader)?;
254                    ext_tstamp = Some(t);
255                    has_ext = ext;
256                }
257                _ => {
258                    has_ext = extension::skip(reader, "ResponseFinal", ext)?;
259                }
260            }
261        }
262
263        Ok(ResponseFinal {
264            rid,
265            ext_qos,
266            ext_tstamp,
267        })
268    }
269}