use anyhow::Result;
use rustyline::config::Configurer;
use rustyline::error::ReadlineError;
use rustyline::DefaultEditor;
use std::io::{self, Write};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::sync::Mutex;
use tokio::time::Duration;
use super::super::interactive_signal::is_interrupted;
use super::types::{InteractiveCommand, NodeSession, SSH_OUTPUT_POLL_INTERVAL_MS};
impl InteractiveCommand {
pub(super) async fn run_single_node_mode(&self, session: NodeSession) -> Result<usize> {
let mut commands_executed = 0;
let history_path = self.expand_path(&self.history_file)?;
let mut rl = DefaultEditor::new()?;
rl.set_max_history_size(1000)?;
if history_path.exists() {
let _ = rl.load_history(&history_path);
}
let session_arc = Arc::new(Mutex::new(session));
let session_clone = Arc::clone(&session_arc);
let shutdown = Arc::new(AtomicBool::new(false));
let shutdown_clone = Arc::clone(&shutdown);
const SSH_OUTPUT_CHANNEL_SIZE: usize = 128;
let (output_tx, mut output_rx) = mpsc::channel::<String>(SSH_OUTPUT_CHANNEL_SIZE);
let output_reader = tokio::spawn(async move {
let mut shutdown_watch = {
let shutdown_clone_for_watch = Arc::clone(&shutdown_clone);
tokio::spawn(async move {
loop {
if shutdown_clone_for_watch.load(Ordering::Relaxed) || is_interrupted() {
break;
}
const SHUTDOWN_POLL_INTERVAL_MS: u64 = 50;
tokio::time::sleep(Duration::from_millis(SHUTDOWN_POLL_INTERVAL_MS)).await;
}
})
};
loop {
tokio::select! {
_ = tokio::time::sleep(Duration::from_millis(SSH_OUTPUT_POLL_INTERVAL_MS)) => {
let mut session_guard = session_clone.lock().await;
if !session_guard.is_connected {
break;
}
if let Ok(Some(output)) = session_guard.read_output().await {
if output_tx.try_send(output).is_err() {
break;
}
}
drop(session_guard);
}
_ = &mut shutdown_watch => {
break;
}
}
}
});
println!("Interactive session started. Type 'exit' or press Ctrl+D to quit.");
println!();
loop {
if is_interrupted() {
println!("\nInterrupted by user. Exiting...");
shutdown.store(true, Ordering::Relaxed);
break;
}
while let Ok(output) = output_rx.try_recv() {
print!("{output}");
io::stdout().flush()?;
}
let session_guard = session_arc.lock().await;
let prompt = self.format_prompt(&session_guard.node, &session_guard.working_dir);
let is_connected = session_guard.is_connected;
drop(session_guard);
if !is_connected {
eprintln!("Connection lost. Exiting.");
break;
}
tokio::select! {
output = output_rx.recv() => {
match output {
Some(output) => {
print!("{output}");
io::stdout().flush()?;
continue; }
None => {
eprintln!("Session output channel closed. Exiting.");
break;
}
}
}
_ = tokio::time::sleep(Duration::from_millis(SSH_OUTPUT_POLL_INTERVAL_MS)) => {
match rl.readline(&prompt) {
Ok(line) => {
if line.trim() == "exit" {
let mut session_guard = session_arc.lock().await;
session_guard.send_command("exit").await?;
drop(session_guard);
const SSH_EXIT_DELAY_MS: u64 = 100;
tokio::time::sleep(Duration::from_millis(SSH_EXIT_DELAY_MS)).await;
break;
}
rl.add_history_entry(&line)?;
let mut session_guard = session_arc.lock().await;
session_guard.send_command(&line).await?;
commands_executed += 1;
if line.trim().starts_with("cd ") {
session_guard.send_command("pwd").await?;
}
}
Err(ReadlineError::Interrupted) => {
println!("^C");
}
Err(ReadlineError::Eof) => {
println!("^D");
break;
}
Err(err) => {
eprintln!("Error: {err}");
break;
}
}
}
}
}
shutdown.store(true, Ordering::Relaxed);
output_reader.abort();
let mut session_guard = session_arc.lock().await;
if session_guard.is_connected {
let _ = session_guard.channel.close().await;
session_guard.is_connected = false;
}
drop(session_guard);
let _ = rl.save_history(&history_path);
Ok(commands_executed)
}
}