Skip to main content

journal/
directory.rs

1use super::*;
2use std::collections::{HashMap, HashSet};
3
4pub struct DirectoryReader {
5    files: Vec<FileReader>,
6    index: usize,
7    pending_realtime_seek: Option<u64>,
8    realtime_seek_bound: Option<(u64, Direction)>,
9    candidates: Vec<Option<DirectoryCandidate>>,
10    current_key: Option<DirectoryEntryKey>,
11    direction: Option<Direction>,
12    boot_newest: HashMap<[u8; 16], DirectoryBootNewest>,
13    pub(super) non_overlapping: bool,
14}
15
16#[derive(Debug, Clone, Copy)]
17struct DirectoryCandidate {
18    reader_index: usize,
19    key: DirectoryEntryKey,
20}
21
22#[derive(Debug, Clone, Copy, PartialEq, Eq)]
23pub(super) struct DirectoryEntryKey {
24    pub(super) seqnum_id: [u8; 16],
25    pub(super) seqnum: u64,
26    pub(super) boot_id: [u8; 16],
27    pub(super) monotonic: u64,
28    pub(super) realtime: u64,
29    pub(super) xor_hash: u64,
30}
31
/// Newest tail-entry position seen for one boot across the headers of all files.
#[derive(Clone, Copy, Debug)]
struct DirectoryBootNewest {
    /// Machine the boot belongs to; boots of different machines are left unordered.
    machine_id: [u8; 16],
    /// Monotonic timestamp of the boot's newest tail entry.
    monotonic: u64,
    /// Realtime timestamp of that same entry; used to order boots against each other.
    realtime: u64,
}
38
39impl DirectoryReader {
40    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
41        Self::open_with_options(path, ReaderOptions::default())
42    }
43
44    pub fn open_with_options(path: impl AsRef<Path>, options: ReaderOptions) -> Result<Self> {
45        let path = path.as_ref();
46        if !path.is_dir() {
47            return Err(SdkError::InvalidPath(format!(
48                "not a directory: {}",
49                path.display()
50            )));
51        }
52
53        let mut files = Vec::new();
54        for file_path in collect_journal_files(path)? {
55            if let Ok(reader) = FileReader::open_with_options(&file_path, options) {
56                files.push(reader);
57            }
58        }
59
60        Self::from_readers(files, true)
61    }
62
63    pub fn open_files<I, P>(paths: I) -> Result<Self>
64    where
65        I: IntoIterator<Item = P>,
66        P: AsRef<Path>,
67    {
68        Self::open_files_with_options(paths, ReaderOptions::default())
69    }
70
71    pub fn open_files_with_options<I, P>(paths: I, options: ReaderOptions) -> Result<Self>
72    where
73        I: IntoIterator<Item = P>,
74        P: AsRef<Path>,
75    {
76        let mut files = Vec::new();
77        for path in paths {
78            let path = path.as_ref();
79            if !path.is_file() || !is_journal_file_name(path) {
80                return Err(SdkError::InvalidPath(format!(
81                    "not a journal file: {}",
82                    path.display()
83                )));
84            }
85            files.push(FileReader::open_with_options(path, options)?);
86        }
87
88        Self::from_readers(files, false)
89    }
90
91    fn from_readers(mut files: Vec<FileReader>, allow_empty: bool) -> Result<Self> {
92        if files.is_empty() && !allow_empty {
93            return Err(SdkError::InvalidPath(
94                "no readable journal files".to_string(),
95            ));
96        }
97
98        files.sort_by_key(FileReader::header_realtime_start);
99        let boot_newest = build_directory_boot_newest(&files);
100        let non_overlapping = directory_files_non_overlapping(&files);
101        let candidates = vec![None; files.len()];
102        Ok(Self {
103            files,
104            index: usize::MAX,
105            pending_realtime_seek: None,
106            realtime_seek_bound: None,
107            candidates,
108            current_key: None,
109            direction: None,
110            boot_newest,
111            non_overlapping,
112        })
113    }
114
115    pub fn seek_head(&mut self) {
116        self.pending_realtime_seek = None;
117        self.realtime_seek_bound = None;
118        self.index = usize::MAX;
119        self.current_key = None;
120        self.direction = None;
121        self.reset_candidates();
122        for reader in &mut self.files {
123            reader.seek_head();
124        }
125    }
126
127    pub fn seek_tail(&mut self) {
128        self.pending_realtime_seek = None;
129        self.realtime_seek_bound = None;
130        self.index = usize::MAX;
131        self.current_key = None;
132        self.direction = None;
133        self.reset_candidates();
134        for reader in &mut self.files {
135            reader.seek_tail();
136        }
137    }
138
139    pub fn seek_realtime(&mut self, usec: u64) {
140        self.pending_realtime_seek = Some(usec);
141        self.realtime_seek_bound = None;
142        self.index = usize::MAX;
143        self.current_key = None;
144        self.direction = None;
145        self.reset_candidates();
146    }
147
148    pub fn next(&mut self) -> Result<bool> {
149        self.step_merged(Direction::Forward)
150    }
151
152    pub fn previous(&mut self) -> Result<bool> {
153        self.step_merged(Direction::Backward)
154    }
155
156    fn step_merged(&mut self, direction: Direction) -> Result<bool> {
157        if self.can_step_sequential(direction) {
158            return self.step_sequential(direction);
159        }
160
161        self.prepare_merge_direction(direction);
162
163        let mut best: Option<DirectoryCandidate> = None;
164        for idx in 0..self.files.len() {
165            self.fill_candidate(idx, direction)?;
166            let Some(candidate) = self.candidates[idx] else {
167                continue;
168            };
169            let replace = match best {
170                None => true,
171                Some(current) => {
172                    let cmp = self.compare_entry_keys(candidate.key, current.key);
173                    (direction == Direction::Forward && cmp < 0)
174                        || (direction == Direction::Backward && cmp > 0)
175                }
176            };
177            if replace {
178                best = Some(candidate);
179            }
180        }
181
182        let Some(best) = best else {
183            self.index = usize::MAX;
184            self.realtime_seek_bound = None;
185            return Ok(false);
186        };
187
188        self.index = best.reader_index;
189        self.current_key = Some(best.key);
190        self.candidates[best.reader_index] = None;
191        self.realtime_seek_bound = None;
192        Ok(true)
193    }
194
195    fn prepare_merge_direction(&mut self, direction: Direction) {
196        if let Some(usec) = self.pending_realtime_seek.take() {
197            for reader in &mut self.files {
198                reader.seek_realtime(usec);
199            }
200            self.reset_candidates();
201            self.realtime_seek_bound = Some((usec, direction));
202            self.direction = Some(direction);
203            return;
204        }
205
206        if self.direction == Some(direction) {
207            return;
208        }
209
210        if let Some(current) = self.current_key {
211            for reader in &mut self.files {
212                reader.seek_realtime(current.realtime);
213            }
214        } else if direction == Direction::Forward {
215            for reader in &mut self.files {
216                reader.seek_head();
217            }
218        } else {
219            for reader in &mut self.files {
220                reader.seek_tail();
221            }
222        }
223
224        self.reset_candidates();
225        self.direction = Some(direction);
226    }
227
228    fn fill_candidate(&mut self, reader_index: usize, direction: Direction) -> Result<()> {
229        if self.candidates[reader_index].is_some() {
230            return Ok(());
231        }
232
233        loop {
234            if !self.advance_candidate_reader(reader_index, direction)? {
235                return Ok(());
236            }
237            let key = self.files[reader_index].current_directory_entry_key()?;
238            if !self.candidate_matches_realtime_bound(key) {
239                continue;
240            }
241            if !self.candidate_is_after_current(key, direction) {
242                continue;
243            }
244
245            self.candidates[reader_index] = Some(DirectoryCandidate { reader_index, key });
246            return Ok(());
247        }
248    }
249
250    fn advance_candidate_reader(
251        &mut self,
252        reader_index: usize,
253        direction: Direction,
254    ) -> Result<bool> {
255        match direction {
256            Direction::Forward => self.files[reader_index].next(),
257            Direction::Backward => self.files[reader_index].previous(),
258        }
259    }
260
261    fn candidate_matches_realtime_bound(&self, key: DirectoryEntryKey) -> bool {
262        let Some((usec, seek_direction)) = self.realtime_seek_bound else {
263            return true;
264        };
265        match seek_direction {
266            Direction::Forward => key.realtime >= usec,
267            Direction::Backward => key.realtime <= usec,
268        }
269    }
270
271    fn candidate_is_after_current(&self, key: DirectoryEntryKey, direction: Direction) -> bool {
272        let Some(current) = self.current_key else {
273            return true;
274        };
275        let cmp = self.compare_entry_keys(key, current);
276        match direction {
277            Direction::Forward => cmp > 0,
278            Direction::Backward => cmp < 0,
279        }
280    }
281
282    fn compare_entry_keys(&self, a: DirectoryEntryKey, b: DirectoryEntryKey) -> i8 {
283        if a == b {
284            return 0;
285        }
286
287        if a.seqnum_id == b.seqnum_id {
288            let cmp = cmp_u64(a.seqnum, b.seqnum);
289            if cmp != 0 {
290                return cmp;
291            }
292        }
293
294        if a.boot_id == b.boot_id {
295            let cmp = cmp_u64(a.monotonic, b.monotonic);
296            if cmp != 0 {
297                return cmp;
298            }
299        } else {
300            let cmp = self.compare_boot_ids(a.boot_id, b.boot_id);
301            if cmp != 0 {
302                return cmp;
303            }
304        }
305
306        let cmp = cmp_u64(a.realtime, b.realtime);
307        if cmp != 0 {
308            return cmp;
309        }
310        cmp_u64(a.xor_hash, b.xor_hash)
311    }
312
313    fn compare_boot_ids(&self, a: [u8; 16], b: [u8; 16]) -> i8 {
314        let Some(a_newest) = self.boot_newest.get(&a) else {
315            return 0;
316        };
317        let Some(b_newest) = self.boot_newest.get(&b) else {
318            return 0;
319        };
320        if a_newest.machine_id != b_newest.machine_id {
321            return 0;
322        }
323        cmp_u64(a_newest.realtime, b_newest.realtime)
324    }
325
326    fn reset_candidates(&mut self) {
327        if self.candidates.len() != self.files.len() {
328            self.candidates = vec![None; self.files.len()];
329            return;
330        }
331        for candidate in &mut self.candidates {
332            *candidate = None;
333        }
334    }
335
336    pub fn get_entry(&mut self) -> Result<Entry> {
337        if self.index >= self.files.len() {
338            return Err(SdkError::NoEntry);
339        }
340        self.files[self.index].get_entry()
341    }
342
343    pub fn visit_entry_payloads<F>(&mut self, visitor: F) -> Result<()>
344    where
345        F: FnMut(&[u8]) -> Result<()>,
346    {
347        if self.index >= self.files.len() {
348            return Err(SdkError::NoEntry);
349        }
350        self.files[self.index].visit_entry_payloads(visitor)
351    }
352
353    pub fn clear_entry_data_state(&mut self) {
354        if self.index < self.files.len() {
355            self.files[self.index].clear_entry_data_state();
356        }
357    }
358
359    pub fn entry_data_restart(&mut self) -> Result<()> {
360        if self.index >= self.files.len() {
361            return Err(SdkError::NoEntry);
362        }
363        self.files[self.index].entry_data_restart()
364    }
365
366    pub fn enumerate_entry_payload(&mut self) -> Result<Option<&[u8]>> {
367        if self.index >= self.files.len() {
368            return Err(SdkError::NoEntry);
369        }
370        self.files[self.index].enumerate_entry_payload()
371    }
372
373    pub fn collect_entry_payloads(&mut self, payloads: &mut Vec<Vec<u8>>) -> Result<()> {
374        if self.index >= self.files.len() {
375            return Err(SdkError::NoEntry);
376        }
377        self.files[self.index].collect_entry_payloads(payloads)
378    }
379
380    pub fn get_entry_payload(&mut self, field: &[u8]) -> Result<Option<Vec<u8>>> {
381        if self.index >= self.files.len() {
382            return Err(SdkError::NoEntry);
383        }
384        self.files[self.index].get_entry_payload(field)
385    }
386
387    pub fn get_realtime_usec(&self) -> Result<u64> {
388        if self.index >= self.files.len() {
389            return Err(SdkError::NoEntry);
390        }
391        self.files[self.index].get_realtime_usec()
392    }
393
394    pub fn get_seqnum(&self) -> Result<(u64, [u8; 16])> {
395        if self.index >= self.files.len() {
396            return Err(SdkError::NoEntry);
397        }
398        if let Some(key) = self.current_key {
399            return Ok((key.seqnum, key.seqnum_id));
400        }
401        self.files[self.index].get_seqnum()
402    }
403
404    pub fn get_monotonic_usec(&self) -> Result<(u64, [u8; 16])> {
405        if self.index >= self.files.len() {
406            return Err(SdkError::NoEntry);
407        }
408        if let Some(key) = self.current_key {
409            return Ok((key.monotonic, key.boot_id));
410        }
411        self.files[self.index].get_monotonic_usec()
412    }
413
414    pub fn get_cursor(&self) -> Result<String> {
415        if self.index >= self.files.len() {
416            return Err(SdkError::NoEntry);
417        }
418        self.files[self.index].get_cursor()
419    }
420
421    pub fn test_cursor(&self, cursor: &str) -> Result<bool> {
422        if self.index >= self.files.len() {
423            return Ok(false);
424        }
425        self.files[self.index].test_cursor(cursor)
426    }
427
428    pub fn seek_cursor(&mut self, cursor: &str) -> Result<()> {
429        let want = parse_cursor(cursor).map_err(|err| SdkError::InvalidCursor(err.to_string()))?;
430        let realtime = want.2;
431        self.seek_realtime(realtime);
432        while self.next()? {
433            let current_cursor = self.get_cursor()?;
434            let got = parse_cursor(&current_cursor)
435                .map_err(|err| SdkError::InvalidCursor(err.to_string()))?;
436            if got.2 > realtime {
437                return Ok(());
438            }
439            if got == want {
440                return Ok(());
441            }
442        }
443        Ok(())
444    }
445
446    pub fn enumerate_fields(&mut self) -> Result<Vec<String>> {
447        let mut fields = HashSet::new();
448        for reader in &mut self.files {
449            for field in reader.enumerate_fields()? {
450                fields.insert(field);
451            }
452        }
453        let mut out: Vec<_> = fields.into_iter().collect();
454        out.sort();
455        Ok(out)
456    }
457
458    pub fn query_unique(&mut self, field_name: &str) -> Result<Vec<Vec<u8>>> {
459        let mut out = Vec::new();
460        self.visit_unique_values(field_name, |value| {
461            out.push(value.to_vec());
462            Ok(())
463        })?;
464        Ok(out)
465    }
466
467    pub fn visit_unique_values<F>(&mut self, field_name: &str, mut visitor: F) -> Result<()>
468    where
469        F: FnMut(&[u8]) -> Result<()>,
470    {
471        if self.files.len() == 1 {
472            return self.files[0].visit_unique_values(field_name, visitor);
473        }
474
475        let mut seen = HashSet::new();
476        for reader in &mut self.files {
477            reader.visit_unique_values(field_name, |value| {
478                if seen.insert(value.to_vec()) {
479                    visitor(value)?;
480                }
481                Ok(())
482            })?;
483        }
484        Ok(())
485    }
486
487    pub fn list_boots(&self) -> Vec<BootInfo> {
488        let mut boots: HashMap<String, (i64, i64)> = HashMap::new();
489        for reader in &self.files {
490            let header = reader.cached_header().header;
491            let boot_id = hex::encode(header.tail_entry_boot_id);
492            let first = header.head_entry_realtime as i64;
493            let last = header.tail_entry_realtime as i64;
494            boots
495                .entry(boot_id)
496                .and_modify(|range| {
497                    range.0 = range.0.min(first);
498                    range.1 = range.1.max(last);
499                })
500                .or_insert((first, last));
501        }
502
503        let mut out: Vec<_> = boots
504            .into_iter()
505            .map(|(boot_id, (first_entry, last_entry))| BootInfo {
506                index: 0,
507                boot_id,
508                first_entry,
509                last_entry,
510            })
511            .collect();
512        out.sort_by_key(|boot| boot.first_entry);
513        let base = 1 - out.len() as i64;
514        for (idx, boot) in out.iter_mut().enumerate() {
515            boot.index = base + idx as i64;
516        }
517        out
518    }
519
520    pub fn add_match(&mut self, data: &[u8]) {
521        for reader in &mut self.files {
522            reader.add_match(data);
523        }
524        self.reset_merge_state();
525    }
526
527    pub fn add_conjunction(&mut self) -> Result<()> {
528        for reader in &mut self.files {
529            reader.add_conjunction()?;
530        }
531        self.reset_merge_state();
532        Ok(())
533    }
534
535    pub fn add_disjunction(&mut self) -> Result<()> {
536        for reader in &mut self.files {
537            reader.add_disjunction()?;
538        }
539        self.reset_merge_state();
540        Ok(())
541    }
542
543    pub fn flush_matches(&mut self) {
544        for reader in &mut self.files {
545            reader.flush_matches();
546        }
547        self.reset_merge_state();
548    }
549
550    fn reset_merge_state(&mut self) {
551        self.index = usize::MAX;
552        self.current_key = None;
553        self.direction = None;
554        self.realtime_seek_bound = None;
555        self.reset_candidates();
556    }
557
558    fn can_step_sequential(&self, direction: Direction) -> bool {
559        if !self.non_overlapping || self.pending_realtime_seek.is_some() {
560            return false;
561        }
562        if self.direction.is_some_and(|current| current != direction) && self.current_key.is_some()
563        {
564            return false;
565        }
566        true
567    }
568
569    fn step_sequential(&mut self, direction: Direction) -> Result<bool> {
570        if self.files.is_empty() {
571            self.clear_current_directory_entry();
572            return Ok(false);
573        }
574
575        if self.direction != Some(direction) {
576            self.reset_sequential_direction(direction);
577        }
578
579        match direction {
580            Direction::Forward => self.step_sequential_forward(),
581            Direction::Backward => self.step_sequential_backward(),
582        }
583    }
584
585    fn reset_sequential_direction(&mut self, direction: Direction) {
586        match direction {
587            Direction::Forward => {
588                for reader in &mut self.files {
589                    reader.seek_head();
590                }
591                self.index = 0;
592            }
593            Direction::Backward => {
594                for reader in &mut self.files {
595                    reader.seek_tail();
596                }
597                self.index = self.files.len() - 1;
598            }
599        }
600        self.reset_candidates();
601        self.current_key = None;
602        self.realtime_seek_bound = None;
603        self.direction = Some(direction);
604    }
605
606    fn step_sequential_forward(&mut self) -> Result<bool> {
607        if self.index == usize::MAX {
608            self.index = 0;
609        }
610        while self.index < self.files.len() {
611            if self.files[self.index].next()? {
612                self.current_key = Some(self.files[self.index].current_directory_entry_key()?);
613                return Ok(true);
614            }
615            self.index += 1;
616        }
617        self.finish_sequential_end()
618    }
619
620    fn step_sequential_backward(&mut self) -> Result<bool> {
621        if self.index >= self.files.len() {
622            self.index = self.files.len() - 1;
623        }
624        loop {
625            if self.files[self.index].previous()? {
626                self.current_key = Some(self.files[self.index].current_directory_entry_key()?);
627                return Ok(true);
628            }
629            if self.index == 0 {
630                break;
631            }
632            self.index -= 1;
633        }
634        self.finish_sequential_end()
635    }
636
637    fn finish_sequential_end(&mut self) -> Result<bool> {
638        self.clear_current_directory_entry();
639        Ok(false)
640    }
641
642    fn clear_current_directory_entry(&mut self) {
643        self.index = usize::MAX;
644        self.current_key = None;
645    }
646}
647
648pub(super) fn is_journal_file_name(path: &Path) -> bool {
649    path.file_name()
650        .and_then(|name| name.to_str())
651        .is_some_and(|name| {
652            name.ends_with(".journal")
653                || name.ends_with(".journal~")
654                || name.ends_with(".journal.zst")
655                || name.ends_with(".journal~.zst")
656        })
657}
658
659fn collect_journal_files(path: &Path) -> Result<Vec<PathBuf>> {
660    let entries: Vec<_> = std::fs::read_dir(path)?.collect::<std::io::Result<Vec<_>>>()?;
661    let mut files = Vec::new();
662
663    for entry in &entries {
664        let file_path = entry.path();
665        if file_path.is_file() && is_journal_file_name(&file_path) {
666            files.push(file_path);
667        }
668    }
669
670    for entry in &entries {
671        let Some(name) = entry.file_name().to_str().map(str::to_owned) else {
672            continue;
673        };
674        if !is_journal_subdir_name(&name) {
675            continue;
676        }
677        let child_path = entry.path();
678        if !child_path.is_dir() {
679            continue;
680        }
681        let Ok(children) = std::fs::read_dir(&child_path) else {
682            continue;
683        };
684        for child in children.flatten() {
685            let file_path = child.path();
686            if file_path.is_file() && is_journal_file_name(&file_path) {
687                files.push(file_path);
688            }
689        }
690    }
691
692    files.sort();
693    Ok(files)
694}
695
696fn is_journal_subdir_name(name: &str) -> bool {
697    if name.contains('.') {
698        return false;
699    }
700    id128_string_valid(name)
701}
702
/// Reports whether `s` is a 128-bit ID string.
///
/// Two forms are accepted: 32 ASCII hex digits, or 36 characters in the dashed
/// `8-4-4-4-12` layout.
fn id128_string_valid(s: &str) -> bool {
    let bytes = s.as_bytes();
    let is_dash_position = |pos: usize| pos == 8 || pos == 13 || pos == 18 || pos == 23;

    if bytes.len() == 32 {
        return bytes.iter().all(u8::is_ascii_hexdigit);
    }
    if bytes.len() != 36 {
        return false;
    }
    bytes.iter().enumerate().all(|(pos, byte)| {
        if is_dash_position(pos) {
            *byte == b'-'
        } else {
            byte.is_ascii_hexdigit()
        }
    })
}
716
717fn build_directory_boot_newest(files: &[FileReader]) -> HashMap<[u8; 16], DirectoryBootNewest> {
718    let mut newest: HashMap<[u8; 16], DirectoryBootNewest> = HashMap::new();
719    for reader in files {
720        let header = reader.cached_header();
721        if header.header.tail_entry_boot_id == [0; 16] {
722            continue;
723        }
724        let replace = match newest.get(&header.header.tail_entry_boot_id) {
725            None => true,
726            Some(current) => header.tail_entry_monotonic > current.monotonic,
727        };
728        if replace {
729            newest.insert(
730                header.header.tail_entry_boot_id,
731                DirectoryBootNewest {
732                    machine_id: header.machine_id,
733                    monotonic: header.tail_entry_monotonic,
734                    realtime: header.header.tail_entry_realtime,
735                },
736            );
737        }
738    }
739    newest
740}
741
742fn directory_files_non_overlapping(files: &[FileReader]) -> bool {
743    if files.is_empty() {
744        return false;
745    }
746
747    for pair in files.windows(2) {
748        let previous = pair[0].cached_header().header;
749        let next = pair[1].cached_header().header;
750        if previous.seqnum_id != next.seqnum_id
751            || previous.tail_entry_seqnum == 0
752            || next.head_entry_seqnum == 0
753            || previous.tail_entry_seqnum >= next.head_entry_seqnum
754            || previous.tail_entry_realtime == 0
755            || next.head_entry_realtime == 0
756            || previous.tail_entry_realtime >= next.head_entry_realtime
757        {
758            return false;
759        }
760    }
761
762    true
763}
764
/// Three-way comparison of two `u64` values: -1 if `a < b`, 1 if `a > b`, otherwise 0.
fn cmp_u64(a: u64, b: u64) -> i8 {
    if a < b {
        -1
    } else if a > b {
        1
    } else {
        0
    }
}