#![allow(dead_code)]
use std::process::{Child, Command, Stdio};
use std::time::Duration;
use tempfile::TempDir;
pub fn free_port() -> u16 {
std::net::TcpListener::bind("127.0.0.1:0")
.expect("bind ephemeral port")
.local_addr()
.expect("read local addr")
.port()
}
pub struct TestNats {
child: Child,
pub url: String,
_store_dir: TempDir,
}
impl TestNats {
pub async fn start() -> TestNats {
let bin = std::env::var("NATS_SERVER_BIN").unwrap_or_else(|_| "nats-server".to_string());
let port = free_port();
let store_dir = tempfile::tempdir().expect("create jetstream store dir");
let child = Command::new(&bin)
.args([
"--jetstream",
"--addr",
"127.0.0.1",
"--port",
&port.to_string(),
"--store_dir",
store_dir.path().to_str().expect("utf-8 store path"),
])
.stdout(Stdio::null())
.stderr(Stdio::null())
.spawn()
.unwrap_or_else(|e| {
panic!(
"failed to spawn `{bin}`: {e}. Is nats-server installed? \
Run `mise install` or set NATS_SERVER_BIN."
)
});
let guard = TestNats {
child,
url: format!("nats://127.0.0.1:{port}"),
_store_dir: store_dir,
};
for _ in 0..100 {
if async_nats::connect(&guard.url).await.is_ok() {
return guard;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
panic!("nats-server at {} never became ready", guard.url);
}
}
impl Drop for TestNats {
fn drop(&mut self) {
let _ = self.child.kill();
let _ = self.child.wait();
}
}
pub const MINIO_USER: &str = "minioadmin";
pub const MINIO_PASSWORD: &str = "minioadmin";
pub const MINIO_BUCKET: &str = "test-bucket";
pub struct TestMinio {
child: Child,
pub endpoint: String,
_data_dir: TempDir,
}
impl TestMinio {
pub async fn start() -> TestMinio {
let minio_bin = std::env::var("MINIO_BIN").unwrap_or_else(|_| "minio".to_string());
let mc_bin = std::env::var("MC_BIN").unwrap_or_else(|_| "mc".to_string());
let api_port = free_port();
let console_port = free_port();
let data_dir = tempfile::tempdir().expect("create minio data dir");
let child = Command::new(&minio_bin)
.args([
"server",
data_dir.path().to_str().expect("utf-8 data path"),
"--address",
&format!("127.0.0.1:{api_port}"),
"--console-address",
&format!("127.0.0.1:{console_port}"),
])
.env("MINIO_ROOT_USER", MINIO_USER)
.env("MINIO_ROOT_PASSWORD", MINIO_PASSWORD)
.env("MINIO_BROWSER", "off")
.stdout(Stdio::null())
.stderr(Stdio::null())
.spawn()
.unwrap_or_else(|e| {
panic!(
"failed to spawn `{minio_bin}`: {e}. Is minio installed? \
Run `mise install` or set MINIO_BIN."
)
});
let guard = TestMinio {
child,
endpoint: format!("http://127.0.0.1:{api_port}"),
_data_dir: data_dir,
};
let endpoint = guard.endpoint.clone();
let mc_cfg = tempfile::tempdir().expect("create mc config dir");
let cfg = mc_cfg.path().to_str().expect("utf-8 mc config path");
let mut ready = false;
for _ in 0..100 {
let alias = Command::new(&mc_bin)
.args([
"--config-dir",
cfg,
"alias",
"set",
"t",
&endpoint,
MINIO_USER,
MINIO_PASSWORD,
])
.stdout(Stdio::null())
.stderr(Stdio::null())
.status()
.unwrap_or_else(|e| panic!("failed to spawn `{mc_bin}`: {e}. Run `mise install`."));
if alias.success() {
let mb = Command::new(&mc_bin)
.args([
"--config-dir",
cfg,
"mb",
"--ignore-existing",
&format!("t/{MINIO_BUCKET}"),
])
.stdout(Stdio::null())
.stderr(Stdio::null())
.status()
.expect("run mc mb");
if mb.success() {
ready = true;
break;
}
}
tokio::time::sleep(Duration::from_millis(200)).await;
}
assert!(
ready,
"minio at {endpoint} never became ready (mc mb failed)"
);
guard
}
pub fn s3_options(&self) -> Vec<(&'static str, String)> {
vec![
("aws_access_key_id", MINIO_USER.to_string()),
("aws_secret_access_key", MINIO_PASSWORD.to_string()),
("aws_endpoint", self.endpoint.clone()),
("aws_allow_http", "true".to_string()),
("aws_region", "us-east-1".to_string()),
]
}
}
impl Drop for TestMinio {
fn drop(&mut self) {
let _ = self.child.kill();
let _ = self.child.wait();
}
}
#[derive(Debug)]
pub struct ManifestPutCrash {
pub inner: std::sync::Arc<dyn object_store::ObjectStore>,
pub armed: std::sync::atomic::AtomicBool,
}
impl std::fmt::Display for ManifestPutCrash {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "ManifestPutCrash({})", self.inner)
}
}
#[async_trait::async_trait]
impl object_store::ObjectStore for ManifestPutCrash {
async fn put_opts(
&self,
location: &object_store::path::Path,
payload: object_store::PutPayload,
opts: object_store::PutOptions,
) -> object_store::Result<object_store::PutResult> {
if location.as_ref().ends_with(".manifest.json")
&& self.armed.swap(false, std::sync::atomic::Ordering::SeqCst)
{
return Err(object_store::Error::Generic {
store: "ManifestPutCrash",
source: "injected crash between payload and manifest PUT".into(),
});
}
self.inner.put_opts(location, payload, opts).await
}
async fn put_multipart_opts(
&self,
location: &object_store::path::Path,
opts: object_store::PutMultipartOptions,
) -> object_store::Result<Box<dyn object_store::MultipartUpload>> {
self.inner.put_multipart_opts(location, opts).await
}
async fn get_opts(
&self,
location: &object_store::path::Path,
options: object_store::GetOptions,
) -> object_store::Result<object_store::GetResult> {
self.inner.get_opts(location, options).await
}
fn delete_stream(
&self,
locations: futures::stream::BoxStream<
'static,
object_store::Result<object_store::path::Path>,
>,
) -> futures::stream::BoxStream<'static, object_store::Result<object_store::path::Path>> {
self.inner.delete_stream(locations)
}
fn list(
&self,
prefix: Option<&object_store::path::Path>,
) -> futures::stream::BoxStream<'static, object_store::Result<object_store::ObjectMeta>> {
self.inner.list(prefix)
}
async fn list_with_delimiter(
&self,
prefix: Option<&object_store::path::Path>,
) -> object_store::Result<object_store::ListResult> {
self.inner.list_with_delimiter(prefix).await
}
async fn copy_opts(
&self,
from: &object_store::path::Path,
to: &object_store::path::Path,
options: object_store::CopyOptions,
) -> object_store::Result<()> {
self.inner.copy_opts(from, to, options).await
}
}