Skip to main content

sqlite_vtable_opendal/backends/
local_fs.rs

//! Local filesystem backend implementation.
//!
//! This backend exposes local directories to SQLite as virtual tables, so their
//! contents can be queried with SQL. It is useful for file discovery, auditing,
//! and testing.
5
6use crate::backends::StorageBackend;
7use crate::error::{Result, VTableError};
8use crate::types::{FileMetadata, QueryConfig};
9use async_trait::async_trait;
10use futures_util::TryStreamExt;
11use opendal::{services::Fs, EntryMode, Metakey, Operator};
12use std::path::Path;
13
14/// Local filesystem storage backend
15///
16/// This backend uses OpenDAL's Fs service to list files from local directories.
17///
18/// # Example
19///
20/// ```rust,ignore
21/// use sqlite_vtable_opendal::backends::local_fs::LocalFsBackend;
22/// use sqlite_vtable_opendal::types::QueryConfig;
23///
24/// let backend = LocalFsBackend::new("/path/to/directory");
25/// let config = QueryConfig::default();
26/// let files = backend.list_files(&config).await?;
27/// ```
28pub struct LocalFsBackend {
29    /// Root directory path
30    root_path: String,
31}
32
33impl LocalFsBackend {
34    /// Create a new local filesystem backend
35    ///
36    /// # Arguments
37    ///
38    /// * `root_path` - The root directory to query from
39    ///
40    /// # Example
41    ///
42    /// ```
43    /// use sqlite_vtable_opendal::backends::local_fs::LocalFsBackend;
44    ///
45    /// let backend = LocalFsBackend::new("/tmp");
46    /// ```
47    pub fn new(root_path: impl Into<String>) -> Self {
48        Self {
49            root_path: root_path.into(),
50        }
51    }
52
53    /// Create an OpenDAL operator for the local filesystem
54    fn create_operator(&self) -> Result<Operator> {
55        let builder = Fs::default().root(&self.root_path);
56
57        Operator::new(builder)
58            .map(|op| op.finish())
59            .map_err(|e| VTableError::OpenDal(e))
60    }
61}
62
63#[async_trait]
64impl StorageBackend for LocalFsBackend {
65    async fn list_files(&self, config: &QueryConfig) -> Result<Vec<FileMetadata>> {
66        let operator = self.create_operator()?;
67        let mut results = Vec::new();
68
69        // Normalize the path
70        let normalized_path = if config.root_path == "/" || config.root_path.is_empty() {
71            "".to_string()
72        } else {
73            config.root_path.trim_matches('/').to_string()
74        };
75
76        // Create lister with metadata keys
77        let lister_builder = operator.lister_with(&normalized_path);
78
79        let mut lister = lister_builder
80            .recursive(config.recursive)
81            .metakey(
82                Metakey::ContentLength
83                    | Metakey::ContentType
84                    | Metakey::Mode
85                    | Metakey::LastModified,
86            )
87            .await
88            .map_err(|e| VTableError::OpenDal(e))?;
89
90        // Iterate through entries
91        while let Some(entry) = lister.try_next().await.map_err(|e| VTableError::OpenDal(e))? {
92            let entry_path = entry.path();
93            let entry_mode = entry.metadata().mode();
94
95            // Skip the root directory itself
96            if entry_path.is_empty() || entry_path == "/" || entry_path == "." {
97                continue;
98            }
99
100            let full_path = if entry_path.starts_with('/') {
101                entry_path.to_string()
102            } else {
103                format!("/{}", entry_path)
104            };
105
106            // Extract file name from path
107            let name = Path::new(&full_path)
108                .file_name()
109                .map(|s| s.to_string_lossy().to_string())
110                .unwrap_or_else(|| {
111                    let clean_path = entry_path.trim_end_matches('/');
112                    Path::new(clean_path)
113                        .file_name()
114                        .map(|s| s.to_string_lossy().to_string())
115                        .unwrap_or_default()
116                });
117
118            if entry_mode == EntryMode::FILE {
119                // Fetch detailed metadata for files
120                let metadata = operator
121                    .stat(&full_path)
122                    .await
123                    .map_err(|e| VTableError::OpenDal(e))?;
124
125                // Optionally fetch content
126                let content = if config.fetch_content {
127                    operator
128                        .read(&full_path)
129                        .await
130                        .ok()
131                        .map(|bytes| bytes.to_vec())
132                } else {
133                    None
134                };
135
136                results.push(FileMetadata {
137                    name,
138                    path: full_path.clone(),
139                    size: metadata.content_length(),
140                    last_modified: metadata.last_modified().map(|dt| dt.to_string()),
141                    etag: metadata.etag().map(|e| e.to_string()),
142                    is_dir: false,
143                    content_type: Path::new(&full_path)
144                        .extension()
145                        .and_then(|ext| ext.to_str())
146                        .map(|ext| ext.to_string()),
147                    content,
148                });
149
150                // Apply limit if specified
151                if let Some(limit) = config.limit {
152                    if results.len() >= limit + config.offset {
153                        break;
154                    }
155                }
156            } else if entry_mode == EntryMode::DIR {
157                // Add directory entry
158                results.push(FileMetadata {
159                    name,
160                    path: full_path,
161                    size: 0,
162                    last_modified: None,
163                    etag: None,
164                    is_dir: true,
165                    content_type: Some("directory".to_string()),
166                    content: None,
167                });
168
169                // Apply limit if specified
170                if let Some(limit) = config.limit {
171                    if results.len() >= limit + config.offset {
172                        break;
173                    }
174                }
175            }
176        }
177
178        // Apply offset
179        if config.offset > 0 && config.offset < results.len() {
180            results = results.into_iter().skip(config.offset).collect();
181        }
182
183        Ok(results)
184    }
185
186    fn backend_name(&self) -> &'static str {
187        "local_fs"
188    }
189}
190
191/// Register the local_fs virtual table with SQLite
192///
193/// This function creates a virtual table module that allows querying
194/// local directories using SQL.
195///
196/// # Arguments
197///
198/// * `conn` - SQLite connection
199/// * `module_name` - Name for the virtual table (e.g., "local_files")
200/// * `root_path` - Root directory to query
201///
202/// # Example
203///
204/// ```rust,ignore
205/// use rusqlite::Connection;
206/// use sqlite_vtable_opendal::backends::local_fs;
207///
208/// let conn = Connection::open_in_memory()?;
209/// local_fs::register(&conn, "local_files", "/tmp")?;
210///
211/// // Now you can query: SELECT * FROM local_files
212/// ```
213pub fn register(
214    conn: &rusqlite::Connection,
215    module_name: &str,
216    root_path: impl Into<String>,
217) -> rusqlite::Result<()> {
218    use crate::types::{columns, QueryConfig};
219    use rusqlite::{
220        ffi,
221        vtab::{self, eponymous_only_module, IndexInfo, VTab, VTabCursor, VTabKind},
222    };
223    use std::os::raw::c_int;
224
225    let root = root_path.into();
226
227    // Create a specific table type for local_fs
228    #[repr(C)]
229    struct LocalFsTable {
230        base: ffi::sqlite3_vtab,
231        root_path: String,
232    }
233
234    // Create a specific cursor type for local_fs
235    #[repr(C)]
236    struct LocalFsCursor {
237        base: ffi::sqlite3_vtab_cursor,
238        files: Vec<crate::types::FileMetadata>,
239        current_row: usize,
240        root_path: String,
241    }
242
243    impl LocalFsCursor {
244        fn new(root_path: String) -> Self {
245            Self {
246                base: ffi::sqlite3_vtab_cursor::default(),
247                files: Vec::new(),
248                current_row: 0,
249                root_path,
250            }
251        }
252    }
253
254    unsafe impl VTabCursor for LocalFsCursor {
255        fn filter(
256            &mut self,
257            _idx_num: c_int,
258            _idx_str: Option<&str>,
259            _args: &vtab::Values<'_>,
260        ) -> rusqlite::Result<()> {
261            // Create backend and fetch files
262            let backend = LocalFsBackend::new(&self.root_path);
263            let config = QueryConfig::default();
264
265            // Fetch files from the backend (blocking the async call)
266            let files = tokio::task::block_in_place(|| {
267                tokio::runtime::Handle::current().block_on(async {
268                    backend.list_files(&config).await
269                })
270            })
271            .map_err(|e| rusqlite::Error::ModuleError(e.to_string()))?;
272
273            self.files = files;
274            self.current_row = 0;
275            Ok(())
276        }
277
278        fn next(&mut self) -> rusqlite::Result<()> {
279            self.current_row += 1;
280            Ok(())
281        }
282
283        fn eof(&self) -> bool {
284            self.current_row >= self.files.len()
285        }
286
287        fn column(&self, ctx: &mut vtab::Context, col_index: c_int) -> rusqlite::Result<()> {
288            if self.current_row >= self.files.len() {
289                return Ok(());
290            }
291
292            let file = &self.files[self.current_row];
293
294            match col_index {
295                columns::PATH => ctx.set_result(&file.path),
296                columns::SIZE => ctx.set_result(&(file.size as i64)),
297                columns::LAST_MODIFIED => ctx.set_result(&file.last_modified),
298                columns::ETAG => ctx.set_result(&file.etag),
299                columns::IS_DIR => ctx.set_result(&file.is_dir),
300                columns::CONTENT_TYPE => ctx.set_result(&file.content_type),
301                columns::NAME => ctx.set_result(&file.name),
302                columns::CONTENT => {
303                    if let Some(ref content) = file.content {
304                        ctx.set_result(&content.as_slice())
305                    } else {
306                        ctx.set_result::<Option<&[u8]>>(&None)
307                    }
308                }
309                _ => Ok(()),
310            }
311        }
312
313        fn rowid(&self) -> rusqlite::Result<i64> {
314            Ok(self.current_row as i64)
315        }
316    }
317
318    impl vtab::CreateVTab<'_> for LocalFsTable {
319        const KIND: VTabKind = VTabKind::EponymousOnly;
320    }
321
322    unsafe impl VTab<'_> for LocalFsTable {
323        type Aux = String;
324        type Cursor = LocalFsCursor;
325
326        fn connect(
327            _db: &mut vtab::VTabConnection,
328            aux: Option<&Self::Aux>,
329            _args: &[&[u8]],
330        ) -> rusqlite::Result<(String, Self)> {
331            let schema = "
332                CREATE TABLE x(
333                    path TEXT,
334                    size INTEGER,
335                    last_modified TEXT,
336                    etag TEXT,
337                    is_dir INTEGER,
338                    content_type TEXT,
339                    name TEXT,
340                    content BLOB
341                )
342            ";
343
344            let root_path = aux.cloned().unwrap_or_else(|| "/".to_string());
345
346            Ok((
347                schema.to_owned(),
348                LocalFsTable {
349                    base: ffi::sqlite3_vtab::default(),
350                    root_path,
351                },
352            ))
353        }
354
355        fn best_index(&self, info: &mut IndexInfo) -> rusqlite::Result<()> {
356            info.set_estimated_cost(100.0);
357            Ok(())
358        }
359
360        fn open(&mut self) -> rusqlite::Result<Self::Cursor> {
361            Ok(LocalFsCursor::new(self.root_path.clone()))
362        }
363    }
364
365    conn.create_module(module_name, eponymous_only_module::<LocalFsTable>(), Some(root))
366}
367
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::TempDir;

    /// Creates a temporary directory holding one file per `(name, contents)` pair.
    fn dir_with_files(files: &[(&str, &str)]) -> TempDir {
        let dir = TempDir::new().unwrap();
        for &(file_name, contents) in files {
            fs::write(dir.path().join(file_name), contents).unwrap();
        }
        dir
    }

    /// The temporary directory's path as `&str`, the form the public API takes.
    fn path_str(dir: &TempDir) -> &str {
        dir.path().to_str().unwrap()
    }

    /// Names of all listed entries, for membership checks.
    fn names(files: &[FileMetadata]) -> Vec<&str> {
        files.iter().map(|file| file.name.as_str()).collect()
    }

    #[tokio::test]
    async fn test_list_empty_directory() {
        let dir = dir_with_files(&[]);
        let backend = LocalFsBackend::new(path_str(&dir));

        let listed = backend.list_files(&QueryConfig::default()).await.unwrap();

        assert!(listed.is_empty());
    }

    #[tokio::test]
    async fn test_list_files() {
        let dir = dir_with_files(&[("file1.txt", "content1"), ("file2.txt", "content2")]);
        let backend = LocalFsBackend::new(path_str(&dir));

        let listed = backend.list_files(&QueryConfig::default()).await.unwrap();

        assert_eq!(listed.len(), 2);
        let listed_names = names(&listed);
        assert!(listed_names.contains(&"file1.txt"));
        assert!(listed_names.contains(&"file2.txt"));
    }

    #[tokio::test]
    async fn test_file_metadata() {
        let dir = dir_with_files(&[("test.txt", "hello world")]);
        let backend = LocalFsBackend::new(path_str(&dir));

        let listed = backend.list_files(&QueryConfig::default()).await.unwrap();
        assert_eq!(listed.len(), 1);

        let file = &listed[0];
        assert_eq!(file.name, "test.txt");
        assert_eq!(file.size, 11); // "hello world" is 11 bytes
        assert!(!file.is_dir);
        assert_eq!(file.content_type.as_deref(), Some("txt"));
        assert!(file.content.is_none(), "content is not fetched by default");
    }

    #[tokio::test]
    async fn test_fetch_content() {
        let dir = dir_with_files(&[("test.txt", "hello world")]);
        let backend = LocalFsBackend::new(path_str(&dir));
        let config = QueryConfig {
            fetch_content: true,
            ..Default::default()
        };

        let listed = backend.list_files(&config).await.unwrap();
        assert_eq!(listed.len(), 1);

        assert_eq!(listed[0].content.as_deref(), Some(&b"hello world"[..]));
    }

    #[tokio::test]
    async fn test_recursive_listing() {
        let dir = TempDir::new().unwrap();
        let root = dir.path();

        // Nested layout: file1.txt at the top level, file2.txt inside subdir/.
        fs::create_dir(root.join("subdir")).unwrap();
        fs::write(root.join("file1.txt"), "content1").unwrap();
        fs::write(root.join("subdir/file2.txt"), "content2").unwrap();

        let backend = LocalFsBackend::new(path_str(&dir));
        let config = QueryConfig {
            recursive: true,
            ..Default::default()
        };
        let listed = backend.list_files(&config).await.unwrap();

        // Both files must be present; the `subdir` entry may be listed too.
        assert!(listed.len() >= 2);
        let listed_names = names(&listed);
        assert!(listed_names.contains(&"file1.txt"));
        assert!(listed_names.contains(&"file2.txt"));
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_sqlite_integration() {
        use rusqlite::Connection;

        let large = "x".repeat(10000);
        let dir = dir_with_files(&[
            ("large.txt", large.as_str()),
            ("small.txt", "tiny"),
            ("medium.txt", "medium content"),
        ]);

        let conn = Connection::open_in_memory().unwrap();
        register(&conn, "local_files", path_str(&dir)).unwrap();

        // Every file comes back from a plain scan.
        let mut all_stmt = conn
            .prepare("SELECT name, size, is_dir FROM local_files ORDER BY name")
            .unwrap();
        let rows: Vec<(String, i64, bool)> = all_stmt
            .query_map([], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))
            .unwrap()
            .collect::<std::result::Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(rows.len(), 3);
        for expected in ["large.txt", "small.txt", "medium.txt"] {
            assert!(rows.iter().any(|(name, _, _)| name == expected));
        }

        // A WHERE clause on size keeps only the large file.
        let mut big_stmt = conn
            .prepare("SELECT name FROM local_files WHERE size > 100")
            .unwrap();
        let large_files: Vec<String> = big_stmt
            .query_map([], |row| row.get(0))
            .unwrap()
            .collect::<std::result::Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(large_files, vec!["large.txt".to_string()]);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_sqlite_count_and_aggregate() {
        use rusqlite::Connection;

        let dir = dir_with_files(&[]);
        for i in 1..=5 {
            fs::write(dir.path().join(format!("file{}.txt", i)), format!("content{}", i))
                .unwrap();
        }

        let conn = Connection::open_in_memory().unwrap();
        register(&conn, "local_files", path_str(&dir)).unwrap();

        let scalar = |sql: &str| -> i64 { conn.query_row(sql, [], |row| row.get(0)).unwrap() };

        assert_eq!(scalar("SELECT COUNT(*) FROM local_files"), 5);
        assert!(scalar("SELECT SUM(size) FROM local_files") > 0);

        let first_file: String = conn
            .query_row(
                "SELECT name FROM local_files ORDER BY name LIMIT 1",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(first_file, "file1.txt");
    }
}