zenoh_codec/zenoh/
reply.rs

1//
2// Copyright (c) 2022 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use alloc::vec::Vec;
15
16use zenoh_buffers::{
17    reader::{DidntRead, Reader},
18    writer::{DidntWrite, Writer},
19};
20use zenoh_protocol::{
21    common::imsg,
22    zenoh::{
23        id,
24        query::ConsolidationMode,
25        reply::{flag, Reply, ReplyBody},
26    },
27};
28
29use crate::{common::extension, RCodec, WCodec, Zenoh080, Zenoh080Header};
30
31impl<W> WCodec<&Reply, &mut W> for Zenoh080
32where
33    W: Writer,
34{
35    type Output = Result<(), DidntWrite>;
36
37    fn write(self, writer: &mut W, x: &Reply) -> Self::Output {
38        let Reply {
39            consolidation,
40            ext_unknown,
41            payload,
42        } = x;
43
44        // Header
45        let mut header = id::REPLY;
46        if consolidation != &ConsolidationMode::DEFAULT {
47            header |= flag::C;
48        }
49        let mut n_exts = ext_unknown.len() as u8;
50        if n_exts != 0 {
51            header |= flag::Z;
52        }
53        self.write(&mut *writer, header)?;
54
55        // Body
56        if consolidation != &ConsolidationMode::DEFAULT {
57            self.write(&mut *writer, *consolidation)?;
58        }
59
60        // Extensions
61        for u in ext_unknown.iter() {
62            n_exts -= 1;
63            self.write(&mut *writer, (u, n_exts != 0))?;
64        }
65
66        // Payload
67        self.write(&mut *writer, payload)?;
68
69        Ok(())
70    }
71}
72
73impl<R> RCodec<Reply, &mut R> for Zenoh080
74where
75    R: Reader,
76{
77    type Error = DidntRead;
78
79    fn read(self, reader: &mut R) -> Result<Reply, Self::Error> {
80        let header: u8 = self.read(&mut *reader)?;
81        let codec = Zenoh080Header::new(header);
82        codec.read(reader)
83    }
84}
85
86impl<R> RCodec<Reply, &mut R> for Zenoh080Header
87where
88    R: Reader,
89{
90    type Error = DidntRead;
91
92    fn read(self, reader: &mut R) -> Result<Reply, Self::Error> {
93        if imsg::mid(self.header) != id::REPLY {
94            return Err(DidntRead);
95        }
96
97        // Body
98        let mut consolidation = ConsolidationMode::DEFAULT;
99        if imsg::has_flag(self.header, flag::C) {
100            consolidation = self.codec.read(&mut *reader)?;
101        }
102
103        // Extensions
104        let mut ext_unknown = Vec::new();
105
106        let mut has_ext = imsg::has_flag(self.header, flag::Z);
107        while has_ext {
108            let ext: u8 = self.codec.read(&mut *reader)?;
109            let (u, ext) = extension::read(reader, "Reply", ext)?;
110            ext_unknown.push(u);
111            has_ext = ext;
112        }
113
114        // Payload
115        let payload: ReplyBody = self.codec.read(&mut *reader)?;
116
117        Ok(Reply {
118            consolidation,
119            ext_unknown,
120            payload,
121        })
122    }
123}