hexz_core/store/http/async_client.rs
//! HTTP storage backend driven by a shared Tokio runtime.
2//!
//! This module provides an HTTP storage backend that drives the `reqwest` async
//! client on a shared Tokio runtime. This allows the backend to present a
//! synchronous `StorageBackend` interface while leveraging async I/O internally
//! for efficient concurrent operations.
7//!
8//! # Architecture
9//!
//! The [`HttpBackend`] holds a Tokio runtime `Handle` obtained from the crate's
//! global runtime and uses `Handle::block_on()` to execute async operations
//! synchronously. This design:
//! - Maintains compatibility with the synchronous `StorageBackend` trait
//! - Enables efficient connection pooling and concurrent requests via `reqwest`
//! - Provides async benefits (low memory overhead per connection) without requiring
//!   callers to use async/await
//!
//! Because every operation goes through `block_on`, the backend must not be called
//! from inside an async task; Tokio panics in that case.
16//!
17//! # Thread Safety
18//!
19//! The backend is fully thread-safe (`Send + Sync`):
20//! - The `reqwest::Client` is designed for concurrent use
//! - The runtime `Handle` is `Send + Sync` and can be used from any thread
22//! - Multiple threads can call `read_exact()` concurrently without coordination
23//!
24//! # Security
25//!
26//! This backend validates URLs to prevent SSRF attacks:
27//! - Blocks access to localhost and private networks by default
28//! - Set `allow_restricted: true` only in trusted environments
29//! - See [`validate_url`](crate::store::utils::validate_url) for details
30//!
31//! # Examples
32//!
33//! ```no_run
34//! use hexz_core::store::http::HttpBackend;
35//! use hexz_core::store::StorageBackend;
36//!
37//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
38//! let backend = HttpBackend::new(
39//! "https://cdn.example.com/snapshots/data.hxz".to_string(),
40//! false // block restricted IPs
41//! )?;
42//!
43//! println!("Snapshot size: {} bytes", backend.len());
44//!
45//! let header = backend.read_exact(0, 512)?;
46//! assert_eq!(header.len(), 512);
47//! # Ok(())
48//! # }
49//! ```
50
51use crate::store::StorageBackend;
52use crate::store::runtime::global_handle;
53use crate::store::utils::validate_url;
54use bytes::Bytes;
55use hexz_common::{Error, Result};
56use reqwest::Client;
57use std::io::{Error as IoError, ErrorKind};
58use tokio::runtime::Handle;
59
60/// HTTP storage backend with embedded Tokio runtime.
61///
62/// This backend wraps an async `reqwest::Client` and Tokio `Runtime` to provide
63/// synchronous `StorageBackend` operations while leveraging async I/O internally.
64/// It validates URLs for security, maintains a connection pool, and performs
65/// range requests to fetch specific byte ranges.
66///
67/// # Examples
68///
69/// ```no_run
70/// use hexz_core::store::http::HttpBackend;
71/// use hexz_core::store::StorageBackend;
72///
73/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
74/// let backend = HttpBackend::new(
75/// "https://example.com/snapshot.hxz".to_string(),
76/// false
77/// )?;
78///
79/// let data = backend.read_exact(8192, 4096)?;
80/// assert_eq!(data.len(), 4096);
81/// # Ok(())
82/// # }
83/// ```
84#[derive(Debug)]
85pub struct HttpBackend {
86 url: String,
87 client: Client,
88 len: u64,
89 handle: Handle,
90}
91
92impl HttpBackend {
93 /// Creates a new HTTP backend by validating the URL and fetching file metadata.
94 ///
95 /// This constructor:
96 /// 1. Validates the URL for security (blocks restricted IPs unless allowed)
97 /// 2. Creates a Tokio runtime for executing async operations
98 /// 3. Sends an async HEAD request to verify the server and fetch file size
99 /// 4. Extracts the `Content-Length` header to determine snapshot size
100 ///
101 /// # Parameters
102 ///
103 /// - `url`: The HTTP/HTTPS URL of the snapshot file
104 /// - `allow_restricted`: If `false`, blocks access to localhost and private networks
105 pub fn new(url: String, allow_restricted: bool) -> Result<Self> {
106 let safe_url = validate_url(&url, allow_restricted)?;
107
108 let handle = global_handle();
109
110 let client = Client::builder()
111 .build()
112 .map_err(|e| Error::Io(IoError::other(e)))?;
113
114 let len = handle.block_on(async {
115 let resp = client
116 .head(&safe_url)
117 .send()
118 .await
119 .map_err(|e| Error::Io(IoError::other(e)))?;
120
121 if !resp.status().is_success() {
122 return Err(Error::Io(IoError::other(format!(
123 "HTTP error: {}",
124 resp.status()
125 ))));
126 }
127
128 resp.headers()
129 .get(reqwest::header::CONTENT_LENGTH)
130 .and_then(|val| val.to_str().ok())
131 .and_then(|s| s.parse::<u64>().ok())
132 .ok_or_else(|| {
133 Error::Io(IoError::new(
134 ErrorKind::InvalidData,
135 "Missing Content-Length header",
136 ))
137 })
138 })?;
139
140 Ok(Self {
141 url: safe_url,
142 client,
143 len,
144 handle,
145 })
146 }
147}
148
149impl StorageBackend for HttpBackend {
150 fn read_exact(&self, offset: u64, len: usize) -> Result<Bytes> {
151 if len == 0 {
152 return Ok(Bytes::new());
153 }
154 let end = offset + len as u64 - 1;
155 let range_header = format!("bytes={}-{}", offset, end);
156
157 self.handle.block_on(async {
158 let resp = self
159 .client
160 .get(&self.url)
161 .header("Range", range_header)
162 .send()
163 .await
164 .map_err(|e| Error::Io(IoError::other(e)))?;
165
166 if !resp.status().is_success() {
167 return Err(Error::Io(IoError::other(format!(
168 "HTTP error: {}",
169 resp.status()
170 ))));
171 }
172
173 let bytes = resp
174 .bytes()
175 .await
176 .map_err(|e| Error::Io(IoError::other(e)))?;
177
178 if bytes.len() != len {
179 return Err(Error::Io(IoError::new(
180 ErrorKind::UnexpectedEof,
181 format!("Expected {} bytes, got {}", len, bytes.len()),
182 )));
183 }
184
185 Ok(bytes)
186 })
187 }
188
189 fn len(&self) -> u64 {
190 self.len
191 }
192}