use std::io;
use std::io::ErrorKind;
use std::os::fd::AsRawFd;
use std::sync::Arc;
use std::sync::atomic::{
AtomicBool,
Ordering
};
use mio::{
Interest,
Poll,
Token,
Events,
Waker
};
use mio::unix::SourceFd;
use tokio::sync::mpsc::{
unbounded_channel,
UnboundedSender,
UnboundedReceiver
};
use tokio::sync::oneshot::{
channel as oneshot_channel,
Sender as Replier
};
use tokio::sync::watch::{
channel as watch_channel,
Receiver,
Sender
};
use tokio::task::{
JoinHandle,
spawn
};
use tokio::time::{
Instant,
sleep_until
};
use crate::{
Buffer,
Claw
};
use crate::term::{
EchoDisabler,
InputRequest,
NonBlockingStdin
};
use super::animation::{
Animate,
Animator
};
/// Owner-side handle to a spawned animator task.
///
/// Holds the sending halves of the channels the animator loop reads from,
/// together with the task's join handle.
pub(super) struct Handle {
// Used by `Drop` to clear the spinner after aborting the task.
claw: Claw,
// Shared with the animator. `Drop` swaps it to `true` and only clears the
// spinner if it was previously `false`.
cleared: Arc<AtomicBool>,
// Join handle of the animator task; `None` once it has been awaited or aborted.
handle: Option<JoinHandle<()>>,
// Sender for lines to print; dropped by `finish` to end the animator loop.
lines_tx: Option<UnboundedSender<Buffer>>,
// Sender for input requests; each request carries the oneshot reply sender.
request_tx: UnboundedSender<Replier<String>>,
// Sender for the message buffer; dropped by `clear` and by `Drop`.
message: Option<Sender<Buffer>>
}
/// Builds an [`Animator`] from `claw`, `anim` and `cleared`, then runs its
/// async loop until it returns.
///
/// `lines`, `request_rx` and `message` are handed to the loop unchanged.
async fn launch_animator<A>(
    claw: Claw,
    anim: A,
    cleared: Arc<AtomicBool>,
    lines: UnboundedReceiver<Buffer>,
    request_rx: UnboundedReceiver<Replier<String>>,
    message: Receiver<Buffer>,
)
where
    A: Animate,
{
    let animator = Animator::new(claw, anim, cleared);
    animator.run_async(lines, request_rx, message).await
}
/// Async-side handle to a blocking task that polls stdin for readability.
struct StdinNotifier {
// Flag read by the polling task to decide whether stdin should be registered.
enable: Arc<AtomicBool>,
// Wakes the polling task out of its blocking `poll` call.
waker: Waker,
// Watch receiver that is signalled each time the polling task sees stdin readable.
notifier: Receiver<()>,
// Join handle of the blocking polling task.
handle: JoinHandle<()>
}
impl Handle {
    /// Spawns the animator task and returns the handle that controls it.
    ///
    /// `msg` is the initial value of the message channel; `anim` and a clone
    /// of `claw` are moved into the task.
    pub(crate) fn spawn<A>(claw: Claw, anim: A, msg: Buffer) -> Self
    where
        A: Animate + Send + 'static,
    {
        let cleared = Arc::new(AtomicBool::new(false));
        let (line_sender, line_receiver) = unbounded_channel();
        let (input_sender, input_receiver) = unbounded_channel();
        let (message_sender, message_receiver) = watch_channel(msg);

        let task = spawn(launch_animator(
            claw.clone(),
            anim,
            Arc::clone(&cleared),
            line_receiver,
            input_receiver,
            message_receiver,
        ));

        Self {
            claw,
            cleared,
            handle: Some(task),
            lines_tx: Some(line_sender),
            request_tx: input_sender,
            message: Some(message_sender),
        }
    }

    /// Queues `buffer` to be printed by the animator.
    ///
    /// Does nothing when the line sender has already been dropped.
    ///
    /// # Panics
    ///
    /// Panics if the animator side of the channel is closed.
    pub(crate) fn send_line(&self, buffer: Buffer) {
        let Some(sender) = &self.lines_tx else {
            return;
        };
        sender.send(buffer).ok().unwrap();
    }

    /// Asks the animator to read a line of input and waits for the reply.
    ///
    /// # Panics
    ///
    /// Panics if the request cannot be sent, or if the reply sender is
    /// dropped without an answer. The animator loop keeps only the newest
    /// queued request, so a request that gets superseded panics here.
    pub(crate) async fn request_input(&self) -> String {
        let (reply_tx, reply_rx) = oneshot_channel();
        self.request_tx.send(reply_tx).unwrap();
        reply_rx.await.unwrap()
    }

    /// Replaces the message shown by the animator.
    ///
    /// Does nothing when the message sender has already been dropped.
    ///
    /// # Panics
    ///
    /// Panics if the animator side of the channel is closed.
    pub(crate) fn send_message(&self, buffer: Buffer) {
        let Some(sender) = &self.message else {
            return;
        };
        sender.send(buffer).ok().unwrap();
    }

    /// Closes the line channel and waits for the animator task to end.
    ///
    /// The animator loop treats a closed line channel as a normal finish and
    /// runs its cleanup step.
    ///
    /// # Panics
    ///
    /// Panics if the animator task panicked or was cancelled.
    pub(crate) async fn finish(mut self) {
        drop(self.lines_tx.take());
        let Some(task) = self.handle.take() else {
            return;
        };
        task.await.unwrap();
    }

    /// Closes the message channel and waits for the animator task to end.
    ///
    /// The animator loop exits without running its cleanup step when the
    /// message channel closes.
    ///
    /// # Panics
    ///
    /// Panics if the animator task panicked or was cancelled.
    pub(crate) async fn clear(mut self) {
        drop(self.message.take());
        let Some(task) = self.handle.take() else {
            return;
        };
        task.await.unwrap();
    }
}
impl Drop for Handle {
    /// Aborts the animator task if it was never awaited and clears the
    /// spinner unless the shared `cleared` flag was already set.
    fn drop(&mut self) {
        drop(self.message.take());

        // `finish` and `clear` take the join handle themselves; nothing is
        // left to do in that case.
        let Some(task) = self.handle.take() else {
            return;
        };
        task.abort();

        let already_cleared = self.cleared.swap(true, Ordering::Acquire);
        if !already_cleared {
            self.claw.clear_spinner();
        }
    }
}
impl<A: Animate> Animator<A> {
/// Drives the animation until one of the input channels closes.
///
/// The loop reacts to five sources: lines to print (`lines`), message
/// changes (`message`), input requests (`request_rx`), stdin becoming
/// readable while a request is pending, and the frame timer.
///
/// The loop ends with `finish == true` when `lines` or `request_rx` is
/// closed and with `finish == false` when the `message` sender is dropped;
/// `cleanup` runs only in the former case.
///
/// # Panics
///
/// Panics if the stdin notifier cannot be created, if stdin cannot be
/// switched to non-blocking mode, or if draining stdin fails.
pub async fn run_async(
mut self,
mut lines: UnboundedReceiver<Buffer>,
mut request_rx: UnboundedReceiver<Replier<String>>,
mut message: Receiver<Buffer>)
{
// NOTE(review): presumably turns terminal echo off until `reset()` below — confirm in `term`.
let echo_disabler = EchoDisabler::on_stderr();
// `init` returns the delay before the first frame is due.
let mut next_frame_at = Instant::now()
+ self.init(&message.borrow_and_update());
let mut notifier = StdinNotifier::new().unwrap();
// The pending input request, if any; its buffer holds the bytes typed so far.
let mut input_req: Option<InputRequest<_>> = None;
let finish = loop {
tokio::select! {
maybe_line = lines.recv() => match maybe_line {
Some(line) => {
self.clear_line();
self.print_buffer(line);
// Print every line that is already queued before redrawing the
// message line once.
while let Ok(line) = lines.try_recv() {
self.print_buffer(line);
}
self.write_message_line(
&message.borrow_and_update(),
input_req.as_ref().map(|req| req.buffer.as_slice())
);
},
// All line senders are gone: regular finish.
None => break true
},
maybe_message = message.changed() => match maybe_message {
Ok(()) => {
self.clear_line();
self.write_message_line(
&message.borrow_and_update(),
input_req.as_ref().map(|req| req.buffer.as_slice())
);
},
// The message sender was dropped: leave without cleanup.
Err(_e) => break false
},
maybe_request = request_rx.recv() => match maybe_request {
Some(mut req) => {
// Only the newest queued request is kept; older reply senders are
// dropped here.
while let Ok(newer_request) = request_rx.try_recv() {
req = newer_request;
}
notifier.enable();
// NOTE(review): how `replace` treats an already pending request is
// defined in `term` — confirm there.
InputRequest::replace(&mut input_req, Some(req));
},
None => break true
},
// This branch is only polled while a request is pending.
_ = notifier.readable(), if input_req.is_some() => {
if let Some(request) = input_req.as_mut() {
let mut stdin = NonBlockingStdin::unblock().unwrap();
// Length of the buffer before this read; used below to pass only the
// bytes after that offset to `accept_input`.
let before = request.buffer.len();
let r = stdin.drain_input(&mut request.buffer).unwrap();
drop(stdin);
if let Some(input_str) = r {
// NOTE(review): the slice assumes `input_str` is at least `before`
// bytes long — confirm against `drain_input`.
self.accept_input(
&message.borrow_and_update(),
&input_str.as_bytes()[before..]
);
// Reply to the requester; a send error (receiver gone) is ignored.
let _ = input_req.take()
.unwrap()
.sender
.take()
.unwrap()
.send(input_str);
notifier.disable();
}
else {
// No complete input yet: redraw with the partial buffer.
self.write_message_line(
&message.borrow_and_update(),
Some(request.buffer.as_slice())
);
}
}
},
() = sleep_until(next_frame_at) => {
// `step` returns the delay until the following frame.
next_frame_at = Instant::now()+ self.step(
&message.borrow_and_update(),
input_req.as_ref().map(|req| req.buffer.as_slice())
);
}
}
};
// Stop the stdin polling task before finishing up.
notifier.join().await;
if finish {
self.cleanup(&message.borrow_and_update());
}
echo_disabler.reset();
}
}
impl StdinNotifier {
    /// Token reported for wake-ups triggered through the [`Waker`].
    const WAKER_TOKEN: Token = Token(0);
    /// Token reported for readiness events on standard input.
    const STDIN_TOKEN: Token = Token(1);

    /// Registers stdin for readable events on `poll`.
    ///
    /// An `AlreadyExists` error is treated as success.
    ///
    /// # Panics
    ///
    /// Panics on any other registration error.
    fn register_stdin(poll: &Poll) {
        let registered = poll.registry().register(
            &mut SourceFd(&io::stdin().as_raw_fd()),
            Self::STDIN_TOKEN,
            Interest::READABLE,
        );
        match registered {
            Ok(()) => {}
            Err(e) if e.kind() == ErrorKind::AlreadyExists => {}
            Err(e) if e.kind() == ErrorKind::PermissionDenied => {
                panic!("cannot poll regular file")
            }
            Err(e) => panic!("unable to poll stdin: {e}"),
        }
    }

    /// Removes stdin from `poll`.
    ///
    /// This is best effort: every error, including `NotFound`, is ignored.
    fn deregister_stdin(poll: &Poll) {
        let _ = poll
            .registry()
            .deregister(&mut SourceFd(&io::stdin().as_raw_fd()));
    }

    /// Starts a blocking task that polls stdin and returns its handle.
    ///
    /// Stdin is only registered with the poller while the notifier is
    /// enabled (see [`enable`](Self::enable) / [`disable`](Self::disable)).
    /// Each readable event on stdin is forwarded through a watch channel
    /// that [`readable`](Self::readable) waits on.
    ///
    /// # Errors
    ///
    /// Returns the error if the poller or its waker cannot be created.
    pub fn new() -> io::Result<Self> {
        let mut events = Events::with_capacity(32);
        let mut poll = Poll::new()?;
        let waker = Waker::new(poll.registry(), Self::WAKER_TOKEN)?;
        let watch_stdin = Arc::new(AtomicBool::new(false));
        let (readable_tx, readable_rx) = watch_channel(());
        let poll_task = {
            let watch_stdin = watch_stdin.clone();
            tokio::task::spawn_blocking(move || {
                // Whether stdin is currently registered with `poll`.
                let mut registered = false;
                loop {
                    // The receiver is gone once the notifier has been joined
                    // or dropped.
                    if readable_tx.is_closed() {
                        break;
                    }
                    // Bring the registration in line with the requested state.
                    if watch_stdin.load(Ordering::Acquire) != registered {
                        if registered {
                            Self::deregister_stdin(&poll);
                            registered = false;
                        } else {
                            Self::register_stdin(&poll);
                            registered = true;
                        }
                    }
                    // Blocks without a timeout; the waker is the only way to
                    // make this thread re-check the flags above.
                    match poll.poll(&mut events, None) {
                        Ok(()) => {}
                        Err(e) if e.kind() == ErrorKind::Interrupted => {}
                        Err(e) => panic!("failed to poll stdin notifier: {e}"),
                    };
                    for event in &events {
                        if event.token() == Self::STDIN_TOKEN
                            && event.is_readable()
                            && readable_tx.send(()).is_err()
                        {
                            return;
                        }
                    }
                }
            })
        };
        Ok(Self {
            enable: watch_stdin,
            waker,
            notifier: readable_rx,
            handle: poll_task,
        })
    }

    /// Waits until the polling task reports stdin as readable.
    ///
    /// # Panics
    ///
    /// Panics if the polling task has exited.
    pub async fn readable(&mut self) {
        self.notifier.changed().await.unwrap();
    }

    /// Asks the polling task to start watching stdin.
    ///
    /// The task is only woken when the flag actually changes.
    pub fn enable(&self) {
        if !self.enable.swap(true, Ordering::Release) {
            self.waker.wake().unwrap();
        }
    }

    /// Asks the polling task to stop watching stdin.
    ///
    /// The task is only woken when the flag actually changes.
    pub fn disable(&self) {
        if self.enable.swap(false, Ordering::Release) {
            self.waker.wake().unwrap();
        }
    }

    /// Stops the polling task and waits for it to exit.
    ///
    /// # Panics
    ///
    /// Panics if the task cannot be woken or if it panicked.
    pub async fn join(mut self) {
        self.shutdown().unwrap();
        // Await through `&mut` because fields cannot be moved out of a type
        // that implements `Drop`.
        (&mut self.handle).await.unwrap();
    }

    /// Closes the readiness channel and wakes the polling task so that it
    /// notices the closure and exits.
    fn shutdown(&mut self) -> io::Result<()> {
        // The polling task exits once every receiver is gone. The field
        // cannot be moved out through `&mut self`, so swap in a receiver
        // whose sender is dropped right away and drop the real one.
        let (_closed_tx, closed_rx) = watch_channel(());
        drop(std::mem::replace(&mut self.notifier, closed_rx));
        self.waker.wake()
    }
}

impl Drop for StdinNotifier {
    /// Stops the polling task when the notifier goes away without `join`.
    ///
    /// Without this, an aborted or panicking owner would leave the blocking
    /// task parked in `poll` forever, since nothing else wakes it. After a
    /// successful `join` this runs once more; the extra wake is harmless.
    fn drop(&mut self) {
        // Errors cannot be reported from `drop`; waking is best effort here.
        let _ = self.shutdown();
    }
}