// geocog/s3.rs

//! S3 range reader implementation using `object_store`.
//!
//! This module provides S3-compatible storage access for reading COG files.
//! It supports:
//! - AWS S3
//! - MinIO
//! - Any S3-compatible storage (DigitalOcean Spaces, Backblaze B2, etc.)
//!
//! # Configuration
//!
//! When a reader is created from an `s3://` URL, it is configured via environment variables:
//! - `AWS_ACCESS_KEY_ID` - AWS access key
//! - `AWS_SECRET_ACCESS_KEY` - AWS secret key
//! - `AWS_REGION` - AWS region (default: us-east-1)
//! - `AWS_ENDPOINT_URL` - Custom endpoint for MinIO/S3-compatible services
//! - `AWS_ALLOW_HTTP` - Set to "true" to allow HTTP endpoints (for local MinIO)
//! - `AWS_SKIP_SIGNATURE` - Set to "true" to send unsigned requests (anonymous access to public buckets)
//!
//! # Example
//!
//! ```rust,no_run
//! use geocog::S3RangeReaderAsync;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
//!     // For AWS S3
//!     let reader = S3RangeReaderAsync::new("s3://my-bucket/path/to/file.tif").await?;
//!
//!     // For MinIO (set AWS_ENDPOINT_URL=http://localhost:9000)
//!     // std::env::set_var("AWS_ENDPOINT_URL", "http://localhost:9000");
//!     // std::env::set_var("AWS_ALLOW_HTTP", "true");
//!     // let reader = S3RangeReaderAsync::new("s3://my-bucket/path/to/file.tif").await?;
//!
//!     Ok(())
//! }
//! ```

use crate::range_reader::RangeReader;
use crate::tiff_utils::AnyResult;
use object_store::aws::AmazonS3Builder;
use object_store::path::Path as ObjectPath;
use object_store::{GetOptions, GetRange, ObjectStore};
use std::fmt;
use std::ops::Range;
use std::sync::Arc;
use tokio::runtime::Handle;

/// S3 configuration for connecting to S3-compatible storage.
///
/// `Debug` output redacts `secret_access_key`, so a config can be logged
/// without leaking credentials.
#[derive(Clone)]
pub struct S3Config {
    /// S3 bucket name
    pub bucket: String,
    /// Object key (path within the bucket)
    pub key: String,
    /// AWS region (`S3Config::from_url` defaults this to us-east-1)
    pub region: Option<String>,
    /// Custom endpoint URL (for MinIO, LocalStack, etc.)
    pub endpoint_url: Option<String>,
    /// AWS access key ID
    pub access_key_id: Option<String>,
    /// AWS secret access key (never printed by `Debug`)
    pub secret_access_key: Option<String>,
    /// Allow HTTP connections (required for local MinIO without TLS)
    pub allow_http: bool,
    /// Skip request signing (for anonymous access to public buckets)
    pub skip_signature: bool,
}

impl fmt::Debug for S3Config {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("S3Config")
            .field("bucket", &self.bucket)
            .field("key", &self.key)
            .field("region", &self.region)
            .field("endpoint_url", &self.endpoint_url)
            .field("access_key_id", &self.access_key_id)
            // Only reveal whether a secret is configured, never its value.
            .field(
                "secret_access_key",
                &self.secret_access_key.as_ref().map(|_| "<redacted>"),
            )
            .field("allow_http", &self.allow_http)
            .field("skip_signature", &self.skip_signature)
            .finish()
    }
}

67impl S3Config {
68    /// Create a new S3 config from an S3 URL
69    ///
70    /// Parses URLs like `s3://bucket/key/path`
71    pub fn from_url(url: &str) -> AnyResult<Self> {
72        let parsed = url::Url::parse(url)?;
73
74        if parsed.scheme() != "s3" {
75            return Err(format!("Expected s3:// URL, got: {}", parsed.scheme()).into());
76        }
77
78        let bucket = parsed
79            .host_str()
80            .ok_or("Missing bucket in S3 URL")?
81            .to_string();
82
83        let key = parsed.path().trim_start_matches('/').to_string();
84
85        if key.is_empty() {
86            return Err("Missing key in S3 URL".into());
87        }
88
89        Ok(Self {
90            bucket,
91            key,
92            region: std::env::var("AWS_REGION").ok().or(Some("us-east-1".to_string())),
93            endpoint_url: std::env::var("AWS_ENDPOINT_URL").ok(),
94            access_key_id: std::env::var("AWS_ACCESS_KEY_ID").ok(),
95            secret_access_key: std::env::var("AWS_SECRET_ACCESS_KEY").ok(),
96            allow_http: std::env::var("AWS_ALLOW_HTTP")
97                .map(|v| v.to_lowercase() == "true")
98                .unwrap_or(false),
99            skip_signature: std::env::var("AWS_SKIP_SIGNATURE")
100                .map(|v| v.to_lowercase() == "true")
101                .unwrap_or(false),
102        })
103    }
104
105    /// Create a config for MinIO with default local settings
106    pub fn for_minio(bucket: &str, key: &str, endpoint: &str) -> Self {
107        Self {
108            bucket: bucket.to_string(),
109            key: key.to_string(),
110            region: Some("us-east-1".to_string()),
111            endpoint_url: Some(endpoint.to_string()),
112            access_key_id: std::env::var("AWS_ACCESS_KEY_ID").ok(),
113            secret_access_key: std::env::var("AWS_SECRET_ACCESS_KEY").ok(),
114            allow_http: true,
115            skip_signature: false,
116        }
117    }
118}
119
120/// Async S3 range reader using `object_store`
121pub struct S3RangeReaderAsync {
122    store: Arc<dyn ObjectStore>,
123    path: ObjectPath,
124    size: u64,
125    url: String,
126}
127
128impl S3RangeReaderAsync {
129    /// Create a new S3 range reader from an S3 URL
130    ///
131    /// # Example
132    ///
133    /// ```rust,no_run
134    /// use geocog::S3RangeReaderAsync;
135    ///
136    /// # async fn example() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
137    /// let reader = S3RangeReaderAsync::new("s3://my-bucket/data/file.tif").await?;
138    /// println!("File size: {} bytes", reader.size());
139    /// # Ok(())
140    /// # }
141    /// ```
142    pub async fn new(url: &str) -> AnyResult<Self> {
143        let config = S3Config::from_url(url)?;
144        Self::from_config(config).await
145    }
146
147    /// Create a new S3 range reader from a config
148    pub async fn from_config(config: S3Config) -> AnyResult<Self> {
149        let mut builder = AmazonS3Builder::new()
150            .with_bucket_name(&config.bucket);
151
152        if let Some(region) = &config.region {
153            builder = builder.with_region(region);
154        }
155
156        if let Some(endpoint) = &config.endpoint_url {
157            builder = builder.with_endpoint(endpoint);
158        }
159
160        if let Some(access_key) = &config.access_key_id {
161            builder = builder.with_access_key_id(access_key);
162        }
163
164        if let Some(secret_key) = &config.secret_access_key {
165            builder = builder.with_secret_access_key(secret_key);
166        }
167
168        if config.allow_http {
169            builder = builder.with_allow_http(true);
170        }
171
172        if config.skip_signature {
173            builder = builder.with_skip_signature(true);
174        }
175
176        let store = builder.build()?;
177        let path = ObjectPath::from(config.key.as_str());
178
179        // Get file size via HEAD request
180        let meta = store.head(&path).await?;
181        let size = meta.size as u64;
182
183        let url = format!("s3://{}/{}", config.bucket, config.key);
184
185        Ok(Self {
186            store: Arc::new(store),
187            path,
188            size,
189            url,
190        })
191    }
192
193    /// Read a range of bytes asynchronously
194    pub async fn read_range_async(&self, offset: u64, length: usize) -> AnyResult<Vec<u8>> {
195        let range = Range {
196            start: offset,
197            end: offset + length as u64,
198        };
199
200        let options = GetOptions {
201            range: Some(GetRange::Bounded(range)),
202            ..Default::default()
203        };
204
205        let result = self.store.get_opts(&self.path, options).await?;
206        let bytes = result.bytes().await?;
207
208        Ok(bytes.to_vec())
209    }
210
211    /// Get the file size
212    pub fn size(&self) -> u64 {
213        self.size
214    }
215
216    /// Get the S3 URL
217    pub fn url(&self) -> &str {
218        &self.url
219    }
220}
221
222/// Synchronous wrapper for S3RangeReaderAsync that implements RangeReader trait
223pub struct S3RangeReaderSync {
224    inner: S3RangeReaderAsync,
225    runtime: Handle,
226}
227
228impl S3RangeReaderSync {
229    /// Create a new sync S3 range reader
230    ///
231    /// Must be called from within a tokio runtime context
232    pub fn new(url: &str) -> AnyResult<Self> {
233        let runtime = Handle::try_current()
234            .map_err(|_| "S3RangeReaderSync must be created within a tokio runtime")?;
235
236        let inner = runtime.block_on(S3RangeReaderAsync::new(url))?;
237
238        Ok(Self { inner, runtime })
239    }
240
241    /// Create from an existing async reader
242    pub fn from_async(inner: S3RangeReaderAsync) -> AnyResult<Self> {
243        let runtime = Handle::try_current()
244            .map_err(|_| "S3RangeReaderSync must be created within a tokio runtime")?;
245
246        Ok(Self { inner, runtime })
247    }
248}
249
250impl RangeReader for S3RangeReaderSync {
251    fn read_range(&self, offset: u64, length: usize) -> AnyResult<Vec<u8>> {
252        self.runtime.block_on(self.inner.read_range_async(offset, length))
253    }
254
255    fn size(&self) -> u64 {
256        self.inner.size
257    }
258
259    fn identifier(&self) -> &str {
260        &self.inner.url
261    }
262
263    fn is_local(&self) -> bool {
264        false
265    }
266}
267
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_s3_config_from_url() {
        let config = S3Config::from_url("s3://my-bucket/path/to/file.tif").unwrap();
        assert_eq!(config.bucket, "my-bucket");
        assert_eq!(config.key, "path/to/file.tif");
    }

    #[test]
    fn test_s3_config_from_url_simple() {
        let config = S3Config::from_url("s3://bucket/file.tif").unwrap();
        assert_eq!(config.bucket, "bucket");
        assert_eq!(config.key, "file.tif");
    }

    #[test]
    fn test_s3_config_invalid_scheme() {
        let result = S3Config::from_url("http://bucket/file.tif");
        assert!(result.is_err());
    }

    #[test]
    fn test_s3_config_missing_key() {
        let result = S3Config::from_url("s3://bucket/");
        assert!(result.is_err());
    }

    #[test]
    fn test_s3_config_missing_key_without_slash() {
        // No path component at all must be rejected just like an empty one.
        assert!(S3Config::from_url("s3://bucket").is_err());
    }

    #[test]
    fn test_s3_config_unparseable_url() {
        assert!(S3Config::from_url("not a url").is_err());
    }

    #[test]
    fn test_minio_config() {
        let config = S3Config::for_minio("test-bucket", "data/test.tif", "http://localhost:9000");
        assert_eq!(config.bucket, "test-bucket");
        assert_eq!(config.key, "data/test.tif");
        assert_eq!(config.endpoint_url, Some("http://localhost:9000".to_string()));
        assert!(config.allow_http);
    }

    #[test]
    fn test_minio_config_defaults() {
        let config = S3Config::for_minio("b", "k.tif", "http://localhost:9000");
        assert_eq!(config.region.as_deref(), Some("us-east-1"));
        assert!(!config.skip_signature);
    }
}