use std::collections::HashMap;
use std::sync::{Arc, Mutex, OnceLock};
use std::time::{Duration, Instant};
/// Maximum number of bytes of each stream that a snapshot returns.
pub const LIVE_TAIL_CAP: usize = 128 * 1024;
/// How long a finished tail is kept before it becomes evictable.
const GRACE: Duration = Duration::from_secs(60);
/// Extra bytes a ring may hold beyond `LIVE_TAIL_CAP` before it is compacted,
/// so the front of the buffer is not shifted on every small push.
const SLACK: usize = 16 * 1024;

/// Byte buffer that keeps only the most recent output of one stream.
#[derive(Default)]
struct Ring {
    /// Retained bytes, oldest first; never longer than `LIVE_TAIL_CAP + SLACK`
    /// once a push has returned.
    buf: Vec<u8>,
    /// Set once bytes have been physically discarded from the front of `buf`.
    truncated: bool,
}

impl Ring {
    /// Longest run of continuation bytes that can follow the lead byte of one
    /// UTF-8 encoded character (a 4-byte sequence has three).
    const MAX_CONTINUATION_BYTES: usize = 3;

    /// Appends `chunk`, compacting to the newest `LIVE_TAIL_CAP` bytes once the
    /// total would exceed `LIVE_TAIL_CAP + SLACK`.
    ///
    /// Only bytes that end up retained are copied into the buffer, so a single
    /// oversized chunk cannot inflate the allocation beyond the bound.
    fn push(&mut self, chunk: &[u8]) {
        let held = self.buf.len();
        let total = held + chunk.len();
        if total <= LIVE_TAIL_CAP + SLACK {
            self.buf.extend_from_slice(chunk);
            return;
        }

        // Number of oldest bytes (counted across `buf` followed by `chunk`)
        // that must go so that exactly `LIVE_TAIL_CAP` bytes remain.
        let overflow = total - LIVE_TAIL_CAP;
        if overflow >= held {
            // Everything already buffered is discarded, plus the head of `chunk`.
            self.buf.clear();
            self.buf.extend_from_slice(&chunk[overflow - held..]);
        } else {
            self.buf.drain(..overflow);
            self.buf.extend_from_slice(chunk);
        }
        self.truncated = true;
    }

    /// Returns the newest `LIVE_TAIL_CAP` bytes decoded lossily as UTF-8, and
    /// whether anything older has been cut off.
    ///
    /// When the front was cut, up to three leading continuation bytes are
    /// skipped so a character split by the cut does not show up as a
    /// replacement character. The skip is bounded: a longer run cannot be the
    /// remainder of one character, so it is left for the lossy decoder instead
    /// of being dropped silently.
    fn snapshot(&self) -> (String, bool) {
        let mut start = self.buf.len().saturating_sub(LIVE_TAIL_CAP);
        let trimmed = start > 0 || self.truncated;
        if trimmed {
            start += self.buf[start..]
                .iter()
                .take(Self::MAX_CONTINUATION_BYTES)
                .take_while(|&&byte| (byte & 0b1100_0000) == 0b1000_0000)
                .count();
        }
        (
            String::from_utf8_lossy(&self.buf[start..]).into_owned(),
            trimmed,
        )
    }
}
/// Shared, lock-protected view of the most recent output of one job.
pub struct LiveTail {
    /// Newest bytes pushed via `push_stdout`.
    stdout: Mutex<Ring>,
    /// Newest bytes pushed via `push_stderr`.
    stderr: Mutex<Ring>,
    /// `true` from construction until `mark_finished` runs.
    running: Mutex<bool>,
    /// Time at which `mark_finished` ran; `None` before that.
    finished_at: Mutex<Option<Instant>>,
}

impl LiveTail {
    /// Creates an empty tail that reports itself as running.
    fn new() -> Self {
        let empty_ring = || Mutex::new(Ring::default());
        Self {
            stdout: empty_ring(),
            stderr: empty_ring(),
            running: Mutex::new(true),
            finished_at: Mutex::new(None),
        }
    }

    /// Appends `chunk` to the stdout ring; an empty chunk is ignored.
    pub fn push_stdout(&self, chunk: &[u8]) {
        Self::append(&self.stdout, chunk);
    }

    /// Appends `chunk` to the stderr ring; an empty chunk is ignored.
    pub fn push_stderr(&self, chunk: &[u8]) {
        Self::append(&self.stderr, chunk);
    }

    /// Locks `ring` and pushes `chunk`, skipping the lock for empty input.
    fn append(ring: &Mutex<Ring>, chunk: &[u8]) {
        if !chunk.is_empty() {
            ring.lock().unwrap().push(chunk);
        }
    }

    /// Captures both streams and the running flag.
    ///
    /// Each lock is taken and released in turn (stdout, stderr, running), so
    /// the three values are not captured atomically with respect to each other.
    ///
    /// # Panics
    ///
    /// Panics if any of the internal mutexes is poisoned.
    pub fn snapshot(&self) -> Snapshot {
        let (stdout, stdout_truncated) = {
            let ring = self.stdout.lock().unwrap();
            ring.snapshot()
        };
        let (stderr, stderr_truncated) = {
            let ring = self.stderr.lock().unwrap();
            ring.snapshot()
        };
        let running = *self.running.lock().unwrap();
        Snapshot {
            stdout,
            stderr,
            stdout_truncated,
            stderr_truncated,
            running,
        }
    }

    /// Clears the running flag and records the current time as the finish time.
    fn mark_finished(&self) {
        {
            let mut running = self.running.lock().unwrap();
            *running = false;
        }
        let stamp = Instant::now();
        let mut finished_at = self.finished_at.lock().unwrap();
        *finished_at = Some(stamp);
    }

    /// Returns `true` once at least `GRACE` has elapsed since `mark_finished`.
    fn evictable(&self) -> bool {
        let finished_at = self.finished_at.lock().unwrap();
        finished_at.is_some_and(|at| at.elapsed() >= GRACE)
    }
}
/// Point-in-time copy of a tail's output and state.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so callers can log, copy and
/// compare snapshots (for example in `assert_eq!`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Snapshot {
    /// Retained stdout text, decoded lossily as UTF-8.
    pub stdout: String,
    /// Retained stderr text, decoded lossily as UTF-8.
    pub stderr: String,
    /// `true` if older stdout output was cut off.
    pub stdout_truncated: bool,
    /// `true` if older stderr output was cut off.
    pub stderr_truncated: bool,
    /// `true` while the tail has not been marked finished.
    pub running: bool,
}
/// Registry storage: result id mapped to its shared tail, behind one mutex.
type Map = Mutex<HashMap<String, Arc<LiveTail>>>;

/// Returns the process-wide registry, creating it empty on first use.
fn registry() -> &'static Map {
    static REGISTRY: OnceLock<Map> = OnceLock::new();
    REGISTRY.get_or_init(Map::default)
}
/// Drops every entry whose tail reports itself as evictable.
fn sweep(map: &mut HashMap<String, Arc<LiveTail>>) {
    map.retain(|_id, tail| {
        let expired = tail.evictable();
        !expired
    });
}
/// Creates a fresh tail, stores it in the registry under `result_id`, and
/// returns the handle that owns its lifetime.
///
/// Evictable entries are swept first. An existing entry under the same id is
/// replaced in the map.
///
/// # Panics
///
/// Panics if the registry mutex is poisoned.
pub fn register(result_id: &str) -> LiveHandle {
    let shared = Arc::new(LiveTail::new());
    {
        let mut tails = registry().lock().unwrap();
        sweep(&mut tails);
        tails.insert(result_id.to_owned(), Arc::clone(&shared));
    }
    LiveHandle { tail: shared }
}
/// Looks up the tail registered under `result_id`, after sweeping evictable
/// entries out of the registry.
///
/// Returns `None` when no entry with that id remains.
///
/// # Panics
///
/// Panics if the registry mutex is poisoned.
pub fn get(result_id: &str) -> Option<Arc<LiveTail>> {
    let mut tails = registry().lock().unwrap();
    sweep(&mut tails);
    let found = tails.get(result_id)?;
    Some(Arc::clone(found))
}
/// Guard held by the producer of a tail; dropping it marks the tail finished.
///
/// Marked `#[must_use]` because discarding the value right away would finish
/// the tail before any output is pushed.
#[must_use = "dropping the handle immediately marks the tail as finished"]
pub struct LiveHandle {
    /// The tail this handle keeps in the running state.
    tail: Arc<LiveTail>,
}

impl LiveHandle {
    /// Returns another shared reference to the underlying tail.
    pub fn tail(&self) -> Arc<LiveTail> {
        Arc::clone(&self.tail)
    }
}

impl Drop for LiveHandle {
    /// Marks the tail as finished, which clears its running flag and records
    /// the finish time.
    fn drop(&mut self) {
        self.tail.mark_finished();
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Chunks are concatenated per stream and a fresh tail reports running.
    #[test]
    fn push_and_snapshot_round_trip() {
        let tail = LiveTail::new();
        tail.push_stdout(b"hello ");
        tail.push_stdout(b"world");
        tail.push_stderr(b"oops");

        let snap = tail.snapshot();
        assert_eq!(snap.stdout, "hello world");
        assert_eq!(snap.stderr, "oops");
        assert!(!snap.stdout_truncated);
        assert!(snap.running);
    }

    /// Within the slack window nothing is drained, yet the snapshot is
    /// still limited to the cap and reported as trimmed.
    #[test]
    fn snapshot_trims_to_cap_via_slack() {
        let mut ring = Ring::default();

        let filler = vec![b'a'; LIVE_TAIL_CAP];
        ring.push(&filler);
        let (_s, trimmed) = ring.snapshot();
        assert!(!trimmed, "exactly cap → nothing dropped");

        ring.push(b"bbbb");
        assert!(!ring.truncated, "slack not yet exceeded → no drain");

        let (s, trimmed) = ring.snapshot();
        assert!(trimmed);
        assert!(s.ends_with("bbbb"));
        assert!(s.len() <= LIVE_TAIL_CAP);
    }

    /// Exceeding cap + slack compacts the buffer down to exactly the cap.
    #[test]
    fn ring_physically_drains_past_cap_plus_slack() {
        let mut ring = Ring::default();
        let oversized = vec![b'a'; LIVE_TAIL_CAP + SLACK + 100];
        ring.push(&oversized);

        assert!(ring.truncated);
        assert_eq!(ring.buf.len(), LIVE_TAIL_CAP);
    }

    /// A multi-byte character split by the cut is skipped entirely instead
    /// of surfacing as a replacement character.
    #[test]
    fn truncated_front_snaps_to_char_boundary() {
        let mut ring = Ring::default();
        ring.push("あ".as_bytes());
        ring.push(&vec![b'x'; LIVE_TAIL_CAP - 2]);

        let (s, trimmed) = ring.snapshot();
        assert!(trimmed);
        assert!(
            !s.starts_with('\u{FFFD}'),
            "front snapped to boundary: {s:?}"
        );
    }

    /// A registered tail is reachable by id, and stays reachable (but no
    /// longer running) right after its handle is dropped.
    #[test]
    fn register_get_and_finish_flag() {
        let handle = register("res-test-1");
        handle.tail().push_stdout(b"live");

        let live = get("res-test-1").expect("registered");
        assert!(live.snapshot().running);
        assert_eq!(live.snapshot().stdout, "live");

        drop(handle);

        let finished = get("res-test-1").expect("retained in grace");
        assert!(!finished.snapshot().running);
    }
}