Skip to main content

reddb_server/storage/backend/
local.rs

1//! Local filesystem backend (default).
2
3use super::{
4    AtomicRemoteBackend, BackendError, BackendObjectVersion, ConditionalDelete, ConditionalPut,
5    RemoteBackend,
6};
7use crate::crypto;
8use fs2::FileExt;
9use std::fs;
10use std::fs::OpenOptions;
11use std::path::{Path, PathBuf};
12use std::sync::atomic::{AtomicU64, Ordering};
13
14/// Local filesystem backend. Copies files between paths.
15/// This is the default backend -- operates entirely on local disk.
16pub struct LocalBackend;
17
18static LOCAL_UPLOAD_TMP_COUNTER: AtomicU64 = AtomicU64::new(0);
19
20fn local_version_for(path: &Path) -> Result<Option<BackendObjectVersion>, BackendError> {
21    if !path.exists() {
22        return Ok(None);
23    }
24    let bytes = fs::read(path)
25        .map_err(|e| BackendError::Transport(format!("read for version failed: {e}")))?;
26    let hash = hex::encode(crypto::sha256::sha256(&bytes));
27    Ok(Some(BackendObjectVersion::new(format!(
28        "sha256:{}:len:{}",
29        hash,
30        bytes.len()
31    ))))
32}
33
34fn lock_path_for(dest: &Path) -> PathBuf {
35    reddb_file::layout::local_cas_lock_path(dest)
36}
37
38pub(crate) fn local_cas_lock_path_for(dest: &Path) -> PathBuf {
39    lock_path_for(dest)
40}
41
42impl RemoteBackend for LocalBackend {
43    fn name(&self) -> &str {
44        "local"
45    }
46
47    fn download(&self, remote_key: &str, local_path: &Path) -> Result<bool, BackendError> {
48        reddb_file::local_backend_download(remote_key, local_path)
49            .map_err(|e| BackendError::Transport(format!("local download failed: {e}")))
50    }
51
52    fn upload(&self, local_path: &Path, remote_key: &str) -> Result<(), BackendError> {
53        let unique = LOCAL_UPLOAD_TMP_COUNTER.fetch_add(1, Ordering::Relaxed);
54        reddb_file::local_backend_atomic_upload(local_path, remote_key, std::process::id(), unique)
55            .map_err(|e| BackendError::Transport(format!("local upload failed: {e}")))
56    }
57
58    fn exists(&self, remote_key: &str) -> Result<bool, BackendError> {
59        Ok(Path::new(remote_key).exists())
60    }
61
62    fn delete(&self, remote_key: &str) -> Result<(), BackendError> {
63        let path = Path::new(remote_key);
64        if path.exists() {
65            fs::remove_file(path)
66                .map_err(|e| BackendError::Transport(format!("delete failed: {e}")))?;
67        }
68        Ok(())
69    }
70
71    fn list(&self, prefix: &str) -> Result<Vec<String>, BackendError> {
72        let prefix_path = Path::new(prefix);
73        let mut results = Vec::new();
74
75        if prefix_path.is_dir() {
76            fn walk(dir: &Path, out: &mut Vec<String>) -> Result<(), BackendError> {
77                for entry in fs::read_dir(dir)
78                    .map_err(|e| BackendError::Transport(format!("read_dir failed: {e}")))?
79                {
80                    let entry = entry
81                        .map_err(|e| BackendError::Transport(format!("dir entry failed: {e}")))?;
82                    let path = entry.path();
83                    if path.is_dir() {
84                        walk(&path, out)?;
85                    } else if path.is_file() {
86                        out.push(path.to_string_lossy().to_string());
87                    }
88                }
89                Ok(())
90            }
91
92            walk(prefix_path, &mut results)?;
93        } else {
94            let parent = prefix_path.parent().unwrap_or_else(|| Path::new("."));
95            let needle = prefix_path.to_string_lossy().to_string();
96            if parent.exists() {
97                for entry in fs::read_dir(parent)
98                    .map_err(|e| BackendError::Transport(format!("read_dir failed: {e}")))?
99                {
100                    let entry = entry
101                        .map_err(|e| BackendError::Transport(format!("dir entry failed: {e}")))?;
102                    let path = entry.path();
103                    if path.is_file() {
104                        let candidate = path.to_string_lossy().to_string();
105                        if candidate.starts_with(&needle) {
106                            results.push(candidate);
107                        }
108                    }
109                }
110            }
111        }
112
113        results.sort();
114        Ok(results)
115    }
116}
117
118impl AtomicRemoteBackend for LocalBackend {
119    fn object_version(
120        &self,
121        remote_key: &str,
122    ) -> Result<Option<BackendObjectVersion>, BackendError> {
123        local_version_for(Path::new(remote_key))
124    }
125
126    fn upload_conditional(
127        &self,
128        local_path: &Path,
129        remote_key: &str,
130        condition: ConditionalPut,
131    ) -> Result<BackendObjectVersion, BackendError> {
132        let dest = Path::new(remote_key);
133        if let Some(parent) = dest.parent() {
134            fs::create_dir_all(parent)
135                .map_err(|e| BackendError::Transport(format!("mkdir failed: {e}")))?;
136        }
137        let lock_path = lock_path_for(dest);
138        let lock_file = OpenOptions::new()
139            .create(true)
140            .truncate(false)
141            .read(true)
142            .write(true)
143            .open(&lock_path)
144            .map_err(|e| BackendError::Transport(format!("open CAS lock failed: {e}")))?;
145        lock_file
146            .lock_exclusive()
147            .map_err(|e| BackendError::Transport(format!("lock CAS file failed: {e}")))?;
148
149        let observed = local_version_for(dest)?;
150        let allowed = match (&condition, &observed) {
151            (ConditionalPut::IfAbsent, None) => true,
152            (ConditionalPut::IfAbsent, Some(_)) => false,
153            (ConditionalPut::IfVersion(expected), Some(actual)) => expected == actual,
154            (ConditionalPut::IfVersion(_), None) => false,
155        };
156        if !allowed {
157            let _ = lock_file.unlock();
158            return Err(BackendError::PreconditionFailed(format!(
159                "local object '{}' changed before conditional upload",
160                remote_key
161            )));
162        }
163
164        let upload_result = self.upload(local_path, remote_key);
165        let version_result = upload_result.and_then(|_| {
166            local_version_for(dest)?.ok_or_else(|| {
167                BackendError::Internal(format!(
168                    "local object '{}' missing after conditional upload",
169                    remote_key
170                ))
171            })
172        });
173        let unlock_result = lock_file
174            .unlock()
175            .map_err(|e| BackendError::Transport(format!("unlock CAS file failed: {e}")));
176        match (version_result, unlock_result) {
177            (Ok(version), Ok(())) => Ok(version),
178            (Err(err), _) => Err(err),
179            (Ok(_), Err(err)) => Err(err),
180        }
181    }
182
183    fn delete_conditional(
184        &self,
185        remote_key: &str,
186        condition: ConditionalDelete,
187    ) -> Result<(), BackendError> {
188        let dest = Path::new(remote_key);
189        if let Some(parent) = dest.parent() {
190            fs::create_dir_all(parent)
191                .map_err(|e| BackendError::Transport(format!("mkdir failed: {e}")))?;
192        }
193        let lock_path = lock_path_for(dest);
194        let lock_file = OpenOptions::new()
195            .create(true)
196            .truncate(false)
197            .read(true)
198            .write(true)
199            .open(&lock_path)
200            .map_err(|e| BackendError::Transport(format!("open CAS lock failed: {e}")))?;
201        lock_file
202            .lock_exclusive()
203            .map_err(|e| BackendError::Transport(format!("lock CAS file failed: {e}")))?;
204
205        let observed = local_version_for(dest)?;
206        let allowed = match (&condition, &observed) {
207            (ConditionalDelete::IfVersion(expected), Some(actual)) => expected == actual,
208            (ConditionalDelete::IfVersion(_), None) => false,
209        };
210        if !allowed {
211            let _ = lock_file.unlock();
212            return Err(BackendError::PreconditionFailed(format!(
213                "local object '{}' changed before conditional delete",
214                remote_key
215            )));
216        }
217
218        let delete_result = self.delete(remote_key);
219        let unlock_result = lock_file
220            .unlock()
221            .map_err(|e| BackendError::Transport(format!("unlock CAS file failed: {e}")));
222        match (delete_result, unlock_result) {
223            (Ok(()), Ok(())) => Ok(()),
224            (Err(err), _) => Err(err),
225            (Ok(()), Err(err)) => Err(err),
226        }
227    }
228}
229
230#[cfg(test)]
231mod tests {
232    use super::*;
233
234    fn temp_file(name: &str) -> PathBuf {
235        std::env::temp_dir().join(format!(
236            "reddb-local-backend-test-{}-{}-{}",
237            name,
238            std::process::id(),
239            crate::utils::now_unix_nanos()
240        ))
241    }
242
243    #[test]
244    fn conditional_create_if_absent_rejects_existing_object() {
245        let backend = LocalBackend;
246        let src = temp_file("src");
247        let remote = temp_file("remote");
248        fs::write(&src, b"first").unwrap();
249
250        let first = backend
251            .upload_conditional(&src, remote.to_str().unwrap(), ConditionalPut::IfAbsent)
252            .unwrap();
253        assert!(first.token.contains("sha256:"));
254
255        fs::write(&src, b"second").unwrap();
256        let err = backend
257            .upload_conditional(&src, remote.to_str().unwrap(), ConditionalPut::IfAbsent)
258            .unwrap_err();
259        assert!(matches!(err, BackendError::PreconditionFailed(_)));
260
261        let _ = fs::remove_file(src);
262        let _ = fs::remove_file(remote);
263    }
264
265    #[test]
266    fn conditional_replace_rejects_stale_version() {
267        let backend = LocalBackend;
268        let src = temp_file("src");
269        let remote = temp_file("remote");
270        fs::write(&src, b"first").unwrap();
271        let stale = backend
272            .upload_conditional(&src, remote.to_str().unwrap(), ConditionalPut::IfAbsent)
273            .unwrap();
274
275        fs::write(&src, b"second").unwrap();
276        let fresh = backend
277            .upload_conditional(
278                &src,
279                remote.to_str().unwrap(),
280                ConditionalPut::IfVersion(stale.clone()),
281            )
282            .unwrap();
283
284        fs::write(&src, b"third").unwrap();
285        let err = backend
286            .upload_conditional(
287                &src,
288                remote.to_str().unwrap(),
289                ConditionalPut::IfVersion(stale),
290            )
291            .unwrap_err();
292        assert!(matches!(err, BackendError::PreconditionFailed(_)));
293        assert_eq!(
294            fresh.token,
295            backend
296                .object_version(remote.to_str().unwrap())
297                .unwrap()
298                .unwrap()
299                .token
300        );
301
302        let _ = fs::remove_file(src);
303        let _ = fs::remove_file(remote);
304    }
305
306    #[test]
307    fn conditional_delete_rejects_stale_version() {
308        let backend = LocalBackend;
309        let src = temp_file("src");
310        let remote = temp_file("remote");
311        fs::write(&src, b"first").unwrap();
312        let stale = backend
313            .upload_conditional(&src, remote.to_str().unwrap(), ConditionalPut::IfAbsent)
314            .unwrap();
315
316        fs::write(&src, b"second").unwrap();
317        let fresh = backend
318            .upload_conditional(
319                &src,
320                remote.to_str().unwrap(),
321                ConditionalPut::IfVersion(stale.clone()),
322            )
323            .unwrap();
324
325        let err = backend
326            .delete_conditional(
327                remote.to_str().unwrap(),
328                ConditionalDelete::IfVersion(stale),
329            )
330            .unwrap_err();
331        assert!(matches!(err, BackendError::PreconditionFailed(_)));
332        backend
333            .delete_conditional(
334                remote.to_str().unwrap(),
335                ConditionalDelete::IfVersion(fresh),
336            )
337            .unwrap();
338        assert!(!remote.exists());
339
340        let _ = fs::remove_file(src);
341    }
342}