mod os_specific_apis;
mod read_provider;
pub mod replacement;
mod state;
use crate::{
app_name_to_prefix,
errors::LisaError,
input::{
InputProvider, TerminalInputEvent,
autocomplete::AutocompleteProvider,
history::HistoryProvider,
stdin::{
os_specific_apis::{
CachedModeType, default_cached_mode, disable_raw_mode, enable_raw_mode,
os_pre_reqs, raise_sigint,
},
read_provider::ReadProvider,
replacement::parse_character_replacements,
state::{StdinInputState, transaction::InputStateTransaction},
},
},
};
use arboard::Clipboard;
use fnv::FnvHashMap;
use parking_lot::Mutex;
#[cfg(test)]
use std::collections::VecDeque;
use std::{
env::var as env_var,
hash::BuildHasherDefault,
io::Stdin,
sync::{
OnceLock,
atomic::{AtomicBool, Ordering},
},
};
/// Signature of the optional hook consulted when a carriage return (`\r`) is
/// read: it receives the current input text and returns `true` to complete
/// the input, or `false` to have a literal `\n` inserted instead.
type NewlineHookFn = dyn Fn(&str) -> bool + Send + Sync;
/// Lock held by `get_clipboard` while it opens the system clipboard and reads
/// its text.
static CLIPBOARD_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
/// Test-only queue of paste values; `get_clipboard` pops from the front of
/// this queue before it falls back to the real clipboard.
#[cfg(test)]
static TEST_OVERRIDE_CLIPBOARD: OnceLock<Mutex<VecDeque<String>>> = OnceLock::new();
/// An input provider that reads raw text from a [`ReadProvider`] and turns it
/// into [`TerminalInputEvent`]s, tracking the line currently being edited and
/// the lines that have already been completed.
pub struct StdinInputProvider<ReadTy: ReadProvider> {
/// Flag written by `set_active` and read by `is_active`.
active: AtomicBool,
/// Optional autocomplete source; installed through `set_autocomplete_provider`.
autocomplete_provider: Option<Box<dyn AutocompleteProvider>>,
/// Mode value handed back by the OS-specific raw-mode helpers; it is passed
/// to the OS-level `disable_raw_mode` when raw mode is turned off.
cached_mode: CachedModeType,
/// State of the input that is currently being typed.
current_input: StdinInputState,
/// Finished inputs that have not yet been handed out by `inputs`.
completed_inputs: Vec<String>,
/// Optional history source; installed through `set_history_provider`.
history_provider: Option<Box<dyn HistoryProvider>>,
/// Tracks whether this provider has switched raw mode on.
in_raw_mode: AtomicBool,
/// Optional hook that decides whether `\r` completes the input.
newline_hook: Option<Box<NewlineHookFn>>,
/// Source of raw, not yet processed input text.
read_provider: ReadTy,
/// Per-character substitutions applied to everything that is read; a `None`
/// value removes the character from the input altogether.
replacements: FnvHashMap<char, Option<char>>,
}
impl StdinInputProvider<Stdin> {
pub fn new(app_name: &'static str) -> Result<Self, LisaError> {
os_pre_reqs()?;
let environment_prefix = app_name_to_prefix(app_name);
let replacements =
if let Ok(env_var_value) = env_var(format!("{environment_prefix}_STTY_REPLACEMENTS")) {
parse_character_replacements(&env_var_value)?
} else {
FnvHashMap::with_capacity_and_hasher(0, BuildHasherDefault::default())
};
Ok(Self {
active: AtomicBool::new(false),
autocomplete_provider: None,
cached_mode: default_cached_mode(),
current_input: StdinInputState::new(),
completed_inputs: Vec::with_capacity(1),
history_provider: None,
in_raw_mode: AtomicBool::new(false),
newline_hook: None,
read_provider: std::io::stdin(),
replacements,
})
}
}
impl<ReadTy: ReadProvider> StdinInputProvider<ReadTy> {
/// Create a provider that reads from an arbitrary [`ReadProvider`].
///
/// `app_name` is turned into an environment prefix; when
/// `{PREFIX}_STTY_REPLACEMENTS` is set it is parsed into the character
/// replacement table, otherwise the table starts out empty.
///
/// # Errors
///
/// Fails when the OS pre-requisite check fails, or when the replacement
/// environment variable is set but cannot be parsed.
pub fn new_with_provider(app_name: &'static str, provider: ReadTy) -> Result<Self, LisaError> {
    os_pre_reqs()?;
    let variable_name = format!("{}_STTY_REPLACEMENTS", app_name_to_prefix(app_name));
    let replacements = match env_var(variable_name) {
        Ok(raw_replacements) => parse_character_replacements(&raw_replacements)?,
        // A variable that cannot be read is treated as "no replacements".
        Err(_) => FnvHashMap::default(),
    };
    Ok(Self {
        active: AtomicBool::new(false),
        autocomplete_provider: None,
        cached_mode: default_cached_mode(),
        current_input: StdinInputState::new(),
        completed_inputs: Vec::with_capacity(1),
        history_provider: None,
        in_raw_mode: AtomicBool::new(false),
        newline_hook: None,
        read_provider: provider,
        replacements,
    })
}
/// Perform one non-blocking read and return the text with the character
/// replacements applied, bypassing all line-editing logic.
///
/// # Errors
///
/// Propagates whatever error the read provider's non-blocking read reports.
pub fn manually_poll_input_and_consume(&self) -> Result<String, LisaError> {
    let raw = self.read_provider.non_blocking_read()?;
    let replaced = raw
        .chars()
        // A mapping to `None` drops the character; unmapped characters pass through.
        .filter_map(|character| {
            self.replacements
                .get(&character)
                .map_or(Some(character), |replacement| *replacement)
        })
        .collect();
    Ok(replaced)
}
/// Install the hook consulted whenever `\r` is read, replacing any hook that
/// was set before. The hook receives the current input; returning `true`
/// completes the input, returning `false` inserts a newline character.
pub fn set_newline_hook(&mut self, hook: Box<dyn Fn(&str) -> bool + Send + Sync>) {
    self.newline_hook = Some(hook);
}
/// Run `item` through the input state and return the events it produced.
///
/// Events pushed while walking the characters come first. They are followed
/// either by a single append event describing the text that was appended, or
/// — when no plain append can describe the change — they are replaced by an
/// `InputChanged` event (preceded by `InputStarted` if no input was in
/// progress before this call).
fn process_inputs(&mut self, item: &str, ansi_supported: bool) -> Vec<TerminalInputEvent> {
    let mut events = Vec::new();
    let was_in_progress = self.current_input.started();
    let ledger = self.calculate_append_ledger(&mut events, item, ansi_supported);
    let trailing = match ledger {
        Some(appended) => {
            let first_character = appended.chars().next();
            // NOTE(review): `len()` is a byte length, so one multi-byte
            // character is reported as a mass append — confirm that is intended.
            match (first_character, appended.len()) {
                (None, _) => Vec::new(),
                (Some(only), 1) => vec![TerminalInputEvent::InputAppend(only)],
                (Some(_), _) => vec![TerminalInputEvent::InputMassAppend(appended)],
            }
        }
        None => {
            let ended_on_cursor_move = matches!(
                events.last(),
                Some(
                    TerminalInputEvent::CursorMoveLeft(_) | TerminalInputEvent::CursorMoveRight(_)
                )
            );
            if ended_on_cursor_move && self.current_input().is_empty() {
                // Pure cursor movement over an empty buffer: keep the events as they are.
                Vec::new()
            } else {
                // Everything collected so far is superseded by a full change event.
                events.clear();
                let position = self.current_input.cursor_position();
                if was_in_progress {
                    vec![TerminalInputEvent::InputChanged(position)]
                } else {
                    vec![
                        TerminalInputEvent::InputStarted,
                        TerminalInputEvent::InputChanged(position),
                    ]
                }
            }
        }
    };
    events.extend(trailing);
    events
}
/// Feed every character of `item` through [`Self::route_character`] and
/// translate what the input transaction reports into events.
///
/// Events are pushed onto `input_events` as they occur. The return value is
/// whatever the transaction's `finished_processing()` yields; the caller
/// (`process_inputs`) treats `Some(text)` as text that was appended and `None`
/// as a change that needs an `InputChanged` event. `None` is also returned
/// right after SIGINT has been raised because `route_character` returned
/// `true`.
fn calculate_append_ledger(
&mut self,
input_events: &mut Vec<TerminalInputEvent>,
item: &str,
ansi_supported: bool,
) -> Option<String> {
let mut processing = self.current_input.process();
let mut processed_amount = 0_usize;
for character in item.chars() {
// Incremented before any use, so the `processed_amount > 0` check further
// down always holds.
processed_amount += 1;
if Self::route_character(
self.autocomplete_provider.as_deref(),
self.history_provider.as_deref(),
self.newline_hook.as_deref(),
ansi_supported,
character,
&mut processing,
) {
// Let go of the transaction before `raise_sigint` takes `&mut self`.
std::mem::drop(processing);
self.raise_sigint();
return None;
}
// A cursor movement was reported: first emit what preceded it (the
// returned buffer as an append, or an `InputChanged` at the original
// position when there is no buffer), then the movement itself.
if let Some((move_left, move_amount, buffer, original_position)) = processing
.did_trigger_multibyte_cursor(
self.autocomplete_provider.as_deref(),
self.history_provider.as_deref(),
ansi_supported,
) {
if let Some(buff) = buffer {
if buff.is_empty() {
} else if buff.len() == 1 {
input_events.push(TerminalInputEvent::InputAppend(
buff.chars().next().unwrap_or_else(|| unreachable!()),
));
} else {
input_events.push(TerminalInputEvent::InputMassAppend(buff));
}
} else if processed_amount > 0 {
input_events.push(TerminalInputEvent::InputChanged(original_position));
}
if move_left {
input_events.push(TerminalInputEvent::CursorMoveLeft(move_amount));
} else {
input_events.push(TerminalInputEvent::CursorMoveRight(move_amount));
}
}
// A paste trigger inserts the clipboard text, when it could be read.
if processing.did_trigger_multibyte_paste()
&& let Some(data) = Self::get_clipboard()
{
processing.new_string_action(&data, self.history_provider.as_deref());
}
if processing.did_trigger_start() {
input_events.push(TerminalInputEvent::InputStarted);
}
if processing.did_trigger_cancel() {
// If the most recent event is `InputStarted`, the cancel simply removes
// it instead of emitting `InputCancelled`.
if input_events.last() == Some(&TerminalInputEvent::InputStarted) {
input_events.pop();
} else {
input_events.push(TerminalInputEvent::InputCancelled);
}
}
if processing.did_trigger_clear() {
input_events.push(TerminalInputEvent::ClearScreen);
input_events.push(TerminalInputEvent::InputChanged(
processing.cursor_position(),
));
}
if processing.did_trigger_pause() {
input_events.push(TerminalInputEvent::ToggleOutputPause);
}
// A finished input is recorded in history (when a provider is set) and
// queued in `completed_inputs`; the events describe the final append or
// change, followed by `InputFinished`.
if let Some((append_buffer, finished)) = processing.did_trigger_finish() {
if let Some(hprovider) = self.history_provider.as_ref() {
hprovider.insert_command(&finished);
}
self.completed_inputs.push(finished);
if let Some(append) = append_buffer {
if append.len() == 1
&& let Some(character) = append.chars().next()
{
input_events.push(TerminalInputEvent::InputAppend(character));
} else if !append.is_empty() {
input_events.push(TerminalInputEvent::InputMassAppend(append));
}
} else {
input_events.push(TerminalInputEvent::InputChanged(
processing.cursor_position(),
));
}
input_events.push(TerminalInputEvent::InputFinished);
}
}
processing.finished_processing()
}
/// Apply a single character to the input transaction.
///
/// Control characters are mapped to editing actions; anything else is
/// inserted as text. Bindings that need ANSI support are ignored when
/// `ansi_supported` is `false`.
///
/// Returns `true` when the caller should raise SIGINT: for `\u{3}` when the
/// transaction's `cancel()` returns `false`, and always for `\u{4}`.
fn route_character(
    autocomplete_provider: Option<&dyn AutocompleteProvider>,
    history_provider: Option<&dyn HistoryProvider>,
    newline_hook: Option<&NewlineHookFn>,
    ansi_supported: bool,
    character: char,
    processing: &mut InputStateTransaction<'_>,
) -> bool {
    match character {
        // Bindings that depend on ANSI support are swallowed without it.
        '\u{1}' | '\u{2}' | '\u{5}' | '\u{6}' | '\u{b}' | '\u{12}' | '\u{14}' | '\u{15}'
        | '\u{17}' | '\u{19}'
            if !ansi_supported => {}
        '\u{0}' => {}
        '\u{7f}' | '\u{8}' => processing.backspace(),
        '\u{3}' => {
            if !processing.cancel() {
                return true;
            }
        }
        '\u{4}' => return true,
        '\u{16}' => {
            if let Some(pasted) = Self::get_clipboard() {
                processing.new_string_action(&pasted, history_provider);
            }
        }
        '\u{10}' => processing.previous_history_action(history_provider),
        '\u{e}' => processing.next_history_action(history_provider),
        '\u{c}' => processing.clear(),
        '\u{13}' => processing.pause_output(),
        // From here on the ANSI-only bindings; the guard arm above already
        // handled the unsupported case.
        '\u{2}' => _ = processing.move_cursor_left(1),
        '\u{6}' => _ = processing.move_cursor_right(1),
        '\u{1}' => _ = processing.move_cursor_left(processing.cursor_position() + 1),
        '\u{5}' => _ = processing.move_cursor_right(processing.current_input().len()),
        '\u{15}' => {
            processing.uwk_cursor_to_begin();
        }
        '\u{b}' => {
            processing.uwk_cursor_to_end();
        }
        '\u{17}' => {
            processing.uwk_cursor_word();
        }
        '\u{19}' => {
            processing.paste_uwk();
        }
        '\u{14}' => {
            processing.swap_last_two_characters();
        }
        '\u{12}' => {
            processing.start_history_search_action(history_provider);
        }
        '\u{7}' => _ = processing.cancel(),
        '\n' => processing.complete(),
        '\r' => {
            // Without a hook a carriage return always completes the input.
            let should_complete = match newline_hook {
                Some(hook) => hook(processing.current_input().as_ref()),
                None => true,
            };
            if should_complete {
                processing.complete();
            } else {
                processing.new_character_action('\n', history_provider);
            }
        }
        '\t' => processing.tab_action(autocomplete_provider, history_provider, true, false),
        other => processing.new_character_action(other, history_provider),
    }
    false
}
/// Read the text currently on the clipboard.
///
/// In test builds, values queued in `TEST_OVERRIDE_CLIPBOARD` are returned
/// first. Otherwise the system clipboard is opened and read while
/// `CLIPBOARD_LOCK` is held. Returns `None` when the clipboard cannot be
/// opened or its text cannot be read.
fn get_clipboard() -> Option<String> {
    #[cfg(test)]
    {
        let queued = TEST_OVERRIDE_CLIPBOARD
            .get_or_init(|| Mutex::new(VecDeque::new()))
            .lock()
            .pop_front();
        if queued.is_some() {
            return queued;
        }
    }
    let clipboard_guard = CLIPBOARD_LOCK.get_or_init(|| Mutex::new(())).lock();
    let mut system_clipboard = match Clipboard::new() {
        Ok(handle) => handle,
        Err(_) => return None,
    };
    let contents = system_clipboard.get_text().ok();
    // The lock is released as soon as the text has been read.
    drop(clipboard_guard);
    contents
}
/// Switch raw mode on, remembering the mode value returned by the OS layer.
///
/// Does nothing when this provider already has raw mode enabled.
///
/// # Errors
///
/// Returns the OS-level error when raw mode cannot be enabled. The
/// `in_raw_mode` flag is rolled back in that case, so a later call tries
/// again instead of silently assuming raw mode is active.
fn enable_raw_mode(&mut self) -> Result<(), LisaError> {
    if self.in_raw_mode.swap(true, Ordering::AcqRel) {
        return Ok(());
    }
    let outcome = enable_raw_mode();
    if outcome.is_err() {
        // The switch did not happen; do not leave the flag claiming it did.
        self.in_raw_mode.store(false, Ordering::Release);
    }
    self.cached_mode = outcome?;
    Ok(())
}
/// Switch raw mode off again, handing the cached mode value back to the OS
/// layer.
///
/// Does nothing when this provider does not have raw mode enabled.
///
/// # Errors
///
/// Returns the OS-level error when raw mode cannot be disabled. The
/// `in_raw_mode` flag is restored in that case, so a later call (including
/// the one made on drop) retries instead of leaving raw mode on for good.
fn disable_raw_mode(&mut self) -> Result<(), LisaError> {
    if !self.in_raw_mode.swap(false, Ordering::AcqRel) {
        return Ok(());
    }
    let outcome = disable_raw_mode(self.cached_mode);
    if outcome.is_err() {
        // Raw mode is still on; keep the flag truthful so the restore is retried.
        self.in_raw_mode.store(true, Ordering::Release);
    }
    self.cached_mode = outcome?;
    Ok(())
}
/// Leave raw mode (ignoring any failure), ask the history provider — if one
/// is set — to attempt a full sync, and then raise SIGINT through the
/// OS-specific helper.
fn raise_sigint(&mut self) {
    let _ = self.disable_raw_mode();
    if let Some(history) = &self.history_provider {
        history.attempt_to_do_full_sync();
    }
    raise_sigint();
}
}
impl<ReadTy: ReadProvider> InputProvider for StdinInputProvider<ReadTy> {
    /// This provider always reports itself as stdin-backed.
    fn is_stdin(&self) -> bool {
        true
    }

    /// Whether the provider was last switched to active.
    fn is_active(&self) -> bool {
        self.active.load(Ordering::Relaxed)
    }

    /// Enable raw mode when activating and disable it when deactivating; the
    /// active flag is only updated once the mode switch succeeded.
    fn set_active(&mut self, active: bool) -> Result<(), LisaError> {
        let mode_switch = if active {
            self.enable_raw_mode()
        } else {
            self.disable_raw_mode()
        };
        mode_switch?;
        self.active.store(active, Ordering::SeqCst);
        Ok(())
    }

    /// Whether an input has been started and not yet finished or cancelled.
    fn input_in_progress(&self) -> bool {
        self.current_input.started()
    }

    /// The text of the input currently being edited.
    fn current_input(&self) -> String {
        self.current_input.input()
    }

    /// Read whatever is available without blocking and process it into
    /// events. A failed read yields no events.
    fn poll_for_input(&mut self, ansi_supported: bool) -> Vec<TerminalInputEvent> {
        match self.manually_poll_input_and_consume() {
            Ok(item) => self.process_inputs(&item, ansi_supported),
            Err(_cause) => Vec::new(),
        }
    }

    /// Install the autocomplete provider, replacing any previous one.
    fn set_autocomplete_provider(&mut self, autocomplete: Box<dyn AutocompleteProvider>) {
        self.autocomplete_provider = Some(autocomplete);
    }

    /// The suggestion the autocomplete provider offers for the current input,
    /// if a provider is set and it has one.
    fn current_autocomplete_suggestion(&self) -> Option<String> {
        let provider = self.autocomplete_provider.as_ref()?;
        provider.get_displayable_suggestion(self.current_input.input())
    }

    /// Whether the input state reports an autocomplete suggestion as pending.
    fn autocomplete_suggestion_pending(&self) -> bool {
        self.current_input.autocomplete_pending()
    }

    /// Install the history provider, replacing any previous one.
    fn set_history_provider(&mut self, history: Box<dyn HistoryProvider>) {
        self.history_provider = Some(history);
    }

    /// The value the input state reports for the current history search.
    fn current_history_search_value(&self) -> Option<String> {
        self.current_input.current_history_search_value()
    }

    /// Whether a history search is in progress.
    fn is_doing_history_search(&self) -> bool {
        self.current_input.is_doing_history_search()
    }

    /// Hand out all completed inputs collected so far, leaving none behind.
    fn inputs(&mut self) -> Vec<String> {
        std::mem::take(&mut self.completed_inputs)
    }
}
impl<ReadTy: ReadProvider> Drop for StdinInputProvider<ReadTy> {
    /// Make a best-effort attempt to leave raw mode when the provider goes
    /// away; failures are ignored because nothing can be reported from here.
    fn drop(&mut self) {
        if !self.in_raw_mode.load(Ordering::Relaxed) {
            return;
        }
        let _ = self.disable_raw_mode();
    }
}
/// Key presses named after multi-byte terminal sequences (arrow keys and
/// shifted variants).
///
/// NOTE(review): nothing in this file uses the enum directly; it is presumably
/// consumed by the `state` submodule — confirm.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum MultibyteCursorControl {
/// Shift + Tab.
ShiftTab,
/// Up arrow.
UpArrow,
/// Down arrow.
DownArrow,
/// Left arrow.
LeftArrow,
/// Right arrow.
RightArrow,
/// Shift + left arrow.
ShiftLeftArrow,
/// Shift + right arrow.
ShiftRightArrow,
}
#[cfg(test)]
pub mod test_helpers {
    use super::*;

    /// Queue `new_value` as the text returned by the next clipboard read made
    /// in a test build. Values are handed out in the order they were queued.
    pub fn insert_next_paste_value(new_value: String) {
        TEST_OVERRIDE_CLIPBOARD
            .get_or_init(|| Mutex::new(VecDeque::new()))
            .lock()
            .push_back(new_value);
    }
}
#[cfg(test)]
mod unit_tests {
use super::{test_helpers::insert_next_paste_value, *};
use crate::input::{
autocomplete::test_helpers::AutocompleteSuggestionList, history::SimpleHistoryProvider,
};
use parking_lot::Mutex;
use tokio::sync::mpsc::channel;
/// `is_stdin` reports `true` both for the real stdin reader and for a
/// channel-backed reader.
#[test]
pub fn always_stdin() {
{
let provider = StdinInputProvider::new("test")
.expect("Failed to create new stdin input provider!");
assert!(provider.is_stdin());
}
{
let (_sender, raw_receiver) = channel(8);
let read_provider = Mutex::new(raw_receiver);
let provider = StdinInputProvider::new_with_provider("test", read_provider)
.expect("Failed to create new stdin input provider!");
assert!(provider.is_stdin());
}
}
/// A manual poll returns the pending text and consumes it: a following
/// `poll_for_input` produces no events and a second manual poll errors.
#[tokio::test]
pub async fn manually_poll_for_input_consumes() {
let (sender, raw_receiver) = channel(8);
let read_provider = Mutex::new(raw_receiver);
let mut provider = StdinInputProvider::new_with_provider("test", read_provider)
.expect("Failed to create new stdin input provider!");
sender
.send("abcdefghijklmnopqrstuvwxyz".to_owned())
.await
.expect("Failed to send example input");
assert_eq!(
provider
.manually_poll_input_and_consume()
.expect("Failed to manually poll input and consume"),
"abcdefghijklmnopqrstuvwxyz",
);
// Nothing is left to read after the manual poll.
assert_eq!(provider.poll_for_input(true), Vec::with_capacity(0));
assert!(provider.manually_poll_input_and_consume().is_err());
}
/// Typing text followed by `\r` finishes the input when no newline hook is
/// set. With a hook that demands balanced parentheses, `\r` inserts a newline
/// until the closing parenthesis arrives. The same scenario is run once with
/// ANSI support and once without.
#[tokio::test]
pub async fn simple_type_enter_with_hooks_and_without_hooks() {
// Scenario with ANSI support.
{
let (sender, raw_receiver) = channel(8);
let read_provider = Mutex::new(raw_receiver);
let mut provider = StdinInputProvider::new_with_provider("test", read_provider)
.expect("Failed to create new stdin input provider!");
sender
.send("abcdefghijklmnopqrstuvwxyz(\r".to_owned())
.await
.expect("Failed to send example input");
assert_eq!(
provider.poll_for_input(true),
vec![
TerminalInputEvent::InputStarted,
TerminalInputEvent::InputMassAppend("abcdefghijklmnopqrstuvwxyz(".to_owned()),
TerminalInputEvent::InputFinished,
],
);
// The hook only accepts input whose `(` and `)` counts match.
provider.set_newline_hook(Box::new(|data: &str| -> bool {
data.chars().filter(|character| *character == '(').count()
== data.chars().filter(|character| *character == ')').count()
}));
sender
.send("abcdefghijklmnopqrstuvwxyz(\r".to_owned())
.await
.expect("Failed to send example input");
assert_eq!(
provider.poll_for_input(true),
vec![
TerminalInputEvent::InputStarted,
TerminalInputEvent::InputMassAppend("abcdefghijklmnopqrstuvwxyz(\n".to_owned()),
],
);
sender
.send(")\r".to_owned())
.await
.expect("Failed to send example input!");
assert_eq!(
provider.poll_for_input(true),
vec![
TerminalInputEvent::InputAppend(')'),
TerminalInputEvent::InputFinished,
],
);
}
// Same scenario without ANSI support.
{
let (sender, raw_receiver) = channel(8);
let read_provider = Mutex::new(raw_receiver);
let mut provider = StdinInputProvider::new_with_provider("test", read_provider)
.expect("Failed to create new stdin input provider!");
sender
.send("abcdefghijklmnopqrstuvwxyz(\r".to_owned())
.await
.expect("Failed to send example input");
assert_eq!(
provider.poll_for_input(false),
vec![
TerminalInputEvent::InputStarted,
TerminalInputEvent::InputMassAppend("abcdefghijklmnopqrstuvwxyz(".to_owned()),
TerminalInputEvent::InputFinished,
],
);
provider.set_newline_hook(Box::new(|data: &str| -> bool {
data.chars().filter(|character| *character == '(').count()
== data.chars().filter(|character| *character == ')').count()
}));
sender
.send("abcdefghijklmnopqrstuvwxyz(\r".to_owned())
.await
.expect("Failed to send example input");
assert_eq!(
provider.poll_for_input(false),
vec![
TerminalInputEvent::InputStarted,
TerminalInputEvent::InputMassAppend("abcdefghijklmnopqrstuvwxyz(\n".to_owned()),
],
);
sender
.send(")\r".to_owned())
.await
.expect("Failed to send example input!");
assert_eq!(
provider.poll_for_input(false),
vec![
TerminalInputEvent::InputAppend(')'),
TerminalInputEvent::InputFinished,
],
);
}
}
/// The `{PREFIX}_STTY_REPLACEMENTS` environment variable is parsed by both
/// constructors, its replacements are applied to polled and manually consumed
/// input, and an invalid value makes both constructors fail.
#[tokio::test]
pub async fn respects_replacements() {
// SAFETY: NOTE(review): `set_var` is only sound while no other thread reads
// or writes the process environment. Other tests in this module read the
// environment through the constructors, so this relies on them not running
// at the same moment — confirm. The variable name is unique to this test.
unsafe {
std::env::set_var(
"TEST_TOTALLY_UNIQUE_RESPECTS_REPLACEMENTS_STTY_REPLACEMENTS",
"z=0xd",
);
}
// `z=0xd` maps the letter `z` to a carriage return.
let mut expected_replacement_map = FnvHashMap::default();
expected_replacement_map.insert('z', Some('\r'));
{
let state = StdinInputProvider::new("test-totally-unique-respects-replacements")
.expect("Failed to create stdininputprovider");
assert_eq!(state.replacements, expected_replacement_map);
}
{
let (sender, raw_receiver) = channel(8);
let read_provider = Mutex::new(raw_receiver);
let mut provider = StdinInputProvider::new_with_provider(
"test-totally-unique-respects-replacements",
read_provider,
)
.expect("Failed to create new stdin input provider!");
// The trailing `z` becomes `\r`, which finishes the input.
sender
.send("abcdefghijklmnopqrstuvwxy(z".to_owned())
.await
.expect("Failed to send example input");
assert_eq!(
provider.poll_for_input(false),
vec![
TerminalInputEvent::InputStarted,
TerminalInputEvent::InputMassAppend("abcdefghijklmnopqrstuvwxy(".to_owned()),
TerminalInputEvent::InputFinished,
],
);
sender
.send("abcdefghijklmnopqrstuvwxy(z".to_owned())
.await
.expect("Failed to send example input");
assert_eq!(
provider
.manually_poll_input_and_consume()
.expect("Failed to manually poll input and consume!"),
"abcdefghijklmnopqrstuvwxy(\r".to_owned(),
);
}
// SAFETY: NOTE(review): same caveat as the `set_var` call at the top of this
// test — confirm no concurrent environment access.
unsafe {
std::env::set_var(
"TEST_TOTALLY_UNIQUE_RESPECTS_REPLACEMENTS_STTY_REPLACEMENTS",
"undef=",
);
}
assert!(
StdinInputProvider::new("test-totally-unique-respects-replacements").is_err(),
"Invalid STTY replacements did not error `StdinInputProvider`",
);
assert!(
StdinInputProvider::new_with_provider(
"test-totally-unique-respects-replacements",
std::io::stdin(),
)
.is_err(),
"Invalid STTY replacements did not error `StdinInputProvider`",
);
}
/// Checks how appended text is reported: nothing for an empty read,
/// `InputAppend` for a single character, `InputMassAppend` for several, and
/// `InputChanged` as soon as a read also deletes or moves the cursor.
#[tokio::test]
pub async fn append_ledger_to_events() {
let (sender, raw_receiver) = channel(8);
let read_provider = Mutex::new(raw_receiver);
let mut provider = StdinInputProvider::new_with_provider("test", read_provider)
.expect("Failed to create new stdin input provider!");
// Type one character and delete it again so an input is in progress.
sender
.send("a\u{7f}".to_owned())
.await
.expect("Failed to start input!");
_ = provider.poll_for_input(true);
sender
.send("".to_owned())
.await
.expect("Failed to send input");
assert_eq!(provider.poll_for_input(true), vec![]);
sender
.send("a".to_owned())
.await
.expect("Failed to send input!");
assert_eq!(
provider.poll_for_input(true),
vec![TerminalInputEvent::InputAppend('a')]
);
sender
.send("bcd".to_owned())
.await
.expect("Failed to send input!");
assert_eq!(
provider.poll_for_input(true),
vec![TerminalInputEvent::InputMassAppend("bcd".to_owned())],
);
// A backspace in the read turns the report into a change event.
sender
.send("\u{7f}a".to_owned())
.await
.expect("failed to send input!");
assert_eq!(
provider.poll_for_input(true),
vec![TerminalInputEvent::InputChanged(4)],
);
// Backspace, text, a left arrow, another backspace and more text in one read.
sender
.send("\u{7f}abcdef\u{1b}[D\u{7f}c".to_owned())
.await
.expect("Failed to send mixed edit input!");
assert_eq!(
provider.poll_for_input(true),
vec![TerminalInputEvent::InputChanged(8)],
);
}
/// Cursor and kill/yank control sequences produce no events and leave the
/// cursor untouched without ANSI support; with ANSI support the same
/// sequence moves the cursor and is reported.
#[tokio::test]
pub async fn cursor_operations_no_happen_when_no_ansi() {
let (sender, raw_receiver) = channel(8);
let read_provider = Mutex::new(raw_receiver);
let mut provider = StdinInputProvider::new_with_provider("test", read_provider)
.expect("Failed to create new stdin input provider!");
sender
.send("abcdefghijklmnopqrstuvwxyz".to_owned())
.await
.expect("Failed to send base input data");
_ = provider.poll_for_input(false);
sender.send(
"\u{2}\u{2}\u{2}\u{6}\u{5}\u{1}\u{6}\u{15}\u{19}\u{17}\u{19}\u{b}\u{14}\u{12}\u{1b}[C\u{1b}[C\u{1b}[D\u{1b}[1;2C\u{1b}[1;2D".to_owned()
).await.expect("Failed to send input data!");
// Without ANSI support nothing happens and the cursor stays at the end.
assert_eq!(provider.poll_for_input(false), Vec::with_capacity(0));
assert_eq!(provider.current_input.cursor_position(), 26);
// Cancel the input and repeat the scenario with ANSI support.
sender
.send("\u{3}".to_owned())
.await
.expect("Failed to send cancel!");
_ = provider.poll_for_input(true);
sender
.send("abcdefghijklmnopqrstuvwxyz".to_owned())
.await
.expect("Failed to send base input data");
_ = provider.poll_for_input(false);
sender.send(
"\u{2}\u{2}\u{2}\u{6}\u{5}\u{1}\u{6}\u{15}\u{19}\u{17}\u{19}\u{b}\u{14}\u{12}\u{1b}[C\u{1b}[C\u{1b}[D\u{1b}[1;2C\u{1b}[1;2D".to_owned()
).await.expect("Failed to send input data!");
assert_eq!(
provider.poll_for_input(true),
vec![
TerminalInputEvent::InputChanged(26),
TerminalInputEvent::CursorMoveLeft(1),
TerminalInputEvent::InputChanged(26),
TerminalInputEvent::CursorMoveRight(1),
TerminalInputEvent::CursorMoveLeft(26),
],
);
assert_eq!(provider.current_input.cursor_position(), 0);
}
/// Ctrl-C while an input is in progress does not reach the SIGINT handler,
/// whereas Ctrl-C on an empty prompt and Ctrl-D both do. The handler counter
/// is checked after each pair of sends (one poll with ANSI support, one
/// without).
#[cfg(feature = "tests-with-signals")]
#[tokio::test]
pub async fn ctrl_c_and_ctrl_d() {
use std::sync::{Arc, atomic::AtomicU8};
let (sender, raw_receiver) = channel(8);
let read_provider = Mutex::new(raw_receiver);
let mut provider = StdinInputProvider::new_with_provider("test", read_provider)
.expect("Failed to create new stdin input provider!");
// Count how many times the SIGINT handler runs.
let ctrl_c_handler = Arc::new(AtomicU8::new(0));
let cloned_handler = ctrl_c_handler.clone();
ctrlc::set_handler(move || {
cloned_handler.fetch_add(1, Ordering::SeqCst);
})
.expect("Failed to set ctrl-c handler!");
// Ctrl-C after typed text: the handler must not run.
sender
.send("abcdefghijklmnopqrstuvwxyz\u{3}".to_owned())
.await
.expect("Failed to send first input + ctrl-c");
_ = provider.poll_for_input(true);
assert_eq!(ctrl_c_handler.load(Ordering::SeqCst), 0);
sender
.send("abcdefghijklmnopqrstuvwxyz\u{3}".to_owned())
.await
.expect("Failed to send second input + ctrl-c");
_ = provider.poll_for_input(false);
assert_eq!(ctrl_c_handler.load(Ordering::SeqCst), 0);
// Ctrl-C with no input in progress: the handler runs once per press.
sender
.send("\u{3}".to_owned())
.await
.expect("Failed to send first bare ctrl-c!");
_ = provider.poll_for_input(true);
sender
.send("\u{3}".to_owned())
.await
.expect("Failed to send second bare ctrl-c!");
_ = provider.poll_for_input(false);
// Give the handler time to run before checking the counter.
tokio::time::sleep(std::time::Duration::from_millis(300)).await;
assert_eq!(ctrl_c_handler.load(Ordering::SeqCst), 2);
// Ctrl-D also ends up in the handler.
sender
.send("\u{4}".to_owned())
.await
.expect("Failed to send first ctrl-d!");
_ = provider.poll_for_input(true);
sender
.send("\u{4}".to_owned())
.await
.expect("Failed to send second ctrl-d!");
_ = provider.poll_for_input(false);
tokio::time::sleep(std::time::Duration::from_millis(300)).await;
assert_eq!(ctrl_c_handler.load(Ordering::SeqCst), 4);
}
/// Walks through autocomplete: Tab cycles forward through the suggestions,
/// Shift-Tab steps back, typing a character keeps the shown suggestion, and
/// moving the cursor after Tab leaves no suggestion pending.
#[tokio::test]
pub async fn autocomplete() {
let suggestion = Box::new(AutocompleteSuggestionList(vec![
"command long".to_owned(),
"command short".to_owned(),
"command longer".to_owned(),
"command shorter".to_owned(),
"other command".to_owned(),
]));
let (sender, raw_receiver) = channel(8);
let read_provider = Mutex::new(raw_receiver);
let mut provider = StdinInputProvider::new_with_provider("test", read_provider)
.expect("Failed to create new stdin input provider!");
provider.set_autocomplete_provider(suggestion);
sender
.send("command ".to_owned())
.await
.expect("Failed to send start of input");
_ = provider.poll_for_input(true);
assert!(provider.input_in_progress());
assert!(!provider.autocomplete_suggestion_pending());
assert_eq!(
provider.current_autocomplete_suggestion(),
Some("long".to_owned()),
);
// First Tab: takes the first matching suggestion.
sender
.send("\t".to_owned())
.await
.expect("Failed to send tab input!");
assert_eq!(
provider.poll_for_input(true),
vec![TerminalInputEvent::InputChanged(12)],
);
assert!(provider.autocomplete_suggestion_pending());
assert_eq!(provider.current_input(), "command long");
// Further Tabs move on to the next suggestions.
sender
.send("\t".to_owned())
.await
.expect("Failed to send tab input!");
assert_eq!(
provider.poll_for_input(true),
vec![TerminalInputEvent::InputChanged(13)],
);
assert!(provider.autocomplete_suggestion_pending());
assert_eq!(provider.current_input(), "command short");
sender
.send("\t".to_owned())
.await
.expect("Failed to send tab input!");
assert_eq!(
provider.poll_for_input(true),
vec![TerminalInputEvent::InputChanged(14)],
);
assert!(provider.autocomplete_suggestion_pending());
assert_eq!(provider.current_input(), "command longer");
// Shift-Tab goes back to the previous suggestion.
sender
.send("\u{1b}[Z".to_owned())
.await
.expect("Failed to send shift tab input!");
assert_eq!(
provider.poll_for_input(true),
vec![TerminalInputEvent::InputChanged(13)],
"Post shift tab Current input is: [{}] @ {}",
provider.current_input(),
provider.current_input.cursor_position(),
);
assert!(provider.autocomplete_suggestion_pending());
assert_eq!(provider.current_input(), "command short");
// Typing a character keeps the suggestion text and appends to it.
sender
.send("a".to_owned())
.await
.expect("Failed to send promotion input!");
assert_eq!(
provider.poll_for_input(true),
vec![TerminalInputEvent::InputAppend('a')],
);
assert!(!provider.autocomplete_suggestion_pending());
assert_eq!(provider.current_input(), "command shorta");
// Cancel, then start over with the cursor moved left inside the input.
sender
.send("\u{3}".to_owned())
.await
.expect("Failed to send cancellation!");
_ = provider.poll_for_input(true);
sender
.send("command \u{1b}[D\u{1b}[D\u{1b}[D\u{1b}[D\u{1b}[D".to_owned())
.await
.expect("Failed to send start of input");
_ = provider.poll_for_input(true);
assert!(provider.input_in_progress());
assert!(!provider.autocomplete_suggestion_pending());
assert_eq!(
provider.current_autocomplete_suggestion(),
Some("long".to_owned()),
);
// Tab followed by right arrows: the suggestion is kept and no longer pending.
sender
.send("\t\u{1b}[C\u{1b}[C\u{1b}[C\u{1b}[C\u{1b}[C".to_owned())
.await
.expect("Failed to start autocomplete input!");
assert_eq!(
provider.poll_for_input(true),
vec![TerminalInputEvent::InputChanged(12)],
"Current input is: [{:?}] @ {}",
provider.current_input(),
provider.current_input.cursor_position(),
);
assert!(!provider.autocomplete_suggestion_pending());
assert_eq!(provider.current_input(), "command long".to_owned());
}
/// Walks backwards and forwards through history with the up and down arrow
/// keys, checking the restored input and the reported cursor position at
/// every step, including the return to the unfinished input and an up arrow
/// pressed on an empty prompt.
#[tokio::test]
pub async fn history_non_search() {
let history_provider = Box::new(SimpleHistoryProvider::new_in_memory(10));
history_provider.insert_command("previous command");
history_provider.insert_command("short");
history_provider.insert_command("cos sdkversion");
let (sender, raw_receiver) = channel(8);
let read_provider = Mutex::new(raw_receiver);
let mut provider = StdinInputProvider::new_with_provider("test", read_provider)
.expect("Failed to create new stdin input provider!");
provider.set_history_provider(history_provider);
// Finish one command (which lands in history) and start typing another.
sender
.send("new command\rtest".to_owned())
.await
.expect("Failed to send newer command");
_ = provider.poll_for_input(false);
sender
.send("\u{1b}[A".to_owned())
.await
.expect("Failed to send up arrow!");
assert_eq!(
provider.poll_for_input(false),
vec![TerminalInputEvent::InputChanged(4)],
);
assert_eq!(provider.current_input(), "new command");
sender
.send("\u{1b}[A".to_owned())
.await
.expect("Failed to send up arrow!");
assert_eq!(
provider.poll_for_input(true),
vec![TerminalInputEvent::InputChanged(4)],
"Current input is: [{:?}] @ {}",
provider.current_input(),
provider.current_input.cursor_position(),
);
assert_eq!(provider.current_input(), "cos sdkversion");
sender
.send("\u{1b}[C\u{1b}[C\u{1b}[C\u{1b}[C\u{1b}[C\u{1b}[C".to_owned())
.await
.expect("failed to send some right arrows!");
_ = provider.poll_for_input(true);
sender
.send("\u{1b}[A".to_owned())
.await
.expect("Failed to send up arrow!");
assert_eq!(
provider.poll_for_input(true),
vec![TerminalInputEvent::InputChanged(5)],
"Current input is: [{:?}] @ {}",
provider.current_input(),
provider.current_input.cursor_position(),
);
assert_eq!(provider.current_input(), "short");
sender
.send("\u{1b}[A".to_owned())
.await
.expect("Failed to send up arrow!");
assert_eq!(
provider.poll_for_input(true),
vec![TerminalInputEvent::InputChanged(5)],
"Current input is: [{:?}] @ {}",
provider.current_input(),
provider.current_input.cursor_position(),
);
assert_eq!(provider.current_input(), "previous command");
// Two right arrows followed by a down arrow step forward again.
sender
.send("\u{1b}[C\u{1b}[C\u{1b}[B".to_owned())
.await
.expect("Failed to send right arrows and down arrow!");
assert_eq!(
provider.poll_for_input(true),
vec![TerminalInputEvent::InputChanged(5)],
"Current input is: [{:?}] @ {}",
provider.current_input(),
provider.current_input.cursor_position(),
);
assert_eq!(provider.current_input(), "short");
// More down arrows than history entries: ends on the unfinished input.
sender
.send("\u{1b}[B\u{1b}[B\u{1b}[B\u{1b}[B\u{1b}[B\u{1b}[B".to_owned())
.await
.expect("Failed to send down arrow!");
assert_eq!(
provider.poll_for_input(true),
vec![TerminalInputEvent::InputChanged(4)],
);
assert_eq!(provider.current_input(), "test");
assert_eq!(provider.current_input.cursor_position(), 4);
// After cancelling, an up arrow on the empty prompt starts a new input.
sender
.send("\u{3}".to_owned())
.await
.expect("Failed to send cancel input");
_ = provider.poll_for_input(true);
sender
.send("\u{1b}[A".to_owned())
.await
.expect("Failed to send up arrow!");
assert_eq!(
provider.poll_for_input(true),
vec![
TerminalInputEvent::InputStarted,
TerminalInputEvent::InputChanged(11),
],
);
}
/// Exercises history search started with `\u{12}`: narrowing the match by
/// typing, editing in the middle of the input, completing the search text,
/// swapping words, finishing with Enter, and finally checking that the search
/// does not start at all without ANSI support.
#[tokio::test]
pub async fn history_searching() {
let history_provider = Box::new(SimpleHistoryProvider::new_in_memory(15));
history_provider.insert_command("word swap again");
history_provider.insert_command("word swap more");
history_provider.insert_command("swap word again");
history_provider.insert_command("swap word more");
history_provider.insert_command("comamand longer1");
history_provider.insert_command("command long");
history_provider.insert_command("command longer");
history_provider.insert_command("command short");
history_provider.insert_command("command shorter");
history_provider.insert_command("other");
let (sender, raw_receiver) = channel(8);
let read_provider = Mutex::new(raw_receiver);
let mut provider = StdinInputProvider::new_with_provider("test", read_provider)
.expect("Failed to create new stdin input provider!");
provider.set_history_provider(history_provider);
sender
.send("command ".to_owned())
.await
.expect("Failed to start history search");
assert_eq!(
provider.poll_for_input(true),
vec![
TerminalInputEvent::InputStarted,
TerminalInputEvent::InputMassAppend("command ".to_owned()),
],
);
assert!(!provider.is_doing_history_search());
assert!(provider.current_history_search_value().is_none());
// `\u{12}` starts the search using the text typed so far.
sender
.send("\u{12}".to_owned())
.await
.expect("Failed to send ctrl-r to input to start history search!");
assert_eq!(
provider.poll_for_input(true),
vec![TerminalInputEvent::InputChanged(8)],
);
assert!(provider.is_doing_history_search());
assert_eq!(
provider.current_history_search_value(),
Some("shorter".to_owned()),
);
assert_eq!(provider.current_input(), "command shorter");
// Typing another character changes the match.
sender
.send("l".to_owned())
.await
.expect("Failed to send key press to terminal");
assert_eq!(
provider.poll_for_input(true),
vec![TerminalInputEvent::InputAppend('l')],
);
assert!(provider.is_doing_history_search());
assert_eq!(
provider.current_history_search_value(),
Some("onger".to_owned()),
);
assert_eq!(provider.current_input(), "command longer");
// Move the cursor left and insert a character in the middle of the input.
sender
.send("\u{1b}[D\u{1b}[D\u{2}\u{2}\u{1b}[D\u{1b}[Da".to_owned())
.await
.expect("Failed to send cursor movement and append for history!");
assert_eq!(
provider.poll_for_input(true),
vec![TerminalInputEvent::InputChanged(4)],
"Current Input Is ({:?}) @ {}",
provider.current_input(),
provider.current_input.cursor_position(),
);
assert!(provider.is_doing_history_search());
assert_eq!(
provider.current_history_search_value(),
Some("onger1".to_owned()),
);
// Typing out the rest of the match leaves nothing more to suggest.
sender
.send("\u{1b}[1;2Conger1".to_owned())
.await
.expect("Failed to complete history search!");
assert_eq!(
provider.poll_for_input(true),
vec![TerminalInputEvent::InputChanged(14)],
);
assert!(provider.is_doing_history_search());
assert!(provider.current_history_search_value().is_none());
// Cancel and start a new search in the same read as the typed text.
sender
.send("\u{3}".to_owned())
.await
.expect("Failed to send cancel!");
_ = provider.poll_for_input(true);
sender
.send("word swap\u{12}".to_owned())
.await
.expect("Failed to send history search start!");
assert_eq!(
provider.poll_for_input(true),
vec![
TerminalInputEvent::InputStarted,
TerminalInputEvent::InputChanged(9),
],
);
// `ESC t` swaps the two words before the search continues with " a".
sender
.send("\u{1b}t a".to_owned())
.await
.expect("Failed to swap words and type a!");
assert_eq!(
provider.poll_for_input(true),
vec![TerminalInputEvent::InputChanged(11)],
"Current input is: [{:?}] @ {}",
provider.current_input.input(),
provider.current_input.cursor_position(),
);
assert!(provider.is_doing_history_search());
assert_eq!(
provider.current_history_search_value(),
Some("gain".to_owned()),
);
assert_eq!(provider.current_input.input(), "swap word again");
// Enter while a match is shown completes the input with the full match.
sender
.send("\r".to_owned())
.await
.expect("Failed to send enter early!");
assert_eq!(
provider.poll_for_input(true),
vec![TerminalInputEvent::InputFinished],
);
assert_eq!(
provider.completed_inputs.last().map(Clone::clone),
Some("swap word again".to_owned())
);
assert!(!provider.is_doing_history_search());
// Without ANSI support `\u{12}` does not start a search.
sender
.send("command \u{12}".to_owned())
.await
.expect("Failed to test history search without ANSI!");
assert_eq!(
provider.poll_for_input(false),
vec![
TerminalInputEvent::InputStarted,
TerminalInputEvent::InputMassAppend("command ".to_owned())
],
);
assert!(!provider.is_doing_history_search());
}
/// Pasted text is inserted verbatim: escape sequences and control characters
/// contained in the clipboard value are appended as text and are not
/// interpreted as key presses.
#[tokio::test]
pub async fn paste_control_codes() {
let (sender, raw_receiver) = channel(8);
let read_provider = Mutex::new(raw_receiver);
let mut provider = StdinInputProvider::new_with_provider("test", read_provider)
.expect("Failed to create new stdin input provider!");
// Queue a clipboard value containing colour escape sequences, then paste with `\u{16}`.
insert_next_paste_value("\u{1b}[31m\u{1b}[1;31min red!\u{1b}[0mjust normal!".to_owned());
sender
.send("\u{16}".to_owned())
.await
.expect("Failed to send paste!");
assert_eq!(
provider.poll_for_input(true),
vec![
TerminalInputEvent::InputStarted,
TerminalInputEvent::InputMassAppend(
"\u{1b}[31m\u{1b}[1;31min red!\u{1b}[0mjust normal!".to_owned()
),
],
);
assert_eq!(provider.current_input.cursor_position(), 35);
// Pasted control characters must not paste again, start a history search or move the cursor.
insert_next_paste_value("\u{16}\u{12}\u{2}\u{1b}[D\u{1b}[D\u{1b}[D".to_owned());
sender
.send("\u{16}".to_owned())
.await
.expect("Failed to send paste!");
assert_eq!(
provider.poll_for_input(true),
vec![TerminalInputEvent::InputMassAppend(
"\u{16}\u{12}\u{2}\u{1b}[D\u{1b}[D\u{1b}[D".to_owned()
)],
);
assert!(!provider.current_input.is_doing_history_search());
assert_eq!(provider.current_input.cursor_position(), 47);
}
}