use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures_core::Stream;
use minarrow::Field;
use crate::models::decoders::tlv::TLVDecoder;
use crate::models::frames::protocol_message::LightstreamMessage;
use crate::models::protocol::codec::LightstreamCodec;
use crate::models::streams::framed_byte_stream::FramedByteStream;
use crate::traits::stream_buffer::StreamBuffer;
pub struct LightstreamReader<S, B = Vec<u8>>
where
S: Stream<Item = Result<B, io::Error>> + Unpin + Send,
B: StreamBuffer,
{
framed: FramedByteStream<S, TLVDecoder<B>, B>,
codec: LightstreamCodec<B>,
}
impl<S, B> LightstreamReader<S, B>
where
S: Stream<Item = Result<B, io::Error>> + Unpin + Send,
B: StreamBuffer + Unpin,
{
pub fn new(stream: S) -> Self {
Self {
framed: FramedByteStream::new(stream, TLVDecoder::new(), 64 * 1024),
codec: LightstreamCodec::new(),
}
}
pub fn register_message(&mut self, name: impl Into<String>) -> u8 {
self.codec.register_message(name)
}
pub fn register_table(&mut self, name: impl Into<String>, schema: Vec<Field>) -> u8 {
self.codec.register_table(name, schema)
}
pub fn codec(&self) -> &LightstreamCodec<B> {
&self.codec
}
}
impl<S, B> Stream for LightstreamReader<S, B>
where
S: Stream<Item = Result<B, io::Error>> + Unpin + Send,
B: StreamBuffer + Unpin,
{
type Item = io::Result<LightstreamMessage>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
match Pin::new(&mut this.framed).poll_next(cx) {
Poll::Ready(Some(Ok(tlv_frame))) => {
let msg = this.codec.decode_frame(tlv_frame.t, tlv_frame.value.as_ref())?;
Poll::Ready(Some(Ok(msg)))
}
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}