use crate::{byte_converter::FromByteSlice, converting_callback_receiver::*, low_level_traits::*};
use std::{marker::PhantomData, time::Duration};
pub struct ConvertingHighLevelCallbackReceiver<
PayloadT: Default + Copy + Clone,
ResultT,
T: FromByteSlice + LowLevelRead<PayloadT, ResultT>,
> {
receiver: ConvertingCallbackReceiver<T>,
buf: Vec<PayloadT>,
currently_receiving_stream: bool,
message_length: usize,
chunk_offset: usize,
phantom: PhantomData<ResultT>,
}
impl<PayloadT: Default + Copy + Clone, ResultT, T: FromByteSlice + LowLevelRead<PayloadT, ResultT>>
ConvertingHighLevelCallbackReceiver<PayloadT, ResultT, T>
{
pub fn new(receiver: ConvertingCallbackReceiver<T>) -> ConvertingHighLevelCallbackReceiver<PayloadT, ResultT, T> {
ConvertingHighLevelCallbackReceiver {
receiver,
phantom: PhantomData,
buf: Vec::with_capacity(0),
currently_receiving_stream: false,
message_length: 0,
chunk_offset: 0,
}
}
fn recv_stream_chunk(&mut self, chunk: &T) -> Option<(Vec<PayloadT>, ResultT)> {
if !self.currently_receiving_stream && chunk.ll_message_chunk_offset() != 0 {
return None;
}
if self.currently_receiving_stream
&& (chunk.ll_message_chunk_offset() != self.chunk_offset || chunk.ll_message_length() != self.message_length)
{
return None;
}
if !self.currently_receiving_stream {
self.currently_receiving_stream = true;
self.message_length = chunk.ll_message_length();
self.chunk_offset = 0;
self.buf = vec![PayloadT::default(); self.message_length];
}
let read_length = std::cmp::min(chunk.ll_message_chunk_data().len(), self.message_length - self.chunk_offset);
self.buf[self.chunk_offset..self.chunk_offset + read_length].copy_from_slice(&chunk.ll_message_chunk_data()[0..read_length]);
self.chunk_offset += read_length;
if self.chunk_offset >= self.message_length {
self.currently_receiving_stream = false;
return Some((self.buf.clone(), chunk.get_result()));
}
None
}
pub fn try_recv(&mut self) -> Result<(Vec<PayloadT>, ResultT), CallbackTryRecvError> {
loop {
let ll_result = self.receiver.try_recv()?;
let result_opt = self.recv_stream_chunk(&ll_result);
if let Some(tup) = result_opt {
return Ok(tup);
}
}
}
pub fn recv_forever(&mut self) -> Result<(Vec<PayloadT>, ResultT), CallbackRecvError> {
loop {
let ll_result = self.receiver.recv_forever()?;
let result_opt = self.recv_stream_chunk(&ll_result);
if let Some(tup) = result_opt {
return Ok(tup);
}
}
}
pub fn recv_timeout(&mut self, timeout: Duration) -> Result<(Vec<PayloadT>, ResultT), CallbackRecvTimeoutError> {
loop {
let ll_result = self.receiver.recv_timeout(timeout)?;
let result_opt = self.recv_stream_chunk(&ll_result);
if let Some(tup) = result_opt {
return Ok(tup);
}
}
}
}
impl<PayloadT: Default + Copy + Clone, ResultT, T: FromByteSlice + LowLevelRead<PayloadT, ResultT>> Iterator
for ConvertingHighLevelCallbackReceiver<PayloadT, ResultT, T>
{
type Item = Option<(Vec<PayloadT>, ResultT)>;
fn next(&mut self) -> Option<Option<(Vec<PayloadT>, ResultT)>> {
match self.recv_forever() {
Ok(result) => Some(Some(result)),
Err(CallbackRecvError::MalformedPacket) => Some(None),
Err(_e) => None,
}
}
}