sqlite_vtable_opendal/backends/
local_fs.rs1use crate::backends::StorageBackend;
7use crate::error::{Result, VTableError};
8use crate::types::{FileMetadata, QueryConfig};
9use async_trait::async_trait;
10use futures_util::TryStreamExt;
11use opendal::{services::Fs, EntryMode, Metakey, Operator};
12use std::path::Path;
13
/// Storage backend that lists entries from a directory on the local filesystem.
///
/// Only the root directory is stored; the OpenDAL operator used for listing is
/// built from it by `create_operator` each time it is needed.
#[derive(Debug, Clone)]
pub struct LocalFsBackend {
    /// Directory passed to the OpenDAL `Fs` service as its root.
    root_path: String,
}
32
33impl LocalFsBackend {
34 pub fn new(root_path: impl Into<String>) -> Self {
48 Self {
49 root_path: root_path.into(),
50 }
51 }
52
53 fn create_operator(&self) -> Result<Operator> {
55 let builder = Fs::default().root(&self.root_path);
56
57 Operator::new(builder)
58 .map(|op| op.finish())
59 .map_err(|e| VTableError::OpenDal(e))
60 }
61}
62
63#[async_trait]
64impl StorageBackend for LocalFsBackend {
65 async fn list_files(&self, config: &QueryConfig) -> Result<Vec<FileMetadata>> {
66 let operator = self.create_operator()?;
67 let mut results = Vec::new();
68
69 let normalized_path = if config.root_path == "/" || config.root_path.is_empty() {
71 "".to_string()
72 } else {
73 config.root_path.trim_matches('/').to_string()
74 };
75
76 let lister_builder = operator.lister_with(&normalized_path);
78
79 let mut lister = lister_builder
80 .recursive(config.recursive)
81 .metakey(
82 Metakey::ContentLength
83 | Metakey::ContentType
84 | Metakey::Mode
85 | Metakey::LastModified,
86 )
87 .await
88 .map_err(|e| VTableError::OpenDal(e))?;
89
90 while let Some(entry) = lister.try_next().await.map_err(|e| VTableError::OpenDal(e))? {
92 let entry_path = entry.path();
93 let entry_mode = entry.metadata().mode();
94
95 if entry_path.is_empty() || entry_path == "/" || entry_path == "." {
97 continue;
98 }
99
100 let full_path = if entry_path.starts_with('/') {
101 entry_path.to_string()
102 } else {
103 format!("/{}", entry_path)
104 };
105
106 let name = Path::new(&full_path)
108 .file_name()
109 .map(|s| s.to_string_lossy().to_string())
110 .unwrap_or_else(|| {
111 let clean_path = entry_path.trim_end_matches('/');
112 Path::new(clean_path)
113 .file_name()
114 .map(|s| s.to_string_lossy().to_string())
115 .unwrap_or_default()
116 });
117
118 if entry_mode == EntryMode::FILE {
119 let metadata = operator
121 .stat(&full_path)
122 .await
123 .map_err(|e| VTableError::OpenDal(e))?;
124
125 let content = if config.fetch_content {
127 operator
128 .read(&full_path)
129 .await
130 .ok()
131 .map(|bytes| bytes.to_vec())
132 } else {
133 None
134 };
135
136 results.push(FileMetadata {
137 name,
138 path: full_path.clone(),
139 size: metadata.content_length(),
140 last_modified: metadata.last_modified().map(|dt| dt.to_string()),
141 etag: metadata.etag().map(|e| e.to_string()),
142 is_dir: false,
143 content_type: Path::new(&full_path)
144 .extension()
145 .and_then(|ext| ext.to_str())
146 .map(|ext| ext.to_string()),
147 content,
148 });
149
150 if let Some(limit) = config.limit {
152 if results.len() >= limit + config.offset {
153 break;
154 }
155 }
156 } else if entry_mode == EntryMode::DIR {
157 results.push(FileMetadata {
159 name,
160 path: full_path,
161 size: 0,
162 last_modified: None,
163 etag: None,
164 is_dir: true,
165 content_type: Some("directory".to_string()),
166 content: None,
167 });
168
169 if let Some(limit) = config.limit {
171 if results.len() >= limit + config.offset {
172 break;
173 }
174 }
175 }
176 }
177
178 if config.offset > 0 && config.offset < results.len() {
180 results = results.into_iter().skip(config.offset).collect();
181 }
182
183 Ok(results)
184 }
185
186 fn backend_name(&self) -> &'static str {
187 "local_fs"
188 }
189}
190
191pub fn register(
214 conn: &rusqlite::Connection,
215 module_name: &str,
216 root_path: impl Into<String>,
217) -> rusqlite::Result<()> {
218 use crate::types::{columns, QueryConfig};
219 use rusqlite::{
220 ffi,
221 vtab::{self, eponymous_only_module, IndexInfo, VTab, VTabCursor, VTabKind},
222 };
223 use std::os::raw::c_int;
224
225 let root = root_path.into();
226
227 #[repr(C)]
229 struct LocalFsTable {
230 base: ffi::sqlite3_vtab,
231 root_path: String,
232 }
233
234 #[repr(C)]
236 struct LocalFsCursor {
237 base: ffi::sqlite3_vtab_cursor,
238 files: Vec<crate::types::FileMetadata>,
239 current_row: usize,
240 root_path: String,
241 }
242
243 impl LocalFsCursor {
244 fn new(root_path: String) -> Self {
245 Self {
246 base: ffi::sqlite3_vtab_cursor::default(),
247 files: Vec::new(),
248 current_row: 0,
249 root_path,
250 }
251 }
252 }
253
254 unsafe impl VTabCursor for LocalFsCursor {
255 fn filter(
256 &mut self,
257 _idx_num: c_int,
258 _idx_str: Option<&str>,
259 _args: &vtab::Values<'_>,
260 ) -> rusqlite::Result<()> {
261 let backend = LocalFsBackend::new(&self.root_path);
263 let config = QueryConfig::default();
264
265 let files = tokio::task::block_in_place(|| {
267 tokio::runtime::Handle::current().block_on(async {
268 backend.list_files(&config).await
269 })
270 })
271 .map_err(|e| rusqlite::Error::ModuleError(e.to_string()))?;
272
273 self.files = files;
274 self.current_row = 0;
275 Ok(())
276 }
277
278 fn next(&mut self) -> rusqlite::Result<()> {
279 self.current_row += 1;
280 Ok(())
281 }
282
283 fn eof(&self) -> bool {
284 self.current_row >= self.files.len()
285 }
286
287 fn column(&self, ctx: &mut vtab::Context, col_index: c_int) -> rusqlite::Result<()> {
288 if self.current_row >= self.files.len() {
289 return Ok(());
290 }
291
292 let file = &self.files[self.current_row];
293
294 match col_index {
295 columns::PATH => ctx.set_result(&file.path),
296 columns::SIZE => ctx.set_result(&(file.size as i64)),
297 columns::LAST_MODIFIED => ctx.set_result(&file.last_modified),
298 columns::ETAG => ctx.set_result(&file.etag),
299 columns::IS_DIR => ctx.set_result(&file.is_dir),
300 columns::CONTENT_TYPE => ctx.set_result(&file.content_type),
301 columns::NAME => ctx.set_result(&file.name),
302 columns::CONTENT => {
303 if let Some(ref content) = file.content {
304 ctx.set_result(&content.as_slice())
305 } else {
306 ctx.set_result::<Option<&[u8]>>(&None)
307 }
308 }
309 _ => Ok(()),
310 }
311 }
312
313 fn rowid(&self) -> rusqlite::Result<i64> {
314 Ok(self.current_row as i64)
315 }
316 }
317
318 impl vtab::CreateVTab<'_> for LocalFsTable {
319 const KIND: VTabKind = VTabKind::EponymousOnly;
320 }
321
322 unsafe impl VTab<'_> for LocalFsTable {
323 type Aux = String;
324 type Cursor = LocalFsCursor;
325
326 fn connect(
327 _db: &mut vtab::VTabConnection,
328 aux: Option<&Self::Aux>,
329 _args: &[&[u8]],
330 ) -> rusqlite::Result<(String, Self)> {
331 let schema = "
332 CREATE TABLE x(
333 path TEXT,
334 size INTEGER,
335 last_modified TEXT,
336 etag TEXT,
337 is_dir INTEGER,
338 content_type TEXT,
339 name TEXT,
340 content BLOB
341 )
342 ";
343
344 let root_path = aux.cloned().unwrap_or_else(|| "/".to_string());
345
346 Ok((
347 schema.to_owned(),
348 LocalFsTable {
349 base: ffi::sqlite3_vtab::default(),
350 root_path,
351 },
352 ))
353 }
354
355 fn best_index(&self, info: &mut IndexInfo) -> rusqlite::Result<()> {
356 info.set_estimated_cost(100.0);
357 Ok(())
358 }
359
360 fn open(&mut self) -> rusqlite::Result<Self::Cursor> {
361 Ok(LocalFsCursor::new(self.root_path.clone()))
362 }
363 }
364
365 conn.create_module(module_name, eponymous_only_module::<LocalFsTable>(), Some(root))
366}
367
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::TempDir;

    /// Backend rooted at the given temporary directory.
    fn backend_for(dir: &TempDir) -> LocalFsBackend {
        let root = dir.path().to_str().unwrap();
        LocalFsBackend::new(root)
    }

    /// In-memory connection with the `local_files` module pointing at `dir`.
    fn connection_for(dir: &TempDir) -> rusqlite::Connection {
        let conn = rusqlite::Connection::open_in_memory().unwrap();
        register(&conn, "local_files", dir.path().to_str().unwrap()).unwrap();
        conn
    }

    /// An empty directory produces no rows.
    #[tokio::test]
    async fn test_list_empty_directory() {
        let scratch = TempDir::new().unwrap();

        let listed = backend_for(&scratch)
            .list_files(&QueryConfig::default())
            .await
            .unwrap();

        assert!(listed.is_empty());
    }

    /// Two files in the root are both reported by name.
    #[tokio::test]
    async fn test_list_files() {
        let scratch = TempDir::new().unwrap();
        fs::write(scratch.path().join("file1.txt"), "content1").unwrap();
        fs::write(scratch.path().join("file2.txt"), "content2").unwrap();

        let listed = backend_for(&scratch)
            .list_files(&QueryConfig::default())
            .await
            .unwrap();

        assert_eq!(listed.len(), 2);
        for expected in ["file1.txt", "file2.txt"].iter() {
            assert!(listed.iter().any(|entry| entry.name == *expected));
        }
    }

    /// Name, size, kind and extension are filled in; content is not loaded by default.
    #[tokio::test]
    async fn test_file_metadata() {
        let scratch = TempDir::new().unwrap();
        fs::write(scratch.path().join("test.txt"), "hello world").unwrap();

        let listed = backend_for(&scratch)
            .list_files(&QueryConfig::default())
            .await
            .unwrap();
        assert_eq!(listed.len(), 1);

        let entry = &listed[0];
        assert_eq!(entry.name, "test.txt");
        assert_eq!(entry.size, 11);
        assert!(!entry.is_dir);
        assert_eq!(entry.content_type, Some("txt".to_string()));
        assert!(entry.content.is_none());
    }

    /// With `fetch_content` set, the file bytes are returned.
    #[tokio::test]
    async fn test_fetch_content() {
        let scratch = TempDir::new().unwrap();
        fs::write(scratch.path().join("test.txt"), "hello world").unwrap();

        let with_content = QueryConfig {
            fetch_content: true,
            ..Default::default()
        };
        let listed = backend_for(&scratch)
            .list_files(&with_content)
            .await
            .unwrap();
        assert_eq!(listed.len(), 1);

        let entry = &listed[0];
        assert!(entry.content.is_some());
        assert_eq!(entry.content.as_ref().unwrap(), b"hello world");
    }

    /// Recursive listing reaches files inside subdirectories.
    #[tokio::test]
    async fn test_recursive_listing() {
        let scratch = TempDir::new().unwrap();
        let root = scratch.path();
        fs::create_dir(root.join("subdir")).unwrap();
        fs::write(root.join("file1.txt"), "content1").unwrap();
        fs::write(root.join("subdir/file2.txt"), "content2").unwrap();

        let deep = QueryConfig {
            recursive: true,
            ..Default::default()
        };
        let listed = backend_for(&scratch).list_files(&deep).await.unwrap();

        assert!(listed.len() >= 2);
        for expected in ["file1.txt", "file2.txt"].iter() {
            assert!(listed.iter().any(|entry| entry.name == *expected));
        }
    }

    /// The registered module can be selected from, ordered and filtered in SQL.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_sqlite_integration() {
        let scratch = TempDir::new().unwrap();
        let root = scratch.path();
        fs::write(root.join("large.txt"), "x".repeat(10000)).unwrap();
        fs::write(root.join("small.txt"), "tiny").unwrap();
        fs::write(root.join("medium.txt"), "medium content").unwrap();

        let conn = connection_for(&scratch);

        let mut all_rows = conn
            .prepare("SELECT name, size, is_dir FROM local_files ORDER BY name")
            .unwrap();
        let rows: Vec<(String, i64, bool)> = all_rows
            .query_map([], |row| {
                let name = row.get(0)?;
                let size = row.get(1)?;
                let is_dir = row.get(2)?;
                Ok((name, size, is_dir))
            })
            .unwrap()
            .collect::<std::result::Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(rows.len(), 3);
        for expected in ["large.txt", "small.txt", "medium.txt"].iter() {
            assert!(rows.iter().any(|(name, _, _)| name == expected));
        }

        let mut big_rows = conn
            .prepare("SELECT name FROM local_files WHERE size > 100")
            .unwrap();
        let big_names: Vec<String> = big_rows
            .query_map([], |row| row.get(0))
            .unwrap()
            .collect::<std::result::Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(big_names.len(), 1);
        assert_eq!(big_names[0], "large.txt");
    }

    /// COUNT, SUM and ORDER BY ... LIMIT work over the virtual table.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_sqlite_count_and_aggregate() {
        let scratch = TempDir::new().unwrap();
        for index in 1..=5 {
            let file_name = format!("file{}.txt", index);
            let body = format!("content{}", index);
            fs::write(scratch.path().join(file_name), body).unwrap();
        }

        let conn = connection_for(&scratch);

        let row_count: i64 = conn
            .query_row("SELECT COUNT(*) FROM local_files", [], |row| row.get(0))
            .unwrap();
        assert_eq!(row_count, 5);

        let size_sum: i64 = conn
            .query_row("SELECT SUM(size) FROM local_files", [], |row| row.get(0))
            .unwrap();
        assert!(size_sum > 0);

        let lowest_name: String = conn
            .query_row(
                "SELECT name FROM local_files ORDER BY name LIMIT 1",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(lowest_name, "file1.txt");
    }
}