//! Wrapper around the `minio` server binary to run ephemeral instances of S3-compatible object
//! storage for testing.

use crate::Result;
use aws_config::meta::region::RegionProviderChain;
use aws_sdk_s3::{Credentials, Region};
use color_eyre::eyre::eyre;
use duct::Handle;
use once_cell::sync::Lazy;
use rand::prelude::*;
use regex::Regex;
use std::{
    net::{SocketAddr, TcpListener},
    path::PathBuf,
    sync::{Arc, Weak},
    time::Duration,
};
use tempdir::TempDir;
use tokio::sync::Mutex;
use tracing::debug;
use which::which;

pub struct MinioServer {
    #[allow(dead_code)] // Never used, but needs to stay in scope so the temp dir isn't deleted
    temp_dir: TempDir,
    handle: Handle,
    endpoint: SocketAddr,
}

impl MinioServer {
    /// Try to re-use an existing instance that other tests might also be using; if there isn't
    /// one, start a new one.
    ///
    /// This is preferable to [`Self::start`] because re-using the same server across many tests
    /// amortizes the server startup time over all of them.
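    ///
    /// A minimal usage sketch (hedged; it assumes a tokio test runtime and a `minio` binary
    /// available on the system):
    ///
    /// ```ignore
    /// let server = MinioServer::get().await?;
    /// let bucket = server.create_bucket("my-test", false).await?;
    /// let client = server.aws_client().await?;
    /// client.list_objects_v2().bucket(&bucket).send().await?;
    /// ```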
    pub async fn get() -> Result<Arc<Self>> {
        // Rust Mutex types are const now so once-cell isn't needed, but this code needs a tokio
        // Mutex because the lock is held across await points.  Pity.
        static INSTANCE: Lazy<Mutex<Option<Weak<MinioServer>>>> =
            Lazy::new(|| Mutex::new(None));

        let mut instance = INSTANCE.lock().await;

        let server = match instance.as_ref() {
            Some(weak) => {
                // A weak ref is already in place.  Is the thing it references still alive?
                match weak.upgrade() {
                    Some(strong) => {
                        // Still alive so we can use this reference
                        strong
                    }
                    None => {
                        // Ref went to zero so server was already dropped.  Start another one
                        let strong = Arc::new(Self::start().await?);
                        *instance = Some(Arc::downgrade(&strong));

                        strong
                    }
                }
            }
            None => {
                // First time this is being called, make a new instance
                let strong = Arc::new(Self::start().await?);
                *instance = Some(Arc::downgrade(&strong));

                strong
            }
        };

        debug!(endpoint = %server.endpoint, "get() found minio server");

        // Make sure the server is still working
        server.wait_for_service_start().await?;

        Ok(server)
    }

    /// Start a new minio server on a random high port.
    ///
    /// This assumes that Minio is installed on your system somewhere.  The `MINIO_PATH` env var
    /// is checked first; if that's not set then `minio` is assumed to be somewhere in your
    /// `PATH`.  If neither works then this will fail.
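    ///
    /// A hedged sketch of standalone use (in tests, prefer [`Self::get`] so a single server is
    /// shared):
    ///
    /// ```ignore
    /// let server = MinioServer::start().await?;
    /// println!("ephemeral minio listening at {}", server.endpoint_url());
    /// ```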
    pub async fn start() -> Result<Self> {
        let path = Self::find_minio()?;

        let endpoint = Self::random_endpoint()?;

        let temp_dir = Self::temp_data_dir()?;

        let server = duct::cmd!(
            path,
            "server",
            temp_dir.path(),
            "--address",
            endpoint.to_string(),
            "--quiet"
        );
        let handle = server.start()?;

        let minio_server = Self {
            temp_dir,
            handle,
            endpoint,
        };

        debug!(endpoint = %minio_server.endpoint, "Waiting for minio service to start");

        minio_server.wait_for_service_start().await?;

        debug!(endpoint = %minio_server.endpoint, "Minio started");

        Ok(minio_server)
    }

    /// The S3 API endpoint, as an [`http::Uri`], where the server is listening
    pub fn endpoint_uri(&self) -> http::Uri {
        format!("http://{}/", self.endpoint).parse().unwrap()
    }

    /// The S3 API endpoint, as a [`url::Url`], where the server is listening
    pub fn endpoint_url(&self) -> url::Url {
        self.endpoint_uri().to_string().parse().unwrap()
    }

    /// Get an [`aws_sdk_s3::Client`] instance that is configured to use this Minio server
    /// instance
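    ///
    /// A minimal sketch (`minioadmin`/`minioadmin` are minio's default root credentials):
    ///
    /// ```ignore
    /// let client = server.aws_client().await?;
    /// let buckets = client.list_buckets().send().await?;
    /// println!("{} buckets", buckets.buckets().unwrap_or_default().len());
    /// ```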
    pub async fn aws_client(&self) -> Result<aws_sdk_s3::Client> {
        let region_provider = RegionProviderChain::first_try(Region::new("us-east-1"));
        let aws_config = aws_config::from_env()
            .region(region_provider)
            .credentials_provider(Credentials::from_keys("minioadmin", "minioadmin", None))
            .load()
            .await;

        let s3_config_builder = aws_sdk_s3::config::Builder::from(&aws_config)
            .endpoint_url(self.endpoint_uri().to_string());

        Ok(aws_sdk_s3::Client::from_conf(s3_config_builder.build()))
    }

    /// Make a new bucket on this Minio instance for testing purposes.
    ///
    /// The actual bucket name will have a random prefix prepended to it, because we run multiple
    /// tests against the same minio service and we don't want them to conflict with one another.
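    ///
    /// A hedged sketch (the output name shown is illustrative):
    ///
    /// ```ignore
    /// // Yields something like "7ae02532-filter-by-prefix-src", with versioning enabled
    /// let bucket = server.create_bucket("filter-by-prefix-src", true).await?;
    /// assert!(bucket.len() <= 63);
    /// ```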
    pub async fn create_bucket(
        &self,
        bucket: impl AsRef<str>,
        enable_versioning: bool,
    ) -> Result<String> {
        // Bucket names can be at most 63 characters long, may contain only lowercase letters,
        // numbers, `.`, and `-` characters, and must not contain two `.` characters in a row.
        //
        // So we need to make this bucket name comply with those rules
        static REGEX: Lazy<Regex> = Lazy::new(|| Regex::new(r"[^0-9a-z.\-]+").unwrap());

        debug!(bucket = bucket.as_ref(), "Creating bucket");

        // S3 (and minio) bucket names must be lowercase, so lowercase first, then replace any
        // remaining disallowed characters with `-`
        let bucket = bucket.as_ref().to_lowercase();
        let bucket = REGEX.replace_all(&bucket, "-");

        // Shorten the bucket name so we can prepend the unique prefix (8 hex chars plus a `-`)
        // and the result will still fit within 63 chars
        let bucket = &bucket[..bucket.len().min(63 - 9)];

        // Prepend a random number to ensure the bucket name is unique across multiple tests
        let bucket = format!("{:08x}-{bucket}", rand::thread_rng().next_u32());

        debug!(%bucket, "Transformed bucket name into valid and unique bucket ID");

        let client = self.aws_client().await?;

        client.create_bucket().bucket(bucket.clone()).send().await?;

        // Bucket creation appears to sometimes be asynchronous, because even after
        // `create_bucket` returns, operations on the bucket can fail with errors like:
        //
        // The S3 bucket '7ae02532-filter-by-prefix-src' either doesn't exist, or your IAM identity is not granted access
        //
        // So make sure the bucket actually was created before proceeding
        let policy = again::RetryPolicy::exponential(Duration::from_millis(100))
            .with_max_retries(10)
            .with_max_delay(Duration::from_secs(1));

        if let Err(e) = policy
            .retry(|| client.head_bucket().bucket(&bucket).send())
            .await
        {
            return Err(
                eyre!("The bucket {bucket} is not accessible even after it was explicitly created.  Last error was: \n{e}")
            );
        };

        if enable_versioning {
            client
                .put_bucket_versioning()
                .bucket(bucket.clone())
                .versioning_configuration(
                    aws_sdk_s3::model::VersioningConfiguration::builder()
                        .status(aws_sdk_s3::model::BucketVersioningStatus::Enabled)
                        .build(),
                )
                .send()
                .await?;
        }

        debug!(%bucket, "Bucket created");

        Ok(bucket)
    }

    /// Block until we can successfully connect to the minio server, or a timeout occurs
    async fn wait_for_service_start(&self) -> Result<()> {
        // The server doesn't start immediately upon the process starting; obviously there's a
        // startup period.  On slow CI boxes this can be agonizingly long.
        let policy = again::RetryPolicy::exponential(Duration::from_millis(100))
            .with_max_retries(10)
            .with_max_delay(Duration::from_secs(1));

        let client = self.aws_client().await?;

        if let Err(e) = policy.retry(|| client.list_buckets().send()).await {
            Err(
                eyre!("The minio server didn't come online in the allowed time.  The last error reported by ListBuckets against the server was:\n{}",
                    e)
            )
        } else {
            Ok(())
        }
    }

    fn find_minio() -> Result<PathBuf> {
        std::env::var_os("MINIO_PATH").map(PathBuf::from)
            .or_else(|| which("minio").ok())
            .ok_or_else(|| eyre!("Unable to find `minio`; either set the MINIO_PATH env var or place the Minio executable in your PATH"))
    }

    /// Find a socket address on localhost that is free for minio to listen on.
    ///
    /// Note there's an inherent race between dropping the listener here and minio binding the
    /// same port, but in practice collisions on a just-released ephemeral port are rare.
    fn random_endpoint() -> Result<SocketAddr> {
        let listener = TcpListener::bind("127.0.0.1:0")?;
        let addr = listener.local_addr()?;
        drop(listener);

        Ok(addr)
    }

    /// Get a temporary directory for Minio data.
    ///
    /// Annoyingly, Minio recently made a change that makes it impossible to use a data directory
    /// on a `tmpfs` filesystem (such as `/tmp` on most Linux distros), because `tmpfs` doesn't
    /// support `O_DIRECT` and Minio's erasure-coding storage engine requires it.
    ///
    /// The only rock-solid use case for a tool like minio is as a test S3 endpoint to avoid
    /// talking to real object storage, and they broke exactly that!
    ///
    /// Instead, we'll make a `ssstar-minio-temp` directory in your home directory.  Apologies in
    /// advance for the clutter!
    fn temp_data_dir() -> Result<TempDir> {
        let home = dirs::home_dir().ok_or_else(|| eyre!("Unable to determine home directory"))?;

        Ok(TempDir::new_in(home, "ssstar-minio-temp")?)
    }
}

impl Drop for MinioServer {
    fn drop(&mut self) {
        debug!(pids = ?self.handle.pids(), "Killing minio process(es)");

        if let Err(e) = self.handle.kill() {
            eprintln!("Error killing minio process: {}", e);
        }
    }
}