mod callbacks;
#[cfg(test)]
mod tests;
use std::{
pin::Pin,
sync::mpsc::{Receiver, Sender},
time::Duration,
};
use bollard::container::LogOutput;
use bytes::Bytes;
use futures::{Stream, StreamExt};
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tui_term::vt100::{Parser, Screen};
use crate::pty::callbacks::TerminalCallback;
type Output = Pin<Box<dyn Stream<Item = Result<LogOutput, bollard::errors::Error>> + Send>>;
type Input = Pin<Box<dyn AsyncWrite + Send>>;
const SCROLLBACK_WINDOW: usize = 5000;
pub struct Pty {
stdin: Sender<Bytes>,
stdout: Receiver<Bytes>,
stderr: Receiver<Bytes>,
container_session_id: String,
parser: Parser<TerminalCallback>,
}
impl Pty {
pub fn new(
container_session_id: String,
mut output: Output,
mut input: Input,
) -> Self {
let (stdout_in, stdout) = std::sync::mpsc::channel();
let (stderr_in, stderr) = std::sync::mpsc::channel();
let _jh = tokio::spawn(async move {
while let Some(Ok(msg)) = output.next().await {
match msg {
LogOutput::StdOut { message }
| LogOutput::StdIn { message }
| LogOutput::Console { message } => {
drop(crate::debugger::Debugger::log_pty_stdout(&message));
stdout_in.send(message)?;
},
LogOutput::StdErr { message } => {
drop(crate::debugger::Debugger::log_pty_stdout(&message));
stderr_in.send(message)?;
},
}
}
color_eyre::eyre::Ok(())
});
let (stdin, stdin_out) = std::sync::mpsc::channel::<Bytes>();
let _jh = tokio::spawn(async move {
while let Ok(bytes) = stdin_out.recv() {
drop(crate::debugger::Debugger::log_pty_stdin(&bytes));
input.write_all(&bytes).await?;
input.flush().await?;
}
color_eyre::eyre::Ok(())
});
let parser =
Parser::new_with_callbacks(24, 80, SCROLLBACK_WINDOW, TerminalCallback(stdin.clone()));
Self {
stdin,
stdout,
stderr,
container_session_id,
parser,
}
}
pub fn scroll_up(&mut self) {
let cur_scrollback = self.parser.screen().scrollback();
self.parser
.screen_mut()
.set_scrollback(cur_scrollback.saturating_add(1));
}
pub fn scroll_down(&mut self) {
let cur_scrollback = self.parser.screen().scrollback();
self.parser
.screen_mut()
.set_scrollback(cur_scrollback.saturating_sub(1));
}
pub fn scroll_to_bottom(&mut self) {
self.parser.screen_mut().set_scrollback(0);
}
pub fn container_session_id(&self) -> &str {
&self.container_session_id
}
pub fn screen(&self) -> &Screen {
self.parser.screen()
}
pub fn size(&self) -> (u16, u16) {
self.parser.screen().size()
}
pub fn set_size(
&mut self,
height: u16,
width: u16,
) {
self.parser.screen_mut().set_size(height, width);
}
pub fn process_stdout_and_stderr(
&mut self,
timeout: Duration,
) {
if let Ok(bytes) = self.stdout.recv_timeout(timeout) {
self.parser.process(&bytes);
}
if let Ok(bytes) = self.stderr.recv_timeout(timeout) {
self.parser.process(&bytes);
}
}
pub fn process_stdin(
&self,
bytes: &[u8],
) {
drop(self.stdin.send(Bytes::copy_from_slice(bytes)));
}
}