use std::{fs, io, mem::swap};
use log::{trace, info, warn, error};
use super::{index::*, segment::*, LogOptions};
use std::collections::BTreeMap;
pub struct FileSet {
active: (Index, Segment),
closed: BTreeMap<u64, (Index, Segment)>,
opts: LogOptions,
}
impl FileSet {
pub fn load_log(opts: LogOptions) -> io::Result<FileSet> {
let mut segments = BTreeMap::new();
let mut indexes = BTreeMap::new();
let files = fs::read_dir(&opts.log_dir)?
.filter_map(|e| e.ok())
.filter(|e| e.metadata().map(|m| m.is_file()).unwrap_or(false));
for f in files {
match f.path().extension() {
Some(ext) if SEGMENT_FILE_NAME_EXTENSION.eq(ext) => {
let segment = match Segment::open(f.path(), opts.log_max_bytes) {
Ok(seg) => seg,
Err(e) => {
error!("Unable to open segment {:?}: {}", f.path(), e);
return Err(e);
}
};
let offset = segment.starting_offset();
segments.insert(offset, segment);
}
Some(ext) if INDEX_FILE_NAME_EXTENSION.eq(ext) => {
let index = match Index::open(f.path()) {
Ok(ind) => ind,
Err(e) => {
error!("Unable to open index {:?}: {}", f.path(), e);
return Err(e);
}
};
let offset = index.starting_offset();
indexes.insert(offset, index);
}
_ => {}
}
}
let mut closed = segments
.into_iter()
.map(move |(i, s)| {
match indexes.remove(&i) {
Some(v) => (i, (v, s)),
None => {
panic!("No index found for segment starting at {}", i);
}
}
})
.collect::<BTreeMap<u64, (Index, Segment)>>();
let last_entry = closed.keys().next_back().cloned();
let (ind, seg) = match last_entry {
Some(off) => {
info!("Reusing index and segment starting at offset {}", off);
closed.remove(&off).unwrap()
}
None => {
info!("Starting new index and segment at offset 0");
let ind = Index::new(&opts.log_dir, 0, opts.index_max_bytes)?;
let seg = Segment::new(&opts.log_dir, 0, opts.log_max_bytes)?;
(ind, seg)
}
};
for &mut (ref mut ind, _) in closed.values_mut() {
ind.set_readonly()?;
}
Ok(FileSet { active: (ind, seg), closed, opts })
}
pub fn active_segment_mut(&mut self) -> &mut Segment {
&mut self.active.1
}
pub fn active_index_mut(&mut self) -> &mut Index {
&mut self.active.0
}
pub fn active_index(&self) -> &Index {
&self.active.0
}
pub fn find(&self, offset: u64) -> Option<&(Index, Segment)> {
let active_seg_start_off = self.active.0.starting_offset();
if offset >= active_seg_start_off {
trace!("Index is contained in the active index for offset {}", offset);
Some(&self.active)
} else {
self.closed.range(..=offset).next_back().map(|p| p.1)
}
}
pub fn roll_segment(&mut self) -> io::Result<()> {
self.active.0.set_readonly()?;
self.active.1.flush_sync()?;
let next_offset = self.active.0.next_offset();
info!("Starting new segment and index at offset {}", next_offset);
let mut p = {
let seg = Segment::new(&self.opts.log_dir, next_offset, self.opts.log_max_bytes)?;
let ind = Index::new(&self.opts.log_dir, next_offset, self.opts.index_max_bytes)?;
(ind, seg)
};
swap(&mut p, &mut self.active);
self.closed.insert(p.1.starting_offset(), p);
Ok(())
}
pub fn take_after(&mut self, offset: u64) -> Vec<(Index, Segment)> {
if offset >= self.active.0.starting_offset() {
return vec![];
}
let split_key = match self.closed.range(..=offset).next_back().map(|p| p.0).cloned() {
Some(key) => {
trace!("File set split key for truncation {}", key);
key
}
None => {
warn!("Split key before offset {} found", offset);
return vec![];
}
};
let mut after = self.closed.split_off(&split_key);
let mut active = after.remove(&split_key).unwrap();
trace!("Setting active to segment starting {}", active.0.starting_offset());
assert!(active.0.starting_offset() <= offset);
swap(&mut active, &mut self.active);
let mut pairs = after.into_iter().map(|p| p.1).collect::<Vec<_>>();
pairs.push(active);
pairs
}
pub fn log_options(&self) -> &LogOptions {
&self.opts
}
}