use crate::{
pelagos_state,
state::AppState,
types::{ExecCreateBody, ExecSession, ExecStartBody},
};
use axum::{
body::Body,
extract::{Path, Request, State},
http::{Response, StatusCode},
response::IntoResponse,
Json,
};
use hyper_util::rt::TokioIo;
use serde_json::{json, Value};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use uuid::Uuid;
pub async fn create(
Path(container_id): Path<String>,
State(state): State<AppState>,
body: axum::body::Bytes,
) -> (StatusCode, Json<Value>) {
let body: ExecCreateBody = serde_json::from_slice(&body).unwrap_or_default();
if pelagos_state::read_state(&container_id).is_err() {
return (
StatusCode::NOT_FOUND,
Json(json!({"message": format!("container '{}' not found", container_id)})),
);
}
let exec_id = Uuid::new_v4().to_string();
let session = ExecSession {
container_name: container_id.clone(),
cmd: body.cmd,
tty: body.tty.unwrap_or(false),
env: body.env.unwrap_or_default(),
working_dir: body.working_dir,
user: body.user,
};
state.add_exec(exec_id.clone(), session).await;
log::debug!("registered exec {} for {}", exec_id, container_id);
(StatusCode::CREATED, Json(json!({"Id": exec_id})))
}
pub async fn start(
State(state): State<AppState>,
Path(exec_id): Path<String>,
req: Request,
) -> Response<Body> {
let (mut parts, body) = req.into_parts();
let body_bytes = axum::body::to_bytes(body, 4096).await.unwrap_or_default();
let exec_body: ExecStartBody = serde_json::from_slice(&body_bytes).unwrap_or_default();
let session = match state.get_exec(&exec_id).await {
Some(s) => s,
None => {
return (
StatusCode::NOT_FOUND,
Json(json!({"message": format!("exec '{}' not found", exec_id)})),
)
.into_response()
}
};
log::info!(
"exec {} on {}: {:?} detach={:?} tty={}",
exec_id,
session.container_name,
session.cmd,
exec_body.detach,
session.tty,
);
if exec_body.detach.unwrap_or(false) {
let bin = state.pelagos_bin().to_string();
let state2 = state.clone();
let id2 = exec_id.clone();
tokio::spawn(async move {
let code = run_exec_subprocess(&bin, &session).await.unwrap_or(-1);
state2.complete_exec(id2, code).await;
});
return StatusCode::OK.into_response();
}
let on_upgrade = parts.extensions.remove::<hyper::upgrade::OnUpgrade>();
let bin = state.pelagos_bin().to_string();
let state2 = state.clone();
let id2 = exec_id.clone();
if let Some(on_upgrade) = on_upgrade {
let session2 = session.clone();
tokio::spawn(async move {
match on_upgrade.await {
Ok(upgraded) => {
let io = TokioIo::new(upgraded);
let code = run_exec_on_io(io, &session2, &bin).await;
state2.complete_exec(id2, code).await;
}
Err(e) => {
log::error!("upgrade error for exec {}: {}", id2, e);
state2.complete_exec(id2, -1).await;
}
}
});
Response::builder()
.status(StatusCode::SWITCHING_PROTOCOLS)
.header("Upgrade", "tcp")
.header("Connection", "Upgrade")
.body(Body::empty())
.unwrap()
} else {
let code = run_exec_subprocess(&bin, &session).await.unwrap_or(-1);
state.complete_exec(exec_id, code).await;
StatusCode::OK.into_response()
}
}
pub async fn inspect(
Path(exec_id): Path<String>,
State(state): State<AppState>,
) -> (StatusCode, Json<Value>) {
if let Some(exit_code) = state.get_completed_exec(&exec_id).await {
return (
StatusCode::OK,
Json(json!({
"ID": exec_id,
"Running": false,
"ExitCode": exit_code,
"ProcessConfig": {"entrypoint": "", "arguments": []}
})),
);
}
if let Some(s) = state.get_exec(&exec_id).await {
return (
StatusCode::OK,
Json(json!({
"ID": exec_id,
"ContainerID": s.container_name,
"Running": true,
"ExitCode": null,
"ProcessConfig": {
"entrypoint": s.cmd.first().cloned().unwrap_or_default(),
"arguments": s.cmd.get(1..).unwrap_or(&[]).to_vec()
}
})),
);
}
(
StatusCode::NOT_FOUND,
Json(json!({"message": "exec not found"})),
)
}
async fn run_exec_subprocess(bin: &str, session: &ExecSession) -> Result<i64, String> {
let out = build_exec_command(bin, session)
.output()
.await
.map_err(|e| e.to_string())?;
Ok(out.status.code().unwrap_or(-1) as i64)
}
async fn run_exec_on_io<IO>(io: IO, session: &ExecSession, bin: &str) -> i64
where
IO: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
let mut child = match build_exec_command(bin, session)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.stdin(std::process::Stdio::null())
.spawn()
{
Ok(c) => c,
Err(e) => {
log::error!("exec spawn: {}", e);
return -1;
}
};
let mut stdout = child.stdout.take().expect("stdout piped");
let mut stderr = child.stderr.take().expect("stderr piped");
let (_reader, mut writer) = tokio::io::split(io);
let tty = session.tty;
let mut buf_out = vec![0u8; 4096];
let mut buf_err = vec![0u8; 4096];
let mut stdout_done = false;
let mut stderr_done = false;
while !stdout_done || !stderr_done {
tokio::select! {
n = stdout.read(&mut buf_out), if !stdout_done => {
match n {
Ok(0) | Err(_) => stdout_done = true,
Ok(n) => {
let data = &buf_out[..n];
if tty {
let _ = writer.write_all(data).await;
} else {
let _ = write_docker_frame(&mut writer, 1, data).await;
}
}
}
}
n = stderr.read(&mut buf_err), if !stderr_done => {
match n {
Ok(0) | Err(_) => stderr_done = true,
Ok(n) => {
let data = &buf_err[..n];
if tty {
let _ = writer.write_all(data).await;
} else {
let _ = write_docker_frame(&mut writer, 2, data).await;
}
}
}
}
}
}
let status = child.wait().await.ok();
status.and_then(|s| s.code()).unwrap_or(-1) as i64
}
fn build_exec_command(bin: &str, session: &ExecSession) -> tokio::process::Command {
let mut cmd = tokio::process::Command::new(bin);
cmd.arg("exec");
if let Some(wd) = &session.working_dir {
cmd.arg("--workdir").arg(wd);
}
if let Some(user) = &session.user {
cmd.arg("--user").arg(user);
}
for e in &session.env {
cmd.arg("--env").arg(e);
}
cmd.arg("--").arg(&session.container_name);
for arg in &session.cmd {
cmd.arg(arg);
}
cmd
}
async fn write_docker_frame<W: tokio::io::AsyncWrite + Unpin>(
w: &mut W,
stream_type: u8,
data: &[u8],
) -> std::io::Result<()> {
let len = data.len() as u32;
let header = [
stream_type,
0,
0,
0,
(len >> 24) as u8,
(len >> 16) as u8,
(len >> 8) as u8,
len as u8,
];
w.write_all(&header).await?;
w.write_all(data).await
}