raindb 1.0.0

A persistent key-value store based on an LSM tree implemented in Rust
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
// Copyright (c) 2021 Google LLC
//
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.

/*!
The log file format is used by both write-ahead logs and manifest files (a.k.a. descriptor logs).

The log file contents are series of 32 KiB blocks.

The current header of a block is 7 bytes and consists of a 4 byte u32 checksum, a 2 byte u16
length, and a 1 byte record type.

A record never starts within the last 6 bytes of a block (since a 7 byte header won't fit). Any
leftover bytes here form the trailer, which must consist entirely of zero bytes and must be
skipped by readers.

Note that if exactly seven bytes are left in the current block, and a new non-zero length record
is added, the writer must emit a [`BlockType::First`] record (which contains zero bytes of user
data) to fill up the trailing seven bytes of the block and then emit all of the user data in
subsequent blocks.
*/

use crc::{Crc, CRC_32_ISCSI};
use integer_encoding::FixedInt;
use std::convert::{TryFrom, TryInto};
use std::fmt;
use std::io::{ErrorKind, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;

use crate::errors::{DBIOError, LogIOError, LogSerializationErrorKind};
use crate::fs::{FileSystem, RandomAccessFile, ReadonlyRandomAccessFile};
use crate::utils::crc::{mask_checksum, unmask_checksum};

/**
The length of block record headers.

This is 7 bytes: a 4 byte checksum, a 2 byte length, and a 1 byte record type.
*/
const HEADER_LENGTH_BYTES: usize = 4 + 2 + 1;

/**
The size of blocks in the log file format.

This is set at 32 KiB. Records are never split across a block boundary; they are fragmented
into multiple block records instead.
*/
const BLOCK_SIZE_BYTES: usize = 32 * 1024;

/**
CRC calculator using the iSCSI polynomial.

LevelDB uses the [google/crc32c](https://github.com/google/crc32c) CRC implementation. This
implementation specifies using the iSCSI polynomial so that is what we use here as well.
*/
const CRC_CALCULATOR: Crc<u32> = Crc::<u32>::new(&CRC_32_ISCSI);

/// Alias for a [`Result`] that wraps a [`LogIOError`].
type LogIOResult<T> = Result<T, LogIOError>;

/**
Block record types denote whether the data contained in the block is split across multiple
blocks or if they contain all of the data for a single user record.

The discriminant values are the on-disk encoding of each variant, so they must not be
reordered or changed.

Note, the use of record is overloaded here. Be aware of the distinction between a block record
and the actual user record.
*/
#[repr(u8)]
#[derive(Clone, Copy, Debug)]
pub(crate) enum BlockType {
    /// Denotes that the block contains the entirety of a user record.
    Full = 0,
    /// Denotes the first fragment of a user record.
    First,
    /// Denotes the interior fragments of a user record.
    Middle,
    /// Denotes the last fragment of a user record.
    Last,
}

impl TryFrom<u8> for BlockType {
    type Error = LogIOError;

    /**
    Parse a raw byte from a serialized block header into a [`BlockType`].

    # Errors

    Returns a serialization error if the byte does not correspond to a known block type.
    */
    fn try_from(value: u8) -> LogIOResult<BlockType> {
        match value {
            0 => Ok(BlockType::Full),
            1 => Ok(BlockType::First),
            2 => Ok(BlockType::Middle),
            3 => Ok(BlockType::Last),
            unknown => Err(LogIOError::Seralization(LogSerializationErrorKind::Other(
                format!(
                    "There was a problem parsing the block type. The value received was {}",
                    unknown
                ),
            ))),
        }
    }
}

/**
A record that is stored in a particular block. It is potentially only a fragment of a full user
record.

# Serialization

When serialized to disk the block record will have the following format:

1. A 32-bit (masked) checksum of the data
2. The length as a 2-byte integer with a fixed-size encoding
3. The block type converted to a 1 byte integer with a fixed-size encoding
4. The data

*/
#[derive(Debug)]
pub(crate) struct BlockRecord {
    /// A CRC32C checksum of the data in this block (stored masked on disk).
    checksum: u32,

    /// The size of the data within the block.
    length: u16,

    /// The [`BlockType`] of the block.
    block_type: BlockType,

    /// User data to be stored in a block.
    data: Vec<u8>,
}

/// Crate-only methods
impl BlockRecord {
    /// Create a [`BlockRecord`], computing the CRC32C checksum of `data` eagerly so it can be
    /// verified on the read path.
    pub(crate) fn new(length: u16, block_type: BlockType, data: Vec<u8>) -> Self {
        Self {
            checksum: CRC_CALCULATOR.checksum(&data),
            length,
            block_type,
            data,
        }
    }
}

impl From<&BlockRecord> for Vec<u8> {
    /// Serialize the record to its on-disk layout: `checksum (4) | length (2) | type (1) | data`.
    fn from(record: &BlockRecord) -> Self {
        let mut serialized = Vec::with_capacity(HEADER_LENGTH_BYTES + record.data.len());

        // Mask the checksum before storage in case there are other checksums being done
        let masked_checksum = mask_checksum(record.checksum);
        serialized.extend_from_slice(&u32::encode_fixed_vec(masked_checksum));
        serialized.extend_from_slice(&u16::encode_fixed_vec(record.length));
        serialized.push(record.block_type as u8);
        serialized.extend_from_slice(&record.data);

        serialized
    }
}

impl TryFrom<&Vec<u8>> for BlockRecord {
    type Error = LogIOError;

    /// Deserialize a [`BlockRecord`] from its on-disk layout, verifying the data against the
    /// stored checksum.
    fn try_from(buf: &Vec<u8>) -> LogIOResult<BlockRecord> {
        if buf.len() < HEADER_LENGTH_BYTES {
            let error_msg = format!(
                "Failed to deserialize the provided buffer to a log block record. The buffer was \
                expected to be at least the size of the header ({} bytes) but was {}.",
                HEADER_LENGTH_BYTES,
                buf.len()
            );
            return Err(LogIOError::Seralization(LogSerializationErrorKind::Other(
                error_msg,
            )));
        }

        // The first four bytes are the masked checksum of the data
        let checksum = u32::decode_fixed(&buf[0..4]);
        let unmasked_checksum = unmask_checksum(checksum);

        // The next two bytes are the length of the data
        let data_length = u16::decode_fixed(&buf[4..6]);

        // The last header byte should be the block type
        let block_type: BlockType = buf[6].try_into()?;

        // Everything after the header is the user data; check its integrity against the
        // stored checksum
        let data = buf[HEADER_LENGTH_BYTES..].to_vec();
        let calculated_checksum = CRC_CALCULATOR.checksum(&data);
        if calculated_checksum != unmasked_checksum {
            return Err(LogIOError::Seralization(LogSerializationErrorKind::Other(
                format!(
                    "The checksums of the data did not match. Expected {unmasked_checksum} but \
                    got {calculated_checksum}"
                ),
            )));
        }

        Ok(BlockRecord::new(data_length, block_type, data))
    }
}

/** Handles all write activity to a log file. */
pub(crate) struct LogWriter {
    /// The path to the log file. Retained for logging and error messages.
    log_file_path: PathBuf,

    /// The underlying file representing the log.
    log_file: Box<dyn RandomAccessFile>,

    /**
    The offset in the current block being written to.

    This position is not necessarily aligned to a block i.e. it can be in the middle of a block
    during a write operation.
    */
    current_block_offset: usize,
}

/// Public methods
impl LogWriter {
    /**
    Construct a new [`LogWriter`].

    * `fs` - The wrapped file system to use for I/O.
    * `log_file_path` - The path to the log file to create or append to.
    * `is_appending` - Whether to append to an existing file instead of truncating.
    */
    pub fn new<P: AsRef<Path>>(
        fs: Arc<dyn FileSystem>,
        log_file_path: P,
        is_appending: bool,
    ) -> LogIOResult<Self> {
        log::info!(
            "Creating/appending to a log file at {}",
            log_file_path.as_ref().to_string_lossy()
        );
        let log_file = fs.create_file(log_file_path.as_ref(), is_appending)?;

        // When appending to an existing file, resume mid-block wherever the previous writer
        // left off.
        let log_file_size = log_file.len()? as usize;
        let current_block_offset = if log_file_size > 0 {
            log_file_size % BLOCK_SIZE_BYTES
        } else {
            0
        };

        Ok(LogWriter {
            log_file_path: log_file_path.as_ref().to_path_buf(),
            log_file,
            current_block_offset,
        })
    }

    /**
    Append `data` to the log.

    Large payloads are fragmented across multiple block records as needed; an empty payload
    still produces a single zero-length record.
    */
    pub fn append(&mut self, data: &[u8]) -> LogIOResult<()> {
        let mut remaining = data;
        let mut is_first_chunk = true;

        // Do-while formulation: at least one block record is always emitted so that appending
        // an empty buffer still writes a zero-length record (same as in LevelDB).
        loop {
            let space_left_in_block = BLOCK_SIZE_BYTES - self.current_block_offset;
            if space_left_in_block < HEADER_LENGTH_BYTES {
                // Not even a header fits here. Zero-fill the remainder (the block trailer)
                // and move on to a fresh block.
                if space_left_in_block > 0 {
                    log::debug!(
                        "Log file {:?}. There is not enough remaining space in the current block \
                        for the header. Filling it with zeroes.",
                        self.log_file_path
                    );
                    self.log_file.write_all(&vec![0u8; space_left_in_block])?;
                }

                self.current_block_offset = 0;
            }

            // How much of the remaining buffer can actually fit after the header
            let max_chunk_length =
                BLOCK_SIZE_BYTES - self.current_block_offset - HEADER_LENGTH_BYTES;
            let chunk_length = remaining.len().min(max_chunk_length);
            let is_last_chunk = chunk_length == remaining.len();

            let block_type = match (is_first_chunk, is_last_chunk) {
                (true, true) => BlockType::Full,
                (true, false) => BlockType::First,
                (false, true) => BlockType::Last,
                (false, false) => BlockType::Middle,
            };

            let (chunk, rest) = remaining.split_at(chunk_length);
            self.emit_block(block_type, chunk)?;
            remaining = rest;
            is_first_chunk = false;

            if remaining.is_empty() {
                break;
            }
        }

        Ok(())
    }
}

/// Private methods
impl LogWriter {
    /// Serialize a single block record and write it to the underlying medium, flushing
    /// immediately afterwards.
    fn emit_block(&mut self, block_type: BlockType, data_chunk: &[u8]) -> LogIOResult<()> {
        // The header stores the length as a `u16`, so the chunk must fit in one
        let data_length = u16::try_from(data_chunk.len())?;
        let record = BlockRecord::new(data_length, block_type, data_chunk.to_vec());

        log::debug!(
            "Writing new record to log file at {:?} with length {} and block type {:?}.",
            self.log_file_path,
            data_length,
            record.block_type
        );
        let serialized = Vec::<u8>::from(&record);
        self.log_file.write_all(&serialized)?;
        self.log_file.flush()?;

        let bytes_written = HEADER_LENGTH_BYTES + data_chunk.len();
        self.current_block_offset += bytes_written;
        log::debug!("Wrote {} bytes to the log file.", bytes_written);
        Ok(())
    }

    /// Get the length of the underlying log file in bytes.
    fn len(&self) -> LogIOResult<u64> {
        Ok(self.log_file.len()?)
    }
}

// Manual `Debug` implementation: only the path is included in the debug output; the boxed
// file handle is omitted.
impl fmt::Debug for LogWriter {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("LogWriter")
            .field("log_file_path", &self.log_file_path)
            .finish()
    }
}

/** Handles all read activity to a log file. */
pub(crate) struct LogReader {
    /** The underlying file representing the log. */
    log_file: Box<dyn ReadonlyRandomAccessFile>,

    /// The path to the log file. Retained for logging and error messages.
    log_file_path: PathBuf,

    /**
    An initial offset to start reading the log file from.

    This can be a byte offset and is not necessarily aligned to the start of a block.
    */
    initial_offset: usize,

    /**
    The offset to the byte being read.

    This should usually be aligned to the start of a block.

    # Legacy

    This is synonymous to LevelDB's `log::Reader::end_of_buffer_offset_`.
    */
    current_cursor_position: usize,

    /**
    The offset within the current block being read.

    # Legacy

    This is synonymous to wherever LevelDB calls `log::Reader::buffer_.size()`. LevelDB uses this
    private `buffer_` field when reading from the physical file backing the log reader. For
    efficiency, LevelDB reads the physical file a block at a time e.g. 32 KiB at a time. RainDB
    attempts to simplify matters by just reading what we need. Reading a block at a time helps with
    efficiency because the log is usually read in a sequential manner and all entries are
    processed (e.g. during database recoveries).
    */
    current_block_offset: usize,
}

/// Public methods
impl LogReader {
    /**
    Construct a new [`LogReader`].

    * `fs`- The wrapped file system to use for I/O.
    * `log_file_path` - The absolute path to the log file.
    * `initial_block_offset` - An initial offset to start reading the log file from.
    */
    pub fn new<P: AsRef<Path>>(
        fs: Arc<dyn FileSystem>,
        log_file_path: P,
        initial_block_offset: usize,
    ) -> LogIOResult<Self> {
        log::info!("Reading the log file at {:?}", log_file_path.as_ref());
        let log_file = fs.open_file(log_file_path.as_ref())?;

        let reader = Self {
            log_file,
            log_file_path: log_file_path.as_ref().to_path_buf(),
            initial_offset: initial_block_offset,
            current_cursor_position: initial_block_offset,
            current_block_offset: 0,
        };

        Ok(reader)
    }

    /**
    Read a record from the log file.

    Returns a `Vec<u8>` containing the data that was read and a boolean indicating if the end of
    the file was reached.

    # Errors

    Returns an error if a record fails to deserialize (e.g. a checksum mismatch) or if an I/O
    error other than a clean end-of-file occurs.
    */
    pub fn read_record(&mut self) -> LogIOResult<(Vec<u8>, bool)> {
        // NOTE(review): `current_cursor_position` is initialized to `initial_offset` in `new`,
        // so this branch appears unreachable on a fresh reader — confirm whether the cursor
        // should instead start at 0 when a non-zero initial offset is supplied.
        if self.current_cursor_position < self.initial_offset {
            self.current_cursor_position += self.seek_to_initial_block()? as usize;
        }

        // A non-empty log that has been fully consumed is a clean end-of-file
        if self.current_cursor_position > 0
            && (self.current_cursor_position as u64) >= self.len()?
        {
            return Ok((vec![], true));
        }

        // A buffer consolidating all of the fragments retrieved from the log file.
        let mut data_buffer: Vec<u8> = vec![];

        loop {
            match self.read_physical_record() {
                Err(LogIOError::IO(db_io_error))
                    if db_io_error.kind() == ErrorKind::UnexpectedEof =>
                {
                    // The writer may have died mid-record; treat a truncated tail as EOF
                    return Ok((vec![], true));
                }
                // Previously non-I/O errors (e.g. corrupted records) were silently discarded
                // and the loop retried; propagate them so callers can react
                Err(physical_read_err) => return Err(physical_read_err),
                Ok(record) => {
                    data_buffer.extend(record.data);

                    match record.block_type {
                        // `Full` and `Last` both complete a user record
                        BlockType::Full | BlockType::Last => {
                            return Ok((data_buffer, false));
                        }
                        // `First` and `Middle` fragments keep accumulating
                        BlockType::First | BlockType::Middle => {}
                    }
                }
            }
        }
    }
}

/// Private methods.
impl LogReader {
    /**
    Seek to the first block on or before the initial offset that was provided.

    The initial offset may land in the middle of a block's byte range. We will look backwards to
    find the start of the block if this happens. If the offset lands in a block's trailer,
    reading starts at the next block instead.

    Returns the new absolute position of the file cursor (the seek is always relative to the
    start of the file), or 0 if no seek was performed.
    */
    fn seek_to_initial_block(&mut self) -> LogIOResult<u64> {
        let offset_in_block = self.initial_offset % BLOCK_SIZE_BYTES;
        let mut block_start_position = self.initial_offset - offset_in_block;

        // Skip ahead if we landed in the trailer. A record never starts within the last
        // `HEADER_LENGTH_BYTES - 1` bytes of a block because a header would not fit there.
        if offset_in_block > (BLOCK_SIZE_BYTES - (HEADER_LENGTH_BYTES - 1)) {
            block_start_position += BLOCK_SIZE_BYTES;
        }

        // Move the file's underlying cursor to the start of the first block
        if block_start_position > 0 {
            return Ok(self
                .log_file
                .seek(SeekFrom::Start(block_start_position as u64))?);
        }

        Ok(0)
    }

    /**
    Read the physical record from the file system and parse it into a [`BlockRecord`].

    Returns the parsed [`BlockRecord`].

    # Errors

    Returns an `UnexpectedEof` I/O error if the file ends before a full header or data chunk
    can be read (e.g. the log writer died mid-write), or a deserialization error if the record
    is malformed (e.g. a checksum mismatch).
    */
    fn read_physical_record(&mut self) -> LogIOResult<BlockRecord> {
        if BLOCK_SIZE_BYTES - self.current_block_offset < HEADER_LENGTH_BYTES {
            // There are not enough bytes left in the current block to form a header. This might
            // be a trailer, so consume the remaining bytes to position the cursor at the start
            // of the next block.
            let mut trailer = vec![0u8; BLOCK_SIZE_BYTES - self.current_block_offset];
            if !trailer.is_empty() {
                self.log_file.read_exact(&mut trailer)?;
                self.current_block_offset = 0;
                self.current_cursor_position += trailer.len();
            }
        }

        // Read the header
        let mut header_buffer = [0; HEADER_LENGTH_BYTES];
        let header_bytes_read = self.log_file.read(&mut header_buffer)?;
        if header_bytes_read < HEADER_LENGTH_BYTES {
            // The end of the file was reached before we were able to read a full header. This
            // can occur if the log writer died in the middle of writing the record.
            let err_msg = format!(
                "Unexpectedly reached the end of the log file at {log_file_path:?} while \
                attempting to read a header.",
                log_file_path = self.log_file_path
            );
            return Err(LogIOError::IO(DBIOError::new(
                ErrorKind::UnexpectedEof,
                err_msg,
            )));
        }
        self.current_block_offset += header_bytes_read;

        // Bytes 4..6 of the header hold the length of the data that follows
        let data_length = u16::decode_fixed(&header_buffer[4..6]) as usize;

        // Read the payload
        let mut data_buffer = vec![0; data_length];
        let data_bytes_read = self.log_file.read(&mut data_buffer)?;

        if data_bytes_read < data_length {
            // The end of the file was reached before we were able to read a full data chunk. This
            // can occur if the log writer died in the middle of writing the record.
            let err_msg = format!(
                "Unexpectedly reached the end of the log file at {log_file_path:?} while \
                attempting to read the data chunk.",
                log_file_path = self.log_file_path
            );
            return Err(LogIOError::IO(DBIOError::new(
                ErrorKind::UnexpectedEof,
                err_msg,
            )));
        }

        // Parse the payload. `BlockRecord::try_from` also verifies the checksum.
        let serialized_block = [header_buffer.to_vec(), data_buffer].concat();
        let block_record: BlockRecord = BlockRecord::try_from(&serialized_block)?;
        self.current_cursor_position += header_buffer.len() + data_bytes_read;
        // Wrap back to 0 when the read consumed the block exactly
        self.current_block_offset =
            (self.current_block_offset + data_bytes_read) % BLOCK_SIZE_BYTES;

        Ok(block_record)
    }

    /// Get the length of the underlying log file in bytes.
    fn len(&self) -> LogIOResult<u64> {
        Ok(self.log_file.len()?)
    }

    /// Log bytes dropped with the provided reason.
    ///
    /// NOTE(review): currently not referenced anywhere in this file; retained for parity with
    /// LevelDB's corruption reporting.
    fn log_drop(num_bytes_dropped: u64, reason: String) {
        log::error!(
            "Skipped reading {} bytes. Reason: {}",
            num_bytes_dropped,
            &reason
        );
    }

    /// Log bytes dropped because corruption was detected.
    ///
    /// NOTE(review): currently not referenced anywhere in this file; retained for parity with
    /// LevelDB's corruption reporting.
    fn log_corruption(num_bytes_corrupted: u64) {
        LogReader::log_drop(num_bytes_corrupted, "Detected corruption.".to_owned())
    }
}

#[cfg(test)]
mod tests {
    use pretty_assertions::assert_eq;
    use std::fmt::Write;

    use crate::fs::InMemoryFileSystem;

    use super::*;

    /// Round-trip several records, including an empty one, through a single block.
    #[test]
    fn can_read_and_write() {
        let fs: Arc<dyn FileSystem> = Arc::new(InMemoryFileSystem::new());
        let path = "wal-123.log";
        let mut log_writer = LogWriter::new(Arc::clone(&fs), path, false).unwrap();

        log_writer.append(b"here is a string.").unwrap();
        log_writer.append(b"we got more strings over here").unwrap();
        log_writer.append(b"").unwrap();
        log_writer
            .append(b"there was an empty string somewhere")
            .unwrap();

        drop(log_writer);

        let mut log_reader = LogReader::new(Arc::clone(&fs), path, 0).unwrap();
        assert_eq!(
            b"here is a string.".to_vec(),
            log_reader.read_record().unwrap().0
        );
        assert_eq!(
            b"we got more strings over here".to_vec(),
            log_reader.read_record().unwrap().0
        );
        assert_eq!(b"".to_vec(), log_reader.read_record().unwrap().0);
        assert_eq!(
            b"there was an empty string somewhere".to_vec(),
            log_reader.read_record().unwrap().0
        );

        // Reading past the end must keep reporting EOF, not panic or loop
        assert_eq!(
            Vec::<u8>::new(),
            log_reader.read_record().unwrap().0,
            "The reader should be at the end of the file but received an unexpected record."
        );
        assert_eq!(
            Vec::<u8>::new(),
            log_reader.read_record().unwrap().0,
            "The reader should be at the end of the file but received an unexpected record."
        );
    }

    /// Exercise many small records so the log spans many blocks and block trailers.
    #[test]
    fn can_read_and_write_with_lots_of_blocks() {
        let fs: Arc<dyn FileSystem> = Arc::new(InMemoryFileSystem::new());
        let path = "wal-123.log";
        let mut log_writer = LogWriter::new(Arc::clone(&fs), path, false).unwrap();

        for num in 0usize..100_000 {
            log_writer.append(format!("{num}").as_bytes()).unwrap();
        }
        println!("Log writer wrote {} bytes", log_writer.len().unwrap());
        drop(log_writer);

        let mut log_reader = LogReader::new(Arc::clone(&fs), path, 0).unwrap();
        println!("Log reader sees {} bytes", log_reader.len().unwrap());
        for num in 0usize..100_000 {
            let actual_record = log_reader.read_record().ok();
            if num >= 99_990 {
                println!(
                    "Actual record read {}",
                    std::str::from_utf8(&actual_record.as_ref().unwrap().0).unwrap()
                );
            }
            assert!(
                actual_record.unwrap().0 == format!("{num}").as_bytes(),
                "Failed to properly read at iteration {num}"
            );
        }

        assert_eq!(
            Vec::<u8>::new(),
            log_reader.read_record().unwrap().0,
            "The reader should be at the end of the file but received an unexpected record."
        );
    }

    /// Records larger than a block must be fragmented (First/Middle/Last) and reassembled.
    #[test]
    fn can_read_and_write_with_records_larger_than_the_block_size_limit() {
        let fs: Arc<dyn FileSystem> = Arc::new(InMemoryFileSystem::new());
        let path = "wal-123.log";
        let mut log_writer = LogWriter::new(Arc::clone(&fs), path, false).unwrap();

        log_writer.append(b"small").unwrap();
        log_writer
            .append(&generate_large_buffer("medium", 50_000))
            .unwrap();
        log_writer
            .append(&generate_large_buffer("large", 100_000))
            .unwrap();

        let mut log_reader = LogReader::new(Arc::clone(&fs), path, 0).unwrap();
        assert_eq!(log_reader.read_record().unwrap().0, b"small");
        assert_eq!(
            log_reader.read_record().unwrap().0,
            generate_large_buffer("medium", 50_000)
        );
        assert_eq!(
            log_reader.read_record().unwrap().0,
            generate_large_buffer("large", 100_000)
        );
        assert_eq!(
            Vec::<u8>::new(),
            log_reader.read_record().unwrap().0,
            "The reader should be at the end of the file but received an unexpected record."
        );
    }

    /// An empty record reads back as `(vec![], false)`; only true EOF yields `(vec![], true)`.
    #[test]
    fn read_can_differentiate_an_empty_record_from_end_of_file() {
        let fs: Arc<dyn FileSystem> = Arc::new(InMemoryFileSystem::new());
        let path = "wal-123.log";
        let mut log_writer = LogWriter::new(Arc::clone(&fs), path, false).unwrap();

        log_writer.append(b"small").unwrap();
        log_writer
            .append(&generate_large_buffer("medium", 50_000))
            .unwrap();
        log_writer.append(&[]).unwrap();
        log_writer
            .append(&generate_large_buffer("large", 100_000))
            .unwrap();

        let mut log_reader = LogReader::new(Arc::clone(&fs), path, 0).unwrap();
        assert_eq!(
            log_reader.read_record().unwrap(),
            (b"small".to_vec(), false)
        );
        assert_eq!(
            log_reader.read_record().unwrap(),
            (generate_large_buffer("medium", 50_000), false)
        );
        assert_eq!(log_reader.read_record().unwrap(), (vec![], false));
        assert_eq!(
            log_reader.read_record().unwrap(),
            (generate_large_buffer("large", 100_000), false)
        );
        assert_eq!(
            log_reader.read_record().unwrap(),
            (vec![], true),
            "The reader should be at the end of the file but received an unexpected record."
        );
    }

    /// A freshly created, never-written log must immediately report EOF.
    #[test]
    fn read_returns_eof_when_encountering_an_empty_log() {
        let fs: Arc<dyn FileSystem> = Arc::new(InMemoryFileSystem::new());
        let path = "wal-123.log";
        LogWriter::new(Arc::clone(&fs), path, false).unwrap();

        let mut log_reader = LogReader::new(Arc::clone(&fs), path, 0).unwrap();

        assert_eq!(log_reader.read_record().unwrap(), (vec![], true));
    }

    /// When exactly one header's worth of space remains in a block, a zero-length record
    /// (no padding) must be written to fill it.
    #[test]
    fn when_there_is_only_enough_room_for_an_empty_record_will_write_a_header_with_no_padding() {
        let fs: Arc<dyn FileSystem> = Arc::new(InMemoryFileSystem::new());
        let path = "wal-123.log";
        let mut log_writer = LogWriter::new(Arc::clone(&fs), path, false).unwrap();
        let marginal_size = BLOCK_SIZE_BYTES - (2 * HEADER_LENGTH_BYTES);

        log_writer
            .append(&generate_large_buffer("foo", marginal_size))
            .unwrap();
        assert_eq!(
            log_writer.len().unwrap(),
            (BLOCK_SIZE_BYTES - HEADER_LENGTH_BYTES) as u64
        );

        log_writer.append(b"").unwrap();
        log_writer.append(b"bar").unwrap();

        let mut log_reader = LogReader::new(Arc::clone(&fs), path, 0).unwrap();
        assert_eq!(
            log_reader.read_record().unwrap().0,
            generate_large_buffer("foo", marginal_size)
        );
        assert_eq!(log_reader.read_record().unwrap().0, b"");
        assert_eq!(log_reader.read_record().unwrap().0, b"bar");
        assert_eq!(
            Vec::<u8>::new(),
            log_reader.read_record().unwrap().0,
            "The reader should be at the end of the file but received an unexpected record."
        );
    }

    /// Re-opening a log in append mode must resume mid-block without corrupting prior records.
    #[test]
    fn can_reopen_a_log_and_append() {
        let fs: Arc<dyn FileSystem> = Arc::new(InMemoryFileSystem::new());
        let path = "wal-123.log";
        let mut log_writer = LogWriter::new(Arc::clone(&fs), path, false).unwrap();
        log_writer.append(b"foo").unwrap();

        let mut log_writer = LogWriter::new(Arc::clone(&fs), path, true).unwrap();
        log_writer.append(b"bar").unwrap();

        let mut log_reader = LogReader::new(Arc::clone(&fs), path, 0).unwrap();
        assert_eq!(log_reader.read_record().unwrap().0, b"foo");
        assert_eq!(log_reader.read_record().unwrap().0, b"bar");
        assert_eq!(
            Vec::<u8>::new(),
            log_reader.read_record().unwrap().0,
            "The reader should be at the end of the file but received an unexpected record."
        );
    }

    /// Build a buffer of at least `target_size` bytes by repeating `pattern`.
    fn generate_large_buffer(pattern: &str, target_size: usize) -> Vec<u8> {
        let mut string = String::with_capacity(target_size);
        while string.len() < target_size {
            write!(string, "{pattern}").unwrap();
        }

        string.as_bytes().to_vec()
    }
}