1use crossbeam_channel::RecvTimeoutError;
4use std::fmt::{Display, Formatter};
5use std::fs::create_dir_all;
6use std::sync::Arc;
7use std::thread;
8use std::thread::JoinHandle;
9use std::time::{Duration, UNIX_EPOCH};
10
11use serde::{Deserialize, Serialize};
12
13use crate::error::Error;
14use crate::file::{FileChunk, FileHash, FileId, FileLen, FileMetadata, FilePos};
15use crate::hasher::HashFn;
16use crate::path::Path;
17
/// Identifies a chunk of a file in the hash cache.
///
/// The key is based on the file id (device and inode numbers) plus the
/// chunk position and length, not on the file path.
#[derive(Debug, Serialize, Deserialize)]
pub struct Key {
    file_id: FileId,
    chunk_pos: FilePos,
    chunk_len: FileLen,
}
24
25impl Display for Key {
26 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
27 write!(f, "{}:{}", self.file_id.device, self.file_id.inode)
28 }
29}
30
/// Value stored in the cache for each `Key`.
///
/// The modification timestamp and file length are compared against the
/// file's current metadata on lookup; if they no longer match, the entry
/// is considered stale and ignored.
#[derive(Debug, Serialize, Deserialize)]
struct CachedFileInfo {
    // File modification time, in milliseconds since the Unix epoch
    modified_timestamp_ms: u64,
    // Length of the whole file at the time the hash was stored
    file_len: FileLen,
    // Length of the data covered by the stored hash
    data_len: FileLen,
    // The cached hash value
    hash: FileHash,
}
38
/// Persistent typed map from chunk keys to cached hash information.
type InnerCache = typed_sled::Tree<Key, CachedFileInfo>;

/// How often the background flusher thread writes pending data to disk.
const FLUSH_INTERVAL: Duration = Duration::from_millis(1000);
42
/// Persistent cache of file chunk hashes, backed by a `sled` database.
///
/// Writes are flushed to disk periodically by a background thread
/// owned by the `flusher` field.
pub struct HashCache {
    cache: Arc<InnerCache>,
    flusher: HashCacheFlusher,
}
52
53impl HashCache {
54 pub fn open(
57 database_path: &Path,
58 transform: Option<&str>,
59 algorithm: HashFn,
60 ) -> Result<HashCache, Error> {
61 create_dir_all(database_path.to_path_buf()).map_err(|e| {
62 format!(
63 "Count not create hash database directory {}: {}",
64 database_path.to_escaped_string(),
65 e
66 )
67 })?;
68 let db = sled::open(database_path.to_path_buf()).map_err(|e| {
69 format!(
70 "Failed to open hash database at {}: {}",
71 database_path.to_escaped_string(),
72 e
73 )
74 })?;
75
76 let tree_id = format!("hash_db:{:?}:{}", algorithm, transform.unwrap_or("<none>"));
77 let cache = Arc::new(typed_sled::Tree::open(&db, tree_id));
78 let flusher = HashCacheFlusher::start(&cache);
79 Ok(HashCache { cache, flusher })
80 }
81
82 pub fn open_default(transform: Option<&str>, algorithm: HashFn) -> Result<HashCache, Error> {
85 let cache_dir =
86 dirs::cache_dir().ok_or("Could not obtain user cache directory from the system.")?;
87 let hash_db_path = cache_dir.join("fclones");
88 Self::open(&Path::from(hash_db_path), transform, algorithm)
89 }
90
91 pub fn put(
93 &self,
94 key: &Key,
95 file: &FileMetadata,
96 data_len: FileLen,
97 hash: FileHash,
98 ) -> Result<(), Error> {
99 let value = CachedFileInfo {
100 modified_timestamp_ms: file
101 .modified()
102 .map_err(|e| format!("Unable to get file modification timestamp: {e}"))?
103 .duration_since(UNIX_EPOCH)
104 .unwrap_or(Duration::ZERO)
105 .as_millis() as u64,
106 file_len: file.len(),
107 data_len,
108 hash,
109 };
110
111 self.cache
112 .insert(key, &value)
113 .map_err(|e| format!("Failed to write entry to cache: {e}"))?;
114
115 match self.flusher.err_channel.try_recv() {
117 Ok(err) => Err(err),
118 Err(_) => Ok(()),
119 }
120 }
121
122 pub fn get(
128 &self,
129 key: &Key,
130 metadata: &FileMetadata,
131 ) -> Result<Option<(FileLen, FileHash)>, Error> {
132 let value = self
133 .cache
134 .get(key)
135 .map_err(|e| format!("Failed to retrieve entry from cache: {e}"))?;
136 let value = match value {
137 Some(v) => v,
138 None => return Ok(None), };
140
141 let modified = metadata
142 .modified()
143 .map_err(|e| format!("Unable to get file modification timestamp: {e}"))?
144 .duration_since(UNIX_EPOCH)
145 .unwrap_or(Duration::ZERO)
146 .as_millis() as u64;
147
148 if value.modified_timestamp_ms != modified || value.file_len != metadata.len() {
149 Ok(None) } else {
151 Ok(Some((value.data_len, value.hash)))
152 }
153 }
154
155 pub fn key(&self, chunk: &FileChunk<'_>, metadata: &FileMetadata) -> Result<Key, Error> {
160 let key = Key {
161 file_id: metadata.file_id(),
162 chunk_pos: chunk.pos,
163 chunk_len: chunk.len,
164 };
165 Ok(key)
166 }
167
168 pub fn close(self) -> Result<(), Error> {
170 self.cache
171 .flush()
172 .map_err(|e| format!("Failed to flush cache: {e}"))?;
173 Ok(())
174 }
175}
176
/// Periodically flushes the cache to disk from a background thread.
///
/// The thread is signalled to stop and then joined when this struct drops.
struct HashCacheFlusher {
    // Always `Some` until `drop` takes it in order to join the thread
    thread_handle: Option<JoinHandle<()>>,
    // Dropping this sender disconnects the channel, telling the thread to exit
    control_channel: Option<crossbeam_channel::Sender<()>>,
    // Receives errors reported by the flusher thread
    err_channel: crossbeam_channel::Receiver<Error>,
}
183
184impl HashCacheFlusher {
185 fn start(cache: &Arc<InnerCache>) -> HashCacheFlusher {
186 let cache = Arc::downgrade(cache);
187 let (ctrl_tx, ctrl_rx) = crossbeam_channel::bounded::<()>(1);
188 let (err_tx, err_rx) = crossbeam_channel::bounded(1);
189
190 let thread_handle = thread::spawn(move || {
191 while let Err(RecvTimeoutError::Timeout) = ctrl_rx.recv_timeout(FLUSH_INTERVAL) {
192 if let Some(cache) = cache.upgrade() {
193 if let Err(e) = cache.flush() {
194 err_tx
195 .send(format!("Failed to flush the hash cache: {e}").into())
196 .unwrap_or_default();
197 return;
198 }
199 }
200 }
201 });
202
203 HashCacheFlusher {
204 thread_handle: Some(thread_handle),
205 control_channel: Some(ctrl_tx),
206 err_channel: err_rx,
207 }
208 }
209}
210
211impl Drop for HashCacheFlusher {
212 fn drop(&mut self) {
213 drop(self.control_channel.take());
215 self.thread_handle.take().unwrap().join().unwrap();
217 }
218}
219
#[cfg(test)]
mod test {
    use std::fs::OpenOptions;

    use crate::cache::HashCache;
    use crate::file::{FileChunk, FileHash, FileLen, FileMetadata, FilePos};
    use crate::hasher::HashFn;
    use crate::path::Path;
    use crate::util::test::{create_file, with_dir};

    #[test]
    fn return_cached_hash_if_file_hasnt_changed() {
        with_dir("cache/return_cached_hash_if_file_hasnt_changed", |root| {
            let file_path = root.join("file");
            create_file(&file_path);
            let file_path = Path::from(&file_path);
            let metadata = FileMetadata::new(&file_path).unwrap();
            let chunk = FileChunk::new(&file_path, FilePos(0), FileLen(1000));

            let cache_path = Path::from(root.join("cache"));
            let cache = HashCache::open(&cache_path, None, HashFn::Metro).unwrap();
            let key = cache.key(&chunk, &metadata).unwrap();

            let orig_hash = FileHash::from(12345);
            let data_len = FileLen(200);
            cache
                .put(&key, &metadata, data_len, orig_hash.clone())
                .unwrap();

            // The file was not modified, so the cached entry must be returned.
            let cached_hash = cache.get(&key, &metadata).unwrap();
            assert_eq!(cached_hash, Some((data_len, orig_hash)))
        });
    }

    #[test]
    fn return_none_if_file_has_changed() {
        with_dir("cache/return_none_if_file_has_changed", |root| {
            let file_path = root.join("file");
            create_file(&file_path);
            let file_path = Path::from(&file_path);
            let metadata = FileMetadata::new(&file_path).unwrap();
            let chunk = FileChunk::new(&file_path, FilePos(0), FileLen(1000));

            let cache_path = Path::from(root.join("cache"));
            let cache = HashCache::open(&cache_path, None, HashFn::Metro).unwrap();
            let key = cache.key(&chunk, &metadata).unwrap();
            cache
                .put(&key, &metadata, chunk.len, FileHash::from(12345))
                .unwrap();

            // Append to the file, which bumps its length and mtime.
            use std::io::Write;
            let mut f = OpenOptions::new()
                .append(true)
                .open(file_path.to_path_buf())
                .unwrap();
            write!(f, "text").unwrap();
            drop(f);

            // Fresh metadata no longer matches the stored entry.
            let metadata = FileMetadata::new(&file_path).unwrap();
            let cached_hash = cache.get(&key, &metadata).unwrap();
            assert_eq!(cached_hash, None)
        });
    }

    #[test]
    fn return_none_if_asked_for_a_different_chunk() {
        with_dir("cache/return_none_if_asked_for_a_different_chunk", |root| {
            let file_path = root.join("file");
            create_file(&file_path);
            let file_path = Path::from(&file_path);
            let metadata = FileMetadata::new(&file_path).unwrap();
            let chunk = FileChunk::new(&file_path, FilePos(0), FileLen(1000));

            let cache_path = Path::from(root.join("cache"));
            let cache = HashCache::open(&cache_path, None, HashFn::Metro).unwrap();
            let key = cache.key(&chunk, &metadata).unwrap();
            cache
                .put(&key, &metadata, chunk.len, FileHash::from(12345))
                .unwrap();

            // A different chunk of the same file produces a different key.
            let other_chunk = FileChunk::new(&file_path, FilePos(1000), FileLen(2000));
            let other_key = cache.key(&other_chunk, &metadata).unwrap();
            let cached_hash = cache.get(&other_key, &metadata).unwrap();
            assert_eq!(cached_hash, None)
        });
    }

    #[test]
    fn return_none_if_different_transform_was_used() {
        with_dir(
            "cache/return_none_if_different_transform_was_used",
            |root| {
                let file_path = root.join("file");
                create_file(&file_path);
                let file_path = Path::from(&file_path);
                let metadata = FileMetadata::new(&file_path).unwrap();
                let chunk = FileChunk::new(&file_path, FilePos(0), FileLen(1000));

                let cache_path = Path::from(root.join("cache"));
                let cache = HashCache::open(&cache_path, None, HashFn::Metro).unwrap();
                let key = cache.key(&chunk, &metadata).unwrap();

                let orig_hash = FileHash::from(12345);
                let data_len = FileLen(200);
                cache
                    .put(&key, &metadata, data_len, orig_hash.clone())
                    .unwrap();
                let cached_hash = cache.get(&key, &metadata).unwrap();
                assert_eq!(cached_hash, Some((data_len, orig_hash)));

                // Re-open with a transform: entries live in a separate tree,
                // so the previously stored hash must not be visible.
                drop(cache);
                let cache =
                    HashCache::open(&cache_path, Some("transform"), HashFn::Metro).unwrap();
                let cached_hash = cache.get(&key, &metadata).unwrap();
                assert_eq!(cached_hash, None);
            },
        );
    }
}