1use super::RowError;
2use crate::util::is_truthy;
3use crate::{JetroEngine, JetroEngineError};
4use memchr::memrchr;
5use serde_json::Value;
6use std::collections::VecDeque;
7use std::fs::File;
8use std::io::{Read, Seek, SeekFrom, Write};
9use std::path::Path;
10
/// Reads the lines of a file from the last line to the first.
///
/// The file is consumed backwards in fixed-size chunks; each chunk is split on
/// `\n` and the resulting lines are queued so that the line closest to the end
/// of the file is returned first. Lines containing only ASCII whitespace are
/// skipped.
#[derive(Debug)]
pub struct NdjsonReverseFileDriver {
    /// Open handle of the file being read.
    file: File,
    /// Byte offset where the not-yet-read part of the file ends. Starts at the
    /// file length and reaches `0` once the head of the file has been read.
    pos: u64,
    /// Number of bytes requested per backward read (never less than 1).
    chunk_size: usize,
    /// Maximum accepted line length in bytes; longer lines are reported as
    /// `RowError::LineTooLarge`.
    max_line_len: usize,
    /// Tail bytes of a line whose beginning lies in a chunk that has not been
    /// read yet.
    carry: Vec<u8>,
    /// Complete lines split out of the most recent chunk, in return order.
    pending: VecDeque<Vec<u8>>,
    /// Set once the first line of the file has been taken out of `carry`.
    finished_head: bool,
    /// Number of lines handed out so far; the last line of the file is `1`.
    reverse_line_no: u64,
}
27
28impl NdjsonReverseFileDriver {
29 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, RowError> {
30 Self::with_options(path, super::ndjson::NdjsonOptions::default())
31 }
32
33 pub fn with_chunk_size<P: AsRef<Path>>(path: P, chunk_size: usize) -> Result<Self, RowError> {
34 Self::with_options(
35 path,
36 super::ndjson::NdjsonOptions::default().with_reverse_chunk_size(chunk_size),
37 )
38 }
39
40 pub fn with_options<P: AsRef<Path>>(
41 path: P,
42 options: super::ndjson::NdjsonOptions,
43 ) -> Result<Self, RowError> {
44 let mut file = File::open(path)?;
45 let pos = file.seek(SeekFrom::End(0))?;
46 Ok(Self {
47 file,
48 pos,
49 chunk_size: options.reverse_chunk_size.max(1),
50 max_line_len: options.max_line_len,
51 carry: Vec::new(),
52 pending: VecDeque::new(),
53 finished_head: false,
54 reverse_line_no: 0,
55 })
56 }
57
58 pub fn next_line(&mut self) -> Result<Option<Vec<u8>>, RowError> {
59 Ok(self.next_line_with_reverse_no()?.map(|(_, line)| line))
60 }
61
62 pub fn next_line_with_reverse_no(&mut self) -> Result<Option<(u64, Vec<u8>)>, RowError> {
63 loop {
64 if let Some(line) = self.pending.pop_front() {
65 self.reverse_line_no += 1;
66 return Ok(Some((self.reverse_line_no, line)));
67 }
68
69 if self.pos == 0 {
70 if self.finished_head || self.carry.is_empty() {
71 return Ok(None);
72 }
73 self.finished_head = true;
74 let mut line = std::mem::take(&mut self.carry);
75 trim_line_ending(&mut line);
76 self.check_line_len(line.len())?;
77 if line.iter().any(|b| !b.is_ascii_whitespace()) {
78 self.reverse_line_no += 1;
79 return Ok(Some((self.reverse_line_no, line)));
80 }
81 return Ok(None);
82 }
83
84 let read_len = self.chunk_size.min(self.pos as usize);
85 self.pos -= read_len as u64;
86 let mut chunk = vec![0u8; read_len];
87 self.file.seek(SeekFrom::Start(self.pos))?;
88 self.file.read_exact(&mut chunk)?;
89
90 let mut end = chunk.len();
91 while let Some(nl) = memrchr(b'\n', &chunk[..end]) {
92 let mut line = Vec::with_capacity(end - nl - 1 + self.carry.len());
93 line.extend_from_slice(&chunk[nl + 1..end]);
94 line.extend_from_slice(&self.carry);
95 self.carry.clear();
96 end = nl;
97 trim_line_ending(&mut line);
98 self.check_line_len(line.len())?;
99 if line.iter().any(|b| !b.is_ascii_whitespace()) {
100 self.pending.push_back(line);
101 }
102 }
103
104 if end > 0 {
105 let mut next = Vec::with_capacity(end + self.carry.len());
106 next.extend_from_slice(&chunk[..end]);
107 next.extend_from_slice(&self.carry);
108 self.check_line_len(next.len())?;
109 self.carry = next;
110 }
111 }
112 }
113
114 fn check_line_len(&self, len: usize) -> Result<(), RowError> {
115 if len > self.max_line_len {
116 return Err(RowError::LineTooLarge {
117 line_no: self.reverse_line_no + self.pending.len() as u64 + 1,
118 len,
119 max: self.max_line_len,
120 });
121 }
122 Ok(())
123 }
124}
125
126pub fn collect_ndjson_rev<P>(
127 engine: &JetroEngine,
128 path: P,
129 query: &str,
130) -> Result<Vec<Value>, JetroEngineError>
131where
132 P: AsRef<Path>,
133{
134 collect_ndjson_rev_with_options(engine, path, query, super::ndjson::NdjsonOptions::default())
135}
136
137pub fn collect_ndjson_rev_with_options<P>(
138 engine: &JetroEngine,
139 path: P,
140 query: &str,
141 options: super::ndjson::NdjsonOptions,
142) -> Result<Vec<Value>, JetroEngineError>
143where
144 P: AsRef<Path>,
145{
146 let mut values = Vec::new();
147 drive_rev(engine, path, query, options, |value| {
148 values.push(Value::from(value));
149 Ok(super::ndjson::NdjsonControl::Continue)
150 })?;
151 Ok(values)
152}
153
154pub fn for_each_ndjson_rev<P, F>(
155 engine: &JetroEngine,
156 path: P,
157 query: &str,
158 mut f: F,
159) -> Result<usize, JetroEngineError>
160where
161 P: AsRef<Path>,
162 F: FnMut(Value),
163{
164 for_each_ndjson_rev_with_options(
165 engine,
166 path,
167 query,
168 super::ndjson::NdjsonOptions::default(),
169 |value| {
170 f(value);
171 Ok(super::ndjson::NdjsonControl::Continue)
172 },
173 )
174}
175
176pub fn for_each_ndjson_rev_with_options<P, F>(
177 engine: &JetroEngine,
178 path: P,
179 query: &str,
180 options: super::ndjson::NdjsonOptions,
181 mut f: F,
182) -> Result<usize, JetroEngineError>
183where
184 P: AsRef<Path>,
185 F: FnMut(Value) -> Result<super::ndjson::NdjsonControl, JetroEngineError>,
186{
187 drive_rev(engine, path, query, options, |value| f(Value::from(value)))
188}
189
190pub fn collect_ndjson_rev_matches<P>(
191 engine: &JetroEngine,
192 path: P,
193 predicate: &str,
194 limit: usize,
195) -> Result<Vec<Value>, JetroEngineError>
196where
197 P: AsRef<Path>,
198{
199 collect_ndjson_rev_matches_with_options(
200 engine,
201 path,
202 predicate,
203 limit,
204 super::ndjson::NdjsonOptions::default(),
205 )
206}
207
208pub fn collect_ndjson_rev_matches_with_options<P>(
209 engine: &JetroEngine,
210 path: P,
211 predicate: &str,
212 limit: usize,
213 options: super::ndjson::NdjsonOptions,
214) -> Result<Vec<Value>, JetroEngineError>
215where
216 P: AsRef<Path>,
217{
218 let mut values = Vec::with_capacity(limit);
219 drive_rev_matches(engine, path, predicate, limit, options, |value| {
220 values.push(Value::from(value));
221 Ok(super::ndjson::NdjsonControl::Continue)
222 })?;
223 Ok(values)
224}
225
226pub fn run_ndjson_rev<P, W>(
227 engine: &JetroEngine,
228 path: P,
229 query: &str,
230 writer: W,
231) -> Result<usize, JetroEngineError>
232where
233 P: AsRef<Path>,
234 W: Write,
235{
236 run_ndjson_rev_with_options(
237 engine,
238 path,
239 query,
240 writer,
241 super::ndjson::NdjsonOptions::default(),
242 )
243}
244
245pub fn run_ndjson_rev_with_options<P, W>(
246 engine: &JetroEngine,
247 path: P,
248 query: &str,
249 writer: W,
250 options: super::ndjson::NdjsonOptions,
251) -> Result<usize, JetroEngineError>
252where
253 P: AsRef<Path>,
254 W: Write,
255{
256 let mut writer = super::ndjson::ndjson_writer_with_options(writer, options);
257 let count = drive_rev(engine, path, query, options, |value| {
258 super::ndjson::write_val_line(&mut writer, &value)?;
259 Ok(super::ndjson::NdjsonControl::Continue)
260 })?;
261 writer.flush()?;
262 Ok(count)
263}
264
265pub fn run_ndjson_rev_limit<P, W>(
266 engine: &JetroEngine,
267 path: P,
268 query: &str,
269 limit: usize,
270 writer: W,
271) -> Result<usize, JetroEngineError>
272where
273 P: AsRef<Path>,
274 W: Write,
275{
276 run_ndjson_rev_limit_with_options(
277 engine,
278 path,
279 query,
280 limit,
281 writer,
282 super::ndjson::NdjsonOptions::default(),
283 )
284}
285
286pub fn run_ndjson_rev_limit_with_options<P, W>(
287 engine: &JetroEngine,
288 path: P,
289 query: &str,
290 limit: usize,
291 writer: W,
292 options: super::ndjson::NdjsonOptions,
293) -> Result<usize, JetroEngineError>
294where
295 P: AsRef<Path>,
296 W: Write,
297{
298 if limit == 0 {
299 return Ok(0);
300 }
301
302 let mut writer = super::ndjson::ndjson_writer_with_options(writer, options);
303 let mut emitted = 0usize;
304 let count = drive_rev(engine, path, query, options, |value| {
305 super::ndjson::write_val_line(&mut writer, &value)?;
306 emitted += 1;
307 Ok(if emitted >= limit {
308 super::ndjson::NdjsonControl::Stop
309 } else {
310 super::ndjson::NdjsonControl::Continue
311 })
312 })?;
313 writer.flush()?;
314 Ok(count)
315}
316
317pub fn run_ndjson_rev_matches<P, W>(
318 engine: &JetroEngine,
319 path: P,
320 predicate: &str,
321 limit: usize,
322 writer: W,
323) -> Result<usize, JetroEngineError>
324where
325 P: AsRef<Path>,
326 W: Write,
327{
328 run_ndjson_rev_matches_with_options(
329 engine,
330 path,
331 predicate,
332 limit,
333 writer,
334 super::ndjson::NdjsonOptions::default(),
335 )
336}
337
338pub fn run_ndjson_rev_matches_with_options<P, W>(
339 engine: &JetroEngine,
340 path: P,
341 predicate: &str,
342 limit: usize,
343 writer: W,
344 options: super::ndjson::NdjsonOptions,
345) -> Result<usize, JetroEngineError>
346where
347 P: AsRef<Path>,
348 W: Write,
349{
350 drive_rev_matches_writer(engine, path, predicate, limit, options, writer)
351}
352
353fn drive_rev<P, F>(
354 engine: &JetroEngine,
355 path: P,
356 query: &str,
357 options: super::ndjson::NdjsonOptions,
358 mut emit: F,
359) -> Result<usize, JetroEngineError>
360where
361 P: AsRef<Path>,
362 F: FnMut(crate::data::value::Val) -> Result<super::ndjson::NdjsonControl, JetroEngineError>,
363{
364 let mut driver = NdjsonReverseFileDriver::with_options(path, options)?;
365 let mut executor = super::ndjson::NdjsonRowExecutor::new(engine, query);
366 let mut count = 0usize;
367
368 while let Some((reverse_row_no, row)) = driver.next_line_with_reverse_no()? {
369 let out = executor.eval_owned_row(reverse_row_no, row)?;
370 count += 1;
371 if matches!(emit(out)?, super::ndjson::NdjsonControl::Stop) {
372 break;
373 }
374 }
375
376 Ok(count)
377}
378
379fn drive_rev_matches<P, F>(
380 engine: &JetroEngine,
381 path: P,
382 predicate: &str,
383 limit: usize,
384 options: super::ndjson::NdjsonOptions,
385 mut emit: F,
386) -> Result<usize, JetroEngineError>
387where
388 P: AsRef<Path>,
389 F: FnMut(crate::data::value::Val) -> Result<super::ndjson::NdjsonControl, JetroEngineError>,
390{
391 if limit == 0 {
392 return Ok(0);
393 }
394
395 let mut driver = NdjsonReverseFileDriver::with_options(path, options)?;
396 let mut executor = super::ndjson::NdjsonRowExecutor::new(engine, predicate);
397 let mut emitted = 0usize;
398
399 while let Some((reverse_row_no, row)) = driver.next_line_with_reverse_no()? {
400 let document = executor.parse_owned_row(reverse_row_no, row)?;
401 let matched = executor.eval_document(reverse_row_no, &document)?;
402 if !is_truthy(&matched) {
403 continue;
404 }
405
406 let root = document
407 .root_val_with(executor.engine().keys())
408 .map_err(|err| super::ndjson::row_eval_error(reverse_row_no, err))?;
409 emitted += 1;
410 if matches!(emit(root)?, super::ndjson::NdjsonControl::Stop) || emitted >= limit {
411 break;
412 }
413 }
414
415 Ok(emitted)
416}
417
418fn drive_rev_matches_writer<P, W>(
419 engine: &JetroEngine,
420 path: P,
421 predicate: &str,
422 limit: usize,
423 options: super::ndjson::NdjsonOptions,
424 writer: W,
425) -> Result<usize, JetroEngineError>
426where
427 P: AsRef<Path>,
428 W: Write,
429{
430 if limit == 0 {
431 return Ok(0);
432 }
433
434 let mut driver = NdjsonReverseFileDriver::with_options(path, options)?;
435 let mut executor = super::ndjson::NdjsonRowExecutor::new(engine, predicate);
436 let mut writer = super::ndjson::ndjson_writer_with_options(writer, options);
437 let mut emitted = 0usize;
438
439 while let Some((reverse_row_no, row)) = driver.next_line_with_reverse_no()? {
440 let document = executor.parse_owned_row(reverse_row_no, row)?;
441 let matched = executor.eval_document(reverse_row_no, &document)?;
442 if !is_truthy(&matched) {
443 continue;
444 }
445
446 super::ndjson::write_document_line(
447 &mut writer,
448 &document,
449 reverse_row_no,
450 executor.engine(),
451 )?;
452 emitted += 1;
453 if emitted >= limit {
454 break;
455 }
456 }
457
458 writer.flush()?;
459 Ok(emitted)
460}
461
/// Removes every trailing `\n` and `\r` byte from `buf` in place.
///
/// Bytes before the last non-line-ending byte are left untouched, including
/// any embedded `\r` or `\n`.
fn trim_line_ending(buf: &mut Vec<u8>) {
    // Index one past the last byte that is not a line-ending byte; zero when
    // the buffer holds nothing else.
    let keep = buf
        .iter()
        .rposition(|&byte| byte != b'\n' && byte != b'\r')
        .map_or(0, |idx| idx + 1);
    buf.truncate(keep);
}
467
#[cfg(test)]
mod tests {
    use super::NdjsonReverseFileDriver;
    use std::path::PathBuf;

    /// Builds a scratch file path in the system temp directory; the process id
    /// keeps concurrently running test binaries from sharing a file.
    fn temp_path(name: &str) -> PathBuf {
        std::env::temp_dir().join(format!("{}-{}.ndjson", name, std::process::id()))
    }

    /// Rows come back last-to-first even when the chunk size splits lines.
    #[test]
    fn reverse_driver_reads_rows_from_tail() {
        let path = temp_path("jetro-ndjson-rev-basic");
        std::fs::write(&path, b"{\"n\":1}\n{\"n\":2}\n{\"n\":3}\n").unwrap();
        let mut driver = NdjsonReverseFileDriver::with_chunk_size(&path, 8).unwrap();

        let expected = [r#"{"n":3}"#, r#"{"n":2}"#, r#"{"n":1}"#];
        for want in expected.iter() {
            let line = driver.next_line().unwrap().unwrap();
            assert_eq!(line, want.as_bytes());
        }
        assert!(driver.next_line().unwrap().is_none());

        std::fs::remove_file(&path).ok();
    }

    /// A missing trailing newline, CRLF endings and blank lines are tolerated.
    #[test]
    fn reverse_driver_handles_missing_final_newline_and_blank_lines() {
        let path = temp_path("jetro-ndjson-rev-edge");
        std::fs::write(&path, b"\n{\"n\":1}\r\n\n{\"n\":2}").unwrap();
        let mut driver = NdjsonReverseFileDriver::with_chunk_size(&path, 5).unwrap();

        let expected = [r#"{"n":2}"#, r#"{"n":1}"#];
        for want in expected.iter() {
            let line = driver.next_line().unwrap().unwrap();
            assert_eq!(line, want.as_bytes());
        }
        assert!(driver.next_line().unwrap().is_none());

        std::fs::remove_file(&path).ok();
    }

    /// Reverse row numbers start at 1 for the last row of the file.
    #[test]
    fn reverse_driver_reports_reverse_row_numbers() {
        let path = temp_path("jetro-ndjson-rev-row-no");
        std::fs::write(&path, b"{\"n\":1}\n{\"n\":2}\n").unwrap();
        let mut driver = NdjsonReverseFileDriver::with_chunk_size(&path, 3).unwrap();

        let expected = [r#"{"n":2}"#, r#"{"n":1}"#];
        for (idx, want) in expected.iter().enumerate() {
            let (row_no, line) = driver.next_line_with_reverse_no().unwrap().unwrap();
            assert_eq!(row_no, idx as u64 + 1);
            assert_eq!(line, want.as_bytes());
        }
        assert!(driver.next_line_with_reverse_no().unwrap().is_none());

        std::fs::remove_file(&path).ok();
    }
}