use super::*;
/// Wrap a raw send / receive pair in their framed counterparts.
///
/// The send half is wrapped as-is in [`PrivFramedSend`]; the receive half is
/// handed to [`PrivFramedRecv::new`]. The two framed halves are returned as a
/// `(send, recv)` tuple.
pub(crate) fn priv_framed(
    send: PrivRawSend,
    recv: PrivRawRecv,
) -> (PrivFramedSend, PrivFramedRecv) {
    (PrivFramedSend(send), PrivFramedRecv::new(recv))
}
/// Framed sending half: a newtype around the raw `PrivRawSend` writer.
pub(crate) struct PrivFramedSend(PrivRawSend);
impl PrivFramedSend {
    /// Write one frame: a 2-byte little-endian length tag followed by the
    /// bytes of `d`.
    ///
    /// # Errors
    ///
    /// Returns a `FrameOverflow` error, without writing anything, when
    /// `d.len()` exceeds `MAX_FRAME` or cannot be represented in the 2-byte
    /// length tag. Errors from the underlying writer are propagated.
    pub(crate) fn send(
        &mut self,
        d: Box<[u8]>,
    ) -> impl Future<Output = LairResult<()>> + '_ + Send {
        async move {
            let len = d.len();
            if len > MAX_FRAME {
                return Err(OneErr::with_message(
                    "FrameOverflow",
                    format!("{} > {}", len, MAX_FRAME),
                ));
            }
            // Checked conversion: a length that does not fit the tag is
            // rejected instead of being silently truncated, which would
            // desynchronize the framing on the receiving side.
            let ltag = <u16 as std::convert::TryFrom<usize>>::try_from(len)
                .map_err(|_| {
                    OneErr::with_message(
                        "FrameOverflow",
                        format!("{} > {}", len, u16::MAX),
                    )
                })?
                .to_le_bytes();
            self.0.write_all(&ltag).await?;
            self.0.write_all(&d).await?;
            Ok(())
        }
    }

    /// Shut down the underlying writer.
    ///
    /// # Errors
    ///
    /// Any error reported by the writer's `shutdown` is converted with
    /// `OneErr::new` and returned.
    pub(crate) fn shutdown(
        &mut self,
    ) -> impl Future<Output = LairResult<()>> + '_ + Send {
        async move { self.0.shutdown().await.map_err(OneErr::new) }
    }
}
/// Framed receiving half: a boxed stream whose items are
/// `LairResult<Box<[u8]>>`, one per received frame.
pub(crate) struct PrivFramedRecv(BoxStream<'static, LairResult<Box<[u8]>>>);
impl PrivFramedRecv {
    /// Build a frame stream on top of the raw reader `recv`.
    ///
    /// Each item is produced by reading a 2-byte little-endian length tag and
    /// then exactly that many payload bytes, returned as a boxed slice.
    ///
    /// A length tag greater than `MAX_FRAME` produces a `FrameOverflow`
    /// error item; errors from `read_exact` are propagated as error items.
    pub fn new(recv: PrivRawRecv) -> Self {
        let frames = futures::stream::try_unfold(recv, |mut recv| async move {
            let mut ltag = [0; 2];
            recv.read_exact(&mut ltag).await?;
            let len = usize::from(u16::from_le_bytes(ltag));
            if len > MAX_FRAME {
                return Err(OneErr::with_message(
                    "FrameOverflow",
                    format!("{} > {}", len, MAX_FRAME),
                ));
            }
            // Use a zero-initialized buffer: exposing uninitialized memory
            // to `read_exact` as `&mut [u8]` is undefined behavior, and the
            // frame size is bounded so the zeroing cost is negligible.
            let mut msg = vec![0_u8; len];
            recv.read_exact(&mut msg).await?;
            Ok(Some((msg.into_boxed_slice(), recv)))
        });
        Self(frames.boxed())
    }
}
/// Stream of received frames; every poll is forwarded to the boxed inner
/// stream.
impl Stream for PrivFramedRecv {
    type Item = LairResult<Box<[u8]>>;

    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        // The wrapper only holds an already-pinned box, so it can be
        // unpinned and the inner stream re-pinned for the delegated poll.
        let this = self.get_mut();
        this.0.as_mut().poll_next(cx)
    }
}