1use std::collections::HashMap;
26use std::fmt;
27use std::future::Future;
28use std::io;
29use std::path::{Path, PathBuf};
30use std::pin::Pin;
31use std::sync::RwLock;
32
33use bytes::Bytes;
34use tokio::io::{AsyncRead, AsyncReadExt};
35
/// Errors reported by the blob-store backends in this module.
#[derive(Debug)]
pub enum VfsError {
    /// No blob is stored under the requested key; carries that key.
    NotFound(String),
    /// The underlying storage reported an I/O failure.
    Io(io::Error),
    /// The key failed path validation; carries the offending key.
    InvalidPath(String),
}
51
52impl fmt::Display for VfsError {
53 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54 match self {
55 VfsError::NotFound(p) => write!(f, "blob not found: {p}"),
56 VfsError::Io(e) => write!(f, "vfs I/O error: {e}"),
57 VfsError::InvalidPath(p) => write!(f, "invalid blob path: {p}"),
58 }
59 }
60}
61
62impl std::error::Error for VfsError {
63 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
64 match self {
65 VfsError::Io(e) => Some(e),
66 _ => None,
67 }
68 }
69}
70
71impl From<io::Error> for VfsError {
72 fn from(e: io::Error) -> Self {
73 VfsError::Io(e)
74 }
75}
76
/// Metadata describing a single stored blob.
#[derive(Clone, Debug)]
pub struct VfsStat {
    /// Length of the blob in bytes.
    pub size: u64,
}
87
/// One item of a listing: a blob key together with the blob's size.
#[derive(Clone, Debug)]
pub struct VfsEntry {
    /// Key of the blob as reported by the backend's listing.
    pub key: String,
    /// Length of the blob in bytes.
    pub size: u64,
}
96
97pub trait Vfs: Send + Sync {
110 fn put<'a>(
114 &'a self,
115 path: &'a str,
116 data: &'a mut (dyn AsyncRead + Unpin + Send),
117 ) -> Pin<Box<dyn Future<Output = Result<u64, VfsError>> + Send + 'a>>;
118
119 fn get<'a>(
123 &'a self,
124 path: &'a str,
125 ) -> Pin<
126 Box<dyn Future<Output = Result<Box<dyn AsyncRead + Send + Unpin>, VfsError>> + Send + 'a>,
127 >;
128
129 fn delete<'a>(
133 &'a self,
134 path: &'a str,
135 ) -> Pin<Box<dyn Future<Output = Result<(), VfsError>> + Send + 'a>>;
136
137 fn list<'a>(
141 &'a self,
142 prefix: &'a str,
143 ) -> Pin<Box<dyn Future<Output = Result<Vec<VfsEntry>, VfsError>> + Send + 'a>>;
144
145 fn stat<'a>(
147 &'a self,
148 path: &'a str,
149 ) -> Pin<Box<dyn Future<Output = Result<Option<VfsStat>, VfsError>> + Send + 'a>>;
150}
151
152pub(crate) fn validate_path(path: &str) -> Result<(), VfsError> {
163 if path.contains('\0') {
164 return Err(VfsError::InvalidPath(path.to_string()));
165 }
166 let clean = path.trim_start_matches('/');
167 for component in Path::new(clean).components() {
168 if component == std::path::Component::ParentDir {
169 return Err(VfsError::InvalidPath(path.to_string()));
170 }
171 }
172 Ok(())
173}
174
175#[derive(Debug, Default)]
189pub struct MemVfs {
190 data: RwLock<HashMap<String, Bytes>>,
191}
192
193impl MemVfs {
194 pub fn new() -> Self {
195 Self::default()
196 }
197}
198
199impl Vfs for MemVfs {
200 fn put<'a>(
201 &'a self,
202 path: &'a str,
203 data: &'a mut (dyn AsyncRead + Unpin + Send),
204 ) -> Pin<Box<dyn Future<Output = Result<u64, VfsError>> + Send + 'a>> {
205 if let Err(e) = validate_path(path) {
206 return Box::pin(async { Err(e) });
207 }
208 Box::pin(async move {
209 let mut buf = Vec::new();
210 let n = data.read_to_end(&mut buf).await.map_err(VfsError::Io)? as u64;
211 self.data
212 .write()
213 .unwrap()
214 .insert(path.to_string(), Bytes::from(buf));
215 Ok(n)
216 })
217 }
218
219 fn get<'a>(
220 &'a self,
221 path: &'a str,
222 ) -> Pin<
223 Box<dyn Future<Output = Result<Box<dyn AsyncRead + Send + Unpin>, VfsError>> + Send + 'a>,
224 > {
225 if let Err(e) = validate_path(path) {
226 return Box::pin(async { Err(e) });
227 }
228 Box::pin(async move {
229 let data = self
230 .data
231 .read()
232 .unwrap()
233 .get(path)
234 .cloned()
235 .ok_or_else(|| VfsError::NotFound(path.to_string()))?;
236 Ok(Box::new(io::Cursor::new(data)) as Box<dyn AsyncRead + Send + Unpin>)
237 })
238 }
239
240 fn delete<'a>(
241 &'a self,
242 path: &'a str,
243 ) -> Pin<Box<dyn Future<Output = Result<(), VfsError>> + Send + 'a>> {
244 if let Err(e) = validate_path(path) {
245 return Box::pin(async { Err(e) });
246 }
247 Box::pin(async move {
248 if self.data.write().unwrap().remove(path).is_none() {
249 return Err(VfsError::NotFound(path.to_string()));
250 }
251 Ok(())
252 })
253 }
254
255 fn list<'a>(
256 &'a self,
257 prefix: &'a str,
258 ) -> Pin<Box<dyn Future<Output = Result<Vec<VfsEntry>, VfsError>> + Send + 'a>> {
259 if let Err(e) = validate_path(prefix) {
260 return Box::pin(async { Err(e) });
261 }
262 Box::pin(async move {
263 let guard = self.data.read().unwrap();
264 let mut entries: Vec<VfsEntry> = guard
265 .iter()
266 .filter(|(k, _)| k.starts_with(prefix))
267 .map(|(k, v)| VfsEntry {
268 key: k.clone(),
269 size: v.len() as u64,
270 })
271 .collect();
272 entries.sort_by(|a, b| a.key.cmp(&b.key));
273 Ok(entries)
274 })
275 }
276
277 fn stat<'a>(
278 &'a self,
279 path: &'a str,
280 ) -> Pin<Box<dyn Future<Output = Result<Option<VfsStat>, VfsError>> + Send + 'a>> {
281 if let Err(e) = validate_path(path) {
282 return Box::pin(async { Err(e) });
283 }
284 Box::pin(async move {
285 Ok(self.data.read().unwrap().get(path).map(|v| VfsStat {
286 size: v.len() as u64,
287 }))
288 })
289 }
290}
291
/// Filesystem-backed blob store: each blob is a file below `base_dir`.
#[derive(Debug)]
pub struct FsVfs {
    /// Directory under which every blob path is resolved.
    base_dir: PathBuf,
}
314
315impl FsVfs {
316 pub fn new(base_dir: impl AsRef<Path>) -> io::Result<Self> {
320 let base_dir = base_dir.as_ref().to_path_buf();
321 std::fs::create_dir_all(&base_dir)?;
322 Ok(Self { base_dir })
323 }
324
325 fn full_path(&self, path: &str) -> Result<PathBuf, VfsError> {
326 validate_path(path)?;
327 Ok(self.base_dir.join(path.trim_start_matches('/')))
328 }
329}
330
331impl Vfs for FsVfs {
332 fn put<'a>(
333 &'a self,
334 path: &'a str,
335 data: &'a mut (dyn AsyncRead + Unpin + Send),
336 ) -> Pin<Box<dyn Future<Output = Result<u64, VfsError>> + Send + 'a>> {
337 match self.full_path(path) {
338 Err(e) => Box::pin(async { Err(e) }),
339 Ok(fp) => Box::pin(async move {
340 if let Some(parent) = fp.parent() {
341 tokio::fs::create_dir_all(parent).await?;
342 }
343 let mut file = tokio::fs::File::create(&fp).await?;
344 let n = tokio::io::copy(data, &mut file).await?;
345 Ok(n)
346 }),
347 }
348 }
349
350 fn get<'a>(
351 &'a self,
352 path: &'a str,
353 ) -> Pin<
354 Box<dyn Future<Output = Result<Box<dyn AsyncRead + Send + Unpin>, VfsError>> + Send + 'a>,
355 > {
356 match self.full_path(path) {
357 Err(e) => Box::pin(async { Err(e) }),
358 Ok(fp) => {
359 let path = path.to_string();
360 Box::pin(async move {
361 match tokio::fs::File::open(&fp).await {
362 Ok(f) => Ok(Box::new(f) as Box<dyn AsyncRead + Send + Unpin>),
363 Err(e) if e.kind() == io::ErrorKind::NotFound => {
364 Err(VfsError::NotFound(path))
365 }
366 Err(e) => Err(VfsError::Io(e)),
367 }
368 })
369 }
370 }
371 }
372
373 fn delete<'a>(
374 &'a self,
375 path: &'a str,
376 ) -> Pin<Box<dyn Future<Output = Result<(), VfsError>> + Send + 'a>> {
377 match self.full_path(path) {
378 Err(e) => Box::pin(async { Err(e) }),
379 Ok(fp) => {
380 let path = path.to_string();
381 Box::pin(async move {
382 match tokio::fs::remove_file(&fp).await {
383 Ok(()) => Ok(()),
384 Err(e) if e.kind() == io::ErrorKind::NotFound => {
385 Err(VfsError::NotFound(path))
386 }
387 Err(e) => Err(VfsError::Io(e)),
388 }
389 })
390 }
391 }
392 }
393
394 fn list<'a>(
395 &'a self,
396 prefix: &'a str,
397 ) -> Pin<Box<dyn Future<Output = Result<Vec<VfsEntry>, VfsError>> + Send + 'a>> {
398 if let Err(e) = validate_path(prefix) {
399 return Box::pin(async { Err(e) });
400 }
401 let clean_prefix = prefix.trim_start_matches('/');
402 let search_dir = if clean_prefix.is_empty() {
405 self.base_dir.clone()
406 } else {
407 let candidate = self.base_dir.join(clean_prefix);
408 if candidate.is_dir() {
409 candidate
410 } else {
411 candidate.parent().unwrap_or(&self.base_dir).to_path_buf()
416 }
417 };
418 let base_str = self.base_dir.to_string_lossy().into_owned();
419 let prefix_owned = clean_prefix.to_string();
420 Box::pin(async move {
421 tokio::task::spawn_blocking(move || -> Result<Vec<VfsEntry>, VfsError> {
422 let mut entries = Vec::new();
423 collect_files(&search_dir, &base_str, &mut entries)?;
424 entries.retain(|e| e.key.starts_with(&prefix_owned));
426 entries.sort_by(|a, b| a.key.cmp(&b.key));
427 Ok(entries)
428 })
429 .await
430 .map_err(|e| VfsError::Io(io::Error::other(e.to_string())))?
431 })
432 }
433
434 fn stat<'a>(
435 &'a self,
436 path: &'a str,
437 ) -> Pin<Box<dyn Future<Output = Result<Option<VfsStat>, VfsError>> + Send + 'a>> {
438 match self.full_path(path) {
439 Err(e) => Box::pin(async { Err(e) }),
440 Ok(fp) => Box::pin(async move {
441 match tokio::fs::metadata(&fp).await {
442 Ok(m) => Ok(Some(VfsStat { size: m.len() })),
443 Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(None),
444 Err(e) => Err(VfsError::Io(e)),
445 }
446 }),
447 }
448 }
449}
450
451fn collect_files(dir: &Path, base: &str, out: &mut Vec<VfsEntry>) -> Result<(), VfsError> {
453 if !dir.exists() {
454 return Ok(());
455 }
456 for entry in std::fs::read_dir(dir)? {
457 let entry = entry?;
458 let ft = entry.file_type()?;
459 let path = entry.path();
460 if ft.is_dir() {
461 collect_files(&path, base, out)?;
462 } else if ft.is_file() {
463 let full = path.to_string_lossy();
464 let key = if let Some(stripped) = full.strip_prefix(base) {
465 stripped.trim_start_matches(['/', '\\']).to_string()
466 } else {
467 full.into_owned()
468 };
469 let size = std::fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
470 out.push(VfsEntry { key, size });
471 }
472 }
473 Ok(())
474}
475
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs put / get / stat / list / delete against any backend so that both
    /// implementations are held to the same expectations.
    async fn roundtrip(vfs: &dyn Vfs) {
        let payload = b"hello world";

        // Store, then read back the same bytes.
        vfs.put("ns/a/b", &mut payload.as_ref()).await.unwrap();
        let mut reader = vfs.get("ns/a/b").await.unwrap();
        let mut fetched = Vec::new();
        reader.read_to_end(&mut fetched).await.unwrap();
        assert_eq!(fetched, payload);

        // Size is reported for stored blobs, `None` for unknown keys.
        let found = vfs.stat("ns/a/b").await.unwrap().unwrap();
        assert_eq!(found.size, 11);
        let absent = vfs.stat("ns/a/missing").await.unwrap();
        assert!(absent.is_none());

        // Listing by prefix sees both blobs.
        vfs.put("ns/a/c", &mut b"other".as_ref()).await.unwrap();
        let listed = vfs.list("ns/a/").await.unwrap();
        let keys: Vec<&str> = listed.iter().map(|e| e.key.as_str()).collect();
        assert!(keys.contains(&"ns/a/b"), "expected ns/a/b in {keys:?}");
        assert!(keys.contains(&"ns/a/c"), "expected ns/a/c in {keys:?}");

        // A deleted blob can no longer be fetched.
        vfs.delete("ns/a/b").await.unwrap();
        let after_delete = vfs.get("ns/a/b").await;
        assert!(matches!(after_delete, Err(VfsError::NotFound(_))));

        // Deleting an unknown key is an error, not a no-op.
        let missing_delete = vfs.delete("ns/a/missing").await;
        assert!(matches!(missing_delete, Err(VfsError::NotFound(_))));
    }

    #[tokio::test]
    async fn mem_vfs_roundtrip() {
        roundtrip(&MemVfs::new()).await;
    }

    #[tokio::test]
    async fn fs_vfs_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let vfs = FsVfs::new(dir.path()).unwrap();
        roundtrip(&vfs).await;
    }

    /// Every operation must refuse keys containing `..` components.
    async fn rejects_path_traversal(vfs: &dyn Vfs) {
        for path in ["../escape", "a/../../escape", "a/../../../etc/passwd"] {
            let put_result = vfs.put(path, &mut b"x".as_ref()).await;
            assert!(
                matches!(put_result, Err(VfsError::InvalidPath(_))),
                "put({path}) should be rejected"
            );

            let get_result = vfs.get(path).await;
            assert!(
                matches!(get_result, Err(VfsError::InvalidPath(_))),
                "get({path}) should be rejected"
            );

            let delete_result = vfs.delete(path).await;
            assert!(
                matches!(delete_result, Err(VfsError::InvalidPath(_))),
                "delete({path}) should be rejected"
            );

            let list_result = vfs.list(path).await;
            assert!(
                matches!(list_result, Err(VfsError::InvalidPath(_))),
                "list({path}) should be rejected"
            );

            let stat_result = vfs.stat(path).await;
            assert!(
                matches!(stat_result, Err(VfsError::InvalidPath(_))),
                "stat({path}) should be rejected"
            );
        }
    }

    #[tokio::test]
    async fn mem_vfs_rejects_path_traversal() {
        rejects_path_traversal(&MemVfs::new()).await;
    }

    #[tokio::test]
    async fn fs_vfs_rejects_path_traversal() {
        let dir = tempfile::tempdir().unwrap();
        let vfs = FsVfs::new(dir.path()).unwrap();
        rejects_path_traversal(&vfs).await;
    }
}