1use core::num::NonZeroUsize;
15
16use zenoh_buffers::{
17 reader::{BacktrackableReader, DidntRead, Reader, SiphonableReader},
18 writer::{BacktrackableWriter, DidntWrite, Writer},
19 ZBufReader,
20};
21use zenoh_protocol::{
22 core::Reliability,
23 network::{NetworkMessageExt, NetworkMessageRef},
24 transport::{
25 Fragment, FragmentHeader, Frame, FrameHeader, TransportBody, TransportMessage, TransportSn,
26 },
27};
28
29use crate::{transport::frame::FrameReader, RCodec, WCodec, Zenoh080};
30
/// Kind of frame the batch is currently positioned in.
///
/// `None` means there is no open frame, so a network message cannot be
/// appended without first serialising a frame header.
#[repr(u8)]
#[derive(Debug, Clone, Copy)]
pub enum CurrentFrame {
    /// The open frame carries reliable messages.
    Reliable,
    /// The open frame carries best-effort messages.
    BestEffort,
    /// No frame is currently open.
    None,
}
38
39#[derive(Clone, Copy, Debug)]
40pub struct LatestSn {
41 pub reliable: Option<TransportSn>,
42 pub best_effort: Option<TransportSn>,
43}
44
45impl LatestSn {
46 const fn new() -> Self {
47 Self {
48 reliable: None,
49 best_effort: None,
50 }
51 }
52}
53
54#[derive(Clone, Debug)]
55pub struct Zenoh080Batch {
56 pub current_frame: CurrentFrame,
58 pub latest_sn: LatestSn,
60}
61
62impl Default for Zenoh080Batch {
63 fn default() -> Self {
64 Self::new()
65 }
66}
67
68impl Zenoh080Batch {
69 pub const fn new() -> Self {
70 Self {
71 current_frame: CurrentFrame::None,
72 latest_sn: LatestSn::new(),
73 }
74 }
75
76 pub fn clear(&mut self) {
77 self.current_frame = CurrentFrame::None;
78 self.latest_sn = LatestSn::new();
79 }
80}
81
/// Reasons why a network message could not be appended to a batch.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
#[repr(u8)]
pub enum BatchError {
    /// The message does not fit the currently open frame (or no frame is
    /// open); a new frame header has to be written first.
    NewFrame,
    /// The underlying writer rejected the data; the write has been rewound.
    DidntWrite,
}
88
89impl<W> WCodec<&TransportMessage, &mut W> for &mut Zenoh080Batch
90where
91 W: Writer + BacktrackableWriter,
92 <W as BacktrackableWriter>::Mark: Copy,
93{
94 type Output = Result<(), DidntWrite>;
95
96 fn write(self, writer: &mut W, x: &TransportMessage) -> Self::Output {
97 let mark = writer.mark();
99
100 let codec = Zenoh080::new();
101 codec.write(&mut *writer, x).map_err(|e| {
102 writer.rewind(mark);
104 e
105 })?;
106
107 self.current_frame = CurrentFrame::None;
109
110 Ok(())
111 }
112}
113
114impl<W> WCodec<NetworkMessageRef<'_>, &mut W> for &mut Zenoh080Batch
115where
116 W: Writer + BacktrackableWriter,
117 <W as BacktrackableWriter>::Mark: Copy,
118{
119 type Output = Result<(), BatchError>;
120
121 #[inline(always)]
122 fn write(self, writer: &mut W, x: NetworkMessageRef) -> Self::Output {
123 if let (CurrentFrame::Reliable, false)
125 | (CurrentFrame::BestEffort, true)
126 | (CurrentFrame::None, _) = (self.current_frame, x.is_reliable())
127 {
128 return Err(BatchError::NewFrame);
130 }
131
132 let mark = writer.mark();
134
135 let codec = Zenoh080::new();
136 codec.write(&mut *writer, x).map_err(|_| {
137 writer.rewind(mark);
139 BatchError::DidntWrite
140 })
141 }
142}
143
144impl<W> WCodec<(NetworkMessageRef<'_>, &FrameHeader), &mut W> for &mut Zenoh080Batch
145where
146 W: Writer + BacktrackableWriter,
147 <W as BacktrackableWriter>::Mark: Copy,
148{
149 type Output = Result<(), BatchError>;
150
151 fn write(self, writer: &mut W, x: (NetworkMessageRef, &FrameHeader)) -> Self::Output {
152 let (m, f) = x;
153
154 if let (Reliability::Reliable, false) | (Reliability::BestEffort, true) =
155 (f.reliability, m.is_reliable())
156 {
157 return Err(BatchError::NewFrame);
159 }
160
161 let mark = writer.mark();
163
164 let codec = Zenoh080::new();
165 codec.write(&mut *writer, f).map_err(|_| {
167 writer.rewind(mark);
169 BatchError::DidntWrite
170 })?;
171 codec.write(&mut *writer, m).map_err(|_| {
173 writer.rewind(mark);
175 BatchError::DidntWrite
176 })?;
177 self.current_frame = match f.reliability {
179 Reliability::Reliable => {
180 self.latest_sn.reliable = Some(f.sn);
181 CurrentFrame::Reliable
182 }
183 Reliability::BestEffort => {
184 self.latest_sn.best_effort = Some(f.sn);
185 CurrentFrame::BestEffort
186 }
187 };
188 Ok(())
189 }
190}
191
192impl<W> WCodec<(&mut ZBufReader<'_>, &mut FragmentHeader), &mut W> for &mut Zenoh080Batch
193where
194 W: Writer + BacktrackableWriter,
195 <W as BacktrackableWriter>::Mark: Copy,
196{
197 type Output = Result<NonZeroUsize, DidntWrite>;
198
199 fn write(self, writer: &mut W, x: (&mut ZBufReader<'_>, &mut FragmentHeader)) -> Self::Output {
200 let (r, f) = x;
201
202 let mark = writer.mark();
204
205 let codec = Zenoh080::new();
206 codec.write(&mut *writer, &*f).map_err(|e| {
208 writer.rewind(mark);
210 e
211 })?;
212
213 if r.remaining() <= writer.remaining() {
215 writer.rewind(mark);
217 f.more = false;
219 codec.write(&mut *writer, &*f).map_err(|e| {
221 writer.rewind(mark);
223 e
224 })?;
225 }
226
227 r.siphon(&mut *writer).map_err(|_| {
229 writer.rewind(mark);
231 DidntWrite
232 })
233 }
234}
235
236impl<R> RCodec<TransportMessage, &mut R> for &mut Zenoh080Batch
237where
238 R: Reader + BacktrackableReader,
239{
240 type Error = DidntRead;
241
242 fn read(self, reader: &mut R) -> Result<TransportMessage, Self::Error> {
243 let codec = Zenoh080::new();
244 let x: TransportMessage = codec.read(reader)?;
245
246 match &x.body {
247 TransportBody::Frame(Frame {
248 reliability, sn, ..
249 })
250 | TransportBody::Fragment(Fragment {
251 reliability, sn, ..
252 }) => match reliability {
253 Reliability::Reliable => {
254 self.current_frame = CurrentFrame::Reliable;
255 self.latest_sn.reliable = Some(*sn);
256 }
257 Reliability::BestEffort => {
258 self.current_frame = CurrentFrame::BestEffort;
259 self.latest_sn.best_effort = Some(*sn);
260 }
261 },
262 _ => self.current_frame = CurrentFrame::None,
263 }
264
265 Ok(x)
266 }
267}
268
269impl<'a, R: BacktrackableReader> RCodec<FrameReader<'a, R>, &'a mut R> for &mut Zenoh080Batch {
270 type Error = DidntRead;
271
272 fn read(self, reader: &'a mut R) -> Result<FrameReader<'a, R>, Self::Error> {
273 let codec = Zenoh080::new();
274 let frame: FrameReader<R> = codec.read(reader)?;
275 match frame.reliability {
276 Reliability::Reliable => {
277 self.current_frame = CurrentFrame::Reliable;
278 self.latest_sn.reliable = Some(frame.sn);
279 }
280 Reliability::BestEffort => {
281 self.current_frame = CurrentFrame::BestEffort;
282 self.latest_sn.best_effort = Some(frame.sn);
283 }
284 }
285 Ok(frame)
286 }
287}