1use std::path::{Path, PathBuf};
12
13use tokio::fs::OpenOptions;
14use tokio::io::{AsyncReadExt, AsyncWriteExt};
15
16use fileloft_core::{
17 error::TusError,
18 info::{UploadId, UploadInfo},
19 store::{SendDataStore, SendUpload},
20};
21
/// Upload store that keeps its files beneath a root directory on the local filesystem.
#[derive(Debug, Clone)]
pub struct FileStore {
    /// Directory against which every object key is resolved.
    root: PathBuf,
    /// Text placed in front of every upload id when an object key is built.
    prefix: String,
}
29
30impl FileStore {
31 pub fn new(root: impl Into<PathBuf>) -> Self {
32 Self {
33 root: root.into(),
34 prefix: String::new(),
35 }
36 }
37
38 pub fn with_prefix(root: impl Into<PathBuf>, prefix: impl Into<String>) -> Self {
42 let mut prefix = prefix.into();
43 if !prefix.is_empty() && !prefix.ends_with('/') {
44 prefix.push('/');
45 }
46 Self {
47 root: root.into(),
48 prefix,
49 }
50 }
51
52 fn key_to_path(&self, key: &str) -> PathBuf {
53 let mut p = self.root.clone();
54 for seg in key.split('/').filter(|s| !s.is_empty()) {
55 p.push(seg);
56 }
57 p
58 }
59
60 fn object_key_info(&self, id: &UploadId) -> String {
61 format!("{}{}.info", self.prefix, id.as_str())
62 }
63
64 fn info_path(&self, id: &UploadId) -> PathBuf {
65 self.key_to_path(&self.object_key_info(id))
66 }
67}
68
69impl SendDataStore for FileStore {
70 type UploadType = FileUpload;
71
72 async fn create_upload(&self, info: UploadInfo) -> Result<FileUpload, TusError> {
73 info.id.validate()?;
74 let path = self.info_path(&info.id);
75 if let Some(parent) = path.parent() {
76 tokio::fs::create_dir_all(parent)
77 .await
78 .map_err(TusError::Io)?;
79 }
80 let json = serde_json::to_vec(&info).map_err(|e| TusError::Internal(e.to_string()))?;
81 tokio::fs::write(&path, &json).await.map_err(TusError::Io)?;
82 Ok(FileUpload {
83 root: self.root.clone(),
84 prefix: self.prefix.clone(),
85 id: info.id.clone(),
86 })
87 }
88
89 async fn get_upload(&self, id: &UploadId) -> Result<FileUpload, TusError> {
90 id.validate()?;
91 let path = self.info_path(id);
92 if !tokio::fs::try_exists(&path).await.map_err(TusError::Io)? {
93 return Err(TusError::NotFound(id.to_string()));
94 }
95 Ok(FileUpload {
96 root: self.root.clone(),
97 prefix: self.prefix.clone(),
98 id: id.clone(),
99 })
100 }
101}
102
103pub struct FileUpload {
104 root: PathBuf,
105 prefix: String,
106 id: UploadId,
107}
108
109impl FileUpload {
110 fn key_to_path(&self, key: &str) -> PathBuf {
111 let mut p = self.root.clone();
112 for seg in key.split('/').filter(|s| !s.is_empty()) {
113 p.push(seg);
114 }
115 p
116 }
117
118 fn object_key_info(&self) -> String {
119 format!("{}{}.info", self.prefix, self.id.as_str())
120 }
121
122 fn object_key_data(&self) -> String {
123 format!("{}{}", self.prefix, self.id.as_str())
124 }
125
126 fn info_path(&self) -> PathBuf {
127 self.key_to_path(&self.object_key_info())
128 }
129
130 fn data_path(&self) -> PathBuf {
131 self.key_to_path(&self.object_key_data())
132 }
133
134 fn partial_data_path(&self, partial_id: &UploadId) -> PathBuf {
135 let key = format!("{}{}", self.prefix, partial_id.as_str());
136 self.key_to_path(&key)
137 }
138
139 fn part_path(&self, index: u32) -> PathBuf {
140 let key = format!("{}{}_part_{}", self.prefix, self.id.as_str(), index);
141 self.key_to_path(&key)
142 }
143
144 async fn read_info(&self) -> Result<UploadInfo, TusError> {
145 let bytes = tokio::fs::read(self.info_path())
146 .await
147 .map_err(TusError::Io)?;
148 serde_json::from_slice(&bytes).map_err(|e| TusError::Internal(e.to_string()))
149 }
150
151 async fn write_info(&self, info: &UploadInfo) -> Result<(), TusError> {
152 let json = serde_json::to_vec(info).map_err(|e| TusError::Internal(e.to_string()))?;
153 tokio::fs::write(self.info_path(), &json)
154 .await
155 .map_err(TusError::Io)
156 }
157
158 async fn list_part_paths_sorted(&self) -> Result<Vec<PathBuf>, TusError> {
160 let info_path = self.info_path();
161 let Some(parent) = info_path.parent() else {
162 return Ok(Vec::new());
163 };
164 if !tokio::fs::try_exists(parent).await.map_err(TusError::Io)? {
165 return Ok(Vec::new());
166 }
167
168 let mut rd = tokio::fs::read_dir(parent).await.map_err(TusError::Io)?;
169 let mut indexed: Vec<(u32, PathBuf)> = Vec::new();
170 let name_prefix = format!("{}_part_", self.id.as_str());
171
172 while let Some(ent) = rd.next_entry().await.map_err(TusError::Io)? {
173 let name = ent.file_name();
174 let Some(name_str) = name.to_str() else {
175 continue;
176 };
177 let Some(rest) = name_str.strip_prefix(&name_prefix) else {
178 continue;
179 };
180 if let Ok(idx) = rest.parse::<u32>() {
181 indexed.push((idx, ent.path()));
182 }
183 }
184
185 indexed.sort_by_key(|(i, _)| *i);
186 Ok(indexed.into_iter().map(|(_, p)| p).collect())
187 }
188}
189
190async fn remove_file_ignore_not_found(path: &Path) -> Result<(), TusError> {
191 match tokio::fs::remove_file(path).await {
192 Ok(()) => Ok(()),
193 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
194 Err(e) => Err(TusError::Io(e)),
195 }
196}
197
198impl SendUpload for FileUpload {
199 async fn write_chunk(
200 &mut self,
201 offset: u64,
202 reader: &mut (dyn tokio::io::AsyncRead + Unpin + Send),
203 ) -> Result<u64, TusError> {
204 let mut info = self.read_info().await?;
205 if info.offset != offset {
206 return Err(TusError::OffsetMismatch {
207 expected: info.offset,
208 actual: offset,
209 });
210 }
211
212 let mut buf = Vec::new();
213 reader.read_to_end(&mut buf).await?;
214 let n = buf.len() as u64;
215
216 let end_offset = offset
217 .checked_add(n)
218 .ok_or_else(|| TusError::Internal("upload offset overflow".into()))?;
219 if let Some(declared) = info.size {
220 if end_offset > declared {
221 return Err(TusError::ExceedsUploadLength {
222 declared,
223 end: end_offset,
224 });
225 }
226 }
227
228 let part_index = self.list_part_paths_sorted().await?.len() as u32;
229 let part_path = self.part_path(part_index);
230 if let Some(parent) = part_path.parent() {
231 tokio::fs::create_dir_all(parent)
232 .await
233 .map_err(TusError::Io)?;
234 }
235 tokio::fs::write(&part_path, &buf)
236 .await
237 .map_err(TusError::Io)?;
238
239 info.offset = end_offset;
240 self.write_info(&info).await?;
241 Ok(n)
242 }
243
244 async fn get_info(&self) -> Result<UploadInfo, TusError> {
245 self.read_info().await
246 }
247
248 async fn finalize(&mut self) -> Result<(), TusError> {
249 let parts = self.list_part_paths_sorted().await?;
250 let dest = self.data_path();
251 if let Some(parent) = dest.parent() {
252 tokio::fs::create_dir_all(parent)
253 .await
254 .map_err(TusError::Io)?;
255 }
256
257 if parts.is_empty() {
258 tokio::fs::write(&dest, &[]).await.map_err(TusError::Io)?;
259 return Ok(());
260 }
261
262 let mut out = OpenOptions::new()
263 .create(true)
264 .truncate(true)
265 .write(true)
266 .open(&dest)
267 .await
268 .map_err(TusError::Io)?;
269
270 for part_path in &parts {
271 let mut part_file = tokio::fs::File::open(part_path)
272 .await
273 .map_err(TusError::Io)?;
274 tokio::io::copy(&mut part_file, &mut out)
275 .await
276 .map_err(TusError::Io)?;
277 }
278 out.flush().await.map_err(TusError::Io)?;
279
280 for part_path in &parts {
281 remove_file_ignore_not_found(part_path).await?;
282 }
283 Ok(())
284 }
285
286 async fn delete(self) -> Result<(), TusError> {
287 remove_file_ignore_not_found(&self.data_path()).await?;
288 remove_file_ignore_not_found(&self.info_path()).await?;
289 let parts = self.list_part_paths_sorted().await?;
290 for part in parts {
291 remove_file_ignore_not_found(&part).await?;
292 }
293 Ok(())
294 }
295
296 async fn declare_length(&mut self, length: u64) -> Result<(), TusError> {
297 let mut info = self.read_info().await?;
298 if info.size.is_some() {
299 return Err(TusError::UploadLengthAlreadySet);
300 }
301 info.size = Some(length);
302 info.size_is_deferred = false;
303 self.write_info(&info).await
304 }
305
306 async fn read_content(&self) -> Result<Box<dyn tokio::io::AsyncRead + Send + Unpin>, TusError> {
307 let info = self.read_info().await?;
308 if !info.is_complete() {
309 return Err(TusError::UploadNotReadyForDownload);
310 }
311 let path = self.data_path();
312 if !tokio::fs::try_exists(&path).await.map_err(TusError::Io)? {
313 return Err(TusError::UploadNotReadyForDownload);
314 }
315 let file = tokio::fs::File::open(&path).await.map_err(TusError::Io)?;
316 Ok(Box::new(file))
317 }
318
319 async fn concatenate(&mut self, partials: &[UploadInfo]) -> Result<(), TusError> {
320 let dest = self.data_path();
321 if let Some(parent) = dest.parent() {
322 tokio::fs::create_dir_all(parent)
323 .await
324 .map_err(TusError::Io)?;
325 }
326
327 let mut out = OpenOptions::new()
328 .create(true)
329 .truncate(true)
330 .write(true)
331 .open(&dest)
332 .await
333 .map_err(TusError::Io)?;
334
335 for partial in partials {
336 let src_path = self.partial_data_path(&partial.id);
337 let mut src = tokio::fs::File::open(&src_path)
338 .await
339 .map_err(TusError::Io)?;
340 tokio::io::copy(&mut src, &mut out)
341 .await
342 .map_err(TusError::Io)?;
343 }
344 out.flush().await.map_err(TusError::Io)?;
345
346 let total: u64 = partials.iter().filter_map(|p| p.size).sum();
347 let mut info = self.read_info().await?;
348 info.size = Some(total);
349 info.offset = total;
350 info.is_final = true;
351 self.write_info(&info).await
352 }
353}
354
#[cfg(test)]
mod tests {
    use super::*;
    use fileloft_core::store::{SendDataStore, SendUpload};
    use std::io::Cursor;

    /// Builds an `UploadInfo` from a raw id string and an optional declared size.
    fn make_info(raw_id: &str, declared: Option<u64>) -> UploadInfo {
        let id = UploadId::from(raw_id);
        UploadInfo::new(id, declared)
    }

    /// Two chunks are stored as part files and merged into one data file on finalize.
    #[tokio::test]
    async fn create_patch_finalize_and_paths() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let root = tmp.path();
        let store = FileStore::new(root);

        let mut upload = store
            .create_upload(make_info("up-1", Some(11)))
            .await
            .expect("create");

        // Creating the upload writes its info file.
        let info_file = root.join("up-1.info");
        let info_present = tokio::fs::try_exists(&info_file).await.expect("exists");
        assert!(info_present);

        let mut first = Cursor::new(&b"hello "[..]);
        upload.write_chunk(0, &mut first).await.expect("chunk 1");
        let part0 = root.join("up-1_part_0");
        assert!(tokio::fs::try_exists(&part0).await.expect("part 0"));

        let mut second = Cursor::new(&b"world"[..]);
        upload.write_chunk(6, &mut second).await.expect("chunk 2");

        upload.finalize().await.expect("finalize");

        let merged = tokio::fs::read(root.join("up-1"))
            .await
            .expect("read final");
        assert_eq!(merged, b"hello world");

        // Both part files are gone after finalize.
        assert!(!tokio::fs::try_exists(&part0).await.expect("part 0 gone"));
        let part1 = root.join("up-1_part_1");
        assert!(!tokio::fs::try_exists(&part1).await.expect("stat"));
    }

    /// Finalizing an upload without any chunk yields an empty data file.
    #[tokio::test]
    async fn zero_byte_upload_finalize() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let store = FileStore::new(tmp.path());

        let created = store.create_upload(make_info("empty", Some(0))).await;
        let mut upload = created.expect("create");
        upload.finalize().await.expect("finalize");

        let contents = tokio::fs::read(tmp.path().join("empty"))
            .await
            .expect("read");
        assert!(contents.is_empty());
    }

    /// Deleting a finalized upload removes its data file and its info file.
    #[tokio::test]
    async fn delete_removes_final_info_and_parts() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let root = tmp.path();
        let store = FileStore::new(root);

        let created = store.create_upload(make_info("del-me", Some(3))).await;
        let mut upload = created.expect("create");
        let mut body = Cursor::new(&b"abc"[..]);
        upload.write_chunk(0, &mut body).await.expect("chunk");
        upload.finalize().await.expect("finalize");

        upload.delete().await.expect("delete");

        let data_present = tokio::fs::try_exists(root.join("del-me"))
            .await
            .expect("stat");
        assert!(!data_present);
        let info_present = tokio::fs::try_exists(root.join("del-me.info"))
            .await
            .expect("stat");
        assert!(!info_present);
    }

    /// Concatenation joins the partial data files and updates the final info file.
    #[tokio::test]
    async fn concatenate_builds_final_from_partial_finals() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let root = tmp.path();
        let store = FileStore::new(root);

        // Create, fill and finalize the two partial uploads one after the other.
        for (id, body) in [("p1", &b"aa"[..]), ("p2", &b"bb"[..])] {
            let mut partial = store
                .create_upload(make_info(id, Some(2)))
                .await
                .expect(id);
            let mut reader = Cursor::new(body);
            partial.write_chunk(0, &mut reader).await.expect("w");
            partial.finalize().await.expect("f");
        }

        let mut fin = store
            .create_upload(make_info("final", None))
            .await
            .expect("final");

        let mut first = make_info("p1", Some(2));
        first.offset = 2;
        let mut second = make_info("p2", Some(2));
        second.offset = 2;
        fin.concatenate(&[first, second]).await.expect("concat");

        let joined = tokio::fs::read(root.join("final")).await.expect("read");
        assert_eq!(joined, b"aabb");

        let raw_info = tokio::fs::read(root.join("final.info"))
            .await
            .expect("read info");
        let meta: UploadInfo = serde_json::from_slice(&raw_info).expect("json");
        assert_eq!(meta.offset, 4);
        assert_eq!(meta.size, Some(4));
        assert!(meta.is_final);
    }

    /// A prefixed store places part and data files inside the prefix directory.
    #[tokio::test]
    async fn with_prefix_creates_nested_dirs() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let root = tmp.path();
        let store = FileStore::with_prefix(root, "uploads");

        let created = store.create_upload(make_info("x", Some(1))).await;
        let mut upload = created.expect("create");

        let mut body = Cursor::new(&b"z"[..]);
        upload.write_chunk(0, &mut body).await.expect("chunk");
        let part_present = tokio::fs::try_exists(root.join("uploads/x_part_0"))
            .await
            .expect("stat");
        assert!(part_present);

        upload.finalize().await.expect("finalize");
        let data_present = tokio::fs::try_exists(root.join("uploads/x"))
            .await
            .expect("stat");
        assert!(data_present);
    }
}