1use std::borrow::Cow;
25use std::fs::File;
26use std::io::Result;
27use std::path::Path;
28use std::path::PathBuf;
29use std::sync::atomic::AtomicU8;
30use std::sync::atomic::Ordering::Relaxed;
31use std::sync::Arc;
32
33use crate::cache_dir::CacheDir;
34use crate::multiplicative_hash::MultiplicativeHash;
35use crate::trigger::PeriodicTrigger;
36use crate::Key;
37use crate::KISMET_TEMPORARY_SUBDIRECTORY as TEMP_SUBDIR;
38
/// Divisor applied to the total capacity when `Cache::new` picks the
/// maintenance trigger period: the period is the smaller of the per-shard
/// capacity and `total_capacity / MAINTENANCE_SCALE`.
const MAINTENANCE_SCALE: usize = 2;

/// Mixer applied to `Key::hash` to pick a key's primary shard (see
/// `Cache::shard_ids`).
///
/// NOTE(review): the key string presumably determines the mixing
/// parameters, so changing it would remap existing entries to different
/// shards; every process sharing a cache directory must use the same value.
const PRIMARY_MIXER: MultiplicativeHash =
    MultiplicativeHash::new_keyed(b"kismet: primary shard mixer");

/// Mixer applied to `Key::secondary_hash` to pick a key's secondary shard
/// (see `Cache::shard_ids`). The same cross-process caveat as
/// `PRIMARY_MIXER` presumably applies.
const SECONDARY_MIXER: MultiplicativeHash =
    MultiplicativeHash::new_keyed(b"kismet: secondary shard mixer");
52
/// A hash-sharded cache directory.
///
/// Each shard is a subdirectory of `base_dir` (named by `format_id`) and is
/// handled as an independent cache directory through a short-lived `Shard`.
///
/// Cloning shares the load estimates through an `Arc`, so clones observe each
/// other's estimate updates.
#[derive(Clone, Debug)]
pub struct Cache {
    // Per-shard estimate of the number of cached files, saturating at
    // `u8::MAX`. Updated by `update_estimate` and `force_maintain_shard`, and
    // read by `sort_by_load`.
    load_estimates: Arc<[AtomicU8]>,
    // Parent directory of every shard subdirectory.
    base_dir: PathBuf,
    // Maintenance trigger copied into every `Shard`. `new` sets its period to
    // min(shard_capacity, total_capacity / MAINTENANCE_SCALE).
    trigger: PeriodicTrigger,
    // Number of shards; `new` guarantees at least 2.
    num_shards: usize,
    // Per-shard capacity: total capacity divided by `num_shards`, rounded up
    // (at least 1).
    shard_capacity: usize,
}
73
/// Maps a shard index to its subdirectory name.
///
/// The name is `.kismet_` followed by the index in lowercase hexadecimal,
/// zero-padded to at least four digits (e.g. shard 10 -> `.kismet_000a`).
#[inline]
fn format_id(shard: usize) -> String {
    let mut name = String::from(".kismet_");
    name.push_str(&format!("{:04x}", shard));
    name
}
85
/// Short-lived handle for one shard of a `Cache`.
///
/// It describes a single cache directory (`shard_dir`) together with the
/// per-shard capacity and a copy of the owning cache's trigger. `Cache::shard`
/// builds these on demand; `replace_shard` retargets one at a sibling shard.
struct Shard {
    // Index of this shard, in `0..num_shards` of the owning cache.
    id: usize,
    // `<base_dir>/<format_id(id)>`.
    shard_dir: PathBuf,
    // Copied from the owning cache; returned by `CacheDir::trigger`.
    trigger: PeriodicTrigger,
    // Per-shard file capacity; returned by `CacheDir::capacity`.
    capacity: usize,
}
94
95impl Shard {
96 fn replace_shard(self, id: usize) -> Shard {
98 let mut shard_dir = self.shard_dir;
99 shard_dir.pop();
100 shard_dir.push(&format_id(id));
101 Shard {
102 id,
103 shard_dir,
104 trigger: self.trigger,
105 capacity: self.capacity,
106 }
107 }
108
109 fn file_exists(&mut self, name: &str) -> bool {
111 self.shard_dir.push(name);
112 let result = std::fs::metadata(&self.shard_dir);
113 self.shard_dir.pop();
114
115 result.is_ok()
116 }
117}
118
119impl CacheDir for Shard {
120 #[inline]
121 fn temp_dir(&self) -> Cow<Path> {
122 let mut dir = self.shard_dir.clone();
123 dir.push(TEMP_SUBDIR);
124 Cow::from(dir)
125 }
126
127 #[inline]
128 fn base_dir(&self) -> Cow<Path> {
129 Cow::from(&self.shard_dir)
130 }
131
132 #[inline]
133 fn trigger(&self) -> &PeriodicTrigger {
134 &self.trigger
135 }
136
137 #[inline]
138 fn capacity(&self) -> usize {
139 self.capacity
140 }
141}
142
143impl Cache {
144 pub fn new(base_dir: PathBuf, mut num_shards: usize, mut total_capacity: usize) -> Cache {
147 if num_shards < 2 {
149 num_shards = 2;
150 }
151
152 if total_capacity < num_shards {
153 total_capacity = num_shards;
154 }
155
156 let mut load_estimates = Vec::with_capacity(num_shards);
157 load_estimates.resize_with(num_shards, || AtomicU8::new(0));
158 let shard_capacity =
159 (total_capacity / num_shards) + ((total_capacity % num_shards) != 0) as usize;
160 let trigger =
161 PeriodicTrigger::new(shard_capacity.min(total_capacity / MAINTENANCE_SCALE) as u64);
162
163 Cache {
164 load_estimates: load_estimates.into_boxed_slice().into(),
165 base_dir,
166 trigger,
167 num_shards,
168 shard_capacity,
169 }
170 }
171
172 fn random_shard_id(&self) -> usize {
174 use rand::Rng;
175
176 rand::thread_rng().gen_range(0..self.num_shards)
177 }
178
179 fn other_shard_id(&self, base: usize, mut other: usize) -> usize {
182 if base != other {
183 return other;
184 }
185
186 other += 1;
187 if other < self.num_shards {
188 other
189 } else {
190 0
191 }
192 }
193
194 fn shard_ids(&self, key: Key) -> (usize, usize) {
196 let h1 = PRIMARY_MIXER.map(key.hash, self.num_shards);
199 let h2 = SECONDARY_MIXER.map(key.secondary_hash, self.num_shards);
200
201 (h1, self.other_shard_id(h1, h2))
205 }
206
207 fn sort_by_load(&self, (h1, h2): (usize, usize)) -> (usize, usize) {
209 let load1 = self.load_estimates[h1].load(Relaxed) as usize;
210 let load2 = self.load_estimates[h2].load(Relaxed) as usize;
211
212 let capacity = self.shard_capacity;
216 if load1.clamp(0, capacity) <= load2.clamp(0, capacity) {
217 (h1, h2)
218 } else {
219 (h2, h1)
220 }
221 }
222
223 fn shard(&self, shard_id: usize) -> Shard {
225 let mut dir = self.base_dir.clone();
226 dir.push(&format_id(shard_id));
227 Shard {
228 id: shard_id,
229 shard_dir: dir,
230 trigger: self.trigger,
231 capacity: self.shard_capacity,
232 }
233 }
234
235 pub fn get(&self, key: Key) -> Result<Option<File>> {
242 let (h1, h2) = self.shard_ids(key);
243 let shard = self.shard(h1);
244
245 if let Some(file) = shard.get(key.name)? {
246 Ok(Some(file))
247 } else {
248 shard.replace_shard(h2).get(key.name)
249 }
250 }
251
252 pub fn temp_dir(&self, key: Option<Key>) -> Result<Cow<Path>> {
258 let shard_id = match key {
259 Some(key) => self.sort_by_load(self.shard_ids(key)).0,
260 None => self.random_shard_id(),
261 };
262 let shard = self.shard(shard_id);
263 if self.trigger.event() {
264 shard.cleanup_temp_directory()?;
265 }
266
267 Ok(Cow::from(shard.ensure_temp_dir()?.into_owned()))
268 }
269
270 fn update_estimate(&self, shard_id: usize, update: Option<u64>) {
273 let target = &self.load_estimates[shard_id];
274 match update {
275 Some(remaining) => {
278 let update = remaining.clamp(0, u8::MAX as u64 - 1) as u8;
279 target.store(update + 1, Relaxed);
280 }
281 None => {
283 let _ = target.fetch_update(Relaxed, Relaxed, |i| {
284 if i < u8::MAX {
285 Some(i + 1)
286 } else {
287 None
288 }
289 });
290 }
291 };
292 }
293
294 fn force_maintain_shard(&self, shard: Shard) -> Result<()> {
296 let update = shard.maintain()?.clamp(0, u8::MAX as u64) as u8;
297 self.load_estimates[shard.id].store(update, Relaxed);
298 Ok(())
299 }
300
301 fn maintain_random_other_shard(&self, base: Shard) -> Result<()> {
304 let shard_id = self.other_shard_id(base.id, self.random_shard_id());
305 self.force_maintain_shard(base.replace_shard(shard_id))
306 }
307
308 pub fn set(&self, key: Key, value: &Path) -> Result<()> {
317 let (h1, h2) = self.sort_by_load(self.shard_ids(key));
318 let mut shard = self.shard(h2);
319
320 if !shard.file_exists(key.name) {
323 shard = shard.replace_shard(h1);
324 }
325
326 let update = shard.set(key.name, value)?;
327 self.update_estimate(h1, update);
328
329 if update.is_some() {
334 self.maintain_random_other_shard(shard)?;
335 } else if self.load_estimates[h1].load(Relaxed) as usize / 2 > self.shard_capacity {
336 self.force_maintain_shard(shard)?;
339 }
340
341 Ok(())
342 }
343
344 pub fn put(&self, key: Key, value: &Path) -> Result<()> {
354 let (h1, h2) = self.sort_by_load(self.shard_ids(key));
355 let mut shard = self.shard(h2);
356
357 if !shard.file_exists(key.name) {
360 shard = shard.replace_shard(h1);
361 }
362
363 let update = shard.put(key.name, value)?;
364 self.update_estimate(h1, update);
365
366 if update.is_some() {
369 self.maintain_random_other_shard(shard)?;
370 } else if self.load_estimates[h1].load(Relaxed) as usize / 2 > self.shard_capacity {
371 self.force_maintain_shard(shard)?;
372 }
373
374 Ok(())
375 }
376
377 pub fn touch(&self, key: Key) -> Result<bool> {
383 let (h1, h2) = self.shard_ids(key);
384 let shard = self.shard(h1);
385
386 if shard.touch(key.name)? {
387 return Ok(true);
388 }
389
390 shard.replace_shard(h2).touch(key.name)
391 }
392}
393
/// Inserts 200 entries (alternating `set` and `put`) into a 3-shard cache of
/// total capacity 9. Every surviving entry must hold its original payload,
/// and the number of survivors must lie between 9 and 18.
#[test]
fn smoke_test() {
    use std::io::Read;
    use tempfile::NamedTempFile;
    use test_dir::{DirBuilder, TestDir};

    const PAYLOAD_MULTIPLIER: usize = 113;

    let temp = TestDir::temp();
    let cache = Cache::new(temp.path("."), 3, 9);

    for i in 0..200usize {
        let name = i.to_string();

        let staging = cache.temp_dir(None).expect("temp_dir must succeed");
        let tmp = NamedTempFile::new_in(staging).expect("new temp file must succeed");
        std::fs::write(tmp.path(), (PAYLOAD_MULTIPLIER * i).to_string())
            .expect("write must succeed");

        let key = Key::new(&name, i as u64, i as u64 + 42);
        if i % 2 == 0 {
            cache.set(key, tmp.path()).expect("set must succeed");
        } else {
            cache.put(key, tmp.path()).expect("put must succeed");
        }
    }

    let mut present = 0usize;
    for i in 0..200usize {
        let name = i.to_string();
        let lookup = cache
            .get(Key::new(&name, i as u64, i as u64 + 42))
            .expect("get must succeed");

        if let Some(mut file) = lookup {
            let mut contents = Vec::new();
            file.read_to_end(&mut contents).expect("read must succeed");
            assert_eq!(contents, (PAYLOAD_MULTIPLIER * i).to_string().into_bytes());
            present += 1;
        }
    }

    assert!(present >= 9);
    assert!(present <= 18);
}
448
/// `set` on an existing key must replace the cached contents.
#[test]
fn test_set() {
    use std::io::{Read, Write};
    use tempfile::NamedTempFile;
    use test_dir::{DirBuilder, TestDir};

    let temp = TestDir::temp();
    let cache = Cache::new(temp.path("."), 0, 0);

    // Stages `payload` in a temporary file and publishes it as "entry" with
    // `Cache::set`; `context` is the panic message if `set` fails.
    let store = |payload: &[u8], context: &str| {
        let tmp = NamedTempFile::new_in(cache.temp_dir(None).expect("temp_dir must succeed"))
            .expect("new temp file must succeed");
        tmp.as_file().write_all(payload).expect("write must succeed");
        cache.set(Key::new("entry", 1, 2), tmp.path()).expect(context);
    };

    // Reads back everything currently cached under "entry".
    let fetch = || {
        let mut cached = cache
            .get(Key::new("entry", 1, 2))
            .expect("must succeed")
            .expect("must be found");
        let mut contents = Vec::new();
        cached.read_to_end(&mut contents).expect("read must succeed");
        contents
    };

    store(b"v1", "initial set must succeed");
    assert_eq!(fetch(), b"v1");

    store(b"v2", "overwrite must succeed");
    assert_eq!(fetch(), b"v2");
}
501
/// `put` on a key that is already cached must keep the existing contents.
#[test]
fn test_put() {
    use std::io::{Read, Write};
    use tempfile::NamedTempFile;
    use test_dir::{DirBuilder, TestDir};

    let temp = TestDir::temp();
    let cache = Cache::new(temp.path("."), 0, 0);

    // Writes `payload` into a fresh temporary file in the cache's temp dir.
    let stage = |payload: &[u8]| {
        let tmp = NamedTempFile::new_in(cache.temp_dir(None).expect("temp_dir must succeed"))
            .expect("new temp file must succeed");
        tmp.as_file().write_all(payload).expect("write must succeed");
        tmp
    };

    // Seed the entry with `set`.
    let first = stage(b"v1");
    cache
        .set(Key::new("entry", 1, 2), first.path())
        .expect("initial set must succeed");
    drop(first);

    // Then try to replace it with `put`.
    let second = stage(b"v2");
    cache
        .put(Key::new("entry", 1, 2), second.path())
        .expect("put must succeed");
    drop(second);

    let mut cached = cache
        .get(Key::new("entry", 1, 2))
        .expect("must succeed")
        .expect("must be found");
    let mut dst = Vec::new();
    cached.read_to_end(&mut dst).expect("read must succeed");
    assert_eq!(&dst, b"v1");
}
544
/// Checks that an entry touched before every insertion is still present,
/// with its original payload, after 2000 `put`s into a 600-file cache.
#[test]
fn test_touch() {
    use std::io::Read;
    use tempfile::NamedTempFile;
    use test_dir::{DirBuilder, TestDir};

    const PAYLOAD_MULTIPLIER: usize = 113;

    let temp = TestDir::temp();
    // 2 shards, total capacity 600, i.e. 300 files per shard.
    let cache = Cache::new(temp.path("."), 2, 600);

    for i in 0..2000 {
        // Entry "0" (the same key that iteration 0 inserts) is only present
        // from the second iteration on.
        assert_eq!(
            cache
                .touch(Key::new("0", 0, 42))
                .expect("touch must succeed"),
            i > 0
        );

        let name = format!("{}", i);

        let temp_dir = cache.temp_dir(None).expect("temp_dir must succeed");
        let tmp = NamedTempFile::new_in(temp_dir).expect("new temp file must succeed");
        std::fs::write(tmp.path(), format!("{}", PAYLOAD_MULTIPLIER * i))
            .expect("write must succeed");
        cache
            .put(Key::new(&name, i as u64, i as u64 + 42), tmp.path())
            .expect("put must succeed");
        if i == 0 {
            // NOTE(review): presumably makes entry "0" strictly older than
            // every later entry, so that only the repeated touches can keep
            // it from being evicted. Confirm against the eviction policy.
            std::thread::sleep(std::time::Duration::from_secs(2));
        }
    }

    // Entry "0" must have survived, holding payload 113 * 0 = "0".
    let mut file = cache
        .get(Key::new("0", 0, 42))
        .expect("get must succeed")
        .expect("file must be found");
    let mut buf = Vec::new();
    file.read_to_end(&mut buf).expect("read must succeed");
    assert_eq!(buf, b"0");
}