use std::cmp::Reverse;
use std::collections::HashMap;
use std::io::{self, Read, Write};
use std::os::fd::{BorrowedFd, RawFd};
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::time::Instant;
use nix::libc;
use nix::sys::termios::{LocalFlags, tcgetattr};
use crate::highlight::{
AnsiChunk, BenchmarkReport, Highlighter, MAX_INCOMPLETE_ESCAPE_BYTES, StreamingHighlighter,
incomplete_escape_start, strip_ansi,
};
use crate::profile_runtime::ProfileRuntime;
use crate::profiles::ProfileStore;
use super::CliError;
use super::args::Options;
use super::profile_selection::{
ProfileReporter, auto_detect_enabled, build_highlighter_for_profiles_with_store,
dynamic_profile_enabled, profile_store, select_profile_names_with_store,
should_continue_auto_detect,
};
use super::runtime::ReloadWatcher;
use super::trace::IoTrace;
const AUTO_DETECT_SAMPLE_LIMIT: usize = 64 * 1024;
pub(super) struct InputSource {
pub(super) interactive: bool,
pub(super) pty_fd: Option<RawFd>,
pub(super) recent_input: Option<Arc<Mutex<Vec<u8>>>>,
}
pub(super) fn highlight_stream<R: Read, W: Write>(
mut reader: R,
writer: &mut W,
options: &Options,
input: InputSource,
mut reload_watcher: Option<ReloadWatcher>,
trace: IoTrace,
profile_input_rx: Option<mpsc::Receiver<Vec<u8>>>,
) -> Result<(), CliError> {
let started = Instant::now();
let mut input_bytes = 0usize;
let mut buffer = [0_u8; 8192];
let mut strip_carry: Vec<u8> = Vec::new();
let read = reader.read(&mut buffer)?;
if read == 0 {
return Ok(());
}
trace.log("OUT", &buffer[..read]);
let first_chunk = prepare_chunk(&buffer[..read], options.strip_ansi, &mut strip_carry);
let mut detection_sample = first_chunk.bytes().to_vec();
input_bytes += first_chunk.bytes().len();
let store = profile_store()?;
let profile_names = select_profile_names_with_store(options, &store, &detection_sample)?;
let mut session = HighlightSession::new(options, &store, input.interactive, profile_names)?;
session.report_current();
let dynamic_profiles =
dynamic_profile_enabled(options, input.interactive) && profile_input_rx.is_some();
let mut profile_runtime = if dynamic_profiles {
Some(ProfileRuntime::new(session.profile_names().to_vec()))
} else {
None
};
let mut auto_detect_pending =
!dynamic_profiles && should_continue_auto_detect(options, session.profile_names());
if let Some(next_profile_names) = observe_dynamic_profile(
&mut profile_runtime,
profile_input_rx.as_ref(),
dynamic_profiles.then_some(&store),
&first_chunk,
) {
session.switch_profiles(writer, &trace, next_profile_names)?;
}
session.push(writer, &trace, &first_chunk)?;
if should_flush_input_echo(
input.interactive,
input.pty_fd,
session.buffered_echo(),
input.recent_input.as_deref(),
) {
session.flush_input_echo(writer, &trace)?;
}
writer.flush()?;
loop {
let read = reader.read(&mut buffer)?;
if read == 0 {
break;
}
trace.log("OUT", &buffer[..read]);
let chunk = prepare_chunk(&buffer[..read], options.strip_ansi, &mut strip_carry);
input_bytes += chunk.bytes().len();
if let Some(next_profile_names) = observe_dynamic_profile(
&mut profile_runtime,
profile_input_rx.as_ref(),
dynamic_profiles.then_some(&store),
&chunk,
) {
session.switch_profiles(writer, &trace, next_profile_names)?;
}
if auto_detect_pending && detection_sample.len() < AUTO_DETECT_SAMPLE_LIMIT {
detection_sample.extend_from_slice(chunk.bytes());
let next_profile_names =
select_profile_names_with_store(options, &store, &detection_sample)?;
if next_profile_names.as_slice() != session.profile_names() {
session.switch_profiles(writer, &trace, next_profile_names)?;
auto_detect_pending = should_continue_auto_detect(options, session.profile_names());
} else if detection_sample.len() >= AUTO_DETECT_SAMPLE_LIMIT {
auto_detect_pending = false;
session.report_current();
}
}
if reload_watcher
.as_mut()
.is_some_and(ReloadWatcher::reload_requested)
{
session.reload(writer, &trace)?;
}
session.push(writer, &trace, &chunk)?;
if should_flush_input_echo(
input.interactive,
input.pty_fd,
session.buffered_echo(),
input.recent_input.as_deref(),
) {
session.flush_input_echo(writer, &trace)?;
}
writer.flush()?;
}
session.finish(writer, &trace)?;
writer.flush()?;
session.report_current();
if options.benchmark {
print_benchmark_report(
session.benchmark_report(),
input_bytes,
started.elapsed().as_secs_f64(),
);
}
Ok(())
}
const HIGHLIGHTER_CACHE_LIMIT: usize = 32;
#[derive(Default)]
struct HighlighterCache {
entries: HashMap<Vec<String>, Highlighter>,
}
impl HighlighterCache {
fn get_or_build<E>(
&mut self,
profile_names: &[String],
build: impl FnOnce() -> Result<Highlighter, E>,
) -> Result<Highlighter, E> {
if let Some(cached) = self.entries.get(profile_names) {
return Ok(cached.clone());
}
let highlighter = build()?;
if self.entries.len() >= HIGHLIGHTER_CACHE_LIMIT {
self.entries.clear();
}
self.entries
.insert(profile_names.to_vec(), highlighter.clone());
Ok(highlighter)
}
}
struct HighlightSession<'a> {
options: &'a Options,
store: &'a ProfileStore,
interactive: bool,
profile_names: Vec<String>,
streaming: StreamingHighlighter,
reporter: ProfileReporter,
highlighter_cache: HighlighterCache,
}
impl<'a> HighlightSession<'a> {
fn new(
options: &'a Options,
store: &'a ProfileStore,
interactive: bool,
profile_names: Vec<String>,
) -> Result<Self, CliError> {
let streaming = Self::streaming_for(options, store, &profile_names, interactive)?;
let reporter = ProfileReporter::new(options.show_profile, auto_detect_enabled(options));
Ok(Self {
options,
store,
interactive,
profile_names,
streaming,
reporter,
highlighter_cache: HighlighterCache::default(),
})
}
fn profile_names(&self) -> &[String] {
&self.profile_names
}
fn report_current(&mut self) {
self.reporter.report(&self.profile_names);
}
fn switch_profiles<W: Write>(
&mut self,
writer: &mut W,
trace: &IoTrace,
profile_names: Vec<String>,
) -> Result<(), CliError> {
if profile_names == self.profile_names {
return Ok(());
}
self.rebuild(writer, trace, profile_names, true)
}
fn reload<W: Write>(&mut self, writer: &mut W, trace: &IoTrace) -> Result<(), CliError> {
self.rebuild(writer, trace, self.profile_names.clone(), false)
}
fn push<W: Write>(
&mut self,
writer: &mut W,
trace: &IoTrace,
chunk: &AnsiChunk,
) -> Result<(), CliError> {
write_rendered(writer, trace, self.streaming.push_chunk(chunk))?;
Ok(())
}
fn flush_input_echo<W: Write>(
&mut self,
writer: &mut W,
trace: &IoTrace,
) -> Result<(), CliError> {
write_rendered(writer, trace, self.streaming.flush_buffered_echo())?;
Ok(())
}
fn buffered_echo(&self) -> &[u8] {
self.streaming.buffered_echo()
}
fn finish<W: Write>(&mut self, writer: &mut W, trace: &IoTrace) -> Result<(), CliError> {
write_rendered(writer, trace, self.streaming.finish())?;
Ok(())
}
fn benchmark_report(&self) -> Option<&BenchmarkReport> {
self.streaming.benchmark_report()
}
fn rebuild<W: Write>(
&mut self,
_writer: &mut W,
_trace: &IoTrace,
profile_names: Vec<String>,
report: bool,
) -> Result<(), CliError> {
self.profile_names = profile_names;
let options = self.options;
let store = self.store;
let interactive = self.interactive;
let names = self.profile_names.clone();
let highlighter = self.highlighter_cache.get_or_build(&names, || {
build_highlighter_for_profiles_with_store(options, store, &names, interactive)
})?;
self.streaming.replace_highlighter(highlighter);
if report {
self.report_current();
}
Ok(())
}
fn streaming_for(
options: &Options,
store: &ProfileStore,
profile_names: &[String],
interactive: bool,
) -> Result<StreamingHighlighter, CliError> {
let highlighter =
build_highlighter_for_profiles_with_store(options, store, profile_names, interactive)?;
Ok(new_streaming_highlighter(
highlighter,
interactive,
options.benchmark,
options.no_minimal_reset,
))
}
}
fn observe_dynamic_profile(
runtime: &mut Option<ProfileRuntime>,
profile_input_rx: Option<&mpsc::Receiver<Vec<u8>>>,
store: Option<&ProfileStore>,
chunk: &AnsiChunk,
) -> Option<Vec<String>> {
let runtime = runtime.as_mut()?;
let store = store?;
if let Some(receiver) = profile_input_rx {
while let Ok(input) = receiver.try_recv() {
runtime.observe_input(&input);
}
}
runtime.observe_output(chunk.visible_bytes(), store)
}
fn write_rendered<W: Write>(writer: &mut W, trace: &IoTrace, rendered: Vec<u8>) -> io::Result<()> {
trace.log("RENDER", &rendered);
writer.write_all(&rendered)
}
fn new_streaming_highlighter(
highlighter: Highlighter,
interactive: bool,
benchmark: bool,
no_minimal_reset: bool,
) -> StreamingHighlighter {
let mut streaming = if interactive && benchmark {
StreamingHighlighter::new_interactive_with_benchmark(highlighter)
} else if interactive {
StreamingHighlighter::new_interactive(highlighter)
} else if benchmark {
StreamingHighlighter::new_with_benchmark(highlighter)
} else {
StreamingHighlighter::new(highlighter)
};
if no_minimal_reset {
streaming.set_no_minimal_resets(true);
}
streaming
}
fn print_benchmark_report(report: Option<&BenchmarkReport>, input_bytes: usize, elapsed_secs: f64) {
eprintln!("Benchmark results (time spent, match count):");
if let Some(report) = report {
let total = report.total_duration().as_secs_f64();
let mut rules = report.rules().to_vec();
rules.sort_by_key(|rule| Reverse(rule.duration));
for rule in rules {
let percent = if total > 0.0 {
rule.duration.as_secs_f64() / total * 100.0
} else {
0.0
};
eprintln!(
"{percent:>6.2}% {:>8.3}s {:<7} {}",
rule.duration.as_secs_f64(),
rule.match_count,
rule.description
);
}
}
eprintln!("Processed {input_bytes} bytes in {elapsed_secs:.3}s");
}
fn prepare_chunk(input: &[u8], strip_existing_ansi: bool, strip_carry: &mut Vec<u8>) -> AnsiChunk {
if strip_existing_ansi {
let mut combined = std::mem::take(strip_carry);
combined.extend_from_slice(input);
let split = incomplete_escape_start(&combined).unwrap_or(combined.len());
strip_carry.extend_from_slice(&combined[split..]);
if strip_carry.len() > MAX_INCOMPLETE_ESCAPE_BYTES {
strip_carry.clear();
}
AnsiChunk::new(strip_ansi(&combined[..split]))
} else {
AnsiChunk::from_slice(input)
}
}
fn should_flush_input_echo(
interactive: bool,
pty_fd: Option<RawFd>,
buffered_echo: &[u8],
recent_input: Option<&Mutex<Vec<u8>>>,
) -> bool {
if !interactive || buffered_echo.is_empty() {
return false;
}
let Some(recent_input) = recent_input else {
return false;
};
if !terminal_echo_enabled(pty_fd) {
recent_input
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.clear();
return false;
}
if !input_source_idle(pty_fd) {
return false;
}
let mut recent_input = recent_input
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
consume_echo_suffix(&mut recent_input, buffered_echo)
}
pub(super) fn terminal_echo_enabled(pty_fd: Option<RawFd>) -> bool {
let Some(fd) = pty_fd else {
return true;
};
let borrowed = unsafe { BorrowedFd::borrow_raw(fd) };
match tcgetattr(borrowed) {
Ok(termios) => termios.local_flags.contains(LocalFlags::ECHO),
Err(_) => false,
}
}
fn consume_echo_suffix(recent_input: &mut Vec<u8>, echo: &[u8]) -> bool {
if recent_input.ends_with(echo) {
recent_input.clear();
true
} else {
false
}
}
fn input_source_idle(pty_fd: Option<RawFd>) -> bool {
let Some(fd) = pty_fd else {
return false;
};
let mut poll_fd = libc::pollfd {
fd,
events: libc::POLLIN,
revents: 0,
};
let ready = unsafe { libc::poll(&mut poll_fd, 1, 0) };
ready <= 0
}
#[cfg(test)]
mod tests {
use std::sync::Mutex;
fn sample_highlighter() -> crate::highlight::Highlighter {
let config =
crate::config::PrismConfig::from_chromaterm_yaml("rules: []\n").expect("config loads");
crate::highlight::Highlighter::from_config(config).expect("highlighter compiles")
}
#[test]
fn highlighter_cache_reuses_compiled_highlighters_for_seen_profile_sets() {
let mut cache = super::HighlighterCache::default();
let set_a = vec!["generic".to_string(), "cisco".to_string()];
let set_b = vec!["generic".to_string(), "juniper".to_string()];
let mut builds = 0usize;
cache
.get_or_build(&set_a, || {
builds += 1;
Ok::<_, ()>(sample_highlighter())
})
.unwrap();
cache
.get_or_build(&set_b, || {
builds += 1;
Ok::<_, ()>(sample_highlighter())
})
.unwrap();
cache
.get_or_build(&set_a, || {
builds += 1;
Ok::<_, ()>(sample_highlighter())
})
.unwrap();
assert_eq!(
builds, 2,
"switching back to a seen profile set must not recompile"
);
}
#[test]
fn strip_mode_carries_split_escape_across_reads() {
let mut carry = Vec::new();
let first = super::prepare_chunk(b"hello\x1b[3", true, &mut carry);
let second = super::prepare_chunk(b"1m world", true, &mut carry);
let mut visible = first.bytes().to_vec();
visible.extend_from_slice(second.bytes());
assert_eq!(
visible,
b"hello world",
"split escape tail leaked into stripped output: {:?}",
String::from_utf8_lossy(&visible)
);
}
#[test]
fn strip_mode_drops_oversized_incomplete_escape_carry() {
let mut carry = Vec::new();
let mut input = b"\x1b[".to_vec();
input.extend(std::iter::repeat_n(
b'1',
crate::highlight::MAX_INCOMPLETE_ESCAPE_BYTES + 1,
));
let chunk = super::prepare_chunk(&input, true, &mut carry);
assert!(
chunk.bytes().is_empty(),
"oversized incomplete escape should be stripped as control data"
);
assert!(
carry.is_empty(),
"oversized incomplete escape carry must not grow without bound"
);
}
#[test]
fn dynamic_profile_observation_reuses_prepared_visible_chunk() {
let source = include_str!("stream.rs");
let runtime_source = source.split("#[cfg(test)]").next().unwrap_or(source);
assert!(!runtime_source.contains("let visible_chunk = strip_ansi(chunk)"));
assert!(runtime_source.contains("visible_bytes()"));
}
#[test]
fn consume_echo_suffix_matches_only_genuine_echo() {
let mut recent = b"update add test.example.com 3600 A 192.0.2.1".to_vec();
assert!(super::consume_echo_suffix(&mut recent, b"192.0.2.1"));
assert!(
recent.is_empty(),
"matched input is cleared after the echo tail is surfaced"
);
assert!(!super::consume_echo_suffix(&mut recent, b"192.0.2.1"));
let mut typed = b"x".to_vec();
assert!(!super::consume_echo_suffix(&mut typed, b"Vlan11"));
assert_eq!(typed, b"x", "non-matching input is left untouched");
let mut empty = Vec::new();
assert!(!super::consume_echo_suffix(&mut empty, b"Vlan11"));
}
#[test]
fn should_flush_input_echo_requires_interactive_buffered_and_recent_input() {
assert!(!super::should_flush_input_echo(false, None, b"tok", None));
let recent = Mutex::new(b"tok".to_vec());
assert!(!super::should_flush_input_echo(
true,
None,
b"",
Some(&recent)
));
assert!(!super::should_flush_input_echo(true, None, b"tok", None));
}
#[test]
fn terminal_echo_enabled_fails_closed_for_non_terminal_fd() {
let mut fds = [0 as nix::libc::c_int; 2];
assert_eq!(unsafe { nix::libc::pipe(fds.as_mut_ptr()) }, 0);
assert!(!super::terminal_echo_enabled(Some(fds[0])));
unsafe {
nix::libc::close(fds[0]);
nix::libc::close(fds[1]);
}
}
#[test]
fn should_flush_input_echo_flushes_only_matching_idle_echo() {
use nix::pty::openpty;
use nix::sys::termios::{SetArg, tcgetattr, tcsetattr};
use std::os::fd::AsRawFd;
let pty = openpty(None, None).expect("openpty");
let master_fd = pty.master.as_raw_fd();
let slave_fd = pty.slave.as_raw_fd();
let mut attrs = tcgetattr(&pty.slave).expect("tcgetattr");
attrs
.local_flags
.insert(nix::sys::termios::LocalFlags::ECHO);
tcsetattr(&pty.slave, SetArg::TCSANOW, &attrs).expect("echo on");
let recent = Mutex::new(b"router# show ".to_vec());
assert!(super::should_flush_input_echo(
true,
Some(master_fd),
b"show ",
Some(&recent),
));
assert!(
recent.lock().unwrap().is_empty(),
"matched input is cleared after the echo tail is surfaced"
);
let recent = Mutex::new(b"x".to_vec());
assert!(!super::should_flush_input_echo(
true,
Some(master_fd),
b"Vlan11",
Some(&recent),
));
assert_eq!(recent.lock().unwrap().as_slice(), b"x");
assert_eq!(
unsafe { nix::libc::write(slave_fd, b"x".as_ptr().cast(), 1) },
1
);
let recent = Mutex::new(b"show ".to_vec());
assert!(!super::should_flush_input_echo(
true,
Some(master_fd),
b"show ",
Some(&recent),
));
assert_eq!(recent.lock().unwrap().as_slice(), b"show ");
}
}