use core::panic;
use std::{future::Future, sync::Arc};
use bollard::{
container::{CreateContainerOptions, RemoveContainerOptions, StartContainerOptions},
Docker,
};
use lazy_static::lazy_static;
use thiserror::Error;
use tokio::{runtime::Handle, signal::unix::SignalKind, sync::Mutex};
/// Tracks a set of Docker containers launched through one shared client so
/// they can be removed together.
///
/// Cloning is shallow: the Docker client and the container list both live
/// behind `Arc`s, so every clone refers to the same containers.
#[derive(Debug, Clone)]
pub struct EggShell {
// Shared Docker client; locked for every API call.
docker: Arc<Mutex<Docker>>,
// Names of the containers recorded by `launch`; emptied by `teardown`.
containers: Arc<Mutex<Vec<String>>>,
// When true, `teardown` returns without removing any container.
debug: bool,
}
/// Top-level error type returned by `EggShell` operations.
#[derive(Debug, Error)]
pub enum Error {
/// A Docker operation failed; the wrapped `DockerError` says which one.
#[error("error talking to docker: {0}")]
Docker(DockerError),
/// Free-form error message. Not constructed anywhere in this part of the file.
#[error("unknown error: {0}")]
Generic(String),
}
/// Failures of individual Docker operations performed by `EggShell`.
#[derive(Debug, Error)]
pub enum DockerError {
/// The ping issued by `EggShell::new` failed.
#[error("could not ping docker")]
Ping,
/// Container creation failed; the payload is the container name.
#[error("could not create container: {0}")]
CreateContainer(String),
/// Starting the container failed; the payload is the container name.
#[error("could not start container: {0}")]
StartContainer(String),
/// Container removal failed; the payload is the underlying error rendered
/// as a string (not the container name).
#[error("could not delete container: {0}")]
DeleteContainer(String),
}
lazy_static! {
// Process-wide registry: `EggShell::new` pushes a clone of every shell it
// creates, and `supervise_signals` tears all of them down on a signal.
static ref EGGSHELLS: Arc<Mutex<Vec<EggShell>>> = Arc::new(Mutex::new(Vec::new()));
// `Some(())` once a call to `supervise_signals` has claimed the supervisor
// role; later calls see it and skip installing signal handlers.
static ref SUPERVISOR_RUNNING: Arc<Mutex<Option<()>>> = Arc::new(Mutex::new(None));
}
/// Waits for SIGINT or SIGTERM and then tears down every registered
/// [`EggShell`].
///
/// Only the first call in the process becomes the supervisor. Every later
/// call just awaits `wait_hook` and returns without installing signal
/// handlers.
///
/// After a signal arrives the supervisor:
/// 1. tears down a snapshot of the shells registered in `EGGSHELLS`,
///    logging (not propagating) individual failures,
/// 2. awaits `wait_hook`,
/// 3. aborts the process via `std::process::abort` if every teardown
///    succeeded; otherwise it returns.
///
/// # Panics
/// Panics if the signal handlers cannot be registered.
pub async fn supervise_signals<F>(wait_hook: F)
where
    F: Future<Output = ()>,
{
    // Claim the supervisor slot. The guard is a temporary, so the lock is
    // released before anything else is awaited.
    let already_running = SUPERVISOR_RUNNING.lock().await.replace(()).is_some();
    if already_running {
        wait_hook.await;
        return;
    }

    let mut interrupt = tokio::signal::unix::signal(SignalKind::interrupt())
        .expect("failed to register SIGINT handler");
    let mut terminate = tokio::signal::unix::signal(SignalKind::terminate())
        .expect("failed to register SIGTERM handler");
    tokio::select! {
        _ = interrupt.recv() => {},
        _ = terminate.recv() => {},
    };

    eprintln!("eggshell: signal received, tearing down containers");

    // Work on a snapshot so the registry lock is not held during teardown
    // (`teardown` itself locks `EGGSHELLS`).
    let shells = EGGSHELLS.lock().await.clone();

    // Keep only the shells whose teardown failed. Collecting them directly
    // avoids removing by index, which shifts later elements and either
    // removes the wrong shell or panics on an out-of-range index.
    let mut failed = Vec::new();
    for shell in shells {
        let outcome = shell.teardown().await;
        if let Err(e) = outcome {
            eprintln!("error during teardown: {:?}", e);
            failed.push(shell);
        }
    }

    wait_hook.await;

    if failed.is_empty() {
        std::process::abort()
    }
}
impl EggShell {
pub async fn new(docker: Arc<Mutex<Docker>>) -> Result<Self, Error> {
match docker.lock().await.ping().await {
Ok(_) => {}
Err(_) => return Err(Error::Docker(DockerError::Ping)),
}
let this = Self {
docker,
containers: Arc::new(Mutex::new(Vec::new())),
debug: false,
};
EGGSHELLS.lock().await.push(this.clone());
Ok(this)
}
pub fn set_debug(&mut self, debug: bool) {
self.debug = debug;
}
pub async fn launch(
&mut self,
name: &str,
container: bollard::container::Config<String>,
start_options: Option<StartContainerOptions<String>>,
) -> Result<(), Error> {
self.containers.lock().await.push(name.to_string());
match self
.docker
.lock()
.await
.create_container(Some(CreateContainerOptions { name }), container)
.await
{
Ok(s) => s,
Err(_) => {
return Err(Error::Docker(DockerError::CreateContainer(
name.to_string(),
)))
}
};
match self
.docker
.lock()
.await
.start_container(name.clone(), start_options)
.await
{
Ok(_) => {}
Err(_) => return Err(Error::Docker(DockerError::StartContainer(name.to_string()))),
};
Ok(())
}
pub async fn teardown(&self) -> Result<(), Error> {
if !self.debug {
let mut error = None;
let mut containers = self.containers.lock().await;
while containers.len() > 0 {
let len = containers.len();
let container = &containers[len - 1];
match self
.docker
.lock()
.await
.remove_container(
container,
Some(RemoveContainerOptions {
force: true,
v: true,
link: false,
}),
)
.await
{
Ok(_) => {}
Err(e) => {
if EGGSHELLS.lock().await.len() > 1 {
error.replace(Err(Error::Docker(DockerError::DeleteContainer(
e.to_string(),
))));
}
}
}
containers.remove(len - 1);
}
if error.is_some() {
return error.unwrap();
}
}
Ok(())
}
}
impl Drop for EggShell {
    /// Tears down the shell's containers by blocking on
    /// [`EggShell::teardown`] from within the current Tokio runtime.
    ///
    /// Failures are logged to stderr instead of panicking: a panic inside
    /// `drop` aborts the process if it happens while another panic is
    /// already unwinding.
    ///
    /// Because the container list is shared between clones, dropping any
    /// clone removes the containers for all of them.
    fn drop(&mut self) {
        // Without a runtime there is nothing to drive the async teardown.
        let handle = match Handle::try_current() {
            Ok(handle) => handle,
            Err(e) => {
                eprintln!("eggshell: cannot tear down containers on drop: {:?}", e);
                return;
            }
        };

        // `block_in_place` lets this worker thread block on the future; it
        // requires the multi-threaded runtime flavor.
        let outcome = tokio::task::block_in_place(|| handle.block_on(self.teardown()));
        if let Err(e) = outcome {
            eprintln!("error during teardown: {:?}", e);
        }
    }
}
#[cfg(test)]
mod tests {
    /// End-to-end check: launches 20 containers, verifies they show up in
    /// the container listing, drops the shell, and verifies the listing is
    /// back to its initial size.
    ///
    /// Needs a Docker daemon reachable through the default unix socket.
    /// NOTE(review): presumably the `postgres:latest` image must already be
    /// present locally, since nothing here pulls it — confirm.
    #[tokio::test(flavor = "multi_thread")]
    async fn basic() {
        use crate::supervise_signals;
        use crate::EggShell;
        use bollard::container::Config;
        use bollard::Docker;
        use std::sync::Arc;
        use std::time::Duration;
        use tokio::sync::Mutex;
        use tokio::time::sleep;

        // Start several supervisors concurrently; only one of them may
        // install the signal handlers, the rest must return quietly.
        tokio::spawn(supervise_signals(async {
            sleep(Duration::new(1, 0)).await
        }));
        for _ in 0..7 {
            tokio::spawn(supervise_signals(async {}));
        }

        let units = 20;
        let docker = Arc::new(Mutex::new(Docker::connect_with_unix_defaults().unwrap()));
        let res = EggShell::new(docker.clone()).await;
        assert!(res.is_ok());

        // Baseline number of containers before anything is launched.
        let count = docker
            .lock()
            .await
            .list_containers::<String>(None)
            .await
            .unwrap()
            .len();

        let mut gs = res.unwrap();
        for num in 0..units {
            let res = gs
                .launch(
                    &format!("test-{}", num),
                    Config {
                        image: Some("postgres:latest".to_string()),
                        env: Some(vec!["POSTGRES_HOST_AUTH_METHOD=trust".to_string()]),
                        ..Default::default()
                    },
                    None,
                )
                .await;
            assert!(res.is_ok())
        }

        let newcount = docker
            .lock()
            .await
            .list_containers::<String>(None)
            .await
            .unwrap()
            .len();
        assert_eq!(newcount, count + units);

        // Dropping the shell must remove everything it launched.
        drop(gs);

        let newcount = docker
            .lock()
            .await
            .list_containers::<String>(None)
            .await
            .unwrap()
            .len();
        assert_eq!(newcount, count);
    }
}