zenoh_codec/zenoh/
query.rs

1//
2// Copyright (c) 2022 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use alloc::{string::String, vec::Vec};
15
16use zenoh_buffers::{
17    reader::{DidntRead, Reader},
18    writer::{DidntWrite, Writer},
19};
20use zenoh_protocol::{
21    common::{iext, imsg},
22    zenoh::{
23        id,
24        query::{ext, flag, ConsolidationMode, Query},
25    },
26};
27
28use crate::{common::extension, RCodec, WCodec, Zenoh080, Zenoh080Header};
29
30// Consolidation
31impl<W> WCodec<ConsolidationMode, &mut W> for Zenoh080
32where
33    W: Writer,
34{
35    type Output = Result<(), DidntWrite>;
36
37    fn write(self, writer: &mut W, x: ConsolidationMode) -> Self::Output {
38        let v: u64 = match x {
39            ConsolidationMode::Auto => 0,
40            ConsolidationMode::None => 1,
41            ConsolidationMode::Monotonic => 2,
42            ConsolidationMode::Latest => 3,
43        };
44        self.write(&mut *writer, v)
45    }
46}
47
48impl<R> RCodec<ConsolidationMode, &mut R> for Zenoh080
49where
50    R: Reader,
51{
52    type Error = DidntRead;
53
54    fn read(self, reader: &mut R) -> Result<ConsolidationMode, Self::Error> {
55        let v: u64 = self.read(&mut *reader)?;
56        let c = match v {
57            0 => ConsolidationMode::Auto,
58            1 => ConsolidationMode::None,
59            2 => ConsolidationMode::Monotonic,
60            3 => ConsolidationMode::Latest,
61            _ => ConsolidationMode::Auto, // Fallback on Auto if Consolidation is unknown
62        };
63        Ok(c)
64    }
65}
66
67impl<W> WCodec<&Query, &mut W> for Zenoh080
68where
69    W: Writer,
70{
71    type Output = Result<(), DidntWrite>;
72
73    fn write(self, writer: &mut W, x: &Query) -> Self::Output {
74        let Query {
75            consolidation,
76            parameters,
77            ext_sinfo,
78            ext_body,
79            ext_attachment,
80            ext_unknown,
81        } = x;
82
83        // Header
84        let mut header = id::QUERY;
85        if consolidation != &ConsolidationMode::DEFAULT {
86            header |= flag::C;
87        }
88        if !parameters.is_empty() {
89            header |= flag::P;
90        }
91        let mut n_exts = (ext_sinfo.is_some() as u8)
92            + (ext_body.is_some() as u8)
93            + (ext_attachment.is_some() as u8)
94            + (ext_unknown.len() as u8);
95        if n_exts != 0 {
96            header |= flag::Z;
97        }
98        self.write(&mut *writer, header)?;
99
100        // Body
101        if consolidation != &ConsolidationMode::DEFAULT {
102            self.write(&mut *writer, *consolidation)?;
103        }
104        if !parameters.is_empty() {
105            self.write(&mut *writer, parameters)?;
106        }
107
108        // Extensions
109        if let Some(sinfo) = ext_sinfo.as_ref() {
110            n_exts -= 1;
111            self.write(&mut *writer, (sinfo, n_exts != 0))?;
112        }
113        if let Some(body) = ext_body.as_ref() {
114            n_exts -= 1;
115            self.write(&mut *writer, (body, n_exts != 0))?;
116        }
117        if let Some(att) = ext_attachment.as_ref() {
118            n_exts -= 1;
119            self.write(&mut *writer, (att, n_exts != 0))?;
120        }
121        for u in ext_unknown.iter() {
122            n_exts -= 1;
123            self.write(&mut *writer, (u, n_exts != 0))?;
124        }
125
126        Ok(())
127    }
128}
129
130impl<R> RCodec<Query, &mut R> for Zenoh080
131where
132    R: Reader,
133{
134    type Error = DidntRead;
135
136    fn read(self, reader: &mut R) -> Result<Query, Self::Error> {
137        let header: u8 = self.read(&mut *reader)?;
138        let codec = Zenoh080Header::new(header);
139        codec.read(reader)
140    }
141}
142
143impl<R> RCodec<Query, &mut R> for Zenoh080Header
144where
145    R: Reader,
146{
147    type Error = DidntRead;
148
149    fn read(self, reader: &mut R) -> Result<Query, Self::Error> {
150        if imsg::mid(self.header) != id::QUERY {
151            return Err(DidntRead);
152        }
153
154        // Body
155        let mut consolidation = ConsolidationMode::DEFAULT;
156        if imsg::has_flag(self.header, flag::C) {
157            consolidation = self.codec.read(&mut *reader)?;
158        }
159
160        let mut parameters = String::new();
161        if imsg::has_flag(self.header, flag::P) {
162            parameters = self.codec.read(&mut *reader)?;
163        }
164
165        // Extensions
166        let mut ext_sinfo: Option<ext::SourceInfoType> = None;
167        let mut ext_body: Option<ext::QueryBodyType> = None;
168        let mut ext_attachment: Option<ext::AttachmentType> = None;
169        let mut ext_unknown = Vec::new();
170
171        let mut has_ext = imsg::has_flag(self.header, flag::Z);
172        while has_ext {
173            let ext: u8 = self.codec.read(&mut *reader)?;
174            let eodec = Zenoh080Header::new(ext);
175            match iext::eid(ext) {
176                ext::SourceInfo::ID => {
177                    let (s, ext): (ext::SourceInfoType, bool) = eodec.read(&mut *reader)?;
178                    ext_sinfo = Some(s);
179                    has_ext = ext;
180                }
181                ext::QueryBodyType::SID | ext::QueryBodyType::VID => {
182                    let (s, ext): (ext::QueryBodyType, bool) = eodec.read(&mut *reader)?;
183                    ext_body = Some(s);
184                    has_ext = ext;
185                }
186                ext::Attachment::ID => {
187                    let (a, ext): (ext::AttachmentType, bool) = eodec.read(&mut *reader)?;
188                    ext_attachment = Some(a);
189                    has_ext = ext;
190                }
191                _ => {
192                    let (u, ext) = extension::read(reader, "Query", ext)?;
193                    ext_unknown.push(u);
194                    has_ext = ext;
195                }
196            }
197        }
198
199        Ok(Query {
200            consolidation,
201            parameters,
202            ext_sinfo,
203            ext_body,
204            ext_attachment,
205            ext_unknown,
206        })
207    }
208}