ssstar_testing/
minio.rs

1//! Wrapper around the `minio` server binary to run ephemeral instances of S3-compatible object
2//! storage for testing
3
4use crate::Result;
5use aws_config::meta::region::RegionProviderChain;
6use aws_sdk_s3::config::Credentials;
7use color_eyre::eyre::eyre;
8use duct::Handle;
9use once_cell::sync::Lazy;
10use rand::prelude::*;
11use regex::Regex;
12use std::{
13    net::{SocketAddr, TcpListener},
14    path::PathBuf,
15    sync::{Arc, Weak},
16    time::Duration,
17};
18use tempfile::TempDir;
19use tokio::sync::Mutex;
20use tracing::debug;
21use which::which;
22
/// Handle to a running `minio` server process used as ephemeral S3-compatible storage in tests.
///
/// Dropping this value kills the server process (see the `Drop` impl for this type).
pub struct MinioServer {
    /// Data directory handed to `minio server` on the command line.
    #[allow(dead_code)] // Never used, but needs to stay in scope so the temp dir isn't deleted
    temp_dir: TempDir,
    /// Handle to the spawned `minio` process; used to kill the process on drop.
    handle: Handle,
    /// Socket address the server was told to listen on via `--address`.
    endpoint: SocketAddr,
}
29
30impl MinioServer {
31    /// Try to re-use an existing instance that other tests might also be using, but if there isn't
32    /// one then start a new one.
33    ///
34    /// This is preferable to [`Self::start`] because if many tests can re-use the same server it
35    /// amortizes the time spent starting up a server over many tests
36    pub async fn get() -> Result<Arc<Self>> {
37        // Rust Mutex types are const now so once-cell isn't needed, but this code needs a tokio
38        // Mutex because the lock is held across await points.  Pity.
39        static INSTANCE: once_cell::sync::Lazy<Mutex<Option<Weak<MinioServer>>>> =
40            once_cell::sync::Lazy::new(|| Mutex::new(None));
41
42        let mut instance = INSTANCE.lock().await;
43
44        let server = match instance.as_ref() {
45            Some(weak) => {
46                // A weak ref is already in place.  Is the thing it references still alive?
47                match weak.upgrade() {
48                    Some(strong) => {
49                        // Still alive so we can use this reference
50                        strong
51                    }
52                    None => {
53                        // Ref went to zero so server was already dropped.  Start another one
54                        let strong = Arc::new(Self::start().await?);
55                        *instance = Some(Arc::downgrade(&strong));
56
57                        strong
58                    }
59                }
60            }
61            None => {
62                // First time this is being called, make a new instance
63                let strong = Arc::new(Self::start().await?);
64                *instance = Some(Arc::downgrade(&strong));
65
66                strong
67            }
68        };
69
70        debug!(endpoint = %server.endpoint,
71            "get() found minio server");
72
73        // Make sure the server is still working
74        server.wait_for_service_start().await?;
75
76        Ok(server)
77    }
78
79    /// Start a new minio server on a random high port.
80    ///
81    /// This assumes that Minio is installed in your system somewhere.  First the env var
82    /// `MINIO_PATH` is checked, and if that's not set then it's assumed that `minio` is in your
83    /// path.  If that doesn't work then this will fail.
84    pub async fn start() -> Result<Self> {
85        let path = Self::find_minio()?;
86
87        let endpoint = Self::random_endpoint()?;
88
89        let temp_dir = Self::temp_data_dir()?;
90
91        let server = duct::cmd!(
92            path,
93            "server",
94            temp_dir.path(),
95            "--address",
96            endpoint.to_string(),
97            "--quiet"
98        );
99        let handle = server
100            //.stderr_to_stdout()
101            //.stdout_capture()
102            .start()?;
103
104        let minio_server = Self {
105            temp_dir,
106            handle,
107            endpoint,
108        };
109
110        debug!(endpoint = %minio_server.endpoint, "Waiting for minio service to start");
111
112        minio_server.wait_for_service_start().await?;
113
114        debug!(endpoint = %minio_server.endpoint, "Minio started");
115
116        Ok(minio_server)
117    }
118
119    /// The S3 API endpoint URL where the server is listening
120    pub fn endpoint_uri(&self) -> http::Uri {
121        format!("http://{}/", self.endpoint).parse().unwrap()
122    }
123
124    /// The S3 API endpoint URL where the server is listening
125    pub fn endpoint_url(&self) -> url::Url {
126        self.endpoint_uri().to_string().parse().unwrap()
127    }
128
129    /// Get [`Client`] instance that is configured to use this Minio server instance
130    pub async fn aws_client(&self) -> Result<aws_sdk_s3::Client> {
131        let region_provider = RegionProviderChain::first_try("us-east-1");
132        let aws_config = aws_config::from_env()
133            .region(region_provider)
134            .credentials_provider(Credentials::from_keys("minioadmin", "minioadmin", None))
135            .load()
136            .await;
137
138        let s3_config_builder = aws_sdk_s3::config::Builder::from(&aws_config)
139            .endpoint_url(self.endpoint_uri().to_string());
140
141        Ok(aws_sdk_s3::Client::from_conf(s3_config_builder.build()))
142    }
143
144    /// Make a new bucket on this Minio instance for testing purposes.
145    ///
146    /// The actual bucket name will have a random suffix appended to it, because we run multiple
147    /// tests against the same minio service and we don't want them to conflict with one another.
148    pub async fn create_bucket(
149        &self,
150        bucket: impl AsRef<str>,
151        enable_versioning: bool,
152    ) -> Result<String> {
153        // Bucket names can be a maximum of 63 characters, can consist of letters and numbers and .
154        // and - characters, with two `.` characters in a row forbidden.
155        //
156        // So need to make this bucket name comply with the rules
157        static REGEX: Lazy<Regex> = Lazy::new(|| Regex::new(r##"[^0-9a-zA-Z\.\-]+"##).unwrap());
158
159        debug!(bucket = bucket.as_ref(), "Creating bucket");
160
161        let bucket = REGEX.replace_all(bucket.as_ref(), "-");
162
163        // Shorten the bucket name so we can append the unique ID and it will still be under 63
164        // chars
165        let bucket = &bucket[..bucket.len().min(63 - 9)];
166
167        // Prepend a random number to ensure the bucket name is unique across multiple tests
168        let bucket = format!("{:08x}-{bucket}", rand::thread_rng().next_u32());
169
170        debug!(%bucket, "Transformed bucket name into valid and unique bucket ID");
171
172        let client = self.aws_client().await?;
173
174        client.create_bucket().bucket(bucket.clone()).send().await?;
175
176        // The creation of a bucket seems like it's sometimes asynchronous, because even after
177        // `create_bucket` returns operations on it can fail with errors like:
178        //
179        // The S3 bucket '7ae02532-filter-by-prefix-src' either doesn't exist, or your IAM identity is not granted access
180        //
181        // So make sure the bucket actually was created before proceeding
182        let policy = again::RetryPolicy::exponential(Duration::from_millis(100))
183            .with_max_retries(10)
184            .with_max_delay(Duration::from_secs(1));
185
186        let client = self.aws_client().await?;
187
188        if let Err(e) = policy
189            .retry(|| client.head_bucket().bucket(&bucket).send())
190            .await
191        {
192            return Err(
193                eyre!("The bucket {bucket} is not accessible even after it was explicitly created.  Last error was: \n{e}")
194            );
195        };
196
197        if enable_versioning {
198            client
199                .put_bucket_versioning()
200                .bucket(bucket.clone())
201                .versioning_configuration(
202                    aws_sdk_s3::types::VersioningConfiguration::builder()
203                        .status(aws_sdk_s3::types::BucketVersioningStatus::Enabled)
204                        .build(),
205                )
206                .send()
207                .await?;
208        }
209
210        debug!(%bucket, "Bucket created");
211
212        Ok(bucket)
213    }
214
215    /// Block until able to successfully connect to the minio server or a timeout ocurrs
216    async fn wait_for_service_start(&self) -> Result<()> {
217        // The server doesn't start immediately upon the process starting; obviously there's a
218        // startup period.  On slow CI boxes this can be agonizingly long.
219        let policy = again::RetryPolicy::exponential(Duration::from_millis(100))
220            .with_max_retries(10)
221            .with_max_delay(Duration::from_secs(1));
222
223        let client = self.aws_client().await?;
224
225        if let Err(e) = policy.retry(|| client.list_buckets().send()).await {
226            Err(
227                eyre!("The minio server didn't come online in the allowed time.  The last error reported by ListBuckets against the server was:\n{}",
228                    e)
229            )
230        } else {
231            Ok(())
232        }
233    }
234
235    fn find_minio() -> Result<PathBuf> {
236        std::env::var_os("MINIO_PATH").map(PathBuf::from)
237            .or_else(|| which("minio").ok())
238            .ok_or_else(|| eyre!("Unable to find `minio`, either set the MINIO_PATH env var or put place the Minio executable in your PATH"))
239    }
240
241    /// Find a socket address on localhost that is free for minio to listen on
242    fn random_endpoint() -> Result<SocketAddr> {
243        let listener = TcpListener::bind("127.0.0.1:0")?;
244        let addr = listener.local_addr()?;
245        drop(listener);
246
247        Ok(addr)
248    }
249
250    /// Get a temporary directory for Minio data.
251    ///
252    /// Minio sucks so much that they've recently made a change whereby it's completely impossible
253    /// to use a data directory in a `tmpfs` filesystem, like for example `/tmp` on most Linux
254    /// distros, because it doesn't support `O_DIRECT` and their bullshit erasure coding storage
255    /// technology requires that.
256    ///
257    /// The only rock solid use case for a tool like minio is as a test S3 endpoint to avoid
258    /// talking to real object storage, and they messed that up!
259    ///
260    /// Instead, we'll make a `ssstar-minio-temp` directory in your home directory.  Apologies in
261    /// advance for the clutter!
262    fn temp_data_dir() -> Result<TempDir> {
263        let home = dirs::home_dir().ok_or_else(|| eyre!("Unable to determine home directory"))?;
264
265        Ok(tempfile::tempdir_in(home)?)
266    }
267}
268
269impl Drop for MinioServer {
270    fn drop(&mut self) {
271        debug!(pids = ?self.handle.pids(), "Killing minio process(es)");
272
273        if let Err(e) = self.handle.kill() {
274            eprintln!("Error killing minio process: {}", e);
275        }
276    }
277}