bitrust/
storage.rs

1use std::fs::{File, OpenOptions};
2use std::io::{self, BufRead, BufReader, Read, Seek, SeekFrom, Write};
3use std::marker::PhantomData;
4use std::path::PathBuf;
5
6use byteorder::{BigEndian, ReadBytesExt};
7use bytes::{BufMut, BytesMut};
8use errors::*;
9use protobuf::Message;
10use util;
11
12use common::FileID;
13
// Byte offsets of the fixed-position recordio fields within a payload. The
// data checksum follows the variable-length record data, so its offset depends
// on the record size and is not declared here.
const RECORD_SIZE_OFFSET: usize = 0;
// The u32 CRC of the record size follows the 8-byte (u64) size field.
const RECORD_SIZE_CRC_OFFSET: usize = RECORD_SIZE_OFFSET + 8;
// The record data follows the 4-byte (u32) size CRC.
const RECORD_DATA_OFFSET: usize = RECORD_SIZE_CRC_OFFSET + 4;
20
21/// Returns the size of the given message, when stored. Currently there is a
22/// constant overhead of 16 bytes (8 bytes for message size, 4 bytes for message
23/// size CRC32, 4 bytes for data CRC32).
24#[inline(always)]
25pub fn payload_size_for_record<M>(msg: &M) -> u64
26where
27  M: protobuf::Message,
28{
29  msg.compute_size() as u64 + 16
30}
31
32/// A trait for sequential and random read access to recordio logs. Each payload
33/// in a log is laid out as follows:
34///   
35///   record_size: u64
36///   record_size_crc: u32
37///   record_data: [u8; record_size]
38///   record_data_crc: u32
39///
40pub trait RecordRead {
41  type Reader: BufRead + Seek;
42  type Message: protobuf::Message;
43
44  /// Impls must provide this method, and the rest of the behaviour is provided
45  /// by default.
46  fn reader<'a>(&'a mut self) -> Result<&'a mut Self::Reader>;
47
48  /// Reads exactly the requested number of bytes from self.reader() using
49  /// BufRead::read_exact(), returning any errors encountered in the process.
50  fn read_exact_from_current_offset(&mut self, bytes: &mut [u8]) -> Result<()> {
51    self.reader()?.read_exact(bytes).map_err(|e| e.into())
52  }
53
54  /// Reads exactly requested number of bytes from self.reader() starting at
55  /// `offset_from_start` using BufRead::read_exact() (so a seek followed by
56  /// read calls). Returns any errors encountered during seeking/reading.
57  fn read_exact(
58    &mut self,
59    offset_from_start: u64,
60    bytes: &mut [u8],
61  ) -> Result<()> {
62    self.reader()?.seek(SeekFrom::Start(offset_from_start))?;
63    self.read_exact_from_current_offset(bytes)
64  }
65
66  /// Returns if self.reader() has reached EOF.
67  fn eof(&mut self) -> Result<bool> {
68    Ok(self.reader()?.fill_buf()?.is_empty())
69  }
70
71  /// Returns the next complete record in the reader wrapped by self.reader().
72  /// If the reader is already at EOF, returns Ok(None).
73  fn next_record(&mut self) -> Result<Option<Self::Message>> {
74    let mut header = [0u8; RECORD_DATA_OFFSET];
75    if self.eof()? {
76      return Ok(None);
77    }
78    self
79      .read_exact_from_current_offset(&mut header[..])
80      .chain_err(|| "Error reading record size and record size CRC")?;
81    let record_size = (&header[RECORD_SIZE_OFFSET..RECORD_SIZE_CRC_OFFSET])
82      .read_u64::<BigEndian>()
83      .chain_err(|| "Error reading record size from serialized header")?;
84    let record_size_crc = (&header[RECORD_SIZE_CRC_OFFSET..])
85      .read_u32::<BigEndian>()
86      .chain_err(|| "Error reading record size CRC from serialized header")?;
87    if util::checksum_crc32(&header[RECORD_SIZE_OFFSET..RECORD_SIZE_CRC_OFFSET])
88      != record_size_crc
89    {
90      return Err(
91        ErrorKind::InvalidData("CRC failed for record size".to_string()).into(),
92      );
93    }
94    let mut data_and_crc = vec![0u8; (record_size + 4) as usize];
95    self
96      .read_exact_from_current_offset(&mut data_and_crc)
97      .chain_err(|| "Error reading record data")?;
98    let data_crc =
99      (&data_and_crc[record_size as usize..]).read_u32::<BigEndian>()?;
100    if util::checksum_crc32(&data_and_crc[..record_size as usize]) != data_crc {
101      return Err(
102        ErrorKind::InvalidData("CRC failed for record data".to_string()).into(),
103      );
104    }
105    protobuf::parse_from_bytes::<Self::Message>(
106      &data_and_crc[..record_size as usize],
107    )
108    .chain_err(|| "Error parsing data")
109    .map(|m| Some(m))
110  }
111
112  /// Seeks self.reader() to `offset_from_start` and then reads a complete
113  /// record starting at that offset.
114  fn record_at_offset(
115    &mut self,
116    offset_from_start: u64,
117  ) -> Result<Option<Self::Message>> {
118    self.reader()?.seek(SeekFrom::Start(offset_from_start))?;
119    self.next_record()
120  }
121
122  /// Seeks to `offset_from_start`, reads `payload_size` bytes, and attempts to
123  /// read a log record from these bytes. If the payload size is known in advance, this can be faster
124  /// than record_at_offset(), which first has to read the record size.
125  fn read_record(
126    &mut self,
127    offset_from_start: u64,
128    payload_size: u64,
129  ) -> Result<Self::Message> {
130    let mut payload = vec![0u8; payload_size as usize];
131    self.read_exact(offset_from_start, &mut payload)?;
132    let mut record_size_slice =
133      &payload[RECORD_SIZE_OFFSET..RECORD_SIZE_CRC_OFFSET];
134    let record_size_slice_crc = util::checksum_crc32(record_size_slice);
135    let record_size = record_size_slice
136      .read_u64::<BigEndian>()
137      .chain_err(|| format!("Error reading record size from bytes"))?;
138    let record_size_crc = (&payload
139      [RECORD_SIZE_CRC_OFFSET..RECORD_DATA_OFFSET])
140      .read_u32::<BigEndian>()
141      .chain_err(|| format!("Error reading record size CRC from bytes"))?;
142    if record_size_slice_crc != record_size_crc {
143      return Err(
144        ErrorKind::InvalidData("CRC failed for record size".to_string()).into(),
145      );
146    }
147    let record_data_crc_offset = RECORD_DATA_OFFSET + record_size as usize;
148    let record_data = &payload[RECORD_DATA_OFFSET..record_data_crc_offset];
149    let data_crc =
150      (&payload[record_data_crc_offset..]).read_u32::<BigEndian>()?;
151    if util::checksum_crc32(record_data) != data_crc {
152      return Err(
153        ErrorKind::InvalidData("CRC failed for record data".to_string()).into(),
154      );
155    }
156    protobuf::parse_from_bytes::<Self::Message>(record_data)
157      .chain_err(|| "Error reading message body from bytes")
158      .map_err(|e| e.into())
159  }
160}
161
162// Start offset and size of the written payload (which includes record size and
163// CRCs).
164pub type AppendRecordResult = Result<(u64, u64)>;
165
/// A `Write` whose written data can be explicitly persisted via `sync()`.
///
/// `sync()` is the only required method. The provided `write` and `write_all`
/// share their names with the `Write` methods but additionally flush and then
/// sync. Callers must therefore use fully-qualified syntax (e.g.
/// `SyncWrite::write_all(w, buf)`) to get the durable behaviour.
pub trait SyncWrite: Write {
  /// Persists data already written (and flushed) to the backing store.
  fn sync(&mut self) -> std::io::Result<()>;

  /// Performs a single `Write::write`, then flushes and syncs. Returns the
  /// number of bytes accepted by that one `write` call, which may be fewer
  /// than `buf.len()`.
  fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
    let n = Write::write(self, buf)?;
    // Flush before syncing so that any user-space buffering in `self` is
    // handed to the backing store first, matching `write_all` below.
    Write::flush(self)?;
    self.sync()?;
    Ok(n)
  }

  /// Writes all of `buf`, then flushes and syncs.
  fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
    Write::write_all(self, buf)?;
    Write::flush(self)?;
    self.sync()
  }
}
182
183impl SyncWrite for File {
184  fn sync(&mut self) -> std::io::Result<()> {
185    File::sync_data(self)
186  }
187}
188
189pub trait RecordAppend {
190  type Writer: SyncWrite + Seek;
191  type Message: protobuf::Message;
192
193  fn writer<'a>(&'a mut self) -> Result<&'a mut Self::Writer>;
194
195  /// Returns the current position in self.writer() by seeking 0 bytes from
196  /// the current position. Impls can provide a more efficient version by
197  /// keeping track of write offsets themselves so we don't touch disk for
198  /// this operation.
199  fn tell(&mut self) -> Result<u64> {
200    self
201      .writer()?
202      .seek(SeekFrom::Current(0))
203      .map_err(|e| e.into())
204  }
205
206  /// Appends the given `record` to self.writer() and returns the starting offset
207  /// of the record, and the total size of the payload. So for the very first
208  /// record appended, this offset is 0. Currently the size overhead is 16 bytes
209  /// per record.
210  fn append_record(&mut self, record: &Self::Message) -> AppendRecordResult {
211    let offset = self.tell()?;
212    let payload_size_bytes = payload_size_for_record(record);
213    let mut payload = BytesMut::with_capacity(payload_size_bytes as usize);
214    payload.put_u64_be(record.compute_size() as u64);
215    let record_size_crc = util::checksum_crc32(&payload[..8]);
216    payload.put_u32_be(record_size_crc);
217    let record_data: Vec<u8> = record.write_to_bytes()?;
218    let record_crc = util::checksum_crc32(&record_data);
219    payload.put(record_data);
220    payload.put_u32_be(record_crc);
221    SyncWrite::write_all(self.writer()?, &payload)?;
222    Ok((offset, payload.len() as u64))
223  }
224
225  /// Retreats the writer `bytes_to_retreat` bytes back. This is useful for
226  /// "unappending" a log record, but only if its size is known.
227  fn retreat(&mut self, bytes_to_retreat: u64) -> Result<u64> {
228    self.writer()?.flush()?;
229    self
230      .writer()?
231      .seek(SeekFrom::Current(-(bytes_to_retreat as i64)))
232      .map_err(|e| e.into())
233  }
234}
235
#[cfg(test)]
pub mod test_utils {
  //! In-memory `RecordRead` / `RecordAppend` implementations backed by
  //! `std::io::Cursor`, for unit tests.
  use super::*;
  use bitrust_pb::BitRustDataRecord;
  use std::io::Cursor;

  /// An in-memory cursor has nothing further to persist, so syncing is just a
  /// flush.
  impl SyncWrite for Cursor<Vec<u8>> {
    fn sync(&mut self) -> std::io::Result<()> {
      Write::flush(self)
    }
  }

  /// Reads `BitRustDataRecord`s from any in-memory byte container.
  pub struct CursorBasedReader<T> {
    pub cursor: Cursor<T>,
  }

  impl<T: AsRef<[u8]>> RecordRead for CursorBasedReader<T> {
    type Reader = Cursor<T>;
    type Message = BitRustDataRecord;

    fn reader(&mut self) -> Result<&mut Cursor<T>> {
      let cursor = &mut self.cursor;
      Ok(cursor)
    }
  }

  /// Appends `BitRustDataRecord`s to a growable in-memory buffer.
  pub struct CursorBasedWriter {
    pub cursor: Cursor<Vec<u8>>,
  }

  impl RecordAppend for CursorBasedWriter {
    type Writer = Cursor<Vec<u8>>;
    type Message = BitRustDataRecord;

    fn writer(&mut self) -> Result<&mut Cursor<Vec<u8>>> {
      let cursor = &mut self.cursor;
      Ok(cursor)
    }
  }
}
274
275// struct ActiveFile is just a collection of things related to the active file
276// -- the current log of writes. We hold two file handles to this file, one for
277// writing, which happens only by appending, and one handle for reading, which
278// we can seek freely.
279#[derive(Debug)]
280pub struct FileBasedRecordReader<T> {
281  reader: BufReader<File>,
282  pub path: PathBuf,
283  pub id: FileID,
284  phantom: PhantomData<T>,
285}
286
287impl<T: Message> FileBasedRecordReader<T> {
288  pub fn new(path: PathBuf) -> io::Result<FileBasedRecordReader<T>> {
289    let f = FileBasedRecordReader::<T> {
290      reader: BufReader::new(
291        OpenOptions::new().read(true).create(false).open(&path)?,
292      ),
293      id: util::file_id_from_path(&path),
294      path: path,
295      phantom: PhantomData,
296    };
297    Ok(f)
298  }
299}
300
301#[derive(Debug)]
302pub struct FileBasedRecordRW<T> {
303  writer: File,
304  reader: BufReader<File>,
305  pub path: PathBuf,
306  pub id: FileID,
307  pub w_offset: u64,
308  phantom: std::marker::PhantomData<T>,
309}
310
311impl<T: Message> FileBasedRecordRW<T> {
312  pub fn new(path: PathBuf) -> io::Result<FileBasedRecordRW<T>> {
313    let mut writer = OpenOptions::new()
314      .read(true)
315      .write(true)
316      .create(true)
317      .open(&path)?;
318    let w_offset = writer.seek(SeekFrom::End(0))?;
319    let f = FileBasedRecordRW::<T> {
320      writer,
321      reader: BufReader::new(
322        OpenOptions::new().read(true).create(false).open(&path)?,
323      ),
324      id: util::file_id_from_path(&path),
325      path: path,
326      phantom: PhantomData,
327      w_offset: w_offset,
328    };
329    debug!("Initialized active file");
330    Ok(f)
331  }
332
333  pub fn append_record(&mut self, record: &T) -> AppendRecordResult {
334    let (offset, payload_len) = RecordAppend::append_record(self, record)?;
335    self.w_offset += payload_len;
336    Ok((offset, payload_len))
337  }
338
339  pub fn retreat(&mut self, bytes_to_retreat: u64) -> Result<u64> {
340    self.w_offset = RecordAppend::retreat(self, bytes_to_retreat)?;
341    Ok(self.w_offset)
342  }
343}
344
345impl<T> Into<FileBasedRecordReader<T>> for FileBasedRecordRW<T> {
346  fn into(self) -> FileBasedRecordReader<T> {
347    FileBasedRecordReader {
348      reader: self.reader,
349      path: self.path,
350      id: self.id,
351      phantom: self.phantom,
352    }
353  }
354}
355
356impl<T: Message> RecordRead for FileBasedRecordReader<T> {
357  type Message = T;
358  type Reader = BufReader<File>;
359  fn reader<'a>(&'a mut self) -> Result<&'a mut BufReader<File>> {
360    Ok(&mut self.reader)
361  }
362}
363
364impl<T: Message> RecordRead for FileBasedRecordRW<T> {
365  type Message = T;
366  type Reader = BufReader<File>;
367  fn reader<'a>(&'a mut self) -> Result<&'a mut BufReader<File>> {
368    Ok(&mut self.reader)
369  }
370}
371
372impl<T: Message> RecordAppend for FileBasedRecordRW<T> {
373  type Message = T;
374  type Writer = File;
375  fn writer<'a>(&'a mut self) -> Result<&'a mut File> {
376    Ok(&mut self.writer)
377  }
378
379  fn tell(&mut self) -> Result<u64> {
380    Ok(self.w_offset)
381  }
382}
383
#[cfg(test)]
mod record_reader_tests {
  use super::*;
  use bitrust_pb::BitRustDataRecord;
  extern crate simplelog;
  extern crate tempfile;
  use super::test_utils::*;
  use byteorder::{BigEndian, ByteOrder};
  use std::io::Cursor;

  /// Builds the small record shared by the read_record tests.
  fn sample_record() -> BitRustDataRecord {
    let mut rec = BitRustDataRecord::new();
    rec.set_timestamp(42);
    rec.set_key(b"k".to_vec());
    rec.set_value(b"v".to_vec());
    rec
  }

  /// Hand-encodes `rec` as a complete recordio payload, independently of
  /// `RecordAppend`, so the reader is checked against the documented layout.
  fn encode_payload(rec: &BitRustDataRecord) -> Vec<u8> {
    let mut buf = vec![0u8; payload_size_for_record(rec) as usize];
    BigEndian::write_u64(
      &mut buf[RECORD_SIZE_OFFSET..RECORD_SIZE_CRC_OFFSET],
      rec.compute_size() as u64,
    );
    let size_crc =
      util::checksum_crc32(&buf[RECORD_SIZE_OFFSET..RECORD_SIZE_CRC_OFFSET]);
    BigEndian::write_u32(
      &mut buf[RECORD_SIZE_CRC_OFFSET..RECORD_DATA_OFFSET],
      size_crc,
    );
    let data = rec
      .write_to_bytes()
      .expect("Failed to write test record to bytes");
    let data_crc_offset = RECORD_DATA_OFFSET + rec.compute_size() as usize;
    buf[RECORD_DATA_OFFSET..data_crc_offset].copy_from_slice(&data);
    BigEndian::write_u32(
      &mut buf[data_crc_offset..],
      util::checksum_crc32(&data),
    );
    buf
  }

  /// Surrounds `payload` with one zero byte on each side, so that the payload
  /// starts at offset 1.
  fn pad(payload: &[u8]) -> Vec<u8> {
    let mut padded = vec![0u8; payload.len() + 2];
    padded[1..payload.len() + 1].copy_from_slice(payload);
    padded
  }

  #[test]
  fn test_next_record_fails_for_incomplete_data_at_end() {
    // Start with only three bytes in the cursor. The first read, for the
    // 12-byte header (u64 record size + u32 size CRC), must fail.
    let mut reader = CursorBasedReader {
      cursor: Cursor::new(vec![0u8, 0u8, 1u8]),
    };
    let result = reader.next_record();
    assert!(result.is_err(), "Expected error, but got {:?}", result);
  }

  #[test]
  fn test_next_record_succeeds_with_empty_stream() {
    let mut reader = CursorBasedReader {
      cursor: Cursor::new(vec![]),
    };
    let rec = reader.next_record();
    match rec {
      Ok(None) => {}
      _ => panic!("Expected Ok(None), but got {:?}", rec),
    }
  }

  #[test]
  fn test_next_record_succeeds_with_nonempty_stream() {
    let mut record = BitRustDataRecord::new();
    record.set_timestamp(42);
    record.set_key(b"foo".to_vec());
    record.set_value(b"bar".to_vec());
    let mut reader = CursorBasedReader {
      cursor: Cursor::new(encode_payload(&record)),
    };
    let record_from_cursor = reader
      .next_record()
      .expect("Failed to read next record from cursor")
      .expect("Expected a record from cursor, but found None");
    assert_eq!(record_from_cursor, record);
    let record_from_cursor =
      reader.next_record().expect("Expected ok result at EOF");
    assert!(
      record_from_cursor.is_none(),
      "Expected None, got {:?}",
      record_from_cursor
    );
  }

  #[test]
  fn test_read_record_works() {
    let rec = sample_record();
    let payload = encode_payload(&rec);
    let mut reader = CursorBasedReader {
      cursor: Cursor::new(pad(&payload)),
    };
    let read_rec = reader
      .read_record(1, payload.len() as u64)
      .expect("Non-error record");
    assert_eq!(read_rec, rec);
  }

  #[test]
  fn test_read_record_fails_for_invalid_payload_spec() {
    let rec = sample_record();
    let payload = encode_payload(&rec);
    let mut reader = CursorBasedReader {
      cursor: Cursor::new(pad(&payload)),
    };
    // The payload really starts at offset 1, so reading from 0 misparses it.
    let read_rec = reader.read_record(0, payload.len() as u64);
    assert!(read_rec.is_err(), "Expected error, got {:?}", read_rec);
  }
}
537
#[cfg(test)]
mod record_appender_tests {
  use super::test_utils::*;
  use super::*;
  use bitrust_pb::BitRustDataRecord;
  extern crate simplelog;
  extern crate tempfile;
  use byteorder::{BigEndian, ByteOrder};
  use std::io::Cursor;

  /// Builds a record with timestamp 42 and the given key and value.
  fn make_record(key: &[u8], value: &[u8]) -> BitRustDataRecord {
    let mut rec = BitRustDataRecord::new();
    rec.set_timestamp(42);
    rec.set_key(key.to_vec());
    rec.set_value(value.to_vec());
    rec
  }

  /// A writer over an empty in-memory buffer.
  fn empty_writer() -> CursorBasedWriter {
    CursorBasedWriter {
      cursor: Cursor::new(vec![]),
    }
  }

  /// Hand-encodes the payload `append_record` is expected to produce for
  /// `rec`.
  fn expected_payload(rec: &BitRustDataRecord) -> Vec<u8> {
    let mut buf = vec![0u8; payload_size_for_record(rec) as usize];
    BigEndian::write_u64(
      &mut buf[RECORD_SIZE_OFFSET..RECORD_SIZE_CRC_OFFSET],
      rec.compute_size() as u64,
    );
    let size_crc =
      util::checksum_crc32(&buf[RECORD_SIZE_OFFSET..RECORD_SIZE_CRC_OFFSET]);
    BigEndian::write_u32(
      &mut buf[RECORD_SIZE_CRC_OFFSET..RECORD_DATA_OFFSET],
      size_crc,
    );
    let data = rec.write_to_bytes().expect("Vector of bytes");
    let data_crc_offset = RECORD_DATA_OFFSET + rec.compute_size() as usize;
    buf[RECORD_DATA_OFFSET..data_crc_offset].copy_from_slice(&data);
    BigEndian::write_u32(
      &mut buf[data_crc_offset..],
      util::checksum_crc32(&data),
    );
    buf
  }

  #[test]
  fn test_retreat_works() {
    let mut writer = empty_writer();

    let first = make_record(b"k", b"v");
    let (offset, size) = writer.append_record(&first).expect("Writing record");
    assert!((offset, size) == (0, payload_size_for_record(&first)));
    writer.retreat(size).expect("Retreat should succeed");

    let second = make_record(b"k2", b"v2");
    writer
      .append_record(&second)
      .expect("Writing record after retreat");

    let mut reader = CursorBasedReader {
      cursor: Cursor::new(writer.cursor.into_inner()),
    };
    let read_back = reader
      .next_record()
      .expect("Read record")
      .expect("Some record");
    assert!(read_back == second);
  }

  #[test]
  fn test_append_record_works() {
    let rec = make_record(b"k", b"v");
    let mut writer = empty_writer();
    let appended = writer
      .append_record(&rec)
      .expect("Writing record should succeed");
    assert!(appended == (0, payload_size_for_record(&rec)));

    let expected_buf = expected_payload(&rec);
    let written_buf = writer.cursor.into_inner();
    assert!(
      written_buf == expected_buf,
      "Expected {:?}, got {:?}",
      expected_buf,
      written_buf
    );
  }
}