sqlite_vtable_opendal/backends/
s3.rs1use crate::backends::StorageBackend;
7use crate::error::Result;
8use crate::types::{FileMetadata, QueryConfig};
9use async_trait::async_trait;
10use futures_util::TryStreamExt;
11use opendal::{services::S3, EntryMode, Metakey, Operator};
12use std::path::Path;
13
/// Storage backend that lists objects from a single S3 bucket.
///
/// Built with [`S3Backend::new`] and optionally refined with the `with_*`
/// builder methods. `Debug` is implemented by hand so that the secret access
/// key is never written to logs or panic messages.
#[derive(Clone)]
pub struct S3Backend {
    /// Name of the bucket handed to the S3 service builder.
    bucket: String,
    /// Region handed to the S3 service builder.
    region: String,
    /// Access key id; only used when `secret_access_key` is also set.
    access_key_id: Option<String>,
    /// Secret access key; only used when `access_key_id` is also set.
    secret_access_key: Option<String>,
    /// Root path inside the bucket that the operator is rooted at.
    base_path: String,
}

impl std::fmt::Debug for S3Backend {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Show whether a secret is configured without revealing its value.
        let secret = self.secret_access_key.as_ref().map(|_| "<redacted>");
        f.debug_struct("S3Backend")
            .field("bucket", &self.bucket)
            .field("region", &self.region)
            .field("access_key_id", &self.access_key_id)
            .field("secret_access_key", &secret)
            .field("base_path", &self.base_path)
            .finish()
    }
}
48
49impl S3Backend {
50 pub fn new(bucket: impl Into<String>, region: impl Into<String>) -> Self {
65 Self {
66 bucket: bucket.into(),
67 region: region.into(),
68 access_key_id: None,
69 secret_access_key: None,
70 base_path: "/".to_string(),
71 }
72 }
73
74 pub fn with_credentials(
90 mut self,
91 access_key_id: impl Into<String>,
92 secret_access_key: impl Into<String>,
93 ) -> Self {
94 self.access_key_id = Some(access_key_id.into());
95 self.secret_access_key = Some(secret_access_key.into());
96 self
97 }
98
99 pub fn with_base_path(mut self, path: impl Into<String>) -> Self {
114 self.base_path = path.into();
115 self
116 }
117
118 fn create_operator(&self) -> Result<Operator> {
120 let mut builder = S3::default()
121 .bucket(&self.bucket)
122 .region(&self.region)
123 .root(&self.base_path);
124
125 if let (Some(key_id), Some(secret)) = (&self.access_key_id, &self.secret_access_key) {
127 builder = builder.access_key_id(key_id).secret_access_key(secret);
128 }
129
130 Ok(Operator::new(builder)?
131 .finish())
132 }
133}
134
135#[async_trait]
136impl StorageBackend for S3Backend {
137 async fn list_files(&self, config: &QueryConfig) -> Result<Vec<FileMetadata>> {
138 let operator = self.create_operator()?;
139 let mut results = Vec::new();
140
141 let normalized_path = if config.root_path.is_empty() || config.root_path == "/" {
143 "".to_string()
144 } else {
145 config.root_path.trim_matches('/').to_string()
146 };
147
148 let lister_builder = operator.lister_with(&normalized_path);
150
151 let mut lister = lister_builder
152 .recursive(config.recursive)
153 .metakey(
154 Metakey::ContentLength
155 | Metakey::ContentMd5
156 | Metakey::ContentType
157 | Metakey::Mode
158 | Metakey::LastModified
159 | Metakey::Etag,
160 )
161 .await
162 ?;
163
164 while let Some(entry) = lister.try_next().await? {
166 let entry_path = entry.path();
167 let entry_mode = entry.metadata().mode();
168
169 if entry_path.is_empty() || entry_path == "/" || entry_path == "." {
171 continue;
172 }
173
174 let full_path = if entry_path.starts_with('/') {
175 entry_path.to_string()
176 } else {
177 format!("/{}", entry_path)
178 };
179
180 let name = Path::new(&full_path)
182 .file_name()
183 .map(|s| s.to_string_lossy().to_string())
184 .unwrap_or_else(|| {
185 let clean_path = entry_path.trim_end_matches('/');
186 Path::new(clean_path)
187 .file_name()
188 .map(|s| s.to_string_lossy().to_string())
189 .unwrap_or_default()
190 });
191
192 if entry_mode == EntryMode::FILE {
193 let metadata = operator
195 .stat(&full_path)
196 .await
197 ?;
198
199 let content = if config.fetch_content {
201 operator
202 .read(&full_path)
203 .await
204 .ok()
205 .map(|bytes| bytes.to_vec())
206 } else {
207 None
208 };
209
210 results.push(FileMetadata {
211 name,
212 path: full_path.clone(),
213 size: metadata.content_length(),
214 last_modified: metadata.last_modified().map(|dt| dt.to_string()),
215 etag: metadata
216 .etag()
217 .or_else(|| metadata.content_md5())
218 .map(|e| e.to_string()),
219 is_dir: false,
220 content_type: metadata
221 .content_type()
222 .map(|ct| ct.to_string())
223 .or_else(|| {
224 Path::new(&full_path)
225 .extension()
226 .and_then(|ext| ext.to_str())
227 .map(|ext| ext.to_string())
228 }),
229 content,
230 });
231
232 if let Some(limit) = config.limit {
234 if results.len() >= limit + config.offset {
235 break;
236 }
237 }
238 } else if entry_mode == EntryMode::DIR {
239 results.push(FileMetadata {
241 name,
242 path: full_path,
243 size: 0,
244 last_modified: None,
245 etag: None,
246 is_dir: true,
247 content_type: Some("directory".to_string()),
248 content: None,
249 });
250
251 if let Some(limit) = config.limit {
253 if results.len() >= limit + config.offset {
254 break;
255 }
256 }
257 }
258 }
259
260 if config.offset > 0 && config.offset < results.len() {
262 results = results.into_iter().skip(config.offset).collect();
263 }
264
265 Ok(results)
266 }
267
268 fn backend_name(&self) -> &'static str {
269 "s3"
270 }
271}
272
273pub fn register(
299 conn: &rusqlite::Connection,
300 module_name: &str,
301 bucket: impl Into<String>,
302 region: impl Into<String>,
303 access_key_id: impl Into<String>,
304 secret_access_key: impl Into<String>,
305) -> rusqlite::Result<()> {
306 use crate::types::{columns, QueryConfig};
307 use rusqlite::{
308 ffi,
309 vtab::{self, eponymous_only_module, IndexInfo, VTab, VTabCursor, VTabKind},
310 };
311 use std::os::raw::c_int;
312
313 let bucket_name = bucket.into();
314 let region_name = region.into();
315 let key_id = access_key_id.into();
316 let secret = secret_access_key.into();
317
318 #[repr(C)]
320 struct S3Table {
321 base: ffi::sqlite3_vtab,
322 bucket: String,
323 region: String,
324 access_key_id: String,
325 secret_access_key: String,
326 }
327
328 #[repr(C)]
330 struct S3Cursor {
331 base: ffi::sqlite3_vtab_cursor,
332 files: Vec<crate::types::FileMetadata>,
333 current_row: usize,
334 bucket: String,
335 region: String,
336 access_key_id: String,
337 secret_access_key: String,
338 }
339
340 impl S3Cursor {
341 fn new(
342 bucket: String,
343 region: String,
344 access_key_id: String,
345 secret_access_key: String,
346 ) -> Self {
347 Self {
348 base: ffi::sqlite3_vtab_cursor::default(),
349 files: Vec::new(),
350 current_row: 0,
351 bucket,
352 region,
353 access_key_id,
354 secret_access_key,
355 }
356 }
357 }
358
359 unsafe impl VTabCursor for S3Cursor {
360 fn filter(
361 &mut self,
362 _idx_num: c_int,
363 _idx_str: Option<&str>,
364 _args: &vtab::Values<'_>,
365 ) -> rusqlite::Result<()> {
366 let mut backend = S3Backend::new(&self.bucket, &self.region);
368 if !self.access_key_id.is_empty() {
369 backend = backend.with_credentials(&self.access_key_id, &self.secret_access_key);
370 }
371 let config = QueryConfig::default();
372
373 let files = tokio::task::block_in_place(|| {
375 tokio::runtime::Handle::current().block_on(async {
376 backend.list_files(&config).await
377 })
378 })
379 .map_err(|e| rusqlite::Error::ModuleError(e.to_string()))?;
380
381 self.files = files;
382 self.current_row = 0;
383 Ok(())
384 }
385
386 fn next(&mut self) -> rusqlite::Result<()> {
387 self.current_row += 1;
388 Ok(())
389 }
390
391 fn eof(&self) -> bool {
392 self.current_row >= self.files.len()
393 }
394
395 fn column(&self, ctx: &mut vtab::Context, col_index: c_int) -> rusqlite::Result<()> {
396 if self.current_row >= self.files.len() {
397 return Ok(());
398 }
399
400 let file = &self.files[self.current_row];
401
402 match col_index {
403 columns::PATH => ctx.set_result(&file.path),
404 columns::SIZE => ctx.set_result(&(file.size as i64)),
405 columns::LAST_MODIFIED => ctx.set_result(&file.last_modified),
406 columns::ETAG => ctx.set_result(&file.etag),
407 columns::IS_DIR => ctx.set_result(&file.is_dir),
408 columns::CONTENT_TYPE => ctx.set_result(&file.content_type),
409 columns::NAME => ctx.set_result(&file.name),
410 columns::CONTENT => {
411 if let Some(ref content) = file.content {
412 ctx.set_result(&content.as_slice())
413 } else {
414 ctx.set_result::<Option<&[u8]>>(&None)
415 }
416 }
417 _ => Ok(()),
418 }
419 }
420
421 fn rowid(&self) -> rusqlite::Result<i64> {
422 Ok(self.current_row as i64)
423 }
424 }
425
426 impl vtab::CreateVTab<'_> for S3Table {
427 const KIND: VTabKind = VTabKind::EponymousOnly;
428 }
429
430 unsafe impl VTab<'_> for S3Table {
431 type Aux = (String, String, String, String);
432 type Cursor = S3Cursor;
433
434 fn connect(
435 _db: &mut vtab::VTabConnection,
436 aux: Option<&Self::Aux>,
437 _args: &[&[u8]],
438 ) -> rusqlite::Result<(String, Self)> {
439 let schema = "
440 CREATE TABLE x(
441 path TEXT,
442 size INTEGER,
443 last_modified TEXT,
444 etag TEXT,
445 is_dir INTEGER,
446 content_type TEXT,
447 name TEXT,
448 content BLOB
449 )
450 ";
451
452 let (bucket, region, access_key_id, secret_access_key) =
453 if let Some((b, r, k, s)) = aux {
454 (b.clone(), r.clone(), k.clone(), s.clone())
455 } else {
456 (
457 "".to_string(),
458 "us-east-1".to_string(),
459 "".to_string(),
460 "".to_string(),
461 )
462 };
463
464 Ok((
465 schema.to_owned(),
466 S3Table {
467 base: ffi::sqlite3_vtab::default(),
468 bucket,
469 region,
470 access_key_id,
471 secret_access_key,
472 },
473 ))
474 }
475
476 fn best_index(&self, info: &mut IndexInfo) -> rusqlite::Result<()> {
477 info.set_estimated_cost(1000.0);
478 Ok(())
479 }
480
481 fn open(&mut self) -> rusqlite::Result<Self::Cursor> {
482 Ok(S3Cursor::new(
483 self.bucket.clone(),
484 self.region.clone(),
485 self.access_key_id.clone(),
486 self.secret_access_key.clone(),
487 ))
488 }
489 }
490
491 conn.create_module(
492 module_name,
493 eponymous_only_module::<S3Table>(),
494 Some((bucket_name, region_name, key_id, secret)),
495 )
496}
497
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created backend has the given bucket/region and no credentials.
    #[test]
    fn test_backend_creation() {
        let backend = S3Backend::new("my-bucket", "us-east-1");

        assert_eq!("my-bucket", backend.bucket);
        assert_eq!("us-east-1", backend.region);
        assert_eq!("s3", backend.backend_name());
        assert_eq!(backend.access_key_id, None);
        assert_eq!(backend.secret_access_key, None);
    }

    /// `with_credentials` stores both halves of the key pair.
    #[test]
    fn test_backend_with_credentials() {
        let base = S3Backend::new("my-bucket", "us-west-2");
        let backend = base.with_credentials("AKIAIOSFODNN7EXAMPLE", "wJalrXUtnFEMI");

        assert_eq!("my-bucket", backend.bucket);
        assert_eq!("us-west-2", backend.region);
        assert_eq!(
            backend.access_key_id.as_deref(),
            Some("AKIAIOSFODNN7EXAMPLE")
        );
        assert_eq!(backend.secret_access_key.as_deref(), Some("wJalrXUtnFEMI"));
    }

    /// `with_base_path` stores the path exactly as given.
    #[test]
    fn test_backend_with_base_path() {
        let base = S3Backend::new("my-bucket", "eu-west-1");
        let backend = base.with_base_path("data/2024/");

        assert_eq!("data/2024/", backend.base_path);
    }

    /// Builder methods can be chained and each one takes effect.
    #[test]
    fn test_backend_builder_pattern() {
        let with_keys = S3Backend::new("test-bucket", "ap-south-1").with_credentials("key", "secret");
        let backend = with_keys.with_base_path("logs/");

        assert_eq!("test-bucket", backend.bucket);
        assert_eq!("ap-south-1", backend.region);
        assert_eq!(backend.access_key_id.as_deref(), Some("key"));
        assert_eq!("logs/", backend.base_path);
    }
}