use crate::frame::{Frame, FrameCodec, FrameEncodeError, FrameKind};
use crate::utils::now;
/// Streaming encoder that buffers raw bytes, packs them into [`Frame`]s and
/// hands each encoded frame to a caller-supplied callback.
///
/// The first data frame of a stream is tagged with the initial `next_kind`
/// (`FrameKind::Open`); later data frames are tagged `FrameKind::Data`. Once
/// the stream has been ended or cancelled every further operation is rejected.
pub struct FrameStreamEncoder<F: FnMut(&[u8])> {
    /// Stream identifier copied into every emitted frame.
    stream_id: u32,
    /// Number of buffered bytes at which `write_bytes` cuts a frame.
    max_chunk_size: usize,
    /// Sequence id assigned to the next emitted frame.
    next_seq_id: u32,
    /// Kind assigned to the next data frame.
    next_kind: FrameKind,
    /// Bytes accepted by `write_bytes` that have not been emitted yet.
    buffer: Vec<u8>,
    /// Set after a cancel frame has been emitted.
    is_canceled: bool,
    /// Set after an end frame has been emitted.
    is_ended: bool,
    /// Sink invoked with the encoded bytes of each frame.
    on_emit: F,
}
impl<F> FrameStreamEncoder<F>
where
    F: FnMut(&[u8]),
{
    /// Creates an encoder for `stream_id` that cuts a frame every
    /// `max_chunk_size` buffered bytes and passes each encoded frame to
    /// `on_emit`.
    ///
    /// A `max_chunk_size` of `0` is treated as `1` when chunking, so that
    /// `write_bytes` always makes progress.
    pub fn new(stream_id: u32, max_chunk_size: usize, on_emit: F) -> Self {
        Self {
            stream_id,
            max_chunk_size,
            next_seq_id: 0,
            next_kind: FrameKind::Open,
            buffer: Vec::new(),
            is_canceled: false,
            is_ended: false,
            on_emit,
        }
    }

    /// Rejects the operation if the stream was already cancelled or ended.
    ///
    /// Cancellation takes precedence over end when reporting the error.
    fn ensure_writable(&self) -> Result<(), FrameEncodeError> {
        if self.is_canceled {
            Err(FrameEncodeError::WriteAfterCancel)
        } else if self.is_ended {
            Err(FrameEncodeError::WriteAfterEnd)
        } else {
            Ok(())
        }
    }

    /// Encodes `frame`, passes the bytes to the callback and returns the
    /// encoded length.
    ///
    /// # Errors
    ///
    /// Fails without emitting anything if the stream is cancelled or ended.
    fn emit_frame(&mut self, frame: Frame) -> Result<usize, FrameEncodeError> {
        self.ensure_writable()?;
        let bytes = FrameCodec::encode(&frame);
        (self.on_emit)(&bytes);
        Ok(bytes.len())
    }

    /// Emits `payload` as the next data frame (`Open` for the first one,
    /// `Data` afterwards) and advances the sequence counter.
    fn emit_data(&mut self, payload: Vec<u8>) -> Result<usize, FrameEncodeError> {
        let frame = Frame {
            stream_id: self.stream_id,
            seq_id: self.next_seq_id,
            kind: self.next_kind,
            timestamp_micros: now(),
            payload,
        };
        let written = self.emit_frame(frame)?;
        // Wrap explicitly so debug and release builds agree when the counter
        // reaches `u32::MAX` (plain `+= 1` panics in debug builds).
        self.next_seq_id = self.next_seq_id.wrapping_add(1);
        self.next_kind = FrameKind::Data;
        Ok(written)
    }

    /// Appends `data` to the internal buffer and emits one frame for every
    /// complete chunk of `max_chunk_size` bytes now available.
    ///
    /// Bytes that do not fill a whole chunk stay buffered until more data
    /// arrives or `flush` / `end_stream` is called.
    ///
    /// Returns the total number of *encoded* bytes handed to the callback by
    /// this call (`0` if no chunk was completed).
    ///
    /// # Errors
    ///
    /// Returns `WriteAfterCancel` / `WriteAfterEnd` if the stream is closed.
    pub fn write_bytes(&mut self, data: &[u8]) -> Result<usize, FrameEncodeError> {
        self.ensure_writable()?;
        self.buffer.extend_from_slice(data);

        // A chunk size of zero would never consume the buffer and loop forever.
        let chunk_size = self.max_chunk_size.max(1);
        let full_len = self.buffer.len() - self.buffer.len() % chunk_size;
        if full_len == 0 {
            return Ok(0);
        }

        // Split once: keep the incomplete tail buffered and walk the complete
        // chunks in place, instead of draining the front of the buffer (and
        // shifting the remainder) for every single frame.
        let tail = self.buffer.split_off(full_len);
        let ready = std::mem::replace(&mut self.buffer, tail);

        let mut op_written_bytes = 0;
        for chunk in ready.chunks_exact(chunk_size) {
            op_written_bytes += self.emit_data(chunk.to_vec())?;
        }
        Ok(op_written_bytes)
    }

    /// Emits whatever is currently buffered as a single data frame.
    ///
    /// Returns the encoded size of that frame, or `0` if the buffer was empty
    /// (in which case nothing is emitted).
    ///
    /// # Errors
    ///
    /// Returns `WriteAfterCancel` / `WriteAfterEnd` if the stream is closed.
    pub fn flush(&mut self) -> Result<usize, FrameEncodeError> {
        self.ensure_writable()?;
        if self.buffer.is_empty() {
            return Ok(0);
        }
        let payload = std::mem::take(&mut self.buffer);
        self.emit_data(payload)
    }

    /// Emits an `End` frame carrying any still-buffered bytes and marks the
    /// stream as ended.
    ///
    /// Returns the encoded size of the end frame.
    ///
    /// # Errors
    ///
    /// Returns `WriteAfterCancel` / `WriteAfterEnd` if the stream is closed.
    pub fn end_stream(&mut self) -> Result<usize, FrameEncodeError> {
        self.ensure_writable()?;
        let frame = Frame {
            stream_id: self.stream_id,
            seq_id: self.next_seq_id,
            kind: FrameKind::End,
            timestamp_micros: now(),
            payload: std::mem::take(&mut self.buffer),
        };
        let op_written_bytes = self.emit_frame(frame)?;
        self.is_ended = true;
        Ok(op_written_bytes)
    }

    /// Emits a `Cancel` frame with an empty payload and marks the stream as
    /// cancelled. Buffered bytes are not sent.
    ///
    /// Returns the encoded size of the cancel frame.
    ///
    /// # Errors
    ///
    /// Returns `WriteAfterCancel` / `WriteAfterEnd` if the stream is closed.
    pub fn cancel_stream(&mut self) -> Result<usize, FrameEncodeError> {
        self.ensure_writable()?;
        let frame = Frame {
            stream_id: self.stream_id,
            seq_id: self.next_seq_id,
            kind: FrameKind::Cancel,
            timestamp_micros: now(),
            payload: Vec::new(),
        };
        let op_written_bytes = self.emit_frame(frame)?;
        self.is_canceled = true;
        Ok(op_written_bytes)
    }
}