conciliator 0.3.10

[WIP] Library for interactive CLI programs
Documentation
use std::io;
use std::io::ErrorKind;
use std::os::fd::AsRawFd;
use std::sync::mpsc::{
	channel,
	sync_channel,
	Sender,
	SyncSender,
	Receiver,
	TryRecvError
};
use std::time::{
	Duration,
	Instant
};
use std::thread::{
	Builder,
	JoinHandle
};

use mio::{
	Interest,
	Poll,
	Token,
	Events,
	Waker
};
use mio::unix::SourceFd;

use crate::{
	Buffer,
	Claw
};
use crate::term::{
	EchoDisabler,
	InputRequest,
	NonBlockingStdin
};
use super::animation::Animator;
use super::Animate;

const SPINNER_THREAD_NAME: &str = "spinner";

/// Handle to interact with the spinner thread
pub(super) struct Handle {
	lines_tx: Option<Sender<Buffer>>,
	status_tx: Option<Sender<Buffer>>,
	request_tx: Sender<SyncSender<String>>,
	waker: Waker,
	handle: Option<JoinHandle<()>>
}

/// Runs the spinner thread, holds the receiving ends for the [`Handle`]
struct Coordinator {
	poll: Poll,
	lines: Receiver<Buffer>,
	msgs: Receiver<Buffer>,
	requests: Receiver<SyncSender<String>>,
	request: Option<InputRequest<SyncSender<String>>>
}


/*
 *	HANDLE
 */

impl Handle {
	pub(crate) fn spawn<A>(claw: Claw, anim: A, msg: Buffer) -> Self
		where A: Animate + Send + 'static
	{
		let (lines_tx, lines_rx) = channel();
		let (status_tx, status_rx) = channel();
		let (request_tx, request_rx) = channel();
		let (coordinator, waker) = Coordinator::new(
			lines_rx,
			status_rx,
			request_rx
		).unwrap();
		let closure = move || {
			coordinator.run(Animator::new(claw, anim, Default::default()), msg)
		};
		let handle = Builder::new()
			.name(SPINNER_THREAD_NAME.to_string())
			.spawn(closure)
			.expect("should be able to spawn spinner thread");
		Self {
			lines_tx: Some(lines_tx),
			status_tx: Some(status_tx),
			request_tx,
			waker,
			handle: Some(handle)
		}
	}
	pub(crate) fn send_line(&self, buffer: Buffer) {
		if let Some(lines_tx) = self.lines_tx.as_ref() {
			lines_tx.send(buffer).unwrap();
		}
		self.wake();
	}
	pub(crate) fn request_input(&self) -> String {
		let (input_tx, input_rx) = sync_channel(1);
		if self.request_tx.send(input_tx).is_err() {
			panic!("spinner thread failed");
		}
		self.wake();
		match input_rx.recv() {
			Ok(s) => s,
			Err(_e) => panic!("spinner thread failed or request was superseded")
		}
	}

	pub(crate) fn send_message(&self, buffer: Buffer) {
		if let Some(status_tx) = self.status_tx.as_ref() {
			status_tx.send(buffer).unwrap();
		}
		self.wake();
	}
	fn wake(&self) {
		let _ = self.waker.wake();
	}
	pub(crate) fn finish(mut self) {
		self.lines_tx.take();
		self.wake();
		if let Some(handle) = self.handle.take() {
			handle.join().expect("failed to join spinner thread");
		}
	}
	pub(crate) fn clear(self) {}
}

impl Drop for Handle {
	fn drop(&mut self) {
		self.status_tx.take();
		self.wake();
		if let Some(handle) = self.handle.take() {
			handle.join().expect("failed to join spinner thread");
		}
	}
}

/*
 *	COORDINATOR
 */

impl Coordinator {
	const WAKER_TOKEN: Token = Token(0);
	const STDIN_TOKEN: Token = Token(1);

	fn new(
		lines: Receiver<Buffer>,
		msgs: Receiver<Buffer>,
		requests: Receiver<SyncSender<String>>,
		) -> io::Result<(Self, Waker)>
	{
		let poll = Poll::new()?;
		let waker = Waker::new(poll.registry(), Self::WAKER_TOKEN)?;
		Ok((Self {poll, lines, msgs, requests, request: None}, waker))
	}

	fn input_buffer(&self) -> Option<&[u8]> {
		self.request.as_ref().map(|req| req.buffer.as_slice())
	}
	fn deregister_stdin(&self) {
		let res = self.poll.registry().deregister(
			&mut SourceFd(&io::stdin().as_raw_fd())
		);
		match res {
			Ok(()) => {},
			// ENOENT if the fd wasn't registered anyway, ignore
			Err(e) if e.kind() == ErrorKind::NotFound => {},
			// should work even if we receive spurious wakes, don't panic
			Err(_) => {}
		}
	}

	fn run<A: Animate>(mut self, mut animator: Animator<A>, mut msg: Buffer) {
		let echo_disabler = EchoDisabler::on_stderr();
		let mut events = Events::with_capacity(32);
		let mut next_frame_in = animator.init(&msg);
		let mut last_step = Instant::now();

		let finish = loop {
			// check for new buffers / message
			if let Some(finish) = self.check(&animator, &mut msg) {
				break finish;
			}

			// check whether it's time to step
			match next_frame_in.saturating_sub(last_step.elapsed()) {
				Duration::ZERO => {
					next_frame_in = animator.step(&msg, self.input_buffer());
					last_step = Instant::now();
				},
				remaining => next_frame_in = remaining
			}

			match self.poll.poll(&mut events, Some(next_frame_in)) {
				Ok(()) => {},
				Err(e) if e.kind() == ErrorKind::Interrupted => {},
				Err(e) => panic!("failed to poll with timeout: {e}")
			};
			for event in &events {
				if event.token() == Self::STDIN_TOKEN && event.is_readable() {
					// check if we have an input request pending
					if let Some(request) = self.request.as_mut() {
						// create non-blocking stdin
						let mut stdin = NonBlockingStdin::unblock().unwrap();

						let before = request.buffer.len();
						// read stdin into buffer
						let r = stdin.drain_input(&mut request.buffer).unwrap();
						// drop non-blocking stdin
						drop(stdin);

						// check if input is finished
						if let Some(input_str) = r {
							animator.accept_input(
								&msg,
								&input_str.as_bytes()[before..]
							);
							let _ = request.sender.try_send(input_str);

							// switch back to "normal mode"
							drop(self.request.take());
							self.deregister_stdin();
						}
						else {
							//TODO: optimize
							animator.write_message_line(
								&msg,
								Some(request.buffer.as_slice())
							);
						}
					}
					else {
						// got stdin event even though we didn't want it
						self.deregister_stdin();
					}
				}
			}
		};
		match finish {
			true => animator.cleanup(&msg),
			false => {}
		}
		echo_disabler.reset();
	}

	fn check<A: Animate>(
		&mut self,
		animator: &Animator<A>,
		current_msg: &mut Buffer)
		-> Option<bool>
	{
		let mut shutdown = None;

		// check for new buffers to print
		match self.lines.try_recv() {
			Ok(line) => {
				animator.clear_line();
				animator.print_buffer(line);
				// avoid printing (and deleting) message between every line
				loop { match self.lines.try_recv() {
					Ok(line) => animator.print_buffer(line),
					Err(TryRecvError::Empty) => break,
					Err(TryRecvError::Disconnected) => {
						shutdown = Some(true);
						break;
					}
				}}
				animator.write_message_line(current_msg, self.input_buffer());
			},
			// nothing to print
			Err(TryRecvError::Empty) => {},
			// sender dropped → stop
			Err(TryRecvError::Disconnected) => shutdown = Some(true)
		}

		// check for updated message
		match self.msgs.try_recv() {
			Ok(mut msg) => {
				// fast-forward to the newest message, just in case
				loop { match self.msgs.try_recv() {
					Ok(newer_msg) => msg = newer_msg,
					Err(TryRecvError::Empty) => break,
					Err(TryRecvError::Disconnected) => {
						shutdown = Some(false);
						break;
					}
				}}
				animator.clear_line();
				*current_msg = msg;
				animator.write_message_line(current_msg, self.input_buffer());
			},
			Err(TryRecvError::Empty) => {},
			Err(TryRecvError::Disconnected) => shutdown = Some(false)
		}

		// check for input requests
		match self.requests.try_recv() {
			Ok(mut input_sender) => {
				// fast-forward to the newest request, just in case
				loop { match self.requests.try_recv() {
					Ok(newer_sender) => input_sender = newer_sender,
					Err(TryRecvError::Empty) => break,
					Err(TryRecvError::Disconnected) => {
						shutdown = Some(false);
						break;
					}
				}}

				// switch into "input mode"
				if self.request.is_none() {
					// try to register stdin for polling
					let registered = self.poll.registry().register(
						&mut SourceFd(&io::stdin().as_raw_fd()),
						Self::STDIN_TOKEN,
						Interest::READABLE
					);
					match registered {
						Ok(()) => {},
						Err(e) if e.kind() == ErrorKind::AlreadyExists => {},
						// fails for regular files because they are always ready
						//TODO: try to fulfill the request instantly
						Err(e) if e.kind() == ErrorKind::PermissionDenied => {
							panic!("cannot poll regular file")
						},
						Err(e) => panic!("unable to poll stdin: {e}")
					}
				}

				// set non-canonical input, store the sender & init/clear buffer
				InputRequest::replace(&mut self.request, input_sender);
			},
			Err(TryRecvError::Empty) => {},
			Err(TryRecvError::Disconnected) => shutdown = Some(false)
		}

		shutdown
	}
}