use crate::events::{Event, Stats};
use crate::specs::{LinuxResources, Process};
use chrono::{DateTime, Utc};
use futures::ready;
use futures::task::{Context, Poll};
use log::warn;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use std::collections::HashMap;
use std::fs::File;
use std::io::Write;
use std::iter::FromIterator;
use std::path::PathBuf;
use std::process::Stdio;
use std::time::Duration;
use std::{env, fs, io};
use tokio::io::AsyncBufReadExt;
use tokio::io::BufReader;
use tokio::macros::support::Pin;
use tokio::process::Child;
use tokio::process::Command;
use tokio::stream::Stream;
use tokio::stream::StreamExt;
use tokio::time::timeout;
use uuid::Uuid;
pub mod console;
pub mod events;
pub mod specs;
pub type TopResults = Vec<HashMap<String, String>>;
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Unable to extract test files: {}", source))]
BundleExtractError { source: io::Error },
#[snafu(display("Invalid path: {}", source))]
InvalidPathError { source: io::Error },
#[snafu(display("Json deserialization error: {}", source))]
JsonDeserializationError { source: serde_json::error::Error },
#[snafu(display("Missing container statistics"))]
MissingContainerStatsError {},
#[snafu(display("Unable to spawn process: {}", source))]
ProcessSpawnError { source: io::Error },
#[snafu(display("Runc command error: {}", source))]
RuncCommandError { source: io::Error },
#[snafu(display("Runc command failed, stdout: \"{}\", stderr: \"{}\"", stdout, stderr))]
RuncCommandFailedError { stdout: String, stderr: String },
#[snafu(display("Runc command timed out: {}", source))]
RuncCommandTimeoutError { source: tokio::time::Elapsed },
#[snafu(display("Unable to parse runc version"))]
RuncInvalidVersionError {},
#[snafu(display("Unable to locate the runc binary"))]
RuncNotFoundError {},
#[snafu(display("Failed to create spec file: {}", source))]
SpecFileCreationError { source: io::Error },
#[snafu(display("Failed to cleanup spec file: {}", source))]
SpecFileCleanupError { source: io::Error },
#[snafu(display("Failed to find valid path for spec file"))]
SpecFilePathError {},
#[snafu(display("Top command is missing a pid header"))]
TopMissingPidHeaderError {},
#[snafu(display("Top command returned an empty response"))]
TopShortResponseError {},
#[snafu(display("Unix socket connection error: {}", source))]
UnixSocketConnectError { source: io::Error },
#[snafu(display("Unable to bind to unix socket: {}", source))]
UnixSocketOpenError { source: io::Error },
#[snafu(display("Unix socket failed to receive pty"))]
UnixSocketReceiveMessageError {},
#[snafu(display("Unix socket unexpectedly closed"))]
UnixSocketUnexpectedCloseError {},
}
#[derive(Debug, Serialize, Deserialize)]
pub struct Container {
pub id: Option<String>,
pub pid: Option<usize>,
pub status: Option<String>,
pub bundle: Option<String>,
pub rootfs: Option<String>,
pub created: Option<DateTime<Utc>>,
pub annotations: Option<HashMap<String, String>>,
}
#[derive(Debug, Clone)]
pub struct Version {
pub runc_version: Option<String>,
pub spec_version: Option<String>,
pub commit: Option<String>,
}
#[derive(Debug, Clone)]
pub enum RuncLogFormat {
Json,
Text,
}
#[derive(Debug, Clone, Default)]
pub struct RuncConfiguration {
pub command: Option<PathBuf>,
pub timeout: Option<Duration>,
pub root: Option<PathBuf>,
pub debug: bool,
pub log: Option<PathBuf>,
pub log_format: Option<RuncLogFormat>,
pub systemd_cgroup: bool,
pub rootless: Option<bool>,
}
pub struct Runc {
command: PathBuf,
timeout: Duration,
root: Option<PathBuf>,
debug: bool,
log: Option<PathBuf>,
log_format: Option<RuncLogFormat>,
systemd_cgroup: bool,
rootless: Option<bool>,
}
trait Args {
fn args(&self) -> Result<Vec<String>, Error>;
}
impl Runc {
pub fn new(config: RuncConfiguration) -> Result<Self, Error> {
let command = config
.command
.or_else(Self::runc_binary)
.context(RuncNotFoundError {})?;
let timeout = config
.timeout
.or(Some(Duration::from_millis(5000)))
.unwrap();
Ok(Self {
command,
timeout,
root: config.root,
debug: config.debug,
log: config.log,
log_format: config.log_format,
systemd_cgroup: config.systemd_cgroup,
rootless: config.rootless,
})
}
pub async fn create(
&self,
id: &str,
bundle: &PathBuf,
opts: Option<&CreateOpts>,
) -> Result<(), Error> {
let mut args = vec![String::from("create")];
Self::append_opts(&mut args, opts.map(|opts| opts as &dyn Args))?;
let bundle: String = bundle
.canonicalize()
.context(InvalidPathError {})?
.to_string_lossy()
.parse()
.unwrap();
args.push(String::from("--bundle"));
args.push(bundle);
args.push(String::from(id));
self.command(&args, true).await.map(|_| ())
}
pub async fn delete(&self, id: &str, opts: Option<&DeleteOpts>) -> Result<(), Error> {
let mut args = vec![String::from("delete")];
Self::append_opts(&mut args, opts.map(|opts| opts as &dyn Args))?;
args.push(String::from(id));
self.command(&args, true).await.map(|_| ())
}
pub async fn events(&self, id: &str, interval: &Duration) -> Result<EventStream, Error> {
let args = vec![
String::from("events"),
format!("--interval={}s", interval.as_secs()),
String::from(id),
];
let console_stream = self.command_with_streaming_output(&args, false).await?;
Ok(EventStream::new(console_stream))
}
pub async fn exec(
&self,
id: &str,
spec: &Process,
opts: Option<&ExecOpts>,
) -> Result<(), Error> {
let temp_file = env::var_os("XDG_RUNTIME_DIR")
.and_then(
|temp_dir| match temp_dir.to_string_lossy().parse() as Result<String, _> {
Ok(temp_dir) => Some(PathBuf::from(format!(
"{}/runc-process-{}",
temp_dir,
Uuid::new_v4()
))),
Err(_) => None,
},
)
.context(SpecFilePathError {})?;
{
let spec_json = serde_json::to_string(spec).context(JsonDeserializationError {})?;
let mut f = File::create(temp_file.clone()).context(SpecFileCreationError {})?;
f.write(spec_json.as_bytes())
.context(SpecFileCreationError {})?;
f.flush().context(SpecFileCreationError {})?;
}
let temp_file: String = temp_file.to_string_lossy().parse().unwrap();
let mut args = vec![
String::from("exec"),
String::from("--process"),
temp_file.clone(),
];
Self::append_opts(&mut args, opts.map(|opts| opts as &dyn Args))?;
args.push(String::from(id));
let res = self.command(&args, true).await.map(|_| ());
fs::remove_file(temp_file).context(SpecFileCleanupError {})?;
res
}
pub async fn kill(&self, id: &str, sig: i32, opts: Option<&KillOpts>) -> Result<(), Error> {
let mut args = vec![String::from("kill")];
Self::append_opts(&mut args, opts.map(|opts| opts as &dyn Args))?;
args.push(String::from(id));
args.push(format!("{}", sig));
self.command(&args, true).await.map(|_| ())
}
pub async fn list(&self) -> Result<Vec<Container>, Error> {
let args = vec![String::from("list"), String::from("--format=json")];
let output = self.command(&args, false).await?;
let output = output.trim();
Ok(if output == "null" {
Vec::new()
} else {
serde_json::from_str(&output).context(JsonDeserializationError {})?
})
}
pub async fn pause(&self, id: &str) -> Result<(), Error> {
let args = vec![String::from("pause"), String::from(id)];
self.command(&args, true).await.map(|_| ())
}
pub async fn ps(&self, id: &str) -> Result<Vec<usize>, Error> {
let args = vec![
String::from("ps"),
String::from("--format=json"),
String::from(id),
];
let output = self.command(&args, false).await?;
let output = output.trim();
Ok(if output == "null" {
Vec::new()
} else {
serde_json::from_str(&output).context(JsonDeserializationError {})?
})
}
pub async fn resume(&self, id: &str) -> Result<(), Error> {
let args = vec![String::from("resume"), String::from(id)];
self.command(&args, true).await.map(|_| ())
}
pub async fn run(
&self,
id: &str,
bundle: &PathBuf,
opts: Option<&CreateOpts>,
) -> Result<(), Error> {
let mut args = vec![String::from("run")];
Self::append_opts(&mut args, opts.map(|opts| opts as &dyn Args))?;
let bundle: String = bundle
.canonicalize()
.context(InvalidPathError {})?
.to_string_lossy()
.parse()
.unwrap();
args.push(String::from("--bundle"));
args.push(bundle);
args.push(String::from(id));
self.command(&args, true).await.map(|_| ())
}
pub async fn start(&self, id: &str) -> Result<(), Error> {
let args = vec![String::from("start"), String::from(id)];
self.command(&args, true).await.map(|_| ())
}
pub async fn state(&self, id: &str) -> Result<Container, Error> {
let args = vec![String::from("state"), String::from(id)];
let output = self.command(&args, true).await?;
Ok(serde_json::from_str(&output).context(JsonDeserializationError {})?)
}
pub async fn stats(&self, id: &str) -> Result<Stats, Error> {
let args = vec![
String::from("events"),
String::from("--stats"),
String::from(id),
];
let output = self.command(&args, true).await?;
let ev: Event = serde_json::from_str(&output).context(JsonDeserializationError {})?;
ensure!(ev.stats.is_some(), MissingContainerStatsError {});
Ok(ev.stats.unwrap())
}
pub async fn top(&self, id: &str, ps_options: Option<&str>) -> Result<TopResults, Error> {
let mut args = vec![
String::from("ps"),
String::from("--format"),
String::from("table"),
String::from(id),
];
if let Some(ps_options) = ps_options {
args.push(String::from(ps_options));
}
let output = self.command(&args, false).await?;
let lines: Vec<&str> = output.split('\n').collect();
ensure!(!lines.is_empty(), TopShortResponseError {});
let headers: Vec<String> = lines[0].split_whitespace().map(String::from).collect();
let pid_index = headers.iter().position(|x| x == "PID");
ensure!(pid_index.is_some(), TopMissingPidHeaderError {});
let mut processes: TopResults = Vec::new();
for line in lines.iter().skip(1) {
if line.is_empty() {
continue;
}
let fields: Vec<&str> = line.split_whitespace().collect();
if fields[pid_index.unwrap()] == "-" {
continue;
}
let mut process: Vec<&str> = Vec::from(&fields[..headers.len() - 1]);
let process_field = &fields[headers.len() - 1..].join(" ");
process.push(process_field);
let mut process_map: HashMap<String, String> = HashMap::new();
for j in 0..headers.len() {
if let Some(key) = headers.get(j) {
if let Some(&value) = process.get(j) {
process_map.insert(key.clone(), String::from(value));
}
}
}
processes.push(process_map);
}
Ok(processes)
}
pub async fn update(&self, id: &str, resources: &LinuxResources) -> Result<(), Error> {
let temp_file = env::var_os("XDG_RUNTIME_DIR")
.and_then(
|temp_dir| match temp_dir.to_string_lossy().parse() as Result<String, _> {
Ok(temp_dir) => Some(PathBuf::from(format!(
"{}/runc-process-{}",
temp_dir,
Uuid::new_v4()
))),
Err(_) => None,
},
)
.context(SpecFilePathError {})?;
{
let spec_json =
serde_json::to_string(resources).context(JsonDeserializationError {})?;
let mut f = File::create(temp_file.clone()).context(SpecFileCreationError {})?;
f.write(spec_json.as_bytes())
.context(SpecFileCreationError {})?;
f.flush().context(SpecFileCreationError {})?;
}
let temp_file: String = temp_file.to_string_lossy().parse().unwrap();
let args = vec![
String::from("update"),
String::from("--resources"),
temp_file.clone(),
String::from(id),
];
let res = self.command(&args, true).await.map(|_| ());
fs::remove_file(temp_file).context(SpecFileCleanupError {})?;
res
}
pub async fn version(&self) -> Result<Version, Error> {
let output = self.command(&[String::from("--version")], false).await?;
let mut version = Version {
runc_version: None,
spec_version: None,
commit: None,
};
for line in output.split('\n').take(3).map(|line| line.trim()) {
if line.contains("version") {
version.runc_version = Some(
line.split("version ")
.nth(1)
.map(String::from)
.context(RuncInvalidVersionError {})?,
);
} else if line.contains("spec") {
version.spec_version = Some(
line.split(": ")
.nth(1)
.map(String::from)
.context(RuncInvalidVersionError {})?,
);
} else if line.contains("commit") {
version.commit = line.split(": ").nth(1).map(String::from);
}
}
Ok(version)
}
async fn command(&self, args: &[String], combined_output: bool) -> Result<String, Error> {
let args = self.concat_args(args)?;
let process = Command::new(&self.command)
.args(&args.clone())
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.context(ProcessSpawnError {})?;
let result = timeout(self.timeout, process.wait_with_output())
.await
.context(RuncCommandTimeoutError {})?
.context(RuncCommandError {})?;
let stdout = String::from_utf8(result.stdout.clone()).unwrap();
let stderr = String::from_utf8(result.stderr.clone()).unwrap();
ensure!(
result.status.success(),
RuncCommandFailedError {
stdout: stdout,
stderr: stderr
}
);
Ok(if combined_output {
let mut combined = String::new();
combined.push_str(&stdout);
combined.push_str(&stderr);
combined
} else {
stdout
})
}
async fn command_with_streaming_output(
&self,
args: &[String],
combined_output: bool,
) -> Result<ConsoleStream, Error> {
let args = self.concat_args(args)?;
let process = Command::new(&self.command)
.args(&args)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.context(ProcessSpawnError {})?;
ConsoleStream::new(process, combined_output)
}
fn concat_args(&self, args: &[String]) -> Result<Vec<String>, Error> {
let mut combined = self.args()?;
combined.append(&mut Vec::from_iter(args.iter().cloned().map(String::from)));
Ok(combined)
}
fn append_opts(args: &mut Vec<String>, opts: Option<&dyn Args>) -> Result<(), Error> {
if let Some(opts) = opts {
args.append(&mut opts.args()?);
}
Ok(())
}
fn runc_binary() -> Option<PathBuf> {
env::var_os("PATH").and_then(|paths| {
env::split_paths(&paths)
.filter_map(|dir| {
let full_path = dir.join("runc");
if full_path.is_file() {
Some(full_path)
} else {
None
}
})
.next()
})
}
}
impl Args for Runc {
fn args(&self) -> Result<Vec<String>, Error> {
let mut args: Vec<String> = Vec::new();
if let Some(root) = self.root.clone() {
args.push(String::from("--root"));
args.push(
root.canonicalize()
.context(InvalidPathError {})?
.to_string_lossy()
.parse()
.unwrap(),
);
}
if self.debug {
args.push(String::from("--debug"));
}
if let Some(log) = self.log.clone() {
args.push(String::from("--log"));
args.push(log.to_string_lossy().parse().unwrap());
}
if let Some(log_format) = self.log_format.clone() {
args.push(String::from("--log-format"));
args.push(String::from(match log_format {
RuncLogFormat::Json => "json",
RuncLogFormat::Text => "text",
}))
}
if self.systemd_cgroup {
args.push(String::from("--systemd-cgroup"));
}
if let Some(rootless) = self.rootless {
args.push(format!("--rootless={}", rootless));
}
Ok(args)
}
}
#[cfg(test)]
impl Drop for Runc {
fn drop(&mut self) {
if let Some(root) = self.root.clone() {
if let Err(e) = fs::remove_dir_all(&root) {
warn!("failed to cleanup root directory: {}", e);
}
}
if let Some(system_runc) = Self::runc_binary() {
if system_runc != self.command {
if let Err(e) = fs::remove_file(&self.command) {
warn!("failed to remove runc binary: {}", e);
}
}
} else if let Err(e) = fs::remove_file(&self.command) {
warn!("failed to remove runc binary: {}", e);
}
}
}
#[derive(Debug, Clone)]
pub struct CreateOpts {
pub pid_file: Option<PathBuf>,
pub console_socket: Option<PathBuf>,
pub no_pivot: bool,
pub no_new_keyring: bool,
pub detach: bool,
}
impl Args for CreateOpts {
fn args(&self) -> Result<Vec<String>, Error> {
let mut args: Vec<String> = Vec::new();
if let Some(pid_file) = self.pid_file.clone() {
args.push(String::from("--pid-file"));
args.push(pid_file.to_string_lossy().parse().unwrap())
}
if let Some(console_socket) = self.console_socket.clone() {
args.push(String::from("--console-socket"));
args.push(
console_socket
.canonicalize()
.context(InvalidPathError {})?
.to_string_lossy()
.parse()
.unwrap(),
)
}
if self.no_pivot {
args.push(String::from("--no-pivot"));
}
if self.no_new_keyring {
args.push(String::from("--no-new-keyring"));
}
if self.detach {
args.push(String::from("--detach"));
}
Ok(args)
}
}
#[derive(Debug, Clone)]
pub struct DeleteOpts {
pub force: bool,
}
impl Args for DeleteOpts {
fn args(&self) -> Result<Vec<String>, Error> {
let mut args: Vec<String> = Vec::new();
if self.force {
args.push(String::from("--force"));
}
Ok(args)
}
}
#[derive(Debug, Clone)]
pub struct ExecOpts {
pub pid_file: Option<PathBuf>,
pub console_socket: Option<PathBuf>,
pub detach: bool,
}
impl Args for ExecOpts {
fn args(&self) -> Result<Vec<String>, Error> {
let mut args: Vec<String> = Vec::new();
if let Some(console_socket) = self.console_socket.clone() {
args.push(String::from("--console-socket"));
args.push(
console_socket
.canonicalize()
.context(InvalidPathError {})?
.to_string_lossy()
.parse()
.unwrap(),
);
}
if self.detach {
args.push(String::from("--detach"));
}
if let Some(pid_file) = self.pid_file.clone() {
args.push(String::from("--pid-file"));
args.push(pid_file.to_string_lossy().parse().unwrap());
}
Ok(args)
}
}
#[derive(Debug, Clone)]
pub struct KillOpts {
pub all: bool,
}
impl Args for KillOpts {
fn args(&self) -> Result<Vec<String>, Error> {
let mut args: Vec<String> = Vec::new();
if self.all {
args.push(String::from("--all"))
}
Ok(args)
}
}
pub struct EventStream {
inner: ConsoleStream,
}
impl EventStream {
fn new(inner: ConsoleStream) -> Self {
Self { inner }
}
}
impl Stream for EventStream {
type Item = Result<Event, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Some(Ok(line)) = ready!(Pin::new(&mut self.inner).poll_next(cx)) {
Poll::Ready(Some(
serde_json::from_str(&line).context(JsonDeserializationError {}),
))
} else {
Poll::Ready(None)
}
}
}
struct ConsoleStream {
process: Child,
inner: Pin<Box<dyn Stream<Item = tokio::io::Result<String>>>>,
}
impl ConsoleStream {
fn new(mut process: Child, combined_output: bool) -> Result<Self, Error> {
let stdout = BufReader::new(process.stdout.take().unwrap()).lines();
let inner: Pin<Box<dyn Stream<Item = tokio::io::Result<String>>>> = if combined_output {
let stderr = BufReader::new(process.stderr.take().unwrap()).lines();
Box::pin(stdout.merge(stderr))
} else {
Box::pin(stdout)
};
Ok(Self { process, inner })
}
}
impl Stream for ConsoleStream {
type Item = tokio::io::Result<String>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Some(line) = ready!(self.inner.as_mut().poll_next(cx)) {
Poll::Ready(Some(line))
} else {
Poll::Ready(None)
}
}
}
impl Drop for ConsoleStream {
fn drop(&mut self) {
if let Err(e) = self.process.kill() {
warn!("failed to kill container: {}", e);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::console::ReceivePtyMaster;
use crate::specs::{LinuxCapabilities, LinuxMemory, POSIXRlimit, User};
use flate2::read::GzDecoder;
use futures::executor::block_on;
use futures::StreamExt;
use log::error;
use tar::Archive;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::runtime::Runtime;
use tokio::time::delay_for;
#[test]
fn test_create() {
let runc_id = format!("{}", Uuid::new_v4());
let runc_path = env::temp_dir().join(&runc_id).join("runc.amd64");
let runc_root =
PathBuf::from(env::var_os("XDG_RUNTIME_DIR").expect("expected temporary path"))
.join("rust-runc")
.join(&runc_id);
fs::create_dir_all(&runc_root).expect("unable to create runc root");
extract_tarball(
&PathBuf::from("test_fixture/runc_v1.0.0-rc10.tar.gz"),
&env::temp_dir().join(&runc_id),
)
.expect("unable to extract runc");
let mut config: RuncConfiguration = Default::default();
config.command = Some(runc_path);
config.root = Some(runc_root);
let runc = Runc::new(config).expect("Unable to create runc instance");
let task = async move {
let id = format!("{}", Uuid::new_v4());
let console_socket = env::temp_dir().join(&id).with_extension("console");
let receive_pty_master = ReceivePtyMaster::new(&console_socket)?;
tokio::spawn(async move {
match receive_pty_master.receive().await {
Ok(pty_master) => {
Box::leak(Box::new(pty_master));
}
Err(err) => {
error!("Receive PTY master error: {}", err);
}
}
});
let bundle = env::temp_dir().join(&id);
extract_tarball(&PathBuf::from("test_fixture/busybox.tar.gz"), &bundle)
.context(BundleExtractError {})?;
runc.create(
&id,
&bundle,
Some(&CreateOpts {
pid_file: None,
console_socket: Some(console_socket),
no_pivot: false,
no_new_keyring: false,
detach: false,
}),
)
.await?;
runc.state(&id).await
};
let mut runtime = Runtime::new().expect("unable to create runtime");
let container = runtime.block_on(task).expect("test failed");
assert_eq!(container.status, Some(String::from("created")));
}
#[test]
fn test_delete() {
let runc_id = format!("{}", Uuid::new_v4());
let runc_path = env::temp_dir().join(&runc_id).join("runc.amd64");
let runc_root =
PathBuf::from(env::var_os("XDG_RUNTIME_DIR").expect("expected temporary path"))
.join("rust-runc")
.join(&runc_id);
fs::create_dir_all(&runc_root).expect("unable to create runc root");
extract_tarball(
&PathBuf::from("test_fixture/runc_v1.0.0-rc10.tar.gz"),
&env::temp_dir().join(&runc_id),
)
.expect("unable to extract runc");
let mut config: RuncConfiguration = Default::default();
config.command = Some(runc_path.clone());
config.root = Some(runc_root.clone());
let runc = Runc::new(config).expect("Unable to create runc instance");
let task = async move {
let container = ManagedContainer::new(
&runc_path,
&runc_root,
&PathBuf::from("test_fixture/busybox.tar.gz"),
)
.await?;
runc.kill(&container.id, libc::SIGKILL, None).await?;
delay_for(Duration::from_millis(500)).await;
runc.delete(&container.id, None).await?;
runc.list().await
};
let mut runtime = Runtime::new().expect("unable to create runtime");
let containers = runtime.block_on(task).expect("test failed");
assert!(containers.is_empty());
}
#[test]
fn test_events() {
let runc_id = format!("{}", Uuid::new_v4());
let runc_path = env::temp_dir().join(&runc_id).join("runc.amd64");
let runc_root =
PathBuf::from(env::var_os("XDG_RUNTIME_DIR").expect("expected temporary path"))
.join("rust-runc")
.join(&runc_id);
fs::create_dir_all(&runc_root).expect("unable to create runc root");
extract_tarball(
&PathBuf::from("test_fixture/runc_v1.0.0-rc10.tar.gz"),
&env::temp_dir().join(&runc_id),
)
.expect("unable to extract runc");
let mut config: RuncConfiguration = Default::default();
config.command = Some(runc_path.clone());
config.root = Some(runc_root.clone());
let runc = Runc::new(config).expect("Unable to create runc instance");
let task = async move {
let container = ManagedContainer::new(
&runc_path,
&runc_root,
&PathBuf::from("test_fixture/busybox.tar.gz"),
)
.await?;
let events = runc.events(&container.id, &Duration::from_secs(1)).await?;
Ok::<_, Error>(
events
.take(3)
.map(|event| event.unwrap())
.collect::<Vec<Event>>()
.await,
)
};
let mut runtime = Runtime::new().expect("unable to create runtime");
let events = runtime.block_on(task).expect("test failed");
assert_eq!(events.len(), 3);
for event in events.iter() {
if let Some(stats) = event.stats.clone() {
if let Some(memory) = stats.memory.clone() {
if let Some(usage) = memory.usage {
if let Some(usage) = usage.usage {
if usage > 0 {
continue;
}
}
}
}
}
panic!("event is missing memory usage statistics");
}
}
#[test]
fn test_exec() {
let runc_id = format!("{}", Uuid::new_v4());
let runc_path = env::temp_dir().join(&runc_id).join("runc.amd64");
let runc_root =
PathBuf::from(env::var_os("XDG_RUNTIME_DIR").expect("expected temporary path"))
.join("rust-runc")
.join(&runc_id);
fs::create_dir_all(&runc_root).expect("unable to create runc root");
extract_tarball(
&PathBuf::from("test_fixture/runc_v1.0.0-rc10.tar.gz"),
&env::temp_dir().join(&runc_id),
)
.expect("unable to extract runc");
let mut config: RuncConfiguration = Default::default();
config.command = Some(runc_path);
config.root = Some(runc_root);
let runc = Runc::new(config).expect("Unable to create runc instance");
let task = async move {
let id = format!("{}", Uuid::new_v4());
let console_socket = env::temp_dir().join(&id).with_extension("console");
let receive_pty_master = ReceivePtyMaster::new(&console_socket)?;
tokio::spawn(async move {
match receive_pty_master.receive().await {
Ok(pty_master) => {
Box::leak(Box::new(pty_master));
}
Err(err) => {
error!("Receive PTY master error: {}", err);
}
}
});
let additional_console_socket = env::temp_dir().join(&id).with_extension("console2");
let receive_additional_pty_master = ReceivePtyMaster::new(&additional_console_socket)?;
tokio::spawn(async move {
match receive_additional_pty_master.receive().await {
Ok(pty_master) => {
Box::leak(Box::new(pty_master));
}
Err(err) => {
error!("Receive additional PTY master error: {}", err);
}
}
});
let bundle = env::temp_dir().join(&id);
extract_tarball(&PathBuf::from("test_fixture/busybox.tar.gz"), &bundle)
.context(BundleExtractError {})?;
let capabilities = Some(vec![
String::from("CAP_AUDIT_WRITE"),
String::from("CAP_KILL"),
String::from("CAP_NET_BIND_SERVICE"),
]);
runc.create(
&id,
&bundle,
Some(&CreateOpts {
pid_file: None,
console_socket: Some(console_socket),
no_pivot: false,
no_new_keyring: false,
detach: false,
}),
)
.await?;
runc.exec(
&id,
&Process {
terminal: Some(true),
console_size: None,
user: Some(User {
uid: Some(0),
gid: Some(0),
additional_gids: None,
username: None,
}),
args: Some(vec![String::from("sleep"), String::from("10")]),
command_line: None,
env: Some(vec![
String::from(
"PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin",
),
String::from("TERM=xterm"),
]),
cwd: Some(String::from("/")),
capabilities: Some(LinuxCapabilities {
bounding: capabilities.clone(),
effective: capabilities.clone(),
inheritable: capabilities.clone(),
permitted: capabilities.clone(),
ambient: capabilities.clone(),
}),
rlimits: Some(vec![POSIXRlimit {
limit_type: Some(String::from("RLIMIT_NOFILE")),
hard: Some(1024),
soft: Some(1024),
}]),
no_new_privileges: Some(false),
app_armor_profile: None,
oom_score_adj: None,
selinux_label: None,
},
Some(&ExecOpts {
pid_file: Some(PathBuf::from("/tmp/bang.pid")),
console_socket: Some(additional_console_socket),
detach: true,
}),
)
.await?;
delay_for(Duration::from_millis(500)).await;
let processes = runc.top(&id, None).await?;
runc.kill(&id, libc::SIGKILL, None).await?;
Ok::<_, Error>(processes)
};
let mut runtime = Runtime::new().expect("unable to create runtime");
let processes = runtime.block_on(task).expect("test failed");
assert_ne!(
processes
.iter()
.find(|process| if let Some(cmd) = process.get("CMD") {
cmd == "sleep 10"
} else {
false
}),
None
);
}
#[test]
fn test_kill() {
let runc_id = format!("{}", Uuid::new_v4());
let runc_path = env::temp_dir().join(&runc_id).join("runc.amd64");
let runc_root =
PathBuf::from(env::var_os("XDG_RUNTIME_DIR").expect("expected temporary path"))
.join("rust-runc")
.join(&runc_id);
fs::create_dir_all(&runc_root).expect("unable to create runc root");
extract_tarball(
&PathBuf::from("test_fixture/runc_v1.0.0-rc10.tar.gz"),
&env::temp_dir().join(&runc_id),
)
.expect("unable to extract runc");
let mut config: RuncConfiguration = Default::default();
config.command = Some(runc_path.clone());
config.root = Some(runc_root.clone());
let runc = Runc::new(config).expect("Unable to create runc instance");
let task = async move {
let container = ManagedContainer::new(
&runc_path,
&runc_root,
&PathBuf::from("test_fixture/busybox.tar.gz"),
)
.await?;
runc.kill(&container.id, libc::SIGKILL, None).await?;
delay_for(Duration::from_millis(500)).await;
runc.state(&container.id).await
};
let mut runtime = Runtime::new().expect("unable to create runtime");
let state = runtime.block_on(task).expect("test failed");
assert_eq!(state.status, Some(String::from("stopped")));
}
#[test]
fn test_list() {
let runc_id = format!("{}", Uuid::new_v4());
let runc_path = env::temp_dir().join(&runc_id).join("runc.amd64");
let runc_root =
PathBuf::from(env::var_os("XDG_RUNTIME_DIR").expect("expected temporary path"))
.join("rust-runc")
.join(&runc_id);
fs::create_dir_all(&runc_root).expect("unable to create runc root");
extract_tarball(
&PathBuf::from("test_fixture/runc_v1.0.0-rc10.tar.gz"),
&env::temp_dir().join(&runc_id),
)
.expect("unable to extract runc");
let mut config: RuncConfiguration = Default::default();
config.command = Some(runc_path.clone());
config.root = Some(runc_root.clone());
let runc = Runc::new(config).expect("Unable to create runc instance");
let task = async move {
let container = ManagedContainer::new(
&runc_path,
&runc_root,
&PathBuf::from("test_fixture/busybox.tar.gz"),
)
.await
.unwrap();
let containers = runc.list().await.unwrap();
if containers.len() != 1 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"expected a single container",
));
}
if let Some(container_item) = containers.get(0) {
if let Some(id) = container_item.id.clone() {
if id == container.id {
return Ok(runc);
}
}
}
Err(io::Error::new(
io::ErrorKind::InvalidData,
"expected container to match",
))
};
let mut runtime = Runtime::new().expect("unable to create runtime");
runtime.block_on(task).expect("test failed");
}
#[test]
fn test_pause() {
let runc_id = format!("{}", Uuid::new_v4());
let runc_path = env::temp_dir().join(&runc_id).join("runc.amd64");
let runc_root =
PathBuf::from(env::var_os("XDG_RUNTIME_DIR").expect("expected temporary path"))
.join("rust-runc")
.join(&runc_id);
fs::create_dir_all(&runc_root).expect("unable to create runc root");
extract_tarball(
&PathBuf::from("test_fixture/runc_v1.0.0-rc10.tar.gz"),
&env::temp_dir().join(&runc_id),
)
.expect("unable to extract runc");
let mut config: RuncConfiguration = Default::default();
config.command = Some(runc_path.clone());
config.root = Some(runc_root.clone());
let runc = Runc::new(config).expect("Unable to create runc instance");
let task = async move {
let container = ManagedContainer::new(
&runc_path,
&runc_root,
&PathBuf::from("test_fixture/busybox.tar.gz"),
)
.await?;
runc.pause(&container.id).await?;
let container_state = runc.state(&container.id).await?;
runc.resume(&container.id).await?;
Ok::<_, Error>(container_state)
};
let mut runtime = Runtime::new().expect("unable to create runtime");
let container_state = runtime.block_on(task).expect("test failed");
assert_eq!(container_state.status, Some(String::from("paused")));
}
#[test]
fn test_ps() {
let runc_id = format!("{}", Uuid::new_v4());
let runc_path = env::temp_dir().join(&runc_id).join("runc.amd64");
let runc_root =
PathBuf::from(env::var_os("XDG_RUNTIME_DIR").expect("expected temporary path"))
.join("rust-runc")
.join(&runc_id);
fs::create_dir_all(&runc_root).expect("unable to create runc root");
extract_tarball(
&PathBuf::from("test_fixture/runc_v1.0.0-rc10.tar.gz"),
&env::temp_dir().join(&runc_id),
)
.expect("unable to extract runc");
let mut config: RuncConfiguration = Default::default();
config.command = Some(runc_path.clone());
config.root = Some(runc_root.clone());
let runc = Runc::new(config).expect("Unable to create runc instance");
let task = async move {
let container = ManagedContainer::new(
&runc_path,
&runc_root,
&PathBuf::from("test_fixture/busybox.tar.gz"),
)
.await
.unwrap();
delay_for(Duration::from_millis(100)).await;
let res = runc.ps(&container.id).await;
if let Err(err) = res {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("failed to run ps command: {}", err),
));
}
let processes = res.unwrap();
if processes.len() != 1 {
Err(io::Error::new(
io::ErrorKind::InvalidData,
"expected a single shell process",
))
} else if let Some(pid) = processes.get(0) {
if *pid > 0 && *pid < 32768 {
Ok::<_, io::Error>(runc)
} else {
Err(io::Error::new(
io::ErrorKind::InvalidData,
"invalid pid number",
))
}
} else {
Err(io::Error::new(io::ErrorKind::Other, ""))
}
};
let mut runtime = Runtime::new().expect("unable to create runtime");
runtime.block_on(task).expect("test failed");
}
#[test]
fn test_resume() {
let runc_id = format!("{}", Uuid::new_v4());
let runc_path = env::temp_dir().join(&runc_id).join("runc.amd64");
let runc_root =
PathBuf::from(env::var_os("XDG_RUNTIME_DIR").expect("expected temporary path"))
.join("rust-runc")
.join(&runc_id);
fs::create_dir_all(&runc_root).expect("unable to create runc root");
extract_tarball(
&PathBuf::from("test_fixture/runc_v1.0.0-rc10.tar.gz"),
&env::temp_dir().join(&runc_id),
)
.expect("unable to extract runc");
let mut config: RuncConfiguration = Default::default();
config.command = Some(runc_path.clone());
config.root = Some(runc_root.clone());
let runc = Runc::new(config).expect("Unable to create runc instance");
let task = async move {
let container = ManagedContainer::new(
&runc_path,
&runc_root,
&PathBuf::from("test_fixture/busybox.tar.gz"),
)
.await?;
runc.pause(&container.id).await?;
let container_state = runc.state(&container.id).await?;
let status = container_state.status.unwrap();
assert_eq!(status, "paused");
runc.resume(&container.id).await?;
runc.state(&container.id).await
};
let mut runtime = Runtime::new().expect("unable to create runtime");
let container = runtime.block_on(task).expect("test failed");
assert_eq!(container.status, Some(String::from("running")));
}
#[test]
fn test_run() {
let runc_id = format!("{}", Uuid::new_v4());
let runc_path = env::temp_dir().join(&runc_id).join("runc.amd64");
let runc_root =
PathBuf::from(env::var_os("XDG_RUNTIME_DIR").expect("expected temporary path"))
.join("rust-runc")
.join(&runc_id);
fs::create_dir_all(&runc_root).expect("unable to create runc root");
extract_tarball(
&PathBuf::from("test_fixture/runc_v1.0.0-rc10.tar.gz"),
&env::temp_dir().join(&runc_id),
)
.expect("unable to extract runc");
let mut config: RuncConfiguration = Default::default();
config.command = Some(runc_path);
config.root = Some(runc_root);
let runc = Runc::new(config).expect("Unable to create runc instance");
let task = async move {
let id = format!("{}", Uuid::new_v4());
let console_socket = env::temp_dir().join(&id).with_extension("console");
let receive_pty_master = ReceivePtyMaster::new(&console_socket)?;
tokio::spawn(async move {
match receive_pty_master.receive().await {
Ok(pty_master) => {
Box::leak(Box::new(pty_master));
}
Err(err) => {
error!("Receive PTY master error: {}", err);
}
}
});
let bundle = env::temp_dir().join(&id);
extract_tarball(&PathBuf::from("test_fixture/busybox.tar.gz"), &bundle)
.context(BundleExtractError {})?;
runc.run(
&id,
&bundle,
Some(&CreateOpts {
pid_file: None,
console_socket: Some(console_socket),
no_pivot: false,
no_new_keyring: false,
detach: true,
}),
)
.await?;
delay_for(Duration::from_millis(500)).await;
runc.state(&id).await
};
let mut runtime = Runtime::new().expect("unable to create runtime");
let container = runtime.block_on(task).expect("test failed");
assert_eq!(container.status, Some(String::from("running")));
}
#[test]
fn test_start() {
let runc_id = format!("{}", Uuid::new_v4());
let runc_path = env::temp_dir().join(&runc_id).join("runc.amd64");
let runc_root =
PathBuf::from(env::var_os("XDG_RUNTIME_DIR").expect("expected temporary path"))
.join("rust-runc")
.join(&runc_id);
fs::create_dir_all(&runc_root).expect("unable to create runc root");
extract_tarball(
&PathBuf::from("test_fixture/runc_v1.0.0-rc10.tar.gz"),
&env::temp_dir().join(&runc_id),
)
.expect("unable to extract runc");
let mut config: RuncConfiguration = Default::default();
config.command = Some(runc_path);
config.root = Some(runc_root);
let runc = Runc::new(config).expect("Unable to create runc instance");
let task = async move {
let id = format!("{}", Uuid::new_v4());
let console_socket = env::temp_dir().join(&id).with_extension("console");
let receive_pty_master = ReceivePtyMaster::new(&console_socket)?;
tokio::spawn(async move {
match receive_pty_master.receive().await {
Ok(pty_master) => {
Box::leak(Box::new(pty_master));
}
Err(err) => {
error!("Receive PTY master error: {}", err);
}
}
});
let bundle = env::temp_dir().join(&id);
extract_tarball(&PathBuf::from("test_fixture/busybox.tar.gz"), &bundle)
.context(BundleExtractError {})?;
runc.create(
&id,
&bundle,
Some(&CreateOpts {
pid_file: None,
console_socket: Some(console_socket),
no_pivot: false,
no_new_keyring: false,
detach: false,
}),
)
.await?;
runc.start(&id).await?;
delay_for(Duration::from_millis(500)).await;
let container_state = runc.state(&id).await?;
runc.kill(&id, libc::SIGKILL, None).await?;
Ok::<_, Error>(container_state)
};
let mut runtime = Runtime::new().expect("unable to create runtime");
let container = runtime.block_on(task).expect("test failed");
assert_eq!(container.status, Some(String::from("running")));
}
#[test]
fn test_state() {
let runc_id = format!("{}", Uuid::new_v4());
let runc_path = env::temp_dir().join(&runc_id).join("runc.amd64");
let runc_root =
PathBuf::from(env::var_os("XDG_RUNTIME_DIR").expect("expected temporary path"))
.join("rust-runc")
.join(&runc_id);
fs::create_dir_all(&runc_root).expect("unable to create runc root");
extract_tarball(
&PathBuf::from("test_fixture/runc_v1.0.0-rc10.tar.gz"),
&env::temp_dir().join(&runc_id),
)
.expect("unable to extract runc");
let mut config: RuncConfiguration = Default::default();
config.command = Some(runc_path.clone());
config.root = Some(runc_root.clone());
let runc = Runc::new(config).expect("Unable to create runc instance");
let task = async move {
let container = ManagedContainer::new(
&runc_path,
&runc_root,
&PathBuf::from("test_fixture/busybox.tar.gz"),
)
.await?;
runc.state(&container.id).await
};
let mut runtime = Runtime::new().expect("unable to create runtime");
let state = runtime.block_on(task).expect("test failed");
assert_eq!(state.status, Some(String::from("running")));
}
#[test]
fn test_stats() {
let runc_id = format!("{}", Uuid::new_v4());
let runc_path = env::temp_dir().join(&runc_id).join("runc.amd64");
let runc_root =
PathBuf::from(env::var_os("XDG_RUNTIME_DIR").expect("expected temporary path"))
.join("rust-runc")
.join(&runc_id);
fs::create_dir_all(&runc_root).expect("unable to create runc root");
extract_tarball(
&PathBuf::from("test_fixture/runc_v1.0.0-rc10.tar.gz"),
&env::temp_dir().join(&runc_id),
)
.expect("unable to extract runc");
let mut config: RuncConfiguration = Default::default();
config.command = Some(runc_path.clone());
config.root = Some(runc_root.clone());
let runc = Runc::new(config).expect("Unable to create runc instance");
let task = async move {
let container = ManagedContainer::new(
&runc_path,
&runc_root,
&PathBuf::from("test_fixture/busybox.tar.gz"),
)
.await
.unwrap();
let stats = runc
.stats(&container.id)
.await
.map_err(|err| io::Error::new(io::ErrorKind::Other, format!("{}", err)))?;
if let Some(memory) = stats.memory {
if let Some(usage) = memory.usage {
if let Some(usage) = usage.usage {
if usage > 0 {
return Ok::<_, io::Error>(());
}
}
}
}
Err(io::Error::new(
io::ErrorKind::InvalidData,
"missing memory usage statistics",
))
};
let mut runtime = Runtime::new().expect("unable to create runtime");
runtime.block_on(task).expect("test failed");
}
#[test]
fn test_top() {
let runc_id = format!("{}", Uuid::new_v4());
let runc_path = env::temp_dir().join(&runc_id).join("runc.amd64");
let runc_root =
PathBuf::from(env::var_os("XDG_RUNTIME_DIR").expect("expected temporary path"))
.join("rust-runc")
.join(&runc_id);
fs::create_dir_all(&runc_root).expect("unable to create runc root");
extract_tarball(
&PathBuf::from("test_fixture/runc_v1.0.0-rc10.tar.gz"),
&env::temp_dir().join(&runc_id),
)
.expect("unable to extract runc");
let mut config: RuncConfiguration = Default::default();
config.command = Some(runc_path.clone());
config.root = Some(runc_root.clone());
let runc = Runc::new(config).expect("Unable to create runc instance");
let task = async move {
let container = ManagedContainer::new(
&runc_path,
&runc_root,
&PathBuf::from("test_fixture/busybox.tar.gz"),
)
.await
.unwrap();
delay_for(Duration::from_millis(100)).await;
let processes = runc
.top(&container.id, None)
.await
.map_err(|err| io::Error::new(io::ErrorKind::Other, format!("{}", err)))?;
if processes.len() != 1 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"expected a single shell process",
));
}
if let Some(process) = processes.get(0) {
if process["CMD"] != "sh" {
return Err(io::Error::new(io::ErrorKind::InvalidData, "expected shell"));
}
}
Ok::<_, io::Error>(())
};
let mut runtime = Runtime::new().expect("unable to create runtime");
runtime.block_on(task).expect("test failed");
}
#[test]
fn test_update() {
let runc_id = format!("{}", Uuid::new_v4());
let runc_path = env::temp_dir().join(&runc_id).join("runc.amd64");
let runc_root =
PathBuf::from(env::var_os("XDG_RUNTIME_DIR").expect("expected temporary path"))
.join("rust-runc")
.join(&runc_id);
fs::create_dir_all(&runc_root).expect("unable to create runc root");
extract_tarball(
&PathBuf::from("test_fixture/runc_v1.0.0-rc10.tar.gz"),
&env::temp_dir().join(&runc_id),
)
.expect("unable to extract runc");
let mut config: RuncConfiguration = Default::default();
config.command = Some(runc_path);
config.root = Some(runc_root);
let runc = Runc::new(config).expect("Unable to create runc instance");
let task = async move {
let id = format!("{}", Uuid::new_v4());
let console_socket = env::temp_dir().join(&id).with_extension("console");
let receive_pty_master = ReceivePtyMaster::new(&console_socket)
.expect("Unable to open pty receiving socket");
tokio::spawn(async move {
match receive_pty_master.receive().await {
Ok(pty_master) => {
Box::leak(Box::new(pty_master));
}
Err(err) => {
error!("Receive PTY master error: {}", err);
}
}
});
let bundle = env::temp_dir().join(&id);
extract_tarball(&PathBuf::from("test_fixture/busybox.tar.gz"), &bundle)
.context(BundleExtractError {})?;
runc.run(
&id,
&bundle,
Some(&CreateOpts {
pid_file: None,
console_socket: Some(console_socket),
no_pivot: false,
no_new_keyring: false,
detach: true,
}),
)
.await?;
runc.update(
&id,
&LinuxResources {
devices: None,
memory: Some(LinuxMemory {
limit: Some(232_000_000),
reservation: None,
swap: None,
kernel: None,
kernel_tcp: None,
swappiness: None,
disable_oom_killer: None,
}),
cpu: None,
pids: None,
block_io: None,
hugepage_limits: None,
network: None,
rdma: None,
},
)
.await?;
runc.stats(&id).await
};
let mut runtime = Runtime::new().expect("unable to create runtime");
let stats = runtime.block_on(task).expect("test failed");
if let Some(memory) = stats.memory {
if let Some(usage) = memory.usage {
if let Some(limit) = usage.limit {
if limit < 233_000_000 && limit > 231_000_000 {
return;
}
}
}
}
panic!("updating memory limit failed");
}
#[test]
fn test_version() {
let runc_id = format!("{}", Uuid::new_v4());
let runc_path = env::temp_dir().join(&runc_id).join("runc.amd64");
let runc_root =
PathBuf::from(env::var_os("XDG_RUNTIME_DIR").expect("expected temporary path"))
.join("rust-runc")
.join(&runc_id);
fs::create_dir_all(&runc_root).expect("unable to create runc root");
extract_tarball(
&PathBuf::from("test_fixture/runc_v1.0.0-rc10.tar.gz"),
&env::temp_dir().join(&runc_id),
)
.expect("unable to extract runc");
let mut config: RuncConfiguration = Default::default();
config.command = Some(runc_path);
config.root = Some(runc_root);
let runc = Runc::new(config).expect("Unable to create runc instance");
let mut runtime = Runtime::new().expect("unable to create runtime");
let version = runtime.block_on(runc.version()).expect("test failed");
assert_eq!(version.runc_version, Some(String::from("1.0.0-rc10")));
assert_eq!(version.spec_version, Some(String::from("1.0.1-dev")));
}
#[test]
fn test_receive_pty_master() {
let runc_id = format!("{}", Uuid::new_v4());
let runc_path = env::temp_dir().join(&runc_id).join("runc.amd64");
let runc_root =
PathBuf::from(env::var_os("XDG_RUNTIME_DIR").expect("expected temporary path"))
.join("rust-runc")
.join(&runc_id);
fs::create_dir_all(&runc_root).expect("unable to create runc root");
extract_tarball(
&PathBuf::from("test_fixture/runc_v1.0.0-rc10.tar.gz"),
&env::temp_dir().join(&runc_id),
)
.expect("unable to extract runc");
let mut config: RuncConfiguration = Default::default();
config.command = Some(runc_path);
config.root = Some(runc_root);
let runc = Runc::new(config).expect("Unable to create runc instance");
let task = async move {
let id = format!("{}", Uuid::new_v4());
let (fd_sender, fd_receiver) = futures::channel::oneshot::channel::<tokio::fs::File>();
let console_socket = env::temp_dir().join(&id).with_extension("console");
let receive_pty_master = ReceivePtyMaster::new(&console_socket)?;
tokio::spawn(async move {
match receive_pty_master.receive().await {
Ok(pty_master) => {
fd_sender.send(pty_master).unwrap();
}
Err(err) => {
error!("Receive PTY master error: {}", err);
}
}
});
let bundle = env::temp_dir().join(&id);
extract_tarball(&PathBuf::from("test_fixture/busybox.tar.gz"), &bundle)
.context(BundleExtractError {})?;
runc.run(
&id,
&bundle,
Some(&CreateOpts {
pid_file: None,
console_socket: Some(console_socket),
no_pivot: false,
no_new_keyring: false,
detach: true,
}),
)
.await?;
Ok::<_, Error>(fd_receiver.await.unwrap())
};
let mut runtime = Runtime::new().expect("unable to create runtime");
let mut pty_master = runtime.block_on(task).expect("test failed");
let task = async move {
let mut response = [0u8; 160];
pty_master.read(&mut response).await?;
pty_master.write(b"uname -a && exit\n").await?;
delay_for(Duration::from_millis(500)).await;
let len = pty_master.read(&mut response).await?;
Ok::<_, io::Error>(String::from_utf8(Vec::from(&response[..len])).unwrap())
};
let mut runtime = Runtime::new().expect("unable to create runtime");
let response = runtime.block_on(task).expect("test failed");
let response = match response
.split('\n')
.find(|line| line.contains("Linux runc"))
{
Some(response) => response,
None => panic!("did not find response to command"),
};
assert!(response.starts_with("Linux runc"));
}
fn extract_tarball(tarball: &PathBuf, dst: &PathBuf) -> io::Result<()> {
let tarball = File::open(tarball)?;
let tar = GzDecoder::new(tarball);
let mut archive = Archive::new(tar);
archive.unpack(dst)?;
Ok(())
}
struct ManagedContainer {
id: String,
runc: Option<Runc>,
}
impl ManagedContainer {
async fn new(
runc_path: &PathBuf,
runc_root: &PathBuf,
compressed_bundle: &PathBuf,
) -> Result<Self, Error> {
let id = format!("{}", Uuid::new_v4());
let bundle = env::temp_dir().join(&id);
extract_tarball(compressed_bundle, &bundle).expect("Unable to extract bundle");
let mut config: RuncConfiguration = Default::default();
config.command = Some(runc_path.clone());
config.root = Some(runc_root.clone());
let runc = Runc::new(config)?;
let console_socket = env::temp_dir().join(&id).with_extension("console");
let receive_pty_master = ReceivePtyMaster::new(&console_socket)
.expect("Unable to open pty receiving socket");
tokio::spawn(async move {
match receive_pty_master.receive().await {
Ok(pty_master) => {
Box::leak(Box::new(pty_master));
}
Err(err) => {
error!("Receive PTY master error: {}", err);
}
}
});
runc.create(
&id,
&bundle,
Some(&CreateOpts {
pid_file: None,
console_socket: Some(console_socket),
no_pivot: false,
no_new_keyring: false,
detach: false,
}),
)
.await?;
runc.start(&id).await?;
Ok(Self {
id,
runc: Some(runc),
})
}
}
impl Drop for ManagedContainer {
fn drop(&mut self) {
if let Some(runc) = self.runc.take() {
let bundle = env::temp_dir().join(&self.id);
block_on(async move {
runc.delete(&self.id, Some(&DeleteOpts { force: true }))
.await
.expect("Unable to delete container");
fs::remove_dir_all(&bundle).expect("Unable to delete bundle");
});
}
}
}
}