1use crate::remote::{RemoteArchiveInfo, RemoteTransport};
4use crate::runtime::global_handle;
5use hexz_common::{Error, Result};
6use s3::bucket::Bucket;
7use s3::creds::Credentials;
8use s3::region::Region;
9use std::io::{Error as IoError, ErrorKind};
10use std::path::Path;
11use tokio::runtime::Handle;
12
13#[derive(Debug)]
15pub struct S3Remote {
16 bucket: Box<Bucket>,
17 prefix: String,
18 handle: Handle,
19}
20
21fn parse_s3_url(url: &str) -> Result<(String, String)> {
25 let rest = url.strip_prefix("s3://").ok_or_else(|| {
26 Error::Io(IoError::new(
27 ErrorKind::InvalidInput,
28 format!("Not an S3 URL: {url}"),
29 ))
30 })?;
31
32 let (bucket, prefix) = match rest.find('/') {
33 Some(idx) => {
34 let bucket = &rest[..idx];
35 let mut prefix = rest[idx + 1..].to_string();
36 if !prefix.is_empty() {
38 prefix = prefix.trim_end_matches('/').to_string();
39 prefix.push('/');
40 }
41 (bucket.to_string(), prefix)
42 }
43 None => (rest.to_string(), String::new()),
44 };
45
46 if bucket.is_empty() {
47 return Err(Error::Io(IoError::new(
48 ErrorKind::InvalidInput,
49 "S3 URL has empty bucket name",
50 )));
51 }
52
53 Ok((bucket, prefix))
54}
55
56impl S3Remote {
57 pub fn connect(url: &str) -> Result<Self> {
63 let handle = global_handle().map_err(Error::Io)?;
64 let (bucket_name, prefix) = parse_s3_url(url)?;
65
66 let region_name = std::env::var("AWS_REGION").unwrap_or_else(|_| "us-east-1".to_string());
67
68 let region = if let Ok(endpoint) = std::env::var("AWS_ENDPOINT") {
69 Region::Custom {
70 region: region_name,
71 endpoint,
72 }
73 } else {
74 region_name.parse::<Region>().map_err(|e| {
75 Error::Io(IoError::new(
76 ErrorKind::InvalidInput,
77 format!("Invalid region: {e}"),
78 ))
79 })?
80 };
81
82 let credentials = Credentials::default().map_err(|e| {
83 Error::Io(IoError::new(
84 ErrorKind::PermissionDenied,
85 format!("Missing credentials: {e}"),
86 ))
87 })?;
88
89 let bucket = Bucket::new(&bucket_name, region, credentials)
90 .map_err(|e| Error::Io(IoError::other(format!("Bucket error: {e}"))))?
91 .with_path_style();
92
93 Ok(Self {
94 bucket: Box::new(bucket),
95 prefix,
96 handle,
97 })
98 }
99
100 fn key(&self, archive_name: &str) -> String {
102 format!("{}{}", self.prefix, archive_name)
103 }
104}
105
106impl RemoteTransport for S3Remote {
107 fn list_archives(&self) -> Result<Vec<RemoteArchiveInfo>> {
108 tokio::task::block_in_place(|| {
109 self.handle.block_on(async {
110 let results = self
111 .bucket
112 .list(self.prefix.clone(), None)
113 .await
114 .map_err(|e| Error::Io(IoError::other(format!("S3 list error: {e}"))))?;
115
116 let mut archives = Vec::new();
117 for list in &results {
118 for obj in &list.contents {
119 let name = obj.key.strip_prefix(&self.prefix).unwrap_or(&obj.key);
120 if std::path::Path::new(name)
121 .extension()
122 .is_some_and(|ext| ext.eq_ignore_ascii_case("hxz"))
123 && !name.contains('/')
124 {
125 archives.push(RemoteArchiveInfo {
126 name: name.to_string(),
127 size: obj.size,
128 });
129 }
130 }
131 }
132 Ok(archives)
133 })
134 })
135 }
136
137 fn upload(&self, local_path: &Path, remote_name: &str) -> Result<()> {
138 let data = std::fs::read(local_path).map_err(Error::Io)?;
139 let key = self.key(remote_name);
140
141 tokio::task::block_in_place(|| {
142 self.handle.block_on(async {
143 let response = self
144 .bucket
145 .put_object(&key, &data)
146 .await
147 .map_err(|e| Error::Io(IoError::other(format!("S3 upload error: {e}"))))?;
148
149 let code = response.status_code();
150 if code != 200 {
151 return Err(Error::Io(IoError::other(format!(
152 "S3 upload failed with status {code}"
153 ))));
154 }
155 Ok(())
156 })
157 })
158 }
159
160 fn download(&self, remote_name: &str, local_path: &Path) -> Result<()> {
161 let key = self.key(remote_name);
162
163 tokio::task::block_in_place(|| {
164 self.handle.block_on(async {
165 let response =
166 self.bucket.get_object(&key).await.map_err(|e| {
167 Error::Io(IoError::other(format!("S3 download error: {e}")))
168 })?;
169
170 let code = response.status_code();
171 if code != 200 {
172 return Err(Error::Io(IoError::other(format!(
173 "S3 download failed with status {code}"
174 ))));
175 }
176
177 std::fs::write(local_path, response.bytes()).map_err(Error::Io)?;
178 Ok(())
179 })
180 })
181 }
182
183 fn exists(&self, remote_name: &str) -> Result<bool> {
184 let key = self.key(remote_name);
185
186 tokio::task::block_in_place(|| {
187 self.handle.block_on(async {
188 match self.bucket.head_object(&key).await {
189 Ok((_, code)) => Ok(code == 200),
190 Err(e) => {
191 let msg = e.to_string();
193 if msg.contains("404") || msg.contains("Not Found") {
194 Ok(false)
195 } else {
196 Err(Error::Io(IoError::other(format!("S3 head error: {e}"))))
197 }
198 }
199 }
200 })
201 })
202 }
203}
204
205#[cfg(test)]
206mod tests {
207 use super::*;
208
209 #[test]
210 fn test_parse_s3_url_bucket_only() {
211 let (bucket, prefix) = parse_s3_url("s3://my-bucket").unwrap();
212 assert_eq!(bucket, "my-bucket");
213 assert_eq!(prefix, "");
214 }
215
216 #[test]
217 fn test_parse_s3_url_with_prefix() {
218 let (bucket, prefix) = parse_s3_url("s3://my-bucket/archives/v1").unwrap();
219 assert_eq!(bucket, "my-bucket");
220 assert_eq!(prefix, "archives/v1/");
221 }
222
223 #[test]
224 fn test_parse_s3_url_with_trailing_slash() {
225 let (bucket, prefix) = parse_s3_url("s3://my-bucket/archives/").unwrap();
226 assert_eq!(bucket, "my-bucket");
227 assert_eq!(prefix, "archives/");
228 }
229
230 #[test]
231 fn test_parse_s3_url_empty_prefix() {
232 let (bucket, prefix) = parse_s3_url("s3://my-bucket/").unwrap();
233 assert_eq!(bucket, "my-bucket");
234 assert_eq!(prefix, "");
235 }
236
237 #[test]
238 fn test_parse_s3_url_not_s3() {
239 assert!(parse_s3_url("http://example.com").is_err());
240 }
241
242 #[test]
243 fn test_parse_s3_url_empty_bucket() {
244 assert!(parse_s3_url("s3://").is_err());
245 }
246}