use std::io;
use std::io::ErrorKind;
use std::os::fd::AsRawFd;
use std::sync::mpsc::{
channel,
sync_channel,
Sender,
SyncSender,
Receiver,
TryRecvError
};
use std::time::{
Duration,
Instant
};
use std::thread::{
Builder,
JoinHandle
};
use mio::{
Interest,
Poll,
Token,
Events,
Waker
};
use mio::unix::SourceFd;
use crate::{
Buffer,
Claw
};
use crate::term::{
EchoDisabler,
InputRequest,
NonBlockingStdin
};
use super::animation::Animator;
use super::Animate;
const SPINNER_THREAD_NAME: &str = "spinner";
pub(super) struct Handle {
lines_tx: Option<Sender<Buffer>>,
status_tx: Option<Sender<Buffer>>,
request_tx: Sender<SyncSender<String>>,
waker: Waker,
handle: Option<JoinHandle<()>>
}
struct Coordinator {
poll: Poll,
lines: Receiver<Buffer>,
msgs: Receiver<Buffer>,
requests: Receiver<SyncSender<String>>,
request: Option<InputRequest<SyncSender<String>>>
}
impl Handle {
pub(crate) fn spawn<A>(claw: Claw, anim: A, msg: Buffer) -> Self
where A: Animate + Send + 'static
{
let (lines_tx, lines_rx) = channel();
let (status_tx, status_rx) = channel();
let (request_tx, request_rx) = channel();
let (coordinator, waker) = Coordinator::new(
lines_rx,
status_rx,
request_rx
).unwrap();
let closure = move || {
coordinator.run(Animator::new(claw, anim, Default::default()), msg)
};
let handle = Builder::new()
.name(SPINNER_THREAD_NAME.to_string())
.spawn(closure)
.expect("should be able to spawn spinner thread");
Self {
lines_tx: Some(lines_tx),
status_tx: Some(status_tx),
request_tx,
waker,
handle: Some(handle)
}
}
pub(crate) fn send_line(&self, buffer: Buffer) {
if let Some(lines_tx) = self.lines_tx.as_ref() {
lines_tx.send(buffer).unwrap();
}
self.wake();
}
pub(crate) fn request_input(&self) -> String {
let (input_tx, input_rx) = sync_channel(1);
if self.request_tx.send(input_tx).is_err() {
panic!("spinner thread failed");
}
self.wake();
match input_rx.recv() {
Ok(s) => s,
Err(_e) => panic!("spinner thread failed or request was superseded")
}
}
pub(crate) fn send_message(&self, buffer: Buffer) {
if let Some(status_tx) = self.status_tx.as_ref() {
status_tx.send(buffer).unwrap();
}
self.wake();
}
fn wake(&self) {
let _ = self.waker.wake();
}
pub(crate) fn finish(mut self) {
self.lines_tx.take();
self.wake();
if let Some(handle) = self.handle.take() {
handle.join().expect("failed to join spinner thread");
}
}
pub(crate) fn clear(self) {}
}
impl Drop for Handle {
fn drop(&mut self) {
self.status_tx.take();
self.wake();
if let Some(handle) = self.handle.take() {
handle.join().expect("failed to join spinner thread");
}
}
}
impl Coordinator {
const WAKER_TOKEN: Token = Token(0);
const STDIN_TOKEN: Token = Token(1);
fn new(
lines: Receiver<Buffer>,
msgs: Receiver<Buffer>,
requests: Receiver<SyncSender<String>>,
) -> io::Result<(Self, Waker)>
{
let poll = Poll::new()?;
let waker = Waker::new(poll.registry(), Self::WAKER_TOKEN)?;
Ok((Self {poll, lines, msgs, requests, request: None}, waker))
}
fn input_buffer(&self) -> Option<&[u8]> {
self.request.as_ref().map(|req| req.buffer.as_slice())
}
fn deregister_stdin(&self) {
let res = self.poll.registry().deregister(
&mut SourceFd(&io::stdin().as_raw_fd())
);
match res {
Ok(()) => {},
Err(e) if e.kind() == ErrorKind::NotFound => {},
Err(_) => {}
}
}
fn run<A: Animate>(mut self, mut animator: Animator<A>, mut msg: Buffer) {
let echo_disabler = EchoDisabler::on_stderr();
let mut events = Events::with_capacity(32);
let mut next_frame_in = animator.init(&msg);
let mut last_step = Instant::now();
let finish = loop {
if let Some(finish) = self.check(&animator, &mut msg) {
break finish;
}
match next_frame_in.saturating_sub(last_step.elapsed()) {
Duration::ZERO => {
next_frame_in = animator.step(&msg, self.input_buffer());
last_step = Instant::now();
},
remaining => next_frame_in = remaining
}
match self.poll.poll(&mut events, Some(next_frame_in)) {
Ok(()) => {},
Err(e) if e.kind() == ErrorKind::Interrupted => {},
Err(e) => panic!("failed to poll with timeout: {e}")
};
for event in &events {
if event.token() == Self::STDIN_TOKEN && event.is_readable() {
if let Some(request) = self.request.as_mut() {
let mut stdin = NonBlockingStdin::unblock().unwrap();
let before = request.buffer.len();
let r = stdin.drain_input(&mut request.buffer).unwrap();
drop(stdin);
if let Some(input_str) = r {
animator.accept_input(
&msg,
&input_str.as_bytes()[before..]
);
let _ = request.sender.try_send(input_str);
drop(self.request.take());
self.deregister_stdin();
}
else {
animator.write_message_line(
&msg,
Some(request.buffer.as_slice())
);
}
}
else {
self.deregister_stdin();
}
}
}
};
match finish {
true => animator.cleanup(&msg),
false => {}
}
echo_disabler.reset();
}
fn check<A: Animate>(
&mut self,
animator: &Animator<A>,
current_msg: &mut Buffer)
-> Option<bool>
{
let mut shutdown = None;
match self.lines.try_recv() {
Ok(line) => {
animator.clear_line();
animator.print_buffer(line);
loop { match self.lines.try_recv() {
Ok(line) => animator.print_buffer(line),
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => {
shutdown = Some(true);
break;
}
}}
animator.write_message_line(current_msg, self.input_buffer());
},
Err(TryRecvError::Empty) => {},
Err(TryRecvError::Disconnected) => shutdown = Some(true)
}
match self.msgs.try_recv() {
Ok(mut msg) => {
loop { match self.msgs.try_recv() {
Ok(newer_msg) => msg = newer_msg,
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => {
shutdown = Some(false);
break;
}
}}
animator.clear_line();
*current_msg = msg;
animator.write_message_line(current_msg, self.input_buffer());
},
Err(TryRecvError::Empty) => {},
Err(TryRecvError::Disconnected) => shutdown = Some(false)
}
match self.requests.try_recv() {
Ok(mut input_sender) => {
loop { match self.requests.try_recv() {
Ok(newer_sender) => input_sender = newer_sender,
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => {
shutdown = Some(false);
break;
}
}}
if self.request.is_none() {
let registered = self.poll.registry().register(
&mut SourceFd(&io::stdin().as_raw_fd()),
Self::STDIN_TOKEN,
Interest::READABLE
);
match registered {
Ok(()) => {},
Err(e) if e.kind() == ErrorKind::AlreadyExists => {},
Err(e) if e.kind() == ErrorKind::PermissionDenied => {
panic!("cannot poll regular file")
},
Err(e) => panic!("unable to poll stdin: {e}")
}
}
InputRequest::replace(&mut self.request, input_sender);
},
Err(TryRecvError::Empty) => {},
Err(TryRecvError::Disconnected) => shutdown = Some(false)
}
shutdown
}
}