use crate::{debug_log as debug, CrosstermEventReader, EventReader, ThagError, ThagResult};
use crossterm::{
event::{poll, read, Event, KeyCode, KeyModifiers},
terminal::{self, is_raw_mode_enabled},
};
use scopeguard::defer;
use std::{
env,
fmt::Debug,
io::{self, IsTerminal, Write},
time::{Duration, Instant},
};
#[cfg(target_os = "windows")]
use {
std::sync::OnceLock, winapi::um::consoleapi::SetConsoleMode,
winapi::um::handleapi::INVALID_HANDLE_VALUE, winapi::um::processenv::GetStdHandle,
winapi::um::winbase::STD_OUTPUT_HANDLE, winapi::um::wincon::ENABLE_VIRTUAL_TERMINAL_PROCESSING,
};
/// The kind of terminal environment detected by `terminal()`.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum Terminal {
/// Selected on non-Windows platforms when `TERM` starts with `screen`.
Screen,
/// Selected on non-Windows platforms when `TMUX` is set or `TERM` starts with `tmux-`.
Tmux,
/// The default on non-Windows platforms; on Windows, selected when `TERM_PROGRAM` is
/// `vscode` or when virtual terminal processing could be enabled.
XtermCompatible,
/// Selected on Windows when virtual terminal processing could not be enabled.
Windows,
/// Selected when `INSIDE_EMACS` is set; `rgb()` reports this terminal as unsupported.
Emacs,
}
/// A colour expressed as red, green and blue channels, each held in a `u16`.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct Rgb {
/// Red channel.
pub r: u16,
/// Green channel.
pub g: u16,
/// Blue channel.
pub b: u16,
}
/// Broad classification of the terminal background, as computed by `theme()`.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum Theme {
/// Returned by `theme()` when the weighted brightness of the background exceeds 32768.
Light,
/// Returned by `theme()` otherwise.
Dark,
}
/// Detects the kind of terminal the process is running in (non-Windows platforms).
///
/// Checks are made in this order:
/// 1. `INSIDE_EMACS` is set: [`Terminal::Emacs`].
/// 2. `TMUX` is set, or `TERM` starts with `tmux-`: [`Terminal::Tmux`].
/// 3. `TERM` starts with `screen`: [`Terminal::Screen`].
/// 4. Anything else: [`Terminal::XtermCompatible`].
#[cfg(not(target_os = "windows"))]
#[must_use]
pub fn terminal() -> Terminal {
    if env::var("INSIDE_EMACS").is_ok() {
        return Terminal::Emacs;
    }

    // An unset (or non-Unicode) `TERM` is treated as empty, so no prefix can match it.
    let term_name = env::var("TERM").unwrap_or_default();

    if env::var("TMUX").is_ok() || term_name.starts_with("tmux-") {
        return Terminal::Tmux;
    }
    if term_name.starts_with("screen") {
        return Terminal::Screen;
    }
    Terminal::XtermCompatible
}
/// Detects the kind of terminal the process is running in (Windows).
///
/// Checks are made in this order:
/// 1. `TERM_PROGRAM` equals `vscode`: [`Terminal::XtermCompatible`].
/// 2. `INSIDE_EMACS` is set: [`Terminal::Emacs`].
/// 3. Virtual terminal processing can be enabled: [`Terminal::XtermCompatible`].
/// 4. Otherwise: [`Terminal::Windows`].
///
/// Marked `#[must_use]` to match the non-Windows implementation of this function.
#[cfg(target_os = "windows")]
#[must_use]
pub fn terminal() -> Terminal {
    if let Ok(term_program) = env::var("TERM_PROGRAM") {
        debug!("term_program={term_program}\r");
        if term_program == "vscode" {
            return Terminal::XtermCompatible;
        }
    }

    if env::var("INSIDE_EMACS").is_ok() {
        return Terminal::Emacs;
    }

    if enable_virtual_terminal_processing() {
        debug!(
            "This Windows terminal supports virtual terminal processing (but not OSC 10/11 colour queries if prior to Windows Terminal 1.22 Preview of August 2024)\r"
        );
        Terminal::XtermCompatible
    } else {
        debug!("Terminal::Windows\r");
        Terminal::Windows
    }
}
/// Determines the terminal background colour (non-Windows platforms).
///
/// The terminal is queried through `from_xterm` unless it is Emacs, which is reported as
/// unsupported. The `COLORFGBG` environment variable is always consulted as well and is
/// used when the query did not succeed.
///
/// # Errors
///
/// When both the query and the `COLORFGBG` fallback fail, the error from the query is returned.
#[cfg(not(target_os = "windows"))]
pub fn rgb(timeout: Duration) -> ThagResult<Rgb> {
    let term = terminal();
    let queried = if term == Terminal::Emacs {
        Err(ThagError::UnsupportedTerm)
    } else {
        from_xterm(term, timeout)
    };

    match (queried, from_env_colorfgbg()) {
        // The direct query wins; otherwise take whatever the environment offers.
        (Ok(colour), _) | (Err(_), Ok(colour)) => Ok(colour),
        // Nothing worked: surface the query's error rather than the fallback's.
        (Err(query_err), Err(_)) => Err(query_err),
    }
}
/// Determines the terminal background colour (Windows).
///
/// Emacs is reported as unsupported, an xterm-compatible terminal is queried through
/// `from_xterm`, and any other terminal is inspected through `from_winapi`. The `COLORFGBG`
/// environment variable is always consulted as well and is used when the primary method
/// did not succeed.
///
/// # Errors
///
/// When both the primary method and the `COLORFGBG` fallback fail, the primary method's
/// error is returned.
#[cfg(target_os = "windows")]
pub fn rgb(timeout: Duration) -> ThagResult<Rgb> {
    let term = terminal();
    let rgb = if term == Terminal::Emacs {
        Err(ThagError::UnsupportedTerm)
    } else if term == Terminal::XtermCompatible {
        from_xterm(term, timeout)
    } else {
        from_winapi()
    };
    let fallback = from_env_colorfgbg();
    debug!("rgb={rgb:?}, fallback={fallback:?}\r");

    match (rgb, fallback) {
        // The primary method wins; otherwise take whatever the environment offers.
        (Ok(colour), _) | (Err(_), Ok(colour)) => Ok(colour),
        // Nothing worked: surface the primary error rather than the fallback's.
        (Err(primary_err), Err(_)) => Err(primary_err),
    }
}
pub fn theme(timeout: Duration) -> ThagResult<Theme> {
let rgb = rgb(timeout)?;
let y = f64::from(rgb.b).mul_add(
0.114,
f64::from(rgb.r).mul_add(0.299, f64::from(rgb.g) * 0.587),
);
if y > 32768.0 {
Ok(Theme::Light)
} else {
Ok(Theme::Dark)
}
}
/// Tries to switch on `ENABLE_VIRTUAL_TERMINAL_PROCESSING` for the standard output console.
///
/// The attempt is made at most once per process; the outcome is cached in a `OnceLock`
/// and returned on every later call. Returns `true` only if the console mode could be
/// read and then updated successfully.
#[cfg(target_os = "windows")]
fn enable_virtual_terminal_processing() -> bool {
    static ENABLE_VT_PROCESSING: OnceLock<bool> = OnceLock::new();

    *ENABLE_VT_PROCESSING.get_or_init(|| {
        // SAFETY: these are plain Win32 console calls; `mode` is a live local that
        // outlives the `GetConsoleMode` call it is passed to as an out-pointer.
        unsafe {
            let handle = GetStdHandle(STD_OUTPUT_HANDLE);
            if handle == INVALID_HANDLE_VALUE {
                return false;
            }

            let mut mode: u32 = 0;
            if winapi::um::consoleapi::GetConsoleMode(handle, &mut mode) == 0 {
                return false;
            }

            if SetConsoleMode(handle, mode | ENABLE_VIRTUAL_TERMINAL_PROCESSING) == 0 {
                debug!("Failed to enable Virtual Terminal Processing.\r");
                return false;
            }

            debug!("Successfully enabled Virtual Terminal Processing.\r");
            true
        }
    })
}
/// Asks the terminal for its background colour and returns the parsed reply.
///
/// The query itself is delegated to `query_xterm`, which writes to stderr and reads the
/// reply through `CrosstermEventReader`. This function only prepares and restores the
/// terminal state around that call.
///
/// # Errors
///
/// Returns `ThagError::UnsupportedTerm` unless stdin, stdout and stderr are all terminals.
/// Also propagates failures from checking or enabling raw mode and from `query_xterm`.
fn from_xterm(term: Terminal, timeout: Duration) -> ThagResult<Rgb> {
// The query/response exchange needs an interactive terminal on all three streams.
if !std::io::stdin().is_terminal()
|| !std::io::stdout().is_terminal()
|| !std::io::stderr().is_terminal()
{
return Err(ThagError::UnsupportedTerm);
}
// Remember the caller's raw-mode state so it can be put back afterwards.
let raw_before = is_raw_mode_enabled()?;
// Cleanup registered up front so it runs on every exit path below, including `?` returns.
defer! {
let is_raw = match is_raw_mode_enabled() {
Ok(val) => val,
Err(e) => {
debug!("Failed to check raw mode status: {:?}\r", e);
// Leaves the cleanup early: stdin is not drained in this case.
return;
}
};
if is_raw == raw_before {
debug!("Raw mode status unchanged from raw={raw_before}.\r");
} else if let Err(e) = restore_raw_status(raw_before) {
debug!("Failed to restore raw mode: {e:?} to raw={raw_before}\r");
} else {
debug!("Raw mode restored to previous state (raw={raw_before}).\r");
}
// Discard any pending input events so they are not left behind for later reads.
if let Err(e) = clear_stdin() {
debug!("Failed to clear stdin: {e:?}\r");
} else {
debug!("Cleared any excess from stdin.\r");
}
}
// Only switch raw mode on if the caller had not already done so.
if !raw_before {
terminal::enable_raw_mode()?;
}
#[cfg(target_os = "windows")]
{
// Without virtual terminal processing, fall back to the console attribute lookup.
if !enable_virtual_terminal_processing() {
debug!(
"Virtual Terminal Processing could not be enabled. Falling back to default behavior.\r"
);
return from_winapi();
}
}
let event_reader = CrosstermEventReader;
let mut stderr = io::stderr();
query_xterm(term, timeout, &event_reader, &mut stderr)
}
/// Sends an OSC 11 background-colour query and collects the reply from key events.
///
/// The query is written to `buffer` (wrapped in a passthrough sequence for tmux and
/// screen) and flushed. Unmodified character events from `event_reader` are then
/// accumulated until a terminator arrives (`\` with or without Alt, Ctrl-G, or a BEL
/// character), at which point the accumulated text is parsed.
///
/// On Windows the supplied `timeout` is replaced by a fixed one second. When the timeout
/// expires without a terminator, a reply that already contains `rgb:` is still decoded
/// via `decode_unterminated`.
///
/// # Errors
///
/// Returns a `TimedOut` I/O error when nothing containing `rgb:` was received in time,
/// and propagates write, poll, read and parse failures.
fn query_xterm<R, W>(
    term: Terminal,
    timeout: Duration,
    event_reader: &R,
    buffer: &mut W,
) -> ThagResult<Rgb>
where
    R: EventReader + Debug,
    W: Write + Debug,
{
    let query = match term {
        Terminal::Tmux => "\x1bPtmux;\x1b\x1b]11;?\x07\x1b\\",
        Terminal::Screen => "\x1bP\x1b]11;?\x07\x1b\\",
        _ => "\x1b]11;?\x1b\\",
    };
    write!(buffer, "{query}")?;
    buffer.flush()?;

    let mut response = String::new();
    let start_time = Instant::now();
    let timeout = if cfg!(target_os = "windows") {
        Duration::from_secs(1)
    } else {
        timeout
    };

    loop {
        if start_time.elapsed() > timeout {
            debug!("After timeout, found response={response}\r");
            if !response.contains("rgb:") {
                debug!("Failed to capture response\r");
                return Err(io::Error::new(io::ErrorKind::TimedOut, "timeout 1").into());
            }
            // A colour arrived but its terminator was not recognised: salvage it.
            let rgb_slice = decode_unterminated(&response)?;
            debug!("Found a valid response {rgb_slice} in pre-timeout check despite unrecognized terminator in response code {response:#?}\r");
            return parse_response(rgb_slice, start_time);
        }

        // Poll in short slices so the timeout above is re-checked regularly.
        if !event_reader.poll(Duration::from_millis(100))? {
            continue;
        }
        let Event::Key(key_event) = event_reader.read_event()? else {
            continue;
        };

        match (key_event.code, key_event.modifiers) {
            (KeyCode::Char('\\'), KeyModifiers::ALT | KeyModifiers::NONE)
            | (KeyCode::Char('g'), KeyModifiers::CONTROL)
            | (KeyCode::Char('\u{0007}'), KeyModifiers::NONE) => {
                debug!("End of response detected ({key_event:?}).\r");
                return parse_response(&response, start_time);
            }
            (KeyCode::Char(c), KeyModifiers::NONE) => {
                debug!("pushing {c}\r");
                response.push(c);
            }
            _ => {
                debug!("ignoring {key_event:?}\r");
            }
        }
    }
}
/// Extracts the `rgb:<r>/<g>/<b>` part of a reply whose terminator was not recognised.
///
/// The width of the red component sets the expected width of all three. Green must be
/// exactly that wide and blue at least that wide; anything beyond that width in the blue
/// field is treated as trailing noise and cut off. The returned slice starts at `rgb:`.
///
/// # Errors
///
/// Returns an error when `rgb:` is absent, when fewer than two `/` separators follow it,
/// when the component widths are inconsistent, or when the cut-off point does not fall on
/// a character boundary (previously that last case panicked on the slice operation).
fn decode_unterminated(response: &str) -> ThagResult<&str> {
    let resp_start = response
        .find("rgb:")
        .ok_or("Required string `rgb:` not found in response")?;
    let mid = resp_start + 4;
    // `mid` sits directly after the ASCII marker, so it is always a character boundary.
    let raw_rgb_slice = &response[mid..];
    debug!("raw_rgb_slice={raw_rgb_slice}\r");

    let mut fragments = raw_rgb_slice.splitn(3, '/');
    let (Some(red), Some(green), Some(blue)) =
        (fragments.next(), fragments.next(), fragments.next())
    else {
        return Err(format!(
            "Incomplete response `{response}`: does not contain two forward slashes"
        )
        .into());
    };

    let frag_len = red.len();
    if green.len() != frag_len || blue.len() < frag_len {
        return Err(format!(
            "Can't safely reconstitute unterminated response `{response}` from fragments of unequal length"
        )
        .into());
    }

    // Three components of `frag_len` bytes plus the two separators.
    let rgb_str_len = frag_len * 3 + 2;
    // Checked slicing: the blue field may contain multi-byte characters, so the
    // byte offset is not guaranteed to land on a character boundary.
    response
        .get(resp_start..mid + rgb_str_len)
        .ok_or_else(|| {
            format!(
                "Can't safely reconstitute unterminated response `{response}`: colour ends inside a multi-byte character"
            )
            .into()
        })
}
/// Parses a terminal reply into an [`Rgb`] and logs how long the exchange took.
///
/// `start_time` is only used for the elapsed-time debug message, which is emitted when
/// parsing succeeds.
///
/// # Errors
///
/// Propagates any error from `extract_rgb`.
fn parse_response(response: &str, start_time: Instant) -> ThagResult<Rgb> {
    extract_rgb(response).map(|(r, g, b)| {
        let elapsed = start_time.elapsed();
        debug!("Elapsed time: {:.2?}\r", elapsed);
        Rgb { r, g, b }
    })
}
/// Decodes the colour that follows the first `rgb:` marker in a terminal reply.
///
/// Everything after the marker is handed to `decode_x11_color`.
///
/// # Errors
///
/// Returns an error when the marker is missing, and propagates decoding errors.
fn extract_rgb(response: &str) -> ThagResult<(u16, u16, u16)> {
    let (_, rgb_str) = response
        .split_once("rgb:")
        .ok_or("Could not find 'rgb:' in terminal response string")?;
    decode_x11_color(rgb_str)
}
/// Puts the terminal's raw mode back to `raw_before` if it currently differs.
///
/// # Errors
///
/// Propagates failures from querying, enabling or disabling raw mode.
fn restore_raw_status(raw_before: bool) -> ThagResult<()> {
    match (is_raw_mode_enabled()?, raw_before) {
        (false, true) => terminal::enable_raw_mode()?,
        (true, false) => terminal::disable_raw_mode()?,
        // Already in the requested state: nothing to do.
        _ => {}
    }
    Ok(())
}
/// Reads and discards pending input events until none arrives within 10 ms.
///
/// Key events are logged as they are dropped; other event kinds are dropped silently.
///
/// # Errors
///
/// Propagates failures from polling or reading events.
fn clear_stdin() -> Result<(), Box<dyn std::error::Error>> {
    loop {
        if !poll(Duration::from_millis(10))? {
            return Ok(());
        }
        if let Event::Key(c) = read()? {
            debug!("discarding char{c:x?}\r");
        }
    }
}
/// Derives the background colour from the `COLORFGBG` environment variable.
///
/// The variable is a `;`-separated list whose last field is the background colour index,
/// e.g. `15;0` or the three-field form `0;default;15`. The last field is used, so both
/// forms are understood (previously the second field was read, which rejected the
/// three-field form). At least two fields are required.
///
/// Indices 0-15 are mapped to a fixed 16-colour palette and scaled to 16 bits per channel
/// by multiplying by 256. Any other index is treated as black.
///
/// # Errors
///
/// Returns `ThagError::UnsupportedTerm` when the variable is unset, not valid Unicode, or
/// has fewer than two fields. Returns an error carrying the variable's value when the
/// background field is not a number in `0..=255`.
fn from_env_colorfgbg() -> ThagResult<Rgb> {
    // 8-bit palette for colour indices 0 through 15.
    const PALETTE: [(u16, u16, u16); 16] = [
        (0, 0, 0),
        (205, 0, 0),
        (0, 205, 0),
        (205, 205, 0),
        (0, 0, 238),
        (205, 0, 205),
        (0, 205, 205),
        (229, 229, 229),
        (127, 127, 127),
        (255, 0, 0),
        (0, 255, 0),
        (255, 255, 0),
        (92, 92, 255),
        (255, 0, 255),
        (0, 255, 255),
        (255, 255, 255),
    ];

    let var = env::var("COLORFGBG").map_err(|_| ThagError::UnsupportedTerm)?;

    // `rsplit_once` only succeeds when there is at least one `;`, i.e. two or more fields.
    let Some((_, bg_field)) = var.rsplit_once(';') else {
        return Err(ThagError::UnsupportedTerm);
    };
    let Ok(index) = bg_field.parse::<u8>() else {
        return Err(var.into());
    };

    let (r, g, b) = PALETTE
        .get(usize::from(index))
        .copied()
        .unwrap_or((0, 0, 0));

    Ok(Rgb {
        r: r * 256,
        g: g * 256,
        b: b * 256,
    })
}
/// Decodes the `<r>/<g>/<b>` part of an X11 `rgb:` colour into 16-bit channels.
///
/// Each component must be one to four hexadecimal digits. Shorter components are
/// left-aligned into 16 bits, so `1` becomes `0x1000` and `ff` becomes `0xff00`.
/// Fields after the third are ignored.
///
/// # Errors
///
/// Returns an error carrying the input when fewer than three components are present, or
/// carrying the offending component when it is empty, longer than four characters, or
/// not purely hexadecimal. (An over-long component such as `00001` previously caused an
/// arithmetic underflow when computing the shift.)
fn decode_x11_color(s: &str) -> ThagResult<(u16, u16, u16)> {
    /// Decodes one component of one to four hex digits, left-aligned into a `u16`.
    fn decode_hex(s: &str) -> ThagResult<u16> {
        let len = s.len();
        // Validate first so that `4 - len` below can never underflow.
        if !(1..=4).contains(&len) || !s.bytes().all(|byte| byte.is_ascii_hexdigit()) {
            return Err(s.to_string().into());
        }
        let value = u16::from_str_radix(s, 16).map_err(|_| s.to_string())?;
        Ok(value << ((4 - len) * 4))
    }

    let mut components = s.split('/');
    let r = components.next().ok_or_else(|| s.to_string())?;
    let g = components.next().ok_or_else(|| s.to_string())?;
    let b = components.next().ok_or_else(|| s.to_string())?;

    Ok((decode_hex(r)?, decode_hex(g)?, decode_hex(b)?))
}
/// Derives the background colour from the Windows console's character attributes.
///
/// The background red/green/blue/intensity bits of the standard output console are
/// mapped to a fixed 16-colour palette and scaled to 16 bits per channel by multiplying
/// by 256.
///
/// # Errors
///
/// Returns the last OS error when `GetConsoleScreenBufferInfo` reports failure.
/// Previously the return value was ignored, so a failed call was reported as a black
/// background.
#[cfg(target_os = "windows")]
fn from_winapi() -> ThagResult<Rgb> {
    use winapi::um::wincon;

    debug!("In from_winapi()\r");

    // SAFETY: plain Win32 console calls; `info` is a live local that outlives the
    // `GetConsoleScreenBufferInfo` call it is passed to as an out-pointer.
    let queried = unsafe {
        let handle = winapi::um::processenv::GetStdHandle(winapi::um::winbase::STD_OUTPUT_HANDLE);
        let mut info: wincon::CONSOLE_SCREEN_BUFFER_INFO = Default::default();
        if wincon::GetConsoleScreenBufferInfo(handle, &mut info) == 0 {
            // Capture the error code immediately, before any other call can overwrite it.
            Err(io::Error::last_os_error())
        } else {
            Ok(info)
        }
    };
    let info = queried?;
    debug!("info.wAttributes={:x?}\r", info.wAttributes);

    let red = (wincon::BACKGROUND_RED & info.wAttributes) != 0;
    let green = (wincon::BACKGROUND_GREEN & info.wAttributes) != 0;
    let blue = (wincon::BACKGROUND_BLUE & info.wAttributes) != 0;
    let intense = (wincon::BACKGROUND_INTENSITY & info.wAttributes) != 0;

    // Exhaustive over the four flags, so no unreachable arm is needed.
    let (r, g, b): (u16, u16, u16) = match (red, green, blue, intense) {
        (false, false, false, false) => (0, 0, 0),
        (true, false, false, false) => (128, 0, 0),
        (false, true, false, false) => (0, 128, 0),
        (true, true, false, false) => (128, 128, 0),
        (false, false, true, false) => (0, 0, 128),
        (true, false, true, false) => (128, 0, 128),
        (false, true, true, false) => (0, 128, 128),
        (true, true, true, false) => (192, 192, 192),
        (false, false, false, true) => (128, 128, 128),
        (true, false, false, true) => (255, 0, 0),
        (false, true, false, true) => (0, 255, 0),
        (true, true, false, true) => (255, 255, 0),
        (false, false, true, true) => (0, 0, 255),
        (true, false, true, true) => (255, 0, 255),
        (false, true, true, true) => (0, 255, 255),
        (true, true, true, true) => (255, 255, 255),
    };

    Ok(Rgb {
        r: r * 256,
        g: g * 256,
        b: b * 256,
    })
}
// Unit tests for `query_xterm` (driven by a mocked writer and event reader) and for
// `decode_x11_color`.
#[cfg(test)]
mod tests {
use super::*;
use crate::MockEventReader;
use crossterm::event::Event;
use crossterm::event::KeyEvent;
use either::Either;
use mockall::mock;
use std::io::{self, Write};
use std::iter::{self, Cloned};
use std::slice::Iter;
use std::sync::{Arc, Mutex};
use std::thread::sleep;
use std::time::Duration;
/// The bytes `query_xterm` is expected to write for `Terminal::XtermCompatible`.
const ESC_OSC_QUERY: &[u8; 8] = b"\x1b]11;?\x1b\\";
/// Key events emulating the reply `]11;rgb:ff/cc/99`, without a terminator.
/// The leading `]` carries the ALT modifier, so `query_xterm` ignores it rather than
/// adding it to the collected response.
const RGB_RESPONSE: &[Event] = &[
Event::Key(KeyEvent::new(KeyCode::Char(']'), KeyModifiers::ALT)),
Event::Key(KeyEvent::new(KeyCode::Char('1'), KeyModifiers::NONE)),
Event::Key(KeyEvent::new(KeyCode::Char('1'), KeyModifiers::NONE)),
Event::Key(KeyEvent::new(KeyCode::Char(';'), KeyModifiers::NONE)),
Event::Key(KeyEvent::new(KeyCode::Char('r'), KeyModifiers::NONE)),
Event::Key(KeyEvent::new(KeyCode::Char('g'), KeyModifiers::NONE)),
Event::Key(KeyEvent::new(KeyCode::Char('b'), KeyModifiers::NONE)),
Event::Key(KeyEvent::new(KeyCode::Char(':'), KeyModifiers::NONE)),
Event::Key(KeyEvent::new(KeyCode::Char('f'), KeyModifiers::NONE)),
Event::Key(KeyEvent::new(KeyCode::Char('f'), KeyModifiers::NONE)),
Event::Key(KeyEvent::new(KeyCode::Char('/'), KeyModifiers::NONE)),
Event::Key(KeyEvent::new(KeyCode::Char('c'), KeyModifiers::NONE)),
Event::Key(KeyEvent::new(KeyCode::Char('c'), KeyModifiers::NONE)),
Event::Key(KeyEvent::new(KeyCode::Char('/'), KeyModifiers::NONE)),
Event::Key(KeyEvent::new(KeyCode::Char('9'), KeyModifiers::NONE)),
Event::Key(KeyEvent::new(KeyCode::Char('9'), KeyModifiers::NONE)),
];
const RGB_RESPONSE_LEN: usize = RGB_RESPONSE.len();
/// Runs `query_xterm` against mocks and checks the outcome.
///
/// * `emulate_response` - when `false`, the mocked reader never reports an event.
/// * `num_to_send` - how many events from the start of `RGB_RESPONSE` are made available.
/// * `maybe_terminator` - an extra event delivered after the response events, if any.
/// * `expected_rgb` - the colour the query must return, or `None` if it must fail.
fn run_query_xterm_test(
emulate_response: bool,
num_to_send: usize, maybe_terminator: Option<&Event>,
expected_rgb: Option<(u16, u16, u16)>,
) {
eprintln!("Testing for terminator {maybe_terminator:?}");
let mut mock_writer = MockWriter::new();
let mut mock_event_reader = MockEventReader::new();
// The query must be written exactly once and flushed exactly once.
mock_writer
.expect_write()
.withf(move |buf| buf == ESC_OSC_QUERY)
.times(1)
.returning(|_| Ok(ESC_OSC_QUERY.len()));
mock_writer.expect_flush().times(1).returning(|| Ok(()));
// Number of events handed out so far, shared between the poll and read closures.
let event_count = Arc::new(Mutex::new(0));
let total_events = if emulate_response {
num_to_send + if maybe_terminator.is_some() { 1 } else { 0 }
} else {
0
};
let poll_event_count = Arc::clone(&event_count);
let read_event_count = Arc::clone(&event_count);
// `poll` reports an event as available until `total_events` have been read.
mock_event_reader.expect_poll().returning(move |_| {
let count = poll_event_count.lock().unwrap();
if *count < total_events {
Ok(true) } else {
Ok(false) }
});
let base_iterator = RGB_RESPONSE.iter().cloned();
// `Either` gives the two iterator shapes (with and without terminator) one type.
let mut response_iter: Either<
Cloned<Iter<'_, Event>>,
std::iter::Chain<Cloned<Iter<'_, Event>>, iter::Once<Event>>,
> = if let Some(terminator) = maybe_terminator {
Either::Right(base_iterator.chain(iter::once(terminator.clone())))
} else {
Either::Left(base_iterator)
};
// `read_event` hands out the next event; once exhausted it sleeps and then fails.
mock_event_reader.expect_read_event().returning(move || {
let event_count = Arc::clone(&read_event_count);
let mut count = event_count.lock().unwrap();
if *count < total_events {
*count += 1; response_iter.next().ok_or_else(|| {
sleep(Duration::from_secs(3));
io::Error::new(io::ErrorKind::TimedOut, "timeout 2").into()
})
} else {
sleep(Duration::from_secs(3));
Err("timeout 3".into()) }
});
let result = query_xterm(
Terminal::XtermCompatible,
Duration::from_secs(1),
&mock_event_reader,
&mut mock_writer,
);
debug!("result={result:?}\r");
match expected_rgb {
Some((r, g, b)) => {
let rgb = result.expect("Expected successful RGB parsing");
assert_eq!(
rgb,
Rgb { r, g, b },
"RGB values do not match expected for terminator {maybe_terminator:?}",
);
}
None => {
assert!(result.is_err(), "Expected an error for this scenario");
}
}
}
// Mock `Write` implementation used to capture what `query_xterm` writes.
mock! {
#[derive(Debug)]
Writer {}
impl Write for Writer {
fn write(&mut self, buf: &[u8]) -> io::Result<usize>;
fn flush(&mut self) -> io::Result<()>;
}
}
/// A full response followed by each trailing event below must yield the colour.
///
/// Not every entry is matched as a terminator by `query_xterm`: the 0x9c and `x`
/// characters are unmodified characters and are appended to the response instead, so
/// those cases succeed through the timeout path that salvages an unterminated reply.
#[test]
fn test_termbg_query_xterm_with_various_terminators() {
const TERMINATORS: &[Event] = &[
Event::Key(KeyEvent::new(
KeyCode::Char('g'), KeyModifiers::CONTROL,
)),
Event::Key(KeyEvent::new(
KeyCode::Char(0x07_u8 as char),
KeyModifiers::NONE,
)), Event::Key(KeyEvent::new(
KeyCode::Char(0x09c_u8 as char),
KeyModifiers::NONE,
)), Event::Key(KeyEvent::new(
KeyCode::Char(0x5c_u8 as char),
KeyModifiers::ALT,
)), Event::Key(KeyEvent::new(
KeyCode::Char(0x5c_u8 as char),
KeyModifiers::NONE,
)), Event::Key(KeyEvent::new(KeyCode::Char('\\'), KeyModifiers::ALT)), Event::Key(KeyEvent::new(KeyCode::Char('\\'), KeyModifiers::NONE)), Event::Key(KeyEvent::new(KeyCode::Char('x'), KeyModifiers::NONE)), ];
let expected_rgb = Some((0xff * 256, 0xcc * 256, 0x99 * 256));
for terminator in TERMINATORS {
run_query_xterm_test(true, RGB_RESPONSE_LEN, Some(terminator), expected_rgb);
}
}
/// No events at all: the query must time out with an error.
#[test]
fn test_termbg_query_xterm_timeout_no_response() {
run_query_xterm_test(false, RGB_RESPONSE_LEN, None, None);
}
/// Response cut off four events early (before the second `/`): must fail.
#[test]
fn test_termbg_query_xterm_timeout_incomplete_response_1() {
run_query_xterm_test(true, RGB_RESPONSE_LEN - 4, None, None);
}
/// Response cut off one event early (last component one digit short): must fail.
#[test]
fn test_termbg_query_xterm_timeout_incomplete_response_2() {
run_query_xterm_test(true, RGB_RESPONSE_LEN - 1, None, None);
}
/// Complete response with no terminator: the colour is recovered after the timeout.
#[test]
fn test_termbg_query_xterm_timeout_unterminated_response() {
run_query_xterm_test(
true,
RGB_RESPONSE_LEN,
None,
Some((0xff * 256, 0xcc * 256, 0x99 * 256)),
);
}
/// Components of one to four hex digits are left-aligned into 16 bits.
#[test]
fn test_termbg_decode_x11_color() {
let s = "0000/0000/0000";
assert_eq!((0, 0, 0), decode_x11_color(s).unwrap());
let s = "1111/2222/3333";
assert_eq!((0x1111, 0x2222, 0x3333), decode_x11_color(s).unwrap());
let s = "111/222/333";
assert_eq!((0x1110, 0x2220, 0x3330), decode_x11_color(s).unwrap());
let s = "11/22/33";
assert_eq!((0x1100, 0x2200, 0x3300), decode_x11_color(s).unwrap());
let s = "1/2/3";
assert_eq!((0x1000, 0x2000, 0x3000), decode_x11_color(s).unwrap());
}
}