1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
//! Event source for keyboard input and related events
use async_priority_channel as priority;
use tokio::{
	io::AsyncReadExt,
	sync::{mpsc, oneshot, watch},
};
use tracing::trace;
pub use watchexec_events::Keyboard;

use crate::{
	error::{CriticalError, KeyboardWatcherError, RuntimeError},
	event::{Event, Priority, Source, Tag},
};

/// The configuration of the [keyboard][self] worker.
///
/// The worker receives this through a watch channel and re-reads it whenever the channel
/// reports a change.
///
/// This is marked non-exhaustive so new configuration can be added without breaking.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct WorkingData {
	/// Whether or not to watch for 'end of file' on stdin.
	///
	/// While this is `true` the worker keeps a task running that reads stdin and emits a
	/// [`Keyboard::Eof`] event when a read returns zero bytes. Setting it back to `false`
	/// asks that task to stop.
	pub eof: bool,
}

/// Launch the keyboard event worker.
///
/// While you can run several, you should only have one.
///
/// The worker reacts to each change notification on `working`:
///
/// - when `eof` is enabled and no stdin watcher is running, it spawns one, which sends
///   keyboard events to the provided `events` channel;
/// - when `eof` is disabled and a stdin watcher was started, it signals that watcher to stop.
///   If the signal cannot be delivered, a [`KeyboardWatcherError::StdinShutdown`] is reported
///   on `errors`.
///
/// The configuration is only examined after `working.changed()` resolves, and the worker
/// returns `Ok(())` once `working.changed()` reports an error.
///
/// # Errors
///
/// Returns a [`CriticalError`] if the shutdown failure cannot be sent on the `errors` channel.
pub async fn worker(
	mut working: watch::Receiver<WorkingData>,
	errors: mpsc::Sender<RuntimeError>,
	events: priority::Sender<Event, Priority>,
) -> Result<(), CriticalError> {
	// Holds the sending half of the close signal for the stdin task we last spawned, if any.
	let mut stop_signal = None;

	loop {
		if working.changed().await.is_err() {
			break;
		}

		// Copy the flag out so the borrow of the watch value ends before any `.await` below.
		let want_eof = working.borrow().eof;

		if want_eof {
			// Only start a stdin watcher if we have not already started one.
			if stop_signal.is_none() {
				let (stop_tx, stop_rx) = tokio::sync::oneshot::channel::<()>();
				stop_signal = Some(stop_tx);
				tokio::spawn(watch_stdin(errors.clone(), events.clone(), stop_rx));
			}
		} else if let Some(stop_tx) = stop_signal.take() {
			// Watching is no longer wanted: ask the stdin task to end, and report if the
			// signal could not be delivered.
			if stop_tx.send(()).is_err() {
				let shutdown_failed = RuntimeError::KeyboardWatcher {
					err: KeyboardWatcherError::StdinShutdown,
				};
				errors.send(shutdown_failed).await?;
			}
		}
	}

	Ok(())
}

/// Watch stdin until it reaches end-of-file, a read fails, or the close signal resolves.
///
/// Bytes read from stdin are discarded; only the outcome of each read matters:
///
/// - a zero-byte read is treated as 'eof', so a [`Keyboard::Eof`] event is sent through
///   `events` and the loop ends;
/// - a read error is logged at trace level and ends the loop;
/// - any other read is ignored and the loop continues.
///
/// The loop also ends as soon as the `close_r` future resolves, whatever its outcome.
///
/// # Errors
///
/// Returns the [`CriticalError`] produced by `send_event` if the 'eof' event could not be
/// sent and the failure could not be reported on `errors` either.
async fn watch_stdin(
	errors: mpsc::Sender<RuntimeError>,
	events: priority::Sender<Event, Priority>,
	mut close_r: oneshot::Receiver<()>,
) -> Result<(), CriticalError> {
	let mut stdin = tokio::io::stdin();
	// The contents are never inspected, so a small scratch buffer is enough.
	let mut buffer = [0; 10];
	loop {
		tokio::select! {
			result = stdin.read(&mut buffer[..]) => {
				match result {
					// Reading 0 bytes is taken to mean stdin has received an 'eof': send that
					// event into the system and stop, as there will be no more information
					// on stdin.
					Ok(0) => {
						send_event(errors, events, Keyboard::Eof).await?;
						break;
					}
					// Ordinary input is of no interest here; keep waiting for 'eof'.
					Ok(_) => {}
					// Stop watching on a read failure, but leave a trace of why rather than
					// ending the task silently.
					Err(err) => {
						trace!(?err, "failed to read from stdin, stopping keyboard watcher");
						break;
					}
				}
			}
			_ = &mut close_r => {
				// If we receive a close signal then break out of the loop and end which drops
				// our handle on stdin
				break;
			}
		}
	}

	Ok(())
}

/// Wrap a keyboard message in an [`Event`] and send it on `events` at normal priority.
///
/// The event is tagged with [`Source::Keyboard`] and the keyboard message itself, and carries
/// default metadata. If sending the event fails, the failure is reported on `errors` as a
/// [`RuntimeError::EventChannelSend`] with the context `"keyboard"`.
///
/// # Errors
///
/// Returns a [`CriticalError`] only if that failure report cannot itself be sent on `errors`.
async fn send_event(
	errors: mpsc::Sender<RuntimeError>,
	events: priority::Sender<Event, Priority>,
	msg: Keyboard,
) -> Result<(), CriticalError> {
	let event = Event {
		tags: vec![Tag::Source(Source::Keyboard), Tag::Keyboard(msg)],
		metadata: Default::default(),
	};

	trace!(?event, "processed keyboard input into event");

	match events.send(event, Priority::Normal).await {
		Ok(_) => {}
		Err(err) => {
			// The event could not be delivered; surface that as a runtime error instead.
			let report = RuntimeError::EventChannelSend {
				ctx: "keyboard",
				err,
			};
			errors.send(report).await?;
		}
	}

	Ok(())
}