/// Subdirectory created under `Config::path` that holds
/// one append-only file per shard.
const SUBDIR: &str = "sharded_logs";

/// Name of the lock file that guards the directory
/// against concurrent writers. It holds no log data.
const WARN: &str = "DO_NOT_PUT_YOUR_FILES_HERE";

use std::{
    collections::BTreeMap,
    fs::{self, File, OpenOptions},
    io::{self, BufReader, BufWriter, Read, Seek, Write},
    path::PathBuf,
    sync::{
        atomic::{
            AtomicBool, AtomicU64, AtomicUsize, Ordering,
        },
        Arc, Mutex, MutexGuard,
    },
};

use crc32fast::Hasher;
use fault_injection::{fallible, maybe};
use fs2::FileExt;

#[derive(Debug, Clone)]
pub struct Config {
    /// Directory in which the `sharded_logs`
    /// subdirectory will be created.
    pub path: PathBuf,
    /// Number of shard files that writes are
    /// striped across.
    pub shards: u8,
    /// Capacity in bytes of the in-memory `BufWriter`
    /// in front of each shard file.
    pub in_memory_buffer_per_log: usize,
}

impl Default for Config {
    fn default() -> Config {
        Config {
            path: Default::default(),
            shards: 8,
            in_memory_buffer_per_log: 512 * 1024,
        }
    }
}

impl Config {
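    /// Recovers the log directory, returning an iterator
    /// over every write batch in ascending LSN order.
    /// Iteration stops at the first missing or corrupt
    /// record, mirroring what a crash can leave behind.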
    pub fn recover(&self) -> io::Result<RecoveryIterator> {
        let mut file_opts = OpenOptions::new();
        file_opts.read(true);

        let mut readers = vec![];

        for idx in 0..self.shards {
            let path = self
                .path
                .join(SUBDIR)
                .join(idx.to_string());

            let file = fallible!(file_opts.open(path));
            readers.push(BufReader::new(file));
        }

        let mut ret = RecoveryIterator {
            done: false,
            next_expected_lsn: 0,
            readers,
            last_shard: None,
            read_buffer: BTreeMap::new(),
        };

        // Prime the read buffer with the first record
        // from each shard.
        for idx in 0..self.shards {
            ret.tick(idx as usize);
        }

        Ok(ret)
    }

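    /// Creates the log directory if required and takes
    /// an exclusive advisory lock on the lock file, so
    /// that at most one process writes to the log at a
    /// time.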
    fn lock(&self) -> io::Result<File> {
        fallible!(fs::create_dir_all(
            self.path.join(SUBDIR)
        ));
        let mut lock_options = OpenOptions::new();
        lock_options.read(true).create(true).write(true);
        let lock_file = fallible!(lock_options
            .open(self.path.join(SUBDIR).join(WARN)));
        fallible!(lock_file.try_lock_exclusive());
        Ok(lock_file)
    }

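    /// Deletes the entire directory at `path`, including
    /// anything else stored there. The directory lock is
    /// held while the removal happens.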
    pub fn purge(&self) -> io::Result<()> {
        let _lock_file = fallible!(self.lock());
        fs::remove_dir_all(&self.path)
    }

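    /// Creates a fresh log, failing if shard files
    /// already exist. A minimal usage sketch follows;
    /// the path is illustrative, and the `sharded_log`
    /// crate name is an assumption:
    ///
    /// ```no_run
    /// # fn main() -> std::io::Result<()> {
    /// use sharded_log::Config;
    ///
    /// let config = Config {
    ///     path: "example_log_path".into(),
    ///     ..Default::default()
    /// };
    /// let log = config.create()?;
    /// let lsn = log.write_batch(&[
    ///     "first".as_bytes(),
    ///     "second".as_bytes(),
    /// ])?;
    /// log.flush()?;
    /// # Ok(())
    /// # }
    /// ```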
    pub fn create(&self) -> io::Result<ShardedLog> {
        fallible!(fs::create_dir_all(
            self.path.join(SUBDIR)
        ));

        let lock_file = Arc::new(fallible!(self.lock()));

        let mut file_opts = OpenOptions::new();
        file_opts.create_new(true).write(true);

        let mut shards = vec![];
        for idx in 0..self.shards {
            let path = self
                .path
                .join(SUBDIR)
                .join(idx.to_string());

            let file = fallible!(file_opts.open(path));

            shards.push(Shard {
                file_mu: Mutex::new(
                    BufWriter::with_capacity(
                        self.in_memory_buffer_per_log,
                        file,
                    ),
                ),
                dirty: false.into(),
            })
        }

        // Sync the directory itself so the newly created
        // shard files are durable.
        fallible!(
            File::open(self.path.join(SUBDIR))?.sync_all()
        );

        Ok(ShardedLog {
            shards: shards.into(),
            idx: 0,
            idx_counter: Arc::new(AtomicUsize::new(1)),
            next_lsn: Arc::new(0.into()),
            config: self.clone(),
            lock_file,
        })
    }
}

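/// A sharded, append-only log. Writes are striped across
/// several files to reduce mutex and syscall contention,
/// while a shared LSN counter preserves a total order
/// that recovery can reconstruct.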
pub struct ShardedLog {
    shards: Arc<[Shard]>,
    idx: usize,
    idx_counter: Arc<AtomicUsize>,
    next_lsn: Arc<AtomicU64>,
    config: Config,
    lock_file: Arc<File>,
}

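/// A claimed LSN and an exclusively held shard. The slot
/// must be consumed by `write_batch` or `abort`; if it
/// is simply dropped, an empty batch is written so the
/// LSN sequence stays contiguous for recovery.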
pub struct Reservation<'a> {
    shard: MutexGuard<'a, BufWriter<File>>,
    completed: bool,
    lsn: u64,
}

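// Dropping an unconsumed Reservation fills its LSN with
// an empty batch; a hole in the sequence would otherwise
// make recovery stop early at this LSN.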
impl<'a> Drop for Reservation<'a> {
    fn drop(&mut self) {
        if !self.completed {
            if let Err(e) = write_batch_inner::<&[u8]>(
                &mut self.shard,
                self.lsn,
                &[],
            ) {
                eprintln!(
                    "error while writing empty batch on \
                    Reservation Drop: {:?}",
                    e
                );
            }
            self.completed = true;
        }
    }
}

impl<'a> Reservation<'a> {
    /// Writes `write_batch` at the reserved LSN and
    /// returns that LSN.
    pub fn write_batch<B: AsRef<[u8]>>(
        mut self,
        write_batch: &[B],
    ) -> io::Result<u64> {
        self.completed = true;
        write_batch_inner(
            &mut self.shard,
            self.lsn,
            write_batch,
        )
    }

    /// Consumes the reservation by writing an empty
    /// batch at the reserved LSN.
    pub fn abort(mut self) -> io::Result<u64> {
        self.completed = true;
        write_batch_inner::<&[u8]>(
            &mut self.shard,
            self.lsn,
            &[],
        )
    }
}

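// Cloning hands the new handle its own preferred shard
// index so that concurrent writers spread across shards.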
impl Clone for ShardedLog {
    fn clone(&self) -> ShardedLog {
        ShardedLog {
            shards: self.shards.clone(),
            idx_counter: self.idx_counter.clone(),
            idx: self
                .idx_counter
                .fetch_add(1, Ordering::SeqCst),
            next_lsn: self.next_lsn.clone(),
            config: self.config.clone(),
            lock_file: self.lock_file.clone(),
        }
    }
}

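/// One log shard: a buffered file plus a dirty flag that
/// lets `flush` skip shards with no unflushed writes.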
struct Shard {
    file_mu: Mutex<BufWriter<File>>,
    dirty: AtomicBool,
}

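/// Merges the per-shard files back into one LSN-ordered
/// stream. Each shard is read sequentially; batches are
/// staged in `read_buffer` keyed by LSN, and iteration
/// ends at the first gap in the LSN sequence.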
pub struct RecoveryIterator {
    next_expected_lsn: u64,
    readers: Vec<BufReader<File>>,
    read_buffer: BTreeMap<u64, (usize, Vec<Vec<u8>>)>,
    last_shard: Option<usize>,
    done: bool,
}

impl Iterator for RecoveryIterator {
    type Item = Vec<Vec<u8>>;

    fn next(&mut self) -> Option<Self::Item> {
        let ret = self.next_inner();
        if ret.is_none() {
            self.done = true;
        }
        ret
    }
}

impl RecoveryIterator {
    fn next_inner(&mut self) -> Option<Vec<Vec<u8>>> {
        // Refill from the shard we last consumed, since
        // that is the only reader that has advanced.
        if let Some(last_idx) = self.last_shard {
            self.tick(last_idx);
        }

        let (idx, buf) = self
            .read_buffer
            .remove(&self.next_expected_lsn)?;

        self.next_expected_lsn += 1;
        self.last_shard = Some(idx);
        Some(buf)
    }

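    // Reads the next record from shard `idx` into
    // `read_buffer`. Records are framed as:
    //
    //   crc: [u8; 4] | item count: [u8; 8] | lsn: [u8; 8]
    //
    // followed by `item count` entries, each framed as:
    //
    //   crc: [u8; 4] | len: [u8; 8] | data: [u8; len]
    //
    // A short read or crc mismatch simply returns,
    // leaving the buffer without this record, which
    // terminates iteration at the damaged tail.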
    fn tick(&mut self, idx: usize) {
        macro_rules! weak_try {
            ($e:expr) => {{
                match $e {
                    Ok(ok) => ok,
                    _ => return,
                }
            }};
        }

        let mut reader = &mut self.readers[idx];

        let crc_expected: [u8; 4] =
            weak_try!(read_array(&mut reader));
        let size_bytes: [u8; 8] =
            weak_try!(read_array(&mut reader));
        let lsn_bytes: [u8; 8] =
            weak_try!(read_array(&mut reader));

        let mut hasher = Hasher::new();
        hasher.update(&size_bytes);
        hasher.update(&lsn_bytes);
        let crc_actual =
            (hasher.finalize() ^ 0xFF).to_le_bytes();

        if crc_actual != crc_expected {
            eprintln!("encountered corrupted crc in log");
            return;
        }

        let size =
            usize::try_from(u64::from_le_bytes(size_bytes))
                .unwrap();
        let lsn = u64::from_le_bytes(lsn_bytes);

        let mut write_batch = vec![];
        for _ in 0..size {
            let crc_expected: [u8; 4] =
                weak_try!(read_array(&mut reader));
            let len_bytes: [u8; 8] =
                weak_try!(read_array(&mut reader));

            let len = usize::try_from(u64::from_le_bytes(
                len_bytes,
            ))
            .unwrap();

            // Zero-initialize instead of `set_len` on
            // uninitialized capacity: reading into (and
            // potentially exposing) uninitialized memory
            // is undefined behavior.
            let mut buf = vec![0; len];

            weak_try!(maybe!(reader.read_exact(&mut buf)));

            let mut hasher = Hasher::new();
            hasher.update(&len_bytes);
            hasher.update(&buf);
            let crc_actual =
                (hasher.finalize() ^ 0xFF).to_le_bytes();

            if crc_actual != crc_expected {
                return;
            }

            write_batch.push(buf);
        }

        self.read_buffer.insert(lsn, (idx, write_batch));
    }
}

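/// Reads exactly `LEN` bytes into a stack array.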
fn read_array<const LEN: usize>(
    mut reader: impl io::Read,
) -> io::Result<[u8; LEN]> {
    // A zeroed array rather than `MaybeUninit`: calling
    // `assume_init` on uninitialized integers is
    // undefined behavior, and zeroing is cheap here.
    let mut buf = [0; LEN];
    reader.read_exact(&mut buf)?;
    Ok(buf)
}

impl ShardedLog {
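    /// Atomically claims the next LSN and writes
    /// `write_batch` to one shard, returning the LSN.
    /// Data is buffered in memory until `flush` makes
    /// it durable.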
    pub fn write_batch<B: AsRef<[u8]>>(
        &self,
        write_batch: &[B],
    ) -> io::Result<u64> {
        self.reservation().write_batch(write_batch)
    }

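    /// Claims the next LSN while holding a shard lock,
    /// letting the caller run logic between claiming the
    /// slot and writing (or aborting) it. Because the
    /// lock is taken before the LSN, LSNs within each
    /// shard are written in ascending order, which
    /// recovery relies on. A hedged sketch, assuming the
    /// crate is built as `sharded_log`:
    ///
    /// ```no_run
    /// # fn main() -> std::io::Result<()> {
    /// # let log = sharded_log::Config::default().create()?;
    /// let reservation = log.reservation();
    /// // ... decide whether this write should happen ...
    /// let lsn = reservation
    ///     .write_batch(&["payload".as_bytes()])?;
    /// # Ok(())
    /// # }
    /// ```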
    pub fn reservation(&self) -> Reservation<'_> {
        let shard = self.get_shard();

        let lsn =
            self.next_lsn.fetch_add(1, Ordering::Release);

        Reservation {
            shard,
            lsn,
            completed: false,
        }
    }

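    /// Flushes and fsyncs every dirty shard. The dirty
    /// flag is re-checked under the shard lock so that
    /// concurrent flushers do not repeat work.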
    pub fn flush(&self) -> io::Result<()> {
        for shard in &*self.shards {
            if shard.dirty.load(Ordering::Acquire) {
                let mut file =
                    shard.file_mu.lock().unwrap();
                if shard.dirty.load(Ordering::Acquire) {
                    fallible!(file.flush());
                    fallible!(file.get_mut().sync_all());
                    shard
                        .dirty
                        .store(false, Ordering::Release);
                }
            }
        }
        Ok(())
    }

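    /// Truncates every shard file to zero length. All
    /// shard locks are taken before any file is touched,
    /// so no concurrent writer can interleave with the
    /// truncation.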
    pub fn purge(&self) -> io::Result<()> {
        let mut buffers = vec![];
        for shard in &*self.shards {
            let buffer = shard.file_mu.lock().unwrap();
            buffers.push(buffer);
        }

        for buffer in &mut buffers {
            fallible!(buffer.flush());

            let file = buffer.get_mut();
            fallible!(file.seek(io::SeekFrom::Start(0)));
            fallible!(file.set_len(0));
            fallible!(file.sync_all());
        }

        for shard in &*self.shards {
            shard.dirty.store(false, Ordering::Release);
        }

        drop(buffers);

        Ok(())
    }

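    // Returns a locked shard, preferring this handle's
    // own index and falling back to any uncontended
    // shard before blocking. The returned shard is
    // marked dirty for `flush`.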
    fn get_shard(&self) -> MutexGuard<'_, BufWriter<File>> {
        let len = self.shards.len();
        for i in 0..len {
            // Scan backwards from this handle's
            // preferred shard. `self.idx` comes from an
            // unbounded counter, so reduce it modulo
            // `len` first; the previous
            // `self.idx.wrapping_sub(i) % len` skipped
            // shards whenever `len` was not a power of
            // two.
            let idx = (self.idx % len + len - i) % len;

            if let Ok(shard) =
                self.shards[idx].file_mu.try_lock()
            {
                self.shards[idx]
                    .dirty
                    .store(true, Ordering::Release);
                return shard;
            }
        }

        // Every shard is contended; block on the
        // preferred one. The index must be reduced
        // modulo `len` here too, or cloned handles with
        // an index past the shard count would panic on
        // an out-of-bounds access.
        let idx = self.idx % len;
        let ret =
            self.shards[idx].file_mu.lock().unwrap();

        self.shards[idx]
            .dirty
            .store(true, Ordering::Release);

        ret
    }
}

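// Serializes one batch into the shard's buffered writer
// using the frame layout described above `tick`. The crc
// is stored XORed with 0xFF, presumably so that a
// zeroed-out region of the file never carries a
// matching checksum; recovery applies the same
// transform before comparing.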
fn write_batch_inner<B: AsRef<[u8]>>(
    shard: &mut BufWriter<File>,
    lsn: u64,
    write_batch: &[B],
) -> io::Result<u64> {
    let size_bytes: [u8; 8] =
        (write_batch.len() as u64).to_le_bytes();
    let lsn_bytes: [u8; 8] = lsn.to_le_bytes();
    let mut hasher = Hasher::new();
    hasher.update(&size_bytes);
    hasher.update(&lsn_bytes);
    let crc: [u8; 4] =
        (hasher.finalize() ^ 0xFF).to_le_bytes();

    fallible!(shard.write_all(&crc));
    fallible!(shard.write_all(&size_bytes));
    fallible!(shard.write_all(&lsn_bytes));

    for buf_i in write_batch {
        let buf = buf_i.as_ref();
        let len_bytes: [u8; 8] =
            (buf.len() as u64).to_le_bytes();

        let mut hasher = Hasher::new();
        hasher.update(&len_bytes);
        hasher.update(buf);
        let crc_bytes: [u8; 4] =
            (hasher.finalize() ^ 0xFF).to_le_bytes();

        fallible!(shard.write_all(&crc_bytes));
        fallible!(shard.write_all(&len_bytes));
        fallible!(shard.write_all(buf));
    }

    Ok(lsn)
}

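// Hammers the log from many threads while racing a CAS
// counter, then recovers and asserts that the committed
// values come back in exactly the order the CAS
// succeeded, proving LSN order matches commit order.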
#[test]
fn concurrency() {
    let n_threads = 16_usize;
    let n_ops_per_thread = 10 * 1024;

    static CONCURRENCY_TEST_COUNTER: AtomicU64 =
        AtomicU64::new(0);

    let config = Config {
        shards: (n_threads / 4).max(1) as u8,
        path: "test_concurrency".into(),
        ..Default::default()
    };

    config.purge().unwrap();

    let log = Arc::new(config.create().unwrap());

    let barrier =
        Arc::new(std::sync::Barrier::new(n_threads + 1));
    let mut threads = vec![];
    for _ in 0..n_threads {
        let barrier = barrier.clone();
        let log = log.clone();

        let thread = std::thread::spawn(move || {
            barrier.wait();

            let mut successes = 0;
            while successes < n_ops_per_thread {
                let old_value = CONCURRENCY_TEST_COUNTER
                    .load(Ordering::Acquire);

                // Yield on both sides of the reservation
                // to encourage interleavings between the
                // LSN claim and the CAS below.
                std::thread::yield_now();

                let reservation = log.reservation();

                std::thread::yield_now();

                let cas_res = CONCURRENCY_TEST_COUNTER
                    .compare_exchange(
                        old_value,
                        old_value + 1,
                        Ordering::AcqRel,
                        Ordering::Relaxed,
                    );

                if cas_res.is_ok() {
                    let value = old_value.to_le_bytes();
                    reservation
                        .write_batch(&[value])
                        .unwrap();
                    successes += 1;
                } else {
                    reservation.abort().unwrap();
                }
            }
        });

        threads.push(thread);
    }

    barrier.wait();

    for thread in threads.into_iter() {
        thread.join().unwrap();
    }

    drop(log);

    let mut iter = config.recover().unwrap();

    // Aborted reservations come back as empty batches,
    // for which `pop` returns `None`; committed values
    // must appear in exactly CAS order.
    let mut successes = 0;
    while successes < n_threads * n_ops_per_thread {
        if let Some(next) = iter.next().unwrap().pop() {
            let value = u64::from_le_bytes(
                next.try_into().unwrap(),
            );
            assert_eq!(value, successes as u64);
            successes += 1;
        }
    }

    drop(iter);

    let _ = std::fs::remove_dir_all(&config.path);
}