use std::io::Read;
use std::time::Duration;
use anyhow::Result;
/// The escape byte (0x1b) that every escape sequence recognised in this file starts with.
const ESC: u8 = 0x1b;
/// Marker bytes (`ESC [ 2 0 0 ~`) after which input is collected as pasted text.
const BRACKETED_PASTE_START: &[u8] = b"\x1b[200~";
/// Marker bytes (`ESC [ 2 0 1 ~`) that end the pasted text.
const BRACKETED_PASTE_END: &[u8] = b"\x1b[201~";
/// Outcome of checking whether a byte slice forms a whole escape sequence.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SequenceStatus {
/// The slice is an escape sequence that needs no further bytes.
Complete,
/// The slice begins an escape sequence but more bytes are required.
Incomplete,
/// The slice is empty or does not begin with `ESC`.
NotEscape,
}
/// Classifies `data` as a complete escape sequence, an incomplete one, or not an escape.
///
/// The byte after `ESC` selects the rule that is applied:
/// * `[M` — old-style mouse report: complete once the slice holds six bytes;
/// * `[`  — delegated to [`is_complete_csi_sequence`];
/// * `]`  — delegated to [`is_complete_osc_sequence`];
/// * `P` / `_` — delegated to [`is_complete_string_sequence`];
/// * `O`  — complete once one more byte follows the `O`;
/// * anything else — complete as soon as that byte is present.
///
/// A lone `ESC` is `Incomplete`; an empty slice or one not starting with `ESC`
/// is `NotEscape`.
fn is_complete_sequence(data: &[u8]) -> SequenceStatus {
    let rest = match data.split_first() {
        Some((&first, rest)) if first == ESC => rest,
        _ => return SequenceStatus::NotEscape,
    };

    // Maps "enough bytes have arrived" onto the two escape outcomes.
    let when = |done: bool| {
        if done {
            SequenceStatus::Complete
        } else {
            SequenceStatus::Incomplete
        }
    };

    match rest {
        [] => SequenceStatus::Incomplete,
        // Must be tested before the generic CSI arm: `M` would otherwise look like a final byte.
        [b'[', b'M', ..] => when(data.len() >= 6),
        [b'[', ..] => is_complete_csi_sequence(data),
        [b']', ..] => is_complete_osc_sequence(data),
        [b'P', ..] => is_complete_string_sequence(data, b"P"),
        [b'_', ..] => is_complete_string_sequence(data, b"_"),
        [b'O', ..] => when(rest.len() >= 2),
        _ => SequenceStatus::Complete,
    }
}
/// Decides whether `data` (expected to start with `ESC [`) is a finished CSI sequence.
///
/// The sequence counts as finished when its last byte lies in `0x40..=0x7e`.
/// Only the last byte is inspected, so callers are expected to test growing
/// prefixes. A payload that starts with `<` must additionally satisfy
/// [`is_valid_sgr_mouse`]; if it ends in a final byte but is not well formed,
/// the result stays `Incomplete`.
fn is_complete_csi_sequence(data: &[u8]) -> SequenceStatus {
    // Everything after the two-byte `ESC [` introducer; at least one byte is required.
    let payload = match data.get(2..) {
        Some(bytes) if !bytes.is_empty() => bytes,
        _ => return SequenceStatus::Incomplete,
    };

    let terminated = payload
        .last()
        .map_or(false, |byte| (0x40..=0x7e).contains(byte));
    let looks_like_sgr_mouse = payload.first() == Some(&b'<');

    if terminated && (!looks_like_sgr_mouse || is_valid_sgr_mouse(payload)) {
        SequenceStatus::Complete
    } else {
        SequenceStatus::Incomplete
    }
}
/// Returns `true` when `payload` has the shape `<digits;digits;digits` followed by `M` or `m`.
///
/// `payload` is the part of a CSI sequence after `ESC [`. Exactly three
/// non-empty, all-ASCII-digit fields separated by `;` are required.
fn is_valid_sgr_mouse(payload: &[u8]) -> bool {
    match payload {
        [b'<', fields @ .., last]
            if payload.len() >= 4 && (*last == b'M' || *last == b'm') =>
        {
            let mut field_count = 0usize;
            for field in fields.split(|&byte| byte == b';') {
                if field.is_empty() || !field.iter().all(u8::is_ascii_digit) {
                    return false;
                }
                field_count += 1;
            }
            field_count == 3
        }
        _ => false,
    }
}
/// Decides whether an OSC sequence is finished.
///
/// `data` counts as complete when it is at least two bytes long and ends
/// either with BEL (`0x07`) or with the two bytes `ESC \`.
fn is_complete_osc_sequence(data: &[u8]) -> SequenceStatus {
    match data {
        // BEL terminator; the leading `_` enforces the two-byte minimum.
        [_, .., 0x07] => SequenceStatus::Complete,
        // `ESC \` terminator.
        [.., prev, b'\\'] if *prev == ESC => SequenceStatus::Complete,
        _ => SequenceStatus::Incomplete,
    }
}
/// Decides whether a string-type sequence (the `ESC P` and `ESC _` forms) is finished.
///
/// The only accepted terminator is the two bytes `ESC \` at the end of `data`.
/// `_kind` is the introducer byte passed by the caller; it is not consulted.
fn is_complete_string_sequence(data: &[u8], _kind: &[u8]) -> SequenceStatus {
    match data {
        [.., prev, b'\\'] if *prev == ESC => SequenceStatus::Complete,
        _ => SequenceStatus::Incomplete,
    }
}
/// Splits `buf` into finished input units and an unfinished tail.
///
/// Bytes other than `ESC` are returned one per entry. At each `ESC`, prefixes
/// of increasing length are tested with [`is_complete_sequence`]; the shortest
/// prefix that is not `Incomplete` becomes one entry. If no prefix qualifies,
/// everything from that `ESC` to the end of `buf` is returned as the second
/// tuple element and scanning stops.
fn extract_complete_sequences(buf: &[u8]) -> (Vec<Vec<u8>>, Vec<u8>) {
    let mut units: Vec<Vec<u8>> = Vec::new();
    let mut start = 0;

    while let Some(&byte) = buf.get(start) {
        if byte != ESC {
            units.push(vec![byte]);
            start += 1;
            continue;
        }

        // Shortest prefix beginning at `start` that needs no further bytes.
        let finished_at = ((start + 1)..=buf.len())
            .find(|&end| is_complete_sequence(&buf[start..end]) != SequenceStatus::Incomplete);

        match finished_at {
            Some(end) => {
                units.push(buf[start..end].to_vec());
                start = end;
            }
            None => return (units, buf[start..].to_vec()),
        }
    }

    (units, Vec::new())
}
/// Extracts the leading key code from a `CSI … u` sequence when it is 32 or greater.
///
/// `seq` must have the form `ESC [ <digits> … u`. The key code is the run of
/// bytes up to the first `:` or `;` (or up to the final `u`). It must consist
/// solely of ASCII digits: `str::parse::<u32>` on its own would also accept a
/// leading `+`, so the digits are validated explicitly first.
///
/// Returns `None` when the framing does not match, the key code is empty,
/// non-numeric, does not fit in a `u32`, or is below 32.
fn parse_kitty_printable_codepoint(seq: &[u8]) -> Option<u32> {
    let inner = match seq {
        [first, b'[', inner @ .., b'u'] if *first == ESC => inner,
        _ => return None,
    };

    // `split` always yields at least one (possibly empty) field.
    let digits = inner.split(|&b| b == b':' || b == b';').next()?;
    if digits.is_empty() || !digits.iter().all(u8::is_ascii_digit) {
        return None;
    }

    let codepoint: u32 = std::str::from_utf8(digits).ok()?.parse().ok()?;
    if codepoint >= 32 {
        Some(codepoint)
    } else {
        None
    }
}
/// How ordinary (non-paste) input is grouped into `Data` events.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BufferingMode {
/// Every extracted sequence is emitted as its own event.
Character,
/// Sequences are accumulated and emitted together once one containing `\n` arrives.
Line,
}
/// An item produced by the stdin buffer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StdinBufferEvent {
/// Input bytes received outside a bracketed paste.
Data(Vec<u8>),
/// Bytes received between the bracketed-paste markers; the markers are not included.
Paste(Vec<u8>),
}
/// Configuration for the stdin buffer.
#[derive(Debug, Clone)]
pub struct StdinBufferOptions {
/// NOTE(review): not read anywhere in the visible code; presumably the delay
/// after which a caller should flush an unfinished sequence — confirm against callers.
pub timeout: Duration,
/// Grouping applied to ordinary input.
pub mode: BufferingMode,
/// Size in bytes of the internal read buffer allocated at construction.
pub read_capacity: usize,
}
impl Default for StdinBufferOptions {
fn default() -> Self {
Self {
timeout: Duration::from_millis(10),
mode: BufferingMode::Character,
read_capacity: 4096,
}
}
}
/// Reads raw bytes from `R` and turns them into `StdinBufferEvent`s, keeping
/// unfinished escape sequences and in-progress pastes between reads.
pub struct StdinBuffer<R: Read> {
// Source of input bytes.
reader: R,
// Scratch space handed to `reader.read`; sized from `opts.read_capacity`.
read_buf: Vec<u8>,
// Bytes received but not yet emitted (an unfinished escape sequence).
seq_buf: Vec<u8>,
// Active configuration; `mode` can be changed after construction.
opts: StdinBufferOptions,
// True between a paste-start marker and its matching paste-end marker.
paste_mode: bool,
// Pasted bytes collected so far while `paste_mode` is set.
paste_buf: Vec<u8>,
// Key code from the most recently emitted `CSI … u` sequence, if any; a
// following single byte with the same value is suppressed.
pending_kitty_cp: Option<u32>,
// Sequences accumulated in line mode until one containing `\n` arrives.
line_buf: Vec<u8>,
}
impl<R: Read> StdinBuffer<R> {
/// Creates a buffer that reads from `reader` using `opts`.
///
/// The internal read buffer holds `opts.read_capacity` bytes, but never fewer
/// than one: with a zero-length buffer every `read` call returns `Ok(0)`,
/// which `poll` and `drain` would treat as end of input.
pub fn new(reader: R, opts: StdinBufferOptions) -> Self {
    let read_capacity = opts.read_capacity.max(1);
    Self {
        reader,
        read_buf: vec![0u8; read_capacity],
        seq_buf: Vec::with_capacity(256),
        opts,
        paste_mode: false,
        paste_buf: Vec::new(),
        pending_kitty_cp: None,
        line_buf: Vec::new(),
    }
}
/// Creates a buffer over `reader` with the default [`StdinBufferOptions`].
pub fn with_reader(reader: R) -> Self {
    let defaults = StdinBufferOptions::default();
    Self::new(reader, defaults)
}
/// Returns the buffering mode currently in effect.
pub fn mode(&self) -> BufferingMode {
    let current = &self.opts;
    current.mode
}
/// Switches the buffering mode used for subsequently emitted data.
///
/// Only the option is updated; bytes already held in the internal buffers are left untouched.
pub fn set_mode(&mut self, mode: BufferingMode) {
    let options = &mut self.opts;
    options.mode = mode;
}
/// Number of bytes held back as an unfinished sequence.
///
/// Paste and line buffers are not counted.
pub fn buffered_len(&self) -> usize {
    let held_back = &self.seq_buf;
    held_back.len()
}
pub fn is_pasting(&self) -> bool {
self.paste_mode
}
pub fn poll(&mut self) -> Result<Vec<StdinBufferEvent>> {
let mut events = Vec::new();
match self.reader.read(&mut self.read_buf) {
Ok(0) | Err(_) => {
events.append(&mut self.flush_inner());
}
Ok(n) => {
let data = self.read_buf[..n].to_vec();
self.process_bytes(&data, &mut events);
}
}
Ok(events)
}
/// Reads until the reader reports end of input or `WouldBlock`, then flushes.
///
/// Every chunk read is processed as it arrives; `Interrupted` reads are
/// retried. After the loop, whatever is still buffered is flushed and appended
/// to the returned events.
///
/// # Errors
///
/// Returns any I/O error other than `WouldBlock` or `Interrupted`.
pub fn drain(&mut self) -> Result<Vec<StdinBufferEvent>> {
    use std::io::ErrorKind;

    let mut collected = Vec::new();
    loop {
        let read = match self.reader.read(&mut self.read_buf) {
            Ok(0) => break,
            Ok(n) => n,
            Err(err) => match err.kind() {
                ErrorKind::WouldBlock => break,
                ErrorKind::Interrupted => continue,
                _ => return Err(err.into()),
            },
        };
        // Copied out because `process_bytes` needs `&mut self`.
        let chunk = self.read_buf[..read].to_vec();
        self.process_bytes(&chunk, &mut collected);
    }
    collected.extend(self.flush_inner());
    Ok(collected)
}
pub fn read_into_buffer(&mut self, max_bytes: usize) -> Result<Vec<StdinBufferEvent>> {
let mut tmp = vec![0u8; max_bytes];
let mut events = Vec::new();
match self.reader.read(&mut tmp) {
Ok(0) | Err(_) => {}
Ok(n) => {
self.process_bytes(&tmp[..n], &mut events);
}
}
Ok(events)
}
/// Feeds `data` into the buffer and appends every event it completes to `events`.
///
/// * While a paste is in progress, all bytes go to the paste buffer until the
///   paste-end marker shows up.
/// * Otherwise, if the buffered bytes contain the paste-start marker, the bytes
///   before it are emitted as data, paste mode begins, and the bytes after the
///   marker become the start of the paste.
/// * Otherwise finished sequences are emitted and an unfinished tail is kept
///   for the next call.
///
/// Bytes before a paste-start marker that do not form a finished sequence
/// (for example a lone `ESC`) are emitted as one data chunk instead of being
/// discarded, so no input is lost when a paste follows them.
fn process_bytes(&mut self, data: &[u8], events: &mut Vec<StdinBufferEvent>) {
    if data.is_empty() && self.seq_buf.is_empty() {
        return;
    }
    self.seq_buf.extend_from_slice(data);

    if self.paste_mode {
        // `append` moves the bytes and leaves `seq_buf` empty.
        self.paste_buf.append(&mut self.seq_buf);
        self.finish_paste_if_terminated(events);
        return;
    }

    if let Some(idx) = find_subsequence(&self.seq_buf, BRACKETED_PASTE_START) {
        let (seqs, leftover) = extract_complete_sequences(&self.seq_buf[..idx]);
        for seq in seqs {
            self.emit_data_sequence(&seq, events);
        }
        if !leftover.is_empty() {
            // The paste marker ends whatever was pending; hand it over as-is.
            self.emit_data_sequence(&leftover, events);
        }
        self.pending_kitty_cp = None;

        // Everything after the marker is pasted content (possibly empty).
        self.paste_buf = self.seq_buf.split_off(idx + BRACKETED_PASTE_START.len());
        self.seq_buf.clear();
        self.paste_mode = true;
        self.finish_paste_if_terminated(events);
        return;
    }

    let (seqs, remainder) = extract_complete_sequences(&self.seq_buf);
    self.seq_buf = remainder;
    for seq in seqs {
        self.emit_data_sequence(&seq, events);
    }
}

/// Ends the current paste if the paste buffer contains the paste-end marker.
///
/// Emits the pasted bytes (marker excluded), leaves paste mode, and processes
/// any bytes that followed the marker as fresh input. Does nothing when the
/// marker has not arrived yet.
fn finish_paste_if_terminated(&mut self, events: &mut Vec<StdinBufferEvent>) {
    let end = match find_subsequence(&self.paste_buf, BRACKETED_PASTE_END) {
        Some(end) => end,
        None => return,
    };

    let mut pasted = std::mem::take(&mut self.paste_buf);
    let trailing = pasted.split_off(end + BRACKETED_PASTE_END.len());
    pasted.truncate(end);

    self.paste_mode = false;
    self.pending_kitty_cp = None;
    events.push(StdinBufferEvent::Paste(pasted));

    if !trailing.is_empty() {
        self.process_bytes(&trailing, events);
    }
}
/// Emits one extracted sequence according to the current buffering mode.
///
/// A single byte equal to the key code remembered from the previous
/// `CSI … u` sequence is dropped (and the remembered code cleared). Otherwise
/// the remembered code is replaced by whatever `seq` parses to, and `seq` is
/// either pushed as a `Data` event (character mode) or added to the line
/// buffer, which is emitted once a sequence containing `\n` arrives (line mode).
fn emit_data_sequence(&mut self, seq: &[u8], events: &mut Vec<StdinBufferEvent>) {
    if let [byte] = seq {
        if self.pending_kitty_cp == Some(u32::from(*byte)) {
            self.pending_kitty_cp = None;
            return;
        }
    }
    self.pending_kitty_cp = parse_kitty_printable_codepoint(seq);

    match self.opts.mode {
        BufferingMode::Character => events.push(StdinBufferEvent::Data(seq.to_vec())),
        BufferingMode::Line => {
            self.line_buf.extend_from_slice(seq);
            if seq.contains(&b'\n') {
                let line = std::mem::take(&mut self.line_buf);
                events.push(StdinBufferEvent::Data(line));
            }
        }
    }
}
/// Emits everything currently buffered as events; public entry point for `flush_inner`.
pub fn flush(&mut self) -> Vec<StdinBufferEvent> {
    let flushed = self.flush_inner();
    flushed
}
/// Drains the internal buffers into events, in this order:
///
/// 1. an unfinished paste with content becomes a `Paste` event and paste mode ends
///    (paste mode with an empty paste buffer is left as it is);
/// 2. an unfinished sequence is emitted through `emit_data_sequence`;
/// 3. a non-empty line buffer becomes a `Data` event.
///
/// The remembered Kitty key code is cleared at the end.
fn flush_inner(&mut self) -> Vec<StdinBufferEvent> {
    let mut flushed = Vec::new();

    if self.paste_mode && !self.paste_buf.is_empty() {
        self.paste_mode = false;
        self.pending_kitty_cp = None;
        flushed.push(StdinBufferEvent::Paste(std::mem::take(&mut self.paste_buf)));
    }

    if !self.seq_buf.is_empty() {
        let unfinished = std::mem::take(&mut self.seq_buf);
        self.emit_data_sequence(&unfinished, &mut flushed);
    }

    // Checked after step 2 because in line mode that step feeds the line buffer.
    if !self.line_buf.is_empty() {
        flushed.push(StdinBufferEvent::Data(std::mem::take(&mut self.line_buf)));
    }

    self.pending_kitty_cp = None;
    flushed
}
/// Discards all buffered input and leaves paste mode without emitting anything.
pub fn clear(&mut self) {
    self.paste_mode = false;
    self.pending_kitty_cp = None;
    for buffer in [&mut self.seq_buf, &mut self.paste_buf, &mut self.line_buf] {
        buffer.clear();
    }
}
}
/// Returns the index of the first occurrence of `needle` in `haystack`.
///
/// An empty `needle` matches at index 0; a `needle` longer than `haystack`
/// never matches.
fn find_subsequence(haystack: &[u8], needle: &[u8]) -> Option<usize> {
    // Last index at which `needle` could still fit; `None` when it is too long.
    let last_start = haystack.len().checked_sub(needle.len())?;
    (0..=last_start).find(|&start| haystack[start..].starts_with(needle))
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    type TestBuffer = StdinBuffer<Cursor<Vec<u8>>>;

    /// Builds a buffer whose reader yields `data`, with `mode` and otherwise default options.
    fn buffer_from(data: &[u8], mode: BufferingMode) -> TestBuffer {
        let opts = StdinBufferOptions {
            mode,
            ..StdinBufferOptions::default()
        };
        StdinBuffer::new(Cursor::new(data.to_vec()), opts)
    }

    /// Buffer over an empty reader with default options, for tests that feed bytes directly.
    fn idle_buffer() -> TestBuffer {
        StdinBuffer::new(Cursor::new(b"".to_vec()), StdinBufferOptions::default())
    }

    /// Payloads of all `Data` events in order; `Paste` events are skipped.
    fn data_payloads(events: &[StdinBufferEvent]) -> Vec<Vec<u8>> {
        events
            .iter()
            .filter_map(|event| match event {
                StdinBufferEvent::Data(bytes) => Some(bytes.clone()),
                StdinBufferEvent::Paste(_) => None,
            })
            .collect()
    }

    /// Returns the payload of a `Data` event or panics with `failure`.
    fn expect_data<'a>(event: &'a StdinBufferEvent, failure: &str) -> &'a [u8] {
        match event {
            StdinBufferEvent::Data(bytes) => bytes,
            _ => panic!("{}", failure),
        }
    }

    #[test]
    fn test_plain_characters_emitted_immediately() {
        let mut buf = buffer_from(b"abc", BufferingMode::Character);
        let events = buf.drain().unwrap();
        let payloads = data_payloads(&events);
        assert_eq!(payloads.len(), 3);
        assert_eq!(payloads, vec![vec![b'a'], vec![b'b'], vec![b'c']]);
    }

    #[test]
    fn test_complete_csi_sequence() {
        let mut buf = buffer_from(b"\x1b[A", BufferingMode::Character);
        let events = buf.drain().unwrap();
        let payloads = data_payloads(&events);
        assert_eq!(payloads.len(), 1);
        assert_eq!(payloads[0], vec![0x1b, b'[', b'A']);
    }

    #[test]
    fn test_partial_escape_sequence_across_reads() {
        let mut buf = idle_buffer();
        let mut events = Vec::new();
        buf.process_bytes(b"\x1b", &mut events);
        assert!(events.is_empty(), "should wait for more data");
        buf.process_bytes(b"[A", &mut events);
        assert_eq!(events.len(), 1);
        assert_eq!(
            expect_data(&events[0], "expected Data event"),
            &[0x1b, b'[', b'A'][..]
        );
    }

    #[test]
    fn test_bracketed_paste() {
        let mut buf = idle_buffer();
        let mut events = Vec::new();
        buf.process_bytes(b"\x1b[200~hello world\x1b[201~", &mut events);
        assert_eq!(events.len(), 1);
        match &events[0] {
            StdinBufferEvent::Paste(content) => assert_eq!(content, b"hello world"),
            _ => panic!("expected Paste event"),
        }
    }

    #[test]
    fn test_line_buffering_mode() {
        let mut buf = buffer_from(b"", BufferingMode::Line);
        let mut events = Vec::new();
        buf.process_bytes(b"hello ", &mut events);
        assert!(events.is_empty(), "line mode should buffer until newline");
        buf.process_bytes(b"world\n", &mut events);
        assert_eq!(events.len(), 1);
        let line = expect_data(&events[0], "expected Data event");
        assert_eq!(&String::from_utf8_lossy(line), "hello world\n");
    }

    #[test]
    fn test_sgr_mouse_sequence() {
        let mut buf = idle_buffer();
        let mut events = Vec::new();
        buf.process_bytes(b"\x1b[<35;20;5m", &mut events);
        assert_eq!(events.len(), 1);
        let report = expect_data(&events[0], "expected Data event");
        assert_eq!(&String::from_utf8_lossy(report), "\x1b[<35;20;5m");
    }

    #[test]
    fn test_flush_incomplete_sequence() {
        let mut buf = idle_buffer();
        let mut events = Vec::new();
        buf.process_bytes(b"\x1b[", &mut events);
        assert!(events.is_empty(), "incomplete CSI should not emit");
        let flushed = buf.flush();
        assert_eq!(flushed.len(), 1);
        assert_eq!(
            expect_data(&flushed[0], "expected Data from flush"),
            &[0x1b, b'['][..]
        );
    }

    #[test]
    fn test_clear_discards_buffered_data() {
        let mut buf = idle_buffer();
        let mut events = Vec::new();
        buf.process_bytes(b"\x1b[", &mut events);
        assert!(events.is_empty());
        assert!(buf.buffered_len() > 0);
        buf.clear();
        assert_eq!(buf.buffered_len(), 0);
        assert!(buf.flush().is_empty());
    }

    #[test]
    fn test_mixed_text_and_escapes() {
        let mut buf = idle_buffer();
        let mut events = Vec::new();
        buf.process_bytes(b"a\x1b[Ab", &mut events);
        let payloads = data_payloads(&events);
        assert_eq!(payloads.len(), 3);
        assert_eq!(
            payloads,
            vec![vec![b'a'], vec![0x1b, b'[', b'A'], vec![b'b']]
        );
    }

    #[test]
    fn test_kitty_deduplication() {
        let mut buf = idle_buffer();
        let mut events = Vec::new();
        buf.process_bytes(b"\x1b[97ua", &mut events);
        let payloads = data_payloads(&events);
        assert_eq!(payloads.len(), 1, "raw char should be suppressed");
        assert_eq!(payloads[0], vec![0x1b, b'[', b'9', b'7', b'u']);
    }

    #[test]
    fn test_drain_reads_all() {
        let mut buf = buffer_from(b"hello\x1b[Aworld", BufferingMode::Character);
        let events = buf.drain().unwrap();
        assert_eq!(data_payloads(&events).len(), 11);
    }

    #[test]
    fn test_ss3_sequence() {
        let mut buf = idle_buffer();
        let mut events = Vec::new();
        buf.process_bytes(b"\x1bOP", &mut events);
        assert_eq!(events.len(), 1);
        assert_eq!(
            expect_data(&events[0], "expected Data event"),
            &[0x1b, b'O', b'P'][..]
        );
    }

    #[test]
    fn test_osc_bel_terminated() {
        let mut buf = idle_buffer();
        let mut events = Vec::new();
        buf.process_bytes(b"\x1b]0;title\x07", &mut events);
        assert_eq!(events.len(), 1);
        let osc = expect_data(&events[0], "expected Data event");
        assert_eq!(&String::from_utf8_lossy(osc), "\x1b]0;title\x07");
    }

    #[test]
    fn test_is_complete_sequence_helpers() {
        let cases: [(&[u8], SequenceStatus); 6] = [
            (b"a", SequenceStatus::NotEscape),
            (b"\x1b", SequenceStatus::Incomplete),
            (b"\x1b[A", SequenceStatus::Complete),
            (b"\x1b[", SequenceStatus::Incomplete),
            (b"\x1bOP", SequenceStatus::Complete),
            (b"\x1ba", SequenceStatus::Complete),
        ];
        for (input, expected) in cases.iter() {
            assert_eq!(is_complete_sequence(input), *expected);
        }
    }

    #[test]
    fn test_find_subsequence() {
        assert_eq!(find_subsequence(b"hello world", b"world"), Some(6));
        assert_eq!(find_subsequence(b"hello", b"xyz"), None);
        assert_eq!(find_subsequence(b"abc", b"abc"), Some(0));
        assert_eq!(find_subsequence(b"", b"abc"), None);
        assert_eq!(find_subsequence(b"abc", b""), Some(0));
    }

    #[test]
    fn test_extract_complete_sequences() {
        let (seqs, rem) = extract_complete_sequences(b"a\x1b[Ab");
        assert_eq!(seqs.len(), 3);
        assert_eq!(seqs, vec![vec![b'a'], vec![0x1b, b'[', b'A'], vec![b'b']]);
        assert!(rem.is_empty());

        let (seqs, rem) = extract_complete_sequences(b"a\x1b[");
        assert_eq!(seqs.len(), 1);
        assert_eq!(seqs[0], vec![b'a']);
        assert_eq!(rem, vec![0x1b, b'[']);
    }

    #[test]
    fn test_parse_kitty_printable_codepoint() {
        assert_eq!(parse_kitty_printable_codepoint(b"\x1b[97u"), Some(97));
        assert_eq!(parse_kitty_printable_codepoint(b"\x1b[97:1u"), Some(97));
        assert_eq!(parse_kitty_printable_codepoint(b"\x1b[10u"), None);
        assert_eq!(parse_kitty_printable_codepoint(b"\x1b[A"), None);
        assert_eq!(parse_kitty_printable_codepoint(b"\x1b[65;1u"), Some(65));
    }

    #[test]
    fn test_old_style_mouse_sequence() {
        let mut buf = idle_buffer();
        let mut events = Vec::new();
        buf.process_bytes(b"\x1b[M \x20\x20", &mut events);
        assert_eq!(events.len(), 1);
        assert_eq!(
            events[0],
            StdinBufferEvent::Data(vec![0x1b, b'[', b'M', b' ', 0x20, 0x20])
        );
    }

    #[test]
    fn test_read_into_buffer() {
        let mut buf = buffer_from(b"test data", BufferingMode::Character);
        let events = buf.read_into_buffer(1024).unwrap();
        let combined: Vec<u8> = data_payloads(&events).concat();
        assert_eq!(combined, b"test data");
    }

    #[test]
    fn test_set_mode_switching() {
        let mut buf = idle_buffer();
        assert_eq!(buf.mode(), BufferingMode::Character);
        buf.set_mode(BufferingMode::Line);
        assert_eq!(buf.mode(), BufferingMode::Line);
        buf.set_mode(BufferingMode::Character);
        assert_eq!(buf.mode(), BufferingMode::Character);
    }
}