1use crate::Result;
5use aws_config::meta::region::RegionProviderChain;
6use aws_sdk_s3::config::Credentials;
7use color_eyre::eyre::eyre;
8use duct::Handle;
9use once_cell::sync::Lazy;
10use rand::prelude::*;
11use regex::Regex;
12use std::{
13 net::{SocketAddr, TcpListener},
14 path::PathBuf,
15 sync::{Arc, Weak},
16 time::Duration,
17};
18use tempfile::TempDir;
19use tokio::sync::Mutex;
20use tracing::debug;
21use which::which;
22
23pub struct MinioServer {
24 #[allow(dead_code)] temp_dir: TempDir,
26 handle: Handle,
27 endpoint: SocketAddr,
28}
29
30impl MinioServer {
31 pub async fn get() -> Result<Arc<Self>> {
37 static INSTANCE: once_cell::sync::Lazy<Mutex<Option<Weak<MinioServer>>>> =
40 once_cell::sync::Lazy::new(|| Mutex::new(None));
41
42 let mut instance = INSTANCE.lock().await;
43
44 let server = match instance.as_ref() {
45 Some(weak) => {
46 match weak.upgrade() {
48 Some(strong) => {
49 strong
51 }
52 None => {
53 let strong = Arc::new(Self::start().await?);
55 *instance = Some(Arc::downgrade(&strong));
56
57 strong
58 }
59 }
60 }
61 None => {
62 let strong = Arc::new(Self::start().await?);
64 *instance = Some(Arc::downgrade(&strong));
65
66 strong
67 }
68 };
69
70 debug!(endpoint = %server.endpoint,
71 "get() found minio server");
72
73 server.wait_for_service_start().await?;
75
76 Ok(server)
77 }
78
79 pub async fn start() -> Result<Self> {
85 let path = Self::find_minio()?;
86
87 let endpoint = Self::random_endpoint()?;
88
89 let temp_dir = Self::temp_data_dir()?;
90
91 let server = duct::cmd!(
92 path,
93 "server",
94 temp_dir.path(),
95 "--address",
96 endpoint.to_string(),
97 "--quiet"
98 );
99 let handle = server
100 .start()?;
103
104 let minio_server = Self {
105 temp_dir,
106 handle,
107 endpoint,
108 };
109
110 debug!(endpoint = %minio_server.endpoint, "Waiting for minio service to start");
111
112 minio_server.wait_for_service_start().await?;
113
114 debug!(endpoint = %minio_server.endpoint, "Minio started");
115
116 Ok(minio_server)
117 }
118
119 pub fn endpoint_uri(&self) -> http::Uri {
121 format!("http://{}/", self.endpoint).parse().unwrap()
122 }
123
124 pub fn endpoint_url(&self) -> url::Url {
126 self.endpoint_uri().to_string().parse().unwrap()
127 }
128
129 pub async fn aws_client(&self) -> Result<aws_sdk_s3::Client> {
131 let region_provider = RegionProviderChain::first_try("us-east-1");
132 let aws_config = aws_config::from_env()
133 .region(region_provider)
134 .credentials_provider(Credentials::from_keys("minioadmin", "minioadmin", None))
135 .load()
136 .await;
137
138 let s3_config_builder = aws_sdk_s3::config::Builder::from(&aws_config)
139 .endpoint_url(self.endpoint_uri().to_string());
140
141 Ok(aws_sdk_s3::Client::from_conf(s3_config_builder.build()))
142 }
143
144 pub async fn create_bucket(
149 &self,
150 bucket: impl AsRef<str>,
151 enable_versioning: bool,
152 ) -> Result<String> {
153 static REGEX: Lazy<Regex> = Lazy::new(|| Regex::new(r##"[^0-9a-zA-Z\.\-]+"##).unwrap());
158
159 debug!(bucket = bucket.as_ref(), "Creating bucket");
160
161 let bucket = REGEX.replace_all(bucket.as_ref(), "-");
162
163 let bucket = &bucket[..bucket.len().min(63 - 9)];
166
167 let bucket = format!("{:08x}-{bucket}", rand::thread_rng().next_u32());
169
170 debug!(%bucket, "Transformed bucket name into valid and unique bucket ID");
171
172 let client = self.aws_client().await?;
173
174 client.create_bucket().bucket(bucket.clone()).send().await?;
175
176 let policy = again::RetryPolicy::exponential(Duration::from_millis(100))
183 .with_max_retries(10)
184 .with_max_delay(Duration::from_secs(1));
185
186 let client = self.aws_client().await?;
187
188 if let Err(e) = policy
189 .retry(|| client.head_bucket().bucket(&bucket).send())
190 .await
191 {
192 return Err(
193 eyre!("The bucket {bucket} is not accessible even after it was explicitly created. Last error was: \n{e}")
194 );
195 };
196
197 if enable_versioning {
198 client
199 .put_bucket_versioning()
200 .bucket(bucket.clone())
201 .versioning_configuration(
202 aws_sdk_s3::types::VersioningConfiguration::builder()
203 .status(aws_sdk_s3::types::BucketVersioningStatus::Enabled)
204 .build(),
205 )
206 .send()
207 .await?;
208 }
209
210 debug!(%bucket, "Bucket created");
211
212 Ok(bucket)
213 }
214
215 async fn wait_for_service_start(&self) -> Result<()> {
217 let policy = again::RetryPolicy::exponential(Duration::from_millis(100))
220 .with_max_retries(10)
221 .with_max_delay(Duration::from_secs(1));
222
223 let client = self.aws_client().await?;
224
225 if let Err(e) = policy.retry(|| client.list_buckets().send()).await {
226 Err(
227 eyre!("The minio server didn't come online in the allowed time. The last error reported by ListBuckets against the server was:\n{}",
228 e)
229 )
230 } else {
231 Ok(())
232 }
233 }
234
235 fn find_minio() -> Result<PathBuf> {
236 std::env::var_os("MINIO_PATH").map(PathBuf::from)
237 .or_else(|| which("minio").ok())
238 .ok_or_else(|| eyre!("Unable to find `minio`, either set the MINIO_PATH env var or put place the Minio executable in your PATH"))
239 }
240
241 fn random_endpoint() -> Result<SocketAddr> {
243 let listener = TcpListener::bind("127.0.0.1:0")?;
244 let addr = listener.local_addr()?;
245 drop(listener);
246
247 Ok(addr)
248 }
249
250 fn temp_data_dir() -> Result<TempDir> {
263 let home = dirs::home_dir().ok_or_else(|| eyre!("Unable to determine home directory"))?;
264
265 Ok(tempfile::tempdir_in(home)?)
266 }
267}
268
269impl Drop for MinioServer {
270 fn drop(&mut self) {
271 debug!(pids = ?self.handle.pids(), "Killing minio process(es)");
272
273 if let Err(e) = self.handle.kill() {
274 eprintln!("Error killing minio process: {}", e);
275 }
276 }
277}