1use std::path::{Path, PathBuf};
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use bytes::Bytes;
12use sha2::{Digest, Sha256};
13use tokio::fs;
14use tokio::sync::mpsc;
15
16use crate::error::PodError;
17use crate::storage::{ResourceMeta, Storage, StorageEvent};
18
/// Suffix appended to a resource's on-disk file name to form the name of its
/// JSON metadata sidecar (see `FsBackend::meta_path`). Directory listings and
/// watch events skip any entry whose name ends with this suffix.
const META_SUFFIX: &str = ".meta.json";
20
/// Storage backend that keeps every resource as a file beneath one root
/// directory.
///
/// The root is held behind an [`Arc`], so cloning a backend only bumps a
/// reference count and every clone refers to the same directory. `Debug` is
/// derived so the backend can appear in logs and in `Debug` output of types
/// that embed it.
#[derive(Clone, Debug)]
pub struct FsBackend {
    /// Directory under which all resource paths are resolved.
    root: Arc<PathBuf>,
}
26
27#[derive(serde::Serialize, serde::Deserialize)]
28struct MetaSidecar {
29 content_type: String,
30 #[serde(default)]
31 links: Vec<String>,
32}
33
34impl FsBackend {
35 pub async fn new(root: impl Into<PathBuf>) -> Result<Self, PodError> {
38 let root: PathBuf = root.into();
39 fs::create_dir_all(&root).await?;
40 Ok(Self {
41 root: Arc::new(root),
42 })
43 }
44
45 pub fn root(&self) -> &Path {
47 &self.root
48 }
49
50 fn normalize(path: &str) -> Result<String, PodError> {
51 let p = if path.is_empty() {
52 "/".to_string()
53 } else if path.starts_with('/') {
54 path.to_string()
55 } else {
56 format!("/{path}")
57 };
58 if p.contains("..") || p.contains('\0') {
59 return Err(PodError::InvalidPath(p));
60 }
61 Ok(p)
62 }
63
64 fn resolve(&self, path: &str) -> Result<PathBuf, PodError> {
65 let norm = Self::normalize(path)?;
66 let rel = norm.trim_start_matches('/');
67 let full = self.root.join(rel);
68 if !full.starts_with(self.root.as_path()) {
69 return Err(PodError::InvalidPath(norm));
70 }
71 Ok(full)
72 }
73
74 fn meta_path(data_path: &Path) -> PathBuf {
75 let mut p = data_path.as_os_str().to_owned();
76 p.push(META_SUFFIX);
77 PathBuf::from(p)
78 }
79
80 fn compute_etag(body: &[u8]) -> String {
81 hex::encode(Sha256::digest(body))
82 }
83
84 async fn read_meta(
85 &self,
86 path: &str,
87 body_len: u64,
88 etag: String,
89 modified: chrono::DateTime<chrono::Utc>,
90 ) -> Result<ResourceMeta, PodError> {
91 let data_path = self.resolve(path)?;
92 let meta_path = Self::meta_path(&data_path);
93 let fallback_ct: String = crate::ldp::guess_content_type(path);
101 let (content_type, links) = match fs::read(&meta_path).await {
102 Ok(bytes) => {
103 let sidecar: MetaSidecar =
104 serde_json::from_slice(&bytes).unwrap_or_else(|_| MetaSidecar {
105 content_type: fallback_ct.clone(),
106 links: Vec::new(),
107 });
108 (sidecar.content_type, sidecar.links)
109 }
110 Err(_) => (fallback_ct, Vec::new()),
111 };
112 Ok(ResourceMeta {
113 etag,
114 modified,
115 size: body_len,
116 content_type,
117 links,
118 })
119 }
120}
121
122#[async_trait]
123impl Storage for FsBackend {
124 async fn get(&self, path: &str) -> Result<(Bytes, ResourceMeta), PodError> {
125 let data_path = self.resolve(path)?;
126 let body = match fs::read(&data_path).await {
127 Ok(b) => Bytes::from(b),
128 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
129 return Err(PodError::NotFound(path.into()));
130 }
131 Err(e) => return Err(e.into()),
132 };
133 let metadata = fs::metadata(&data_path).await?;
134 let modified: chrono::DateTime<chrono::Utc> = metadata
135 .modified()
136 .ok()
137 .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
138 .map(|d| {
139 chrono::DateTime::from_timestamp(d.as_secs() as i64, d.subsec_nanos())
140 .unwrap_or_else(chrono::Utc::now)
141 })
142 .unwrap_or_else(chrono::Utc::now);
143 let etag = Self::compute_etag(&body);
144 let meta = self
145 .read_meta(path, body.len() as u64, etag, modified)
146 .await?;
147 Ok((body, meta))
148 }
149
150 async fn put(
151 &self,
152 path: &str,
153 body: Bytes,
154 content_type: &str,
155 ) -> Result<ResourceMeta, PodError> {
156 let data_path = self.resolve(path)?;
157 if let Some(parent) = data_path.parent() {
158 fs::create_dir_all(parent).await?;
159 }
160 fs::write(&data_path, &body).await?;
161 let sidecar = MetaSidecar {
162 content_type: content_type.to_string(),
163 links: Vec::new(),
164 };
165 let sidecar_bytes = serde_json::to_vec(&sidecar)?;
166 fs::write(Self::meta_path(&data_path), &sidecar_bytes).await?;
167 let etag = Self::compute_etag(&body);
168 Ok(ResourceMeta {
169 etag,
170 modified: chrono::Utc::now(),
171 size: body.len() as u64,
172 content_type: content_type.to_string(),
173 links: Vec::new(),
174 })
175 }
176
177 async fn delete(&self, path: &str) -> Result<(), PodError> {
178 let data_path = self.resolve(path)?;
179 match fs::remove_file(&data_path).await {
180 Ok(()) => {}
181 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
182 return Err(PodError::NotFound(path.into()));
183 }
184 Err(e) => return Err(e.into()),
185 }
186 let _ = fs::remove_file(Self::meta_path(&data_path)).await;
187 Ok(())
188 }
189
190 async fn list(&self, container: &str) -> Result<Vec<String>, PodError> {
191 let container_path = self.resolve(container)?;
192 let mut out = Vec::new();
193 let mut dir = match fs::read_dir(&container_path).await {
194 Ok(d) => d,
195 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
196 return Ok(out);
197 }
198 Err(e) => return Err(e.into()),
199 };
200 while let Some(entry) = dir.next_entry().await? {
201 let name = entry.file_name().to_string_lossy().to_string();
202 if name.ends_with(META_SUFFIX) {
203 continue;
204 }
205 let ft = entry.file_type().await?;
213 let is_dir = if ft.is_symlink() {
214 fs::metadata(entry.path())
215 .await
216 .map(|m| m.is_dir())
217 .unwrap_or(false)
218 } else {
219 ft.is_dir()
220 };
221 if is_dir {
222 out.push(format!("{name}/"));
223 } else {
224 out.push(name);
225 }
226 }
227 out.sort();
228 Ok(out)
229 }
230
231 async fn head(&self, path: &str) -> Result<ResourceMeta, PodError> {
232 let data_path = self.resolve(path)?;
233 let metadata = match fs::metadata(&data_path).await {
234 Ok(m) => m,
235 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
236 return Err(PodError::NotFound(path.into()));
237 }
238 Err(e) => return Err(e.into()),
239 };
240 let body = fs::read(&data_path).await?;
241 let etag = Self::compute_etag(&body);
242 let modified = metadata
243 .modified()
244 .ok()
245 .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
246 .map(|d| {
247 chrono::DateTime::from_timestamp(d.as_secs() as i64, d.subsec_nanos())
248 .unwrap_or_else(chrono::Utc::now)
249 })
250 .unwrap_or_else(chrono::Utc::now);
251 self.read_meta(path, body.len() as u64, etag, modified)
252 .await
253 }
254
255 async fn exists(&self, path: &str) -> Result<bool, PodError> {
256 let data_path = self.resolve(path)?;
257 Ok(fs::try_exists(&data_path).await.unwrap_or(false))
258 }
259
260 async fn create_container(&self, path: &str) -> Result<ResourceMeta, PodError> {
261 let container = if path.ends_with('/') {
262 path.to_string()
263 } else {
264 format!("{path}/")
265 };
266 let dir_path = self.resolve(&container)?;
267 fs::create_dir_all(&dir_path).await?;
268 Ok(ResourceMeta::new("container", 0, "application/ld+json"))
269 }
270
271 async fn watch(&self, path: &str) -> Result<mpsc::Receiver<StorageEvent>, PodError> {
272 use notify::{RecursiveMode, Watcher};
273
274 let data_path = self.resolve(path)?;
275 let filter_root = data_path.clone();
276 let root = self.root.clone();
277 let (tx, rx) = mpsc::channel::<StorageEvent>(64);
278
279 let (raw_tx, raw_rx) = std::sync::mpsc::channel::<notify::Result<notify::Event>>();
280 let mut watcher = notify::recommended_watcher(move |res| {
281 let _ = raw_tx.send(res);
282 })?;
283 let mode = if data_path.is_dir() {
284 RecursiveMode::Recursive
285 } else {
286 RecursiveMode::NonRecursive
287 };
288 let watch_target = if data_path.exists() {
289 data_path.clone()
290 } else {
291 root.to_path_buf()
292 };
293 watcher.watch(&watch_target, mode)?;
294
295 tokio::task::spawn_blocking(move || {
296 let _keep = watcher;
297 while let Ok(Ok(event)) = raw_rx.recv() {
298 for path in &event.paths {
299 let s = path.to_string_lossy();
300 if s.ends_with(META_SUFFIX) {
301 continue;
302 }
303 let virt = match path.strip_prefix(root.as_path()) {
304 Ok(p) => format!("/{}", p.to_string_lossy()),
305 Err(_) => continue,
306 };
307 if !path.starts_with(&filter_root) && path != &filter_root {
308 continue;
309 }
310 use notify::EventKind;
311 let storage_event = match event.kind {
312 EventKind::Create(_) => StorageEvent::Created(virt),
313 EventKind::Modify(_) => StorageEvent::Updated(virt),
314 EventKind::Remove(_) => StorageEvent::Deleted(virt),
315 _ => continue,
316 };
317 if tx.blocking_send(storage_event).is_err() {
318 return;
319 }
320 }
321 }
322 });
323
324 Ok(rx)
325 }
326}
327
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a backend rooted in a fresh temporary directory. The
    /// `TempDir` is returned so the directory outlives the test body.
    async fn scratch_backend() -> (TempDir, FsBackend) {
        let tmp = TempDir::new().unwrap();
        let store = FsBackend::new(tmp.path()).await.unwrap();
        (tmp, store)
    }

    /// A stored body comes back unchanged along with its recorded metadata.
    #[tokio::test]
    async fn put_get_roundtrip() {
        let (_tmp, store) = scratch_backend().await;
        store
            .put("/a/b.txt", Bytes::from_static(b"hello"), "text/plain")
            .await
            .unwrap();

        let (payload, info) = store.get("/a/b.txt").await.unwrap();
        assert_eq!(&payload[..], b"hello");
        assert_eq!(info.content_type, "text/plain");
        assert_eq!(info.size, 5);
    }

    /// Listing a container shows the resource but not its sidecar.
    #[tokio::test]
    async fn list_skips_meta_sidecar() {
        let (_tmp, store) = scratch_backend().await;
        store
            .put("/c/x.txt", Bytes::from_static(b"x"), "text/plain")
            .await
            .unwrap();

        let listed = store.list("/c").await.unwrap();
        assert_eq!(listed, vec!["x.txt".to_string()]);
    }

    /// Deleting a resource removes both the data file and the sidecar.
    #[tokio::test]
    async fn delete_removes_resource_and_sidecar() {
        let (tmp, store) = scratch_backend().await;
        store
            .put("/f.txt", Bytes::from_static(b"y"), "text/plain")
            .await
            .unwrap();

        store.delete("/f.txt").await.unwrap();

        assert!(!store.exists("/f.txt").await.unwrap());
        assert!(!tmp.path().join("f.txt.meta.json").exists());
    }

    /// Files written without a sidecar get their content type from the
    /// path-based guess.
    #[tokio::test]
    async fn fs_backend_serves_acl_as_jsonld_without_sidecar() {
        let (tmp, store) = scratch_backend().await;

        let cases: [(&str, &[u8], &str); 4] = [
            (".acl", b"{}", "application/ld+json"),
            ("foo.acl", b"{}", "application/ld+json"),
            ("bar.meta", b"{}", "application/ld+json"),
            ("plain.bin", b"\x00\x01", "application/octet-stream"),
        ];
        for (file_name, contents, expected_type) in cases {
            fs::write(tmp.path().join(file_name), contents)
                .await
                .unwrap();
            let (_, info) = store.get(&format!("/{file_name}")).await.unwrap();
            assert_eq!(info.content_type, expected_type);
        }
    }

    /// A path that climbs out of the root is refused before touching disk.
    #[tokio::test]
    async fn rejects_path_traversal() {
        let (_tmp, store) = scratch_backend().await;
        let outcome = store
            .put("/../escape.txt", Bytes::from_static(b""), "text/plain")
            .await;
        assert!(matches!(outcome, Err(PodError::InvalidPath(_))));
    }
}