use std::io::Read;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use async_priority_channel as priority;
use tokio::{
spawn,
sync::{mpsc, oneshot},
};
use tracing::trace;
use watchexec_events::{Event, KeyCode, Keyboard, Modifiers, Priority, Source, Tag};
use crate::{
error::{CriticalError, RuntimeError},
Config,
};
pub async fn worker(
config: Arc<Config>,
errors: mpsc::Sender<RuntimeError>,
events: priority::Sender<Event, Priority>,
) -> Result<(), CriticalError> {
let mut send_close = None;
let mut config_watch = config.watch();
loop {
config_watch.next().await;
let want_keyboard = config.keyboard_events.get();
match (want_keyboard, &send_close) {
(true, None) => {
let (close_s, close_r) = oneshot::channel::<()>();
send_close = Some(close_s);
spawn(watch_stdin(errors.clone(), events.clone(), close_r));
}
(false, Some(_)) => {
send_close
.take()
.expect("unreachable due to match")
.send(())
.ok();
}
_ => {}
}
}
}
#[cfg(unix)]
mod raw_mode {
use std::os::fd::AsRawFd;
pub struct RawModeGuard {
fd: i32,
original: libc::termios,
}
impl RawModeGuard {
pub fn enter() -> Option<Self> {
let fd = std::io::stdin().as_raw_fd();
unsafe {
if libc::isatty(fd) == 0 {
return None;
}
let mut original: libc::termios = std::mem::zeroed();
if libc::tcgetattr(fd, &mut original) != 0 {
return None;
}
let mut raw = original;
libc::cfmakeraw(&mut raw);
raw.c_oflag |= libc::OPOST;
raw.c_cc[libc::VMIN] = 0;
raw.c_cc[libc::VTIME] = 1;
if libc::tcsetattr(fd, libc::TCSANOW, &raw) != 0 {
return None;
}
Some(Self { fd, original })
}
}
}
impl Drop for RawModeGuard {
fn drop(&mut self) {
unsafe {
libc::tcsetattr(self.fd, libc::TCSANOW, &self.original);
}
}
}
}
#[cfg(windows)]
mod raw_mode {
use windows_sys::Win32::Foundation::{HANDLE, INVALID_HANDLE_VALUE};
use windows_sys::Win32::System::Console::{
GetConsoleMode, GetStdHandle, SetConsoleMode, ENABLE_ECHO_INPUT, ENABLE_LINE_INPUT,
ENABLE_PROCESSED_INPUT, STD_INPUT_HANDLE,
};
pub struct RawModeGuard {
handle: HANDLE,
original_mode: u32,
}
unsafe impl Send for RawModeGuard {}
impl RawModeGuard {
pub fn enter() -> Option<Self> {
unsafe {
let handle = GetStdHandle(STD_INPUT_HANDLE);
if handle == INVALID_HANDLE_VALUE || handle.is_null() {
return None;
}
let mut original_mode: u32 = 0;
if GetConsoleMode(handle, &mut original_mode) == 0 {
return None;
}
let raw_mode = original_mode
& !(ENABLE_LINE_INPUT | ENABLE_ECHO_INPUT | ENABLE_PROCESSED_INPUT);
if SetConsoleMode(handle, raw_mode) == 0 {
return None;
}
Some(Self {
handle,
original_mode,
})
}
}
}
impl Drop for RawModeGuard {
fn drop(&mut self) {
unsafe {
SetConsoleMode(self.handle, self.original_mode);
}
}
}
}
fn byte_to_keyboard(byte: u8) -> Option<Keyboard> {
match byte {
3 | 4 => Some(Keyboard::Eof),
13 => Some(Keyboard::Key {
key: KeyCode::Enter,
modifiers: Modifiers::default(),
}),
b @ 1..=26 => Some(Keyboard::Key {
key: KeyCode::Char((b + b'a' - 1) as char),
modifiers: Modifiers {
ctrl: true,
..Default::default()
},
}),
27 => Some(Keyboard::Key {
key: KeyCode::Escape,
modifiers: Modifiers::default(),
}),
b if char::from(b).is_ascii_graphic() || b == b' ' => Some(Keyboard::Key {
key: KeyCode::Char(char::from(b)),
modifiers: Modifiers::default(),
}),
_ => None,
}
}
async fn watch_stdin(
errors: mpsc::Sender<RuntimeError>,
events: priority::Sender<Event, Priority>,
close_r: oneshot::Receiver<()>,
) -> Result<(), CriticalError> {
let cancel = Arc::new(AtomicBool::new(false));
let cancel_clone = cancel.clone();
let (tx, mut rx) = mpsc::channel::<Result<Vec<u8>, ()>>(16);
tokio::task::spawn_blocking(move || {
#[cfg(any(unix, windows))]
let _raw_guard = raw_mode::RawModeGuard::enter();
let mut stdin = std::io::stdin().lock();
let mut buffer = [0u8; 10];
while !cancel_clone.load(Ordering::Relaxed) {
match stdin.read(&mut buffer) {
Ok(0) => {
#[cfg(any(unix, windows))]
if _raw_guard.is_some() {
continue;
}
let _ = tx.blocking_send(Ok(vec![]));
break;
}
Ok(n) => {
if tx.blocking_send(Ok(buffer[..n].to_vec())).is_err() {
break;
}
}
Err(_) => {
let _ = tx.blocking_send(Err(()));
break;
}
}
}
});
tokio::select! {
_ = async {
'read: while let Some(result) = rx.recv().await {
match result {
Ok(bytes) if bytes.is_empty() => {
let _ = send_event(errors.clone(), events.clone(), Keyboard::Eof).await;
break;
}
Ok(bytes) => {
for &byte in &bytes {
if let Some(key) = byte_to_keyboard(byte) {
let is_eof = matches!(key, Keyboard::Eof);
let _ = send_event(errors.clone(), events.clone(), key).await;
if is_eof {
break 'read;
}
}
}
}
Err(()) => break,
}
}
} => {}
_ = close_r => {}
}
cancel.store(true, Ordering::Relaxed);
Ok(())
}
async fn send_event(
errors: mpsc::Sender<RuntimeError>,
events: priority::Sender<Event, Priority>,
msg: Keyboard,
) -> Result<(), CriticalError> {
let tags = vec![Tag::Source(Source::Keyboard), Tag::Keyboard(msg)];
let event = Event {
tags,
metadata: Default::default(),
};
trace!(?event, "processed keyboard input into event");
if let Err(err) = events.send(event, Priority::Normal).await {
errors
.send(RuntimeError::EventChannelSend {
ctx: "keyboard",
err,
})
.await?;
}
Ok(())
}