conciliator 0.3.10

[WIP] Library for interactive CLI programs
Documentation
use std::io;
use std::io::ErrorKind;
use std::os::fd::AsRawFd;
use std::sync::Arc;
use std::sync::atomic::{
	AtomicBool,
	Ordering
};

use mio::{
	Interest,
	Poll,
	Token,
	Events,
	Waker
};
use mio::unix::SourceFd;
use tokio::sync::mpsc::{
	unbounded_channel,
	UnboundedSender,
	UnboundedReceiver
};
use tokio::sync::oneshot::{
	channel as oneshot_channel,
	Sender as Replier
};
use tokio::sync::watch::{
	channel as watch_channel,
	Receiver,
	Sender
};
use tokio::task::{
	JoinHandle,
	spawn
};
use tokio::time::{
	Instant,
	sleep_until
};

use crate::{
	Buffer,
	Claw
};
use crate::term::{
	EchoDisabler,
	InputRequest,
	NonBlockingStdin
};
use super::animation::{
	Animate,
	Animator
};

pub(super) struct Handle {
	claw: Claw,
	cleared: Arc<AtomicBool>,
	handle: Option<JoinHandle<()>>,
	lines_tx: Option<UnboundedSender<Buffer>>,
	request_tx: UnboundedSender<Replier<String>>,
	message: Option<Sender<Buffer>>
}

async fn launch_animator<A: Animate>(
	claw: Claw,
	anim: A,
	cleared: Arc<AtomicBool>,
	lines: UnboundedReceiver<Buffer>,
	request_rx: UnboundedReceiver<Replier<String>>,
	message: Receiver<Buffer>)
{
	Animator::new(claw, anim, cleared)
		.run_async(lines, request_rx, message)
		.await;
}

struct StdinNotifier {
	enable: Arc<AtomicBool>,
	waker: Waker,
	notifier: Receiver<()>,
	handle: JoinHandle<()>
}

/*
 *	HANDLE
 */

impl Handle {
	pub(crate) fn spawn<A>(claw: Claw, anim: A, msg: Buffer) -> Self
		where A: Animate + Send + 'static
	{
		let (lines_tx, lines_rx) = unbounded_channel();
		let (request_tx, request_rx) = unbounded_channel();
		let (message, message_rx) = watch_channel(msg);
		let cleared = Arc::new(AtomicBool::new(false));
		let handle = spawn(launch_animator(
			claw.clone(),
			anim,
			cleared.clone(),
			lines_rx,
			request_rx,
			message_rx
		));
		Self {
			claw,
			cleared,
			lines_tx: Some(lines_tx),
			message: Some(message),
			request_tx,
			handle: Some(handle)
		}
	}
	pub(crate) fn send_line(&self, buffer: Buffer) {
		if let Some(lines_tx) = self.lines_tx.as_ref() {
			lines_tx.send(buffer).ok().unwrap();
		}
	}
	pub(crate) async fn request_input(&self) -> String {
		let (input_tx, input_rx) = oneshot_channel();
		self.request_tx.send(input_tx).unwrap();
		input_rx.await.unwrap()
	}

	pub(crate) fn send_message(&self, buffer: Buffer) {
		if let Some(message) = self.message.as_ref() {
			message.send(buffer).ok().unwrap();
		}
	}
	pub(crate) async fn finish(mut self) {
		self.lines_tx.take();
		if let Some(handle) = self.handle.take() {
			handle.await.unwrap();
		}
	}
	pub(crate) async fn clear(mut self) {
		self.message.take();
		if let Some(handle) = self.handle.take() {
			handle.await.unwrap();
		}
	}
}

impl Drop for Handle {
	fn drop(&mut self) {
		// Not a clean shutdown, but some cleanup should happen in the Animator destructor
		//self.lines_tx.take();
		self.message.take();
		if let Some(handle) = self.handle.take() {
			handle.abort();
			if !self.cleared.swap(true, Ordering::Acquire) {
				self.claw.clear_spinner();
			}
		}
	}
}

/*
 *	ANIMATOR
 */

impl<A: Animate> Animator<A> {
	pub async fn run_async(
		mut self,
		mut lines: UnboundedReceiver<Buffer>,
		mut request_rx: UnboundedReceiver<Replier<String>>,
		mut message: Receiver<Buffer>)
	{
		let echo_disabler = EchoDisabler::on_stderr();
		let mut next_frame_at = Instant::now()
			+ self.init(&message.borrow_and_update());

		let mut notifier = StdinNotifier::new().unwrap();
		let mut input_req: Option<InputRequest<_>> = None;

		let finish = loop {
			tokio::select! {
				maybe_line = lines.recv() => match maybe_line {
					Some(line) => {
						self.clear_line();
						self.print_buffer(line);
						// avoid printing (and deleting) message between every line
						while let Ok(line) = lines.try_recv() {
							self.print_buffer(line);
						}
						self.write_message_line(
							&message.borrow_and_update(),
							input_req.as_ref().map(|req| req.buffer.as_slice())
						);
					},
					None => break true
				},
				maybe_message = message.changed() => match maybe_message {
					Ok(()) => {
						self.clear_line();
						self.write_message_line(
							&message.borrow_and_update(),
							input_req.as_ref().map(|req| req.buffer.as_slice())
						);
					},
					Err(_e) => break false
				},
				maybe_request = request_rx.recv() => match maybe_request {
					Some(mut req) => {
						// fast-forward to the newest request, just in case
						while let Ok(newer_request) = request_rx.try_recv() {
							req = newer_request;
						}
						notifier.enable();
						InputRequest::replace(&mut input_req, Some(req));
					},
					None => break true
				},
				_ = notifier.readable(), if input_req.is_some() => {
					if let Some(request) = input_req.as_mut() {
						// create non-blocking stdin
						let mut stdin = NonBlockingStdin::unblock().unwrap();

						let before = request.buffer.len();
						// read stdin into buffer
						let r = stdin.drain_input(&mut request.buffer).unwrap();
						// drop non-blocking stdin
						drop(stdin);

						// check if input is finished
						if let Some(input_str) = r {
							self.accept_input(
								&message.borrow_and_update(),
								&input_str.as_bytes()[before..]
							);

							// switch back to "normal mode"
							let _ = input_req.take()
								.unwrap()
								.sender
								.take()
								.unwrap()
								.send(input_str);

							notifier.disable();
						}
						else {
							//TODO: optimize
							self.write_message_line(
								&message.borrow_and_update(),
								Some(request.buffer.as_slice())
							);
						}
					}
				},
				() = sleep_until(next_frame_at) => {
					next_frame_at = Instant::now()+ self.step(
						&message.borrow_and_update(),
						input_req.as_ref().map(|req| req.buffer.as_slice())
					);
				}
			}
		};
		notifier.join().await;
		if finish {
			self.cleanup(&message.borrow_and_update());
		}
		echo_disabler.reset();
	}
}

/*
 *	STDIN NOTIFIER
 */

impl StdinNotifier {
	const WAKER_TOKEN: Token = Token(0);
	const STDIN_TOKEN: Token = Token(1);

	fn register_stdin(poll: &Poll) {
		// try to register stdin for polling
		let registered = poll.registry().register(
			&mut SourceFd(&io::stdin().as_raw_fd()),
			Self::STDIN_TOKEN,
			Interest::READABLE
		);
		match registered {
			Ok(()) => {},
			Err(e) if e.kind() == ErrorKind::AlreadyExists => {},
			// fails for regular files because they are always ready
			//TODO: what do we do here?
			Err(e) if e.kind() == ErrorKind::PermissionDenied => {
				panic!("cannot poll regular file")
			},
			Err(e) => panic!("unable to poll stdin: {e}")
		}
	}
	fn deregister_stdin(poll: &Poll) {
		let res = poll.registry().deregister(
			&mut SourceFd(&io::stdin().as_raw_fd())
		);
		match res {
			Ok(()) => {},
			// ENOENT if the fd wasn't registered anyway, ignore
			Err(e) if e.kind() == ErrorKind::NotFound => {},
			// should work even if we receive spurious wakes, don't panic
			Err(_) => {}
		}
	}

	pub fn new() -> io::Result<Self> {
		let mut events = Events::with_capacity(32);
		let mut poll = Poll::new()?;
		let waker = Waker::new(poll.registry(), Self::WAKER_TOKEN)?;
		let watch_stdin = Arc::new(AtomicBool::new(false));
		let (readable_tx, readable_rx) = watch_channel(());

		let poll_task = {
			let watch_stdin = watch_stdin.clone();
			tokio::task::spawn_blocking(move || {
				let mut registered = false;
				loop {
					if readable_tx.is_closed() {
						break;
					}
					if watch_stdin.load(Ordering::Acquire) != registered {
						if registered {
							Self::deregister_stdin(&poll);
							registered = false;
						}
						else {
							Self::register_stdin(&poll);
							registered = true;
						}
					}

					match poll.poll(&mut events, None) {
						Ok(()) => {},
						Err(e) if e.kind() == ErrorKind::Interrupted => {},
						Err(e) => panic!("failed to poll with timeout: {e}")
					};
					for event in &events {
						if event.token() == Self::STDIN_TOKEN
							&& event.is_readable()
							&& readable_tx.send(()).is_err()
						{
							return;
						}
					}
				}
			})
		};

		Ok(Self {
			enable: watch_stdin,
			waker,
			notifier: readable_rx,
			handle: poll_task
		})
	}

	pub async fn readable(&mut self) {
		self.notifier.changed().await.unwrap();
	}

	pub fn enable(&self) {
		if !self.enable.swap(true, Ordering::Release) {
			self.waker.wake().unwrap();
		}
	}
	pub fn disable(&self) {
		if self.enable.swap(false, Ordering::Release) {
			self.waker.wake().unwrap();
		}
	}
	pub async fn join(self) {
		drop(self.notifier);
		self.waker.wake().unwrap();
		self.handle.await.unwrap();
	}
}