use std::fs::{File, OpenOptions};
use std::io::{self, BufWriter, Write};
use std::path::Path;
use std::path::PathBuf;
/// An append-only log segment backed by a single file on disk.
///
/// Records are appended through a buffered writer and read back positionally
/// through a second handle to the same file. The segment tracks its logical
/// size (including bytes that are still sitting in the write buffer) and
/// assigns a sequential offset to every appended record.
pub struct Segment {
    /// Handle used for positional reads.
    file: File,
    /// Buffered writer over a clone of `file`; all appends go through it.
    writer: BufWriter<File>,
    /// Logical size of the segment in bytes, counting unflushed appends.
    size: u64,
    /// Offset that will be handed out by the next successful `append`.
    next_offset: u64,
    /// Largest record, in bytes, that `append` accepts.
    max_record_size: u64,
}

impl Segment {
    /// Capacity of the in-memory write buffer, in bytes.
    const WRITE_BUFFER_CAPACITY: usize = 1024 * 1024;

    /// Record size limit, in bytes, applied to every segment.
    const DEFAULT_MAX_RECORD_SIZE: u64 = 10 * 1024;

    /// Opens the segment file for `base_offset` inside `dir`, creating it if
    /// it does not exist yet.
    ///
    /// The file is named after `base_offset`, zero-padded to 20 digits, with a
    /// `.segment` extension. When the file already exists, the segment's size
    /// is initialised from the file length so new appends continue after the
    /// existing data.
    ///
    /// Note that `next_offset` always starts at 0 here, independent of
    /// `base_offset`; callers reopening an existing segment are expected to
    /// call [`Segment::set_next_offset`] themselves.
    ///
    /// # Errors
    ///
    /// Returns any I/O error raised while opening the file, querying its
    /// metadata, or cloning the handle for the writer.
    pub fn new<P: AsRef<Path>>(dir: P, base_offset: u64) -> io::Result<Segment> {
        let file_path: PathBuf = dir.as_ref().join(format!("{:020}.segment", base_offset));
        let file = OpenOptions::new()
            .read(true)
            .append(true)
            .create(true)
            .open(&file_path)?;
        let size = file.metadata()?.len();
        let writer = BufWriter::with_capacity(Self::WRITE_BUFFER_CAPACITY, file.try_clone()?);

        Ok(Segment {
            file,
            writer,
            size,
            next_offset: 0,
            max_record_size: Self::DEFAULT_MAX_RECORD_SIZE,
        })
    }

    /// Returns the logical size of the segment in bytes, including appended
    /// data that has not been flushed to the file yet.
    pub fn size(&self) -> u64 {
        self.size
    }

    /// Overrides the offset that the next appended record will receive.
    pub fn set_next_offset(&mut self, next_offset: u64) {
        self.next_offset = next_offset;
    }

    /// Appends `record` to the segment.
    ///
    /// Returns `(offset, position)`: the offset assigned to the record and the
    /// byte position inside the segment at which the record starts.
    ///
    /// # Errors
    ///
    /// Fails with `io::ErrorKind::Other` when the record is larger than the
    /// segment's maximum record size, or with the underlying I/O error when
    /// the write fails. In both cases the size and next offset are left
    /// untouched.
    pub fn append(&mut self, record: &[u8]) -> io::Result<(u64, u64)> {
        let record_size = record.len() as u64;
        if record_size > self.max_record_size {
            return Err(io::Error::new(
                io::ErrorKind::Other,
                "Max record size exceeded",
            ));
        }

        self.writer.write_all(record)?;

        // Bookkeeping is only updated once the write has been accepted.
        let position = self.size;
        self.size += record_size;
        let offset = self.next_offset;
        self.next_offset += 1;
        Ok((offset, position))
    }

    /// Fills `buf` with the bytes stored at `position` and returns the number
    /// of bytes read, which is always `buf.len()` on success.
    ///
    /// The write buffer is flushed first so that records appended just before
    /// the call are visible to the read.
    ///
    /// # Errors
    ///
    /// Returns the underlying I/O error when flushing or reading fails,
    /// including the case where fewer than `buf.len()` bytes are available at
    /// `position`.
    pub fn read(&mut self, position: u64, buf: &mut [u8]) -> io::Result<u64> {
        self.writer.flush()?;
        self.read_at(position, buf)
    }

    /// Positional read on Unix: reads exactly `buf.len()` bytes at `position`
    /// via `FileExt::read_exact_at`.
    #[cfg(target_family = "unix")]
    fn read_at(&mut self, position: u64, buf: &mut [u8]) -> io::Result<u64> {
        use std::os::unix::fs::FileExt;

        self.file.read_exact_at(buf, position)?;
        Ok(buf.len() as u64)
    }

    /// Portable fallback for every non-Unix target: seeks to `position` and
    /// then reads exactly `buf.len()` bytes.
    ///
    /// NOTE(review): this moves the cursor of the read handle; appends are
    /// expected to be unaffected because the file is opened in append mode.
    #[cfg(not(target_family = "unix"))]
    fn read_at(&mut self, position: u64, buf: &mut [u8]) -> io::Result<u64> {
        use std::io::{Read, Seek, SeekFrom};

        self.file.seek(SeekFrom::Start(position))?;
        self.file.read_exact(buf)?;
        Ok(buf.len() as u64)
    }

    /// Flushes all buffered appends to the underlying file.
    ///
    /// # Errors
    ///
    /// Returns the I/O error reported by the flush.
    pub fn close(&mut self) -> io::Result<()> {
        self.writer.flush()
    }
}
#[cfg(test)]
mod test {
    use super::Segment;
    use pretty_assertions::assert_eq;

    /// Writes ten records, closes the segment, reopens the same file and
    /// writes ten more. Checks that offsets and positions continue where the
    /// first session stopped and that every record from both sessions can be
    /// read back after the reopen.
    #[test]
    fn second_time_initialization_happens_correctly() {
        let first_record = b"hello timestone commitlog";
        // Same length as `first_record`, so positions stay multiples of `len`.
        let second_record = b"iello timestone commitlog";
        let len = first_record.len();
        let dir = tempfile::tempdir().unwrap();
        let base_offset = 10;

        // First session: fresh file, offsets and positions start at zero.
        {
            let mut segment = Segment::new(&dir, base_offset).unwrap();
            for i in 0..10 {
                let (offset, pos) = segment.append(first_record).unwrap();
                assert_eq!(offset, i);
                assert_eq!(pos, i * len as u64);
            }
            for i in 0..10u64 {
                let mut data = vec![0; len];
                segment.read(i * len as u64, &mut data).unwrap();
                assert_eq!(&data, first_record);
            }
            segment.close().unwrap();
        }

        // Second session: reopening must pick up the existing file contents.
        {
            let mut segment = Segment::new(&dir, base_offset).unwrap();
            assert_eq!(segment.size(), 10 * len as u64);
            segment.set_next_offset(10);

            for i in 10..20 {
                let (offset, pos) = segment.append(second_record).unwrap();
                assert_eq!(offset, i);
                // Positions continue after the data written in the first session.
                assert_eq!(pos, i * len as u64);
            }

            // Records from both sessions must be readable through the reopened segment.
            for i in 0..20u64 {
                let expected = if i < 10 { first_record } else { second_record };
                let mut data = vec![0; len];
                segment.read(i * len as u64, &mut data).unwrap();
                assert_eq!(&data, expected);
            }
            segment.close().unwrap();
        }
    }
}