#![cfg(feature = "std")]
use std::io::{ErrorKind, Read, Write};
use std::sync::mpsc::{Receiver, SyncSender};
use alloc::vec::Vec;
/// Message a [`ChannelStream`] pushes onto its outgoing channel.
///
/// The standard comparison/debug traits are derived so that consumers and
/// tests can log, clone and compare messages directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ChannelEgress {
    /// A chunk of bytes handed to the stream's `Write::write`.
    Data(Vec<u8>),
    /// Sent by `ChannelStream::send_eof`, and again when the stream is dropped.
    Eof,
    /// Sent as the final message when the stream is dropped.
    Close,
}
/// Blocking `Read`/`Write` adapter over a pair of `std::sync::mpsc` endpoints.
///
/// Reads pull byte chunks from `rx`; writes push [`ChannelEgress`] messages
/// into `tx`. Both endpoints are wrapped in `Option` so that `into_raw` can
/// move them out even though the type implements `Drop`.
pub struct ChannelStream {
/// Source of incoming chunks. A `None` message (or a disconnected sender)
/// is treated as end of stream by `read`. `None` here once `into_raw` took it.
rx: Option<Receiver<Option<Vec<u8>>>>,
/// Sink for outgoing messages. `None` once taken by `into_raw` or `drop`.
tx: Option<SyncSender<ChannelEgress>>,
/// Bytes of the most recently received chunk not yet handed to a reader.
buf: Vec<u8>,
/// Set once end of stream was observed; later reads return `Ok(0)` without
/// touching the channel again.
rx_eof: bool,
}
impl ChannelStream {
    /// Builds a stream around the given endpoints, starting with an empty
    /// read buffer and no end-of-stream observed.
    pub(crate) fn new(rx: Receiver<Option<Vec<u8>>>, tx: SyncSender<ChannelEgress>) -> Self {
        let (rx, tx) = (Some(rx), Some(tx));
        Self {
            rx,
            tx,
            buf: Vec::new(),
            rx_eof: false,
        }
    }

    /// Pushes a [`ChannelEgress::Eof`] message onto the outgoing channel.
    ///
    /// The stream stays usable afterwards; nothing here prevents later writes.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::BrokenPipe` when the sender is no longer held by
    /// the stream or when the receiving side of the channel has gone away.
    pub fn send_eof(&mut self) -> std::io::Result<()> {
        // Both failure modes surface as the same error value.
        let closed = || std::io::Error::new(ErrorKind::BrokenPipe, "channel closed");
        match self.tx.as_ref() {
            Some(sender) => sender.send(ChannelEgress::Eof).map_err(|_| closed()),
            None => Err(closed()),
        }
    }

    /// Consumes the stream and hands back its two channel endpoints.
    ///
    /// Because the sender is moved out first, the `Drop` implementation finds
    /// no sender and emits no farewell messages. Bytes still sitting in the
    /// internal read buffer are not returned and are dropped with the stream.
    ///
    /// # Panics
    ///
    /// Panics if either endpoint has already been removed from the stream.
    pub fn into_raw(mut self) -> (Receiver<Option<Vec<u8>>>, SyncSender<ChannelEgress>) {
        let receiver = match self.rx.take() {
            Some(receiver) => receiver,
            None => panic!("ChannelStream::into_raw called twice"),
        };
        let sender = match self.tx.take() {
            Some(sender) => sender,
            None => panic!("ChannelStream::into_raw called twice"),
        };
        (receiver, sender)
    }
}
impl Read for ChannelStream {
    /// Copies buffered bytes into `out`, blocking on the incoming channel
    /// when the internal buffer is empty.
    ///
    /// Returns `Ok(0)` when `out` is empty, or once end of stream has been
    /// observed (a `None` message or a disconnected sender). Empty chunks are
    /// skipped rather than reported, because a zero-length read would be
    /// mistaken for end of stream by callers.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::BrokenPipe` if the receiver is no longer held by
    /// the stream.
    fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
        // Nothing can be delivered into an empty buffer, so never block for it.
        if out.is_empty() {
            return Ok(0);
        }
        // Loop instead of recursing: a run of empty chunks must not grow the stack.
        loop {
            if !self.buf.is_empty() {
                let n = out.len().min(self.buf.len());
                out[..n].copy_from_slice(&self.buf[..n]);
                self.buf.drain(..n);
                return Ok(n);
            }
            if self.rx_eof {
                return Ok(0);
            }
            let rx = self
                .rx
                .as_ref()
                .ok_or_else(|| std::io::Error::new(ErrorKind::BrokenPipe, "channel taken"))?;
            match rx.recv() {
                // Keep the chunk and go round again to serve it (or skip it if empty).
                Ok(Some(chunk)) => self.buf = chunk,
                Ok(None) | Err(_) => {
                    // Remember end of stream so later reads do not block again.
                    self.rx_eof = true;
                    return Ok(0);
                }
            }
        }
    }
}
impl Write for ChannelStream {
    /// Copies up to 32 KiB of `data` into a [`ChannelEgress::Data`] message
    /// and sends it on the outgoing channel, returning how many bytes were
    /// accepted. Callers that need everything sent should use `write_all`.
    ///
    /// An empty `data` slice yields `Ok(0)` without sending anything, so no
    /// zero-length message is queued on the bounded channel.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::BrokenPipe` if the sender is no longer held by the
    /// stream or the receiving side of the channel has gone away.
    fn write(&mut self, data: &[u8]) -> std::io::Result<usize> {
        // Largest payload placed into a single outgoing message.
        const MAX_CHUNK: usize = 32 * 1024;

        let tx = self
            .tx
            .as_ref()
            .ok_or_else(|| std::io::Error::new(ErrorKind::BrokenPipe, "channel taken"))?;
        if data.is_empty() {
            return Ok(0);
        }
        let take = data.len().min(MAX_CHUNK);
        tx.send(ChannelEgress::Data(data[..take].to_vec()))
            .map_err(|_| std::io::Error::new(ErrorKind::BrokenPipe, "channel closed"))?;
        Ok(take)
    }

    /// No-op: `write` already hands every chunk to the channel, so nothing
    /// is buffered inside the stream.
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}
impl Drop for ChannelStream {
    /// Best-effort farewell: if the stream still owns its sender, push
    /// [`ChannelEgress::Eof`] followed by [`ChannelEgress::Close`].
    ///
    /// Send failures are ignored because nothing useful can be done about a
    /// vanished receiver while dropping. When the sender was already moved
    /// out, nothing is sent.
    fn drop(&mut self) {
        let sender = match self.tx.take() {
            Some(sender) => sender,
            None => return,
        };
        let farewell =
            std::iter::once(ChannelEgress::Eof).chain(std::iter::once(ChannelEgress::Close));
        for message in farewell {
            // Both messages are attempted even if the first one fails.
            let _ = sender.send(message);
        }
    }
}