Skip to main content

fileloft_store_fs/
store.rs

1//! Filesystem tus.io storage backend.
2//!
3//! On-disk layout matches tusd / GCS-style keys (relative to `root`):
4//!
5//! - Metadata: `{root}/{prefix}{id}.info` — JSON [`UploadInfo`].
6//! - Parts: `{root}/{prefix}{id}_part_{n}` — one file per PATCH chunk.
7//! - Final: `{root}/{prefix}{id}` — assembled from parts in [`SendUpload::finalize`].
8//!
9//! `prefix` may contain path segments (e.g. `uploads/`); parent directories are created as needed.
10
11use std::path::{Path, PathBuf};
12
13use tokio::fs::OpenOptions;
14use tokio::io::{AsyncReadExt, AsyncWriteExt};
15
16use fileloft_core::{
17    error::TusError,
18    info::{UploadId, UploadInfo},
19    store::{SendDataStore, SendUpload},
20};
21
22/// Filesystem-backed store using a flat tusd-style key layout under `root`.
23#[derive(Clone, Debug)]
24pub struct FileStore {
25    root: PathBuf,
26    /// Object-key prefix; empty or ends with `/` (normalized by [`FileStore::with_prefix`]).
27    prefix: String,
28}
29
30impl FileStore {
31    pub fn new(root: impl Into<PathBuf>) -> Self {
32        Self {
33            root: root.into(),
34            prefix: String::new(),
35        }
36    }
37
38    /// Same as [`FileStore::new`], with an object-key prefix such as `"uploads/"`.
39    ///
40    /// A non-empty prefix is normalized to end with `/`, matching GCS object naming.
41    pub fn with_prefix(root: impl Into<PathBuf>, prefix: impl Into<String>) -> Self {
42        let mut prefix = prefix.into();
43        if !prefix.is_empty() && !prefix.ends_with('/') {
44            prefix.push('/');
45        }
46        Self {
47            root: root.into(),
48            prefix,
49        }
50    }
51
52    fn key_to_path(&self, key: &str) -> PathBuf {
53        let mut p = self.root.clone();
54        for seg in key.split('/').filter(|s| !s.is_empty()) {
55            p.push(seg);
56        }
57        p
58    }
59
60    fn object_key_info(&self, id: &UploadId) -> String {
61        format!("{}{}.info", self.prefix, id.as_str())
62    }
63
64    fn info_path(&self, id: &UploadId) -> PathBuf {
65        self.key_to_path(&self.object_key_info(id))
66    }
67}
68
69impl SendDataStore for FileStore {
70    type UploadType = FileUpload;
71
72    async fn create_upload(&self, info: UploadInfo) -> Result<FileUpload, TusError> {
73        info.id.validate()?;
74        let path = self.info_path(&info.id);
75        if let Some(parent) = path.parent() {
76            tokio::fs::create_dir_all(parent)
77                .await
78                .map_err(TusError::Io)?;
79        }
80        let json = serde_json::to_vec(&info).map_err(|e| TusError::Internal(e.to_string()))?;
81        tokio::fs::write(&path, &json).await.map_err(TusError::Io)?;
82        Ok(FileUpload {
83            root: self.root.clone(),
84            prefix: self.prefix.clone(),
85            id: info.id.clone(),
86        })
87    }
88
89    async fn get_upload(&self, id: &UploadId) -> Result<FileUpload, TusError> {
90        id.validate()?;
91        let path = self.info_path(id);
92        if !tokio::fs::try_exists(&path).await.map_err(TusError::Io)? {
93            return Err(TusError::NotFound(id.to_string()));
94        }
95        Ok(FileUpload {
96            root: self.root.clone(),
97            prefix: self.prefix.clone(),
98            id: id.clone(),
99        })
100    }
101}
102
103pub struct FileUpload {
104    root: PathBuf,
105    prefix: String,
106    id: UploadId,
107}
108
109impl FileUpload {
110    fn key_to_path(&self, key: &str) -> PathBuf {
111        let mut p = self.root.clone();
112        for seg in key.split('/').filter(|s| !s.is_empty()) {
113            p.push(seg);
114        }
115        p
116    }
117
118    fn object_key_info(&self) -> String {
119        format!("{}{}.info", self.prefix, self.id.as_str())
120    }
121
122    fn object_key_data(&self) -> String {
123        format!("{}{}", self.prefix, self.id.as_str())
124    }
125
126    fn info_path(&self) -> PathBuf {
127        self.key_to_path(&self.object_key_info())
128    }
129
130    fn data_path(&self) -> PathBuf {
131        self.key_to_path(&self.object_key_data())
132    }
133
134    fn partial_data_path(&self, partial_id: &UploadId) -> PathBuf {
135        let key = format!("{}{}", self.prefix, partial_id.as_str());
136        self.key_to_path(&key)
137    }
138
139    fn part_path(&self, index: u32) -> PathBuf {
140        let key = format!("{}{}_part_{}", self.prefix, self.id.as_str(), index);
141        self.key_to_path(&key)
142    }
143
144    async fn read_info(&self) -> Result<UploadInfo, TusError> {
145        let bytes = tokio::fs::read(self.info_path())
146            .await
147            .map_err(TusError::Io)?;
148        serde_json::from_slice(&bytes).map_err(|e| TusError::Internal(e.to_string()))
149    }
150
151    async fn write_info(&self, info: &UploadInfo) -> Result<(), TusError> {
152        let json = serde_json::to_vec(info).map_err(|e| TusError::Internal(e.to_string()))?;
153        tokio::fs::write(self.info_path(), &json)
154            .await
155            .map_err(TusError::Io)
156    }
157
158    /// Lists part files for this upload, sorted by numeric `_part_{n}` suffix.
159    async fn list_part_paths_sorted(&self) -> Result<Vec<PathBuf>, TusError> {
160        let info_path = self.info_path();
161        let Some(parent) = info_path.parent() else {
162            return Ok(Vec::new());
163        };
164        if !tokio::fs::try_exists(parent).await.map_err(TusError::Io)? {
165            return Ok(Vec::new());
166        }
167
168        let mut rd = tokio::fs::read_dir(parent).await.map_err(TusError::Io)?;
169        let mut indexed: Vec<(u32, PathBuf)> = Vec::new();
170        let name_prefix = format!("{}_part_", self.id.as_str());
171
172        while let Some(ent) = rd.next_entry().await.map_err(TusError::Io)? {
173            let name = ent.file_name();
174            let Some(name_str) = name.to_str() else {
175                continue;
176            };
177            let Some(rest) = name_str.strip_prefix(&name_prefix) else {
178                continue;
179            };
180            if let Ok(idx) = rest.parse::<u32>() {
181                indexed.push((idx, ent.path()));
182            }
183        }
184
185        indexed.sort_by_key(|(i, _)| *i);
186        Ok(indexed.into_iter().map(|(_, p)| p).collect())
187    }
188}
189
190async fn remove_file_ignore_not_found(path: &Path) -> Result<(), TusError> {
191    match tokio::fs::remove_file(path).await {
192        Ok(()) => Ok(()),
193        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
194        Err(e) => Err(TusError::Io(e)),
195    }
196}
197
198impl SendUpload for FileUpload {
199    async fn write_chunk(
200        &mut self,
201        offset: u64,
202        reader: &mut (dyn tokio::io::AsyncRead + Unpin + Send),
203    ) -> Result<u64, TusError> {
204        let mut info = self.read_info().await?;
205        if info.offset != offset {
206            return Err(TusError::OffsetMismatch {
207                expected: info.offset,
208                actual: offset,
209            });
210        }
211
212        let mut buf = Vec::new();
213        reader.read_to_end(&mut buf).await?;
214        let n = buf.len() as u64;
215
216        let end_offset = offset
217            .checked_add(n)
218            .ok_or_else(|| TusError::Internal("upload offset overflow".into()))?;
219        if let Some(declared) = info.size {
220            if end_offset > declared {
221                return Err(TusError::ExceedsUploadLength {
222                    declared,
223                    end: end_offset,
224                });
225            }
226        }
227
228        let part_index = self.list_part_paths_sorted().await?.len() as u32;
229        let part_path = self.part_path(part_index);
230        if let Some(parent) = part_path.parent() {
231            tokio::fs::create_dir_all(parent)
232                .await
233                .map_err(TusError::Io)?;
234        }
235        tokio::fs::write(&part_path, &buf)
236            .await
237            .map_err(TusError::Io)?;
238
239        info.offset = end_offset;
240        self.write_info(&info).await?;
241        Ok(n)
242    }
243
244    async fn get_info(&self) -> Result<UploadInfo, TusError> {
245        self.read_info().await
246    }
247
248    async fn finalize(&mut self) -> Result<(), TusError> {
249        let parts = self.list_part_paths_sorted().await?;
250        let dest = self.data_path();
251        if let Some(parent) = dest.parent() {
252            tokio::fs::create_dir_all(parent)
253                .await
254                .map_err(TusError::Io)?;
255        }
256
257        if parts.is_empty() {
258            tokio::fs::write(&dest, &[]).await.map_err(TusError::Io)?;
259            return Ok(());
260        }
261
262        let mut out = OpenOptions::new()
263            .create(true)
264            .truncate(true)
265            .write(true)
266            .open(&dest)
267            .await
268            .map_err(TusError::Io)?;
269
270        for part_path in &parts {
271            let mut part_file = tokio::fs::File::open(part_path)
272                .await
273                .map_err(TusError::Io)?;
274            tokio::io::copy(&mut part_file, &mut out)
275                .await
276                .map_err(TusError::Io)?;
277        }
278        out.flush().await.map_err(TusError::Io)?;
279
280        for part_path in &parts {
281            remove_file_ignore_not_found(part_path).await?;
282        }
283        Ok(())
284    }
285
286    async fn delete(self) -> Result<(), TusError> {
287        remove_file_ignore_not_found(&self.data_path()).await?;
288        remove_file_ignore_not_found(&self.info_path()).await?;
289        let parts = self.list_part_paths_sorted().await?;
290        for part in parts {
291            remove_file_ignore_not_found(&part).await?;
292        }
293        Ok(())
294    }
295
296    async fn declare_length(&mut self, length: u64) -> Result<(), TusError> {
297        let mut info = self.read_info().await?;
298        if info.size.is_some() {
299            return Err(TusError::UploadLengthAlreadySet);
300        }
301        info.size = Some(length);
302        info.size_is_deferred = false;
303        self.write_info(&info).await
304    }
305
306    async fn read_content(&self) -> Result<Box<dyn tokio::io::AsyncRead + Send + Unpin>, TusError> {
307        let info = self.read_info().await?;
308        if !info.is_complete() {
309            return Err(TusError::UploadNotReadyForDownload);
310        }
311        let path = self.data_path();
312        if !tokio::fs::try_exists(&path).await.map_err(TusError::Io)? {
313            return Err(TusError::UploadNotReadyForDownload);
314        }
315        let file = tokio::fs::File::open(&path).await.map_err(TusError::Io)?;
316        Ok(Box::new(file))
317    }
318
319    async fn concatenate(&mut self, partials: &[UploadInfo]) -> Result<(), TusError> {
320        let dest = self.data_path();
321        if let Some(parent) = dest.parent() {
322            tokio::fs::create_dir_all(parent)
323                .await
324                .map_err(TusError::Io)?;
325        }
326
327        let mut out = OpenOptions::new()
328            .create(true)
329            .truncate(true)
330            .write(true)
331            .open(&dest)
332            .await
333            .map_err(TusError::Io)?;
334
335        for partial in partials {
336            let src_path = self.partial_data_path(&partial.id);
337            let mut src = tokio::fs::File::open(&src_path)
338                .await
339                .map_err(TusError::Io)?;
340            tokio::io::copy(&mut src, &mut out)
341                .await
342                .map_err(TusError::Io)?;
343        }
344        out.flush().await.map_err(TusError::Io)?;
345
346        let total: u64 = partials.iter().filter_map(|p| p.size).sum();
347        let mut info = self.read_info().await?;
348        info.size = Some(total);
349        info.offset = total;
350        info.is_final = true;
351        self.write_info(&info).await
352    }
353}
354
355#[cfg(test)]
356mod tests {
357    use super::*;
358    use fileloft_core::store::{SendDataStore, SendUpload};
359    use std::io::Cursor;
360
361    fn info_with_id(id: &str, size: Option<u64>) -> UploadInfo {
362        UploadInfo::new(UploadId::from(id), size)
363    }
364
365    #[tokio::test]
366    async fn create_patch_finalize_and_paths() {
367        let dir = tempfile::tempdir().expect("tempdir");
368        let store = FileStore::new(dir.path());
369        let info = info_with_id("up-1", Some(11));
370
371        let mut upload = store.create_upload(info).await.expect("create");
372
373        let info_path = dir.path().join("up-1.info");
374        assert!(tokio::fs::try_exists(&info_path).await.expect("exists"));
375
376        let mut r = Cursor::new(b"hello ".as_slice());
377        upload.write_chunk(0, &mut r).await.expect("chunk 1");
378        let p0 = dir.path().join("up-1_part_0");
379        assert!(tokio::fs::try_exists(&p0).await.expect("part 0"));
380
381        let mut r = Cursor::new(b"world".as_slice());
382        upload.write_chunk(6, &mut r).await.expect("chunk 2");
383
384        upload.finalize().await.expect("finalize");
385
386        let final_path = dir.path().join("up-1");
387        let got = tokio::fs::read(&final_path).await.expect("read final");
388        assert_eq!(got, b"hello world");
389        assert!(!tokio::fs::try_exists(&p0).await.expect("part 0 gone"));
390        assert!(!tokio::fs::try_exists(&dir.path().join("up-1_part_1"))
391            .await
392            .expect("stat"));
393    }
394
395    #[tokio::test]
396    async fn zero_byte_upload_finalize() {
397        let dir = tempfile::tempdir().expect("tempdir");
398        let store = FileStore::new(dir.path());
399        let mut upload = store
400            .create_upload(info_with_id("empty", Some(0)))
401            .await
402            .expect("create");
403        upload.finalize().await.expect("finalize");
404        let final_path = dir.path().join("empty");
405        let got = tokio::fs::read(&final_path).await.expect("read");
406        assert!(got.is_empty());
407    }
408
409    #[tokio::test]
410    async fn delete_removes_final_info_and_parts() {
411        let dir = tempfile::tempdir().expect("tempdir");
412        let store = FileStore::new(dir.path());
413        let mut upload = store
414            .create_upload(info_with_id("del-me", Some(3)))
415            .await
416            .expect("create");
417        let mut r = Cursor::new(b"abc".as_slice());
418        upload.write_chunk(0, &mut r).await.expect("chunk");
419        upload.finalize().await.expect("finalize");
420
421        upload.delete().await.expect("delete");
422
423        assert!(!tokio::fs::try_exists(dir.path().join("del-me"))
424            .await
425            .expect("stat"));
426        assert!(!tokio::fs::try_exists(dir.path().join("del-me.info"))
427            .await
428            .expect("stat"));
429    }
430
431    #[tokio::test]
432    async fn concatenate_builds_final_from_partial_finals() {
433        let dir = tempfile::tempdir().expect("tempdir");
434        let store = FileStore::new(dir.path());
435
436        let mut p1 = store
437            .create_upload(info_with_id("p1", Some(2)))
438            .await
439            .expect("p1");
440        let mut r = Cursor::new(b"aa".as_slice());
441        p1.write_chunk(0, &mut r).await.expect("w");
442        p1.finalize().await.expect("f");
443
444        let mut p2 = store
445            .create_upload(info_with_id("p2", Some(2)))
446            .await
447            .expect("p2");
448        let mut r = Cursor::new(b"bb".as_slice());
449        p2.write_chunk(0, &mut r).await.expect("w");
450        p2.finalize().await.expect("f");
451
452        let mut fin = store
453            .create_upload(info_with_id("final", None))
454            .await
455            .expect("final");
456        let mut i1 = info_with_id("p1", Some(2));
457        let mut i2 = info_with_id("p2", Some(2));
458        i1.offset = 2;
459        i2.offset = 2;
460        fin.concatenate(&[i1, i2]).await.expect("concat");
461
462        let got = tokio::fs::read(dir.path().join("final"))
463            .await
464            .expect("read");
465        assert_eq!(got, b"aabb");
466        let meta: UploadInfo = serde_json::from_slice(
467            &tokio::fs::read(dir.path().join("final.info"))
468                .await
469                .expect("read info"),
470        )
471        .expect("json");
472        assert_eq!(meta.offset, 4);
473        assert_eq!(meta.size, Some(4));
474        assert!(meta.is_final);
475    }
476
477    #[tokio::test]
478    async fn with_prefix_creates_nested_dirs() {
479        let dir = tempfile::tempdir().expect("tempdir");
480        let store = FileStore::with_prefix(dir.path(), "uploads");
481        let mut upload = store
482            .create_upload(info_with_id("x", Some(1)))
483            .await
484            .expect("create");
485        let mut r = Cursor::new(b"z".as_slice());
486        upload.write_chunk(0, &mut r).await.expect("chunk");
487        assert!(tokio::fs::try_exists(dir.path().join("uploads/x_part_0"))
488            .await
489            .expect("stat"));
490        upload.finalize().await.expect("finalize");
491        assert!(tokio::fs::try_exists(dir.path().join("uploads/x"))
492            .await
493            .expect("stat"));
494    }
495}