use std::{io::{self, Write},
sync::Arc};
use crossterm::{terminal::{self, disable_raw_mode, Clear},
QueueableCommand};
use r3bl_core::{output_device_as_mut,
InputDevice,
LineStateControlSignal,
OutputDevice,
SendRawTerminal,
SharedWriter};
use thiserror::Error;
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use crate::{History,
LineState,
LineStateLiveness,
PauseBuffer,
SafeHistory,
SafeLineState,
SafePauseBuffer,
StdMutex,
CHANNEL_CAPACITY};
/// The `Ctrl+C` key event (`'c'` with the `CONTROL` modifier).
///
/// Incoming events are compared against this constant to decide whether an active
/// spinner should be asked to shut down.
const CTRL_C: crossterm::event::Event = {
    use crossterm::event::{Event, KeyCode, KeyEvent, KeyModifiers};
    Event::Key(KeyEvent::new(KeyCode::Char('c'), KeyModifiers::CONTROL))
};
/// The `Ctrl+D` key event (`'d'` with the `CONTROL` modifier).
///
/// Incoming events are compared against this constant to decide whether an active
/// spinner should be asked to shut down.
const CTRL_D: crossterm::event::Event = {
    use crossterm::event::{Event, KeyCode, KeyEvent, KeyModifiers};
    Event::Key(KeyEvent::new(KeyCode::Char('d'), KeyModifiers::CONTROL))
};
/// Async readline front end: owns the input/output devices and the shared state that
/// the background signal-monitoring task (spawned in `Readline::new`) also works on.
///
/// Dropping a `Readline` calls `exit` on the line state and disables raw mode.
pub struct Readline {
/// Device whose underlying writer is obtained through `output_device_as_mut!` for
/// every render or flush.
pub output_device: OutputDevice,
/// Source of crossterm events polled by `readline()` via `next()`.
pub input_device: InputDevice,
/// Line editor state, shared with the signal-monitoring task.
pub safe_line_state: SafeLineState,
/// Sending half used by `add_history_entry` to queue new history lines.
pub history_sender: UnboundedSender<String>,
/// Receiving half drained inside `readline()`; each received value is passed to
/// the history's `update`.
pub history_receiver: UnboundedReceiver<String>,
/// History store, shared so that event processing can consult it.
pub safe_history: SafeHistory,
/// Output chunks queued while the line state is paused; drained when flushing.
pub safe_is_paused_buffer: SafePauseBuffer,
/// `Some(sender)` while a spinner is registered as active (set by the
/// `SpinnerActive` signal, cleared by `SpinnerInactive`); the sender is used to
/// ask the spinner to shut down.
pub safe_spinner_is_active: Arc<StdMutex<Option<tokio::sync::broadcast::Sender<()>>>>,
}
/// Errors reported by the readline machinery.
#[derive(Debug, Error)]
pub enum ReadlineError {
/// An underlying I/O failure; displayed transparently and convertible from
/// `io::Error` with `?`.
#[error(transparent)]
IO(#[from] io::Error),
/// Displayed as "line writers closed".
/// NOTE(review): no code in this part of the file constructs this variant —
/// confirm where it is raised.
#[error("line writers closed")]
Closed,
}
/// Events that `Readline::readline` can resolve to.
#[derive(Clone, Debug, PartialEq)]
pub enum ReadlineEvent {
    /// A completed line of input (e.g. `"abc"` after typing `a`, `b`, `c`, `Enter`).
    Line(String),

    /// End of input.
    /// NOTE(review): presumably produced for `Ctrl+D`; the producing code is not in
    /// this part of the file.
    Eof,

    /// Input was interrupted.
    /// NOTE(review): presumably produced for `Ctrl+C`; confirm against `LineState`.
    Interrupted,

    /// NOTE(review): presumably reported when the terminal is resized; confirm
    /// against `LineState`.
    Resized,
}
/// Three-way instruction handed back to a processing loop: finish successfully,
/// finish with an error, or keep going.
#[derive(Clone, Debug, PartialEq)]
pub enum ControlFlowExtended<T, E> {
    /// Stop and hand `T` back to the caller as the successful result.
    ReturnOk(T),

    /// Stop and hand `E` back to the caller as the failure.
    ReturnError(E),

    /// Nothing to return yet; keep looping.
    Continue,
}
/// Two-way instruction handed back to a processing loop that has no success value:
/// either stop with an error or keep going.
#[derive(Clone, Debug, PartialEq)]
pub enum ControlFlowLimited<E> {
    /// Stop and report `E`.
    ReturnError(E),

    /// Keep looping.
    Continue,
}
pub mod manage_shared_writer_output {
use super::*;
/// Spawns a Tokio task that receives [`LineStateControlSignal`]s and applies each one
/// with [`process_line_control_signal`].
///
/// The task ends when the channel yields `None` (closed) or when processing a signal
/// returns [`ControlFlowLimited::ReturnError`]; the error itself is discarded.
///
/// Returns the `JoinHandle` of the spawned task.
pub fn spawn_task_to_monitor_line_state_signals(
    mut line_control_channel_receiver: mpsc::Receiver<LineStateControlSignal>,
    safe_line_state: SafeLineState,
    output_device: OutputDevice,
    safe_is_paused_buffer: SafePauseBuffer,
    safe_spinner_is_active: Arc<StdMutex<Option<tokio::sync::broadcast::Sender<()>>>>,
) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        // `recv()` resolving to `None` terminates the loop.
        while let Some(signal) = line_control_channel_receiver.recv().await {
            let outcome = process_line_control_signal(
                signal,
                safe_is_paused_buffer.clone(),
                safe_line_state.clone(),
                output_device.clone(),
                safe_spinner_is_active.clone(),
            );

            // Any processing error stops the monitor task.
            if let ControlFlowLimited::ReturnError(_) = outcome {
                break;
            }
        }
    })
}
/// Applies a single [`LineStateControlSignal`] to the shared readline state.
///
/// - `Line(buf)`: while the line state is paused the payload is queued in the pause
///   buffer; otherwise it is printed with `print_data_and_flush` and the terminal is
///   flushed.
/// - `Flush`: delegates to [`flush_internal`]; its result is deliberately ignored.
/// - `Pause` / `Resume`: change the liveness through `set_paused`; `Resume` then also
///   flushes the pause buffer (result deliberately ignored).
/// - `SpinnerActive(sender)` / `SpinnerInactive`: store or clear the spinner shutdown
///   sender.
///
/// Returns [`ControlFlowLimited::ReturnError`] when printing, flushing the terminal or
/// changing the pause state fails, and [`ControlFlowLimited::Continue`] otherwise.
///
/// # Locking
/// Every branch obtains the terminal handle first and only then locks the line state,
/// which is the same order used by the `Readline` methods in this file. The line
/// state is locked once per signal, so the pause flag that is read is the one the
/// flush acts on.
///
/// NOTE(review): the ordering matters only if `output_device_as_mut!` keeps a lock on
/// the device for the rest of the enclosing scope — confirm against `r3bl_core`.
///
/// # Panics
/// Panics if one of the shared mutexes is poisoned.
pub fn process_line_control_signal(
    line_control_signal: LineStateControlSignal,
    self_safe_is_paused_buffer: SafePauseBuffer,
    self_safe_line_state: SafeLineState,
    output_device: OutputDevice,
    self_safe_spinner_is_active: Arc<
        StdMutex<Option<tokio::sync::broadcast::Sender<()>>>,
    >,
) -> ControlFlowLimited<ReadlineError> {
    match line_control_signal {
        LineStateControlSignal::Line(buf) => {
            let term = output_device_as_mut!(output_device);
            let mut line_state = self_safe_line_state.lock().unwrap();

            // While paused nothing is written; the data waits for the next flush.
            if line_state.is_paused.is_paused() {
                self_safe_is_paused_buffer.lock().unwrap().push_back(buf);
                return ControlFlowLimited::Continue;
            }

            if let Err(err) = line_state.print_data_and_flush(&buf, term) {
                return ControlFlowLimited::ReturnError(err);
            }
            if let Err(err) = term.flush() {
                return ControlFlowLimited::ReturnError(err.into());
            }
        }

        LineStateControlSignal::Flush => {
            let term = output_device_as_mut!(output_device);
            // Single lock: the flag is read from the very guard that is flushed with.
            let line_state = self_safe_line_state.lock().unwrap();
            let is_paused = line_state.is_paused;
            let _ = flush_internal(
                self_safe_is_paused_buffer,
                is_paused,
                line_state,
                term,
            );
        }

        LineStateControlSignal::Pause => {
            let new_value = LineStateLiveness::Paused;
            let term = output_device_as_mut!(output_device);
            let mut line_state = self_safe_line_state.lock().unwrap();
            if line_state.set_paused(new_value, term).is_err() {
                return ControlFlowLimited::ReturnError(ReadlineError::IO(
                    io::Error::new(io::ErrorKind::Other, "failed to pause terminal"),
                ));
            };
        }

        LineStateControlSignal::Resume => {
            let new_value = LineStateLiveness::NotPaused;
            let term = output_device_as_mut!(output_device);
            let mut line_state = self_safe_line_state.lock().unwrap();
            if line_state.set_paused(new_value, term).is_err() {
                return ControlFlowLimited::ReturnError(ReadlineError::IO(
                    io::Error::new(io::ErrorKind::Other, "failed to resume terminal"),
                ));
            };
            // Emit whatever was queued while paused.
            let _ = flush_internal(
                self_safe_is_paused_buffer,
                new_value,
                line_state,
                term,
            );
        }

        LineStateControlSignal::SpinnerActive(spinner_shutdown_sender) => {
            let mut spinner_is_active = self_safe_spinner_is_active.lock().unwrap();
            *spinner_is_active = Some(spinner_shutdown_sender);
        }

        LineStateControlSignal::SpinnerInactive => {
            let mut spinner_is_active = self_safe_spinner_is_active.lock().unwrap();
            _ = spinner_is_active.take();
        }
    }

    ControlFlowLimited::Continue
}
/// Writes everything queued in the pause buffer to `term` and re-renders the line.
///
/// Does nothing and returns `Ok(())` when `is_paused` reports paused; the pause
/// buffer is left untouched in that case. Otherwise the buffer is drained, the
/// chunks are concatenated in order and handed to `print_data_and_flush` as raw
/// bytes, followed by `clear_and_render_and_flush`.
///
/// The bytes are forwarded unmodified, exactly as the non-paused `Line` path does.
/// Decoding each chunk separately would turn a multi-byte character that straddles
/// two chunks into replacement characters.
///
/// # Errors
/// Propagates any error returned by the two `LineState` calls.
///
/// # Panics
/// Panics if the pause-buffer mutex is poisoned.
pub fn flush_internal(
    self_safe_is_paused_buffer: SafePauseBuffer,
    is_paused: LineStateLiveness,
    mut line_state: std::sync::MutexGuard<LineState>,
    term: &mut SendRawTerminal,
) -> Result<(), ReadlineError> {
    if is_paused.is_paused() {
        return Ok(());
    }

    // The buffer lock is held only for the duration of the drain.
    let buffered_bytes: Vec<u8> = self_safe_is_paused_buffer
        .lock()
        .unwrap()
        .drain(..)
        .flatten()
        .collect();

    line_state.print_data_and_flush(&buffered_bytes, term)?;
    line_state.clear_and_render_and_flush(term)?;
    Ok(())
}
}
impl Drop for Readline {
    /// Best-effort terminal cleanup: calls `exit` on the line state and then disables
    /// raw mode. Failures of either step are ignored.
    ///
    /// A poisoned line-state mutex is recovered instead of unwrapped. Panicking here
    /// would skip `disable_raw_mode()` and, if the drop happens while another panic
    /// is unwinding, abort the process.
    fn drop(&mut self) {
        let term = output_device_as_mut!(self.output_device);
        _ = self
            .safe_line_state
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .exit(term);
        _ = disable_raw_mode();
    }
}
impl Readline {
/// Creates a [`Readline`] together with the [`SharedWriter`] that feeds it.
///
/// Steps performed:
/// 1. Creates the bounded control channel (`CHANNEL_CAPACITY`) used by the writer.
/// 2. Queries the terminal size and then enables raw mode.
/// 3. Builds the shared history, line state, pause buffer and spinner slot.
/// 4. Spawns the task that monitors control signals.
/// 5. Renders the prompt, enables line wrap and flushes the terminal.
///
/// The size is queried *before* raw mode is enabled. Until the `Readline` value
/// exists nothing would restore the terminal on an early return, so no fallible call
/// may sit between enabling raw mode and constructing it. Later failures drop the
/// constructed value, whose `Drop` impl disables raw mode.
///
/// # Errors
/// Returns [`ReadlineError`] if querying the size, enabling raw mode, the initial
/// render, or writing to the terminal fails.
///
/// # Panics
/// Panics if a shared mutex is poisoned. It must be called from within a Tokio
/// runtime because it spawns a task with `tokio::spawn`.
pub fn new(
    prompt: String,
    output_device: OutputDevice,
    input_device: InputDevice,
) -> Result<(Self, SharedWriter), ReadlineError> {
    let (line_control_channel_sender, line_state_control_channel_receiver) =
        mpsc::channel::<LineStateControlSignal>(CHANNEL_CAPACITY);

    // Fallible query first, so a failure cannot leave the terminal in raw mode.
    let terminal_size = terminal::size()?;
    terminal::enable_raw_mode()?;

    let (history, history_receiver) = History::new();
    let history_sender = history.sender.clone();
    let safe_history = Arc::new(StdMutex::new(history));

    let line_state = LineState::new(prompt, terminal_size);
    let safe_line_state = Arc::new(StdMutex::new(line_state));

    let is_paused_buffer = PauseBuffer::new();
    let safe_is_paused_buffer = Arc::new(StdMutex::new(is_paused_buffer));

    let safe_spinner_is_active = Arc::new(StdMutex::new(None));

    // The returned JoinHandle is dropped on purpose: the task runs detached and
    // stops once every sender of the control channel is gone.
    manage_shared_writer_output::spawn_task_to_monitor_line_state_signals(
        line_state_control_channel_receiver,
        safe_line_state.clone(),
        output_device.clone(),
        safe_is_paused_buffer.clone(),
        safe_spinner_is_active.clone(),
    );

    let readline = Readline {
        output_device: output_device.clone(),
        input_device,
        safe_line_state: safe_line_state.clone(),
        history_sender,
        history_receiver,
        safe_history,
        safe_is_paused_buffer,
        safe_spinner_is_active,
    };

    // Initial paint of the prompt.
    let term = output_device_as_mut!(output_device);
    readline
        .safe_line_state
        .lock()
        .unwrap()
        .render_and_flush(term)?;
    term.queue(terminal::EnableLineWrap)?;
    term.flush()?;

    let shared_writer = SharedWriter::new(line_control_channel_sender);
    Ok((readline, shared_writer))
}
/// Replaces the prompt by delegating to `LineState::update_prompt` with the current
/// terminal handle.
///
/// # Errors
/// Propagates the error returned by the line state.
///
/// # Panics
/// Panics if the line-state mutex is poisoned.
pub fn update_prompt(&mut self, prompt: &str) -> Result<(), ReadlineError> {
    let term = output_device_as_mut!(self.output_device);
    let mut line_state = self.safe_line_state.lock().unwrap();
    line_state.update_prompt(prompt, term)?;
    Ok(())
}
/// Clears the whole terminal, re-renders the current line and flushes.
///
/// # Errors
/// Propagates failures from queueing the clear command, from
/// `clear_and_render_and_flush`, or from the final flush.
///
/// # Panics
/// Panics if the line-state mutex is poisoned.
pub fn clear(&mut self) -> Result<(), ReadlineError> {
    let term = output_device_as_mut!(self.output_device);
    term.queue(Clear(terminal::ClearType::All))?;

    // Scope the guard so the line-state lock is released before the final flush.
    {
        let mut line_state = self.safe_line_state.lock().unwrap();
        line_state.clear_and_render_and_flush(term)?;
    }

    term.flush()?;
    Ok(())
}
/// Sets the history's `max_size` and truncates the stored entries to that length.
///
/// # Panics
/// Panics if the history mutex is poisoned.
pub fn set_max_history(&mut self, max_size: usize) {
    let mut guard = self.safe_history.lock().unwrap();
    let history = &mut *guard;
    history.max_size = max_size;
    history.entries.truncate(max_size);
}
/// Stores `enter` and `control_c` in the line state's `should_print_line_on_enter`
/// and `should_print_line_on_control_c` flags.
///
/// NOTE(review): presumably these decide whether the current line is echoed when
/// Enter or Ctrl+C is pressed — confirm against `LineState`.
///
/// # Panics
/// Panics if the line-state mutex is poisoned.
pub fn should_print_line_on(&mut self, enter: bool, control_c: bool) {
    let mut guard = self.safe_line_state.lock().unwrap();
    let state = &mut *guard;
    state.should_print_line_on_enter = enter;
    state.should_print_line_on_control_c = control_c;
}
/// Waits until the input produces a [`ReadlineEvent`] or an error.
///
/// Each loop iteration waits on two sources at once:
/// - the next event from the input device, which is applied to the line state via
///   `readline_internal::apply_event_to_line_state_and_render`;
/// - the next value from the history channel, which is passed to the history's
///   `update`.
///
/// # Errors
/// Returns the [`ReadlineError`] reported while applying an input event.
///
/// # Panics
/// Panics if a shared mutex is poisoned.
pub async fn readline(&mut self) -> miette::Result<ReadlineEvent, ReadlineError> {
    loop {
        tokio::select! {
            result_crossterm_event = self.input_device.next() => {
                let control_flow =
                    readline_internal::apply_event_to_line_state_and_render(
                        result_crossterm_event,
                        self.safe_line_state.clone(),
                        output_device_as_mut!(self.output_device),
                        self.safe_history.clone(),
                        self.safe_spinner_is_active.clone(),
                    );

                match control_flow {
                    ControlFlowExtended::ReturnOk(ok_value) => return Ok(ok_value),
                    ControlFlowExtended::ReturnError(err_value) => return Err(err_value),
                    // Event consumed without producing a result; poll again.
                    ControlFlowExtended::Continue => {}
                }
            },
            maybe_line = self.history_receiver.recv() => {
                self.safe_history.lock().unwrap().update(maybe_line);
            }
        }
    }
}
/// Queues `entry` on the history channel.
///
/// Returns `Some(())` when the entry was sent and `None` when the send failed.
pub fn add_history_entry(&mut self, entry: String) -> Option<()> {
    match self.history_sender.send(entry) {
        Ok(()) => Some(()),
        Err(_) => None,
    }
}
}
pub mod readline_internal {
use super::*;
/// Applies one input event to the line state and renders the result to `term`.
///
/// - An `Err` input is converted into `ReadlineError::IO` (the report is formatted
///   into the error message) and returned as [`ControlFlowExtended::ReturnError`].
/// - `Ctrl+C` / `Ctrl+D` while a spinner shutdown sender is registered: the sender is
///   removed, a shutdown message is sent through it (send failures are ignored) and
///   the event is swallowed with [`ControlFlowExtended::Continue`].
/// - Any other case is forwarded to `LineState::apply_event_and_render`. A produced
///   event becomes [`ControlFlowExtended::ReturnOk`], an error becomes
///   [`ControlFlowExtended::ReturnError`], and no event yields
///   [`ControlFlowExtended::Continue`].
///
/// The registered spinner sender is only taken for `Ctrl+C` / `Ctrl+D`. Taking it on
/// every event would let an unrelated key press unregister the spinner, after which
/// `Ctrl+C` could no longer stop it.
///
/// # Panics
/// Panics if the line-state or spinner mutex is poisoned.
pub fn apply_event_to_line_state_and_render(
    result_crossterm_event: miette::Result<crossterm::event::Event>,
    self_line_state: SafeLineState,
    term: &mut dyn Write,
    self_safe_history: SafeHistory,
    self_safe_is_spinner_active: Arc<
        StdMutex<Option<tokio::sync::broadcast::Sender<()>>>,
    >,
) -> ControlFlowExtended<ReadlineEvent, ReadlineError> {
    let crossterm_event = match result_crossterm_event {
        Ok(event) => event,
        Err(report) => {
            return ControlFlowExtended::ReturnError(ReadlineError::IO(
                io::Error::new(io::ErrorKind::Other, format!("{report}")),
            ));
        }
    };

    let mut line_state = self_line_state.lock().unwrap();

    if crossterm_event == CTRL_C || crossterm_event == CTRL_D {
        // Bind separately so the spinner lock is released before sending.
        let maybe_spinner_shutdown_sender =
            self_safe_is_spinner_active.lock().unwrap().take();
        if let Some(spinner_shutdown_sender) = maybe_spinner_shutdown_sender {
            let _ = spinner_shutdown_sender.send(());
            // The key press was meant for the spinner; do not process it further.
            return ControlFlowExtended::Continue;
        }
    }

    match line_state.apply_event_and_render(crossterm_event, term, self_safe_history) {
        Ok(Some(readline_event)) => ControlFlowExtended::ReturnOk(readline_event),
        Ok(None) => ControlFlowExtended::Continue,
        Err(e) => ControlFlowExtended::ReturnError(e),
    }
}
}
#[cfg(test)]
pub mod test_fixtures {
use crossterm::event::{Event, KeyCode, KeyEvent, KeyModifiers};
use r3bl_core::CrosstermEventResult;
/// Test input: the key presses `a`, `b`, `c`, `Enter`, each without modifiers and
/// each wrapped in `Ok`.
pub(super) fn get_input_vec() -> Vec<CrosstermEventResult> {
    vec![
        KeyCode::Char('a'),
        KeyCode::Char('b'),
        KeyCode::Char('c'),
        KeyCode::Enter,
    ]
    .into_iter()
    .map(|key_code| Ok(Event::Key(KeyEvent::new(key_code, KeyModifiers::NONE))))
    .collect()
}
}
#[cfg(test)]
mod test_readline {
use r3bl_ansi_color::{is_fully_uninteractive_terminal, TTYResult};
use r3bl_test_fixtures::{output_device_ext::OutputDeviceExt as _,
InputDeviceExt as _};
use test_fixtures::get_input_vec;
use super::*;
use crate::{LineStateLiveness, StdMutex};
/// Feeds the first mocked key press (`a`) straight into
/// `readline_internal::apply_event_to_line_state_and_render` and checks both the line
/// state and what was written to the mocked output.
#[tokio::test]
#[allow(clippy::needless_return)]
async fn test_readline_internal_process_event_and_terminal_output() {
let vec = get_input_vec();
let mut iter = vec.iter();
let prompt_str = "> ";
// Skip without an interactive terminal: `Readline::new` enables raw mode and
// queries the terminal size.
if let TTYResult::IsNotInteractive = is_fully_uninteractive_terminal() {
return;
}
let (output_device, stdout_mock) = OutputDevice::new_mock();
let input_device = InputDevice::new_mock(get_input_vec());
let (readline, _) = Readline::new(
prompt_str.into(),
output_device.clone(),
input_device,
)
.unwrap();
// Fresh spinner slot (no spinner registered) and a fresh history for this call.
let safe_is_spinner_active = Arc::new(StdMutex::new(None));
let history = History::new();
let safe_history = Arc::new(StdMutex::new(history.0));
// First event of the fixture: the `a` key press.
let Some(Ok(event)) = iter.next() else {
panic!();
};
let control_flow = readline_internal::apply_event_to_line_state_and_render(
Ok(event.clone()),
readline.safe_line_state.clone(),
output_device_as_mut!(output_device),
safe_history.clone(),
safe_is_spinner_active.clone(),
);
// A plain character does not complete a line, so processing continues.
assert!(matches!(control_flow, ControlFlowExtended::Continue));
assert_eq!(readline.safe_line_state.lock().unwrap().line, "a");
// The prompt followed by the typed character must have been rendered.
let output_buffer_data = stdout_mock.get_copy_of_buffer_as_string_strip_ansi();
assert!(output_buffer_data.contains("> a"));
}
/// Runs `Readline::readline` against the mocked input `a`, `b`, `c`, `Enter` and
/// expects the line `"abc"`.
#[tokio::test]
#[allow(clippy::needless_return)]
async fn test_readline() {
let prompt_str = "> ";
// Skip without an interactive terminal: `Readline::new` enables raw mode and
// queries the terminal size.
if let TTYResult::IsNotInteractive = is_fully_uninteractive_terminal() {
return;
}
let (output_device, stdout_mock) = OutputDevice::new_mock();
let input_device = InputDevice::new_mock(get_input_vec());
let (mut readline, _) = Readline::new(
prompt_str.into(),
output_device.clone(),
input_device,
)
.unwrap();
let result = readline.readline().await;
assert!(matches!(result, Ok(ReadlineEvent::Line(_))));
pretty_assertions::assert_eq!(
result.unwrap(),
ReadlineEvent::Line("abc".to_string())
);
// After the line has been returned the editable line is empty again.
pretty_assertions::assert_eq!(readline.safe_line_state.lock().unwrap().line, "");
let output_buffer_data = stdout_mock.get_copy_of_buffer_as_string_strip_ansi();
assert!(output_buffer_data.contains("> abc"));
}
/// Sends `Pause` and then `Resume` through the shared writer's control channel and
/// checks that the line state's `is_paused` follows.
#[tokio::test]
#[allow(clippy::needless_return)]
async fn test_pause_resume() {
let prompt_str = "> ";
// Skip without an interactive terminal: `Readline::new` enables raw mode and
// queries the terminal size.
if let TTYResult::IsNotInteractive = is_fully_uninteractive_terminal() {
return;
}
let (output_device, _) = OutputDevice::new_mock();
let input_device = InputDevice::new_mock(get_input_vec());
let (readline, shared_writer) = Readline::new(
prompt_str.into(),
output_device.clone(),
input_device,
)
.unwrap();
shared_writer
.line_state_control_channel_sender
.send(LineStateControlSignal::Pause)
.await
.unwrap();
// Give the background monitor task a moment to process the signal.
tokio::time::sleep(std::time::Duration::from_millis(1)).await;
assert_eq!(
readline.safe_line_state.lock().unwrap().is_paused,
LineStateLiveness::Paused
);
shared_writer
.line_state_control_channel_sender
.send(LineStateControlSignal::Resume)
.await
.unwrap();
// Give the background monitor task a moment to process the signal.
tokio::time::sleep(std::time::Duration::from_millis(1)).await;
assert_eq!(
readline.safe_line_state.lock().unwrap().is_paused,
LineStateLiveness::NotPaused
);
}
/// Checks that a `Line` signal sent while paused lands in the pause buffer instead of
/// being printed, and that `Resume` switches the state back to not paused.
#[tokio::test]
#[allow(clippy::needless_return)]
async fn test_pause_resume_with_output() {
let prompt_str = "> ";
// Skip without an interactive terminal: `Readline::new` enables raw mode and
// queries the terminal size.
if let TTYResult::IsNotInteractive = is_fully_uninteractive_terminal() {
return;
}
let (output_device, _) = OutputDevice::new_mock();
let input_device = InputDevice::new_mock(get_input_vec());
let (readline, shared_writer) = Readline::new(
prompt_str.into(),
output_device.clone(),
input_device,
)
.unwrap();
shared_writer
.line_state_control_channel_sender
.send(LineStateControlSignal::Pause)
.await
.unwrap();
// Give the background monitor task a moment to process the signal.
tokio::time::sleep(std::time::Duration::from_millis(1)).await;
assert_eq!(
readline.safe_line_state.lock().unwrap().is_paused,
LineStateLiveness::Paused
);
// Output sent while paused must be buffered.
shared_writer
.line_state_control_channel_sender
.send(LineStateControlSignal::Line("abc".into()))
.await
.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(1)).await;
// Work on a clone so the buffer lock is not held across the assertions.
let pause_buffer = readline.safe_is_paused_buffer.lock().unwrap().clone();
assert_eq!(pause_buffer.len(), 1);
assert_eq!(String::from_utf8_lossy(&pause_buffer[0]), "abc".to_string());
shared_writer
.line_state_control_channel_sender
.send(LineStateControlSignal::Resume)
.await
.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(1)).await;
assert_eq!(
readline.safe_line_state.lock().unwrap().is_paused,
LineStateLiveness::NotPaused
);
}
}
#[cfg(test)]
mod test_streams {
use r3bl_test_fixtures::gen_input_stream;
use test_streams::test_fixtures::get_input_vec;
use super::*;
/// Verifies that `gen_input_stream` yields exactly the events of `get_input_vec`, in
/// order.
///
/// The number of yielded events is asserted at the end; without that check an empty
/// or truncated stream would pass unnoticed.
#[tokio::test]
#[allow(clippy::needless_return)]
async fn test_generate_event_stream_pinned() {
    use futures_util::StreamExt;

    // Built once up front instead of once per yielded event.
    let expected = get_input_vec();

    let mut count = 0;
    let mut it = gen_input_stream(get_input_vec());
    while let Some(event) = it.next().await {
        let lhs = event.unwrap();
        let rhs = expected[count].as_ref().unwrap().clone();
        assert_eq!(lhs, rhs);
        count += 1;
    }

    assert_eq!(count, expected.len());
}
}
#[cfg(test)]
mod test_pause_and_resume_support {
use std::{collections::VecDeque, sync::Mutex};
use manage_shared_writer_output::flush_internal;
use r3bl_test_fixtures::StdoutMock;
use super::*;
use crate::LineStateLiveness;
/// While paused, `flush_internal` must succeed without writing anything.
#[test]
fn test_flush_internal_paused() {
    let queued: VecDeque<Vec<u8>> = VecDeque::from(vec![
        b"Paused line 1".to_vec(),
        b"Paused line 2".to_vec(),
    ]);
    let safe_is_paused_buffer = Arc::new(Mutex::new(queued));
    let safe_line_state =
        Arc::new(Mutex::new(LineState::new("> ".to_string(), (100, 100))));
    let mut stdout_mock = StdoutMock::default();

    let outcome = flush_internal(
        safe_is_paused_buffer.clone(),
        LineStateLiveness::Paused,
        safe_line_state.lock().unwrap(),
        &mut stdout_mock,
    );

    assert!(outcome.is_ok());
    assert_eq!(stdout_mock.get_copy_of_buffer_as_string_strip_ansi(), "");
}
/// When not paused, `flush_internal` must write the queued chunks followed by the
/// re-rendered prompt.
#[test]
fn test_flush_internal_not_paused() {
    let queued: VecDeque<Vec<u8>> = VecDeque::from(vec![
        b"Paused line 1".to_vec(),
        b"Paused line 2".to_vec(),
    ]);
    let safe_is_paused_buffer = Arc::new(Mutex::new(queued));
    let safe_line_state =
        Arc::new(Mutex::new(LineState::new("> ".to_string(), (100, 100))));
    let mut stdout_mock = StdoutMock::default();

    let outcome = flush_internal(
        safe_is_paused_buffer.clone(),
        LineStateLiveness::NotPaused,
        safe_line_state.lock().unwrap(),
        &mut stdout_mock,
    );

    assert!(outcome.is_ok());
    assert_eq!(
        stdout_mock.get_copy_of_buffer_as_string_strip_ansi(),
        "Paused line 1Paused line 2\n> > "
    );
}
}