// hexz_store/s3/remote.rs
1//! S3 implementation of [`RemoteTransport`](crate::remote::RemoteTransport).
2
3use crate::remote::{RemoteArchiveInfo, RemoteTransport};
4use crate::runtime::global_handle;
5use hexz_common::{Error, Result};
6use s3::bucket::Bucket;
7use s3::creds::Credentials;
8use s3::region::Region;
9use std::io::{Error as IoError, ErrorKind};
10use std::path::Path;
11use tokio::runtime::Handle;
12
/// S3-backed remote transport for push/pull operations.
///
/// Holds the S3 [`Bucket`] handle, the key prefix parsed from the
/// `s3://bucket/prefix` URL, and a Tokio [`Handle`] used to drive the async
/// `rust-s3` calls from the synchronous trait methods.
#[derive(Debug)]
pub struct S3Remote {
    // Boxed — presumably to keep `S3Remote` small, since `Bucket` is a
    // sizeable struct. TODO confirm intent.
    bucket: Box<Bucket>,
    // Key prefix inside the bucket; empty, or normalised to end with `/`
    // (see `parse_s3_url`).
    prefix: String,
    // Runtime handle for `block_on` in the sync `RemoteTransport` methods.
    handle: Handle,
}
20
21/// Parse an `s3://bucket/prefix` URL into `(bucket_name, prefix)`.
22///
23/// The prefix is normalised to end with `/` (or be empty for bucket root).
24fn parse_s3_url(url: &str) -> Result<(String, String)> {
25    let rest = url.strip_prefix("s3://").ok_or_else(|| {
26        Error::Io(IoError::new(
27            ErrorKind::InvalidInput,
28            format!("Not an S3 URL: {url}"),
29        ))
30    })?;
31
32    let (bucket, prefix) = match rest.find('/') {
33        Some(idx) => {
34            let bucket = &rest[..idx];
35            let mut prefix = rest[idx + 1..].to_string();
36            // Normalise: strip trailing slash then re-add, so "a/b" and "a/b/" both become "a/b/"
37            if !prefix.is_empty() {
38                prefix = prefix.trim_end_matches('/').to_string();
39                prefix.push('/');
40            }
41            (bucket.to_string(), prefix)
42        }
43        None => (rest.to_string(), String::new()),
44    };
45
46    if bucket.is_empty() {
47        return Err(Error::Io(IoError::new(
48            ErrorKind::InvalidInput,
49            "S3 URL has empty bucket name",
50        )));
51    }
52
53    Ok((bucket, prefix))
54}
55
impl S3Remote {
    /// Connect to an S3 remote given an `s3://bucket/prefix` URL.
    ///
    /// Uses `AWS_REGION` (default `us-east-1`), `AWS_ENDPOINT` (optional, for
    /// `MinIO` etc.), and standard `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY`
    /// credentials.
    ///
    /// # Errors
    ///
    /// Fails if no global runtime handle is available, the URL is not a valid
    /// `s3://` URL, the region name cannot be parsed, credentials cannot be
    /// resolved, or the bucket handle cannot be constructed.
    pub fn connect(url: &str) -> Result<Self> {
        // Grab the runtime handle first: the trait methods below use it to
        // `block_on` the async rust-s3 calls.
        let handle = global_handle().map_err(Error::Io)?;
        let (bucket_name, prefix) = parse_s3_url(url)?;

        let region_name = std::env::var("AWS_REGION").unwrap_or_else(|_| "us-east-1".to_string());

        // A custom endpoint (MinIO, LocalStack, ...) requires Region::Custom;
        // otherwise parse the name as a standard AWS region.
        let region = if let Ok(endpoint) = std::env::var("AWS_ENDPOINT") {
            Region::Custom {
                region: region_name,
                endpoint,
            }
        } else {
            region_name.parse::<Region>().map_err(|e| {
                Error::Io(IoError::new(
                    ErrorKind::InvalidInput,
                    format!("Invalid region: {e}"),
                ))
            })?
        };

        // Resolves credentials from the environment / standard AWS chain.
        let credentials = Credentials::default().map_err(|e| {
            Error::Io(IoError::new(
                ErrorKind::PermissionDenied,
                format!("Missing credentials: {e}"),
            ))
        })?;

        // NOTE(review): path-style addressing is forced unconditionally, not
        // only when AWS_ENDPOINT is set — confirm this is intended for real
        // AWS, where virtual-hosted style is the modern default.
        let bucket = Bucket::new(&bucket_name, region, credentials)
            .map_err(|e| Error::Io(IoError::other(format!("Bucket error: {e}"))))?
            .with_path_style();

        Ok(Self {
            bucket: Box::new(bucket),
            prefix,
            handle,
        })
    }

    /// Build the full S3 key for an archive name.
    ///
    /// `prefix` is either empty or `/`-terminated (see [`parse_s3_url`]), so
    /// plain concatenation always yields a well-formed key.
    fn key(&self, archive_name: &str) -> String {
        format!("{}{}", self.prefix, archive_name)
    }
}
105
106impl RemoteTransport for S3Remote {
107    fn list_archives(&self) -> Result<Vec<RemoteArchiveInfo>> {
108        tokio::task::block_in_place(|| {
109            self.handle.block_on(async {
110                let results = self
111                    .bucket
112                    .list(self.prefix.clone(), None)
113                    .await
114                    .map_err(|e| Error::Io(IoError::other(format!("S3 list error: {e}"))))?;
115
116                let mut archives = Vec::new();
117                for list in &results {
118                    for obj in &list.contents {
119                        let name = obj.key.strip_prefix(&self.prefix).unwrap_or(&obj.key);
120                        if std::path::Path::new(name)
121                            .extension()
122                            .is_some_and(|ext| ext.eq_ignore_ascii_case("hxz"))
123                            && !name.contains('/')
124                        {
125                            archives.push(RemoteArchiveInfo {
126                                name: name.to_string(),
127                                size: obj.size,
128                            });
129                        }
130                    }
131                }
132                Ok(archives)
133            })
134        })
135    }
136
137    fn upload(&self, local_path: &Path, remote_name: &str) -> Result<()> {
138        let data = std::fs::read(local_path).map_err(Error::Io)?;
139        let key = self.key(remote_name);
140
141        tokio::task::block_in_place(|| {
142            self.handle.block_on(async {
143                let response = self
144                    .bucket
145                    .put_object(&key, &data)
146                    .await
147                    .map_err(|e| Error::Io(IoError::other(format!("S3 upload error: {e}"))))?;
148
149                let code = response.status_code();
150                if code != 200 {
151                    return Err(Error::Io(IoError::other(format!(
152                        "S3 upload failed with status {code}"
153                    ))));
154                }
155                Ok(())
156            })
157        })
158    }
159
160    fn download(&self, remote_name: &str, local_path: &Path) -> Result<()> {
161        let key = self.key(remote_name);
162
163        tokio::task::block_in_place(|| {
164            self.handle.block_on(async {
165                let response =
166                    self.bucket.get_object(&key).await.map_err(|e| {
167                        Error::Io(IoError::other(format!("S3 download error: {e}")))
168                    })?;
169
170                let code = response.status_code();
171                if code != 200 {
172                    return Err(Error::Io(IoError::other(format!(
173                        "S3 download failed with status {code}"
174                    ))));
175                }
176
177                std::fs::write(local_path, response.bytes()).map_err(Error::Io)?;
178                Ok(())
179            })
180        })
181    }
182
183    fn exists(&self, remote_name: &str) -> Result<bool> {
184        let key = self.key(remote_name);
185
186        tokio::task::block_in_place(|| {
187            self.handle.block_on(async {
188                match self.bucket.head_object(&key).await {
189                    Ok((_, code)) => Ok(code == 200),
190                    Err(e) => {
191                        // rust-s3 with fail-on-err turns 404 into an error
192                        let msg = e.to_string();
193                        if msg.contains("404") || msg.contains("Not Found") {
194                            Ok(false)
195                        } else {
196                            Err(Error::Io(IoError::other(format!("S3 head error: {e}"))))
197                        }
198                    }
199                }
200            })
201        })
202    }
203}
204
#[cfg(test)]
mod tests {
    use super::*;

    /// Parse a URL that is expected to be valid, returning `(bucket, prefix)`.
    fn parsed(url: &str) -> (String, String) {
        parse_s3_url(url).expect("URL should parse")
    }

    #[test]
    fn test_parse_s3_url_bucket_only() {
        assert_eq!(
            parsed("s3://my-bucket"),
            ("my-bucket".to_string(), String::new())
        );
    }

    #[test]
    fn test_parse_s3_url_with_prefix() {
        assert_eq!(
            parsed("s3://my-bucket/archives/v1"),
            ("my-bucket".to_string(), "archives/v1/".to_string())
        );
    }

    #[test]
    fn test_parse_s3_url_with_trailing_slash() {
        assert_eq!(
            parsed("s3://my-bucket/archives/"),
            ("my-bucket".to_string(), "archives/".to_string())
        );
    }

    #[test]
    fn test_parse_s3_url_empty_prefix() {
        assert_eq!(
            parsed("s3://my-bucket/"),
            ("my-bucket".to_string(), String::new())
        );
    }

    #[test]
    fn test_parse_s3_url_not_s3() {
        assert!(parse_s3_url("http://example.com").is_err());
    }

    #[test]
    fn test_parse_s3_url_empty_bucket() {
        assert!(parse_s3_url("s3://").is_err());
    }
}
246}