use crate::pty::PtyHandle;
use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use std::collections::HashMap;
#[cfg(unix)]
use std::os::unix::process::CommandExt;
use std::process::Stdio;
use tokio::process::{Child, Command};
use tokio::sync::Mutex;
/// Lifecycle phase of a container tracked by the runtime.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContainerState {
    /// The container has not been started yet.
    Created,
    /// A process for the container has been spawned and not yet reaped.
    Running,
    /// The container's process was stopped or has exited.
    Stopped,
}

impl ContainerState {
    /// Returns the lowercase textual name of the state
    /// (`"created"`, `"running"` or `"stopped"`).
    pub fn as_str(&self) -> &'static str {
        match *self {
            ContainerState::Created => "created",
            ContainerState::Running => "running",
            ContainerState::Stopped => "stopped",
        }
    }
}
/// Description and current status of a single container known to the runtime.
#[derive(Debug, Clone)]
pub struct ContainerHandle {
/// Identifier; the runtime stores the handle under this key.
pub id: String,
/// Container name (only logged by the code in this file).
pub name: String,
/// Image reference (only logged by the code in this file).
pub image: String,
/// Program to run followed by its arguments; must be non-empty to start.
pub command: Vec<String>,
/// Environment variables set on the spawned process, as `(key, value)` pairs.
pub env: Vec<(String, String)>,
/// Working directory the process is spawned in.
pub working_dir: String,
/// Current lifecycle state.
pub state: ContainerState,
/// PID of the spawned process; set on start and cleared on stop/wait.
pub pid: Option<u32>,
/// Exit code recorded once the process has exited, if one was available.
pub exit_code: Option<i32>,
/// Presumably the creation timestamp; not read by the code in this file.
pub created_at: DateTime<Utc>,
/// When `true`, the process is started attached to a pseudo-terminal.
pub tty: bool,
/// NOTE(review): not read by the code in this file; presumably requests
/// that stdin be kept open. Confirm against callers.
pub open_stdin: bool,
}
/// Resources kept for a started container process.
struct ProcessHandle {
/// The spawned child process.
child: Child,
/// PTY the process is attached to; `None` when started without a TTY.
pty: Option<PtyHandle>,
}
/// Registry of containers and of the processes spawned for them.
pub struct ContainerRuntime {
/// Container descriptions keyed by container id.
containers: HashMap<String, ContainerHandle>,
/// Process handles keyed by container id, behind an async mutex so that
/// `&self` methods (TTY resize, PTY fd lookup) can reach them.
processes: Mutex<HashMap<String, ProcessHandle>>,
}
impl ContainerRuntime {
/// Creates a runtime with no registered containers and no tracked processes.
#[must_use]
pub fn new() -> Self {
    let containers = HashMap::new();
    let processes = Mutex::new(HashMap::new());
    Self {
        containers,
        processes,
    }
}
pub fn add_container(&mut self, handle: ContainerHandle) {
tracing::info!(
"Adding container: id={}, name={}, image={}",
handle.id,
handle.name,
handle.image
);
self.containers.insert(handle.id.clone(), handle);
}
/// Looks up the container registered under `id`.
///
/// Returns `None` when no container with that id is known.
#[must_use]
pub fn get_container(&self, id: &str) -> Option<&ContainerHandle> {
self.containers.get(id)
}
/// Mutable variant of the lookup by `id`.
///
/// Returns `None` when no container with that id is known.
pub fn get_container_mut(&mut self, id: &str) -> Option<&mut ContainerHandle> {
self.containers.get_mut(id)
}
/// Returns clones of the registered containers.
///
/// With `all == true` every container is returned; otherwise only those
/// whose state is [`ContainerState::Running`]. The order follows the
/// iteration order of the underlying map.
#[must_use]
pub fn list_containers(&self, all: bool) -> Vec<ContainerHandle> {
    let mut selected = Vec::new();
    for handle in self.containers.values() {
        if all || matches!(handle.state, ContainerState::Running) {
            selected.push(handle.clone());
        }
    }
    selected
}
/// Starts the container `id` using an 80x24 terminal size.
///
/// # Errors
/// Propagates every error of `start_container_with_size`.
pub async fn start_container(&mut self, id: &str) -> Result<()> {
    const DEFAULT_COLS: u16 = 80;
    const DEFAULT_ROWS: u16 = 24;
    self.start_container_with_size(id, DEFAULT_COLS, DEFAULT_ROWS)
        .await
}
/// Spawns the process of container `id` and marks the container as running.
///
/// When the container requests a TTY, a PTY of `cols` x `rows` is created and
/// the child is attached to its slave side; otherwise stdin is null and
/// stdout/stderr are piped.
///
/// # Errors
/// Fails when the container is unknown, already running, has an empty
/// command, when the PTY cannot be created, or when spawning fails.
pub async fn start_container_with_size(
    &mut self,
    id: &str,
    cols: u16,
    rows: u16,
) -> Result<()> {
    let container = self
        .containers
        .get_mut(id)
        .context("container not found")?;
    if matches!(container.state, ContainerState::Running) {
        anyhow::bail!("container is already running");
    }
    // The first element is the program, everything after it the arguments.
    let (program, args) = match container.command.split_first() {
        Some(parts) => parts,
        None => anyhow::bail!("container has no command"),
    };
    let wants_tty = container.tty;
    tracing::info!(
        "Starting container {}: cmd={:?}, workdir={}, tty={}",
        id,
        container.command,
        container.working_dir,
        wants_tty
    );

    let mut cmd = Command::new(program);
    cmd.args(args).current_dir(&container.working_dir);
    cmd.envs(container.env.iter().map(|(key, value)| (key, value)));

    let pty_handle = if wants_tty {
        let pty = PtyHandle::new(cols, rows).context("failed to create PTY")?;
        let slave_fd = pty.slave_fd();
        // SAFETY: the closure runs in the forked child right before exec and
        // only issues raw libc calls on file descriptors; it does not touch
        // parent-process state.
        unsafe {
            cmd.pre_exec(move || {
                // New session first, so the PTY can become its controlling terminal.
                if libc::setsid() < 0 {
                    return Err(std::io::Error::last_os_error());
                }
                if libc::ioctl(slave_fd, libc::TIOCSCTTY as libc::c_ulong, 0) < 0 {
                    return Err(std::io::Error::last_os_error());
                }
                // Route stdin, stdout and stderr through the PTY slave.
                for stdio_fd in [libc::STDIN_FILENO, libc::STDOUT_FILENO, libc::STDERR_FILENO] {
                    if libc::dup2(slave_fd, stdio_fd) < 0 {
                        return Err(std::io::Error::last_os_error());
                    }
                }
                // Drop the extra descriptor unless it already is one of 0/1/2.
                if slave_fd > libc::STDERR_FILENO {
                    libc::close(slave_fd);
                }
                Ok(())
            });
        }
        // NOTE(review): this is applied after the container's own env, so it
        // replaces any TERM supplied there; confirm that is intended.
        cmd.env("TERM", "xterm-256color");
        Some(pty)
    } else {
        cmd.stdin(Stdio::null())
            .stdout(Stdio::piped())
            .stderr(Stdio::piped());
        None
    };

    let child = cmd.spawn().context("failed to spawn container process")?;
    let pid = child.id();
    container.state = ContainerState::Running;
    container.pid = pid;
    container.exit_code = None;
    if let Some(pid) = pid {
        tracing::info!("Container {} started with PID {}", id, pid);
    }

    self.processes.lock().await.insert(
        id.to_string(),
        ProcessHandle {
            child,
            pty: pty_handle,
        },
    );
    Ok(())
}
/// Stops the running container `id`.
///
/// Sends `SIGTERM`, waits up to `timeout_secs` seconds for the process to
/// exit, then escalates to `SIGKILL` and waits a further short grace period.
/// Afterwards the container is marked stopped, its PID is cleared and its
/// process handle is dropped. Signal-delivery and wait failures are logged
/// as warnings rather than returned.
///
/// # Errors
/// Fails when the container is unknown, is not running, has no PID, or has a
/// PID that cannot be represented as a `pid_t`.
pub async fn stop_container(&mut self, id: &str, timeout_secs: u32) -> Result<()> {
    // How long to wait for the process to be reaped after SIGKILL.
    const KILL_REAP_SECS: u64 = 5;

    // Sends SIGKILL (when `force`) or SIGTERM to `pid`; failures are logged
    // on every platform.
    fn send_stop_signal(pid: i32, force: bool) {
        let signal_name = if force { "SIGKILL" } else { "SIGTERM" };
        #[cfg(target_os = "linux")]
        let outcome: std::result::Result<(), String> = {
            use nix::sys::signal::{kill, Signal};
            use nix::unistd::Pid;
            let signal = if force {
                Signal::SIGKILL
            } else {
                Signal::SIGTERM
            };
            kill(Pid::from_raw(pid), signal).map_err(|e| e.to_string())
        };
        #[cfg(not(target_os = "linux"))]
        let outcome: std::result::Result<(), String> = {
            let signal = if force { libc::SIGKILL } else { libc::SIGTERM };
            // SAFETY: kill(2) takes plain integers and has no memory-safety
            // preconditions.
            if unsafe { libc::kill(pid, signal) } == 0 {
                Ok(())
            } else {
                Err(std::io::Error::last_os_error().to_string())
            }
        };
        if let Err(e) = outcome {
            tracing::warn!("Failed to send {} to {}: {}", signal_name, pid, e);
        }
    }

    let container = self
        .containers
        .get_mut(id)
        .context("container not found")?;
    if container.state != ContainerState::Running {
        anyhow::bail!("container is not running");
    }
    let pid = container.pid.context("container has no PID")?;
    // A wrapping cast could yield a negative value, which kill(2) would
    // interpret as a process group instead of a single process.
    let raw_pid = i32::try_from(pid).context("container PID out of range")?;
    tracing::info!(
        "Stopping container {} (PID {}) with timeout {}s",
        id,
        pid,
        timeout_secs
    );

    send_stop_signal(raw_pid, false);

    let mut processes = self.processes.lock().await;
    if let Some(process_handle) = processes.get_mut(id) {
        let grace = tokio::time::Duration::from_secs(timeout_secs.into());
        match tokio::time::timeout(grace, process_handle.child.wait()).await {
            Ok(Ok(status)) => {
                container.exit_code = status.code();
                tracing::info!(
                    "Container {} exited with code {:?}",
                    id,
                    container.exit_code
                );
            }
            Ok(Err(e)) => {
                tracing::warn!("Error waiting for container {}: {}", id, e);
            }
            Err(_) => {
                tracing::warn!(
                    "Container {} did not stop after {}s, sending SIGKILL",
                    id,
                    timeout_secs
                );
                send_stop_signal(raw_pid, true);
                let reap = tokio::time::timeout(
                    tokio::time::Duration::from_secs(KILL_REAP_SECS),
                    process_handle.child.wait(),
                )
                .await;
                match reap {
                    Ok(Ok(status)) => {
                        container.exit_code = status.code();
                    }
                    Ok(Err(e)) => {
                        tracing::warn!("Error waiting for container {}: {}", id, e);
                    }
                    Err(_) => {
                        tracing::warn!(
                            "Container {} was not reaped within {}s of SIGKILL",
                            id,
                            KILL_REAP_SECS
                        );
                    }
                }
            }
        }
    }

    container.state = ContainerState::Stopped;
    container.pid = None;
    processes.remove(id);
    Ok(())
}
/// Removes the container `id` from the runtime.
///
/// A running container is only removed when `force` is set, in which case it
/// is first stopped with a 10 second timeout.
///
/// # Errors
/// Fails when the container is unknown, when it is running and `force` is
/// `false`, or when stopping it fails.
pub async fn remove_container(&mut self, id: &str, force: bool) -> Result<()> {
    let container = self.containers.get(id).context("container not found")?;
    if container.state == ContainerState::Running {
        if !force {
            anyhow::bail!("cannot remove running container (use force=true)");
        }
        self.stop_container(id, 10).await?;
    }
    tracing::info!("Removing container {}", id);
    self.containers.remove(id);
    // `wait_container` leaves the process handle (child + PTY) in the map
    // after the process exits; drop it here so a removed container does not
    // keep those resources alive. `&mut self` gives exclusive access, so the
    // mutex does not need to be locked.
    self.processes.get_mut().remove(id);
    Ok(())
}
/// Waits for the process of container `id` to exit and returns its exit code.
///
/// For an already stopped container the recorded exit code is returned
/// immediately. Whenever no exit code is available, `-1` is returned.
/// After a successful wait the container is marked stopped and its PID is
/// cleared.
///
/// # Errors
/// Fails when the container is unknown, has never been started, has no
/// tracked process, or when waiting on the process fails.
pub async fn wait_container(&mut self, id: &str) -> Result<i32> {
    let container = self.containers.get(id).context("container not found")?;
    match container.state {
        ContainerState::Stopped => return Ok(container.exit_code.unwrap_or(-1)),
        ContainerState::Running => {}
        ContainerState::Created => anyhow::bail!("container is not running"),
    }

    let mut processes = self.processes.lock().await;
    let status = match processes.get_mut(id) {
        Some(process_handle) => process_handle.child.wait().await?,
        None => anyhow::bail!("container process not found"),
    };
    // Release the process table before updating the container record.
    drop(processes);

    let exit_code = status.code().unwrap_or(-1);
    if let Some(record) = self.containers.get_mut(id) {
        record.state = ContainerState::Stopped;
        record.exit_code = Some(exit_code);
        record.pid = None;
    }
    Ok(exit_code)
}
/// Sends `signal` to the process of the running container `id`.
///
/// `signal` is interpreted by `parse_signal`, so it may be a number or a
/// signal name.
///
/// # Errors
/// Fails when the container is unknown, is not running, has no PID, when the
/// signal cannot be parsed or is not a valid signal number, or when delivery
/// fails.
pub async fn signal_container(&mut self, id: &str, signal: &str) -> Result<()> {
    let container = self
        .containers
        .get(id)
        .context("container not found")?;
    let pid = match container.state {
        ContainerState::Running => container.pid.context("container has no PID")?,
        ContainerState::Created | ContainerState::Stopped => {
            anyhow::bail!("container is not running")
        }
    };
    tracing::info!("Sending signal {} to container {} (PID {})", signal, id, pid);
    let sig_num = parse_signal(signal)?;
    let raw_pid = pid as i32;

    #[cfg(target_os = "linux")]
    {
        use nix::sys::signal::{kill, Signal};
        use nix::unistd::Pid;
        let target = Pid::from_raw(raw_pid);
        let nix_signal = Signal::try_from(sig_num).context("invalid signal number")?;
        kill(target, nix_signal).context("failed to send signal")?;
    }
    #[cfg(not(target_os = "linux"))]
    {
        // SAFETY: kill(2) takes plain integers and has no memory-safety
        // preconditions.
        if unsafe { libc::kill(raw_pid, sig_num) } != 0 {
            anyhow::bail!("failed to send signal: {}", std::io::Error::last_os_error());
        }
    }
    Ok(())
}
/// Resizes the PTY of container `id` to `cols` x `rows`.
///
/// A missing process or a process without a PTY is only logged and treated
/// as success.
///
/// # Errors
/// Fails only when the PTY resize itself fails.
pub async fn resize_tty(&self, id: &str, cols: u16, rows: u16) -> Result<()> {
    tracing::debug!("ResizeTty for {}: {}x{}", id, cols, rows);
    let processes = self.processes.lock().await;
    match processes.get(id).map(|process| process.pty.as_ref()) {
        Some(Some(pty)) => {
            pty.resize(cols, rows)?;
            tracing::debug!("Container {} TTY resized to {}x{}", id, cols, rows);
        }
        Some(None) => {
            tracing::debug!("Container {} has no TTY", id);
        }
        None => {
            tracing::debug!("Container {} process not found", id);
        }
    }
    Ok(())
}
/// Returns the master-side file descriptor of the PTY of container `id`.
///
/// Yields `None` when no process is tracked for `id` or the process has no
/// PTY.
pub async fn get_pty_master_fd(&self, id: &str) -> Option<std::os::unix::io::RawFd> {
    let processes = self.processes.lock().await;
    let process = processes.get(id)?;
    let pty = process.pty.as_ref()?;
    Some(pty.master_fd())
}
}
/// Converts a textual signal specification into a signal number.
///
/// A string that parses as an `i32` is returned as-is. Otherwise the string
/// is upper-cased, an optional `SIG` prefix is removed, and the remainder is
/// looked up in the table of known names (`"term"`, `"SIGTERM"` and `"TERM"`
/// are equivalent).
///
/// # Errors
/// Fails with `unknown signal: <input>` when the name is not in the table.
fn parse_signal(signal: &str) -> Result<i32> {
    // Known names without the `SIG` prefix; aliases get their own rows.
    const NAMED_SIGNALS: &[(&str, i32)] = &[
        ("HUP", libc::SIGHUP),
        ("INT", libc::SIGINT),
        ("QUIT", libc::SIGQUIT),
        ("ILL", libc::SIGILL),
        ("TRAP", libc::SIGTRAP),
        ("ABRT", libc::SIGABRT),
        ("IOT", libc::SIGABRT),
        ("BUS", libc::SIGBUS),
        ("FPE", libc::SIGFPE),
        ("KILL", libc::SIGKILL),
        ("USR1", libc::SIGUSR1),
        ("SEGV", libc::SIGSEGV),
        ("USR2", libc::SIGUSR2),
        ("PIPE", libc::SIGPIPE),
        ("ALRM", libc::SIGALRM),
        ("TERM", libc::SIGTERM),
        ("CHLD", libc::SIGCHLD),
        ("CONT", libc::SIGCONT),
        ("STOP", libc::SIGSTOP),
        ("TSTP", libc::SIGTSTP),
        ("TTIN", libc::SIGTTIN),
        ("TTOU", libc::SIGTTOU),
        ("URG", libc::SIGURG),
        ("XCPU", libc::SIGXCPU),
        ("XFSZ", libc::SIGXFSZ),
        ("VTALRM", libc::SIGVTALRM),
        ("PROF", libc::SIGPROF),
        ("WINCH", libc::SIGWINCH),
        ("IO", libc::SIGIO),
        ("POLL", libc::SIGIO),
        ("SYS", libc::SIGSYS),
    ];

    if let Ok(number) = signal.parse::<i32>() {
        return Ok(number);
    }

    let upper = signal.to_uppercase();
    let bare_name = upper.strip_prefix("SIG").unwrap_or(&upper);
    match NAMED_SIGNALS.iter().find(|entry| entry.0 == bare_name) {
        Some(&(_, number)) => Ok(number),
        None => anyhow::bail!("unknown signal: {}", signal),
    }
}
/// `Default` yields the same empty runtime as [`ContainerRuntime::new`].
impl Default for ContainerRuntime {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an owned argv from string literals.
    fn argv(parts: &[&str]) -> Vec<String> {
        parts.iter().map(|part| (*part).to_owned()).collect()
    }

    /// Asserts that `result` is an error whose message contains `needle`.
    fn assert_err_contains<T>(result: Result<T>, needle: &str) {
        match result {
            Ok(_) => panic!("expected an error containing {:?}", needle),
            Err(err) => assert!(err.to_string().contains(needle)),
        }
    }

    /// Fixture: a freshly created, non-TTY container without environment.
    fn create_test_container(id: &str, cmd: Vec<String>) -> ContainerHandle {
        create_test_container_with_env(id, cmd, Vec::new())
    }

    /// Fixture: a freshly created, non-TTY container with the given environment.
    fn create_test_container_with_env(
        id: &str,
        cmd: Vec<String>,
        env: Vec<(String, String)>,
    ) -> ContainerHandle {
        ContainerHandle {
            id: id.to_owned(),
            name: format!("test-{}", id),
            image: String::from("test:latest"),
            command: cmd,
            env,
            working_dir: String::from("/"),
            state: ContainerState::Created,
            pid: None,
            exit_code: None,
            created_at: Utc::now(),
            tty: false,
            open_stdin: false,
        }
    }

    #[test]
    fn test_container_state_as_str() {
        let expected = [
            (ContainerState::Created, "created"),
            (ContainerState::Running, "running"),
            (ContainerState::Stopped, "stopped"),
        ];
        for (state, label) in expected {
            assert_eq!(state.as_str(), label);
        }
    }

    #[test]
    fn test_container_state_equality() {
        assert_eq!(ContainerState::Created, ContainerState::Created);
        assert_ne!(ContainerState::Created, ContainerState::Running);
        assert_ne!(ContainerState::Running, ContainerState::Stopped);
    }

    #[test]
    fn test_container_runtime_new() {
        let rt = ContainerRuntime::new();
        assert!(rt.list_containers(true).is_empty());
    }

    #[test]
    fn test_container_runtime_default() {
        let rt = ContainerRuntime::default();
        assert!(rt.list_containers(true).is_empty());
    }

    #[test]
    fn test_container_runtime_add_list() {
        let mut rt = ContainerRuntime::new();
        rt.add_container(create_test_container("test1", argv(&["echo"])));

        let everything = rt.list_containers(true);
        assert_eq!(everything.len(), 1);
        assert_eq!(everything[0].id, "test1");

        // Nothing has been started, so the running-only view is empty.
        assert!(rt.list_containers(false).is_empty());
    }

    #[test]
    fn test_container_runtime_add_multiple() {
        let ids = ["c1", "c2", "c3"];
        let mut rt = ContainerRuntime::new();
        for id in ids {
            rt.add_container(create_test_container(id, argv(&["echo"])));
        }

        let listed = rt.list_containers(true);
        assert_eq!(listed.len(), 3);
        for id in ids {
            assert!(listed.iter().any(|c| c.id == id));
        }
    }

    #[test]
    fn test_container_runtime_get_container() {
        let mut rt = ContainerRuntime::new();
        rt.add_container(create_test_container("test1", argv(&["echo"])));

        let found = rt.get_container("test1");
        assert!(found.is_some());
        assert_eq!(found.unwrap().id, "test1");

        assert!(rt.get_container("nonexistent").is_none());
    }

    #[test]
    fn test_container_runtime_get_container_mut() {
        let mut rt = ContainerRuntime::new();
        rt.add_container(create_test_container("test1", argv(&["echo"])));

        rt.get_container_mut("test1").unwrap().name = "modified-name".to_string();

        assert_eq!(rt.get_container("test1").unwrap().name, "modified-name");
    }

    #[tokio::test]
    async fn test_container_lifecycle() {
        let id = "lifecycle-test";
        let mut rt = ContainerRuntime::new();
        rt.add_container(create_test_container(id, argv(&["echo", "hello"])));

        rt.start_container(id).await.unwrap();
        assert_eq!(rt.get_container(id).unwrap().state, ContainerState::Running);

        let code = rt.wait_container(id).await.unwrap();
        assert_eq!(code, 0);
        assert_eq!(rt.get_container(id).unwrap().state, ContainerState::Stopped);

        rt.remove_container(id, false).await.unwrap();
        assert!(rt.get_container(id).is_none());
    }

    #[tokio::test]
    async fn test_start_container_sets_pid() {
        let id = "pid-test";
        let mut rt = ContainerRuntime::new();
        rt.add_container(create_test_container(id, argv(&["sleep", "0.1"])));

        rt.start_container(id).await.unwrap();
        let pid = rt.get_container(id).unwrap().pid;
        assert!(pid.is_some());
        assert!(pid.unwrap() > 0);

        let _ = rt.wait_container(id).await;
    }

    #[tokio::test]
    async fn test_start_nonexistent_container() {
        let mut rt = ContainerRuntime::new();
        assert_err_contains(rt.start_container("nonexistent").await, "not found");
    }

    #[tokio::test]
    async fn test_start_already_running_container() {
        let id = "double-start";
        let mut rt = ContainerRuntime::new();
        rt.add_container(create_test_container(id, argv(&["sleep", "1"])));

        rt.start_container(id).await.unwrap();
        assert_err_contains(rt.start_container(id).await, "already running");

        let _ = rt.stop_container(id, 1).await;
    }

    #[tokio::test]
    async fn test_start_container_with_no_command() {
        let mut rt = ContainerRuntime::new();
        rt.add_container(create_test_container("empty-cmd", Vec::new()));
        assert_err_contains(rt.start_container("empty-cmd").await, "no command");
    }

    #[tokio::test]
    async fn test_container_with_nonzero_exit_code() {
        let id = "exit-code-test";
        let mut rt = ContainerRuntime::new();
        rt.add_container(create_test_container(id, argv(&["sh", "-c", "exit 42"])));

        rt.start_container(id).await.unwrap();
        let code = rt.wait_container(id).await.unwrap();
        assert_eq!(code, 42);
        assert_eq!(rt.get_container(id).unwrap().exit_code, Some(42));
    }

    #[tokio::test]
    async fn test_container_with_environment_variables() {
        let id = "env-test";
        let mut rt = ContainerRuntime::new();
        rt.add_container(create_test_container_with_env(
            id,
            argv(&["sh", "-c", "exit $((MY_VAR + 10))"]),
            vec![("MY_VAR".to_string(), "5".to_string())],
        ));

        rt.start_container(id).await.unwrap();
        let code = rt.wait_container(id).await.unwrap();
        assert_eq!(code, 15);
    }

    #[tokio::test]
    async fn test_stop_running_container() {
        let id = "stop-test";
        let mut rt = ContainerRuntime::new();
        rt.add_container(create_test_container(id, argv(&["sleep", "60"])));

        rt.start_container(id).await.unwrap();
        assert_eq!(rt.get_container(id).unwrap().state, ContainerState::Running);

        rt.stop_container(id, 5).await.unwrap();
        let stopped = rt.get_container(id).unwrap();
        assert_eq!(stopped.state, ContainerState::Stopped);
        assert!(stopped.pid.is_none());
    }

    #[tokio::test]
    async fn test_stop_nonexistent_container() {
        let mut rt = ContainerRuntime::new();
        assert_err_contains(rt.stop_container("nonexistent", 5).await, "not found");
    }

    #[tokio::test]
    async fn test_stop_not_running_container() {
        let mut rt = ContainerRuntime::new();
        rt.add_container(create_test_container("not-running", argv(&["echo"])));
        assert_err_contains(rt.stop_container("not-running", 5).await, "not running");
    }

    #[tokio::test]
    async fn test_remove_stopped_container() {
        let id = "remove-test";
        let mut rt = ContainerRuntime::new();
        rt.add_container(create_test_container(id, argv(&["echo"])));

        rt.start_container(id).await.unwrap();
        let _ = rt.wait_container(id).await;

        rt.remove_container(id, false).await.unwrap();
        assert!(rt.get_container(id).is_none());
    }

    #[tokio::test]
    async fn test_remove_running_container_without_force() {
        let id = "force-remove";
        let mut rt = ContainerRuntime::new();
        rt.add_container(create_test_container(id, argv(&["sleep", "60"])));

        rt.start_container(id).await.unwrap();
        assert_err_contains(rt.remove_container(id, false).await, "running");

        let _ = rt.stop_container(id, 1).await;
    }

    #[tokio::test]
    async fn test_remove_running_container_with_force() {
        let id = "force-remove";
        let mut rt = ContainerRuntime::new();
        rt.add_container(create_test_container(id, argv(&["sleep", "60"])));

        rt.start_container(id).await.unwrap();
        rt.remove_container(id, true).await.unwrap();
        assert!(rt.get_container(id).is_none());
    }

    #[tokio::test]
    async fn test_remove_nonexistent_container() {
        let mut rt = ContainerRuntime::new();
        assert_err_contains(
            rt.remove_container("nonexistent", false).await,
            "not found",
        );
    }

    #[tokio::test]
    async fn test_wait_already_stopped_container() {
        let mut rt = ContainerRuntime::new();
        let mut handle = create_test_container("wait-stopped", argv(&["echo"]));
        handle.state = ContainerState::Stopped;
        handle.exit_code = Some(123);
        rt.add_container(handle);

        let code = rt.wait_container("wait-stopped").await.unwrap();
        assert_eq!(code, 123);
    }

    #[tokio::test]
    async fn test_wait_nonexistent_container() {
        let mut rt = ContainerRuntime::new();
        assert_err_contains(rt.wait_container("nonexistent").await, "not found");
    }

    #[tokio::test]
    async fn test_wait_created_container() {
        let mut rt = ContainerRuntime::new();
        rt.add_container(create_test_container("wait-created", argv(&["echo"])));
        assert_err_contains(rt.wait_container("wait-created").await, "not running");
    }

    #[tokio::test]
    async fn test_list_containers_filter_running() {
        let mut rt = ContainerRuntime::new();
        rt.add_container(create_test_container("running", argv(&["sleep", "60"])));
        rt.add_container(create_test_container("created", argv(&["echo"])));

        rt.start_container("running").await.unwrap();

        assert_eq!(rt.list_containers(true).len(), 2);
        let only_running = rt.list_containers(false);
        assert_eq!(only_running.len(), 1);
        assert_eq!(only_running[0].id, "running");

        let _ = rt.stop_container("running", 1).await;
    }

    #[test]
    fn test_container_handle_clone() {
        let original = create_test_container("clone-test", argv(&["echo"]));
        let copy = original.clone();
        assert_eq!(copy.id, original.id);
        assert_eq!(copy.name, original.name);
        assert_eq!(copy.image, original.image);
        assert_eq!(copy.command, original.command);
        assert_eq!(copy.state, original.state);
    }
}