hexz_store/s3/
async_client.rs1use crate::runtime::global_handle;
4use bytes::Bytes;
5use hexz_common::{Error, Result};
6use hexz_core::store::StorageBackend;
7use s3::bucket::Bucket;
8use s3::creds::Credentials;
9use s3::region::Region;
10use std::io::{Error as IoError, ErrorKind};
11use std::str::FromStr;
12use tokio::runtime::Handle;
13
14#[derive(Debug)]
16pub struct S3Backend {
17 bucket: Box<Bucket>,
18 key: String,
19 len: u64,
20 handle: Handle,
21}
22
23impl S3Backend {
24 pub fn new(
26 bucket_name: &str,
27 key: String,
28 region_name: String,
29 endpoint: Option<String>,
30 ) -> Result<Self> {
31 let handle = global_handle().map_err(Error::Io)?;
32
33 let region = if let Some(ep) = endpoint {
34 Region::Custom {
35 region: region_name,
36 endpoint: ep,
37 }
38 } else {
39 Region::from_str(®ion_name).map_err(|e| {
40 Error::Io(IoError::new(
41 ErrorKind::InvalidInput,
42 format!("Invalid region: {e}"),
43 ))
44 })?
45 };
46
47 let credentials = Credentials::default().map_err(|e| {
48 Error::Io(IoError::new(
49 ErrorKind::PermissionDenied,
50 format!("Missing credentials: {e}"),
51 ))
52 })?;
53
54 let bucket = Bucket::new(bucket_name, region, credentials)
55 .map_err(|e| Error::Io(IoError::other(format!("Bucket error: {e}"))))?
56 .with_path_style();
57
58 let (head, code) = tokio::task::block_in_place(|| {
59 handle.block_on(async {
60 tokio::time::timeout(std::time::Duration::from_secs(30), bucket.head_object(&key))
61 .await
62 .map_err(|_| {
63 Error::Io(IoError::new(
64 ErrorKind::TimedOut,
65 "S3 connection timeout after 30 seconds",
66 ))
67 })?
68 .map_err(|e| Error::Io(IoError::other(format!("S3 Head error: {e}"))))
69 })
70 })?;
71
72 if code != 200 {
73 return Err(Error::Io(IoError::new(
74 ErrorKind::NotFound,
75 format!("S3 object not found or error: {code}"),
76 )));
77 }
78
79 let len = head.content_length.ok_or_else(|| {
80 Error::Io(IoError::new(
81 ErrorKind::InvalidData,
82 "Missing Content-Length",
83 ))
84 })?;
85
86 if len < 0 {
87 return Err(Error::Io(IoError::new(
88 ErrorKind::InvalidData,
89 "Negative Content-Length",
90 )));
91 }
92
93 Ok(Self {
94 bucket: Box::new(bucket),
95 key,
96 len: len as u64,
97 handle,
98 })
99 }
100}
101
102impl StorageBackend for S3Backend {
103 fn read_exact(&self, offset: u64, len: usize) -> Result<Bytes> {
104 if len == 0 {
105 return Ok(Bytes::new());
106 }
107 let end = offset + len as u64 - 1;
108
109 tokio::task::block_in_place(|| {
110 self.handle.block_on(async {
111 let response_data = tokio::time::timeout(
112 std::time::Duration::from_secs(60),
113 self.bucket.get_object_range(&self.key, offset, Some(end)),
114 )
115 .await
116 .map_err(|_| {
117 Error::Io(IoError::new(
118 ErrorKind::TimedOut,
119 "S3 read timeout after 60 seconds",
120 ))
121 })?
122 .map_err(|e| Error::Io(IoError::other(format!("S3 Read error: {e}"))))?;
123
124 let code = response_data.status_code();
125 if code != 200 && code != 206 {
126 return Err(Error::Io(IoError::other(format!("S3 error code: {code}"))));
127 }
128
129 let data = response_data.bytes().clone();
130 if data.len() != len {
131 return Err(Error::Io(IoError::new(
132 ErrorKind::UnexpectedEof,
133 format!("Expected {} bytes, got {}", len, data.len()),
134 )));
135 }
136
137 Ok(data)
138 })
139 })
140 }
141
142 fn len(&self) -> u64 {
143 self.len
144 }
145}