1use std::path::{Path, PathBuf};
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use bytes::Bytes;
12use sha2::{Digest, Sha256};
13use tokio::fs;
14use tokio::sync::mpsc;
15
16use crate::error::PodError;
17use crate::storage::{ResourceMeta, Storage, StorageEvent};
18
/// Suffix appended to a resource's file name to form the path of its JSON
/// metadata sidecar (e.g. `foo.txt` -> `foo.txt.meta.json`).
const META_SUFFIX: &str = ".meta.json";

/// Storage backend that keeps every resource as a plain file under a root
/// directory, with per-resource metadata stored in a JSON sidecar file.
///
/// Cloning is cheap: clones share the root path through an [`Arc`].
#[derive(Clone, Debug)]
pub struct FsBackend {
    /// Directory under which every virtual resource path is resolved.
    root: Arc<PathBuf>,
}
26
27#[derive(serde::Serialize, serde::Deserialize)]
28struct MetaSidecar {
29 content_type: String,
30 #[serde(default)]
31 links: Vec<String>,
32}
33
34impl FsBackend {
35 pub async fn new(root: impl Into<PathBuf>) -> Result<Self, PodError> {
38 let root: PathBuf = root.into();
39 fs::create_dir_all(&root).await?;
40 Ok(Self {
41 root: Arc::new(root),
42 })
43 }
44
45 pub fn root(&self) -> &Path {
47 &self.root
48 }
49
50 fn normalize(path: &str) -> Result<String, PodError> {
51 let p = if path.is_empty() {
52 "/".to_string()
53 } else if path.starts_with('/') {
54 path.to_string()
55 } else {
56 format!("/{path}")
57 };
58 if p.contains("..") || p.contains('\0') {
59 return Err(PodError::InvalidPath(p));
60 }
61 Ok(p)
62 }
63
64 fn resolve(&self, path: &str) -> Result<PathBuf, PodError> {
65 let norm = Self::normalize(path)?;
66 let rel = norm.trim_start_matches('/');
67 let full = self.root.join(rel);
68 if !full.starts_with(self.root.as_path()) {
69 return Err(PodError::InvalidPath(norm));
70 }
71 Ok(full)
72 }
73
74 fn meta_path(data_path: &Path) -> PathBuf {
75 let mut p = data_path.as_os_str().to_owned();
76 p.push(META_SUFFIX);
77 PathBuf::from(p)
78 }
79
80 fn compute_etag(body: &[u8]) -> String {
81 hex::encode(Sha256::digest(body))
82 }
83
84 async fn read_meta(
85 &self,
86 path: &str,
87 body_len: u64,
88 etag: String,
89 modified: chrono::DateTime<chrono::Utc>,
90 ) -> Result<ResourceMeta, PodError> {
91 let data_path = self.resolve(path)?;
92 let meta_path = Self::meta_path(&data_path);
93 let fallback_ct: &str =
98 crate::ldp::infer_dotfile_content_type(path).unwrap_or("application/octet-stream");
99 let (content_type, links) = match fs::read(&meta_path).await {
100 Ok(bytes) => {
101 let sidecar: MetaSidecar =
102 serde_json::from_slice(&bytes).unwrap_or_else(|_| MetaSidecar {
103 content_type: fallback_ct.to_string(),
104 links: Vec::new(),
105 });
106 (sidecar.content_type, sidecar.links)
107 }
108 Err(_) => (fallback_ct.to_string(), Vec::new()),
109 };
110 Ok(ResourceMeta {
111 etag,
112 modified,
113 size: body_len,
114 content_type,
115 links,
116 })
117 }
118}
119
120#[async_trait]
121impl Storage for FsBackend {
122 async fn get(&self, path: &str) -> Result<(Bytes, ResourceMeta), PodError> {
123 let data_path = self.resolve(path)?;
124 let body = match fs::read(&data_path).await {
125 Ok(b) => Bytes::from(b),
126 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
127 return Err(PodError::NotFound(path.into()));
128 }
129 Err(e) => return Err(e.into()),
130 };
131 let metadata = fs::metadata(&data_path).await?;
132 let modified: chrono::DateTime<chrono::Utc> = metadata
133 .modified()
134 .ok()
135 .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
136 .map(|d| {
137 chrono::DateTime::from_timestamp(d.as_secs() as i64, d.subsec_nanos())
138 .unwrap_or_else(chrono::Utc::now)
139 })
140 .unwrap_or_else(chrono::Utc::now);
141 let etag = Self::compute_etag(&body);
142 let meta = self
143 .read_meta(path, body.len() as u64, etag, modified)
144 .await?;
145 Ok((body, meta))
146 }
147
148 async fn put(
149 &self,
150 path: &str,
151 body: Bytes,
152 content_type: &str,
153 ) -> Result<ResourceMeta, PodError> {
154 let data_path = self.resolve(path)?;
155 if let Some(parent) = data_path.parent() {
156 fs::create_dir_all(parent).await?;
157 }
158 fs::write(&data_path, &body).await?;
159 let sidecar = MetaSidecar {
160 content_type: content_type.to_string(),
161 links: Vec::new(),
162 };
163 let sidecar_bytes = serde_json::to_vec(&sidecar)?;
164 fs::write(Self::meta_path(&data_path), &sidecar_bytes).await?;
165 let etag = Self::compute_etag(&body);
166 Ok(ResourceMeta {
167 etag,
168 modified: chrono::Utc::now(),
169 size: body.len() as u64,
170 content_type: content_type.to_string(),
171 links: Vec::new(),
172 })
173 }
174
175 async fn delete(&self, path: &str) -> Result<(), PodError> {
176 let data_path = self.resolve(path)?;
177 match fs::remove_file(&data_path).await {
178 Ok(()) => {}
179 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
180 return Err(PodError::NotFound(path.into()));
181 }
182 Err(e) => return Err(e.into()),
183 }
184 let _ = fs::remove_file(Self::meta_path(&data_path)).await;
185 Ok(())
186 }
187
188 async fn list(&self, container: &str) -> Result<Vec<String>, PodError> {
189 let container_path = self.resolve(container)?;
190 let mut out = Vec::new();
191 let mut dir = match fs::read_dir(&container_path).await {
192 Ok(d) => d,
193 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
194 return Ok(out);
195 }
196 Err(e) => return Err(e.into()),
197 };
198 while let Some(entry) = dir.next_entry().await? {
199 let name = entry.file_name().to_string_lossy().to_string();
200 if name.ends_with(META_SUFFIX) {
201 continue;
202 }
203 let ft = entry.file_type().await?;
204 if ft.is_dir() {
205 out.push(format!("{name}/"));
206 } else {
207 out.push(name);
208 }
209 }
210 out.sort();
211 Ok(out)
212 }
213
214 async fn head(&self, path: &str) -> Result<ResourceMeta, PodError> {
215 let data_path = self.resolve(path)?;
216 let metadata = match fs::metadata(&data_path).await {
217 Ok(m) => m,
218 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
219 return Err(PodError::NotFound(path.into()));
220 }
221 Err(e) => return Err(e.into()),
222 };
223 let body = fs::read(&data_path).await?;
224 let etag = Self::compute_etag(&body);
225 let modified = metadata
226 .modified()
227 .ok()
228 .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
229 .map(|d| {
230 chrono::DateTime::from_timestamp(d.as_secs() as i64, d.subsec_nanos())
231 .unwrap_or_else(chrono::Utc::now)
232 })
233 .unwrap_or_else(chrono::Utc::now);
234 self.read_meta(path, body.len() as u64, etag, modified).await
235 }
236
237 async fn exists(&self, path: &str) -> Result<bool, PodError> {
238 let data_path = self.resolve(path)?;
239 Ok(fs::try_exists(&data_path).await.unwrap_or(false))
240 }
241
242 async fn watch(&self, path: &str) -> Result<mpsc::Receiver<StorageEvent>, PodError> {
243 use notify::{RecursiveMode, Watcher};
244
245 let data_path = self.resolve(path)?;
246 let filter_root = data_path.clone();
247 let root = self.root.clone();
248 let (tx, rx) = mpsc::channel::<StorageEvent>(64);
249
250 let (raw_tx, raw_rx) =
251 std::sync::mpsc::channel::<notify::Result<notify::Event>>();
252 let mut watcher = notify::recommended_watcher(move |res| {
253 let _ = raw_tx.send(res);
254 })?;
255 let mode = if data_path.is_dir() {
256 RecursiveMode::Recursive
257 } else {
258 RecursiveMode::NonRecursive
259 };
260 let watch_target = if data_path.exists() {
261 data_path.clone()
262 } else {
263 root.to_path_buf()
264 };
265 watcher.watch(&watch_target, mode)?;
266
267 tokio::task::spawn_blocking(move || {
268 let _keep = watcher;
269 while let Ok(Ok(event)) = raw_rx.recv() {
270 for path in &event.paths {
271 let s = path.to_string_lossy();
272 if s.ends_with(META_SUFFIX) {
273 continue;
274 }
275 let virt = match path.strip_prefix(root.as_path()) {
276 Ok(p) => format!("/{}", p.to_string_lossy()),
277 Err(_) => continue,
278 };
279 if !path.starts_with(&filter_root) && path != &filter_root {
280 continue;
281 }
282 use notify::EventKind;
283 let storage_event = match event.kind {
284 EventKind::Create(_) => StorageEvent::Created(virt),
285 EventKind::Modify(_) => StorageEvent::Updated(virt),
286 EventKind::Remove(_) => StorageEvent::Deleted(virt),
287 _ => continue,
288 };
289 if tx.blocking_send(storage_event).is_err() {
290 return;
291 }
292 }
293 }
294 });
295
296 Ok(rx)
297 }
298}
299
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a backend over a fresh temporary directory. The `TempDir` is
    /// returned so the directory stays alive for the whole test.
    async fn scratch_backend() -> (TempDir, FsBackend) {
        let scratch = TempDir::new().unwrap();
        let backend = FsBackend::new(scratch.path()).await.unwrap();
        (scratch, backend)
    }

    #[tokio::test]
    async fn put_get_roundtrip() {
        let (_scratch, backend) = scratch_backend().await;
        backend
            .put("/a/b.txt", Bytes::from_static(b"hello"), "text/plain")
            .await
            .unwrap();

        let (body, meta) = backend.get("/a/b.txt").await.unwrap();
        assert_eq!(&body[..], b"hello");
        assert_eq!(meta.content_type, "text/plain");
        assert_eq!(meta.size, 5);
    }

    #[tokio::test]
    async fn list_skips_meta_sidecar() {
        let (_scratch, backend) = scratch_backend().await;
        backend
            .put("/c/x.txt", Bytes::from_static(b"x"), "text/plain")
            .await
            .unwrap();

        let members = backend.list("/c").await.unwrap();
        assert_eq!(members, vec!["x.txt".to_string()]);
    }

    #[tokio::test]
    async fn delete_removes_resource_and_sidecar() {
        let (scratch, backend) = scratch_backend().await;
        backend
            .put("/f.txt", Bytes::from_static(b"y"), "text/plain")
            .await
            .unwrap();

        backend.delete("/f.txt").await.unwrap();
        assert!(!backend.exists("/f.txt").await.unwrap());
        assert!(!scratch.path().join("f.txt.meta.json").exists());
    }

    #[tokio::test]
    async fn fs_backend_serves_acl_as_jsonld_without_sidecar() {
        let (scratch, backend) = scratch_backend().await;
        // (file written directly to disk, its bytes, expected inferred content type)
        let cases: &[(&str, &[u8], &str)] = &[
            (".acl", b"{}", "application/ld+json"),
            ("foo.acl", b"{}", "application/ld+json"),
            ("bar.meta", b"{}", "application/ld+json"),
            ("plain.bin", b"\x00\x01", "application/octet-stream"),
        ];
        for &(file_name, bytes, expected) in cases {
            fs::write(scratch.path().join(file_name), bytes)
                .await
                .unwrap();
            let (_, meta) = backend.get(&format!("/{file_name}")).await.unwrap();
            assert_eq!(meta.content_type, expected, "content type for {file_name}");
        }
    }

    #[tokio::test]
    async fn rejects_path_traversal() {
        let (_scratch, backend) = scratch_backend().await;
        let outcome = backend
            .put("/../escape.txt", Bytes::from_static(b""), "text/plain")
            .await;
        assert!(matches!(outcome, Err(PodError::InvalidPath(_))));
    }
}