1use crate::traits::StorageBackend;
8use crate::{Result, StreamError};
9use async_trait::async_trait;
10use bytes::Bytes;
11use std::path::{Path, PathBuf};
12
13#[derive(Debug, Clone)]
19pub struct FsStorage {
20 root: PathBuf,
21}
22
23impl FsStorage {
24 pub fn new(root: impl Into<PathBuf>) -> Self {
26 Self { root: root.into() }
27 }
28
29 fn resolve(&self, key: &str) -> Result<PathBuf> {
31 if key.starts_with('/') || key.split('/').any(|seg| seg == "..") {
33 return Err(StreamError::storage(format!("invalid key: {key}")));
34 }
35 Ok(self.root.join(key))
36 }
37}
38
39#[async_trait]
40impl StorageBackend for FsStorage {
41 async fn put(&self, key: &str, data: Bytes) -> Result<()> {
42 let path = self.resolve(key)?;
43 if let Some(parent) = path.parent() {
44 tokio::fs::create_dir_all(parent)
45 .await
46 .map_err(|e| StreamError::storage(format!("mkdir {parent:?}: {e}")))?;
47 }
48 tokio::fs::write(&path, &data)
49 .await
50 .map_err(|e| StreamError::storage(format!("write {path:?}: {e}")))
51 }
52
53 async fn get(&self, key: &str) -> Result<Bytes> {
54 let path = self.resolve(key)?;
55 match tokio::fs::read(&path).await {
56 Ok(data) => Ok(Bytes::from(data)),
57 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
58 Err(StreamError::StorageNotFound(key.to_string()))
59 }
60 Err(e) => Err(StreamError::storage(format!("read {path:?}: {e}"))),
61 }
62 }
63
64 async fn delete(&self, key: &str) -> Result<()> {
65 let path = self.resolve(key)?;
66 match tokio::fs::remove_file(&path).await {
67 Ok(()) => Ok(()),
68 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
69 Err(e) => Err(StreamError::storage(format!("delete {path:?}: {e}"))),
70 }
71 }
72
73 async fn list(&self, prefix: &str) -> Result<Vec<String>> {
74 let mut out = Vec::new();
75 let mut stack = vec![self.root.clone()];
76 while let Some(dir) = stack.pop() {
77 let mut entries = match tokio::fs::read_dir(&dir).await {
78 Ok(e) => e,
79 Err(e) if e.kind() == std::io::ErrorKind::NotFound => continue,
80 Err(e) => return Err(StreamError::storage(format!("readdir {dir:?}: {e}"))),
81 };
82 while let Some(entry) = entries
83 .next_entry()
84 .await
85 .map_err(|e| StreamError::storage(format!("readdir entry: {e}")))?
86 {
87 let path = entry.path();
88 if path.is_dir() {
89 stack.push(path);
90 } else if let Some(rel) =
91 path.strip_prefix(&self.root).ok().and_then(|p| p.to_str())
92 {
93 let key = rel.replace('\\', "/");
94 if key.starts_with(prefix) {
95 out.push(key);
96 }
97 }
98 }
99 }
100 Ok(out)
101 }
102
103 async fn put_file(&self, key: &str, path: &Path) -> Result<()> {
104 let dest = self.resolve(key)?;
105 if let Some(parent) = dest.parent() {
106 tokio::fs::create_dir_all(parent)
107 .await
108 .map_err(|e| StreamError::storage(format!("mkdir {parent:?}: {e}")))?;
109 }
110 match tokio::fs::rename(path, &dest).await {
112 Ok(()) => Ok(()),
113 Err(_) => {
114 tokio::fs::copy(path, &dest)
115 .await
116 .map_err(|e| StreamError::storage(format!("copy to {dest:?}: {e}")))?;
117 tokio::fs::remove_file(path).await.ok();
118 Ok(())
119 }
120 }
121 }
122}