Skip to main content

edgestore_repl/
filesystem_remote_store.rs

1//! FilesystemRemoteStore — local-filesystem implementation of `RemoteStore`.
2//!
3//! Files are content-addressed: named `{hash_hex}.seg` where `hash_hex` is the
4//! 64-character lowercase hex encoding of the 32-byte BLAKE3 hash. Listing the
5//! directory is equivalent to listing stored segments.
6//!
7//! This is the Phase 4 implementation (Plan 04-04, D04). Real S3 (`S3RemoteStore`)
8//! is a future phase deliverable.
9
10use std::path::PathBuf;
11
12use edgestore::error::EdgestoreError;
13use edgestore::RemoteStore;
14
/// Local-filesystem implementation of `RemoteStore`.
///
/// Every segment lives directly inside `base_dir` as `{hash_hex}.seg`.
///
/// All operations are idempotent and atomic where applicable.
/// `upload` uses a `.tmp` write + rename to prevent torn writes (T-04-10).
///
/// The type is `Debug` so it can appear in logs and assertion output, and
/// `Clone` so several handles can point at the same directory; cloning copies
/// only the path, never the stored segments.
#[derive(Debug, Clone)]
pub struct FilesystemRemoteStore {
    /// Directory that holds the `{hash_hex}.seg` files.
    base_dir: PathBuf,
}
22
23impl FilesystemRemoteStore {
24    /// Create a new `FilesystemRemoteStore` rooted at `base_dir`.
25    ///
26    /// Creates `base_dir` (and all parent directories) if it does not exist.
27    pub fn new(base_dir: PathBuf) -> Result<Self, EdgestoreError> {
28        std::fs::create_dir_all(&base_dir)
29            .map_err(|e| EdgestoreError::ReplicationError(e.to_string()))?;
30        Ok(Self { base_dir })
31    }
32
33    /// Encode a 32-byte hash as a 64-character lowercase hex string.
34    fn hash_hex(hash: &[u8; 32]) -> String {
35        hash.iter().map(|b| format!("{:02x}", b)).collect::<String>()
36    }
37
38    /// Return the path `{base_dir}/{hash_hex}.seg` for the given hash.
39    fn seg_path(&self, hash: &[u8; 32]) -> PathBuf {
40        self.base_dir.join(format!("{}.seg", Self::hash_hex(hash)))
41    }
42}
43
44impl RemoteStore for FilesystemRemoteStore {
45    /// Store `data` under `hash`. Idempotent: if the file already exists, returns `Ok(())`.
46    ///
47    /// Writes to a `.tmp` file first, then renames atomically (T-04-10).
48    fn upload(&self, hash: &[u8; 32], data: &[u8]) -> Result<(), EdgestoreError> {
49        let dest = self.seg_path(hash);
50
51        // Content-addressed: if it already exists, nothing to do.
52        if dest.exists() {
53            return Ok(());
54        }
55
56        let tmp = self
57            .base_dir
58            .join(format!("{}.tmp", Self::hash_hex(hash)));
59
60        std::fs::write(&tmp, data)
61            .map_err(|e| EdgestoreError::ReplicationError(e.to_string()))?;
62
63        std::fs::rename(&tmp, &dest)
64            .map_err(|e| EdgestoreError::ReplicationError(e.to_string()))?;
65
66        Ok(())
67    }
68
69    /// Download the segment bytes for `hash`.
70    ///
71    /// Returns `EdgestoreError::ReplicationError` if the segment is not present.
72    fn download(&self, hash: &[u8; 32]) -> Result<Vec<u8>, EdgestoreError> {
73        let path = self.seg_path(hash);
74        std::fs::read(&path).map_err(|e| {
75            if e.kind() == std::io::ErrorKind::NotFound {
76                EdgestoreError::ReplicationError(format!(
77                    "segment not found: {}",
78                    Self::hash_hex(hash)
79                ))
80            } else {
81                EdgestoreError::ReplicationError(e.to_string())
82            }
83        })
84    }
85
86    /// List all stored segment hashes by scanning `{base_dir}/*.seg`.
87    ///
88    /// Filenames that are not exactly 64 lowercase hex characters followed by `.seg`
89    /// are silently skipped (T-04-12).
90    fn list(&self) -> Result<Vec<[u8; 32]>, EdgestoreError> {
91        let entries = std::fs::read_dir(&self.base_dir)
92            .map_err(|e| EdgestoreError::ReplicationError(e.to_string()))?;
93
94        let mut hashes = Vec::new();
95
96        for entry in entries.flatten() {
97            let file_name = entry.file_name();
98            let name = match file_name.to_str() {
99                Some(n) => n.to_owned(),
100                None => continue,
101            };
102
103            // Must end with ".seg"
104            if !name.ends_with(".seg") {
105                continue;
106            }
107
108            // Stem must be exactly 64 characters (32 bytes * 2 hex digits).
109            let stem = &name[..name.len() - 4]; // strip ".seg"
110            if stem.len() != 64 {
111                continue;
112            }
113
114            // Parse 64 hex chars → [u8; 32]
115            let parsed: Option<[u8; 32]> = (0..32)
116                .map(|i| u8::from_str_radix(&stem[i * 2..i * 2 + 2], 16).ok())
117                .collect::<Option<Vec<u8>>>()
118                .and_then(|v| v.try_into().ok());
119
120            if let Some(hash) = parsed {
121                hashes.push(hash);
122            }
123        }
124
125        Ok(hashes)
126    }
127
128    /// Remove the segment for `hash`. No-op if the segment does not exist (idempotent).
129    fn delete(&self, hash: &[u8; 32]) -> Result<(), EdgestoreError> {
130        let path = self.seg_path(hash);
131        match std::fs::remove_file(&path) {
132            Ok(()) => Ok(()),
133            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
134            Err(e) => Err(EdgestoreError::ReplicationError(e.to_string())),
135        }
136    }
137}
138
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Create a store rooted in a fresh temporary directory.
    ///
    /// The `TempDir` handle is handed back with the store so each test can keep
    /// it alive for as long as the store is in use.
    fn make_store() -> (TempDir, FilesystemRemoteStore) {
        let tmp = TempDir::new().expect("tempdir");
        let root = tmp.path().to_path_buf();
        let store = FilesystemRemoteStore::new(root).expect("FilesystemRemoteStore::new");
        (tmp, store)
    }

    /// Uploaded bytes come back unchanged from `download`.
    #[test]
    fn test_upload_download_roundtrip() {
        let (_guard, store) = make_store();
        let key = [0x42u8; 32];
        let payload = b"hello edgestore";

        store.upload(&key, payload).expect("upload");

        let fetched = store.download(&key).expect("download");
        assert_eq!(fetched, payload);
    }

    /// A second upload under the same hash succeeds and keeps the first payload.
    #[test]
    fn test_upload_idempotent() {
        let (_guard, store) = make_store();
        let key = [0x42u8; 32];
        let first_payload = b"original";

        store.upload(&key, first_payload).expect("first upload");
        // Re-uploading under an existing hash must be accepted without error.
        store
            .upload(&key, b"different")
            .expect("second upload (idempotent)");

        // The stored bytes are still the first payload: the overwrite was skipped.
        let fetched = store
            .download(&key)
            .expect("download after idempotent upload");
        assert_eq!(fetched, first_payload);
    }

    /// `list` reports exactly the hashes that were uploaded, in any order.
    #[test]
    fn test_list_returns_uploaded_hashes() {
        let (_guard, store) = make_store();
        let first = [0x01u8; 32];
        let second = [0x02u8; 32];
        let third = [0x03u8; 32];

        store.upload(&first, b"a").expect("upload 1");
        store.upload(&second, b"b").expect("upload 2");
        store.upload(&third, b"c").expect("upload 3");

        let mut expected = vec![first, second, third];
        expected.sort();

        // Directory order is unspecified, so compare sorted copies.
        let mut listed = store.list().expect("list");
        listed.sort();

        assert_eq!(listed, expected);
    }

    /// After `delete`, the segment can no longer be downloaded.
    #[test]
    fn test_delete_removes_file() {
        let (_guard, store) = make_store();
        let key = [0x42u8; 32];

        store.upload(&key, b"segment data").expect("upload");
        store.delete(&key).expect("delete");

        // The segment is gone, so fetching it has to fail.
        let outcome = store.download(&key);
        assert!(outcome.is_err(), "download after delete should return Err");
    }

    /// Downloading a hash that was never uploaded yields an error.
    #[test]
    fn test_download_not_found() {
        let (_guard, store) = make_store();
        let missing = [0xFFu8; 32];

        let outcome = store.download(&missing);
        assert!(outcome.is_err(), "download of non-existent hash should return Err");
    }
}