use crossterm::event::{self, Event, KeyCode, KeyEvent, KeyEventKind, KeyModifiers};
use std::io;
use std::os::unix::io::RawFd;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::mpsc::{self, SyncSender};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};
/// How long `ThreadedRenderer::submit` polls for a terminal event each time the frame
/// channel is full, before it retries the send.
const SUBMIT_POLL_TIMEOUT: Duration = Duration::from_millis(2);
/// Upper bound on how long `ThreadedRenderer::shutdown` waits for the writer thread to be
/// joined before it returns anyway.
const SHUTDOWN_JOIN_TIMEOUT: Duration = Duration::from_millis(1000);
/// How a [`write_chunked`] call ended when no I/O error occurred.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WriteOutcome {
    /// The loop ran until every byte of the buffer had been written.
    Complete,
    /// The quit callback reported `true`, so the write was abandoned; the buffer may have
    /// been written only in part.
    QuitSignaled,
}
/// Writes `buf` to the raw file descriptor `fd` in chunks of at most 16 KiB, giving the
/// caller a chance to abort between chunks.
///
/// `should_quit` is consulted before every `write(2)` attempt and again after every attempt
/// that made progress. As soon as it reports `true` the function stops and returns
/// [`WriteOutcome::QuitSignaled`]; because of the check after each write, this can happen
/// even when the final chunk has just been written. Short writes are handled by continuing
/// from the first unwritten byte, and a write interrupted by a signal (`EINTR`) is retried.
///
/// An empty `buf` returns [`WriteOutcome::Complete`] without calling `should_quit`.
///
/// # Errors
///
/// * Any error returned by `should_quit` is propagated unchanged.
/// * Any `write(2)` failure other than `EINTR` is returned as the corresponding OS error.
/// * [`io::ErrorKind::WriteZero`] is returned when `write(2)` accepts zero bytes of a
///   non-empty chunk, since retrying could otherwise spin forever without progress.
pub fn write_chunked(
    fd: RawFd,
    buf: &[u8],
    mut should_quit: impl FnMut() -> io::Result<bool>,
) -> io::Result<WriteOutcome> {
    // Largest slice handed to a single `write(2)` call; bounds the time between quit checks.
    const MAX_CHUNK: usize = 16 * 1024;

    let mut written = 0;
    while written < buf.len() {
        if should_quit()? {
            return Ok(WriteOutcome::QuitSignaled);
        }

        let chunk_end = (written + MAX_CHUNK).min(buf.len());
        let chunk = &buf[written..chunk_end];
        // SAFETY: `chunk` borrows from `buf`, so the pointer is valid for reads of
        // `chunk.len()` bytes for the whole call, and `write(2)` only reads from it.
        let n = unsafe { libc::write(fd, chunk.as_ptr() as *const libc::c_void, chunk.len()) };

        if n > 0 {
            written += n as usize;
            if should_quit()? {
                return Ok(WriteOutcome::QuitSignaled);
            }
        } else if n == 0 {
            // The chunk is never empty here, so zero bytes accepted means no progress is
            // possible; report it the way `std::io::Write::write_all` does.
            return Err(io::Error::new(
                io::ErrorKind::WriteZero,
                "failed to write whole buffer",
            ));
        } else {
            let err = io::Error::last_os_error();
            if err.kind() == io::ErrorKind::Interrupted {
                // Interrupted before any byte was written: retry the same chunk.
                continue;
            }
            return Err(err);
        }
    }
    Ok(WriteOutcome::Complete)
}
/// Decides whether a key press should be treated as a request to quit.
///
/// Returns `true` when `code` appears in `quit_keys`, or when the key is `c` pressed with
/// the CONTROL modifier (Ctrl+C), regardless of what `quit_keys` contains.
pub fn is_quit_key(code: KeyCode, modifiers: KeyModifiers, quit_keys: &[KeyCode]) -> bool {
    if quit_keys.iter().any(|candidate| *candidate == code) {
        return true;
    }
    matches!(code, KeyCode::Char('c')) && modifiers.contains(KeyModifiers::CONTROL)
}
/// Outcome of handing a frame to [`ThreadedRenderer::submit`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SubmitResult {
    /// The frame was placed on the writer thread's channel.
    Ok,
    /// The quit flag was set, or a quit key was read while waiting for channel space.
    Quit,
    /// The channel to the writer thread is gone, so the frame could not be delivered.
    WriterDied,
}
/// Handle to a background thread that writes submitted frames to a file descriptor.
#[derive(Debug)]
pub struct ThreadedRenderer {
    /// Sending half of the bounded frame channel; `None` once it has been taken, after
    /// which no more frames can be submitted.
    tx: Option<SyncSender<Vec<u8>>>,
    /// Duration of the most recently finished write, in nanoseconds, as stored by the
    /// writer thread.
    write_time: Arc<AtomicU64>,
    /// Join handle of the writer thread; `None` once it has been taken for joining.
    handle: Option<JoinHandle<()>>,
}
impl ThreadedRenderer {
    /// Starts the writer thread and returns the handle used to feed it frames.
    ///
    /// Frames travel over a bounded channel with room for a single pending frame. The
    /// thread writes each received frame to `fd` with [`write_chunked`], using `quit` as the
    /// abort flag, and records how long the call took whenever it returns without an I/O
    /// error (including when it stopped early because `quit` was set). The thread exits
    /// when the channel is closed or when a write fails.
    pub fn new(quit: Arc<AtomicBool>, fd: RawFd) -> Self {
        let (tx, rx) = mpsc::sync_channel::<Vec<u8>>(1);
        let write_time = Arc::new(AtomicU64::new(0));
        let last_write_nanos = Arc::clone(&write_time);

        let handle = thread::spawn(move || {
            // `iter` blocks for the next frame and ends once every sender is gone.
            for frame in rx.iter() {
                let started = Instant::now();
                let result = write_chunked(fd, &frame, || Ok(quit.load(Ordering::Relaxed)));
                if result.is_err() {
                    // Leaving the loop drops `rx`, which `submit` observes as a dead writer.
                    break;
                }
                last_write_nanos.store(started.elapsed().as_nanos() as u64, Ordering::Relaxed);
            }
        });

        ThreadedRenderer {
            tx: Some(tx),
            write_time,
            handle: Some(handle),
        }
    }

    /// Queues `buf` for the writer thread, waiting for channel space if necessary.
    ///
    /// While the channel is full, the terminal is polled for `SUBMIT_POLL_TIMEOUT`; if an
    /// event is available it is read (and thereby consumed) to check for a quit key. The
    /// `quit` flag is re-checked before every send attempt.
    ///
    /// Returns [`SubmitResult::Ok`] once the frame is queued, [`SubmitResult::Quit`] when
    /// the flag is set or a quit key was read, and [`SubmitResult::WriterDied`] when the
    /// channel is closed or the sender has already been taken.
    ///
    /// # Errors
    ///
    /// Propagates errors from polling or reading terminal events.
    pub fn submit(
        &mut self,
        mut buf: Vec<u8>,
        quit: &AtomicBool,
        quit_keys: &[KeyCode],
    ) -> io::Result<SubmitResult> {
        let Some(sender) = self.tx.as_ref() else {
            return Ok(SubmitResult::WriterDied);
        };

        while !quit.load(Ordering::Relaxed) {
            // A full channel hands the frame back so it can be retried on the next pass.
            buf = match sender.try_send(buf) {
                Ok(()) => return Ok(SubmitResult::Ok),
                Err(mpsc::TrySendError::Disconnected(_)) => return Ok(SubmitResult::WriterDied),
                Err(mpsc::TrySendError::Full(rejected)) => rejected,
            };
            if event::poll(SUBMIT_POLL_TIMEOUT)? && poll_quit(quit, quit_keys)? {
                return Ok(SubmitResult::Quit);
            }
        }
        Ok(SubmitResult::Quit)
    }

    /// Returns the duration of the most recently recorded write, in seconds.
    ///
    /// The value is `0.0` until the writer thread has recorded its first write.
    pub fn write_time_secs(&self) -> f64 {
        let nanos = self.write_time.load(Ordering::Relaxed);
        nanos as f64 / 1e9
    }

    /// Closes the frame channel and waits, for a bounded time, for the writer thread to end.
    ///
    /// The join itself runs on a helper thread so that the wait can be capped at
    /// `SHUTDOWN_JOIN_TIMEOUT`; if the writer has not finished by then, this returns anyway.
    /// The result of the join and of the timed wait are both ignored, so this currently
    /// always returns `Ok(())`.
    pub fn shutdown(mut self) -> io::Result<()> {
        // Dropping the sender makes the writer's receive loop end.
        drop(self.tx.take());

        let Some(writer) = self.handle.take() else {
            return Ok(());
        };

        let (done_tx, done_rx) = mpsc::channel();
        thread::spawn(move || {
            let _ = writer.join();
            let _ = done_tx.send(());
        });
        let _ = done_rx.recv_timeout(SHUTDOWN_JOIN_TIMEOUT);
        Ok(())
    }
}
/// Reads one terminal event and reports whether it was a quit key press.
///
/// The event is consumed whatever it turns out to be. When it is a key press for which
/// [`is_quit_key`] returns `true`, the `quit` flag is set before returning `Ok(true)`;
/// every other event yields `Ok(false)` and leaves the flag untouched.
///
/// # Errors
///
/// Propagates any error from reading the event.
fn poll_quit(quit: &AtomicBool, quit_keys: &[KeyCode]) -> io::Result<bool> {
    let quit_pressed = match event::read()? {
        Event::Key(KeyEvent {
            code,
            modifiers,
            kind: KeyEventKind::Press,
            ..
        }) => is_quit_key(code, modifiers, quit_keys),
        _ => false,
    };
    if quit_pressed {
        quit.store(true, Ordering::Release);
    }
    Ok(quit_pressed)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crossterm::event::KeyCode;
    use std::io::Read;
    use std::os::unix::io::{AsRawFd, FromRawFd};
    use std::sync::Arc;
    use std::sync::atomic::AtomicBool;
    use std::time::{Duration, Instant};

    /// Opens an anonymous pipe and returns its ends as owned files: `(read_end, write_end)`.
    fn make_pipe() -> (std::fs::File, std::fs::File) {
        let mut fds = [0i32; 2];
        let status = unsafe { libc::pipe(fds.as_mut_ptr()) };
        assert_eq!(status, 0, "pipe() failed");
        let [reader_fd, writer_fd] = fds;
        let reader = unsafe { std::fs::File::from_raw_fd(reader_fd) };
        let writer = unsafe { std::fs::File::from_raw_fd(writer_fd) };
        (reader, writer)
    }

    /// Reads from `reader` until end-of-file and returns everything that was received.
    fn read_all(mut reader: std::fs::File) -> Vec<u8> {
        let mut received = Vec::new();
        reader.read_to_end(&mut received).unwrap();
        received
    }

    #[test]
    fn write_chunked_writes_all_bytes_when_no_quit() {
        let (reader, writer) = make_pipe();
        let payload = b"hello threaded world".repeat(100);

        let outcome = write_chunked(writer.as_raw_fd(), &payload, || Ok(false)).unwrap();
        assert!(matches!(outcome, WriteOutcome::Complete));

        // Close the write end so the reader sees end-of-file.
        drop(writer);
        assert_eq!(read_all(reader), payload);
    }

    #[test]
    fn write_chunked_bails_when_quit_signalled() {
        let (reader, writer) = make_pipe();
        let payload = vec![b'x'; 40_000];

        // The first check lets one chunk through; every later check asks to quit.
        let mut checks = 0;
        let outcome = write_chunked(writer.as_raw_fd(), &payload, || {
            checks += 1;
            Ok(checks >= 2)
        })
        .unwrap();
        assert!(matches!(outcome, WriteOutcome::QuitSignaled));

        drop(writer);
        let received = read_all(reader);
        assert!(!received.is_empty(), "expected at least one chunk written");
        assert!(
            received.len() < payload.len(),
            "wrote {} bytes, expected a partial write",
            received.len()
        );
    }

    #[test]
    fn threaded_renderer_writes_submitted_frame() {
        const FRAME: &[u8] = b"\x1b[?2026hframe1\x1b[?2026l";

        let (reader, writer) = make_pipe();
        let quit = Arc::new(AtomicBool::new(false));
        let mut renderer = ThreadedRenderer::new(quit.clone(), writer.as_raw_fd());

        let result = renderer
            .submit(FRAME.to_vec(), &quit, &[KeyCode::Char('q')])
            .unwrap();
        assert!(matches!(result, SubmitResult::Ok));

        // Shut down before closing the write end and reading back what arrived.
        renderer.shutdown().unwrap();
        drop(writer);
        assert_eq!(read_all(reader), FRAME);
    }

    #[test]
    fn threaded_renderer_shutdown_joins_promptly() {
        let (_reader, writer) = make_pipe();
        let quit = Arc::new(AtomicBool::new(false));
        let renderer = ThreadedRenderer::new(quit, writer.as_raw_fd());

        let started = Instant::now();
        renderer.shutdown().unwrap();
        assert!(
            started.elapsed() < Duration::from_secs(2),
            "shutdown took {:?}, expected prompt join",
            started.elapsed()
        );
    }

    #[test]
    fn threaded_renderer_reports_write_died_after_drop() {
        let (_reader, writer) = make_pipe();
        let quit = Arc::new(AtomicBool::new(false));
        let mut renderer = ThreadedRenderer::new(quit.clone(), writer.as_raw_fd());

        // Taking the sender away leaves the renderer with no channel to submit on.
        renderer.tx.take();
        let result = renderer
            .submit(vec![b'x'; 10], &quit, &[KeyCode::Char('q')])
            .unwrap();
        assert!(matches!(result, SubmitResult::WriterDied));
    }
}