1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
use crate::serialization::{Addr, SerializationSink};
use parking_lot::Mutex;
use std::error::Error;
use std::fs;
use std::io::Write;
use std::path::Path;

/// A [`SerializationSink`] that stages writes in a fixed-size in-memory
/// buffer and appends them to a single file.
///
/// Every piece of mutable state sits behind one mutex. This lets the sink's
/// write methods take `&self` while still handing out non-overlapping
/// addresses.
pub struct FileSerializationSink {
    /// Output file, staging buffer and address counter, guarded together.
    data: Mutex<Inner>,
}

/// Lock-protected state of a [`FileSerializationSink`].
struct Inner {
    /// Destination file; bytes are appended to it strictly in write order.
    file: fs::File,
    /// Staging area; only `buffer[..buf_pos]` holds bytes not yet on disk.
    buffer: Vec<u8>,
    /// Count of pending bytes at the front of `buffer`.
    buf_pos: usize,
    /// Total bytes handed out so far, i.e. the address the next write gets.
    addr: u32,
}

impl SerializationSink for FileSerializationSink {
    fn from_path(path: &Path) -> Result<Self, Box<dyn Error>> {
        fs::create_dir_all(path.parent().unwrap())?;

        let file = fs::File::create(path)?;

        Ok(FileSerializationSink {
            data: Mutex::new(Inner {
                file,
                buffer: vec![0; 1024 * 512],
                buf_pos: 0,
                addr: 0,
            }),
        })
    }

    #[inline]
    fn write_atomic<W>(&self, num_bytes: usize, write: W) -> Addr
    where
        W: FnOnce(&mut [u8]),
    {
        let mut data = self.data.lock();
        let Inner {
            ref mut file,
            ref mut buffer,
            ref mut buf_pos,
            ref mut addr,
        } = *data;

        let curr_addr = *addr;
        *addr += num_bytes as u32;

        let buf_start = *buf_pos;
        let buf_end = buf_start + num_bytes;

        if buf_end <= buffer.len() {
            // We have enough space in the buffer, just write the data to it.
            write(&mut buffer[buf_start..buf_end]);
            *buf_pos = buf_end;
        } else {
            // We don't have enough space in the buffer, so flush to disk
            file.write_all(&buffer[..buf_start]).unwrap();

            if num_bytes <= buffer.len() {
                // There's enough space in the buffer, after flushing
                write(&mut buffer[0..num_bytes]);
                *buf_pos = num_bytes;
            } else {
                // Even after flushing the buffer there isn't enough space, so
                // fall back to dynamic allocation
                let mut temp_buffer = vec![0; num_bytes];
                write(&mut temp_buffer[..]);
                file.write_all(&temp_buffer[..]).unwrap();
                *buf_pos = 0;
            }
        }

        Addr(curr_addr)
    }

    fn write_bytes_atomic(&self, bytes: &[u8]) -> Addr {
        if bytes.len() < 128 {
            // For "small" pieces of data, use the regular implementation so we
            // don't repeatedly flush an almost empty buffer to disk.
            return self.write_atomic(bytes.len(), |sink| sink.copy_from_slice(bytes));
        }

        let mut data = self.data.lock();
        let Inner {
            ref mut file,
            ref mut buffer,
            ref mut buf_pos,
            ref mut addr,
        } = *data;

        let curr_addr = *addr;
        *addr += bytes.len() as u32;

        if *buf_pos > 0 {
            // There's something in the buffer, flush it to disk
            file.write_all(&buffer[..*buf_pos]).unwrap();
            *buf_pos = 0;
        }

        // Now write the whole input to disk, skipping the write buffer
        file.write_all(bytes).unwrap();

        Addr(curr_addr)
    }
}

impl Drop for FileSerializationSink {
    /// Writes any bytes still pending in the buffer to the file.
    ///
    /// # Panics
    /// Panics if that final write fails. The exception is when the thread is
    /// already unwinding from another panic: then the error is discarded,
    /// because a second panic inside `drop` would abort the process.
    fn drop(&mut self) {
        // `&mut self` proves no other reference exists, so the mutex can be
        // bypassed with `get_mut` instead of being locked.
        let Inner {
            file,
            buffer,
            buf_pos,
            ..
        } = self.data.get_mut();

        if *buf_pos == 0 {
            return;
        }

        if let Err(err) = file.write_all(&buffer[..*buf_pos]) {
            if !std::thread::panicking() {
                panic!(
                    "FileSerializationSink: failed to flush buffer on drop: {}",
                    err
                );
            }
        }
    }
}