use core::num::NonZeroUsize;
use zenoh_buffers::{
reader::{BacktrackableReader, DidntRead, Reader, SiphonableReader},
writer::{BacktrackableWriter, DidntWrite, Writer},
ZBufReader,
};
use zenoh_protocol::{
core::Reliability,
network::NetworkMessage,
transport::{
Fragment, FragmentHeader, Frame, FrameHeader, TransportBody, TransportMessage, TransportSn,
},
};
use crate::{RCodec, WCodec, Zenoh080};
/// Reliability of the frame a [`Zenoh080Batch`] currently considers "open".
///
/// The batch consults this value before appending a bare network message: the
/// message is only accepted when a frame of matching reliability is current.
///
/// `PartialEq`/`Eq` are derived so the state can be compared directly
/// (`batch.current_frame == CurrentFrame::None`) instead of via pattern matching.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[repr(u8)]
pub enum CurrentFrame {
    /// The most recently recorded frame (or, on read, fragment) was reliable.
    Reliable,
    /// The most recently recorded frame (or, on read, fragment) was best-effort.
    BestEffort,
    /// No frame is current: the state of a new or cleared batch, and the state
    /// after a transport message is written through the batch or a message that
    /// is neither a frame nor a fragment is read.
    None,
}
/// Latest sequence numbers recorded by a batch, kept separately per reliability.
#[derive(Debug, Clone, Copy)]
pub struct LatestSn {
    /// Sequence number of the last reliable frame/fragment recorded, if any.
    pub reliable: Option<TransportSn>,
    /// Sequence number of the last best-effort frame/fragment recorded, if any.
    pub best_effort: Option<TransportSn>,
}
impl LatestSn {
    /// Creates a tracker with no sequence number recorded for either reliability.
    const fn new() -> Self {
        let reliable = None;
        let best_effort = None;
        Self {
            reliable,
            best_effort,
        }
    }
}
/// Stateful codec used while serializing into, or deserializing from, a batch.
///
/// It remembers which kind of frame is currently open and the latest sequence
/// number recorded per reliability, so that a network message can be appended
/// to the current frame only when its reliability matches.
#[derive(Debug, Clone)]
pub struct Zenoh080Batch {
    /// Kind of frame currently open, or [`CurrentFrame::None`].
    pub current_frame: CurrentFrame,
    /// Latest sequence numbers recorded, per reliability.
    pub latest_sn: LatestSn,
}
impl Default for Zenoh080Batch {
fn default() -> Self {
Self::new()
}
}
impl Zenoh080Batch {
    /// Creates a batch codec with no open frame and no sequence number recorded.
    pub const fn new() -> Self {
        let latest_sn = LatestSn::new();
        let current_frame = CurrentFrame::None;
        Self {
            current_frame,
            latest_sn,
        }
    }

    /// Resets the codec to the state produced by [`Zenoh080Batch::new`].
    pub fn clear(&mut self) {
        // Both fields are reset, so overwriting the whole value is equivalent.
        *self = Self::new();
    }
}
/// Reasons a network message could not be appended through [`Zenoh080Batch`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[repr(u8)]
pub enum BatchError {
    /// The message cannot follow the current frame (none is open, or the
    /// reliability differs); nothing was written.
    NewFrame,
    /// The underlying codec failed to write; the writer was rewound.
    DidntWrite,
}
/// Serializes a whole transport message through the batch codec.
impl<W> WCodec<&TransportMessage, &mut W> for &mut Zenoh080Batch
where
    W: Writer + BacktrackableWriter,
    <W as BacktrackableWriter>::Mark: Copy,
{
    type Output = Result<(), DidntWrite>;

    /// Writes `x` into `writer`.
    ///
    /// On success the batch no longer has an open frame
    /// (`current_frame` becomes [`CurrentFrame::None`]).
    ///
    /// # Errors
    ///
    /// If the underlying codec fails, the writer is rewound to where it stood
    /// before the call and the codec's error is returned; the batch state is
    /// left untouched.
    fn write(self, writer: &mut W, x: &TransportMessage) -> Self::Output {
        // Remember where we started so a failed write leaves no partial bytes.
        let checkpoint = writer.mark();

        let zcodec = Zenoh080::new();
        let outcome = zcodec.write(&mut *writer, x);
        if outcome.is_err() {
            writer.rewind(checkpoint);
        }
        outcome?;

        // A transport message closes whatever frame was open.
        self.current_frame = CurrentFrame::None;
        Ok(())
    }
}
/// Appends a network message to the frame that is currently open in the batch.
impl<W> WCodec<&NetworkMessage, &mut W> for &mut Zenoh080Batch
where
    W: Writer + BacktrackableWriter,
    <W as BacktrackableWriter>::Mark: Copy,
{
    type Output = Result<(), BatchError>;

    /// Writes `x` into `writer` without emitting a frame header.
    ///
    /// # Errors
    ///
    /// * [`BatchError::NewFrame`] when no frame is open or the open frame's
    ///   reliability differs from the message's; nothing is written.
    /// * [`BatchError::DidntWrite`] when the underlying codec fails; the writer
    ///   is rewound to where it stood before the call.
    fn write(self, writer: &mut W, x: &NetworkMessage) -> Self::Output {
        // Only these two combinations allow the message to join the open frame.
        let reliable = x.is_reliable();
        let fits_open_frame = matches!(
            (self.current_frame, reliable),
            (CurrentFrame::Reliable, true) | (CurrentFrame::BestEffort, false)
        );
        if !fits_open_frame {
            return Err(BatchError::NewFrame);
        }

        let checkpoint = writer.mark();
        let zcodec = Zenoh080::new();
        match zcodec.write(&mut *writer, x) {
            Ok(done) => Ok(done),
            Err(_) => {
                // Drop any partially written bytes.
                writer.rewind(checkpoint);
                Err(BatchError::DidntWrite)
            }
        }
    }
}
/// Opens a new frame: writes a frame header followed by its first network message.
impl<W> WCodec<(&NetworkMessage, &FrameHeader), &mut W> for &mut Zenoh080Batch
where
    W: Writer + BacktrackableWriter,
    <W as BacktrackableWriter>::Mark: Copy,
{
    type Output = Result<(), BatchError>;

    /// Writes the header and then the message into `writer`.
    ///
    /// On success the batch records the header's reliability as the current
    /// frame and stores the header's sequence number in the matching
    /// `latest_sn` slot.
    ///
    /// # Errors
    ///
    /// * [`BatchError::NewFrame`] when the header's reliability does not match
    ///   the message's; nothing is written.
    /// * [`BatchError::DidntWrite`] when either write fails; the writer is
    ///   rewound to where it stood before the call, so no lone header is left
    ///   behind. The batch state is not modified.
    fn write(self, writer: &mut W, x: (&NetworkMessage, &FrameHeader)) -> Self::Output {
        let (msg, header) = x;

        let msg_reliable = msg.is_reliable();
        let mismatch = match header.reliability {
            Reliability::Reliable => !msg_reliable,
            Reliability::BestEffort => msg_reliable,
        };
        if mismatch {
            return Err(BatchError::NewFrame);
        }

        let checkpoint = writer.mark();
        let zcodec = Zenoh080::new();
        // `||` short-circuits: the message is only attempted if the header went in.
        if zcodec.write(&mut *writer, header).is_err()
            || zcodec.write(&mut *writer, msg).is_err()
        {
            writer.rewind(checkpoint);
            return Err(BatchError::DidntWrite);
        }

        // Both writes succeeded: the frame is now the current one.
        match header.reliability {
            Reliability::Reliable => {
                self.latest_sn.reliable = Some(header.sn);
                self.current_frame = CurrentFrame::Reliable;
            }
            Reliability::BestEffort => {
                self.latest_sn.best_effort = Some(header.sn);
                self.current_frame = CurrentFrame::BestEffort;
            }
        }
        Ok(())
    }
}
/// Writes one fragment: a fragment header followed by as much payload as fits.
impl<W> WCodec<(&mut ZBufReader<'_>, &mut FragmentHeader), &mut W> for &mut Zenoh080Batch
where
    W: Writer + BacktrackableWriter,
    <W as BacktrackableWriter>::Mark: Copy,
{
    type Output = Result<NonZeroUsize, DidntWrite>;

    /// Writes the fragment header and siphons payload from the reader into `writer`.
    ///
    /// The header is first written as given. If the reader's remaining bytes
    /// then fit in the writer's remaining space, the header is rewritten with
    /// `more = false`. Note that `more` is updated on the caller's header and
    /// stays `false` even if a later step fails.
    ///
    /// Returns whatever the reader's `siphon` reports on success.
    ///
    /// # Errors
    ///
    /// Returns `DidntWrite` if a header write or the siphon fails; in every
    /// failure case the writer is rewound to where it stood before the call.
    ///
    /// NOTE(review): this impl never touches `current_frame` or `latest_sn`;
    /// presumably fragments are only written into a freshly cleared batch.
    /// Confirm against the caller.
    fn write(self, writer: &mut W, x: (&mut ZBufReader<'_>, &mut FragmentHeader)) -> Self::Output {
        let (payload, header) = x;

        let checkpoint = writer.mark();
        let zcodec = Zenoh080::new();

        // First pass: header as provided, so `writer.remaining()` below reflects
        // the space left for payload after the header.
        let first_pass = zcodec.write(&mut *writer, &*header);
        if first_pass.is_err() {
            writer.rewind(checkpoint);
        }
        first_pass?;

        // Everything left fits after the header: rewrite it with `more` cleared.
        if payload.remaining() <= writer.remaining() {
            writer.rewind(checkpoint);
            header.more = false;
            let second_pass = zcodec.write(&mut *writer, &*header);
            if second_pass.is_err() {
                writer.rewind(checkpoint);
            }
            second_pass?;
        }

        // Move the payload bytes; on failure discard the header as well.
        match payload.siphon(&mut *writer) {
            Ok(copied) => Ok(copied),
            Err(_) => {
                writer.rewind(checkpoint);
                Err(DidntWrite)
            }
        }
    }
}
/// Deserializes a transport message and mirrors it into the batch state.
impl<R> RCodec<TransportMessage, &mut R> for &mut Zenoh080Batch
where
    R: Reader + BacktrackableReader,
{
    type Error = DidntRead;

    /// Reads one transport message from `reader`.
    ///
    /// For a frame or fragment, the batch records its reliability as the
    /// current frame and stores its sequence number in the matching
    /// `latest_sn` slot. For any other body, `current_frame` is reset to
    /// [`CurrentFrame::None`] and the recorded sequence numbers are kept.
    ///
    /// # Errors
    ///
    /// Propagates the underlying codec's read error; the batch state is left
    /// untouched in that case.
    fn read(self, reader: &mut R) -> Result<TransportMessage, Self::Error> {
        let zcodec = Zenoh080::new();
        let msg: TransportMessage = zcodec.read(reader)?;

        // Frames and fragments both carry a reliability and a sequence number.
        let tracked = match &msg.body {
            TransportBody::Frame(Frame {
                reliability, sn, ..
            })
            | TransportBody::Fragment(Fragment {
                reliability, sn, ..
            }) => Some((*reliability, *sn)),
            _ => None,
        };

        match tracked {
            Some((Reliability::Reliable, sn)) => {
                self.current_frame = CurrentFrame::Reliable;
                self.latest_sn.reliable = Some(sn);
            }
            Some((Reliability::BestEffort, sn)) => {
                self.current_frame = CurrentFrame::BestEffort;
                self.latest_sn.best_effort = Some(sn);
            }
            None => self.current_frame = CurrentFrame::None,
        }

        Ok(msg)
    }
}