use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::{AtomicUsize, Ordering};
use encoding_rs::Encoding;
use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};
use tokio::sync::Notify;
use crate::buffer::{OutputBufferPolicy, OverflowMode};
pub(crate) type LineHandler = Arc<dyn Fn(&str) + Send + Sync>;
pub(crate) struct SharedLines {
inner: Mutex<Inner>,
notify: Notify,
count: AtomicUsize,
}
struct Inner {
lines: VecDeque<String>,
max: Option<usize>,
mode: OverflowMode,
closed: bool,
}
pub(crate) enum Popped {
Line(String),
Empty,
Closed,
}
impl SharedLines {
pub(crate) fn new(policy: &OutputBufferPolicy) -> Arc<Self> {
Arc::new(Self {
inner: Mutex::new(Inner {
lines: VecDeque::new(),
max: policy.max_lines,
mode: policy.overflow,
closed: false,
}),
notify: Notify::new(),
count: AtomicUsize::new(0),
})
}
fn push(&self, line: String) {
self.count.fetch_add(1, Ordering::Relaxed);
{
let mut inner = self.inner.lock().expect("SharedLines poisoned");
match inner.max {
Some(0) => {} Some(n) if inner.lines.len() >= n => match inner.mode {
OverflowMode::DropOldest => {
inner.lines.pop_front();
inner.lines.push_back(line);
}
OverflowMode::DropNewest => {} },
_ => inner.lines.push_back(line),
}
}
self.notify.notify_one();
}
fn close(&self) {
self.inner.lock().expect("SharedLines poisoned").closed = true;
self.notify.notify_one();
}
pub(crate) fn close_now(&self) {
self.close();
}
pub(crate) fn count(&self) -> usize {
self.count.load(Ordering::Relaxed)
}
pub(crate) fn drain(&self) -> Vec<String> {
let mut inner = self.inner.lock().expect("SharedLines poisoned");
inner.lines.drain(..).collect()
}
pub(crate) fn try_pop(&self) -> Popped {
let mut inner = self.inner.lock().expect("SharedLines poisoned");
if let Some(line) = inner.lines.pop_front() {
Popped::Line(line)
} else if inner.closed {
Popped::Closed
} else {
Popped::Empty
}
}
pub(crate) async fn changed(self: Arc<Self>) {
self.notify.notified().await;
}
}
pub(crate) async fn pump_lines<R>(
reader: R,
encoding: &'static Encoding,
handler: Option<LineHandler>,
sink: Arc<SharedLines>,
) where
R: AsyncRead + Unpin,
{
let mut reader = BufReader::new(reader);
let mut buf = Vec::new();
loop {
buf.clear();
match reader.read_until(b'\n', &mut buf).await {
Ok(0) => break,
Ok(_) => {}
Err(_) => break,
}
while matches!(buf.last(), Some(b'\n') | Some(b'\r')) {
buf.pop();
}
let (decoded, _, _) = encoding.decode(&buf);
let line = decoded.into_owned();
if let Some(handler) = &handler {
handler(&line);
}
sink.push(line);
}
sink.close();
}
#[cfg(test)]
mod tests {
use super::*;
use crate::buffer::OutputBufferPolicy;
#[tokio::test]
async fn pumps_utf8_lines_and_counts() {
let sink = SharedLines::new(&OutputBufferPolicy::unbounded());
pump_lines(
&b"one\ntwo\nthree\n"[..],
encoding_rs::UTF_8,
None,
sink.clone(),
)
.await;
assert_eq!(sink.count(), 3);
assert_eq!(sink.drain(), vec!["one", "two", "three"]);
}
#[tokio::test]
async fn decodes_shift_jis() {
let sink = SharedLines::new(&OutputBufferPolicy::unbounded());
pump_lines(
&[0x82, 0xA0, b'\n'][..],
encoding_rs::SHIFT_JIS,
None,
sink.clone(),
)
.await;
assert_eq!(sink.drain(), vec!["\u{3042}"]);
}
#[tokio::test]
async fn drop_oldest_keeps_tail_but_counts_all() {
let sink = SharedLines::new(&OutputBufferPolicy::bounded(2));
pump_lines(&b"a\nb\nc\nd\n"[..], encoding_rs::UTF_8, None, sink.clone()).await;
assert_eq!(sink.count(), 4, "every line is counted");
assert_eq!(sink.drain(), vec!["c", "d"], "only the newest two retained");
}
#[tokio::test]
async fn drop_newest_keeps_head() {
let policy = OutputBufferPolicy::bounded(2).with_overflow(OverflowMode::DropNewest);
let sink = SharedLines::new(&policy);
pump_lines(&b"a\nb\nc\nd\n"[..], encoding_rs::UTF_8, None, sink.clone()).await;
assert_eq!(sink.drain(), vec!["a", "b"]);
}
#[tokio::test]
async fn handler_sees_every_line_even_when_nothing_retained() {
let seen = Arc::new(Mutex::new(Vec::new()));
let captured = seen.clone();
let handler: LineHandler =
Arc::new(move |line: &str| captured.lock().unwrap().push(line.to_owned()));
let sink = SharedLines::new(&OutputBufferPolicy::bounded(0));
pump_lines(
&b"x\ny\n"[..],
encoding_rs::UTF_8,
Some(handler),
sink.clone(),
)
.await;
assert_eq!(sink.count(), 2);
assert!(
sink.drain().is_empty(),
"retain-nothing policy keeps no lines"
);
assert_eq!(*seen.lock().unwrap(), vec!["x", "y"]);
}
}