1use super::*;
2use std::collections::{HashMap, HashSet};
3
/// Reads journal entries from several [`FileReader`]s as one ordered stream.
///
/// Stepping either merges the per-file readers by entry key or, when the files
/// were found to be non-overlapping, walks them one after another.
pub struct DirectoryReader {
    /// Per-file readers; `from_readers` sorts them by `header_realtime_start`.
    files: Vec<FileReader>,
    /// Index into `files` of the reader positioned on the current entry;
    /// `usize::MAX` when no entry is current.
    index: usize,
    /// Target of a `seek_realtime` call that has not yet been applied to the file readers.
    pending_realtime_seek: Option<u64>,
    /// Realtime bound, and the direction it was set up for, used to filter
    /// candidates during the merged step that applies a pending realtime seek.
    realtime_seek_bound: Option<(u64, Direction)>,
    /// One slot per file: the entry that reader currently offers to the merge, if any.
    candidates: Vec<Option<DirectoryCandidate>>,
    /// Key of the entry most recently returned by a step.
    current_key: Option<DirectoryEntryKey>,
    /// Direction of the most recent step; `None` after a seek or a match change.
    direction: Option<Direction>,
    /// Per-boot summary built from the file headers, used to order entries of different boots.
    boot_newest: HashMap<[u8; 16], DirectoryBootNewest>,
    /// Result of `directory_files_non_overlapping` for the sorted files; enables
    /// file-by-file stepping instead of merging.
    pub(super) non_overlapping: bool,
}
15
/// An entry offered by one file reader to the merged stepping logic.
#[derive(Debug, Clone, Copy)]
struct DirectoryCandidate {
    /// Index into `DirectoryReader::files` of the reader positioned on this entry.
    reader_index: usize,
    /// Ordering key of the entry that reader is positioned on.
    key: DirectoryEntryKey,
}
21
/// Identity and ordering key of a single entry, as compared by
/// `DirectoryReader::compare_entry_keys`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) struct DirectoryEntryKey {
    /// Sequence-number namespace; `seqnum` is only compared between keys sharing it.
    pub(super) seqnum_id: [u8; 16],
    /// Sequence number within `seqnum_id`.
    pub(super) seqnum: u64,
    /// Boot the entry belongs to; `monotonic` is only compared directly within one boot.
    pub(super) boot_id: [u8; 16],
    /// Monotonic timestamp, reported together with `boot_id` by `get_monotonic_usec`.
    pub(super) monotonic: u64,
    /// Realtime timestamp; used for realtime seek bounds and as a late tie-breaker.
    pub(super) realtime: u64,
    /// Final tie-breaker when all other comparisons are equal.
    pub(super) xor_hash: u64,
}
31
/// Header-derived summary of the newest known tail entry for one boot id,
/// as collected by `build_directory_boot_newest`.
#[derive(Debug, Clone, Copy)]
struct DirectoryBootNewest {
    /// Machine id from the header of the file that supplied this summary.
    machine_id: [u8; 16],
    /// Largest `tail_entry_monotonic` seen among files ending in this boot.
    monotonic: u64,
    /// `tail_entry_realtime` of that same file; used to order different boots.
    realtime: u64,
}
38
39impl DirectoryReader {
40 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
41 Self::open_with_options(path, ReaderOptions::default())
42 }
43
44 pub fn open_with_options(path: impl AsRef<Path>, options: ReaderOptions) -> Result<Self> {
45 let path = path.as_ref();
46 if !path.is_dir() {
47 return Err(SdkError::InvalidPath(format!(
48 "not a directory: {}",
49 path.display()
50 )));
51 }
52
53 let mut files = Vec::new();
54 for file_path in collect_journal_files(path)? {
55 if let Ok(reader) = FileReader::open_with_options(&file_path, options) {
56 files.push(reader);
57 }
58 }
59
60 Self::from_readers(files, true)
61 }
62
63 pub fn open_files<I, P>(paths: I) -> Result<Self>
64 where
65 I: IntoIterator<Item = P>,
66 P: AsRef<Path>,
67 {
68 Self::open_files_with_options(paths, ReaderOptions::default())
69 }
70
71 pub fn open_files_with_options<I, P>(paths: I, options: ReaderOptions) -> Result<Self>
72 where
73 I: IntoIterator<Item = P>,
74 P: AsRef<Path>,
75 {
76 let mut files = Vec::new();
77 for path in paths {
78 let path = path.as_ref();
79 if !path.is_file() || !is_journal_file_name(path) {
80 return Err(SdkError::InvalidPath(format!(
81 "not a journal file: {}",
82 path.display()
83 )));
84 }
85 files.push(FileReader::open_with_options(path, options)?);
86 }
87
88 Self::from_readers(files, false)
89 }
90
91 fn from_readers(mut files: Vec<FileReader>, allow_empty: bool) -> Result<Self> {
92 if files.is_empty() && !allow_empty {
93 return Err(SdkError::InvalidPath(
94 "no readable journal files".to_string(),
95 ));
96 }
97
98 files.sort_by_key(FileReader::header_realtime_start);
99 let boot_newest = build_directory_boot_newest(&files);
100 let non_overlapping = directory_files_non_overlapping(&files);
101 let candidates = vec![None; files.len()];
102 Ok(Self {
103 files,
104 index: usize::MAX,
105 pending_realtime_seek: None,
106 realtime_seek_bound: None,
107 candidates,
108 current_key: None,
109 direction: None,
110 boot_newest,
111 non_overlapping,
112 })
113 }
114
115 pub fn seek_head(&mut self) {
116 self.pending_realtime_seek = None;
117 self.realtime_seek_bound = None;
118 self.index = usize::MAX;
119 self.current_key = None;
120 self.direction = None;
121 self.reset_candidates();
122 for reader in &mut self.files {
123 reader.seek_head();
124 }
125 }
126
127 pub fn seek_tail(&mut self) {
128 self.pending_realtime_seek = None;
129 self.realtime_seek_bound = None;
130 self.index = usize::MAX;
131 self.current_key = None;
132 self.direction = None;
133 self.reset_candidates();
134 for reader in &mut self.files {
135 reader.seek_tail();
136 }
137 }
138
139 pub fn seek_realtime(&mut self, usec: u64) {
140 self.pending_realtime_seek = Some(usec);
141 self.realtime_seek_bound = None;
142 self.index = usize::MAX;
143 self.current_key = None;
144 self.direction = None;
145 self.reset_candidates();
146 }
147
148 pub fn next(&mut self) -> Result<bool> {
149 self.step_merged(Direction::Forward)
150 }
151
152 pub fn previous(&mut self) -> Result<bool> {
153 self.step_merged(Direction::Backward)
154 }
155
156 fn step_merged(&mut self, direction: Direction) -> Result<bool> {
157 if self.can_step_sequential(direction) {
158 return self.step_sequential(direction);
159 }
160
161 self.prepare_merge_direction(direction);
162
163 let mut best: Option<DirectoryCandidate> = None;
164 for idx in 0..self.files.len() {
165 self.fill_candidate(idx, direction)?;
166 let Some(candidate) = self.candidates[idx] else {
167 continue;
168 };
169 let replace = match best {
170 None => true,
171 Some(current) => {
172 let cmp = self.compare_entry_keys(candidate.key, current.key);
173 (direction == Direction::Forward && cmp < 0)
174 || (direction == Direction::Backward && cmp > 0)
175 }
176 };
177 if replace {
178 best = Some(candidate);
179 }
180 }
181
182 let Some(best) = best else {
183 self.index = usize::MAX;
184 self.realtime_seek_bound = None;
185 return Ok(false);
186 };
187
188 self.index = best.reader_index;
189 self.current_key = Some(best.key);
190 self.candidates[best.reader_index] = None;
191 self.realtime_seek_bound = None;
192 Ok(true)
193 }
194
195 fn prepare_merge_direction(&mut self, direction: Direction) {
196 if let Some(usec) = self.pending_realtime_seek.take() {
197 for reader in &mut self.files {
198 reader.seek_realtime(usec);
199 }
200 self.reset_candidates();
201 self.realtime_seek_bound = Some((usec, direction));
202 self.direction = Some(direction);
203 return;
204 }
205
206 if self.direction == Some(direction) {
207 return;
208 }
209
210 if let Some(current) = self.current_key {
211 for reader in &mut self.files {
212 reader.seek_realtime(current.realtime);
213 }
214 } else if direction == Direction::Forward {
215 for reader in &mut self.files {
216 reader.seek_head();
217 }
218 } else {
219 for reader in &mut self.files {
220 reader.seek_tail();
221 }
222 }
223
224 self.reset_candidates();
225 self.direction = Some(direction);
226 }
227
228 fn fill_candidate(&mut self, reader_index: usize, direction: Direction) -> Result<()> {
229 if self.candidates[reader_index].is_some() {
230 return Ok(());
231 }
232
233 loop {
234 if !self.advance_candidate_reader(reader_index, direction)? {
235 return Ok(());
236 }
237 let key = self.files[reader_index].current_directory_entry_key()?;
238 if !self.candidate_matches_realtime_bound(key) {
239 continue;
240 }
241 if !self.candidate_is_after_current(key, direction) {
242 continue;
243 }
244
245 self.candidates[reader_index] = Some(DirectoryCandidate { reader_index, key });
246 return Ok(());
247 }
248 }
249
250 fn advance_candidate_reader(
251 &mut self,
252 reader_index: usize,
253 direction: Direction,
254 ) -> Result<bool> {
255 match direction {
256 Direction::Forward => self.files[reader_index].next(),
257 Direction::Backward => self.files[reader_index].previous(),
258 }
259 }
260
261 fn candidate_matches_realtime_bound(&self, key: DirectoryEntryKey) -> bool {
262 let Some((usec, seek_direction)) = self.realtime_seek_bound else {
263 return true;
264 };
265 match seek_direction {
266 Direction::Forward => key.realtime >= usec,
267 Direction::Backward => key.realtime <= usec,
268 }
269 }
270
271 fn candidate_is_after_current(&self, key: DirectoryEntryKey, direction: Direction) -> bool {
272 let Some(current) = self.current_key else {
273 return true;
274 };
275 let cmp = self.compare_entry_keys(key, current);
276 match direction {
277 Direction::Forward => cmp > 0,
278 Direction::Backward => cmp < 0,
279 }
280 }
281
282 fn compare_entry_keys(&self, a: DirectoryEntryKey, b: DirectoryEntryKey) -> i8 {
283 if a == b {
284 return 0;
285 }
286
287 if a.seqnum_id == b.seqnum_id {
288 let cmp = cmp_u64(a.seqnum, b.seqnum);
289 if cmp != 0 {
290 return cmp;
291 }
292 }
293
294 if a.boot_id == b.boot_id {
295 let cmp = cmp_u64(a.monotonic, b.monotonic);
296 if cmp != 0 {
297 return cmp;
298 }
299 } else {
300 let cmp = self.compare_boot_ids(a.boot_id, b.boot_id);
301 if cmp != 0 {
302 return cmp;
303 }
304 }
305
306 let cmp = cmp_u64(a.realtime, b.realtime);
307 if cmp != 0 {
308 return cmp;
309 }
310 cmp_u64(a.xor_hash, b.xor_hash)
311 }
312
313 fn compare_boot_ids(&self, a: [u8; 16], b: [u8; 16]) -> i8 {
314 let Some(a_newest) = self.boot_newest.get(&a) else {
315 return 0;
316 };
317 let Some(b_newest) = self.boot_newest.get(&b) else {
318 return 0;
319 };
320 if a_newest.machine_id != b_newest.machine_id {
321 return 0;
322 }
323 cmp_u64(a_newest.realtime, b_newest.realtime)
324 }
325
326 fn reset_candidates(&mut self) {
327 if self.candidates.len() != self.files.len() {
328 self.candidates = vec![None; self.files.len()];
329 return;
330 }
331 for candidate in &mut self.candidates {
332 *candidate = None;
333 }
334 }
335
336 pub fn get_entry(&mut self) -> Result<Entry> {
337 if self.index >= self.files.len() {
338 return Err(SdkError::NoEntry);
339 }
340 self.files[self.index].get_entry()
341 }
342
343 pub fn visit_entry_payloads<F>(&mut self, visitor: F) -> Result<()>
344 where
345 F: FnMut(&[u8]) -> Result<()>,
346 {
347 if self.index >= self.files.len() {
348 return Err(SdkError::NoEntry);
349 }
350 self.files[self.index].visit_entry_payloads(visitor)
351 }
352
353 pub fn clear_entry_data_state(&mut self) {
354 if self.index < self.files.len() {
355 self.files[self.index].clear_entry_data_state();
356 }
357 }
358
359 pub fn entry_data_restart(&mut self) -> Result<()> {
360 if self.index >= self.files.len() {
361 return Err(SdkError::NoEntry);
362 }
363 self.files[self.index].entry_data_restart()
364 }
365
366 pub fn enumerate_entry_payload(&mut self) -> Result<Option<&[u8]>> {
367 if self.index >= self.files.len() {
368 return Err(SdkError::NoEntry);
369 }
370 self.files[self.index].enumerate_entry_payload()
371 }
372
373 pub fn collect_entry_payloads(&mut self, payloads: &mut Vec<Vec<u8>>) -> Result<()> {
374 if self.index >= self.files.len() {
375 return Err(SdkError::NoEntry);
376 }
377 self.files[self.index].collect_entry_payloads(payloads)
378 }
379
380 pub fn get_entry_payload(&mut self, field: &[u8]) -> Result<Option<Vec<u8>>> {
381 if self.index >= self.files.len() {
382 return Err(SdkError::NoEntry);
383 }
384 self.files[self.index].get_entry_payload(field)
385 }
386
387 pub fn get_realtime_usec(&self) -> Result<u64> {
388 if self.index >= self.files.len() {
389 return Err(SdkError::NoEntry);
390 }
391 self.files[self.index].get_realtime_usec()
392 }
393
394 pub fn get_seqnum(&self) -> Result<(u64, [u8; 16])> {
395 if self.index >= self.files.len() {
396 return Err(SdkError::NoEntry);
397 }
398 if let Some(key) = self.current_key {
399 return Ok((key.seqnum, key.seqnum_id));
400 }
401 self.files[self.index].get_seqnum()
402 }
403
404 pub fn get_monotonic_usec(&self) -> Result<(u64, [u8; 16])> {
405 if self.index >= self.files.len() {
406 return Err(SdkError::NoEntry);
407 }
408 if let Some(key) = self.current_key {
409 return Ok((key.monotonic, key.boot_id));
410 }
411 self.files[self.index].get_monotonic_usec()
412 }
413
414 pub fn get_cursor(&self) -> Result<String> {
415 if self.index >= self.files.len() {
416 return Err(SdkError::NoEntry);
417 }
418 self.files[self.index].get_cursor()
419 }
420
421 pub fn test_cursor(&self, cursor: &str) -> Result<bool> {
422 if self.index >= self.files.len() {
423 return Ok(false);
424 }
425 self.files[self.index].test_cursor(cursor)
426 }
427
428 pub fn seek_cursor(&mut self, cursor: &str) -> Result<()> {
429 let want = parse_cursor(cursor).map_err(|err| SdkError::InvalidCursor(err.to_string()))?;
430 let realtime = want.2;
431 self.seek_realtime(realtime);
432 while self.next()? {
433 let current_cursor = self.get_cursor()?;
434 let got = parse_cursor(¤t_cursor)
435 .map_err(|err| SdkError::InvalidCursor(err.to_string()))?;
436 if got.2 > realtime {
437 return Ok(());
438 }
439 if got == want {
440 return Ok(());
441 }
442 }
443 Ok(())
444 }
445
446 pub fn enumerate_fields(&mut self) -> Result<Vec<String>> {
447 let mut fields = HashSet::new();
448 for reader in &mut self.files {
449 for field in reader.enumerate_fields()? {
450 fields.insert(field);
451 }
452 }
453 let mut out: Vec<_> = fields.into_iter().collect();
454 out.sort();
455 Ok(out)
456 }
457
458 pub fn query_unique(&mut self, field_name: &str) -> Result<Vec<Vec<u8>>> {
459 let mut out = Vec::new();
460 self.visit_unique_values(field_name, |value| {
461 out.push(value.to_vec());
462 Ok(())
463 })?;
464 Ok(out)
465 }
466
467 pub fn visit_unique_values<F>(&mut self, field_name: &str, mut visitor: F) -> Result<()>
468 where
469 F: FnMut(&[u8]) -> Result<()>,
470 {
471 if self.files.len() == 1 {
472 return self.files[0].visit_unique_values(field_name, visitor);
473 }
474
475 let mut seen = HashSet::new();
476 for reader in &mut self.files {
477 reader.visit_unique_values(field_name, |value| {
478 if seen.insert(value.to_vec()) {
479 visitor(value)?;
480 }
481 Ok(())
482 })?;
483 }
484 Ok(())
485 }
486
487 pub fn list_boots(&self) -> Vec<BootInfo> {
488 let mut boots: HashMap<String, (i64, i64)> = HashMap::new();
489 for reader in &self.files {
490 let header = reader.cached_header().header;
491 let boot_id = hex::encode(header.tail_entry_boot_id);
492 let first = header.head_entry_realtime as i64;
493 let last = header.tail_entry_realtime as i64;
494 boots
495 .entry(boot_id)
496 .and_modify(|range| {
497 range.0 = range.0.min(first);
498 range.1 = range.1.max(last);
499 })
500 .or_insert((first, last));
501 }
502
503 let mut out: Vec<_> = boots
504 .into_iter()
505 .map(|(boot_id, (first_entry, last_entry))| BootInfo {
506 index: 0,
507 boot_id,
508 first_entry,
509 last_entry,
510 })
511 .collect();
512 out.sort_by_key(|boot| boot.first_entry);
513 let base = 1 - out.len() as i64;
514 for (idx, boot) in out.iter_mut().enumerate() {
515 boot.index = base + idx as i64;
516 }
517 out
518 }
519
520 pub fn add_match(&mut self, data: &[u8]) {
521 for reader in &mut self.files {
522 reader.add_match(data);
523 }
524 self.reset_merge_state();
525 }
526
527 pub fn add_conjunction(&mut self) -> Result<()> {
528 for reader in &mut self.files {
529 reader.add_conjunction()?;
530 }
531 self.reset_merge_state();
532 Ok(())
533 }
534
535 pub fn add_disjunction(&mut self) -> Result<()> {
536 for reader in &mut self.files {
537 reader.add_disjunction()?;
538 }
539 self.reset_merge_state();
540 Ok(())
541 }
542
543 pub fn flush_matches(&mut self) {
544 for reader in &mut self.files {
545 reader.flush_matches();
546 }
547 self.reset_merge_state();
548 }
549
550 fn reset_merge_state(&mut self) {
551 self.index = usize::MAX;
552 self.current_key = None;
553 self.direction = None;
554 self.realtime_seek_bound = None;
555 self.reset_candidates();
556 }
557
558 fn can_step_sequential(&self, direction: Direction) -> bool {
559 if !self.non_overlapping || self.pending_realtime_seek.is_some() {
560 return false;
561 }
562 if self.direction.is_some_and(|current| current != direction) && self.current_key.is_some()
563 {
564 return false;
565 }
566 true
567 }
568
569 fn step_sequential(&mut self, direction: Direction) -> Result<bool> {
570 if self.files.is_empty() {
571 self.clear_current_directory_entry();
572 return Ok(false);
573 }
574
575 if self.direction != Some(direction) {
576 self.reset_sequential_direction(direction);
577 }
578
579 match direction {
580 Direction::Forward => self.step_sequential_forward(),
581 Direction::Backward => self.step_sequential_backward(),
582 }
583 }
584
585 fn reset_sequential_direction(&mut self, direction: Direction) {
586 match direction {
587 Direction::Forward => {
588 for reader in &mut self.files {
589 reader.seek_head();
590 }
591 self.index = 0;
592 }
593 Direction::Backward => {
594 for reader in &mut self.files {
595 reader.seek_tail();
596 }
597 self.index = self.files.len() - 1;
598 }
599 }
600 self.reset_candidates();
601 self.current_key = None;
602 self.realtime_seek_bound = None;
603 self.direction = Some(direction);
604 }
605
606 fn step_sequential_forward(&mut self) -> Result<bool> {
607 if self.index == usize::MAX {
608 self.index = 0;
609 }
610 while self.index < self.files.len() {
611 if self.files[self.index].next()? {
612 self.current_key = Some(self.files[self.index].current_directory_entry_key()?);
613 return Ok(true);
614 }
615 self.index += 1;
616 }
617 self.finish_sequential_end()
618 }
619
620 fn step_sequential_backward(&mut self) -> Result<bool> {
621 if self.index >= self.files.len() {
622 self.index = self.files.len() - 1;
623 }
624 loop {
625 if self.files[self.index].previous()? {
626 self.current_key = Some(self.files[self.index].current_directory_entry_key()?);
627 return Ok(true);
628 }
629 if self.index == 0 {
630 break;
631 }
632 self.index -= 1;
633 }
634 self.finish_sequential_end()
635 }
636
637 fn finish_sequential_end(&mut self) -> Result<bool> {
638 self.clear_current_directory_entry();
639 Ok(false)
640 }
641
642 fn clear_current_directory_entry(&mut self) {
643 self.index = usize::MAX;
644 self.current_key = None;
645 }
646}
647
648pub(super) fn is_journal_file_name(path: &Path) -> bool {
649 path.file_name()
650 .and_then(|name| name.to_str())
651 .is_some_and(|name| {
652 name.ends_with(".journal")
653 || name.ends_with(".journal~")
654 || name.ends_with(".journal.zst")
655 || name.ends_with(".journal~.zst")
656 })
657}
658
659fn collect_journal_files(path: &Path) -> Result<Vec<PathBuf>> {
660 let entries: Vec<_> = std::fs::read_dir(path)?.collect::<std::io::Result<Vec<_>>>()?;
661 let mut files = Vec::new();
662
663 for entry in &entries {
664 let file_path = entry.path();
665 if file_path.is_file() && is_journal_file_name(&file_path) {
666 files.push(file_path);
667 }
668 }
669
670 for entry in &entries {
671 let Some(name) = entry.file_name().to_str().map(str::to_owned) else {
672 continue;
673 };
674 if !is_journal_subdir_name(&name) {
675 continue;
676 }
677 let child_path = entry.path();
678 if !child_path.is_dir() {
679 continue;
680 }
681 let Ok(children) = std::fs::read_dir(&child_path) else {
682 continue;
683 };
684 for child in children.flatten() {
685 let file_path = child.path();
686 if file_path.is_file() && is_journal_file_name(&file_path) {
687 files.push(file_path);
688 }
689 }
690 }
691
692 files.sort();
693 Ok(files)
694}
695
696fn is_journal_subdir_name(name: &str) -> bool {
697 if name.contains('.') {
698 return false;
699 }
700 id128_string_valid(name)
701}
702
/// Validates the textual form of a 128-bit id.
///
/// Accepts either 32 hex digits, or 36 characters with `-` at byte offsets
/// 8, 13, 18 and 23 and hex digits everywhere else.
fn id128_string_valid(s: &str) -> bool {
    let bytes = s.as_bytes();
    let dashed = match bytes.len() {
        32 => false,
        36 => true,
        _ => return false,
    };

    bytes.iter().enumerate().all(|(pos, byte)| {
        if dashed && matches!(pos, 8 | 13 | 18 | 23) {
            *byte == b'-'
        } else {
            byte.is_ascii_hexdigit()
        }
    })
}
716
717fn build_directory_boot_newest(files: &[FileReader]) -> HashMap<[u8; 16], DirectoryBootNewest> {
718 let mut newest: HashMap<[u8; 16], DirectoryBootNewest> = HashMap::new();
719 for reader in files {
720 let header = reader.cached_header();
721 if header.header.tail_entry_boot_id == [0; 16] {
722 continue;
723 }
724 let replace = match newest.get(&header.header.tail_entry_boot_id) {
725 None => true,
726 Some(current) => header.tail_entry_monotonic > current.monotonic,
727 };
728 if replace {
729 newest.insert(
730 header.header.tail_entry_boot_id,
731 DirectoryBootNewest {
732 machine_id: header.machine_id,
733 monotonic: header.tail_entry_monotonic,
734 realtime: header.header.tail_entry_realtime,
735 },
736 );
737 }
738 }
739 newest
740}
741
742fn directory_files_non_overlapping(files: &[FileReader]) -> bool {
743 if files.is_empty() {
744 return false;
745 }
746
747 for pair in files.windows(2) {
748 let previous = pair[0].cached_header().header;
749 let next = pair[1].cached_header().header;
750 if previous.seqnum_id != next.seqnum_id
751 || previous.tail_entry_seqnum == 0
752 || next.head_entry_seqnum == 0
753 || previous.tail_entry_seqnum >= next.head_entry_seqnum
754 || previous.tail_entry_realtime == 0
755 || next.head_entry_realtime == 0
756 || previous.tail_entry_realtime >= next.head_entry_realtime
757 {
758 return false;
759 }
760 }
761
762 true
763}
764
/// Three-way comparison of two `u64` values: -1 if `a < b`, 0 if equal, 1 if `a > b`.
fn cmp_u64(a: u64, b: u64) -> i8 {
    // `Ordering` has the discriminants Less = -1, Equal = 0, Greater = 1.
    a.cmp(&b) as i8
}