use std::io;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use dashmap::DashMap;
use tokio::process::Child;
use tokio::sync::Mutex;
use crate::session::wrapped_child::WrappedChild;
#[derive(Debug)]
pub enum ChildHandle {
Unwrapped {
child: Arc<Mutex<Child>>,
},
Wrapped {
child: Arc<Mutex<WrappedChild>>,
},
}
impl Clone for ChildHandle {
fn clone(&self) -> Self {
match self {
Self::Unwrapped { child } => Self::Unwrapped {
child: Arc::clone(child),
},
Self::Wrapped { child } => Self::Wrapped {
child: Arc::clone(child),
},
}
}
}
impl ChildHandle {
pub fn stdout(&self) -> Option<&tokio::process::ChildStdout> {
None }
pub fn stderr(&self) -> Option<&tokio::process::ChildStderr> {
None }
pub async fn kill(&mut self) -> io::Result<()> {
match self {
Self::Unwrapped { child } => {
let mut guard = child.lock().await;
guard.kill().await
}
Self::Wrapped { child } => {
let mut guard = child.lock().await;
guard.kill().await
}
}
}
pub async fn wait(&mut self) -> io::Result<std::process::ExitStatus> {
match self {
Self::Unwrapped { child } => {
let mut guard = child.lock().await;
guard.wait().await
}
Self::Wrapped { child } => {
let mut guard = child.lock().await;
guard.wait().await
}
}
}
pub fn try_wait(&mut self) -> io::Result<Option<std::process::ExitStatus>> {
match self {
Self::Unwrapped { child } => {
if let Ok(mut guard) = child.try_lock() {
guard.try_wait()
} else {
Ok(None)
}
}
Self::Wrapped { child } => {
if let Ok(mut guard) = child.try_lock() {
guard.try_wait()
} else {
Ok(None)
}
}
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TerminalExitStatus {
Exited(i32),
Killed,
TimedOut,
Aborted,
}
impl TerminalExitStatus {
pub fn as_str(&self) -> &'static str {
match self {
Self::Exited(_) => "exited",
Self::Killed => "killed",
Self::TimedOut => "timedOut",
Self::Aborted => "aborted",
}
}
}
#[derive(Debug)]
pub enum BackgroundTerminal {
Running {
child: ChildHandle,
output_buffer: Arc<Mutex<String>>,
last_read_offset: Arc<AtomicUsize>,
},
Finished {
status: TerminalExitStatus,
final_output: String,
},
}
impl BackgroundTerminal {
pub fn new_running(child: ChildHandle) -> Self {
Self::Running {
child,
output_buffer: Arc::new(Mutex::new(String::new())),
last_read_offset: Arc::new(AtomicUsize::new(0)),
}
}
pub fn new_running_unwrapped(child: Child) -> Self {
Self::Running {
child: ChildHandle::Unwrapped {
child: Arc::new(Mutex::new(child)),
},
output_buffer: Arc::new(Mutex::new(String::new())),
last_read_offset: Arc::new(AtomicUsize::new(0)),
}
}
pub fn is_running(&self) -> bool {
matches!(self, Self::Running { .. })
}
pub fn status_str(&self) -> &'static str {
match self {
Self::Running { .. } => "running",
Self::Finished { status, .. } => status.as_str(),
}
}
pub async fn get_incremental_output(&self) -> String {
match self {
Self::Running {
output_buffer,
last_read_offset,
..
} => {
let current_offset = last_read_offset.load(Ordering::Acquire);
let buffer = output_buffer.lock().await;
let new_output = buffer[current_offset..].to_string();
let new_len = buffer.len();
drop(buffer);
last_read_offset.store(new_len, Ordering::Release);
new_output
}
Self::Finished { final_output, .. } => final_output.clone(),
}
}
pub async fn append_output(&self, output: &str) {
if let Self::Running { output_buffer, .. } = self {
let mut buffer = output_buffer.lock().await;
buffer.push_str(output);
}
}
pub async fn get_all_output(&self) -> String {
match self {
Self::Running { output_buffer, .. } => {
let buffer = output_buffer.lock().await;
buffer.clone()
}
Self::Finished { final_output, .. } => final_output.clone(),
}
}
pub async fn finish(self, status: TerminalExitStatus) -> Self {
match self {
Self::Running { output_buffer, .. } => {
let final_output = output_buffer.lock().await.clone();
Self::Finished {
status,
final_output,
}
}
finished @ Self::Finished { .. } => finished,
}
}
}
#[derive(Debug, Default)]
pub struct BackgroundProcessManager {
terminals: DashMap<String, BackgroundTerminal>,
}
impl BackgroundProcessManager {
pub fn new() -> Self {
Self {
terminals: DashMap::new(),
}
}
pub fn register(&self, shell_id: String, terminal: BackgroundTerminal) {
self.terminals.insert(shell_id, terminal);
}
pub fn has_terminal(&self, shell_id: &str) -> bool {
self.terminals.contains_key(shell_id)
}
pub fn get(
&self,
shell_id: &str,
) -> Option<dashmap::mapref::one::Ref<'_, String, BackgroundTerminal>> {
self.terminals.get(shell_id)
}
pub fn remove(&self, shell_id: &str) -> Option<(String, BackgroundTerminal)> {
self.terminals.remove(shell_id)
}
pub async fn finish_terminal(&self, shell_id: &str, status: TerminalExitStatus) {
if let Some((id, terminal)) = self.terminals.remove(shell_id) {
let finished = terminal.finish(status).await;
self.terminals.insert(id, finished);
}
}
pub fn count(&self) -> usize {
self.terminals.len()
}
pub fn shell_ids(&self) -> Vec<String> {
self.terminals.iter().map(|r| r.key().clone()).collect()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_terminal_exit_status() {
assert_eq!(TerminalExitStatus::Exited(0).as_str(), "exited");
assert_eq!(TerminalExitStatus::Killed.as_str(), "killed");
assert_eq!(TerminalExitStatus::TimedOut.as_str(), "timedOut");
assert_eq!(TerminalExitStatus::Aborted.as_str(), "aborted");
}
#[test]
fn test_background_process_manager_new() {
let manager = BackgroundProcessManager::new();
assert_eq!(manager.count(), 0);
}
#[test]
fn test_background_process_manager_has_terminal() {
let manager = BackgroundProcessManager::new();
assert!(!manager.has_terminal("test-id"));
}
#[tokio::test]
async fn test_background_terminal_finished() {
let terminal = BackgroundTerminal::Finished {
status: TerminalExitStatus::Exited(0),
final_output: "test output".to_string(),
};
assert!(!terminal.is_running());
assert_eq!(terminal.status_str(), "exited");
assert_eq!(terminal.get_all_output().await, "test output");
}
}