use std::io::{self, Read, Write};
use std::os::unix::io::RawFd;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
use std::time::Duration;
use crate::pty_buffer::PtyBuffer;
pub const DEFAULT_BUFFER_SIZE: usize = 16384; #[allow(dead_code)]
pub const SMALL_BUFFER_SIZE: usize = 4096;
pub struct PtyIoHandler {
master_fd: RawFd,
#[allow(dead_code)]
buffer_size: usize,
}
impl PtyIoHandler {
pub fn new(master_fd: RawFd) -> Self {
Self {
master_fd,
buffer_size: DEFAULT_BUFFER_SIZE, }
}
pub fn read_from_pty(&self, buffer: &mut [u8]) -> io::Result<usize> {
unsafe {
let result = libc::read(
self.master_fd,
buffer.as_mut_ptr() as *mut libc::c_void,
buffer.len(),
);
if result < 0 {
let err = io::Error::last_os_error();
if err.kind() == io::ErrorKind::WouldBlock
|| err.kind() == io::ErrorKind::Interrupted
{
return Err(err);
}
return Err(err);
}
Ok(result as usize)
}
}
pub fn write_to_pty(&self, data: &[u8]) -> io::Result<()> {
let mut written = 0;
while written < data.len() {
unsafe {
let result = libc::write(
self.master_fd,
data[written..].as_ptr() as *const libc::c_void,
data.len() - written,
);
if result < 0 {
let err = io::Error::last_os_error();
if err.kind() == io::ErrorKind::Interrupted {
continue;
}
return Err(err);
}
written += result as usize;
}
}
Ok(())
}
pub fn send_control_char(&self, ch: u8) -> io::Result<()> {
self.write_to_pty(&[ch])
}
pub fn send_refresh(&self) -> io::Result<()> {
self.send_control_char(0x0c) }
}
pub struct ScrollbackHandler {
buffer: Arc<Mutex<Vec<u8>>>,
#[allow(dead_code)]
max_size: usize,
}
impl ScrollbackHandler {
pub fn new(max_size: usize) -> Self {
Self {
buffer: Arc::new(Mutex::new(Vec::new())),
max_size,
}
}
#[allow(dead_code)]
pub fn add_data(&self, data: &[u8]) {
let mut buffer = self.buffer.lock().unwrap();
buffer.extend_from_slice(data);
if buffer.len() > self.max_size {
let remove = buffer.len() - self.max_size;
buffer.drain(..remove);
}
}
pub fn get_buffer(&self) -> Vec<u8> {
self.buffer.lock().unwrap().clone()
}
pub fn get_shared_buffer(&self) -> Arc<Mutex<Vec<u8>>> {
Arc::clone(&self.buffer)
}
}
pub fn spawn_socket_to_stdout_thread(
mut socket: std::os::unix::net::UnixStream,
running: Arc<AtomicBool>,
scrollback: Arc<Mutex<Vec<u8>>>,
paused: Arc<AtomicBool>,
) -> thread::JoinHandle<()> {
thread::spawn(move || {
let mut stdout = io::stdout();
let mut buffer = [0u8; DEFAULT_BUFFER_SIZE]; let mut held_buffer = Vec::new();
while running.load(Ordering::SeqCst) {
if paused.load(Ordering::SeqCst) {
match socket.read(&mut buffer) {
Ok(0) => break, Ok(n) => {
held_buffer.extend_from_slice(&buffer[..n]);
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
thread::sleep(Duration::from_millis(10));
}
Err(_) => break,
}
continue;
}
if !held_buffer.is_empty() {
if stdout.write_all(&held_buffer).is_err() {
break;
}
let _ = stdout.flush();
let mut scrollback = scrollback.lock().unwrap();
scrollback.extend_from_slice(&held_buffer);
held_buffer.clear();
}
match socket.read(&mut buffer) {
Ok(0) => break, Ok(n) => {
if !paused.load(Ordering::SeqCst) {
if stdout.write_all(&buffer[..n]).is_err() {
break;
}
let _ = stdout.flush();
} else {
held_buffer.extend_from_slice(&buffer[..n]);
}
let mut scrollback = scrollback.lock().unwrap();
scrollback.extend_from_slice(&buffer[..n]);
let scrollback_max = 10 * 1024 * 1024; if scrollback.len() > scrollback_max {
let remove = scrollback.len() - scrollback_max;
scrollback.drain(..remove);
}
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
thread::sleep(Duration::from_millis(10));
}
Err(ref e) if e.kind() == io::ErrorKind::BrokenPipe => {
break;
}
Err(_) => break,
}
}
})
}
pub fn spawn_resize_monitor_thread(
mut socket: std::os::unix::net::UnixStream,
running: Arc<AtomicBool>,
initial_size: (u16, u16),
) -> thread::JoinHandle<()> {
use crate::pty::socket::send_resize_command;
use crate::pty::terminal::get_terminal_size;
thread::spawn(move || {
let mut last_size = initial_size;
while running.load(Ordering::SeqCst) {
if let Ok((new_cols, new_rows)) = get_terminal_size() {
if (new_cols, new_rows) != last_size {
let _ = send_resize_command(&mut socket, new_cols, new_rows);
last_size = (new_cols, new_rows);
}
}
thread::sleep(Duration::from_millis(250));
}
})
}
#[allow(dead_code)]
pub fn send_buffered_output(
stream: &mut std::os::unix::net::UnixStream,
output_buffer: &PtyBuffer,
io_handler: &PtyIoHandler,
) -> io::Result<()> {
if !output_buffer.is_empty() {
let mut buffered_data = Vec::new();
output_buffer.drain_to(&mut buffered_data);
for chunk in buffered_data.chunks(DEFAULT_BUFFER_SIZE) {
stream.write_all(chunk)?;
stream.flush()?;
thread::sleep(Duration::from_millis(1));
}
io_handler.send_refresh()?;
thread::sleep(Duration::from_millis(50));
} else {
io_handler.send_refresh()?;
}
Ok(())
}