Skip to main content

reddb_server/storage/backend/
local.rs

1//! Local filesystem backend (default).
2
3use super::{
4    AtomicRemoteBackend, BackendError, BackendObjectVersion, ConditionalDelete, ConditionalPut,
5    RemoteBackend,
6};
7use crate::crypto;
8use fs2::FileExt;
9use std::fs;
10use std::fs::OpenOptions;
11use std::path::{Path, PathBuf};
12use std::sync::atomic::{AtomicU64, Ordering};
13
14/// Local filesystem backend. Copies files between paths.
15/// This is the default backend -- operates entirely on local disk.
16pub struct LocalBackend;
17
18static LOCAL_UPLOAD_TMP_COUNTER: AtomicU64 = AtomicU64::new(0);
19
20fn local_version_for(path: &Path) -> Result<Option<BackendObjectVersion>, BackendError> {
21    if !path.exists() {
22        return Ok(None);
23    }
24    let bytes = fs::read(path)
25        .map_err(|e| BackendError::Transport(format!("read for version failed: {e}")))?;
26    let hash = hex::encode(crypto::sha256::sha256(&bytes));
27    Ok(Some(BackendObjectVersion::new(format!(
28        "sha256:{}:len:{}",
29        hash,
30        bytes.len()
31    ))))
32}
33
34fn lock_path_for(dest: &Path) -> PathBuf {
35    let file_name = dest
36        .file_name()
37        .and_then(|name| name.to_str())
38        .unwrap_or("object");
39    dest.with_file_name(format!(".{file_name}.cas.lock"))
40}
41
42impl RemoteBackend for LocalBackend {
43    fn name(&self) -> &str {
44        "local"
45    }
46
47    fn download(&self, remote_key: &str, local_path: &Path) -> Result<bool, BackendError> {
48        let source = Path::new(remote_key);
49        if !source.exists() {
50            return Ok(false);
51        }
52        fs::copy(source, local_path)
53            .map_err(|e| BackendError::Transport(format!("copy failed: {e}")))?;
54        Ok(true)
55    }
56
57    fn upload(&self, local_path: &Path, remote_key: &str) -> Result<(), BackendError> {
58        let dest = Path::new(remote_key);
59        if let Some(parent) = dest.parent() {
60            fs::create_dir_all(parent)
61                .map_err(|e| BackendError::Transport(format!("mkdir failed: {e}")))?;
62        }
63        let file_name = dest
64            .file_name()
65            .and_then(|name| name.to_str())
66            .unwrap_or("object");
67        let unique = LOCAL_UPLOAD_TMP_COUNTER.fetch_add(1, Ordering::Relaxed);
68        let temp = dest.with_file_name(format!(".{file_name}.tmp-{}-{unique}", std::process::id()));
69
70        let copy_result = fs::copy(local_path, &temp)
71            .map_err(|e| BackendError::Transport(format!("copy failed: {e}")));
72        if let Err(err) = copy_result {
73            let _ = fs::remove_file(&temp);
74            return Err(err);
75        }
76        fs::File::open(&temp)
77            .and_then(|file| file.sync_all())
78            .map_err(|e| BackendError::Transport(format!("sync failed: {e}")))?;
79        fs::rename(&temp, dest).map_err(|e| {
80            let _ = fs::remove_file(&temp);
81            BackendError::Transport(format!("rename failed: {e}"))
82        })?;
83        if let Some(parent) = dest.parent() {
84            let _ = fs::File::open(parent).and_then(|dir| dir.sync_all());
85        }
86        Ok(())
87    }
88
89    fn exists(&self, remote_key: &str) -> Result<bool, BackendError> {
90        Ok(Path::new(remote_key).exists())
91    }
92
93    fn delete(&self, remote_key: &str) -> Result<(), BackendError> {
94        let path = Path::new(remote_key);
95        if path.exists() {
96            fs::remove_file(path)
97                .map_err(|e| BackendError::Transport(format!("delete failed: {e}")))?;
98        }
99        Ok(())
100    }
101
102    fn list(&self, prefix: &str) -> Result<Vec<String>, BackendError> {
103        let prefix_path = Path::new(prefix);
104        let mut results = Vec::new();
105
106        if prefix_path.is_dir() {
107            fn walk(dir: &Path, out: &mut Vec<String>) -> Result<(), BackendError> {
108                for entry in fs::read_dir(dir)
109                    .map_err(|e| BackendError::Transport(format!("read_dir failed: {e}")))?
110                {
111                    let entry = entry
112                        .map_err(|e| BackendError::Transport(format!("dir entry failed: {e}")))?;
113                    let path = entry.path();
114                    if path.is_dir() {
115                        walk(&path, out)?;
116                    } else if path.is_file() {
117                        out.push(path.to_string_lossy().to_string());
118                    }
119                }
120                Ok(())
121            }
122
123            walk(prefix_path, &mut results)?;
124        } else {
125            let parent = prefix_path.parent().unwrap_or_else(|| Path::new("."));
126            let needle = prefix_path.to_string_lossy().to_string();
127            if parent.exists() {
128                for entry in fs::read_dir(parent)
129                    .map_err(|e| BackendError::Transport(format!("read_dir failed: {e}")))?
130                {
131                    let entry = entry
132                        .map_err(|e| BackendError::Transport(format!("dir entry failed: {e}")))?;
133                    let path = entry.path();
134                    if path.is_file() {
135                        let candidate = path.to_string_lossy().to_string();
136                        if candidate.starts_with(&needle) {
137                            results.push(candidate);
138                        }
139                    }
140                }
141            }
142        }
143
144        results.sort();
145        Ok(results)
146    }
147}
148
149impl AtomicRemoteBackend for LocalBackend {
150    fn object_version(
151        &self,
152        remote_key: &str,
153    ) -> Result<Option<BackendObjectVersion>, BackendError> {
154        local_version_for(Path::new(remote_key))
155    }
156
157    fn upload_conditional(
158        &self,
159        local_path: &Path,
160        remote_key: &str,
161        condition: ConditionalPut,
162    ) -> Result<BackendObjectVersion, BackendError> {
163        let dest = Path::new(remote_key);
164        if let Some(parent) = dest.parent() {
165            fs::create_dir_all(parent)
166                .map_err(|e| BackendError::Transport(format!("mkdir failed: {e}")))?;
167        }
168        let lock_path = lock_path_for(dest);
169        let lock_file = OpenOptions::new()
170            .create(true)
171            .truncate(false)
172            .read(true)
173            .write(true)
174            .open(&lock_path)
175            .map_err(|e| BackendError::Transport(format!("open CAS lock failed: {e}")))?;
176        lock_file
177            .lock_exclusive()
178            .map_err(|e| BackendError::Transport(format!("lock CAS file failed: {e}")))?;
179
180        let observed = local_version_for(dest)?;
181        let allowed = match (&condition, &observed) {
182            (ConditionalPut::IfAbsent, None) => true,
183            (ConditionalPut::IfAbsent, Some(_)) => false,
184            (ConditionalPut::IfVersion(expected), Some(actual)) => expected == actual,
185            (ConditionalPut::IfVersion(_), None) => false,
186        };
187        if !allowed {
188            let _ = lock_file.unlock();
189            return Err(BackendError::PreconditionFailed(format!(
190                "local object '{}' changed before conditional upload",
191                remote_key
192            )));
193        }
194
195        let upload_result = self.upload(local_path, remote_key);
196        let version_result = upload_result.and_then(|_| {
197            local_version_for(dest)?.ok_or_else(|| {
198                BackendError::Internal(format!(
199                    "local object '{}' missing after conditional upload",
200                    remote_key
201                ))
202            })
203        });
204        let unlock_result = lock_file
205            .unlock()
206            .map_err(|e| BackendError::Transport(format!("unlock CAS file failed: {e}")));
207        match (version_result, unlock_result) {
208            (Ok(version), Ok(())) => Ok(version),
209            (Err(err), _) => Err(err),
210            (Ok(_), Err(err)) => Err(err),
211        }
212    }
213
214    fn delete_conditional(
215        &self,
216        remote_key: &str,
217        condition: ConditionalDelete,
218    ) -> Result<(), BackendError> {
219        let dest = Path::new(remote_key);
220        if let Some(parent) = dest.parent() {
221            fs::create_dir_all(parent)
222                .map_err(|e| BackendError::Transport(format!("mkdir failed: {e}")))?;
223        }
224        let lock_path = lock_path_for(dest);
225        let lock_file = OpenOptions::new()
226            .create(true)
227            .truncate(false)
228            .read(true)
229            .write(true)
230            .open(&lock_path)
231            .map_err(|e| BackendError::Transport(format!("open CAS lock failed: {e}")))?;
232        lock_file
233            .lock_exclusive()
234            .map_err(|e| BackendError::Transport(format!("lock CAS file failed: {e}")))?;
235
236        let observed = local_version_for(dest)?;
237        let allowed = match (&condition, &observed) {
238            (ConditionalDelete::IfVersion(expected), Some(actual)) => expected == actual,
239            (ConditionalDelete::IfVersion(_), None) => false,
240        };
241        if !allowed {
242            let _ = lock_file.unlock();
243            return Err(BackendError::PreconditionFailed(format!(
244                "local object '{}' changed before conditional delete",
245                remote_key
246            )));
247        }
248
249        let delete_result = self.delete(remote_key);
250        let unlock_result = lock_file
251            .unlock()
252            .map_err(|e| BackendError::Transport(format!("unlock CAS file failed: {e}")));
253        match (delete_result, unlock_result) {
254            (Ok(()), Ok(())) => Ok(()),
255            (Err(err), _) => Err(err),
256            (Ok(()), Err(err)) => Err(err),
257        }
258    }
259}
260
261#[cfg(test)]
262mod tests {
263    use super::*;
264
265    fn temp_file(name: &str) -> PathBuf {
266        std::env::temp_dir().join(format!(
267            "reddb-local-backend-test-{}-{}-{}",
268            name,
269            std::process::id(),
270            crate::utils::now_unix_nanos()
271        ))
272    }
273
274    #[test]
275    fn conditional_create_if_absent_rejects_existing_object() {
276        let backend = LocalBackend;
277        let src = temp_file("src");
278        let remote = temp_file("remote");
279        fs::write(&src, b"first").unwrap();
280
281        let first = backend
282            .upload_conditional(&src, remote.to_str().unwrap(), ConditionalPut::IfAbsent)
283            .unwrap();
284        assert!(first.token.contains("sha256:"));
285
286        fs::write(&src, b"second").unwrap();
287        let err = backend
288            .upload_conditional(&src, remote.to_str().unwrap(), ConditionalPut::IfAbsent)
289            .unwrap_err();
290        assert!(matches!(err, BackendError::PreconditionFailed(_)));
291
292        let _ = fs::remove_file(src);
293        let _ = fs::remove_file(remote);
294    }
295
296    #[test]
297    fn conditional_replace_rejects_stale_version() {
298        let backend = LocalBackend;
299        let src = temp_file("src");
300        let remote = temp_file("remote");
301        fs::write(&src, b"first").unwrap();
302        let stale = backend
303            .upload_conditional(&src, remote.to_str().unwrap(), ConditionalPut::IfAbsent)
304            .unwrap();
305
306        fs::write(&src, b"second").unwrap();
307        let fresh = backend
308            .upload_conditional(
309                &src,
310                remote.to_str().unwrap(),
311                ConditionalPut::IfVersion(stale.clone()),
312            )
313            .unwrap();
314
315        fs::write(&src, b"third").unwrap();
316        let err = backend
317            .upload_conditional(
318                &src,
319                remote.to_str().unwrap(),
320                ConditionalPut::IfVersion(stale),
321            )
322            .unwrap_err();
323        assert!(matches!(err, BackendError::PreconditionFailed(_)));
324        assert_eq!(
325            fresh.token,
326            backend
327                .object_version(remote.to_str().unwrap())
328                .unwrap()
329                .unwrap()
330                .token
331        );
332
333        let _ = fs::remove_file(src);
334        let _ = fs::remove_file(remote);
335    }
336
337    #[test]
338    fn conditional_delete_rejects_stale_version() {
339        let backend = LocalBackend;
340        let src = temp_file("src");
341        let remote = temp_file("remote");
342        fs::write(&src, b"first").unwrap();
343        let stale = backend
344            .upload_conditional(&src, remote.to_str().unwrap(), ConditionalPut::IfAbsent)
345            .unwrap();
346
347        fs::write(&src, b"second").unwrap();
348        let fresh = backend
349            .upload_conditional(
350                &src,
351                remote.to_str().unwrap(),
352                ConditionalPut::IfVersion(stale.clone()),
353            )
354            .unwrap();
355
356        let err = backend
357            .delete_conditional(
358                remote.to_str().unwrap(),
359                ConditionalDelete::IfVersion(stale),
360            )
361            .unwrap_err();
362        assert!(matches!(err, BackendError::PreconditionFailed(_)));
363        backend
364            .delete_conditional(
365                remote.to_str().unwrap(),
366                ConditionalDelete::IfVersion(fresh),
367            )
368            .unwrap();
369        assert!(!remote.exists());
370
371        let _ = fs::remove_file(src);
372    }
373}