sqlite_vtable_opendal/backends/
s3.rs1use crate::backends::StorageBackend;
7use crate::error::Result;
8use crate::types::{FileMetadata, QueryConfig};
9use async_trait::async_trait;
10use futures_util::TryStreamExt;
11use opendal::{services::S3, EntryMode, Operator};
12use std::path::Path;
13
/// Storage backend that lists objects in an S3 bucket through an opendal [`Operator`].
///
/// Construct it with [`S3Backend::new`], then optionally chain
/// [`S3Backend::with_credentials`] and [`S3Backend::with_base_path`].
pub struct S3Backend {
    /// Name of the bucket to list.
    bucket: String,
    /// Region passed to the S3 service builder (e.g. `us-east-1`).
    region: String,
    /// Static access key ID; only used when `secret_access_key` is also set.
    access_key_id: Option<String>,
    /// Static secret access key paired with `access_key_id`.
    secret_access_key: Option<String>,
    /// Root inside the bucket that listed paths are resolved against (`/` by default).
    base_path: String,
}
48
49impl S3Backend {
50 pub fn new(bucket: impl Into<String>, region: impl Into<String>) -> Self {
65 Self {
66 bucket: bucket.into(),
67 region: region.into(),
68 access_key_id: None,
69 secret_access_key: None,
70 base_path: "/".to_string(),
71 }
72 }
73
74 pub fn with_credentials(
90 mut self,
91 access_key_id: impl Into<String>,
92 secret_access_key: impl Into<String>,
93 ) -> Self {
94 self.access_key_id = Some(access_key_id.into());
95 self.secret_access_key = Some(secret_access_key.into());
96 self
97 }
98
99 pub fn with_base_path(mut self, path: impl Into<String>) -> Self {
114 self.base_path = path.into();
115 self
116 }
117
118 fn create_operator(&self) -> Result<Operator> {
120 let mut builder = S3::default()
121 .bucket(&self.bucket)
122 .region(&self.region)
123 .root(&self.base_path);
124
125 if let (Some(key_id), Some(secret)) = (&self.access_key_id, &self.secret_access_key) {
127 builder = builder.access_key_id(key_id).secret_access_key(secret);
128 }
129
130 Ok(Operator::new(builder)?
131 .finish())
132 }
133}
134
135#[async_trait]
136impl StorageBackend for S3Backend {
137 async fn list_files(&self, config: &QueryConfig) -> Result<Vec<FileMetadata>> {
138 let operator = self.create_operator()?;
139 let mut results = Vec::new();
140
141 let normalized_path = if config.root_path.is_empty() || config.root_path == "/" {
143 "".to_string()
144 } else {
145 config.root_path.trim_matches('/').to_string()
146 };
147
148 let mut lister = operator
150 .lister_with(&normalized_path)
151 .recursive(config.recursive)
152 .await?;
153
154 while let Some(entry) = lister.try_next().await? {
156 let entry_path = entry.path();
157 let entry_mode = entry.metadata().mode();
158
159 if entry_path.is_empty() || entry_path == "/" || entry_path == "." {
161 continue;
162 }
163
164 let full_path = if entry_path.starts_with('/') {
165 entry_path.to_string()
166 } else {
167 format!("/{}", entry_path)
168 };
169
170 let name = Path::new(&full_path)
172 .file_name()
173 .map(|s| s.to_string_lossy().to_string())
174 .unwrap_or_else(|| {
175 let clean_path = entry_path.trim_end_matches('/');
176 Path::new(clean_path)
177 .file_name()
178 .map(|s| s.to_string_lossy().to_string())
179 .unwrap_or_default()
180 });
181
182 if entry_mode == EntryMode::FILE {
183 let metadata = operator
185 .stat(&full_path)
186 .await
187 ?;
188
189 let content = if config.fetch_content {
191 operator
192 .read(&full_path)
193 .await
194 .ok()
195 .map(|bytes| bytes.to_vec())
196 } else {
197 None
198 };
199
200 results.push(FileMetadata {
201 name,
202 path: full_path.clone(),
203 size: metadata.content_length(),
204 last_modified: metadata.last_modified().map(|dt| dt.to_string()),
205 etag: metadata
206 .etag()
207 .or_else(|| metadata.content_md5())
208 .map(|e| e.to_string()),
209 is_dir: false,
210 content_type: metadata
211 .content_type()
212 .map(|ct| ct.to_string())
213 .or_else(|| {
214 Path::new(&full_path)
215 .extension()
216 .and_then(|ext| ext.to_str())
217 .map(|ext| ext.to_string())
218 }),
219 content,
220 });
221
222 if let Some(limit) = config.limit {
224 if results.len() >= limit + config.offset {
225 break;
226 }
227 }
228 } else if entry_mode == EntryMode::DIR {
229 results.push(FileMetadata {
231 name,
232 path: full_path,
233 size: 0,
234 last_modified: None,
235 etag: None,
236 is_dir: true,
237 content_type: Some("directory".to_string()),
238 content: None,
239 });
240
241 if let Some(limit) = config.limit {
243 if results.len() >= limit + config.offset {
244 break;
245 }
246 }
247 }
248 }
249
250 if config.offset > 0 && config.offset < results.len() {
252 results = results.into_iter().skip(config.offset).collect();
253 }
254
255 Ok(results)
256 }
257
258 fn backend_name(&self) -> &'static str {
259 "s3"
260 }
261}
262
263pub fn register(
289 conn: &rusqlite::Connection,
290 module_name: &str,
291 bucket: impl Into<String>,
292 region: impl Into<String>,
293 access_key_id: impl Into<String>,
294 secret_access_key: impl Into<String>,
295) -> rusqlite::Result<()> {
296 use crate::types::{columns, QueryConfig};
297 use rusqlite::{
298 ffi,
299 vtab::{self, eponymous_only_module, IndexInfo, VTab, VTabCursor, VTabKind},
300 };
301 use std::os::raw::c_int;
302
303 let bucket_name = bucket.into();
304 let region_name = region.into();
305 let key_id = access_key_id.into();
306 let secret = secret_access_key.into();
307
308 #[repr(C)]
310 struct S3Table {
311 base: ffi::sqlite3_vtab,
312 bucket: String,
313 region: String,
314 access_key_id: String,
315 secret_access_key: String,
316 }
317
318 #[repr(C)]
320 struct S3Cursor {
321 base: ffi::sqlite3_vtab_cursor,
322 files: Vec<crate::types::FileMetadata>,
323 current_row: usize,
324 bucket: String,
325 region: String,
326 access_key_id: String,
327 secret_access_key: String,
328 }
329
330 impl S3Cursor {
331 fn new(
332 bucket: String,
333 region: String,
334 access_key_id: String,
335 secret_access_key: String,
336 ) -> Self {
337 Self {
338 base: ffi::sqlite3_vtab_cursor::default(),
339 files: Vec::new(),
340 current_row: 0,
341 bucket,
342 region,
343 access_key_id,
344 secret_access_key,
345 }
346 }
347 }
348
349 unsafe impl VTabCursor for S3Cursor {
350 fn filter(
351 &mut self,
352 _idx_num: c_int,
353 _idx_str: Option<&str>,
354 _args: &vtab::Filters<'_>,
355 ) -> rusqlite::Result<()> {
356 let mut backend = S3Backend::new(&self.bucket, &self.region);
358 if !self.access_key_id.is_empty() {
359 backend = backend.with_credentials(&self.access_key_id, &self.secret_access_key);
360 }
361 let config = QueryConfig::default();
362
363 let files = tokio::task::block_in_place(|| {
365 tokio::runtime::Handle::current().block_on(async {
366 backend.list_files(&config).await
367 })
368 })
369 .map_err(|e| rusqlite::Error::ModuleError(e.to_string()))?;
370
371 self.files = files;
372 self.current_row = 0;
373 Ok(())
374 }
375
376 fn next(&mut self) -> rusqlite::Result<()> {
377 self.current_row += 1;
378 Ok(())
379 }
380
381 fn eof(&self) -> bool {
382 self.current_row >= self.files.len()
383 }
384
385 fn column(&self, ctx: &mut vtab::Context, col_index: c_int) -> rusqlite::Result<()> {
386 if self.current_row >= self.files.len() {
387 return Ok(());
388 }
389
390 let file = &self.files[self.current_row];
391
392 match col_index {
393 columns::PATH => ctx.set_result(&file.path),
394 columns::SIZE => ctx.set_result(&(file.size as i64)),
395 columns::LAST_MODIFIED => ctx.set_result(&file.last_modified),
396 columns::ETAG => ctx.set_result(&file.etag),
397 columns::IS_DIR => ctx.set_result(&file.is_dir),
398 columns::CONTENT_TYPE => ctx.set_result(&file.content_type),
399 columns::NAME => ctx.set_result(&file.name),
400 columns::CONTENT => {
401 if let Some(ref content) = file.content {
402 ctx.set_result(&content.as_slice())
403 } else {
404 ctx.set_result::<Option<&[u8]>>(&None)
405 }
406 }
407 _ => Ok(()),
408 }
409 }
410
411 fn rowid(&self) -> rusqlite::Result<i64> {
412 Ok(self.current_row as i64)
413 }
414 }
415
416 impl vtab::CreateVTab<'_> for S3Table {
417 const KIND: VTabKind = VTabKind::EponymousOnly;
418 }
419
420 unsafe impl VTab<'_> for S3Table {
421 type Aux = (String, String, String, String);
422 type Cursor = S3Cursor;
423
424 fn connect(
425 _db: &mut vtab::VTabConnection,
426 aux: Option<&Self::Aux>,
427 _args: &[&[u8]],
428 ) -> rusqlite::Result<(String, Self)> {
429 let schema = "
430 CREATE TABLE x(
431 path TEXT,
432 size INTEGER,
433 last_modified TEXT,
434 etag TEXT,
435 is_dir INTEGER,
436 content_type TEXT,
437 name TEXT,
438 content BLOB
439 )
440 ";
441
442 let (bucket, region, access_key_id, secret_access_key) =
443 if let Some((b, r, k, s)) = aux {
444 (b.clone(), r.clone(), k.clone(), s.clone())
445 } else {
446 (
447 "".to_string(),
448 "us-east-1".to_string(),
449 "".to_string(),
450 "".to_string(),
451 )
452 };
453
454 Ok((
455 schema.to_owned(),
456 S3Table {
457 base: ffi::sqlite3_vtab::default(),
458 bucket,
459 region,
460 access_key_id,
461 secret_access_key,
462 },
463 ))
464 }
465
466 fn best_index(&self, info: &mut IndexInfo) -> rusqlite::Result<()> {
467 info.set_estimated_cost(1000.0);
468 Ok(())
469 }
470
471 fn open(&mut self) -> rusqlite::Result<Self::Cursor> {
472 Ok(S3Cursor::new(
473 self.bucket.clone(),
474 self.region.clone(),
475 self.access_key_id.clone(),
476 self.secret_access_key.clone(),
477 ))
478 }
479 }
480
481 conn.create_module(
482 module_name,
483 eponymous_only_module::<S3Table>(),
484 Some((bucket_name, region_name, key_id, secret)),
485 )
486}
487
#[cfg(test)]
mod tests {
    // Unit tests for the S3Backend builder API; no network access is needed.
    use super::*;

    #[test]
    fn test_backend_creation() {
        let s3 = S3Backend::new("my-bucket", "us-east-1");

        assert_eq!(s3.backend_name(), "s3");
        assert_eq!(s3.bucket, "my-bucket");
        assert_eq!(s3.region, "us-east-1");
        assert_eq!(s3.access_key_id, None);
        assert_eq!(s3.secret_access_key, None);
    }

    #[test]
    fn test_backend_with_credentials() {
        let s3 = S3Backend::new("my-bucket", "us-west-2")
            .with_credentials("AKIAIOSFODNN7EXAMPLE", "wJalrXUtnFEMI");

        assert_eq!(
            (s3.bucket.as_str(), s3.region.as_str()),
            ("my-bucket", "us-west-2")
        );
        assert_eq!(s3.access_key_id.as_deref(), Some("AKIAIOSFODNN7EXAMPLE"));
        assert_eq!(s3.secret_access_key.as_deref(), Some("wJalrXUtnFEMI"));
    }

    #[test]
    fn test_backend_with_base_path() {
        let s3 = S3Backend::new("my-bucket", "eu-west-1").with_base_path("data/2024/");

        assert_eq!(s3.base_path.as_str(), "data/2024/");
    }

    #[test]
    fn test_backend_builder_pattern() {
        let s3 = S3Backend::new("test-bucket", "ap-south-1")
            .with_credentials("key", "secret")
            .with_base_path("logs/");

        assert_eq!(s3.bucket, "test-bucket");
        assert_eq!(s3.region, "ap-south-1");
        assert_eq!(s3.access_key_id.as_deref(), Some("key"));
        assert_eq!(s3.base_path, "logs/");
    }
}