1pub mod segment;
5
6use self::segment::{SegBytes, SegStr, Segment};
7use crate::{index::BoxedStream, LineIndex, LineSet, Result};
8use lru::LruCache;
9use std::cell::RefCell;
10use std::fs::File;
11use std::io::{BufWriter, Cursor, Read, Write, Seek};
12use std::num::NonZeroUsize;
13use std::ops::Range;
14use std::sync::mpsc::{Receiver, TryRecvError};
15use std::sync::Arc;
16
17pub struct SegBuffer {
22 index: LineIndex,
24 repr: BufferRepr,
26}
27
28struct StreamInner {
29 pending_segs: Option<Receiver<Segment>>,
30 segments: Vec<Arc<Segment>>,
31}
32
33enum BufferRepr {
37 File {
39 file: File,
40 len: u64,
41 segments: RefCell<LruCache<usize, Arc<Segment>>>,
42 },
43 Stream(RefCell<StreamInner>),
45}
46
47impl BufferRepr {
48 fn fetch(&self, seg_id: usize) -> Option<Arc<Segment>> {
49 match self {
50 BufferRepr::File {
51 file,
52 len,
53 segments,
54 } => {
55 let range = Segment::data_range_of_id(seg_id);
56 let range = range.start..range.end.min(*len);
57 Some(
58 segments
59 .borrow_mut()
60 .get_or_insert(seg_id, || {
61 Arc::new(Segment::map_file(range, file).expect("mmap was successful"))
62 })
63 .clone(),
64 )
65 }
66 BufferRepr::Stream(inner) => {
67 let StreamInner {
68 pending_segs,
69 segments,
70 } = &mut *inner.borrow_mut();
71 if let Some(rx) = pending_segs {
72 loop {
73 match rx.try_recv() {
74 Ok(segment) => segments.push(Arc::new(segment)),
75 Err(TryRecvError::Empty) => break,
76 Err(TryRecvError::Disconnected) => {
77 *pending_segs = None;
78 break;
79 }
80 }
81 }
82 }
83 segments.get(seg_id).cloned()
84 }
85 }
86 }
87}
88
89impl SegBuffer {
90 pub fn read_file(file: File, seg_count: NonZeroUsize, complete: bool) -> Result<Self> {
91 let index = LineIndex::read_file(file.try_clone()?, complete)?;
92
93 Ok(Self {
94 index,
95 repr: BufferRepr::File {
96 len: file.metadata()?.len(),
97 file,
98 segments: RefCell::new(LruCache::new(seg_count)),
99 },
100 })
101 }
102
103 pub fn read_stream(stream: BoxedStream, complete: bool) -> Result<Self> {
104 let (sx, rx) = std::sync::mpsc::channel();
105 let index = LineIndex::read_stream(stream, sx, complete)?;
106
107 Ok(Self {
108 index,
109 repr: BufferRepr::Stream(RefCell::new(StreamInner {
110 pending_segs: Some(rx),
111 segments: Vec::new(),
112 })),
113 })
114 }
115
116 #[inline]
118 pub fn line_count(&self) -> usize {
119 self.index.line_count()
120 }
121
122 #[inline]
124 pub fn index(&self) -> &LineIndex {
125 &self.index
126 }
127
128 pub fn get_bytes(&self, line_number: usize) -> Option<SegBytes> {
129 assert!(line_number <= self.line_count());
130
131 let data_start = self.index.data_of_line(line_number)?;
132 let data_end = self.index.data_of_line(line_number + 1)?;
133 let seg_start = Segment::id_of_data(data_start);
134 let seg_end = Segment::id_of_data(data_end);
135
136 if seg_start == seg_end {
137 let seg = self.repr.fetch(seg_start)?;
139 let range = seg.translate_inner_data_range(data_start, data_end);
140 Some(seg.get_bytes(range))
141 } else {
142 debug_assert!(seg_start < seg_end);
143 let mut buf = Vec::with_capacity((data_end - data_start) as usize);
146
147 let seg_first = self.repr.fetch(seg_start)?;
148 let seg_last = self.repr.fetch(seg_end)?;
149 let (start, end) = (
150 seg_first.translate_inner_data_index(data_start),
151 seg_last.translate_inner_data_index(data_end),
152 );
153 buf.extend_from_slice(&seg_first[start as usize..]);
154 for seg_id in seg_start + 1..seg_end {
155 buf.extend_from_slice(&self.repr.fetch(seg_id)?);
156 }
157 buf.extend_from_slice(&seg_last[..end as usize]);
158
159 Some(SegBytes::new_owned(buf))
160 }
161 }
162
163 pub fn get_line(&self, line_number: usize) -> Option<SegStr> {
174 Some(SegStr::from_bytes(self.get_bytes(line_number)?))
175 }
176
177 pub fn segment_iter(&self) -> Result<ContiguousSegmentIterator> {
178 match &self.repr {
179 BufferRepr::File { file, len, .. } => Ok(ContiguousSegmentIterator::new(
180 self.index.clone(),
181 0..self.index.line_count(),
182 BufferRepr::File {
183 file: file.try_clone()?,
184 len: *len,
185 segments: RefCell::new(LruCache::new(NonZeroUsize::new(2).unwrap())),
186 },
187 )),
188 BufferRepr::Stream(inner) => Ok(ContiguousSegmentIterator::new(
189 self.index.clone(),
190 0..self.index.line_count(),
191 BufferRepr::Stream(RefCell::new(StreamInner {
192 pending_segs: None,
193 segments: inner.borrow().segments.clone(),
194 })),
195 )),
196 }
197 }
198
199 pub fn all_line_matches(&self) -> LineSet {
200 LineSet::all(self.index.clone())
201 }
202
203 pub fn write_to_file<W>(&mut self, output: &mut W, lines: &LineSet) -> Result<()>
204 where
205 W: Write,
206 {
207 if !lines.is_complete() {
208 return Err(crate::err::Error::InProgress);
209 }
210
211 match lines.snapshot() {
212 Some(snap) => {
213 let mut writer = BufWriter::new(output);
214 for &ln in snap.iter() {
215 let line = self.get_bytes(ln).unwrap();
216 writer.write_all(line.as_bytes())?;
217 }
218 }
219 None => match &mut self.repr {
220 BufferRepr::File { file, .. } => {
221 file.seek(std::io::SeekFrom::Start(0))?;
222 let mut output = output;
223 std::io::copy(file, &mut output)?;
224 }
225 BufferRepr::Stream(inner) => {
226 let mut writer = BufWriter::new(output);
227 let inner = inner.borrow();
228
229 for seg in inner.segments.iter() {
230 writer.write_all(seg)?;
231 }
232 }
233 },
234 }
235
236 Ok(())
237 }
238
239 pub fn write_to_string(&mut self, output: &mut String, lines: &LineSet) -> Result<()> {
240 if !lines.is_complete() {
241 return Err(crate::err::Error::InProgress);
242 }
243
244 match lines.snapshot() {
245 Some(snap) => {
246 for &ln in snap.iter() {
247 let line = self.get_line(ln).unwrap();
248 output.push_str(line.as_str());
249 }
250 }
251 None => match &mut self.repr {
252 BufferRepr::File { file, .. } => {
253 file.seek(std::io::SeekFrom::Start(0))?;
254 file.read_to_string(output)?;
255 }
256 BufferRepr::Stream(inner) => {
257 let inner = inner.borrow();
258
259 for seg in inner.segments.iter() {
260 let mut reader = Cursor::new(&seg[..]);
261 reader.read_to_string(output)?;
262 }
263 }
264 },
265 }
266 output.truncate(output.trim_end().len());
267
268 Ok(())
269 }
270}
271
272pub struct ContiguousSegmentIterator {
273 pub index: LineIndex,
274 repr: BufferRepr,
275 line_range: Range<usize>,
276 imm_buf: Vec<u8>,
279 imm_seg: Option<Arc<Segment>>,
282}
283
284impl ContiguousSegmentIterator {
285 fn new(index: LineIndex, line_range: Range<usize>, repr: BufferRepr) -> Self {
286 Self {
287 line_range,
288 index,
289 repr,
290 imm_buf: Vec::new(),
291 imm_seg: None,
292 }
293 }
294
295 #[inline]
296 pub fn remaining_range(&self) -> Range<usize> {
297 self.line_range.clone()
298 }
299
300 pub fn next_buf(&mut self) -> Option<(&LineIndex, u64, &[u8])> {
311 if self.line_range.is_empty() {
312 return None;
313 }
314
315 let curr_line = self.line_range.start;
316 let curr_line_data_start = self.index.data_of_line(curr_line)?;
317 let curr_line_data_end = self.index.data_of_line(curr_line + 1)?;
318
319 let curr_line_seg_start = Segment::id_of_data(curr_line_data_start);
320 let curr_line_seg_end = Segment::id_of_data(curr_line_data_end);
321
322 if curr_line_seg_end != curr_line_seg_start {
323 self.imm_buf.clear();
324 self.imm_buf
325 .reserve((curr_line_data_end - curr_line_data_start) as usize);
326
327 let seg_first = self.repr.fetch(curr_line_seg_start)?;
328 let seg_last = self.repr.fetch(curr_line_seg_end)?;
329 let (start, end) = (
330 seg_first.translate_inner_data_index(curr_line_data_start),
331 seg_last.translate_inner_data_index(curr_line_data_end),
332 );
333
334 self.imm_buf.extend_from_slice(&seg_first[start as usize..]);
335 for seg_id in curr_line_seg_start + 1..curr_line_seg_end {
336 self.imm_buf.extend_from_slice(&self.repr.fetch(seg_id)?);
337 }
338 self.imm_buf.extend_from_slice(&seg_last[..end as usize]);
339
340 self.line_range.start += 1;
341 Some((&self.index, curr_line_data_start, &self.imm_buf))
342 } else {
343 let curr_seg_data_start = curr_line_seg_start as u64 * Segment::MAX_SIZE;
344 let curr_seg_data_end = curr_seg_data_start + Segment::MAX_SIZE;
345
346 let line_end = self
347 .index
348 .line_of_data(curr_seg_data_end)
349 .unwrap_or_else(|| self.index.line_count())
350 .min(self.line_range.end);
351 let line_end_data_start = self.index.data_of_line(line_end)?;
352
353 let segment = self.repr.fetch(curr_line_seg_start)?;
355 let range =
356 segment.translate_inner_data_range(curr_line_data_start, line_end_data_start);
357 assert!(line_end_data_start - curr_seg_data_start <= Segment::MAX_SIZE);
358 assert!(range.end <= Segment::MAX_SIZE);
359
360 self.line_range.start = line_end;
361 let segment = self.imm_seg.insert(segment);
362
363 Some((
365 &self.index,
366 curr_line_data_start,
367 &segment[range.start as usize..range.end as usize],
368 ))
369 }
370 }
371}
372
#[cfg(test)]
mod test {
    use anyhow::Result;
    use std::{
        fs::File,
        io::{BufReader, Read},
        num::NonZeroUsize,
    };

    use crate::buf::SegBuffer;

    /// Number of memory-mapped segments the file-backed buffers may cache.
    fn cache_size() -> NonZeroUsize {
        NonZeroUsize::new(25).unwrap()
    }

    #[test]
    fn file_stream_consistency_1() -> Result<()> {
        let file = File::open("../../tests/test_10.log")?;
        file_stream_consistency_base(file, 10)
    }

    #[test]
    fn file_stream_consistency_2() -> Result<()> {
        let file = File::open("../../tests/test_50_long.log")?;
        file_stream_consistency_base(file, 50)
    }

    #[test]
    fn file_stream_consistency_3() -> Result<()> {
        let file = File::open("../../tests/test_5000000.log")?;
        file_stream_consistency_base(file, 5_000_000)
    }

    /// Checks that file-backed and stream-backed buffers over the same data
    /// both report `line_count` lines and agree on every line's text.
    fn file_stream_consistency_base(file: File, line_count: usize) -> Result<()> {
        let stream = BufReader::new(file.try_clone()?);

        let file_buffer = SegBuffer::read_file(file, cache_size(), true)?;
        let stream_buffer = SegBuffer::read_stream(Box::new(stream), true)?;

        let lines = file_buffer.line_count();
        assert_eq!(lines, stream_buffer.line_count());
        assert_eq!(lines, line_count);
        for i in 0..lines {
            let from_file = file_buffer.get_line(i).unwrap();
            let from_stream = stream_buffer.get_line(i).unwrap();
            assert_eq!(from_file.as_str(), from_stream.as_str());
        }

        Ok(())
    }

    #[test]
    #[cfg_attr(miri, ignore)]
    fn multi_buffer_consistency_1() -> Result<()> {
        let file = File::open("../../tests/test_10.log")?;
        multi_buffer_consistency_base(file)
    }

    #[test]
    #[cfg_attr(miri, ignore)]
    fn multi_buffer_consistency_2() -> Result<()> {
        let file = File::open("../../tests/test_50_long.log")?;
        multi_buffer_consistency_base(file)
    }

    #[test]
    #[cfg_attr(miri, ignore)]
    fn multi_buffer_consistency_3() -> Result<()> {
        let file = File::open("../../tests/test_5000000.log")?;
        multi_buffer_consistency_base(file)
    }

    /// Checks the chunks from `segment_iter`: each starts where the previous
    /// one ended, each matches the file bytes at that position, and together
    /// they cover the whole file.
    fn multi_buffer_consistency_base(file: File) -> Result<()> {
        let expected_len = file.metadata()?.len();
        let mut reader = BufReader::new(file.try_clone()?);

        let buffer = SegBuffer::read_file(file, cache_size(), true)?;
        let mut chunks = buffer.segment_iter()?;

        let mut offset = 0;
        let mut expected = Vec::new();
        while let Some((_, start, chunk)) = chunks.next_buf() {
            assert_eq!(start, offset);
            offset += chunk.len() as u64;
            expected.resize(chunk.len(), 0);
            reader.read_exact(&mut expected)?;
            assert_eq!(chunk, expected);
        }
        assert_eq!(offset, expected_len);

        Ok(())
    }
}