bvr_core/
index.rs

1use crate::buf::segment::{Segment, SegmentMut};
2use crate::cowvec::{CowVec, CowVecWriter};
3use crate::err::{Error, Result};
4use std::fs::File;
5use std::sync::mpsc::{Receiver, Sender};
6use std::sync::{atomic::AtomicBool, Arc};
7use std::thread::JoinHandle;
8
9struct IndexingTask {
10    /// This is the sender side of the channel that receives byte indexes of `\n`.
11    sx: Sender<u64>,
12    segment: Segment,
13}
14
15impl IndexingTask {
16    #[inline]
17    fn new(file: &File, start: u64, end: u64) -> Result<(Self, Receiver<u64>)> {
18        let segment = Segment::map_file(start..end, file)?;
19        let (sx, rx) = std::sync::mpsc::channel();
20        Ok((Self { sx, segment }, rx))
21    }
22
23    fn compute(self) -> Result<()> {
24        for i in memchr::memchr_iter(b'\n', &self.segment) {
25            self.sx
26                .send(self.segment.start() + i as u64 + 1)
27                .map_err(|_| Error::Internal)?;
28        }
29
30        Ok(())
31    }
32}
33
/// Type-erased, sendable byte source accepted by [`LineIndex::read_stream`].
///
/// Any `std::io::Read` implementor that is also `Send` can be boxed into it.
pub type BoxedStream = Box<dyn std::io::Read + Send>;
36
37/// A remote type that can be used to set off the indexing process of a
38/// file or a stream.
39struct LineIndexRemote {
40    buf: CowVecWriter<u64>,
41    completed: Arc<AtomicBool>,
42}
43
44impl LineIndexRemote {
45    const BYTES_PER_LINE_HEURISTIC: u64 = 128;
46
47    pub fn index_file(mut self, file: File) -> Result<()> {
48        // Build index
49        let (sx, rx) = std::sync::mpsc::sync_channel(4);
50
51        let len = file.metadata()?.len();
52        let file = file.try_clone()?;
53
54        self.buf
55            .reserve((len / Self::BYTES_PER_LINE_HEURISTIC) as usize);
56        self.buf.push(0);
57
58        // Indexing worker
59        let spawner: JoinHandle<Result<()>> = std::thread::spawn(move || {
60            let mut curr = 0;
61
62            while curr < len {
63                let end = (curr + Segment::MAX_SIZE).min(len);
64                let (task, task_rx) = IndexingTask::new(&file, curr, end)?;
65                sx.send(task_rx).map_err(|_| Error::Internal)?;
66
67                std::thread::spawn(|| task.compute());
68
69                curr = end;
70            }
71
72            Ok(())
73        });
74
75        while let Ok(task_rx) = rx.recv() {
76            if !self.has_readers() {
77                break;
78            }
79
80            while let Ok(line_data) = task_rx.recv() {
81                self.buf.push(line_data);
82            }
83        }
84
85        spawner.join().map_err(|_| Error::Internal)??;
86        self.buf.push(len);
87
88        Ok(())
89    }
90
91    pub fn index_stream(
92        mut self,
93        mut stream: BoxedStream,
94        outgoing: Sender<Segment>,
95    ) -> Result<()> {
96        let mut len = 0;
97
98        self.buf.push(0);
99
100        loop {
101            let mut segment = SegmentMut::new(len)?;
102
103            let mut buf_len = 0;
104            loop {
105                match stream.read(&mut segment[buf_len..])? {
106                    0 => break,
107                    l => buf_len += l,
108                }
109            }
110
111            for i in memchr::memchr_iter(b'\n', &segment) {
112                let line_data = len + i as u64;
113                self.buf.push(line_data + 1);
114            }
115
116            outgoing
117                .send(segment.into_read_only()?)
118                .map_err(|_| Error::Internal)?;
119
120            if buf_len == 0 {
121                break;
122            }
123
124            len += buf_len as u64;
125        }
126
127        self.buf.push(len);
128        Ok(())
129    }
130
131    pub fn has_readers(&self) -> bool {
132        Arc::strong_count(&self.completed) > 1
133    }
134}
135
136impl Drop for LineIndex {
137    fn drop(&mut self) {
138        self.completed
139            .store(true, std::sync::atomic::Ordering::Relaxed);
140    }
141}
142
143#[derive(Clone)]
144pub struct LineIndex {
145    buf: CowVec<u64>,
146    completed: Arc<AtomicBool>,
147}
148
149impl LineIndex {
150    #[inline]
151    pub fn read_file(file: File, complete: bool) -> Result<Self> {
152        let (buf, writer) = CowVec::new();
153        let completed = Arc::new(AtomicBool::new(false));
154        let task = {
155            let completed = completed.clone();
156            move || {
157                LineIndexRemote {
158                    buf: writer,
159                    completed,
160                }
161                .index_file(file)
162            }
163        };
164        if complete {
165            task()?;
166        } else {
167            std::thread::spawn(task);
168        }
169        Ok(Self { buf, completed })
170    }
171
172    #[inline]
173    pub fn read_stream(
174        stream: BoxedStream,
175        outgoing: Sender<Segment>,
176        complete: bool,
177    ) -> Result<Self> {
178        let (buf, writer) = CowVec::new();
179        let completed = Arc::new(AtomicBool::new(false));
180        let task = {
181            let completed = completed.clone();
182            move || {
183                LineIndexRemote {
184                    buf: writer,
185                    completed,
186                }
187                .index_stream(stream, outgoing)
188            }
189        };
190        if complete {
191            task()?;
192        } else {
193            std::thread::spawn(task);
194        }
195        Ok(Self { buf, completed })
196    }
197
198    pub fn line_count(&self) -> usize {
199        self.buf.len().saturating_sub(1)
200    }
201
202    pub fn data_of_line(&self, line_number: usize) -> Option<u64> {
203        self.buf.get(line_number)
204    }
205
206    pub fn line_of_data(&self, key: u64) -> Option<usize> {
207        // Safety: this code was pulled from Vec::binary_search_by
208        let buf = self.buf.snapshot();
209        let mut size = buf.len().saturating_sub(1);
210        let mut left = 0;
211        let mut right = size;
212        while left < right {
213            let mid = left + size / 2;
214
215            // mid must be less than size, which is self.line_index.len() - 1
216            let start = unsafe { buf.get_unchecked(mid) };
217            let end = unsafe { buf.get_unchecked(mid + 1) };
218
219            if end <= key {
220                left = mid + 1;
221            } else if start > key {
222                right = mid;
223            } else {
224                return Some(mid);
225            }
226
227            size = right - left;
228        }
229
230        None
231    }
232
233    #[inline]
234    pub fn is_complete(&self) -> bool {
235        self.completed.load(std::sync::atomic::Ordering::Relaxed)
236    }
237}