Skip to main content

hexz_core/store/s3/
async_client.rs

//! S3 storage backend driven by a Tokio runtime handle.
//!
//! This module provides an S3 storage backend that runs the `rust-s3` async
//! client on a Tokio runtime through a stored runtime handle. This allows the
//! backend to present a synchronous `StorageBackend` interface while leveraging
//! async I/O internally for efficient concurrent S3 operations.
7//!
8//! # Architecture
9//!
//! The [`S3Backend`] stores a Tokio runtime handle (`tokio::runtime::Handle`,
//! obtained from `crate::store::runtime::global_handle()`) and uses
//! `handle.block_on()` to execute async S3 operations synchronously. This design:
12//! - Maintains compatibility with the synchronous `StorageBackend` trait
13//! - Enables efficient connection pooling and concurrent requests via `rust-s3`
14//! - Provides async benefits (low memory overhead per connection) without requiring
15//!   callers to use async/await
16//!
17//! # Feature Gate
18//!
19//! This module is only available when the `s3` feature is enabled:
20//! ```toml
21//! [dependencies]
22//! hexz-core = { version = "*", features = ["s3"] }
23//! ```
24//!
25//! # Custom Endpoints (S3-Compatible Storage)
26//!
27//! This backend supports S3-compatible object storage systems (MinIO, DigitalOcean
28//! Spaces, Wasabi, etc.) via the optional `endpoint` parameter. When provided,
29//! the backend uses a custom endpoint URL instead of AWS S3.
30//!
31//! # Thread Safety
32//!
33//! The backend is fully thread-safe (`Send + Sync`):
34//! - The `rust-s3` async client is designed for concurrent use
//! - The Tokio runtime `Handle` can be shared safely across threads
36//! - Multiple threads can call `read_exact()` concurrently without coordination
37//!
38//! # Performance Characteristics
39//!
40//! - **Latency**: 50-200ms per request (network RTT + S3 processing)
41//! - **Throughput**: Up to 100MB/s per connection (scales with parallel requests)
42//! - **Runtime overhead**: ~100µs per request for `block_on()` context switch
43//!
44//! # Examples
45//!
46//! ```no_run
47//! # #[cfg(feature = "s3")]
48//! # {
49//! use hexz_core::store::s3::S3Backend;
50//! use hexz_core::store::StorageBackend;
51//!
52//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
53//! let backend = S3Backend::new(
54//!     "my-snapshots".to_string(),
55//!     "prod/snapshot-001.hxz".to_string(),
56//!     "us-east-1".to_string(),
57//!     None // Use AWS endpoint
58//! )?;
59//!
60//! let data = backend.read_exact(0, 512)?;
61//! assert_eq!(data.len(), 512);
62//! # Ok(())
63//! # }
64//! # }
65//! ```
66
67#[cfg(feature = "s3")]
68use crate::store::StorageBackend;
69#[cfg(feature = "s3")]
70use crate::store::runtime::global_handle;
71#[cfg(feature = "s3")]
72use bytes::Bytes;
73#[cfg(feature = "s3")]
74use hexz_common::{Error, Result};
75#[cfg(feature = "s3")]
76use s3::bucket::Bucket;
77#[cfg(feature = "s3")]
78use s3::creds::Credentials;
79#[cfg(feature = "s3")]
80use s3::region::Region;
81#[cfg(feature = "s3")]
82use std::io::{Error as IoError, ErrorKind};
83#[cfg(feature = "s3")]
84use std::str::FromStr;
85#[cfg(feature = "s3")]
86use tokio::runtime::Handle;
87
/// S3 storage backend that runs async `rust-s3` calls on a Tokio runtime handle.
///
/// This backend pairs an async `rust-s3` `Bucket` client with a Tokio runtime
/// `Handle` to provide synchronous `StorageBackend` operations while leveraging
/// async I/O internally. It supports both AWS S3 and S3-compatible storage via
/// custom endpoints.
///
/// # Examples
///
/// ```no_run
/// # #[cfg(feature = "s3")]
/// # {
/// use hexz_core::store::s3::S3Backend;
/// use hexz_core::store::StorageBackend;
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// // AWS S3
/// let backend = S3Backend::new(
///     "my-snapshots".to_string(),
///     "snapshot.hxz".to_string(),
///     "us-east-1".to_string(),
///     None
/// )?;
///
/// // Read 4KB at offset 8192
/// let data = backend.read_exact(8192, 4096)?;
/// assert_eq!(data.len(), 4096);
/// # Ok(())
/// # }
/// # }
/// ```
#[cfg(feature = "s3")]
#[derive(Debug)]
pub struct S3Backend {
    /// Client for the bucket that holds the object.
    bucket: Box<Bucket>,
    /// Key of the object within the bucket.
    key: String,
    /// Object size in bytes, taken from the `Content-Length` of the HEAD
    /// response received at construction time.
    len: u64,
    /// Tokio runtime handle used to block on async S3 requests.
    handle: Handle,
}
126
#[cfg(feature = "s3")]
impl S3Backend {
    /// Creates a new S3 backend with optional custom endpoint support.
    ///
    /// This constructor:
    /// 1. Obtains a Tokio runtime handle from `global_handle()`
    /// 2. Parses the region, or builds a custom region when `endpoint` is given
    /// 3. Loads credentials via `Credentials::default()`
    /// 4. Creates a `Bucket` client with path-style addressing
    /// 5. Sends a HEAD request (30 second timeout) to verify the object is
    ///    accessible and to fetch its size
    ///
    /// # Parameters
    ///
    /// - `bucket_name`: The S3 bucket name (e.g., `"my-snapshots"`)
    /// - `key`: The object key within the bucket (e.g., `"prod/snapshot-001.hxz"`)
    /// - `region_name`: AWS region or arbitrary name for custom endpoints (e.g., `"us-east-1"`)
    /// - `endpoint`: Optional custom endpoint URL for S3-compatible storage
    ///   (e.g., `Some("https://minio.example.com:9000".to_string())`)
    ///
    /// # Errors
    ///
    /// Returns `Error::Io` with:
    /// - `ErrorKind::InvalidInput` if `region_name` cannot be parsed (no custom endpoint)
    /// - `ErrorKind::PermissionDenied` if no credentials can be loaded
    /// - `ErrorKind::TimedOut` if the HEAD request exceeds 30 seconds
    /// - `ErrorKind::NotFound` if the HEAD response status is not 200
    /// - `ErrorKind::InvalidData` if `Content-Length` is missing or negative
    /// - `ErrorKind::Other` for bucket construction or HEAD transport failures
    ///
    /// # Panics
    ///
    /// `Handle::block_on` panics when invoked from inside an asynchronous
    /// execution context, so this constructor must be called from synchronous code.
    pub fn new(
        bucket_name: String,
        key: String,
        region_name: String,
        endpoint: Option<String>,
    ) -> Result<Self> {
        let handle = global_handle();

        // A custom endpoint bypasses region-name validation: the name is passed
        // through as-is alongside the endpoint URL.
        let region = match endpoint {
            Some(endpoint) => Region::Custom {
                region: region_name,
                endpoint,
            },
            None => Region::from_str(&region_name).map_err(|e| {
                Error::Io(IoError::new(
                    ErrorKind::InvalidInput,
                    format!("Invalid region: {}", e),
                ))
            })?,
        };

        let credentials = Credentials::default().map_err(|e| {
            Error::Io(IoError::new(
                ErrorKind::PermissionDenied,
                format!("Missing credentials: {}", e),
            ))
        })?;

        let bucket = Bucket::new(&bucket_name, region, credentials)
            .map_err(|e| Error::Io(IoError::other(format!("Bucket error: {}", e))))?
            .with_path_style();

        // Perform HEAD request to get size and validate access (with 30s timeout).
        // The outer `map_err` handles the timeout, the inner one the S3 error.
        let (head, code) = handle.block_on(async {
            tokio::time::timeout(std::time::Duration::from_secs(30), bucket.head_object(&key))
                .await
                .map_err(|_| {
                    Error::Io(IoError::new(
                        ErrorKind::TimedOut,
                        "S3 connection timeout after 30 seconds",
                    ))
                })?
                .map_err(|e| Error::Io(IoError::other(format!("S3 Head error: {}", e))))
        })?;

        if code != 200 {
            return Err(Error::Io(IoError::new(
                ErrorKind::NotFound,
                format!("S3 object not found or error: {}", code),
            )));
        }

        let content_length = head.content_length.ok_or_else(|| {
            Error::Io(IoError::new(
                ErrorKind::InvalidData,
                "Missing Content-Length",
            ))
        })?;

        // The reported length is signed; a checked conversion rejects negative
        // values without relying on a lossy `as` cast.
        let len = u64::try_from(content_length).map_err(|_| {
            Error::Io(IoError::new(
                ErrorKind::InvalidData,
                "Negative Content-Length",
            ))
        })?;

        Ok(Self {
            bucket: Box::new(bucket),
            key,
            len,
            handle,
        })
    }
}
220
#[cfg(feature = "s3")]
impl StorageBackend for S3Backend {
    /// Reads exactly `len` bytes starting at byte `offset` of the object.
    ///
    /// Issues a ranged GET (60 second timeout) for the inclusive byte range
    /// `offset..=offset + len - 1` and blocks the calling thread until it
    /// completes. A `len` of zero returns an empty buffer without any request.
    ///
    /// # Errors
    ///
    /// Returns `Error::Io` with:
    /// - `ErrorKind::InvalidInput` if `offset + len - 1` does not fit in a `u64`
    /// - `ErrorKind::TimedOut` if the request exceeds 60 seconds
    /// - `ErrorKind::UnexpectedEof` if the response body is not exactly `len` bytes
    /// - `ErrorKind::Other` for transport failures or a status other than 200/206
    ///
    /// # Panics
    ///
    /// `Handle::block_on` panics when invoked from inside an asynchronous
    /// execution context, so this method must be called from synchronous code.
    fn read_exact(&self, offset: u64, len: usize) -> Result<Bytes> {
        if len == 0 {
            return Ok(Bytes::new());
        }

        // HTTP byte ranges are inclusive, hence the `- 1`. `len` is non-zero
        // here, so `len - 1` cannot underflow; the addition is checked so that an
        // oversized request is rejected instead of panicking (debug) or wrapping
        // around to a bogus range (release).
        let end = u64::try_from(len - 1)
            .ok()
            .and_then(|span| offset.checked_add(span))
            .ok_or_else(|| {
                Error::Io(IoError::new(
                    ErrorKind::InvalidInput,
                    format!("Read range overflows u64: offset {}, len {}", offset, len),
                ))
            })?;

        self.handle.block_on(async {
            // The outer `map_err` handles the timeout, the inner one the S3 error.
            let response_data = tokio::time::timeout(
                std::time::Duration::from_secs(60),
                self.bucket.get_object_range(&self.key, offset, Some(end)),
            )
            .await
            .map_err(|_| {
                Error::Io(IoError::new(
                    ErrorKind::TimedOut,
                    "S3 read timeout after 60 seconds",
                ))
            })?
            .map_err(|e| Error::Io(IoError::other(format!("S3 Read error: {}", e))))?;

            let code = response_data.status_code();
            if code != 200 && code != 206 {
                return Err(Error::Io(IoError::other(format!(
                    "S3 error code: {}",
                    code
                ))));
            }

            let data = response_data.as_slice();

            // A short (or long) body means the requested range was not fully
            // served, e.g. because it extends past the end of the object.
            if data.len() != len {
                return Err(Error::Io(IoError::new(
                    ErrorKind::UnexpectedEof,
                    format!("Expected {} bytes, got {}", len, data.len()),
                )));
            }

            Ok(Bytes::copy_from_slice(data))
        })
    }

    /// Returns the object size in bytes as recorded when the backend was created.
    fn len(&self) -> u64 {
        self.len
    }
}
263
264    fn len(&self) -> u64 {
265        self.len
266    }
267}