use super::message::*;
use super::reader::*;
use super::Offset;
use std::fs::{self, File, OpenOptions};
use std::io::{self, Write};
use std::os::unix::fs::FileExt;
use std::path::{Path, PathBuf};
/// Number of leading characters of a segment file name that encode the
/// segment's base offset. `Segment::new` zero-pads the offset to this width
/// and `Segment::open` parses exactly this many characters as a decimal `u64`.
pub static SEGMENT_FILE_NAME_LEN: usize = 20;
/// File extension given to segment files created by `Segment::new`.
pub static SEGMENT_FILE_NAME_EXTENSION: &'static str = "log";
/// Two-byte marker written at the very start of every new segment file and
/// verified by `Segment::open` before the file is accepted.
pub static VERSION_1_MAGIC: [u8; 2] = [0xff, 0xff];
/// A single log segment backed by one file on disk.
///
/// The file starts with `VERSION_1_MAGIC`; message bytes are appended after it.
pub struct Segment {
/// Open handle to the segment file (opened for reading and appending).
file: File,
/// Location of the segment file; used by `remove` to delete it.
path: PathBuf,
/// Base offset of this segment, taken from the file name.
base_offset: u64,
/// Byte position at which the next append will land, as tracked by this
/// handle. Starts at 2 (just past the magic) for a new file, or at the
/// current file length for a reopened one.
write_pos: usize,
/// Upper bound on the file size in bytes; `append` refuses payloads that
/// would push `write_pos` past it.
max_bytes: usize,
}
/// Reasons why `Segment::append` can fail.
#[derive(Debug)]
pub enum SegmentAppendError {
/// The payload does not fit: appending it would exceed the segment's
/// `max_bytes` limit.
LogFull,
/// The underlying file write failed.
IoError(io::Error),
}
/// Wraps an `io::Error` in `SegmentAppendError::IoError`, which lets `?` be
/// used on I/O calls inside functions returning `SegmentAppendError`.
impl From<io::Error> for SegmentAppendError {
    #[inline]
    fn from(err: io::Error) -> Self {
        SegmentAppendError::IoError(err)
    }
}
impl Segment {
pub fn new<P>(log_dir: P, base_offset: u64, max_bytes: usize) -> io::Result<Segment>
where
P: AsRef<Path>,
{
let log_path = {
let mut path_buf = PathBuf::new();
path_buf.push(&log_dir);
path_buf.push(format!("{:020}", base_offset));
path_buf.set_extension(SEGMENT_FILE_NAME_EXTENSION);
path_buf
};
let mut f = OpenOptions::new()
.write(true)
.read(true)
.create_new(true)
.append(true)
.open(&log_path)?;
f.write_all(&VERSION_1_MAGIC)?;
Ok(Segment {
file: f,
path: log_path,
base_offset,
write_pos: 2,
max_bytes,
})
}
/// Opens an existing segment file at `seg_path`.
///
/// The base offset is parsed from the first `SEGMENT_FILE_NAME_LEN`
/// characters of the file name, the first two bytes of the file must equal
/// `VERSION_1_MAGIC`, and the write position is set to the current file
/// length.
///
/// # Errors
///
/// Returns the underlying `io::Error` if the file cannot be opened, its
/// metadata cannot be read, or the header read fails. Returns an
/// `io::ErrorKind::InvalidData` error (instead of panicking) when:
///
/// * the path has no file name or the name is not valid UTF-8,
/// * the file name is shorter than `SEGMENT_FILE_NAME_LEN` or its prefix
///   does not parse as a `u64`,
/// * the file does not start with the version 1 magic.
pub fn open<P>(seg_path: P, max_bytes: usize) -> io::Result<Segment>
where
    P: AsRef<Path>,
{
    let seg_path = seg_path.as_ref();
    let seg_file = OpenOptions::new()
        .read(true)
        .write(true)
        .append(true)
        .open(seg_path)?;

    let filename = seg_path
        .file_name()
        .and_then(|name| name.to_str())
        .ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidData,
                "Segment path has no valid UTF-8 file name",
            )
        })?;

    // Checked slicing: a name that is too short (or would be cut inside a
    // multi-byte character) is reported as invalid data rather than panicking.
    let base_offset = filename
        .get(..SEGMENT_FILE_NAME_LEN)
        .and_then(|digits| u64::from_str_radix(digits, 10).ok())
        .ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidData,
                "Segment file name does not parse as u64",
            )
        })?;

    let meta = seg_file.metadata()?;

    // Verify the version marker at the start of the file.
    {
        let mut bytes = [0u8; 2];
        let size = seg_file.read_at(&mut bytes, 0)?;
        if size < 2 || bytes != VERSION_1_MAGIC {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!(
                    "Segment file {} does not contain Version 1 magic",
                    filename
                ),
            ));
        }
    }

    info!("Opened segment {}", filename);

    Ok(Segment {
        file: seg_file,
        path: seg_path.to_path_buf(),
        // Appends continue from the current end of the file.
        write_pos: meta.len() as usize,
        base_offset,
        max_bytes,
    })
}
/// Returns the current write position in bytes, i.e. the size of the segment
/// as tracked by this handle (the leading magic bytes are included).
pub fn size(&self) -> usize {
self.write_pos
}
/// Returns the base offset of this segment (the value encoded in its file name).
#[inline]
pub fn starting_offset(&self) -> u64 {
self.base_offset
}
/// Appends a message set to the end of the segment.
///
/// The payload's offsets are assigned via `set_offsets`, starting at
/// `starting_offset` and at the segment's current write position, before the
/// bytes are written. On success the write position advances by the payload
/// length and the metadata produced by `set_offsets` is returned.
///
/// # Errors
///
/// * `SegmentAppendError::LogFull` if the payload would push the segment past
///   `max_bytes`; nothing is written in that case.
/// * `SegmentAppendError::IoError` if writing to the file fails.
pub fn append<T: MessageSetMut>(
    &mut self,
    payload: &mut T,
    starting_offset: Offset,
) -> Result<Vec<LogEntryMetadata>, SegmentAppendError> {
    let payload_len = payload.bytes().len();
    let end_pos = payload_len + self.write_pos;

    // Reject the payload up front if it does not fit in this segment.
    if end_pos > self.max_bytes {
        return Err(SegmentAppendError::LogFull);
    }

    let entries = super::message::set_offsets(payload, starting_offset, self.write_pos);
    self.file.write_all(payload.bytes())?;
    self.write_pos = end_pos;
    Ok(entries)
}
/// Flushes the segment and synchronizes it to disk.
///
/// `Write::flush` on a raw `File` does no work on its own because `File`
/// has no user-space buffer, so it alone provides no durability. This method
/// therefore also calls `File::sync_all`, which asks the operating system to
/// write the file's content and metadata to the storage device.
///
/// # Errors
///
/// Returns the underlying `io::Error` if either the flush or the sync fails.
pub fn flush_sync(&mut self) -> io::Result<()> {
    self.file.flush()?;
    self.file.sync_all()
}
/// Reads a byte range of the segment through the supplied reader.
///
/// This simply hands the segment's file, the starting position `file_pos`
/// and the byte count `bytes` to `reader.read_from` and returns whatever the
/// reader produces.
pub fn read_slice<T: LogSliceReader>(
    &self,
    reader: &mut T,
    file_pos: u32,
    bytes: u32,
) -> Result<T::Result, MessageError> {
    let byte_count = bytes as usize;
    let segment_file = &self.file;
    reader.read_from(segment_file, file_pos, byte_count)
}
pub fn remove(self) -> io::Result<()> {
let path = self.path.clone();
drop(self);
info!("Removing segment file {}", path.display());
fs::remove_file(path)
}
/// Sets the length of the segment file to `length` bytes and moves the write
/// position to that point.
///
/// # Errors
///
/// Returns the `io::Error` from `File::set_len`; the write position is left
/// untouched in that case.
pub fn truncate(&mut self, length: u32) -> io::Result<()> {
    let new_len = u64::from(length);
    self.file.set_len(new_len).map(|()| {
        // Only move the write position once the file has really been resized.
        self.write_pos = length as usize;
    })
}
}
#[cfg(test)]
mod tests {
use super::super::testutil::*;
use super::*;
use std::fs;
use std::path::PathBuf;
use test::Bencher;
/// Appending message sets yields one metadata entry per message, with the
/// expected offsets and file positions (the first message starts right after
/// the 2-byte magic).
#[test]
pub fn log_append() {
    let path = TestDir::new();
    // Borrow the directory instead of moving it into `Segment::new`, so the
    // `TestDir` value stays alive until the end of the test. The other tests
    // in this module pass the directory by reference in the same way.
    let mut f = Segment::new(&path, 0, 1024).unwrap();

    // Single message appended at offset 5.
    {
        let mut buf = MessageBuf::default();
        buf.push("12345");
        let meta = f.append(&mut buf, 5).unwrap();

        assert_eq!(1, meta.len());
        let p0 = meta.iter().next().unwrap();
        assert_eq!(p0.offset, 5);
        assert_eq!(p0.file_pos, 2);
    }

    // Two messages appended in one set, continuing at offset 6.
    {
        let mut buf = MessageBuf::default();
        buf.push("66666");
        buf.push("77777");
        let meta = f.append(&mut buf, 6).unwrap();
        assert_eq!(2, meta.len());

        let mut it = meta.iter();
        let p0 = it.next().unwrap();
        assert_eq!(p0.offset, 6);
        assert_eq!(p0.file_pos, 27);

        let p1 = it.next().unwrap();
        assert_eq!(p1.offset, 7);
        assert_eq!(p1.file_pos, 52);
    }

    f.flush_sync().unwrap();
}
/// A segment created with `Segment::new` can be reopened with
/// `Segment::open` and reports the same base offset.
#[test]
pub fn log_open() {
    let log_dir = TestDir::new();

    // Write two messages, then let the segment go out of scope.
    {
        let mut seg = Segment::new(&log_dir, 0, 1024).unwrap();
        let mut msgs = MessageBuf::default();
        msgs.push("12345");
        msgs.push("66666");
        seg.append(&mut msgs, 0).unwrap();
        seg.flush_sync().unwrap();
    }

    // Rebuild the path of the file that was just created for base offset 0.
    let mut seg_path = PathBuf::new();
    seg_path.push(&log_dir);
    seg_path.push(format!("{:020}", 0));
    seg_path.set_extension(SEGMENT_FILE_NAME_EXTENSION);

    let reopened = Segment::open(&seg_path, 1024);
    assert!(reopened.is_ok(), "Err {:?}", reopened.err());

    let seg = reopened.unwrap();
    assert_eq!(0, seg.starting_offset());
}
/// Reading the whole written range returns every appended message with
/// consecutive offsets starting at 0.
#[test]
pub fn log_read() {
    let log_dir = TestDir::new();
    let mut seg = Segment::new(&log_dir, 0, 1024).unwrap();

    let mut batch = MessageBuf::default();
    batch.push("0123456789");
    batch.push("aaaaaaaaaa");
    batch.push("abc");
    seg.append(&mut batch, 0).unwrap();

    // Read everything after the 2-byte magic.
    let mut reader = MessageBufReader;
    let msgs = seg.read_slice(&mut reader, 2, 83).unwrap();
    assert_eq!(3, msgs.len());

    let mut expected_offset = 0u64;
    for m in msgs.iter() {
        assert_eq!(expected_offset, m.offset());
        expected_offset += 1;
    }
}
/// Limiting the read to the bytes of the first message returns only that
/// message.
#[test]
pub fn log_read_with_size_limit() {
    let log_dir = TestDir::new();
    let mut seg = Segment::new(&log_dir, 0, 1024).unwrap();

    let mut batch = MessageBuf::default();
    batch.push("0123456789");
    batch.push("aaaaaaaaaa");
    batch.push("abc");
    let meta = seg.append(&mut batch, 0).unwrap();

    // The first message spans from position 2 up to where the second begins.
    let first_msg_bytes = meta[1].file_pos - 2;

    let mut reader = MessageBufReader;
    let msgs = seg.read_slice(&mut reader, 2, first_msg_bytes).unwrap();
    assert_eq!(1, msgs.len());
}
/// Data appended after an earlier read is visible to a later read through
/// the same segment handle.
#[test]
pub fn log_read_from_write() {
    let log_dir = TestDir::new();
    let mut seg = Segment::new(&log_dir, 0, 1024).unwrap();

    let mut first_batch = MessageBuf::default();
    first_batch.push("0123456789");
    first_batch.push("aaaaaaaaaa");
    first_batch.push("abc");
    seg.append(&mut first_batch, 0).unwrap();

    let mut reader = MessageBufReader;
    let before = seg.read_slice(&mut reader, 2, 83).unwrap();
    assert_eq!(3, before.len());

    // Append one more message and read the enlarged range.
    let mut second_batch = MessageBuf::default();
    second_batch.push("foo");
    seg.append(&mut second_batch, 3).unwrap();

    let after = seg.read_slice(&mut reader, 2, 106).unwrap();
    assert_eq!(4, after.len());

    let mut expected_offset = 0u64;
    for m in after.iter() {
        assert_eq!(expected_offset, m.offset());
        expected_offset += 1;
    }
}
/// `Segment::remove` deletes the segment file from the log directory.
#[test]
pub fn log_remove() {
    let log_dir = TestDir::new();
    let seg = Segment::new(&log_dir, 0, 1024).unwrap();

    // Scans the directory for the file belonging to base offset 0.
    let segment_file_present = || -> bool {
        fs::read_dir(&log_dir).unwrap().any(|entry| {
            let path = entry.unwrap().path();
            path.file_name().unwrap() == "00000000000000000000.log"
        })
    };

    assert!(segment_file_present(), "Segment file does not exist?");

    seg.remove().unwrap();

    assert!(
        !segment_file_present(),
        "Segment file should have been removed"
    );
}
/// Truncating at the start of the second message drops everything from there
/// on; a subsequent append lands at the truncation point, and the tracked
/// size matches the size on disk.
#[test]
pub fn log_truncate() {
    let log_dir = TestDir::new();
    let mut seg = Segment::new(&log_dir, 0, 1024).unwrap();

    let mut first_batch = MessageBuf::default();
    first_batch.push("0123456789");
    first_batch.push("aaaaaaaaaa");
    first_batch.push("abc");
    let meta = seg.append(&mut first_batch, 0).unwrap();

    // All three messages are readable before truncation.
    let mut reader = MessageBufReader;
    let readable = seg.size() as u32 - 2;
    let msg_buf = seg
        .read_slice(&mut reader, 2, readable)
        .expect("Read after first append failed");
    assert_eq!(3, msg_buf.len());

    // Cut the file at the start of the second message.
    let cut = meta[1].file_pos;
    seg.truncate(cut).unwrap();
    assert_eq!(cut as usize, seg.size());

    let on_disk = fs::metadata(&seg.path).unwrap().len();
    assert_eq!(cut as u64, on_disk);

    // A new message must be written exactly where the old second one began.
    let mut second_batch = MessageBuf::default();
    second_batch.push("zzzzzzzzzz");
    let meta2 = seg.append(&mut second_batch, 1).unwrap();
    assert_eq!(cut, meta2[0].file_pos);

    let on_disk = fs::metadata(&seg.path).unwrap().len();
    assert_eq!(seg.size() as u64, on_disk);

    let mut reader = MessageBufReader;
    let readable = seg.size() as u32 - 2;
    let msg_buf = seg
        .read_slice(&mut reader, 2, readable)
        .expect("Read after second append failed");
    assert_eq!(2, msg_buf.len());
}
/// Measures the cost of appending a single small message to a segment.
#[bench]
fn bench_segment_append(b: &mut Bencher) {
    let path = TestDir::new();
    // Borrow the directory instead of moving it into `Segment::new`, so the
    // `TestDir` value stays alive for the whole benchmark. The tests in this
    // module pass the directory by reference in the same way.
    let mut seg = Segment::new(&path, 100u64, 100 * 1024 * 1024).unwrap();
    let payload = b"01234567891011121314151617181920";

    b.iter(|| {
        let mut buf = MessageBuf::default();
        buf.push(payload);
        seg.append(&mut buf, 0).unwrap();
    });
}
}