Skip to main content

sqlite_vtable_opendal/backends/
s3.rs

1//! AWS S3 storage backend implementation
2//!
3//! This backend allows querying S3 buckets and objects using SQL.
4//! Requires AWS credentials and bucket configuration.
5
6use crate::backends::StorageBackend;
7use crate::error::{Result, VTableError};
8use crate::types::{FileMetadata, QueryConfig};
9use async_trait::async_trait;
10use futures_util::TryStreamExt;
11use opendal::{services::S3, EntryMode, Metakey, Operator};
12use std::path::Path;
13
/// AWS S3 storage backend
///
/// Holds the settings needed to build an OpenDAL S3 operator (bucket, region,
/// optional explicit credentials and a root prefix) and lists objects from
/// that bucket.
///
/// # Authentication
///
/// Explicit credentials are optional and are supplied through
/// `with_credentials` (the constructor itself takes none). When they are not
/// set, no credentials are handed to the OpenDAL builder and credential
/// discovery is left to OpenDAL, e.g.:
/// - Environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
/// - IAM roles (when running on EC2/ECS)
///
/// NOTE(review): the exact fallback chain is decided by OpenDAL, not by this
/// type — confirm against the OpenDAL version in use.
///
/// # Example
///
/// ```rust,ignore
/// use sqlite_vtable_opendal::backends::s3::S3Backend;
/// use sqlite_vtable_opendal::types::QueryConfig;
///
/// let backend = S3Backend::new("my-bucket", "us-east-1")
///     .with_credentials("access_key", "secret_key");
/// let config = QueryConfig::default();
/// let files = backend.list_files(&config).await?;
/// ```
pub struct S3Backend {
    /// S3 bucket name
    bucket: String,
    /// AWS region (e.g. "us-east-1")
    region: String,
    /// AWS access key ID; `None` means no explicit credentials are configured
    access_key_id: Option<String>,
    /// AWS secret access key; only used when `access_key_id` is also set
    secret_access_key: Option<String>,
    /// Base path/prefix in bucket, used as the OpenDAL operator root
    /// (initialised to "/" by `new`)
    base_path: String,
}
48
49impl S3Backend {
50    /// Create a new S3 backend
51    ///
52    /// # Arguments
53    ///
54    /// * `bucket` - S3 bucket name
55    /// * `region` - AWS region (e.g., "us-east-1")
56    ///
57    /// # Example
58    ///
59    /// ```
60    /// use sqlite_vtable_opendal::backends::s3::S3Backend;
61    ///
62    /// let backend = S3Backend::new("my-bucket", "us-east-1");
63    /// ```
64    pub fn new(bucket: impl Into<String>, region: impl Into<String>) -> Self {
65        Self {
66            bucket: bucket.into(),
67            region: region.into(),
68            access_key_id: None,
69            secret_access_key: None,
70            base_path: "/".to_string(),
71        }
72    }
73
74    /// Set AWS credentials explicitly
75    ///
76    /// # Arguments
77    ///
78    /// * `access_key_id` - AWS access key ID
79    /// * `secret_access_key` - AWS secret access key
80    ///
81    /// # Example
82    ///
83    /// ```
84    /// use sqlite_vtable_opendal::backends::s3::S3Backend;
85    ///
86    /// let backend = S3Backend::new("my-bucket", "us-east-1")
87    ///     .with_credentials("AKIAIOSFODNN7EXAMPLE", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY");
88    /// ```
89    pub fn with_credentials(
90        mut self,
91        access_key_id: impl Into<String>,
92        secret_access_key: impl Into<String>,
93    ) -> Self {
94        self.access_key_id = Some(access_key_id.into());
95        self.secret_access_key = Some(secret_access_key.into());
96        self
97    }
98
99    /// Set base path/prefix in bucket
100    ///
101    /// # Arguments
102    ///
103    /// * `path` - Base path (e.g., "data/2024/")
104    ///
105    /// # Example
106    ///
107    /// ```
108    /// use sqlite_vtable_opendal::backends::s3::S3Backend;
109    ///
110    /// let backend = S3Backend::new("my-bucket", "us-east-1")
111    ///     .with_base_path("data/2024/");
112    /// ```
113    pub fn with_base_path(mut self, path: impl Into<String>) -> Self {
114        self.base_path = path.into();
115        self
116    }
117
118    /// Create an OpenDAL operator for S3
119    fn create_operator(&self) -> Result<Operator> {
120        let mut builder = S3::default()
121            .bucket(&self.bucket)
122            .region(&self.region)
123            .root(&self.base_path);
124
125        // Add credentials if provided
126        if let (Some(key_id), Some(secret)) = (&self.access_key_id, &self.secret_access_key) {
127            builder = builder.access_key_id(key_id).secret_access_key(secret);
128        }
129
130        Operator::new(builder)
131            .map(|op| op.finish())
132            .map_err(|e| VTableError::OpenDal(e))
133    }
134}
135
136#[async_trait]
137impl StorageBackend for S3Backend {
138    async fn list_files(&self, config: &QueryConfig) -> Result<Vec<FileMetadata>> {
139        let operator = self.create_operator()?;
140        let mut results = Vec::new();
141
142        // Normalize the path
143        let normalized_path = if config.root_path.is_empty() || config.root_path == "/" {
144            "".to_string()
145        } else {
146            config.root_path.trim_matches('/').to_string()
147        };
148
149        // Create lister with metadata keys
150        let lister_builder = operator.lister_with(&normalized_path);
151
152        let mut lister = lister_builder
153            .recursive(config.recursive)
154            .metakey(
155                Metakey::ContentLength
156                    | Metakey::ContentMd5
157                    | Metakey::ContentType
158                    | Metakey::Mode
159                    | Metakey::LastModified
160                    | Metakey::Etag,
161            )
162            .await
163            .map_err(|e| VTableError::OpenDal(e))?;
164
165        // Iterate through entries
166        while let Some(entry) = lister.try_next().await.map_err(|e| VTableError::OpenDal(e))? {
167            let entry_path = entry.path();
168            let entry_mode = entry.metadata().mode();
169
170            // Skip the root directory itself
171            if entry_path.is_empty() || entry_path == "/" || entry_path == "." {
172                continue;
173            }
174
175            let full_path = if entry_path.starts_with('/') {
176                entry_path.to_string()
177            } else {
178                format!("/{}", entry_path)
179            };
180
181            // Extract object name from path
182            let name = Path::new(&full_path)
183                .file_name()
184                .map(|s| s.to_string_lossy().to_string())
185                .unwrap_or_else(|| {
186                    let clean_path = entry_path.trim_end_matches('/');
187                    Path::new(clean_path)
188                        .file_name()
189                        .map(|s| s.to_string_lossy().to_string())
190                        .unwrap_or_default()
191                });
192
193            if entry_mode == EntryMode::FILE {
194                // Fetch detailed metadata for files
195                let metadata = operator
196                    .stat(&full_path)
197                    .await
198                    .map_err(|e| VTableError::OpenDal(e))?;
199
200                // Optionally fetch content
201                let content = if config.fetch_content {
202                    operator
203                        .read(&full_path)
204                        .await
205                        .ok()
206                        .map(|bytes| bytes.to_vec())
207                } else {
208                    None
209                };
210
211                results.push(FileMetadata {
212                    name,
213                    path: full_path.clone(),
214                    size: metadata.content_length(),
215                    last_modified: metadata.last_modified().map(|dt| dt.to_string()),
216                    etag: metadata
217                        .etag()
218                        .or_else(|| metadata.content_md5())
219                        .map(|e| e.to_string()),
220                    is_dir: false,
221                    content_type: metadata
222                        .content_type()
223                        .map(|ct| ct.to_string())
224                        .or_else(|| {
225                            Path::new(&full_path)
226                                .extension()
227                                .and_then(|ext| ext.to_str())
228                                .map(|ext| ext.to_string())
229                        }),
230                    content,
231                });
232
233                // Apply limit if specified
234                if let Some(limit) = config.limit {
235                    if results.len() >= limit + config.offset {
236                        break;
237                    }
238                }
239            } else if entry_mode == EntryMode::DIR {
240                // Add directory entry (S3 "folders")
241                results.push(FileMetadata {
242                    name,
243                    path: full_path,
244                    size: 0,
245                    last_modified: None,
246                    etag: None,
247                    is_dir: true,
248                    content_type: Some("directory".to_string()),
249                    content: None,
250                });
251
252                // Apply limit if specified
253                if let Some(limit) = config.limit {
254                    if results.len() >= limit + config.offset {
255                        break;
256                    }
257                }
258            }
259        }
260
261        // Apply offset
262        if config.offset > 0 && config.offset < results.len() {
263            results = results.into_iter().skip(config.offset).collect();
264        }
265
266        Ok(results)
267    }
268
269    fn backend_name(&self) -> &'static str {
270        "s3"
271    }
272}
273
274/// Register the S3 virtual table with SQLite
275///
276/// This function creates a virtual table module that allows querying
277/// S3 objects using SQL.
278///
279/// # Arguments
280///
281/// * `conn` - SQLite connection
282/// * `module_name` - Name for the virtual table (e.g., "s3_files")
283/// * `bucket` - S3 bucket name
284/// * `region` - AWS region
285/// * `access_key_id` - AWS access key ID (optional, use "" for IAM/env)
286/// * `secret_access_key` - AWS secret access key (optional)
287///
288/// # Example
289///
290/// ```rust,ignore
291/// use rusqlite::Connection;
292/// use sqlite_vtable_opendal::backends::s3;
293///
294/// let conn = Connection::open_in_memory()?;
295/// s3::register(&conn, "s3_files", "my-bucket", "us-east-1", "KEY_ID", "SECRET")?;
296///
297/// // Now you can query: SELECT * FROM s3_files
298/// ```
299pub fn register(
300    conn: &rusqlite::Connection,
301    module_name: &str,
302    bucket: impl Into<String>,
303    region: impl Into<String>,
304    access_key_id: impl Into<String>,
305    secret_access_key: impl Into<String>,
306) -> rusqlite::Result<()> {
307    use crate::types::{columns, QueryConfig};
308    use rusqlite::{
309        ffi,
310        vtab::{self, eponymous_only_module, IndexInfo, VTab, VTabCursor, VTabKind},
311    };
312    use std::os::raw::c_int;
313
314    let bucket_name = bucket.into();
315    let region_name = region.into();
316    let key_id = access_key_id.into();
317    let secret = secret_access_key.into();
318
319    // Create a specific table type for S3
320    #[repr(C)]
321    struct S3Table {
322        base: ffi::sqlite3_vtab,
323        bucket: String,
324        region: String,
325        access_key_id: String,
326        secret_access_key: String,
327    }
328
329    // Create a specific cursor type for S3
330    #[repr(C)]
331    struct S3Cursor {
332        base: ffi::sqlite3_vtab_cursor,
333        files: Vec<crate::types::FileMetadata>,
334        current_row: usize,
335        bucket: String,
336        region: String,
337        access_key_id: String,
338        secret_access_key: String,
339    }
340
341    impl S3Cursor {
342        fn new(
343            bucket: String,
344            region: String,
345            access_key_id: String,
346            secret_access_key: String,
347        ) -> Self {
348            Self {
349                base: ffi::sqlite3_vtab_cursor::default(),
350                files: Vec::new(),
351                current_row: 0,
352                bucket,
353                region,
354                access_key_id,
355                secret_access_key,
356            }
357        }
358    }
359
360    unsafe impl VTabCursor for S3Cursor {
361        fn filter(
362            &mut self,
363            _idx_num: c_int,
364            _idx_str: Option<&str>,
365            _args: &vtab::Values<'_>,
366        ) -> rusqlite::Result<()> {
367            // Create backend and fetch files
368            let mut backend = S3Backend::new(&self.bucket, &self.region);
369            if !self.access_key_id.is_empty() {
370                backend = backend.with_credentials(&self.access_key_id, &self.secret_access_key);
371            }
372            let config = QueryConfig::default();
373
374            // Fetch files from the backend (blocking the async call)
375            let files = tokio::task::block_in_place(|| {
376                tokio::runtime::Handle::current().block_on(async {
377                    backend.list_files(&config).await
378                })
379            })
380            .map_err(|e| rusqlite::Error::ModuleError(e.to_string()))?;
381
382            self.files = files;
383            self.current_row = 0;
384            Ok(())
385        }
386
387        fn next(&mut self) -> rusqlite::Result<()> {
388            self.current_row += 1;
389            Ok(())
390        }
391
392        fn eof(&self) -> bool {
393            self.current_row >= self.files.len()
394        }
395
396        fn column(&self, ctx: &mut vtab::Context, col_index: c_int) -> rusqlite::Result<()> {
397            if self.current_row >= self.files.len() {
398                return Ok(());
399            }
400
401            let file = &self.files[self.current_row];
402
403            match col_index {
404                columns::PATH => ctx.set_result(&file.path),
405                columns::SIZE => ctx.set_result(&(file.size as i64)),
406                columns::LAST_MODIFIED => ctx.set_result(&file.last_modified),
407                columns::ETAG => ctx.set_result(&file.etag),
408                columns::IS_DIR => ctx.set_result(&file.is_dir),
409                columns::CONTENT_TYPE => ctx.set_result(&file.content_type),
410                columns::NAME => ctx.set_result(&file.name),
411                columns::CONTENT => {
412                    if let Some(ref content) = file.content {
413                        ctx.set_result(&content.as_slice())
414                    } else {
415                        ctx.set_result::<Option<&[u8]>>(&None)
416                    }
417                }
418                _ => Ok(()),
419            }
420        }
421
422        fn rowid(&self) -> rusqlite::Result<i64> {
423            Ok(self.current_row as i64)
424        }
425    }
426
427    impl vtab::CreateVTab<'_> for S3Table {
428        const KIND: VTabKind = VTabKind::EponymousOnly;
429    }
430
431    unsafe impl VTab<'_> for S3Table {
432        type Aux = (String, String, String, String);
433        type Cursor = S3Cursor;
434
435        fn connect(
436            _db: &mut vtab::VTabConnection,
437            aux: Option<&Self::Aux>,
438            _args: &[&[u8]],
439        ) -> rusqlite::Result<(String, Self)> {
440            let schema = "
441                CREATE TABLE x(
442                    path TEXT,
443                    size INTEGER,
444                    last_modified TEXT,
445                    etag TEXT,
446                    is_dir INTEGER,
447                    content_type TEXT,
448                    name TEXT,
449                    content BLOB
450                )
451            ";
452
453            let (bucket, region, access_key_id, secret_access_key) =
454                if let Some((b, r, k, s)) = aux {
455                    (b.clone(), r.clone(), k.clone(), s.clone())
456                } else {
457                    (
458                        "".to_string(),
459                        "us-east-1".to_string(),
460                        "".to_string(),
461                        "".to_string(),
462                    )
463                };
464
465            Ok((
466                schema.to_owned(),
467                S3Table {
468                    base: ffi::sqlite3_vtab::default(),
469                    bucket,
470                    region,
471                    access_key_id,
472                    secret_access_key,
473                },
474            ))
475        }
476
477        fn best_index(&self, info: &mut IndexInfo) -> rusqlite::Result<()> {
478            info.set_estimated_cost(1000.0);
479            Ok(())
480        }
481
482        fn open(&mut self) -> rusqlite::Result<Self::Cursor> {
483            Ok(S3Cursor::new(
484                self.bucket.clone(),
485                self.region.clone(),
486                self.access_key_id.clone(),
487                self.secret_access_key.clone(),
488            ))
489        }
490    }
491
492    conn.create_module(
493        module_name,
494        eponymous_only_module::<S3Table>(),
495        Some((bucket_name, region_name, key_id, secret)),
496    )
497}
498
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed backend carries bucket/region and no credentials.
    #[test]
    fn test_backend_creation() {
        let s3 = S3Backend::new("my-bucket", "us-east-1");

        assert_eq!(s3.bucket, "my-bucket");
        assert_eq!(s3.region, "us-east-1");
        assert_eq!(s3.backend_name(), "s3");
        assert_eq!(s3.access_key_id, None);
        assert_eq!(s3.secret_access_key, None);
    }

    /// `with_credentials` stores both halves of the key pair.
    #[test]
    fn test_backend_with_credentials() {
        let s3 = S3Backend::new("my-bucket", "us-west-2")
            .with_credentials("AKIAIOSFODNN7EXAMPLE", "wJalrXUtnFEMI");

        assert_eq!(s3.bucket, "my-bucket");
        assert_eq!(s3.region, "us-west-2");
        assert_eq!(s3.access_key_id.as_deref(), Some("AKIAIOSFODNN7EXAMPLE"));
        assert_eq!(s3.secret_access_key.as_deref(), Some("wJalrXUtnFEMI"));
    }

    /// `with_base_path` overrides the default root.
    #[test]
    fn test_backend_with_base_path() {
        let s3 = S3Backend::new("my-bucket", "eu-west-1").with_base_path("data/2024/");

        assert_eq!(s3.base_path, "data/2024/");
    }

    /// Builder methods can be chained without losing earlier settings.
    #[test]
    fn test_backend_builder_pattern() {
        let s3 = S3Backend::new("test-bucket", "ap-south-1")
            .with_credentials("key", "secret")
            .with_base_path("logs/");

        assert_eq!(s3.bucket, "test-bucket");
        assert_eq!(s3.region, "ap-south-1");
        assert_eq!(s3.access_key_id.as_deref(), Some("key"));
        assert_eq!(s3.base_path, "logs/");
    }

    // Note: integration tests against a real S3 bucket need credentials and
    // are better suited to manual runs or CI with secrets.
}