use bytes::Bytes;
use futures::StreamExt;
use tracing::{debug, info, instrument, trace};
use crate::errors::{Error, Result};
use crate::session::{Session, SessionConfig, SessionState};
use crate::terminal::{ControlSignal, Terminal, TerminalConfig, TerminalInput};
use crate::SessionManager;
#[derive(Debug, Clone)]
pub struct InteractiveConfig {
pub terminal: TerminalConfig,
pub send_initial_size: bool,
pub show_banner: bool,
pub forward_signals: bool,
}
impl Default for InteractiveConfig {
fn default() -> Self {
Self {
terminal: TerminalConfig::default(),
send_initial_size: true,
show_banner: true,
forward_signals: true,
}
}
}
pub struct InteractiveShell {
config: InteractiveConfig,
terminal: Terminal,
session: Option<Session>,
session_manager: Option<SessionManager>,
}
impl InteractiveShell {
pub fn new(config: InteractiveConfig) -> Result<Self> {
let terminal = Terminal::new(config.terminal.clone())?;
Ok(Self {
config,
terminal,
session: None,
session_manager: None,
})
}
#[instrument(skip(self))]
pub async fn connect(&mut self, target: &str) -> Result<()> {
info!(target = %target, "Connecting to instance");
let manager = SessionManager::new().await?;
let session_config = SessionConfig {
target: target.to_string(),
..Default::default()
};
let session = manager.start_session(session_config).await?;
if self.config.show_banner {
println!(
"\n\x1b[32mStarting session with SessionId: {}\x1b[0m\n",
session.id()
);
}
self.session = Some(session);
self.session_manager = Some(manager);
if self.config.send_initial_size {
self.send_terminal_size().await?;
}
Ok(())
}
async fn send_size_to_session(session: &Session, terminal: &Terminal) -> Result<()> {
let size = terminal.size();
session.send_size(size).await
}
async fn send_terminal_size(&self) -> Result<()> {
if let Some(ref session) = self.session {
Self::send_size_to_session(session, &self.terminal).await?;
}
Ok(())
}
#[instrument(skip(self))]
pub async fn run(&mut self) -> Result<()> {
let mut session = self
.session
.take()
.ok_or_else(|| Error::Config("Not connected".to_string()))?;
let _raw_guard = self.terminal.enable_raw_mode()?;
let mut input_rx = self.terminal.start_input_reader();
let mut output = session.output();
let result = loop {
tokio::select! {
input = input_rx.recv() => {
match input {
Some(TerminalInput::Data(data)) => {
trace!(len = data.len(), "Sending input data");
if let Err(e) = session.send(data).await {
break Err(e);
}
}
Some(TerminalInput::Signal(signal)) => {
if self.config.forward_signals {
let byte = signal.as_byte();
debug!(?signal, byte, "Forwarding control signal");
if let Err(e) = session.send(Bytes::from(vec![byte])).await {
break Err(e);
}
}
if matches!(signal, ControlSignal::EndOfFile) {
info!("EOF received, terminating session");
break Ok(());
}
}
Some(TerminalInput::Resize(size)) => {
debug!(cols = size.cols, rows = size.rows, "Terminal resized");
if let Err(e) = Self::send_size_to_session(&session, &self.terminal).await {
break Err(e);
}
}
Some(TerminalInput::Eof) | None => {
info!("Terminal input closed");
break Ok(());
}
}
}
output_data = output.next() => {
match output_data {
Some(data) => {
trace!(len = data.len(), "Received output data");
if let Err(e) = Terminal::write_output(&data) {
break Err(Error::Io(e));
}
}
None => {
info!("Session output closed");
break Ok(());
}
}
}
}
};
self.terminal.stop();
session.terminate().await?;
if self.config.show_banner {
println!("\n\x1b[33mSession terminated.\x1b[0m\n");
}
result
}
pub fn session(&self) -> Option<&Session> {
self.session.as_ref()
}
pub async fn is_connected(&self) -> bool {
if let Some(ref session) = self.session {
session.state().await == SessionState::Connected
} else {
false
}
}
}
impl Drop for InteractiveShell {
fn drop(&mut self) {
self.terminal.stop();
}
}
pub async fn run_shell(target: &str) -> Result<()> {
let config = InteractiveConfig::default();
let mut shell = InteractiveShell::new(config)?;
shell.connect(target).await?;
shell.run().await
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_interactive_config_default() {
let config = InteractiveConfig::default();
assert!(config.send_initial_size);
assert!(config.show_banner);
assert!(config.forward_signals);
}
#[test]
fn test_interactive_shell_creation() {
let config = InteractiveConfig::default();
let shell = InteractiveShell::new(config);
assert!(shell.is_ok());
}
#[tokio::test]
async fn test_not_connected_initially() {
let config = InteractiveConfig::default();
let shell = InteractiveShell::new(config).unwrap();
assert!(!shell.is_connected().await);
}
}