sqlite_vtable_opendal/backends/
local_fs.rs1use crate::backends::StorageBackend;
7use crate::error::Result;
8use crate::types::{FileMetadata, QueryConfig};
9use async_trait::async_trait;
10use futures_util::TryStreamExt;
11use opendal::{services::Fs, EntryMode, Operator};
12use std::path::Path;
13
/// Storage backend that serves listings from the local filesystem.
///
/// The stored path is handed to the OpenDAL `Fs` service builder as its root
/// whenever an operator is created for this backend.
#[derive(Debug, Clone)]
pub struct LocalFsBackend {
    /// Root directory passed to the OpenDAL `Fs` builder.
    root_path: String,
}
32
33impl LocalFsBackend {
34 pub fn new(root_path: impl Into<String>) -> Self {
48 Self {
49 root_path: root_path.into(),
50 }
51 }
52
53 fn create_operator(&self) -> Result<Operator> {
55 let builder = Fs::default().root(&self.root_path);
56
57 Ok(Operator::new(builder)?.finish())
58 }
59}
60
61#[async_trait]
62impl StorageBackend for LocalFsBackend {
63 async fn list_files(&self, config: &QueryConfig) -> Result<Vec<FileMetadata>> {
64 let operator = self.create_operator()?;
65 let mut results = Vec::new();
66
67 let normalized_path = if config.root_path == "/" || config.root_path.is_empty() {
69 "".to_string()
70 } else {
71 config.root_path.trim_matches('/').to_string()
72 };
73
74 let mut lister = operator
76 .lister_with(&normalized_path)
77 .recursive(config.recursive)
78 .await?;
79
80 while let Some(entry) = lister.try_next().await? {
82 let entry_path = entry.path();
83 let entry_mode = entry.metadata().mode();
84
85 if entry_path.is_empty() || entry_path == "/" || entry_path == "." {
87 continue;
88 }
89
90 let full_path = if entry_path.starts_with('/') {
91 entry_path.to_string()
92 } else {
93 format!("/{}", entry_path)
94 };
95
96 let name = Path::new(&full_path)
98 .file_name()
99 .map(|s| s.to_string_lossy().to_string())
100 .unwrap_or_else(|| {
101 let clean_path = entry_path.trim_end_matches('/');
102 Path::new(clean_path)
103 .file_name()
104 .map(|s| s.to_string_lossy().to_string())
105 .unwrap_or_default()
106 });
107
108 if entry_mode == EntryMode::FILE {
109 let metadata = operator
111 .stat(&full_path)
112 .await
113 ?;
114
115 let content = if config.fetch_content {
117 operator
118 .read(&full_path)
119 .await
120 .ok()
121 .map(|bytes| bytes.to_vec())
122 } else {
123 None
124 };
125
126 results.push(FileMetadata {
127 name,
128 path: full_path.clone(),
129 size: metadata.content_length(),
130 last_modified: metadata.last_modified().map(|dt| dt.to_string()),
131 etag: metadata.etag().map(|e| e.to_string()),
132 is_dir: false,
133 content_type: Path::new(&full_path)
134 .extension()
135 .and_then(|ext| ext.to_str())
136 .map(|ext| ext.to_string()),
137 content,
138 });
139
140 if let Some(limit) = config.limit {
142 if results.len() >= limit + config.offset {
143 break;
144 }
145 }
146 } else if entry_mode == EntryMode::DIR {
147 results.push(FileMetadata {
149 name,
150 path: full_path,
151 size: 0,
152 last_modified: None,
153 etag: None,
154 is_dir: true,
155 content_type: Some("directory".to_string()),
156 content: None,
157 });
158
159 if let Some(limit) = config.limit {
161 if results.len() >= limit + config.offset {
162 break;
163 }
164 }
165 }
166 }
167
168 if config.offset > 0 && config.offset < results.len() {
170 results = results.into_iter().skip(config.offset).collect();
171 }
172
173 Ok(results)
174 }
175
176 fn backend_name(&self) -> &'static str {
177 "local_fs"
178 }
179}
180
181pub fn register(
204 conn: &rusqlite::Connection,
205 module_name: &str,
206 root_path: impl Into<String>,
207) -> rusqlite::Result<()> {
208 use crate::types::{columns, QueryConfig};
209 use rusqlite::{
210 ffi,
211 vtab::{self, eponymous_only_module, IndexInfo, VTab, VTabCursor, VTabKind},
212 };
213 use std::os::raw::c_int;
214
215 let root = root_path.into();
216
217 #[repr(C)]
219 struct LocalFsTable {
220 base: ffi::sqlite3_vtab,
221 root_path: String,
222 }
223
224 #[repr(C)]
226 struct LocalFsCursor {
227 base: ffi::sqlite3_vtab_cursor,
228 files: Vec<crate::types::FileMetadata>,
229 current_row: usize,
230 root_path: String,
231 }
232
233 impl LocalFsCursor {
234 fn new(root_path: String) -> Self {
235 Self {
236 base: ffi::sqlite3_vtab_cursor::default(),
237 files: Vec::new(),
238 current_row: 0,
239 root_path,
240 }
241 }
242 }
243
244 unsafe impl VTabCursor for LocalFsCursor {
245 fn filter(
246 &mut self,
247 _idx_num: c_int,
248 _idx_str: Option<&str>,
249 _args: &vtab::Filters<'_>,
250 ) -> rusqlite::Result<()> {
251 let backend = LocalFsBackend::new(&self.root_path);
253 let config = QueryConfig::default();
254
255 let files = tokio::task::block_in_place(|| {
257 tokio::runtime::Handle::current().block_on(async {
258 backend.list_files(&config).await
259 })
260 })
261 .map_err(|e| rusqlite::Error::ModuleError(e.to_string()))?;
262
263 self.files = files;
264 self.current_row = 0;
265 Ok(())
266 }
267
268 fn next(&mut self) -> rusqlite::Result<()> {
269 self.current_row += 1;
270 Ok(())
271 }
272
273 fn eof(&self) -> bool {
274 self.current_row >= self.files.len()
275 }
276
277 fn column(&self, ctx: &mut vtab::Context, col_index: c_int) -> rusqlite::Result<()> {
278 if self.current_row >= self.files.len() {
279 return Ok(());
280 }
281
282 let file = &self.files[self.current_row];
283
284 match col_index {
285 columns::PATH => ctx.set_result(&file.path),
286 columns::SIZE => ctx.set_result(&(file.size as i64)),
287 columns::LAST_MODIFIED => ctx.set_result(&file.last_modified),
288 columns::ETAG => ctx.set_result(&file.etag),
289 columns::IS_DIR => ctx.set_result(&file.is_dir),
290 columns::CONTENT_TYPE => ctx.set_result(&file.content_type),
291 columns::NAME => ctx.set_result(&file.name),
292 columns::CONTENT => {
293 if let Some(ref content) = file.content {
294 ctx.set_result(&content.as_slice())
295 } else {
296 ctx.set_result::<Option<&[u8]>>(&None)
297 }
298 }
299 _ => Ok(()),
300 }
301 }
302
303 fn rowid(&self) -> rusqlite::Result<i64> {
304 Ok(self.current_row as i64)
305 }
306 }
307
308 impl vtab::CreateVTab<'_> for LocalFsTable {
309 const KIND: VTabKind = VTabKind::EponymousOnly;
310 }
311
312 unsafe impl VTab<'_> for LocalFsTable {
313 type Aux = String;
314 type Cursor = LocalFsCursor;
315
316 fn connect(
317 _db: &mut vtab::VTabConnection,
318 aux: Option<&Self::Aux>,
319 _args: &[&[u8]],
320 ) -> rusqlite::Result<(String, Self)> {
321 let schema = "
322 CREATE TABLE x(
323 path TEXT,
324 size INTEGER,
325 last_modified TEXT,
326 etag TEXT,
327 is_dir INTEGER,
328 content_type TEXT,
329 name TEXT,
330 content BLOB
331 )
332 ";
333
334 let root_path = aux.cloned().unwrap_or_else(|| "/".to_string());
335
336 Ok((
337 schema.to_owned(),
338 LocalFsTable {
339 base: ffi::sqlite3_vtab::default(),
340 root_path,
341 },
342 ))
343 }
344
345 fn best_index(&self, info: &mut IndexInfo) -> rusqlite::Result<()> {
346 info.set_estimated_cost(100.0);
347 Ok(())
348 }
349
350 fn open(&mut self) -> rusqlite::Result<Self::Cursor> {
351 Ok(LocalFsCursor::new(self.root_path.clone()))
352 }
353 }
354
355 conn.create_module(module_name, eponymous_only_module::<LocalFsTable>(), Some(root))
356}
357
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::TempDir;

    /// Creates a scratch directory holding the given `(file name, contents)` pairs.
    fn scratch_dir(files: &[(&str, &str)]) -> TempDir {
        let dir = TempDir::new().unwrap();
        for (file_name, body) in files {
            fs::write(dir.path().join(file_name), body).unwrap();
        }
        dir
    }

    /// Backend rooted at the scratch directory.
    fn backend_for(dir: &TempDir) -> LocalFsBackend {
        LocalFsBackend::new(dir.path().to_str().unwrap())
    }

    /// An empty directory produces an empty listing.
    #[tokio::test]
    async fn test_list_empty_directory() {
        let dir = scratch_dir(&[]);
        let backend = backend_for(&dir);
        let settings = QueryConfig::default();

        let listed = backend.list_files(&settings).await.unwrap();
        assert!(listed.is_empty());
    }

    /// Every file in a flat directory shows up by name.
    #[tokio::test]
    async fn test_list_files() {
        let dir = scratch_dir(&[("file1.txt", "content1"), ("file2.txt", "content2")]);
        let backend = backend_for(&dir);
        let settings = QueryConfig::default();

        let listed = backend.list_files(&settings).await.unwrap();
        assert_eq!(listed.len(), 2);

        for expected in ["file1.txt", "file2.txt"] {
            assert!(listed.iter().any(|f| f.name == expected));
        }
    }

    /// Name, size, kind and extension are reported; content is absent by default.
    #[tokio::test]
    async fn test_file_metadata() {
        let dir = scratch_dir(&[("test.txt", "hello world")]);
        let backend = backend_for(&dir);
        let settings = QueryConfig::default();

        let listed = backend.list_files(&settings).await.unwrap();
        assert_eq!(listed.len(), 1);

        let entry = &listed[0];
        assert_eq!(entry.name, "test.txt");
        assert_eq!(entry.size, 11);
        assert!(!entry.is_dir);
        assert_eq!(entry.content_type, Some("txt".to_string()));
        assert!(entry.content.is_none());
    }

    /// With `fetch_content` enabled the file body is returned.
    #[tokio::test]
    async fn test_fetch_content() {
        let dir = scratch_dir(&[("test.txt", "hello world")]);
        let backend = backend_for(&dir);
        let settings = QueryConfig {
            fetch_content: true,
            ..Default::default()
        };

        let listed = backend.list_files(&settings).await.unwrap();
        assert_eq!(listed.len(), 1);

        let entry = &listed[0];
        assert!(entry.content.is_some());
        assert_eq!(entry.content.as_ref().unwrap(), b"hello world");
    }

    /// Recursive listings descend into subdirectories.
    #[tokio::test]
    async fn test_recursive_listing() {
        let dir = scratch_dir(&[]);
        let base = dir.path();

        fs::create_dir(base.join("subdir")).unwrap();
        fs::write(base.join("file1.txt"), "content1").unwrap();
        fs::write(base.join("subdir/file2.txt"), "content2").unwrap();

        let backend = backend_for(&dir);
        let settings = QueryConfig {
            recursive: true,
            ..Default::default()
        };

        let listed = backend.list_files(&settings).await.unwrap();

        assert!(listed.len() >= 2);
        for expected in ["file1.txt", "file2.txt"] {
            assert!(listed.iter().any(|f| f.name == expected));
        }
    }

    /// The registered table can be selected from and filtered with plain SQL.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_sqlite_integration() {
        use rusqlite::Connection;

        let big_body = "x".repeat(10000);
        let dir = scratch_dir(&[
            ("large.txt", big_body.as_str()),
            ("small.txt", "tiny"),
            ("medium.txt", "medium content"),
        ]);

        let db = Connection::open_in_memory().unwrap();
        register(&db, "local_files", dir.path().to_str().unwrap()).unwrap();

        let mut all_stmt = db
            .prepare("SELECT name, size, is_dir FROM local_files ORDER BY name")
            .unwrap();

        let rows: Vec<(String, i64, bool)> = all_stmt
            .query_map([], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))
            .unwrap()
            .collect::<rusqlite::Result<Vec<_>>>()
            .unwrap();

        assert_eq!(rows.len(), 3);
        for expected in ["large.txt", "small.txt", "medium.txt"] {
            assert!(rows.iter().any(|(name, _, _)| name == expected));
        }

        let mut big_stmt = db
            .prepare("SELECT name FROM local_files WHERE size > 100")
            .unwrap();

        let big_names: Vec<String> = big_stmt
            .query_map([], |row| row.get(0))
            .unwrap()
            .collect::<rusqlite::Result<Vec<_>>>()
            .unwrap();

        assert_eq!(big_names.len(), 1);
        assert_eq!(big_names[0], "large.txt");
    }

    /// Aggregates and ORDER BY / LIMIT work against the virtual table.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_sqlite_count_and_aggregate() {
        use rusqlite::Connection;

        let dir = scratch_dir(&[]);
        for n in 1..=5 {
            let target = dir.path().join(format!("file{}.txt", n));
            fs::write(target, format!("content{}", n)).unwrap();
        }

        let db = Connection::open_in_memory().unwrap();
        register(&db, "local_files", dir.path().to_str().unwrap()).unwrap();

        let row_count: i64 = db
            .query_row("SELECT COUNT(*) FROM local_files", [], |row| row.get(0))
            .unwrap();
        assert_eq!(row_count, 5);

        let byte_total: i64 = db
            .query_row("SELECT SUM(size) FROM local_files", [], |row| row.get(0))
            .unwrap();
        assert!(byte_total > 0);

        let first_name: String = db
            .query_row(
                "SELECT name FROM local_files ORDER BY name LIMIT 1",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(first_name, "file1.txt");
    }
}