mod segment;
use bytes::Bytes;
use fnv::FnvHashMap;
use segment::Segment;
use std::mem;
/// A segmented record log.
///
/// Records are appended to `active_segment`. Once it reaches
/// `max_segment_size`, it is retired into `segments` and a new active segment
/// starts where it ended. At most `max_segments` segments are retained,
/// counting the active one; the oldest is evicted first.
pub struct Log {
    /// Size at which the active segment is rolled over to a new one.
    max_segment_size: usize,
    /// Maximum number of retained segments, active segment included.
    max_segments: usize,
    /// Base offset of the oldest segment still retained.
    head_offset: u64,
    /// Segment currently receiving appends.
    active_segment: Segment,
    /// Retired segments keyed by their base offset.
    segments: FnvHashMap<u64, Segment>,
}
impl Log {
    /// Creates an empty log whose first (active) segment starts at offset 0.
    ///
    /// * `max_segment_size` - once the active segment's `size()` reaches this
    ///   value, the next append rolls over to a fresh segment.
    /// * `max_segments` - maximum number of segments retained, counting the
    ///   active one; when a rollover exceeds it the oldest segment is dropped.
    ///
    /// # Panics
    ///
    /// Panics if `max_segment_size` is smaller than 1024.
    pub fn new(max_segment_size: usize, max_segments: usize) -> Log {
        assert!(max_segment_size >= 1024, "size should be at least 1KB");

        Log {
            head_offset: 0,
            max_segment_size,
            max_segments,
            segments: FnvHashMap::default(),
            active_segment: Segment::new(0),
        }
    }

    /// Appends `record` to the log.
    ///
    /// If the active segment has reached `max_segment_size`, the log first
    /// rolls over to a new segment.
    ///
    /// Returns `(base_offset, offset)`: the base offset of the segment the
    /// record was written to, and the value returned by that segment's
    /// `append`.
    pub fn append(&mut self, record: Bytes) -> (u64, u64) {
        if self.active_segment.size() >= self.max_segment_size {
            self.roll_over();
        }

        let base_offset = self.active_segment.base_offset();
        let offset = self.active_segment.append(record);
        (base_offset, offset)
    }

    /// Retires the active segment and starts a new one where it ended.
    ///
    /// Evicts the oldest retained segment when the retention limit is reached.
    fn roll_over(&mut self) {
        let next_offset = self.active_segment.base_offset() + self.active_segment.len() as u64;
        let last_active = mem::replace(&mut self.active_segment, Segment::new(next_offset));
        self.segments.insert(last_active.base_offset(), last_active);

        // The active segment counts toward `max_segments`, so the retired map
        // may hold at most `max_segments - 1` entries.
        if self.segments.len() >= self.max_segments {
            if let Some(evicted) = self.segments.remove(&self.head_offset) {
                // Segments are contiguous, so the next head starts where the
                // evicted one ended.
                self.head_offset = evicted.base_offset() + evicted.len() as u64;
            }
        }
    }

    /// Returns `(base_offset, next_offset)` for the active segment.
    ///
    /// `base_offset` is the segment's base offset. `next_offset` is the offset
    /// just past its last record (`base_offset + len`).
    pub fn next_offset(&self) -> (u64, u64) {
        let base_offset = self.active_segment.base_offset();
        let next_offset = base_offset + self.active_segment.len() as u64;
        (base_offset, next_offset)
    }

    /// Reads a single record.
    ///
    /// The record is at relative position `offset` within the segment that
    /// starts at `base_offset`.
    ///
    /// Returns `None` if that segment is not retained (never existed or was
    /// evicted), or if the segment's own `read` returns `None` for `offset`.
    pub fn read(&mut self, base_offset: u64, offset: usize) -> Option<Bytes> {
        if base_offset == self.active_segment.base_offset() {
            return self.active_segment.read(offset);
        }

        self.segments
            .get_mut(&base_offset)
            .and_then(|segment| segment.read(offset))
    }

    /// Vectored read starting at absolute `offset` inside the segment whose
    /// base offset is `segment`.
    ///
    /// Returns `(jump, base_offset, next_offset, records)`:
    /// * `jump` - `Some(start of the following segment)` when the records came
    ///   from a retired segment, otherwise `None`. It is `None` when reading the
    ///   active segment or when the requested segment is unknown.
    /// * `base_offset` - base offset of the segment actually read.
    /// * `next_offset` - absolute offset to resume reading from.
    /// * `records` - the records read; may be empty.
    ///
    /// Special cases:
    /// * A request for an evicted segment is redirected to the oldest retained
    ///   segment.
    /// * Retired segments with nothing to read at the cursor are skipped.
    /// * An offset below the segment's start is treated as the segment's start.
    pub fn readv(&mut self, segment: u64, offset: u64) -> (Option<u64>, u64, u64, Vec<Bytes>) {
        let mut base_offset = segment;
        let mut offset = offset;

        if base_offset < self.head_offset {
            warn!("Trying to read a deleted segment. Jumping");
            base_offset = self.head_offset;
            offset = self.head_offset;
        }

        // `offset - base_offset` below must not underflow. Inside the loop the
        // two are only ever reset together (to the same value), so clamping
        // once here is enough.
        offset = offset.max(base_offset);

        loop {
            if base_offset == self.active_segment.base_offset() {
                let relative_offset = (offset - base_offset) as usize;
                let out = self.active_segment.readv(relative_offset);
                let next_record_offset = offset + out.len() as u64;
                return (None, base_offset, next_record_offset, out);
            }

            let segment = match self.segments.get(&base_offset) {
                Some(segment) => segment,
                // Not the active segment and not a retained one. The lookup
                // cannot change within this call, so retrying would spin
                // forever; report nothing and leave the cursor unchanged.
                None => return (None, base_offset, offset, Vec::new()),
            };

            let next_segment_offset = segment.base_offset() + segment.len() as u64;
            let relative_offset = (offset - base_offset) as usize;
            let out = segment.readv(relative_offset);

            if out.is_empty() {
                // This retired segment is exhausted at the cursor; continue
                // from the start of the next segment.
                base_offset = next_segment_offset;
                offset = base_offset;
                continue;
            }

            let next_record_offset = offset + out.len() as u64;
            return (
                Some(next_segment_offset),
                segment.base_offset(),
                next_record_offset,
                out,
            );
        }
    }
}
#[cfg(test)]
mod test {
    use super::Log;
    use bytes::Bytes;
    use pretty_assertions::assert_eq;

    /// Log configuration shared by every test: 10KB segments, 10 segments max.
    fn new_log() -> Log {
        Log::new(10 * 1024, 10)
    }

    /// Appends one 1KB record per value, each record filled with that value.
    fn append_records(log: &mut Log, values: std::ops::Range<u8>) {
        for value in values {
            log.append(Bytes::from(vec![value; 1024]));
        }
    }

    /// Checks that relative positions `0..count` of segment `base_offset` hold
    /// records whose first byte is `base_offset + position`.
    fn assert_segment_holds(log: &mut Log, base_offset: u64, count: usize) {
        for position in 0..count {
            let record = log.read(base_offset, position).unwrap();
            assert_eq!(record[0], base_offset as u8 + position as u8);
        }
    }

    #[test]
    fn append_creates_and_deletes_segments_correctly() {
        let mut log = new_log();
        append_records(&mut log, 0..205);

        // Segment 90 has fallen out of the retention window.
        assert!(log.read(90, 0).is_none());

        assert_segment_holds(&mut log, 110, 10);
        assert_segment_holds(&mut log, 190, 10);
        assert_segment_holds(&mut log, 200, 5);

        // Only five records have been written to the active segment.
        assert!(log.read(200, 5).is_none());
    }

    #[test]
    fn vectored_read_works_as_expected() {
        let mut log = new_log();
        append_records(&mut log, 0..90);

        let (jump, base_offset, next_offset, data) = log.readv(0, 0);
        assert_eq!(data.len(), 10);
        assert_eq!(base_offset, 0);
        assert_eq!(next_offset, 10);
        assert_eq!(data[0][0], 0);
        assert_eq!(data.last().unwrap()[0], 9);
        assert_eq!(jump, Some(10));

        let record = log.read(50, 0).unwrap();
        assert_eq!(record[0], 50);

        let (jump, base_offset, next_offset, data) = log.readv(10, 15);
        assert_eq!(data.len(), 5);
        assert_eq!(base_offset, 10);
        assert_eq!(next_offset, 20);
        assert_eq!(data[0][0], 15);
        assert_eq!(data.last().unwrap()[0], 19);
        assert_eq!(jump, Some(20));

        assert_eq!(log.readv(10, 10).3.len(), 10);
        assert_eq!(log.readv(10, 15).3.len(), 5);
    }

    #[test]
    fn vectored_reads_from_active_segment_works_as_expected() {
        let mut log = new_log();
        append_records(&mut log, 0..200);

        let (jump, segment, offset, data) = log.readv(190, 190);
        assert_eq!(data.len(), 10);
        assert_eq!(segment, 190);
        assert_eq!(offset, 200);
        assert!(jump.is_none());
    }

    #[test]
    fn vectored_reads_from_active_segment_resumes_after_empty_reads_correctly() {
        let mut log = new_log();
        append_records(&mut log, 0..85);

        let (jump, segment, offset, data) = log.readv(80, 80);
        assert_eq!(data.len(), 5);
        assert_eq!(segment, 80);
        assert_eq!(offset, 85);
        assert!(jump.is_none());

        append_records(&mut log, 85..90);

        let (jump, segment, offset, data) = log.readv(segment, offset);
        assert_eq!(data.len(), 5);
        assert_eq!(segment, 80);
        assert_eq!(offset, 90);
        assert!(jump.is_none());
    }

    #[test]
    fn last_active_segment_read_jumps_to_next_segment_read_correctly() {
        let mut log = new_log();
        append_records(&mut log, 0..90);

        let (jump, segment, offset, data) = log.readv(80, 80);
        assert_eq!(data.len(), 10);
        assert_eq!(segment, 80);
        assert_eq!(offset, 90);
        assert!(jump.is_none());

        append_records(&mut log, 90..110);

        // Segment 80 is now retired and exhausted, so the read moves on to 90.
        let (jump, segment, offset, data) = log.readv(segment, offset);
        assert_eq!(data.len(), 10);
        assert_eq!(segment, 90);
        assert_eq!(offset, 100);
        assert_eq!(jump, Some(100));

        let (jump, segment, offset, data) = log.readv(segment, offset);
        assert_eq!(data.len(), 10);
        assert_eq!(segment, 100);
        assert_eq!(offset, 110);
        assert!(jump.is_none());
    }

    #[test]
    fn vectored_reads_reports_jumps_at_boundary() {
        let mut log = new_log();
        append_records(&mut log, 0..200);

        // Segment 0 was evicted, so the read is redirected to the head (100).
        let (jump, segment, offset, _data) = log.readv(0, 0);
        assert_eq!(segment, 100);
        assert_eq!(offset, 110);
        assert_eq!(jump, Some(110));
    }
}