zenoh_codec/transport/
mod.rs

1//
2// Copyright (c) 2023 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14pub mod batch;
15mod close;
16mod fragment;
17pub mod frame;
18mod init;
19mod join;
20mod keepalive;
21mod oam;
22mod open;
23
24use zenoh_buffers::{
25    reader::{BacktrackableReader, DidntRead, Reader},
26    writer::{DidntWrite, Writer},
27};
28use zenoh_protocol::{
29    common::{imsg, ZExtZ64},
30    network::NetworkMessage,
31    transport::*,
32};
33
34use crate::{RCodec, WCodec, Zenoh080, Zenoh080Header};
35
36// TransportMessageLowLatency
37impl<W> WCodec<TransportMessageLowLatencyRef<'_>, &mut W> for Zenoh080
38where
39    W: Writer,
40{
41    type Output = Result<(), DidntWrite>;
42
43    fn write(self, writer: &mut W, x: TransportMessageLowLatencyRef) -> Self::Output {
44        let TransportMessageLowLatencyRef { body } = x;
45
46        match body {
47            TransportBodyLowLatencyRef::Network(b) => self.write(&mut *writer, b),
48            TransportBodyLowLatencyRef::KeepAlive(b) => self.write(&mut *writer, &b),
49            TransportBodyLowLatencyRef::Close(b) => self.write(&mut *writer, &b),
50        }
51    }
52}
53
54impl<R> RCodec<TransportMessageLowLatency, &mut R> for Zenoh080
55where
56    R: Reader + BacktrackableReader,
57{
58    type Error = DidntRead;
59
60    fn read(self, reader: &mut R) -> Result<TransportMessageLowLatency, Self::Error> {
61        let header: u8 = self.read(&mut *reader)?;
62
63        let codec = Zenoh080Header::new(header);
64        let body = match imsg::mid(codec.header) {
65            id::KEEP_ALIVE => TransportBodyLowLatency::KeepAlive(codec.read(&mut *reader)?),
66            id::CLOSE => TransportBodyLowLatency::Close(codec.read(&mut *reader)?),
67            _ => {
68                let nw: NetworkMessage = codec.read(&mut *reader)?;
69                TransportBodyLowLatency::Network(nw)
70            }
71        };
72
73        Ok(TransportMessageLowLatency { body })
74    }
75}
76
77// TransportMessage
78impl<W> WCodec<&TransportMessage, &mut W> for Zenoh080
79where
80    W: Writer,
81{
82    type Output = Result<(), DidntWrite>;
83
84    fn write(self, writer: &mut W, x: &TransportMessage) -> Self::Output {
85        let TransportMessage { body, .. } = x;
86
87        match body {
88            TransportBody::Frame(b) => self.write(&mut *writer, b),
89            TransportBody::Fragment(b) => self.write(&mut *writer, b),
90            TransportBody::KeepAlive(b) => self.write(&mut *writer, b),
91            TransportBody::InitSyn(b) => self.write(&mut *writer, b),
92            TransportBody::InitAck(b) => self.write(&mut *writer, b),
93            TransportBody::OpenSyn(b) => self.write(&mut *writer, b),
94            TransportBody::OpenAck(b) => self.write(&mut *writer, b),
95            TransportBody::Close(b) => self.write(&mut *writer, b),
96            TransportBody::OAM(b) => self.write(&mut *writer, b),
97            TransportBody::Join(b) => self.write(&mut *writer, b),
98        }
99    }
100}
101
102impl<R> RCodec<TransportMessage, &mut R> for Zenoh080
103where
104    R: Reader + BacktrackableReader,
105{
106    type Error = DidntRead;
107
108    fn read(self, reader: &mut R) -> Result<TransportMessage, Self::Error> {
109        let header: u8 = self.read(&mut *reader)?;
110
111        let codec = Zenoh080Header::new(header);
112        let body = match imsg::mid(codec.header) {
113            id::FRAME => TransportBody::Frame(codec.read(&mut *reader)?),
114            id::FRAGMENT => TransportBody::Fragment(codec.read(&mut *reader)?),
115            id::KEEP_ALIVE => TransportBody::KeepAlive(codec.read(&mut *reader)?),
116            id::INIT => {
117                if !imsg::has_flag(codec.header, zenoh_protocol::transport::init::flag::A) {
118                    TransportBody::InitSyn(codec.read(&mut *reader)?)
119                } else {
120                    TransportBody::InitAck(codec.read(&mut *reader)?)
121                }
122            }
123            id::OPEN => {
124                if !imsg::has_flag(codec.header, zenoh_protocol::transport::open::flag::A) {
125                    TransportBody::OpenSyn(codec.read(&mut *reader)?)
126                } else {
127                    TransportBody::OpenAck(codec.read(&mut *reader)?)
128                }
129            }
130            id::CLOSE => TransportBody::Close(codec.read(&mut *reader)?),
131            id::OAM => TransportBody::OAM(codec.read(&mut *reader)?),
132            id::JOIN => TransportBody::Join(codec.read(&mut *reader)?),
133            _ => return Err(DidntRead),
134        };
135
136        Ok(body.into())
137    }
138}
139
140// Extensions: QoS
141impl<W, const ID: u8> WCodec<(ext::QoSType<{ ID }>, bool), &mut W> for Zenoh080
142where
143    W: Writer,
144{
145    type Output = Result<(), DidntWrite>;
146
147    fn write(self, writer: &mut W, x: (ext::QoSType<{ ID }>, bool)) -> Self::Output {
148        let (x, more) = x;
149        let ext: ZExtZ64<{ ID }> = x.into();
150
151        self.write(&mut *writer, (&ext, more))
152    }
153}
154
155impl<R, const ID: u8> RCodec<(ext::QoSType<{ ID }>, bool), &mut R> for Zenoh080
156where
157    R: Reader,
158{
159    type Error = DidntRead;
160
161    fn read(self, reader: &mut R) -> Result<(ext::QoSType<{ ID }>, bool), Self::Error> {
162        let header: u8 = self.read(&mut *reader)?;
163        let codec = Zenoh080Header::new(header);
164        codec.read(reader)
165    }
166}
167
168impl<R, const ID: u8> RCodec<(ext::QoSType<{ ID }>, bool), &mut R> for Zenoh080Header
169where
170    R: Reader,
171{
172    type Error = DidntRead;
173
174    fn read(self, reader: &mut R) -> Result<(ext::QoSType<{ ID }>, bool), Self::Error> {
175        let (ext, more): (ZExtZ64<{ ID }>, bool) = self.read(&mut *reader)?;
176        Ok((ext.into(), more))
177    }
178}
179
180// Extensions: Patch
181impl<W, const ID: u8> WCodec<(ext::PatchType<{ ID }>, bool), &mut W> for Zenoh080
182where
183    W: Writer,
184{
185    type Output = Result<(), DidntWrite>;
186
187    fn write(self, writer: &mut W, x: (ext::PatchType<{ ID }>, bool)) -> Self::Output {
188        let (x, more) = x;
189        let ext: ZExtZ64<{ ID }> = x.into();
190
191        self.write(&mut *writer, (&ext, more))
192    }
193}
194
195impl<R, const ID: u8> RCodec<(ext::PatchType<{ ID }>, bool), &mut R> for Zenoh080
196where
197    R: Reader,
198{
199    type Error = DidntRead;
200
201    fn read(self, reader: &mut R) -> Result<(ext::PatchType<{ ID }>, bool), Self::Error> {
202        let header: u8 = self.read(&mut *reader)?;
203        let codec = Zenoh080Header::new(header);
204        codec.read(reader)
205    }
206}
207
208impl<R, const ID: u8> RCodec<(ext::PatchType<{ ID }>, bool), &mut R> for Zenoh080Header
209where
210    R: Reader,
211{
212    type Error = DidntRead;
213
214    fn read(self, reader: &mut R) -> Result<(ext::PatchType<{ ID }>, bool), Self::Error> {
215        let (ext, more): (ZExtZ64<{ ID }>, bool) = self.read(&mut *reader)?;
216        Ok((ext.into(), more))
217    }
218}