use crate::buf::segment::{Segment, SegmentMut};
use crate::cowvec::{CowVec, CowVecWriter};
use crate::err::{Error, Result};
use std::fs::File;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{atomic::AtomicBool, Arc};
use std::thread::JoinHandle;
/// One unit of line-indexing work covering a single [`Segment`].
///
/// `compute` searches `segment` for `\n` bytes and sends one offset per
/// newline through `sx`.
struct IndexingTask {
/// Sending half of the per-task channel; `new` hands the matching `Receiver` to the caller.
sx: Sender<u64>,
/// The segment whose bytes are searched for newlines.
segment: Segment,
}
impl IndexingTask {
#[inline]
fn new(file: &File, start: u64, end: u64) -> Result<(Self, Receiver<u64>)> {
let segment = Segment::map_file(start..end, file)?;
let (sx, rx) = std::sync::mpsc::channel();
Ok((Self { sx, segment }, rx))
}
fn compute(self) -> Result<()> {
for i in memchr::memchr_iter(b'\n', &self.segment) {
self.sx
.send(self.segment.start() + i as u64 + 1)
.map_err(|_| Error::Internal)?;
}
Ok(())
}
}
/// Boxed, type-erased byte source that is `Read + Send`; this is the stream type
/// accepted by `index_stream` and `read_stream`.
pub type BoxedStream = Box<dyn std::io::Read + Send>;
/// Writer-side handle that fills the line index.
///
/// It is consumed by `index_file` / `index_stream`, which push line offsets
/// into `buf`.
struct LineIndexRemote {
/// Writer half returned by `CowVec::new()`; receives the line offsets.
buf: CowVecWriter<u64>,
/// Flag shared with `LineIndex`; `has_readers` inspects this `Arc`'s strong count.
completed: Arc<AtomicBool>,
}
impl LineIndexRemote {
const BYTES_PER_LINE_HEURISTIC: u64 = 128;
pub fn index_file(mut self, file: File) -> Result<()> {
let (sx, rx) = std::sync::mpsc::sync_channel(4);
let len = file.metadata()?.len();
let file = file.try_clone()?;
self.buf
.reserve((len / Self::BYTES_PER_LINE_HEURISTIC) as usize);
self.buf.push(0);
let spawner: JoinHandle<Result<()>> = std::thread::spawn(move || {
let mut curr = 0;
while curr < len {
let end = (curr + Segment::MAX_SIZE).min(len);
let (task, task_rx) = IndexingTask::new(&file, curr, end)?;
sx.send(task_rx).map_err(|_| Error::Internal)?;
std::thread::spawn(|| task.compute());
curr = end;
}
Ok(())
});
while let Ok(task_rx) = rx.recv() {
if !self.has_readers() {
break;
}
while let Ok(line_data) = task_rx.recv() {
self.buf.push(line_data);
}
}
spawner.join().map_err(|_| Error::Internal)??;
self.buf.push(len);
Ok(())
}
pub fn index_stream(
mut self,
mut stream: BoxedStream,
outgoing: Sender<Segment>,
) -> Result<()> {
let mut len = 0;
self.buf.push(0);
loop {
let mut segment = SegmentMut::new(len)?;
let mut buf_len = 0;
loop {
match stream.read(&mut segment[buf_len..])? {
0 => break,
l => buf_len += l,
}
}
for i in memchr::memchr_iter(b'\n', &segment) {
let line_data = len + i as u64;
self.buf.push(line_data + 1);
}
outgoing
.send(segment.into_read_only()?)
.map_err(|_| Error::Internal)?;
if buf_len == 0 {
break;
}
len += buf_len as u64;
}
self.buf.push(len);
Ok(())
}
pub fn has_readers(&self) -> bool {
Arc::strong_count(&self.completed) > 1
}
}
impl Drop for LineIndex {
fn drop(&mut self) {
self.completed
.store(true, std::sync::atomic::Ordering::Relaxed);
}
}
/// Cloneable read-side handle to the line index.
///
/// Created by `read_file` / `read_stream`, which pair it with a
/// `LineIndexRemote` that fills the shared buffer.
#[derive(Clone)]
pub struct LineIndex {
/// Read half returned by `CowVec::new()`; holds the line offsets.
buf: CowVec<u64>,
/// Flag reported by `is_complete`; the `Arc` is shared with the `LineIndexRemote`.
completed: Arc<AtomicBool>,
}
impl LineIndex {
#[inline]
pub fn read_file(file: File, complete: bool) -> Result<Self> {
let (buf, writer) = CowVec::new();
let completed = Arc::new(AtomicBool::new(false));
let task = {
let completed = completed.clone();
move || {
LineIndexRemote {
buf: writer,
completed,
}
.index_file(file)
}
};
if complete {
task()?;
} else {
std::thread::spawn(task);
}
Ok(Self { buf, completed })
}
#[inline]
pub fn read_stream(
stream: BoxedStream,
outgoing: Sender<Segment>,
complete: bool,
) -> Result<Self> {
let (buf, writer) = CowVec::new();
let completed = Arc::new(AtomicBool::new(false));
let task = {
let completed = completed.clone();
move || {
LineIndexRemote {
buf: writer,
completed,
}
.index_stream(stream, outgoing)
}
};
if complete {
task()?;
} else {
std::thread::spawn(task);
}
Ok(Self { buf, completed })
}
pub fn line_count(&self) -> usize {
self.buf.len().saturating_sub(1)
}
pub fn data_of_line(&self, line_number: usize) -> Option<u64> {
self.buf.get(line_number)
}
pub fn line_of_data(&self, key: u64) -> Option<usize> {
let buf = self.buf.snapshot();
let mut size = buf.len().saturating_sub(1);
let mut left = 0;
let mut right = size;
while left < right {
let mid = left + size / 2;
let start = unsafe { buf.get_unchecked(mid) };
let end = unsafe { buf.get_unchecked(mid + 1) };
if end <= key {
left = mid + 1;
} else if start > key {
right = mid;
} else {
return Some(mid);
}
size = right - left;
}
None
}
#[inline]
pub fn is_complete(&self) -> bool {
self.completed.load(std::sync::atomic::Ordering::Relaxed)
}
}