use std::cell::RefCell;
use std::io::{self, Write};
use std::sync::{Mutex, Weak};
pub trait LiveOutput: Send + Sync {
fn push_captured_line(&self, line: String, is_stderr: bool);
}
#[derive(Clone)]
pub struct FileProxy {
state: Weak<Mutex<dyn LiveOutput>>,
is_stderr: bool,
buffer: RefCell<String>,
}
unsafe impl Send for FileProxy {}
impl FileProxy {
pub fn new(state: Weak<Mutex<dyn LiveOutput>>, is_stderr: bool) -> Self {
Self {
state,
is_stderr,
buffer: RefCell::new(String::new()),
}
}
pub fn is_alive(&self) -> bool {
self.state.upgrade().is_some()
}
fn flush_buffer(&self) -> io::Result<()> {
let content = std::mem::take(&mut *self.buffer.borrow_mut());
if !content.is_empty() {
self.send_line(&content);
}
Ok(())
}
fn send_line(&self, line: &str) {
if let Some(state) = self.state.upgrade() {
if let Ok(state) = state.lock() {
state.push_captured_line(line.to_string(), self.is_stderr);
}
}
}
}
impl Write for FileProxy {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let text = String::from_utf8_lossy(buf);
let mut chunks = text.split('\n').peekable();
while let Some(chunk) = chunks.next() {
self.buffer.borrow_mut().push_str(chunk);
if chunks.peek().is_some() {
self.flush_buffer()?;
}
}
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
impl Drop for FileProxy {
fn drop(&mut self) {
let _ = self.flush_buffer();
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
struct MockSink {
captured: Arc<Mutex<Vec<(String, bool)>>>,
}
impl LiveOutput for MockSink {
fn push_captured_line(&self, line: String, is_stderr: bool) {
self.captured.lock().unwrap().push((line, is_stderr));
}
}
fn proxy_with_sink(
is_stderr: bool,
) -> (
FileProxy,
Arc<Mutex<Vec<(String, bool)>>>,
Arc<Mutex<MockSink>>,
) {
let captured = Arc::new(Mutex::new(Vec::new()));
let sink = Arc::new(Mutex::new(MockSink {
captured: Arc::clone(&captured),
}));
let proxy = FileProxy::new(
Arc::downgrade(&sink) as Weak<Mutex<dyn LiveOutput>>,
is_stderr,
);
(proxy, captured, sink)
}
#[test]
fn buffers_until_newline() {
let (mut proxy, captured, _sink) = proxy_with_sink(false);
write!(proxy, "hello").unwrap();
assert_eq!(captured.lock().unwrap().len(), 0);
writeln!(proxy, " world").unwrap();
assert_eq!(captured.lock().unwrap().len(), 1);
assert_eq!(captured.lock().unwrap()[0].0, "hello world");
assert!(!captured.lock().unwrap()[0].1);
}
#[test]
fn marks_stderr() {
let (mut proxy, captured, _sink) = proxy_with_sink(true);
writeln!(proxy, "boom").unwrap();
assert!(captured.lock().unwrap()[0].1);
}
#[test]
fn multi_line_in_one_write() {
let (mut proxy, captured, _sink) = proxy_with_sink(false);
write!(proxy, "a\nb\nc\n").unwrap();
let lines = captured.lock().unwrap();
assert_eq!(lines.len(), 3);
assert_eq!(lines[0].0, "a");
assert_eq!(lines[1].0, "b");
assert_eq!(lines[2].0, "c");
}
#[test]
fn drop_flushes_partial_line() {
let (proxy, captured, _sink) = proxy_with_sink(false);
{
let mut proxy = proxy.clone();
write!(proxy, "no terminator").unwrap();
assert_eq!(captured.lock().unwrap().len(), 0);
drop(proxy);
}
assert_eq!(captured.lock().unwrap().len(), 1);
assert_eq!(captured.lock().unwrap()[0].0, "no terminator");
}
#[test]
fn explicit_flush_keeps_partial_buffered() {
let (mut proxy, captured, _sink) = proxy_with_sink(false);
write!(proxy, "partial").unwrap();
proxy.flush().unwrap();
assert_eq!(captured.lock().unwrap().len(), 0);
}
#[test]
fn is_alive_tracks_sink_lifetime() {
let (proxy, _captured, sink) = proxy_with_sink(false);
assert!(proxy.is_alive());
drop(sink);
assert!(!proxy.is_alive());
}
#[test]
fn writes_no_op_after_sink_dropped() {
let (mut proxy, captured, sink) = proxy_with_sink(false);
drop(sink);
writeln!(proxy, "ghost").unwrap();
assert_eq!(captured.lock().unwrap().len(), 0);
}
}