s3s_fs/
fs.rs

1use crate::error::*;
2use crate::utils::hex;
3
4use s3s::auth::Credentials;
5use s3s::dto;
6
7use std::env;
8use std::ops::Not;
9use std::path::{Path, PathBuf};
10use std::sync::atomic::{AtomicU64, Ordering};
11
12use tokio::fs;
13use tokio::fs::File;
14use tokio::io::{AsyncReadExt, BufWriter};
15
16use md5::{Digest, Md5};
17use path_absolutize::Absolutize;
18use s3s::dto::PartNumber;
19use uuid::Uuid;
20
/// Storage backend that keeps buckets, objects and their bookkeeping files
/// beneath a single root directory on the local file system.
#[derive(Debug)]
pub struct FileSystem {
    /// Canonicalized root directory that every relative path is resolved against.
    pub(crate) root: PathBuf,
    /// Monotonic counter used to give each temporary write file a distinct name.
    tmp_file_counter: AtomicU64,
}
26
27pub(crate) type InternalInfo = serde_json::Map<String, serde_json::Value>;
28
/// Removes temporary files left behind by a previous run.
///
/// Every entry directly under `root` whose name starts with `.tmp.` and ends with
/// `.internal.part` (the naming scheme of `FileSystem::prepare_file_write`) is deleted.
/// Entries whose names are not valid UTF-8 are skipped, and a missing `root` is treated
/// as "nothing to clean".
///
/// # Errors
/// Returns any I/O error raised while listing `root` or removing a matching file. The one
/// exception is a file that disappears before it can be removed, which is ignored because
/// the desired end state has already been reached.
fn clean_old_tmp_files(root: &Path) -> std::io::Result<()> {
    let entries = match std::fs::read_dir(root) {
        Ok(entries) => entries,
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(()),
        Err(err) => return Err(err),
    };
    for entry in entries {
        let entry = entry?;
        let file_name = entry.file_name();
        let Some(name) = file_name.to_str() else { continue };
        // See `FileSystem::prepare_file_write`
        if name.starts_with(".tmp.") && name.ends_with(".internal.part") {
            match std::fs::remove_file(entry.path()) {
                // Removed concurrently by someone else: nothing left to do.
                Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
                other => other?,
            }
        }
    }
    Ok(())
}
46
47impl FileSystem {
48    pub fn new(root: impl AsRef<Path>) -> Result<Self> {
49        let root = env::current_dir()?.join(root).canonicalize()?;
50        clean_old_tmp_files(&root)?;
51        let tmp_file_counter = AtomicU64::new(0);
52        Ok(Self { root, tmp_file_counter })
53    }
54
55    pub(crate) fn resolve_abs_path(&self, path: impl AsRef<Path>) -> Result<PathBuf> {
56        Ok(path.as_ref().absolutize_virtually(&self.root)?.into_owned())
57    }
58
59    pub(crate) fn resolve_upload_part_path(&self, upload_id: Uuid, part_number: PartNumber) -> Result<PathBuf> {
60        self.resolve_abs_path(format!(".upload_id-{upload_id}.part-{part_number}"))
61    }
62
63    /// resolve object path under the virtual root
64    pub(crate) fn get_object_path(&self, bucket: &str, key: &str) -> Result<PathBuf> {
65        let dir = Path::new(&bucket);
66        let file_path = Path::new(&key);
67        self.resolve_abs_path(dir.join(file_path))
68    }
69
70    /// resolve bucket path under the virtual root
71    pub(crate) fn get_bucket_path(&self, bucket: &str) -> Result<PathBuf> {
72        let dir = Path::new(&bucket);
73        self.resolve_abs_path(dir)
74    }
75
76    /// resolve metadata path under the virtual root (custom format)
77    pub(crate) fn get_metadata_path(&self, bucket: &str, key: &str, upload_id: Option<Uuid>) -> Result<PathBuf> {
78        let encode = |s: &str| base64_simd::URL_SAFE_NO_PAD.encode_to_string(s);
79        let u_ext = upload_id.map(|u| format!(".upload-{u}")).unwrap_or_default();
80        let file_path = format!(".bucket-{}.object-{}{u_ext}.metadata.json", encode(bucket), encode(key));
81        self.resolve_abs_path(file_path)
82    }
83
84    pub(crate) fn get_internal_info_path(&self, bucket: &str, key: &str) -> Result<PathBuf> {
85        let encode = |s: &str| base64_simd::URL_SAFE_NO_PAD.encode_to_string(s);
86        let file_path = format!(".bucket-{}.object-{}.internal.json", encode(bucket), encode(key));
87        self.resolve_abs_path(file_path)
88    }
89
90    /// load metadata from fs
91    pub(crate) async fn load_metadata(&self, bucket: &str, key: &str, upload_id: Option<Uuid>) -> Result<Option<dto::Metadata>> {
92        let path = self.get_metadata_path(bucket, key, upload_id)?;
93        if path.exists().not() {
94            return Ok(None);
95        }
96        let content = fs::read(&path).await?;
97        let map = serde_json::from_slice(&content)?;
98        Ok(Some(map))
99    }
100
101    /// save metadata to fs
102    pub(crate) async fn save_metadata(
103        &self,
104        bucket: &str,
105        key: &str,
106        metadata: &dto::Metadata,
107        upload_id: Option<Uuid>,
108    ) -> Result<()> {
109        let path = self.get_metadata_path(bucket, key, upload_id)?;
110        let content = serde_json::to_vec(metadata)?;
111        fs::write(&path, &content).await?;
112        Ok(())
113    }
114
115    /// remove metadata from fs
116    pub(crate) fn delete_metadata(&self, bucket: &str, key: &str, upload_id: Option<Uuid>) -> Result<()> {
117        let path = self.get_metadata_path(bucket, key, upload_id)?;
118        std::fs::remove_file(path)?;
119        Ok(())
120    }
121
122    pub(crate) async fn load_internal_info(&self, bucket: &str, key: &str) -> Result<Option<InternalInfo>> {
123        let path = self.get_internal_info_path(bucket, key)?;
124        if path.exists().not() {
125            return Ok(None);
126        }
127        let content = fs::read(&path).await?;
128        let map = serde_json::from_slice(&content)?;
129        Ok(Some(map))
130    }
131
132    pub(crate) async fn save_internal_info(&self, bucket: &str, key: &str, info: &InternalInfo) -> Result<()> {
133        let path = self.get_internal_info_path(bucket, key)?;
134        let content = serde_json::to_vec(info)?;
135        fs::write(&path, &content).await?;
136        Ok(())
137    }
138
139    /// get md5 sum
140    pub(crate) async fn get_md5_sum(&self, bucket: &str, key: &str) -> Result<String> {
141        let object_path = self.get_object_path(bucket, key)?;
142        let mut file = File::open(&object_path).await?;
143        let mut buf = vec![0; 65536];
144        let mut md5_hash = Md5::new();
145        loop {
146            let nread = file.read(&mut buf).await?;
147            if nread == 0 {
148                break;
149            }
150            md5_hash.update(&buf[..nread]);
151        }
152        Ok(hex(md5_hash.finalize()))
153    }
154
155    fn get_upload_info_path(&self, upload_id: &Uuid) -> Result<PathBuf> {
156        self.resolve_abs_path(format!(".upload-{upload_id}.json"))
157    }
158
159    pub(crate) async fn create_upload_id(&self, cred: Option<&Credentials>) -> Result<Uuid> {
160        let upload_id = Uuid::new_v4();
161        let upload_info_path = self.get_upload_info_path(&upload_id)?;
162
163        let ak: Option<&str> = cred.map(|c| c.access_key.as_str());
164
165        let content = serde_json::to_vec(&ak)?;
166        fs::write(&upload_info_path, &content).await?;
167
168        Ok(upload_id)
169    }
170
171    pub(crate) async fn verify_upload_id(&self, cred: Option<&Credentials>, upload_id: &Uuid) -> Result<bool> {
172        let upload_info_path = self.get_upload_info_path(upload_id)?;
173        if upload_info_path.exists().not() {
174            return Ok(false);
175        }
176
177        let content = fs::read(&upload_info_path).await?;
178        let ak: Option<String> = serde_json::from_slice(&content)?;
179
180        Ok(ak.as_deref() == cred.map(|c| c.access_key.as_str()))
181    }
182
183    pub(crate) async fn delete_upload_id(&self, upload_id: &Uuid) -> Result<()> {
184        let upload_info_path = self.get_upload_info_path(upload_id)?;
185        if upload_info_path.exists() {
186            fs::remove_file(&upload_info_path).await?;
187        }
188        Ok(())
189    }
190
191    /// Write to the filesystem atomically.
192    /// This is done by first writing to a temporary location and then moving the file.
193    pub(crate) async fn prepare_file_write<'a>(&self, path: &'a Path) -> Result<FileWriter<'a>> {
194        let tmp_name = format!(".tmp.{}.internal.part", self.tmp_file_counter.fetch_add(1, Ordering::SeqCst));
195        let tmp_path = self.resolve_abs_path(tmp_name)?;
196        let file = File::create(&tmp_path).await?;
197        let writer = BufWriter::new(file);
198        Ok(FileWriter {
199            tmp_path,
200            dest_path: path,
201            writer,
202            clean_tmp: true,
203        })
204    }
205}
206
207pub(crate) struct FileWriter<'a> {
208    tmp_path: PathBuf,
209    dest_path: &'a Path,
210    writer: BufWriter<File>,
211    clean_tmp: bool,
212}
213
214impl<'a> FileWriter<'a> {
215    pub(crate) fn tmp_path(&self) -> &Path {
216        &self.tmp_path
217    }
218
219    pub(crate) fn dest_path(&self) -> &'a Path {
220        self.dest_path
221    }
222
223    pub(crate) fn writer(&mut self) -> &mut BufWriter<File> {
224        &mut self.writer
225    }
226
227    pub(crate) async fn done(mut self) -> Result<()> {
228        if let Some(final_dir_path) = self.dest_path().parent() {
229            fs::create_dir_all(&final_dir_path).await?;
230        }
231
232        fs::rename(&self.tmp_path, self.dest_path()).await?;
233        self.clean_tmp = false;
234        Ok(())
235    }
236}
237
238impl Drop for FileWriter<'_> {
239    fn drop(&mut self) {
240        if self.clean_tmp {
241            let _ = std::fs::remove_file(&self.tmp_path);
242        }
243    }
244}