// sqlite_vtable_opendal/backends/local_fs.rs

//! Local filesystem backend implementation.
//!
//! This backend allows querying local directories using SQL.
//! It is useful for file discovery, auditing, and testing.

6use crate::backends::StorageBackend;
7use crate::error::Result;
8use crate::types::{FileMetadata, QueryConfig};
9use async_trait::async_trait;
10use futures_util::TryStreamExt;
11use opendal::{services::Fs, EntryMode, Metakey, Operator};
12use std::path::Path;
13
/// Local filesystem storage backend.
///
/// Lists files and directories beneath a local root directory through
/// OpenDAL's `Fs` service, so they can be exposed as rows of a SQLite
/// virtual table.
///
/// # Example
///
/// ```rust,ignore
/// use sqlite_vtable_opendal::backends::local_fs::LocalFsBackend;
/// use sqlite_vtable_opendal::backends::StorageBackend;
/// use sqlite_vtable_opendal::types::QueryConfig;
///
/// let backend = LocalFsBackend::new("/path/to/directory");
/// let config = QueryConfig::default();
/// let files = backend.list_files(&config).await?;
/// ```
#[derive(Debug, Clone)]
pub struct LocalFsBackend {
    /// Root directory that all listed paths are resolved against.
    root_path: String,
}

33impl LocalFsBackend {
34    /// Create a new local filesystem backend
35    ///
36    /// # Arguments
37    ///
38    /// * `root_path` - The root directory to query from
39    ///
40    /// # Example
41    ///
42    /// ```
43    /// use sqlite_vtable_opendal::backends::local_fs::LocalFsBackend;
44    ///
45    /// let backend = LocalFsBackend::new("/tmp");
46    /// ```
47    pub fn new(root_path: impl Into<String>) -> Self {
48        Self {
49            root_path: root_path.into(),
50        }
51    }
52
53    /// Create an OpenDAL operator for the local filesystem
54    fn create_operator(&self) -> Result<Operator> {
55        let builder = Fs::default().root(&self.root_path);
56
57        Ok(Operator::new(builder)?.finish())
58    }
59}
60
61#[async_trait]
62impl StorageBackend for LocalFsBackend {
63    async fn list_files(&self, config: &QueryConfig) -> Result<Vec<FileMetadata>> {
64        let operator = self.create_operator()?;
65        let mut results = Vec::new();
66
67        // Normalize the path
68        let normalized_path = if config.root_path == "/" || config.root_path.is_empty() {
69            "".to_string()
70        } else {
71            config.root_path.trim_matches('/').to_string()
72        };
73
74        // Create lister with metadata keys
75        let lister_builder = operator.lister_with(&normalized_path);
76
77        let mut lister = lister_builder
78            .recursive(config.recursive)
79            .metakey(
80                Metakey::ContentLength
81                    | Metakey::ContentType
82                    | Metakey::Mode
83                    | Metakey::LastModified,
84            )
85            .await
86            ?;
87
88        // Iterate through entries
89        while let Some(entry) = lister.try_next().await? {
90            let entry_path = entry.path();
91            let entry_mode = entry.metadata().mode();
92
93            // Skip the root directory itself
94            if entry_path.is_empty() || entry_path == "/" || entry_path == "." {
95                continue;
96            }
97
98            let full_path = if entry_path.starts_with('/') {
99                entry_path.to_string()
100            } else {
101                format!("/{}", entry_path)
102            };
103
104            // Extract file name from path
105            let name = Path::new(&full_path)
106                .file_name()
107                .map(|s| s.to_string_lossy().to_string())
108                .unwrap_or_else(|| {
109                    let clean_path = entry_path.trim_end_matches('/');
110                    Path::new(clean_path)
111                        .file_name()
112                        .map(|s| s.to_string_lossy().to_string())
113                        .unwrap_or_default()
114                });
115
116            if entry_mode == EntryMode::FILE {
117                // Fetch detailed metadata for files
118                let metadata = operator
119                    .stat(&full_path)
120                    .await
121                    ?;
122
123                // Optionally fetch content
124                let content = if config.fetch_content {
125                    operator
126                        .read(&full_path)
127                        .await
128                        .ok()
129                        .map(|bytes| bytes.to_vec())
130                } else {
131                    None
132                };
133
134                results.push(FileMetadata {
135                    name,
136                    path: full_path.clone(),
137                    size: metadata.content_length(),
138                    last_modified: metadata.last_modified().map(|dt| dt.to_string()),
139                    etag: metadata.etag().map(|e| e.to_string()),
140                    is_dir: false,
141                    content_type: Path::new(&full_path)
142                        .extension()
143                        .and_then(|ext| ext.to_str())
144                        .map(|ext| ext.to_string()),
145                    content,
146                });
147
148                // Apply limit if specified
149                if let Some(limit) = config.limit {
150                    if results.len() >= limit + config.offset {
151                        break;
152                    }
153                }
154            } else if entry_mode == EntryMode::DIR {
155                // Add directory entry
156                results.push(FileMetadata {
157                    name,
158                    path: full_path,
159                    size: 0,
160                    last_modified: None,
161                    etag: None,
162                    is_dir: true,
163                    content_type: Some("directory".to_string()),
164                    content: None,
165                });
166
167                // Apply limit if specified
168                if let Some(limit) = config.limit {
169                    if results.len() >= limit + config.offset {
170                        break;
171                    }
172                }
173            }
174        }
175
176        // Apply offset
177        if config.offset > 0 && config.offset < results.len() {
178            results = results.into_iter().skip(config.offset).collect();
179        }
180
181        Ok(results)
182    }
183
184    fn backend_name(&self) -> &'static str {
185        "local_fs"
186    }
187}
188
189/// Register the local_fs virtual table with SQLite
190///
191/// This function creates a virtual table module that allows querying
192/// local directories using SQL.
193///
194/// # Arguments
195///
196/// * `conn` - SQLite connection
197/// * `module_name` - Name for the virtual table (e.g., "local_files")
198/// * `root_path` - Root directory to query
199///
200/// # Example
201///
202/// ```rust,ignore
203/// use rusqlite::Connection;
204/// use sqlite_vtable_opendal::backends::local_fs;
205///
206/// let conn = Connection::open_in_memory()?;
207/// local_fs::register(&conn, "local_files", "/tmp")?;
208///
209/// // Now you can query: SELECT * FROM local_files
210/// ```
211pub fn register(
212    conn: &rusqlite::Connection,
213    module_name: &str,
214    root_path: impl Into<String>,
215) -> rusqlite::Result<()> {
216    use crate::types::{columns, QueryConfig};
217    use rusqlite::{
218        ffi,
219        vtab::{self, eponymous_only_module, IndexInfo, VTab, VTabCursor, VTabKind},
220    };
221    use std::os::raw::c_int;
222
223    let root = root_path.into();
224
225    // Create a specific table type for local_fs
226    #[repr(C)]
227    struct LocalFsTable {
228        base: ffi::sqlite3_vtab,
229        root_path: String,
230    }
231
232    // Create a specific cursor type for local_fs
233    #[repr(C)]
234    struct LocalFsCursor {
235        base: ffi::sqlite3_vtab_cursor,
236        files: Vec<crate::types::FileMetadata>,
237        current_row: usize,
238        root_path: String,
239    }
240
241    impl LocalFsCursor {
242        fn new(root_path: String) -> Self {
243            Self {
244                base: ffi::sqlite3_vtab_cursor::default(),
245                files: Vec::new(),
246                current_row: 0,
247                root_path,
248            }
249        }
250    }
251
252    unsafe impl VTabCursor for LocalFsCursor {
253        fn filter(
254            &mut self,
255            _idx_num: c_int,
256            _idx_str: Option<&str>,
257            _args: &vtab::Values<'_>,
258        ) -> rusqlite::Result<()> {
259            // Create backend and fetch files
260            let backend = LocalFsBackend::new(&self.root_path);
261            let config = QueryConfig::default();
262
263            // Fetch files from the backend (blocking the async call)
264            let files = tokio::task::block_in_place(|| {
265                tokio::runtime::Handle::current().block_on(async {
266                    backend.list_files(&config).await
267                })
268            })
269            .map_err(|e| rusqlite::Error::ModuleError(e.to_string()))?;
270
271            self.files = files;
272            self.current_row = 0;
273            Ok(())
274        }
275
276        fn next(&mut self) -> rusqlite::Result<()> {
277            self.current_row += 1;
278            Ok(())
279        }
280
281        fn eof(&self) -> bool {
282            self.current_row >= self.files.len()
283        }
284
285        fn column(&self, ctx: &mut vtab::Context, col_index: c_int) -> rusqlite::Result<()> {
286            if self.current_row >= self.files.len() {
287                return Ok(());
288            }
289
290            let file = &self.files[self.current_row];
291
292            match col_index {
293                columns::PATH => ctx.set_result(&file.path),
294                columns::SIZE => ctx.set_result(&(file.size as i64)),
295                columns::LAST_MODIFIED => ctx.set_result(&file.last_modified),
296                columns::ETAG => ctx.set_result(&file.etag),
297                columns::IS_DIR => ctx.set_result(&file.is_dir),
298                columns::CONTENT_TYPE => ctx.set_result(&file.content_type),
299                columns::NAME => ctx.set_result(&file.name),
300                columns::CONTENT => {
301                    if let Some(ref content) = file.content {
302                        ctx.set_result(&content.as_slice())
303                    } else {
304                        ctx.set_result::<Option<&[u8]>>(&None)
305                    }
306                }
307                _ => Ok(()),
308            }
309        }
310
311        fn rowid(&self) -> rusqlite::Result<i64> {
312            Ok(self.current_row as i64)
313        }
314    }
315
316    impl vtab::CreateVTab<'_> for LocalFsTable {
317        const KIND: VTabKind = VTabKind::EponymousOnly;
318    }
319
320    unsafe impl VTab<'_> for LocalFsTable {
321        type Aux = String;
322        type Cursor = LocalFsCursor;
323
324        fn connect(
325            _db: &mut vtab::VTabConnection,
326            aux: Option<&Self::Aux>,
327            _args: &[&[u8]],
328        ) -> rusqlite::Result<(String, Self)> {
329            let schema = "
330                CREATE TABLE x(
331                    path TEXT,
332                    size INTEGER,
333                    last_modified TEXT,
334                    etag TEXT,
335                    is_dir INTEGER,
336                    content_type TEXT,
337                    name TEXT,
338                    content BLOB
339                )
340            ";
341
342            let root_path = aux.cloned().unwrap_or_else(|| "/".to_string());
343
344            Ok((
345                schema.to_owned(),
346                LocalFsTable {
347                    base: ffi::sqlite3_vtab::default(),
348                    root_path,
349                },
350            ))
351        }
352
353        fn best_index(&self, info: &mut IndexInfo) -> rusqlite::Result<()> {
354            info.set_estimated_cost(100.0);
355            Ok(())
356        }
357
358        fn open(&mut self) -> rusqlite::Result<Self::Cursor> {
359            Ok(LocalFsCursor::new(self.root_path.clone()))
360        }
361    }
362
363    conn.create_module(module_name, eponymous_only_module::<LocalFsTable>(), Some(root))
364}
365
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::TempDir;

    /// Returns a backend whose root is the given temporary directory.
    fn backend_for(dir: &TempDir) -> LocalFsBackend {
        LocalFsBackend::new(dir.path().to_str().unwrap())
    }

    /// Opens an in-memory database with the `local_files` table registered over `dir`.
    fn connection_for(dir: &TempDir) -> rusqlite::Connection {
        let conn = rusqlite::Connection::open_in_memory().unwrap();
        register(&conn, "local_files", dir.path().to_str().unwrap()).unwrap();
        conn
    }

    #[tokio::test]
    async fn test_list_empty_directory() {
        let dir = TempDir::new().unwrap();

        let listed = backend_for(&dir)
            .list_files(&QueryConfig::default())
            .await
            .unwrap();

        assert!(listed.is_empty());
    }

    #[tokio::test]
    async fn test_list_files() {
        let dir = TempDir::new().unwrap();
        for (file_name, body) in [("file1.txt", "content1"), ("file2.txt", "content2")] {
            fs::write(dir.path().join(file_name), body).unwrap();
        }

        let listed = backend_for(&dir)
            .list_files(&QueryConfig::default())
            .await
            .unwrap();

        assert_eq!(listed.len(), 2);
        for wanted in ["file1.txt", "file2.txt"] {
            assert!(listed.iter().any(|f| f.name == wanted));
        }
    }

    #[tokio::test]
    async fn test_file_metadata() {
        let dir = TempDir::new().unwrap();
        fs::write(dir.path().join("test.txt"), "hello world").unwrap();

        let listed = backend_for(&dir)
            .list_files(&QueryConfig::default())
            .await
            .unwrap();
        assert_eq!(listed.len(), 1);

        let entry = &listed[0];
        assert_eq!(entry.name, "test.txt");
        assert_eq!(entry.size, 11); // "hello world" is 11 bytes
        assert!(!entry.is_dir);
        assert_eq!(entry.content_type, Some("txt".to_string()));
        assert!(entry.content.is_none()); // content is only read on request
    }

    #[tokio::test]
    async fn test_fetch_content() {
        let dir = TempDir::new().unwrap();
        fs::write(dir.path().join("test.txt"), "hello world").unwrap();

        let config = QueryConfig {
            fetch_content: true,
            ..Default::default()
        };
        let listed = backend_for(&dir).list_files(&config).await.unwrap();
        assert_eq!(listed.len(), 1);

        let content = listed[0].content.as_ref();
        assert!(content.is_some());
        assert_eq!(content.unwrap(), b"hello world");
    }

    #[tokio::test]
    async fn test_recursive_listing() {
        let dir = TempDir::new().unwrap();
        let root = dir.path();

        // Layout: file1.txt at the top, file2.txt inside subdir/.
        fs::create_dir(root.join("subdir")).unwrap();
        fs::write(root.join("file1.txt"), "content1").unwrap();
        fs::write(root.join("subdir/file2.txt"), "content2").unwrap();

        let config = QueryConfig {
            recursive: true,
            ..Default::default()
        };
        let listed = backend_for(&dir).list_files(&config).await.unwrap();

        // Both files must appear; the subdir entry itself may or may not.
        assert!(listed.len() >= 2);
        for wanted in ["file1.txt", "file2.txt"] {
            assert!(listed.iter().any(|f| f.name == wanted));
        }
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_sqlite_integration() {
        let dir = TempDir::new().unwrap();
        let fixtures = [
            ("large.txt", "x".repeat(10000)),
            ("small.txt", "tiny".to_string()),
            ("medium.txt", "medium content".to_string()),
        ];
        for (file_name, body) in &fixtures {
            fs::write(dir.path().join(file_name), body).unwrap();
        }

        let conn = connection_for(&dir);

        // Every file comes back as a row.
        let mut stmt = conn
            .prepare("SELECT name, size, is_dir FROM local_files ORDER BY name")
            .unwrap();
        let rows: Vec<(String, i64, bool)> = stmt
            .query_map([], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))
            .unwrap()
            .collect::<std::result::Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(rows.len(), 3);
        for wanted in ["large.txt", "small.txt", "medium.txt"] {
            assert!(rows.iter().any(|(name, _, _)| name == wanted));
        }

        // SQLite applies the WHERE clause to the returned rows.
        let mut stmt = conn
            .prepare("SELECT name FROM local_files WHERE size > 100")
            .unwrap();
        let large: Vec<String> = stmt
            .query_map([], |row| row.get(0))
            .unwrap()
            .collect::<std::result::Result<Vec<_>, _>>()
            .unwrap();

        // Only large.txt exceeds 100 bytes.
        assert_eq!(large, ["large.txt"]);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_sqlite_count_and_aggregate() {
        let dir = TempDir::new().unwrap();
        (1..=5).for_each(|i| {
            fs::write(
                dir.path().join(format!("file{}.txt", i)),
                format!("content{}", i),
            )
            .unwrap();
        });

        let conn = connection_for(&dir);

        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM local_files", [], |row| row.get(0))
            .unwrap();
        assert_eq!(count, 5);

        let total_size: i64 = conn
            .query_row("SELECT SUM(size) FROM local_files", [], |row| row.get(0))
            .unwrap();
        assert!(total_size > 0);

        let first_file: String = conn
            .query_row(
                "SELECT name FROM local_files ORDER BY name LIMIT 1",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(first_file, "file1.txt");
    }
}