sqlite_vtable_opendal/backends/
local_fs.rs1use crate::backends::StorageBackend;
7use crate::error::Result;
8use crate::types::{FileMetadata, QueryConfig};
9use async_trait::async_trait;
10use futures_util::TryStreamExt;
11use opendal::{services::Fs, EntryMode, Metakey, Operator};
12use std::path::Path;
13
/// Storage backend that exposes a directory on the local filesystem through
/// OpenDAL's `Fs` service.
///
/// Listing paths supplied in a `QueryConfig` are resolved relative to
/// `root_path`.
#[derive(Debug, Clone)]
pub struct LocalFsBackend {
    /// Filesystem directory used as the OpenDAL root for every operation.
    root_path: String,
}
32
33impl LocalFsBackend {
34 pub fn new(root_path: impl Into<String>) -> Self {
48 Self {
49 root_path: root_path.into(),
50 }
51 }
52
53 fn create_operator(&self) -> Result<Operator> {
55 let builder = Fs::default().root(&self.root_path);
56
57 Ok(Operator::new(builder)?.finish())
58 }
59}
60
61#[async_trait]
62impl StorageBackend for LocalFsBackend {
63 async fn list_files(&self, config: &QueryConfig) -> Result<Vec<FileMetadata>> {
64 let operator = self.create_operator()?;
65 let mut results = Vec::new();
66
67 let normalized_path = if config.root_path == "/" || config.root_path.is_empty() {
69 "".to_string()
70 } else {
71 config.root_path.trim_matches('/').to_string()
72 };
73
74 let lister_builder = operator.lister_with(&normalized_path);
76
77 let mut lister = lister_builder
78 .recursive(config.recursive)
79 .metakey(
80 Metakey::ContentLength
81 | Metakey::ContentType
82 | Metakey::Mode
83 | Metakey::LastModified,
84 )
85 .await
86 ?;
87
88 while let Some(entry) = lister.try_next().await? {
90 let entry_path = entry.path();
91 let entry_mode = entry.metadata().mode();
92
93 if entry_path.is_empty() || entry_path == "/" || entry_path == "." {
95 continue;
96 }
97
98 let full_path = if entry_path.starts_with('/') {
99 entry_path.to_string()
100 } else {
101 format!("/{}", entry_path)
102 };
103
104 let name = Path::new(&full_path)
106 .file_name()
107 .map(|s| s.to_string_lossy().to_string())
108 .unwrap_or_else(|| {
109 let clean_path = entry_path.trim_end_matches('/');
110 Path::new(clean_path)
111 .file_name()
112 .map(|s| s.to_string_lossy().to_string())
113 .unwrap_or_default()
114 });
115
116 if entry_mode == EntryMode::FILE {
117 let metadata = operator
119 .stat(&full_path)
120 .await
121 ?;
122
123 let content = if config.fetch_content {
125 operator
126 .read(&full_path)
127 .await
128 .ok()
129 .map(|bytes| bytes.to_vec())
130 } else {
131 None
132 };
133
134 results.push(FileMetadata {
135 name,
136 path: full_path.clone(),
137 size: metadata.content_length(),
138 last_modified: metadata.last_modified().map(|dt| dt.to_string()),
139 etag: metadata.etag().map(|e| e.to_string()),
140 is_dir: false,
141 content_type: Path::new(&full_path)
142 .extension()
143 .and_then(|ext| ext.to_str())
144 .map(|ext| ext.to_string()),
145 content,
146 });
147
148 if let Some(limit) = config.limit {
150 if results.len() >= limit + config.offset {
151 break;
152 }
153 }
154 } else if entry_mode == EntryMode::DIR {
155 results.push(FileMetadata {
157 name,
158 path: full_path,
159 size: 0,
160 last_modified: None,
161 etag: None,
162 is_dir: true,
163 content_type: Some("directory".to_string()),
164 content: None,
165 });
166
167 if let Some(limit) = config.limit {
169 if results.len() >= limit + config.offset {
170 break;
171 }
172 }
173 }
174 }
175
176 if config.offset > 0 && config.offset < results.len() {
178 results = results.into_iter().skip(config.offset).collect();
179 }
180
181 Ok(results)
182 }
183
184 fn backend_name(&self) -> &'static str {
185 "local_fs"
186 }
187}
188
189pub fn register(
212 conn: &rusqlite::Connection,
213 module_name: &str,
214 root_path: impl Into<String>,
215) -> rusqlite::Result<()> {
216 use crate::types::{columns, QueryConfig};
217 use rusqlite::{
218 ffi,
219 vtab::{self, eponymous_only_module, IndexInfo, VTab, VTabCursor, VTabKind},
220 };
221 use std::os::raw::c_int;
222
223 let root = root_path.into();
224
225 #[repr(C)]
227 struct LocalFsTable {
228 base: ffi::sqlite3_vtab,
229 root_path: String,
230 }
231
232 #[repr(C)]
234 struct LocalFsCursor {
235 base: ffi::sqlite3_vtab_cursor,
236 files: Vec<crate::types::FileMetadata>,
237 current_row: usize,
238 root_path: String,
239 }
240
241 impl LocalFsCursor {
242 fn new(root_path: String) -> Self {
243 Self {
244 base: ffi::sqlite3_vtab_cursor::default(),
245 files: Vec::new(),
246 current_row: 0,
247 root_path,
248 }
249 }
250 }
251
252 unsafe impl VTabCursor for LocalFsCursor {
253 fn filter(
254 &mut self,
255 _idx_num: c_int,
256 _idx_str: Option<&str>,
257 _args: &vtab::Values<'_>,
258 ) -> rusqlite::Result<()> {
259 let backend = LocalFsBackend::new(&self.root_path);
261 let config = QueryConfig::default();
262
263 let files = tokio::task::block_in_place(|| {
265 tokio::runtime::Handle::current().block_on(async {
266 backend.list_files(&config).await
267 })
268 })
269 .map_err(|e| rusqlite::Error::ModuleError(e.to_string()))?;
270
271 self.files = files;
272 self.current_row = 0;
273 Ok(())
274 }
275
276 fn next(&mut self) -> rusqlite::Result<()> {
277 self.current_row += 1;
278 Ok(())
279 }
280
281 fn eof(&self) -> bool {
282 self.current_row >= self.files.len()
283 }
284
285 fn column(&self, ctx: &mut vtab::Context, col_index: c_int) -> rusqlite::Result<()> {
286 if self.current_row >= self.files.len() {
287 return Ok(());
288 }
289
290 let file = &self.files[self.current_row];
291
292 match col_index {
293 columns::PATH => ctx.set_result(&file.path),
294 columns::SIZE => ctx.set_result(&(file.size as i64)),
295 columns::LAST_MODIFIED => ctx.set_result(&file.last_modified),
296 columns::ETAG => ctx.set_result(&file.etag),
297 columns::IS_DIR => ctx.set_result(&file.is_dir),
298 columns::CONTENT_TYPE => ctx.set_result(&file.content_type),
299 columns::NAME => ctx.set_result(&file.name),
300 columns::CONTENT => {
301 if let Some(ref content) = file.content {
302 ctx.set_result(&content.as_slice())
303 } else {
304 ctx.set_result::<Option<&[u8]>>(&None)
305 }
306 }
307 _ => Ok(()),
308 }
309 }
310
311 fn rowid(&self) -> rusqlite::Result<i64> {
312 Ok(self.current_row as i64)
313 }
314 }
315
316 impl vtab::CreateVTab<'_> for LocalFsTable {
317 const KIND: VTabKind = VTabKind::EponymousOnly;
318 }
319
320 unsafe impl VTab<'_> for LocalFsTable {
321 type Aux = String;
322 type Cursor = LocalFsCursor;
323
324 fn connect(
325 _db: &mut vtab::VTabConnection,
326 aux: Option<&Self::Aux>,
327 _args: &[&[u8]],
328 ) -> rusqlite::Result<(String, Self)> {
329 let schema = "
330 CREATE TABLE x(
331 path TEXT,
332 size INTEGER,
333 last_modified TEXT,
334 etag TEXT,
335 is_dir INTEGER,
336 content_type TEXT,
337 name TEXT,
338 content BLOB
339 )
340 ";
341
342 let root_path = aux.cloned().unwrap_or_else(|| "/".to_string());
343
344 Ok((
345 schema.to_owned(),
346 LocalFsTable {
347 base: ffi::sqlite3_vtab::default(),
348 root_path,
349 },
350 ))
351 }
352
353 fn best_index(&self, info: &mut IndexInfo) -> rusqlite::Result<()> {
354 info.set_estimated_cost(100.0);
355 Ok(())
356 }
357
358 fn open(&mut self) -> rusqlite::Result<Self::Cursor> {
359 Ok(LocalFsCursor::new(self.root_path.clone()))
360 }
361 }
362
363 conn.create_module(module_name, eponymous_only_module::<LocalFsTable>(), Some(root))
364}
365
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::TempDir;

    /// Builds a backend rooted at the given temporary directory.
    fn backend_for(dir: &TempDir) -> LocalFsBackend {
        LocalFsBackend::new(dir.path().to_str().unwrap())
    }

    /// Lists `dir` with `config`, panicking if the listing fails.
    async fn list(dir: &TempDir, config: QueryConfig) -> Vec<FileMetadata> {
        backend_for(dir).list_files(&config).await.unwrap()
    }

    /// True when `files` contains an entry named `name`.
    fn has_name(files: &[FileMetadata], name: &str) -> bool {
        files.iter().any(|file| file.name == name)
    }

    /// Opens an in-memory database with `dir` registered as `local_files`.
    fn sqlite_for(dir: &TempDir) -> rusqlite::Connection {
        let conn = rusqlite::Connection::open_in_memory().unwrap();
        register(&conn, "local_files", dir.path().to_str().unwrap()).unwrap();
        conn
    }

    /// Runs `sql` and collects its first column as strings.
    fn query_names(conn: &rusqlite::Connection, sql: &str) -> Vec<String> {
        let mut stmt = conn.prepare(sql).unwrap();
        // Bound to a local so the row iterator is dropped before `stmt`.
        let names = stmt
            .query_map([], |row| row.get(0))
            .unwrap()
            .collect::<std::result::Result<Vec<String>, _>>()
            .unwrap();
        names
    }

    #[tokio::test]
    async fn test_list_empty_directory() {
        let dir = TempDir::new().unwrap();

        let files = list(&dir, QueryConfig::default()).await;

        assert!(files.is_empty());
    }

    #[tokio::test]
    async fn test_list_files() {
        let dir = TempDir::new().unwrap();
        fs::write(dir.path().join("file1.txt"), "content1").unwrap();
        fs::write(dir.path().join("file2.txt"), "content2").unwrap();

        let files = list(&dir, QueryConfig::default()).await;

        assert_eq!(files.len(), 2);
        assert!(has_name(&files, "file1.txt"));
        assert!(has_name(&files, "file2.txt"));
    }

    #[tokio::test]
    async fn test_file_metadata() {
        let dir = TempDir::new().unwrap();
        fs::write(dir.path().join("test.txt"), "hello world").unwrap();

        let files = list(&dir, QueryConfig::default()).await;
        assert_eq!(files.len(), 1);

        let only = &files[0];
        assert_eq!(only.name, "test.txt");
        assert_eq!(only.size, 11);
        assert!(!only.is_dir);
        assert_eq!(only.content_type.as_deref(), Some("txt"));
        // Content is not fetched unless requested.
        assert!(only.content.is_none());
    }

    #[tokio::test]
    async fn test_fetch_content() {
        let dir = TempDir::new().unwrap();
        fs::write(dir.path().join("test.txt"), "hello world").unwrap();

        let config = QueryConfig {
            fetch_content: true,
            ..Default::default()
        };
        let files = list(&dir, config).await;
        assert_eq!(files.len(), 1);

        assert_eq!(files[0].content.as_deref(), Some(&b"hello world"[..]));
    }

    #[tokio::test]
    async fn test_recursive_listing() {
        let dir = TempDir::new().unwrap();
        let subdir = dir.path().join("subdir");
        fs::create_dir(&subdir).unwrap();
        fs::write(dir.path().join("file1.txt"), "content1").unwrap();
        fs::write(subdir.join("file2.txt"), "content2").unwrap();

        let config = QueryConfig {
            recursive: true,
            ..Default::default()
        };
        let files = list(&dir, config).await;

        // The directory entry itself may also be reported.
        assert!(files.len() >= 2);
        assert!(has_name(&files, "file1.txt"));
        assert!(has_name(&files, "file2.txt"));
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_sqlite_integration() {
        let dir = TempDir::new().unwrap();
        let root = dir.path();
        fs::write(root.join("large.txt"), "x".repeat(10000)).unwrap();
        fs::write(root.join("small.txt"), "tiny").unwrap();
        fs::write(root.join("medium.txt"), "medium content").unwrap();

        let conn = sqlite_for(&dir);

        {
            let mut stmt = conn
                .prepare("SELECT name, size, is_dir FROM local_files ORDER BY name")
                .unwrap();
            let rows: Vec<(String, i64, bool)> = stmt
                .query_map([], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))
                .unwrap()
                .collect::<std::result::Result<_, _>>()
                .unwrap();

            assert_eq!(rows.len(), 3);
            for expected in ["large.txt", "small.txt", "medium.txt"] {
                assert!(rows.iter().any(|(name, _, _)| name == expected));
            }
        }

        let large = query_names(&conn, "SELECT name FROM local_files WHERE size > 100");
        assert_eq!(large, vec!["large.txt".to_string()]);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_sqlite_count_and_aggregate() {
        let dir = TempDir::new().unwrap();
        for i in 1..=5 {
            fs::write(
                dir.path().join(format!("file{}.txt", i)),
                format!("content{}", i),
            )
            .unwrap();
        }

        let conn = sqlite_for(&dir);
        let scalar = |sql: &str| -> i64 { conn.query_row(sql, [], |row| row.get(0)).unwrap() };

        assert_eq!(scalar("SELECT COUNT(*) FROM local_files"), 5);
        assert!(scalar("SELECT SUM(size) FROM local_files") > 0);

        let first: String = conn
            .query_row(
                "SELECT name FROM local_files ORDER BY name LIMIT 1",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(first, "file1.txt");
    }
}