reddb_server/storage/backend/
local.rs1use super::{
4 AtomicRemoteBackend, BackendError, BackendObjectVersion, ConditionalDelete, ConditionalPut,
5 RemoteBackend,
6};
7use crate::crypto;
8use fs2::FileExt;
9use std::fs;
10use std::fs::OpenOptions;
11use std::path::{Path, PathBuf};
12use std::sync::atomic::{AtomicU64, Ordering};
13
14pub struct LocalBackend;
17
18static LOCAL_UPLOAD_TMP_COUNTER: AtomicU64 = AtomicU64::new(0);
19
20fn local_version_for(path: &Path) -> Result<Option<BackendObjectVersion>, BackendError> {
21 if !path.exists() {
22 return Ok(None);
23 }
24 let bytes = fs::read(path)
25 .map_err(|e| BackendError::Transport(format!("read for version failed: {e}")))?;
26 let hash = hex::encode(crypto::sha256::sha256(&bytes));
27 Ok(Some(BackendObjectVersion::new(format!(
28 "sha256:{}:len:{}",
29 hash,
30 bytes.len()
31 ))))
32}
33
34fn lock_path_for(dest: &Path) -> PathBuf {
35 let file_name = dest
36 .file_name()
37 .and_then(|name| name.to_str())
38 .unwrap_or("object");
39 dest.with_file_name(format!(".{file_name}.cas.lock"))
40}
41
42impl RemoteBackend for LocalBackend {
43 fn name(&self) -> &str {
44 "local"
45 }
46
47 fn download(&self, remote_key: &str, local_path: &Path) -> Result<bool, BackendError> {
48 let source = Path::new(remote_key);
49 if !source.exists() {
50 return Ok(false);
51 }
52 fs::copy(source, local_path)
53 .map_err(|e| BackendError::Transport(format!("copy failed: {e}")))?;
54 Ok(true)
55 }
56
57 fn upload(&self, local_path: &Path, remote_key: &str) -> Result<(), BackendError> {
58 let dest = Path::new(remote_key);
59 if let Some(parent) = dest.parent() {
60 fs::create_dir_all(parent)
61 .map_err(|e| BackendError::Transport(format!("mkdir failed: {e}")))?;
62 }
63 let file_name = dest
64 .file_name()
65 .and_then(|name| name.to_str())
66 .unwrap_or("object");
67 let unique = LOCAL_UPLOAD_TMP_COUNTER.fetch_add(1, Ordering::Relaxed);
68 let temp = dest.with_file_name(format!(".{file_name}.tmp-{}-{unique}", std::process::id()));
69
70 let copy_result = fs::copy(local_path, &temp)
71 .map_err(|e| BackendError::Transport(format!("copy failed: {e}")));
72 if let Err(err) = copy_result {
73 let _ = fs::remove_file(&temp);
74 return Err(err);
75 }
76 fs::File::open(&temp)
77 .and_then(|file| file.sync_all())
78 .map_err(|e| BackendError::Transport(format!("sync failed: {e}")))?;
79 fs::rename(&temp, dest).map_err(|e| {
80 let _ = fs::remove_file(&temp);
81 BackendError::Transport(format!("rename failed: {e}"))
82 })?;
83 if let Some(parent) = dest.parent() {
84 let _ = fs::File::open(parent).and_then(|dir| dir.sync_all());
85 }
86 Ok(())
87 }
88
89 fn exists(&self, remote_key: &str) -> Result<bool, BackendError> {
90 Ok(Path::new(remote_key).exists())
91 }
92
93 fn delete(&self, remote_key: &str) -> Result<(), BackendError> {
94 let path = Path::new(remote_key);
95 if path.exists() {
96 fs::remove_file(path)
97 .map_err(|e| BackendError::Transport(format!("delete failed: {e}")))?;
98 }
99 Ok(())
100 }
101
102 fn list(&self, prefix: &str) -> Result<Vec<String>, BackendError> {
103 let prefix_path = Path::new(prefix);
104 let mut results = Vec::new();
105
106 if prefix_path.is_dir() {
107 fn walk(dir: &Path, out: &mut Vec<String>) -> Result<(), BackendError> {
108 for entry in fs::read_dir(dir)
109 .map_err(|e| BackendError::Transport(format!("read_dir failed: {e}")))?
110 {
111 let entry = entry
112 .map_err(|e| BackendError::Transport(format!("dir entry failed: {e}")))?;
113 let path = entry.path();
114 if path.is_dir() {
115 walk(&path, out)?;
116 } else if path.is_file() {
117 out.push(path.to_string_lossy().to_string());
118 }
119 }
120 Ok(())
121 }
122
123 walk(prefix_path, &mut results)?;
124 } else {
125 let parent = prefix_path.parent().unwrap_or_else(|| Path::new("."));
126 let needle = prefix_path.to_string_lossy().to_string();
127 if parent.exists() {
128 for entry in fs::read_dir(parent)
129 .map_err(|e| BackendError::Transport(format!("read_dir failed: {e}")))?
130 {
131 let entry = entry
132 .map_err(|e| BackendError::Transport(format!("dir entry failed: {e}")))?;
133 let path = entry.path();
134 if path.is_file() {
135 let candidate = path.to_string_lossy().to_string();
136 if candidate.starts_with(&needle) {
137 results.push(candidate);
138 }
139 }
140 }
141 }
142 }
143
144 results.sort();
145 Ok(results)
146 }
147}
148
149impl AtomicRemoteBackend for LocalBackend {
150 fn object_version(
151 &self,
152 remote_key: &str,
153 ) -> Result<Option<BackendObjectVersion>, BackendError> {
154 local_version_for(Path::new(remote_key))
155 }
156
157 fn upload_conditional(
158 &self,
159 local_path: &Path,
160 remote_key: &str,
161 condition: ConditionalPut,
162 ) -> Result<BackendObjectVersion, BackendError> {
163 let dest = Path::new(remote_key);
164 if let Some(parent) = dest.parent() {
165 fs::create_dir_all(parent)
166 .map_err(|e| BackendError::Transport(format!("mkdir failed: {e}")))?;
167 }
168 let lock_path = lock_path_for(dest);
169 let lock_file = OpenOptions::new()
170 .create(true)
171 .truncate(false)
172 .read(true)
173 .write(true)
174 .open(&lock_path)
175 .map_err(|e| BackendError::Transport(format!("open CAS lock failed: {e}")))?;
176 lock_file
177 .lock_exclusive()
178 .map_err(|e| BackendError::Transport(format!("lock CAS file failed: {e}")))?;
179
180 let observed = local_version_for(dest)?;
181 let allowed = match (&condition, &observed) {
182 (ConditionalPut::IfAbsent, None) => true,
183 (ConditionalPut::IfAbsent, Some(_)) => false,
184 (ConditionalPut::IfVersion(expected), Some(actual)) => expected == actual,
185 (ConditionalPut::IfVersion(_), None) => false,
186 };
187 if !allowed {
188 let _ = lock_file.unlock();
189 return Err(BackendError::PreconditionFailed(format!(
190 "local object '{}' changed before conditional upload",
191 remote_key
192 )));
193 }
194
195 let upload_result = self.upload(local_path, remote_key);
196 let version_result = upload_result.and_then(|_| {
197 local_version_for(dest)?.ok_or_else(|| {
198 BackendError::Internal(format!(
199 "local object '{}' missing after conditional upload",
200 remote_key
201 ))
202 })
203 });
204 let unlock_result = lock_file
205 .unlock()
206 .map_err(|e| BackendError::Transport(format!("unlock CAS file failed: {e}")));
207 match (version_result, unlock_result) {
208 (Ok(version), Ok(())) => Ok(version),
209 (Err(err), _) => Err(err),
210 (Ok(_), Err(err)) => Err(err),
211 }
212 }
213
214 fn delete_conditional(
215 &self,
216 remote_key: &str,
217 condition: ConditionalDelete,
218 ) -> Result<(), BackendError> {
219 let dest = Path::new(remote_key);
220 if let Some(parent) = dest.parent() {
221 fs::create_dir_all(parent)
222 .map_err(|e| BackendError::Transport(format!("mkdir failed: {e}")))?;
223 }
224 let lock_path = lock_path_for(dest);
225 let lock_file = OpenOptions::new()
226 .create(true)
227 .truncate(false)
228 .read(true)
229 .write(true)
230 .open(&lock_path)
231 .map_err(|e| BackendError::Transport(format!("open CAS lock failed: {e}")))?;
232 lock_file
233 .lock_exclusive()
234 .map_err(|e| BackendError::Transport(format!("lock CAS file failed: {e}")))?;
235
236 let observed = local_version_for(dest)?;
237 let allowed = match (&condition, &observed) {
238 (ConditionalDelete::IfVersion(expected), Some(actual)) => expected == actual,
239 (ConditionalDelete::IfVersion(_), None) => false,
240 };
241 if !allowed {
242 let _ = lock_file.unlock();
243 return Err(BackendError::PreconditionFailed(format!(
244 "local object '{}' changed before conditional delete",
245 remote_key
246 )));
247 }
248
249 let delete_result = self.delete(remote_key);
250 let unlock_result = lock_file
251 .unlock()
252 .map_err(|e| BackendError::Transport(format!("unlock CAS file failed: {e}")));
253 match (delete_result, unlock_result) {
254 (Ok(()), Ok(())) => Ok(()),
255 (Err(err), _) => Err(err),
256 (Ok(()), Err(err)) => Err(err),
257 }
258 }
259}
260
261#[cfg(test)]
262mod tests {
263 use super::*;
264
265 fn temp_file(name: &str) -> PathBuf {
266 std::env::temp_dir().join(format!(
267 "reddb-local-backend-test-{}-{}-{}",
268 name,
269 std::process::id(),
270 crate::utils::now_unix_nanos()
271 ))
272 }
273
274 #[test]
275 fn conditional_create_if_absent_rejects_existing_object() {
276 let backend = LocalBackend;
277 let src = temp_file("src");
278 let remote = temp_file("remote");
279 fs::write(&src, b"first").unwrap();
280
281 let first = backend
282 .upload_conditional(&src, remote.to_str().unwrap(), ConditionalPut::IfAbsent)
283 .unwrap();
284 assert!(first.token.contains("sha256:"));
285
286 fs::write(&src, b"second").unwrap();
287 let err = backend
288 .upload_conditional(&src, remote.to_str().unwrap(), ConditionalPut::IfAbsent)
289 .unwrap_err();
290 assert!(matches!(err, BackendError::PreconditionFailed(_)));
291
292 let _ = fs::remove_file(src);
293 let _ = fs::remove_file(remote);
294 }
295
296 #[test]
297 fn conditional_replace_rejects_stale_version() {
298 let backend = LocalBackend;
299 let src = temp_file("src");
300 let remote = temp_file("remote");
301 fs::write(&src, b"first").unwrap();
302 let stale = backend
303 .upload_conditional(&src, remote.to_str().unwrap(), ConditionalPut::IfAbsent)
304 .unwrap();
305
306 fs::write(&src, b"second").unwrap();
307 let fresh = backend
308 .upload_conditional(
309 &src,
310 remote.to_str().unwrap(),
311 ConditionalPut::IfVersion(stale.clone()),
312 )
313 .unwrap();
314
315 fs::write(&src, b"third").unwrap();
316 let err = backend
317 .upload_conditional(
318 &src,
319 remote.to_str().unwrap(),
320 ConditionalPut::IfVersion(stale),
321 )
322 .unwrap_err();
323 assert!(matches!(err, BackendError::PreconditionFailed(_)));
324 assert_eq!(
325 fresh.token,
326 backend
327 .object_version(remote.to_str().unwrap())
328 .unwrap()
329 .unwrap()
330 .token
331 );
332
333 let _ = fs::remove_file(src);
334 let _ = fs::remove_file(remote);
335 }
336
337 #[test]
338 fn conditional_delete_rejects_stale_version() {
339 let backend = LocalBackend;
340 let src = temp_file("src");
341 let remote = temp_file("remote");
342 fs::write(&src, b"first").unwrap();
343 let stale = backend
344 .upload_conditional(&src, remote.to_str().unwrap(), ConditionalPut::IfAbsent)
345 .unwrap();
346
347 fs::write(&src, b"second").unwrap();
348 let fresh = backend
349 .upload_conditional(
350 &src,
351 remote.to_str().unwrap(),
352 ConditionalPut::IfVersion(stale.clone()),
353 )
354 .unwrap();
355
356 let err = backend
357 .delete_conditional(
358 remote.to_str().unwrap(),
359 ConditionalDelete::IfVersion(stale),
360 )
361 .unwrap_err();
362 assert!(matches!(err, BackendError::PreconditionFailed(_)));
363 backend
364 .delete_conditional(
365 remote.to_str().unwrap(),
366 ConditionalDelete::IfVersion(fresh),
367 )
368 .unwrap();
369 assert!(!remote.exists());
370
371 let _ = fs::remove_file(src);
372 }
373}