use async_priority_channel as priority;
use tokio::{
io::AsyncReadExt,
sync::{mpsc, oneshot, watch},
};
use tracing::trace;
pub use watchexec_events::Keyboard;
use crate::{
error::{CriticalError, KeyboardWatcherError, RuntimeError},
event::{Event, Priority, Source, Tag},
};
/// Runtime configuration consumed by the keyboard [`worker`].
///
/// The worker receives this through a watch channel and re-reads it every
/// time the channel signals a change.
#[derive(Clone, Debug, Default)]
#[non_exhaustive]
pub struct WorkingData {
    /// When `true`, the worker keeps a task running that watches stdin for
    /// end-of-file; when `false`, any such task is asked to stop.
    pub eof: bool,
}
/// Starts and stops the stdin watcher in response to [`WorkingData`] updates.
///
/// Each time `working` signals a change, the current `eof` flag is compared
/// with whether a stop handle for a watcher task is currently held:
///
/// - flag set, no handle held: a new `watch_stdin` task is spawned and its
///   stop handle is stored;
/// - flag cleared, handle held: the handle is consumed to signal the task to
///   stop. If that signal cannot be delivered, a
///   [`KeyboardWatcherError::StdinShutdown`] is reported on `errors`;
/// - otherwise nothing happens.
///
/// # Errors
///
/// Returns a [`CriticalError`] if the runtime error cannot be pushed onto the
/// `errors` channel. Returns `Ok(())` once `working.changed()` reports an
/// error, which ends the loop.
pub async fn worker(
    mut working: watch::Receiver<WorkingData>,
    errors: mpsc::Sender<RuntimeError>,
    events: priority::Sender<Event, Priority>,
) -> Result<(), CriticalError> {
    // Holds the sending half of the stop signal while a watcher task is
    // considered active.
    let mut send_close: Option<tokio::sync::oneshot::Sender<()>> = None;

    loop {
        if working.changed().await.is_err() {
            break;
        }

        // Copy the flag out so the watch borrow is released before any await.
        let watch_for_eof = working.borrow().eof;

        if watch_for_eof {
            if send_close.is_none() {
                let (close_s, close_r) = tokio::sync::oneshot::channel::<()>();
                send_close = Some(close_s);
                tokio::spawn(watch_stdin(errors.clone(), events.clone(), close_r));
            }
        } else if let Some(close_s) = send_close.take() {
            // A failed send means the receiving half is already gone.
            if close_s.send(()).is_err() {
                let shutdown = RuntimeError::KeyboardWatcher {
                    err: KeyboardWatcherError::StdinShutdown,
                };
                errors.send(shutdown).await?;
            }
        }
    }

    Ok(())
}
async fn watch_stdin(
errors: mpsc::Sender<RuntimeError>,
events: priority::Sender<Event, Priority>,
mut close_r: oneshot::Receiver<()>,
) -> Result<(), CriticalError> {
let mut stdin = tokio::io::stdin();
let mut buffer = [0; 10];
loop {
tokio::select! {
result = stdin.read(&mut buffer[..]) => {
match result {
Ok(0) => {
send_event(errors, events, Keyboard::Eof).await?;
break;
}
Err(_) => break,
_ => {
}
}
}
_ = &mut close_r => {
break;
}
}
}
Ok(())
}
/// Wraps a keyboard message in an [`Event`] and queues it at normal priority.
///
/// The event carries two tags: the keyboard source and the message itself;
/// its metadata is left at the default. If the event channel rejects the
/// event, the failure is reported on `errors` as
/// [`RuntimeError::EventChannelSend`] with the context `"keyboard"`.
///
/// # Errors
///
/// Returns a [`CriticalError`] only if that report cannot itself be sent on
/// `errors`.
async fn send_event(
    errors: mpsc::Sender<RuntimeError>,
    events: priority::Sender<Event, Priority>,
    msg: Keyboard,
) -> Result<(), CriticalError> {
    let event = Event {
        tags: vec![Tag::Source(Source::Keyboard), Tag::Keyboard(msg)],
        metadata: Default::default(),
    };

    trace!(?event, "processed keyboard input into event");

    match events.send(event, Priority::Normal).await {
        Ok(_) => Ok(()),
        Err(err) => {
            let report = RuntimeError::EventChannelSend {
                ctx: "keyboard",
                err,
            };
            errors.send(report).await?;
            Ok(())
        }
    }
}