s3s_fs/
fs.rs

1use crate::error::*;
2use crate::utils::hex;
3
4use s3s::auth::Credentials;
5use s3s::crypto::Checksum;
6use s3s::crypto::Md5;
7use s3s::dto;
8use s3s::dto::PartNumber;
9
10use std::env;
11use std::ops::Not;
12use std::path::{Path, PathBuf};
13use std::sync::atomic::{AtomicU64, Ordering};
14
15use tokio::fs;
16use tokio::fs::File;
17use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter};
18
19use path_absolutize::Absolutize;
20use uuid::Uuid;
21
/// Storage backend that keeps all of its files under a single root directory.
#[derive(Debug)]
pub struct FileSystem {
    /// Directory against which every path produced by this type is resolved.
    pub(crate) root: PathBuf,
    /// Counter whose successive values are embedded in temporary file names
    /// so that concurrent writers get distinct temporary files.
    tmp_file_counter: AtomicU64,
}
27
28pub(crate) type InternalInfo = serde_json::Map<String, serde_json::Value>;
29
30/// Stores standard object attributes alongside user metadata
31#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
32pub(crate) struct ObjectAttributes {
33    /// User-defined metadata (x-amz-meta-*)
34    #[serde(skip_serializing_if = "Option::is_none")]
35    pub user_metadata: Option<dto::Metadata>,
36
37    /// Standard object attributes
38    #[serde(skip_serializing_if = "Option::is_none")]
39    pub content_encoding: Option<String>,
40    #[serde(skip_serializing_if = "Option::is_none")]
41    pub content_type: Option<String>,
42    #[serde(skip_serializing_if = "Option::is_none")]
43    pub content_disposition: Option<String>,
44    #[serde(skip_serializing_if = "Option::is_none")]
45    pub content_language: Option<String>,
46    #[serde(skip_serializing_if = "Option::is_none")]
47    pub cache_control: Option<String>,
48    #[serde(skip_serializing_if = "Option::is_none")]
49    pub expires: Option<String>,
50    #[serde(skip_serializing_if = "Option::is_none")]
51    pub website_redirect_location: Option<String>,
52}
53
54impl ObjectAttributes {
55    /// Convert expires Timestamp to String for storage
56    pub fn set_expires_timestamp(&mut self, expires: Option<dto::Timestamp>) {
57        self.expires = expires.and_then(|ts| {
58            let mut buf = Vec::new();
59            match ts.format(dto::TimestampFormat::DateTime, &mut buf) {
60                Ok(()) => Some(String::from_utf8_lossy(&buf).into_owned()),
61                Err(_) => None,
62            }
63        });
64    }
65
66    /// Parse expires String back to Timestamp
67    pub fn get_expires_timestamp(&self) -> Option<dto::Timestamp> {
68        self.expires
69            .as_ref()
70            .and_then(|s| dto::Timestamp::parse(dto::TimestampFormat::DateTime, s).ok())
71    }
72}
73
/// Deletes leftover temporary files directly inside `root`.
///
/// A file is considered temporary when its name starts with `.tmp.` and ends
/// with `.internal.part`. Entries whose names are not valid UTF-8 are skipped.
/// A missing `root` is not an error; any other I/O failure is returned.
fn clean_old_tmp_files(root: &Path) -> std::io::Result<()> {
    let listing = match std::fs::read_dir(root) {
        Ok(listing) => listing,
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(()),
        Err(err) => return Err(err),
    };

    for item in listing {
        let item = item?;
        let os_name = item.file_name();
        // See `FileSystem::prepare_file_write`
        match os_name.to_str() {
            Some(name) if name.starts_with(".tmp.") && name.ends_with(".internal.part") => {
                std::fs::remove_file(item.path())?;
            }
            _ => {}
        }
    }

    Ok(())
}
91
92impl FileSystem {
93    pub fn new(root: impl AsRef<Path>) -> Result<Self> {
94        let root = env::current_dir()?.join(root).canonicalize()?;
95        clean_old_tmp_files(&root)?;
96        let tmp_file_counter = AtomicU64::new(0);
97        Ok(Self { root, tmp_file_counter })
98    }
99
100    pub(crate) fn resolve_abs_path(&self, path: impl AsRef<Path>) -> Result<PathBuf> {
101        Ok(path.as_ref().absolutize_virtually(&self.root)?.into_owned())
102    }
103
104    pub(crate) fn resolve_upload_part_path(&self, upload_id: Uuid, part_number: PartNumber) -> Result<PathBuf> {
105        self.resolve_abs_path(format!(".upload_id-{upload_id}.part-{part_number}"))
106    }
107
108    /// resolve object path under the virtual root
109    pub(crate) fn get_object_path(&self, bucket: &str, key: &str) -> Result<PathBuf> {
110        let dir = Path::new(&bucket);
111        let file_path = Path::new(&key);
112        self.resolve_abs_path(dir.join(file_path))
113    }
114
115    /// resolve bucket path under the virtual root
116    pub(crate) fn get_bucket_path(&self, bucket: &str) -> Result<PathBuf> {
117        let dir = Path::new(&bucket);
118        self.resolve_abs_path(dir)
119    }
120
121    /// resolve metadata path under the virtual root (custom format)
122    pub(crate) fn get_metadata_path(&self, bucket: &str, key: &str, upload_id: Option<Uuid>) -> Result<PathBuf> {
123        let encode = |s: &str| base64_simd::URL_SAFE_NO_PAD.encode_to_string(s);
124        let u_ext = upload_id.map(|u| format!(".upload-{u}")).unwrap_or_default();
125        let file_path = format!(".bucket-{}.object-{}{u_ext}.metadata.json", encode(bucket), encode(key));
126        self.resolve_abs_path(file_path)
127    }
128
129    pub(crate) fn get_internal_info_path(&self, bucket: &str, key: &str) -> Result<PathBuf> {
130        let encode = |s: &str| base64_simd::URL_SAFE_NO_PAD.encode_to_string(s);
131        let file_path = format!(".bucket-{}.object-{}.internal.json", encode(bucket), encode(key));
132        self.resolve_abs_path(file_path)
133    }
134
135    /// load object attributes from fs (with backward compatibility)
136    pub(crate) async fn load_object_attributes(
137        &self,
138        bucket: &str,
139        key: &str,
140        upload_id: Option<Uuid>,
141    ) -> Result<Option<ObjectAttributes>> {
142        let path = self.get_metadata_path(bucket, key, upload_id)?;
143        if path.exists().not() {
144            return Ok(None);
145        }
146        let content = fs::read(&path).await?;
147
148        // Try to deserialize as ObjectAttributes first (new format)
149        if let Ok(attrs) = serde_json::from_slice::<ObjectAttributes>(&content) {
150            return Ok(Some(attrs));
151        }
152
153        // Fall back to old format (just user metadata)
154        if let Ok(user_metadata) = serde_json::from_slice::<dto::Metadata>(&content) {
155            return Ok(Some(ObjectAttributes {
156                user_metadata: Some(user_metadata),
157                ..Default::default()
158            }));
159        }
160
161        Ok(None)
162    }
163
164    /// save object attributes to fs
165    pub(crate) async fn save_object_attributes(
166        &self,
167        bucket: &str,
168        key: &str,
169        attrs: &ObjectAttributes,
170        upload_id: Option<Uuid>,
171    ) -> Result<()> {
172        let path = self.get_metadata_path(bucket, key, upload_id)?;
173        let content = serde_json::to_vec(attrs)?;
174        let mut file_writer = self.prepare_file_write(&path).await?;
175        file_writer.writer().write_all(&content).await?;
176        file_writer.writer().flush().await?;
177        file_writer.done().await?;
178        Ok(())
179    }
180
181    /// remove metadata from fs
182    pub(crate) fn delete_metadata(&self, bucket: &str, key: &str, upload_id: Option<Uuid>) -> Result<()> {
183        let path = self.get_metadata_path(bucket, key, upload_id)?;
184        std::fs::remove_file(path)?;
185        Ok(())
186    }
187
188    pub(crate) async fn load_internal_info(&self, bucket: &str, key: &str) -> Result<Option<InternalInfo>> {
189        let path = self.get_internal_info_path(bucket, key)?;
190        if path.exists().not() {
191            return Ok(None);
192        }
193        let content = fs::read(&path).await?;
194        let map = serde_json::from_slice(&content)?;
195        Ok(Some(map))
196    }
197
198    pub(crate) async fn save_internal_info(&self, bucket: &str, key: &str, info: &InternalInfo) -> Result<()> {
199        let path = self.get_internal_info_path(bucket, key)?;
200        let content = serde_json::to_vec(info)?;
201        let mut file_writer = self.prepare_file_write(&path).await?;
202        file_writer.writer().write_all(&content).await?;
203        file_writer.writer().flush().await?;
204        file_writer.done().await?;
205        Ok(())
206    }
207
208    /// get md5 sum
209    pub(crate) async fn get_md5_sum(&self, bucket: &str, key: &str) -> Result<String> {
210        let object_path = self.get_object_path(bucket, key)?;
211        let mut file = File::open(&object_path).await?;
212        let mut buf = vec![0; 65536];
213        let mut md5_hash = Md5::new();
214        loop {
215            let nread = file.read(&mut buf).await?;
216            if nread == 0 {
217                break;
218            }
219            md5_hash.update(&buf[..nread]);
220        }
221        Ok(hex(md5_hash.finalize()))
222    }
223
224    fn get_upload_info_path(&self, upload_id: &Uuid) -> Result<PathBuf> {
225        self.resolve_abs_path(format!(".upload-{upload_id}.json"))
226    }
227
228    pub(crate) async fn create_upload_id(&self, cred: Option<&Credentials>) -> Result<Uuid> {
229        let upload_id = Uuid::new_v4();
230        let upload_info_path = self.get_upload_info_path(&upload_id)?;
231
232        let ak: Option<&str> = cred.map(|c| c.access_key.as_str());
233
234        let content = serde_json::to_vec(&ak)?;
235        let mut file_writer = self.prepare_file_write(&upload_info_path).await?;
236        file_writer.writer().write_all(&content).await?;
237        file_writer.writer().flush().await?;
238        file_writer.done().await?;
239
240        Ok(upload_id)
241    }
242
243    pub(crate) async fn verify_upload_id(&self, cred: Option<&Credentials>, upload_id: &Uuid) -> Result<bool> {
244        let upload_info_path = self.get_upload_info_path(upload_id)?;
245        if upload_info_path.exists().not() {
246            return Ok(false);
247        }
248
249        let content = fs::read(&upload_info_path).await?;
250        let ak: Option<String> = serde_json::from_slice(&content)?;
251
252        Ok(ak.as_deref() == cred.map(|c| c.access_key.as_str()))
253    }
254
255    pub(crate) async fn delete_upload_id(&self, upload_id: &Uuid) -> Result<()> {
256        let upload_info_path = self.get_upload_info_path(upload_id)?;
257        if upload_info_path.exists() {
258            fs::remove_file(&upload_info_path).await?;
259        }
260        Ok(())
261    }
262
263    /// Write to the filesystem atomically.
264    /// This is done by first writing to a temporary location and then moving the file.
265    pub(crate) async fn prepare_file_write<'a>(&self, path: &'a Path) -> Result<FileWriter<'a>> {
266        let tmp_name = format!(".tmp.{}.internal.part", self.tmp_file_counter.fetch_add(1, Ordering::SeqCst));
267        let tmp_path = self.resolve_abs_path(tmp_name)?;
268        let file = File::create(&tmp_path).await?;
269        let writer = BufWriter::new(file);
270        Ok(FileWriter {
271            tmp_path,
272            dest_path: path,
273            writer,
274            clean_tmp: true,
275        })
276    }
277}
278
279pub(crate) struct FileWriter<'a> {
280    tmp_path: PathBuf,
281    dest_path: &'a Path,
282    writer: BufWriter<File>,
283    clean_tmp: bool,
284}
285
286impl<'a> FileWriter<'a> {
287    pub(crate) fn tmp_path(&self) -> &Path {
288        &self.tmp_path
289    }
290
291    pub(crate) fn dest_path(&self) -> &'a Path {
292        self.dest_path
293    }
294
295    pub(crate) fn writer(&mut self) -> &mut BufWriter<File> {
296        &mut self.writer
297    }
298
299    pub(crate) async fn done(mut self) -> Result<()> {
300        if let Some(final_dir_path) = self.dest_path().parent() {
301            fs::create_dir_all(&final_dir_path).await?;
302        }
303
304        fs::rename(&self.tmp_path, self.dest_path()).await?;
305        self.clean_tmp = false;
306        Ok(())
307    }
308}
309
310impl Drop for FileWriter<'_> {
311    fn drop(&mut self) {
312        if self.clean_tmp {
313            let _ = std::fs::remove_file(&self.tmp_path);
314        }
315    }
316}