use std::collections::VecDeque;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll, Waker};
use crate::models::encoders::tlv::protocol::TLVEncoder;
use crate::models::frames::tlv_frame::TLVFrame;
use crate::traits::frame_encoder::FrameEncoder;
use crate::traits::stream_buffer::StreamBuffer;
use futures_core::Stream;
pub struct TLVStreamWriter<B: StreamBuffer> {
out_frames: VecDeque<B>,
finished: bool,
waker: Option<Waker>,
global_offset: usize,
}
impl<B: StreamBuffer + 'static> TLVStreamWriter<B> {
pub fn new() -> Self {
Self {
out_frames: VecDeque::new(),
finished: false,
waker: None,
global_offset: 0,
}
}
pub fn write_frame(&mut self, t: u8, value: &[u8]) -> io::Result<()> {
let frame = TLVFrame { t, value };
let (encoded, _) = TLVEncoder::encode::<B>(&mut self.global_offset, &frame)?;
self.out_frames.push_back(encoded);
if let Some(waker) = self.waker.take() {
waker.wake();
}
Ok(())
}
pub fn finish(&mut self) {
self.finished = true;
if let Some(waker) = self.waker.take() {
waker.wake();
}
}
}
impl<B> Stream for TLVStreamWriter<B>
where
B: StreamBuffer + Unpin + 'static,
{
type Item = io::Result<B>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
if let Some(frame) = this.out_frames.pop_front() {
Poll::Ready(Some(Ok(frame)))
} else if this.finished {
Poll::Ready(None)
} else {
this.waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures_core::Stream;
use minarrow::Vec64;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
fn dummy_waker() -> Waker {
fn no_op(_: *const ()) {}
static VTABLE: RawWakerVTable =
RawWakerVTable::new(|_| dummy_raw_waker(), no_op, no_op, no_op);
fn dummy_raw_waker() -> RawWaker {
RawWaker::new(std::ptr::null(), &VTABLE)
}
unsafe { Waker::from_raw(dummy_raw_waker()) }
}
#[test]
fn test_tlv_stream_writer_basic() {
let mut writer = TLVStreamWriter::<Vec64<u8>>::new();
writer.write_frame(1, &[0x01, 0x02]).unwrap();
writer.write_frame(2, &[0xAB, 0xCD, 0xEF]).unwrap();
writer.finish();
let mut pin_writer = Pin::new(&mut writer);
let waker = dummy_waker();
let mut cx = Context::from_waker(&waker);
match pin_writer.as_mut().poll_next(&mut cx) {
Poll::Ready(Some(Ok(frame))) => {
assert_eq!(frame[0], 1); assert_eq!(&frame[1..5], &(2u32.to_le_bytes())); assert_eq!(&frame[5..7], &[0x01, 0x02]); }
_ => panic!("expected frame 1"),
}
match pin_writer.as_mut().poll_next(&mut cx) {
Poll::Ready(Some(Ok(frame))) => {
assert_eq!(frame[0], 2); assert_eq!(&frame[1..5], &(3u32.to_le_bytes())); assert_eq!(&frame[5..8], &[0xAB, 0xCD, 0xEF]); }
_ => panic!("expected frame 2"),
}
match pin_writer.as_mut().poll_next(&mut cx) {
Poll::Ready(None) => {}
_ => panic!("expected stream end"),
}
}
#[test]
fn test_tlv_stream_writer_empty_finish() {
let mut writer = TLVStreamWriter::<Vec64<u8>>::new();
writer.finish();
let mut pin_writer = Pin::new(&mut writer);
let waker = dummy_waker();
let mut cx = Context::from_waker(&waker);
match pin_writer.as_mut().poll_next(&mut cx) {
Poll::Ready(None) => {}
_ => panic!("expected immediate end for empty writer"),
}
}
}