zenoh_codec/transport/
batch.rs

1//
2// Copyright (c) 2023 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use core::num::NonZeroUsize;
15
16use zenoh_buffers::{
17    reader::{BacktrackableReader, DidntRead, Reader, SiphonableReader},
18    writer::{BacktrackableWriter, DidntWrite, Writer},
19    ZBufReader,
20};
21use zenoh_protocol::{
22    core::Reliability,
23    network::{NetworkMessageExt, NetworkMessageRef},
24    transport::{
25        Fragment, FragmentHeader, Frame, FrameHeader, TransportBody, TransportMessage, TransportSn,
26    },
27};
28
29use crate::{transport::frame::FrameReader, RCodec, WCodec, Zenoh080};
30
/// Kind of frame a batch is currently positioned on.
///
/// `None` means no frame is open. The codec impls in this file treat that
/// state as "a new frame header is required" before a bare network message
/// can be appended.
///
/// `PartialEq`/`Eq` are derived so the state can be compared directly,
/// matching what `BatchError` already offers.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[repr(u8)]
pub enum CurrentFrame {
    /// The open frame carries reliable messages.
    Reliable,
    /// The open frame carries best-effort messages.
    BestEffort,
    /// No frame is currently open.
    None,
}
38
39#[derive(Clone, Copy, Debug)]
40pub struct LatestSn {
41    pub reliable: Option<TransportSn>,
42    pub best_effort: Option<TransportSn>,
43}
44
45impl LatestSn {
46    const fn new() -> Self {
47        Self {
48            reliable: None,
49            best_effort: None,
50        }
51    }
52}
53
54#[derive(Clone, Debug)]
55pub struct Zenoh080Batch {
56    // The current frame being serialized: BestEffort/Reliable
57    pub current_frame: CurrentFrame,
58    // The latest SN
59    pub latest_sn: LatestSn,
60}
61
62impl Default for Zenoh080Batch {
63    fn default() -> Self {
64        Self::new()
65    }
66}
67
68impl Zenoh080Batch {
69    pub const fn new() -> Self {
70        Self {
71            current_frame: CurrentFrame::None,
72            latest_sn: LatestSn::new(),
73        }
74    }
75
76    pub fn clear(&mut self) {
77        self.current_frame = CurrentFrame::None;
78        self.latest_sn = LatestSn::new();
79    }
80}
81
/// Reasons a network message could not be appended to a batch.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[repr(u8)]
pub enum BatchError {
    /// The message does not fit the currently open frame (or no frame is
    /// open); a new frame header has to be written first.
    NewFrame,
    /// The underlying writer rejected the data; the writer was rewound.
    DidntWrite,
}
88
89impl<W> WCodec<&TransportMessage, &mut W> for &mut Zenoh080Batch
90where
91    W: Writer + BacktrackableWriter,
92    <W as BacktrackableWriter>::Mark: Copy,
93{
94    type Output = Result<(), DidntWrite>;
95
96    fn write(self, writer: &mut W, x: &TransportMessage) -> Self::Output {
97        // Mark the write operation
98        let mark = writer.mark();
99
100        let codec = Zenoh080::new();
101        codec.write(&mut *writer, x).map_err(|e| {
102            // Revert the write operation
103            writer.rewind(mark);
104            e
105        })?;
106
107        // Reset the current frame value
108        self.current_frame = CurrentFrame::None;
109
110        Ok(())
111    }
112}
113
114impl<W> WCodec<NetworkMessageRef<'_>, &mut W> for &mut Zenoh080Batch
115where
116    W: Writer + BacktrackableWriter,
117    <W as BacktrackableWriter>::Mark: Copy,
118{
119    type Output = Result<(), BatchError>;
120
121    fn write(self, writer: &mut W, x: NetworkMessageRef) -> Self::Output {
122        // Eventually update the current frame and sn based on the current status
123        if let (CurrentFrame::Reliable, false)
124        | (CurrentFrame::BestEffort, true)
125        | (CurrentFrame::None, _) = (self.current_frame, x.is_reliable())
126        {
127            // We are not serializing on the right frame.
128            return Err(BatchError::NewFrame);
129        }
130
131        // Mark the write operation
132        let mark = writer.mark();
133
134        let codec = Zenoh080::new();
135        codec.write(&mut *writer, x).map_err(|_| {
136            // Revert the write operation
137            writer.rewind(mark);
138            BatchError::DidntWrite
139        })
140    }
141}
142
143impl<W> WCodec<(NetworkMessageRef<'_>, &FrameHeader), &mut W> for &mut Zenoh080Batch
144where
145    W: Writer + BacktrackableWriter,
146    <W as BacktrackableWriter>::Mark: Copy,
147{
148    type Output = Result<(), BatchError>;
149
150    fn write(self, writer: &mut W, x: (NetworkMessageRef, &FrameHeader)) -> Self::Output {
151        let (m, f) = x;
152
153        if let (Reliability::Reliable, false) | (Reliability::BestEffort, true) =
154            (f.reliability, m.is_reliable())
155        {
156            // We are not serializing on the right frame.
157            return Err(BatchError::NewFrame);
158        }
159
160        // Mark the write operation
161        let mark = writer.mark();
162
163        let codec = Zenoh080::new();
164        // Write the frame header
165        codec.write(&mut *writer, f).map_err(|_| {
166            // Revert the write operation
167            writer.rewind(mark);
168            BatchError::DidntWrite
169        })?;
170        // Write the zenoh message
171        codec.write(&mut *writer, m).map_err(|_| {
172            // Revert the write operation
173            writer.rewind(mark);
174            BatchError::DidntWrite
175        })?;
176        // Update the frame
177        self.current_frame = match f.reliability {
178            Reliability::Reliable => {
179                self.latest_sn.reliable = Some(f.sn);
180                CurrentFrame::Reliable
181            }
182            Reliability::BestEffort => {
183                self.latest_sn.best_effort = Some(f.sn);
184                CurrentFrame::BestEffort
185            }
186        };
187        Ok(())
188    }
189}
190
191impl<W> WCodec<(&mut ZBufReader<'_>, &mut FragmentHeader), &mut W> for &mut Zenoh080Batch
192where
193    W: Writer + BacktrackableWriter,
194    <W as BacktrackableWriter>::Mark: Copy,
195{
196    type Output = Result<NonZeroUsize, DidntWrite>;
197
198    fn write(self, writer: &mut W, x: (&mut ZBufReader<'_>, &mut FragmentHeader)) -> Self::Output {
199        let (r, f) = x;
200
201        // Mark the buffer for the writing operation
202        let mark = writer.mark();
203
204        let codec = Zenoh080::new();
205        // Write the fragment header
206        codec.write(&mut *writer, &*f).map_err(|e| {
207            // Revert the write operation
208            writer.rewind(mark);
209            e
210        })?;
211
212        // Check if it is really the final fragment
213        if r.remaining() <= writer.remaining() {
214            // Revert the buffer
215            writer.rewind(mark);
216            // It is really the finally fragment, reserialize the header
217            f.more = false;
218            // Write the fragment header
219            codec.write(&mut *writer, &*f).map_err(|e| {
220                // Revert the write operation
221                writer.rewind(mark);
222                e
223            })?;
224        }
225
226        // Write the fragment
227        r.siphon(&mut *writer).map_err(|_| {
228            // Revert the write operation
229            writer.rewind(mark);
230            DidntWrite
231        })
232    }
233}
234
235impl<R> RCodec<TransportMessage, &mut R> for &mut Zenoh080Batch
236where
237    R: Reader + BacktrackableReader,
238{
239    type Error = DidntRead;
240
241    fn read(self, reader: &mut R) -> Result<TransportMessage, Self::Error> {
242        let codec = Zenoh080::new();
243        let x: TransportMessage = codec.read(reader)?;
244
245        match &x.body {
246            TransportBody::Frame(Frame {
247                reliability, sn, ..
248            })
249            | TransportBody::Fragment(Fragment {
250                reliability, sn, ..
251            }) => match reliability {
252                Reliability::Reliable => {
253                    self.current_frame = CurrentFrame::Reliable;
254                    self.latest_sn.reliable = Some(*sn);
255                }
256                Reliability::BestEffort => {
257                    self.current_frame = CurrentFrame::BestEffort;
258                    self.latest_sn.best_effort = Some(*sn);
259                }
260            },
261            _ => self.current_frame = CurrentFrame::None,
262        }
263
264        Ok(x)
265    }
266}
267
268impl<'a, R: BacktrackableReader> RCodec<FrameReader<'a, R>, &'a mut R> for &mut Zenoh080Batch {
269    type Error = DidntRead;
270
271    fn read(self, reader: &'a mut R) -> Result<FrameReader<'a, R>, Self::Error> {
272        let codec = Zenoh080::new();
273        let frame: FrameReader<R> = codec.read(reader)?;
274        match frame.reliability {
275            Reliability::Reliable => {
276                self.current_frame = CurrentFrame::Reliable;
277                self.latest_sn.reliable = Some(frame.sn);
278            }
279            Reliability::BestEffort => {
280                self.current_frame = CurrentFrame::BestEffort;
281                self.latest_sn.best_effort = Some(frame.sn);
282            }
283        }
284        Ok(frame)
285    }
286}