1use crate::error::*;
2use crate::utils::hex;
3
4use s3s::auth::Credentials;
5use s3s::dto;
6
7use std::env;
8use std::ops::Not;
9use std::path::{Path, PathBuf};
10use std::sync::atomic::{AtomicU64, Ordering};
11
12use tokio::fs;
13use tokio::fs::File;
14use tokio::io::{AsyncReadExt, BufWriter};
15
16use md5::{Digest, Md5};
17use path_absolutize::Absolutize;
18use s3s::dto::PartNumber;
19use uuid::Uuid;
20
/// Storage backend that keeps buckets, objects and their bookkeeping files
/// inside a single directory on the local file system.
#[derive(Debug)]
pub struct FileSystem {
    /// Directory that every path handed out by this type is resolved against.
    pub(crate) root: PathBuf,
    /// Monotonic counter used to give each temporary file a distinct name.
    tmp_file_counter: AtomicU64,
}
26
27pub(crate) type InternalInfo = serde_json::Map<String, serde_json::Value>;
28
/// Deletes leftover temporary files found directly inside `root`.
///
/// Only the immediate children of `root` are inspected. An entry is removed
/// when its file name starts with `.tmp.` and ends with `.internal.part`;
/// entries whose names are not valid UTF-8 are skipped.
///
/// # Errors
///
/// Returns any I/O error raised while listing the directory or removing a
/// matching entry. A `root` that does not exist is not an error, since there
/// is nothing to clean up.
fn clean_old_tmp_files(root: &Path) -> std::io::Result<()> {
    const TMP_PREFIX: &str = ".tmp.";
    const TMP_SUFFIX: &str = ".internal.part";

    let listing = match std::fs::read_dir(root) {
        Ok(listing) => listing,
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(()),
        Err(err) => return Err(err),
    };

    for item in listing {
        let item = item?;
        let raw_name = item.file_name();
        let is_stale_tmp = matches!(
            raw_name.to_str(),
            Some(name) if name.starts_with(TMP_PREFIX) && name.ends_with(TMP_SUFFIX)
        );
        if is_stale_tmp {
            std::fs::remove_file(item.path())?;
        }
    }

    Ok(())
}
46
47impl FileSystem {
48 pub fn new(root: impl AsRef<Path>) -> Result<Self> {
49 let root = env::current_dir()?.join(root).canonicalize()?;
50 clean_old_tmp_files(&root)?;
51 let tmp_file_counter = AtomicU64::new(0);
52 Ok(Self { root, tmp_file_counter })
53 }
54
55 pub(crate) fn resolve_abs_path(&self, path: impl AsRef<Path>) -> Result<PathBuf> {
56 Ok(path.as_ref().absolutize_virtually(&self.root)?.into_owned())
57 }
58
59 pub(crate) fn resolve_upload_part_path(&self, upload_id: Uuid, part_number: PartNumber) -> Result<PathBuf> {
60 self.resolve_abs_path(format!(".upload_id-{upload_id}.part-{part_number}"))
61 }
62
63 pub(crate) fn get_object_path(&self, bucket: &str, key: &str) -> Result<PathBuf> {
65 let dir = Path::new(&bucket);
66 let file_path = Path::new(&key);
67 self.resolve_abs_path(dir.join(file_path))
68 }
69
70 pub(crate) fn get_bucket_path(&self, bucket: &str) -> Result<PathBuf> {
72 let dir = Path::new(&bucket);
73 self.resolve_abs_path(dir)
74 }
75
76 pub(crate) fn get_metadata_path(&self, bucket: &str, key: &str, upload_id: Option<Uuid>) -> Result<PathBuf> {
78 let encode = |s: &str| base64_simd::URL_SAFE_NO_PAD.encode_to_string(s);
79 let u_ext = upload_id.map(|u| format!(".upload-{u}")).unwrap_or_default();
80 let file_path = format!(".bucket-{}.object-{}{u_ext}.metadata.json", encode(bucket), encode(key));
81 self.resolve_abs_path(file_path)
82 }
83
84 pub(crate) fn get_internal_info_path(&self, bucket: &str, key: &str) -> Result<PathBuf> {
85 let encode = |s: &str| base64_simd::URL_SAFE_NO_PAD.encode_to_string(s);
86 let file_path = format!(".bucket-{}.object-{}.internal.json", encode(bucket), encode(key));
87 self.resolve_abs_path(file_path)
88 }
89
90 pub(crate) async fn load_metadata(&self, bucket: &str, key: &str, upload_id: Option<Uuid>) -> Result<Option<dto::Metadata>> {
92 let path = self.get_metadata_path(bucket, key, upload_id)?;
93 if path.exists().not() {
94 return Ok(None);
95 }
96 let content = fs::read(&path).await?;
97 let map = serde_json::from_slice(&content)?;
98 Ok(Some(map))
99 }
100
101 pub(crate) async fn save_metadata(
103 &self,
104 bucket: &str,
105 key: &str,
106 metadata: &dto::Metadata,
107 upload_id: Option<Uuid>,
108 ) -> Result<()> {
109 let path = self.get_metadata_path(bucket, key, upload_id)?;
110 let content = serde_json::to_vec(metadata)?;
111 fs::write(&path, &content).await?;
112 Ok(())
113 }
114
115 pub(crate) fn delete_metadata(&self, bucket: &str, key: &str, upload_id: Option<Uuid>) -> Result<()> {
117 let path = self.get_metadata_path(bucket, key, upload_id)?;
118 std::fs::remove_file(path)?;
119 Ok(())
120 }
121
122 pub(crate) async fn load_internal_info(&self, bucket: &str, key: &str) -> Result<Option<InternalInfo>> {
123 let path = self.get_internal_info_path(bucket, key)?;
124 if path.exists().not() {
125 return Ok(None);
126 }
127 let content = fs::read(&path).await?;
128 let map = serde_json::from_slice(&content)?;
129 Ok(Some(map))
130 }
131
132 pub(crate) async fn save_internal_info(&self, bucket: &str, key: &str, info: &InternalInfo) -> Result<()> {
133 let path = self.get_internal_info_path(bucket, key)?;
134 let content = serde_json::to_vec(info)?;
135 fs::write(&path, &content).await?;
136 Ok(())
137 }
138
139 pub(crate) async fn get_md5_sum(&self, bucket: &str, key: &str) -> Result<String> {
141 let object_path = self.get_object_path(bucket, key)?;
142 let mut file = File::open(&object_path).await?;
143 let mut buf = vec![0; 65536];
144 let mut md5_hash = Md5::new();
145 loop {
146 let nread = file.read(&mut buf).await?;
147 if nread == 0 {
148 break;
149 }
150 md5_hash.update(&buf[..nread]);
151 }
152 Ok(hex(md5_hash.finalize()))
153 }
154
155 fn get_upload_info_path(&self, upload_id: &Uuid) -> Result<PathBuf> {
156 self.resolve_abs_path(format!(".upload-{upload_id}.json"))
157 }
158
159 pub(crate) async fn create_upload_id(&self, cred: Option<&Credentials>) -> Result<Uuid> {
160 let upload_id = Uuid::new_v4();
161 let upload_info_path = self.get_upload_info_path(&upload_id)?;
162
163 let ak: Option<&str> = cred.map(|c| c.access_key.as_str());
164
165 let content = serde_json::to_vec(&ak)?;
166 fs::write(&upload_info_path, &content).await?;
167
168 Ok(upload_id)
169 }
170
171 pub(crate) async fn verify_upload_id(&self, cred: Option<&Credentials>, upload_id: &Uuid) -> Result<bool> {
172 let upload_info_path = self.get_upload_info_path(upload_id)?;
173 if upload_info_path.exists().not() {
174 return Ok(false);
175 }
176
177 let content = fs::read(&upload_info_path).await?;
178 let ak: Option<String> = serde_json::from_slice(&content)?;
179
180 Ok(ak.as_deref() == cred.map(|c| c.access_key.as_str()))
181 }
182
183 pub(crate) async fn delete_upload_id(&self, upload_id: &Uuid) -> Result<()> {
184 let upload_info_path = self.get_upload_info_path(upload_id)?;
185 if upload_info_path.exists() {
186 fs::remove_file(&upload_info_path).await?;
187 }
188 Ok(())
189 }
190
191 pub(crate) async fn prepare_file_write<'a>(&self, path: &'a Path) -> Result<FileWriter<'a>> {
194 let tmp_name = format!(".tmp.{}.internal.part", self.tmp_file_counter.fetch_add(1, Ordering::SeqCst));
195 let tmp_path = self.resolve_abs_path(tmp_name)?;
196 let file = File::create(&tmp_path).await?;
197 let writer = BufWriter::new(file);
198 Ok(FileWriter {
199 tmp_path,
200 dest_path: path,
201 writer,
202 clean_tmp: true,
203 })
204 }
205}
206
207pub(crate) struct FileWriter<'a> {
208 tmp_path: PathBuf,
209 dest_path: &'a Path,
210 writer: BufWriter<File>,
211 clean_tmp: bool,
212}
213
214impl<'a> FileWriter<'a> {
215 pub(crate) fn tmp_path(&self) -> &Path {
216 &self.tmp_path
217 }
218
219 pub(crate) fn dest_path(&self) -> &'a Path {
220 self.dest_path
221 }
222
223 pub(crate) fn writer(&mut self) -> &mut BufWriter<File> {
224 &mut self.writer
225 }
226
227 pub(crate) async fn done(mut self) -> Result<()> {
228 if let Some(final_dir_path) = self.dest_path().parent() {
229 fs::create_dir_all(&final_dir_path).await?;
230 }
231
232 fs::rename(&self.tmp_path, self.dest_path()).await?;
233 self.clean_tmp = false;
234 Ok(())
235 }
236}
237
238impl Drop for FileWriter<'_> {
239 fn drop(&mut self) {
240 if self.clean_tmp {
241 let _ = std::fs::remove_file(&self.tmp_path);
242 }
243 }
244}