Skip to main content

zenoh_codec/transport/
batch.rs

1//
2// Copyright (c) 2023 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use core::num::NonZeroUsize;
15
16use zenoh_buffers::{
17    reader::{BacktrackableReader, DidntRead, Reader, SiphonableReader},
18    writer::{BacktrackableWriter, DidntWrite, Writer},
19    ZBufReader,
20};
21use zenoh_protocol::{
22    core::Reliability,
23    network::{NetworkMessageExt, NetworkMessageRef},
24    transport::{
25        Fragment, FragmentHeader, Frame, FrameHeader, TransportBody, TransportMessage, TransportSn,
26    },
27};
28
29use crate::{transport::frame::FrameReader, RCodec, WCodec, Zenoh080};
30
/// Kind of frame a [`Zenoh080Batch`] currently has open, if any.
///
/// The discriminant is stored as a single byte (`repr(u8)`), in declaration
/// order.
#[repr(u8)]
#[derive(Debug, Clone, Copy)]
pub enum CurrentFrame {
    /// The current frame is a reliable one.
    Reliable,
    /// The current frame is a best-effort one.
    BestEffort,
    /// There is no current frame.
    None,
}
38
39#[derive(Clone, Copy, Debug)]
40pub struct LatestSn {
41    pub reliable: Option<TransportSn>,
42    pub best_effort: Option<TransportSn>,
43}
44
45impl LatestSn {
46    const fn new() -> Self {
47        Self {
48            reliable: None,
49            best_effort: None,
50        }
51    }
52}
53
54#[derive(Clone, Debug)]
55pub struct Zenoh080Batch {
56    // The current frame being serialized: BestEffort/Reliable
57    pub current_frame: CurrentFrame,
58    // The latest SN
59    pub latest_sn: LatestSn,
60}
61
62impl Default for Zenoh080Batch {
63    fn default() -> Self {
64        Self::new()
65    }
66}
67
68impl Zenoh080Batch {
69    pub const fn new() -> Self {
70        Self {
71            current_frame: CurrentFrame::None,
72            latest_sn: LatestSn::new(),
73        }
74    }
75
76    pub fn clear(&mut self) {
77        self.current_frame = CurrentFrame::None;
78        self.latest_sn = LatestSn::new();
79    }
80}
81
/// Reasons a message could not be appended to a batch.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
#[repr(u8)]
pub enum BatchError {
    /// The message does not belong on the current frame (or there is no
    /// current frame).
    NewFrame,
    /// The underlying write failed; the writer has been rewound.
    DidntWrite,
}
88
89impl<W> WCodec<&TransportMessage, &mut W> for &mut Zenoh080Batch
90where
91    W: Writer + BacktrackableWriter,
92    <W as BacktrackableWriter>::Mark: Copy,
93{
94    type Output = Result<(), DidntWrite>;
95
96    fn write(self, writer: &mut W, x: &TransportMessage) -> Self::Output {
97        // Mark the write operation
98        let mark = writer.mark();
99
100        let codec = Zenoh080::new();
101        codec.write(&mut *writer, x).map_err(|e| {
102            // Revert the write operation
103            writer.rewind(mark);
104            e
105        })?;
106
107        // Reset the current frame value
108        self.current_frame = CurrentFrame::None;
109
110        Ok(())
111    }
112}
113
114impl<W> WCodec<NetworkMessageRef<'_>, &mut W> for &mut Zenoh080Batch
115where
116    W: Writer + BacktrackableWriter,
117    <W as BacktrackableWriter>::Mark: Copy,
118{
119    type Output = Result<(), BatchError>;
120
121    #[inline(always)]
122    fn write(self, writer: &mut W, x: NetworkMessageRef) -> Self::Output {
123        // Eventually update the current frame and sn based on the current status
124        if let (CurrentFrame::Reliable, false)
125        | (CurrentFrame::BestEffort, true)
126        | (CurrentFrame::None, _) = (self.current_frame, x.is_reliable())
127        {
128            // We are not serializing on the right frame.
129            return Err(BatchError::NewFrame);
130        }
131
132        // Mark the write operation
133        let mark = writer.mark();
134
135        let codec = Zenoh080::new();
136        codec.write(&mut *writer, x).map_err(|_| {
137            // Revert the write operation
138            writer.rewind(mark);
139            BatchError::DidntWrite
140        })
141    }
142}
143
144impl<W> WCodec<(NetworkMessageRef<'_>, &FrameHeader), &mut W> for &mut Zenoh080Batch
145where
146    W: Writer + BacktrackableWriter,
147    <W as BacktrackableWriter>::Mark: Copy,
148{
149    type Output = Result<(), BatchError>;
150
151    fn write(self, writer: &mut W, x: (NetworkMessageRef, &FrameHeader)) -> Self::Output {
152        let (m, f) = x;
153
154        if let (Reliability::Reliable, false) | (Reliability::BestEffort, true) =
155            (f.reliability, m.is_reliable())
156        {
157            // We are not serializing on the right frame.
158            return Err(BatchError::NewFrame);
159        }
160
161        // Mark the write operation
162        let mark = writer.mark();
163
164        let codec = Zenoh080::new();
165        // Write the frame header
166        codec.write(&mut *writer, f).map_err(|_| {
167            // Revert the write operation
168            writer.rewind(mark);
169            BatchError::DidntWrite
170        })?;
171        // Write the zenoh message
172        codec.write(&mut *writer, m).map_err(|_| {
173            // Revert the write operation
174            writer.rewind(mark);
175            BatchError::DidntWrite
176        })?;
177        // Update the frame
178        self.current_frame = match f.reliability {
179            Reliability::Reliable => {
180                self.latest_sn.reliable = Some(f.sn);
181                CurrentFrame::Reliable
182            }
183            Reliability::BestEffort => {
184                self.latest_sn.best_effort = Some(f.sn);
185                CurrentFrame::BestEffort
186            }
187        };
188        Ok(())
189    }
190}
191
192impl<W> WCodec<(&mut ZBufReader<'_>, &mut FragmentHeader), &mut W> for &mut Zenoh080Batch
193where
194    W: Writer + BacktrackableWriter,
195    <W as BacktrackableWriter>::Mark: Copy,
196{
197    type Output = Result<NonZeroUsize, DidntWrite>;
198
199    fn write(self, writer: &mut W, x: (&mut ZBufReader<'_>, &mut FragmentHeader)) -> Self::Output {
200        let (r, f) = x;
201
202        // Mark the buffer for the writing operation
203        let mark = writer.mark();
204
205        let codec = Zenoh080::new();
206        // Write the fragment header
207        codec.write(&mut *writer, &*f).map_err(|e| {
208            // Revert the write operation
209            writer.rewind(mark);
210            e
211        })?;
212
213        // Check if it is really the final fragment
214        if r.remaining() <= writer.remaining() {
215            // Revert the buffer
216            writer.rewind(mark);
217            // It is really the finally fragment, reserialize the header
218            f.more = false;
219            // Write the fragment header
220            codec.write(&mut *writer, &*f).map_err(|e| {
221                // Revert the write operation
222                writer.rewind(mark);
223                e
224            })?;
225        }
226
227        // Write the fragment
228        r.siphon(&mut *writer).map_err(|_| {
229            // Revert the write operation
230            writer.rewind(mark);
231            DidntWrite
232        })
233    }
234}
235
236impl<R> RCodec<TransportMessage, &mut R> for &mut Zenoh080Batch
237where
238    R: Reader + BacktrackableReader,
239{
240    type Error = DidntRead;
241
242    fn read(self, reader: &mut R) -> Result<TransportMessage, Self::Error> {
243        let codec = Zenoh080::new();
244        let x: TransportMessage = codec.read(reader)?;
245
246        match &x.body {
247            TransportBody::Frame(Frame {
248                reliability, sn, ..
249            })
250            | TransportBody::Fragment(Fragment {
251                reliability, sn, ..
252            }) => match reliability {
253                Reliability::Reliable => {
254                    self.current_frame = CurrentFrame::Reliable;
255                    self.latest_sn.reliable = Some(*sn);
256                }
257                Reliability::BestEffort => {
258                    self.current_frame = CurrentFrame::BestEffort;
259                    self.latest_sn.best_effort = Some(*sn);
260                }
261            },
262            _ => self.current_frame = CurrentFrame::None,
263        }
264
265        Ok(x)
266    }
267}
268
269impl<'a, R: BacktrackableReader> RCodec<FrameReader<'a, R>, &'a mut R> for &mut Zenoh080Batch {
270    type Error = DidntRead;
271
272    fn read(self, reader: &'a mut R) -> Result<FrameReader<'a, R>, Self::Error> {
273        let codec = Zenoh080::new();
274        let frame: FrameReader<R> = codec.read(reader)?;
275        match frame.reliability {
276            Reliability::Reliable => {
277                self.current_frame = CurrentFrame::Reliable;
278                self.latest_sn.reliable = Some(frame.sn);
279            }
280            Reliability::BestEffort => {
281                self.current_frame = CurrentFrame::BestEffort;
282                self.latest_sn.best_effort = Some(frame.sn);
283            }
284        }
285        Ok(frame)
286    }
287}