bvr_core/buf/
mod.rs

1//! The `buf` module contains the [SegBuffer] struct, which is the main
2//! interface for creating and interacting with the segmented buffers.
3
4pub mod segment;
5
6use self::segment::{SegBytes, SegStr, Segment};
7use crate::{index::BoxedStream, LineIndex, LineSet, Result};
8use lru::LruCache;
9use std::cell::RefCell;
10use std::fs::File;
11use std::io::{BufWriter, Cursor, Read, Write, Seek};
12use std::num::NonZeroUsize;
13use std::ops::Range;
14use std::sync::mpsc::{Receiver, TryRecvError};
15use std::sync::Arc;
16
17/// A segmented buffer that holds data in multiple segments.
18///
19/// The `Buffer` struct represents a buffer that is divided into multiple segments.
20/// It contains the [LineIndex] and the internal representation of the segments.
21pub struct SegBuffer {
22    /// The [LineIndex] of this buffer.
23    index: LineIndex,
24    /// The internal representation of this buffer.
25    repr: BufferRepr,
26}
27
28struct StreamInner {
29    pending_segs: Option<Receiver<Segment>>,
30    segments: Vec<Arc<Segment>>,
31}
32
33/// Internal representation of the segmented buffer, which allows for working
34/// with both files and streams of data. All segments are assumed to have
35/// the same size with the exception of the last segment.
36enum BufferRepr {
37    /// Data can be loaded on demand.
38    File {
39        file: File,
40        len: u64,
41        segments: RefCell<LruCache<usize, Arc<Segment>>>,
42    },
43    /// Data is all present in memory in multiple anonymous mmaps.
44    Stream(RefCell<StreamInner>),
45}
46
47impl BufferRepr {
48    fn fetch(&self, seg_id: usize) -> Option<Arc<Segment>> {
49        match self {
50            BufferRepr::File {
51                file,
52                len,
53                segments,
54            } => {
55                let range = Segment::data_range_of_id(seg_id);
56                let range = range.start..range.end.min(*len);
57                Some(
58                    segments
59                        .borrow_mut()
60                        .get_or_insert(seg_id, || {
61                            Arc::new(Segment::map_file(range, file).expect("mmap was successful"))
62                        })
63                        .clone(),
64                )
65            }
66            BufferRepr::Stream(inner) => {
67                let StreamInner {
68                    pending_segs,
69                    segments,
70                } = &mut *inner.borrow_mut();
71                if let Some(rx) = pending_segs {
72                    loop {
73                        match rx.try_recv() {
74                            Ok(segment) => segments.push(Arc::new(segment)),
75                            Err(TryRecvError::Empty) => break,
76                            Err(TryRecvError::Disconnected) => {
77                                *pending_segs = None;
78                                break;
79                            }
80                        }
81                    }
82                }
83                segments.get(seg_id).cloned()
84            }
85        }
86    }
87}
88
89impl SegBuffer {
90    pub fn read_file(file: File, seg_count: NonZeroUsize, complete: bool) -> Result<Self> {
91        let index = LineIndex::read_file(file.try_clone()?, complete)?;
92
93        Ok(Self {
94            index,
95            repr: BufferRepr::File {
96                len: file.metadata()?.len(),
97                file,
98                segments: RefCell::new(LruCache::new(seg_count)),
99            },
100        })
101    }
102
103    pub fn read_stream(stream: BoxedStream, complete: bool) -> Result<Self> {
104        let (sx, rx) = std::sync::mpsc::channel();
105        let index = LineIndex::read_stream(stream, sx, complete)?;
106
107        Ok(Self {
108            index,
109            repr: BufferRepr::Stream(RefCell::new(StreamInner {
110                pending_segs: Some(rx),
111                segments: Vec::new(),
112            })),
113        })
114    }
115
116    /// Return the line count of this [SegBuffer].
117    #[inline]
118    pub fn line_count(&self) -> usize {
119        self.index.line_count()
120    }
121
122    /// Return the [LineIndex] of this [SegBuffer].
123    #[inline]
124    pub fn index(&self) -> &LineIndex {
125        &self.index
126    }
127
128    pub fn get_bytes(&self, line_number: usize) -> Option<SegBytes> {
129        assert!(line_number <= self.line_count());
130
131        let data_start = self.index.data_of_line(line_number)?;
132        let data_end = self.index.data_of_line(line_number + 1)?;
133        let seg_start = Segment::id_of_data(data_start);
134        let seg_end = Segment::id_of_data(data_end);
135
136        if seg_start == seg_end {
137            // The data is in a single segment
138            let seg = self.repr.fetch(seg_start)?;
139            let range = seg.translate_inner_data_range(data_start, data_end);
140            Some(seg.get_bytes(range))
141        } else {
142            debug_assert!(seg_start < seg_end);
143            // The data may cross several segments, so we must piece together
144            // the data from across the segments.
145            let mut buf = Vec::with_capacity((data_end - data_start) as usize);
146
147            let seg_first = self.repr.fetch(seg_start)?;
148            let seg_last = self.repr.fetch(seg_end)?;
149            let (start, end) = (
150                seg_first.translate_inner_data_index(data_start),
151                seg_last.translate_inner_data_index(data_end),
152            );
153            buf.extend_from_slice(&seg_first[start as usize..]);
154            for seg_id in seg_start + 1..seg_end {
155                buf.extend_from_slice(&self.repr.fetch(seg_id)?);
156            }
157            buf.extend_from_slice(&seg_last[..end as usize]);
158
159            Some(SegBytes::new_owned(buf))
160        }
161    }
162
163    /// Retrieves a line of text from the buffer based on the given line number.
164    ///
165    /// # Panics
166    ///
167    /// This function will panic if the `line_number` is greater than the total number
168    /// of lines in the buffer's index.
169    ///
170    /// # Returns
171    ///
172    /// The line of text as a [SegStr] object.
173    pub fn get_line(&self, line_number: usize) -> Option<SegStr> {
174        Some(SegStr::from_bytes(self.get_bytes(line_number)?))
175    }
176
177    pub fn segment_iter(&self) -> Result<ContiguousSegmentIterator> {
178        match &self.repr {
179            BufferRepr::File { file, len, .. } => Ok(ContiguousSegmentIterator::new(
180                self.index.clone(),
181                0..self.index.line_count(),
182                BufferRepr::File {
183                    file: file.try_clone()?,
184                    len: *len,
185                    segments: RefCell::new(LruCache::new(NonZeroUsize::new(2).unwrap())),
186                },
187            )),
188            BufferRepr::Stream(inner) => Ok(ContiguousSegmentIterator::new(
189                self.index.clone(),
190                0..self.index.line_count(),
191                BufferRepr::Stream(RefCell::new(StreamInner {
192                    pending_segs: None,
193                    segments: inner.borrow().segments.clone(),
194                })),
195            )),
196        }
197    }
198
199    pub fn all_line_matches(&self) -> LineSet {
200        LineSet::all(self.index.clone())
201    }
202
203    pub fn write_to_file<W>(&mut self, output: &mut W, lines: &LineSet) -> Result<()>
204    where
205        W: Write,
206    {
207        if !lines.is_complete() {
208            return Err(crate::err::Error::InProgress);
209        }
210
211        match lines.snapshot() {
212            Some(snap) => {
213                let mut writer = BufWriter::new(output);
214                for &ln in snap.iter() {
215                    let line = self.get_bytes(ln).unwrap();
216                    writer.write_all(line.as_bytes())?;
217                }
218            }
219            None => match &mut self.repr {
220                BufferRepr::File { file, .. } => {
221                    file.seek(std::io::SeekFrom::Start(0))?;
222                    let mut output = output;
223                    std::io::copy(file, &mut output)?;
224                }
225                BufferRepr::Stream(inner) => {
226                    let mut writer = BufWriter::new(output);
227                    let inner = inner.borrow();
228
229                    for seg in inner.segments.iter() {
230                        writer.write_all(seg)?;
231                    }
232                }
233            },
234        }
235
236        Ok(())
237    }
238
239    pub fn write_to_string(&mut self, output: &mut String, lines: &LineSet) -> Result<()> {
240        if !lines.is_complete() {
241            return Err(crate::err::Error::InProgress);
242        }
243
244        match lines.snapshot() {
245            Some(snap) => {
246                for &ln in snap.iter() {
247                    let line = self.get_line(ln).unwrap();
248                    output.push_str(line.as_str());
249                }
250            }
251            None => match &mut self.repr {
252                BufferRepr::File { file, .. } => {
253                    file.seek(std::io::SeekFrom::Start(0))?;
254                    file.read_to_string(output)?;
255                }
256                BufferRepr::Stream(inner) => {
257                    let inner = inner.borrow();
258
259                    for seg in inner.segments.iter() {
260                        let mut reader = Cursor::new(&seg[..]);
261                        reader.read_to_string(output)?;
262                    }
263                }
264            },
265        }
266        output.truncate(output.trim_end().len());
267
268        Ok(())
269    }
270}
271
272pub struct ContiguousSegmentIterator {
273    pub index: LineIndex,
274    repr: BufferRepr,
275    line_range: Range<usize>,
276    // Intermediate buffer for the iterator to borrow from
277    // for the case where the line crosses multiple segments
278    imm_buf: Vec<u8>,
279    // Intermediate segment storage for the buffer to borrow from
280    // for when the buffer lies within a single segment
281    imm_seg: Option<Arc<Segment>>,
282}
283
284impl ContiguousSegmentIterator {
285    fn new(index: LineIndex, line_range: Range<usize>, repr: BufferRepr) -> Self {
286        Self {
287            line_range,
288            index,
289            repr,
290            imm_buf: Vec::new(),
291            imm_seg: None,
292        }
293    }
294
295    #[inline]
296    pub fn remaining_range(&self) -> Range<usize> {
297        self.line_range.clone()
298    }
299
300    /// Get the next buffer from the [ContiguousSegmentIterator].
301    ///
302    /// This function retrieves the next buffer from the `ContiguousSegmentIterator` and returns it as an `Option`.
303    /// If there are no more buffers available, it returns `None`.
304    ///
305    /// # Returns
306    ///
307    /// - `Some((&Idx, u64, &[u8]))`: A tuple containing the index, starting data
308    ///                               position, and a slice of the buffer data.
309    /// - `None`: If there are no more buffers available.
310    pub fn next_buf(&mut self) -> Option<(&LineIndex, u64, &[u8])> {
311        if self.line_range.is_empty() {
312            return None;
313        }
314
315        let curr_line = self.line_range.start;
316        let curr_line_data_start = self.index.data_of_line(curr_line)?;
317        let curr_line_data_end = self.index.data_of_line(curr_line + 1)?;
318
319        let curr_line_seg_start = Segment::id_of_data(curr_line_data_start);
320        let curr_line_seg_end = Segment::id_of_data(curr_line_data_end);
321
322        if curr_line_seg_end != curr_line_seg_start {
323            self.imm_buf.clear();
324            self.imm_buf
325                .reserve((curr_line_data_end - curr_line_data_start) as usize);
326
327            let seg_first = self.repr.fetch(curr_line_seg_start)?;
328            let seg_last = self.repr.fetch(curr_line_seg_end)?;
329            let (start, end) = (
330                seg_first.translate_inner_data_index(curr_line_data_start),
331                seg_last.translate_inner_data_index(curr_line_data_end),
332            );
333
334            self.imm_buf.extend_from_slice(&seg_first[start as usize..]);
335            for seg_id in curr_line_seg_start + 1..curr_line_seg_end {
336                self.imm_buf.extend_from_slice(&self.repr.fetch(seg_id)?);
337            }
338            self.imm_buf.extend_from_slice(&seg_last[..end as usize]);
339
340            self.line_range.start += 1;
341            Some((&self.index, curr_line_data_start, &self.imm_buf))
342        } else {
343            let curr_seg_data_start = curr_line_seg_start as u64 * Segment::MAX_SIZE;
344            let curr_seg_data_end = curr_seg_data_start + Segment::MAX_SIZE;
345
346            let line_end = self
347                .index
348                .line_of_data(curr_seg_data_end)
349                .unwrap_or_else(|| self.index.line_count())
350                .min(self.line_range.end);
351            let line_end_data_start = self.index.data_of_line(line_end)?;
352
353            // this line should not cross multiple segments, else we would have caught in the first case
354            let segment = self.repr.fetch(curr_line_seg_start)?;
355            let range =
356                segment.translate_inner_data_range(curr_line_data_start, line_end_data_start);
357            assert!(line_end_data_start - curr_seg_data_start <= Segment::MAX_SIZE);
358            assert!(range.end <= Segment::MAX_SIZE);
359
360            self.line_range.start = line_end;
361            let segment = self.imm_seg.insert(segment);
362
363            // line must end at the boundary
364            Some((
365                &self.index,
366                curr_line_data_start,
367                &segment[range.start as usize..range.end as usize],
368            ))
369        }
370    }
371}
372
#[cfg(test)]
mod test {
    use anyhow::Result;
    use std::{
        fs::File,
        io::{BufReader, Read},
        num::NonZeroUsize,
    };

    use crate::buf::SegBuffer;

    #[test]
    fn file_stream_consistency_1() -> Result<()> {
        let log = File::open("../../tests/test_10.log")?;
        file_stream_consistency_base(log, 10)
    }

    #[test]
    fn file_stream_consistency_2() -> Result<()> {
        let log = File::open("../../tests/test_50_long.log")?;
        file_stream_consistency_base(log, 50)
    }

    #[test]
    fn file_stream_consistency_3() -> Result<()> {
        let log = File::open("../../tests/test_5000000.log")?;
        file_stream_consistency_base(log, 5_000_000)
    }

    /// Build a file-backed and a stream-backed buffer over the same file and
    /// check that both report `line_count` lines with identical contents.
    fn file_stream_consistency_base(file: File, line_count: usize) -> Result<()> {
        // NOTE: `try_clone` handles share one file cursor; the file-backed
        // buffer is built before the stream-backed one, as in the original.
        let stream_src = BufReader::new(file.try_clone()?);

        let by_file = SegBuffer::read_file(file, NonZeroUsize::new(25).unwrap(), true)?;
        let by_stream = SegBuffer::read_stream(Box::new(stream_src), true)?;

        assert_eq!(by_file.line_count(), by_stream.line_count());
        assert_eq!(by_file.line_count(), line_count);
        (0..by_file.line_count()).for_each(|ln| {
            assert_eq!(
                by_file.get_line(ln).unwrap().as_str(),
                by_stream.get_line(ln).unwrap().as_str()
            );
        });

        Ok(())
    }

    #[test]
    #[cfg_attr(miri, ignore)]
    fn multi_buffer_consistency_1() -> Result<()> {
        let log = File::open("../../tests/test_10.log")?;
        multi_buffer_consistency_base(log)
    }

    #[test]
    #[cfg_attr(miri, ignore)]
    fn multi_buffer_consistency_2() -> Result<()> {
        let log = File::open("../../tests/test_50_long.log")?;
        multi_buffer_consistency_base(log)
    }

    #[test]
    #[cfg_attr(miri, ignore)]
    fn multi_buffer_consistency_3() -> Result<()> {
        let log = File::open("../../tests/test_5000000.log")?;
        multi_buffer_consistency_base(log)
    }

    /// Walk a file-backed buffer with `segment_iter` and compare every chunk
    /// against a plain sequential read of the same file.
    fn multi_buffer_consistency_base(file: File) -> Result<()> {
        let expected_len = file.metadata()?.len();
        let mut sequential = BufReader::new(file.try_clone()?);

        let buffer = SegBuffer::read_file(file, NonZeroUsize::new(25).unwrap(), true)?;
        let mut chunks = buffer.segment_iter()?;

        let mut offset = 0;
        let mut expected = Vec::new();
        while let Some((_, chunk_start, chunk)) = chunks.next_buf() {
            // Each chunk must start where the previous one ended and match the
            // bytes a sequential reader sees at that position.
            assert_eq!(chunk_start, offset);
            offset += chunk.len() as u64;
            expected.resize(chunk.len(), 0);
            sequential.read_exact(&mut expected)?;
            assert_eq!(chunk, expected);
        }
        assert_eq!(offset, expected_len);

        Ok(())
    }
}