use std::fs::File;
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::UnixStream;
use tokio::sync::RwLock;
use tokio::time::sleep;
use crate::pty::socket_async::DEFAULT_BUFFER_SIZE;
use crate::pty_buffer::PtyBuffer;
/// Async I/O helper tied to a PTY master file descriptor.
///
/// Only the raw descriptor number is stored; the struct itself holds no
/// owning handle to the descriptor.
#[allow(dead_code)]
pub struct AsyncPtyIoHandler {
    /// Raw descriptor of the PTY master used for reads and writes.
    master_fd: RawFd,
    /// Buffer size in bytes recorded for this handler.
    buffer_size: usize,
}
#[allow(dead_code)]
impl AsyncPtyIoHandler {
pub fn new(master_fd: RawFd) -> Self {
Self {
master_fd,
buffer_size: DEFAULT_BUFFER_SIZE, }
}
pub async fn read_from_pty(&self, buffer: &mut [u8]) -> tokio::io::Result<usize> {
let master_file = unsafe { File::from_raw_fd(self.master_fd) };
let master_fd_clone = master_file.as_raw_fd();
std::mem::forget(master_file);
let mut async_file =
tokio::fs::File::from_std(unsafe { std::fs::File::from_raw_fd(master_fd_clone) });
let result = async_file.read(buffer).await;
std::mem::forget(async_file); result
}
pub async fn write_to_pty(&self, data: &[u8]) -> tokio::io::Result<()> {
let master_file = unsafe { File::from_raw_fd(self.master_fd) };
let master_fd_clone = master_file.as_raw_fd();
std::mem::forget(master_file);
let mut async_file =
tokio::fs::File::from_std(unsafe { std::fs::File::from_raw_fd(master_fd_clone) });
let result = async_file.write_all(data).await;
std::mem::forget(async_file); result
}
pub async fn send_control_char(&self, ch: u8) -> tokio::io::Result<()> {
self.write_to_pty(&[ch]).await
}
pub async fn send_refresh(&self) -> tokio::io::Result<()> {
self.send_control_char(0x0c).await }
}
/// Size-capped byte history that can be shared between async tasks.
#[allow(dead_code)]
pub struct AsyncScrollbackHandler {
    /// Stored bytes, oldest first, behind a shared async read/write lock.
    buffer: Arc<RwLock<Vec<u8>>>,
    /// Maximum number of bytes kept in `buffer`.
    max_size: usize,
}
#[allow(dead_code)]
impl AsyncScrollbackHandler {
    /// Creates an empty scrollback that keeps at most `max_size` bytes.
    ///
    /// A quarter of `max_size` is reserved up front.
    pub fn new(max_size: usize) -> Self {
        let initial_capacity = max_size / 4;
        let storage: Vec<u8> = Vec::with_capacity(initial_capacity);
        Self {
            buffer: Arc::new(RwLock::new(storage)),
            max_size,
        }
    }

    /// Appends `data`, then discards the oldest bytes so that no more than
    /// `max_size` bytes remain.
    pub async fn add_data(&self, data: &[u8]) {
        let mut history = self.buffer.write().await;
        history.extend_from_slice(data);

        // Number of bytes beyond the cap; zero when the history still fits.
        let overflow = history.len().saturating_sub(self.max_size);
        if overflow > 0 {
            history.drain(..overflow);
        }
    }

    /// Returns a copy of the bytes currently stored.
    pub async fn get_buffer(&self) -> Vec<u8> {
        let snapshot = self.buffer.read().await;
        snapshot.to_vec()
    }

    /// Returns another handle to the underlying shared buffer.
    pub fn get_shared_buffer(&self) -> Arc<RwLock<Vec<u8>>> {
        self.buffer.clone()
    }
}
/// Copies bytes from `socket` to stdout and records them in `scrollback`
/// until `running` becomes `false` or the socket reports end of stream.
///
/// Each received chunk is written to stdout, flushed, and appended to
/// `scrollback`, which is trimmed from the front to at most 10 MiB. The
/// `running` flag is re-checked at least every 100 ms even when no data
/// arrives.
///
/// # Errors
///
/// Only a failed stdout flush is returned as an error. A failed stdout write
/// or a socket read error (other than `WouldBlock`, which is retried after
/// 10 ms) ends the loop and the function still returns `Ok(())`.
#[allow(dead_code)]
pub async fn socket_to_stdout_task(
    mut socket: UnixStream,
    running: Arc<AtomicBool>,
    scrollback: Arc<RwLock<Vec<u8>>>,
) -> tokio::io::Result<()> {
    // Upper bound on the number of bytes kept in the scrollback.
    const SCROLLBACK_MAX: usize = 10 * 1024 * 1024;

    let mut read_buf = vec![0u8; DEFAULT_BUFFER_SIZE];
    let mut out = tokio::io::stdout();

    while running.load(Ordering::SeqCst) {
        tokio::select! {
            outcome = socket.read(&mut read_buf) => {
                match outcome {
                    // Peer closed the connection.
                    Ok(0) => break,
                    Ok(len) => {
                        let chunk = &read_buf[..len];
                        if out.write_all(chunk).await.is_err() {
                            break;
                        }
                        out.flush().await?;

                        let mut history = scrollback.write().await;
                        history.extend_from_slice(chunk);
                        let excess = history.len().saturating_sub(SCROLLBACK_MAX);
                        if excess > 0 {
                            history.drain(..excess);
                        }
                    }
                    Err(err) if err.kind() == tokio::io::ErrorKind::WouldBlock => {
                        sleep(Duration::from_millis(10)).await;
                    }
                    // Any other read error (including a broken pipe) stops the task.
                    Err(_) => break,
                }
            }
            // Wake up periodically so a cleared `running` flag is noticed.
            _ = sleep(Duration::from_millis(100)) => {
                if !running.load(Ordering::SeqCst) {
                    break;
                }
            }
        }
    }

    Ok(())
}
/// Polls the terminal size every 250 ms while `running` is `true` and sends a
/// resize command over `socket` whenever the size differs from the last one
/// seen (starting from `initial_size`).
///
/// A failure to query the terminal size is ignored for that iteration.
///
/// # Errors
///
/// Returns the error from `send_resize_command_async` if sending fails.
#[allow(dead_code)]
pub async fn resize_monitor_task(
    mut socket: UnixStream,
    running: Arc<AtomicBool>,
    initial_size: (u16, u16),
) -> tokio::io::Result<()> {
    use crate::pty::socket_async::send_resize_command_async;
    use crossterm::terminal;

    let mut known_size = initial_size;

    while running.load(Ordering::SeqCst) {
        match terminal::size() {
            Ok((cols, rows)) if (cols, rows) != known_size => {
                send_resize_command_async(&mut socket, cols, rows).await?;
                known_size = (cols, rows);
            }
            // Unchanged size or a failed query: nothing to send this round.
            _ => {}
        }
        sleep(Duration::from_millis(250)).await;
    }

    Ok(())
}
/// Sends any output held in `output_buffer` to `stream`, then asks the PTY
/// side for a refresh through `io_handler`.
///
/// When the buffer is empty only the refresh is sent. Otherwise the buffered
/// bytes are drained and written in `DEFAULT_BUFFER_SIZE` chunks, bracketed by
/// terminal control sequences (save cursor, switch to the alternate screen,
/// clear, home; afterwards leave the alternate screen and restore the
/// cursor), with short pauses between the steps.
///
/// # Errors
///
/// Returns the first error from writing or flushing `stream`, or from
/// `io_handler.send_refresh()`.
#[allow(dead_code)]
pub async fn send_buffered_output_async(
    stream: &mut UnixStream,
    output_buffer: &PtyBuffer,
    io_handler: &AsyncPtyIoHandler,
) -> tokio::io::Result<()> {
    // Nothing buffered: a refresh request is all that is needed.
    if output_buffer.is_empty() {
        return io_handler.send_refresh().await;
    }

    let mut pending = Vec::new();
    output_buffer.drain_to(&mut pending);

    // ESC 7 (save cursor), ?47h (alternate screen), 2J (clear), H (home).
    stream.write_all(b"\x1b7\x1b[?47h\x1b[2J\x1b[H").await?;
    stream.flush().await?;

    for piece in pending.chunks(DEFAULT_BUFFER_SIZE) {
        stream.write_all(piece).await?;
        stream.flush().await?;
        // Brief pause between chunks.
        sleep(Duration::from_millis(1)).await;
    }

    // ?47l (back to the normal screen), ESC 8 (restore cursor).
    stream.write_all(b"\x1b[?47l\x1b8").await?;
    stream.flush().await?;

    sleep(Duration::from_millis(50)).await;
    io_handler.send_refresh().await?;
    sleep(Duration::from_millis(100)).await;

    Ok(())
}
/// Registry of sessions keyed by string id, shareable across async tasks.
#[allow(dead_code)]
pub struct AsyncSessionManager {
    /// Map from session id to its data, behind a shared async read/write lock.
    sessions: Arc<RwLock<std::collections::HashMap<String, SessionData>>>,
}
/// Data recorded for a single session.
#[allow(dead_code)]
#[derive(Clone)]
pub struct SessionData {
    /// Session identifier.
    pub id: String,
    /// Raw file descriptor of the session's PTY master.
    pub master_fd: RawFd,
    /// Process id associated with the session.
    pub pid: i32,
    /// Filesystem path of the session's socket.
    pub socket_path: std::path::PathBuf,
    /// Time at which the session was created.
    pub created_at: std::time::SystemTime,
}
#[allow(dead_code)]
impl AsyncSessionManager {
pub fn new() -> Self {
Self {
sessions: Arc::new(RwLock::new(std::collections::HashMap::new())),
}
}
pub async fn add_session(&self, id: String, data: SessionData) {
let mut sessions = self.sessions.write().await;
sessions.insert(id, data);
}
pub async fn remove_session(&self, id: &str) -> Option<SessionData> {
let mut sessions = self.sessions.write().await;
sessions.remove(id)
}
pub async fn get_session(&self, id: &str) -> Option<SessionData> {
let sessions = self.sessions.read().await;
sessions.get(id).cloned()
}
pub async fn list_sessions(&self) -> Vec<SessionData> {
let sessions = self.sessions.read().await;
sessions.values().cloned().collect()
}
pub async fn session_count(&self) -> usize {
let sessions = self.sessions.read().await;
sessions.len()
}
}