#![feature(test, btree_range, collections_bound)]
#![allow(unused_features)]
#[macro_use]
extern crate log;
extern crate crc;
extern crate memmap;
extern crate byteorder;
#[cfg(test)]
extern crate env_logger;
#[cfg(test)]
extern crate test;
#[cfg(test)]
extern crate rand;
mod segment;
mod index;
#[cfg(test)]
mod testutil;
use std::collections::{Bound, BTreeMap};
use std::path::{Path, PathBuf};
use std::fs;
use std::io;
use std::mem::swap;
use segment::{Segment, SegmentAppendError};
use index::*;
pub use segment::ReadLimit;
pub use segment::Message;
/// Offset of an appended message within the log.
///
/// Newtype over the raw `u64` logical position (not a byte position).
#[derive(Copy, Clone, PartialEq, PartialOrd, Eq, Ord, Debug)]
pub struct Offset(pub u64);
/// Error returned by `CommitLog::append`.
#[derive(Debug)]
pub enum AppendError {
    /// Underlying filesystem I/O error.
    Io(io::Error),
    /// A freshly rolled index rejected the retried write.
    FreshIndexNotWritable,
    /// A freshly rolled segment rejected the retried write.
    FreshSegmentNotWritable,
}
impl From<io::Error> for AppendError {
fn from(e: io::Error) -> AppendError {
AppendError::Io(e)
}
}
/// Starting position for a `CommitLog::read`.
#[derive(Copy, Clone, PartialEq, PartialOrd, Eq, Ord, Debug)]
pub enum ReadPosition {
    /// Start reading from offset 0.
    Beginning,
    /// Start reading from the given offset.
    Offset(Offset),
}
/// Error returned by `CommitLog::read`.
#[derive(Debug)]
pub enum ReadError {
    /// Underlying filesystem I/O error.
    Io(io::Error),
    /// A message failed its CRC check (see the `segment::MessageError`
    /// conversion below), indicating on-disk corruption.
    CorruptLog,
}
impl From<io::Error> for ReadError {
fn from(e: io::Error) -> ReadError {
ReadError::Io(e)
}
}
impl From<segment::MessageError> for ReadError {
    /// Maps segment-level message errors onto log-level read errors:
    /// I/O failures pass through, a CRC mismatch marks the log corrupt.
    fn from(err: segment::MessageError) -> ReadError {
        match err {
            segment::MessageError::IoError(io_err) => ReadError::Io(io_err),
            segment::MessageError::InvalidCRC => ReadError::CorruptLog,
        }
    }
}
/// Configuration for opening a `CommitLog`.
#[derive(Clone, Debug)]
pub struct LogOptions {
    // Directory holding the segment (.log) and index (.index) files.
    log_dir: PathBuf,
    // Maximum size of a single segment file, in bytes.
    log_max_bytes: usize,
    // Maximum size of a single index file, in bytes.
    index_max_bytes: usize,
}
impl LogOptions {
    /// Creates options rooted at `log_dir` with the defaults: segments of
    /// at most 1 GB and index files of at most 800 KB.
    pub fn new<P>(log_dir: P) -> LogOptions
        where P: AsRef<Path>
    {
        let dir = log_dir.as_ref().to_owned();
        LogOptions {
            log_dir: dir,
            log_max_bytes: 1_000_000_000,
            index_max_bytes: 800_000,
        }
    }

    /// Sets the maximum size, in bytes, of each segment file.
    #[inline]
    pub fn segment_max_bytes(&mut self, max_bytes: usize) -> &mut LogOptions {
        self.log_max_bytes = max_bytes;
        self
    }

    /// Sets the maximum number of entries each index file may hold
    /// (stored internally as a byte budget).
    #[inline]
    pub fn index_max_items(&mut self, max_items: usize) -> &mut LogOptions {
        self.index_max_bytes = max_items * INDEX_ENTRY_BYTES;
        self
    }
}
/// An append-only, segmented commit log backed by files on disk.
pub struct CommitLog {
    // Read-only segments that have filled up, keyed by starting offset.
    closed_segments: BTreeMap<u64, Segment>,
    // Read-only indexes that have filled up, keyed by starting offset.
    closed_indexes: BTreeMap<u64, Index>,
    // Segment currently accepting appends.
    active_segment: Segment,
    // Index currently accepting (offset -> file position) entries.
    active_index: Index,
    options: LogOptions,
}
impl CommitLog {
pub fn new(opts: LogOptions) -> io::Result<CommitLog> {
fs::create_dir_all(&opts.log_dir).unwrap_or(());
info!("Opening log in directory {:?}", &opts.log_dir.to_str());
let (closed_segments, closed_indexes) = CommitLog::load_log(&opts.log_dir)?;
let next_offset = closed_indexes.values()
.next_back()
.and_then(|ind| ind.last_entry())
.map(|e| e.offset() + 1)
.unwrap_or(0u64);
info!("Starting fresh segment and index at offset {}", next_offset);
let seg = Segment::new(&opts.log_dir, next_offset, opts.log_max_bytes)?;
let ind = Index::new(&opts.log_dir, next_offset, opts.index_max_bytes)?;
Ok(CommitLog {
closed_segments: closed_segments,
closed_indexes: closed_indexes,
active_segment: seg,
active_index: ind,
options: opts,
})
}
fn load_log<P>(dir: P) -> io::Result<(BTreeMap<u64, Segment>, BTreeMap<u64, Index>)>
where P: AsRef<Path>
{
let mut segments = BTreeMap::new();
let mut indexes = BTreeMap::new();
let files = fs::read_dir(dir)?
.filter_map(|e| e.ok())
.filter(|e| e.metadata().map(|m| m.is_file()).unwrap_or(false));
for f in files {
match f.path().extension() {
Some(ext) if segment::SEGMENT_FILE_NAME_EXTENSION.eq(ext) => {
let segment = match Segment::open(f.path()) {
Ok(seg) => seg,
Err(e) => {
error!("Unable to open segment {:?}: {}", f.path(), e);
return Err(e);
}
};
let offset = segment.starting_offset();
segments.insert(offset, segment);
}
Some(ext) if index::INDEX_FILE_NAME_EXTENSION.eq(ext) => {
let index = match Index::open(f.path()) {
Ok(ind) => ind,
Err(e) => {
error!("Unable to open index {:?}: {}", f.path(), e);
return Err(e);
}
};
let offset = index.starting_offset();
indexes.insert(offset, index);
}
_ => {}
}
}
Ok((segments, indexes))
}
pub fn append(&mut self, payload: &[u8]) -> Result<Offset, AppendError> {
let meta = self.active_segment
.append(payload)
.or_else(|e| {
match e {
SegmentAppendError::LogFull => {
self.active_segment.flush_sync()?;
let next_offset = self.active_segment.next_offset();
info!("Starting new segment at offset {}", next_offset);
let mut seg = Segment::new(&self.options.log_dir,
next_offset,
self.options.log_max_bytes)?;
swap(&mut seg, &mut self.active_segment);
self.closed_segments.insert(seg.starting_offset(), seg);
self.active_segment
.append(payload)
.map_err(|_| AppendError::FreshSegmentNotWritable)
}
SegmentAppendError::IoError(e) => Err(AppendError::Io(e)),
}
})?;
self.active_index
.append(meta.offset(), meta.file_pos())
.or_else(|e| {
match e {
IndexWriteError::IndexFull => {
info!("Starting new index at offset {}", meta.offset());
try!(self.active_index.set_readonly());
let mut ind = try!(Index::new(&self.options.log_dir,
meta.offset(),
self.options.index_max_bytes));
swap(&mut ind, &mut self.active_index);
self.closed_indexes.insert(ind.starting_offset(), ind);
self.active_index
.append(meta.offset(), meta.file_pos())
.map_err(|_| AppendError::FreshIndexNotWritable)
}
IndexWriteError::OffsetLessThanBase => unreachable!(),
}
})?;
Ok(Offset(meta.offset()))
}
pub fn read(&mut self,
start: ReadPosition,
limit: ReadLimit)
-> Result<Vec<Message>, ReadError> {
let start_off = match start {
ReadPosition::Beginning => 0,
ReadPosition::Offset(Offset(v)) => v,
};
let active_start_off = self.active_index.starting_offset();
let index_entry_res = if start_off >= active_start_off {
trace!("Reading offset {} from active index", start_off);
self.active_index.find(start_off)
} else {
trace!("Reading offset {} from old index", start_off);
let found_index = self.closed_indexes
.range(Bound::Unbounded, Bound::Included(&start_off))
.next_back();
found_index.and_then(|(_, i)| i.find(start_off))
};
let file_pos = match index_entry_res {
Some(e) => e.file_position(),
None => {
info!("No index entry found for {}", start_off);
return Ok(Vec::with_capacity(0));
}
};
let active_seg_start_off = self.active_segment.starting_offset();
if start_off >= active_seg_start_off {
trace!("Reading from active index at file pos {}", file_pos);
Ok(self.active_segment.read(file_pos, limit)?)
} else {
let mut r = self.closed_segments
.range_mut(Bound::Unbounded, Bound::Included(&start_off));
match r.next_back() {
Some((_, ref mut s)) => {
trace!("Reading messages from old index at file pos {}", file_pos);
Ok(s.read(file_pos, limit)?)
}
_ => {
warn!("No segment found for offset {}", start_off);
Ok(Vec::with_capacity(0))
}
}
}
}
pub fn flush(&mut self) -> io::Result<()> {
self.active_segment.flush_sync()?;
self.active_index.flush_sync()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use super::testutil::*;
    use std::fs;
    use std::collections::HashSet;
    use env_logger;

    /// Appends are assigned sequential offsets starting at 0.
    #[test]
    pub fn append() {
        let dir = TestDir::new();
        let mut log = CommitLog::new(LogOptions::new(&dir)).unwrap();
        assert_eq!(log.append(b"123456").unwrap(), Offset(0));
        assert_eq!(log.append(b"abcdefg").unwrap(), Offset(1));
        assert_eq!(log.append(b"foobarbaz").unwrap(), Offset(2));
        assert_eq!(log.append(b"bing").unwrap(), Offset(3));
        log.flush().unwrap();
    }

    /// A tiny segment_max_bytes forces a roll to a second segment file.
    #[test]
    pub fn append_new_segment() {
        let dir = TestDir::new();
        let mut opts = LogOptions::new(&dir);
        // Sized so only two of the messages below fit per segment.
        opts.segment_max_bytes(52);

        {
            let mut log = CommitLog::new(opts).unwrap();
            // Third append spills into a new segment.
            log.append(b"0123456789").unwrap();
            log.append(b"0123456789").unwrap();
            log.append(b"0123456789").unwrap();
            log.flush().unwrap();
        }

        let files = fs::read_dir(&dir)
            .unwrap()
            .map(|e| e.unwrap().path().file_name().unwrap().to_str().unwrap().to_string())
            .collect::<HashSet<String>>();

        // The rolled segment is named after its starting offset (2).
        let expected =
            ["00000000000000000000.index", "00000000000000000000.log", "00000000000000000002.log"]
                .iter()
                .cloned()
                .map(|s| s.to_string())
                .collect::<HashSet<String>>();
        assert_eq!(files.intersection(&expected).count(), 3);
    }

    /// A tiny index_max_items forces a roll to a second index file.
    #[test]
    pub fn append_new_index() {
        let dir = TestDir::new();
        let mut opts = LogOptions::new(&dir);
        // Only two entries fit per index.
        opts.index_max_items(2);
        {
            let mut log = CommitLog::new(opts).unwrap();
            // Third append spills into a new index.
            log.append(b"0123456789").unwrap();
            log.append(b"0123456789").unwrap();
            log.append(b"0123456789").unwrap();
            log.flush().unwrap();
        }
        let files = fs::read_dir(&dir)
            .unwrap()
            .map(|e| e.unwrap().path().file_name().unwrap().to_str().unwrap().to_string())
            .collect::<HashSet<String>>();
        // The rolled index is named after its starting offset (2).
        let expected = ["00000000000000000000.index",
                        "00000000000000000000.log",
                        "00000000000000000002.index"]
            .iter()
            .cloned()
            .map(|s| s.to_string())
            .collect::<HashSet<String>>();
        assert_eq!(files.intersection(&expected).count(), 3);
    }

    /// Reads resolve through the active index, through a closed index,
    /// and stop at a segment boundary.
    #[test]
    pub fn read_entries() {
        // init() fails if a logger is already set (e.g. by another test);
        // ignore that case.
        env_logger::init().unwrap_or(());
        let dir = TestDir::new();
        let mut opts = LogOptions::new(&dir);
        // Small limits so 100 appends roll several indexes and segments.
        opts.index_max_items(20);
        opts.segment_max_bytes(1000);
        let mut log = CommitLog::new(opts).unwrap();
        for i in 0..100 {
            let s = format!("some data {}", i);
            log.append(s.as_bytes()).unwrap();
        }
        log.flush().unwrap();

        // Read served by the active index.
        {
            let active_index_read =
                log.read(ReadPosition::Offset(Offset(82)), ReadLimit::Messages(5)).unwrap();
            assert_eq!(5, active_index_read.len());
            assert_eq!(vec![82, 83, 84, 85, 86],
                       active_index_read.into_iter().map(|v| v.offset()).collect::<Vec<_>>());
        }

        // Read served by a closed (rolled) index.
        {
            let old_index_read = log.read(ReadPosition::Offset(Offset(5)), ReadLimit::Messages(5))
                .unwrap();
            assert_eq!(5, old_index_read.len());
            assert_eq!(vec![5, 6, 7, 8, 9],
                       old_index_read.into_iter().map(|v| v.offset()).collect::<Vec<_>>());
        }

        // Read near a boundary: only 3 of the 5 requested messages come
        // back — presumably the read stops at the end of the segment.
        {
            let boundary_read = log.read(ReadPosition::Offset(Offset(33)), ReadLimit::Messages(5))
                .unwrap();
            assert_eq!(3, boundary_read.len());
            assert_eq!(vec![33, 34, 35],
                       boundary_read.into_iter().map(|v| v.offset()).collect::<Vec<_>>());
        }
    }

    /// Reopening a log resumes reads and appends at the correct offsets.
    #[test]
    pub fn reopen_log() {
        env_logger::init().unwrap_or(());
        let dir = TestDir::new();
        let mut opts = LogOptions::new(&dir);
        opts.index_max_items(20);
        opts.segment_max_bytes(1000);

        // Populate offsets 0..=98, then drop the log handle.
        {
            let mut log = CommitLog::new(opts.clone()).unwrap();
            for i in 0..99 {
                let s = format!("some data {}", i);
                let Offset(off) = log.append(s.as_bytes()).unwrap();
                assert_eq!(i, off);
            }
            log.flush().unwrap();
        }

        // Reopen: old data is readable and appends continue at offset 99.
        {
            let mut log = CommitLog::new(opts).unwrap();
            let active_index_read =
                log.read(ReadPosition::Offset(Offset(82)), ReadLimit::Messages(5)).unwrap();
            assert_eq!(5, active_index_read.len());
            assert_eq!(vec![82, 83, 84, 85, 86],
                       active_index_read.into_iter().map(|v| v.offset()).collect::<Vec<_>>());
            let Offset(off) = log.append(b"moar data").unwrap();
            assert_eq!(99, off);
        }
    }
}