Skip to main content

hexz_store/s3/
async_client.rs

//! S3 storage backend with embedded Tokio runtime.
2
3use crate::runtime::global_handle;
4use bytes::Bytes;
5use hexz_common::{Error, Result};
6use hexz_core::store::StorageBackend;
7use s3::bucket::Bucket;
8use s3::creds::Credentials;
9use s3::region::Region;
10use std::io::{Error as IoError, ErrorKind};
11use std::str::FromStr;
12use tokio::runtime::Handle;
13
14/// S3 storage backend with embedded Tokio runtime.
15#[derive(Debug)]
16pub struct S3Backend {
17    bucket: Box<Bucket>,
18    key: String,
19    len: u64,
20    handle: Handle,
21}
22
23impl S3Backend {
24    /// Creates an S3 backend, verifies the object exists, and caches its length.
25    pub fn new(
26        bucket_name: &str,
27        key: String,
28        region_name: String,
29        endpoint: Option<String>,
30    ) -> Result<Self> {
31        let handle = global_handle().map_err(Error::Io)?;
32
33        let region = if let Some(ep) = endpoint {
34            Region::Custom {
35                region: region_name,
36                endpoint: ep,
37            }
38        } else {
39            Region::from_str(&region_name).map_err(|e| {
40                Error::Io(IoError::new(
41                    ErrorKind::InvalidInput,
42                    format!("Invalid region: {e}"),
43                ))
44            })?
45        };
46
47        let credentials = Credentials::default().map_err(|e| {
48            Error::Io(IoError::new(
49                ErrorKind::PermissionDenied,
50                format!("Missing credentials: {e}"),
51            ))
52        })?;
53
54        let bucket = Bucket::new(bucket_name, region, credentials)
55            .map_err(|e| Error::Io(IoError::other(format!("Bucket error: {e}"))))?
56            .with_path_style();
57
58        let (head, code) = tokio::task::block_in_place(|| {
59            handle.block_on(async {
60                tokio::time::timeout(std::time::Duration::from_secs(30), bucket.head_object(&key))
61                    .await
62                    .map_err(|_| {
63                        Error::Io(IoError::new(
64                            ErrorKind::TimedOut,
65                            "S3 connection timeout after 30 seconds",
66                        ))
67                    })?
68                    .map_err(|e| Error::Io(IoError::other(format!("S3 Head error: {e}"))))
69            })
70        })?;
71
72        if code != 200 {
73            return Err(Error::Io(IoError::new(
74                ErrorKind::NotFound,
75                format!("S3 object not found or error: {code}"),
76            )));
77        }
78
79        let len = head.content_length.ok_or_else(|| {
80            Error::Io(IoError::new(
81                ErrorKind::InvalidData,
82                "Missing Content-Length",
83            ))
84        })?;
85
86        if len < 0 {
87            return Err(Error::Io(IoError::new(
88                ErrorKind::InvalidData,
89                "Negative Content-Length",
90            )));
91        }
92
93        Ok(Self {
94            bucket: Box::new(bucket),
95            key,
96            len: len as u64,
97            handle,
98        })
99    }
100}
101
102impl StorageBackend for S3Backend {
103    fn read_exact(&self, offset: u64, len: usize) -> Result<Bytes> {
104        if len == 0 {
105            return Ok(Bytes::new());
106        }
107        let end = offset + len as u64 - 1;
108
109        tokio::task::block_in_place(|| {
110            self.handle.block_on(async {
111                let response_data = tokio::time::timeout(
112                    std::time::Duration::from_secs(60),
113                    self.bucket.get_object_range(&self.key, offset, Some(end)),
114                )
115                .await
116                .map_err(|_| {
117                    Error::Io(IoError::new(
118                        ErrorKind::TimedOut,
119                        "S3 read timeout after 60 seconds",
120                    ))
121                })?
122                .map_err(|e| Error::Io(IoError::other(format!("S3 Read error: {e}"))))?;
123
124                let code = response_data.status_code();
125                if code != 200 && code != 206 {
126                    return Err(Error::Io(IoError::other(format!("S3 error code: {code}"))));
127                }
128
129                let data = response_data.bytes().clone();
130                if data.len() != len {
131                    return Err(Error::Io(IoError::new(
132                        ErrorKind::UnexpectedEof,
133                        format!("Expected {} bytes, got {}", len, data.len()),
134                    )));
135                }
136
137                Ok(data)
138            })
139        })
140    }
141
142    fn len(&self) -> u64 {
143        self.len
144    }
145}