sqlite_vtable_opendal/backends/
s3.rs1use crate::backends::StorageBackend;
7use crate::error::{Result, VTableError};
8use crate::types::{FileMetadata, QueryConfig};
9use async_trait::async_trait;
10use futures_util::TryStreamExt;
11use opendal::{services::S3, EntryMode, Metakey, Operator};
12use std::path::Path;
13
/// Connection settings for listing the objects of one S3 bucket through OpenDAL.
pub struct S3Backend {
    /// Name of the bucket to list.
    bucket: String,
    /// Region passed to the OpenDAL S3 builder (e.g. `"us-east-1"`).
    region: String,
    /// Access key ID. Explicit credentials are only configured when both this
    /// and `secret_access_key` are `Some`.
    access_key_id: Option<String>,
    /// Secret access key paired with `access_key_id`.
    secret_access_key: Option<String>,
    /// Root inside the bucket that every listing is relative to (`"/"` by default).
    base_path: String,
}
48
49impl S3Backend {
50 pub fn new(bucket: impl Into<String>, region: impl Into<String>) -> Self {
65 Self {
66 bucket: bucket.into(),
67 region: region.into(),
68 access_key_id: None,
69 secret_access_key: None,
70 base_path: "/".to_string(),
71 }
72 }
73
74 pub fn with_credentials(
90 mut self,
91 access_key_id: impl Into<String>,
92 secret_access_key: impl Into<String>,
93 ) -> Self {
94 self.access_key_id = Some(access_key_id.into());
95 self.secret_access_key = Some(secret_access_key.into());
96 self
97 }
98
99 pub fn with_base_path(mut self, path: impl Into<String>) -> Self {
114 self.base_path = path.into();
115 self
116 }
117
118 fn create_operator(&self) -> Result<Operator> {
120 let mut builder = S3::default()
121 .bucket(&self.bucket)
122 .region(&self.region)
123 .root(&self.base_path);
124
125 if let (Some(key_id), Some(secret)) = (&self.access_key_id, &self.secret_access_key) {
127 builder = builder.access_key_id(key_id).secret_access_key(secret);
128 }
129
130 Operator::new(builder)
131 .map(|op| op.finish())
132 .map_err(|e| VTableError::OpenDal(e))
133 }
134}
135
136#[async_trait]
137impl StorageBackend for S3Backend {
138 async fn list_files(&self, config: &QueryConfig) -> Result<Vec<FileMetadata>> {
139 let operator = self.create_operator()?;
140 let mut results = Vec::new();
141
142 let normalized_path = if config.root_path.is_empty() || config.root_path == "/" {
144 "".to_string()
145 } else {
146 config.root_path.trim_matches('/').to_string()
147 };
148
149 let lister_builder = operator.lister_with(&normalized_path);
151
152 let mut lister = lister_builder
153 .recursive(config.recursive)
154 .metakey(
155 Metakey::ContentLength
156 | Metakey::ContentMd5
157 | Metakey::ContentType
158 | Metakey::Mode
159 | Metakey::LastModified
160 | Metakey::Etag,
161 )
162 .await
163 .map_err(|e| VTableError::OpenDal(e))?;
164
165 while let Some(entry) = lister.try_next().await.map_err(|e| VTableError::OpenDal(e))? {
167 let entry_path = entry.path();
168 let entry_mode = entry.metadata().mode();
169
170 if entry_path.is_empty() || entry_path == "/" || entry_path == "." {
172 continue;
173 }
174
175 let full_path = if entry_path.starts_with('/') {
176 entry_path.to_string()
177 } else {
178 format!("/{}", entry_path)
179 };
180
181 let name = Path::new(&full_path)
183 .file_name()
184 .map(|s| s.to_string_lossy().to_string())
185 .unwrap_or_else(|| {
186 let clean_path = entry_path.trim_end_matches('/');
187 Path::new(clean_path)
188 .file_name()
189 .map(|s| s.to_string_lossy().to_string())
190 .unwrap_or_default()
191 });
192
193 if entry_mode == EntryMode::FILE {
194 let metadata = operator
196 .stat(&full_path)
197 .await
198 .map_err(|e| VTableError::OpenDal(e))?;
199
200 let content = if config.fetch_content {
202 operator
203 .read(&full_path)
204 .await
205 .ok()
206 .map(|bytes| bytes.to_vec())
207 } else {
208 None
209 };
210
211 results.push(FileMetadata {
212 name,
213 path: full_path.clone(),
214 size: metadata.content_length(),
215 last_modified: metadata.last_modified().map(|dt| dt.to_string()),
216 etag: metadata
217 .etag()
218 .or_else(|| metadata.content_md5())
219 .map(|e| e.to_string()),
220 is_dir: false,
221 content_type: metadata
222 .content_type()
223 .map(|ct| ct.to_string())
224 .or_else(|| {
225 Path::new(&full_path)
226 .extension()
227 .and_then(|ext| ext.to_str())
228 .map(|ext| ext.to_string())
229 }),
230 content,
231 });
232
233 if let Some(limit) = config.limit {
235 if results.len() >= limit + config.offset {
236 break;
237 }
238 }
239 } else if entry_mode == EntryMode::DIR {
240 results.push(FileMetadata {
242 name,
243 path: full_path,
244 size: 0,
245 last_modified: None,
246 etag: None,
247 is_dir: true,
248 content_type: Some("directory".to_string()),
249 content: None,
250 });
251
252 if let Some(limit) = config.limit {
254 if results.len() >= limit + config.offset {
255 break;
256 }
257 }
258 }
259 }
260
261 if config.offset > 0 && config.offset < results.len() {
263 results = results.into_iter().skip(config.offset).collect();
264 }
265
266 Ok(results)
267 }
268
269 fn backend_name(&self) -> &'static str {
270 "s3"
271 }
272}
273
274pub fn register(
300 conn: &rusqlite::Connection,
301 module_name: &str,
302 bucket: impl Into<String>,
303 region: impl Into<String>,
304 access_key_id: impl Into<String>,
305 secret_access_key: impl Into<String>,
306) -> rusqlite::Result<()> {
307 use crate::types::{columns, QueryConfig};
308 use rusqlite::{
309 ffi,
310 vtab::{self, eponymous_only_module, IndexInfo, VTab, VTabCursor, VTabKind},
311 };
312 use std::os::raw::c_int;
313
314 let bucket_name = bucket.into();
315 let region_name = region.into();
316 let key_id = access_key_id.into();
317 let secret = secret_access_key.into();
318
319 #[repr(C)]
321 struct S3Table {
322 base: ffi::sqlite3_vtab,
323 bucket: String,
324 region: String,
325 access_key_id: String,
326 secret_access_key: String,
327 }
328
329 #[repr(C)]
331 struct S3Cursor {
332 base: ffi::sqlite3_vtab_cursor,
333 files: Vec<crate::types::FileMetadata>,
334 current_row: usize,
335 bucket: String,
336 region: String,
337 access_key_id: String,
338 secret_access_key: String,
339 }
340
341 impl S3Cursor {
342 fn new(
343 bucket: String,
344 region: String,
345 access_key_id: String,
346 secret_access_key: String,
347 ) -> Self {
348 Self {
349 base: ffi::sqlite3_vtab_cursor::default(),
350 files: Vec::new(),
351 current_row: 0,
352 bucket,
353 region,
354 access_key_id,
355 secret_access_key,
356 }
357 }
358 }
359
360 unsafe impl VTabCursor for S3Cursor {
361 fn filter(
362 &mut self,
363 _idx_num: c_int,
364 _idx_str: Option<&str>,
365 _args: &vtab::Values<'_>,
366 ) -> rusqlite::Result<()> {
367 let mut backend = S3Backend::new(&self.bucket, &self.region);
369 if !self.access_key_id.is_empty() {
370 backend = backend.with_credentials(&self.access_key_id, &self.secret_access_key);
371 }
372 let config = QueryConfig::default();
373
374 let files = tokio::task::block_in_place(|| {
376 tokio::runtime::Handle::current().block_on(async {
377 backend.list_files(&config).await
378 })
379 })
380 .map_err(|e| rusqlite::Error::ModuleError(e.to_string()))?;
381
382 self.files = files;
383 self.current_row = 0;
384 Ok(())
385 }
386
387 fn next(&mut self) -> rusqlite::Result<()> {
388 self.current_row += 1;
389 Ok(())
390 }
391
392 fn eof(&self) -> bool {
393 self.current_row >= self.files.len()
394 }
395
396 fn column(&self, ctx: &mut vtab::Context, col_index: c_int) -> rusqlite::Result<()> {
397 if self.current_row >= self.files.len() {
398 return Ok(());
399 }
400
401 let file = &self.files[self.current_row];
402
403 match col_index {
404 columns::PATH => ctx.set_result(&file.path),
405 columns::SIZE => ctx.set_result(&(file.size as i64)),
406 columns::LAST_MODIFIED => ctx.set_result(&file.last_modified),
407 columns::ETAG => ctx.set_result(&file.etag),
408 columns::IS_DIR => ctx.set_result(&file.is_dir),
409 columns::CONTENT_TYPE => ctx.set_result(&file.content_type),
410 columns::NAME => ctx.set_result(&file.name),
411 columns::CONTENT => {
412 if let Some(ref content) = file.content {
413 ctx.set_result(&content.as_slice())
414 } else {
415 ctx.set_result::<Option<&[u8]>>(&None)
416 }
417 }
418 _ => Ok(()),
419 }
420 }
421
422 fn rowid(&self) -> rusqlite::Result<i64> {
423 Ok(self.current_row as i64)
424 }
425 }
426
427 impl vtab::CreateVTab<'_> for S3Table {
428 const KIND: VTabKind = VTabKind::EponymousOnly;
429 }
430
431 unsafe impl VTab<'_> for S3Table {
432 type Aux = (String, String, String, String);
433 type Cursor = S3Cursor;
434
435 fn connect(
436 _db: &mut vtab::VTabConnection,
437 aux: Option<&Self::Aux>,
438 _args: &[&[u8]],
439 ) -> rusqlite::Result<(String, Self)> {
440 let schema = "
441 CREATE TABLE x(
442 path TEXT,
443 size INTEGER,
444 last_modified TEXT,
445 etag TEXT,
446 is_dir INTEGER,
447 content_type TEXT,
448 name TEXT,
449 content BLOB
450 )
451 ";
452
453 let (bucket, region, access_key_id, secret_access_key) =
454 if let Some((b, r, k, s)) = aux {
455 (b.clone(), r.clone(), k.clone(), s.clone())
456 } else {
457 (
458 "".to_string(),
459 "us-east-1".to_string(),
460 "".to_string(),
461 "".to_string(),
462 )
463 };
464
465 Ok((
466 schema.to_owned(),
467 S3Table {
468 base: ffi::sqlite3_vtab::default(),
469 bucket,
470 region,
471 access_key_id,
472 secret_access_key,
473 },
474 ))
475 }
476
477 fn best_index(&self, info: &mut IndexInfo) -> rusqlite::Result<()> {
478 info.set_estimated_cost(1000.0);
479 Ok(())
480 }
481
482 fn open(&mut self) -> rusqlite::Result<Self::Cursor> {
483 Ok(S3Cursor::new(
484 self.bucket.clone(),
485 self.region.clone(),
486 self.access_key_id.clone(),
487 self.secret_access_key.clone(),
488 ))
489 }
490 }
491
492 conn.create_module(
493 module_name,
494 eponymous_only_module::<S3Table>(),
495 Some((bucket_name, region_name, key_id, secret)),
496 )
497}
498
// Unit tests for the builder API; none of them perform a listing.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_backend_creation() {
        let backend = S3Backend::new("my-bucket", "us-east-1");

        assert_eq!(
            (backend.bucket.as_str(), backend.region.as_str()),
            ("my-bucket", "us-east-1")
        );
        assert_eq!(backend.backend_name(), "s3");
        assert_eq!(backend.access_key_id, None);
        assert_eq!(backend.secret_access_key, None);
    }

    #[test]
    fn test_backend_with_credentials() {
        let backend = S3Backend::new("my-bucket", "us-west-2")
            .with_credentials("AKIAIOSFODNN7EXAMPLE", "wJalrXUtnFEMI");

        assert_eq!(
            (backend.bucket.as_str(), backend.region.as_str()),
            ("my-bucket", "us-west-2")
        );
        assert_eq!(backend.access_key_id.as_deref(), Some("AKIAIOSFODNN7EXAMPLE"));
        assert_eq!(backend.secret_access_key.as_deref(), Some("wJalrXUtnFEMI"));
    }

    #[test]
    fn test_backend_with_base_path() {
        let backend = S3Backend::new("my-bucket", "eu-west-1").with_base_path("data/2024/");

        assert_eq!(backend.base_path.as_str(), "data/2024/");
    }

    #[test]
    fn test_backend_builder_pattern() {
        let backend = S3Backend::new("test-bucket", "ap-south-1")
            .with_credentials("key", "secret")
            .with_base_path("logs/");

        assert_eq!(
            (backend.bucket.as_str(), backend.region.as_str()),
            ("test-bucket", "ap-south-1")
        );
        assert_eq!(backend.access_key_id.as_deref(), Some("key"));
        assert_eq!(backend.base_path.as_str(), "logs/");
    }
}