append_log/
log.rs

1extern crate crc32fast;
2
3use std::fs::{File, OpenOptions};
4use std::io::prelude::*;
5use std::io::SeekFrom;
6use std::path::Path;
7
8use crc32fast::Hasher as CRC32;
9
10use crate::EntryIterator;
11use crate::Error;
12use crate::Options;
13
14#[derive(Debug)]
15pub struct Log {
16    file: File,
17    options: Options,
18    buffer: Vec<u8>,
19    offset: u64,
20
21    // Offset of last written data
22    last_data_off: u64,
23}
24
/// A single entry read back from the log by `Log::read`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Chunk {
    /// The entry's payload, with the length/checksum header and padding
    /// stripped.
    pub data: Vec<u8>,
    /// Offset just past this entry's padding, i.e. where the following entry
    /// would start.
    pub next: u64,
}
30
31impl Log {
32    pub fn open_default(path: &Path) -> Result<Self, Error> {
33        Self::open(path, Options::default())
34    }
35
36    pub fn open(path: &Path, options: Options) -> Result<Self, Error> {
37        let file = OpenOptions::new()
38            .read(true)
39            .create(true)
40            .append(true)
41            .open(path)?;
42        let buffer = Vec::with_capacity(options.buffer_size);
43        let mut log = Log {
44            file,
45            options,
46            buffer,
47            offset: 0,
48            last_data_off: 0,
49        };
50
51        log.init()?;
52        Ok(log)
53    }
54
55    fn init(&mut self) -> Result<(), Error> {
56        let len = self.file.metadata()?.len();
57
58        // Find last marker
59        self.offset = len;
60        let block_size = self.options.block_size as u64;
61
62        // Empty file - continue
63        if self.offset == 0 {
64            return Ok(());
65        }
66
67        if self.offset % block_size != 0 {
68            return Err(Error::InvalidLength);
69        }
70
71        let mut magic: [u8; 8] = [0; 8];
72        let mut last_data_off: [u8; 8] = [0; 8];
73
74        // Check magic
75        self.file
76            .seek(SeekFrom::End(-((last_data_off.len() + magic.len()) as i64)))?;
77        self.file.read_exact(&mut last_data_off)?;
78        self.file.read_exact(&mut magic)?;
79
80        if u64::from_be_bytes(magic) != self.options.magic {
81            return Err(Error::InvalidMagic);
82        }
83
84        self.last_data_off = u64::from_be_bytes(last_data_off);
85
86        Ok(())
87    }
88
89    pub fn is_empty(&self) -> bool {
90        self.offset == 0
91    }
92
93    /// Size of the file
94    pub fn file_size(&self) -> u64 {
95        self.offset
96    }
97
98    pub fn last_data_off(&self) -> u64 {
99        self.last_data_off
100    }
101
102    pub fn repair(&self) -> Result<(), Error> {
103        Err(Error::NotImplemented)
104    }
105
106    /// Estimate the file size after appending the `data` and flushing it
107    /// immediately.
108    pub fn estimate(&mut self, data: &[u8]) -> u64 {
109        let mut offset = self.offset;
110
111        // Length
112        offset += 8;
113
114        // Checksum
115        offset += 4;
116
117        // Data
118        offset += data.len() as u64;
119
120        // Padding
121        let pad_size = self.options.pad_size as u64;
122        offset += pad_size - (offset % pad_size);
123
124        // Flushing:
125        //
126        // Magic + offset to last data
127        offset += 16;
128
129        // Padding
130        let block_size = self.options.block_size as u64;
131        offset += block_size - (offset % block_size);
132
133        offset
134    }
135
136    /// Append data to the log's buffer.
137    ///
138    /// NOTE: `flush()` must be called to write buffered data to the disk.
139    pub fn append(&mut self, data: &[u8]) -> u64 {
140        self.last_data_off = self.offset;
141
142        // Write `data` length
143
144        let len = (data.len() as u64).to_be_bytes();
145        self.buffer.extend(&len);
146        self.offset += len.len() as u64;
147
148        // Write checksum
149        let mut hash = CRC32::new();
150        hash.update(data);
151        let checksum: u32 = hash.finalize();
152        let checksum = checksum.to_be_bytes();
153
154        self.buffer.extend(&checksum);
155        self.offset += checksum.len() as u64;
156
157        // Write `data` itself
158        self.buffer.extend(data);
159        self.offset += data.len() as u64;
160
161        // Pad
162        let pad_size = self.options.pad_size as u64;
163        let pad = pad_size - (self.offset % pad_size);
164        self.buffer.extend(std::iter::repeat(0).take(pad as usize));
165        self.offset += pad;
166
167        self.last_data_off
168    }
169
170    pub fn read(&mut self, off: u64) -> Result<Chunk, Error> {
171        let mut len: [u8; 8] = [0; 8];
172        self.file.seek(SeekFrom::Start(off))?;
173        self.file.read_exact(&mut len)?;
174        let len = u64::from_be_bytes(len);
175
176        let mut crc32: [u8; 4] = [0; 4];
177        self.file.read_exact(&mut crc32)?;
178        let crc32 = u32::from_be_bytes(crc32);
179
180        let mut data: Vec<u8> = std::iter::repeat(0).take(len as usize).collect();
181        self.file.read_exact(&mut data)?;
182
183        let mut hash = CRC32::new();
184        hash.update(&data);
185        let checksum: u32 = hash.finalize();
186
187        if checksum != crc32 {
188            return Err(Error::InvalidChecksum);
189        }
190
191        let pad_size = self.options.pad_size as u64;
192        let mut next = off + 12 + len;
193        next += pad_size - (next % pad_size);
194
195        Ok(Chunk { data, next })
196    }
197
198    /// Write buffered data to the disk, padding it appropriately.
199    pub fn flush(&mut self) -> Result<(), Error> {
200        // No data to flush
201        if self.buffer.is_empty() {
202            return Ok(());
203        }
204
205        // Pad to the block size
206        let block_size = self.options.block_size as u64;
207        let pad = block_size - ((self.offset + 16) % block_size);
208        self.buffer.extend(std::iter::repeat(0).take(pad as usize));
209        self.offset += pad;
210
211        // Write offset of last data
212        let last_data_off = self.last_data_off.to_be_bytes();
213        self.buffer.extend(&last_data_off);
214        self.offset += last_data_off.len() as u64;
215
216        // Finish with magic value
217        let magic = self.options.magic.to_be_bytes();
218        self.buffer.extend(&magic);
219        self.offset += magic.len() as u64;
220
221        self.file.write_all(&self.buffer)?;
222        self.file.flush()?;
223        self.buffer.clear();
224
225        Ok(())
226    }
227
228    pub fn iter(&mut self) -> Result<EntryIterator, Error> {
229        EntryIterator::with_log(self)
230    }
231}
232
#[cfg(test)]
mod tests {
    extern crate tempfile;

    use std::fs::OpenOptions;
    use std::io::{Seek, SeekFrom, Write};

    use tempfile::tempdir;

    use super::*;

    #[test]
    fn it_should_create_log() {
        let dir = tempdir().expect("temporary directory to create");
        let log_path = dir.path().join("log.db");

        // Write data
        {
            let mut log = Log::open_default(&log_path).expect("log to open");

            log.append(&[1, 2, 3]);
            log.append(&[4, 5, 6]);
            log.flush().expect("flush to succeed");
        }

        // Re-open log
        {
            let mut log = Log::open_default(&log_path).expect("log to re-open");

            assert_eq!(log.last_data_off(), 16);
            let chunk = log.read(log.last_data_off()).expect("read to succeed");
            assert_eq!(chunk.data, vec![4, 5, 6]);
            assert_eq!(chunk.next, 32);

            let chunk = log.read(0).expect("read to succeed");
            assert_eq!(chunk.next, 16);
        }
    }

    #[test]
    fn it_should_iterate() {
        let dir = tempdir().expect("temporary directory to create");
        let log_path = dir.path().join("log.db");

        // Write data
        let mut log = Log::open_default(&log_path).expect("log to open");

        {
            let mut iter = log.iter().expect("iterator to be created");
            assert!(iter.next().is_none());
        }

        log.append(&[1, 2, 3]);
        log.append(&[4, 5, 6]);
        let size = log.estimate(&[7, 8, 9]);
        log.append(&[7, 8, 9]);
        log.flush().expect("flush to succeed");

        assert_eq!(log.file_size(), size);

        {
            let mut iter = log.iter().expect("iterator to be created");

            let chunk = iter
                .next()
                .expect("1st chunk")
                .expect("1st chunk to be read");
            assert_eq!(chunk, vec![1, 2, 3]);

            let chunk = iter
                .next()
                .expect("2nd chunk")
                .expect("2nd chunk to be read");
            assert_eq!(chunk, vec![4, 5, 6]);

            let chunk = iter
                .next()
                .expect("3rd chunk")
                .expect("3rd chunk to be read");
            assert_eq!(chunk, vec![7, 8, 9]);

            assert!(iter.next().is_none());
        }
    }

    /// A non-empty file whose length is not a whole number of blocks must be
    /// rejected on open.
    #[test]
    fn it_should_reject_unaligned_file() {
        let dir = tempdir().expect("temporary directory to create");
        let log_path = dir.path().join("log.db");

        let block_size = Options::default().block_size as usize;
        std::fs::write(&log_path, vec![0u8; block_size + 1]).expect("file to be written");

        match Log::open_default(&log_path) {
            Err(Error::InvalidLength) => {}
            other => panic!("expected InvalidLength, got {:?}", other),
        }
    }

    /// A block-aligned file whose trailer carries the wrong magic must be
    /// rejected on open.
    #[test]
    fn it_should_reject_bad_magic() {
        let dir = tempdir().expect("temporary directory to create");
        let log_path = dir.path().join("log.db");

        let options = Options::default();
        let block_size = options.block_size as usize;
        // Smallest block-aligned length that still holds the 16-byte trailer.
        let len = ((16 + block_size - 1) / block_size) * block_size;
        let mut contents = vec![0u8; len];
        let wrong_magic = (options.magic ^ 1).to_be_bytes();
        contents[len - 8..].copy_from_slice(&wrong_magic);
        std::fs::write(&log_path, &contents).expect("file to be written");

        match Log::open_default(&log_path) {
            Err(Error::InvalidMagic) => {}
            other => panic!("expected InvalidMagic, got {:?}", other),
        }
    }

    /// Corrupting a flushed payload must be caught by the CRC32 check in
    /// `read`.
    #[test]
    fn it_should_detect_corrupted_data() {
        let dir = tempdir().expect("temporary directory to create");
        let log_path = dir.path().join("log.db");

        let mut log = Log::open_default(&log_path).expect("log to open");
        let off = log.append(&[1, 2, 3]);
        log.flush().expect("flush to succeed");

        // Overwrite the first payload byte behind the log's back. The payload
        // starts after the 8-byte length and the 4-byte checksum.
        {
            let mut file = OpenOptions::new()
                .write(true)
                .open(&log_path)
                .expect("file to open for writing");
            file.seek(SeekFrom::Start(off + 12))
                .expect("seek to succeed");
            file.write_all(&[0xff]).expect("write to succeed");
        }

        match log.read(off) {
            Err(Error::InvalidChecksum) => {}
            other => panic!("expected InvalidChecksum, got {:?}", other),
        }
    }
}