zenoh_codec/network/
request.rs

1//
2// Copyright (c) 2022 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use zenoh_buffers::{
15    reader::{DidntRead, Reader},
16    writer::{DidntWrite, Writer},
17};
18use zenoh_protocol::{
19    common::{iext, imsg},
20    core::WireExpr,
21    network::{
22        id,
23        request::{ext, flag},
24        Mapping, Request, RequestId,
25    },
26    zenoh::RequestBody,
27};
28
29use crate::{
30    common::extension, RCodec, WCodec, Zenoh080, Zenoh080Bounded, Zenoh080Condition, Zenoh080Header,
31};
32
33// Target
34impl<W> WCodec<(&ext::QueryTarget, bool), &mut W> for Zenoh080
35where
36    W: Writer,
37{
38    type Output = Result<(), DidntWrite>;
39
40    fn write(self, writer: &mut W, x: (&ext::QueryTarget, bool)) -> Self::Output {
41        let (x, more) = x;
42
43        let v = match x {
44            ext::QueryTarget::BestMatching => 0,
45            ext::QueryTarget::All => 1,
46            ext::QueryTarget::AllComplete => 2,
47        };
48        let ext = ext::Target::new(v);
49        self.write(&mut *writer, (&ext, more))
50    }
51}
52
53impl<R> RCodec<(ext::QueryTarget, bool), &mut R> for Zenoh080Header
54where
55    R: Reader,
56{
57    type Error = DidntRead;
58
59    fn read(self, reader: &mut R) -> Result<(ext::QueryTarget, bool), Self::Error> {
60        let (ext, more): (ext::Target, bool) = self.read(&mut *reader)?;
61        let rt = match ext.value {
62            0 => ext::QueryTarget::BestMatching,
63            1 => ext::QueryTarget::All,
64            2 => ext::QueryTarget::AllComplete,
65            _ => return Err(DidntRead),
66        };
67        Ok((rt, more))
68    }
69}
70
71impl<W> WCodec<&Request, &mut W> for Zenoh080
72where
73    W: Writer,
74{
75    type Output = Result<(), DidntWrite>;
76
77    fn write(self, writer: &mut W, x: &Request) -> Self::Output {
78        let Request {
79            id,
80            wire_expr,
81            ext_qos,
82            ext_tstamp,
83            ext_nodeid,
84            ext_target,
85            ext_budget,
86            ext_timeout,
87            payload,
88        } = x;
89
90        // Header
91        let mut header = id::REQUEST;
92        let mut n_exts = ((ext_qos != &ext::QoSType::DEFAULT) as u8)
93            + (ext_tstamp.is_some() as u8)
94            + ((ext_target != &ext::QueryTarget::DEFAULT) as u8)
95            + (ext_budget.is_some() as u8)
96            + (ext_timeout.is_some() as u8)
97            + ((ext_nodeid != &ext::NodeIdType::DEFAULT) as u8);
98        if n_exts != 0 {
99            header |= flag::Z;
100        }
101        if wire_expr.mapping != Mapping::DEFAULT {
102            header |= flag::M;
103        }
104        if wire_expr.has_suffix() {
105            header |= flag::N;
106        }
107        self.write(&mut *writer, header)?;
108
109        // Body
110        self.write(&mut *writer, id)?;
111        self.write(&mut *writer, wire_expr)?;
112
113        // Extensions
114        if ext_qos != &ext::QoSType::DEFAULT {
115            n_exts -= 1;
116            self.write(&mut *writer, (*ext_qos, n_exts != 0))?;
117        }
118        if let Some(ts) = ext_tstamp.as_ref() {
119            n_exts -= 1;
120            self.write(&mut *writer, (ts, n_exts != 0))?;
121        }
122        if ext_target != &ext::QueryTarget::DEFAULT {
123            n_exts -= 1;
124            self.write(&mut *writer, (ext_target, n_exts != 0))?;
125        }
126        if let Some(l) = ext_budget.as_ref() {
127            n_exts -= 1;
128            let e = ext::Budget::new(l.get() as u64);
129            self.write(&mut *writer, (&e, n_exts != 0))?;
130        }
131        if let Some(to) = ext_timeout.as_ref() {
132            n_exts -= 1;
133            let e = ext::Timeout::new(to.as_millis() as u64);
134            self.write(&mut *writer, (&e, n_exts != 0))?;
135        }
136        if ext_nodeid != &ext::NodeIdType::DEFAULT {
137            n_exts -= 1;
138            self.write(&mut *writer, (*ext_nodeid, n_exts != 0))?;
139        }
140
141        // Payload
142        self.write(&mut *writer, payload)?;
143
144        Ok(())
145    }
146}
147
148impl<R> RCodec<Request, &mut R> for Zenoh080
149where
150    R: Reader,
151{
152    type Error = DidntRead;
153
154    fn read(self, reader: &mut R) -> Result<Request, Self::Error> {
155        let header: u8 = self.read(&mut *reader)?;
156        let codec = Zenoh080Header::new(header);
157        codec.read(reader)
158    }
159}
160
161impl<R> RCodec<Request, &mut R> for Zenoh080Header
162where
163    R: Reader,
164{
165    type Error = DidntRead;
166
167    fn read(self, reader: &mut R) -> Result<Request, Self::Error> {
168        if imsg::mid(self.header) != id::REQUEST {
169            return Err(DidntRead);
170        }
171
172        // Body
173        let bodec = Zenoh080Bounded::<RequestId>::new();
174        let id: RequestId = bodec.read(&mut *reader)?;
175        let ccond = Zenoh080Condition::new(imsg::has_flag(self.header, flag::N));
176        let mut wire_expr: WireExpr<'static> = ccond.read(&mut *reader)?;
177        wire_expr.mapping = if imsg::has_flag(self.header, flag::M) {
178            Mapping::Sender
179        } else {
180            Mapping::Receiver
181        };
182
183        // Extensions
184        let mut ext_qos = ext::QoSType::DEFAULT;
185        let mut ext_tstamp = None;
186        let mut ext_nodeid = ext::NodeIdType::DEFAULT;
187        let mut ext_target = ext::QueryTarget::DEFAULT;
188        let mut ext_limit = None;
189        let mut ext_timeout = None;
190
191        let mut has_ext = imsg::has_flag(self.header, flag::Z);
192        while has_ext {
193            let ext: u8 = self.codec.read(&mut *reader)?;
194            let eodec = Zenoh080Header::new(ext);
195            match iext::eid(ext) {
196                ext::QoS::ID => {
197                    let (q, ext): (ext::QoSType, bool) = eodec.read(&mut *reader)?;
198                    ext_qos = q;
199                    has_ext = ext;
200                }
201                ext::Timestamp::ID => {
202                    let (t, ext): (ext::TimestampType, bool) = eodec.read(&mut *reader)?;
203                    ext_tstamp = Some(t);
204                    has_ext = ext;
205                }
206                ext::NodeId::ID => {
207                    let (nid, ext): (ext::NodeIdType, bool) = eodec.read(&mut *reader)?;
208                    ext_nodeid = nid;
209                    has_ext = ext;
210                }
211                ext::Target::ID => {
212                    let (rt, ext): (ext::QueryTarget, bool) = eodec.read(&mut *reader)?;
213                    ext_target = rt;
214                    has_ext = ext;
215                }
216                ext::Budget::ID => {
217                    let (l, ext): (ext::Budget, bool) = eodec.read(&mut *reader)?;
218                    ext_limit = ext::BudgetType::new(l.value as u32);
219                    has_ext = ext;
220                }
221                ext::Timeout::ID => {
222                    let (to, ext): (ext::Timeout, bool) = eodec.read(&mut *reader)?;
223                    ext_timeout = Some(ext::TimeoutType::from_millis(to.value));
224                    has_ext = ext;
225                }
226                _ => {
227                    has_ext = extension::skip(reader, "Request", ext)?;
228                }
229            }
230        }
231
232        // Payload
233        let payload: RequestBody = self.codec.read(&mut *reader)?;
234
235        Ok(Request {
236            id,
237            wire_expr,
238            payload,
239            ext_qos,
240            ext_tstamp,
241            ext_nodeid,
242            ext_target,
243            ext_budget: ext_limit,
244            ext_timeout,
245        })
246    }
247}