use std::io::{self, Write};
use crate::ok;
pub type Text = Vec<u8>;
pub struct SharedWriter {
pub buffer: Text,
pub line_state_control_channel_sender:
tokio::sync::mpsc::Sender<LineStateControlSignal>,
pub silent_error: bool,
}
#[derive(Debug, Clone)]
pub enum LineStateControlSignal {
Line(Text),
Flush,
Pause,
Resume,
SpinnerActive(tokio::sync::broadcast::Sender<()>),
SpinnerInactive,
}
impl SharedWriter {
pub fn new(line_sender: tokio::sync::mpsc::Sender<LineStateControlSignal>) -> Self {
Self {
buffer: Default::default(),
line_state_control_channel_sender: line_sender,
silent_error: false,
}
}
}
impl Clone for SharedWriter {
fn clone(&self) -> Self {
Self {
buffer: Default::default(),
line_state_control_channel_sender: self
.line_state_control_channel_sender
.clone(),
silent_error: true,
}
}
}
impl Write for SharedWriter {
fn write(&mut self, payload: &[u8]) -> io::Result<usize> {
let self_buffer = &mut self.buffer;
self_buffer.extend_from_slice(payload);
if self_buffer.ends_with(b"\n") {
match self
.line_state_control_channel_sender
.try_send(LineStateControlSignal::Line(self_buffer.clone()))
{
Ok(_) => {
self_buffer.clear();
}
Err(_) => {
if !self.silent_error {
return Err(io::Error::new(
io::ErrorKind::Other,
"SharedWriter Receiver has closed",
));
}
}
}
};
Ok(payload.len())
}
fn flush(&mut self) -> io::Result<()> {
match self
.line_state_control_channel_sender
.try_send(LineStateControlSignal::Line(self.buffer.clone()))
{
Ok(_) => {
self.buffer.clear();
}
Err(_) => {
if !self.silent_error {
return Err(io::Error::new(
io::ErrorKind::Other,
"SharedWriter Receiver has closed",
));
}
}
}
ok!()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_write() {
let (line_sender, _) = tokio::sync::mpsc::channel(1_000);
let mut shared_writer = SharedWriter::new(line_sender);
shared_writer.write_all(b"Hello, World!").unwrap();
assert_eq!(shared_writer.buffer, b"Hello, World!");
}
#[tokio::test]
#[allow(clippy::needless_return)]
async fn test_write_flush() {
let (line_sender, mut line_receiver) = tokio::sync::mpsc::channel(1_000);
let mut shared_writer = SharedWriter::new(line_sender);
shared_writer.write_all(b"Hello, World!").unwrap();
shared_writer.flush().unwrap();
assert_eq!(shared_writer.buffer, b"");
let it = line_receiver.recv().await.unwrap();
if let LineStateControlSignal::Line(bytes) = it {
assert_eq!(bytes, b"Hello, World!".to_vec());
} else {
panic!("Expected LineStateControlSignal::Line, got something else");
}
}
#[tokio::test]
#[allow(clippy::needless_return)]
async fn test_writeln_no_flush() {
let (line_sender, mut line_receiver) = tokio::sync::mpsc::channel(1_000);
let mut shared_writer = SharedWriter::new(line_sender);
shared_writer.write_all(b"Hello, World!\n").unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
let it = line_receiver.recv().await.unwrap();
if let LineStateControlSignal::Line(bytes) = it {
assert_eq!(bytes, b"Hello, World!\n".to_vec());
} else {
panic!("Expected LineStateControlSignal::Line, got something else");
}
}
#[tokio::test]
#[allow(clippy::needless_return)]
async fn test_clone_silent_error() {
let (line_sender, mut line_receiver) = tokio::sync::mpsc::channel(1_000);
let mut shared_writer = SharedWriter::new(line_sender);
assert!(!shared_writer.silent_error);
let mut cloned_writer = shared_writer.clone();
assert!(cloned_writer.silent_error);
cloned_writer.write_all(b"Hello, World!\n").unwrap();
assert!(cloned_writer.buffer.is_empty());
line_receiver.close();
cloned_writer.write_all(b"Hello, World!\n").unwrap();
assert!(shared_writer.write_all(b"error\n").is_err());
}
}