hexz_core/store/s3/async_client.rs
1//! S3 storage backend with embedded Tokio runtime.
2//!
3//! This module provides an S3 storage backend that wraps the `rust-s3` async
4//! client in an embedded Tokio runtime. This allows the backend to present a
5//! synchronous `StorageBackend` interface while leveraging async I/O internally
6//! for efficient concurrent S3 operations.
7//!
8//! # Architecture
9//!
//! The [`S3Backend`] holds a Tokio runtime `Handle` (obtained from `global_handle()`) and uses
//! `Handle::block_on()` to execute async S3 operations synchronously. This design:
12//! - Maintains compatibility with the synchronous `StorageBackend` trait
13//! - Enables efficient connection pooling and concurrent requests via `rust-s3`
14//! - Provides async benefits (low memory overhead per connection) without requiring
15//! callers to use async/await
16//!
17//! # Feature Gate
18//!
19//! This module is only available when the `s3` feature is enabled:
20//! ```toml
21//! [dependencies]
22//! hexz-core = { version = "*", features = ["s3"] }
23//! ```
24//!
25//! # Custom Endpoints (S3-Compatible Storage)
26//!
27//! This backend supports S3-compatible object storage systems (MinIO, DigitalOcean
28//! Spaces, Wasabi, etc.) via the optional `endpoint` parameter. When provided,
29//! the backend uses a custom endpoint URL instead of AWS S3.
30//!
31//! # Thread Safety
32//!
33//! The backend is fully thread-safe (`Send + Sync`):
34//! - The `rust-s3` async client is designed for concurrent use
//! - The Tokio runtime `Handle` stored in the backend can be shared safely across threads
36//! - Multiple threads can call `read_exact()` concurrently without coordination
37//!
38//! # Performance Characteristics
39//!
40//! - **Latency**: 50-200ms per request (network RTT + S3 processing)
41//! - **Throughput**: Up to 100MB/s per connection (scales with parallel requests)
42//! - **Runtime overhead**: ~100µs per request for `block_on()` context switch
43//!
44//! # Examples
45//!
46//! ```no_run
47//! # #[cfg(feature = "s3")]
48//! # {
49//! use hexz_core::store::s3::S3Backend;
50//! use hexz_core::store::StorageBackend;
51//!
52//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
53//! let backend = S3Backend::new(
54//! "my-snapshots".to_string(),
55//! "prod/snapshot-001.hxz".to_string(),
56//! "us-east-1".to_string(),
57//! None // Use AWS endpoint
58//! )?;
59//!
60//! let data = backend.read_exact(0, 512)?;
61//! assert_eq!(data.len(), 512);
62//! # Ok(())
63//! # }
64//! # }
65//! ```
66
67#[cfg(feature = "s3")]
68use crate::store::StorageBackend;
69#[cfg(feature = "s3")]
70use crate::store::runtime::global_handle;
71#[cfg(feature = "s3")]
72use bytes::Bytes;
73#[cfg(feature = "s3")]
74use hexz_common::{Error, Result};
75#[cfg(feature = "s3")]
76use s3::bucket::Bucket;
77#[cfg(feature = "s3")]
78use s3::creds::Credentials;
79#[cfg(feature = "s3")]
80use s3::region::Region;
81#[cfg(feature = "s3")]
82use std::io::{Error as IoError, ErrorKind};
83#[cfg(feature = "s3")]
84use std::str::FromStr;
85#[cfg(feature = "s3")]
86use tokio::runtime::Handle;
87
/// Read-only view of a single S3 object, exposed through the synchronous
/// `StorageBackend` trait.
///
/// The backend pairs an async `rust-s3` `Bucket` client with a Tokio runtime
/// `Handle`. Each operation is driven to completion with `Handle::block_on`, so
/// callers never need async/await. Both AWS S3 and S3-compatible services
/// (via a custom endpoint) are supported.
///
/// # Examples
///
/// ```no_run
/// # #[cfg(feature = "s3")]
/// # {
/// use hexz_core::store::s3::S3Backend;
/// use hexz_core::store::StorageBackend;
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// // AWS S3
/// let backend = S3Backend::new(
///     "my-snapshots".to_string(),
///     "snapshot.hxz".to_string(),
///     "us-east-1".to_string(),
///     None
/// )?;
///
/// // Read 4KB at offset 8192
/// let data = backend.read_exact(8192, 4096)?;
/// assert_eq!(data.len(), 4096);
/// # Ok(())
/// # }
/// # }
/// ```
#[cfg(feature = "s3")]
#[derive(Debug)]
pub struct S3Backend {
    /// Bucket client through which every request is issued.
    bucket: Box<Bucket>,
    /// Key of the object within the bucket.
    key: String,
    /// Object size in bytes, taken from the `Content-Length` of the HEAD
    /// request made at construction time.
    len: u64,
    /// Tokio runtime handle used to block on the async S3 calls.
    handle: Handle,
}
126
#[cfg(feature = "s3")]
impl S3Backend {
    /// Creates a new S3 backend with optional custom endpoint support.
    ///
    /// This constructor:
    /// 1. Obtains a Tokio runtime handle from `global_handle()` for executing
    ///    async operations
    /// 2. Parses the region, or builds a custom region when `endpoint` is given
    /// 3. Loads credentials via `Credentials::default()`
    /// 4. Creates a `Bucket` client with path-style addressing
    /// 5. Sends a HEAD request (30 second timeout) to verify that the object is
    ///    accessible and to fetch its size
    ///
    /// # Parameters
    ///
    /// - `bucket_name`: The S3 bucket name (e.g., `"my-snapshots"`)
    /// - `key`: The object key within the bucket (e.g., `"prod/snapshot-001.hxz"`)
    /// - `region_name`: AWS region or arbitrary name for custom endpoints (e.g., `"us-east-1"`)
    /// - `endpoint`: Optional custom endpoint URL for S3-compatible storage
    ///   (e.g., `Some("https://minio.example.com:9000".to_string())`)
    ///
    /// # Errors
    ///
    /// Every failure is returned as `Error::Io` with one of these kinds:
    /// - `InvalidInput`: `region_name` could not be parsed (only checked when
    ///   no `endpoint` is supplied)
    /// - `PermissionDenied`: credentials could not be loaded
    /// - `TimedOut`: the HEAD request did not finish within 30 seconds
    /// - `NotFound`: the HEAD request completed with a status other than 200
    /// - `InvalidData`: the response had no `Content-Length`, or it was negative
    /// - `Other`: the bucket client could not be built or the HEAD request failed
    ///
    /// # Panics
    ///
    /// This function blocks the calling thread with `Handle::block_on`, which
    /// Tokio documents as panicking when invoked from within an asynchronous
    /// execution context. Call it from a plain (non-async) thread.
    pub fn new(
        bucket_name: String,
        key: String,
        region_name: String,
        endpoint: Option<String>,
    ) -> Result<Self> {
        let handle = global_handle();

        // A custom endpoint takes the region name verbatim; otherwise the name
        // must parse as a known region.
        let region = match endpoint {
            Some(ep) => Region::Custom {
                region: region_name,
                endpoint: ep,
            },
            None => Region::from_str(&region_name).map_err(|e| {
                Error::Io(IoError::new(
                    ErrorKind::InvalidInput,
                    format!("Invalid region: {e}"),
                ))
            })?,
        };

        let credentials = Credentials::default().map_err(|e| {
            Error::Io(IoError::new(
                ErrorKind::PermissionDenied,
                format!("Missing credentials: {e}"),
            ))
        })?;

        let bucket = Bucket::new(&bucket_name, region, credentials)
            .map_err(|e| Error::Io(IoError::other(format!("Bucket error: {e}"))))?
            .with_path_style();

        // Perform HEAD request to get size and validate access (with 30s timeout)
        let (head, code) = handle.block_on(async {
            tokio::time::timeout(std::time::Duration::from_secs(30), bucket.head_object(&key))
                .await
                .map_err(|_| {
                    Error::Io(IoError::new(
                        ErrorKind::TimedOut,
                        "S3 connection timeout after 30 seconds",
                    ))
                })?
                .map_err(|e| Error::Io(IoError::other(format!("S3 Head error: {e}"))))
        })?;

        if code != 200 {
            return Err(Error::Io(IoError::new(
                ErrorKind::NotFound,
                format!("S3 object not found or error: {code}"),
            )));
        }

        let content_length = head.content_length.ok_or_else(|| {
            Error::Io(IoError::new(
                ErrorKind::InvalidData,
                "Missing Content-Length",
            ))
        })?;

        // The header value is signed; a checked conversion rejects negative
        // sizes without a lossy `as` cast.
        let len = u64::try_from(content_length).map_err(|_| {
            Error::Io(IoError::new(
                ErrorKind::InvalidData,
                "Negative Content-Length",
            ))
        })?;

        Ok(Self {
            bucket: Box::new(bucket),
            key,
            len,
            handle,
        })
    }
}
220
#[cfg(feature = "s3")]
impl StorageBackend for S3Backend {
    /// Reads exactly `len` bytes starting at byte `offset` of the object.
    ///
    /// A zero-length read returns an empty buffer without contacting S3.
    /// Otherwise a ranged GET for the inclusive range
    /// `offset..=offset + len - 1` is issued and awaited with a 60 second
    /// timeout.
    ///
    /// # Errors
    ///
    /// Every failure is returned as `Error::Io` with one of these kinds:
    /// - `InvalidInput`: `offset + len` does not fit in a `u64`
    /// - `TimedOut`: the request did not finish within 60 seconds
    /// - `Other`: the request failed or returned a status other than 200/206
    /// - `UnexpectedEof`: the response body was not exactly `len` bytes long
    ///
    /// # Panics
    ///
    /// Blocks the calling thread with `Handle::block_on`, which Tokio
    /// documents as panicking when invoked from within an asynchronous
    /// execution context.
    fn read_exact(&self, offset: u64, len: usize) -> Result<Bytes> {
        if len == 0 {
            return Ok(Bytes::new());
        }

        // Inclusive end of the range. `len >= 1` here, so `n - 1` cannot
        // underflow; the checked add guards against `offset + len` exceeding
        // `u64::MAX`, which would otherwise panic (debug) or wrap (release).
        let end = u64::try_from(len)
            .ok()
            .and_then(|n| offset.checked_add(n - 1))
            .ok_or_else(|| {
                Error::Io(IoError::new(
                    ErrorKind::InvalidInput,
                    "Requested read range overflows u64",
                ))
            })?;

        self.handle.block_on(async {
            let response_data = tokio::time::timeout(
                std::time::Duration::from_secs(60),
                self.bucket.get_object_range(&self.key, offset, Some(end)),
            )
            .await
            .map_err(|_| {
                Error::Io(IoError::new(
                    ErrorKind::TimedOut,
                    "S3 read timeout after 60 seconds",
                ))
            })?
            .map_err(|e| Error::Io(IoError::other(format!("S3 Read error: {e}"))))?;

            let code = response_data.status_code();
            if code != 200 && code != 206 {
                return Err(Error::Io(IoError::other(format!(
                    "S3 error code: {code}"
                ))));
            }

            let data = response_data.as_slice();

            // A short (or long) body means the requested range was not fully
            // served, e.g. because it extends past the end of the object.
            if data.len() != len {
                return Err(Error::Io(IoError::new(
                    ErrorKind::UnexpectedEof,
                    format!("Expected {} bytes, got {}", len, data.len()),
                )));
            }

            Ok(Bytes::copy_from_slice(data))
        })
    }

    /// Returns the object size in bytes as recorded when the backend was
    /// constructed; no request is made.
    fn len(&self) -> u64 {
        self.len
    }
}