Skip to main content

sqlite_vtable_opendal/backends/
local_fs.rs

1//! Local filesystem backend implementation
2//!
3//! This backend allows querying local directories using SQL.
4//! It's useful for file discovery, auditing, and testing.
5
6use crate::backends::StorageBackend;
7use crate::error::Result;
8use crate::types::{FileMetadata, QueryConfig};
9use async_trait::async_trait;
10use futures_util::TryStreamExt;
11use opendal::{services::Fs, EntryMode, Operator};
12use std::path::Path;
13
/// Local filesystem storage backend
///
/// This backend uses OpenDAL's Fs service to list files from local directories.
/// It only stores the root directory path, so it is cheap to clone and can be
/// printed with `{:?}` for diagnostics.
///
/// # Example
///
/// ```rust,ignore
/// use sqlite_vtable_opendal::backends::local_fs::LocalFsBackend;
/// use sqlite_vtable_opendal::types::QueryConfig;
///
/// let backend = LocalFsBackend::new("/path/to/directory");
/// let config = QueryConfig::default();
/// let files = backend.list_files(&config).await?;
/// ```
#[derive(Debug, Clone)]
pub struct LocalFsBackend {
    /// Root directory path; every listing is resolved relative to it
    root_path: String,
}
32
33impl LocalFsBackend {
34    /// Create a new local filesystem backend
35    ///
36    /// # Arguments
37    ///
38    /// * `root_path` - The root directory to query from
39    ///
40    /// # Example
41    ///
42    /// ```
43    /// use sqlite_vtable_opendal::backends::local_fs::LocalFsBackend;
44    ///
45    /// let backend = LocalFsBackend::new("/tmp");
46    /// ```
47    pub fn new(root_path: impl Into<String>) -> Self {
48        Self {
49            root_path: root_path.into(),
50        }
51    }
52
53    /// Create an OpenDAL operator for the local filesystem
54    fn create_operator(&self) -> Result<Operator> {
55        let builder = Fs::default().root(&self.root_path);
56
57        Ok(Operator::new(builder)?.finish())
58    }
59}
60
61#[async_trait]
62impl StorageBackend for LocalFsBackend {
63    async fn list_files(&self, config: &QueryConfig) -> Result<Vec<FileMetadata>> {
64        let operator = self.create_operator()?;
65        let mut results = Vec::new();
66
67        // Normalize the path
68        let normalized_path = if config.root_path == "/" || config.root_path.is_empty() {
69            "".to_string()
70        } else {
71            config.root_path.trim_matches('/').to_string()
72        };
73
74        // Create lister
75        let mut lister = operator
76            .lister_with(&normalized_path)
77            .recursive(config.recursive)
78            .await?;
79
80        // Iterate through entries
81        while let Some(entry) = lister.try_next().await? {
82            let entry_path = entry.path();
83            let entry_mode = entry.metadata().mode();
84
85            // Skip the root directory itself
86            if entry_path.is_empty() || entry_path == "/" || entry_path == "." {
87                continue;
88            }
89
90            let full_path = if entry_path.starts_with('/') {
91                entry_path.to_string()
92            } else {
93                format!("/{}", entry_path)
94            };
95
96            // Extract file name from path
97            let name = Path::new(&full_path)
98                .file_name()
99                .map(|s| s.to_string_lossy().to_string())
100                .unwrap_or_else(|| {
101                    let clean_path = entry_path.trim_end_matches('/');
102                    Path::new(clean_path)
103                        .file_name()
104                        .map(|s| s.to_string_lossy().to_string())
105                        .unwrap_or_default()
106                });
107
108            if entry_mode == EntryMode::FILE {
109                // Fetch detailed metadata for files
110                let metadata = operator
111                    .stat(&full_path)
112                    .await
113                    ?;
114
115                // Optionally fetch content
116                let content = if config.fetch_content {
117                    operator
118                        .read(&full_path)
119                        .await
120                        .ok()
121                        .map(|bytes| bytes.to_vec())
122                } else {
123                    None
124                };
125
126                results.push(FileMetadata {
127                    name,
128                    path: full_path.clone(),
129                    size: metadata.content_length(),
130                    last_modified: metadata.last_modified().map(|dt| dt.to_string()),
131                    etag: metadata.etag().map(|e| e.to_string()),
132                    is_dir: false,
133                    content_type: Path::new(&full_path)
134                        .extension()
135                        .and_then(|ext| ext.to_str())
136                        .map(|ext| ext.to_string()),
137                    content,
138                });
139
140                // Apply limit if specified
141                if let Some(limit) = config.limit {
142                    if results.len() >= limit + config.offset {
143                        break;
144                    }
145                }
146            } else if entry_mode == EntryMode::DIR {
147                // Add directory entry
148                results.push(FileMetadata {
149                    name,
150                    path: full_path,
151                    size: 0,
152                    last_modified: None,
153                    etag: None,
154                    is_dir: true,
155                    content_type: Some("directory".to_string()),
156                    content: None,
157                });
158
159                // Apply limit if specified
160                if let Some(limit) = config.limit {
161                    if results.len() >= limit + config.offset {
162                        break;
163                    }
164                }
165            }
166        }
167
168        // Apply offset
169        if config.offset > 0 && config.offset < results.len() {
170            results = results.into_iter().skip(config.offset).collect();
171        }
172
173        Ok(results)
174    }
175
176    fn backend_name(&self) -> &'static str {
177        "local_fs"
178    }
179}
180
181/// Register the local_fs virtual table with SQLite
182///
183/// This function creates a virtual table module that allows querying
184/// local directories using SQL.
185///
186/// # Arguments
187///
188/// * `conn` - SQLite connection
189/// * `module_name` - Name for the virtual table (e.g., "local_files")
190/// * `root_path` - Root directory to query
191///
192/// # Example
193///
194/// ```rust,ignore
195/// use rusqlite::Connection;
196/// use sqlite_vtable_opendal::backends::local_fs;
197///
198/// let conn = Connection::open_in_memory()?;
199/// local_fs::register(&conn, "local_files", "/tmp")?;
200///
201/// // Now you can query: SELECT * FROM local_files
202/// ```
203pub fn register(
204    conn: &rusqlite::Connection,
205    module_name: &str,
206    root_path: impl Into<String>,
207) -> rusqlite::Result<()> {
208    use crate::types::{columns, QueryConfig};
209    use rusqlite::{
210        ffi,
211        vtab::{self, eponymous_only_module, IndexInfo, VTab, VTabCursor, VTabKind},
212    };
213    use std::os::raw::c_int;
214
215    let root = root_path.into();
216
217    // Create a specific table type for local_fs
218    #[repr(C)]
219    struct LocalFsTable {
220        base: ffi::sqlite3_vtab,
221        root_path: String,
222    }
223
224    // Create a specific cursor type for local_fs
225    #[repr(C)]
226    struct LocalFsCursor {
227        base: ffi::sqlite3_vtab_cursor,
228        files: Vec<crate::types::FileMetadata>,
229        current_row: usize,
230        root_path: String,
231    }
232
233    impl LocalFsCursor {
234        fn new(root_path: String) -> Self {
235            Self {
236                base: ffi::sqlite3_vtab_cursor::default(),
237                files: Vec::new(),
238                current_row: 0,
239                root_path,
240            }
241        }
242    }
243
244    unsafe impl VTabCursor for LocalFsCursor {
245        fn filter(
246            &mut self,
247            _idx_num: c_int,
248            _idx_str: Option<&str>,
249            _args: &vtab::Filters<'_>,
250        ) -> rusqlite::Result<()> {
251            // Create backend and fetch files
252            let backend = LocalFsBackend::new(&self.root_path);
253            let config = QueryConfig::default();
254
255            // Fetch files from the backend (blocking the async call)
256            let files = tokio::task::block_in_place(|| {
257                tokio::runtime::Handle::current().block_on(async {
258                    backend.list_files(&config).await
259                })
260            })
261            .map_err(|e| rusqlite::Error::ModuleError(e.to_string()))?;
262
263            self.files = files;
264            self.current_row = 0;
265            Ok(())
266        }
267
268        fn next(&mut self) -> rusqlite::Result<()> {
269            self.current_row += 1;
270            Ok(())
271        }
272
273        fn eof(&self) -> bool {
274            self.current_row >= self.files.len()
275        }
276
277        fn column(&self, ctx: &mut vtab::Context, col_index: c_int) -> rusqlite::Result<()> {
278            if self.current_row >= self.files.len() {
279                return Ok(());
280            }
281
282            let file = &self.files[self.current_row];
283
284            match col_index {
285                columns::PATH => ctx.set_result(&file.path),
286                columns::SIZE => ctx.set_result(&(file.size as i64)),
287                columns::LAST_MODIFIED => ctx.set_result(&file.last_modified),
288                columns::ETAG => ctx.set_result(&file.etag),
289                columns::IS_DIR => ctx.set_result(&file.is_dir),
290                columns::CONTENT_TYPE => ctx.set_result(&file.content_type),
291                columns::NAME => ctx.set_result(&file.name),
292                columns::CONTENT => {
293                    if let Some(ref content) = file.content {
294                        ctx.set_result(&content.as_slice())
295                    } else {
296                        ctx.set_result::<Option<&[u8]>>(&None)
297                    }
298                }
299                _ => Ok(()),
300            }
301        }
302
303        fn rowid(&self) -> rusqlite::Result<i64> {
304            Ok(self.current_row as i64)
305        }
306    }
307
308    impl vtab::CreateVTab<'_> for LocalFsTable {
309        const KIND: VTabKind = VTabKind::EponymousOnly;
310    }
311
312    unsafe impl VTab<'_> for LocalFsTable {
313        type Aux = String;
314        type Cursor = LocalFsCursor;
315
316        fn connect(
317            _db: &mut vtab::VTabConnection,
318            aux: Option<&Self::Aux>,
319            _args: &[&[u8]],
320        ) -> rusqlite::Result<(String, Self)> {
321            let schema = "
322                CREATE TABLE x(
323                    path TEXT,
324                    size INTEGER,
325                    last_modified TEXT,
326                    etag TEXT,
327                    is_dir INTEGER,
328                    content_type TEXT,
329                    name TEXT,
330                    content BLOB
331                )
332            ";
333
334            let root_path = aux.cloned().unwrap_or_else(|| "/".to_string());
335
336            Ok((
337                schema.to_owned(),
338                LocalFsTable {
339                    base: ffi::sqlite3_vtab::default(),
340                    root_path,
341                },
342            ))
343        }
344
345        fn best_index(&self, info: &mut IndexInfo) -> rusqlite::Result<()> {
346            info.set_estimated_cost(100.0);
347            Ok(())
348        }
349
350        fn open(&mut self) -> rusqlite::Result<Self::Cursor> {
351            Ok(LocalFsCursor::new(self.root_path.clone()))
352        }
353    }
354
355    conn.create_module(module_name, eponymous_only_module::<LocalFsTable>(), Some(root))
356}
357
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::TempDir;

    /// Build a backend rooted at the given temporary directory.
    fn backend_in(dir: &TempDir) -> LocalFsBackend {
        LocalFsBackend::new(dir.path().to_str().unwrap())
    }

    /// Write `body` to `relative` inside the temporary directory.
    fn put(dir: &TempDir, relative: &str, body: &str) {
        fs::write(dir.path().join(relative), body).unwrap();
    }

    /// Whether any listed entry carries the given name.
    fn has_name(entries: &[FileMetadata], wanted: &str) -> bool {
        entries.iter().any(|f| f.name == wanted)
    }

    /// An empty directory produces an empty listing.
    #[tokio::test]
    async fn test_list_empty_directory() {
        let dir = TempDir::new().unwrap();

        let listed = backend_in(&dir)
            .list_files(&QueryConfig::default())
            .await
            .unwrap();

        assert!(listed.is_empty());
    }

    /// Every file in the directory shows up in the listing.
    #[tokio::test]
    async fn test_list_files() {
        let dir = TempDir::new().unwrap();
        put(&dir, "file1.txt", "content1");
        put(&dir, "file2.txt", "content2");

        let listed = backend_in(&dir)
            .list_files(&QueryConfig::default())
            .await
            .unwrap();

        assert_eq!(listed.len(), 2);
        assert!(has_name(&listed, "file1.txt"));
        assert!(has_name(&listed, "file2.txt"));
    }

    /// Name, size, kind and content type are reported for a file.
    #[tokio::test]
    async fn test_file_metadata() {
        let dir = TempDir::new().unwrap();
        put(&dir, "test.txt", "hello world");

        let listed = backend_in(&dir)
            .list_files(&QueryConfig::default())
            .await
            .unwrap();
        assert_eq!(listed.len(), 1);

        let entry = &listed[0];
        assert_eq!(entry.name, "test.txt");
        assert_eq!(entry.size, 11); // "hello world" is 11 bytes
        assert!(!entry.is_dir);
        assert_eq!(entry.content_type, Some("txt".to_string()));
        assert!(entry.content.is_none()); // Not fetched by default
    }

    /// With `fetch_content` set, the file's bytes are returned.
    #[tokio::test]
    async fn test_fetch_content() {
        let dir = TempDir::new().unwrap();
        put(&dir, "test.txt", "hello world");

        let with_content = QueryConfig {
            fetch_content: true,
            ..Default::default()
        };
        let listed = backend_in(&dir).list_files(&with_content).await.unwrap();
        assert_eq!(listed.len(), 1);

        let entry = &listed[0];
        assert!(entry.content.is_some());
        assert_eq!(entry.content.as_ref().unwrap(), b"hello world");
    }

    /// A recursive listing reaches files in subdirectories.
    #[tokio::test]
    async fn test_recursive_listing() {
        let dir = TempDir::new().unwrap();

        // Create nested structure
        fs::create_dir(dir.path().join("subdir")).unwrap();
        put(&dir, "file1.txt", "content1");
        put(&dir, "subdir/file2.txt", "content2");

        let recursive = QueryConfig {
            recursive: true,
            ..Default::default()
        };
        let listed = backend_in(&dir).list_files(&recursive).await.unwrap();

        // Should have: subdir (directory), file1.txt, subdir/file2.txt
        assert!(listed.len() >= 2); // At least the files, maybe the directory too

        assert!(has_name(&listed, "file1.txt"));
        assert!(has_name(&listed, "file2.txt"));
    }

    /// The registered virtual table can be selected from and filtered.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_sqlite_integration() {
        use rusqlite::Connection;

        let dir = TempDir::new().unwrap();
        put(&dir, "large.txt", &"x".repeat(10000));
        put(&dir, "small.txt", "tiny");
        put(&dir, "medium.txt", "medium content");

        // Open SQLite connection and register virtual table
        let conn = Connection::open_in_memory().unwrap();
        register(&conn, "local_files", dir.path().to_str().unwrap()).unwrap();

        // Query all files
        let mut all_stmt = conn
            .prepare("SELECT name, size, is_dir FROM local_files ORDER BY name")
            .unwrap();
        let rows: Vec<(String, i64, bool)> = all_stmt
            .query_map([], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))
            .unwrap()
            .collect::<std::result::Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(rows.len(), 3);
        for expected in ["large.txt", "small.txt", "medium.txt"] {
            assert!(rows.iter().any(|(name, _, _)| name == expected));
        }

        // Query with WHERE clause
        let mut filtered_stmt = conn
            .prepare("SELECT name FROM local_files WHERE size > 100")
            .unwrap();
        let big: Vec<String> = filtered_stmt
            .query_map([], |row| row.get(0))
            .unwrap()
            .collect::<std::result::Result<Vec<_>, _>>()
            .unwrap();

        // Only large.txt should be returned
        assert_eq!(big.len(), 1);
        assert_eq!(big[0], "large.txt");
    }

    /// Aggregates and ordering work over the virtual table.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_sqlite_count_and_aggregate() {
        use rusqlite::Connection;

        let dir = TempDir::new().unwrap();

        // Create multiple files
        for i in 1..=5 {
            put(&dir, &format!("file{}.txt", i), &format!("content{}", i));
        }

        let conn = Connection::open_in_memory().unwrap();
        register(&conn, "local_files", dir.path().to_str().unwrap()).unwrap();

        // Test COUNT
        let row_count: i64 = conn
            .query_row("SELECT COUNT(*) FROM local_files", [], |row| row.get(0))
            .unwrap();
        assert_eq!(row_count, 5);

        // Test SUM of sizes
        let size_sum: i64 = conn
            .query_row("SELECT SUM(size) FROM local_files", [], |row| row.get(0))
            .unwrap();
        assert!(size_sum > 0);

        // Test ORDER BY
        let smallest_name: String = conn
            .query_row(
                "SELECT name FROM local_files ORDER BY name LIMIT 1",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(smallest_name, "file1.txt");
    }
}