pub mod segment;
use self::segment::{SegBytes, SegStr, Segment};
use crate::{index::BoxedStream, LineIndex, LineSet, Result};
use lru::LruCache;
use std::cell::RefCell;
use std::fs::File;
use std::io::{BufWriter, Write};
use std::num::NonZeroUsize;
use std::ops::Range;
use std::sync::mpsc::{Receiver, TryRecvError};
use std::sync::Arc;
/// Line-addressable buffer that pairs a [`LineIndex`] with the storage
/// (a file or a stream) from which the underlying segments are fetched.
pub struct SegBuffer {
/// Index used to translate line numbers into data offsets.
index: LineIndex,
/// Backing storage that hands out segments by id.
repr: BufferRepr,
}
/// Mutable state of a stream-backed buffer.
struct StreamInner {
/// Receiving end of the channel delivering segments; set to `None` once
/// the sending side has disconnected.
pending_segs: Option<Receiver<Segment>>,
/// Segments received so far, looked up by segment id (position in the vector).
segments: Vec<Arc<Segment>>,
}
/// Storage behind a buffer: either a file whose segments are mapped on
/// demand, or a stream whose segments arrive over a channel.
enum BufferRepr {
/// File-backed storage.
File {
/// Handle the segments are mapped from.
file: File,
/// Length of the file in bytes; segment data ranges are clamped to it.
len: u64,
/// LRU cache of mapped segments, keyed by segment id.
segments: RefCell<LruCache<usize, Arc<Segment>>>,
},
/// Stream-backed storage; see [`StreamInner`].
Stream(RefCell<StreamInner>),
}
impl BufferRepr {
fn fetch(&self, seg_id: usize) -> Option<Arc<Segment>> {
match self {
BufferRepr::File {
file,
len,
segments,
} => {
let range = Segment::data_range_of_id(seg_id);
let range = range.start..range.end.min(*len);
Some(
segments
.borrow_mut()
.get_or_insert(seg_id, || {
Arc::new(Segment::map_file(range, file).expect("mmap was successful"))
})
.clone(),
)
}
BufferRepr::Stream(inner) => {
let StreamInner {
pending_segs,
segments,
} = &mut *inner.borrow_mut();
if let Some(rx) = pending_segs {
loop {
match rx.try_recv() {
Ok(segment) => segments.push(Arc::new(segment)),
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => {
*pending_segs = None;
break;
}
}
}
}
segments.get(seg_id).cloned()
}
}
}
}
impl SegBuffer {
pub fn read_file(file: File, seg_count: NonZeroUsize, complete: bool) -> Result<Self> {
let index = LineIndex::read_file(file.try_clone()?, complete)?;
Ok(Self {
index,
repr: BufferRepr::File {
len: file.metadata()?.len(),
file,
segments: RefCell::new(LruCache::new(seg_count)),
},
})
}
pub fn read_stream(stream: BoxedStream, complete: bool) -> Result<Self> {
let (sx, rx) = std::sync::mpsc::channel();
let index = LineIndex::read_stream(stream, sx, complete)?;
Ok(Self {
index,
repr: BufferRepr::Stream(RefCell::new(StreamInner {
pending_segs: Some(rx),
segments: Vec::new(),
})),
})
}
#[inline]
pub fn line_count(&self) -> usize {
self.index.line_count()
}
#[inline]
pub fn index(&self) -> &LineIndex {
&self.index
}
pub fn get_bytes(&self, line_number: usize) -> Option<SegBytes> {
assert!(line_number <= self.line_count());
let data_start = self.index.data_of_line(line_number)?;
let data_end = self.index.data_of_line(line_number + 1)?;
let seg_start = Segment::id_of_data(data_start);
let seg_end = Segment::id_of_data(data_end);
if seg_start == seg_end {
let seg = self.repr.fetch(seg_start)?;
let range = seg.translate_inner_data_range(data_start, data_end);
Some(seg.get_bytes(range))
} else {
debug_assert!(seg_start < seg_end);
let mut buf = Vec::with_capacity((data_end - data_start) as usize);
let seg_first = self.repr.fetch(seg_start)?;
let seg_last = self.repr.fetch(seg_end)?;
let (start, end) = (
seg_first.translate_inner_data_index(data_start),
seg_last.translate_inner_data_index(data_end),
);
buf.extend_from_slice(&seg_first[start as usize..]);
for seg_id in seg_start + 1..seg_end {
buf.extend_from_slice(&self.repr.fetch(seg_id)?);
}
buf.extend_from_slice(&seg_last[..end as usize]);
Some(SegBytes::new_owned(buf))
}
}
pub fn get_line(&self, line_number: usize) -> Option<SegStr> {
Some(SegStr::from_bytes(self.get_bytes(line_number)?))
}
pub fn segment_iter(&self) -> Result<ContiguousSegmentIterator> {
match &self.repr {
BufferRepr::File { file, len, .. } => Ok(ContiguousSegmentIterator::new(
self.index.clone(),
0..self.index.line_count(),
BufferRepr::File {
file: file.try_clone()?,
len: *len,
segments: RefCell::new(LruCache::new(NonZeroUsize::new(2).unwrap())),
},
)),
BufferRepr::Stream(inner) => Ok(ContiguousSegmentIterator::new(
self.index.clone(),
0..self.index.line_count(),
BufferRepr::Stream(RefCell::new(StreamInner {
pending_segs: None,
segments: inner.borrow().segments.clone(),
})),
)),
}
}
pub fn all_line_matches(&self) -> LineSet {
LineSet::all(self.index.clone())
}
pub fn write_file(&mut self, output: File, lines: LineSet) -> Result<()> {
if !lines.is_complete() {
return Err(crate::err::Error::InProgress);
}
match lines.snapshot() {
Some(snap) => {
let mut writer = BufWriter::new(output);
for &ln in snap.iter() {
let line = self.get_bytes(ln).unwrap();
writer.write_all(line.as_bytes())?;
}
}
None => match &mut self.repr {
BufferRepr::File { ref file, .. } => {
let mut output = output;
std::io::copy(&mut file.try_clone()?, &mut output)?;
}
BufferRepr::Stream(inner) => {
let mut writer = BufWriter::new(output);
let inner = inner.borrow();
for seg in inner.segments.iter() {
writer.write_all(seg)?;
}
}
},
}
Ok(())
}
}
/// Iterator-like cursor that walks a range of lines and hands out contiguous
/// byte buffers through `next_buf`.
pub struct ContiguousSegmentIterator {
/// Index used to translate between line numbers and data offsets.
pub index: LineIndex,
/// Storage the segments are fetched from.
repr: BufferRepr,
/// Lines that have not been handed out yet.
line_range: Range<usize>,
/// Reused scratch buffer holding a line that spans more than one segment.
imm_buf: Vec<u8>,
/// Holds the segment whose bytes were handed out most recently, so the
/// returned slice can borrow from it.
imm_seg: Option<Arc<Segment>>,
}
impl ContiguousSegmentIterator {
    /// Creates a cursor over `line_range` that reads segments from `repr` and
    /// resolves offsets through `index`.
    fn new(index: LineIndex, line_range: Range<usize>, repr: BufferRepr) -> Self {
        Self {
            index,
            repr,
            line_range,
            imm_buf: Vec::new(),
            imm_seg: None,
        }
    }

    /// Returns the range of lines that have not been handed out yet.
    #[inline]
    pub fn remaining_range(&self) -> Range<usize> {
        self.line_range.start..self.line_range.end
    }

    /// Hands out the next contiguous buffer as
    /// `(index, data offset of the buffer's first byte, bytes)`.
    ///
    /// * If the current line fits into one segment, the buffer covers the
    ///   current line and the following lines up to the line found at the end
    ///   of that segment (or the end of the remaining range, whichever comes
    ///   first), borrowed straight from the segment.
    /// * If the current line spans several segments, only that line is
    ///   returned, assembled in the internal scratch buffer.
    ///
    /// Returns `None` when no lines remain, when the index cannot resolve a
    /// needed offset, or when a needed segment cannot be fetched.
    ///
    /// # Panics
    ///
    /// Panics if the computed in-segment range exceeds `Segment::MAX_SIZE`.
    pub fn next_buf(&mut self) -> Option<(&LineIndex, u64, &[u8])> {
        if self.line_range.is_empty() {
            return None;
        }

        let line = self.line_range.start;
        let line_begin = self.index.data_of_line(line)?;
        let line_finish = self.index.data_of_line(line + 1)?;
        let first_seg = Segment::id_of_data(line_begin);
        let last_seg = Segment::id_of_data(line_finish);

        if first_seg == last_seg {
            // Single-segment case: take as many whole lines as this segment holds.
            let curr_seg_data_start = first_seg as u64 * Segment::MAX_SIZE;
            let seg_data_limit = curr_seg_data_start + Segment::MAX_SIZE;
            let line_at_limit = match self.index.line_of_data(seg_data_limit) {
                Some(found) => found,
                None => self.index.line_count(),
            };
            let stop_line = line_at_limit.min(self.line_range.end);
            let line_end_data_start = self.index.data_of_line(stop_line)?;

            let fetched = self.repr.fetch(first_seg)?;
            let range = fetched.translate_inner_data_range(line_begin, line_end_data_start);
            assert!(line_end_data_start - curr_seg_data_start <= Segment::MAX_SIZE);
            assert!(range.end <= Segment::MAX_SIZE);

            self.line_range.start = stop_line;
            // Park the segment in `self` so the returned slice outlives this call.
            let held = self.imm_seg.insert(fetched);
            Some((
                &self.index,
                line_begin,
                &held[range.start as usize..range.end as usize],
            ))
        } else {
            // Multi-segment case: copy the pieces of this one line into the
            // scratch buffer (tail of first, all middle, head of last segment).
            self.imm_buf.clear();
            self.imm_buf.reserve((line_finish - line_begin) as usize);

            let head = self.repr.fetch(first_seg)?;
            let tail = self.repr.fetch(last_seg)?;
            let head_from = head.translate_inner_data_index(line_begin);
            let tail_to = tail.translate_inner_data_index(line_finish);

            self.imm_buf.extend_from_slice(&head[head_from as usize..]);
            for middle_id in (first_seg + 1)..last_seg {
                let middle = self.repr.fetch(middle_id)?;
                self.imm_buf.extend_from_slice(&middle);
            }
            self.imm_buf.extend_from_slice(&tail[..tail_to as usize]);

            self.line_range.start += 1;
            Some((&self.index, line_begin, &self.imm_buf))
        }
    }
}
#[cfg(test)]
mod test {
    use anyhow::Result;
    use std::{
        fs::File,
        io::{BufReader, Read},
        num::NonZeroUsize,
    };

    use crate::buf::SegBuffer;

    #[test]
    fn file_stream_consistency_1() -> Result<()> {
        let file = File::open("../../tests/test_10.log")?;
        file_stream_consistency_base(file, 10)
    }

    #[test]
    fn file_stream_consistency_2() -> Result<()> {
        let file = File::open("../../tests/test_50_long.log")?;
        file_stream_consistency_base(file, 50)
    }

    #[test]
    fn file_stream_consistency_3() -> Result<()> {
        let file = File::open("../../tests/test_5000000.log")?;
        file_stream_consistency_base(file, 5_000_000)
    }

    /// Loads the same file once as a file-backed and once as a stream-backed
    /// buffer, then checks that both report `line_count` lines with identical
    /// contents.
    fn file_stream_consistency_base(file: File, line_count: usize) -> Result<()> {
        let reader = BufReader::new(file.try_clone()?);
        let cache_size = NonZeroUsize::new(25).unwrap();
        let from_file = SegBuffer::read_file(file, cache_size, true)?;
        let from_stream = SegBuffer::read_stream(Box::new(reader), true)?;

        let total = from_file.line_count();
        assert_eq!(total, from_stream.line_count());
        assert_eq!(total, line_count);

        for line_number in 0..total {
            let file_line = from_file.get_line(line_number).unwrap();
            let stream_line = from_stream.get_line(line_number).unwrap();
            assert_eq!(file_line.as_str(), stream_line.as_str());
        }
        Ok(())
    }

    #[test]
    #[cfg_attr(miri, ignore)]
    fn multi_buffer_consistency_1() -> Result<()> {
        let file = File::open("../../tests/test_10.log")?;
        multi_buffer_consistency_base(file)
    }

    #[test]
    #[cfg_attr(miri, ignore)]
    fn multi_buffer_consistency_2() -> Result<()> {
        let file = File::open("../../tests/test_50_long.log")?;
        multi_buffer_consistency_base(file)
    }

    #[test]
    #[cfg_attr(miri, ignore)]
    fn multi_buffer_consistency_3() -> Result<()> {
        let file = File::open("../../tests/test_5000000.log")?;
        multi_buffer_consistency_base(file)
    }

    /// Walks the buffers produced by `segment_iter` and checks that they are
    /// handed out back to back and match the bytes read directly from the file.
    fn multi_buffer_consistency_base(file: File) -> Result<()> {
        let expected_len = file.metadata()?.len();
        let mut reference = BufReader::new(file.try_clone()?);
        let buffer = SegBuffer::read_file(file, NonZeroUsize::new(25).unwrap(), true)?;
        let mut chunks = buffer.segment_iter()?;

        let mut consumed = 0;
        let mut scratch = Vec::new();
        while let Some((_, offset, chunk)) = chunks.next_buf() {
            // Every chunk must start exactly where the previous one ended.
            assert_eq!(offset, consumed);
            consumed += chunk.len() as u64;

            scratch.resize(chunk.len(), 0);
            reference.read_exact(&mut scratch)?;
            assert_eq!(chunk, scratch);
        }

        assert_eq!(consumed, expected_len);
        Ok(())
    }
}