reddb_server/storage/backend/
local.rs1use super::{
4 AtomicRemoteBackend, BackendError, BackendObjectVersion, ConditionalDelete, ConditionalPut,
5 RemoteBackend,
6};
7use crate::crypto;
8use fs2::FileExt;
9use std::fs;
10use std::fs::OpenOptions;
11use std::path::{Path, PathBuf};
12use std::sync::atomic::{AtomicU64, Ordering};
13
/// Storage backend whose "remote" keys are interpreted as paths on the local filesystem.
pub struct LocalBackend;
17
/// Process-wide counter. `upload` takes the next value on every call and passes it, together
/// with the process id, to `reddb_file::local_backend_atomic_upload` (presumably to derive a
/// unique temporary file name — confirm against that helper).
static LOCAL_UPLOAD_TMP_COUNTER: AtomicU64 = AtomicU64::new(0);
19
20fn local_version_for(path: &Path) -> Result<Option<BackendObjectVersion>, BackendError> {
21 if !path.exists() {
22 return Ok(None);
23 }
24 let bytes = fs::read(path)
25 .map_err(|e| BackendError::Transport(format!("read for version failed: {e}")))?;
26 let hash = hex::encode(crypto::sha256::sha256(&bytes));
27 Ok(Some(BackendObjectVersion::new(format!(
28 "sha256:{}:len:{}",
29 hash,
30 bytes.len()
31 ))))
32}
33
/// Returns the path of the lock file associated with `dest`, as chosen by
/// `reddb_file::layout::local_cas_lock_path`.
fn lock_path_for(dest: &Path) -> PathBuf {
    reddb_file::layout::local_cas_lock_path(dest)
}
37
/// Crate-visible wrapper around `lock_path_for`: the lock-file path associated with `dest`.
pub(crate) fn local_cas_lock_path_for(dest: &Path) -> PathBuf {
    lock_path_for(dest)
}
41
42impl RemoteBackend for LocalBackend {
43 fn name(&self) -> &str {
44 "local"
45 }
46
47 fn download(&self, remote_key: &str, local_path: &Path) -> Result<bool, BackendError> {
48 reddb_file::local_backend_download(remote_key, local_path)
49 .map_err(|e| BackendError::Transport(format!("local download failed: {e}")))
50 }
51
52 fn upload(&self, local_path: &Path, remote_key: &str) -> Result<(), BackendError> {
53 let unique = LOCAL_UPLOAD_TMP_COUNTER.fetch_add(1, Ordering::Relaxed);
54 reddb_file::local_backend_atomic_upload(local_path, remote_key, std::process::id(), unique)
55 .map_err(|e| BackendError::Transport(format!("local upload failed: {e}")))
56 }
57
58 fn exists(&self, remote_key: &str) -> Result<bool, BackendError> {
59 Ok(Path::new(remote_key).exists())
60 }
61
62 fn delete(&self, remote_key: &str) -> Result<(), BackendError> {
63 let path = Path::new(remote_key);
64 if path.exists() {
65 fs::remove_file(path)
66 .map_err(|e| BackendError::Transport(format!("delete failed: {e}")))?;
67 }
68 Ok(())
69 }
70
71 fn list(&self, prefix: &str) -> Result<Vec<String>, BackendError> {
72 let prefix_path = Path::new(prefix);
73 let mut results = Vec::new();
74
75 if prefix_path.is_dir() {
76 fn walk(dir: &Path, out: &mut Vec<String>) -> Result<(), BackendError> {
77 for entry in fs::read_dir(dir)
78 .map_err(|e| BackendError::Transport(format!("read_dir failed: {e}")))?
79 {
80 let entry = entry
81 .map_err(|e| BackendError::Transport(format!("dir entry failed: {e}")))?;
82 let path = entry.path();
83 if path.is_dir() {
84 walk(&path, out)?;
85 } else if path.is_file() {
86 out.push(path.to_string_lossy().to_string());
87 }
88 }
89 Ok(())
90 }
91
92 walk(prefix_path, &mut results)?;
93 } else {
94 let parent = prefix_path.parent().unwrap_or_else(|| Path::new("."));
95 let needle = prefix_path.to_string_lossy().to_string();
96 if parent.exists() {
97 for entry in fs::read_dir(parent)
98 .map_err(|e| BackendError::Transport(format!("read_dir failed: {e}")))?
99 {
100 let entry = entry
101 .map_err(|e| BackendError::Transport(format!("dir entry failed: {e}")))?;
102 let path = entry.path();
103 if path.is_file() {
104 let candidate = path.to_string_lossy().to_string();
105 if candidate.starts_with(&needle) {
106 results.push(candidate);
107 }
108 }
109 }
110 }
111 }
112
113 results.sort();
114 Ok(results)
115 }
116}
117
118impl AtomicRemoteBackend for LocalBackend {
119 fn object_version(
120 &self,
121 remote_key: &str,
122 ) -> Result<Option<BackendObjectVersion>, BackendError> {
123 local_version_for(Path::new(remote_key))
124 }
125
126 fn upload_conditional(
127 &self,
128 local_path: &Path,
129 remote_key: &str,
130 condition: ConditionalPut,
131 ) -> Result<BackendObjectVersion, BackendError> {
132 let dest = Path::new(remote_key);
133 if let Some(parent) = dest.parent() {
134 fs::create_dir_all(parent)
135 .map_err(|e| BackendError::Transport(format!("mkdir failed: {e}")))?;
136 }
137 let lock_path = lock_path_for(dest);
138 let lock_file = OpenOptions::new()
139 .create(true)
140 .truncate(false)
141 .read(true)
142 .write(true)
143 .open(&lock_path)
144 .map_err(|e| BackendError::Transport(format!("open CAS lock failed: {e}")))?;
145 lock_file
146 .lock_exclusive()
147 .map_err(|e| BackendError::Transport(format!("lock CAS file failed: {e}")))?;
148
149 let observed = local_version_for(dest)?;
150 let allowed = match (&condition, &observed) {
151 (ConditionalPut::IfAbsent, None) => true,
152 (ConditionalPut::IfAbsent, Some(_)) => false,
153 (ConditionalPut::IfVersion(expected), Some(actual)) => expected == actual,
154 (ConditionalPut::IfVersion(_), None) => false,
155 };
156 if !allowed {
157 let _ = lock_file.unlock();
158 return Err(BackendError::PreconditionFailed(format!(
159 "local object '{}' changed before conditional upload",
160 remote_key
161 )));
162 }
163
164 let upload_result = self.upload(local_path, remote_key);
165 let version_result = upload_result.and_then(|_| {
166 local_version_for(dest)?.ok_or_else(|| {
167 BackendError::Internal(format!(
168 "local object '{}' missing after conditional upload",
169 remote_key
170 ))
171 })
172 });
173 let unlock_result = lock_file
174 .unlock()
175 .map_err(|e| BackendError::Transport(format!("unlock CAS file failed: {e}")));
176 match (version_result, unlock_result) {
177 (Ok(version), Ok(())) => Ok(version),
178 (Err(err), _) => Err(err),
179 (Ok(_), Err(err)) => Err(err),
180 }
181 }
182
183 fn delete_conditional(
184 &self,
185 remote_key: &str,
186 condition: ConditionalDelete,
187 ) -> Result<(), BackendError> {
188 let dest = Path::new(remote_key);
189 if let Some(parent) = dest.parent() {
190 fs::create_dir_all(parent)
191 .map_err(|e| BackendError::Transport(format!("mkdir failed: {e}")))?;
192 }
193 let lock_path = lock_path_for(dest);
194 let lock_file = OpenOptions::new()
195 .create(true)
196 .truncate(false)
197 .read(true)
198 .write(true)
199 .open(&lock_path)
200 .map_err(|e| BackendError::Transport(format!("open CAS lock failed: {e}")))?;
201 lock_file
202 .lock_exclusive()
203 .map_err(|e| BackendError::Transport(format!("lock CAS file failed: {e}")))?;
204
205 let observed = local_version_for(dest)?;
206 let allowed = match (&condition, &observed) {
207 (ConditionalDelete::IfVersion(expected), Some(actual)) => expected == actual,
208 (ConditionalDelete::IfVersion(_), None) => false,
209 };
210 if !allowed {
211 let _ = lock_file.unlock();
212 return Err(BackendError::PreconditionFailed(format!(
213 "local object '{}' changed before conditional delete",
214 remote_key
215 )));
216 }
217
218 let delete_result = self.delete(remote_key);
219 let unlock_result = lock_file
220 .unlock()
221 .map_err(|e| BackendError::Transport(format!("unlock CAS file failed: {e}")));
222 match (delete_result, unlock_result) {
223 (Ok(()), Ok(())) => Ok(()),
224 (Err(err), _) => Err(err),
225 (Ok(()), Err(err)) => Err(err),
226 }
227 }
228}
229
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a path in the system temp directory that is unique per label, process, and
    /// call time.
    fn temp_file(name: &str) -> PathBuf {
        std::env::temp_dir().join(format!(
            "reddb-local-backend-test-{}-{}-{}",
            name,
            std::process::id(),
            crate::utils::now_unix_nanos()
        ))
    }

    /// Source and remote scratch paths for one test.
    ///
    /// On drop — including when an assertion panics — the source file, the remote object,
    /// and the remote's CAS lock file are removed so tests do not litter the temp directory.
    struct Scratch {
        src: PathBuf,
        remote: PathBuf,
    }

    impl Scratch {
        /// Allocates fresh (not yet existing) source and remote paths.
        fn new() -> Self {
            Self {
                src: temp_file("src"),
                remote: temp_file("remote"),
            }
        }

        /// The remote path as the string key expected by the backend API.
        fn remote_key(&self) -> &str {
            self.remote
                .to_str()
                .expect("temp path should be valid UTF-8")
        }
    }

    impl Drop for Scratch {
        fn drop(&mut self) {
            // Best-effort cleanup: any of these may legitimately not exist.
            let _ = fs::remove_file(&self.src);
            let _ = fs::remove_file(&self.remote);
            let _ = fs::remove_file(local_cas_lock_path_for(&self.remote));
        }
    }

    #[test]
    fn conditional_create_if_absent_rejects_existing_object() {
        let backend = LocalBackend;
        let scratch = Scratch::new();
        fs::write(&scratch.src, b"first").unwrap();

        let first = backend
            .upload_conditional(&scratch.src, scratch.remote_key(), ConditionalPut::IfAbsent)
            .unwrap();
        assert!(first.token.contains("sha256:"));

        fs::write(&scratch.src, b"second").unwrap();
        let err = backend
            .upload_conditional(&scratch.src, scratch.remote_key(), ConditionalPut::IfAbsent)
            .unwrap_err();
        assert!(matches!(err, BackendError::PreconditionFailed(_)));
    }

    #[test]
    fn conditional_replace_rejects_stale_version() {
        let backend = LocalBackend;
        let scratch = Scratch::new();
        fs::write(&scratch.src, b"first").unwrap();
        let stale = backend
            .upload_conditional(&scratch.src, scratch.remote_key(), ConditionalPut::IfAbsent)
            .unwrap();

        fs::write(&scratch.src, b"second").unwrap();
        let fresh = backend
            .upload_conditional(
                &scratch.src,
                scratch.remote_key(),
                ConditionalPut::IfVersion(stale.clone()),
            )
            .unwrap();

        fs::write(&scratch.src, b"third").unwrap();
        let err = backend
            .upload_conditional(
                &scratch.src,
                scratch.remote_key(),
                ConditionalPut::IfVersion(stale),
            )
            .unwrap_err();
        assert!(matches!(err, BackendError::PreconditionFailed(_)));
        // The rejected upload must not have replaced the object.
        assert_eq!(
            fresh.token,
            backend
                .object_version(scratch.remote_key())
                .unwrap()
                .unwrap()
                .token
        );
    }

    #[test]
    fn conditional_delete_rejects_stale_version() {
        let backend = LocalBackend;
        let scratch = Scratch::new();
        fs::write(&scratch.src, b"first").unwrap();
        let stale = backend
            .upload_conditional(&scratch.src, scratch.remote_key(), ConditionalPut::IfAbsent)
            .unwrap();

        fs::write(&scratch.src, b"second").unwrap();
        let fresh = backend
            .upload_conditional(
                &scratch.src,
                scratch.remote_key(),
                ConditionalPut::IfVersion(stale.clone()),
            )
            .unwrap();

        let err = backend
            .delete_conditional(scratch.remote_key(), ConditionalDelete::IfVersion(stale))
            .unwrap_err();
        assert!(matches!(err, BackendError::PreconditionFailed(_)));
        backend
            .delete_conditional(scratch.remote_key(), ConditionalDelete::IfVersion(fresh))
            .unwrap();
        assert!(!scratch.remote.exists());
    }
}