1use core::num::NonZeroUsize;
15
16use zenoh_buffers::{
17 reader::{BacktrackableReader, DidntRead, Reader, SiphonableReader},
18 writer::{BacktrackableWriter, DidntWrite, Writer},
19 ZBufReader,
20};
21use zenoh_protocol::{
22 core::Reliability,
23 network::{NetworkMessageExt, NetworkMessageRef},
24 transport::{
25 Fragment, FragmentHeader, Frame, FrameHeader, TransportBody, TransportMessage, TransportSn,
26 },
27};
28
29use crate::{transport::frame::FrameReader, RCodec, WCodec, Zenoh080};
30
/// Kind of frame, if any, that a batch is currently positioned in.
///
/// The batch codec compares this against the reliability of a network message
/// to decide whether the message may be appended as-is or whether a new frame
/// header has to be written first.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[repr(u8)]
pub enum CurrentFrame {
    /// The last frame handled by the batch was reliable.
    Reliable,
    /// The last frame handled by the batch was best-effort.
    BestEffort,
    /// No frame is in progress.
    None,
}
38
39#[derive(Clone, Copy, Debug)]
40pub struct LatestSn {
41 pub reliable: Option<TransportSn>,
42 pub best_effort: Option<TransportSn>,
43}
44
45impl LatestSn {
46 const fn new() -> Self {
47 Self {
48 reliable: None,
49 best_effort: None,
50 }
51 }
52}
53
54#[derive(Clone, Debug)]
55pub struct Zenoh080Batch {
56 pub current_frame: CurrentFrame,
58 pub latest_sn: LatestSn,
60}
61
62impl Default for Zenoh080Batch {
63 fn default() -> Self {
64 Self::new()
65 }
66}
67
68impl Zenoh080Batch {
69 pub const fn new() -> Self {
70 Self {
71 current_frame: CurrentFrame::None,
72 latest_sn: LatestSn::new(),
73 }
74 }
75
76 pub fn clear(&mut self) {
77 self.current_frame = CurrentFrame::None;
78 self.latest_sn = LatestSn::new();
79 }
80}
81
/// Reasons why a network message could not be appended to a batch.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[repr(u8)]
pub enum BatchError {
    /// The message does not match the frame in progress (or there is none);
    /// a new frame header must be written first.
    NewFrame,
    /// The underlying writer rejected the data; the write was rewound.
    DidntWrite,
}
88
89impl<W> WCodec<&TransportMessage, &mut W> for &mut Zenoh080Batch
90where
91 W: Writer + BacktrackableWriter,
92 <W as BacktrackableWriter>::Mark: Copy,
93{
94 type Output = Result<(), DidntWrite>;
95
96 fn write(self, writer: &mut W, x: &TransportMessage) -> Self::Output {
97 let mark = writer.mark();
99
100 let codec = Zenoh080::new();
101 codec.write(&mut *writer, x).map_err(|e| {
102 writer.rewind(mark);
104 e
105 })?;
106
107 self.current_frame = CurrentFrame::None;
109
110 Ok(())
111 }
112}
113
114impl<W> WCodec<NetworkMessageRef<'_>, &mut W> for &mut Zenoh080Batch
115where
116 W: Writer + BacktrackableWriter,
117 <W as BacktrackableWriter>::Mark: Copy,
118{
119 type Output = Result<(), BatchError>;
120
121 fn write(self, writer: &mut W, x: NetworkMessageRef) -> Self::Output {
122 if let (CurrentFrame::Reliable, false)
124 | (CurrentFrame::BestEffort, true)
125 | (CurrentFrame::None, _) = (self.current_frame, x.is_reliable())
126 {
127 return Err(BatchError::NewFrame);
129 }
130
131 let mark = writer.mark();
133
134 let codec = Zenoh080::new();
135 codec.write(&mut *writer, x).map_err(|_| {
136 writer.rewind(mark);
138 BatchError::DidntWrite
139 })
140 }
141}
142
143impl<W> WCodec<(NetworkMessageRef<'_>, &FrameHeader), &mut W> for &mut Zenoh080Batch
144where
145 W: Writer + BacktrackableWriter,
146 <W as BacktrackableWriter>::Mark: Copy,
147{
148 type Output = Result<(), BatchError>;
149
150 fn write(self, writer: &mut W, x: (NetworkMessageRef, &FrameHeader)) -> Self::Output {
151 let (m, f) = x;
152
153 if let (Reliability::Reliable, false) | (Reliability::BestEffort, true) =
154 (f.reliability, m.is_reliable())
155 {
156 return Err(BatchError::NewFrame);
158 }
159
160 let mark = writer.mark();
162
163 let codec = Zenoh080::new();
164 codec.write(&mut *writer, f).map_err(|_| {
166 writer.rewind(mark);
168 BatchError::DidntWrite
169 })?;
170 codec.write(&mut *writer, m).map_err(|_| {
172 writer.rewind(mark);
174 BatchError::DidntWrite
175 })?;
176 self.current_frame = match f.reliability {
178 Reliability::Reliable => {
179 self.latest_sn.reliable = Some(f.sn);
180 CurrentFrame::Reliable
181 }
182 Reliability::BestEffort => {
183 self.latest_sn.best_effort = Some(f.sn);
184 CurrentFrame::BestEffort
185 }
186 };
187 Ok(())
188 }
189}
190
191impl<W> WCodec<(&mut ZBufReader<'_>, &mut FragmentHeader), &mut W> for &mut Zenoh080Batch
192where
193 W: Writer + BacktrackableWriter,
194 <W as BacktrackableWriter>::Mark: Copy,
195{
196 type Output = Result<NonZeroUsize, DidntWrite>;
197
198 fn write(self, writer: &mut W, x: (&mut ZBufReader<'_>, &mut FragmentHeader)) -> Self::Output {
199 let (r, f) = x;
200
201 let mark = writer.mark();
203
204 let codec = Zenoh080::new();
205 codec.write(&mut *writer, &*f).map_err(|e| {
207 writer.rewind(mark);
209 e
210 })?;
211
212 if r.remaining() <= writer.remaining() {
214 writer.rewind(mark);
216 f.more = false;
218 codec.write(&mut *writer, &*f).map_err(|e| {
220 writer.rewind(mark);
222 e
223 })?;
224 }
225
226 r.siphon(&mut *writer).map_err(|_| {
228 writer.rewind(mark);
230 DidntWrite
231 })
232 }
233}
234
235impl<R> RCodec<TransportMessage, &mut R> for &mut Zenoh080Batch
236where
237 R: Reader + BacktrackableReader,
238{
239 type Error = DidntRead;
240
241 fn read(self, reader: &mut R) -> Result<TransportMessage, Self::Error> {
242 let codec = Zenoh080::new();
243 let x: TransportMessage = codec.read(reader)?;
244
245 match &x.body {
246 TransportBody::Frame(Frame {
247 reliability, sn, ..
248 })
249 | TransportBody::Fragment(Fragment {
250 reliability, sn, ..
251 }) => match reliability {
252 Reliability::Reliable => {
253 self.current_frame = CurrentFrame::Reliable;
254 self.latest_sn.reliable = Some(*sn);
255 }
256 Reliability::BestEffort => {
257 self.current_frame = CurrentFrame::BestEffort;
258 self.latest_sn.best_effort = Some(*sn);
259 }
260 },
261 _ => self.current_frame = CurrentFrame::None,
262 }
263
264 Ok(x)
265 }
266}
267
268impl<'a, R: BacktrackableReader> RCodec<FrameReader<'a, R>, &'a mut R> for &mut Zenoh080Batch {
269 type Error = DidntRead;
270
271 fn read(self, reader: &'a mut R) -> Result<FrameReader<'a, R>, Self::Error> {
272 let codec = Zenoh080::new();
273 let frame: FrameReader<R> = codec.read(reader)?;
274 match frame.reliability {
275 Reliability::Reliable => {
276 self.current_frame = CurrentFrame::Reliable;
277 self.latest_sn.reliable = Some(frame.sn);
278 }
279 Reliability::BestEffort => {
280 self.current_frame = CurrentFrame::BestEffort;
281 self.latest_sn.best_effort = Some(frame.sn);
282 }
283 }
284 Ok(frame)
285 }
286}