1use crate::buf::segment::{Segment, SegmentMut};
2use crate::cowvec::{CowVec, CowVecWriter};
3use crate::err::{Error, Result};
4use std::fs::File;
5use std::sync::mpsc::{Receiver, Sender};
6use std::sync::{atomic::AtomicBool, Arc};
7use std::thread::JoinHandle;
8
9struct IndexingTask {
10 sx: Sender<u64>,
12 segment: Segment,
13}
14
15impl IndexingTask {
16 #[inline]
17 fn new(file: &File, start: u64, end: u64) -> Result<(Self, Receiver<u64>)> {
18 let segment = Segment::map_file(start..end, file)?;
19 let (sx, rx) = std::sync::mpsc::channel();
20 Ok((Self { sx, segment }, rx))
21 }
22
23 fn compute(self) -> Result<()> {
24 for i in memchr::memchr_iter(b'\n', &self.segment) {
25 self.sx
26 .send(self.segment.start() + i as u64 + 1)
27 .map_err(|_| Error::Internal)?;
28 }
29
30 Ok(())
31 }
32}
33
/// An owned, thread-transferable byte source that can be indexed as a stream.
pub type BoxedStream = Box<dyn std::io::Read + Send + 'static>;
36
37struct LineIndexRemote {
40 buf: CowVecWriter<u64>,
41 completed: Arc<AtomicBool>,
42}
43
44impl LineIndexRemote {
45 const BYTES_PER_LINE_HEURISTIC: u64 = 128;
46
47 pub fn index_file(mut self, file: File) -> Result<()> {
48 let (sx, rx) = std::sync::mpsc::sync_channel(4);
50
51 let len = file.metadata()?.len();
52 let file = file.try_clone()?;
53
54 self.buf
55 .reserve((len / Self::BYTES_PER_LINE_HEURISTIC) as usize);
56 self.buf.push(0);
57
58 let spawner: JoinHandle<Result<()>> = std::thread::spawn(move || {
60 let mut curr = 0;
61
62 while curr < len {
63 let end = (curr + Segment::MAX_SIZE).min(len);
64 let (task, task_rx) = IndexingTask::new(&file, curr, end)?;
65 sx.send(task_rx).map_err(|_| Error::Internal)?;
66
67 std::thread::spawn(|| task.compute());
68
69 curr = end;
70 }
71
72 Ok(())
73 });
74
75 while let Ok(task_rx) = rx.recv() {
76 if !self.has_readers() {
77 break;
78 }
79
80 while let Ok(line_data) = task_rx.recv() {
81 self.buf.push(line_data);
82 }
83 }
84
85 spawner.join().map_err(|_| Error::Internal)??;
86 self.buf.push(len);
87
88 Ok(())
89 }
90
91 pub fn index_stream(
92 mut self,
93 mut stream: BoxedStream,
94 outgoing: Sender<Segment>,
95 ) -> Result<()> {
96 let mut len = 0;
97
98 self.buf.push(0);
99
100 loop {
101 let mut segment = SegmentMut::new(len)?;
102
103 let mut buf_len = 0;
104 loop {
105 match stream.read(&mut segment[buf_len..])? {
106 0 => break,
107 l => buf_len += l,
108 }
109 }
110
111 for i in memchr::memchr_iter(b'\n', &segment) {
112 let line_data = len + i as u64;
113 self.buf.push(line_data + 1);
114 }
115
116 outgoing
117 .send(segment.into_read_only()?)
118 .map_err(|_| Error::Internal)?;
119
120 if buf_len == 0 {
121 break;
122 }
123
124 len += buf_len as u64;
125 }
126
127 self.buf.push(len);
128 Ok(())
129 }
130
131 pub fn has_readers(&self) -> bool {
132 Arc::strong_count(&self.completed) > 1
133 }
134}
135
136impl Drop for LineIndex {
137 fn drop(&mut self) {
138 self.completed
139 .store(true, std::sync::atomic::Ordering::Relaxed);
140 }
141}
142
143#[derive(Clone)]
144pub struct LineIndex {
145 buf: CowVec<u64>,
146 completed: Arc<AtomicBool>,
147}
148
149impl LineIndex {
150 #[inline]
151 pub fn read_file(file: File, complete: bool) -> Result<Self> {
152 let (buf, writer) = CowVec::new();
153 let completed = Arc::new(AtomicBool::new(false));
154 let task = {
155 let completed = completed.clone();
156 move || {
157 LineIndexRemote {
158 buf: writer,
159 completed,
160 }
161 .index_file(file)
162 }
163 };
164 if complete {
165 task()?;
166 } else {
167 std::thread::spawn(task);
168 }
169 Ok(Self { buf, completed })
170 }
171
172 #[inline]
173 pub fn read_stream(
174 stream: BoxedStream,
175 outgoing: Sender<Segment>,
176 complete: bool,
177 ) -> Result<Self> {
178 let (buf, writer) = CowVec::new();
179 let completed = Arc::new(AtomicBool::new(false));
180 let task = {
181 let completed = completed.clone();
182 move || {
183 LineIndexRemote {
184 buf: writer,
185 completed,
186 }
187 .index_stream(stream, outgoing)
188 }
189 };
190 if complete {
191 task()?;
192 } else {
193 std::thread::spawn(task);
194 }
195 Ok(Self { buf, completed })
196 }
197
198 pub fn line_count(&self) -> usize {
199 self.buf.len().saturating_sub(1)
200 }
201
202 pub fn data_of_line(&self, line_number: usize) -> Option<u64> {
203 self.buf.get(line_number)
204 }
205
206 pub fn line_of_data(&self, key: u64) -> Option<usize> {
207 let buf = self.buf.snapshot();
209 let mut size = buf.len().saturating_sub(1);
210 let mut left = 0;
211 let mut right = size;
212 while left < right {
213 let mid = left + size / 2;
214
215 let start = unsafe { buf.get_unchecked(mid) };
217 let end = unsafe { buf.get_unchecked(mid + 1) };
218
219 if end <= key {
220 left = mid + 1;
221 } else if start > key {
222 right = mid;
223 } else {
224 return Some(mid);
225 }
226
227 size = right - left;
228 }
229
230 None
231 }
232
233 #[inline]
234 pub fn is_complete(&self) -> bool {
235 self.completed.load(std::sync::atomic::Ordering::Relaxed)
236 }
237}